Files
2026-04-17 20:15:24 +03:00

386 lines
12 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Kattila Manager — Flask API and Web UI.
Receives agent reports, serves fleet status, and renders the network topology map.
"""
import hashlib
import ipaddress
import json
import logging
import os
import time
from flask import Flask, jsonify, request, render_template_string
import db
import security
import processor
# ── Logging ──────────────────────────────────────────────────────────────────
# Configure logging once at import time: INFO level, with timestamp, level and
# logger name prefixed to every message.
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(name)s: %(message)s"
)
# Module-level logger used by the request handlers in this file.
logger = logging.getLogger(__name__)
# ── App init ─────────────────────────────────────────────────────────────────
# The Flask application object that the @app.route decorators below attach to.
app = Flask(__name__)
def load_env():
    """Load ``KEY=VALUE`` pairs from the first ``.env`` file found.

    Looks for ``.env`` in the current working directory and then in its
    parent; only the first candidate that exists is read. Blank lines and
    lines starting with ``#`` are skipped, as are lines without ``=`` or with
    an empty key. Whitespace around the key and around the value is stripped,
    so ``KEY = value`` defines ``KEY`` (not ``"KEY "``) as ``"value"``.

    Variables already present in the process environment are never
    overwritten.
    """
    for path in (".env", "../.env"):
        if not os.path.isfile(path):
            continue
        with open(path, encoding="utf-8") as env_file:
            for raw_line in env_file:
                line = raw_line.strip()
                if not line or line.startswith("#"):
                    continue
                # Split on the first "=" only, so values may contain "=".
                key, sep, val = line.partition("=")
                key = key.strip()
                if not sep or not key:
                    # No assignment, or an empty name that os.environ rejects.
                    continue
                os.environ.setdefault(key, val.strip())
        # Only the first existing file is honoured.
        break
# Import-time start-up sequence. load_env() runs first, presumably so that the
# db and security modules can read settings from the environment — confirm.
load_env()
# Initialise the database through the project's db module.
db.init_db()
# Start the security module's key poller.
security.start_key_poller()
# ── IP anonymization (from POC) ─────────────────────────────────────────────
def is_public_ip(ip_str: str) -> bool:
    """Tell whether *ip_str* is a globally routable (public) address.

    Input that ``ipaddress.ip_address`` cannot parse (including CIDR
    notation such as ``1.2.3.4/24``) yields ``False``.
    """
    try:
        parsed = ipaddress.ip_address(ip_str)
    except ValueError:
        return False
    if not parsed.is_global:
        return False
    return not (parsed.is_private or parsed.is_loopback)
def anonymize_ip(ip: str) -> str:
    """Return a display-safe form of an IPv4 address.

    * RFC 1918 private addresses keep their network prefix and hide the
      rest: ``10.x.*.*``, ``172.<16-31>.*.*`` and ``192.168.*.*``.
    * Any other valid IPv4 address becomes ``[pub:<token>]``, where the token
      is the first six hex digits of the SHA-256 of the address, so the same
      address always maps to the same label.
    * Anything that is not a dotted quad with octets in 0-255 (IPv6
      addresses, malformed input) becomes ``"???"``.

    A trailing ``/prefix`` (CIDR notation) is ignored.
    """
    # Strip CIDR notation if present.
    ip_bare = ip.split("/")[0]
    parts = ip_bare.split(".")
    if len(parts) != 4:
        return "???"
    try:
        # Parse every octet up front so a non-numeric octet is reported as
        # "???" instead of raising from one of the range checks below.
        octets = [int(part) for part in parts]
    except ValueError:
        return "???"
    if not all(0 <= octet <= 255 for octet in octets):
        return "???"

    first, second = octets[:2]
    if first == 10:
        return f"10.{second}.*.*"
    if first == 172 and 16 <= second <= 31:
        return f"172.{second}.*.*"
    if first == 192 and second == 168:
        return "192.168.*.*"

    token = hashlib.sha256(ip_bare.encode()).hexdigest()[:6]
    return f"[pub:{token}]"
# ── API Endpoints ────────────────────────────────────────────────────────────
@app.route("/status/healthcheck")
def healthcheck():
    """Liveness probe: always answers with ``{"status": "ok"}``."""
    payload = {"status": "ok"}
    return jsonify(payload)
@app.route("/status/updates", methods=["POST"])
def receive_update():
    """Main ingress point for agent reports (direct or relayed).

    The body must be a non-empty JSON object. Two shapes are accepted:

    * a relay envelope, recognised by having both ``relay_path`` and
      ``payload`` keys: the inner ``payload`` is validated and the whole
      envelope is handed to ``processor.process_relay``;
    * a direct report, which is validated and handed to
      ``processor.process_report``.

    Responses:
        400 ``invalid_json``    – body missing, unparsable, empty or not an
                                  object.
        400 ``invalid_payload`` – relay envelope whose ``payload`` is not an
                                  object.
        401                     – direct report rejected with ``hmac_invalid``.
        403                     – any other validation failure.
        200                     – report accepted.
    """
    report = request.get_json(silent=True)
    # Valid JSON that is not an object (list, string, number) has no .get()
    # and would otherwise surface as a 500 further down.
    if not report or not isinstance(report, dict):
        return jsonify({"error": "invalid_json"}), 400

    # Check if this is a relay envelope.
    if "relay_path" in report and "payload" in report:
        inner = report["payload"]
        if not isinstance(inner, dict):
            return jsonify({"error": "invalid_payload"}), 400
        valid, err = security.validate_report(inner)
        if not valid:
            logger.warning("Security check failed for relayed report: %s "
                           "(agent: %s, from: %s)",
                           err, inner.get("agent_id"), request.remote_addr)
            # NOTE(review): relayed failures always answer 403, while direct
            # reports map "hmac_invalid" to 401 — confirm this is intended.
            return jsonify({"error": err}), 403
        processor.process_relay(report)
        return jsonify({"status": "ok", "relayed": True})

    # Direct report.
    valid, err = security.validate_report(report)
    if not valid:
        logger.warning("Security check failed: %s (agent: %s, from: %s)",
                       err, report.get("agent_id"), request.remote_addr)
        status_code = 401 if err == "hmac_invalid" else 403
        return jsonify({"error": err}), status_code

    processor.process_report(report)
    return jsonify({"status": "ok"})
@app.route("/status/register", methods=["POST"])
def register_agent():
    """Explicit first-contact registration of an agent.

    Agents auto-register via /status/updates, but this endpoint allows
    explicit registration as well. The body must be a JSON object with a
    non-empty ``agent_id``; ``hostname`` (default ``"unknown"``),
    ``fleet_id`` (default ``""``) and ``agent_version`` (default ``1``) are
    optional.

    Responses:
        400 ``invalid_json``     – body missing, unparsable, empty or not an
                                   object.
        400 ``missing_agent_id`` – no usable ``agent_id``.
        200                      – ``{"status": "registered", "agent_id": ...}``.
    """
    # NOTE(review): unlike /status/updates, this handler performs no
    # security.validate_report() check — confirm that is intended.
    data = request.get_json(silent=True)
    # A JSON list/string/number has no .get(); reject it instead of crashing.
    if not data or not isinstance(data, dict):
        return jsonify({"error": "invalid_json"}), 400

    agent_id = data.get("agent_id", "")
    if not agent_id:
        return jsonify({"error": "missing_agent_id"}), 400

    hostname = data.get("hostname", "unknown")
    fleet_id = data.get("fleet_id", "")
    # The trailing 0 is passed through unchanged; its meaning is defined by
    # db.upsert_agent.
    db.upsert_agent(agent_id, hostname, data.get("agent_version", 1),
                    fleet_id, 0)
    logger.info("Registered agent %s (%s)", agent_id, hostname)
    return jsonify({"status": "registered", "agent_id": agent_id})
@app.route("/status/alarms")
def get_alarms():
    """Serve whatever ``db.get_active_alarms()`` returns as a JSON response."""
    return jsonify(db.get_active_alarms())
@app.route("/status/alarms/<int:alarm_id>/dismiss", methods=["POST"])
def dismiss_alarm(alarm_id):
    """Dismiss the alarm whose integer id is given in the URL.

    Always answers ``{"status": "dismissed"}``; the return value of
    ``db.dismiss_alarm`` is not inspected.
    """
    db.dismiss_alarm(alarm_id)
    outcome = {"status": "dismissed"}
    return jsonify(outcome)
@app.route("/status/agents")
def list_agents():
    """Return every known agent as JSON, each with its interfaces attached.

    ``db.mark_stale_agents()`` is invoked before the agents are read. Each
    agent record then gains an ``"interfaces"`` entry holding the result of
    ``db.get_agent_interfaces`` for that agent.
    """
    db.mark_stale_agents()
    fleet = db.get_all_agents()
    for entry in fleet:
        # One interface lookup per agent, keyed by its agent_id.
        entry["interfaces"] = db.get_agent_interfaces(entry["agent_id"])
    return jsonify(fleet)
@app.route("/status/admin/reset", methods=["POST"])
def admin_reset():
    """Reset a specific agent or the entire fleet.

    * Body ``{"agent_id": "<id>"}`` resets that one agent.
    * No body, an empty object, or ``{"agent_id": null}`` resets everything.

    Responses:
        400 ``invalid_json``     – body is valid JSON but not an object.
        400 ``missing_agent_id`` – ``agent_id`` is present but empty; this is
                                   refused so that a blank id cannot silently
                                   escalate into a full-fleet reset.
        200                      – ``{"status": "reset", "agent_id": ...}`` or
                                   ``{"status": "full_reset"}``.
    """
    # NOTE(review): no authentication is performed in this handler — confirm
    # that access is restricted elsewhere.
    data = request.get_json(silent=True)
    if data is None:
        # Missing or unparsable body keeps its historical meaning: reset all.
        data = {}
    if not isinstance(data, dict):
        return jsonify({"error": "invalid_json"}), 400

    agent_id = data.get("agent_id")
    if agent_id:
        logger.warning("Admin reset of agent %s requested from %s",
                       agent_id, request.remote_addr)
        db.reset_agent(agent_id)
        return jsonify({"status": "reset", "agent_id": agent_id})

    if agent_id is not None:
        # Key supplied but blank ("" / 0 / []): almost certainly a client bug.
        return jsonify({"error": "missing_agent_id"}), 400

    logger.warning("Admin FULL reset requested from %s", request.remote_addr)
    db.reset_all()
    return jsonify({"status": "full_reset"})
# ── Visualization data endpoint ─────────────────────────────────────────────
def _load_json(raw, expected):
    """Decode a JSON text value, falling back to an empty ``expected()``.

    The fallback is returned when *raw* is ``None``, is not valid JSON, or
    decodes to something that is not an instance of *expected*.
    """
    try:
        value = json.loads(raw)
    except (TypeError, ValueError):  # JSONDecodeError is a ValueError
        return expected()
    return value if isinstance(value, expected) else expected()


def _handshake_color(handshake, now):
    """Map a handshake timestamp to the edge colour used by the map legend."""
    if not handshake:
        return "#95a5a6"  # grey: no handshake recorded
    age = now - handshake
    if age <= 120:
        return "#2ecc71"  # green: active
    if age <= 86400:
        return "#f1c40f"  # yellow: stale
    return "#e74c3c"      # red: broken


def _build_node(agent, now):
    """Build the vis-network node dict for one agent record."""
    # Level 0 = has at least one public IP (hub), 1 = private only (spoke).
    level = 1
    anon_ips = []
    for iface in db.get_agent_interfaces(agent["agent_id"]):
        for addr in _load_json(iface.get("addresses_json"), list):
            if not isinstance(addr, str):
                continue  # ignore malformed entries instead of crashing
            if is_public_ip(addr.split("/")[0]):
                level = 0
            anon_ips.append(anonymize_ip(addr))

    age = int(now - agent["last_seen_at"])
    # A null hostname would break .upper(); show a placeholder instead.
    hostname = str(agent["hostname"] or "unknown")
    return {
        "id": agent["agent_id"],
        "label": f"<b>{hostname.upper()}</b>\n"
                 f"{', '.join(anon_ips[:3])}",
        "level": level,
        "color": "#2ecc71" if age < 300 else "#e74c3c",
        "title": f"Agent: {agent['agent_id']}\n"
                 f"Last seen: {age}s ago\n"
                 f"Status: {agent['status']}",
    }


def _build_edge(edge, pair, now):
    """Build the vis-network edge dict for one edge record."""
    meta = _load_json(edge.get("metadata"), dict)
    handshake = meta.get("latest_handshake", 0)
    if not isinstance(handshake, (int, float)):
        handshake = 0  # unusable value: treat as "no handshake"

    color = _handshake_color(handshake, now)
    if handshake:
        title = (f"{edge['edge_type']} link\n"
                 f"Handshake: {int(now - handshake)}s ago")
    else:
        title = f"{edge['edge_type']} link\nNo handshake"

    return {
        # Stable id per agent pair: without one, a client that feeds these
        # edges to DataSet.update() adds a fresh copy on every poll.
        "id": f"{pair[0]}|{pair[1]}",
        "from": edge["from_agent_id"],
        "to": edge["to_agent_id"],
        "label": edge["edge_type"],
        "color": color,
        "width": 3 if color == "#e74c3c" else 2,
        "title": title,
    }


@app.route("/status/data")
def graph_data():
    """Return nodes and edges for the vis-network graph.

    Response shape: ``{"nodes": [...], "edges": [...]}``. There is one node
    per agent and one edge per unordered agent pair (the first edge record
    seen for a pair wins). Unparsable interface-address or edge-metadata JSON
    is treated as empty rather than failing the whole request.
    """
    db.mark_stale_agents()
    agents = db.get_all_agents()
    edges = db.get_all_edges()
    now = time.time()

    nodes = [_build_node(agent, now) for agent in agents]

    vis_edges = []
    seen_pairs: set[tuple] = set()
    for edge in edges:
        # A->B and B->A describe the same link; keep only the first one.
        pair = tuple(sorted([edge["from_agent_id"], edge["to_agent_id"]]))
        if pair in seen_pairs:
            continue
        seen_pairs.add(pair)
        vis_edges.append(_build_edge(edge, pair, now))

    return jsonify({"nodes": nodes, "edges": vis_edges})
# ── Web UI (vis-network topology map) ────────────────────────────────────────
@app.route("/status")
def index():
    """Serve the topology-map page by rendering ``HTML_TEMPLATE``."""
    page = render_template_string(HTML_TEMPLATE)
    return page
# Single-page web UI for the topology map. The page loads vis-network from
# unpkg, fetches /status/data immediately and then every 10 seconds, and feeds
# the returned nodes/edges into vis DataSets. The HUD legend mirrors the edge
# colours (green / yellow / red / grey). This text is passed to
# render_template_string, so keep it free of Jinja delimiters unless templating
# is intended.
HTML_TEMPLATE = """<!DOCTYPE html>
<html>
<head>
<title>Kattila — Network Map</title>
<script src="https://unpkg.com/vis-network/standalone/umd/vis-network.min.js"></script>
<style>
*, *::before, *::after { box-sizing: border-box; }
body {
font-family: 'Segoe UI', system-ui, sans-serif;
background: #0d1117; color: #e6edf3; margin: 0;
}
#map { width: 100vw; height: 100vh; }
#hud {
position: absolute; top: 12px; left: 12px;
background: rgba(13,17,23,0.88); backdrop-filter: blur(6px);
border: 1px solid #30363d; border-radius: 8px;
padding: 12px 16px; font-size: 12px; z-index: 10; min-width: 160px;
}
#hud b {
display: block; margin-bottom: 6px;
font-size: 13px; color: #58a6ff;
}
.dot {
display: inline-block; width: 9px; height: 9px;
border-radius: 50%; margin-right: 6px;
}
.row { margin: 3px 0; }
#clock {
margin-top: 10px; font-size: 10px; color: #8b949e;
border-top: 1px solid #30363d; padding-top: 6px;
}
#stats {
margin-top: 6px; font-size: 10px; color: #8b949e;
}
</style>
</head>
<body>
<div id="hud">
<b>Kattila Network Map</b>
<div class="row"><span class="dot" style="background:#2ecc71"></span>Active (&lt; 2 min)</div>
<div class="row"><span class="dot" style="background:#f1c40f"></span>Stale (&gt; 2 min)</div>
<div class="row"><span class="dot" style="background:#e74c3c"></span>Broken (&gt; 24 h)</div>
<div class="row"><span class="dot" style="background:#95a5a6"></span>No handshake</div>
<div id="stats"></div>
<div id="clock"></div>
</div>
<div id="map"></div>
<script>
const nodes = new vis.DataSet();
const edges = new vis.DataSet();
const net = new vis.Network(
document.getElementById('map'),
{ nodes, edges },
{
layout: {
hierarchical: {
direction: 'UD',
nodeSpacing: 300,
levelSeparation: 200
}
},
physics: false,
nodes: {
shape: 'box',
font: { multi: 'html', color: '#e6edf3', size: 13 },
margin: 10,
color: { background: '#161b22', border: '#30363d' },
},
edges: {
arrows: 'to',
font: { size: 10, color: '#8b949e', strokeWidth: 0 },
smooth: { type: 'curvedCW', roundness: 0.15 },
},
}
);
async function refresh() {
try {
const res = await fetch('/status/data');
const data = await res.json();
nodes.update(data.nodes);
edges.update(data.edges);
document.getElementById('stats').textContent =
data.nodes.length + ' nodes, ' + data.edges.length + ' links';
document.getElementById('clock').textContent =
'Updated ' + new Date().toLocaleTimeString();
} catch (e) {
document.getElementById('clock').textContent = 'Fetch error';
}
}
setInterval(refresh, 10000);
refresh();
</script>
</body>
</html>"""
# ── Main ─────────────────────────────────────────────────────────────────────
if __name__ == "__main__":
    # Direct execution only: start Flask's built-in server, listening on all
    # interfaces at port 5086.
    listen_host, listen_port = "0.0.0.0", 5086
    app.run(host=listen_host, port=listen_port)