From 11a565732d66e76d648b9b9e7799b63222033481 Mon Sep 17 00:00:00 2001 From: Kalzu Rekku Date: Tue, 10 Jun 2025 23:18:48 +0300 Subject: [PATCH] midle point. error in constr. --- app/main.py | 112 +++++++++++++++++++++++++++++++++------------------- 1 file changed, 72 insertions(+), 40 deletions(-) diff --git a/app/main.py b/app/main.py index e2f0446..a55c0bc 100644 --- a/app/main.py +++ b/app/main.py @@ -1,4 +1,3 @@ -# app/main.py import os import uuid @@ -16,16 +15,60 @@ import uuid as uuid_lib from collections import deque from pythonjsonlogger import jsonlogger +import sys + +from .database import RRDDatabase # --- Service Configuration --- # Generate a unique Service UUID on startup, or get it from an environment variable SERVICE_UUID = os.environ.get("SERVICE_UUID", str(uuid.uuid4())) +database = RRDDatabase() # --- Logging Configuration --- # Get the root logger logger = logging.getLogger() logger.setLevel(logging.INFO) +# Custom handler to capture logs +class BufferHandler(logging.Handler): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + # Instantiate the formatter once for efficiency + self.formatter = jsonlogger.JsonFormatter() + + def emit(self, record): + try: + # Format the record as JSON string, then parse to dict + log_entry = json.loads(self.formatter.format(record)) + log_buffer.add_log(log_entry) + except Exception as e: + # Log the error to stderr, to avoid recursion or filling the buffer with errors + print(f"Error in BufferHandler: Could not process log record: {e}", file=sys.stderr) + # Optionally, you could log record.msg or record.exc_info here for more context + + +class LogBuffer: + def __init__(self, maxlen=1000): + self.buffer = deque(maxlen=maxlen) + + def add_log(self, record): + # Ensure 'asctime' is present or handle its absence + timestamp = record.get('asctime') or datetime.utcnow().isoformat() + self.buffer.append({ + 'timestamp': timestamp, + 'level': record.get('levelname'), + 'message': record.get('message'), + 'extra': {k: v for k, v in record.items() + if k not in ['asctime', 'levelname', 'message', 'name', 'lineno', 'filename', 'pathname', 'funcName', 'process', 'processName', 'thread', 'threadName']} + # Added more common LogRecord attributes to exclude from 'extra' + }) + + def get_logs(self, limit=100): + return list(self.buffer)[-limit:] + +# Create global log buffer +log_buffer = LogBuffer() + # Use a handler that streams to stdout logHandler = logging.StreamHandler() @@ -36,46 +79,15 @@ formatter = jsonlogger.JsonFormatter( ) logHandler.setFormatter(formatter) -# Add the handler to the root logger +# Add handlers to the root logger # Avoid adding handlers multiple times in a uvicorn environment if not logger.handlers: logger.addHandler(logHandler) + # Add buffer handler to logger ONLY ONCE + buffer_handler = BufferHandler() + logger.addHandler(buffer_handler) -class LogBuffer: - def __init__(self, maxlen=1000): - self.buffer = deque(maxlen=maxlen) - - def add_log(self, record): - self.buffer.append({ - 'timestamp': record.get('asctime'), - 'level': record.get('levelname'), - 'message': record.get('message'), - 'extra': {k: v for k, v in record.items() - if k not in ['asctime', 'levelname', 'message', 'name']} - }) - - def get_logs(self, limit=100): - return list(self.buffer)[-limit:] - -# Create global log buffer -log_buffer = LogBuffer() - -# Add buffer handler to logger -buffer_handler = BufferHandler() -logger.addHandler(buffer_handler) - -# Custom handler to capture logs -class BufferHandler(logging.Handler): - def emit(self, record): - try: - # Format the record as JSON - formatter = jsonlogger.JsonFormatter() - log_entry = json.loads(formatter.format(record)) - log_buffer.add_log(log_entry) - except Exception: - pass - # --- FastAPI Application --- app = FastAPI( title="Node Monitoring System", @@ -175,9 +187,6 @@ async def update_node_status( } ) - # In a real implementation, you would now update the RRD database here. - # e.g., database.update(node_uuid, status_update) - # For now, we simulate the logic. if service_uuid != SERVICE_UUID: logger.warning( "Node sent status to wrong service UUID", @@ -185,7 +194,30 @@ async def update_node_status( ) return {"error": "Service UUID mismatch", "peers": []} - # Auto-discovery logic: update our list of known nodes + # Update RRD database with system metrics + try: + database.update_system_metrics( + node_uuid=node_uuid, + timestamp=status_update.timestamp, + uptime_seconds=status_update.status.uptime_seconds, + load_avg=status_update.status.load_avg, + memory_usage_percent=status_update.status.memory_usage_percent + ) + + # Update ping metrics + for target_uuid, latency in status_update.pings.pings.items(): + database.update_ping_metrics( + node_uuid=node_uuid, + target_uuid=target_uuid, + timestamp=status_update.timestamp, + latency_ms=latency + ) + + except Exception as e: + logger.error(f"Database update failed: {e}") + # Continue processing even if DB update fails + + # Auto-discovery logic if node_uuid not in known_nodes_db: logger.info(f"New node discovered: {node_uuid}") # A real app would need a strategy to handle node addresses