#!/usr/bin/env python3
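"""Backend for the Reverse Proxy Connections Map.

Reads compressed, cron-generated access-log files from LOG_DIR, geolocates
client IPs with a local GeoLite2 database, and serves the resulting records
(timestamp, path, lat, lon) over a small FastAPI app.
"""
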
import logging
import os
import re
from datetime import datetime
from typing import Optional

import geoip2.database
import pandas as pd
from fastapi import FastAPI, Query
from fastapi.middleware.cors import CORSMiddleware

from compressor import load_or_create_compressor

logger = logging.getLogger("uvicorn.error")

# ----------------------------
# Configuration
# ----------------------------
LOG_DIR = os.path.join(os.path.dirname(__file__), "logs")
LOG_PREFIX = "filtered_"  # matches cron-generated files
GEO_DB_PATH = "GeoLite2-City.mmdb"
FILENAME_RE = re.compile(r"filtered_(\d{4}-\d{2}-\d{2})_(\d+)\.bin")
ENTRY_SIZE = 13  # fixed byte width of one compressed log entry (see read_compressed_log)

# ----------------------------
# FastAPI Setup
# ----------------------------
app = FastAPI(title="Reverse Proxy Connections Map API")

app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)

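# allow_origins=["*"] is wide open, which is convenient while developing the
# map frontend; a deployment would typically pin it to the frontend's origin,
# e.g. (hypothetical origin): allow_origins=["https://map.example.com"]
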
# ----------------------------
# GeoIP Setup
# ----------------------------
reader = geoip2.database.Reader(GEO_DB_PATH)
geo_cache = {}  # cache IP lookups to save CPU


def ip_to_geo(ip):
    """Resolve an IP to (lat, lon), caching results; (None, None) on failure."""
    if ip in geo_cache:
        return geo_cache[ip]
    try:
        response = reader.city(ip)
        latlon = (response.location.latitude, response.location.longitude)
    except Exception:
        # Lookup failures (unknown address, malformed IP) degrade to (None, None).
        latlon = (None, None)
    geo_cache[ip] = latlon
    return latlon

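# Example (hypothetical IP; 203.0.113.0/24 is a documentation range, so a stock
# GeoLite2 database should yield (None, None) here):
#   ip_to_geo("203.0.113.7")
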
# ----------------------------
# Helper: Parse timestamp from line
# ----------------------------
def line_timestamp(line: str):
    """Return the leading timestamp of a log line, or None if unparseable."""
    try:
        ts_str = line.split(" ", 1)[0]
        return pd.to_datetime(ts_str)
    except Exception:
        return None

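# E.g. (hypothetical line): line_timestamp("2025-01-01T12:00:00 203.0.113.7 GET /grafana")
# -> Timestamp("2025-01-01 12:00:00"); a malformed line yields None.
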
# ----------------------------
# Binary search on lines
# ----------------------------
def find_line_index(lines, target_time, seek_start=True):
    """Binary-search chronologically sorted lines for a time boundary.

    With seek_start=True, return the index of the first line whose timestamp
    is >= target_time; with seek_start=False, the index of the last line
    whose timestamp is <= target_time.
    """
    lo, hi = 0, len(lines) - 1
    best_idx = None

    while lo <= hi:
        mid = (lo + hi) // 2
        ts = line_timestamp(lines[mid])
        if ts is None:
            # Unparseable line: skew the search window and keep going.
            if seek_start:
                lo = mid + 1
            else:
                hi = mid - 1
            continue

        if seek_start:
            if ts >= target_time:
                best_idx = mid
                hi = mid - 1
            else:
                lo = mid + 1
        else:
            if ts <= target_time:
                best_idx = mid
                lo = mid + 1
            else:
                hi = mid - 1

    if best_idx is None:
        # No line satisfied the bound; fall back to the widest slice and let
        # the caller's per-line timestamp filter discard anything out of range.
        return 0 if seek_start else len(lines) - 1
    return best_idx

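# E.g. (schematic timestamps) with lines at ["09:00", "10:00", "11:00"] and a
# 10:30 target: seek_start=True -> 2, seek_start=False -> 1.
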
# ----------------------------
# List log files and parse dates
# ----------------------------
def list_log_files() -> list[tuple[str, datetime]]:
    """Return (path, date) pairs for every well-formed log file in LOG_DIR."""
    files = []
    for f in os.listdir(LOG_DIR):
        if f.startswith(LOG_PREFIX) and f.endswith(".bin"):
            match = FILENAME_RE.match(f)
            if not match:
                continue
            date_str = match.group(1)
            try:
                date = datetime.strptime(date_str, "%Y-%m-%d")
                files.append((os.path.join(LOG_DIR, f), date))
            except ValueError:
                continue
    # sort by date, then by path, so same-day files keep their index order
    return sorted(files, key=lambda x: (x[1], x[0]))

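# E.g. a file named "filtered_2025-01-01_0.bin" (hypothetical) yields
# ("<LOG_DIR>/filtered_2025-01-01_0.bin", datetime(2025, 1, 1)).
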
def read_compressed_log(log_file):
    """Read and decompress a log file, returning its entries as text lines."""
    compressor = load_or_create_compressor()
    entries = []
    logger.debug("Reading compressed log %s", log_file)

    with open(log_file, "rb") as f:
        # Entries are fixed-width binary records of ENTRY_SIZE bytes each.
        while chunk := f.read(ENTRY_SIZE):
            if len(chunk) < ENTRY_SIZE:  # incomplete entry at end of file
                logger.warning(
                    "Incomplete entry at end of %s (%d bytes), skipping",
                    log_file, len(chunk),
                )
                break
            try:
                iso_time, ip, method, path = compressor.decompress_entry(chunk)
                entries.append(f"{iso_time} {ip} {method} {path}")
            except Exception as e:
                logger.warning("Failed to decompress entry: %s", e)
                continue

    return entries

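# Each decompressed entry is rendered as "iso_time ip method path", the line
# shape that line_timestamp() and load_logs_binary() expect.
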
# ----------------------------
# Load logs efficiently using filename dates
# ----------------------------
def load_logs_binary(service: Optional[str], start: Optional[str], end: Optional[str]):
    """Return geolocated connection records, filtered by service and time range."""
    start_dt = pd.to_datetime(start) if start else None
    end_dt = pd.to_datetime(end) if end else None
    records = []

    files = list_log_files()
    logger.debug("Log files found: %s", files)
    if not files:
        return []

    for file_path, file_date in files:
        # Skip the whole file if its filename date falls outside the range
        if start_dt and file_date.date() < start_dt.date():
            continue
        if end_dt and file_date.date() > end_dt.date():
            continue

        lines = read_compressed_log(file_path)
        if not lines:
            continue

        # Narrow to the requested window before parsing every line
        start_idx = find_line_index(lines, start_dt, seek_start=True) if start_dt else 0
        end_idx = find_line_index(lines, end_dt, seek_start=False) if end_dt else len(lines) - 1

        for line in lines[start_idx:end_idx + 1]:
            try:
                parts = line.strip().split(" ", 3)
                if len(parts) != 4:
                    continue
                timestamp, ip, method, path = parts
                ts = pd.to_datetime(timestamp)
                if start_dt and ts < start_dt:
                    continue
                if end_dt and ts > end_dt:
                    break  # lines are chronological, so nothing later can match
                if service and service not in path:
                    continue
                lat, lon = ip_to_geo(ip)
                if lat is None or lon is None:
                    continue
                records.append({
                    "timestamp": ts.isoformat(),
                    "path": path,
                    "lat": lat,
                    "lon": lon
                })
            except Exception:
                continue

    return records

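# Example record shape (hypothetical values):
#   {"timestamp": "2025-01-01T12:00:00", "path": "/grafana", "lat": 48.86, "lon": 2.35}
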
# ----------------------------
# API Endpoints
# ----------------------------
@app.get("/connections")
def get_connections(
    service: Optional[str] = Query(None, description="Filter by service path"),
    start: Optional[str] = Query(None, description="Start datetime (ISO format)"),
    end: Optional[str] = Query(None, description="End datetime (ISO format)")
):
    logger.debug("GET /connections service=%s start=%s end=%s", service, start, end)
    return load_logs_binary(service, start, end)

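# Example request (hypothetical service name):
#   curl "http://localhost:8000/connections?service=grafana&start=2025-01-01T00:00:00&end=2025-01-02T00:00:00"
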
@app.get("/health")
|
|
def health():
|
|
files = list_log_files()
|
|
total_size = sum(os.path.getsize(f[0]) for f in files)
|
|
return {
|
|
"status": "ok",
|
|
"log_files": len(files),
|
|
"total_log_size_bytes": total_size,
|
|
"cached_ips": len(geo_cache)
|
|
}
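# Example response (hypothetical figures):
#   {"status": "ok", "log_files": 3, "total_log_size_bytes": 120042, "cached_ips": 1871}
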
# ----------------------------
# Run with Uvicorn
# ----------------------------
if __name__ == "__main__":
    import uvicorn
    uvicorn.run("backend2:app", host="0.0.0.0", port=8000, reload=True)
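# reload=True suits development; in production you would typically run the
# equivalent CLI without it, e.g.: uvicorn backend2:app --host 0.0.0.0 --port 8000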