# geolog/compressor.py
# 2025-10-26 11:31:11 +01:00
#
# 76 lines
# 2.8 KiB
# Python

import struct
from datetime import datetime, timezone
import pickle
import os
# Directory containing this module; the log directory lives next to it.
_MODULE_DIR = os.path.dirname(__file__)
# Directory holding the log data and the persisted compressor state.
LOG_DIR = os.path.join(_MODULE_DIR, "logs")
# Pickle file storing the compressor so its path dictionary persists across runs.
COMPRESSOR_FILE = os.path.join(LOG_DIR, "compressor_state.pkl")
def load_or_create_compressor():
    """Load the persisted compressor state, or create a new compressor.

    Opens ``COMPRESSOR_FILE`` and unpickles its contents. If the file does
    not exist, a new empty ``LogCompressor`` is returned instead; nothing is
    written to disk by this function.

    Returns:
        The object unpickled from ``COMPRESSOR_FILE``, or a fresh
        ``LogCompressor`` when no state file exists.

    Raises:
        OSError: the state file exists but cannot be opened (for example a
            permission problem). This is surfaced on purpose: silently
            starting from an empty compressor and saving it later would
            replace the existing path dictionary.
        Exception: whatever ``pickle.load`` raises for a truncated or
            corrupt state file (e.g. ``EOFError``,
            ``pickle.UnpicklingError``).
    """
    # Try the open directly instead of checking os.path.exists() first: the
    # check-then-open sequence is racy and treats "unreadable" as "missing".
    try:
        state_file = open(COMPRESSOR_FILE, "rb")
    except FileNotFoundError:
        return LogCompressor()
    with state_file:
        # SECURITY: pickle.load can execute arbitrary code embedded in the
        # file. COMPRESSOR_FILE must only ever be written by this
        # application; never point it at untrusted data.
        return pickle.load(state_file)
def save_compressor(compressor):
    """Save compressor state to ``COMPRESSOR_FILE`` to preserve the path dictionary.

    The state is pickled to a temporary file in the same directory and then
    moved over ``COMPRESSOR_FILE`` with ``os.replace``, so an interrupted
    write leaves the previous state file intact rather than truncated.
    The target directory is created if it does not exist yet.

    Args:
        compressor: the object to pickle (normally a ``LogCompressor``).

    Raises:
        OSError: the directory or file cannot be created or written.
        pickle.PicklingError: ``compressor`` cannot be pickled.
    """
    state_dir = os.path.dirname(COMPRESSOR_FILE)
    if state_dir:
        # On a first run the log directory may not exist yet.
        os.makedirs(state_dir, exist_ok=True)

    # Write next to the target so os.replace stays on one filesystem.
    tmp_path = COMPRESSOR_FILE + ".tmp"
    try:
        with open(tmp_path, "wb") as f:
            pickle.dump(compressor, f)
        os.replace(tmp_path, COMPRESSOR_FILE)
    except BaseException:
        # Do not leave a partial temp file behind; the original error wins.
        try:
            os.remove(tmp_path)
        except OSError:
            pass
        raise
class LogCompressor:
    """Pack access-log entries into fixed-size 13-byte records and back.

    Each record stores a Unix timestamp, an IPv4 address, an HTTP method
    code and a path ID. Paths are replaced by integer IDs handed out in
    first-seen order; the ``path_to_id`` / ``id_to_path`` dictionaries are
    required to decode records again, so the instance has to be kept
    alongside the records it produced.
    """

    # Little-endian layout: uint32 timestamp, uint32 IPv4, uint8 method,
    # uint32 path id (13 bytes, no padding). Stored on the class rather than
    # the instance so that instances unpickled from state files written
    # before this attribute existed can still use it.
    _ENTRY_STRUCT = struct.Struct('<IIBI')

    def __init__(self):
        # Forward and reverse path dictionaries; IDs are dense, starting at 0.
        self.path_to_id = {}
        self.id_to_path = {}
        self.next_path_id = 0
        # HTTP method -> 1-byte code. 255 is used for any method not listed.
        self.method_map = {"GET": 0, "POST": 1, "PUT": 2, "DELETE": 3, "HEAD": 4, "PATCH": 5, "OPTIONS": 6}

    def get_path_id(self, path):
        """Return the ID for *path*, assigning the next free ID on first use."""
        try:
            return self.path_to_id[path]
        except KeyError:
            path_id = self.next_path_id
            self.path_to_id[path] = path_id
            self.id_to_path[path_id] = path
            self.next_path_id = path_id + 1
            return path_id

    @staticmethod
    def _ipv4_to_int(ip):
        """Convert a dotted-quad IPv4 string to its 32-bit integer value.

        Raises:
            ValueError: *ip* does not have exactly four parts, a part is
                not an integer, or a part is outside 0..255.
        """
        parts = ip.split('.')
        if len(parts) != 4:
            raise ValueError(f"not a dotted-quad IPv4 address: {ip!r}")
        value = 0
        for part in parts:
            octet = int(part)  # raises ValueError for non-numeric parts
            if not 0 <= octet <= 255:
                # Without this check an oversized octet would spill into the
                # neighbouring bits and silently encode a different address.
                raise ValueError(f"IPv4 octet out of range in {ip!r}")
            value = (value << 8) | octet
        return value

    def compress_entry(self, iso_time, ip, method, path):
        """Compress a log entry to bytes.

        Format: 4 bytes (timestamp) + 4 bytes (IP) + 1 byte (method)
        + 4 bytes (path_id) = 13 bytes, little-endian.

        Args:
            iso_time: UTC time formatted as ``YYYY-MM-DDTHH:MM:SSZ``.
            ip: dotted-quad IPv4 address.
            method: HTTP method name; names missing from ``method_map`` are
                stored as 255 and decode as ``"UNKNOWN"``.
            path: request path; it is assigned a path ID if not seen before.

        Returns:
            The 13-byte packed record.

        Raises:
            ValueError: *iso_time* does not match the expected format or
                *ip* is not a valid dotted-quad IPv4 address.
            struct.error: a value does not fit its field (for example a
                timestamp before 1970 or after the 32-bit range).
        """
        # Timestamp: Unix timestamp as 4-byte unsigned int (good until 2106)
        dt = datetime.strptime(iso_time, "%Y-%m-%dT%H:%M:%SZ").replace(tzinfo=timezone.utc)
        timestamp = int(dt.timestamp())
        # IP: validated and folded into one 32-bit integer
        ip_int = self._ipv4_to_int(ip)
        # Method: 1 byte
        method_id = self.method_map.get(method, 255)  # 255 for unknown
        # Path: looked up last so that a rejected timestamp or IP does not
        # consume a path ID.
        path_id = self.get_path_id(path)
        return self._ENTRY_STRUCT.pack(timestamp, ip_int, method_id, path_id)

    def decompress_entry(self, data):
        """Decompress a 13-byte record back to a log entry.

        Args:
            data: bytes produced by ``compress_entry``.

        Returns:
            A tuple ``(iso_time, ip, method, path)``. Method codes and path
            IDs that are not known to this instance come back as
            ``"UNKNOWN"``.

        Raises:
            struct.error: *data* is not exactly 13 bytes long.
        """
        timestamp, ip_int, method_id, path_id = self._ENTRY_STRUCT.unpack(data)
        # Timestamp
        dt = datetime.fromtimestamp(timestamp, tz=timezone.utc)
        iso_time = dt.strftime("%Y-%m-%dT%H:%M:%SZ")
        # IP
        ip = f"{(ip_int >> 24) & 0xFF}.{(ip_int >> 16) & 0xFF}.{(ip_int >> 8) & 0xFF}.{ip_int & 0xFF}"
        # Method: reverse lookup is built from the instance's current
        # method_map so caller changes to the map are honoured.
        id_to_method = {code: name for name, code in self.method_map.items()}
        method = id_to_method.get(method_id, "UNKNOWN")
        # Path
        path = self.id_to_path.get(path_id, "UNKNOWN")
        return iso_time, ip, method, path