76 lines
2.8 KiB
Python
76 lines
2.8 KiB
Python
import struct
|
|
from datetime import datetime, timezone
|
|
import pickle
|
|
import os
|
|
|
|
LOG_DIR = os.path.join(os.path.dirname(__file__), "logs")
|
|
# Global compressor instance (needs to persist across runs)
|
|
COMPRESSOR_FILE = os.path.join(LOG_DIR, "compressor_state.pkl")
|
|
|
|
def load_or_create_compressor():
|
|
"""Load existing compressor state or create new one."""
|
|
if os.path.exists(COMPRESSOR_FILE):
|
|
with open(COMPRESSOR_FILE, "rb") as f:
|
|
return pickle.load(f)
|
|
return LogCompressor()
|
|
|
|
def save_compressor(compressor):
|
|
"""Save compressor state to preserve path dictionary."""
|
|
with open(COMPRESSOR_FILE, "wb") as f:
|
|
pickle.dump(compressor, f)
|
|
|
|
class LogCompressor:
|
|
def __init__(self):
|
|
self.path_to_id = {}
|
|
self.id_to_path = {}
|
|
self.next_path_id = 0
|
|
self.method_map = {"GET": 0, "POST": 1, "PUT": 2, "DELETE": 3, "HEAD": 4, "PATCH": 5, "OPTIONS": 6}
|
|
|
|
def get_path_id(self, path):
|
|
"""Get or create ID for a path."""
|
|
if path not in self.path_to_id:
|
|
self.path_to_id[path] = self.next_path_id
|
|
self.id_to_path[self.next_path_id] = path
|
|
self.next_path_id += 1
|
|
return self.path_to_id[path]
|
|
|
|
def compress_entry(self, iso_time, ip, method, path):
|
|
"""
|
|
Compress a log entry to bytes.
|
|
Format: 4 bytes (timestamp) + 4 bytes (IP) + 1 byte (method) + 4 bytes (path_id) = 13 bytes
|
|
"""
|
|
# Timestamp: Unix timestamp as 4-byte unsigned int (good until 2106)
|
|
dt = datetime.strptime(iso_time, "%Y-%m-%dT%H:%M:%SZ").replace(tzinfo=timezone.utc)
|
|
timestamp = int(dt.timestamp())
|
|
|
|
# IP: Convert to 4 bytes
|
|
ip_parts = [int(part) for part in ip.split('.')]
|
|
ip_int = (ip_parts[0] << 24) + (ip_parts[1] << 16) + (ip_parts[2] << 8) + ip_parts[3]
|
|
|
|
# Method: 1 byte
|
|
method_id = self.method_map.get(method, 255) # 255 for unknown
|
|
|
|
# Path: Get ID (4 bytes for path index)
|
|
path_id = self.get_path_id(path)
|
|
|
|
# Pack into bytes: I=unsigned int (4 bytes), B=unsigned char (1 byte)
|
|
return struct.pack('<IIBI', timestamp, ip_int, method_id, path_id)
|
|
|
|
def decompress_entry(self, data):
|
|
"""Decompress bytes back to log entry."""
|
|
timestamp, ip_int, method_id, path_id = struct.unpack('<IIBI', data)
|
|
|
|
# Timestamp
|
|
dt = datetime.fromtimestamp(timestamp, tz=timezone.utc)
|
|
iso_time = dt.strftime("%Y-%m-%dT%H:%M:%SZ")
|
|
|
|
# IP
|
|
ip = f"{(ip_int >> 24) & 0xFF}.{(ip_int >> 16) & 0xFF}.{(ip_int >> 8) & 0xFF}.{ip_int & 0xFF}"
|
|
|
|
# Method
|
|
method = {v: k for k, v in self.method_map.items()}.get(method_id, "UNKNOWN")
|
|
|
|
# Path
|
|
path = self.id_to_path.get(path_id, "UNKNOWN")
|
|
|
|
return iso_time, ip, method, path
|