106 lines
3.4 KiB
Python
106 lines
3.4 KiB
Python
"""
Data processor for Kattila Manager.

Core business logic: processes validated agent reports and updates
the database state (agents, interfaces, topology, alarms).
"""
|
|
|
|
import json
|
|
import logging
|
|
|
|
import db
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def process_report(report: dict):
    """
    Apply a validated agent report to the database.

    Intended to run only after the caller's security checks have passed.
    In order: upsert the agent, store the raw report, update interfaces
    (creating one "interface_change" alarm per item returned by
    db.update_interfaces) and refresh topology edges from "wg_peers".

    Raises:
        KeyError: if "agent_id", "tick" or "timestamp" is missing.
    """
    # Mandatory envelope fields: plain indexing so a missing key fails loudly.
    sender = report["agent_id"]
    tick = report["tick"]
    reported_at = report["timestamp"]

    # Optional payload; the hostname falls back to "unknown".
    payload = report.get("data", {})
    host = payload.get("hostname", "unknown")

    # Step 1: register the agent or refresh its existing record.
    db.upsert_agent(
        sender,
        host,
        report.get("agent_version", 0),
        report.get("fleet_id", ""),
        tick,
    )
    logger.info("Processed report from %s (%s) tick=%d", sender, host, tick)

    # Step 2: keep the complete report, serialized as JSON, for auditing.
    raw_json = json.dumps(report)
    db.insert_report(
        sender, tick, reported_at, report.get("type", "report"), raw_json
    )

    # Step 3: apply interface data (skipped when none was reported) and
    # create an alarm for every change the database layer returns.
    reported_ifaces = payload.get("interfaces", [])
    detected = (
        db.update_interfaces(sender, reported_ifaces) if reported_ifaces else ()
    )
    for description in detected:
        db.create_alarm(sender, "interface_change", {"description": description})

    # Step 4: refresh topology edges from the reported WireGuard peers.
    _update_topology(sender, payload.get("wg_peers", []))
|
|
|
|
|
|
def _update_topology(agent_id: str, wg_peers: list[dict]):
|
|
"""
|
|
Infer topology edges from wireguard peer data.
|
|
Cross-references peer public keys against known agent interfaces
|
|
to create edges.
|
|
"""
|
|
# Build a lookup of pubkey -> agent_id from all known agent interfaces
|
|
# This is done per-report for simplicity; could be cached for performance
|
|
all_agents = db.get_all_agents()
|
|
|
|
# For each wg peer, try to match the public key to another known agent
|
|
for peer in wg_peers:
|
|
pubkey = peer.get("public_key", "")
|
|
endpoint = peer.get("endpoint", "")
|
|
iface = peer.get("interface", "")
|
|
|
|
# Store the edge with metadata even if we can't resolve the target
|
|
# agent yet — the metadata (pubkey, endpoint) is still valuable
|
|
metadata = {
|
|
"public_key": pubkey,
|
|
"endpoint": endpoint,
|
|
"interface": iface,
|
|
"transfer_rx": peer.get("transfer_rx", 0),
|
|
"transfer_tx": peer.get("transfer_tx", 0),
|
|
"latest_handshake": peer.get("latest_handshake", 0),
|
|
}
|
|
|
|
# The target is unknown until we can cross-reference pubkeys
|
|
# For now, use the pubkey hash as a placeholder target ID
|
|
target_id = f"pubkey:{pubkey[:16]}" if pubkey else "unknown"
|
|
|
|
db.upsert_edge(agent_id, target_id, "wireguard", metadata)
|
|
|
|
|
|
def process_relay(envelope: dict):
    """
    Handle a report that arrived wrapped in a relay envelope.

    The inner "payload" is processed like a directly received report.
    Afterwards each consecutive pair of hops in "relay_path" is recorded
    as a "relay" edge. An envelope whose payload is missing or empty is
    logged as a warning and otherwise ignored.
    """
    hops = envelope.get("relay_path", [])
    inner_report = envelope.get("payload", {})

    if inner_report:
        logger.info("Processing relayed report via path: %s", " -> ".join(hops))

        # The wrapped report goes through the normal processing pipeline.
        process_report(inner_report)

        # Each adjacent (hop, next hop) pair becomes one relay edge.
        for source, target in zip(hops, hops[1:]):
            db.upsert_edge(source, target, "relay")
    else:
        logger.warning("Empty relay payload received")
|