middle point. error in constr.
app/main.py (112 changed lines)
@@ -1,4 +1,3 @@
-# app/main.py
 
 import os
 import uuid
@@ -16,16 +15,60 @@ import uuid as uuid_lib
 from collections import deque
 
 from pythonjsonlogger import jsonlogger
+import sys
 
+from .database import RRDDatabase
 
 # --- Service Configuration ---
 # Generate a unique Service UUID on startup, or get it from an environment variable
 SERVICE_UUID = os.environ.get("SERVICE_UUID", str(uuid.uuid4()))
+database = RRDDatabase()
 
 # --- Logging Configuration ---
 # Get the root logger
 logger = logging.getLogger()
 logger.setLevel(logging.INFO)
 
+
+# Custom handler to capture logs
+class BufferHandler(logging.Handler):
+    def __init__(self, *args, **kwargs):
+        super().__init__(*args, **kwargs)
+        # Instantiate the formatter once for efficiency
+        self.formatter = jsonlogger.JsonFormatter()
+
+    def emit(self, record):
+        try:
+            # Format the record as JSON string, then parse to dict
+            log_entry = json.loads(self.formatter.format(record))
+            log_buffer.add_log(log_entry)
+        except Exception as e:
+            # Log the error to stderr, to avoid recursion or filling the buffer with errors
+            print(f"Error in BufferHandler: Could not process log record: {e}", file=sys.stderr)
+            # Optionally, you could log record.msg or record.exc_info here for more context
+
+
+class LogBuffer:
+    def __init__(self, maxlen=1000):
+        self.buffer = deque(maxlen=maxlen)
+
+    def add_log(self, record):
+        # Ensure 'asctime' is present or handle its absence
+        timestamp = record.get('asctime') or datetime.utcnow().isoformat()
+        self.buffer.append({
+            'timestamp': timestamp,
+            'level': record.get('levelname'),
+            'message': record.get('message'),
+            'extra': {k: v for k, v in record.items()
+                      if k not in ['asctime', 'levelname', 'message', 'name', 'lineno', 'filename', 'pathname', 'funcName', 'process', 'processName', 'thread', 'threadName']}
+            # Added more common LogRecord attributes to exclude from 'extra'
+        })
+
+    def get_logs(self, limit=100):
+        return list(self.buffer)[-limit:]
+
+
+# Create global log buffer
+log_buffer = LogBuffer()
+
 # Use a handler that streams to stdout
 logHandler = logging.StreamHandler()
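The BufferHandler/LogBuffer pair added above is the core of this change: the handler builds one JsonFormatter up front (the removed version in the next hunk rebuilt it on every emit), serializes each record to JSON, parses it back to a dict, and appends it to a bounded deque that silently drops the oldest entries. A minimal standalone sketch of that pattern, assuming python-json-logger is installed; the names buffer and demo are illustrative, not from the commit:

import json
import logging
from collections import deque

from pythonjsonlogger import jsonlogger

buffer = deque(maxlen=1000)  # bounded: oldest entries fall off the left


class BufferHandler(logging.Handler):
    def __init__(self):
        super().__init__()
        # One formatter per handler, not per record
        self.formatter = jsonlogger.JsonFormatter("%(asctime)s %(levelname)s %(message)s")

    def emit(self, record):
        try:
            # format() returns a JSON string; parse it back to a dict
            buffer.append(json.loads(self.formatter.format(record)))
        except Exception:
            self.handleError(record)  # stdlib fallback, avoids recursive logging


log = logging.getLogger("demo")
log.addHandler(BufferHandler())
log.warning("disk almost full")
print(buffer[-1]["message"])  # -> "disk almost full"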
@@ -36,46 +79,15 @@ formatter = jsonlogger.JsonFormatter(
 )
 logHandler.setFormatter(formatter)
 
-# Add the handler to the root logger
+# Add handlers to the root logger
 # Avoid adding handlers multiple times in a uvicorn environment
 if not logger.handlers:
     logger.addHandler(logHandler)
+    # Add buffer handler to logger ONLY ONCE
+    buffer_handler = BufferHandler()
+    logger.addHandler(buffer_handler)
-
-
-class LogBuffer:
-    def __init__(self, maxlen=1000):
-        self.buffer = deque(maxlen=maxlen)
-
-    def add_log(self, record):
-        self.buffer.append({
-            'timestamp': record.get('asctime'),
-            'level': record.get('levelname'),
-            'message': record.get('message'),
-            'extra': {k: v for k, v in record.items()
-                      if k not in ['asctime', 'levelname', 'message', 'name']}
-        })
-
-    def get_logs(self, limit=100):
-        return list(self.buffer)[-limit:]
-
-
-# Create global log buffer
-log_buffer = LogBuffer()
-
-# Add buffer handler to logger
-buffer_handler = BufferHandler()
-logger.addHandler(buffer_handler)
-
-
-# Custom handler to capture logs
-class BufferHandler(logging.Handler):
-    def emit(self, record):
-        try:
-            # Format the record as JSON
-            formatter = jsonlogger.JsonFormatter()
-            log_entry = json.loads(formatter.format(record))
-            log_buffer.add_log(log_entry)
-        except Exception:
-            pass
 
 # --- FastAPI Application ---
 app = FastAPI(
     title="Node Monitoring System",
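The `if not logger.handlers:` guard matters because module-level handler setup can run more than once under uvicorn reloads, stacking handlers and duplicating every log line; the commit now also moves the BufferHandler registration inside that guard. A hypothetical demonstration of the guard's effect, not part of the commit:

import logging

root = logging.getLogger()
for _ in range(3):            # simulate repeated module imports
    if not root.handlers:     # guard: attach handlers only once
        root.addHandler(logging.StreamHandler())
print(len(root.handlers))     # -> 1 in a fresh interpreter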
@@ -175,9 +187,6 @@ async def update_node_status(
         }
     )
 
-    # In a real implementation, you would now update the RRD database here.
-    # e.g., database.update(node_uuid, status_update)
-    # For now, we simulate the logic.
     if service_uuid != SERVICE_UUID:
         logger.warning(
             "Node sent status to wrong service UUID",
@@ -185,7 +194,30 @@ async def update_node_status(
         )
         return {"error": "Service UUID mismatch", "peers": []}
 
-    # Auto-discovery logic: update our list of known nodes
+    # Update RRD database with system metrics
+    try:
+        database.update_system_metrics(
+            node_uuid=node_uuid,
+            timestamp=status_update.timestamp,
+            uptime_seconds=status_update.status.uptime_seconds,
+            load_avg=status_update.status.load_avg,
+            memory_usage_percent=status_update.status.memory_usage_percent
+        )
+
+        # Update ping metrics
+        for target_uuid, latency in status_update.pings.pings.items():
+            database.update_ping_metrics(
+                node_uuid=node_uuid,
+                target_uuid=target_uuid,
+                timestamp=status_update.timestamp,
+                latency_ms=latency
+            )
+
+    except Exception as e:
+        logger.error(f"Database update failed: {e}")
+        # Continue processing even if DB update fails
+
+    # Auto-discovery logic
 if node_uuid not in known_nodes_db:
     logger.info(f"New node discovered: {node_uuid}")
     # A real app would need a strategy to handle node addresses
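The try/except wrapped around the RRDDatabase calls makes metric writes best-effort: a failed update is logged and the endpoint keeps processing instead of returning an error. A minimal sketch of that fail-open pattern; write_metrics and handle_status are hypothetical stand-ins for database.update_*() and the FastAPI endpoint:

import logging

logger = logging.getLogger(__name__)


def write_metrics(payload: dict) -> None:
    # Simulate a storage failure
    raise IOError("rrdtool: cannot write datapoint")


def handle_status(payload: dict) -> dict:
    try:
        write_metrics(payload)
    except Exception as e:
        logger.error(f"Database update failed: {e}")  # log, don't re-raise
    return {"accepted": True}                         # request still succeeds


print(handle_status({"uptime_seconds": 42}))  # -> {'accepted': True}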