""" Database layer for Kattila Manager. SQLite with WAL mode. All schemas from DESIGN.md are created on init. """ import sqlite3 import json import time import threading import logging logger = logging.getLogger(__name__) DB_PATH = "kattila_manager.db" _local = threading.local() def get_conn() -> sqlite3.Connection: """Return a thread-local SQLite connection.""" if not hasattr(_local, "conn") or _local.conn is None: conn = sqlite3.connect(DB_PATH) conn.row_factory = sqlite3.Row conn.execute("PRAGMA journal_mode=WAL;") conn.execute("PRAGMA foreign_keys=ON;") _local.conn = conn return _local.conn def init_db(): """Create all tables and indexes if they don't exist.""" conn = get_conn() conn.executescript(""" CREATE TABLE IF NOT EXISTS agents ( agent_id TEXT PRIMARY KEY, hostname TEXT NOT NULL, agent_version INTEGER NOT NULL, fleet_id TEXT NOT NULL, registered_at INTEGER NOT NULL, last_seen_at INTEGER NOT NULL, last_tick INTEGER NOT NULL DEFAULT 0, status TEXT NOT NULL DEFAULT 'online' ); CREATE INDEX IF NOT EXISTS idx_agents_last_seen ON agents(last_seen_at); CREATE TABLE IF NOT EXISTS reports ( id INTEGER PRIMARY KEY AUTOINCREMENT, agent_id TEXT NOT NULL, tick INTEGER NOT NULL, timestamp INTEGER NOT NULL, report_type TEXT NOT NULL, report_json TEXT NOT NULL, received_at INTEGER NOT NULL DEFAULT (strftime('%s', 'now')), FOREIGN KEY (agent_id) REFERENCES agents(agent_id) ON DELETE CASCADE ); CREATE UNIQUE INDEX IF NOT EXISTS idx_reports_agent_tick ON reports(agent_id, tick); CREATE TABLE IF NOT EXISTS topology_edges ( id INTEGER PRIMARY KEY AUTOINCREMENT, from_agent_id TEXT NOT NULL, to_agent_id TEXT NOT NULL, edge_type TEXT NOT NULL, metadata TEXT DEFAULT '{}', last_seen INTEGER NOT NULL, is_active INTEGER NOT NULL DEFAULT 1 ); CREATE UNIQUE INDEX IF NOT EXISTS idx_edges_pair ON topology_edges(from_agent_id, to_agent_id, edge_type); CREATE TABLE IF NOT EXISTS agent_interfaces ( id INTEGER PRIMARY KEY AUTOINCREMENT, agent_id TEXT NOT NULL, interface_name TEXT NOT NULL, mac_address TEXT, addresses_json TEXT, is_virtual INTEGER NOT NULL DEFAULT 0, vpn_type TEXT, last_seen_at INTEGER NOT NULL, FOREIGN KEY (agent_id) REFERENCES agents(agent_id) ON DELETE CASCADE ); CREATE UNIQUE INDEX IF NOT EXISTS idx_agent_interfaces ON agent_interfaces(agent_id, interface_name); CREATE TABLE IF NOT EXISTS alarms ( id INTEGER PRIMARY KEY AUTOINCREMENT, agent_id TEXT NOT NULL, alarm_type TEXT NOT NULL, status TEXT NOT NULL DEFAULT 'active', details_json TEXT DEFAULT '{}', created_at INTEGER NOT NULL DEFAULT (strftime('%s', 'now')), dismissed_at INTEGER, FOREIGN KEY (agent_id) REFERENCES agents(agent_id) ON DELETE CASCADE ); CREATE INDEX IF NOT EXISTS idx_alarms_agent_status ON alarms(agent_id, status); """) conn.commit() logger.info("Database initialized at %s", DB_PATH) # ── Agent operations ───────────────────────────────────────────────────────── def upsert_agent(agent_id: str, hostname: str, agent_version: int, fleet_id: str, tick: int): """Insert or update an agent record (auto-register on first valid report).""" now = int(time.time()) conn = get_conn() conn.execute(""" INSERT INTO agents (agent_id, hostname, agent_version, fleet_id, registered_at, last_seen_at, last_tick, status) VALUES (?, ?, ?, ?, ?, ?, ?, 'online') ON CONFLICT(agent_id) DO UPDATE SET hostname = excluded.hostname, agent_version = excluded.agent_version, fleet_id = excluded.fleet_id, last_seen_at = excluded.last_seen_at, last_tick = excluded.last_tick, status = 'online' """, (agent_id, hostname, agent_version, fleet_id, now, now, tick)) conn.commit() def get_all_agents() -> list[dict]: conn = get_conn() rows = conn.execute("SELECT * FROM agents ORDER BY last_seen_at DESC").fetchall() return [dict(r) for r in rows] def get_agent(agent_id: str) -> dict | None: conn = get_conn() row = conn.execute("SELECT * FROM agents WHERE agent_id = ?", (agent_id,)).fetchone() return dict(row) if row else None def delete_agent(agent_id: str): conn = get_conn() conn.execute("DELETE FROM agents WHERE agent_id = ?", (agent_id,)) conn.commit() def mark_stale_agents(timeout_seconds: int = 120): """Mark agents as offline if not seen within timeout.""" cutoff = int(time.time()) - timeout_seconds conn = get_conn() conn.execute(""" UPDATE agents SET status = 'offline' WHERE last_seen_at < ? AND status != 'offline' """, (cutoff,)) conn.commit() # ── Report operations ──────────────────────────────────────────────────────── def insert_report(agent_id: str, tick: int, timestamp: int, report_type: str, report_json: str): conn = get_conn() try: conn.execute(""" INSERT INTO reports (agent_id, tick, timestamp, report_type, report_json) VALUES (?, ?, ?, ?, ?) """, (agent_id, tick, timestamp, report_type, report_json)) conn.commit() except sqlite3.IntegrityError: logger.warning("Duplicate report from %s tick %d — skipping", agent_id, tick) # ── Interface operations ───────────────────────────────────────────────────── def update_interfaces(agent_id: str, interfaces: list[dict]) -> list[str]: """ Update agent_interfaces table. Returns list of change descriptions (for alarm generation). """ now = int(time.time()) conn = get_conn() changes = [] # Get current known interfaces for this agent existing = conn.execute( "SELECT interface_name FROM agent_interfaces WHERE agent_id = ?", (agent_id,) ).fetchall() existing_names = {r["interface_name"] for r in existing} reported_names = set() for iface in interfaces: name = iface.get("name", "") if not name: continue reported_names.add(name) addresses = json.dumps(iface.get("addresses", [])) is_virtual = 1 if iface.get("is_virtual", False) else 0 vpn_type = iface.get("vpn_type") conn.execute(""" INSERT INTO agent_interfaces (agent_id, interface_name, mac_address, addresses_json, is_virtual, vpn_type, last_seen_at) VALUES (?, ?, ?, ?, ?, ?, ?) ON CONFLICT(agent_id, interface_name) DO UPDATE SET mac_address = excluded.mac_address, addresses_json = excluded.addresses_json, is_virtual = excluded.is_virtual, vpn_type = excluded.vpn_type, last_seen_at = excluded.last_seen_at """, (agent_id, name, iface.get("mac", ""), addresses, is_virtual, vpn_type, now)) # Detect disappeared interfaces disappeared = existing_names - reported_names for ifname in disappeared: changes.append(f"Interface '{ifname}' disappeared") # Detect new interfaces appeared = reported_names - existing_names for ifname in appeared: if existing_names: # only alarm if agent had prior interfaces changes.append(f"New interface '{ifname}' appeared") conn.commit() return changes def get_agent_interfaces(agent_id: str) -> list[dict]: conn = get_conn() rows = conn.execute( "SELECT * FROM agent_interfaces WHERE agent_id = ?", (agent_id,) ).fetchall() return [dict(r) for r in rows] # ── Topology edge operations ──────────────────────────────────────────────── def upsert_edge(from_agent_id: str, to_agent_id: str, edge_type: str, metadata: dict | None = None): now = int(time.time()) meta_json = json.dumps(metadata or {}) conn = get_conn() conn.execute(""" INSERT INTO topology_edges (from_agent_id, to_agent_id, edge_type, metadata, last_seen, is_active) VALUES (?, ?, ?, ?, ?, 1) ON CONFLICT(from_agent_id, to_agent_id, edge_type) DO UPDATE SET metadata = excluded.metadata, last_seen = excluded.last_seen, is_active = 1 """, (from_agent_id, to_agent_id, edge_type, meta_json, now)) conn.commit() def get_all_edges() -> list[dict]: conn = get_conn() rows = conn.execute("SELECT * FROM topology_edges WHERE is_active = 1").fetchall() return [dict(r) for r in rows] # ── Alarm operations ──────────────────────────────────────────────────────── def create_alarm(agent_id: str, alarm_type: str, details: dict | None = None): conn = get_conn() conn.execute(""" INSERT INTO alarms (agent_id, alarm_type, details_json) VALUES (?, ?, ?) """, (agent_id, alarm_type, json.dumps(details or {}))) conn.commit() logger.info("ALARM [%s] %s: %s", agent_id, alarm_type, details) def get_active_alarms() -> list[dict]: conn = get_conn() rows = conn.execute( "SELECT * FROM alarms WHERE status = 'active' ORDER BY created_at DESC" ).fetchall() return [dict(r) for r in rows] def dismiss_alarm(alarm_id: int): now = int(time.time()) conn = get_conn() conn.execute(""" UPDATE alarms SET status = 'dismissed', dismissed_at = ? WHERE id = ? """, (now, alarm_id)) conn.commit() def reset_agent(agent_id: str): """Wipe all state for a specific agent.""" conn = get_conn() conn.execute("DELETE FROM agents WHERE agent_id = ?", (agent_id,)) conn.commit() logger.info("Reset all state for agent %s", agent_id) def reset_all(): """Wipe the entire fleet state.""" conn = get_conn() conn.execute("DELETE FROM topology_edges") conn.execute("DELETE FROM agent_interfaces") conn.execute("DELETE FROM reports") conn.execute("DELETE FROM alarms") conn.execute("DELETE FROM agents") conn.commit() logger.info("Full fleet reset executed")