node-monitor/app/main.py
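"""FastAPI service for the node monitoring system.

Receives status updates from nodes, stores system and ping metrics via
RRDDatabase, keeps an in-memory buffer of structured JSON logs, and serves
a small web UI plus JSON endpoints for node status and logs.
"""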

import os
import uuid
import json
import logging
from datetime import datetime, timezone, timedelta
from fastapi import FastAPI, Request, status, Query
from fastapi.responses import HTMLResponse, JSONResponse
from fastapi.templating import Jinja2Templates
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel, Field, validator
from typing import Dict, List, Annotated, Optional
import uuid as uuid_lib
from collections import deque
from dateutil.parser import isoparse # Import isoparse for robust date parsing
from pythonjsonlogger import jsonlogger
import sys
from .database import RRDDatabase
# --- Service Configuration ---
SERVICE_UUID = os.environ.get("SERVICE_UUID", str(uuid.uuid4()))
database = RRDDatabase()
# --- Logging Configuration ---
logger = logging.getLogger()
logger.setLevel(logging.INFO)


class BufferHandler(logging.Handler):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Use the same formatter string as the StreamHandler for consistency
        # Ensure asctime is formatted as ISO 8601 UTC with milliseconds and 'Z'
        self.formatter = jsonlogger.JsonFormatter(
            "%(asctime)s %(name)s %(levelname)s %(message)s",
            datefmt="%Y-%m-%dT%H:%M:%S.%fZ",  # ISO 8601 format with milliseconds and Z for UTC
        )

    def emit(self, record):
        try:
            log_entry_str = self.formatter.format(record)
            log_entry = json.loads(log_entry_str)
            # The 'asctime' field in log_entry is now guaranteed to be ISO 8601
            log_buffer.add_log(log_entry)
        except Exception as e:
            print(
                f"Error in BufferHandler: Could not process log record: {e}",
                file=sys.stderr,
            )


class LogBuffer:
    def __init__(self, maxlen=1000):
        self.buffer = deque(maxlen=maxlen)

    def add_log(self, record):
        # 'record' is a dictionary parsed from the JSON log string.
        # 'asctime' should now be in ISO 8601 format due to BufferHandler's formatter.
        timestamp_str = record.get("asctime")
        if timestamp_str:
            try:
                # Use isoparse for robust parsing, then convert to UTC and store as ISO 8601 with 'Z'
                dt_obj = isoparse(timestamp_str)
                if dt_obj.tzinfo is None:
                    # Assume UTC if naive (common for logs without explicit timezone info)
                    dt_obj = dt_obj.replace(tzinfo=timezone.utc)
                else:
                    # Convert to UTC for consistent storage
                    dt_obj = dt_obj.astimezone(timezone.utc)
                timestamp_to_store = dt_obj.isoformat(timespec="milliseconds").replace("+00:00", "Z")
            except ValueError:
                logger.warning(
                    f"Failed to parse log timestamp '{timestamp_str}' from formatter. Using current UTC time."
                )
                timestamp_to_store = datetime.utcnow().isoformat(timespec="milliseconds") + "Z"
        else:
            timestamp_to_store = datetime.utcnow().isoformat(timespec="milliseconds") + "Z"
        self.buffer.append(
            {
                "timestamp": timestamp_to_store,
                "level": record.get("levelname"),
                "message": record.get("message"),
                "extra": {
                    k: v
                    for k, v in record.items()
                    if k
                    not in [
                        "asctime",
                        "levelname",
                        "message",
                        "name",
                        "lineno",
                        "filename",
                        "pathname",
                        "funcName",
                        "process",
                        "processName",
                        "thread",
                        "threadName",
                    ]
                },
            }
        )

    def get_logs(self, limit=100, level=None, since=None):
        logger.debug(f"Fetching logs with limit={limit}, level={level}, since={since}")
        logs = list(self.buffer)
        # Apply level filter
        if level and level.strip():
            level = level.upper()
            valid_levels = {"INFO", "WARNING", "ERROR", "DEBUG"}
            if level in valid_levels:
                logs = [log for log in logs if log["level"].upper() == level]
            else:
                logger.warning(f"Invalid log level: {level}")
        # Apply since filter
        if since:
            try:
                # Use isoparse for robust parsing of ISO 8601 strings
                since_dt = isoparse(since)
                # If the parsed datetime is naive (no timezone info), assume it's UTC
                if since_dt.tzinfo is None:
                    since_dt = since_dt.replace(tzinfo=timezone.utc)
                else:
                    # If it has timezone info, convert it to UTC for consistent comparison
                    since_dt = since_dt.astimezone(timezone.utc)
                logs = [
                    log
                    for log in logs
                    # log["timestamp"] is now guaranteed to be ISO 8601 with 'Z'
                    if isoparse(log["timestamp"]).astimezone(timezone.utc) >= since_dt
                ]
            except ValueError:
                logger.warning(f"Invalid 'since' timestamp: {since}")
        logger.debug(f"Returning {len(logs[-limit:])} logs")
        return logs[-limit:]


log_buffer = LogBuffer()
logHandler = logging.StreamHandler()
# Ensure StreamHandler also formats asctime into ISO 8601 UTC
formatter = jsonlogger.JsonFormatter(
"%(asctime)s %(name)s %(levelname)s %(message)s",
datefmt="%Y-%m-%dT%H:%M:%S.%fZ" # ISO 8601 format with milliseconds and Z for UTC
)
logHandler.setFormatter(formatter)
if not logger.handlers:
    logger.addHandler(logHandler)
    logger.addHandler(BufferHandler())
# Configure Uvicorn's loggers to propagate to the root logger
# This ensures Uvicorn's startup and access logs are captured by our BufferHandler
logging.getLogger("uvicorn").propagate = True
logging.getLogger("uvicorn.access").propagate = True
logging.getLogger("uvicorn.error").propagate = True
# --- FastAPI Application ---
app = FastAPI(
title="Node Monitoring System",
description=f"A distributed monitoring system. Service UUID: {SERVICE_UUID}",
)
templates = Jinja2Templates(directory="app/web/templates")
app.mount("/static", StaticFiles(directory="app/web/static"), name="static")
# To correctly handle HTTPS behind a reverse proxy, ensure your Uvicorn server
# is run with --proxy-headers and --forwarded-allow-ips, e.g. (from the project root):
#   uvicorn app.main:app --host 0.0.0.0 --port 8000 --proxy-headers --forwarded-allow-ips '*'
# --- Data Models ---
class NodeStatusModel(BaseModel):
    uptime_seconds: int
    load_avg: Annotated[List[float], Field(min_length=3, max_length=3)]
    memory_usage_percent: float


class PingModel(BaseModel):
    pings: Dict[Annotated[str, Field(pattern=r"^[0-9a-fA-F-]{36}$")], float]


class StatusUpdate(BaseModel):
    node: str = Field(..., description="Node UUID")
    timestamp: datetime
    status: NodeStatusModel
    pings: Dict[str, float]

    @validator("node")
    def validate_node_uuid(cls, v):
        try:
            uuid_lib.UUID(v)
            return v
        except ValueError:
            raise ValueError("Invalid UUID format")

    @validator("pings")
    def validate_ping_uuids(cls, v):
        for key in v.keys():
            try:
                uuid_lib.UUID(key)
            except ValueError:
                raise ValueError(f"Invalid UUID format in pings: {key}")
        return v
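

# Illustrative request body for PUT /{service_uuid}/{node_uuid}/ as implied by the
# models above (all UUIDs and values are placeholder examples, not real data):
#
# {
#     "node": "123e4567-e89b-12d3-a456-426614174000",
#     "timestamp": "2025-06-11T13:32:00Z",
#     "status": {
#         "uptime_seconds": 86400,
#         "load_avg": [0.42, 0.35, 0.30],
#         "memory_usage_percent": 61.5
#     },
#     "pings": {
#         "223e4567-e89b-12d3-a456-426614174001": 12.7
#     }
# }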
# --- Node Management and Health Logic ---
known_nodes_db: Dict[str, Dict] = {}
LOAD_AVG_WARNING_THRESHOLD = 1.5
LOAD_AVG_CRITICAL_THRESHOLD = 3.0
MEMORY_WARNING_THRESHOLD = 75.0
MEMORY_CRITICAL_THRESHOLD = 90.0
LAST_SEEN_CRITICAL_THRESHOLD_SECONDS = 30
NODE_INACTIVE_REMOVAL_THRESHOLD_SECONDS = 300  # Remove node from UI after 5 minutes of inactivity


def get_node_health(node_data: Dict) -> str:
    last_seen_str = node_data.get("last_seen")
    if last_seen_str:
        last_seen_dt = datetime.fromisoformat(last_seen_str).replace(
            tzinfo=timezone.utc
        )
        time_since_last_seen = (
            datetime.now(timezone.utc) - last_seen_dt
        ).total_seconds()
        if time_since_last_seen > LAST_SEEN_CRITICAL_THRESHOLD_SECONDS:
            return "critical"
    else:
        return "unknown"
    status_model_data = node_data.get("status")
    if not status_model_data:
        return "unknown"
    try:
        status = NodeStatusModel(**status_model_data)
    except Exception:
        logger.error(
            f"Could not parse status data for node {node_data.get('uuid')}",
            exc_info=True,
        )
        return "unknown"
    load_1min = status.load_avg[0]
    if load_1min >= LOAD_AVG_CRITICAL_THRESHOLD:
        return "critical"
    elif load_1min >= LOAD_AVG_WARNING_THRESHOLD:
        return "warning"
    if status.memory_usage_percent >= MEMORY_CRITICAL_THRESHOLD:
        return "critical"
    elif status.memory_usage_percent >= MEMORY_WARNING_THRESHOLD:
        return "warning"
    return "healthy"


def format_uptime(seconds: Optional[int]) -> str:
    """Formats uptime in seconds into a human-readable string (e.g., "1d 2h 3m 4s")."""
    if seconds is None:
        return "N/A"
    days = seconds // (3600 * 24)
    seconds %= 3600 * 24
    hours = seconds // 3600
    seconds %= 3600
    minutes = seconds // 60
    remaining_seconds = seconds % 60
    parts = []
    if days > 0:
        parts.append(f"{days}d")
    if hours > 0:
        parts.append(f"{hours}h")
    if minutes > 0:
        parts.append(f"{minutes}m")
    # Always include seconds if no other parts, or if there are remaining seconds
    if remaining_seconds > 0 or not parts:
        parts.append(f"{remaining_seconds}s")
    return " ".join(parts)


# --- API Endpoints ---
@app.get("/", response_class=HTMLResponse)
async def read_root(request: Request):
# Use X-Forwarded-For if available, otherwise client.host
client_ip = request.headers.get("x-forwarded-for", request.client.host)
logger.info(
"Web root accessed",
extra={"client_ip": client_ip, "service_uuid": SERVICE_UUID},
)
# --- Prepare initial node data for server-side rendering ---
current_time_utc = datetime.now(timezone.utc)
nodes_to_remove = []
for node_uuid, data in known_nodes_db.items():
last_seen_dt = datetime.fromisoformat(data["last_seen"]).replace(
tzinfo=timezone.utc
)
if (
current_time_utc - last_seen_dt
).total_seconds() > NODE_INACTIVE_REMOVAL_THRESHOLD_SECONDS:
nodes_to_remove.append(node_uuid)
logger.info(f"Node {node_uuid} inactive for >{NODE_INACTIVE_REMOVAL_THRESHOLD_SECONDS}s. Will not render initially.")
# Filter out inactive nodes for the initial render
active_known_nodes_db = {
k: v for k, v in known_nodes_db.items()
if k not in nodes_to_remove
}
initial_nodes_data = []
for node_uuid, data in active_known_nodes_db.items():
current_health = get_node_health(data)
connections = {}
for target_uuid in active_known_nodes_db: # Only iterate over currently active nodes
if target_uuid != node_uuid:
ping_data = database.get_ping_data(
node_uuid, target_uuid, start_time="-300s"
)
latency_ms = None
if ping_data and ping_data["data"]["latency"]:
# Get the most recent non-None, non-zero latency
for latency in reversed(ping_data["data"]["latency"]):
if latency is not None and not (isinstance(latency, float) and latency == 0.0):
latency_ms = float(latency)
break
connections[target_uuid] = latency_ms
initial_nodes_data.append(
{
"uuid": node_uuid,
"last_seen": data["last_seen"], # Keep original for JS
"formatted_last_seen": datetime.fromisoformat(data["last_seen"]).strftime("%Y-%m-%d %H:%M:%S UTC"),
"ip": data["ip"],
"health_status": current_health,
"uptime_seconds": data.get("uptime_seconds"),
"formatted_uptime": format_uptime(data.get("uptime_seconds")), # Pre-format uptime for HTML
"load_avg": data.get("load_avg"),
"memory_usage_percent": data.get("memory_usage_percent"),
"connections": connections,
}
)
# --- End initial node data preparation ---
return templates.TemplateResponse(
"index.html",
{
"request": request,
"service_uuid": SERVICE_UUID,
"url_for": request.url_for,
"root_path": request.scope.get(
"root_path", ""
), # Pass root_path for JS base URL
"nodes": initial_nodes_data, # Pass initial node data for server-side rendering
},
)
@app.get("/{service_uuid}/logs")
async def get_logs(
request: Request,
service_uuid: str,
limit: int = 100,
format: str = Query(
None, description="Response format: 'json' for JSON, default is HTML"
),
level: str = Query(None, description="Filter logs by level: INFO, WARNING, ERROR"),
since: str = Query(
None, description="Fetch logs since ISO timestamp, e.g., 2025-06-11T13:32:00"
),
):
# Use X-Forwarded-For if available, otherwise client.host
client_ip = request.headers.get("x-forwarded-for", request.client.host)
logger.info(
"Logs endpoint accessed",
extra={
"service_uuid": service_uuid,
"format": format,
"level": level,
"since": since,
"limit": limit,
"client_ip": client_ip,
},
)
if service_uuid != SERVICE_UUID:
logger.warning(f"Invalid service UUID: {service_uuid}")
return JSONResponse(
status_code=404, content={"error": "Service UUID not found"}
)
try:
logs = log_buffer.get_logs(limit=limit, level=level, since=since)
log_data = {"service_uuid": service_uuid, "log_count": len(logs), "logs": logs}
logger.debug(f"Fetched {len(logs)} logs for response")
except Exception as e:
logger.error(f"Error fetching logs: {e}", exc_info=True)
return JSONResponse(status_code=500, content={"error": "Failed to fetch logs"})
if format == "json":
logger.debug("Returning JSON response")
return JSONResponse(content=log_data)
logger.debug("Rendering logs.html template")
try:
return templates.TemplateResponse(
"logs.html",
{
"request": request,
"service_uuid": service_uuid,
"logs": logs,
"log_count": len(logs),
"url_for": request.url_for, # Pass url_for for dynamic URL generation
"root_path": request.scope.get(
"root_path", ""
), # Pass root_path for JS base URL
},
)
except Exception as e:
logger.error(f"Error rendering logs.html: {e}", exc_info=True)
return JSONResponse(
status_code=500, content={"error": "Failed to render logs page"}
)
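

# Illustrative request against the logs endpoint above, assuming the caller knows
# the SERVICE_UUID and wants the most recent ERROR entries as JSON:
#   GET /<SERVICE_UUID>/logs?format=json&level=ERROR&since=2025-06-11T13:32:00&limit=50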


@app.put("/{service_uuid}/{node_uuid}/")
async def update_node_status(
    service_uuid: str, node_uuid: str, status_update: StatusUpdate, request: Request
):
    # Use X-Forwarded-For if available, otherwise client.host
    client_ip = request.headers.get("x-forwarded-for", request.client.host)
    logger.info(
        "Received node status update",
        extra={
            "event_type": "node_status_update",
            "client_ip": client_ip,
            "service_uuid": service_uuid,
            "node_uuid": node_uuid,
            "data": status_update.dict(),
        },
    )
    if service_uuid != SERVICE_UUID:
        logger.warning(
            "Node sent status to wrong service UUID",
            extra={"client_node_uuid": node_uuid, "target_uuid": service_uuid},
        )
        return {"error": "Service UUID mismatch", "peers": []}
    try:
        database.update_system_metrics(
            node_uuid=node_uuid,
            timestamp=status_update.timestamp,
            uptime_seconds=status_update.status.uptime_seconds,
            load_avg=status_update.status.load_avg,
            memory_usage_percent=status_update.status.memory_usage_percent,
        )
        for target_uuid, latency in status_update.pings.items():
            database.update_ping_metrics(
                node_uuid=node_uuid,
                target_uuid=target_uuid,
                timestamp=status_update.timestamp,
                latency_ms=latency,
            )
    except Exception as e:
        logger.error(f"Database update failed: {e}", exc_info=True)
    current_time_utc = datetime.now(timezone.utc)
    known_nodes_db[node_uuid] = {
        "last_seen": current_time_utc.isoformat(),
        "ip": request.client.host,  # Keep original client.host here as it's the direct connection
        "status": status_update.status.dict(),
        "uptime_seconds": status_update.status.uptime_seconds,
        "load_avg": status_update.status.load_avg,
        "memory_usage_percent": status_update.status.memory_usage_percent,
    }
    health_status_for_log = get_node_health(known_nodes_db[node_uuid])
    logger.info(f"Node {node_uuid} updated. Health: {health_status_for_log}")
    peer_list = {
        uuid: {"last_seen": data["last_seen"], "ip": data["ip"]}
        for uuid, data in known_nodes_db.items()
        if uuid != node_uuid
    }
    return {"message": "Status received", "peers": peer_list}
@app.get("/nodes/status")
async def get_all_nodes_status():
logger.info("Fetching all nodes status for UI.")
# Prune inactive nodes from known_nodes_db before processing
current_time_utc = datetime.now(timezone.utc)
nodes_to_remove = []
for node_uuid, data in known_nodes_db.items():
last_seen_dt = datetime.fromisoformat(data["last_seen"]).replace(
tzinfo=timezone.utc
)
if (
current_time_utc - last_seen_dt
).total_seconds() > NODE_INACTIVE_REMOVAL_THRESHOLD_SECONDS:
nodes_to_remove.append(node_uuid)
logger.info(f"Removing inactive node {node_uuid} from known_nodes_db.")
for node_uuid in nodes_to_remove:
known_nodes_db.pop(node_uuid)
response_nodes = []
for node_uuid, data in known_nodes_db.items():
current_health = get_node_health(data)
connections = {}
for target_uuid in known_nodes_db: # Only iterate over currently active nodes
if target_uuid != node_uuid:
ping_data = database.get_ping_data(
node_uuid, target_uuid, start_time="-300s"
)
latency_ms = None
if ping_data and ping_data["data"]["latency"]:
# Get the most recent non-None latency
for latency in reversed(ping_data["data"]["latency"]):
if latency is not None and not (
isinstance(latency, float) and latency == 0.0
): # Exclude 0.0 which might be a default
latency_ms = float(latency)
break
connections[target_uuid] = latency_ms
response_nodes.append(
{
"uuid": node_uuid,
"last_seen": data["last_seen"],
"ip": data["ip"],
"health_status": current_health,
"uptime_seconds": data.get("uptime_seconds"),
"load_avg": data.get("load_avg"),
"memory_usage_percent": data.get("memory_usage_percent"),
"connections": connections,
}
)
return {"nodes": response_nodes}
@app.get("/health")
async def health_check():
return {"status": "ok"}