""" Kattila Manager — Flask API and Web UI. Receives agent reports, serves fleet status, and renders the network topology map. """ import hashlib import ipaddress import json import logging import os import time from flask import Flask, jsonify, request, render_template_string import db import security import processor # ── Logging ────────────────────────────────────────────────────────────────── logging.basicConfig( level=logging.INFO, format="%(asctime)s [%(levelname)s] %(name)s: %(message)s" ) logger = logging.getLogger(__name__) # ── App init ───────────────────────────────────────────────────────────────── app = Flask(__name__) def load_env(): """Load .env file from parent directory.""" paths = [".env", "../.env"] for p in paths: if os.path.isfile(p): with open(p) as f: for line in f: line = line.strip() if not line or line.startswith("#"): continue if "=" in line: key, val = line.split("=", 1) os.environ.setdefault(key, val) break load_env() db.init_db() security.start_key_poller() # ── IP anonymization (from POC) ───────────────────────────────────────────── def is_public_ip(ip_str: str) -> bool: """Returns True if the IP is a global, public address.""" try: ip = ipaddress.ip_address(ip_str) return ip.is_global and not ip.is_private and not ip.is_loopback except ValueError: return False def anonymize_ip(ip: str) -> str: """ Private RFC1918 addresses: reveal subnet, hide host octet. Public addresses: replace with a short stable SHA hash fingerprint. """ # Strip CIDR notation if present ip_bare = ip.split("/")[0] parts = ip_bare.split(".") if len(parts) != 4: return "???" try: first = int(parts[0]) except ValueError: return "???" if first == 10: return f"10.{parts[1]}.*.*" if first == 172 and 16 <= int(parts[1]) <= 31: return f"172.{parts[1]}.*.*" if first == 192 and parts[1] == "168": return "192.168.*.*" token = hashlib.sha256(ip_bare.encode()).hexdigest()[:6] return f"[pub:{token}]" # ── API Endpoints ──────────────────────────────────────────────────────────── @app.route("/status/healthcheck") def healthcheck(): return jsonify({"status": "ok"}) @app.route("/status/updates", methods=["POST"]) def receive_update(): """Main ingress point for agent reports (direct or relayed).""" report = request.get_json(silent=True) if not report: return jsonify({"error": "invalid_json"}), 400 # Check if this is a relay envelope if "relay_path" in report and "payload" in report: inner = report["payload"] valid, err = security.validate_report(inner) if not valid: logger.warning("Security check failed for relayed report: %s " "(agent: %s, from: %s)", err, inner.get("agent_id"), request.remote_addr) return jsonify({"error": err}), 403 processor.process_relay(report) return jsonify({"status": "ok", "relayed": True}) # Direct report valid, err = security.validate_report(report) if not valid: logger.warning("Security check failed: %s (agent: %s, from: %s)", err, report.get("agent_id"), request.remote_addr) status_code = 403 if err == "hmac_invalid": status_code = 401 return jsonify({"error": err}), status_code processor.process_report(report) return jsonify({"status": "ok"}) @app.route("/status/register", methods=["POST"]) def register_agent(): """ Agent registration endpoint. Agents auto-register via /status/updates, but this allows explicit first-contact registration as well. """ data = request.get_json(silent=True) if not data: return jsonify({"error": "invalid_json"}), 400 agent_id = data.get("agent_id", "") hostname = data.get("hostname", "unknown") fleet_id = data.get("fleet_id", "") if not agent_id: return jsonify({"error": "missing_agent_id"}), 400 db.upsert_agent(agent_id, hostname, data.get("agent_version", 1), fleet_id, 0) logger.info("Registered agent %s (%s)", agent_id, hostname) return jsonify({"status": "registered", "agent_id": agent_id}) @app.route("/status/alarms") def get_alarms(): """Return all active alarms.""" alarms = db.get_active_alarms() return jsonify(alarms) @app.route("/status/alarms//dismiss", methods=["POST"]) def dismiss_alarm(alarm_id): """Dismiss a specific alarm.""" db.dismiss_alarm(alarm_id) return jsonify({"status": "dismissed"}) @app.route("/status/agents") def list_agents(): """List all known agents and their status.""" db.mark_stale_agents() agents = db.get_all_agents() # Enrich with interface data for agent in agents: ifaces = db.get_agent_interfaces(agent["agent_id"]) agent["interfaces"] = ifaces return jsonify(agents) @app.route("/status/admin/reset", methods=["POST"]) def admin_reset(): """Reset a specific agent or the entire fleet.""" data = request.get_json(silent=True) or {} agent_id = data.get("agent_id") if agent_id: db.reset_agent(agent_id) return jsonify({"status": "reset", "agent_id": agent_id}) else: db.reset_all() return jsonify({"status": "full_reset"}) # ── Visualization data endpoint ───────────────────────────────────────────── @app.route("/status/data") def graph_data(): """Return nodes and edges for the vis-network graph.""" db.mark_stale_agents() agents = db.get_all_agents() edges = db.get_all_edges() now = time.time() nodes = [] for a in agents: ifaces = db.get_agent_interfaces(a["agent_id"]) # Determine level: 0 = has public IP (hub), 1 = private only (spoke) level = 1 anon_ips = [] for iface in ifaces: try: addrs = json.loads(iface.get("addresses_json", "[]")) except (json.JSONDecodeError, TypeError): addrs = [] for addr in addrs: ip_bare = addr.split("/")[0] if is_public_ip(ip_bare): level = 0 anon_ips.append(anonymize_ip(addr)) age = int(now - a["last_seen_at"]) is_alive = age < 300 nodes.append({ "id": a["agent_id"], "label": f"{a['hostname'].upper()}\n" f"{', '.join(anon_ips[:3])}", "level": level, "color": "#2ecc71" if is_alive else "#e74c3c", "title": f"Agent: {a['agent_id']}\n" f"Last seen: {age}s ago\n" f"Status: {a['status']}", }) vis_edges = [] seen_pairs: set[tuple] = set() for e in edges: pair = tuple(sorted([e["from_agent_id"], e["to_agent_id"]])) if pair in seen_pairs: continue seen_pairs.add(pair) meta = json.loads(e.get("metadata", "{}")) handshake = meta.get("latest_handshake", 0) if handshake == 0: color = "#95a5a6" elif (now - handshake) <= 120: color = "#2ecc71" elif (now - handshake) <= 86400: color = "#f1c40f" else: color = "#e74c3c" vis_edges.append({ "from": e["from_agent_id"], "to": e["to_agent_id"], "label": e["edge_type"], "color": color, "width": 3 if color == "#e74c3c" else 2, "title": f"{e['edge_type']} link\n" f"Handshake: {int(now - handshake)}s ago" if handshake else f"{e['edge_type']} link\nNo handshake", }) return jsonify({"nodes": nodes, "edges": vis_edges}) # ── Web UI (vis-network topology map) ──────────────────────────────────────── @app.route("/status") def index(): return render_template_string(HTML_TEMPLATE) HTML_TEMPLATE = """ Kattila — Network Map
Kattila Network Map
Active (< 2 min)
Stale (> 2 min)
Broken (> 24 h)
No handshake
""" # ── Main ───────────────────────────────────────────────────────────────────── if __name__ == "__main__": app.run(host="0.0.0.0", port=5086)