diff --git a/Dockerfile b/Dockerfile index e399937..f7e1b46 100644 --- a/Dockerfile +++ b/Dockerfile @@ -75,4 +75,4 @@ HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \ CMD wget --no-verbose --tries=1 --spider http://localhost:8000/health || exit 1 # Run the application using the virtual environment's python interpreter -CMD ["/opt/venv/bin/python", "-m", "uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000"] +CMD ["/opt/venv/bin/python", "-m", "uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000", "--proxy-headers", "--forwarded-allow-ips", "*"] diff --git a/app/database.py b/app/database.py index 191afa8..43289c1 100644 --- a/app/database.py +++ b/app/database.py @@ -12,9 +12,9 @@ class RRDDatabase: # Use environment variable or default to /data if data_dir is None: data_dir = os.environ.get("DATA_DIR", "/data") - + self.data_dir = Path(data_dir) - + # Create data directory if it doesn't exist try: self.data_dir.mkdir(parents=True, exist_ok=True) @@ -29,11 +29,11 @@ class RRDDatabase: # RRD configuration self.step = 60 # 1-minute intervals self.heartbeat = 120 # 2-minute heartbeat (allow 1 missed update) - + # Retention policy (6 months total) self.rra_config = [ "RRA:AVERAGE:0.5:1:1440", # 1-min avg for 24 hours (1440 points) - "RRA:AVERAGE:0.5:60:744", # 1-hour avg for 31 days (744 points) + "RRA:AVERAGE:0.5:60:744", # 1-hour avg for 31 days (744 points) "RRA:AVERAGE:0.5:1440:180", # 1-day avg for 6 months (180 points) "RRA:MAX:0.5:1:1440", # 1-min max for 24 hours "RRA:MAX:0.5:60:744", # 1-hour max for 31 days @@ -49,10 +49,10 @@ class RRDDatabase: def _create_system_rrd(self, node_uuid: str) -> str: """Create RRD file for system metrics (uptime, load, memory).""" rrd_file = self._get_node_dir(node_uuid) / "system.rrd" - + if rrd_file.exists(): return str(rrd_file) - + try: rrdtool.create( str(rrd_file), @@ -60,7 +60,7 @@ class RRDDatabase: # Data sources f"DS:uptime:GAUGE:{self.heartbeat}:0:U", # Uptime in seconds f"DS:load1:GAUGE:{self.heartbeat}:0:100", # 1-min load average - f"DS:load5:GAUGE:{self.heartbeat}:0:100", # 5-min load average + f"DS:load5:GAUGE:{self.heartbeat}:0:100", # 5-min load average f"DS:load15:GAUGE:{self.heartbeat}:0:100", # 15-min load average f"DS:memory:GAUGE:{self.heartbeat}:0:100", # Memory usage % # Round Robin Archives @@ -75,10 +75,10 @@ class RRDDatabase: def _create_ping_rrd(self, node_uuid: str, target_uuid: str) -> str: """Create RRD file for ping metrics between two nodes.""" rrd_file = self._get_node_dir(node_uuid) / f"ping_{target_uuid}.rrd" - + if rrd_file.exists(): return str(rrd_file) - + try: rrdtool.create( str(rrd_file), @@ -95,68 +95,68 @@ class RRDDatabase: logger.error(f"Failed to create ping RRD for {node_uuid}->{target_uuid}: {e}") raise - def update_system_metrics(self, node_uuid: str, timestamp: datetime, - uptime_seconds: int, load_avg: List[float], + def update_system_metrics(self, node_uuid: str, timestamp: datetime, + uptime_seconds: int, load_avg: List[float], memory_usage_percent: float): """Update system metrics for a node.""" try: rrd_file = self._create_system_rrd(node_uuid) - + # Convert datetime to Unix timestamp unix_time = int(timestamp.timestamp()) - + # Format: timestamp:uptime:load1:load5:load15:memory values = f"{unix_time}:{uptime_seconds}:{load_avg[0]}:{load_avg[1]}:{load_avg[2]}:{memory_usage_percent}" - + rrdtool.update(rrd_file, values) logger.debug(f"Updated system metrics for {node_uuid}: {values}") - + except Exception as e: logger.error(f"Failed to update system 
metrics for {node_uuid}: {e}") raise - def update_ping_metrics(self, node_uuid: str, target_uuid: str, + def update_ping_metrics(self, node_uuid: str, target_uuid: str, timestamp: datetime, latency_ms: float): """Update ping metrics between two nodes.""" try: rrd_file = self._create_ping_rrd(node_uuid, target_uuid) - + unix_time = int(timestamp.timestamp()) - + # For now, we only track latency. Loss can be calculated from missing updates values = f"{unix_time}:{latency_ms}:0" # 0% loss (could be enhanced) - + rrdtool.update(rrd_file, values) logger.debug(f"Updated ping metrics {node_uuid}->{target_uuid}: {latency_ms}ms") - + except Exception as e: logger.error(f"Failed to update ping metrics {node_uuid}->{target_uuid}: {e}") raise - def get_system_data(self, node_uuid: str, start_time: str = "-24h", + def get_system_data(self, node_uuid: str, start_time: str = "-24h", end_time: str = "now") -> Optional[Dict]: """Retrieve system metrics data for a node.""" try: rrd_file = self._get_node_dir(node_uuid) / "system.rrd" if not rrd_file.exists(): return None - + result = rrdtool.fetch( str(rrd_file), "AVERAGE", "--start", start_time, "--end", end_time ) - + # Parse RRDtool fetch result start, end, step = result[0] ds_names = result[1] # ['uptime', 'load1', 'load5', 'load15', 'memory'] data_points = result[2] - + # Convert to more usable format timestamps = [] data = {ds: [] for ds in ds_names} - + current_time = start for point in data_points: timestamps.append(current_time) @@ -164,39 +164,39 @@ class RRDDatabase: value = point[i] if point[i] is not None else None # Changed 0 to None for better representation data[ds].append(value) current_time += step - + return { - 'timestamps': timestamps, - 'data': data, - 'step': step + 'timestamps': timestamps, + 'data': data, + 'step': step } - + except Exception as e: logger.error(f"Failed to get system data for {node_uuid}: {e}") return None - def get_ping_data(self, node_uuid: str, target_uuid: str, + def get_ping_data(self, node_uuid: str, target_uuid: str, start_time: str = "-24h", end_time: str = "now") -> Optional[Dict]: """Retrieve ping metrics between two nodes.""" try: rrd_file = self._get_node_dir(node_uuid) / f"ping_{target_uuid}.rrd" if not rrd_file.exists(): return None - + result = rrdtool.fetch( str(rrd_file), - "AVERAGE", + "AVERAGE", "--start", start_time, "--end", end_time ) - + start, end, step = result[0] ds_names = result[1] # ['latency', 'loss'] data_points = result[2] - + timestamps = [] data = {ds: [] for ds in ds_names} - + current_time = start for point in data_points: timestamps.append(current_time) @@ -204,13 +204,13 @@ class RRDDatabase: value = point[i] if point[i] is not None else None # Changed 0 to None for better representation data[ds].append(value) current_time += step - + return { - 'timestamps': timestamps, - 'data': data, - 'step': step + 'timestamps': timestamps, + 'data': data, + 'step': step } - + except Exception as e: logger.error(f"Failed to get ping data {node_uuid}->{target_uuid}: {e}") return None @@ -232,26 +232,26 @@ class RRDDatabase: # RRD automatically handles data retention based on RRA configuration # This method could be used for cleaning up orphaned files cutoff_date = datetime.now() - timedelta(days=190) # 6+ months - + try: for node_dir in self.data_dir.iterdir(): if not node_dir.is_dir(): continue - + # Check if any RRD files have been modified recently rrd_files = list(node_dir.glob("*.rrd")) if not rrd_files: continue - + # If all RRD files are old, the node is probably dead all_old = all( - 
datetime.fromtimestamp(f.stat().st_mtime) < cutoff_date + datetime.fromtimestamp(f.stat().st_mtime) < cutoff_date for f in rrd_files ) - + if all_old: logger.info(f"Node {node_dir.name} appears inactive for >6 months") # Could optionally remove the directory here - + except Exception as e: logger.error(f"Failed during cleanup: {e}") diff --git a/app/main.py b/app/main.py index 6ba280d..3ab3c3c 100644 --- a/app/main.py +++ b/app/main.py @@ -8,7 +8,7 @@ from fastapi.responses import HTMLResponse, JSONResponse from fastapi.templating import Jinja2Templates from fastapi.staticfiles import StaticFiles from pydantic import BaseModel, Field, validator -from typing import Dict, List, Annotated +from typing import Dict, List, Annotated, Optional import uuid as uuid_lib from collections import deque @@ -32,14 +32,17 @@ class BufferHandler(logging.Handler): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) # Use the same formatter string as the StreamHandler for consistency + # Ensure asctime is formatted as ISO 8601 UTC with milliseconds and 'Z' self.formatter = jsonlogger.JsonFormatter( - "%(asctime)s %(name)s %(levelname)s %(message)s" + "%(asctime)s %(name)s %(levelname)s %(message)s", + datefmt="%Y-%m-%dT%H:%M:%S.%fZ" # ISO 8601 format with milliseconds and Z for UTC ) def emit(self, record): try: log_entry_str = self.formatter.format(record) log_entry = json.loads(log_entry_str) + # The 'asctime' field in log_entry is now guaranteed to be ISO 8601 log_buffer.add_log(log_entry) except Exception as e: print( @@ -53,14 +56,30 @@ class LogBuffer: self.buffer = deque(maxlen=maxlen) def add_log(self, record): - # Assuming 'record' here is already a dictionary parsed from the JSON log string - timestamp = record.get("asctime") or datetime.utcnow().isoformat() + # 'record' is a dictionary parsed from the JSON log string. + # 'asctime' should now be in ISO 8601 format due to BufferHandler's formatter. + timestamp_str = record.get("asctime") + if timestamp_str: + try: + # Use isoparse for robust parsing, then convert to UTC and store as ISO 8601 with 'Z' + dt_obj = isoparse(timestamp_str) + if dt_obj.tzinfo is None: + # Assume UTC if naive (common for logs without explicit timezone info) + dt_obj = dt_obj.replace(tzinfo=timezone.utc) + else: + # Convert to UTC for consistent storage + dt_obj = dt_obj.astimezone(timezone.utc) + timestamp_to_store = dt_obj.isoformat(timespec='milliseconds').replace('+00:00', 'Z') + except ValueError: + logger.warning(f"Failed to parse log timestamp '{timestamp_str}' from formatter. 
Using current UTC time.") + timestamp_to_store = datetime.utcnow().isoformat(timespec='milliseconds') + 'Z' + else: + timestamp_to_store = datetime.utcnow().isoformat(timespec='milliseconds') + 'Z' + self.buffer.append( { - "timestamp": timestamp, - "level": record.get( - "levelname" - ), # This should now correctly get 'levelname' + "timestamp": timestamp_to_store, + "level": record.get("levelname"), "message": record.get("message"), "extra": { k: v @@ -111,9 +130,8 @@ class LogBuffer: logs = [ log for log in logs - if datetime.fromisoformat( - log["timestamp"].replace("Z", "+00:00") - ).astimezone(timezone.utc) + # log["timestamp"] is now guaranteed to be ISO 8601 with 'Z' + if isoparse(log["timestamp"]).astimezone(timezone.utc) >= since_dt ] except ValueError: @@ -125,7 +143,11 @@ class LogBuffer: log_buffer = LogBuffer() logHandler = logging.StreamHandler() -formatter = jsonlogger.JsonFormatter("%(asctime)s %(name)s %(levelname)s %(message)s") +# Ensure StreamHandler also formats asctime into ISO 8601 UTC +formatter = jsonlogger.JsonFormatter( + "%(asctime)s %(name)s %(levelname)s %(message)s", + datefmt="%Y-%m-%dT%H:%M:%S.%fZ" # ISO 8601 format with milliseconds and Z for UTC +) logHandler.setFormatter(formatter) if not logger.handlers: @@ -148,6 +170,10 @@ app = FastAPI( templates = Jinja2Templates(directory="app/web/templates") app.mount("/static", StaticFiles(directory="app/web/static"), name="static") +# To correctly handle HTTPS behind a reverse proxy, ensure your Uvicorn server +# is run with --proxy-headers and --forwarded-allow-ips. +# e.g., uvicorn main:app --host 0.0.0.0 --port 8000 --proxy-headers --forwarded-allow-ips '*' + # --- Data Models --- class NodeStatusModel(BaseModel): @@ -238,6 +264,30 @@ def get_node_health(node_data: Dict) -> str: return "healthy" +def format_uptime(seconds: Optional[int]) -> str: + """Formats uptime in seconds into a human-readable string (e.g., "1d 2h 3m 4s").""" + if seconds is None: + return "N/A" + days = seconds // (3600 * 24) + seconds %= (3600 * 24) + hours = seconds // 3600 + seconds %= 3600 + minutes = seconds // 60 + remaining_seconds = seconds % 60 + + parts = [] + if days > 0: + parts.append(f"{days}d") + if hours > 0: + parts.append(f"{hours}h") + if minutes > 0: + parts.append(f"{minutes}m") + # Always include seconds if no other parts, or if there are remaining seconds + if remaining_seconds > 0 or not parts: + parts.append(f"{remaining_seconds}s") + return " ".join(parts) + + # --- API Endpoints --- @app.get("/", response_class=HTMLResponse) async def read_root(request: Request): @@ -247,15 +297,71 @@ async def read_root(request: Request): "Web root accessed", extra={"client_ip": client_ip, "service_uuid": SERVICE_UUID}, ) + + # --- Prepare initial node data for server-side rendering --- + current_time_utc = datetime.now(timezone.utc) + nodes_to_remove = [] + for node_uuid, data in known_nodes_db.items(): + last_seen_dt = datetime.fromisoformat(data["last_seen"]).replace( + tzinfo=timezone.utc + ) + if ( + current_time_utc - last_seen_dt + ).total_seconds() > NODE_INACTIVE_REMOVAL_THRESHOLD_SECONDS: + nodes_to_remove.append(node_uuid) + logger.info(f"Node {node_uuid} inactive for >{NODE_INACTIVE_REMOVAL_THRESHOLD_SECONDS}s. 
Will not render initially.") + + # Filter out inactive nodes for the initial render + active_known_nodes_db = { + k: v for k, v in known_nodes_db.items() + if k not in nodes_to_remove + } + + initial_nodes_data = [] + for node_uuid, data in active_known_nodes_db.items(): + current_health = get_node_health(data) + + connections = {} + for target_uuid in active_known_nodes_db: # Only iterate over currently active nodes + if target_uuid != node_uuid: + ping_data = database.get_ping_data( + node_uuid, target_uuid, start_time="-300s" + ) + latency_ms = None + if ping_data and ping_data["data"]["latency"]: + # Get the most recent non-None, non-zero latency + for latency in reversed(ping_data["data"]["latency"]): + if latency is not None and not (isinstance(latency, float) and latency == 0.0): + latency_ms = float(latency) + break + connections[target_uuid] = latency_ms + + initial_nodes_data.append( + { + "uuid": node_uuid, + "last_seen": data["last_seen"], # Keep original for JS + "formatted_last_seen": datetime.fromisoformat(data["last_seen"]).strftime("%Y-%m-%d %H:%M:%S UTC"), + "ip": data["ip"], + "health_status": current_health, + "uptime_seconds": data.get("uptime_seconds"), + "formatted_uptime": format_uptime(data.get("uptime_seconds")), # Pre-format uptime for HTML + "load_avg": data.get("load_avg"), + "memory_usage_percent": data.get("memory_usage_percent"), + "connections": connections, + } + ) + # --- End initial node data preparation --- + return templates.TemplateResponse( "index.html", { "request": request, "service_uuid": SERVICE_UUID, - "url_for": request.url_for, # Pass url_for for dynamic URL generation + "url_for": request.url_for, "root_path": request.scope.get( "root_path", "" ), # Pass root_path for JS base URL + "nodes": initial_nodes_data, # Pass initial node data for server-side rendering }, ) diff --git a/app/web/static/logs.js b/app/web/static/logs.js index 21076cb..a9e474f 100644 --- a/app/web/static/logs.js +++ b/app/web/static/logs.js @@ -43,7 +43,7 @@ document.addEventListener('DOMContentLoaded', () => { } // Attempt to parse JSON. This is where the error would occur if the content is HTML. - const data = await response.json(); + const data = await response.json(); console.log('Received logs:', data.logs.length); renderLogTable(data.logs); logCountSpan.textContent = data.log_count; diff --git a/app/web/templates/index.html b/app/web/templates/index.html index 6a09db3..b131538 100644 --- a/app/web/templates/index.html +++ b/app/web/templates/index.html @@ -9,12 +9,79 @@

     <h1>Node Monitoring System</h1>
 
-    <div class="stats">
-        <span>Total Nodes: 0</span>
-    </div>
-    <div class="stats">
-        <span>Service UUID: {{ service_uuid }}</span>
-    </div>
+    <div class="stats">
+        <span>Total Nodes: {{ nodes|length }}</span>
+    </div>
+    <div class="stats">
+        <span>Service UUID: {{ service_uuid }}</span>
+    </div>
 
-    <div id="node-grid">
-        <p>Loading node data...</p>
-    </div>
+    {% if nodes %}
+    <table class="node-grid">
+        <thead>
+            <tr>
+                <th>{# Top-left corner #}</th>
+                {% for node in nodes %}
+                <th>
+                    <span>{{ node.uuid[:8] }}...</span>
+                </th>
+                {% endfor %}
+            </tr>
+        </thead>
+        <tbody>
+            {% for row_node in nodes %}
+            <tr>
+                <th>
+                    <span>{{ row_node.uuid[:8] }}...</span>
+                </th>
+                {% for col_node in nodes %}
+                <td>
+                    {% if row_node.uuid == col_node.uuid %}
+                    <div class="cell status">
+                        Status: {{ row_node.health_status.upper() }}
+                        <div class="tooltip">
+                            <div>UUID: {{ row_node.uuid }}</div>
+                            <div>IP: {{ row_node.ip }}</div>
+                            <div>Last Seen: {{ row_node.formatted_last_seen }}</div>
+                            <div>Uptime: {{ row_node.formatted_uptime }}</div>
+                            <div>Load Avg (1m, 5m, 15m): {{ '%.2f, %.2f, %.2f' | format(row_node.load_avg[0], row_node.load_avg[1], row_node.load_avg[2]) if row_node.load_avg else 'N/A' }}</div>
+                            <div>Memory Usage: {{ '%.2f' | format(row_node.memory_usage_percent) + '%' if row_node.memory_usage_percent is not none else 'N/A' }}</div>
+                        </div>
+                    </div>
+                    {% else %}
+                    {% set latency = row_node.connections[col_node.uuid] if col_node.uuid in row_node.connections else None %}
+                    {% set display_latency = 'N/A' %}
+                    {% if latency is not none and latency is not equalto 0.0 %}
+                        {% set display_latency = '%.1f ms' | format(latency) %}
+                    {% endif %}
+                    <div class="cell ping">
+                        Ping: {{ display_latency }}
+                        <div class="tooltip">
+                            <div>From: {{ row_node.uuid[:8] }}...</div>
+                            <div>to: {{ col_node.uuid[:8] }}...</div>
+                            <div>Latency: {{ display_latency }}</div>
+                        </div>
+                    </div>
+                    {% endif %}
+                </td>
+                {% endfor %}
+            </tr>
+            {% endfor %}
+        </tbody>
+    </table>
+    {% else %}
+    <div class="no-nodes">
+        <p>No nodes reporting yet. Start a client!</p>
+    </div>
+    {% endif %}
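
For reference (outside the patch), a minimal usage sketch of the RRDDatabase API exercised by this diff, assuming the rrdtool Python bindings are installed; the node UUIDs, metric values, and the /tmp data directory are illustrative assumptions, not part of the change:

# Hypothetical usage sketch -- UUIDs, values, and paths are made up for illustration.
from datetime import datetime, timezone

from app.database import RRDDatabase

# Explicit scratch directory; when omitted, the class falls back to $DATA_DIR or /data.
db = RRDDatabase(data_dir="/tmp/rrd-demo")
now = datetime.now(timezone.utc)

# Record one system-metrics sample: uptime, 1/5/15-minute load averages, memory %.
db.update_system_metrics(
    node_uuid="11111111-1111-1111-1111-111111111111",
    timestamp=now,
    uptime_seconds=86400,
    load_avg=[0.10, 0.15, 0.20],
    memory_usage_percent=42.5,
)

# Record one ping sample between two nodes.
db.update_ping_metrics(
    "11111111-1111-1111-1111-111111111111",
    "22222222-2222-2222-2222-222222222222",
    now,
    latency_ms=12.3,
)

# Fetch the last hour of system data; returns {'timestamps', 'data', 'step'} or None.
series = db.get_system_data("11111111-1111-1111-1111-111111111111", start_time="-1h")
if series:
    print(series["step"], len(series["timestamps"]), list(series["data"]))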