Files
kattila.status/manager/processor.py
2026-04-17 20:15:24 +03:00

106 lines
3.4 KiB
Python

"""
Data processor for Kattila Manager.
Core business logic: processes validated agent reports and updates
the database state (agents, interfaces, topology, alarms).
"""
import json
import logging
import db
logger = logging.getLogger(__name__)
def process_report(report: dict):
"""
Process a validated agent report.
Called after security checks have passed.
"""
agent_id = report["agent_id"]
tick = report["tick"]
timestamp = report["timestamp"]
agent_version = report.get("agent_version", 0)
fleet_id = report.get("fleet_id", "")
report_type = report.get("type", "report")
data = report.get("data", {})
hostname = data.get("hostname", "unknown")
# 1. Upsert agent (auto-register)
db.upsert_agent(agent_id, hostname, agent_version, fleet_id, tick)
logger.info("Processed report from %s (%s) tick=%d", agent_id, hostname, tick)
# 2. Store raw report for auditing
db.insert_report(agent_id, tick, timestamp, report_type,
json.dumps(report))
# 3. Update interfaces and detect changes
interfaces = data.get("interfaces", [])
if interfaces:
changes = db.update_interfaces(agent_id, interfaces)
for change in changes:
db.create_alarm(agent_id, "interface_change",
{"description": change})
# 4. Update topology edges from wg_peers
wg_peers = data.get("wg_peers", [])
_update_topology(agent_id, wg_peers)
def _update_topology(agent_id: str, wg_peers: list[dict]):
"""
Infer topology edges from wireguard peer data.
Cross-references peer public keys against known agent interfaces
to create edges.
"""
# Build a lookup of pubkey -> agent_id from all known agent interfaces
# This is done per-report for simplicity; could be cached for performance
all_agents = db.get_all_agents()
# For each wg peer, try to match the public key to another known agent
for peer in wg_peers:
pubkey = peer.get("public_key", "")
endpoint = peer.get("endpoint", "")
iface = peer.get("interface", "")
# Store the edge with metadata even if we can't resolve the target
# agent yet — the metadata (pubkey, endpoint) is still valuable
metadata = {
"public_key": pubkey,
"endpoint": endpoint,
"interface": iface,
"transfer_rx": peer.get("transfer_rx", 0),
"transfer_tx": peer.get("transfer_tx", 0),
"latest_handshake": peer.get("latest_handshake", 0),
}
# The target is unknown until we can cross-reference pubkeys
# For now, use the pubkey hash as a placeholder target ID
target_id = f"pubkey:{pubkey[:16]}" if pubkey else "unknown"
db.upsert_edge(agent_id, target_id, "wireguard", metadata)
def process_relay(envelope: dict):
"""
Process a relayed report envelope.
Extracts the inner payload and processes it as a normal report
after recording the relay path.
"""
relay_path = envelope.get("relay_path", [])
payload = envelope.get("payload", {})
if not payload:
logger.warning("Empty relay payload received")
return
logger.info("Processing relayed report via path: %s", " -> ".join(relay_path))
# Process the inner report normally
process_report(payload)
# Also record relay edges in the topology
for i in range(len(relay_path) - 1):
db.upsert_edge(relay_path[i], relay_path[i + 1], "relay")