Files
node-monitor/app/main.py
2025-06-12 23:12:19 +03:00

456 lines
15 KiB
Python

import os
import uuid
import json
import logging
from datetime import datetime, timezone, timedelta
from fastapi import FastAPI, Request, status, Query
from fastapi.responses import HTMLResponse, JSONResponse
from fastapi.templating import Jinja2Templates
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel, Field, validator
from typing import Dict, List, Annotated
import uuid as uuid_lib
from collections import deque
from dateutil.parser import isoparse # Import isoparse for robust date parsing
from pythonjsonlogger import jsonlogger
import sys
from .database import RRDDatabase
# --- Service Configuration ---
SERVICE_UUID = os.environ.get("SERVICE_UUID", str(uuid.uuid4()))
database = RRDDatabase()
# --- Logging Configuration ---
logger = logging.getLogger()
logger.setLevel(logging.INFO)
class BufferHandler(logging.Handler):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
# Use the same formatter string as the StreamHandler for consistency
self.formatter = jsonlogger.JsonFormatter(
"%(asctime)s %(name)s %(levelname)s %(message)s"
)
def emit(self, record):
try:
log_entry_str = self.formatter.format(record)
log_entry = json.loads(log_entry_str)
log_buffer.add_log(log_entry)
except Exception as e:
print(
f"Error in BufferHandler: Could not process log record: {e}",
file=sys.stderr,
)
class LogBuffer:
def __init__(self, maxlen=1000):
self.buffer = deque(maxlen=maxlen)
def add_log(self, record):
# Assuming 'record' here is already a dictionary parsed from the JSON log string
timestamp = record.get("asctime") or datetime.utcnow().isoformat()
self.buffer.append(
{
"timestamp": timestamp,
"level": record.get(
"levelname"
), # This should now correctly get 'levelname'
"message": record.get("message"),
"extra": {
k: v
for k, v in record.items()
if k
not in [
"asctime",
"levelname",
"message",
"name",
"lineno",
"filename",
"pathname",
"funcName",
"process",
"processName",
"thread",
"threadName",
]
},
}
)
def get_logs(self, limit=100, level=None, since=None):
logger.debug(f"Fetching logs with limit={limit}, level={level}, since={since}")
logs = list(self.buffer)
# Apply level filter
if level and level.strip():
level = level.upper()
valid_levels = {"INFO", "WARNING", "ERROR", "DEBUG"}
if level in valid_levels:
logs = [log for log in logs if log["level"].upper() == level]
else:
logger.warning(f"Invalid log level: {level}")
# Apply since filter
if since:
try:
# Use isoparse for robust parsing of ISO 8601 strings
since_dt = isoparse(since)
# If the parsed datetime is naive (no timezone info), assume it's UTC
if since_dt.tzinfo is None:
since_dt = since_dt.replace(tzinfo=timezone.utc)
else:
# If it has timezone info, convert it to UTC for consistent comparison
since_dt = since_dt.astimezone(timezone.utc)
logs = [
log
for log in logs
if datetime.fromisoformat(
log["timestamp"].replace("Z", "+00:00")
).astimezone(timezone.utc)
>= since_dt
]
except ValueError:
logger.warning(f"Invalid 'since' timestamp: {since}")
logger.debug(f"Returning {len(logs[-limit:])} logs")
return logs[-limit:]
log_buffer = LogBuffer()
logHandler = logging.StreamHandler()
formatter = jsonlogger.JsonFormatter("%(asctime)s %(name)s %(levelname)s %(message)s")
logHandler.setFormatter(formatter)
if not logger.handlers:
logger.addHandler(logHandler)
logger.addHandler(BufferHandler())
# Configure Uvicorn's loggers to propagate to the root logger
# This ensures Uvicorn's startup and access logs are captured by our BufferHandler
logging.getLogger("uvicorn").propagate = True
logging.getLogger("uvicorn.access").propagate = True
logging.getLogger("uvicorn.error").propagate = True
# --- FastAPI Application ---
app = FastAPI(
title="Node Monitoring System",
description=f"A distributed monitoring system. Service UUID: {SERVICE_UUID}",
)
templates = Jinja2Templates(directory="app/web/templates")
app.mount("/static", StaticFiles(directory="app/web/static"), name="static")
# --- Data Models ---
class NodeStatusModel(BaseModel):
uptime_seconds: int
load_avg: Annotated[List[float], Field(min_length=3, max_length=3)]
memory_usage_percent: float
class PingModel(BaseModel):
pings: Dict[Annotated[str, Field(pattern=r"^[0-9a-fA-F-]{36}$")], float]
class StatusUpdate(BaseModel):
node: str = Field(..., description="Node UUID")
timestamp: datetime
status: NodeStatusModel
pings: Dict[str, float]
@validator("node")
def validate_node_uuid(cls, v):
try:
uuid_lib.UUID(v)
return v
except ValueError:
raise ValueError("Invalid UUID format")
@validator("pings")
def validate_ping_uuids(cls, v):
for key in v.keys():
try:
uuid_lib.UUID(key)
except ValueError:
raise ValueError(f"Invalid UUID format in pings: {key}")
return v
# --- Node Management and Health Logic ---
known_nodes_db: Dict[str, Dict] = {}
LOAD_AVG_WARNING_THRESHOLD = 1.5
LOAD_AVG_CRITICAL_THRESHOLD = 3.0
MEMORY_WARNING_THRESHOLD = 75.0
MEMORY_CRITICAL_THRESHOLD = 90.0
LAST_SEEN_CRITICAL_THRESHOLD_SECONDS = 30
NODE_INACTIVE_REMOVAL_THRESHOLD_SECONDS = (
300 # Remove node from UI after 5 minutes of inactivity
)
def get_node_health(node_data: Dict) -> str:
last_seen_str = node_data.get("last_seen")
if last_seen_str:
last_seen_dt = datetime.fromisoformat(last_seen_str).replace(
tzinfo=timezone.utc
)
time_since_last_seen = (
datetime.now(timezone.utc) - last_seen_dt
).total_seconds()
if time_since_last_seen > LAST_SEEN_CRITICAL_THRESHOLD_SECONDS:
return "critical"
else:
return "unknown"
status_model_data = node_data.get("status")
if not status_model_data:
return "unknown"
try:
status = NodeStatusModel(**status_model_data)
except Exception:
logger.error(
f"Could not parse status data for node {node_data.get('uuid')}",
exc_info=True,
)
return "unknown"
load_1min = status.load_avg[0]
if load_1min >= LOAD_AVG_CRITICAL_THRESHOLD:
return "critical"
elif load_1min >= LOAD_AVG_WARNING_THRESHOLD:
return "warning"
if status.memory_usage_percent >= MEMORY_CRITICAL_THRESHOLD:
return "critical"
elif status.memory_usage_percent >= MEMORY_WARNING_THRESHOLD:
return "warning"
return "healthy"
# --- API Endpoints ---
@app.get("/", response_class=HTMLResponse)
async def read_root(request: Request):
# Use X-Forwarded-For if available, otherwise client.host
client_ip = request.headers.get("x-forwarded-for", request.client.host)
logger.info(
"Web root accessed",
extra={"client_ip": client_ip, "service_uuid": SERVICE_UUID},
)
return templates.TemplateResponse(
"index.html",
{
"request": request,
"service_uuid": SERVICE_UUID,
"url_for": request.url_for, # Pass url_for for dynamic URL generation
"root_path": request.scope.get(
"root_path", ""
), # Pass root_path for JS base URL
},
)
@app.get("/{service_uuid}/logs")
async def get_logs(
request: Request,
service_uuid: str,
limit: int = 100,
format: str = Query(
None, description="Response format: 'json' for JSON, default is HTML"
),
level: str = Query(None, description="Filter logs by level: INFO, WARNING, ERROR"),
since: str = Query(
None, description="Fetch logs since ISO timestamp, e.g., 2025-06-11T13:32:00"
),
):
# Use X-Forwarded-For if available, otherwise client.host
client_ip = request.headers.get("x-forwarded-for", request.client.host)
logger.info(
"Logs endpoint accessed",
extra={
"service_uuid": service_uuid,
"format": format,
"level": level,
"since": since,
"limit": limit,
"client_ip": client_ip,
},
)
if service_uuid != SERVICE_UUID:
logger.warning(f"Invalid service UUID: {service_uuid}")
return JSONResponse(
status_code=404, content={"error": "Service UUID not found"}
)
try:
logs = log_buffer.get_logs(limit=limit, level=level, since=since)
log_data = {"service_uuid": service_uuid, "log_count": len(logs), "logs": logs}
logger.debug(f"Fetched {len(logs)} logs for response")
except Exception as e:
logger.error(f"Error fetching logs: {e}", exc_info=True)
return JSONResponse(status_code=500, content={"error": "Failed to fetch logs"})
if format == "json":
logger.debug("Returning JSON response")
return JSONResponse(content=log_data)
logger.debug("Rendering logs.html template")
try:
return templates.TemplateResponse(
"logs.html",
{
"request": request,
"service_uuid": service_uuid,
"logs": logs,
"log_count": len(logs),
"url_for": request.url_for, # Pass url_for for dynamic URL generation
"root_path": request.scope.get(
"root_path", ""
), # Pass root_path for JS base URL
},
)
except Exception as e:
logger.error(f"Error rendering logs.html: {e}", exc_info=True)
return JSONResponse(
status_code=500, content={"error": "Failed to render logs page"}
)
@app.put("/{service_uuid}/{node_uuid}/")
async def update_node_status(
service_uuid: str, node_uuid: str, status_update: StatusUpdate, request: Request
):
# Use X-Forwarded-For if available, otherwise client.host
client_ip = request.headers.get("x-forwarded-for", request.client.host)
logger.info(
"Received node status update",
extra={
"event_type": "node_status_update",
"client_ip": client_ip,
"service_uuid": service_uuid,
"node_uuid": node_uuid,
"data": status_update.dict(),
},
)
if service_uuid != SERVICE_UUID:
logger.warning(
"Node sent status to wrong service UUID",
extra={"client_node_uuid": node_uuid, "target_uuid": service_uuid},
)
return {"error": "Service UUID mismatch", "peers": []}
try:
database.update_system_metrics(
node_uuid=node_uuid,
timestamp=status_update.timestamp,
uptime_seconds=status_update.status.uptime_seconds,
load_avg=status_update.status.load_avg,
memory_usage_percent=status_update.status.memory_usage_percent,
)
for target_uuid, latency in status_update.pings.items():
database.update_ping_metrics(
node_uuid=node_uuid,
target_uuid=target_uuid,
timestamp=status_update.timestamp,
latency_ms=latency,
)
except Exception as e:
logger.error(f"Database update failed: {e}", exc_info=True)
current_time_utc = datetime.now(timezone.utc)
known_nodes_db[node_uuid] = {
"last_seen": current_time_utc.isoformat(),
"ip": request.client.host, # Keep original client.host here as it's the direct connection
"status": status_update.status.dict(),
"uptime_seconds": status_update.status.uptime_seconds,
"load_avg": status_update.status.load_avg,
"memory_usage_percent": status_update.status.memory_usage_percent,
}
health_status_for_log = get_node_health(known_nodes_db[node_uuid])
logger.info(f"Node {node_uuid} updated. Health: {health_status_for_log}")
peer_list = {
uuid: {"last_seen": data["last_seen"], "ip": data["ip"]}
for uuid, data in known_nodes_db.items()
if uuid != node_uuid
}
return {"message": "Status received", "peers": peer_list}
@app.get("/nodes/status")
async def get_all_nodes_status():
logger.info("Fetching all nodes status for UI.")
# Prune inactive nodes from known_nodes_db before processing
current_time_utc = datetime.now(timezone.utc)
nodes_to_remove = []
for node_uuid, data in known_nodes_db.items():
last_seen_dt = datetime.fromisoformat(data["last_seen"]).replace(
tzinfo=timezone.utc
)
if (
current_time_utc - last_seen_dt
).total_seconds() > NODE_INACTIVE_REMOVAL_THRESHOLD_SECONDS:
nodes_to_remove.append(node_uuid)
logger.info(f"Removing inactive node {node_uuid} from known_nodes_db.")
for node_uuid in nodes_to_remove:
known_nodes_db.pop(node_uuid)
response_nodes = []
for node_uuid, data in known_nodes_db.items():
current_health = get_node_health(data)
connections = {}
for target_uuid in known_nodes_db: # Only iterate over currently active nodes
if target_uuid != node_uuid:
ping_data = database.get_ping_data(
node_uuid, target_uuid, start_time="-300s"
)
latency_ms = None
if ping_data and ping_data["data"]["latency"]:
# Get the most recent non-None latency
for latency in reversed(ping_data["data"]["latency"]):
if latency is not None and not (
isinstance(latency, float) and latency == 0.0
): # Exclude 0.0 which might be a default
latency_ms = float(latency)
break
connections[target_uuid] = latency_ms
response_nodes.append(
{
"uuid": node_uuid,
"last_seen": data["last_seen"],
"ip": data["ip"],
"health_status": current_health,
"uptime_seconds": data.get("uptime_seconds"),
"load_avg": data.get("load_avg"),
"memory_usage_percent": data.get("memory_usage_percent"),
"connections": connections,
}
)
return {"nodes": response_nodes}
@app.get("/health")
async def health_check():
return {"status": "ok"}