diff --git a/Dockerfile b/Dockerfile
index 6414bbc..af2af21 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -28,15 +28,17 @@ RUN apt-get purge -y build-essential python3-dev && \
 # Copy application code
 COPY app/ ./app/
 
-# Create directory for RRD data
-RUN mkdir -p /app/data
+# Create directory for RRD data at /data (will be volume mounted)
+RUN mkdir -p /data
 
 # Expose port
 EXPOSE 8000
 
 # Create non-root user for security
 RUN useradd --create-home --shell /bin/bash appuser && \
-    chown -R appuser:appuser /app
+    chown -R appuser:appuser /app && \
+    chown -R appuser:appuser /data && \
+    chmod 777 /data
 USER appuser
 
 # Health check
@@ -44,4 +46,5 @@ HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \
     CMD wget --no-verbose --tries=1 --spider http://localhost:8000/health || exit 1
 
 # Run the application
-CMD ["python", "-m", "uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000"]
\ No newline at end of file
+CMD ["python", "-m", "uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000"]
+
diff --git a/app/database.py b/app/database.py
new file mode 100644
index 0000000..e5d11d6
--- /dev/null
+++ b/app/database.py
@@ -0,0 +1,257 @@
+import os
+import rrdtool
+import logging
+from datetime import datetime, timedelta
+from typing import Dict, List, Optional, Tuple
+from pathlib import Path
+
+logger = logging.getLogger(__name__)
+
+class RRDDatabase:
+    def __init__(self, data_dir: str = None):
+        # Use environment variable or default to /data
+        if data_dir is None:
+            data_dir = os.environ.get("DATA_DIR", "/data")
+
+        self.data_dir = Path(data_dir)
+
+        # Create data directory if it doesn't exist
+        try:
+            self.data_dir.mkdir(parents=True, exist_ok=True)
+            logger.info(f"Using data directory: {self.data_dir}")
+        except PermissionError:
+            logger.error(f"Permission denied creating data directory: {self.data_dir}")
+            raise
+        except Exception as e:
+            logger.error(f"Failed to create data directory {self.data_dir}: {e}")
+            raise
+
+        # RRD configuration
+        self.step = 60        # 1-minute intervals
+        self.heartbeat = 120  # 2-minute heartbeat (allow 1 missed update)
+
+        # Retention policy (6 months total)
+        self.rra_config = [
+            "RRA:AVERAGE:0.5:1:1440",    # 1-min avg for 24 hours (1440 points)
+            "RRA:AVERAGE:0.5:60:744",    # 1-hour avg for 31 days (744 points)
+            "RRA:AVERAGE:0.5:1440:180",  # 1-day avg for 6 months (180 points)
+            "RRA:MAX:0.5:1:1440",        # 1-min max for 24 hours
+            "RRA:MAX:0.5:60:744",        # 1-hour max for 31 days
+            "RRA:MIN:0.5:60:744",        # 1-hour min for 31 days
+        ]
+
+    def _get_node_dir(self, node_uuid: str) -> Path:
+        """Get the directory path for a specific node's RRD files."""
+        node_dir = self.data_dir / node_uuid
+        node_dir.mkdir(exist_ok=True)
+        return node_dir
+
+    def _create_system_rrd(self, node_uuid: str) -> str:
+        """Create RRD file for system metrics (uptime, load, memory)."""
+        rrd_file = self._get_node_dir(node_uuid) / "system.rrd"
+
+        if rrd_file.exists():
+            return str(rrd_file)
+
+        try:
+            rrdtool.create(
+                str(rrd_file),
+                "--step", str(self.step),
+                # Data sources
+                f"DS:uptime:GAUGE:{self.heartbeat}:0:U",    # Uptime in seconds
+                f"DS:load1:GAUGE:{self.heartbeat}:0:100",   # 1-min load average
+                f"DS:load5:GAUGE:{self.heartbeat}:0:100",   # 5-min load average
+                f"DS:load15:GAUGE:{self.heartbeat}:0:100",  # 15-min load average
+                f"DS:memory:GAUGE:{self.heartbeat}:0:100",  # Memory usage %
+                # Round Robin Archives
+                *self.rra_config
+            )
+            logger.info(f"Created system RRD for node {node_uuid}")
+            return str(rrd_file)
+        except Exception as e:
+            logger.error(f"Failed to create system RRD for {node_uuid}: {e}")
+            raise
+
+    def _create_ping_rrd(self, node_uuid: str, target_uuid: str) -> str:
+        """Create RRD file for ping metrics between two nodes."""
+        rrd_file = self._get_node_dir(node_uuid) / f"ping_{target_uuid}.rrd"
+
+        if rrd_file.exists():
+            return str(rrd_file)
+
+        try:
+            rrdtool.create(
+                str(rrd_file),
+                "--step", str(self.step),
+                # Data sources for ping metrics
+                f"DS:latency:GAUGE:{self.heartbeat}:0:10000",  # Ping latency in ms
+                f"DS:loss:GAUGE:{self.heartbeat}:0:100",       # Packet loss %
+                # Round Robin Archives
+                *self.rra_config
+            )
+            logger.info(f"Created ping RRD for {node_uuid} -> {target_uuid}")
+            return str(rrd_file)
+        except Exception as e:
+            logger.error(f"Failed to create ping RRD for {node_uuid}->{target_uuid}: {e}")
+            raise
+
+    def update_system_metrics(self, node_uuid: str, timestamp: datetime,
+                              uptime_seconds: int, load_avg: List[float],
+                              memory_usage_percent: float):
+        """Update system metrics for a node."""
+        try:
+            rrd_file = self._create_system_rrd(node_uuid)
+
+            # Convert datetime to Unix timestamp
+            unix_time = int(timestamp.timestamp())
+
+            # Format: timestamp:uptime:load1:load5:load15:memory
+            values = f"{unix_time}:{uptime_seconds}:{load_avg[0]}:{load_avg[1]}:{load_avg[2]}:{memory_usage_percent}"
+
+            rrdtool.update(rrd_file, values)
+            logger.debug(f"Updated system metrics for {node_uuid}: {values}")
+
+        except Exception as e:
+            logger.error(f"Failed to update system metrics for {node_uuid}: {e}")
+            raise
+
+    def update_ping_metrics(self, node_uuid: str, target_uuid: str,
+                            timestamp: datetime, latency_ms: float):
+        """Update ping metrics between two nodes."""
+        try:
+            rrd_file = self._create_ping_rrd(node_uuid, target_uuid)
+
+            unix_time = int(timestamp.timestamp())
+
+            # For now, we only track latency. Loss can be calculated from missing updates.
+            values = f"{unix_time}:{latency_ms}:0"  # 0% loss (could be enhanced)
+
+            rrdtool.update(rrd_file, values)
+            logger.debug(f"Updated ping metrics {node_uuid}->{target_uuid}: {latency_ms}ms")
+
+        except Exception as e:
+            logger.error(f"Failed to update ping metrics {node_uuid}->{target_uuid}: {e}")
+            raise
+
+    def get_system_data(self, node_uuid: str, start_time: str = "-24h",
+                        end_time: str = "now") -> Optional[Dict]:
+        """Retrieve system metrics data for a node."""
+        try:
+            rrd_file = self._get_node_dir(node_uuid) / "system.rrd"
+            if not rrd_file.exists():
+                return None
+
+            result = rrdtool.fetch(
+                str(rrd_file),
+                "AVERAGE",
+                "--start", start_time,
+                "--end", end_time
+            )
+
+            # Parse RRDtool fetch result
+            start, end, step = result[0]
+            ds_names = result[1]  # ['uptime', 'load1', 'load5', 'load15', 'memory']
+            data_points = result[2]
+
+            # Convert to more usable format
+            timestamps = []
+            data = {ds: [] for ds in ds_names}
+
+            current_time = start
+            for point in data_points:
+                timestamps.append(current_time)
+                for i, ds in enumerate(ds_names):
+                    value = point[i] if point[i] is not None else 0
+                    data[ds].append(value)
+                current_time += step
+
+            return {
+                'timestamps': timestamps,
+                'data': data,
+                'step': step
+            }
+
+        except Exception as e:
+            logger.error(f"Failed to get system data for {node_uuid}: {e}")
+            return None
+
+    def get_ping_data(self, node_uuid: str, target_uuid: str,
+                      start_time: str = "-24h", end_time: str = "now") -> Optional[Dict]:
+        """Retrieve ping metrics between two nodes."""
+        try:
+            rrd_file = self._get_node_dir(node_uuid) / f"ping_{target_uuid}.rrd"
+            if not rrd_file.exists():
+                return None
+
+            result = rrdtool.fetch(
+                str(rrd_file),
+                "AVERAGE",
+                "--start", start_time,
+                "--end", end_time
+            )
+
+            start, end, step = result[0]
+            ds_names = result[1]  # ['latency', 'loss']
+            data_points = result[2]
+
+            timestamps = []
+            data = {ds: [] for ds in ds_names}
+
+            current_time = start
+            for point in data_points:
+                timestamps.append(current_time)
+                for i, ds in enumerate(ds_names):
+                    value = point[i] if point[i] is not None else 0
+                    data[ds].append(value)
+                current_time += step
+
+            return {
+                'timestamps': timestamps,
+                'data': data,
+                'step': step
+            }
+
+        except Exception as e:
+            logger.error(f"Failed to get ping data {node_uuid}->{target_uuid}: {e}")
+            return None
+
+    def list_nodes(self) -> List[str]:
+        """Get list of all nodes with RRD data."""
+        try:
+            nodes = []
+            for item in self.data_dir.iterdir():
+                if item.is_dir() and (item / "system.rrd").exists():
+                    nodes.append(item.name)
+            return nodes
+        except Exception as e:
+            logger.error(f"Failed to list nodes: {e}")
+            return []
+
+    def cleanup_old_data(self):
+        """Clean up RRD files older than 6 months (handled automatically by RRD retention)."""
+        # RRD automatically handles data retention based on RRA configuration
+        # This method could be used for cleaning up orphaned files
+        cutoff_date = datetime.now() - timedelta(days=190)  # 6+ months
+
+        try:
+            for node_dir in self.data_dir.iterdir():
+                if not node_dir.is_dir():
+                    continue
+
+                # Check if any RRD files have been modified recently
+                rrd_files = list(node_dir.glob("*.rrd"))
+                if not rrd_files:
+                    continue
+
+                # If all RRD files are old, the node is probably dead
+                all_old = all(
+                    datetime.fromtimestamp(f.stat().st_mtime) < cutoff_date
+                    for f in rrd_files
+                )
+
+                if all_old:
+                    logger.info(f"Node {node_dir.name} appears inactive for >6 months")
+                    # Could optionally remove the directory here
+
+        except Exception as e:
+            logger.error(f"Failed during cleanup: {e}")
diff --git a/app/main.py b/app/main.py
index a55c0bc..c8b021d 100644
--- a/app/main.py
+++ b/app/main.py
@@ -1,15 +1,14 @@
-
 import os
 import uuid
 import json
 import logging
-from datetime import datetime
-
+from datetime import datetime, timezone, timedelta
 from fastapi import FastAPI, Request, status
 from fastapi.responses import HTMLResponse, JSONResponse
 from fastapi.templating import Jinja2Templates
-from pydantic import BaseModel, Field, validator, constr, conlist
-from typing import Dict, List
+from fastapi.staticfiles import StaticFiles
+from pydantic import BaseModel, Field, validator
+from typing import Dict, List, Annotated
 import uuid as uuid_lib
 
 from collections import deque
@@ -44,7 +43,6 @@ class BufferHandler(logging.Handler):
         except Exception as e:
             # Log the error to stderr, to avoid recursion or filling the buffer with errors
             print(f"Error in BufferHandler: Could not process log record: {e}", file=sys.stderr)
-            # Optionally, you could log record.msg or record.exc_info here for more context
 
 
 class LogBuffer:
@@ -60,7 +58,6 @@ class LogBuffer:
                 'message': record.get('message'),
                 'extra': {k: v for k, v in record.items() if k not in ['asctime', 'levelname', 'message', 'name', 'lineno',
                                                                        'filename', 'pathname', 'funcName', 'process', 'processName', 'thread', 'threadName']}
-                # Added more common LogRecord attributes to exclude from 'extra'
             })
 
     def get_logs(self, limit=100):
@@ -80,10 +77,8 @@ formatter = jsonlogger.JsonFormatter(
 logHandler.setFormatter(formatter)
 
 # Add handlers to the root logger
-# Avoid adding handlers multiple times in a uvicorn environment
 if not logger.handlers:
     logger.addHandler(logHandler)
-    # Add buffer handler to logger ONLY ONCE
     buffer_handler = BufferHandler()
     logger.addHandler(buffer_handler)
 
@@ -97,15 +92,18 @@ app = FastAPI(
 # Configure templates for the web interface
 templates = Jinja2Templates(directory="app/web/templates")
 
+# Mount static files directory
+app.mount("/static", StaticFiles(directory="app/web/static"), name="static")
+
 # --- Data Models (as defined in the project spec) ---
 
 class NodeStatusModel(BaseModel):
     uptime_seconds: int
-    load_avg: conlist(float, min_length=3, max_length=3)
+    load_avg: Annotated[List[float], Field(min_length=3, max_length=3)]
     memory_usage_percent: float
 
 class PingModel(BaseModel):
-    pings: Dict[constr(regex=r'^[0-9a-fA-F-]{36}$'), float]
+    pings: Dict[Annotated[str, Field(pattern=r'^[0-9a-fA-F-]{36}$')], float]
 
 class StatusUpdate(BaseModel):
     node: str = Field(..., description="Node UUID")
@@ -131,15 +129,61 @@
         return v
 
 
-# A mock database of known nodes for the auto-discovery demo
-# In a real app, this would be managed more dynamically
-known_nodes_db = {}
+# --- Node Management and Health Logic ---
+# A mock database of known nodes, now storing more comprehensive data
+known_nodes_db: Dict[str, Dict] = {}
+
+# Health calculation constants (can be tuned)
+LOAD_AVG_WARNING_THRESHOLD = 1.5
+LOAD_AVG_CRITICAL_THRESHOLD = 3.0
+MEMORY_WARNING_THRESHOLD = 75.0
+MEMORY_CRITICAL_THRESHOLD = 90.0
+# If a node hasn't reported in this many seconds, it's considered critical
+LAST_SEEN_CRITICAL_THRESHOLD_SECONDS = 30
+
+def get_node_health(node_data: Dict) -> str:
+    """Calculates the health status based on node metrics and last seen time."""
+    # Check for liveness first
+    last_seen_str = node_data.get("last_seen")
+    if last_seen_str:
+        last_seen_dt = datetime.fromisoformat(last_seen_str).replace(tzinfo=timezone.utc)
+        time_since_last_seen = (datetime.now(timezone.utc) - last_seen_dt).total_seconds()
+        if time_since_last_seen > LAST_SEEN_CRITICAL_THRESHOLD_SECONDS:
+            return "critical"  # Node has not reported recently
+    else:
+        return "unknown"  # Should not happen if 'last_seen' is always set
+
+    status_model_data = node_data.get("status")
+    if not status_model_data:
+        # This could happen if a node is just discovered but hasn't sent a full status update yet
+        return "unknown"
+
+    try:
+        status = NodeStatusModel(**status_model_data)
+    except Exception:
+        logger.error(f"Could not parse status data for node {node_data.get('uuid')}", exc_info=True)
+        return "unknown"  # Or critical if parsing fails
+
+    # Check load average (using 1-minute load for primary indicator)
+    load_1min = status.load_avg[0]
+    if load_1min >= LOAD_AVG_CRITICAL_THRESHOLD:
+        return "critical"
+    elif load_1min >= LOAD_AVG_WARNING_THRESHOLD:
+        return "warning"
+
+    # Check memory usage
+    if status.memory_usage_percent >= MEMORY_CRITICAL_THRESHOLD:
+        return "critical"
+    elif status.memory_usage_percent >= MEMORY_WARNING_THRESHOLD:
+        return "warning"
+
+    return "healthy"
 
 # --- API Endpoints ---
 
 @app.get("/", response_class=HTMLResponse)
 async def read_root(request: Request):
-    """Serves the main web page which displays the Service UUID."""
+    """Serves the main web page which displays the Service UUID and the node grid."""
     logger.info(
         "Web root accessed",
         extra={'client_ip': request.client.host, 'service_uuid': SERVICE_UUID}
@@ -149,7 +193,6 @@ async def read_root(request: Request):
         {"request": request, "service_uuid": SERVICE_UUID}
     )
 
-# Add the logs endpoint
 @app.get("/{service_uuid}/logs")
 async def get_logs(service_uuid: str, limit: int = 100):
     """Get recent logs for the service."""
@@ -175,7 +218,6 @@ async def update_node_status(
     request: Request
 ):
     """Receives status updates from a node and returns a list of peers."""
-    # Log the incoming status update with structured context
     logger.info(
         "Received node status update",
         extra={
@@ -205,7 +247,7 @@ async def update_node_status(
             )
 
             # Update ping metrics
-            for target_uuid, latency in status_update.pings.pings.items():
+            for target_uuid, latency in status_update.pings.items():
                 database.update_ping_metrics(
                     node_uuid=node_uuid,
                     target_uuid=target_uuid,
@@ -214,20 +256,51 @@ async def update_node_status(
                 )
 
     except Exception as e:
-        logger.error(f"Database update failed: {e}")
+        logger.error(f"Database update failed: {e}", exc_info=True)
         # Continue processing even if DB update fails
 
-    # Auto-discovery logic
-    if node_uuid not in known_nodes_db:
-        logger.info(f"New node discovered: {node_uuid}")
-        # A real app would need a strategy to handle node addresses
-        known_nodes_db[node_uuid] = {"last_seen": datetime.utcnow().isoformat(), "ip": request.client.host}
+    # Auto-discovery logic and update known_nodes_db with full status
+    current_time_utc = datetime.now(timezone.utc)
+
+    known_nodes_db[node_uuid] = {
+        "last_seen": current_time_utc.isoformat(),
+        "ip": request.client.host,
+        "status": status_update.status.dict(),  # Store the dict representation
+        # Store direct values for convenience in /nodes/status endpoint
+        "uptime_seconds": status_update.status.uptime_seconds,
+        "load_avg": status_update.status.load_avg,
+        "memory_usage_percent": status_update.status.memory_usage_percent
+    }
+
+    # Calculate health for logging purposes (it will be recalculated for /nodes/status)
+    health_status_for_log = get_node_health(known_nodes_db[node_uuid])
+    logger.info(f"Node {node_uuid} updated. Health: {health_status_for_log}")
 
     # Respond with the list of other known peers
-    peer_list = {uuid: data for uuid, data in known_nodes_db.items() if uuid != node_uuid}
+    peer_list = {uuid: {"last_seen": data["last_seen"], "ip": data["ip"]}
+                 for uuid, data in known_nodes_db.items() if uuid != node_uuid}
     return {"message": "Status received", "peers": peer_list}
 
+@app.get("/nodes/status")
+async def get_all_nodes_status():
+    """Returns the current status of all known nodes for the UI."""
+    logger.info("Fetching all nodes status for UI.")
+    response_nodes = []
+    for node_uuid, data in known_nodes_db.items():
+        # Dynamically calculate health for each node based on its current data
+        current_health = get_node_health(data)
+
+        response_nodes.append({
+            "uuid": node_uuid,
+            "last_seen": data["last_seen"],
+            "ip": data["ip"],
+            "health_status": current_health,
+            "uptime_seconds": data.get("uptime_seconds"),
+            "load_avg": data.get("load_avg"),
+            "memory_usage_percent": data.get("memory_usage_percent")
+        })
+    return {"nodes": response_nodes}
 
 @app.get("/health")
 async def health_check():
diff --git a/app/web/static/script.js b/app/web/static/script.js
new file mode 100644
index 0000000..74d50ab
--- /dev/null
+++ b/app/web/static/script.js
@@ -0,0 +1,73 @@
+document.addEventListener('DOMContentLoaded', () => {
+    const nodeGridContainer = document.getElementById('node-grid-container');
+    const nodeCountSpan = document.getElementById('node-count');
+    const POLLING_INTERVAL_MS = 3000; // Poll every 3 seconds
+
+    async function fetchNodeData() {
+        try {
+            const response = await fetch('/nodes/status');
+            if (!response.ok) {
+                throw new Error(`HTTP error! status: ${response.status}`);
+            }
+            const data = await response.json();
+            renderNodeGrid(data.nodes);
+        } catch (error) {
+            console.error("Error fetching node data:", error);
+            nodeGridContainer.innerHTML = '<p>Error loading node data.</p>';
+        }
+    }
+
+    function renderNodeGrid(nodes) {
+        nodeGridContainer.innerHTML = ''; // Clear existing nodes
+        nodeCountSpan.textContent = nodes.length; // Update total node count
+
+        if (nodes.length === 0) {
+            nodeGridContainer.innerHTML = '<p>No nodes have reported yet.</p>';
+            return;
+        }
+
+        nodes.forEach(node => {
+            const nodeCell = document.createElement('div');
+            nodeCell.classList.add('node-cell');
+            nodeCell.classList.add(`node-${node.health_status}`); // Apply health color class
+
+            // Truncate UUID for display
+            const displayUuid = node.uuid.substring(0, 8) + '...';
+
+            nodeCell.innerHTML = `
+                <div class="node-uuid" title="${node.uuid}">${displayUuid}</div>
+                <div class="node-details">
+                    <div>UUID: ${node.uuid}</div>
+                    <div>IP: ${node.ip}</div>
+                    <div>Last Seen: ${new Date(node.last_seen).toLocaleTimeString()}</div>
+                    <div>Uptime: ${node.uptime_seconds ? formatUptime(node.uptime_seconds) : 'N/A'}</div>
+                    <div>Load Avg (1m, 5m, 15m): ${node.load_avg ? node.load_avg.join(', ') : 'N/A'}</div>
+                    <div>Memory Usage: ${node.memory_usage_percent ? node.memory_usage_percent.toFixed(2) + '%' : 'N/A'}</div>
+                </div>
+            `;
+            nodeGridContainer.appendChild(nodeCell);
+        });
+    }
+
+    // Minimal uptime formatter: seconds -> "Xd Xh Xm"
+    function formatUptime(totalSeconds) {
+        const days = Math.floor(totalSeconds / 86400);
+        const hours = Math.floor((totalSeconds % 86400) / 3600);
+        const minutes = Math.floor((totalSeconds % 3600) / 60);
+        return `${days}d ${hours}h ${minutes}m`;
+    }
+
+    // Initial load, then poll on the configured interval
+    fetchNodeData();
+    setInterval(fetchNodeData, POLLING_INTERVAL_MS);
+});
diff --git a/app/web/templates/index.html b/app/web/templates/index.html
--- a/app/web/templates/index.html
+++ b/app/web/templates/index.html
+        <p>Service ID: {{ service_uuid }}</p>
+        <p>Total Nodes: <span id="node-count">0</span></p>
+
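For reference, a minimal sketch of what a node agent's report to the status endpoint handled by update_node_status could look like. The base URL, service UUID, node UUID, peer UUID, and the exact route path ("/{service_uuid}/status") are assumptions, since the route decorator sits outside the hunks shown here; the "node", "status", and "pings" field names follow the models in this diff, and the "timestamp" field is inferred from the update calls.

    import requests
    from datetime import datetime, timezone

    SERVER = "http://localhost:8000"                       # placeholder base URL
    SERVICE_UUID = "00000000-0000-0000-0000-000000000000"  # placeholder service UUID
    NODE_UUID = "11111111-1111-1111-1111-111111111111"     # placeholder node UUID

    payload = {
        "node": NODE_UUID,
        "timestamp": datetime.now(timezone.utc).isoformat(),  # assumed field name
        "status": {
            "uptime_seconds": 3600,
            "load_avg": [0.42, 0.35, 0.30],        # 1, 5 and 15 minute load averages
            "memory_usage_percent": 48.2,
        },
        "pings": {
            # peer UUID -> measured latency in ms (placeholder values)
            "22222222-2222-2222-2222-222222222222": 12.7,
        },
    }

    # Assumed route shape; adjust to the actual decorator on update_node_status
    resp = requests.post(f"{SERVER}/{SERVICE_UUID}/status", json=payload, timeout=5)
    resp.raise_for_status()
    print(resp.json()["peers"])  # the endpoint answers with the known peer list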
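And a small sketch of reading a node's recorded metrics back out through RRDDatabase.get_system_data from app/database.py; the node UUID and the local data directory are placeholders, everything else follows the return shape defined above (a dict with "timestamps", "data", and "step").

    from app.database import RRDDatabase

    db = RRDDatabase(data_dir="./data")  # or rely on DATA_DIR / the /data default in the container
    node = "11111111-1111-1111-1111-111111111111"  # placeholder node UUID

    series = db.get_system_data(node, start_time="-6h", end_time="now")
    if series is None:
        print("no RRD data recorded for this node yet")
    else:
        # 'data' holds one list per data source: uptime, load1, load5, load15, memory
        for ts, load1 in zip(series["timestamps"], series["data"]["load1"]):
            print(ts, load1)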