geolog/backend.py
2025-08-30 23:49:30 +02:00

152 lines
4.4 KiB
Python

import os
import pandas as pd
import geoip2.database
from fastapi import FastAPI, Query
from fastapi.middleware.cors import CORSMiddleware
from typing import Optional
# ----------------------------
# FastAPI Setup
# ----------------------------
app = FastAPI(title="Reverse Proxy Connections Map API")
# NOTE(review): wildcard CORS (origins/methods/headers all "*") lets any site
# call this API from a browser -- acceptable for an internal dashboard, but
# confirm this is not exposed publicly.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"]
)
# ----------------------------
# GeoIP Setup
# ----------------------------
reader = geoip2.database.Reader("GeoLite2-City.mmdb")
geo_cache = {}  # cache IP lookups to save CPU


def ip_to_geo(ip):
    """Resolve an IP address to a ``(latitude, longitude)`` tuple.

    Every result -- including a failed lookup, stored as ``(None, None)`` --
    is memoized in ``geo_cache`` so repeated requests from the same address
    never hit the GeoIP database twice.
    """
    cached = geo_cache.get(ip)
    if cached is not None:
        return cached
    try:
        city = reader.city(ip)
        coords = (city.location.latitude, city.location.longitude)
    except Exception:
        # Unknown/private address or malformed input: remember the failure
        # so we do not retry the expensive lookup on every request.
        coords = (None, None)
    geo_cache[ip] = coords
    return coords
# ----------------------------
# Helper: Parse timestamp from line
# ----------------------------
def line_timestamp(line: str):
    """Parse the leading timestamp token of a log line.

    Returns a pandas Timestamp built from the first space-separated token,
    or ``None`` when that token does not parse as a datetime.
    """
    try:
        return pd.to_datetime(line.partition(" ")[0])
    except Exception:
        return None
# ----------------------------
# Binary search on lines
# ----------------------------
def find_line_index(lines, target_time, seek_start=True):
    """Binary-search time-sorted log lines for a window boundary index.

    Args:
        lines: log lines, assumed ordered by their leading timestamp.
        target_time: boundary timestamp (comparable with pandas Timestamps).
        seek_start: True -> first index with ts >= target_time;
                    False -> last index with ts <= target_time.

    Returns:
        The boundary index. When no line satisfies the bound, returns
        ``len(lines)`` (seek_start) or ``-1`` (seek end) so that the
        caller's ``lines[start:end + 1]`` slice comes out empty.
    """
    lo, hi = 0, len(lines) - 1
    best_idx = None
    while lo <= hi:
        mid = (lo + hi) // 2
        ts = line_timestamp(lines[mid])
        if ts is None:
            # Malformed line: it cannot be compared, so shrink the range
            # toward the side we are seeking. Best-effort only -- a run of
            # malformed lines can make the boundary approximate, but the
            # caller re-filters each line against the window anyway.
            if seek_start:
                lo = mid + 1
            else:
                hi = mid - 1
            continue
        if seek_start:
            if ts >= target_time:
                best_idx = mid
                hi = mid - 1  # search earlier
            else:
                lo = mid + 1  # search later
        else:
            if ts <= target_time:
                best_idx = mid
                lo = mid + 1  # search later
            else:
                hi = mid - 1  # search earlier
    if best_idx is None:
        # Every parseable line falls outside the bound. The old fallback
        # (0 for start, len-1 for end) forced the caller to scan and discard
        # the whole file; these sentinels make the slice empty instead,
        # with identical final output.
        return len(lines) if seek_start else -1
    return best_idx
# ----------------------------
# Load logs using binary search on lines
# ----------------------------
def load_logs_binary(service: Optional[str], start: Optional[str], end: Optional[str],
                     log_path: str = "access.log"):
    """Load geo-located connection records from an access log.

    Expected line format (space-separated): ``TIMESTAMP IP METHOD PATH``,
    with lines ordered by timestamp -- the ordering is what makes both the
    binary-searched window and the early ``break`` below valid.

    Args:
        service: substring filter applied to the request path (None = all).
        start: inclusive ISO start datetime (None = from beginning).
        end: inclusive ISO end datetime (None = to end of file).
        log_path: path of the log file to read (defaults to "access.log",
            preserving the previous hard-coded behavior).

    Returns:
        List of dicts with "timestamp", "ip", "path", "lat", "lon".
        Records whose IP cannot be geolocated are dropped.
    """
    start_dt = pd.to_datetime(start) if start else None
    end_dt = pd.to_datetime(end) if end else None
    records = []
    # Whole-file read keeps the binary search simple; fine for modest logs,
    # revisit if the file grows into the hundreds of MB.
    with open(log_path, "r", errors="ignore") as f:
        lines = f.readlines()
    start_idx = find_line_index(lines, start_dt, seek_start=True) if start_dt else 0
    end_idx = find_line_index(lines, end_dt, seek_start=False) if end_dt else len(lines) - 1
    for line in lines[start_idx:end_idx + 1]:
        try:
            parts = line.strip().split(" ", 3)
            if len(parts) != 4:
                continue  # malformed line, skip
            timestamp, ip, _method, path = parts
            ts = pd.to_datetime(timestamp)
            # Binary search only approximates the window (malformed lines),
            # so every line is re-checked against the exact bounds.
            if start_dt and ts < start_dt:
                continue
            if end_dt and ts > end_dt:
                break  # lines are time-ordered: nothing later can match
            if service and service not in path:
                continue
            lat, lon = ip_to_geo(ip)
            if lat is None or lon is None:
                continue  # unresolvable IP: cannot be plotted on the map
            records.append({
                "timestamp": ts.isoformat(),
                "ip": ip,
                "path": path,
                "lat": lat,
                "lon": lon
            })
        except Exception:
            continue  # best-effort: one bad line must not kill the response
    return records
# ----------------------------
# API Endpoint
# ----------------------------
@app.get("/connections")
def get_connections(
    service: Optional[str] = Query(None, description="Filter by service path"),
    start: Optional[str] = Query(None, description="Start datetime in ISO format"),
    end: Optional[str] = Query(None, description="End datetime in ISO format")
):
    """Return geo-located connection records from the access log.

    Thin wrapper around load_logs_binary: all three query parameters are
    optional and, when supplied, are combined (logical AND).
    """
    return load_logs_binary(service, start, end)
# ----------------------------
# Healthcheck
# ----------------------------
@app.get("/health")
def health():
    """Liveness probe: report log size and GeoIP cache occupancy.

    Reports log_size_bytes as 0 when the log file does not exist (yet)
    instead of letting os.path.getsize raise and turn the probe into a 500.
    """
    try:
        size = os.path.getsize("access.log")
    except OSError:
        # Log not created yet (or briefly rotated away) -- the API itself
        # is still up, so the probe stays "ok".
        size = 0
    return {"status": "ok", "log_size_bytes": size, "cached_ips": len(geo_cache)}
# ----------------------------
# Run with Uvicorn
# ----------------------------
if __name__ == "__main__":
    import uvicorn
    # NOTE(review): the import string is "backend:app" but this file lives at
    # geolog/backend.py -- confirm it is launched from inside geolog/, else it
    # should be "geolog.backend:app". reload=True is a dev-only setting.
    uvicorn.run("backend:app", host="0.0.0.0", port=8000, reload=True)