Files
node-monitor/app/main.py
2025-06-11 22:27:46 +03:00

366 lines
13 KiB
Python

import os
import uuid
import json
import logging
from datetime import datetime, timezone
from fastapi import FastAPI, Request, status, Query
from fastapi.responses import HTMLResponse, JSONResponse
from fastapi.templating import Jinja2Templates
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel, Field, validator
from typing import Dict, List, Annotated
import uuid as uuid_lib
from collections import deque
from pythonjsonlogger import jsonlogger
import sys
from .database import RRDDatabase
# --- Service Configuration ---
# Identity of this service instance: read from the SERVICE_UUID environment
# variable; when it is unset a fresh random UUID is generated at import time.
SERVICE_UUID = os.environ.get("SERVICE_UUID", str(uuid.uuid4()))
# Metrics store used by the status endpoints below (implemented in .database).
database = RRDDatabase()
# --- Logging Configuration ---
# The root logger is used throughout this module. At INFO level the
# logger.debug(...) calls further down are not emitted.
logger = logging.getLogger()
logger.setLevel(logging.INFO)
class BufferHandler(logging.Handler):
    """Logging handler that mirrors every record into the module's ``log_buffer``.

    Each record is rendered to JSON with ``jsonlogger.JsonFormatter`` and parsed
    back into a dict, so the buffered entry has the same shape (message plus
    ``extra`` fields) as the JSON log output.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.formatter = jsonlogger.JsonFormatter()

    def emit(self, record):
        """Convert *record* to a dict and append it to ``log_buffer``.

        Failures are reported on stderr instead of being raised, so a logging
        problem never propagates into the code that issued the log call.
        """
        try:
            # Format the record as a JSON string and then parse it back to a dict
            # This ensures consistency with the jsonlogger's output format
            log_entry = json.loads(self.formatter.format(record))
            # The formatter above is created without a format string, so its
            # output carries neither 'levelname' nor 'asctime'. Fill them from
            # the record itself; otherwise every buffered entry has level None
            # and loses the time at which the record was actually created.
            log_entry.setdefault('levelname', record.levelname)
            log_entry.setdefault(
                'asctime',
                datetime.fromtimestamp(record.created, tz=timezone.utc).isoformat(),
            )
            log_buffer.add_log(log_entry)
        except Exception as e:
            print(f"Error in BufferHandler: Could not process log record: {e}", file=sys.stderr)
class LogBuffer:
    """Bounded in-memory store of recent log entries.

    Entries live in a ``deque`` with a maximum length, so once the buffer is
    full the oldest entry is discarded for every new one appended.
    """

    # Standard record attributes that are never copied into an entry's 'extra'.
    _STANDARD_KEYS = frozenset([
        'asctime', 'levelname', 'message', 'name', 'lineno', 'filename',
        'pathname', 'funcName', 'process', 'processName', 'thread', 'threadName',
    ])
    # Level names accepted by the ``level`` filter of get_logs().
    _VALID_LEVELS = frozenset(['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'])

    def __init__(self, maxlen=1000):
        self.buffer = deque(maxlen=maxlen)

    @staticmethod
    def _parse_utc(value):
        """Parse an ISO timestamp (a trailing 'Z' is accepted) into aware UTC.

        A value without offset is interpreted as local time by ``astimezone``.
        Raises ValueError when *value* is not a valid ISO timestamp.
        """
        return datetime.fromisoformat(value.replace('Z', '+00:00')).astimezone(timezone.utc)

    def _at_or_after(self, timestamp, cutoff):
        """Return True when *timestamp* is not older than *cutoff*.

        Entries whose timestamp cannot be parsed are kept: one odd entry must
        not hide itself or disable the filter for every other entry.
        """
        try:
            return self._parse_utc(timestamp) >= cutoff
        except (AttributeError, TypeError, ValueError):
            return True

    def add_log(self, record):
        """Append one log entry built from *record*, a dict parsed from a JSON log line.

        The entry keeps 'timestamp', 'level' and 'message'; every key that is
        not a standard record attribute is collected under 'extra'.
        """
        # Fall back to an offset-aware UTC "now". A naive UTC string would be
        # re-read as local time by the 'since' filter and compare incorrectly
        # on hosts whose local zone is not UTC.
        timestamp = record.get('asctime') or datetime.now(timezone.utc).isoformat()
        self.buffer.append({
            'timestamp': timestamp,
            'level': record.get('levelname'),
            'message': record.get('message'),
            'extra': {k: v for k, v in record.items() if k not in self._STANDARD_KEYS},
        })

    def get_logs(self, limit=100, level=None, since=None):
        """Return up to *limit* of the most recent entries, oldest first.

        Args:
            limit: Maximum number of entries; zero or a negative value yields
                an empty list.
            level: Optional level name (case-insensitive). An unknown name is
                logged as a warning and no level filtering is applied.
            since: Optional ISO timestamp; only entries at or after it are
                returned. An invalid value is logged as a warning and ignored.
        """
        logger.debug(f"Fetching logs with limit={limit}, level={level}, since={since}")
        logs = list(self.buffer)

        # Apply level filter
        if level and level.strip():
            level = level.strip().upper()
            if level in self._VALID_LEVELS:
                # An entry may carry level None (no 'levelname' in its record);
                # it simply never matches instead of raising AttributeError.
                logs = [log for log in logs if (log['level'] or '').upper() == level]
            else:
                logger.warning(f"Invalid log level: {level}")

        # Apply since filter
        if since:
            try:
                since_dt = self._parse_utc(since)
            except ValueError:
                logger.warning(f"Invalid 'since' timestamp: {since}")
            else:
                logs = [log for log in logs if self._at_or_after(log['timestamp'], since_dt)]

        # logs[-0:] would be the whole list, so non-positive limits are handled
        # explicitly.
        selected = logs[-limit:] if limit > 0 else []
        logger.debug(f"Returning {len(selected)} logs")
        return selected
# Shared in-memory buffer: BufferHandler appends to it and the logs endpoint
# reads from it.
log_buffer = LogBuffer()
# Stream handler that emits each record as a JSON object containing the
# fields named in the format string below.
logHandler = logging.StreamHandler()
formatter = jsonlogger.JsonFormatter(
'%(asctime)s %(name)s %(levelname)s %(message)s'
)
logHandler.setFormatter(formatter)
# Handlers are attached only when the root logger has none yet.
# NOTE(review): indentation is missing in this copy of the file, so it is not
# clear whether the BufferHandler registration is also meant to sit inside
# this `if` - confirm against the original before re-indenting.
if not logger.handlers:
logger.addHandler(logHandler)
logger.addHandler(BufferHandler())
# --- FastAPI Application ---
# The service UUID is embedded in the API description text.
app = FastAPI(
title="Node Monitoring System",
description=f"A distributed monitoring system. Service UUID: {SERVICE_UUID}"
)
# Both directories are given as relative paths - presumably resolved against
# the directory the service is started from; verify for your deployment.
templates = Jinja2Templates(directory="app/web/templates")
app.mount("/static", StaticFiles(directory="app/web/static"), name="static")
# --- Data Models ---
class NodeStatusModel(BaseModel):
    """System metrics carried in the ``status`` part of a node's update."""

    # Uptime of the node, in seconds.
    uptime_seconds: int
    # Exactly three load values; the health check reads the first one.
    load_avg: Annotated[List[float], Field(min_length=3, max_length=3)]
    # Memory usage in percent; compared against the MEMORY_* thresholds.
    memory_usage_percent: float
class PingModel(BaseModel):
    """Ping results keyed by a 36-character hex-and-hyphen identifier."""

    # Key pattern allows hex digits and hyphens only; values are floats.
    pings: Dict[Annotated[str, Field(pattern=r'^[0-9a-fA-F-]{36}$')], float]
class StatusUpdate(BaseModel):
    """Request body of a node status update.

    Both the ``node`` field and every key of ``pings`` must be parseable by
    ``uuid.UUID``; anything else is rejected with a ValueError.
    """

    node: str = Field(..., description="Node UUID")
    timestamp: datetime
    status: NodeStatusModel
    # Maps a target node UUID to the latency value reported for it.
    pings: Dict[str, float]

    @validator('node')
    def validate_node_uuid(cls, v):
        """Accept *v* unchanged when it parses as a UUID."""
        try:
            uuid_lib.UUID(v)
        except ValueError:
            raise ValueError('Invalid UUID format')
        else:
            return v

    @validator('pings')
    def validate_ping_uuids(cls, v):
        """Accept the mapping unchanged when every key parses as a UUID."""
        for peer_id in v:
            try:
                uuid_lib.UUID(peer_id)
            except ValueError:
                raise ValueError(f'Invalid UUID format in pings: {peer_id}')
        return v
# --- Node Management and Health Logic ---
# In-memory registry of nodes keyed by node UUID. It is written by
# update_node_status and, being a plain module-level dict, starts empty on
# every process start.
known_nodes_db: Dict[str, Dict] = {}
# Thresholds for the first load_avg entry (see get_node_health).
LOAD_AVG_WARNING_THRESHOLD = 1.5
LOAD_AVG_CRITICAL_THRESHOLD = 3.0
# Thresholds for memory_usage_percent.
MEMORY_WARNING_THRESHOLD = 75.0
MEMORY_CRITICAL_THRESHOLD = 90.0
# A node whose last update is older than this many seconds is "critical".
LAST_SEEN_CRITICAL_THRESHOLD_SECONDS = 30
def get_node_health(node_data: Dict) -> str:
    """Classify a node as "healthy", "warning", "critical" or "unknown".

    Args:
        node_data: Registry entry for one node. Reads ``last_seen`` (ISO
            timestamp string) and ``status`` (dict accepted by
            ``NodeStatusModel``).

    Returns:
        "unknown"  - ``last_seen`` or ``status`` is missing or unparseable.
        "critical" - the node has not reported for more than
                     LAST_SEEN_CRITICAL_THRESHOLD_SECONDS, or load or memory
                     reached its critical threshold.
        "warning"  - load or memory reached its warning threshold.
        "healthy"  - otherwise.
    """
    last_seen_str = node_data.get("last_seen")
    if not last_seen_str:
        return "unknown"

    try:
        last_seen_dt = datetime.fromisoformat(last_seen_str)
    except (TypeError, ValueError):
        logger.error(f"Could not parse last_seen for node {node_data.get('uuid')}", exc_info=True)
        return "unknown"
    # Only a naive timestamp is assumed to be UTC. An aware one already carries
    # its offset, and overwriting that with UTC would shift the instant.
    if last_seen_dt.tzinfo is None:
        last_seen_dt = last_seen_dt.replace(tzinfo=timezone.utc)
    time_since_last_seen = (datetime.now(timezone.utc) - last_seen_dt).total_seconds()
    if time_since_last_seen > LAST_SEEN_CRITICAL_THRESHOLD_SECONDS:
        return "critical"

    status_model_data = node_data.get("status")
    if not status_model_data:
        return "unknown"
    try:
        status = NodeStatusModel(**status_model_data)
    except Exception:
        logger.error(f"Could not parse status data for node {node_data.get('uuid')}", exc_info=True)
        return "unknown"

    load_1min = status.load_avg[0]
    memory = status.memory_usage_percent
    # Evaluate both metrics at each severity, worst first, so that a
    # warning-level load cannot mask a critical memory reading.
    if load_1min >= LOAD_AVG_CRITICAL_THRESHOLD or memory >= MEMORY_CRITICAL_THRESHOLD:
        return "critical"
    if load_1min >= LOAD_AVG_WARNING_THRESHOLD or memory >= MEMORY_WARNING_THRESHOLD:
        return "warning"
    return "healthy"
# --- API Endpoints ---
@app.get("/", response_class=HTMLResponse)
async def read_root(request: Request):
    """Render the landing page (``index.html``) and log the access."""
    # request.client can be None, so the direct peer address is resolved
    # defensively before it is used as the fallback value.
    direct_host = request.client.host if request.client else None
    # Use X-Forwarded-For if available, otherwise client.host
    client_ip = request.headers.get("x-forwarded-for", direct_host)
    logger.info(
        "Web root accessed",
        extra={'client_ip': client_ip, 'service_uuid': SERVICE_UUID}
    )
    return templates.TemplateResponse(
        "index.html",
        {
            "request": request,
            "service_uuid": SERVICE_UUID,
            "url_for": request.url_for,  # Pass url_for for dynamic URL generation
            "root_path": request.scope.get('root_path', '')  # Pass root_path for JS base URL
        }
    )
@app.get("/{service_uuid}/logs")
async def get_logs(
    request: Request,
    service_uuid: str,
    limit: int = 100,
    format: str = Query(None, description="Response format: 'json' for JSON, default is HTML"),
    level: str = Query(None, description="Filter logs by level: INFO, WARNING, ERROR"),
    since: str = Query(None, description="Fetch logs since ISO timestamp, e.g., 2025-06-11T13:32:00")
):
    """Return buffered log entries for this service as JSON or as an HTML page.

    Responds with 404 when *service_uuid* is not this service's UUID and with
    500 when fetching the logs or rendering the template fails. With
    ``format=json`` the payload is returned as JSON; otherwise ``logs.html``
    is rendered.
    """
    # request.client can be None, so the direct peer address is resolved
    # defensively before it is used as the fallback value.
    direct_host = request.client.host if request.client else None
    # Use X-Forwarded-For if available, otherwise client.host
    client_ip = request.headers.get("x-forwarded-for", direct_host)
    logger.info(
        "Logs endpoint accessed",
        extra={
            'service_uuid': service_uuid,
            'format': format,
            'level': level,
            'since': since,
            'limit': limit,
            'client_ip': client_ip
        }
    )

    if service_uuid != SERVICE_UUID:
        logger.warning(f"Invalid service UUID: {service_uuid}")
        return JSONResponse(
            status_code=404,
            content={"error": "Service UUID not found"}
        )

    try:
        logs = log_buffer.get_logs(limit=limit, level=level, since=since)
        log_data = {
            "service_uuid": service_uuid,
            "log_count": len(logs),
            "logs": logs
        }
        logger.debug(f"Fetched {len(logs)} logs for response")
    except Exception as e:
        logger.error(f"Error fetching logs: {e}", exc_info=True)
        return JSONResponse(
            status_code=500,
            content={"error": "Failed to fetch logs"}
        )

    if format == "json":
        logger.debug("Returning JSON response")
        return JSONResponse(content=log_data)

    logger.debug("Rendering logs.html template")
    try:
        return templates.TemplateResponse(
            "logs.html",
            {
                "request": request,
                "service_uuid": service_uuid,
                "logs": logs,
                "log_count": len(logs),
                "url_for": request.url_for,  # Pass url_for for dynamic URL generation
                "root_path": request.scope.get('root_path', '')  # Pass root_path for JS base URL
            }
        )
    except Exception as e:
        logger.error(f"Error rendering logs.html: {e}", exc_info=True)
        return JSONResponse(
            status_code=500,
            content={"error": "Failed to render logs page"}
        )
@app.put("/{service_uuid}/{node_uuid}/")
async def update_node_status(
    service_uuid: str,
    node_uuid: str,
    status_update: StatusUpdate,
    request: Request
):
    """Record a node's status report and return the other known nodes.

    The metrics are written to the database (failures are logged and do not
    abort the request), the node's entry in ``known_nodes_db`` is replaced,
    and every other known node is returned as ``peers``. A service UUID
    mismatch yields an error payload with an empty peer list; no error status
    code is set for that case here.
    """
    # request.client can be None, so the direct peer address is resolved
    # defensively once and reused below.
    direct_host = request.client.host if request.client else None
    # Use X-Forwarded-For if available, otherwise client.host
    client_ip = request.headers.get("x-forwarded-for", direct_host)
    logger.info(
        "Received node status update",
        extra={
            'event_type': 'node_status_update',
            'client_ip': client_ip,
            'service_uuid': service_uuid,
            'node_uuid': node_uuid,
            'data': status_update.dict()
        }
    )

    if service_uuid != SERVICE_UUID:
        logger.warning(
            "Node sent status to wrong service UUID",
            extra={'client_node_uuid': node_uuid, 'target_uuid': service_uuid}
        )
        return {"error": "Service UUID mismatch", "peers": []}

    try:
        database.update_system_metrics(
            node_uuid=node_uuid,
            timestamp=status_update.timestamp,
            uptime_seconds=status_update.status.uptime_seconds,
            load_avg=status_update.status.load_avg,
            memory_usage_percent=status_update.status.memory_usage_percent
        )
        for target_uuid, latency in status_update.pings.items():
            database.update_ping_metrics(
                node_uuid=node_uuid,
                target_uuid=target_uuid,
                timestamp=status_update.timestamp,
                latency_ms=latency
            )
    except Exception as e:
        # Best effort: the in-memory registry is still updated below.
        logger.error(f"Database update failed: {e}", exc_info=True)

    current_time_utc = datetime.now(timezone.utc)
    known_nodes_db[node_uuid] = {
        "last_seen": current_time_utc.isoformat(),
        "ip": direct_host,  # Keep original client.host here as it's the direct connection
        "status": status_update.status.dict(),
        "uptime_seconds": status_update.status.uptime_seconds,
        "load_avg": status_update.status.load_avg,
        "memory_usage_percent": status_update.status.memory_usage_percent
    }

    health_status_for_log = get_node_health(known_nodes_db[node_uuid])
    logger.info(f"Node {node_uuid} updated. Health: {health_status_for_log}")

    # The loop variable is deliberately not called `uuid`, which is the name
    # of a module imported at the top of this file.
    peer_list = {
        peer_uuid: {"last_seen": peer["last_seen"], "ip": peer["ip"]}
        for peer_uuid, peer in known_nodes_db.items()
        if peer_uuid != node_uuid
    }
    return {"message": "Status received", "peers": peer_list}
def _latest_latency(ping_data):
    """Return the newest usable latency from a ping series, or None if there is none."""
    if not ping_data:
        return None
    series = ping_data['data']['latency']
    if not series:
        return None
    # Walk from newest to oldest and take the first usable sample.
    for sample in reversed(series):
        if sample is None:
            continue
        if isinstance(sample, float) and sample == 0.0:
            # Exclude 0.0 which might be a default
            continue
        return float(sample)
    return None


@app.get("/nodes/status")
async def get_all_nodes_status():
    """Return every known node with its health, metrics and peer latencies.

    For each node, ``connections`` maps every other known node to the most
    recent usable latency found in the last 300 seconds of ping data, or to
    None when no such sample exists.
    """
    logger.info("Fetching all nodes status for UI.")
    response_nodes = []
    for node_uuid, data in known_nodes_db.items():
        current_health = get_node_health(data)
        connections = {
            target_uuid: _latest_latency(
                database.get_ping_data(node_uuid, target_uuid, start_time="-300s")
            )
            for target_uuid in known_nodes_db
            if target_uuid != node_uuid
        }
        node_entry = {
            "uuid": node_uuid,
            "last_seen": data["last_seen"],
            "ip": data["ip"],
            "health_status": current_health,
            "uptime_seconds": data.get("uptime_seconds"),
            "load_avg": data.get("load_avg"),
            "memory_usage_percent": data.get("memory_usage_percent"),
            "connections": connections
        }
        response_nodes.append(node_entry)
    return {"nodes": response_nodes}
@app.get("/health")
async def health_check():
    """Liveness probe: returns a constant payload and touches no other state."""
    payload = {"status": "ok"}
    return payload