It works now
app/database.py  257 lines  (new file)
@@ -0,0 +1,257 @@
import os
import rrdtool
import logging
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Tuple
from pathlib import Path

logger = logging.getLogger(__name__)


class RRDDatabase:
    def __init__(self, data_dir: Optional[str] = None):
        # Use environment variable or default to /data
        if data_dir is None:
            data_dir = os.environ.get("DATA_DIR", "/data")

        self.data_dir = Path(data_dir)

        # Create data directory if it doesn't exist
        try:
            self.data_dir.mkdir(parents=True, exist_ok=True)
            logger.info(f"Using data directory: {self.data_dir}")
        except PermissionError:
            logger.error(f"Permission denied creating data directory: {self.data_dir}")
            raise
        except Exception as e:
            logger.error(f"Failed to create data directory {self.data_dir}: {e}")
            raise

        # RRD configuration
        self.step = 60       # 1-minute intervals
        self.heartbeat = 120  # 2-minute heartbeat (allow 1 missed update)

        # Retention policy (6 months total)
        self.rra_config = [
            "RRA:AVERAGE:0.5:1:1440",    # 1-min avg for 24 hours (1440 points)
            "RRA:AVERAGE:0.5:60:744",    # 1-hour avg for 31 days (744 points)
            "RRA:AVERAGE:0.5:1440:180",  # 1-day avg for 6 months (180 points)
            "RRA:MAX:0.5:1:1440",        # 1-min max for 24 hours
            "RRA:MAX:0.5:60:744",        # 1-hour max for 31 days
            "RRA:MIN:0.5:60:744",        # 1-hour min for 31 days
        ]

    def _get_node_dir(self, node_uuid: str) -> Path:
        """Get the directory path for a specific node's RRD files."""
        node_dir = self.data_dir / node_uuid
        node_dir.mkdir(exist_ok=True)
        return node_dir

    def _create_system_rrd(self, node_uuid: str) -> str:
        """Create RRD file for system metrics (uptime, load, memory)."""
        rrd_file = self._get_node_dir(node_uuid) / "system.rrd"

        if rrd_file.exists():
            return str(rrd_file)

        try:
            rrdtool.create(
                str(rrd_file),
                "--step", str(self.step),
                # Data sources
                f"DS:uptime:GAUGE:{self.heartbeat}:0:U",    # Uptime in seconds
                f"DS:load1:GAUGE:{self.heartbeat}:0:100",   # 1-min load average
                f"DS:load5:GAUGE:{self.heartbeat}:0:100",   # 5-min load average
                f"DS:load15:GAUGE:{self.heartbeat}:0:100",  # 15-min load average
                f"DS:memory:GAUGE:{self.heartbeat}:0:100",  # Memory usage %
                # Round Robin Archives
                *self.rra_config
            )
            logger.info(f"Created system RRD for node {node_uuid}")
            return str(rrd_file)
        except Exception as e:
            logger.error(f"Failed to create system RRD for {node_uuid}: {e}")
            raise

    def _create_ping_rrd(self, node_uuid: str, target_uuid: str) -> str:
        """Create RRD file for ping metrics between two nodes."""
        rrd_file = self._get_node_dir(node_uuid) / f"ping_{target_uuid}.rrd"

        if rrd_file.exists():
            return str(rrd_file)

        try:
            rrdtool.create(
                str(rrd_file),
                "--step", str(self.step),
                # Data sources for ping metrics
                f"DS:latency:GAUGE:{self.heartbeat}:0:10000",  # Ping latency in ms
                f"DS:loss:GAUGE:{self.heartbeat}:0:100",       # Packet loss %
                # Round Robin Archives
                *self.rra_config
            )
            logger.info(f"Created ping RRD for {node_uuid} -> {target_uuid}")
            return str(rrd_file)
        except Exception as e:
            logger.error(f"Failed to create ping RRD for {node_uuid}->{target_uuid}: {e}")
            raise

    def update_system_metrics(self, node_uuid: str, timestamp: datetime,
                              uptime_seconds: int, load_avg: List[float],
                              memory_usage_percent: float):
        """Update system metrics for a node."""
        try:
            rrd_file = self._create_system_rrd(node_uuid)

            # Convert datetime to Unix timestamp
            unix_time = int(timestamp.timestamp())

            # Format: timestamp:uptime:load1:load5:load15:memory
            values = f"{unix_time}:{uptime_seconds}:{load_avg[0]}:{load_avg[1]}:{load_avg[2]}:{memory_usage_percent}"

            rrdtool.update(rrd_file, values)
            logger.debug(f"Updated system metrics for {node_uuid}: {values}")

        except Exception as e:
            logger.error(f"Failed to update system metrics for {node_uuid}: {e}")
            raise

    def update_ping_metrics(self, node_uuid: str, target_uuid: str,
                            timestamp: datetime, latency_ms: float):
        """Update ping metrics between two nodes."""
        try:
            rrd_file = self._create_ping_rrd(node_uuid, target_uuid)

            unix_time = int(timestamp.timestamp())

            # For now, we only track latency. Loss can be calculated from missing updates.
            values = f"{unix_time}:{latency_ms}:0"  # 0% loss (could be enhanced)

            rrdtool.update(rrd_file, values)
            logger.debug(f"Updated ping metrics {node_uuid}->{target_uuid}: {latency_ms}ms")

        except Exception as e:
            logger.error(f"Failed to update ping metrics {node_uuid}->{target_uuid}: {e}")
            raise

    def get_system_data(self, node_uuid: str, start_time: str = "-24h",
                        end_time: str = "now") -> Optional[Dict]:
        """Retrieve system metrics data for a node."""
        try:
            rrd_file = self._get_node_dir(node_uuid) / "system.rrd"
            if not rrd_file.exists():
                return None

            result = rrdtool.fetch(
                str(rrd_file),
                "AVERAGE",
                "--start", start_time,
                "--end", end_time
            )

            # Parse RRDtool fetch result
            start, end, step = result[0]
            ds_names = result[1]  # ['uptime', 'load1', 'load5', 'load15', 'memory']
            data_points = result[2]

            # Convert to a more usable format
            timestamps = []
            data = {ds: [] for ds in ds_names}

            current_time = start
            for point in data_points:
                timestamps.append(current_time)
                for i, ds in enumerate(ds_names):
                    value = point[i] if point[i] is not None else 0
                    data[ds].append(value)
                current_time += step

            return {
                'timestamps': timestamps,
                'data': data,
                'step': step
            }

        except Exception as e:
            logger.error(f"Failed to get system data for {node_uuid}: {e}")
            return None

    def get_ping_data(self, node_uuid: str, target_uuid: str,
                      start_time: str = "-24h", end_time: str = "now") -> Optional[Dict]:
        """Retrieve ping metrics between two nodes."""
        try:
            rrd_file = self._get_node_dir(node_uuid) / f"ping_{target_uuid}.rrd"
            if not rrd_file.exists():
                return None

            result = rrdtool.fetch(
                str(rrd_file),
                "AVERAGE",
                "--start", start_time,
                "--end", end_time
            )

            start, end, step = result[0]
            ds_names = result[1]  # ['latency', 'loss']
            data_points = result[2]

            timestamps = []
            data = {ds: [] for ds in ds_names}

            current_time = start
            for point in data_points:
                timestamps.append(current_time)
                for i, ds in enumerate(ds_names):
                    value = point[i] if point[i] is not None else 0
                    data[ds].append(value)
                current_time += step

            return {
                'timestamps': timestamps,
                'data': data,
                'step': step
            }

        except Exception as e:
            logger.error(f"Failed to get ping data {node_uuid}->{target_uuid}: {e}")
            return None

    def list_nodes(self) -> List[str]:
        """Get list of all nodes with RRD data."""
        try:
            nodes = []
            for item in self.data_dir.iterdir():
                if item.is_dir() and (item / "system.rrd").exists():
                    nodes.append(item.name)
            return nodes
        except Exception as e:
            logger.error(f"Failed to list nodes: {e}")
            return []

    def cleanup_old_data(self):
        """Clean up RRD files older than 6 months (handled automatically by RRD retention)."""
        # RRD automatically handles data retention based on RRA configuration.
        # This method could be used for cleaning up orphaned files.
        cutoff_date = datetime.now() - timedelta(days=190)  # 6+ months

        try:
            for node_dir in self.data_dir.iterdir():
                if not node_dir.is_dir():
                    continue

                # Check if any RRD files have been modified recently
                rrd_files = list(node_dir.glob("*.rrd"))
                if not rrd_files:
                    continue

                # If all RRD files are old, the node is probably dead
                all_old = all(
                    datetime.fromtimestamp(f.stat().st_mtime) < cutoff_date
                    for f in rrd_files
                )

                if all_old:
                    logger.info(f"Node {node_dir.name} appears inactive for >6 months")
                    # Could optionally remove the directory here

        except Exception as e:
            logger.error(f"Failed during cleanup: {e}")
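
For reference, a minimal usage sketch of the class above (not part of the commit): the data directory, node UUIDs, and sample values are made up for illustration, the import path assumes the file lives at app/database.py, and it requires the rrdtool Python bindings to be installed.

# Hypothetical usage sketch; names and values below are illustrative only.
from datetime import datetime
from app.database import RRDDatabase  # assumed import path

db = RRDDatabase(data_dir="/tmp/rrd-demo")  # hypothetical writable directory

node = "11111111-1111-1111-1111-111111111111"    # hypothetical node UUID
target = "22222222-2222-2222-2222-222222222222"  # hypothetical peer UUID

# Record one system sample and one ping sample for the current time.
now = datetime.now()
db.update_system_metrics(node, now, uptime_seconds=3600,
                         load_avg=[0.10, 0.15, 0.20],
                         memory_usage_percent=42.5)
db.update_ping_metrics(node, target, now, latency_ms=12.3)

# Read back the last 24 hours; both getters return None if the RRD file is missing.
system = db.get_system_data(node, start_time="-24h")
ping = db.get_ping_data(node, target, start_time="-24h")
print(db.list_nodes())

Since the getters return None rather than raising when no RRD file exists yet, callers should handle the empty case before plotting or aggregating the data.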