#!/usr/bin/env python3
# geolog/backend2.py
import os
import re
import logging
import pandas as pd
import geoip2.database
from fastapi import FastAPI, Query
from fastapi.middleware.cors import CORSMiddleware
from typing import Optional, List
from datetime import datetime
from compressor import load_or_create_compressor  # the only name used from compressor

# Reuse uvicorn's logger so messages show up in the server output
logger = logging.getLogger("uvicorn.error")
# ----------------------------
# Configuration
# ----------------------------
LOG_DIR = os.path.join(os.path.dirname(__file__), "logs")
LOG_PREFIX = "filtered_"  # matches cron-generated files
# Resolve relative to this file so startup does not depend on the cwd
GEO_DB_PATH = os.path.join(os.path.dirname(__file__), "GeoLite2-City.mmdb")
FILENAME_RE = re.compile(r"filtered_(\d{4}-\d{2}-\d{2})_(\d+)\.bin")
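# A matching name looks like "filtered_2025-10-26_3.bin": group 1 is the log
# date, group 2 a per-day file index (an assumption inferred from the regex).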
# ----------------------------
# FastAPI Setup
# ----------------------------
app = FastAPI(title="Reverse Proxy Connections Map API")
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)
# ----------------------------
# GeoIP Setup
# ----------------------------
reader = geoip2.database.Reader(GEO_DB_PATH)
geo_cache = {}  # cache IP lookups to save CPU


def ip_to_geo(ip):
    """Resolve an IP to (lat, lon), caching results (including failures)."""
    if ip in geo_cache:
        return geo_cache[ip]
    try:
        response = reader.city(ip)
        latlon = (response.location.latitude, response.location.longitude)
    except Exception:
        latlon = (None, None)
    geo_cache[ip] = latlon
    return latlon
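# Failed lookups (e.g. private addresses, which make reader.city raise
# AddressNotFoundError) are cached as (None, None) and skipped by the loader.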
# ----------------------------
# Helper: Parse timestamp from line
# ----------------------------
def line_timestamp(line: str):
    try:
        ts_str = line.split(" ", 1)[0]
        return pd.to_datetime(ts_str)
    except Exception:
        return None
# ----------------------------
# Binary search on lines
# ----------------------------
def find_line_index(lines, target_time, seek_start=True):
    """Binary-search chronologically sorted lines for a boundary index.

    With seek_start=True, return the first index whose timestamp is
    >= target_time; otherwise return the last index <= target_time.
    """
    lo, hi = 0, len(lines) - 1
    best_idx = None
    while lo <= hi:
        mid = (lo + hi) // 2
        ts = line_timestamp(lines[mid])
        if ts is None:
            # Unparseable line: shift the window past it and keep searching
            if seek_start:
                lo = mid + 1
            else:
                hi = mid - 1
            continue
        if seek_start:
            if ts >= target_time:
                best_idx = mid
                hi = mid - 1
            else:
                lo = mid + 1
        else:
            if ts <= target_time:
                best_idx = mid
                lo = mid + 1
            else:
                hi = mid - 1
    if best_idx is None:
        # No line satisfied the bound; fall back to the widest range and let
        # the caller's per-line timestamp checks discard out-of-range lines.
        return 0 if seek_start else len(lines) - 1
    return best_idx
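# Worked example: for lines timestamped 10:00, 10:05 and 10:10, seeking the
# start boundary for 10:03 returns index 1 (10:05), while seeking the end
# boundary for 10:03 returns index 0 (10:00).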
# ----------------------------
# List log files and parse dates
# ----------------------------
def list_log_files() -> List[tuple[str, datetime]]:
    files = []
    for f in os.listdir(LOG_DIR):
        if not (f.startswith(LOG_PREFIX) and f.endswith(".bin")):
            continue
        match = FILENAME_RE.match(f)
        if not match:
            continue
        try:
            date = datetime.strptime(match.group(1), "%Y-%m-%d")
        except ValueError:
            continue
        index = int(match.group(2))
        files.append((os.path.join(LOG_DIR, f), date, index))
    # Sort by date, then numeric index: a plain path sort would be
    # lexicographic and put e.g. "_10" before "_2"
    files.sort(key=lambda x: (x[1], x[2]))
    return [(path, date) for path, date, _ in files]
def read_compressed_log(log_file):
    """Read one compressed log file and return its entries as text lines."""
    compressor = load_or_create_compressor()
    entries = []
    logger.debug("Reading %s", log_file)
    with open(log_file, "rb") as f:
        # Entries are fixed-width 13-byte records
        while chunk := f.read(13):
            if len(chunk) < 13:  # incomplete entry at end of file
                logger.warning("Incomplete entry at end of %s (%d bytes), skipping",
                               log_file, len(chunk))
                break
            try:
                iso_time, ip, method, path = compressor.decompress_entry(chunk)
                entries.append(f"{iso_time} {ip} {method} {path}")
            except Exception as e:
                logger.warning("Failed to decompress entry: %s", e)
                continue
    return entries
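# The 13-byte record width must match what compressor.py writes; the
# compressor object is assumed to expose decompress_entry(chunk) returning
# an (iso_time, ip, method, path) tuple, as used above.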
# ----------------------------
# Load logs efficiently using filename dates
# ----------------------------
def load_logs_binary(service: Optional[str], start: Optional[str], end: Optional[str]):
    start_dt = pd.to_datetime(start) if start else None
    end_dt = pd.to_datetime(end) if end else None
    records = []
    files = list_log_files()
    logger.debug("Candidate log files: %s", files)
    if not files:
        return []
    for file_path, file_date in files:
        # Skip the whole file if its filename date is outside the range
        if start_dt and file_date.date() < start_dt.date():
            continue
        if end_dt and file_date.date() > end_dt.date():
            continue
        lines = read_compressed_log(file_path)
        if not lines:
            continue
        # Narrow the scan window with a binary search over the sorted lines
        start_idx = find_line_index(lines, start_dt, seek_start=True) if start_dt else 0
        end_idx = find_line_index(lines, end_dt, seek_start=False) if end_dt else len(lines) - 1
        for line in lines[start_idx:end_idx + 1]:
            try:
                parts = line.strip().split(" ", 3)
                if len(parts) != 4:
                    continue
                timestamp, ip, method, path = parts
                ts = pd.to_datetime(timestamp)
                if start_dt and ts < start_dt:
                    continue
                if end_dt and ts > end_dt:
                    break  # lines are chronological; nothing later can match
                if service and service not in path:
                    continue
                lat, lon = ip_to_geo(ip)
                if lat is None or lon is None:
                    continue
                records.append({
                    "timestamp": ts.isoformat(),
                    "path": path,
                    "lat": lat,
                    "lon": lon,
                })
            except Exception:
                continue
    return records
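# Filtering happens at three levels: filename dates prune whole files, the
# binary search narrows the window within a file, and the per-line checks
# enforce exact bounds plus the optional service substring filter.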
# ----------------------------
# API Endpoints
# ----------------------------
@app.get("/connections")
def get_connections(
    service: Optional[str] = Query(None, description="Filter by service path"),
    start: Optional[str] = Query(None, description="Start datetime (ISO format)"),
    end: Optional[str] = Query(None, description="End datetime (ISO format)"),
):
    logger.debug("GET /connections service=%s start=%s end=%s", service, start, end)
    return load_logs_binary(service, start, end)
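# Example request (hypothetical values):
#   curl "http://localhost:8000/connections?service=myapp&start=2025-10-26T00:00:00&end=2025-10-26T12:00:00"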
@app.get("/health")
def health():
files = list_log_files()
total_size = sum(os.path.getsize(f[0]) for f in files)
return {
"status": "ok",
"log_files": len(files),
"total_log_size_bytes": total_size,
"cached_ips": len(geo_cache)
}
# ----------------------------
# Run with Uvicorn
# ----------------------------
if __name__ == "__main__":
    import uvicorn

    uvicorn.run("backend2:app", host="0.0.0.0", port=8000, reload=True)