""" Data processor for Kattila Manager. Core business logic: processes validated agent reports and updates the database state (agents, interfaces, topology, alarms). """ import json import logging import db logger = logging.getLogger(__name__) def process_report(report: dict): """ Process a validated agent report. Called after security checks have passed. """ agent_id = report["agent_id"] tick = report["tick"] timestamp = report["timestamp"] agent_version = report.get("agent_version", 0) fleet_id = report.get("fleet_id", "") report_type = report.get("type", "report") data = report.get("data", {}) hostname = data.get("hostname", "unknown") # 1. Upsert agent (auto-register) db.upsert_agent(agent_id, hostname, agent_version, fleet_id, tick) logger.info("Processed report from %s (%s) tick=%d", agent_id, hostname, tick) # 2. Store raw report for auditing db.insert_report(agent_id, tick, timestamp, report_type, json.dumps(report)) # 3. Update interfaces and detect changes interfaces = data.get("interfaces", []) if interfaces: changes = db.update_interfaces(agent_id, interfaces) for change in changes: db.create_alarm(agent_id, "interface_change", {"description": change}) # 4. Update topology edges from wg_peers wg_peers = data.get("wg_peers", []) _update_topology(agent_id, wg_peers) def _update_topology(agent_id: str, wg_peers: list[dict]): """ Infer topology edges from wireguard peer data. Cross-references peer public keys against known agent interfaces to create edges. """ # Build a lookup of pubkey -> agent_id from all known agent interfaces # This is done per-report for simplicity; could be cached for performance all_agents = db.get_all_agents() # For each wg peer, try to match the public key to another known agent for peer in wg_peers: pubkey = peer.get("public_key", "") endpoint = peer.get("endpoint", "") iface = peer.get("interface", "") # Store the edge with metadata even if we can't resolve the target # agent yet — the metadata (pubkey, endpoint) is still valuable metadata = { "public_key": pubkey, "endpoint": endpoint, "interface": iface, "transfer_rx": peer.get("transfer_rx", 0), "transfer_tx": peer.get("transfer_tx", 0), "latest_handshake": peer.get("latest_handshake", 0), } # The target is unknown until we can cross-reference pubkeys # For now, use the pubkey hash as a placeholder target ID target_id = f"pubkey:{pubkey[:16]}" if pubkey else "unknown" db.upsert_edge(agent_id, target_id, "wireguard", metadata) def process_relay(envelope: dict): """ Process a relayed report envelope. Extracts the inner payload and processes it as a normal report after recording the relay path. """ relay_path = envelope.get("relay_path", []) payload = envelope.get("payload", {}) if not payload: logger.warning("Empty relay payload received") return logger.info("Processing relayed report via path: %s", " -> ".join(relay_path)) # Process the inner report normally process_report(payload) # Also record relay edges in the topology for i in range(len(relay_path) - 1): db.upsert_edge(relay_path[i], relay_path[i + 1], "relay")