completed
This commit is contained in:
parent
26985812a4
commit
5bbe935ba2
4 changed files with 0 additions and 436 deletions
219
backend-new.py
219
backend-new.py
|
|
@ -1,219 +0,0 @@
|
||||||
#!/usr/bin/env python3
|
|
||||||
import os
|
|
||||||
import re
|
|
||||||
import pandas as pd
|
|
||||||
import geoip2.database
|
|
||||||
from fastapi import FastAPI, Query
|
|
||||||
from fastapi.middleware.cors import CORSMiddleware
|
|
||||||
from typing import Optional, List
|
|
||||||
from datetime import datetime
|
|
||||||
|
|
||||||
|
|
||||||
#logging
|
|
||||||
import logging
|
|
||||||
|
|
||||||
# ----------------------------
|
|
||||||
# Logging Setup
|
|
||||||
# ----------------------------
|
|
||||||
LOG_FILE = os.path.join(os.path.dirname(__file__), "backend.log")
|
|
||||||
logging.basicConfig(
|
|
||||||
level=logging.INFO,
|
|
||||||
format="%(asctime)s [%(levelname)s] %(message)s",
|
|
||||||
handlers=[
|
|
||||||
logging.FileHandler(LOG_FILE),
|
|
||||||
logging.StreamHandler() # also print to console
|
|
||||||
]
|
|
||||||
)
|
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
|
||||||
# ----------------------------
|
|
||||||
# Configuration
|
|
||||||
# ----------------------------
|
|
||||||
LOG_DIR = os.path.join(os.path.dirname(__file__), "logs")
|
|
||||||
LOG_PREFIX = "filtered_" # matches cron-generated files
|
|
||||||
GEO_DB_PATH = "GeoLite2-City.mmdb"
|
|
||||||
FILENAME_RE = re.compile(r"filtered_(\d{4}-\d{2}-\d{2})_(\d+)\.log")
|
|
||||||
|
|
||||||
# ----------------------------
|
|
||||||
# FastAPI Setup
|
|
||||||
# ----------------------------
|
|
||||||
app = FastAPI(title="Reverse Proxy Connections Map API")
|
|
||||||
|
|
||||||
app.add_middleware(
|
|
||||||
CORSMiddleware,
|
|
||||||
allow_origins=["*"],
|
|
||||||
allow_methods=["*"],
|
|
||||||
allow_headers=["*"]
|
|
||||||
)
|
|
||||||
|
|
||||||
# ----------------------------
|
|
||||||
# GeoIP Setup
|
|
||||||
# ----------------------------
|
|
||||||
reader = geoip2.database.Reader(GEO_DB_PATH)
|
|
||||||
geo_cache = {} # cache IP lookups to save CPU
|
|
||||||
|
|
||||||
def ip_to_geo(ip):
|
|
||||||
if ip in geo_cache:
|
|
||||||
return geo_cache[ip]
|
|
||||||
try:
|
|
||||||
response = reader.city(ip)
|
|
||||||
latlon = (response.location.latitude, response.location.longitude)
|
|
||||||
except Exception:
|
|
||||||
latlon = (None, None)
|
|
||||||
geo_cache[ip] = latlon
|
|
||||||
return latlon
|
|
||||||
|
|
||||||
|
|
||||||
# ----------------------------
|
|
||||||
# Helper: Parse timestamp from line
|
|
||||||
# ----------------------------
|
|
||||||
def line_timestamp(line: str):
|
|
||||||
try:
|
|
||||||
ts_str = line.split(" ", 1)[0]
|
|
||||||
return pd.to_datetime(ts_str)
|
|
||||||
except Exception:
|
|
||||||
return None
|
|
||||||
|
|
||||||
|
|
||||||
# ----------------------------
|
|
||||||
# Binary search on lines
|
|
||||||
# ----------------------------
|
|
||||||
def find_line_index(lines, target_time, seek_start=True):
|
|
||||||
lo, hi = 0, len(lines) - 1
|
|
||||||
best_idx = None
|
|
||||||
|
|
||||||
while lo <= hi:
|
|
||||||
mid = (lo + hi) // 2
|
|
||||||
ts = line_timestamp(lines[mid])
|
|
||||||
if ts is None:
|
|
||||||
if seek_start:
|
|
||||||
lo = mid + 1
|
|
||||||
else:
|
|
||||||
hi = mid - 1
|
|
||||||
continue
|
|
||||||
|
|
||||||
if seek_start:
|
|
||||||
if ts >= target_time:
|
|
||||||
best_idx = mid
|
|
||||||
hi = mid - 1
|
|
||||||
else:
|
|
||||||
lo = mid + 1
|
|
||||||
else:
|
|
||||||
if ts <= target_time:
|
|
||||||
best_idx = mid
|
|
||||||
lo = mid + 1
|
|
||||||
else:
|
|
||||||
hi = mid - 1
|
|
||||||
|
|
||||||
if best_idx is None:
|
|
||||||
return len(lines) - 1 if not seek_start else 0
|
|
||||||
return best_idx
|
|
||||||
|
|
||||||
|
|
||||||
# ----------------------------
|
|
||||||
# List log files and parse dates
|
|
||||||
# ----------------------------
|
|
||||||
def list_log_files() -> List[tuple[str, datetime]]:
|
|
||||||
files = []
|
|
||||||
for f in os.listdir(LOG_DIR):
|
|
||||||
if f.startswith(LOG_PREFIX) and f.endswith(".log"):
|
|
||||||
match = FILENAME_RE.match(f)
|
|
||||||
if not match:
|
|
||||||
continue
|
|
||||||
date_str = match.group(1)
|
|
||||||
try:
|
|
||||||
date = datetime.strptime(date_str, "%Y-%m-%d")
|
|
||||||
files.append((os.path.join(LOG_DIR, f), date))
|
|
||||||
except Exception:
|
|
||||||
continue
|
|
||||||
# sort by date and index
|
|
||||||
return sorted(files, key=lambda x: (x[1], x[0]))
|
|
||||||
|
|
||||||
|
|
||||||
# ----------------------------
|
|
||||||
# Load logs efficiently using filename dates
|
|
||||||
# ----------------------------
|
|
||||||
def load_logs_binary(service: Optional[str], start: Optional[str], end: Optional[str]):
|
|
||||||
start_dt = pd.to_datetime(start) if start else None
|
|
||||||
end_dt = pd.to_datetime(end) if end else None
|
|
||||||
records = []
|
|
||||||
|
|
||||||
files = list_log_files()
|
|
||||||
if not files:
|
|
||||||
return []
|
|
||||||
|
|
||||||
for file_path, file_date in files:
|
|
||||||
# Skip file if outside range based on filename date
|
|
||||||
if start_dt and file_date.date() < start_dt.date():
|
|
||||||
continue
|
|
||||||
if end_dt and file_date.date() > end_dt.date():
|
|
||||||
continue
|
|
||||||
|
|
||||||
with open(file_path, "r", errors="ignore") as f:
|
|
||||||
lines = f.readlines()
|
|
||||||
|
|
||||||
if not lines:
|
|
||||||
continue
|
|
||||||
|
|
||||||
start_idx = find_line_index(lines, start_dt, seek_start=True) if start_dt else 0
|
|
||||||
end_idx = find_line_index(lines, end_dt, seek_start=False) if end_dt else len(lines) - 1
|
|
||||||
|
|
||||||
for line in lines[start_idx:end_idx + 1]:
|
|
||||||
try:
|
|
||||||
parts = line.strip().split(" ", 3)
|
|
||||||
if len(parts) != 4:
|
|
||||||
continue
|
|
||||||
timestamp, ip, method, path = parts
|
|
||||||
ts = pd.to_datetime(timestamp)
|
|
||||||
if start_dt and ts < start_dt:
|
|
||||||
continue
|
|
||||||
if end_dt and ts > end_dt:
|
|
||||||
break
|
|
||||||
if service and service not in path:
|
|
||||||
continue
|
|
||||||
lat, lon = ip_to_geo(ip)
|
|
||||||
if lat is None or lon is None:
|
|
||||||
continue
|
|
||||||
records.append({
|
|
||||||
"timestamp": ts.isoformat(),
|
|
||||||
"path": path,
|
|
||||||
"lat": lat,
|
|
||||||
"lon": lon
|
|
||||||
})
|
|
||||||
except Exception:
|
|
||||||
continue
|
|
||||||
|
|
||||||
return records
|
|
||||||
|
|
||||||
|
|
||||||
# ----------------------------
|
|
||||||
# API Endpoints
|
|
||||||
# ----------------------------
|
|
||||||
@app.get("/connections")
|
|
||||||
def get_connections(
|
|
||||||
service: Optional[str] = Query(None, description="Filter by service path"),
|
|
||||||
start: Optional[str] = Query(None, description="Start datetime (ISO format)"),
|
|
||||||
end: Optional[str] = Query(None, description="End datetime (ISO format)")
|
|
||||||
):
|
|
||||||
print("Endpoint hit!", flush=True)
|
|
||||||
return load_logs_binary(service, start, end)
|
|
||||||
|
|
||||||
|
|
||||||
@app.get("/health")
|
|
||||||
def health():
|
|
||||||
files = list_log_files()
|
|
||||||
total_size = sum(os.path.getsize(f[0]) for f in files)
|
|
||||||
return {
|
|
||||||
"status": "ok",
|
|
||||||
"log_files": len(files),
|
|
||||||
"total_log_size_bytes": total_size,
|
|
||||||
"cached_ips": len(geo_cache)
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
# ----------------------------
|
|
||||||
# Run with Uvicorn
|
|
||||||
# ----------------------------
|
|
||||||
if __name__ == "__main__":
|
|
||||||
import uvicorn
|
|
||||||
uvicorn.run("backend:app", host="0.0.0.0", port=8000, reload=True)
|
|
||||||
|
|
@ -1,2 +0,0 @@
|
||||||
2025-10-23 23:25:31,585 [ERROR] 1
|
|
||||||
2025-10-23 23:25:32,555 [ERROR] 1
|
|
||||||
151
backend.py
151
backend.py
|
|
@ -1,151 +0,0 @@
|
||||||
import os
|
|
||||||
import pandas as pd
|
|
||||||
import geoip2.database
|
|
||||||
from fastapi import FastAPI, Query
|
|
||||||
from fastapi.middleware.cors import CORSMiddleware
|
|
||||||
from typing import Optional
|
|
||||||
|
|
||||||
# ----------------------------
|
|
||||||
# FastAPI Setup
|
|
||||||
# ----------------------------
|
|
||||||
app = FastAPI(title="Reverse Proxy Connections Map API")
|
|
||||||
|
|
||||||
app.add_middleware(
|
|
||||||
CORSMiddleware,
|
|
||||||
allow_origins=["*"],
|
|
||||||
allow_methods=["*"],
|
|
||||||
allow_headers=["*"]
|
|
||||||
)
|
|
||||||
|
|
||||||
# ----------------------------
|
|
||||||
# GeoIP Setup
|
|
||||||
# ----------------------------
|
|
||||||
reader = geoip2.database.Reader("GeoLite2-City.mmdb")
|
|
||||||
geo_cache = {} # cache IP lookups to save CPU
|
|
||||||
|
|
||||||
def ip_to_geo(ip):
|
|
||||||
if ip in geo_cache:
|
|
||||||
return geo_cache[ip]
|
|
||||||
try:
|
|
||||||
response = reader.city(ip)
|
|
||||||
latlon = (response.location.latitude, response.location.longitude)
|
|
||||||
except Exception:
|
|
||||||
latlon = (None, None)
|
|
||||||
geo_cache[ip] = latlon
|
|
||||||
return latlon
|
|
||||||
|
|
||||||
# ----------------------------
|
|
||||||
# Helper: Parse timestamp from line
|
|
||||||
# ----------------------------
|
|
||||||
def line_timestamp(line: str):
|
|
||||||
try:
|
|
||||||
ts_str = line.split(" ", 1)[0]
|
|
||||||
return pd.to_datetime(ts_str)
|
|
||||||
except Exception:
|
|
||||||
return None
|
|
||||||
|
|
||||||
# ----------------------------
|
|
||||||
# Binary search on lines
|
|
||||||
# ----------------------------
|
|
||||||
def find_line_index(lines, target_time, seek_start=True):
|
|
||||||
lo, hi = 0, len(lines) - 1
|
|
||||||
best_idx = None
|
|
||||||
|
|
||||||
while lo <= hi:
|
|
||||||
mid = (lo + hi) // 2
|
|
||||||
ts = line_timestamp(lines[mid])
|
|
||||||
if ts is None:
|
|
||||||
# skip malformed line: move lo forward for start, hi backward for end
|
|
||||||
if seek_start:
|
|
||||||
lo = mid + 1
|
|
||||||
else:
|
|
||||||
hi = mid - 1
|
|
||||||
continue
|
|
||||||
|
|
||||||
if seek_start:
|
|
||||||
if ts >= target_time:
|
|
||||||
best_idx = mid
|
|
||||||
hi = mid - 1 # search earlier
|
|
||||||
else:
|
|
||||||
lo = mid + 1 # search later
|
|
||||||
else:
|
|
||||||
if ts <= target_time:
|
|
||||||
best_idx = mid
|
|
||||||
lo = mid + 1 # search later
|
|
||||||
else:
|
|
||||||
hi = mid - 1 # search earlier
|
|
||||||
|
|
||||||
# For end search, make sure we return the **last index ≤ target**
|
|
||||||
if best_idx is None:
|
|
||||||
return len(lines) - 1 if not seek_start else 0
|
|
||||||
return best_idx
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
# ----------------------------
|
|
||||||
# Load logs using binary search on lines
|
|
||||||
# ----------------------------
|
|
||||||
def load_logs_binary(service: Optional[str], start: Optional[str], end: Optional[str]):
|
|
||||||
start_dt = pd.to_datetime(start) if start else None
|
|
||||||
end_dt = pd.to_datetime(end) if end else None
|
|
||||||
records = []
|
|
||||||
|
|
||||||
with open("file.log", "r", errors="ignore") as f:
|
|
||||||
lines = f.readlines()
|
|
||||||
|
|
||||||
start_idx = find_line_index(lines, start_dt, seek_start=True) if start_dt else 0
|
|
||||||
end_idx = find_line_index(lines, end_dt, seek_start=False) if end_dt else len(lines) - 1
|
|
||||||
|
|
||||||
|
|
||||||
for line in lines[start_idx:end_idx+1]:
|
|
||||||
try:
|
|
||||||
parts = line.strip().split(" ", 3)
|
|
||||||
if len(parts) != 4:
|
|
||||||
continue
|
|
||||||
timestamp, ip, method, path = parts
|
|
||||||
ts = pd.to_datetime(timestamp)
|
|
||||||
if start_dt and ts < start_dt:
|
|
||||||
continue
|
|
||||||
if end_dt and ts > end_dt:
|
|
||||||
break
|
|
||||||
if service and service not in path:
|
|
||||||
continue
|
|
||||||
lat, lon = ip_to_geo(ip)
|
|
||||||
if lat is None or lon is None:
|
|
||||||
continue
|
|
||||||
records.append({
|
|
||||||
"timestamp": ts.isoformat(),
|
|
||||||
"path": path,
|
|
||||||
"lat": lat,
|
|
||||||
"lon": lon
|
|
||||||
})
|
|
||||||
except Exception:
|
|
||||||
continue
|
|
||||||
|
|
||||||
return records
|
|
||||||
|
|
||||||
# ----------------------------
|
|
||||||
# API Endpoint
|
|
||||||
# ----------------------------
|
|
||||||
@app.get("/connections")
|
|
||||||
def get_connections(
|
|
||||||
service: Optional[str] = Query(None, description="Filter by service path"),
|
|
||||||
start: Optional[str] = Query(None, description="Start datetime in ISO format"),
|
|
||||||
end: Optional[str] = Query(None, description="End datetime in ISO format")
|
|
||||||
):
|
|
||||||
return load_logs_binary(service, start, end)
|
|
||||||
|
|
||||||
# ----------------------------
|
|
||||||
# Healthcheck
|
|
||||||
# ----------------------------
|
|
||||||
@app.get("/health")
|
|
||||||
def health():
|
|
||||||
size = os.path.getsize("file.log")
|
|
||||||
return {"status": "ok", "log_size_bytes": size, "cached_ips": len(geo_cache)}
|
|
||||||
|
|
||||||
# ----------------------------
|
|
||||||
# Run with Uvicorn
|
|
||||||
# ----------------------------
|
|
||||||
if __name__ == "__main__":
|
|
||||||
import uvicorn
|
|
||||||
uvicorn.run("backend:app", host="0.0.0.0", port=8000, reload=True)
|
|
||||||
|
|
@ -1,64 +0,0 @@
|
||||||
import re
|
|
||||||
import ipaddress
|
|
||||||
from datetime import datetime, timezone
|
|
||||||
import time
|
|
||||||
from local import *
|
|
||||||
|
|
||||||
INTERNAL_NETWORKS = [
|
|
||||||
ipaddress.ip_network("10.0.0.0/8"),
|
|
||||||
ipaddress.ip_network("192.168.0.0/16"),
|
|
||||||
ipaddress.ip_network("172.16.0.0/12"),
|
|
||||||
]
|
|
||||||
|
|
||||||
log_line_re = re.compile(
|
|
||||||
r'(?P<ip>\S+) - - \[(?P<time>[^\]]+)\] "(?P<method>\S+) (?P<path>\S+) \S+"'
|
|
||||||
)
|
|
||||||
|
|
||||||
def is_external(ip):
|
|
||||||
ip_addr = ipaddress.ip_address(ip)
|
|
||||||
return not any(ip_addr in net for net in INTERNAL_NETWORKS)
|
|
||||||
|
|
||||||
def parse_nginx_line(line):
|
|
||||||
match = log_line_re.match(line)
|
|
||||||
if not match:
|
|
||||||
return None
|
|
||||||
data = match.groupdict()
|
|
||||||
if not is_external(data["ip"]):
|
|
||||||
return None
|
|
||||||
dt = datetime.strptime(data["time"], "%d/%b/%Y:%H:%M:%S %z")
|
|
||||||
dt_utc = dt.astimezone(timezone.utc)
|
|
||||||
iso_time = dt_utc.strftime("%Y-%m-%dT%H:%M:%SZ")
|
|
||||||
return f'{iso_time} {data["ip"]} {data["method"]} {data["path"]}'
|
|
||||||
|
|
||||||
def tail(f):
|
|
||||||
f.seek(0, 2) # Go to the end of the file
|
|
||||||
while True:
|
|
||||||
line = f.readline()
|
|
||||||
if not line:
|
|
||||||
time.sleep(0.01) # Sleep very briefly
|
|
||||||
continue
|
|
||||||
yield line
|
|
||||||
|
|
||||||
def main():
|
|
||||||
buffer = []
|
|
||||||
buffer_size = 10 # adjust for your throughput
|
|
||||||
flush_interval = 0.5 # seconds
|
|
||||||
|
|
||||||
with open(ACCESS_LOG, "r") as f:
|
|
||||||
tail_lines = tail(f)
|
|
||||||
last_flush = time.time()
|
|
||||||
for line in tail_lines:
|
|
||||||
parsed = parse_nginx_line(line)
|
|
||||||
if parsed:
|
|
||||||
buffer.append(parsed)
|
|
||||||
# Flush buffer if size reached or interval passed
|
|
||||||
if len(buffer) >= buffer_size or (time.time() - last_flush) > flush_interval:
|
|
||||||
if buffer:
|
|
||||||
with open(OUTPUT_LOG, "a") as out:
|
|
||||||
out.write("\n".join(buffer) + "\n")
|
|
||||||
buffer.clear()
|
|
||||||
last_flush = time.time()
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
|
||||||
main()
|
|
||||||
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue