Makefile and agent start.
This commit is contained in:
239
DESIGN.md
Normal file
239
DESIGN.md
Normal file
@@ -0,0 +1,239 @@
|
||||
# Kattila.status - Design Specification
|
||||
|
||||
Kattila.status is a virtual network topology monitor designed for multi-layer, multi-network environments (VPN meshes, Wireguard, OpenVPN). It follows an **Agent-Manager** architecture with a pure push-based messaging model.
|
||||
|
||||
## Architecture Overview
|
||||
|
||||
```mermaid
|
||||
graph TD
|
||||
subgraph "Agents (Debian/Linux)"
|
||||
A1[Agent 1]
|
||||
A2[Agent 2]
|
||||
A3[Agent 3]
|
||||
end
|
||||
|
||||
subgraph "Manager (Python/Flask)"
|
||||
M[Manager API]
|
||||
DB[(SQLite WAL)]
|
||||
UI[Web UI / Vis-network]
|
||||
end
|
||||
|
||||
A1 -->|HTTP/JSON| M
|
||||
A2 -->|Relay| A1
|
||||
A3 -->|Relay| A2
|
||||
M <--> DB
|
||||
UI <--> M
|
||||
```
|
||||
|
||||
### API Endpoints
|
||||
|
||||
#### Agent API (Listen: 5087)
|
||||
| Endpoint | Method | Description |
|
||||
| :--- | :--- | :--- |
|
||||
| `/status/healthcheck` | GET | Returns simple health status. |
|
||||
| `/status/reset` | POST | Wipes local SQLite state and triggers re-registration. |
|
||||
| `/status/peer` | GET | Returns local interface/route info (for relay peers). |
|
||||
| `/status/relay` | POST | Accepts an enveloped report for forwarding to the Manager. |
|
||||
|
||||
#### Manager API (Listen: 5086)
|
||||
| Endpoint | Method | Description |
|
||||
| :--- | :--- | :--- |
|
||||
| `/status/updates` | POST | Receives periodic reports from agents. |
|
||||
| `/status/register` | POST | First contact; issues a unique `agent_id`. |
|
||||
| `/status/healthcheck` | GET | Manager heartheat check. |
|
||||
| `/status/alarms` | GET | Fetches active network anomalies. |
|
||||
| `/status/agents` | GET | Lists all known agents and their status. |
|
||||
| `/status/admin/reset` | POST | Resets specific agent or fleet state. |
|
||||
|
||||
---
|
||||
|
||||
## Data Model
|
||||
|
||||
### Manager DB (`kattila_manager.db`)
|
||||
|
||||
#### `agents`
|
||||
Tracks the fleet registry and presence.
|
||||
```sql
|
||||
CREATE TABLE agents (
|
||||
agent_id TEXT PRIMARY KEY,
|
||||
hostname TEXT NOT NULL,
|
||||
agent_version INTEGER NOT NULL,
|
||||
fleet_id TEXT NOT NULL,
|
||||
registered_at INTEGER NOT NULL,
|
||||
last_seen_at INTEGER NOT NULL,
|
||||
last_tick INTEGER NOT NULL DEFAULT 0,
|
||||
status TEXT NOT NULL DEFAULT 'online' -- online, offline, warning
|
||||
);
|
||||
CREATE INDEX idx_agents_last_seen ON agents(last_seen_at);
|
||||
```
|
||||
|
||||
#### `reports`
|
||||
Auditing and replay protection.
|
||||
```sql
|
||||
CREATE TABLE reports (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
agent_id TEXT NOT NULL,
|
||||
tick INTEGER NOT NULL,
|
||||
timestamp INTEGER NOT NULL,
|
||||
report_type TEXT NOT NULL, -- 'report', 'relay', 'register'
|
||||
report_json TEXT NOT NULL,
|
||||
received_at INTEGER NOT NULL DEFAULT (strftime('%s', 'now')),
|
||||
FOREIGN KEY (agent_id) REFERENCES agents(agent_id) ON DELETE CASCADE
|
||||
);
|
||||
CREATE UNIQUE INDEX idx_reports_agent_tick ON reports(agent_id, tick);
|
||||
```
|
||||
|
||||
#### `topology_edges`
|
||||
Inferred links between agents.
|
||||
```sql
|
||||
CREATE TABLE topology_edges (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
from_agent_id TEXT NOT NULL,
|
||||
to_agent_id TEXT NOT NULL,
|
||||
edge_type TEXT NOT NULL, -- 'wireguard', 'openvpn', 'physical', 'relay'
|
||||
metadata TEXT DEFAULT '{}', -- JSON for pubkeys, RTT, etc.
|
||||
last_seen INTEGER NOT NULL,
|
||||
is_active INTEGER NOT NULL DEFAULT 1
|
||||
);
|
||||
CREATE UNIQUE INDEX idx_edges_pair ON topology_edges(from_agent_id, to_agent_id, edge_type);
|
||||
```
|
||||
|
||||
#### `agent_interfaces`
|
||||
Tracks network interfaces an agent reports, allowing detection of when they come and go.
|
||||
```sql
|
||||
CREATE TABLE agent_interfaces (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
agent_id TEXT NOT NULL,
|
||||
interface_name TEXT NOT NULL,
|
||||
mac_address TEXT,
|
||||
addresses_json TEXT,
|
||||
is_virtual INTEGER NOT NULL DEFAULT 0,
|
||||
vpn_type TEXT,
|
||||
last_seen_at INTEGER NOT NULL,
|
||||
FOREIGN KEY (agent_id) REFERENCES agents(agent_id) ON DELETE CASCADE
|
||||
);
|
||||
CREATE UNIQUE INDEX idx_agent_interfaces ON agent_interfaces(agent_id, interface_name);
|
||||
```
|
||||
|
||||
#### `alarms`
|
||||
Event log for network changes and issues, tracking state and timestamps.
|
||||
```sql
|
||||
CREATE TABLE alarms (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
agent_id TEXT NOT NULL,
|
||||
alarm_type TEXT NOT NULL, -- e.g., 'link_down', 'new_peer'
|
||||
status TEXT NOT NULL DEFAULT 'active', -- 'active', 'dismissed'
|
||||
details_json TEXT DEFAULT '{}',
|
||||
created_at INTEGER NOT NULL DEFAULT (strftime('%s', 'now')),
|
||||
dismissed_at INTEGER,
|
||||
FOREIGN KEY (agent_id) REFERENCES agents(agent_id) ON DELETE CASCADE
|
||||
);
|
||||
CREATE INDEX idx_alarms_agent_status ON alarms(agent_id, status);
|
||||
```
|
||||
|
||||
## Communication Protocol
|
||||
|
||||
### Security & Hardware
|
||||
- **Authentication**: HMAC-SHA256 verification using a fleet-wide Bootstrap PSK.
|
||||
- **Key Discovery & Transition**: The PSK is retrieved via DNS TXT, HTTP(S) URL, or local file and checked for changes hourly. The manager should accept the current and the 2 previous bootstrap keys to handle propagation delays, returning a specific error if an agent connects with an outdated key.
|
||||
- **Replay Protection**: Monotonic "ticks" and a sliding window nonce cache (120 entries).
|
||||
- **Time Sync**: 10-minute maximum clock skew allowance.
|
||||
|
||||
### Report Payload (Agent -> Manager)
|
||||
Agents send a `report` every 30 seconds (with randomized jitter).
|
||||
|
||||
```json
|
||||
{
|
||||
"version": 1,
|
||||
"tick": 42,
|
||||
"type": "report",
|
||||
"nonce": "base64-random-nonce",
|
||||
"timestamp": 1744569900,
|
||||
"agent_id": "agent-7f3a9b2c1d",
|
||||
"agent_version": 5,
|
||||
"fleet_id": "sha256-psk-hash",
|
||||
"hmac": "hex-hmac-sha256",
|
||||
"data": {
|
||||
"hostname": "node-01",
|
||||
"uptime_seconds": 123456,
|
||||
"loadavg": [0.12, 0.34, 0.56],
|
||||
"interfaces": [
|
||||
{
|
||||
"name": "eth0",
|
||||
"mac": "aa:bb:cc:dd:ee:ff",
|
||||
"addresses": ["192.168.1.10/24"],
|
||||
"is_virtual": false,
|
||||
"vpn_type": null
|
||||
}
|
||||
],
|
||||
"routes": [
|
||||
{ "dst": "0.0.0.0/0", "via": "192.168.1.1", "dev": "eth0" }
|
||||
],
|
||||
"wg_peers": [
|
||||
{
|
||||
"public_key": "base64-key",
|
||||
"endpoint": "1.2.3.4:51820",
|
||||
"allowed_ips": ["10.0.0.2/32"],
|
||||
"last_handshake": 1744569800
|
||||
}
|
||||
]
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
### Relay Mechanism
|
||||
Used when an agent cannot reach the manager directly via the configured URL.
|
||||
- **Discovery**: The agent will scan its connected WireGuard networks for other agents (checking port 5087). It queries their `/status/peer` endpoint to find a forward path to the manager.
|
||||
- Supports up to 3 hops.
|
||||
|
||||
> [!IMPORTANT]
|
||||
> **Loop Detection**: Agents must check the `relay_path` array. If their own `agent_id` is present, the message is dropped to prevent infinite recursion.
|
||||
|
||||
---
|
||||
|
||||
## Data Model (Manager)
|
||||
|
||||
The Manager maintains the network state and inferred topology.
|
||||
|
||||
| Table | Purpose |
|
||||
| :--- | :--- |
|
||||
| `agents` | Fleet registry and presence tracking (heartbeat). |
|
||||
| `agent_interfaces` | Historical snapshot of network interfaces. |
|
||||
| `topology_edges` | Inferred links between agents (Physical, VPN, Relay). |
|
||||
| `alarms` | Event log for changes (link down, new peer, etc.). |
|
||||
|
||||
---
|
||||
|
||||
## Visualization & UI
|
||||
|
||||
The network is visualized using **Vis-network.min.js** in a layered approach:
|
||||
1. **Layer 1 (Public)**: Servers with direct public IPs (masked as SHA fingerprints).
|
||||
2. **Layer 2 (Linked)**: Servers behind NAT but directly connected to Layer 1.
|
||||
3. **Layer 3 (Private)**: Isolated nodes reachable only via multi-hop paths.
|
||||
|
||||
---
|
||||
|
||||
## Operational Considerations
|
||||
|
||||
### Logging & Monitoring
|
||||
- **Agents**: Should log to `journald` at INFO level. Critical errors (e.g., SQLite corruption, no PSK) should be logged at ERROR.
|
||||
- **Manager**: Log each incoming report and security failure (HMAC mismatch) with the source agent IP and ID.
|
||||
|
||||
### Maintenance
|
||||
- **Database Vacuum**: Periodic `VACUUM` on the manager DB is recommended if tracking many historical reports.
|
||||
- **Relay Cleanup**: The `nonce_cache` should be cleaned every 10 minutes to prevent memory/storage bloat.
|
||||
|
||||
---
|
||||
|
||||
## Future Enhancements & Proposals
|
||||
|
||||
### 1. Alerting Integrations
|
||||
- **Webhooks**: Simple HTTP POST to external services (Slack, Discord) when an `alarm` is created.
|
||||
|
||||
### 2. Historical Topology "Time-Travel"
|
||||
- Store topology snapshots every hour.
|
||||
- Allow the UI to "scrub" through history to see when a specific link was added or lost.
|
||||
|
||||
### 3. Advanced Visualization
|
||||
- **Geographic Map Overlay**: If agents provide coordinates (or inferred via GeoIP), display nodes on a world map.
|
||||
- **Link Bandwidth Visualization**: Thicker lines for higher capacity links (e.g., physical vs. relay).
|
||||
16
Makefile
Normal file
16
Makefile
Normal file
@@ -0,0 +1,16 @@
|
||||
.PHONY: all build-agent build-manager clean
|
||||
|
||||
all: build-agent build-manager
|
||||
|
||||
build-agent:
|
||||
@echo "Building Go agent for x86_64..."
|
||||
cd agent && GOOS=linux GOARCH=amd64 go build -o bin/agent-amd64 .
|
||||
@echo "Building Go agent for aarch64..."
|
||||
cd agent && GOOS=linux GOARCH=arm64 go build -o bin/agent-arm64 .
|
||||
|
||||
build-manager:
|
||||
@echo "Setting up Python manager dependencies..."
|
||||
@echo "Run 'cd manager && pip install -r requirements.txt' in your environment to install dependencies."
|
||||
|
||||
clean:
|
||||
rm -rf agent/bin
|
||||
55
agent/# Kattila Agent Implementation Plan.md
Normal file
55
agent/# Kattila Agent Implementation Plan.md
Normal file
@@ -0,0 +1,55 @@
|
||||
# Kattila Agent Implementation Plan
|
||||
|
||||
This document outlines the detailed architecture and implementation steps for the Go-based Kattila Agent.
|
||||
|
||||
## Overview
|
||||
The Kattila Agent continuously gathers network topology information from the host OS (using `ip` and `wg` commands), cryptographically signs the data, and pushes it to the Kattila Manager. If direct communication fails, it uses peer scanning to find a relay path.
|
||||
|
||||
## User Review Required
|
||||
> [!IMPORTANT]
|
||||
> - Do we assume `wg`, `ip` commands are always available in the `PATH` of the agent?
|
||||
> - The TXT record is returned with wrapping quotes (e.g., `"955f333e5b9cc..."`). The agent will strip these quotes. Is the PSK used exactly as-is for the HMAC key?
|
||||
> - For Wireguard peer scanning during relay fallback: Will the agent scan the *entire subnet* of `allowed ips` (e.g., `172.16.100.8/29`) to find other agents on port `5087`, or just guess based on endpoints? Scanning the small subnet is usually reliable.
|
||||
> - We should parse `wg show all dump` instead of raw `wg` if possible, since it's much easier and safer to parse TSV outputs programmatically. Is it okay to use `wg show all dump` instead of human-readable `wg`?
|
||||
|
||||
## Proposed Architecture / Packages
|
||||
|
||||
### 1. `config` Package
|
||||
- Responsibilities: Load `.env` file containing `DNS`, `MANAGER_URL`, etc. Provide access to environment configurations.
|
||||
- Store the agent's unique ID (which is generated and saved locally on first run to persist across restarts until `/status/reset`).
|
||||
|
||||
### 2. `security` Package
|
||||
- **Key Discovery**: Periodically resolve the TXT record of the configured `DNS` name to get the Bootstrap PSK. Strip any surrounding quotes. Keep a history of the current and two previous keys.
|
||||
- **HMAC Generation**: Provide a function to calculate `HMAC-SHA256` of JSON payloads using the current PSK.
|
||||
- **Nonce Generation**: Generate cryptographically secure base64 strings for the `nonce` field.
|
||||
|
||||
### 3. `network` Package
|
||||
- Execute OS commands and parse their outputs:
|
||||
- `ip -j a`: Parse the JSON output into `Interface` structs.
|
||||
- `ip -j -4 r`: Parse the JSON output into `Route` structs.
|
||||
- `wg show all dump`: Parse the TSV wireguard connections. If `wg` human-readable parsing is strictly required, we will build a custom state-machine parser for the provided format.
|
||||
- Maintain a gathering function `GatherStatus()` that bundles all these details into the expected `data` payload.
|
||||
|
||||
### 4. `api` Package (Agent HTTP Server)
|
||||
- Runs an HTTP server on `0.0.0.0:5087` using standard `net/http`.
|
||||
- Endpoints:
|
||||
- `GET /status/healthcheck`: Return `200 OK {"status": "ok"}`
|
||||
- `POST /status/reset`: Delete local `agent_id` state, delete internal cache, and trigger a fresh registration loop.
|
||||
- `GET /status/peer`: Return local network info so peers can decide routing paths.
|
||||
- `POST /status/relay`: Accept relay payloads, ensure own `agent_id` is not in `relay_path` (loop detection), and forward to manager.
|
||||
|
||||
### 5. `reporter` Package (Main Loop)
|
||||
- Triggers every 30 seconds.
|
||||
- Gathers data from `network` package.
|
||||
- Wraps it in the report envelope: `version`, `tick`, `type`, `nonce`, `timestamp`, `agent_id`, `hmac`.
|
||||
- Sends POST request to Manager.
|
||||
- **Relay Fallback**: On failure, queries local wireguard interfaces, pings port `5087` on known subnets, and attempts to find a working peer to relay through.
|
||||
|
||||
## Verification Plan
|
||||
### Automated testing
|
||||
- Write unit tests for parsing the provided `ip` and `wg` example files.
|
||||
- Write unit test for the PSK rotation logic.
|
||||
|
||||
### Manual Verification
|
||||
- Run the agent locally and verify the logs show successful gathering of interfaces and routes.
|
||||
- Force a bad manager URL and observe logs indicating relay peer scanning behavior.
|
||||
78
agent/api/api.go
Normal file
78
agent/api/api.go
Normal file
@@ -0,0 +1,78 @@
|
||||
package api
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"io"
|
||||
"log"
|
||||
"net/http"
|
||||
|
||||
"kattila-agent/config"
|
||||
"kattila-agent/network"
|
||||
)
|
||||
|
||||
var handleRelay func(body []byte) error
|
||||
|
||||
func StartServer(relayHandler func([]byte) error) {
|
||||
handleRelay = relayHandler
|
||||
|
||||
http.HandleFunc("/status/healthcheck", func(w http.ResponseWriter, r *http.Request) {
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
json.NewEncoder(w).Encode(map[string]string{"status": "ok"})
|
||||
})
|
||||
|
||||
http.HandleFunc("/status/reset", func(w http.ResponseWriter, r *http.Request) {
|
||||
if r.Method != http.MethodPost {
|
||||
http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed)
|
||||
return
|
||||
}
|
||||
config.ResetAgentID()
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
json.NewEncoder(w).Encode(map[string]string{"status": "reset_triggered"})
|
||||
})
|
||||
|
||||
http.HandleFunc("/status/peer", func(w http.ResponseWriter, r *http.Request) {
|
||||
data, err := network.GatherSystemData()
|
||||
if err != nil {
|
||||
http.Error(w, "Failed to gather data", http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
json.NewEncoder(w).Encode(data)
|
||||
})
|
||||
|
||||
http.HandleFunc("/status/relay", func(w http.ResponseWriter, r *http.Request) {
|
||||
if r.Method != http.MethodPost {
|
||||
http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed)
|
||||
return
|
||||
}
|
||||
body, err := io.ReadAll(r.Body)
|
||||
if err != nil {
|
||||
http.Error(w, "Read error", http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
if handleRelay != nil {
|
||||
err = handleRelay(body)
|
||||
if err != nil {
|
||||
http.Error(w, err.Error(), http.StatusBadGateway)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
json.NewEncoder(w).Encode(map[string]string{"status": "relayed"})
|
||||
})
|
||||
|
||||
port := config.Cfg.AgentPort
|
||||
if port == "" {
|
||||
port = "5087"
|
||||
}
|
||||
addr := "0.0.0.0:" + port
|
||||
|
||||
go func() {
|
||||
log.Printf("api: Starting agent server on %s", addr)
|
||||
if err := http.ListenAndServe(addr, nil); err != nil {
|
||||
log.Fatalf("api: Server failed: %v", err)
|
||||
}
|
||||
}()
|
||||
}
|
||||
87
agent/config/config.go
Normal file
87
agent/config/config.go
Normal file
@@ -0,0 +1,87 @@
|
||||
package config
|
||||
|
||||
import (
|
||||
"crypto/rand"
|
||||
"encoding/hex"
|
||||
"log"
|
||||
"os"
|
||||
"strings"
|
||||
)
|
||||
|
||||
type Config struct {
|
||||
DNS string
|
||||
ManagerURL string
|
||||
AgentPort string
|
||||
AgentID string
|
||||
}
|
||||
|
||||
var Cfg *Config
|
||||
|
||||
func init() {
|
||||
Cfg = &Config{
|
||||
AgentPort: "5087", // default
|
||||
}
|
||||
}
|
||||
|
||||
func LoadConfig() {
|
||||
paths := []string{".env", "../.env"}
|
||||
for _, p := range paths {
|
||||
content, err := os.ReadFile(p)
|
||||
if err == nil {
|
||||
parseEnv(string(content))
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if h := os.Getenv("MANAGER_URL"); h != "" {
|
||||
Cfg.ManagerURL = h
|
||||
}
|
||||
if Cfg.ManagerURL == "" {
|
||||
Cfg.ManagerURL = "http://localhost:5086" // Default
|
||||
}
|
||||
|
||||
if d := os.Getenv("DNS"); d != "" {
|
||||
Cfg.DNS = d
|
||||
}
|
||||
|
||||
loadOrGenerateAgentID()
|
||||
}
|
||||
|
||||
func parseEnv(content string) {
|
||||
lines := strings.Split(content, "\n")
|
||||
for _, line := range lines {
|
||||
line = strings.TrimSpace(line)
|
||||
if line == "" || strings.HasPrefix(line, "#") {
|
||||
continue
|
||||
}
|
||||
parts := strings.SplitN(line, "=", 2)
|
||||
if len(parts) == 2 {
|
||||
os.Setenv(parts[0], parts[1])
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func loadOrGenerateAgentID() {
|
||||
path := "agent_id.txt"
|
||||
data, err := os.ReadFile(path)
|
||||
if err == nil {
|
||||
Cfg.AgentID = strings.TrimSpace(string(data))
|
||||
if Cfg.AgentID != "" {
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// Generate new AgentID
|
||||
b := make([]byte, 8)
|
||||
rand.Read(b)
|
||||
Cfg.AgentID = "agent-" + hex.EncodeToString(b)
|
||||
err = os.WriteFile(path, []byte(Cfg.AgentID), 0600)
|
||||
if err != nil {
|
||||
log.Printf("Warning: failed to save agent_id.txt: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func ResetAgentID() {
|
||||
os.Remove("agent_id.txt")
|
||||
loadOrGenerateAgentID()
|
||||
}
|
||||
3
agent/go.mod
Normal file
3
agent/go.mod
Normal file
@@ -0,0 +1,3 @@
|
||||
module kattila-agent
|
||||
|
||||
go 1.21
|
||||
30
agent/main.go
Normal file
30
agent/main.go
Normal file
@@ -0,0 +1,30 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"log"
|
||||
"os"
|
||||
"os/signal"
|
||||
"syscall"
|
||||
|
||||
"kattila-agent/api"
|
||||
"kattila-agent/config"
|
||||
"kattila-agent/reporter"
|
||||
"kattila-agent/security"
|
||||
)
|
||||
|
||||
func main() {
|
||||
log.Println("Kattila Agent starting...")
|
||||
|
||||
config.LoadConfig()
|
||||
security.StartKeyPoller()
|
||||
|
||||
api.StartServer(reporter.HandleRelay)
|
||||
reporter.StartLoop()
|
||||
|
||||
// Wait for termination signal
|
||||
sigChan := make(chan os.Signal, 1)
|
||||
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
|
||||
<-sigChan
|
||||
|
||||
log.Println("Kattila Agent shutting down...")
|
||||
}
|
||||
55
agent/models/models.go
Normal file
55
agent/models/models.go
Normal file
@@ -0,0 +1,55 @@
|
||||
package models
|
||||
|
||||
// Report is the top level JSON payload sent to the manager
|
||||
type Report struct {
|
||||
Version int `json:"version"`
|
||||
Tick int64 `json:"tick"`
|
||||
Type string `json:"type"` // "report", "relay", "register"
|
||||
Nonce string `json:"nonce"`
|
||||
Timestamp int64 `json:"timestamp"`
|
||||
AgentID string `json:"agent_id"`
|
||||
AgentVersion int `json:"agent_version"`
|
||||
FleetID string `json:"fleet_id"`
|
||||
HMAC string `json:"hmac"`
|
||||
Data interface{} `json:"data"` // The actual payload logic
|
||||
}
|
||||
|
||||
// SystemData is the core system status payload
|
||||
type SystemData struct {
|
||||
Hostname string `json:"hostname"`
|
||||
UptimeSeconds int64 `json:"uptime_seconds"`
|
||||
LoadAvg []float64 `json:"loadavg"`
|
||||
Interfaces []Interface `json:"interfaces"`
|
||||
Routes []Route `json:"routes"`
|
||||
WGPeers []WGPeer `json:"wg_peers"`
|
||||
}
|
||||
|
||||
type Interface struct {
|
||||
Name string `json:"name"`
|
||||
MAC string `json:"mac"`
|
||||
Addresses []string `json:"addresses"`
|
||||
IsVirtual bool `json:"is_virtual"`
|
||||
VPNType *string `json:"vpn_type"` // "wireguard", "openvpn", etc.
|
||||
}
|
||||
|
||||
type Route struct {
|
||||
Dst string `json:"dst"`
|
||||
Via string `json:"via"`
|
||||
Dev string `json:"dev"`
|
||||
}
|
||||
|
||||
type WGPeer struct {
|
||||
Interface string `json:"interface"`
|
||||
PublicKey string `json:"public_key"`
|
||||
Endpoint string `json:"endpoint"`
|
||||
AllowedIPs []string `json:"allowed_ips"`
|
||||
LatestHandshake int64 `json:"latest_handshake"`
|
||||
TransferRx int64 `json:"transfer_rx"`
|
||||
TransferTx int64 `json:"transfer_tx"`
|
||||
}
|
||||
|
||||
// RelayEnvelope is used when pushing a report over another peer
|
||||
type RelayEnvelope struct {
|
||||
RelayPath []string `json:"relay_path"` // array of agent_ids that routed this message
|
||||
Payload Report `json:"payload"`
|
||||
}
|
||||
194
agent/network/network.go
Normal file
194
agent/network/network.go
Normal file
@@ -0,0 +1,194 @@
|
||||
package network
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"os/exec"
|
||||
"strconv"
|
||||
"strings"
|
||||
|
||||
"kattila-agent/models"
|
||||
)
|
||||
|
||||
type ipAddrInfo struct {
|
||||
Local string `json:"local"`
|
||||
PrefixLen int `json:"prefixlen"`
|
||||
}
|
||||
|
||||
type ipInterface struct {
|
||||
Ifname string `json:"ifname"`
|
||||
Address string `json:"address"` // MAC address in newer iproute2
|
||||
AddrInfo []ipAddrInfo `json:"addr_info"`
|
||||
}
|
||||
|
||||
func getInterfaces() ([]models.Interface, error) {
|
||||
cmd := exec.Command("ip", "-j", "a")
|
||||
out, err := cmd.Output()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var parsed []ipInterface
|
||||
if err := json.Unmarshal(out, &parsed); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var results []models.Interface
|
||||
for _, itf := range parsed {
|
||||
mac := itf.Address
|
||||
var addrs []string
|
||||
for _, info := range itf.AddrInfo {
|
||||
addrs = append(addrs, fmt.Sprintf("%s/%d", info.Local, info.PrefixLen))
|
||||
}
|
||||
|
||||
isVirtual := false
|
||||
var vpnType *string
|
||||
if strings.HasPrefix(itf.Ifname, "wg") || strings.HasPrefix(itf.Ifname, "tun") || strings.HasPrefix(itf.Ifname, "parvpn") || strings.HasPrefix(itf.Ifname, "home") || strings.HasPrefix(itf.Ifname, "tailscale") {
|
||||
isVirtual = true
|
||||
}
|
||||
|
||||
results = append(results, models.Interface{
|
||||
Name: itf.Ifname,
|
||||
MAC: mac,
|
||||
Addresses: addrs,
|
||||
IsVirtual: isVirtual,
|
||||
VPNType: vpnType,
|
||||
})
|
||||
}
|
||||
return results, nil
|
||||
}
|
||||
|
||||
type ipRoute struct {
|
||||
Dst string `json:"dst"`
|
||||
Gateway string `json:"gateway"`
|
||||
Via string `json:"via"` // Sometimes present instead of gateway
|
||||
Dev string `json:"dev"`
|
||||
}
|
||||
|
||||
func getRoutes() ([]models.Route, error) {
|
||||
cmd := exec.Command("ip", "-j", "-4", "r")
|
||||
out, err := cmd.Output()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var parsed []ipRoute
|
||||
if err := json.Unmarshal(out, &parsed); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var results []models.Route
|
||||
for _, r := range parsed {
|
||||
via := r.Gateway
|
||||
if via == "" {
|
||||
via = r.Via
|
||||
}
|
||||
results = append(results, models.Route{
|
||||
Dst: r.Dst,
|
||||
Via: via,
|
||||
Dev: r.Dev,
|
||||
})
|
||||
}
|
||||
return results, nil
|
||||
}
|
||||
|
||||
func getWgPeers() ([]models.WGPeer, error) {
|
||||
cmd := exec.Command("wg", "show", "all", "dump")
|
||||
out, err := cmd.Output()
|
||||
if err != nil { // wg might fail or not be installed, ignore and return empty
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
var peers []models.WGPeer
|
||||
lines := bytes.Split(out, []byte("\n"))
|
||||
for _, line := range lines {
|
||||
line = bytes.TrimSpace(line)
|
||||
if len(line) == 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
cols := strings.Split(string(line), "\t")
|
||||
// wg interface line: itf, privkey, pubkey, port, fwmark (5 cols)
|
||||
// wg peer line: itf, pubkey, psk, endpoint, allowed-ips, handshake, rx, tx, keepalive (9 cols)
|
||||
if len(cols) >= 8 {
|
||||
itf := cols[0]
|
||||
pubkey := cols[1]
|
||||
endpoint := cols[3]
|
||||
if endpoint == "(none)" {
|
||||
endpoint = ""
|
||||
}
|
||||
allowedIpsRaw := cols[4]
|
||||
if allowedIpsRaw == "(none)" {
|
||||
allowedIpsRaw = ""
|
||||
}
|
||||
allowedIps := strings.Split(allowedIpsRaw, ",")
|
||||
|
||||
handshake, _ := strconv.ParseInt(cols[5], 10, 64)
|
||||
rx, _ := strconv.ParseInt(cols[6], 10, 64)
|
||||
tx, _ := strconv.ParseInt(cols[7], 10, 64)
|
||||
|
||||
peers = append(peers, models.WGPeer{
|
||||
Interface: itf,
|
||||
PublicKey: pubkey,
|
||||
Endpoint: endpoint,
|
||||
AllowedIPs: allowedIps,
|
||||
LatestHandshake: handshake,
|
||||
TransferRx: rx,
|
||||
TransferTx: tx,
|
||||
})
|
||||
}
|
||||
}
|
||||
return peers, nil
|
||||
}
|
||||
|
||||
func GatherSystemData() (models.SystemData, error) {
|
||||
hostname, _ := exec.Command("hostname").Output()
|
||||
|
||||
// Simplified uptime extraction
|
||||
uptimeBytes, _ := exec.Command("cat", "/proc/uptime").Output()
|
||||
var uptimeSec int64
|
||||
if len(uptimeBytes) > 0 {
|
||||
parts := strings.Fields(string(uptimeBytes))
|
||||
if len(parts) > 0 {
|
||||
uptimeFloat, _ := strconv.ParseFloat(parts[0], 64)
|
||||
uptimeSec = int64(uptimeFloat)
|
||||
}
|
||||
}
|
||||
|
||||
loadavgBytes, _ := exec.Command("cat", "/proc/loadavg").Output()
|
||||
var loadavg []float64
|
||||
if len(loadavgBytes) > 0 {
|
||||
parts := strings.Fields(string(loadavgBytes))
|
||||
if len(parts) >= 3 {
|
||||
l1, _ := strconv.ParseFloat(parts[0], 64)
|
||||
l2, _ := strconv.ParseFloat(parts[1], 64)
|
||||
l3, _ := strconv.ParseFloat(parts[2], 64)
|
||||
loadavg = []float64{l1, l2, l3}
|
||||
}
|
||||
}
|
||||
|
||||
intfs, err := getInterfaces()
|
||||
if err != nil {
|
||||
intfs = []models.Interface{}
|
||||
}
|
||||
|
||||
routes, err := getRoutes()
|
||||
if err != nil {
|
||||
routes = []models.Route{}
|
||||
}
|
||||
|
||||
wgPeers, err := getWgPeers()
|
||||
if err != nil {
|
||||
wgPeers = []models.WGPeer{}
|
||||
}
|
||||
|
||||
return models.SystemData{
|
||||
Hostname: strings.TrimSpace(string(hostname)),
|
||||
UptimeSeconds: uptimeSec,
|
||||
LoadAvg: loadavg,
|
||||
Interfaces: intfs,
|
||||
Routes: routes,
|
||||
WGPeers: wgPeers,
|
||||
}, nil
|
||||
}
|
||||
191
agent/reporter/reporter.go
Normal file
191
agent/reporter/reporter.go
Normal file
@@ -0,0 +1,191 @@
|
||||
package reporter
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"log"
|
||||
"net"
|
||||
"net/http"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"kattila-agent/config"
|
||||
"kattila-agent/models"
|
||||
"kattila-agent/network"
|
||||
"kattila-agent/security"
|
||||
)
|
||||
|
||||
var tickCounter int64 = 0
|
||||
|
||||
func StartLoop() {
|
||||
doReport() // run immediately
|
||||
ticker := time.NewTicker(30 * time.Second)
|
||||
go func() {
|
||||
for range ticker.C {
|
||||
doReport()
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
func doReport() {
|
||||
data, err := network.GatherSystemData()
|
||||
if err != nil {
|
||||
log.Printf("reporter: gather error: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
tickCounter++
|
||||
now := time.Now().Unix()
|
||||
|
||||
report := models.Report{
|
||||
Version: 1,
|
||||
Tick: tickCounter,
|
||||
Type: "report",
|
||||
Nonce: security.GenerateNonce(),
|
||||
Timestamp: now,
|
||||
AgentID: config.Cfg.AgentID,
|
||||
AgentVersion: 1,
|
||||
FleetID: security.FleetID(),
|
||||
Data: data,
|
||||
}
|
||||
|
||||
report.HMAC = security.SignPayload(report.Data)
|
||||
|
||||
err = pushToManager(report)
|
||||
if err != nil {
|
||||
log.Printf("reporter: direct push failed (%v). Attempting relay scan...", err)
|
||||
tryRelay(report, data)
|
||||
}
|
||||
}
|
||||
|
||||
func pushToManager(report models.Report) error {
|
||||
body, _ := json.Marshal(report)
|
||||
url := strings.TrimRight(config.Cfg.ManagerURL, "/") + "/status/updates"
|
||||
req, err := http.NewRequest("POST", url, bytes.NewBuffer(body))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
|
||||
client := &http.Client{Timeout: 5 * time.Second}
|
||||
resp, err := client.Do(req)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
respBody, _ := io.ReadAll(resp.Body)
|
||||
return fmt.Errorf("bad status code %d: %s", resp.StatusCode, respBody)
|
||||
}
|
||||
log.Printf("reporter: Report successfully sent to Manager (tick %d)", report.Tick)
|
||||
return nil
|
||||
}
|
||||
|
||||
func HandleRelay(body []byte) error {
|
||||
var envelope models.RelayEnvelope
|
||||
if err := json.Unmarshal(body, &envelope); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for _, id := range envelope.RelayPath {
|
||||
if id == config.Cfg.AgentID {
|
||||
log.Println("reporter: Dropped relay request: routing loop detected")
|
||||
return errors.New("routing loop detected")
|
||||
}
|
||||
}
|
||||
|
||||
envelope.RelayPath = append(envelope.RelayPath, config.Cfg.AgentID)
|
||||
if len(envelope.RelayPath) > 3 {
|
||||
return errors.New("relay hop limit exceeded")
|
||||
}
|
||||
|
||||
envelopeBody, _ := json.Marshal(envelope)
|
||||
url := strings.TrimRight(config.Cfg.ManagerURL, "/") + "/status/updates"
|
||||
req, err := http.NewRequest("POST", url, bytes.NewBuffer(envelopeBody))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
|
||||
client := &http.Client{Timeout: 5 * time.Second}
|
||||
resp, err := client.Do(req)
|
||||
if err != nil {
|
||||
log.Printf("reporter: Manager unreachable during relay forward, hopping further...")
|
||||
|
||||
data, err := network.GatherSystemData()
|
||||
if err == nil {
|
||||
return tryRelayEnvelope(envelope, data)
|
||||
}
|
||||
return err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
return fmt.Errorf("bad status from manager: %d", resp.StatusCode)
|
||||
}
|
||||
|
||||
log.Printf("reporter: Successfully relayed message for %s", envelope.Payload.AgentID)
|
||||
return nil
|
||||
}
|
||||
|
||||
func tryRelay(report models.Report, localData models.SystemData) {
|
||||
env := models.RelayEnvelope{
|
||||
RelayPath: []string{config.Cfg.AgentID},
|
||||
Payload: report,
|
||||
}
|
||||
err := tryRelayEnvelope(env, localData)
|
||||
if err != nil {
|
||||
log.Printf("reporter: Exhausted all relays, couldn't push report: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func tryRelayEnvelope(env models.RelayEnvelope, data models.SystemData) error {
|
||||
for _, wg := range data.WGPeers {
|
||||
for _, allowedRaw := range wg.AllowedIPs {
|
||||
ip, _, err := net.ParseCIDR(allowedRaw)
|
||||
if err != nil {
|
||||
ip = net.ParseIP(allowedRaw)
|
||||
}
|
||||
if ip != nil {
|
||||
ipTarget := ip.String()
|
||||
if pingPeer(ipTarget) {
|
||||
log.Printf("reporter: Found relay peer at %s, forwarding...", ipTarget)
|
||||
err := pushToRelay(ipTarget, env)
|
||||
if err == nil {
|
||||
return nil
|
||||
}
|
||||
log.Printf("reporter: Failed to push to relay %s: %v", ipTarget, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return errors.New("no working relays found")
|
||||
}
|
||||
|
||||
func pingPeer(ip string) bool {
|
||||
client := &http.Client{Timeout: 2 * time.Second}
|
||||
resp, err := client.Get(fmt.Sprintf("http://%s:%s/status/peer", ip, config.Cfg.AgentPort))
|
||||
if err == nil {
|
||||
defer resp.Body.Close()
|
||||
return resp.StatusCode == http.StatusOK
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func pushToRelay(ip string, env models.RelayEnvelope) error {
|
||||
body, _ := json.Marshal(env)
|
||||
client := &http.Client{Timeout: 5 * time.Second}
|
||||
resp, err := client.Post(fmt.Sprintf("http://%s:%s/status/relay", ip, config.Cfg.AgentPort), "application/json", bytes.NewBuffer(body))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
return fmt.Errorf("relay rejected forwarding attempt with %d", resp.StatusCode)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
100
agent/security/security.go
Normal file
100
agent/security/security.go
Normal file
@@ -0,0 +1,100 @@
|
||||
package security
|
||||
|
||||
import (
|
||||
"crypto/hmac"
|
||||
"crypto/rand"
|
||||
"crypto/sha256"
|
||||
"encoding/base64"
|
||||
"encoding/hex"
|
||||
"encoding/json"
|
||||
"log"
|
||||
"net"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"kattila-agent/config"
|
||||
)
|
||||
|
||||
var (
|
||||
currentPSK string
|
||||
mu sync.RWMutex
|
||||
)
|
||||
|
||||
// StartKeyPoller checks the DNS record every hour
|
||||
func StartKeyPoller() {
|
||||
fetchKey()
|
||||
ticker := time.NewTicker(1 * time.Hour)
|
||||
go func() {
|
||||
for range ticker.C {
|
||||
fetchKey()
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
func fetchKey() {
|
||||
dnsName := config.Cfg.DNS
|
||||
if dnsName == "" {
|
||||
log.Println("security: No DNS configured for PSK")
|
||||
return
|
||||
}
|
||||
|
||||
txts, err := net.LookupTXT(dnsName)
|
||||
if err != nil {
|
||||
log.Printf("security: Failed to lookup TXT for %s: %v", dnsName, err)
|
||||
return
|
||||
}
|
||||
|
||||
if len(txts) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
key := txts[0]
|
||||
// Remove quotes if present
|
||||
key = strings.Trim(key, `"`)
|
||||
|
||||
mu.Lock()
|
||||
if currentPSK != key {
|
||||
log.Println("security: New PSK discovered via DNS")
|
||||
currentPSK = key
|
||||
}
|
||||
mu.Unlock()
|
||||
}
|
||||
|
||||
func GetCurrentPSK() string {
|
||||
mu.RLock()
|
||||
defer mu.RUnlock()
|
||||
return currentPSK
|
||||
}
|
||||
|
||||
// FleetID generates a SHA256 of the PSK to uniquely identify the fleet
|
||||
func FleetID() string {
|
||||
psk := GetCurrentPSK()
|
||||
if psk == "" {
|
||||
return ""
|
||||
}
|
||||
hash := sha256.Sum256([]byte(psk))
|
||||
return hex.EncodeToString(hash[:])
|
||||
}
|
||||
|
||||
func GenerateNonce() string {
|
||||
b := make([]byte, 16)
|
||||
rand.Read(b)
|
||||
return base64.StdEncoding.EncodeToString(b)
|
||||
}
|
||||
|
||||
func SignPayload(data interface{}) string {
|
||||
psk := GetCurrentPSK()
|
||||
if psk == "" {
|
||||
return ""
|
||||
}
|
||||
|
||||
bytes, err := json.Marshal(data)
|
||||
if err != nil {
|
||||
return ""
|
||||
}
|
||||
|
||||
h := hmac.New(sha256.New, []byte(psk))
|
||||
h.Write(bytes)
|
||||
return hex.EncodeToString(h.Sum(nil))
|
||||
}
|
||||
10
manager/app.py
Normal file
10
manager/app.py
Normal file
@@ -0,0 +1,10 @@
|
||||
from flask import Flask, jsonify
|
||||
|
||||
app = Flask(__name__)
|
||||
|
||||
@app.route('/status/healthcheck')
|
||||
def healthcheck():
|
||||
return jsonify({"status": "ok"})
|
||||
|
||||
if __name__ == '__main__':
|
||||
app.run(host='0.0.0.0', port=5086)
|
||||
1
manager/requirements.txt
Normal file
1
manager/requirements.txt
Normal file
@@ -0,0 +1 @@
|
||||
Flask>=3.0.0
|
||||
Reference in New Issue
Block a user