import json
import logging
import os
import sys
import uuid
from collections import deque
from datetime import datetime, timezone
from typing import Annotated, Dict, List

from fastapi import FastAPI, Request, status
from fastapi.responses import HTMLResponse, JSONResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from pydantic import BaseModel, Field, field_validator
from pythonjsonlogger import jsonlogger

from .database import RRDDatabase

# --- Service Configuration ---
# Use SERVICE_UUID from the environment if set; otherwise generate one at startup.
SERVICE_UUID = os.environ.get("SERVICE_UUID", str(uuid.uuid4()))

database = RRDDatabase()

# --- Logging Configuration ---
# Configure the root logger so every module logger inherits the JSON handlers.
logger = logging.getLogger()
logger.setLevel(logging.INFO)


class BufferHandler(logging.Handler):
    """Captures formatted log records into the in-memory LogBuffer."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Instantiate the formatter once for efficiency. The format string
        # determines which record attributes (asctime, levelname, ...) appear
        # as keys in the JSON output that add_log() reads back.
        self.formatter = jsonlogger.JsonFormatter(
            "%(asctime)s %(name)s %(levelname)s %(message)s"
        )

    def emit(self, record):
        try:
            # Format the record as a JSON string, then parse it back to a dict.
            log_entry = json.loads(self.formatter.format(record))
            log_buffer.add_log(log_entry)
        except Exception as e:
            # Write to stderr directly to avoid recursing into the logging
            # system or filling the buffer with error records.
            print(
                f"Error in BufferHandler: Could not process log record: {e}",
                file=sys.stderr,
            )


class LogBuffer:
    """A fixed-size, in-memory ring buffer of recent log entries."""

    # Standard record attributes that should not be duplicated into 'extra'.
    STANDARD_FIELDS = {
        "asctime", "levelname", "message", "name", "lineno", "filename",
        "pathname", "funcName", "process", "processName", "thread", "threadName",
    }

    def __init__(self, maxlen=1000):
        self.buffer = deque(maxlen=maxlen)

    def add_log(self, record):
        # Fall back to the current time if the formatter did not supply 'asctime'.
        timestamp = record.get("asctime") or datetime.now(timezone.utc).isoformat()
        self.buffer.append(
            {
                "timestamp": timestamp,
                "level": record.get("levelname"),
                "message": record.get("message"),
                "extra": {
                    k: v for k, v in record.items() if k not in self.STANDARD_FIELDS
                },
            }
        )

    def get_logs(self, limit=100):
        return list(self.buffer)[-limit:]


# Global log buffer shared by all BufferHandler instances.
log_buffer = LogBuffer()

# Handler that streams JSON lines to stdout.
logHandler = logging.StreamHandler(sys.stdout)

# The format string selects the default record attributes included in the JSON output.
formatter = jsonlogger.JsonFormatter("%(asctime)s %(name)s %(levelname)s %(message)s")
logHandler.setFormatter(formatter)

# Attach handlers to the root logger exactly once (guards against re-import).
if not logger.handlers:
    logger.addHandler(logHandler)
    buffer_handler = BufferHandler()
    logger.addHandler(buffer_handler)
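# For reference, each entry buffered by BufferHandler/LogBuffer above takes
# roughly this shape (values are illustrative, not captured output):
#
#   {
#       "timestamp": "2024-01-01 12:00:00,000",
#       "level": "INFO",
#       "message": "Web root accessed",
#       "extra": {"client_ip": "203.0.113.7", "service_uuid": "..."},
#   }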
# --- FastAPI Application ---
app = FastAPI(
    title="Node Monitoring System",
    description=f"A distributed monitoring system. Service UUID: {SERVICE_UUID}",
)

# Configure templates for the web interface.
templates = Jinja2Templates(directory="app/web/templates")

# Mount the static files directory.
app.mount("/static", StaticFiles(directory="app/web/static"), name="static")


# --- Data Models (as defined in the project spec) ---
class NodeStatusModel(BaseModel):
    uptime_seconds: int
    load_avg: Annotated[List[float], Field(min_length=3, max_length=3)]
    memory_usage_percent: float


class PingModel(BaseModel):
    pings: Dict[Annotated[str, Field(pattern=r"^[0-9a-fA-F-]{36}$")], float]


class StatusUpdate(BaseModel):
    node: str = Field(..., description="Node UUID")
    timestamp: datetime
    status: NodeStatusModel
    pings: Dict[str, float]

    @field_validator("node")
    @classmethod
    def validate_node_uuid(cls, v):
        try:
            uuid.UUID(v)
            return v
        except ValueError:
            raise ValueError("Invalid UUID format")

    @field_validator("pings")
    @classmethod
    def validate_ping_uuids(cls, v):
        for key in v.keys():
            try:
                uuid.UUID(key)
            except ValueError:
                raise ValueError(f"Invalid UUID format in pings: {key}")
        return v


# --- Node Management and Health Logic ---
# An in-memory registry of known nodes and their latest reported state.
known_nodes_db: Dict[str, Dict] = {}

# Health calculation thresholds (tunable).
LOAD_AVG_WARNING_THRESHOLD = 1.5
LOAD_AVG_CRITICAL_THRESHOLD = 3.0
MEMORY_WARNING_THRESHOLD = 75.0
MEMORY_CRITICAL_THRESHOLD = 90.0
# A node that has not reported within this many seconds is considered critical.
LAST_SEEN_CRITICAL_THRESHOLD_SECONDS = 30


def get_node_health(node_data: Dict) -> str:
    """Calculates the health status based on node metrics and last-seen time."""
    # Check liveness first.
    last_seen_str = node_data.get("last_seen")
    if last_seen_str:
        last_seen_dt = datetime.fromisoformat(last_seen_str)
        if last_seen_dt.tzinfo is None:
            # Stored timestamps are UTC; attach the zone if it is missing.
            last_seen_dt = last_seen_dt.replace(tzinfo=timezone.utc)
        time_since_last_seen = (
            datetime.now(timezone.utc) - last_seen_dt
        ).total_seconds()
        if time_since_last_seen > LAST_SEEN_CRITICAL_THRESHOLD_SECONDS:
            return "critical"  # Node has not reported recently.
    else:
        return "unknown"  # Should not happen if 'last_seen' is always set.

    status_model_data = node_data.get("status")
    if not status_model_data:
        # Possible if a node was just discovered but has not sent a full status update yet.
        return "unknown"

    try:
        node_status = NodeStatusModel(**status_model_data)
    except Exception:
        logger.error(
            f"Could not parse status data for node {node_data.get('uuid')}",
            exc_info=True,
        )
        return "unknown"  # Could arguably be "critical" if parsing fails.

    # Check load average, using the 1-minute figure as the primary indicator.
    load_1min = node_status.load_avg[0]
    if load_1min >= LOAD_AVG_CRITICAL_THRESHOLD:
        return "critical"
    elif load_1min >= LOAD_AVG_WARNING_THRESHOLD:
        return "warning"

    # Check memory usage.
    if node_status.memory_usage_percent >= MEMORY_CRITICAL_THRESHOLD:
        return "critical"
    elif node_status.memory_usage_percent >= MEMORY_WARNING_THRESHOLD:
        return "warning"

    return "healthy"


# --- API Endpoints ---
@app.get("/", response_class=HTMLResponse)
async def read_root(request: Request):
    """Serves the main web page displaying the Service UUID and the node grid."""
    logger.info(
        "Web root accessed",
        extra={"client_ip": request.client.host, "service_uuid": SERVICE_UUID},
    )
    return templates.TemplateResponse(
        "index.html", {"request": request, "service_uuid": SERVICE_UUID}
    )


@app.get("/{service_uuid}/logs")
async def get_logs(service_uuid: str, limit: int = 100):
    """Returns recent logs for this service instance."""
    if service_uuid != SERVICE_UUID:
        return JSONResponse(
            status_code=404, content={"error": "Service UUID not found"}
        )
    logs = log_buffer.get_logs(limit)
    return {
        "service_uuid": service_uuid,
        "log_count": len(logs),
        "logs": logs,
    }
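# Example request against the logs endpoint above (a sketch; the host, port,
# and UUID placeholder are assumptions, not fixed by this module):
#
#   curl "http://localhost:8000/<SERVICE_UUID>/logs?limit=50"
#
# The response carries "service_uuid", "log_count", and the "logs" list.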
"log_count": len(logs), "logs": logs } @app.put("/{service_uuid}/{node_uuid}/", status_code=status.HTTP_200_OK) async def update_node_status( service_uuid: str, node_uuid: str, status_update: StatusUpdate, request: Request ): """Receives status updates from a node and returns a list of peers.""" logger.info( "Received node status update", extra={ 'event_type': 'node_status_update', 'client_ip': request.client.host, 'service_uuid': service_uuid, 'node_uuid': node_uuid, 'data': status_update.dict() } ) if service_uuid != SERVICE_UUID: logger.warning( "Node sent status to wrong service UUID", extra={'client_node_uuid': node_uuid, 'target_uuid': service_uuid} ) return {"error": "Service UUID mismatch", "peers": []} # Update RRD database with system metrics try: database.update_system_metrics( node_uuid=node_uuid, timestamp=status_update.timestamp, uptime_seconds=status_update.status.uptime_seconds, load_avg=status_update.status.load_avg, memory_usage_percent=status_update.status.memory_usage_percent ) # Update ping metrics for target_uuid, latency in status_update.pings.items(): database.update_ping_metrics( node_uuid=node_uuid, target_uuid=target_uuid, timestamp=status_update.timestamp, latency_ms=latency ) except Exception as e: logger.error(f"Database update failed: {e}", exc_info=True) # Continue processing even if DB update fails # Auto-discovery logic and update known_nodes_db with full status current_time_utc = datetime.now(timezone.utc) known_nodes_db[node_uuid] = { "last_seen": current_time_utc.isoformat(), "ip": request.client.host, "status": status_update.status.dict(), # Store the dict representation # Store direct values for convenience in /nodes/status endpoint "uptime_seconds": status_update.status.uptime_seconds, "load_avg": status_update.status.load_avg, "memory_usage_percent": status_update.status.memory_usage_percent } # Calculate health for logging purposes (it will be recalculated for /nodes/status) health_status_for_log = get_node_health(known_nodes_db[node_uuid]) logger.info(f"Node {node_uuid} updated. Health: {health_status_for_log}") # Respond with the list of other known peers peer_list = {uuid: {"last_seen": data["last_seen"], "ip": data["ip"]} for uuid, data in known_nodes_db.items() if uuid != node_uuid} return {"message": "Status received", "peers": peer_list} @app.get("/nodes/status") async def get_all_nodes_status(): """Returns the current status of all known nodes for the UI.""" logger.info("Fetching all nodes status for UI.") response_nodes = [] for node_uuid, data in known_nodes_db.items(): # Dynamically calculate health for each node based on its current data current_health = get_node_health(data) response_nodes.append({ "uuid": node_uuid, "last_seen": data["last_seen"], "ip": data["ip"], "health_status": current_health, "uptime_seconds": data.get("uptime_seconds"), "load_avg": data.get("load_avg"), "memory_usage_percent": data.get("memory_usage_percent") }) return {"nodes": response_nodes} @app.get("/health") async def health_check(): """Health check endpoint for container orchestration.""" return {"status": "ok"}