diff --git a/app/database.py b/app/database.py
index e5d11d6..191afa8 100644
--- a/app/database.py
+++ b/app/database.py
@@ -161,7 +161,7 @@ class RRDDatabase:
         for point in data_points:
             timestamps.append(current_time)
             for i, ds in enumerate(ds_names):
-                value = point[i] if point[i] is not None else 0
+                value = point[i] if point[i] is not None else None  # Keep missing samples as None; 0 would look like a real reading
                 data[ds].append(value)
             current_time += step
@@ -201,7 +201,7 @@ class RRDDatabase:
         for point in data_points:
             timestamps.append(current_time)
             for i, ds in enumerate(ds_names):
-                value = point[i] if point[i] is not None else 0
+                value = point[i] if point[i] is not None else None  # Keep missing samples as None; 0 would look like a real reading
                 data[ds].append(value)
             current_time += step
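Coercing missing RRD samples to 0 made gaps indistinguishable from genuine zero readings and skewed any aggregation done downstream. A rough sketch of the difference, with hypothetical values:

```python
# A gap coerced to 0 drags averages down; a gap kept as None can be skipped.
samples_with_zero = [10.0, 0, 12.0]      # gap coerced to 0
samples_with_none = [10.0, None, 12.0]   # gap preserved as None

avg_skewed = sum(samples_with_zero) / len(samples_with_zero)  # 7.33, wrong
present = [v for v in samples_with_none if v is not None]
avg_clean = sum(present) / len(present)                       # 11.0, gap ignored

print(avg_skewed, avg_clean)
```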
diff --git a/app/main.py b/app/main.py
index 7dc1741..41c07de 100644
--- a/app/main.py
+++ b/app/main.py
@@ -2,8 +2,8 @@ import os
 import uuid
 import json
 import logging
-from datetime import datetime, timezone, timedelta
-from fastapi import FastAPI, Request, status
+from datetime import datetime, timezone
+from fastapi import FastAPI, Request, status, Query
 from fastapi.responses import HTMLResponse, JSONResponse
 from fastapi.templating import Jinja2Templates
 from fastapi.staticfiles import StaticFiles
@@ -19,38 +19,33 @@ import sys
 from .database import RRDDatabase
 
 # --- Service Configuration ---
-# Generate a unique Service UUID on startup, or get it from an environment variable
 SERVICE_UUID = os.environ.get("SERVICE_UUID", str(uuid.uuid4()))
 database = RRDDatabase()
 
 # --- Logging Configuration ---
-# Get the root logger
 logger = logging.getLogger()
 logger.setLevel(logging.INFO)
 
-# Custom handler to capture logs
 class BufferHandler(logging.Handler):
     def __init__(self, *args, **kwargs):
         super().__init__(*args, **kwargs)
-        # Instantiate the formatter once for efficiency
         self.formatter = jsonlogger.JsonFormatter()
 
     def emit(self, record):
         try:
-            # Format the record as JSON string, then parse to dict
+            # Format the record as a JSON string, then parse it back to a dict
+            # so buffered entries stay consistent with the jsonlogger output format.
            log_entry = json.loads(self.formatter.format(record))
            log_buffer.add_log(log_entry)
        except Exception as e:
-            # Log the error to stderr, to avoid recursion or filling the buffer with errors
            print(f"Error in BufferHandler: Could not process log record: {e}", file=sys.stderr)
 
-
 class LogBuffer:
     def __init__(self, maxlen=1000):
         self.buffer = deque(maxlen=maxlen)
 
     def add_log(self, record):
-        # Ensure 'asctime' is present or handle its absence
+        # 'record' here is already a dictionary parsed from the JSON log string
         timestamp = record.get('asctime') or datetime.utcnow().isoformat()
         self.buffer.append({
             'timestamp': timestamp,
@@ -60,28 +55,40 @@ class LogBuffer:
                if k not in ['asctime', 'levelname', 'message', 'name', 'lineno', 'filename', 'pathname',
                             'funcName', 'process', 'processName', 'thread', 'threadName']}
         })
 
-    def get_logs(self, limit=100):
-        return list(self.buffer)[-limit:]
+    def get_logs(self, limit=100, level=None, since=None):
+        logger.debug(f"Fetching logs with limit={limit}, level={level}, since={since}")
+        logs = list(self.buffer)
+        # Apply level filter
+        if level and level.strip():
+            level = level.upper()
+            valid_levels = {'INFO', 'WARNING', 'ERROR', 'DEBUG'}  # DEBUG included so debug records are filterable too
+            if level in valid_levels:
+                logs = [log for log in logs if log['level'].upper() == level]
+            else:
+                logger.warning(f"Invalid log level: {level}")
+        # Apply since filter
+        if since:
+            try:
+                # Handle 'Z' for UTC and ensure timezone awareness for comparison
+                since_dt = datetime.fromisoformat(since.replace('Z', '+00:00')).astimezone(timezone.utc)
+                logs = [log for log in logs if
+                        datetime.fromisoformat(log['timestamp'].replace('Z', '+00:00')).astimezone(timezone.utc) >= since_dt]
+            except ValueError:
+                logger.warning(f"Invalid 'since' timestamp: {since}")
+        logger.debug(f"Returning {len(logs[-limit:])} logs")
+        return logs[-limit:]
 
-# Create global log buffer
 log_buffer = LogBuffer()
 
-# Use a handler that streams to stdout
 logHandler = logging.StreamHandler()
-
-# Create a JSON formatter and add it to the handler
-# The format string adds default log attributes to the JSON output
 formatter = jsonlogger.JsonFormatter(
     '%(asctime)s %(name)s %(levelname)s %(message)s'
 )
 logHandler.setFormatter(formatter)
 
-# Add handlers to the root logger
 if not logger.handlers:
     logger.addHandler(logHandler)
-    buffer_handler = BufferHandler()
-    logger.addHandler(buffer_handler)
-
+    logger.addHandler(BufferHandler())
 
 # --- FastAPI Application ---
 app = FastAPI(
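One behavior of the new `get_logs()` worth noting: filters run before the `[-limit:]` slice, so `limit` selects the most recent matching records, not matches within the last N records. A minimal sketch with hypothetical entries:

```python
# Filter-then-slice: "limit=2, level=INFO" means "the 2 most recent INFO
# records", not "INFO records among the last 2 buffered entries".
logs = [
    {'timestamp': '2025-06-11T13:30:00Z', 'level': 'INFO',  'message': 'a'},
    {'timestamp': '2025-06-11T13:31:00Z', 'level': 'ERROR', 'message': 'b'},
    {'timestamp': '2025-06-11T13:32:00Z', 'level': 'INFO',  'message': 'c'},
]
level, limit = 'INFO', 2
filtered = [log for log in logs if log['level'].upper() == level]
print([log['message'] for log in filtered[-limit:]])  # ['a', 'c']
```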
return "critical" elif status.memory_usage_percent >= MEMORY_WARNING_THRESHOLD: @@ -179,53 +173,108 @@ def get_node_health(node_data: Dict) -> str: return "healthy" - # --- API Endpoints --- @app.get("/", response_class=HTMLResponse) async def read_root(request: Request): - """Serves the main web page which displays the Service UUID and the node grid.""" + # Use X-Forwarded-For if available, otherwise client.host + client_ip = request.headers.get("x-forwarded-for", request.client.host) logger.info( "Web root accessed", - extra={'client_ip': request.client.host, 'service_uuid': SERVICE_UUID} + extra={'client_ip': client_ip, 'service_uuid': SERVICE_UUID} ) return templates.TemplateResponse( "index.html", - {"request": request, "service_uuid": SERVICE_UUID} + { + "request": request, + "service_uuid": SERVICE_UUID, + "url_for": request.url_for, # Pass url_for for dynamic URL generation + "root_path": request.scope.get('root_path', '') # Pass root_path for JS base URL + } + ) + +@app.get("/{service_uuid}/logs") +async def get_logs( + request: Request, + service_uuid: str, + limit: int = 100, + format: str = Query(None, description="Response format: 'json' for JSON, default is HTML"), + level: str = Query(None, description="Filter logs by level: INFO, WARNING, ERROR"), + since: str = Query(None, description="Fetch logs since ISO timestamp, e.g., 2025-06-11T13:32:00") +): + # Use X-Forwarded-For if available, otherwise client.host + client_ip = request.headers.get("x-forwarded-for", request.client.host) + logger.info( + "Logs endpoint accessed", + extra={ + 'service_uuid': service_uuid, + 'format': format, + 'level': level, + 'since': since, + 'limit': limit, + 'client_ip': client_ip + } ) -@app.get("/{service_uuid}/logs", response_class=HTMLResponse) -async def get_logs(request: Request, service_uuid: str, limit: int = 100): - """Serve the logs web page with recent logs for the service.""" if service_uuid != SERVICE_UUID: + logger.warning(f"Invalid service UUID: {service_uuid}") return JSONResponse( status_code=404, content={"error": "Service UUID not found"} ) - - logs = log_buffer.get_logs(limit) - return templates.TemplateResponse( - "logs.html", - { - "request": request, - "service_uuid": service_uuid, - "logs": logs, - "log_count": len(logs) - } - ) -@app.put("/{service_uuid}/{node_uuid}/", status_code=status.HTTP_200_OK) + try: + logs = log_buffer.get_logs(limit=limit, level=level, since=since) + log_data = { + "service_uuid": service_uuid, + "log_count": len(logs), + "logs": logs + } + logger.debug(f"Fetched {len(logs)} logs for response") + except Exception as e: + logger.error(f"Error fetching logs: {e}", exc_info=True) + return JSONResponse( + status_code=500, + content={"error": "Failed to fetch logs"} + ) + + if format == "json": + logger.debug("Returning JSON response") + return JSONResponse(content=log_data) + + logger.debug("Rendering logs.html template") + try: + return templates.TemplateResponse( + "logs.html", + { + "request": request, + "service_uuid": service_uuid, + "logs": logs, + "log_count": len(logs), + "url_for": request.url_for, # Pass url_for for dynamic URL generation + "root_path": request.scope.get('root_path', '') # Pass root_path for JS base URL + } + ) + except Exception as e: + logger.error(f"Error rendering logs.html: {e}", exc_info=True) + return JSONResponse( + status_code=500, + content={"error": "Failed to render logs page"} + ) + +@app.put("/{service_uuid}/{node_uuid}/") async def update_node_status( service_uuid: str, node_uuid: str, status_update: 
@@ -179,53 +173,108 @@ def get_node_health(node_data: Dict) -> str:
     return "healthy"
 
-
 # --- API Endpoints ---
 @app.get("/", response_class=HTMLResponse)
 async def read_root(request: Request):
-    """Serves the main web page which displays the Service UUID and the node grid."""
+    # Use X-Forwarded-For if available, otherwise client.host
+    client_ip = request.headers.get("x-forwarded-for", request.client.host)
     logger.info(
         "Web root accessed",
-        extra={'client_ip': request.client.host, 'service_uuid': SERVICE_UUID}
+        extra={'client_ip': client_ip, 'service_uuid': SERVICE_UUID}
     )
     return templates.TemplateResponse(
         "index.html",
-        {"request": request, "service_uuid": SERVICE_UUID}
+        {
+            "request": request,
+            "service_uuid": SERVICE_UUID,
+            "url_for": request.url_for,  # Pass url_for for dynamic URL generation
+            "root_path": request.scope.get('root_path', '')  # Pass root_path for JS base URL
+        }
+    )
+
+@app.get("/{service_uuid}/logs")
+async def get_logs(
+    request: Request,
+    service_uuid: str,
+    limit: int = 100,
+    format: str = Query(None, description="Response format: 'json' for JSON, default is HTML"),
+    level: str = Query(None, description="Filter logs by level: INFO, WARNING, ERROR"),
+    since: str = Query(None, description="Fetch logs since ISO timestamp, e.g., 2025-06-11T13:32:00")
+):
+    # Use X-Forwarded-For if available, otherwise client.host
+    client_ip = request.headers.get("x-forwarded-for", request.client.host)
+    logger.info(
+        "Logs endpoint accessed",
+        extra={
+            'service_uuid': service_uuid,
+            'format': format,
+            'level': level,
+            'since': since,
+            'limit': limit,
+            'client_ip': client_ip
+        }
     )
 
-@app.get("/{service_uuid}/logs", response_class=HTMLResponse)
-async def get_logs(request: Request, service_uuid: str, limit: int = 100):
-    """Serve the logs web page with recent logs for the service."""
     if service_uuid != SERVICE_UUID:
+        logger.warning(f"Invalid service UUID: {service_uuid}")
         return JSONResponse(
             status_code=404,
             content={"error": "Service UUID not found"}
         )
-
-    logs = log_buffer.get_logs(limit)
-    return templates.TemplateResponse(
-        "logs.html",
-        {
-            "request": request,
-            "service_uuid": service_uuid,
-            "logs": logs,
-            "log_count": len(logs)
-        }
-    )
 
-@app.put("/{service_uuid}/{node_uuid}/", status_code=status.HTTP_200_OK)
+    try:
+        logs = log_buffer.get_logs(limit=limit, level=level, since=since)
+        log_data = {
+            "service_uuid": service_uuid,
+            "log_count": len(logs),
+            "logs": logs
+        }
+        logger.debug(f"Fetched {len(logs)} logs for response")
+    except Exception as e:
+        logger.error(f"Error fetching logs: {e}", exc_info=True)
+        return JSONResponse(
+            status_code=500,
+            content={"error": "Failed to fetch logs"}
+        )
+
+    if format == "json":
+        logger.debug("Returning JSON response")
+        return JSONResponse(content=log_data)
+
+    logger.debug("Rendering logs.html template")
+    try:
+        return templates.TemplateResponse(
+            "logs.html",
+            {
+                "request": request,
+                "service_uuid": service_uuid,
+                "logs": logs,
+                "log_count": len(logs),
+                "url_for": request.url_for,  # Pass url_for for dynamic URL generation
+                "root_path": request.scope.get('root_path', '')  # Pass root_path for JS base URL
+            }
+        )
+    except Exception as e:
+        logger.error(f"Error rendering logs.html: {e}", exc_info=True)
+        return JSONResponse(
+            status_code=500,
+            content={"error": "Failed to render logs page"}
+        )
+
+@app.put("/{service_uuid}/{node_uuid}/")
 async def update_node_status(
     service_uuid: str,
     node_uuid: str,
     status_update: StatusUpdate,
     request: Request
 ):
-    """Receives status updates from a node and returns a list of peers."""
+    # Use X-Forwarded-For if available, otherwise client.host
+    client_ip = request.headers.get("x-forwarded-for", request.client.host)
     logger.info(
         "Received node status update",
         extra={
             'event_type': 'node_status_update',
-            'client_ip': request.client.host,
+            'client_ip': client_ip,
             'service_uuid': service_uuid,
             'node_uuid': node_uuid,
             'data': status_update.dict()
@@ -239,7 +288,6 @@ async def update_node_status(
         )
         return {"error": "Service UUID mismatch", "peers": []}
 
-    # Update RRD database with system metrics
     try:
         database.update_system_metrics(
             node_uuid=node_uuid,
@@ -248,8 +296,7 @@ async def update_node_status(
             load_avg=status_update.status.load_avg,
             memory_usage_percent=status_update.status.memory_usage_percent
         )
-
-        # Update ping metrics
+
         for target_uuid, latency in status_update.pings.items():
             database.update_ping_metrics(
                 node_uuid=node_uuid,
@@ -257,29 +304,24 @@ async def update_node_status(
                 timestamp=status_update.timestamp,
                 latency_ms=latency
             )
-
+
     except Exception as e:
         logger.error(f"Database update failed: {e}", exc_info=True)
-        # Continue processing even if DB update fails
 
-    # Auto-discovery logic and update known_nodes_db with full status
     current_time_utc = datetime.now(timezone.utc)
-
+
     known_nodes_db[node_uuid] = {
         "last_seen": current_time_utc.isoformat(),
-        "ip": request.client.host,
-        "status": status_update.status.dict(), # Store the dict representation
-        # Store direct values for convenience in /nodes/status endpoint
+        "ip": request.client.host,  # Keep original client.host here as it's the direct connection
+        "status": status_update.status.dict(),
         "uptime_seconds": status_update.status.uptime_seconds,
         "load_avg": status_update.status.load_avg,
         "memory_usage_percent": status_update.status.memory_usage_percent
     }
-
-    # Calculate health for logging purposes (it will be recalculated for /nodes/status)
+
     health_status_for_log = get_node_health(known_nodes_db[node_uuid])
     logger.info(f"Node {node_uuid} updated. Health: {health_status_for_log}")
 
-    # Respond with the list of other known peers
     peer_list = {uuid: {"last_seen": data["last_seen"], "ip": data["ip"]}
                  for uuid, data in known_nodes_db.items() if uuid != node_uuid}
@@ -287,25 +329,21 @@ async def update_node_status(
 
 @app.get("/nodes/status")
 async def get_all_nodes_status():
-    """Returns the current status of all known nodes for the UI, including ping latencies."""
     logger.info("Fetching all nodes status for UI.")
     response_nodes = []
     for node_uuid, data in known_nodes_db.items():
-        # Dynamically calculate health for each node
         current_health = get_node_health(data)
 
-        # Build connections dictionary with raw ping latencies
         connections = {}
         for target_uuid in known_nodes_db:
-            if target_uuid != node_uuid: # Exclude self
-                # Fetch recent ping data (last 5 minutes to account for RRD step=60s)
+            if target_uuid != node_uuid:
                 ping_data = database.get_ping_data(node_uuid, target_uuid, start_time="-300s")
                 latency_ms = None
                 if ping_data and ping_data['data']['latency']:
-                    # Get the most recent non-null latency
+                    # Get the most recent non-None latency
                     for latency in reversed(ping_data['data']['latency']):
-                        if latency is not None:
+                        if latency is not None and not (isinstance(latency, float) and latency == 0.0):  # Exclude 0.0, which may be a zero-filled gap from older data
                             latency_ms = float(latency)
                             break
                 connections[target_uuid] = latency_ms
@@ -324,5 +362,4 @@ async def get_all_nodes_status():
 
 @app.get("/health")
 async def health_check():
-    """Health check endpoint for container orchestration."""
     return {"status": "ok"}
diff --git a/app/web/static/logs.js b/app/web/static/logs.js
index 80b7c7b..9653288 100644
--- a/app/web/static/logs.js
+++ b/app/web/static/logs.js
@@ -1,16 +1,34 @@
 document.addEventListener('DOMContentLoaded', () => {
     const logTableContainer = document.getElementById('log-table-container');
     const logCountSpan = document.getElementById('log-count');
-    const POLLING_INTERVAL_MS = 5000; // Poll every 5 seconds
-    const serviceUuid = logTableContainer.dataset.serviceUuid || '{{ service_uuid }}'; // Fallback for non-dynamic rendering
+    const levelRadios = document.querySelectorAll('input[name="log-level"]');
+    const sinceFilter = document.getElementById('since-filter');
+    const applyFiltersButton = document.getElementById('apply-filters');
+    const POLLING_INTERVAL_MS = 5000;
+    const serviceUuid = logTableContainer.dataset.serviceUuid; // Get service UUID from data attribute
+
+    let currentLevel = '';
+    let currentSince = '';
 
     async function fetchLogs() {
+        console.log('Fetching logs with params:', { level: currentLevel, since: currentSince });
         try {
-            const response = await fetch(`/${serviceUuid}/logs?limit=100`);
+            const params = new URLSearchParams({
+                format: 'json',
+                limit: '100'
+            });
+            if (currentLevel) params.append('level', currentLevel);
+            if (currentSince) params.append('since', currentSince);
+            // Use window.API_BASE_PATH for dynamic base URL
+            const url = `${window.API_BASE_PATH}/${serviceUuid}/logs?${params.toString()}`;
+            console.log('Fetch URL:', url);
+            const response = await fetch(url);
+            console.log('Response status:', response.status);
             if (!response.ok) {
                 throw new Error(`HTTP error! status: ${response.status}`);
             }
             const data = await response.json();
+            console.log('Received logs:', data.logs.length);
             renderLogTable(data.logs);
             logCountSpan.textContent = data.log_count;
         } catch (error) {
@@ -20,18 +38,17 @@ document.addEventListener('DOMContentLoaded', () => {
     }
 
     function renderLogTable(logs) {
-        logTableContainer.innerHTML = ''; // Clear existing content
+        console.log('Rendering logs:', logs.length);
+        logTableContainer.innerHTML = '';
 
         if (logs.length === 0) {
             logTableContainer.innerHTML = '';
             return;
         }
 
-        // Create table
         const table = document.createElement('table');
         table.classList.add('log-table');
 
-        // Create header
         const thead = document.createElement('thead');
         const headerRow = document.createElement('tr');
         const headers = [
@@ -52,7 +69,6 @@ document.addEventListener('DOMContentLoaded', () => {
         thead.appendChild(headerRow);
         table.appendChild(thead);
 
-        // Create body
         const tbody = document.createElement('tbody');
         logs.forEach(log => {
             const row = document.createElement('tr');
@@ -73,7 +89,6 @@ document.addEventListener('DOMContentLoaded', () => {
 
         logTableContainer.appendChild(table);
 
-        // Add click handlers for JSON toggles
         document.querySelectorAll('.json-toggle').forEach(toggle => {
             toggle.addEventListener('click', () => {
                 const jsonContent = toggle.nextElementSibling;
@@ -120,7 +135,17 @@ document.addEventListener('DOMContentLoaded', () => {
         return div.innerHTML;
     }
 
-    // Initial fetch and polling
+    applyFiltersButton.addEventListener('click', () => {
+        const selectedRadio = document.querySelector('input[name="log-level"]:checked');
+        currentLevel = selectedRadio ? selectedRadio.value : '';
+        const sinceValue = sinceFilter.value;
+        // Convert local datetime input to ISO string for backend, handling potential timezone issues
+        currentSince = sinceValue ? new Date(sinceValue).toISOString().replace(/\.\d{3}Z$/, 'Z') : ''; // Ensure 'Z' for UTC
+        console.log('Applying filters:', { level: currentLevel, since: currentSince });
+        fetchLogs();
+    });
+
+    console.log('Initializing logs page');
     fetchLogs();
     setInterval(fetchLogs, POLLING_INTERVAL_MS);
-});
\ No newline at end of file
+});
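The `toISOString()` conversion above matters because a bare datetime-local string is naive: on the server, `astimezone()` would interpret it in the server's local zone rather than the user's. A small sketch of how both forms parse on the backend, with a hypothetical timestamp:

```python
from datetime import datetime, timezone

# What the browser sends after toISOString().replace(...): explicit UTC.
aware = datetime.fromisoformat("2025-06-11T13:32:00Z".replace('Z', '+00:00')).astimezone(timezone.utc)

# A raw datetime-local value is naive; astimezone() then assumes the
# *server's* local zone before converting, which is why the UI normalizes
# to a 'Z'-suffixed UTC string before sending.
naive = datetime.fromisoformat("2025-06-11T13:32:00").astimezone(timezone.utc)

print(aware, naive)  # these only agree when the server itself runs in UTC
```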
diff --git a/app/web/static/script.js b/app/web/static/script.js
index 179a43c..670205f 100644
--- a/app/web/static/script.js
+++ b/app/web/static/script.js
@@ -1,12 +1,12 @@
 document.addEventListener('DOMContentLoaded', () => {
     const nodeGridContainer = document.getElementById('node-grid-container');
     const nodeCountSpan = document.getElementById('node-count');
-    const serviceUuidPara = document.getElementById('service-uuid');
     const POLLING_INTERVAL_MS = 3000; // Poll every 3 seconds
 
     async function fetchNodeData() {
         try {
-            const response = await fetch('/nodes/status');
+            // Use window.API_BASE_PATH for dynamic base URL
+            const response = await fetch(`${window.API_BASE_PATH}/nodes/status`);
             if (!response.ok) {
                 throw new Error(`HTTP error! status: ${response.status}`);
             }
@@ -15,29 +15,24 @@ document.addEventListener('DOMContentLoaded', () => {
         } catch (error) {
             console.error("Error fetching node data:", error);
             nodeGridContainer.innerHTML = '';
-            serviceUuidPara.style.display = 'block'; // Show UUID on error
         }
     }
 
     function renderNodeGrid(nodes) {
         nodeGridContainer.innerHTML = ''; // Clear existing content
         nodeCountSpan.textContent = nodes.length; // Update total node count
-        serviceUuidPara.style.display = nodes.length === 0 ? 'block' : 'none'; // Toggle Service UUID
 
         if (nodes.length === 0) {
             nodeGridContainer.innerHTML = '';
             return;
         }
 
-        // Ensure we have exactly 4 nodes
-        if (nodes.length !== 4) {
-            nodeGridContainer.innerHTML = '';
-            return;
-        }
-
-        // Create a 4x4 grid container
+        // Create a dynamic grid container
         const grid = document.createElement('div');
         grid.classList.add('connection-grid');
+        // Dynamically set grid columns based on number of nodes + 1 for the header column:
+        // minmax(100px, 1fr) for the row header, then repeat for each node column
+        grid.style.gridTemplateColumns = `minmax(100px, 1fr) repeat(${nodes.length}, minmax(100px, 1fr))`;
 
         // Add header row for column UUIDs
         const headerRow = document.createElement('div');
@@ -52,7 +47,7 @@ document.addEventListener('DOMContentLoaded', () => {
         });
         grid.appendChild(headerRow);
 
-        // Create rows for the 4x4 grid
+        // Create rows for the grid
         nodes.forEach((rowNode, rowIndex) => {
             const row = document.createElement('div');
             row.classList.add('grid-row');
@@ -78,7 +73,7 @@ document.addEventListener('DOMContentLoaded', () => {
 IP: ${rowNode.ip}
 Last Seen: ${new Date(rowNode.last_seen).toLocaleTimeString()}
 Uptime: ${rowNode.uptime_seconds ? formatUptime(rowNode.uptime_seconds) : 'N/A'}
-Load Avg (1m, 5m, 15m): ${rowNode.load_avg ? rowNode.load_avg.join(', ') : 'N/A'}
+Load Avg (1m, 5m, 15m): ${rowNode.load_avg ? rowNode.load_avg.map(l => l.toFixed(2)).join(', ') : 'N/A'}
 Memory Usage: ${rowNode.memory_usage_percent ? rowNode.memory_usage_percent.toFixed(2) + '%' : 'N/A'}
 `;
@@ -91,8 +86,8 @@ document.addEventListener('DOMContentLoaded', () => {
             cell.innerHTML = `
-From: ${rowNode.uuid}
-to: ${colNode.uuid}
+From: ${rowNode.uuid.substring(0, 8)}...
+to: ${colNode.uuid.substring(0, 8)}...
 Latency: ${displayLatency}
diff --git a/app/web/templates/index.html b/app/web/templates/index.html
--- a/app/web/templates/index.html
+++ b/app/web/templates/index.html
@@ ... @@
 Total Nodes: 0
-
+Service UUID: {{ service_uuid }}