geolog/tail_cron.py

#!/usr/bin/env python3
import os
import re
import ipaddress
from datetime import datetime, timezone

from local import *  # must define ACCESS_LOG (path to the nginx access log)
from compressor import *  # expected to provide load_or_create_compressor, save_compressor

# ==== CONFIGURATION ====
MAX_LOG_LINES = 50000  # maximum entries per output file; adjust as needed

LOG_DIR = os.path.join(os.path.dirname(__file__), "logs")
os.makedirs(LOG_DIR, exist_ok=True)

# Pickled compressor state (the path dictionary), managed by the compressor helpers.
COMPRESSOR_FILE = os.path.join(LOG_DIR, "compressor_state.pkl")

# Private (RFC 1918) ranges; anything outside these counts as external.
INTERNAL_NETWORKS = [
    ipaddress.ip_network("10.0.0.0/8"),
    ipaddress.ip_network("192.168.0.0/16"),
    ipaddress.ip_network("172.16.0.0/12"),
]

log_line_re = re.compile(
    r'(?P<ip>\S+) - (?P<user>\S+) \[(?P<time>[^\]]+)\] "(?P<method>\S+) (?P<path>\S+) \S+"'
)
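# A line this pattern matches looks like (illustrative values):
#   203.0.113.7 - - [26/Oct/2025:10:31:11 +0000] "GET /index.html HTTP/1.1" 200 612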


def is_external(ip):
    """Return True if the address lies outside every internal network."""
    ip_addr = ipaddress.ip_address(ip)
    return not any(ip_addr in net for net in INTERNAL_NETWORKS)
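# e.g. is_external("203.0.113.7") -> True; is_external("10.1.2.3") -> False (in 10.0.0.0/8).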


def parse_and_compress_nginx_line(line, compressor):
    """Parse one access-log line and return compressed bytes, or None if the
    line is malformed or comes from an internal address."""
    match = log_line_re.match(line)
    if not match:
        return None
    data = match.groupdict()
    if not is_external(data["ip"]):
        return None
    # Normalize the timestamp to ISO-8601 UTC.
    dt = datetime.strptime(data["time"], "%d/%b/%Y:%H:%M:%S %z")
    dt_utc = dt.astimezone(timezone.utc)
    iso_time = dt_utc.strftime("%Y-%m-%dT%H:%M:%SZ")
    # Return compressed bytes instead of a string.
    return compressor.compress_entry(iso_time, data["ip"], data["method"], data["path"])


def get_current_logfile():
    """Find or create the latest log file, respecting the entry limit."""
    today = datetime.now().strftime("%Y-%m-%d")
    base_name = os.path.join(LOG_DIR, f"filtered_{today}")
    # Find the highest index by looping until a file doesn't exist.
    index = 1
    while os.path.exists(f"{base_name}_{index}.bin"):
        index += 1
    # index is now one past the last existing file.
    # Check whether the last existing file (index - 1) has room.
    if index > 1:
        last_file = f"{base_name}_{index - 1}.bin"
        # Count entries (fixed-width records, 13 bytes each).
        file_size = os.path.getsize(last_file)
        entry_count = file_size // 13
        if entry_count < MAX_LOG_LINES:
            return last_file
    # Either no files exist or the last one is full; start a new one.
    return f"{base_name}_{index}.bin"


def process_log():
    """Process and compress nginx logs."""
    compressor = load_or_create_compressor()
    output_file = get_current_logfile()
    buffer = []
    with open(ACCESS_LOG, "r") as f:
        for line in f:
            compressed = parse_and_compress_nginx_line(line, compressor)
            if compressed is not None:
                buffer.append(compressed)
    # Write the compressed binary data.
    if buffer:
        with open(output_file, "ab") as out:  # 'ab' for append binary
            for entry in buffer:
                out.write(entry)
    # Save compressor state (path dictionary).
    save_compressor(compressor)


def main():
    process_log()


if __name__ == "__main__":
    main()
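
# The filename suggests this script is driven by cron; an illustrative crontab
# entry (schedule and paths are assumptions) might be:
#   */10 * * * * /usr/bin/python3 /path/to/geolog/tail_cron.py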