Claude Code session 1.

This commit is contained in:
Kalzu Rekku
2026-01-08 12:11:26 +02:00
parent c59523060d
commit 6db2e58dcd
20 changed files with 5497 additions and 83 deletions

View File

@@ -1,7 +1,344 @@
# output service
# Output Service
Service to receive output from ping_service instances.
Builds database of mappable nodes.
Updates input services address lists with all working endpoints and working hops from the traces.
HTTP service that receives ping and traceroute results from distributed `ping_service` nodes, stores them in SQLite databases with automatic rotation, extracts intermediate hops from traceroute data, and feeds them back to `input_service`.
Have reporting api endpoints for the manager to monitor the progress.
## Purpose
- **Data Collection**: Store ping results and traceroute paths from multiple ping_service instances
- **Hop Discovery**: Extract intermediate hop IPs from traceroute data
- **Feedback Loop**: Send discovered hops to input_service to grow the target pool organically
- **Data Management**: Automatic database rotation and retention policy
- **Observability**: Expose metrics and statistics for monitoring
## Features
- **Multi-Instance Ready**: Each instance maintains its own SQLite database
- **Automatic Rotation**: Databases rotate weekly OR when reaching 100MB (whichever first)
- **Retention Policy**: Keeps 5 most recent database files, auto-deletes older ones
- **Hop Deduplication**: Tracks sent hops to minimize duplicate network traffic to input_service
- **Manual Operations**: API endpoints for manual rotation and database dumps
- **Health Monitoring**: Prometheus metrics, stats, and health checks
## Requirements
- Go 1.25+
- SQLite3 (via go-sqlite3 driver)
## Building
```bash
cd output_service
go build -o output_service main.go
```
## Usage
### Basic
```bash
./output_service
```
Starts on port 8081 for results, port 8091 for health checks.
### With Custom Configuration
```bash
./output_service \
--port=8082 \
--health-port=8092 \
--input-url=http://input-service:8080/hops \
--db-dir=/var/lib/output_service \
--max-size-mb=200 \
--rotation-days=14 \
--keep-files=10 \
--verbose
```
### Command Line Flags
| Flag | Default | Description |
|------|---------|-------------|
| `--port` | 8081 | Port for receiving results |
| `--health-port` | 8091 | Port for health/metrics endpoints |
| `--input-url` | `http://localhost:8080/hops` | Input service URL for hop submission |
| `--db-dir` | `./output_data` | Directory for database files |
| `--max-size-mb` | 100 | Max database size (MB) before rotation |
| `--rotation-days` | 7 | Rotate database after N days |
| `--keep-files` | 5 | Number of database files to retain |
| `-v, --verbose` | false | Enable verbose logging |
| `--version` | - | Show version |
| `--help` | - | Show help |
## API Endpoints
### Main Service (Port 8081)
#### `POST /results`
Receive ping results from ping_service nodes.
**Request Body**: JSON array of ping results
```json
[
{
"ip": "8.8.8.8",
"sent": 4,
"received": 4,
"packet_loss": 0,
"avg_rtt": 15000000,
"timestamp": "2026-01-07T22:30:00Z",
"traceroute": {
"method": "icmp",
"completed": true,
"hops": [
{"ttl": 1, "ip": "192.168.1.1", "rtt": 2000000},
{"ttl": 2, "ip": "10.0.0.1", "rtt": 5000000},
{"ttl": 3, "ip": "8.8.8.8", "rtt": 15000000}
]
}
}
]
```
**Response**:
```json
{
"status": "ok",
"received": 1
}
```
#### `POST /rotate`
Manually trigger database rotation.
**Response**:
```json
{
"status": "rotated",
"file": "results_2026-01-07_22-30-45.db"
}
```
#### `GET /dump`
Download current SQLite database file.
**Response**: Binary SQLite database file
### Health Service (Port 8091)
#### `GET /health`
Overall health status and statistics.
**Response**:
```json
{
"status": "healthy",
"version": "0.0.1",
"uptime": "2h15m30s",
"stats": {
"total_results": 15420,
"successful_pings": 14890,
"failed_pings": 530,
"hops_discovered": 2341,
"hops_sent": 2341,
"last_result_time": "2026-01-07T22:30:15Z",
"current_db_file": "results_2026-01-07.db",
"current_db_size": 52428800,
"last_rotation": "2026-01-07T00:00:00Z"
}
}
```
#### `GET /ready`
Readiness check (verifies database connectivity).
**Response**: `200 OK` if ready, `503 Service Unavailable` if not
#### `GET /metrics`
Prometheus-compatible metrics.
**Response** (text/plain):
```
# HELP output_service_total_results Total number of results processed
# TYPE output_service_total_results counter
output_service_total_results 15420
# HELP output_service_successful_pings Total successful pings
# TYPE output_service_successful_pings counter
output_service_successful_pings 14890
...
```
#### `GET /stats`
Detailed statistics in JSON format.
**Response**: Same as `stats` object in `/health`
#### `GET /recent?limit=100&ip=8.8.8.8`
Query recent ping results.
**Query Parameters**:
- `limit` (optional): Max results to return (default 100, max 1000)
- `ip` (optional): Filter by specific IP address
**Response**:
```json
[
{
"id": 12345,
"ip": "8.8.8.8",
"sent": 4,
"received": 4,
"packet_loss": 0,
"avg_rtt": 15000000,
"timestamp": "2026-01-07T22:30:00Z"
}
]
```
## Database Schema
### `ping_results`
| Column | Type | Description |
|--------|------|-------------|
| id | INTEGER | Primary key |
| ip | TEXT | Target IP address |
| sent | INTEGER | Packets sent |
| received | INTEGER | Packets received |
| packet_loss | REAL | Packet loss percentage |
| avg_rtt | INTEGER | Average RTT (nanoseconds) |
| timestamp | DATETIME | Ping timestamp |
| error | TEXT | Error message if failed |
| created_at | DATETIME | Record creation time |
**Indexes**: `ip`, `timestamp`
### `traceroute_results`
| Column | Type | Description |
|--------|------|-------------|
| id | INTEGER | Primary key |
| ping_result_id | INTEGER | Foreign key to ping_results |
| method | TEXT | Traceroute method (icmp/tcp) |
| completed | BOOLEAN | Whether trace completed |
| error | TEXT | Error message if failed |
### `traceroute_hops`
| Column | Type | Description |
|--------|------|-------------|
| id | INTEGER | Primary key |
| traceroute_id | INTEGER | Foreign key to traceroute_results |
| ttl | INTEGER | Time-to-live / hop number |
| ip | TEXT | Hop IP address |
| rtt | INTEGER | Round-trip time (nanoseconds) |
| timeout | BOOLEAN | Whether hop timed out |
**Indexes**: `ip` (for hop discovery)
## Database Rotation
Rotation triggers automatically when **either** condition is met:
- **Time**: Database age exceeds `rotation_days` (default 7 days)
- **Size**: Database size exceeds `max_size_mb` (default 100MB)
Rotation process:
1. Close current database connection
2. Create new database with timestamp filename (`results_2026-01-07_22-30-45.db`)
3. Initialize schema in new database
4. Delete oldest database files if count exceeds `keep_files`
Manual rotation: `curl -X POST http://localhost:8081/rotate`
## Hop Discovery and Feedback
1. **Extraction**: For each traceroute, extract non-timeout hop IPs
2. **Deduplication**: Track sent hops in memory to avoid re-sending
3. **Submission**: HTTP POST to input_service `/hops` endpoint:
```json
{
"hops": ["10.0.0.1", "172.16.5.3", "8.8.8.8"]
}
```
4. **Statistics**: Track `hops_discovered` and `hops_sent` metrics
## Multi-Instance Deployment
Each output_service instance:
- Maintains its **own SQLite database** in `db_dir`
- Manages its **own rotation schedule** independently
- Tracks its **own hop deduplication** (some duplicate hop submissions across instances are acceptable)
- Can receive results from **multiple ping_service nodes**
For central data aggregation:
- Use `/dump` endpoint to collect database files from all instances
- Merge databases offline for analysis/visualization
- Or use shared network storage for `db_dir` (with file locking considerations)
## Integration with ping_service
Configure ping_service to send results to output_service:
**`config.yaml`** (ping_service):
```yaml
output_file: "http://output-service:8081/results"
```
## Integration with input_service
Output service expects input_service to have a `/hops` endpoint:
**Expected endpoint**: `POST /hops`
**Payload**:
```json
{
"hops": ["10.0.0.1", "172.16.5.3"]
}
```
## Monitoring
**Check health**:
```bash
curl http://localhost:8091/health
```
**View metrics**:
```bash
curl http://localhost:8091/metrics
```
**Query recent failures**:
```bash
curl 'http://localhost:8091/recent?limit=50' | jq '.[] | select(.error != null)'
```
**Download database backup**:
```bash
curl http://localhost:8081/dump -o backup.db
```
## Development Testing
Use the Python demo output server to see example data format:
```bash
cd output_service
python3 http_ouput_demo.py # Note: file has typo in name
```
## Graceful Shutdown
Press `Ctrl+C` for graceful shutdown with 10s timeout.
The service will:
1. Stop accepting new requests
2. Finish processing in-flight requests
3. Close database connections cleanly
4. Exit
## Version
Current version: **0.0.1**
## Dependencies
- `github.com/mattn/go-sqlite3` - SQLite driver (requires CGO)

5
output_service/go.mod Normal file
View File

@@ -0,0 +1,5 @@
module output-service
go 1.25.0
require github.com/mattn/go-sqlite3 v1.14.24

View File

@@ -0,0 +1,95 @@
#!/usr/bin/env python3
"""
HTTP Output Server - Receives POST requests with JSON ping results
Usage: python3 http_output_server.py
"""
from http.server import HTTPServer, BaseHTTPRequestHandler
import json
import signal
import sys
from datetime import datetime
class OutputHandler(BaseHTTPRequestHandler):
def do_POST(self):
content_length = int(self.headers['Content-Length'])
post_data = self.rfile.read(content_length)
print(f"\n{'='*60}")
print(f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] Received POST request")
print(f"{'='*60}")
try:
data = json.loads(post_data)
print(json.dumps(data, indent=2))
# Summary
print(f"\n📊 Summary:")
if isinstance(data, list):
print(f" Total results: {len(data)}")
for result in data:
ip = result.get('ip', 'unknown')
loss = result.get('packet_loss', 0)
avg_rtt = result.get('avg_rtt', 0)
error = result.get('error', '')
traceroute = result.get('traceroute')
if error:
print(f"{ip}: ERROR - {error}")
else:
rtt_ms = avg_rtt / 1_000_000 # Convert ns to ms
print(f"{ip}: {loss}% loss, avg RTT: {rtt_ms:.2f}ms")
if traceroute:
hops = traceroute.get('hops', [])
method = traceroute.get('method', 'unknown')
print(f" 🛤️ Traceroute ({method}): {len(hops)} hops")
for hop in hops[:5]: # Show first 5 hops
ttl = hop.get('ttl')
hop_ip = hop.get('ip', '*')
if hop.get('timeout'):
print(f" {ttl}. * (timeout)")
else:
hop_rtt = hop.get('rtt', 0) / 1_000_000
print(f" {ttl}. {hop_ip} ({hop_rtt:.2f}ms)")
if len(hops) > 5:
print(f" ... and {len(hops) - 5} more hops")
except json.JSONDecodeError:
print("⚠️ Invalid JSON received")
print(post_data.decode('utf-8', errors='replace'))
print(f"{'='*60}\n")
# Send response
self.send_response(200)
self.send_header('Content-Type', 'application/json')
self.end_headers()
self.wfile.write(b'{"status": "received"}')
def log_message(self, format, *args):
# Suppress default logging
pass
def signal_handler(sig, frame):
print("\n\n🛑 Shutting down gracefully...")
sys.exit(0)
if __name__ == "__main__":
PORT = 8081
# Register signal handlers for graceful shutdown
signal.signal(signal.SIGINT, signal_handler) # Ctrl+C
signal.signal(signal.SIGTERM, signal_handler) # kill command
server = HTTPServer(('0.0.0.0', PORT), OutputHandler)
print(f"📥 HTTP Output Server running on http://localhost:{PORT}")
print(f" Waiting for POST requests with ping results...")
print(f" Press Ctrl+C to stop")
try:
server.serve_forever()
except KeyboardInterrupt:
pass
finally:
server.server_close()
print("\n✅ Server stopped cleanly")

834
output_service/main.go Normal file
View File

@@ -0,0 +1,834 @@
package main
import (
"bytes"
"context"
"database/sql"
"encoding/json"
"flag"
"fmt"
"io"
"log"
"net/http"
"os"
"os/signal"
"path/filepath"
"sort"
"sync"
"syscall"
"time"
_ "github.com/mattn/go-sqlite3"
)
const VERSION = "0.0.1"
type Config struct {
Port int `json:"port"`
InputServiceURL string `json:"input_service_url"`
DBDir string `json:"db_dir"`
MaxDBSizeMB int64 `json:"max_db_size_mb"`
RotationDays int `json:"rotation_days"`
KeepFiles int `json:"keep_files"`
HealthCheckPort int `json:"health_check_port"`
}
// Data structures matching ping_service output
type PingResult struct {
IP string `json:"ip"`
Sent int `json:"sent"`
Received int `json:"received"`
PacketLoss float64 `json:"packet_loss"`
AvgRtt int64 `json:"avg_rtt"` // nanoseconds
Timestamp time.Time `json:"timestamp"`
Error string `json:"error,omitempty"`
Traceroute *TracerouteResult `json:"traceroute,omitempty"`
}
type TracerouteResult struct {
Method string `json:"method"`
Hops []TracerouteHop `json:"hops"`
Completed bool `json:"completed"`
Error string `json:"error,omitempty"`
}
type TracerouteHop struct {
TTL int `json:"ttl"`
IP string `json:"ip"`
Rtt int64 `json:"rtt,omitempty"` // nanoseconds
Timeout bool `json:"timeout,omitempty"`
}
type Stats struct {
TotalResults int64 `json:"total_results"`
SuccessfulPings int64 `json:"successful_pings"`
FailedPings int64 `json:"failed_pings"`
HopsDiscovered int64 `json:"hops_discovered"`
HopsSent int64 `json:"hops_sent"`
LastResultTime time.Time `json:"last_result_time"`
CurrentDBFile string `json:"current_db_file"`
CurrentDBSize int64 `json:"current_db_size"`
LastRotation time.Time `json:"last_rotation"`
}
var (
config Config
db *sql.DB
dbMux sync.RWMutex
stats Stats
statsMux sync.RWMutex
sentHops = make(map[string]bool) // Track sent hops to avoid duplicates
sentHopsMux sync.RWMutex
verbose bool
startTime time.Time
)
func main() {
// CLI flags
port := flag.Int("port", 8081, "Port to listen on")
healthPort := flag.Int("health-port", 8091, "Health check port")
inputURL := flag.String("input-url", "http://localhost:8080/hops", "Input service URL for hop submission")
dbDir := flag.String("db-dir", "./output_data", "Directory to store database files")
maxSize := flag.Int64("max-size-mb", 100, "Maximum database size in MB before rotation")
rotationDays := flag.Int("rotation-days", 7, "Rotate database after this many days")
keepFiles := flag.Int("keep-files", 5, "Number of database files to keep")
verboseFlag := flag.Bool("v", false, "Enable verbose logging")
flag.BoolVar(verboseFlag, "verbose", false, "Enable verbose logging")
versionFlag := flag.Bool("version", false, "Show version")
help := flag.Bool("help", false, "Show help message")
flag.Parse()
if *versionFlag {
fmt.Printf("output-service version %s\n", VERSION)
os.Exit(0)
}
if *help {
fmt.Println("Output Service - Receive and store ping/traceroute results")
fmt.Printf("Version: %s\n\n", VERSION)
fmt.Println("Flags:")
flag.PrintDefaults()
os.Exit(0)
}
verbose = *verboseFlag
startTime = time.Now()
config = Config{
Port: *port,
InputServiceURL: *inputURL,
DBDir: *dbDir,
MaxDBSizeMB: *maxSize,
RotationDays: *rotationDays,
KeepFiles: *keepFiles,
HealthCheckPort: *healthPort,
}
// Create database directory if it doesn't exist
if err := os.MkdirAll(config.DBDir, 0755); err != nil {
log.Fatalf("Failed to create database directory: %v", err)
}
// Initialize database
if err := initDB(); err != nil {
log.Fatalf("Failed to initialize database: %v", err)
}
defer closeDB()
// Start background rotation checker
go rotationChecker()
// Setup HTTP handlers
mux := http.NewServeMux()
mux.HandleFunc("/results", handleResults)
mux.HandleFunc("/rotate", handleRotate)
mux.HandleFunc("/dump", handleDump)
// Health check handlers
healthMux := http.NewServeMux()
healthMux.HandleFunc("/health", handleHealth)
healthMux.HandleFunc("/service-info", handleServiceInfo)
healthMux.HandleFunc("/ready", handleReady)
healthMux.HandleFunc("/metrics", handleMetrics)
healthMux.HandleFunc("/stats", handleStats)
healthMux.HandleFunc("/recent", handleRecent)
// Create servers
server := &http.Server{
Addr: fmt.Sprintf(":%d", config.Port),
Handler: mux,
}
healthServer := &http.Server{
Addr: fmt.Sprintf(":%d", config.HealthCheckPort),
Handler: healthMux,
}
// Graceful shutdown handling
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM)
go func() {
log.Printf("🚀 Output Service v%s starting...", VERSION)
log.Printf("📥 Listening for results on http://localhost:%d/results", config.Port)
log.Printf("🏥 Health checks on http://localhost:%d", config.HealthCheckPort)
log.Printf("💾 Database directory: %s", config.DBDir)
log.Printf("🔄 Rotation: %d days OR %d MB, keeping %d files",
config.RotationDays, config.MaxDBSizeMB, config.KeepFiles)
if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
log.Fatalf("Server error: %v", err)
}
}()
go func() {
if err := healthServer.ListenAndServe(); err != nil && err != http.ErrServerClosed {
log.Fatalf("Health server error: %v", err)
}
}()
// Wait for shutdown signal
<-sigChan
log.Println("\n🛑 Shutting down gracefully...")
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
if err := server.Shutdown(ctx); err != nil {
log.Printf("Server shutdown error: %v", err)
}
if err := healthServer.Shutdown(ctx); err != nil {
log.Printf("Health server shutdown error: %v", err)
}
log.Println("✅ Shutdown complete")
}
func initDB() error {
dbMux.Lock()
defer dbMux.Unlock()
// Find or create current database file
dbFile := getCurrentDBFile()
var err error
db, err = sql.Open("sqlite3", dbFile)
if err != nil {
return fmt.Errorf("failed to open database: %w", err)
}
// Create tables
schema := `
CREATE TABLE IF NOT EXISTS ping_results (
id INTEGER PRIMARY KEY AUTOINCREMENT,
ip TEXT NOT NULL,
sent INTEGER,
received INTEGER,
packet_loss REAL,
avg_rtt INTEGER,
timestamp DATETIME,
error TEXT,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP
);
CREATE TABLE IF NOT EXISTS traceroute_results (
id INTEGER PRIMARY KEY AUTOINCREMENT,
ping_result_id INTEGER,
method TEXT,
completed BOOLEAN,
error TEXT,
FOREIGN KEY(ping_result_id) REFERENCES ping_results(id)
);
CREATE TABLE IF NOT EXISTS traceroute_hops (
id INTEGER PRIMARY KEY AUTOINCREMENT,
traceroute_id INTEGER,
ttl INTEGER,
ip TEXT,
rtt INTEGER,
timeout BOOLEAN,
FOREIGN KEY(traceroute_id) REFERENCES traceroute_results(id)
);
CREATE INDEX IF NOT EXISTS idx_ping_ip ON ping_results(ip);
CREATE INDEX IF NOT EXISTS idx_ping_timestamp ON ping_results(timestamp);
CREATE INDEX IF NOT EXISTS idx_hop_ip ON traceroute_hops(ip);
`
if _, err := db.Exec(schema); err != nil {
return fmt.Errorf("failed to create schema: %w", err)
}
// Update stats
statsMux.Lock()
stats.CurrentDBFile = filepath.Base(dbFile)
stats.CurrentDBSize = getFileSize(dbFile)
statsMux.Unlock()
log.Printf("📂 Database initialized: %s", filepath.Base(dbFile))
return nil
}
func closeDB() {
dbMux.Lock()
defer dbMux.Unlock()
if db != nil {
db.Close()
}
}
func getCurrentDBFile() string {
// Check for most recent database file
files, err := filepath.Glob(filepath.Join(config.DBDir, "results_*.db"))
if err != nil || len(files) == 0 {
// Create new file with current date
return filepath.Join(config.DBDir, fmt.Sprintf("results_%s.db", time.Now().Format("2006-01-02")))
}
// Sort and return most recent
sort.Strings(files)
return files[len(files)-1]
}
func getFileSize(path string) int64 {
info, err := os.Stat(path)
if err != nil {
return 0
}
return info.Size()
}
func rotationChecker() {
ticker := time.NewTicker(1 * time.Minute)
defer ticker.Stop()
for range ticker.C {
checkAndRotate()
}
}
func checkAndRotate() {
dbMux.RLock()
currentFile := getCurrentDBFile()
dbMux.RUnlock()
// Check size
size := getFileSize(currentFile)
sizeMB := size / (1024 * 1024)
// Check age
fileInfo, err := os.Stat(currentFile)
if err != nil {
return
}
age := time.Since(fileInfo.ModTime())
ageDays := int(age.Hours() / 24)
if sizeMB >= config.MaxDBSizeMB {
log.Printf("🔄 Database size (%d MB) exceeds limit (%d MB), rotating...", sizeMB, config.MaxDBSizeMB)
rotateDB()
} else if ageDays >= config.RotationDays {
log.Printf("🔄 Database age (%d days) exceeds limit (%d days), rotating...", ageDays, config.RotationDays)
rotateDB()
}
}
func rotateDB() error {
dbMux.Lock()
defer dbMux.Unlock()
// Close current database
if db != nil {
db.Close()
}
// Create new database file
newFile := filepath.Join(config.DBDir, fmt.Sprintf("results_%s.db", time.Now().Format("2006-01-02_15-04-05")))
var err error
db, err = sql.Open("sqlite3", newFile)
if err != nil {
return fmt.Errorf("failed to open new database: %w", err)
}
// Create schema in new database
schema := `
CREATE TABLE ping_results (
id INTEGER PRIMARY KEY AUTOINCREMENT,
ip TEXT NOT NULL,
sent INTEGER,
received INTEGER,
packet_loss REAL,
avg_rtt INTEGER,
timestamp DATETIME,
error TEXT,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP
);
CREATE TABLE traceroute_results (
id INTEGER PRIMARY KEY AUTOINCREMENT,
ping_result_id INTEGER,
method TEXT,
completed BOOLEAN,
error TEXT,
FOREIGN KEY(ping_result_id) REFERENCES ping_results(id)
);
CREATE TABLE traceroute_hops (
id INTEGER PRIMARY KEY AUTOINCREMENT,
traceroute_id INTEGER,
ttl INTEGER,
ip TEXT,
rtt INTEGER,
timeout BOOLEAN,
FOREIGN KEY(traceroute_id) REFERENCES traceroute_results(id)
);
CREATE INDEX idx_ping_ip ON ping_results(ip);
CREATE INDEX idx_ping_timestamp ON ping_results(timestamp);
CREATE INDEX idx_hop_ip ON traceroute_hops(ip);
`
if _, err := db.Exec(schema); err != nil {
return fmt.Errorf("failed to create schema in new database: %w", err)
}
// Update stats
statsMux.Lock()
stats.CurrentDBFile = filepath.Base(newFile)
stats.CurrentDBSize = 0
stats.LastRotation = time.Now()
statsMux.Unlock()
// Cleanup old files
cleanupOldDBFiles()
log.Printf("✅ Rotated to new database: %s", filepath.Base(newFile))
return nil
}
func cleanupOldDBFiles() {
files, err := filepath.Glob(filepath.Join(config.DBDir, "results_*.db"))
if err != nil || len(files) <= config.KeepFiles {
return
}
// Sort by name (chronological due to timestamp format)
sort.Strings(files)
// Remove oldest files
toRemove := len(files) - config.KeepFiles
for i := 0; i < toRemove; i++ {
if err := os.Remove(files[i]); err != nil {
log.Printf("⚠️ Failed to remove old database %s: %v", files[i], err)
} else {
log.Printf("🗑️ Removed old database: %s", filepath.Base(files[i]))
}
}
}
// HTTP Handlers
func handleResults(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
return
}
body, err := io.ReadAll(r.Body)
if err != nil {
http.Error(w, "Failed to read body", http.StatusBadRequest)
return
}
defer r.Body.Close()
var results []PingResult
if err := json.Unmarshal(body, &results); err != nil {
http.Error(w, "Invalid JSON", http.StatusBadRequest)
return
}
if verbose {
log.Printf("📥 Received %d ping results", len(results))
}
// Process results
for _, result := range results {
if err := storeResult(&result); err != nil {
log.Printf("⚠️ Failed to store result for %s: %v", result.IP, err)
continue
}
// Update stats
statsMux.Lock()
stats.TotalResults++
if result.Error != "" {
stats.FailedPings++
} else {
stats.SuccessfulPings++
}
stats.LastResultTime = time.Now()
stats.CurrentDBSize = getFileSize(getCurrentDBFile())
statsMux.Unlock()
// Extract and send hops
if result.Traceroute != nil {
go extractAndSendHops(&result)
}
}
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
json.NewEncoder(w).Encode(map[string]interface{}{
"status": "ok",
"received": len(results),
})
}
func storeResult(result *PingResult) error {
dbMux.RLock()
defer dbMux.RUnlock()
tx, err := db.Begin()
if err != nil {
return err
}
defer tx.Rollback()
// Insert ping result
res, err := tx.Exec(`
INSERT INTO ping_results (ip, sent, received, packet_loss, avg_rtt, timestamp, error)
VALUES (?, ?, ?, ?, ?, ?, ?)
`, result.IP, result.Sent, result.Received, result.PacketLoss, result.AvgRtt, result.Timestamp, result.Error)
if err != nil {
return err
}
pingID, err := res.LastInsertId()
if err != nil {
return err
}
// Insert traceroute if present
if result.Traceroute != nil {
traceRes, err := tx.Exec(`
INSERT INTO traceroute_results (ping_result_id, method, completed, error)
VALUES (?, ?, ?, ?)
`, pingID, result.Traceroute.Method, result.Traceroute.Completed, result.Traceroute.Error)
if err != nil {
return err
}
traceID, err := traceRes.LastInsertId()
if err != nil {
return err
}
// Insert hops
for _, hop := range result.Traceroute.Hops {
_, err := tx.Exec(`
INSERT INTO traceroute_hops (traceroute_id, ttl, ip, rtt, timeout)
VALUES (?, ?, ?, ?, ?)
`, traceID, hop.TTL, hop.IP, hop.Rtt, hop.Timeout)
if err != nil {
return err
}
}
}
return tx.Commit()
}
func extractAndSendHops(result *PingResult) {
if result.Traceroute == nil {
return
}
var newHops []string
sentHopsMux.Lock()
for _, hop := range result.Traceroute.Hops {
if hop.IP != "" && !hop.Timeout && hop.IP != "*" {
if !sentHops[hop.IP] {
newHops = append(newHops, hop.IP)
sentHops[hop.IP] = true
statsMux.Lock()
stats.HopsDiscovered++
statsMux.Unlock()
}
}
}
sentHopsMux.Unlock()
if len(newHops) == 0 {
return
}
// Send to input service
payload := map[string]interface{}{
"hops": newHops,
}
jsonData, err := json.Marshal(payload)
if err != nil {
log.Printf("⚠️ Failed to marshal hops: %v", err)
return
}
resp, err := http.Post(config.InputServiceURL, "application/json", bytes.NewBuffer(jsonData))
if err != nil {
if verbose {
log.Printf("⚠️ Failed to send hops to input service: %v", err)
}
return
}
defer resp.Body.Close()
if resp.StatusCode == http.StatusOK {
statsMux.Lock()
stats.HopsSent += int64(len(newHops))
statsMux.Unlock()
if verbose {
log.Printf("✅ Sent %d new hops to input service", len(newHops))
}
} else {
if verbose {
log.Printf("⚠️ Input service returned status %d", resp.StatusCode)
}
}
}
func handleRotate(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
return
}
log.Println("🔄 Manual rotation triggered")
if err := rotateDB(); err != nil {
http.Error(w, fmt.Sprintf("Rotation failed: %v", err), http.StatusInternalServerError)
return
}
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(map[string]string{
"status": "rotated",
"file": stats.CurrentDBFile,
})
}
func handleDump(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodGet {
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
return
}
dbMux.RLock()
currentFile := getCurrentDBFile()
dbMux.RUnlock()
// Set headers for file download
w.Header().Set("Content-Type", "application/x-sqlite3")
w.Header().Set("Content-Disposition", fmt.Sprintf("attachment; filename=%s", filepath.Base(currentFile)))
// Stream the file
file, err := os.Open(currentFile)
if err != nil {
http.Error(w, "Failed to open database", http.StatusInternalServerError)
return
}
defer file.Close()
if _, err := io.Copy(w, file); err != nil {
log.Printf("⚠️ Failed to stream database: %v", err)
}
if verbose {
log.Printf("📤 Database dump sent: %s", filepath.Base(currentFile))
}
}
// ServiceInfo represents service metadata for discovery
type ServiceInfo struct {
ServiceType string `json:"service_type"`
Version string `json:"version"`
Name string `json:"name"`
InstanceID string `json:"instance_id"`
Capabilities []string `json:"capabilities"`
}
func handleServiceInfo(w http.ResponseWriter, r *http.Request) {
hostname, _ := os.Hostname()
if hostname == "" {
hostname = "unknown"
}
info := ServiceInfo{
ServiceType: "output",
Version: VERSION,
Name: "output_service",
InstanceID: hostname,
Capabilities: []string{"result_storage", "hop_extraction", "database_rotation"},
}
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(info)
}
func handleHealth(w http.ResponseWriter, r *http.Request) {
statsMux.RLock()
defer statsMux.RUnlock()
health := map[string]interface{}{
"status": "healthy",
"version": VERSION,
"uptime": time.Since(startTime).String(),
"stats": stats,
}
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(health)
}
func handleReady(w http.ResponseWriter, r *http.Request) {
dbMux.RLock()
defer dbMux.RUnlock()
if db == nil {
http.Error(w, "Database not ready", http.StatusServiceUnavailable)
return
}
if err := db.Ping(); err != nil {
http.Error(w, "Database not responding", http.StatusServiceUnavailable)
return
}
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(map[string]string{"status": "ready"})
}
func handleMetrics(w http.ResponseWriter, r *http.Request) {
statsMux.RLock()
defer statsMux.RUnlock()
// Prometheus-style metrics
metrics := fmt.Sprintf(`# HELP output_service_total_results Total number of results processed
# TYPE output_service_total_results counter
output_service_total_results %d
# HELP output_service_successful_pings Total successful pings
# TYPE output_service_successful_pings counter
output_service_successful_pings %d
# HELP output_service_failed_pings Total failed pings
# TYPE output_service_failed_pings counter
output_service_failed_pings %d
# HELP output_service_hops_discovered Total hops discovered
# TYPE output_service_hops_discovered counter
output_service_hops_discovered %d
# HELP output_service_hops_sent Total hops sent to input service
# TYPE output_service_hops_sent counter
output_service_hops_sent %d
# HELP output_service_db_size_bytes Current database size in bytes
# TYPE output_service_db_size_bytes gauge
output_service_db_size_bytes %d
`,
stats.TotalResults,
stats.SuccessfulPings,
stats.FailedPings,
stats.HopsDiscovered,
stats.HopsSent,
stats.CurrentDBSize,
)
w.Header().Set("Content-Type", "text/plain")
w.Write([]byte(metrics))
}
func handleStats(w http.ResponseWriter, r *http.Request) {
statsMux.RLock()
defer statsMux.RUnlock()
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(stats)
}
func handleRecent(w http.ResponseWriter, r *http.Request) {
// Parse query parameters
limitStr := r.URL.Query().Get("limit")
limit := 100
if limitStr != "" {
if l, err := fmt.Sscanf(limitStr, "%d", &limit); err == nil && l == 1 {
if limit > 1000 {
limit = 1000
}
}
}
ipFilter := r.URL.Query().Get("ip")
dbMux.RLock()
defer dbMux.RUnlock()
query := `
SELECT id, ip, sent, received, packet_loss, avg_rtt, timestamp, error
FROM ping_results
`
args := []interface{}{}
if ipFilter != "" {
query += " WHERE ip = ?"
args = append(args, ipFilter)
}
query += " ORDER BY timestamp DESC LIMIT ?"
args = append(args, limit)
rows, err := db.Query(query, args...)
if err != nil {
http.Error(w, "Query failed", http.StatusInternalServerError)
return
}
defer rows.Close()
var results []map[string]interface{}
for rows.Next() {
var id int
var ip, errorMsg string
var sent, received int
var packetLoss float64
var avgRtt int64
var timestamp time.Time
if err := rows.Scan(&id, &ip, &sent, &received, &packetLoss, &avgRtt, &timestamp, &errorMsg); err != nil {
continue
}
result := map[string]interface{}{
"id": id,
"ip": ip,
"sent": sent,
"received": received,
"packet_loss": packetLoss,
"avg_rtt": avgRtt,
"timestamp": timestamp,
}
if errorMsg != "" {
result["error"] = errorMsg
}
results = append(results, result)
}
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(results)
}