#!/usr/bin/env python3
import os
import re
import logging
from datetime import datetime
from typing import Optional, List, Tuple

import pandas as pd
import geoip2.database
from fastapi import FastAPI, Query
from fastapi.middleware.cors import CORSMiddleware

# ----------------------------
# Logging Setup
# ----------------------------
LOG_FILE = os.path.join(os.path.dirname(__file__), "backend.log")
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    handlers=[
        logging.FileHandler(LOG_FILE),
        logging.StreamHandler()  # also print to console
    ]
)
logger = logging.getLogger(__name__)

# ----------------------------
# Configuration
# ----------------------------
LOG_DIR = os.path.join(os.path.dirname(__file__), "logs")
LOG_PREFIX = "filtered_"  # matches cron-generated files
GEO_DB_PATH = "GeoLite2-City.mmdb"
FILENAME_RE = re.compile(r"filtered_(\d{4}-\d{2}-\d{2})_(\d+)\.log")

# ----------------------------
# FastAPI Setup
# ----------------------------
app = FastAPI(title="Reverse Proxy Connections Map API")
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"]
)

# ----------------------------
# GeoIP Setup
# ----------------------------
reader = geoip2.database.Reader(GEO_DB_PATH)
geo_cache = {}  # cache IP lookups to save CPU


def ip_to_geo(ip):
    """Resolve an IP to (lat, lon), caching results; (None, None) on failure."""
    if ip in geo_cache:
        return geo_cache[ip]
    try:
        response = reader.city(ip)
        latlon = (response.location.latitude, response.location.longitude)
    except Exception:
        latlon = (None, None)
    geo_cache[ip] = latlon
    return latlon


# ----------------------------
# Helper: Parse timestamp from line
# ----------------------------
def line_timestamp(line: str):
    try:
        ts_str = line.split(" ", 1)[0]
        return pd.to_datetime(ts_str)
    except Exception:
        return None


# ----------------------------
# Binary search on lines
# ----------------------------
def find_line_index(lines, target_time, seek_start=True):
    """Binary-search time-sorted log lines for the first line >= target_time
    (seek_start=True) or the last line <= target_time (seek_start=False)."""
    lo, hi = 0, len(lines) - 1
    best_idx = None
    while lo <= hi:
        mid = (lo + hi) // 2
        ts = line_timestamp(lines[mid])
        if ts is None:
            # Unparseable line: keep moving in the search direction
            if seek_start:
                lo = mid + 1
            else:
                hi = mid - 1
            continue
        if seek_start:
            if ts >= target_time:
                best_idx = mid
                hi = mid - 1
            else:
                lo = mid + 1
        else:
            if ts <= target_time:
                best_idx = mid
                lo = mid + 1
            else:
                hi = mid - 1
    if best_idx is None:
        # No match found: fall back to the widest bound so callers scan safely
        return 0 if seek_start else len(lines) - 1
    return best_idx


# ----------------------------
# List log files and parse dates
# ----------------------------
def list_log_files() -> List[Tuple[str, datetime]]:
    files = []
    for f in os.listdir(LOG_DIR):
        if f.startswith(LOG_PREFIX) and f.endswith(".log"):
            match = FILENAME_RE.match(f)
            if not match:
                continue
            date_str = match.group(1)
            try:
                date = datetime.strptime(date_str, "%Y-%m-%d")
                files.append((os.path.join(LOG_DIR, f), date))
            except ValueError:
                continue
    # sort by date, then by path so same-day files keep a stable order
    return sorted(files, key=lambda x: (x[1], x[0]))


# ----------------------------
# Load logs efficiently using filename dates
# ----------------------------
def load_logs_binary(service: Optional[str], start: Optional[str], end: Optional[str]):
    start_dt = pd.to_datetime(start) if start else None
    end_dt = pd.to_datetime(end) if end else None
    records = []
    files = list_log_files()
    if not files:
        return []
    for file_path, file_date in files:
        # Skip the whole file if its filename date falls outside the range
        if start_dt and file_date.date() < start_dt.date():
            continue
        if end_dt and file_date.date() > end_dt.date():
            continue
        with open(file_path, "r", errors="ignore") as f:
            lines = f.readlines()
        if not lines:
            continue
        # Narrow the scan window via binary search on the time-sorted lines
        start_idx = find_line_index(lines, start_dt, seek_start=True) if start_dt else 0
        end_idx = find_line_index(lines, end_dt, seek_start=False) if end_dt else len(lines) - 1
        for line in lines[start_idx:end_idx + 1]:
            try:
                parts = line.strip().split(" ", 3)
                if len(parts) != 4:
                    continue
                timestamp, ip, method, path = parts
                ts = pd.to_datetime(timestamp)
                if start_dt and ts < start_dt:
                    continue
                if end_dt and ts > end_dt:
                    break  # lines are time-sorted, so nothing later can match
                if service and service not in path:
                    continue
                lat, lon = ip_to_geo(ip)
                if lat is None or lon is None:
                    continue
                records.append({
                    "timestamp": ts.isoformat(),
                    "path": path,
                    "lat": lat,
                    "lon": lon
                })
            except Exception:
                continue
    return records


# ----------------------------
# API Endpoints
# ----------------------------
@app.get("/connections")
def get_connections(
    service: Optional[str] = Query(None, description="Filter by service path"),
    start: Optional[str] = Query(None, description="Start datetime (ISO format)"),
    end: Optional[str] = Query(None, description="End datetime (ISO format)")
):
    logger.info("GET /connections service=%s start=%s end=%s", service, start, end)
    return load_logs_binary(service, start, end)


@app.get("/health")
def health():
    files = list_log_files()
    total_size = sum(os.path.getsize(f[0]) for f in files)
    return {
        "status": "ok",
        "log_files": len(files),
        "total_log_size_bytes": total_size,
        "cached_ips": len(geo_cache)
    }


# ----------------------------
# Run with Uvicorn
# ----------------------------
if __name__ == "__main__":
    import uvicorn
    uvicorn.run("backend:app", host="0.0.0.0", port=8000, reload=True)
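
# ----------------------------
# Example usage (illustrative)
# ----------------------------
# A quick sketch of how the two endpoints can be queried once the server is
# up. Host and port come from the uvicorn.run() call above; the service name
# and date values below are placeholders, so adjust them to match your
# deployment and log contents.
#
#   curl "http://localhost:8000/health"
#   curl "http://localhost:8000/connections?service=api&start=2024-01-01T00:00:00&end=2024-01-02T00:00:00"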