import os
import uuid
import json
import logging
import sys
from collections import deque
from datetime import datetime, timezone
from typing import Annotated, Dict, List, Optional

from dateutil.parser import isoparse
from fastapi import FastAPI, Query, Request
from fastapi.responses import HTMLResponse, JSONResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from pydantic import BaseModel, Field, field_validator
from pythonjsonlogger import jsonlogger

from .database import RRDDatabase

# --- Service Configuration ---
SERVICE_UUID = os.environ.get("SERVICE_UUID", str(uuid.uuid4()))
database = RRDDatabase()

# --- Logging Configuration ---
logger = logging.getLogger()
logger.setLevel(logging.INFO)

# Shared format string so the stream handler and the in-memory buffer emit
# identical JSON records ('asctime' and 'levelname' are required by LogBuffer).
LOG_FORMAT = "%(asctime)s %(name)s %(levelname)s %(message)s"


class BufferHandler(logging.Handler):
    """Formats each record as JSON and appends it to the in-memory log buffer."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # The formatter must include asctime/levelname; with a bare
        # JsonFormatter() the buffered dict would have no timestamp or level.
        self.formatter = jsonlogger.JsonFormatter(LOG_FORMAT)

    def emit(self, record):
        try:
            # Format the record as a JSON string, then parse it back to a dict.
            # This keeps the buffered shape consistent with the stream output.
            log_entry = json.loads(self.formatter.format(record))
            log_buffer.add_log(log_entry)
        except Exception as e:
            print(
                f"Error in BufferHandler: Could not process log record: {e}",
                file=sys.stderr,
            )


class LogBuffer:
    def __init__(self, maxlen=1000):
        self.buffer = deque(maxlen=maxlen)

    def add_log(self, record):
        # 'record' is a dictionary parsed from the JSON log string.
        timestamp = record.get("asctime") or datetime.now(timezone.utc).isoformat()
        self.buffer.append(
            {
                "timestamp": timestamp,
                "level": record.get("levelname"),
                "message": record.get("message"),
                "extra": {
                    k: v
                    for k, v in record.items()
                    if k
                    not in [
                        "asctime",
                        "levelname",
                        "message",
                        "name",
                        "lineno",
                        "filename",
                        "pathname",
                        "funcName",
                        "process",
                        "processName",
                        "thread",
                        "threadName",
                    ]
                },
            }
        )

    def get_logs(self, limit=100, level=None, since=None):
        logger.debug(f"Fetching logs with limit={limit}, level={level}, since={since}")
        logs = list(self.buffer)

        # Apply level filter
        if level and level.strip():
            level = level.upper()
            valid_levels = {"INFO", "WARNING", "ERROR", "DEBUG"}
            if level in valid_levels:
                logs = [log for log in logs if log["level"].upper() == level]
            else:
                logger.warning(f"Invalid log level: {level}")

        # Apply since filter
        if since:
            try:
                # Use isoparse for robust parsing of ISO 8601 strings.
                since_dt = isoparse(since)
                # Treat naive datetimes as UTC; convert aware ones to UTC
                # for a consistent comparison.
                if since_dt.tzinfo is None:
                    since_dt = since_dt.replace(tzinfo=timezone.utc)
                else:
                    since_dt = since_dt.astimezone(timezone.utc)

                def as_utc(ts: str) -> datetime:
                    # Buffered 'asctime' timestamps are naive; treat them as
                    # UTC as well so the comparison is apples-to-apples.
                    dt = isoparse(ts)
                    if dt.tzinfo is None:
                        return dt.replace(tzinfo=timezone.utc)
                    return dt.astimezone(timezone.utc)

                logs = [log for log in logs if as_utc(log["timestamp"]) >= since_dt]
            except ValueError:
                logger.warning(f"Invalid 'since' timestamp: {since}")

        logger.debug(f"Returning {len(logs[-limit:])} logs")
        return logs[-limit:]


log_buffer = LogBuffer()

logHandler = logging.StreamHandler()
logHandler.setFormatter(jsonlogger.JsonFormatter(LOG_FORMAT))

if not logger.handlers:
    logger.addHandler(logHandler)
    logger.addHandler(BufferHandler())

# Configure Uvicorn's loggers to propagate to the root logger.
# This ensures Uvicorn's startup and access logs are captured by our BufferHandler.
logging.getLogger("uvicorn").propagate = True
logging.getLogger("uvicorn.access").propagate = True
logging.getLogger("uvicorn.error").propagate = True
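# Illustrative sketch (comments only, not executed): with the handlers above in
# place, a call such as
#
#   logger.info("Node checked in", extra={"node_uuid": "..."})
#
# is written twice: once to stderr as a JSON line, and once into `log_buffer`,
# roughly in this shape (field names follow LOG_FORMAT above):
#
#   {"timestamp": "2025-06-11 13:32:00,123", "level": "INFO",
#    "message": "Node checked in", "extra": {"node_uuid": "..."}}
#
# It can then be queried with, e.g.:
#
#   log_buffer.get_logs(limit=10, level="INFO", since="2025-06-11T00:00:00")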
# --- FastAPI Application ---
app = FastAPI(
    title="Node Monitoring System",
    description=f"A distributed monitoring system. Service UUID: {SERVICE_UUID}",
)
templates = Jinja2Templates(directory="app/web/templates")
app.mount("/static", StaticFiles(directory="app/web/static"), name="static")


# --- Data Models ---
class NodeStatusModel(BaseModel):
    uptime_seconds: int
    load_avg: Annotated[List[float], Field(min_length=3, max_length=3)]
    memory_usage_percent: float


class PingModel(BaseModel):
    pings: Dict[Annotated[str, Field(pattern=r"^[0-9a-fA-F-]{36}$")], float]


class StatusUpdate(BaseModel):
    node: str = Field(..., description="Node UUID")
    timestamp: datetime
    status: NodeStatusModel
    pings: Dict[str, float]

    @field_validator("node")
    @classmethod
    def validate_node_uuid(cls, v):
        try:
            uuid.UUID(v)
            return v
        except ValueError:
            raise ValueError("Invalid UUID format")

    @field_validator("pings")
    @classmethod
    def validate_ping_uuids(cls, v):
        for key in v.keys():
            try:
                uuid.UUID(key)
            except ValueError:
                raise ValueError(f"Invalid UUID format in pings: {key}")
        return v


# --- Node Management and Health Logic ---
known_nodes_db: Dict[str, Dict] = {}

LOAD_AVG_WARNING_THRESHOLD = 1.5
LOAD_AVG_CRITICAL_THRESHOLD = 3.0
MEMORY_WARNING_THRESHOLD = 75.0
MEMORY_CRITICAL_THRESHOLD = 90.0
LAST_SEEN_CRITICAL_THRESHOLD_SECONDS = 30
NODE_INACTIVE_REMOVAL_THRESHOLD_SECONDS = 300  # Remove node from UI after 5 minutes of inactivity


def get_node_health(node_data: Dict) -> str:
    last_seen_str = node_data.get("last_seen")
    if last_seen_str:
        last_seen_dt = datetime.fromisoformat(last_seen_str).replace(tzinfo=timezone.utc)
        time_since_last_seen = (datetime.now(timezone.utc) - last_seen_dt).total_seconds()
        if time_since_last_seen > LAST_SEEN_CRITICAL_THRESHOLD_SECONDS:
            return "critical"
    else:
        return "unknown"

    status_model_data = node_data.get("status")
    if not status_model_data:
        return "unknown"

    try:
        node_status = NodeStatusModel(**status_model_data)
    except Exception:
        logger.error(
            f"Could not parse status data for node {node_data.get('uuid')}",
            exc_info=True,
        )
        return "unknown"

    load_1min = node_status.load_avg[0]
    if load_1min >= LOAD_AVG_CRITICAL_THRESHOLD:
        return "critical"
    elif load_1min >= LOAD_AVG_WARNING_THRESHOLD:
        return "warning"

    if node_status.memory_usage_percent >= MEMORY_CRITICAL_THRESHOLD:
        return "critical"
    elif node_status.memory_usage_percent >= MEMORY_WARNING_THRESHOLD:
        return "warning"

    return "healthy"
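# Worked example (illustrative): a node seen 5 seconds ago reporting
# load_avg=[1.8, 1.2, 0.9] and memory_usage_percent=60.0 is classified
# "warning" (1.5 <= 1.8 < 3.0). The same node unseen for 45 seconds is
# "critical" regardless of its last metrics, because the last-seen check
# above returns before the load and memory thresholds are evaluated.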
# --- API Endpoints ---
@app.get("/", response_class=HTMLResponse)
async def read_root(request: Request):
    # Use X-Forwarded-For if available, otherwise client.host
    client_ip = request.headers.get("x-forwarded-for", request.client.host)
    logger.info(
        "Web root accessed",
        extra={"client_ip": client_ip, "service_uuid": SERVICE_UUID},
    )
    return templates.TemplateResponse(
        "index.html",
        {
            "request": request,
            "service_uuid": SERVICE_UUID,
            "url_for": request.url_for,  # Pass url_for for dynamic URL generation
            "root_path": request.scope.get("root_path", ""),  # Pass root_path for JS base URL
        },
    )


@app.get("/{service_uuid}/logs")
async def get_logs(
    request: Request,
    service_uuid: str,
    limit: int = 100,
    format: Optional[str] = Query(None, description="Response format: 'json' for JSON, default is HTML"),
    level: Optional[str] = Query(None, description="Filter logs by level: INFO, WARNING, ERROR"),
    since: Optional[str] = Query(None, description="Fetch logs since ISO timestamp, e.g., 2025-06-11T13:32:00"),
):
    # Use X-Forwarded-For if available, otherwise client.host
    client_ip = request.headers.get("x-forwarded-for", request.client.host)
    logger.info(
        "Logs endpoint accessed",
        extra={
            "service_uuid": service_uuid,
            "format": format,
            "level": level,
            "since": since,
            "limit": limit,
            "client_ip": client_ip,
        },
    )

    if service_uuid != SERVICE_UUID:
        logger.warning(f"Invalid service UUID: {service_uuid}")
        return JSONResponse(
            status_code=404,
            content={"error": "Service UUID not found"},
        )

    try:
        logs = log_buffer.get_logs(limit=limit, level=level, since=since)
        log_data = {
            "service_uuid": service_uuid,
            "log_count": len(logs),
            "logs": logs,
        }
        logger.debug(f"Fetched {len(logs)} logs for response")
    except Exception as e:
        logger.error(f"Error fetching logs: {e}", exc_info=True)
        return JSONResponse(
            status_code=500,
            content={"error": "Failed to fetch logs"},
        )

    if format == "json":
        logger.debug("Returning JSON response")
        return JSONResponse(content=log_data)

    logger.debug("Rendering logs.html template")
    try:
        return templates.TemplateResponse(
            "logs.html",
            {
                "request": request,
                "service_uuid": service_uuid,
                "logs": logs,
                "log_count": len(logs),
                "url_for": request.url_for,  # Pass url_for for dynamic URL generation
                "root_path": request.scope.get("root_path", ""),  # Pass root_path for JS base URL
            },
        )
    except Exception as e:
        logger.error(f"Error rendering logs.html: {e}", exc_info=True)
        return JSONResponse(
            status_code=500,
            content={"error": "Failed to render logs page"},
        )
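# Example request (illustrative; the UUID is a placeholder for SERVICE_UUID):
#
#   curl "http://localhost:8000/123e4567-e89b-12d3-a456-426614174000/logs?format=json&level=ERROR&limit=50"
#
# returns {"service_uuid": ..., "log_count": ..., "logs": [...]}; omitting
# format=json renders the logs.html template instead.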
@app.put("/{service_uuid}/{node_uuid}/")
async def update_node_status(
    service_uuid: str,
    node_uuid: str,
    status_update: StatusUpdate,
    request: Request,
):
    # Use X-Forwarded-For if available, otherwise client.host
    client_ip = request.headers.get("x-forwarded-for", request.client.host)
    logger.info(
        "Received node status update",
        extra={
            "event_type": "node_status_update",
            "client_ip": client_ip,
            "service_uuid": service_uuid,
            "node_uuid": node_uuid,
            "data": status_update.model_dump(mode="json"),
        },
    )

    if service_uuid != SERVICE_UUID:
        logger.warning(
            "Node sent status to wrong service UUID",
            extra={"client_node_uuid": node_uuid, "target_uuid": service_uuid},
        )
        return {"error": "Service UUID mismatch", "peers": []}

    try:
        database.update_system_metrics(
            node_uuid=node_uuid,
            timestamp=status_update.timestamp,
            uptime_seconds=status_update.status.uptime_seconds,
            load_avg=status_update.status.load_avg,
            memory_usage_percent=status_update.status.memory_usage_percent,
        )
        for target_uuid, latency in status_update.pings.items():
            database.update_ping_metrics(
                node_uuid=node_uuid,
                target_uuid=target_uuid,
                timestamp=status_update.timestamp,
                latency_ms=latency,
            )
    except Exception as e:
        logger.error(f"Database update failed: {e}", exc_info=True)

    current_time_utc = datetime.now(timezone.utc)
    known_nodes_db[node_uuid] = {
        "last_seen": current_time_utc.isoformat(),
        "ip": request.client.host,  # Keep client.host here as it's the direct connection
        "status": status_update.status.model_dump(),
        "uptime_seconds": status_update.status.uptime_seconds,
        "load_avg": status_update.status.load_avg,
        "memory_usage_percent": status_update.status.memory_usage_percent,
    }

    health_status_for_log = get_node_health(known_nodes_db[node_uuid])
    logger.info(f"Node {node_uuid} updated. Health: {health_status_for_log}")

    peer_list = {
        peer_uuid: {"last_seen": data["last_seen"], "ip": data["ip"]}
        for peer_uuid, data in known_nodes_db.items()
        if peer_uuid != node_uuid
    }
    return {"message": "Status received", "peers": peer_list}


@app.get("/nodes/status")
async def get_all_nodes_status():
    logger.info("Fetching all nodes status for UI.")

    # Prune inactive nodes from known_nodes_db before processing
    current_time_utc = datetime.now(timezone.utc)
    nodes_to_remove = []
    for node_uuid, data in known_nodes_db.items():
        last_seen_dt = datetime.fromisoformat(data["last_seen"]).replace(tzinfo=timezone.utc)
        if (current_time_utc - last_seen_dt).total_seconds() > NODE_INACTIVE_REMOVAL_THRESHOLD_SECONDS:
            nodes_to_remove.append(node_uuid)
            logger.info(f"Removing inactive node {node_uuid} from known_nodes_db.")
    for node_uuid in nodes_to_remove:
        known_nodes_db.pop(node_uuid)

    response_nodes = []
    for node_uuid, data in known_nodes_db.items():
        current_health = get_node_health(data)

        connections = {}
        for target_uuid in known_nodes_db:  # Only iterate over currently active nodes
            if target_uuid != node_uuid:
                ping_data = database.get_ping_data(node_uuid, target_uuid, start_time="-300s")
                latency_ms = None
                if ping_data and ping_data["data"]["latency"]:
                    # Get the most recent non-None latency
                    for latency in reversed(ping_data["data"]["latency"]):
                        # Exclude 0.0, which may be a default rather than a real sample
                        if latency is not None and not (isinstance(latency, float) and latency == 0.0):
                            latency_ms = float(latency)
                            break
                connections[target_uuid] = latency_ms

        response_nodes.append(
            {
                "uuid": node_uuid,
                "last_seen": data["last_seen"],
                "ip": data["ip"],
                "health_status": current_health,
                "uptime_seconds": data.get("uptime_seconds"),
                "load_avg": data.get("load_avg"),
                "memory_usage_percent": data.get("memory_usage_percent"),
                "connections": connections,
            }
        )

    return {"nodes": response_nodes}


@app.get("/health")
async def health_check():
    return {"status": "ok"}
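# --- Example client (illustrative sketch; not part of the service) ---
# A reporting node might send a status update like the one below. The UUIDs,
# host, and port are placeholders, and the `requests` package is assumed to
# be available; any HTTP client works. Note the trailing slash in the route.
#
#   import requests
#
#   node_uuid = "123e4567-e89b-12d3-a456-426614174000"  # hypothetical
#   peer_uuid = "00000000-0000-0000-0000-000000000001"  # hypothetical
#   payload = {
#       "node": node_uuid,
#       "timestamp": "2025-06-11T13:32:00+00:00",
#       "status": {
#           "uptime_seconds": 3600,
#           "load_avg": [0.4, 0.3, 0.2],
#           "memory_usage_percent": 42.0,
#       },
#       "pings": {peer_uuid: 12.5},
#   }
#   requests.put(f"http://localhost:8000/{SERVICE_UUID}/{node_uuid}/", json=payload)
#
# The response echoes a "peers" map that the node can use to discover the
# other nodes currently known to the service.

# --- END OF FILE main.py ---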