From 51e0355ec838ff5201b06e39cb62a7bb5f6a0e7c Mon Sep 17 00:00:00 2001 From: Kalzu Rekku Date: Fri, 17 Apr 2026 17:31:06 +0300 Subject: [PATCH] Clean up before starting planning. --- agent.py | 275 --------------------------------- kattila01.py | 288 ----------------------------------- kattila.py => kattila_poc.py | 0 3 files changed, 563 deletions(-) delete mode 100644 agent.py delete mode 100644 kattila01.py rename kattila.py => kattila_poc.py (100%) diff --git a/agent.py b/agent.py deleted file mode 100644 index 159b83d..0000000 --- a/agent.py +++ /dev/null @@ -1,275 +0,0 @@ -""" -kattila Agent -- Reports node state to manager (direct or via WG peer relay) -- Runs a relay server for segregated peers -- Authenticates with a pre-shared key (file, env, or DNS TXT) -""" - -import subprocess -import json -import requests -import socket -import time -import threading -import os -import logging - -from flask import Flask, request as freq, jsonify - -# ── CONFIG ──────────────────────────────────────────────────────────────────── - -MANAGER_URL = os.environ.get("MANAGER_URL", "http://10.37.11.2:5086/status/api/report") -RELAY_PORT = int(os.environ.get("RELAY_PORT", "5087")) -REPORT_INTERVAL = int(os.environ.get("REPORT_INTERVAL", "60")) -PSK_FILE = os.environ.get("PSK_FILE", "/etc/kattila.status/psk") -PSK_DNS_RECORD = os.environ.get("PSK_DNS_RECORD", "") # e.g. "_kattila.homelab.local" - - -# ── PSK LOADING ─────────────────────────────────────────────────────────────── - -def _load_psk_dns(record: str) -> str | None: - """ - Fetch PSK from a DNS TXT record. - - Store the key in your local/private DNS zone as: - _kattila.homelab.local. TXT "psk=your-key-here" - - This lets you rotate the key on all nodes without touching any config file — - just update the DNS record. Agents re-fetch on each startup (cached to disk - as fallback below). Only use a private/internal zone — never a public one. - """ - try: - out = subprocess.check_output( - ["dig", "+short", "TXT", record], - stderr=subprocess.DEVNULL, timeout=5 - ).decode() - for line in out.splitlines(): - clean = line.strip().strip('"') - if clean.startswith("psk="): - return clean[4:] - except Exception: - pass - return None - - -def load_psk() -> str: - key: str | None = None - - # 1. DNS TXT (if configured) — good for zero-touch key rotation - if PSK_DNS_RECORD: - key = _load_psk_dns(PSK_DNS_RECORD) - if key: - # Cache to disk for offline starts - try: - os.makedirs(os.path.dirname(PSK_FILE), exist_ok=True) - with open(PSK_FILE, "w") as f: - f.write(key) - except Exception: - pass - - # 2. File (also DNS cache fallback) - if not key and os.path.isfile(PSK_FILE): - with open(PSK_FILE) as f: - key = f.read().strip() - - # 3. Environment variable - if not key: - key = os.environ.get("kattila_PSK", "") - - if not key: - raise RuntimeError( - "No PSK found. Set kattila_PSK env var, write to " - f"{PSK_FILE}, or configure PSK_DNS_RECORD." - ) - return key - - -PSK = load_psk() -AUTH_HEADERS = {"X-kattila-PSK": PSK, "Content-Type": "application/json"} - - -# ── RELAY SERVER ────────────────────────────────────────────────────────────── - -relay_app = Flask("kattila-relay") -logging.getLogger("werkzeug").setLevel(logging.ERROR) - - -@relay_app.route("/relay", methods=["POST"]) -def relay(): - if freq.headers.get("X-kattila-PSK") != PSK: - return jsonify({"error": "unauthorized"}), 401 - try: - resp = requests.post( - MANAGER_URL, json=freq.get_json(), headers=AUTH_HEADERS, timeout=10 - ) - return jsonify(resp.json()), resp.status_code - except Exception as e: - return jsonify({"error": str(e)}), 502 - - -@relay_app.route("/health") -def health(): - return jsonify({"status": "ok"}) - - -def _run_relay(): - relay_app.run(host="0.0.0.0", port=RELAY_PORT, use_reloader=False) - - -# ── WIREGUARD HELPERS ───────────────────────────────────────────────────────── - -def get_wg_interface_names() -> set[str]: - """Return the names of actual WireGuard interfaces on this host.""" - try: - out = subprocess.check_output( - ["wg", "show", "interfaces"], stderr=subprocess.DEVNULL - ).decode().strip() - return set(out.split()) if out else set() - except Exception: - return set() - - -def parse_wg_dump() -> list[dict]: - """ - Parse 'wg show all dump'. Peer lines have 9 tab-separated fields: - ifname pubkey psk endpoint allowed_ips handshake rx tx keepalive - Interface lines have 5 fields (private_key, public_key, listen_port, fwmark) - and are skipped. - """ - peers = [] - try: - raw = subprocess.check_output( - ["wg", "show", "all", "dump"], stderr=subprocess.DEVNULL - ).decode().strip() - for line in raw.splitlines(): - parts = line.split("\t") - if len(parts) == 9: # peer line - ifname, pubkey, _psk, endpoint, allowed_ips, handshake_ts, *_ = parts - handshake_ts = int(handshake_ts) - idle = 0 if handshake_ts == 0 else int(time.time()) - handshake_ts - peers.append({ - "ifname": ifname, - "pubkey": pubkey, - "allowed_ips": allowed_ips, - "handshake": idle, - "status": "ok" if 0 < idle < 120 else "stale", - }) - except Exception as e: - print(f"[!] WireGuard dump error: {e}") - return peers - - -def get_peer_relay_ips() -> list[str]: - """ - Extract the /32 tunnel IPs of WireGuard peers — used to find relay candidates - when direct manager reporting fails. - """ - ips = [] - for peer in parse_wg_dump(): - for cidr in peer["allowed_ips"].split(","): - cidr = cidr.strip() - if "." in cidr and cidr.endswith("/32"): - ips.append(cidr[:-3]) - return ips - - -# ── DATA COLLECTION ─────────────────────────────────────────────────────────── - -def get_data() -> dict: - hostname = socket.gethostname() - wg_ifaces = get_wg_interface_names() - ifaces = [] - - try: - ip_data = json.loads( - subprocess.check_output( - ["ip", "-j", "addr"], stderr=subprocess.DEVNULL - ).decode() - ) - for item in ip_data: - name = item["ifname"] - if name == "lo" or name.startswith("br-") or name.startswith("docker"): - continue - addr = next( - (a["local"] for a in item.get("addr_info", []) if a["family"] == "inet"), - None, - ) - if not addr: - continue - - entry: dict = {"name": name, "ip": addr} - - # Only query WireGuard for actual WG interfaces — avoids - # "Unable to access interface: Operation not supported" spam - if name in wg_ifaces: - try: - pubkey = subprocess.check_output( - ["wg", "show", name, "public-key"], stderr=subprocess.DEVNULL - ).decode().strip() - entry["public_key"] = pubkey - except Exception: - pass - - ifaces.append(entry) - - except Exception as e: - print(f"[!] Interface collection error: {e}") - - return { - "hostname": hostname, - "interfaces": ifaces, - "wg_peers": parse_wg_dump(), - } - - -# ── REPORTING ───────────────────────────────────────────────────────────────── - -def report(payload: dict) -> bool: - hostname = payload["hostname"] - - # 1. Direct report to manager - try: - r = requests.post(MANAGER_URL, json=payload, headers=AUTH_HEADERS, timeout=10) - if r.status_code == 200: - print(f"[+] {hostname} → manager (direct)") - return True - print(f"[!] Manager returned {r.status_code}") - except Exception as e: - print(f"[!] Direct report failed: {e}") - - # 2. Relay through reachable WireGuard peers - candidates = get_peer_relay_ips() - if not candidates: - print("[-] No relay candidates found") - return False - - for ip in candidates: - url = f"http://{ip}:{RELAY_PORT}/relay" - try: - r = requests.post(url, json=payload, headers=AUTH_HEADERS, timeout=8) - if r.status_code == 200: - print(f"[+] {hostname} → relay {ip}") - return True - print(f"[!] Relay {ip} returned {r.status_code}") - except Exception as e: - print(f"[!] Relay {ip} unreachable: {e}") - - print(f"[-] All reporting paths failed for {hostname}") - return False - - -# ── MAIN ────────────────────────────────────────────────────────────────────── - -if __name__ == "__main__": - t = threading.Thread(target=_run_relay, daemon=True) - t.start() - print(f"[*] Relay server listening on :{RELAY_PORT}") - print(f"[*] Reporting to {MANAGER_URL} every {REPORT_INTERVAL}s") - - while True: - try: - payload = get_data() - report(payload) - except Exception as e: - print(f"[!] Unexpected error: {e}") - time.sleep(REPORT_INTERVAL) diff --git a/kattila01.py b/kattila01.py deleted file mode 100644 index 0ec329d..0000000 --- a/kattila01.py +++ /dev/null @@ -1,288 +0,0 @@ -""" -kattila Manager -- Accepts reports from agents (direct or relayed) -- Requires pre-shared key authentication -- Anonymizes IPs in the public-facing graph -""" - -import sqlite3 -import hashlib -import time -import os - -from flask import Flask, render_template_string, jsonify, request, abort - -app = Flask(__name__) -DB_PATH = "network.db" - -# ── PSK AUTH ────────────────────────────────────────────────────────────────── - -PSK_FILE = os.environ.get("PSK_FILE", "/etc/kattila/psk") - - -def load_psk() -> str: - if os.path.isfile(PSK_FILE): - with open(PSK_FILE) as f: - return f.read().strip() - key = os.environ.get("kattila_PSK", "") - if not key: - raise RuntimeError(f"No PSK found. Write to {PSK_FILE} or set kattila_PSK.") - return key - - -PSK = load_psk() - - -def require_psk(): - if request.headers.get("X-kattila-PSK") != PSK: - abort(401) - - -# ── DATABASE ────────────────────────────────────────────────────────────────── - - -def init_db(): - with sqlite3.connect(DB_PATH) as conn: - conn.execute("""CREATE TABLE IF NOT EXISTS servers - (name TEXT PRIMARY KEY, last_seen INTEGER, level INTEGER)""") - conn.execute("""CREATE TABLE IF NOT EXISTS interfaces - (server_name TEXT, ifname TEXT, local_ip TEXT, public_key TEXT)""") - conn.execute( - """CREATE TABLE IF NOT EXISTS wg_peers - (server_name TEXT, ifname TEXT, peer_pubkey TEXT, handshake INTEGER, status TEXT)""" - ) - - -init_db() - -# level 0 = public hub, level 1 = private spoke -SERVER_CONFIG = { - "pussi": 0, - "node03": 0, - "koti": 1, - "tuoppi2": 1, - "node04": 1, -} - - -# ── IP ANONYMIZATION ────────────────────────────────────────────────────────── - - -def anonymize_ip(ip: str) -> str: - """ - Private RFC1918 addresses: reveal subnet, hide host octet. - Public addresses: replace with a short stable hash (same IP always - produces the same token, so topology is still readable). - """ - parts = ip.split(".") - if len(parts) != 4: - return "???" - - first, second = parts[0], parts[1] - try: - f = int(first) - except ValueError: - return "???" - - if f == 10: - return f"10.{second}.*.*" - if f == 172 and 16 <= int(second) <= 31: - return f"172.{second}.*.*" - if f == 192 and second == "168": - return f"192.168.*.*" - - # Public IP — stable 6-char token so the graph stays consistent across - # refreshes without leaking the actual address - token = hashlib.sha256(ip.encode()).hexdigest()[:6] - return f"[pub:{token}]" - - -# ── API ─────────────────────────────────────────────────────────────────────── - - -@app.route("/api/report", methods=["POST"]) -def api_report(): - require_psk() - data = request.json - hostname = data["hostname"].lower() - now = int(time.time()) - level = SERVER_CONFIG.get(hostname, 1) - - with sqlite3.connect(DB_PATH) as conn: - conn.execute( - "INSERT OR REPLACE INTO servers VALUES (?, ?, ?)", - (hostname, now, level), - ) - conn.execute("DELETE FROM interfaces WHERE server_name = ?", (hostname,)) - for iface in data.get("interfaces", []): - conn.execute( - "INSERT INTO interfaces VALUES (?, ?, ?, ?)", - (hostname, iface["name"], iface["ip"], iface.get("public_key")), - ) - conn.execute("DELETE FROM wg_peers WHERE server_name = ?", (hostname,)) - for peer in data.get("wg_peers", []): - conn.execute( - "INSERT INTO wg_peers VALUES (?, ?, ?, ?, ?)", - ( - hostname, - peer["ifname"], - peer["pubkey"], - peer["handshake"], - peer["status"], - ), - ) - - return jsonify({"status": "ok"}) - - -@app.route("/status/data") -def get_graph_data(): - with sqlite3.connect(DB_PATH) as conn: - conn.row_factory = sqlite3.Row - servers = conn.execute("SELECT * FROM servers").fetchall() - interfaces = conn.execute("SELECT * FROM interfaces").fetchall() - peers = conn.execute("SELECT * FROM wg_peers").fetchall() - - key_to_server = { - i["public_key"]: i["server_name"] for i in interfaces if i["public_key"] - } - - now = time.time() - - nodes = [] - for s in servers: - anon_ips = [ - anonymize_ip(i["local_ip"]) - for i in interfaces - if i["server_name"] == s["name"] - ] - age = int(now - s["last_seen"]) - is_alive = age < 300 - nodes.append( - { - "id": s["name"], - "label": f"{s['name'].upper()}\n{', '.join(anon_ips[:2])}", - "level": s["level"], - "color": "#2ecc71" if is_alive else "#e74c3c", - "title": f"Last seen {age}s ago", # hover tooltip - } - ) - - edges = [] - seen_pairs: set[tuple] = set() - - for p in peers: - target = key_to_server.get(p["peer_pubkey"]) - if not target: - continue - - # De-duplicate bidirectional edges (A→B and B→A → one edge) - pair = tuple(sorted([p["server_name"], target])) - if pair in seen_pairs: - continue - seen_pairs.add(pair) - - h = p["handshake"] - if h == 0: - color = "#95a5a6" # grey — no handshake ever - elif h <= 120: - color = "#2ecc71" # green — active - elif h <= 86400: - color = "#f1c40f" # yellow — stale - else: - color = "#e74c3c" # red — effectively broken - - edges.append( - { - "from": p["server_name"], - "to": target, - "label": p["ifname"], - "color": color, - "width": 3 if color == "#e74c3c" else 2, - "title": f"{h}s since handshake" if h else "No handshake yet", - } - ) - - return jsonify({"nodes": nodes, "edges": edges}) - - -# ── FRONTEND ────────────────────────────────────────────────────────────────── - - -@app.route("/status") -def index(): - return render_template_string(HTML_TEMPLATE) - - -HTML_TEMPLATE = """ - - - kattila — Network Map - - - - -
- Link status -
Active (< 2 min)
-
Stale (> 2 min)
-
Broken (> 24 h)
-
No handshake
-
-
-
- - -""" - - -if __name__ == "__main__": - app.run(host="10.37.11.2", port=5086) diff --git a/kattila.py b/kattila_poc.py similarity index 100% rename from kattila.py rename to kattila_poc.py