completed

lichene 2025-10-26 11:31:11 +01:00
parent b816412306
commit 26985812a4
7 changed files with 387 additions and 25 deletions

README (2 changed lines)

@@ -1,4 +1,4 @@
# geolog
Tool to monitor the internet background radiation.
This README is still a work in progress.
nginx rotates its logs every day at midnight, so a daily cron job reads the last completed access log.
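The commit itself does not include the cron entry, so the following is only a sketch of what the README describes: it assumes the filter script shown at the end of this diff is saved as process_logs.py in the project directory and is run a few minutes after the midnight rotation (the script name and paths are placeholders, not part of this commit).

```cron
# Hypothetical crontab entry (script name and paths are placeholders):
# run a few minutes after nginx's midnight rotation so the last completed
# access log is processed once per day.
5 0 * * * cd /opt/geolog && /usr/bin/python3 process_logs.py >> logs/cron.log 2>&1
```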

(new file: sample nginx access log)

@@ -0,0 +1,24 @@
192.168.1.1 - - [25/Oct/2025:00:03:14 +0200] "POST /api/actions/runner.v1.RunnerService/FetchTask HTTP/2.0" 200 2 "-" "connect-go/1.18.1 (go1.24.4)"
192.168.1.1 - - [25/Oct/2025:00:03:16 +0200] "POST /api/actions/runner.v1.RunnerService/FetchTask HTTP/2.0" 200 2 "-" "connect-go/1.18.1 (go1.24.4)"
192.168.1.1 - - [25/Oct/2025:00:03:18 +0200] "POST /api/actions/runner.v1.RunnerService/FetchTask HTTP/2.0" 200 2 "-" "connect-go/1.18.1 (go1.24.4)"
192.168.1.1 - - [25/Oct/2025:00:03:20 +0200] "POST /api/actions/runner.v1.RunnerService/FetchTask HTTP/2.0" 200 2 "-" "connect-go/1.18.1 (go1.24.4)"
192.168.1.1 - - [25/Oct/2025:00:03:22 +0200] "POST /api/actions/runner.v1.RunnerService/FetchTask HTTP/2.0" 200 2 "-" "connect-go/1.18.1 (go1.24.4)"
192.168.1.1 - - [25/Oct/2025:00:03:24 +0200] "POST /api/actions/runner.v1.RunnerService/FetchTask HTTP/2.0" 200 2 "-" "connect-go/1.18.1 (go1.24.4)"
192.168.1.1 - - [25/Oct/2025:00:03:26 +0200] "POST /api/actions/runner.v1.RunnerService/FetchTask HTTP/2.0" 200 2 "-" "connect-go/1.18.1 (go1.24.4)"
192.168.1.1 - - [25/Oct/2025:00:03:28 +0200] "POST /api/actions/runner.v1.RunnerService/FetchTask HTTP/2.0" 200 2 "-" "connect-go/1.18.1 (go1.24.4)"
151.62.125.56 - cur [25/Oct/2025:00:03:30 +0200] "GET /ocs/v2.php/apps/notifications/api/v2/notifications?format=json HTTP/1.1" 304 0 "-" "Mozilla/5.0 (Linux) mirall/3.17.2daily (Nextcloud, manjaro-6.12.48-1-MANJARO ClientArchitecture: >
151.62.125.56 - cur [25/Oct/2025:00:03:30 +0200] "GET /ocs/v2.php/apps/user_status/api/v1/user_status?format=json HTTP/1.1" 200 149 "-" "Mozilla/5.0 (Linux) mirall/3.17.2daily (Nextcloud, manjaro-6.12.48-1-MANJARO ClientArchitecture: x8>
151.62.125.56 - Freekkettone [25/Oct/2025:00:03:30 +0200] "GET /ocs/v2.php/apps/user_status/api/v1/user_status?format=json HTTP/1.1" 200 173 "-" "Mozilla/5.0 (Linux) mirall/3.17.2daily (Nextcloud, manjaro-6.12.48-1-MANJARO ClientArchite>
151.62.125.56 - Freekkettone [25/Oct/2025:00:03:30 +0200] "GET /ocs/v2.php/apps/notifications/api/v2/notifications?format=json HTTP/1.1" 304 0 "-" "Mozilla/5.0 (Linux) mirall/3.17.2daily (Nextcloud, manjaro-6.12.48-1-MANJARO ClientArchi>
151.62.125.56 - Freekkettone [25/Oct/2025:00:03:30 +0200] "PROPFIND /remote.php/dav/files/Freekkettone/ HTTP/1.1" 207 254 "-" "Mozilla/5.0 (Linux) mirall/3.17.2daily (Nextcloud, manjaro-6.12.48-1-MANJARO ClientArchitecture: x86_64 OsArc>
151.62.125.56 - cur [25/Oct/2025:00:03:30 +0200] "PROPFIND /remote.php/dav/files/cur/ HTTP/1.1" 207 250 "-" "Mozilla/5.0 (Linux) mirall/3.17.2daily (Nextcloud, manjaro-6.12.48-1-MANJARO ClientArchitecture: x86_64 OsArchitecture: x86_64)"
192.168.1.1 - - [25/Oct/2025:00:03:30 +0200] "POST /api/actions/runner.v1.RunnerService/FetchTask HTTP/2.0" 200 2 "-" "connect-go/1.18.1 (go1.24.4)"
192.168.1.1 - - [25/Oct/2025:00:03:32 +0200] "POST /api/actions/runner.v1.RunnerService/FetchTask HTTP/2.0" 200 2 "-" "connect-go/1.18.1 (go1.24.4)"
192.168.1.1 - - [25/Oct/2025:00:03:35 +0200] "POST /api/actions/runner.v1.RunnerService/FetchTask HTTP/2.0" 200 2 "-" "connect-go/1.18.1 (go1.24.4)"
192.168.1.1 - - [25/Oct/2025:00:03:36 +0200] "POST /api/actions/runner.v1.RunnerService/FetchTask HTTP/2.0" 200 2 "-" "connect-go/1.18.1 (go1.24.4)"
192.168.1.1 - - [25/Oct/2025:00:03:38 +0200] "POST /api/actions/runner.v1.RunnerService/FetchTask HTTP/2.0" 200 2 "-" "connect-go/1.18.1 (go1.24.4)"
192.168.1.1 - - [25/Oct/2025:00:03:40 +0200] "POST /api/actions/runner.v1.RunnerService/FetchTask HTTP/2.0" 200 2 "-" "connect-go/1.18.1 (go1.24.4)"
192.168.1.1 - - [25/Oct/2025:00:03:42 +0200] "POST /api/actions/runner.v1.RunnerService/FetchTask HTTP/2.0" 200 2 "-" "connect-go/1.18.1 (go1.24.4)"
192.168.1.1 - - [25/Oct/2025:00:03:44 +0200] "POST /api/actions/runner.v1.RunnerService/FetchTask HTTP/2.0" 200 2 "-" "connect-go/1.18.1 (go1.24.4)"
192.168.1.1 - - [25/Oct/2025:00:03:46 +0200] "POST /api/actions/runner.v1.RunnerService/FetchTask HTTP/2.0" 200 2 "-" "connect-go/1.18.1 (go1.24.4)"
192.168.1.1 - - [25/Oct/2025:00:03:48 +0200] "POST /api/actions/runner.v1.RunnerService/FetchTask HTTP/2.0" 200 2 "-" "connect-go/1.18.1 (go1.24.4)"

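These lines are exactly what the updated regex in the filter script (the last file in this diff) is meant to match. As a quick illustration, here is a sketch, not part of the commit, that parses one of the sample lines above and applies the same UTC normalisation the script uses:

```python
# Sketch only: parse one of the sample lines above with the updated regex from
# the filter script (last file in this diff) and apply the same UTC conversion.
import re
from datetime import datetime, timezone

log_line_re = re.compile(
    r'(?P<ip>\S+) - (?P<user>\S+) \[(?P<time>[^\]]+)\] "(?P<method>\S+) (?P<path>\S+) \S+"'
)

line = ('151.62.125.56 - cur [25/Oct/2025:00:03:30 +0200] '
        '"PROPFIND /remote.php/dav/files/cur/ HTTP/1.1" 207 250 "-" "Mozilla/5.0 ..."')
data = log_line_re.match(line).groupdict()
# {'ip': '151.62.125.56', 'user': 'cur', 'time': '25/Oct/2025:00:03:30 +0200',
#  'method': 'PROPFIND', 'path': '/remote.php/dav/files/cur/'}

dt_utc = datetime.strptime(data["time"], "%d/%b/%Y:%H:%M:%S %z").astimezone(timezone.utc)
print(dt_utc.strftime("%Y-%m-%dT%H:%M:%SZ"))  # -> 2025-10-24T22:03:30Z
```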
backend2.py (new file, 226 lines)

@@ -0,0 +1,226 @@
#!/usr/bin/env python3
import os
import re
import pandas as pd
import geoip2.database
from fastapi import FastAPI, Query
from fastapi.middleware.cors import CORSMiddleware
from typing import Optional, List
from datetime import datetime
from compressor import *

# logging
import logging
logger = logging.getLogger('uvicorn.error')

# ----------------------------
# Configuration
# ----------------------------
LOG_DIR = os.path.join(os.path.dirname(__file__), "logs")
LOG_PREFIX = "filtered_"  # matches cron-generated files
GEO_DB_PATH = "GeoLite2-City.mmdb"
FILENAME_RE = re.compile(r"filtered_(\d{4}-\d{2}-\d{2})_(\d+)\.bin")

# ----------------------------
# FastAPI Setup
# ----------------------------
app = FastAPI(title="Reverse Proxy Connections Map API")
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"]
)

# ----------------------------
# GeoIP Setup
# ----------------------------
reader = geoip2.database.Reader(GEO_DB_PATH)
geo_cache = {}  # cache IP lookups to save CPU

def ip_to_geo(ip):
    if ip in geo_cache:
        return geo_cache[ip]
    try:
        response = reader.city(ip)
        latlon = (response.location.latitude, response.location.longitude)
    except Exception:
        latlon = (None, None)
    geo_cache[ip] = latlon
    return latlon
# ----------------------------
# Helper: Parse timestamp from line
# ----------------------------
def line_timestamp(line: str):
    try:
        ts_str = line.split(" ", 1)[0]
        return pd.to_datetime(ts_str)
    except Exception:
        return None

# ----------------------------
# Binary search on lines
# ----------------------------
def find_line_index(lines, target_time, seek_start=True):
    lo, hi = 0, len(lines) - 1
    best_idx = None
    while lo <= hi:
        mid = (lo + hi) // 2
        ts = line_timestamp(lines[mid])
        if ts is None:
            if seek_start:
                lo = mid + 1
            else:
                hi = mid - 1
            continue
        if seek_start:
            if ts >= target_time:
                best_idx = mid
                hi = mid - 1
            else:
                lo = mid + 1
        else:
            if ts <= target_time:
                best_idx = mid
                lo = mid + 1
            else:
                hi = mid - 1
    if best_idx is None:
        return len(lines) - 1 if not seek_start else 0
    return best_idx
# ----------------------------
# List log files and parse dates
# ----------------------------
def list_log_files() -> List[tuple[str, datetime]]:
    files = []
    for f in os.listdir(LOG_DIR):
        if f.startswith(LOG_PREFIX) and f.endswith(".bin"):
            match = FILENAME_RE.match(f)
            if not match:
                continue
            date_str = match.group(1)
            try:
                date = datetime.strptime(date_str, "%Y-%m-%d")
                files.append((os.path.join(LOG_DIR, f), date))
            except Exception:
                continue
    # sort by date and index
    return sorted(files, key=lambda x: (x[1], x[0]))

def read_compressed_log(log_file):
    """Helper to read and decompress a log file."""
    compressor = load_or_create_compressor()
    entries = []
    print(log_file)
    with open(log_file, "rb") as f:
        while chunk := f.read(13):
            if len(chunk) < 13:  # Incomplete entry at end of file
                print(f"Warning: Incomplete entry at end of {log_file} ({len(chunk)} bytes), skipping")
                break
            try:
                iso_time, ip, method, path = compressor.decompress_entry(chunk)
                entries.append(f"{iso_time} {ip} {method} {path}")
            except Exception as e:
                print(f"Warning: Failed to decompress entry: {e}")
                continue
    return entries
# ----------------------------
# Load logs efficiently using filename dates
# ----------------------------
def load_logs_binary(service: Optional[str], start: Optional[str], end: Optional[str]):
    start_dt = pd.to_datetime(start) if start else None
    end_dt = pd.to_datetime(end) if end else None
    records = []
    files = list_log_files()
    logger.error(files)
    if not files:
        return []
    for file_path, file_date in files:
        # Skip file if outside range based on filename date
        if start_dt and file_date.date() < start_dt.date():
            continue
        if end_dt and file_date.date() > end_dt.date():
            continue
        #with open(file_path, "r", errors="ignore") as f:
        #lines = f.readlines()
        lines = read_compressed_log(file_path)
        if not lines:
            continue
        start_idx = find_line_index(lines, start_dt, seek_start=True) if start_dt else 0
        end_idx = find_line_index(lines, end_dt, seek_start=False) if end_dt else len(lines) - 1
        for line in lines[start_idx:end_idx + 1]:
            try:
                parts = line.strip().split(" ", 3)
                if len(parts) != 4:
                    continue
                timestamp, ip, method, path = parts
                ts = pd.to_datetime(timestamp)
                if start_dt and ts < start_dt:
                    continue
                if end_dt and ts > end_dt:
                    break
                if service and service not in path:
                    continue
                lat, lon = ip_to_geo(ip)
                if lat is None or lon is None:
                    continue
                records.append({
                    "timestamp": ts.isoformat(),
                    "path": path,
                    "lat": lat,
                    "lon": lon
                })
            except Exception:
                continue
    return records
# ----------------------------
# API Endpoints
# ----------------------------
@app.get("/connections")
def get_connections(
    service: Optional[str] = Query(None, description="Filter by service path"),
    start: Optional[str] = Query(None, description="Start datetime (ISO format)"),
    end: Optional[str] = Query(None, description="End datetime (ISO format)")
):
    logger.error("Endpoint hit!")
    return load_logs_binary(service, start, end)

@app.get("/health")
def health():
    files = list_log_files()
    total_size = sum(os.path.getsize(f[0]) for f in files)
    return {
        "status": "ok",
        "log_files": len(files),
        "total_log_size_bytes": total_size,
        "cached_ips": len(geo_cache)
    }

# ----------------------------
# Run with Uvicorn
# ----------------------------
if __name__ == "__main__":
    import uvicorn
    uvicorn.run("backend2:app", host="0.0.0.0", port=8000, reload=True)

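For anyone poking at the API above, a minimal client sketch (not part of the commit): it assumes the server is running locally on port 8000, as in the __main__ block and config.js, and uses the third-party requests package; service, start and end are the query parameters declared on /connections.

```python
# Sketch only: query the /connections endpoint defined above. Assumes the API
# is running locally on port 8000 (matching config.js and the __main__ block)
# and that the third-party `requests` package is available.
import requests

resp = requests.get(
    "http://localhost:8000/connections",
    params={
        "service": "remote.php",          # substring match against the request path
        "start": "2025-10-25T00:00:00",   # parsed server-side with pandas
        "end": "2025-10-25T23:59:59",
    },
    timeout=30,
)
resp.raise_for_status()
for rec in resp.json()[:5]:
    print(rec["timestamp"], rec["lat"], rec["lon"], rec["path"])
```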
compressor.py (new file, 76 lines)

@@ -0,0 +1,76 @@
import struct
from datetime import datetime, timezone
import pickle
import os

LOG_DIR = os.path.join(os.path.dirname(__file__), "logs")
# Global compressor instance (needs to persist across runs)
COMPRESSOR_FILE = os.path.join(LOG_DIR, "compressor_state.pkl")

def load_or_create_compressor():
    """Load existing compressor state or create new one."""
    if os.path.exists(COMPRESSOR_FILE):
        with open(COMPRESSOR_FILE, "rb") as f:
            return pickle.load(f)
    return LogCompressor()

def save_compressor(compressor):
    """Save compressor state to preserve path dictionary."""
    with open(COMPRESSOR_FILE, "wb") as f:
        pickle.dump(compressor, f)

class LogCompressor:
    def __init__(self):
        self.path_to_id = {}
        self.id_to_path = {}
        self.next_path_id = 0
        self.method_map = {"GET": 0, "POST": 1, "PUT": 2, "DELETE": 3, "HEAD": 4, "PATCH": 5, "OPTIONS": 6}

    def get_path_id(self, path):
        """Get or create ID for a path."""
        if path not in self.path_to_id:
            self.path_to_id[path] = self.next_path_id
            self.id_to_path[self.next_path_id] = path
            self.next_path_id += 1
        return self.path_to_id[path]

    def compress_entry(self, iso_time, ip, method, path):
        """
        Compress a log entry to bytes.
        Format: 4 bytes (timestamp) + 4 bytes (IP) + 1 byte (method) + 4 bytes (path_id) = 13 bytes
        """
        # Timestamp: Unix timestamp as 4-byte unsigned int (good until 2106)
        dt = datetime.strptime(iso_time, "%Y-%m-%dT%H:%M:%SZ").replace(tzinfo=timezone.utc)
        timestamp = int(dt.timestamp())
        # IP: Convert to 4 bytes
        ip_parts = [int(part) for part in ip.split('.')]
        ip_int = (ip_parts[0] << 24) + (ip_parts[1] << 16) + (ip_parts[2] << 8) + ip_parts[3]
        # Method: 1 byte
        method_id = self.method_map.get(method, 255)  # 255 for unknown
        # Path: Get ID (4 bytes for path index)
        path_id = self.get_path_id(path)
        # Pack into bytes: I=unsigned int (4 bytes), B=unsigned char (1 byte)
        return struct.pack('<IIBI', timestamp, ip_int, method_id, path_id)

    def decompress_entry(self, data):
        """Decompress bytes back to log entry."""
        timestamp, ip_int, method_id, path_id = struct.unpack('<IIBI', data)
        # Timestamp
        dt = datetime.fromtimestamp(timestamp, tz=timezone.utc)
        iso_time = dt.strftime("%Y-%m-%dT%H:%M:%SZ")
        # IP
        ip = f"{(ip_int >> 24) & 0xFF}.{(ip_int >> 16) & 0xFF}.{(ip_int >> 8) & 0xFF}.{ip_int & 0xFF}"
        # Method
        method = {v: k for k, v in self.method_map.items()}.get(method_id, "UNKNOWN")
        # Path
        path = self.id_to_path.get(path_id, "UNKNOWN")
        return iso_time, ip, method, path

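The 13-byte record layout above is the contract shared by the cron-side writer and the API-side reader, so a quick round-trip sanity check can be handy; a sketch using the class above (sample values taken from the access log earlier in this diff, the check itself is not part of the commit):

```python
# Sketch only: round-trip one record through the 13-byte format.
from compressor import LogCompressor

c = LogCompressor()
blob = c.compress_entry("2025-10-25T00:03:30Z", "151.62.125.56", "GET",
                        "/remote.php/dav/files/cur/")
assert len(blob) == 13  # 4 (timestamp) + 4 (IPv4) + 1 (method) + 4 (path id)
print(c.decompress_entry(blob))
# ('2025-10-25T00:03:30Z', '151.62.125.56', 'GET', '/remote.php/dav/files/cur/')
```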
config.js (new file, 1 line)

@@ -0,0 +1 @@
const apiBaseUrl = 'http://localhost:8000';

(changed file: the Leaflet map page)

@@ -49,6 +49,7 @@
 <div id="map"></div>
 <script src="https://unpkg.com/leaflet/dist/leaflet.js"></script>
+<script src="./config.js"></script>
 <script>
 const OUR_COORDS = [46.0660875,11.1501202];
 const MAP_CENTER = [20, 0];
@@ -85,9 +86,10 @@
 let animTimers = [];
 let isPlaying = false;
 let simStartMs = 0, simEndMs = 0;

 async function fetchConnections(service, start, end) {
-let url = `/connections?`;
+let url = `${apiBaseUrl}/connections?`;
 if (service) url += `service=${encodeURIComponent(service)}&`;
 if (start) url += `start=${new Date(start).toISOString()}&`;
 if (end) url += `end=${new Date(end).toISOString()}&`;

(changed file: the cron script that filters and compresses the nginx access log)

@@ -4,12 +4,17 @@ import ipaddress
 from datetime import datetime, timezone
 import os
 from local import *  # Make sure this defines ACCESS_LOG, LOG_DIR, etc.
+from compressor import *
+import pickle

 # ==== CONFIGURATION ====
 MAX_LOG_LINES = 50000  # adjust as needed
 LOG_DIR = os.path.join(os.path.dirname(__file__), "logs")
 os.makedirs(LOG_DIR, exist_ok=True)
+COMPRESSOR_FILE = os.path.join(LOG_DIR, "compressor_state.pkl")

 INTERNAL_NETWORKS = [
     ipaddress.ip_network("10.0.0.0/8"),
     ipaddress.ip_network("192.168.0.0/16"),
@@ -17,14 +22,15 @@ INTERNAL_NETWORKS = [
 ]

 log_line_re = re.compile(
-    r'(?P<ip>\S+) - - \[(?P<time>[^\]]+)\] "(?P<method>\S+) (?P<path>\S+) \S+"'
+    r'(?P<ip>\S+) - (?P<user>\S+) \[(?P<time>[^\]]+)\] "(?P<method>\S+) (?P<path>\S+) \S+"'
 )

 def is_external(ip):
     ip_addr = ipaddress.ip_address(ip)
     return not any(ip_addr in net for net in INTERNAL_NETWORKS)

-def parse_nginx_line(line):
+def parse_and_compress_nginx_line(line):
     match = log_line_re.match(line)
     if not match:
         return None
@@ -34,45 +40,72 @@ def parse_nginx_line(line):
     dt = datetime.strptime(data["time"], "%d/%b/%Y:%H:%M:%S %z")
     dt_utc = dt.astimezone(timezone.utc)
     iso_time = dt_utc.strftime("%Y-%m-%dT%H:%M:%SZ")
-    return f'{iso_time} {data["ip"]} {data["method"]} {data["path"]}'
+    # Return compressed bytes instead of string
+    return compressor.compress_entry(iso_time, data["ip"], data["method"], data["path"])

 def get_current_logfile():
-    """Find or create the latest log file with line limit."""
+    """Find or create the latest log file with entry limit."""
     today = datetime.now().strftime("%Y-%m-%d")
     base_name = os.path.join(LOG_DIR, f"filtered_{today}")
     # Find the highest index by looping until file doesn't exist
     index = 1
-    while True:
-        log_file = f"{base_name}_{index}.log"
-        if not os.path.exists(log_file):
-            return log_file
-        # Check line count
-        with open(log_file, "r") as f:
-            line_count = sum(1 for _ in f)
-        if line_count < MAX_LOG_LINES:
-            return log_file
+    while os.path.exists(f"{base_name}_{index}.bin"):
         index += 1
+    # index is now one past the last existing file
+    # Check if the last existing file (index - 1) has room
+    if index > 1:
+        last_file = f"{base_name}_{index - 1}.bin"
+        # Count entries (13 bytes each)
+        file_size = os.path.getsize(last_file)
+        entry_count = file_size // 13
+        if entry_count < MAX_LOG_LINES:
+            return last_file
+    # Either no files exist or last file is full, create new one
+    return f"{base_name}_{index}.bin"

 def process_log():
+    """Process and compress nginx logs."""
+    compressor = load_or_create_compressor()
     output_file = get_current_logfile()
     buffer = []
     with open(ACCESS_LOG, "r") as f:
         for line in f:
-            parsed = parse_nginx_line(line)
-            if parsed:
-                buffer.append(parsed)
+            match = log_line_re.match(line)
+            if not match:
+                continue
+            data = match.groupdict()
+            if not is_external(data["ip"]):
+                continue
+            # Parse and convert to UTC
+            dt = datetime.strptime(data["time"], "%d/%b/%Y:%H:%M:%S %z")
+            dt_utc = dt.astimezone(timezone.utc)
+            iso_time = dt_utc.strftime("%Y-%m-%dT%H:%M:%SZ")
+            # Compress entry
+            compressed = compressor.compress_entry(
+                iso_time, data["ip"], data["method"], data["path"]
+            )
+            buffer.append(compressed)
+    # Write compressed binary data
     if buffer:
-        with open(output_file, "a") as out:
-            out.write("\n".join(buffer) + "\n")
-
-def flush_access_log():
-    """Safely truncate the access log after processing."""
-    with open(ACCESS_LOG, "w"):
-        pass  # Opening with 'w' truncates file
+        with open(output_file, "ab") as out:  # 'ab' for append binary
+            for entry in buffer:
+                out.write(entry)
+    # Save compressor state (path dictionary)
+    save_compressor(compressor)

 def main():
     process_log()
     #flush_access_log()

 if __name__ == "__main__":
     main()
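To eyeball what the cron script actually wrote, the saved compressor state can decode a .bin file by hand. A sketch, not part of the commit; the file name is hypothetical and the file is assumed to be non-empty:

```python
# Sketch only: decode a generated .bin file by hand (file name is hypothetical,
# and the file is assumed to be non-empty).
import os
from compressor import load_or_create_compressor

compressor = load_or_create_compressor()   # path ids live in logs/compressor_state.pkl
bin_file = "logs/filtered_2025-10-25_1.bin"
print("entries:", os.path.getsize(bin_file) // 13)  # fixed 13-byte records

with open(bin_file, "rb") as f:
    print(compressor.decompress_entry(f.read(13)))  # first record
```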