#!/usr/bin/env python3
import os
import re
import pandas as pd
import geoip2.database
from fastapi import FastAPI, Query
from fastapi.middleware.cors import CORSMiddleware
from typing import Optional, List
from datetime import datetime
from compressor import *

# logging
import logging
logger = logging.getLogger('uvicorn.error')

# ----------------------------
# Configuration
# ----------------------------
LOG_DIR = os.path.join(os.path.dirname(__file__), "logs")
LOG_PREFIX = "filtered_"  # matches cron-generated files
GEO_DB_PATH = "GeoLite2-City.mmdb"
FILENAME_RE = re.compile(r"filtered_(\d{4}-\d{2}-\d{2})_(\d+)\.bin")

# ----------------------------
# FastAPI Setup
# ----------------------------
app = FastAPI(title="Reverse Proxy Connections Map API")
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"]
)

# ----------------------------
# GeoIP Setup
# ----------------------------
reader = geoip2.database.Reader(GEO_DB_PATH)
geo_cache = {}  # cache IP lookups to save CPU


def ip_to_geo(ip):
    if ip in geo_cache:
        return geo_cache[ip]
    try:
        response = reader.city(ip)
        latlon = (response.location.latitude, response.location.longitude)
    except Exception:
        latlon = (None, None)
    geo_cache[ip] = latlon
    return latlon


# ----------------------------
# Helper: Parse timestamp from line
# ----------------------------
def line_timestamp(line: str):
    try:
        ts_str = line.split(" ", 1)[0]
        return pd.to_datetime(ts_str)
    except Exception:
        return None


# ----------------------------
# Binary search on lines
# ----------------------------
def find_line_index(lines, target_time, seek_start=True):
    lo, hi = 0, len(lines) - 1
    best_idx = None
    while lo <= hi:
        mid = (lo + hi) // 2
        ts = line_timestamp(lines[mid])
        if ts is None:
            if seek_start:
                lo = mid + 1
            else:
                hi = mid - 1
            continue
        if seek_start:
            if ts >= target_time:
                best_idx = mid
                hi = mid - 1
            else:
                lo = mid + 1
        else:
            if ts <= target_time:
                best_idx = mid
                lo = mid + 1
            else:
                hi = mid - 1
    if best_idx is None:
        return len(lines) - 1 if not seek_start else 0
    return best_idx


# ----------------------------
# List log files and parse dates
# ----------------------------
def list_log_files() -> List[tuple[str, datetime]]:
    files = []
    for f in os.listdir(LOG_DIR):
        if f.startswith(LOG_PREFIX) and f.endswith(".bin"):
            match = FILENAME_RE.match(f)
            if not match:
                continue
            date_str = match.group(1)
            try:
                date = datetime.strptime(date_str, "%Y-%m-%d")
                files.append((os.path.join(LOG_DIR, f), date))
            except Exception:
                continue
    # sort by date and index
    return sorted(files, key=lambda x: (x[1], x[0]))


def read_compressed_log(log_file):
    """Helper to read and decompress a log file."""
    compressor = load_or_create_compressor()
    entries = []
    print(log_file)
    with open(log_file, "rb") as f:
        while chunk := f.read(13):
            if len(chunk) < 13:
                # Incomplete entry at end of file
                print(f"Warning: Incomplete entry at end of {log_file} ({len(chunk)} bytes), skipping")
                break
            try:
                iso_time, ip, method, path = compressor.decompress_entry(chunk)
                entries.append(f"{iso_time} {ip} {method} {path}")
            except Exception as e:
                print(f"Warning: Failed to decompress entry: {e}")
                continue
    return entries
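
# ----------------------------
# Note: decompressed line format
# ----------------------------
# read_compressed_log() above returns plain-text lines shaped as
# "<iso_time> <ip> <method> <path>", e.g.
# "2024-01-01T12:00:00 203.0.113.5 GET /service/endpoint"
# (the example values are illustrative, not taken from real logs).
# line_timestamp() and load_logs_binary() below rely on this
# single-space-separated layout when splitting and parsing each line.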

# ----------------------------
# Load logs efficiently using filename dates
# ----------------------------
def load_logs_binary(service: Optional[str], start: Optional[str], end: Optional[str]):
    start_dt = pd.to_datetime(start) if start else None
    end_dt = pd.to_datetime(end) if end else None
    records = []

    files = list_log_files()
    logger.error(files)
    if not files:
        return []

    for file_path, file_date in files:
        # Skip file if outside range based on filename date
        if start_dt and file_date.date() < start_dt.date():
            continue
        if end_dt and file_date.date() > end_dt.date():
            continue

        # with open(file_path, "r", errors="ignore") as f:
        #     lines = f.readlines()
        lines = read_compressed_log(file_path)
        if not lines:
            continue

        start_idx = find_line_index(lines, start_dt, seek_start=True) if start_dt else 0
        end_idx = find_line_index(lines, end_dt, seek_start=False) if end_dt else len(lines) - 1

        for line in lines[start_idx:end_idx + 1]:
            try:
                parts = line.strip().split(" ", 3)
                if len(parts) != 4:
                    continue
                timestamp, ip, method, path = parts
                ts = pd.to_datetime(timestamp)

                if start_dt and ts < start_dt:
                    continue
                if end_dt and ts > end_dt:
                    break
                if service and service not in path:
                    continue

                lat, lon = ip_to_geo(ip)
                if lat is None or lon is None:
                    continue

                records.append({
                    "timestamp": ts.isoformat(),
                    "path": path,
                    "lat": lat,
                    "lon": lon
                })
            except Exception:
                continue

    return records


# ----------------------------
# API Endpoints
# ----------------------------
@app.get("/connections")
def get_connections(
    service: Optional[str] = Query(None, description="Filter by service path"),
    start: Optional[str] = Query(None, description="Start datetime (ISO format)"),
    end: Optional[str] = Query(None, description="End datetime (ISO format)")
):
    logger.error("Endpoint hit!")
    return load_logs_binary(service, start, end)


@app.get("/health")
def health():
    files = list_log_files()
    total_size = sum(os.path.getsize(f[0]) for f in files)
    return {
        "status": "ok",
        "log_files": len(files),
        "total_log_size_bytes": total_size,
        "cached_ips": len(geo_cache)
    }


# ----------------------------
# Run with Uvicorn
# ----------------------------
if __name__ == "__main__":
    import uvicorn
    uvicorn.run("backend2:app", host="0.0.0.0", port=8000, reload=True)
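
# ----------------------------
# Example requests (illustrative)
# ----------------------------
# A quick sketch of how the endpoints can be queried once the server is
# running on localhost:8000 (see uvicorn.run above). The hostname, dates,
# and the "myservice" value are placeholders, not part of this codebase:
#   curl "http://localhost:8000/health"
#   curl "http://localhost:8000/connections?service=myservice&start=2024-01-01T00:00:00&end=2024-01-02T00:00:00"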