From 99e0e0208c307c4b9d9dfb09c3ee41895b0281f7 Mon Sep 17 00:00:00 2001 From: Kalzu Rekku Date: Fri, 17 Apr 2026 19:23:04 +0300 Subject: [PATCH] Makefile and agent start. --- DESIGN.md | 239 +++++++++++++++++++ Makefile | 16 ++ agent/# Kattila Agent Implementation Plan.md | 55 +++++ agent/api/api.go | 78 ++++++ agent/config/config.go | 87 +++++++ agent/go.mod | 3 + agent/main.go | 30 +++ agent/models/models.go | 55 +++++ agent/network/network.go | 194 +++++++++++++++ agent/reporter/reporter.go | 191 +++++++++++++++ agent/security/security.go | 100 ++++++++ kattila_poc.py | 2 +- manager/app.py | 10 + manager/requirements.txt | 1 + 14 files changed, 1060 insertions(+), 1 deletion(-) create mode 100644 DESIGN.md create mode 100644 Makefile create mode 100644 agent/# Kattila Agent Implementation Plan.md create mode 100644 agent/api/api.go create mode 100644 agent/config/config.go create mode 100644 agent/go.mod create mode 100644 agent/main.go create mode 100644 agent/models/models.go create mode 100644 agent/network/network.go create mode 100644 agent/reporter/reporter.go create mode 100644 agent/security/security.go create mode 100644 manager/app.py create mode 100644 manager/requirements.txt diff --git a/DESIGN.md b/DESIGN.md new file mode 100644 index 0000000..3daff71 --- /dev/null +++ b/DESIGN.md @@ -0,0 +1,239 @@ +# Kattila.status - Design Specification + +Kattila.status is a virtual network topology monitor designed for multi-layer, multi-network environments (VPN meshes, Wireguard, OpenVPN). It follows an **Agent-Manager** architecture with a pure push-based messaging model. + +## Architecture Overview + +```mermaid +graph TD + subgraph "Agents (Debian/Linux)" + A1[Agent 1] + A2[Agent 2] + A3[Agent 3] + end + + subgraph "Manager (Python/Flask)" + M[Manager API] + DB[(SQLite WAL)] + UI[Web UI / Vis-network] + end + + A1 -->|HTTP/JSON| M + A2 -->|Relay| A1 + A3 -->|Relay| A2 + M <--> DB + UI <--> M +``` + +### API Endpoints + +#### Agent API (Listen: 5087) +| Endpoint | Method | Description | +| :--- | :--- | :--- | +| `/status/healthcheck` | GET | Returns simple health status. | +| `/status/reset` | POST | Wipes local SQLite state and triggers re-registration. | +| `/status/peer` | GET | Returns local interface/route info (for relay peers). | +| `/status/relay` | POST | Accepts an enveloped report for forwarding to the Manager. | + +#### Manager API (Listen: 5086) +| Endpoint | Method | Description | +| :--- | :--- | :--- | +| `/status/updates` | POST | Receives periodic reports from agents. | +| `/status/register` | POST | First contact; issues a unique `agent_id`. | +| `/status/healthcheck` | GET | Manager heartheat check. | +| `/status/alarms` | GET | Fetches active network anomalies. | +| `/status/agents` | GET | Lists all known agents and their status. | +| `/status/admin/reset` | POST | Resets specific agent or fleet state. | + +--- + +## Data Model + +### Manager DB (`kattila_manager.db`) + +#### `agents` +Tracks the fleet registry and presence. +```sql +CREATE TABLE agents ( + agent_id TEXT PRIMARY KEY, + hostname TEXT NOT NULL, + agent_version INTEGER NOT NULL, + fleet_id TEXT NOT NULL, + registered_at INTEGER NOT NULL, + last_seen_at INTEGER NOT NULL, + last_tick INTEGER NOT NULL DEFAULT 0, + status TEXT NOT NULL DEFAULT 'online' -- online, offline, warning +); +CREATE INDEX idx_agents_last_seen ON agents(last_seen_at); +``` + +#### `reports` +Auditing and replay protection. +```sql +CREATE TABLE reports ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + agent_id TEXT NOT NULL, + tick INTEGER NOT NULL, + timestamp INTEGER NOT NULL, + report_type TEXT NOT NULL, -- 'report', 'relay', 'register' + report_json TEXT NOT NULL, + received_at INTEGER NOT NULL DEFAULT (strftime('%s', 'now')), + FOREIGN KEY (agent_id) REFERENCES agents(agent_id) ON DELETE CASCADE +); +CREATE UNIQUE INDEX idx_reports_agent_tick ON reports(agent_id, tick); +``` + +#### `topology_edges` +Inferred links between agents. +```sql +CREATE TABLE topology_edges ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + from_agent_id TEXT NOT NULL, + to_agent_id TEXT NOT NULL, + edge_type TEXT NOT NULL, -- 'wireguard', 'openvpn', 'physical', 'relay' + metadata TEXT DEFAULT '{}', -- JSON for pubkeys, RTT, etc. + last_seen INTEGER NOT NULL, + is_active INTEGER NOT NULL DEFAULT 1 +); +CREATE UNIQUE INDEX idx_edges_pair ON topology_edges(from_agent_id, to_agent_id, edge_type); +``` + +#### `agent_interfaces` +Tracks network interfaces an agent reports, allowing detection of when they come and go. +```sql +CREATE TABLE agent_interfaces ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + agent_id TEXT NOT NULL, + interface_name TEXT NOT NULL, + mac_address TEXT, + addresses_json TEXT, + is_virtual INTEGER NOT NULL DEFAULT 0, + vpn_type TEXT, + last_seen_at INTEGER NOT NULL, + FOREIGN KEY (agent_id) REFERENCES agents(agent_id) ON DELETE CASCADE +); +CREATE UNIQUE INDEX idx_agent_interfaces ON agent_interfaces(agent_id, interface_name); +``` + +#### `alarms` +Event log for network changes and issues, tracking state and timestamps. +```sql +CREATE TABLE alarms ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + agent_id TEXT NOT NULL, + alarm_type TEXT NOT NULL, -- e.g., 'link_down', 'new_peer' + status TEXT NOT NULL DEFAULT 'active', -- 'active', 'dismissed' + details_json TEXT DEFAULT '{}', + created_at INTEGER NOT NULL DEFAULT (strftime('%s', 'now')), + dismissed_at INTEGER, + FOREIGN KEY (agent_id) REFERENCES agents(agent_id) ON DELETE CASCADE +); +CREATE INDEX idx_alarms_agent_status ON alarms(agent_id, status); +``` + +## Communication Protocol + +### Security & Hardware +- **Authentication**: HMAC-SHA256 verification using a fleet-wide Bootstrap PSK. +- **Key Discovery & Transition**: The PSK is retrieved via DNS TXT, HTTP(S) URL, or local file and checked for changes hourly. The manager should accept the current and the 2 previous bootstrap keys to handle propagation delays, returning a specific error if an agent connects with an outdated key. +- **Replay Protection**: Monotonic "ticks" and a sliding window nonce cache (120 entries). +- **Time Sync**: 10-minute maximum clock skew allowance. + +### Report Payload (Agent -> Manager) +Agents send a `report` every 30 seconds (with randomized jitter). + +```json +{ + "version": 1, + "tick": 42, + "type": "report", + "nonce": "base64-random-nonce", + "timestamp": 1744569900, + "agent_id": "agent-7f3a9b2c1d", + "agent_version": 5, + "fleet_id": "sha256-psk-hash", + "hmac": "hex-hmac-sha256", + "data": { + "hostname": "node-01", + "uptime_seconds": 123456, + "loadavg": [0.12, 0.34, 0.56], + "interfaces": [ + { + "name": "eth0", + "mac": "aa:bb:cc:dd:ee:ff", + "addresses": ["192.168.1.10/24"], + "is_virtual": false, + "vpn_type": null + } + ], + "routes": [ + { "dst": "0.0.0.0/0", "via": "192.168.1.1", "dev": "eth0" } + ], + "wg_peers": [ + { + "public_key": "base64-key", + "endpoint": "1.2.3.4:51820", + "allowed_ips": ["10.0.0.2/32"], + "last_handshake": 1744569800 + } + ] + } +} +``` + +### Relay Mechanism +Used when an agent cannot reach the manager directly via the configured URL. +- **Discovery**: The agent will scan its connected WireGuard networks for other agents (checking port 5087). It queries their `/status/peer` endpoint to find a forward path to the manager. +- Supports up to 3 hops. + +> [!IMPORTANT] +> **Loop Detection**: Agents must check the `relay_path` array. If their own `agent_id` is present, the message is dropped to prevent infinite recursion. + +--- + +## Data Model (Manager) + +The Manager maintains the network state and inferred topology. + +| Table | Purpose | +| :--- | :--- | +| `agents` | Fleet registry and presence tracking (heartbeat). | +| `agent_interfaces` | Historical snapshot of network interfaces. | +| `topology_edges` | Inferred links between agents (Physical, VPN, Relay). | +| `alarms` | Event log for changes (link down, new peer, etc.). | + +--- + +## Visualization & UI + +The network is visualized using **Vis-network.min.js** in a layered approach: +1. **Layer 1 (Public)**: Servers with direct public IPs (masked as SHA fingerprints). +2. **Layer 2 (Linked)**: Servers behind NAT but directly connected to Layer 1. +3. **Layer 3 (Private)**: Isolated nodes reachable only via multi-hop paths. + +--- + +## Operational Considerations + +### Logging & Monitoring +- **Agents**: Should log to `journald` at INFO level. Critical errors (e.g., SQLite corruption, no PSK) should be logged at ERROR. +- **Manager**: Log each incoming report and security failure (HMAC mismatch) with the source agent IP and ID. + +### Maintenance +- **Database Vacuum**: Periodic `VACUUM` on the manager DB is recommended if tracking many historical reports. +- **Relay Cleanup**: The `nonce_cache` should be cleaned every 10 minutes to prevent memory/storage bloat. + +--- + +## Future Enhancements & Proposals + +### 1. Alerting Integrations +- **Webhooks**: Simple HTTP POST to external services (Slack, Discord) when an `alarm` is created. + +### 2. Historical Topology "Time-Travel" +- Store topology snapshots every hour. +- Allow the UI to "scrub" through history to see when a specific link was added or lost. + +### 3. Advanced Visualization +- **Geographic Map Overlay**: If agents provide coordinates (or inferred via GeoIP), display nodes on a world map. +- **Link Bandwidth Visualization**: Thicker lines for higher capacity links (e.g., physical vs. relay). diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..84356cd --- /dev/null +++ b/Makefile @@ -0,0 +1,16 @@ +.PHONY: all build-agent build-manager clean + +all: build-agent build-manager + +build-agent: + @echo "Building Go agent for x86_64..." + cd agent && GOOS=linux GOARCH=amd64 go build -o bin/agent-amd64 . + @echo "Building Go agent for aarch64..." + cd agent && GOOS=linux GOARCH=arm64 go build -o bin/agent-arm64 . + +build-manager: + @echo "Setting up Python manager dependencies..." + @echo "Run 'cd manager && pip install -r requirements.txt' in your environment to install dependencies." + +clean: + rm -rf agent/bin diff --git a/agent/# Kattila Agent Implementation Plan.md b/agent/# Kattila Agent Implementation Plan.md new file mode 100644 index 0000000..9b2ee8b --- /dev/null +++ b/agent/# Kattila Agent Implementation Plan.md @@ -0,0 +1,55 @@ +# Kattila Agent Implementation Plan + +This document outlines the detailed architecture and implementation steps for the Go-based Kattila Agent. + +## Overview +The Kattila Agent continuously gathers network topology information from the host OS (using `ip` and `wg` commands), cryptographically signs the data, and pushes it to the Kattila Manager. If direct communication fails, it uses peer scanning to find a relay path. + +## User Review Required +> [!IMPORTANT] +> - Do we assume `wg`, `ip` commands are always available in the `PATH` of the agent? +> - The TXT record is returned with wrapping quotes (e.g., `"955f333e5b9cc..."`). The agent will strip these quotes. Is the PSK used exactly as-is for the HMAC key? +> - For Wireguard peer scanning during relay fallback: Will the agent scan the *entire subnet* of `allowed ips` (e.g., `172.16.100.8/29`) to find other agents on port `5087`, or just guess based on endpoints? Scanning the small subnet is usually reliable. +> - We should parse `wg show all dump` instead of raw `wg` if possible, since it's much easier and safer to parse TSV outputs programmatically. Is it okay to use `wg show all dump` instead of human-readable `wg`? + +## Proposed Architecture / Packages + +### 1. `config` Package +- Responsibilities: Load `.env` file containing `DNS`, `MANAGER_URL`, etc. Provide access to environment configurations. +- Store the agent's unique ID (which is generated and saved locally on first run to persist across restarts until `/status/reset`). + +### 2. `security` Package +- **Key Discovery**: Periodically resolve the TXT record of the configured `DNS` name to get the Bootstrap PSK. Strip any surrounding quotes. Keep a history of the current and two previous keys. +- **HMAC Generation**: Provide a function to calculate `HMAC-SHA256` of JSON payloads using the current PSK. +- **Nonce Generation**: Generate cryptographically secure base64 strings for the `nonce` field. + +### 3. `network` Package +- Execute OS commands and parse their outputs: + - `ip -j a`: Parse the JSON output into `Interface` structs. + - `ip -j -4 r`: Parse the JSON output into `Route` structs. + - `wg show all dump`: Parse the TSV wireguard connections. If `wg` human-readable parsing is strictly required, we will build a custom state-machine parser for the provided format. +- Maintain a gathering function `GatherStatus()` that bundles all these details into the expected `data` payload. + +### 4. `api` Package (Agent HTTP Server) +- Runs an HTTP server on `0.0.0.0:5087` using standard `net/http`. +- Endpoints: + - `GET /status/healthcheck`: Return `200 OK {"status": "ok"}` + - `POST /status/reset`: Delete local `agent_id` state, delete internal cache, and trigger a fresh registration loop. + - `GET /status/peer`: Return local network info so peers can decide routing paths. + - `POST /status/relay`: Accept relay payloads, ensure own `agent_id` is not in `relay_path` (loop detection), and forward to manager. + +### 5. `reporter` Package (Main Loop) +- Triggers every 30 seconds. +- Gathers data from `network` package. +- Wraps it in the report envelope: `version`, `tick`, `type`, `nonce`, `timestamp`, `agent_id`, `hmac`. +- Sends POST request to Manager. +- **Relay Fallback**: On failure, queries local wireguard interfaces, pings port `5087` on known subnets, and attempts to find a working peer to relay through. + +## Verification Plan +### Automated testing +- Write unit tests for parsing the provided `ip` and `wg` example files. +- Write unit test for the PSK rotation logic. + +### Manual Verification +- Run the agent locally and verify the logs show successful gathering of interfaces and routes. +- Force a bad manager URL and observe logs indicating relay peer scanning behavior. diff --git a/agent/api/api.go b/agent/api/api.go new file mode 100644 index 0000000..d926927 --- /dev/null +++ b/agent/api/api.go @@ -0,0 +1,78 @@ +package api + +import ( + "encoding/json" + "io" + "log" + "net/http" + + "kattila-agent/config" + "kattila-agent/network" +) + +var handleRelay func(body []byte) error + +func StartServer(relayHandler func([]byte) error) { + handleRelay = relayHandler + + http.HandleFunc("/status/healthcheck", func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(map[string]string{"status": "ok"}) + }) + + http.HandleFunc("/status/reset", func(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost { + http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed) + return + } + config.ResetAgentID() + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(map[string]string{"status": "reset_triggered"}) + }) + + http.HandleFunc("/status/peer", func(w http.ResponseWriter, r *http.Request) { + data, err := network.GatherSystemData() + if err != nil { + http.Error(w, "Failed to gather data", http.StatusInternalServerError) + return + } + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(data) + }) + + http.HandleFunc("/status/relay", func(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost { + http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed) + return + } + body, err := io.ReadAll(r.Body) + if err != nil { + http.Error(w, "Read error", http.StatusBadRequest) + return + } + + if handleRelay != nil { + err = handleRelay(body) + if err != nil { + http.Error(w, err.Error(), http.StatusBadGateway) + return + } + } + + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(map[string]string{"status": "relayed"}) + }) + + port := config.Cfg.AgentPort + if port == "" { + port = "5087" + } + addr := "0.0.0.0:" + port + + go func() { + log.Printf("api: Starting agent server on %s", addr) + if err := http.ListenAndServe(addr, nil); err != nil { + log.Fatalf("api: Server failed: %v", err) + } + }() +} diff --git a/agent/config/config.go b/agent/config/config.go new file mode 100644 index 0000000..c050939 --- /dev/null +++ b/agent/config/config.go @@ -0,0 +1,87 @@ +package config + +import ( + "crypto/rand" + "encoding/hex" + "log" + "os" + "strings" +) + +type Config struct { + DNS string + ManagerURL string + AgentPort string + AgentID string +} + +var Cfg *Config + +func init() { + Cfg = &Config{ + AgentPort: "5087", // default + } +} + +func LoadConfig() { + paths := []string{".env", "../.env"} + for _, p := range paths { + content, err := os.ReadFile(p) + if err == nil { + parseEnv(string(content)) + break + } + } + + if h := os.Getenv("MANAGER_URL"); h != "" { + Cfg.ManagerURL = h + } + if Cfg.ManagerURL == "" { + Cfg.ManagerURL = "http://localhost:5086" // Default + } + + if d := os.Getenv("DNS"); d != "" { + Cfg.DNS = d + } + + loadOrGenerateAgentID() +} + +func parseEnv(content string) { + lines := strings.Split(content, "\n") + for _, line := range lines { + line = strings.TrimSpace(line) + if line == "" || strings.HasPrefix(line, "#") { + continue + } + parts := strings.SplitN(line, "=", 2) + if len(parts) == 2 { + os.Setenv(parts[0], parts[1]) + } + } +} + +func loadOrGenerateAgentID() { + path := "agent_id.txt" + data, err := os.ReadFile(path) + if err == nil { + Cfg.AgentID = strings.TrimSpace(string(data)) + if Cfg.AgentID != "" { + return + } + } + + // Generate new AgentID + b := make([]byte, 8) + rand.Read(b) + Cfg.AgentID = "agent-" + hex.EncodeToString(b) + err = os.WriteFile(path, []byte(Cfg.AgentID), 0600) + if err != nil { + log.Printf("Warning: failed to save agent_id.txt: %v", err) + } +} + +func ResetAgentID() { + os.Remove("agent_id.txt") + loadOrGenerateAgentID() +} diff --git a/agent/go.mod b/agent/go.mod new file mode 100644 index 0000000..f9cb8b9 --- /dev/null +++ b/agent/go.mod @@ -0,0 +1,3 @@ +module kattila-agent + +go 1.21 diff --git a/agent/main.go b/agent/main.go new file mode 100644 index 0000000..2fcc8ec --- /dev/null +++ b/agent/main.go @@ -0,0 +1,30 @@ +package main + +import ( + "log" + "os" + "os/signal" + "syscall" + + "kattila-agent/api" + "kattila-agent/config" + "kattila-agent/reporter" + "kattila-agent/security" +) + +func main() { + log.Println("Kattila Agent starting...") + + config.LoadConfig() + security.StartKeyPoller() + + api.StartServer(reporter.HandleRelay) + reporter.StartLoop() + + // Wait for termination signal + sigChan := make(chan os.Signal, 1) + signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) + <-sigChan + + log.Println("Kattila Agent shutting down...") +} diff --git a/agent/models/models.go b/agent/models/models.go new file mode 100644 index 0000000..5237857 --- /dev/null +++ b/agent/models/models.go @@ -0,0 +1,55 @@ +package models + +// Report is the top level JSON payload sent to the manager +type Report struct { + Version int `json:"version"` + Tick int64 `json:"tick"` + Type string `json:"type"` // "report", "relay", "register" + Nonce string `json:"nonce"` + Timestamp int64 `json:"timestamp"` + AgentID string `json:"agent_id"` + AgentVersion int `json:"agent_version"` + FleetID string `json:"fleet_id"` + HMAC string `json:"hmac"` + Data interface{} `json:"data"` // The actual payload logic +} + +// SystemData is the core system status payload +type SystemData struct { + Hostname string `json:"hostname"` + UptimeSeconds int64 `json:"uptime_seconds"` + LoadAvg []float64 `json:"loadavg"` + Interfaces []Interface `json:"interfaces"` + Routes []Route `json:"routes"` + WGPeers []WGPeer `json:"wg_peers"` +} + +type Interface struct { + Name string `json:"name"` + MAC string `json:"mac"` + Addresses []string `json:"addresses"` + IsVirtual bool `json:"is_virtual"` + VPNType *string `json:"vpn_type"` // "wireguard", "openvpn", etc. +} + +type Route struct { + Dst string `json:"dst"` + Via string `json:"via"` + Dev string `json:"dev"` +} + +type WGPeer struct { + Interface string `json:"interface"` + PublicKey string `json:"public_key"` + Endpoint string `json:"endpoint"` + AllowedIPs []string `json:"allowed_ips"` + LatestHandshake int64 `json:"latest_handshake"` + TransferRx int64 `json:"transfer_rx"` + TransferTx int64 `json:"transfer_tx"` +} + +// RelayEnvelope is used when pushing a report over another peer +type RelayEnvelope struct { + RelayPath []string `json:"relay_path"` // array of agent_ids that routed this message + Payload Report `json:"payload"` +} diff --git a/agent/network/network.go b/agent/network/network.go new file mode 100644 index 0000000..25c1113 --- /dev/null +++ b/agent/network/network.go @@ -0,0 +1,194 @@ +package network + +import ( + "bytes" + "encoding/json" + "fmt" + "os/exec" + "strconv" + "strings" + + "kattila-agent/models" +) + +type ipAddrInfo struct { + Local string `json:"local"` + PrefixLen int `json:"prefixlen"` +} + +type ipInterface struct { + Ifname string `json:"ifname"` + Address string `json:"address"` // MAC address in newer iproute2 + AddrInfo []ipAddrInfo `json:"addr_info"` +} + +func getInterfaces() ([]models.Interface, error) { + cmd := exec.Command("ip", "-j", "a") + out, err := cmd.Output() + if err != nil { + return nil, err + } + + var parsed []ipInterface + if err := json.Unmarshal(out, &parsed); err != nil { + return nil, err + } + + var results []models.Interface + for _, itf := range parsed { + mac := itf.Address + var addrs []string + for _, info := range itf.AddrInfo { + addrs = append(addrs, fmt.Sprintf("%s/%d", info.Local, info.PrefixLen)) + } + + isVirtual := false + var vpnType *string + if strings.HasPrefix(itf.Ifname, "wg") || strings.HasPrefix(itf.Ifname, "tun") || strings.HasPrefix(itf.Ifname, "parvpn") || strings.HasPrefix(itf.Ifname, "home") || strings.HasPrefix(itf.Ifname, "tailscale") { + isVirtual = true + } + + results = append(results, models.Interface{ + Name: itf.Ifname, + MAC: mac, + Addresses: addrs, + IsVirtual: isVirtual, + VPNType: vpnType, + }) + } + return results, nil +} + +type ipRoute struct { + Dst string `json:"dst"` + Gateway string `json:"gateway"` + Via string `json:"via"` // Sometimes present instead of gateway + Dev string `json:"dev"` +} + +func getRoutes() ([]models.Route, error) { + cmd := exec.Command("ip", "-j", "-4", "r") + out, err := cmd.Output() + if err != nil { + return nil, err + } + + var parsed []ipRoute + if err := json.Unmarshal(out, &parsed); err != nil { + return nil, err + } + + var results []models.Route + for _, r := range parsed { + via := r.Gateway + if via == "" { + via = r.Via + } + results = append(results, models.Route{ + Dst: r.Dst, + Via: via, + Dev: r.Dev, + }) + } + return results, nil +} + +func getWgPeers() ([]models.WGPeer, error) { + cmd := exec.Command("wg", "show", "all", "dump") + out, err := cmd.Output() + if err != nil { // wg might fail or not be installed, ignore and return empty + return nil, nil + } + + var peers []models.WGPeer + lines := bytes.Split(out, []byte("\n")) + for _, line := range lines { + line = bytes.TrimSpace(line) + if len(line) == 0 { + continue + } + + cols := strings.Split(string(line), "\t") + // wg interface line: itf, privkey, pubkey, port, fwmark (5 cols) + // wg peer line: itf, pubkey, psk, endpoint, allowed-ips, handshake, rx, tx, keepalive (9 cols) + if len(cols) >= 8 { + itf := cols[0] + pubkey := cols[1] + endpoint := cols[3] + if endpoint == "(none)" { + endpoint = "" + } + allowedIpsRaw := cols[4] + if allowedIpsRaw == "(none)" { + allowedIpsRaw = "" + } + allowedIps := strings.Split(allowedIpsRaw, ",") + + handshake, _ := strconv.ParseInt(cols[5], 10, 64) + rx, _ := strconv.ParseInt(cols[6], 10, 64) + tx, _ := strconv.ParseInt(cols[7], 10, 64) + + peers = append(peers, models.WGPeer{ + Interface: itf, + PublicKey: pubkey, + Endpoint: endpoint, + AllowedIPs: allowedIps, + LatestHandshake: handshake, + TransferRx: rx, + TransferTx: tx, + }) + } + } + return peers, nil +} + +func GatherSystemData() (models.SystemData, error) { + hostname, _ := exec.Command("hostname").Output() + + // Simplified uptime extraction + uptimeBytes, _ := exec.Command("cat", "/proc/uptime").Output() + var uptimeSec int64 + if len(uptimeBytes) > 0 { + parts := strings.Fields(string(uptimeBytes)) + if len(parts) > 0 { + uptimeFloat, _ := strconv.ParseFloat(parts[0], 64) + uptimeSec = int64(uptimeFloat) + } + } + + loadavgBytes, _ := exec.Command("cat", "/proc/loadavg").Output() + var loadavg []float64 + if len(loadavgBytes) > 0 { + parts := strings.Fields(string(loadavgBytes)) + if len(parts) >= 3 { + l1, _ := strconv.ParseFloat(parts[0], 64) + l2, _ := strconv.ParseFloat(parts[1], 64) + l3, _ := strconv.ParseFloat(parts[2], 64) + loadavg = []float64{l1, l2, l3} + } + } + + intfs, err := getInterfaces() + if err != nil { + intfs = []models.Interface{} + } + + routes, err := getRoutes() + if err != nil { + routes = []models.Route{} + } + + wgPeers, err := getWgPeers() + if err != nil { + wgPeers = []models.WGPeer{} + } + + return models.SystemData{ + Hostname: strings.TrimSpace(string(hostname)), + UptimeSeconds: uptimeSec, + LoadAvg: loadavg, + Interfaces: intfs, + Routes: routes, + WGPeers: wgPeers, + }, nil +} diff --git a/agent/reporter/reporter.go b/agent/reporter/reporter.go new file mode 100644 index 0000000..c82e634 --- /dev/null +++ b/agent/reporter/reporter.go @@ -0,0 +1,191 @@ +package reporter + +import ( + "bytes" + "encoding/json" + "errors" + "fmt" + "io" + "log" + "net" + "net/http" + "strings" + "time" + + "kattila-agent/config" + "kattila-agent/models" + "kattila-agent/network" + "kattila-agent/security" +) + +var tickCounter int64 = 0 + +func StartLoop() { + doReport() // run immediately + ticker := time.NewTicker(30 * time.Second) + go func() { + for range ticker.C { + doReport() + } + }() +} + +func doReport() { + data, err := network.GatherSystemData() + if err != nil { + log.Printf("reporter: gather error: %v", err) + return + } + + tickCounter++ + now := time.Now().Unix() + + report := models.Report{ + Version: 1, + Tick: tickCounter, + Type: "report", + Nonce: security.GenerateNonce(), + Timestamp: now, + AgentID: config.Cfg.AgentID, + AgentVersion: 1, + FleetID: security.FleetID(), + Data: data, + } + + report.HMAC = security.SignPayload(report.Data) + + err = pushToManager(report) + if err != nil { + log.Printf("reporter: direct push failed (%v). Attempting relay scan...", err) + tryRelay(report, data) + } +} + +func pushToManager(report models.Report) error { + body, _ := json.Marshal(report) + url := strings.TrimRight(config.Cfg.ManagerURL, "/") + "/status/updates" + req, err := http.NewRequest("POST", url, bytes.NewBuffer(body)) + if err != nil { + return err + } + req.Header.Set("Content-Type", "application/json") + + client := &http.Client{Timeout: 5 * time.Second} + resp, err := client.Do(req) + if err != nil { + return err + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + respBody, _ := io.ReadAll(resp.Body) + return fmt.Errorf("bad status code %d: %s", resp.StatusCode, respBody) + } + log.Printf("reporter: Report successfully sent to Manager (tick %d)", report.Tick) + return nil +} + +func HandleRelay(body []byte) error { + var envelope models.RelayEnvelope + if err := json.Unmarshal(body, &envelope); err != nil { + return err + } + + for _, id := range envelope.RelayPath { + if id == config.Cfg.AgentID { + log.Println("reporter: Dropped relay request: routing loop detected") + return errors.New("routing loop detected") + } + } + + envelope.RelayPath = append(envelope.RelayPath, config.Cfg.AgentID) + if len(envelope.RelayPath) > 3 { + return errors.New("relay hop limit exceeded") + } + + envelopeBody, _ := json.Marshal(envelope) + url := strings.TrimRight(config.Cfg.ManagerURL, "/") + "/status/updates" + req, err := http.NewRequest("POST", url, bytes.NewBuffer(envelopeBody)) + if err != nil { + return err + } + req.Header.Set("Content-Type", "application/json") + + client := &http.Client{Timeout: 5 * time.Second} + resp, err := client.Do(req) + if err != nil { + log.Printf("reporter: Manager unreachable during relay forward, hopping further...") + + data, err := network.GatherSystemData() + if err == nil { + return tryRelayEnvelope(envelope, data) + } + return err + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + return fmt.Errorf("bad status from manager: %d", resp.StatusCode) + } + + log.Printf("reporter: Successfully relayed message for %s", envelope.Payload.AgentID) + return nil +} + +func tryRelay(report models.Report, localData models.SystemData) { + env := models.RelayEnvelope{ + RelayPath: []string{config.Cfg.AgentID}, + Payload: report, + } + err := tryRelayEnvelope(env, localData) + if err != nil { + log.Printf("reporter: Exhausted all relays, couldn't push report: %v", err) + } +} + +func tryRelayEnvelope(env models.RelayEnvelope, data models.SystemData) error { + for _, wg := range data.WGPeers { + for _, allowedRaw := range wg.AllowedIPs { + ip, _, err := net.ParseCIDR(allowedRaw) + if err != nil { + ip = net.ParseIP(allowedRaw) + } + if ip != nil { + ipTarget := ip.String() + if pingPeer(ipTarget) { + log.Printf("reporter: Found relay peer at %s, forwarding...", ipTarget) + err := pushToRelay(ipTarget, env) + if err == nil { + return nil + } + log.Printf("reporter: Failed to push to relay %s: %v", ipTarget, err) + } + } + } + } + return errors.New("no working relays found") +} + +func pingPeer(ip string) bool { + client := &http.Client{Timeout: 2 * time.Second} + resp, err := client.Get(fmt.Sprintf("http://%s:%s/status/peer", ip, config.Cfg.AgentPort)) + if err == nil { + defer resp.Body.Close() + return resp.StatusCode == http.StatusOK + } + return false +} + +func pushToRelay(ip string, env models.RelayEnvelope) error { + body, _ := json.Marshal(env) + client := &http.Client{Timeout: 5 * time.Second} + resp, err := client.Post(fmt.Sprintf("http://%s:%s/status/relay", ip, config.Cfg.AgentPort), "application/json", bytes.NewBuffer(body)) + if err != nil { + return err + } + defer resp.Body.Close() + if resp.StatusCode != http.StatusOK { + return fmt.Errorf("relay rejected forwarding attempt with %d", resp.StatusCode) + } + return nil +} diff --git a/agent/security/security.go b/agent/security/security.go new file mode 100644 index 0000000..ac83026 --- /dev/null +++ b/agent/security/security.go @@ -0,0 +1,100 @@ +package security + +import ( + "crypto/hmac" + "crypto/rand" + "crypto/sha256" + "encoding/base64" + "encoding/hex" + "encoding/json" + "log" + "net" + "strings" + "sync" + "time" + + "kattila-agent/config" +) + +var ( + currentPSK string + mu sync.RWMutex +) + +// StartKeyPoller checks the DNS record every hour +func StartKeyPoller() { + fetchKey() + ticker := time.NewTicker(1 * time.Hour) + go func() { + for range ticker.C { + fetchKey() + } + }() +} + +func fetchKey() { + dnsName := config.Cfg.DNS + if dnsName == "" { + log.Println("security: No DNS configured for PSK") + return + } + + txts, err := net.LookupTXT(dnsName) + if err != nil { + log.Printf("security: Failed to lookup TXT for %s: %v", dnsName, err) + return + } + + if len(txts) == 0 { + return + } + + key := txts[0] + // Remove quotes if present + key = strings.Trim(key, `"`) + + mu.Lock() + if currentPSK != key { + log.Println("security: New PSK discovered via DNS") + currentPSK = key + } + mu.Unlock() +} + +func GetCurrentPSK() string { + mu.RLock() + defer mu.RUnlock() + return currentPSK +} + +// FleetID generates a SHA256 of the PSK to uniquely identify the fleet +func FleetID() string { + psk := GetCurrentPSK() + if psk == "" { + return "" + } + hash := sha256.Sum256([]byte(psk)) + return hex.EncodeToString(hash[:]) +} + +func GenerateNonce() string { + b := make([]byte, 16) + rand.Read(b) + return base64.StdEncoding.EncodeToString(b) +} + +func SignPayload(data interface{}) string { + psk := GetCurrentPSK() + if psk == "" { + return "" + } + + bytes, err := json.Marshal(data) + if err != nil { + return "" + } + + h := hmac.New(sha256.New, []byte(psk)) + h.Write(bytes) + return hex.EncodeToString(h.Sum(nil)) +} diff --git a/kattila_poc.py b/kattila_poc.py index d6b6ac4..e7a9c93 100644 --- a/kattila_poc.py +++ b/kattila_poc.py @@ -292,4 +292,4 @@ HTML_TEMPLATE = """ if __name__ == "__main__": - app.run(host="10.37.11.2", port=5086) \ No newline at end of file + app.run(host="10.37.11.2", port=5086) diff --git a/manager/app.py b/manager/app.py new file mode 100644 index 0000000..68a1359 --- /dev/null +++ b/manager/app.py @@ -0,0 +1,10 @@ +from flask import Flask, jsonify + +app = Flask(__name__) + +@app.route('/status/healthcheck') +def healthcheck(): + return jsonify({"status": "ok"}) + +if __name__ == '__main__': + app.run(host='0.0.0.0', port=5086) diff --git a/manager/requirements.txt b/manager/requirements.txt new file mode 100644 index 0000000..11955a8 --- /dev/null +++ b/manager/requirements.txt @@ -0,0 +1 @@ +Flask>=3.0.0