package main

import (
	"bytes"
	"context"
	"database/sql"
	"encoding/json"
	"flag"
	"fmt"
	"io"
	"log"
	"net/http"
	"os"
	"os/signal"
	"path/filepath"
	"sort"
	"sync"
	"syscall"
	"time"

	_ "github.com/mattn/go-sqlite3"
)

const VERSION = "0.0.1"

type Config struct {
	Port            int    `json:"port"`
	InputServiceURL string `json:"input_service_url"`
	DBDir           string `json:"db_dir"`
	MaxDBSizeMB     int64  `json:"max_db_size_mb"`
	RotationDays    int    `json:"rotation_days"`
	KeepFiles       int    `json:"keep_files"`
	HealthCheckPort int    `json:"health_check_port"`
}

// Data structures matching ping_service output
type PingResult struct {
	IP         string            `json:"ip"`
	Sent       int               `json:"sent"`
	Received   int               `json:"received"`
	PacketLoss float64           `json:"packet_loss"`
	AvgRtt     int64             `json:"avg_rtt"` // nanoseconds
	Timestamp  time.Time         `json:"timestamp"`
	Error      string            `json:"error,omitempty"`
	Traceroute *TracerouteResult `json:"traceroute,omitempty"`
}

type TracerouteResult struct {
	Method    string          `json:"method"`
	Hops      []TracerouteHop `json:"hops"`
	Completed bool            `json:"completed"`
	Error     string          `json:"error,omitempty"`
}

type TracerouteHop struct {
	TTL     int    `json:"ttl"`
	IP      string `json:"ip"`
	Rtt     int64  `json:"rtt,omitempty"` // nanoseconds
	Timeout bool   `json:"timeout,omitempty"`
}

type Stats struct {
	TotalResults    int64     `json:"total_results"`
	SuccessfulPings int64     `json:"successful_pings"`
	FailedPings     int64     `json:"failed_pings"`
	HopsDiscovered  int64     `json:"hops_discovered"`
	HopsSent        int64     `json:"hops_sent"`
	LastResultTime  time.Time `json:"last_result_time"`
	CurrentDBFile   string    `json:"current_db_file"`
	CurrentDBSize   int64     `json:"current_db_size"`
	LastRotation    time.Time `json:"last_rotation"`
}

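// Illustrative example of a /results request body (a JSON array of
// PingResult objects). The addresses, RTTs, and timestamps below are
// hypothetical and only demonstrate the shape of the structs above:
//
//	[
//	  {
//	    "ip": "192.0.2.1",
//	    "sent": 4,
//	    "received": 4,
//	    "packet_loss": 0,
//	    "avg_rtt": 12500000,
//	    "timestamp": "2024-01-01T00:00:00Z",
//	    "traceroute": {
//	      "method": "udp",
//	      "completed": true,
//	      "hops": [
//	        {"ttl": 1, "ip": "192.0.2.254", "rtt": 1200000},
//	        {"ttl": 2, "ip": "198.51.100.1", "rtt": 9800000}
//	      ]
//	    }
//	  }
//	]
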
var (
	config      Config
	db          *sql.DB
	dbMux       sync.RWMutex
	stats       Stats
	statsMux    sync.RWMutex
	sentHops    = make(map[string]bool) // Track sent hops to avoid duplicates
	sentHopsMux sync.RWMutex
	verbose     bool
	startTime   time.Time
)

func main() {
	// CLI flags
	port := flag.Int("port", 8081, "Port to listen on")
	healthPort := flag.Int("health-port", 8091, "Health check port")
	inputURL := flag.String("input-url", "http://localhost:8080/hops", "Input service URL for hop submission")
	dbDir := flag.String("db-dir", "./output_data", "Directory to store database files")
	maxSize := flag.Int64("max-size-mb", 100, "Maximum database size in MB before rotation")
	rotationDays := flag.Int("rotation-days", 7, "Rotate database after this many days")
	keepFiles := flag.Int("keep-files", 5, "Number of database files to keep")
	verboseFlag := flag.Bool("v", false, "Enable verbose logging")
	flag.BoolVar(verboseFlag, "verbose", false, "Enable verbose logging")
	versionFlag := flag.Bool("version", false, "Show version")
	help := flag.Bool("help", false, "Show help message")
	flag.Parse()

	if *versionFlag {
		fmt.Printf("output-service version %s\n", VERSION)
		os.Exit(0)
	}

	if *help {
		fmt.Println("Output Service - Receive and store ping/traceroute results")
		fmt.Printf("Version: %s\n\n", VERSION)
		fmt.Println("Flags:")
		flag.PrintDefaults()
		os.Exit(0)
	}

	verbose = *verboseFlag
	startTime = time.Now()

	config = Config{
		Port:            *port,
		InputServiceURL: *inputURL,
		DBDir:           *dbDir,
		MaxDBSizeMB:     *maxSize,
		RotationDays:    *rotationDays,
		KeepFiles:       *keepFiles,
		HealthCheckPort: *healthPort,
	}

	// Create database directory if it doesn't exist
	if err := os.MkdirAll(config.DBDir, 0755); err != nil {
		log.Fatalf("Failed to create database directory: %v", err)
	}

	// Initialize database
	if err := initDB(); err != nil {
		log.Fatalf("Failed to initialize database: %v", err)
	}
	defer closeDB()

	// Start background rotation checker
	go rotationChecker()

	// Setup HTTP handlers
	mux := http.NewServeMux()
	mux.HandleFunc("/results", handleResults)
	mux.HandleFunc("/rotate", handleRotate)
	mux.HandleFunc("/dump", handleDump)

	// Health check handlers
	healthMux := http.NewServeMux()
	healthMux.HandleFunc("/health", handleHealth)
	healthMux.HandleFunc("/service-info", handleServiceInfo)
	healthMux.HandleFunc("/ready", handleReady)
	healthMux.HandleFunc("/metrics", handleMetrics)
	healthMux.HandleFunc("/stats", handleStats)
	healthMux.HandleFunc("/recent", handleRecent)

	// Create servers
	server := &http.Server{
		Addr:    fmt.Sprintf(":%d", config.Port),
		Handler: mux,
	}

	healthServer := &http.Server{
		Addr:    fmt.Sprintf(":%d", config.HealthCheckPort),
		Handler: healthMux,
	}

	// Graceful shutdown handling
	sigChan := make(chan os.Signal, 1)
	signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM)

	go func() {
		log.Printf("🚀 Output Service v%s starting...", VERSION)
		log.Printf("📥 Listening for results on http://localhost:%d/results", config.Port)
		log.Printf("🏥 Health checks on http://localhost:%d", config.HealthCheckPort)
		log.Printf("💾 Database directory: %s", config.DBDir)
		log.Printf("🔄 Rotation: %d days OR %d MB, keeping %d files",
			config.RotationDays, config.MaxDBSizeMB, config.KeepFiles)

		if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
			log.Fatalf("Server error: %v", err)
		}
	}()

	go func() {
		if err := healthServer.ListenAndServe(); err != nil && err != http.ErrServerClosed {
			log.Fatalf("Health server error: %v", err)
		}
	}()

	// Wait for shutdown signal
	<-sigChan
	log.Println("\n🛑 Shutting down gracefully...")

	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	if err := server.Shutdown(ctx); err != nil {
		log.Printf("Server shutdown error: %v", err)
	}
	if err := healthServer.Shutdown(ctx); err != nil {
		log.Printf("Health server shutdown error: %v", err)
	}

	log.Println("✅ Shutdown complete")
}

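// Example invocation, assuming the binary is built as ./output-service
// (flag values mirror the defaults above; -v enables verbose logging):
//
//	./output-service -port 8081 -health-port 8091 \
//	    -input-url http://localhost:8080/hops \
//	    -db-dir ./output_data -max-size-mb 100 -rotation-days 7 -keep-files 5 -v
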
func initDB() error {
	dbMux.Lock()
	defer dbMux.Unlock()

	// Find or create current database file
	dbFile := getCurrentDBFile()

	var err error
	db, err = sql.Open("sqlite3", dbFile)
	if err != nil {
		return fmt.Errorf("failed to open database: %w", err)
	}

	// Create tables
	schema := `
	CREATE TABLE IF NOT EXISTS ping_results (
		id INTEGER PRIMARY KEY AUTOINCREMENT,
		ip TEXT NOT NULL,
		sent INTEGER,
		received INTEGER,
		packet_loss REAL,
		avg_rtt INTEGER,
		timestamp DATETIME,
		error TEXT,
		created_at DATETIME DEFAULT CURRENT_TIMESTAMP
	);

	CREATE TABLE IF NOT EXISTS traceroute_results (
		id INTEGER PRIMARY KEY AUTOINCREMENT,
		ping_result_id INTEGER,
		method TEXT,
		completed BOOLEAN,
		error TEXT,
		FOREIGN KEY(ping_result_id) REFERENCES ping_results(id)
	);

	CREATE TABLE IF NOT EXISTS traceroute_hops (
		id INTEGER PRIMARY KEY AUTOINCREMENT,
		traceroute_id INTEGER,
		ttl INTEGER,
		ip TEXT,
		rtt INTEGER,
		timeout BOOLEAN,
		FOREIGN KEY(traceroute_id) REFERENCES traceroute_results(id)
	);

	CREATE INDEX IF NOT EXISTS idx_ping_ip ON ping_results(ip);
	CREATE INDEX IF NOT EXISTS idx_ping_timestamp ON ping_results(timestamp);
	CREATE INDEX IF NOT EXISTS idx_hop_ip ON traceroute_hops(ip);
	`

	if _, err := db.Exec(schema); err != nil {
		return fmt.Errorf("failed to create schema: %w", err)
	}

	// Update stats
	statsMux.Lock()
	stats.CurrentDBFile = filepath.Base(dbFile)
	stats.CurrentDBSize = getFileSize(dbFile)
	statsMux.Unlock()

	log.Printf("📂 Database initialized: %s", filepath.Base(dbFile))
	return nil
}

func closeDB() {
	dbMux.Lock()
	defer dbMux.Unlock()

	if db != nil {
		db.Close()
	}
}

func getCurrentDBFile() string {
	// Check for most recent database file
	files, err := filepath.Glob(filepath.Join(config.DBDir, "results_*.db"))
	if err != nil || len(files) == 0 {
		// Create new file with current date
		return filepath.Join(config.DBDir, fmt.Sprintf("results_%s.db", time.Now().Format("2006-01-02")))
	}

	// Sort and return most recent
	sort.Strings(files)
	return files[len(files)-1]
}

func getFileSize(path string) int64 {
	info, err := os.Stat(path)
	if err != nil {
		return 0
	}
	return info.Size()
}

func rotationChecker() {
	ticker := time.NewTicker(1 * time.Minute)
	defer ticker.Stop()

	for range ticker.C {
		checkAndRotate()
	}
}

func checkAndRotate() {
	dbMux.RLock()
	currentFile := getCurrentDBFile()
	dbMux.RUnlock()

	// Check size
	size := getFileSize(currentFile)
	sizeMB := size / (1024 * 1024)

	// Check age
	fileInfo, err := os.Stat(currentFile)
	if err != nil {
		return
	}
	age := time.Since(fileInfo.ModTime())
	ageDays := int(age.Hours() / 24)

	if sizeMB >= config.MaxDBSizeMB {
		log.Printf("🔄 Database size (%d MB) exceeds limit (%d MB), rotating...", sizeMB, config.MaxDBSizeMB)
		if err := rotateDB(); err != nil {
			log.Printf("⚠️ Rotation failed: %v", err)
		}
	} else if ageDays >= config.RotationDays {
		log.Printf("🔄 Database age (%d days) exceeds limit (%d days), rotating...", ageDays, config.RotationDays)
		if err := rotateDB(); err != nil {
			log.Printf("⚠️ Rotation failed: %v", err)
		}
	}
}

func rotateDB() error {
	dbMux.Lock()
	defer dbMux.Unlock()

	// Close current database
	if db != nil {
		db.Close()
	}

	// Create new database file
	newFile := filepath.Join(config.DBDir, fmt.Sprintf("results_%s.db", time.Now().Format("2006-01-02_15-04-05")))

	var err error
	db, err = sql.Open("sqlite3", newFile)
	if err != nil {
		return fmt.Errorf("failed to open new database: %w", err)
	}

	// Create schema in new database
	schema := `
	CREATE TABLE ping_results (
		id INTEGER PRIMARY KEY AUTOINCREMENT,
		ip TEXT NOT NULL,
		sent INTEGER,
		received INTEGER,
		packet_loss REAL,
		avg_rtt INTEGER,
		timestamp DATETIME,
		error TEXT,
		created_at DATETIME DEFAULT CURRENT_TIMESTAMP
	);

	CREATE TABLE traceroute_results (
		id INTEGER PRIMARY KEY AUTOINCREMENT,
		ping_result_id INTEGER,
		method TEXT,
		completed BOOLEAN,
		error TEXT,
		FOREIGN KEY(ping_result_id) REFERENCES ping_results(id)
	);

	CREATE TABLE traceroute_hops (
		id INTEGER PRIMARY KEY AUTOINCREMENT,
		traceroute_id INTEGER,
		ttl INTEGER,
		ip TEXT,
		rtt INTEGER,
		timeout BOOLEAN,
		FOREIGN KEY(traceroute_id) REFERENCES traceroute_results(id)
	);

	CREATE INDEX idx_ping_ip ON ping_results(ip);
	CREATE INDEX idx_ping_timestamp ON ping_results(timestamp);
	CREATE INDEX idx_hop_ip ON traceroute_hops(ip);
	`

	if _, err := db.Exec(schema); err != nil {
		return fmt.Errorf("failed to create schema in new database: %w", err)
	}

	// Update stats
	statsMux.Lock()
	stats.CurrentDBFile = filepath.Base(newFile)
	stats.CurrentDBSize = 0
	stats.LastRotation = time.Now()
	statsMux.Unlock()

	// Cleanup old files
	cleanupOldDBFiles()

	log.Printf("✅ Rotated to new database: %s", filepath.Base(newFile))
	return nil
}

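// Rotation produces file names that sort chronologically as plain strings,
// e.g. (dates are illustrative):
//
//	results_2024-01-01.db            // initial file, date only
//	results_2024-01-05_09-30-00.db   // rotated files carry a full timestamp
//
// cleanupOldDBFiles relies on this lexical ordering when pruning to KeepFiles.
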
func cleanupOldDBFiles() {
	files, err := filepath.Glob(filepath.Join(config.DBDir, "results_*.db"))
	if err != nil || len(files) <= config.KeepFiles {
		return
	}

	// Sort by name (chronological due to timestamp format)
	sort.Strings(files)

	// Remove oldest files
	toRemove := len(files) - config.KeepFiles
	for i := 0; i < toRemove; i++ {
		if err := os.Remove(files[i]); err != nil {
			log.Printf("⚠️ Failed to remove old database %s: %v", files[i], err)
		} else {
			log.Printf("🗑️ Removed old database: %s", filepath.Base(files[i]))
		}
	}
}

// HTTP Handlers
func handleResults(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodPost {
		http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
		return
	}

	body, err := io.ReadAll(r.Body)
	if err != nil {
		http.Error(w, "Failed to read body", http.StatusBadRequest)
		return
	}
	defer r.Body.Close()

	var results []PingResult
	if err := json.Unmarshal(body, &results); err != nil {
		http.Error(w, "Invalid JSON", http.StatusBadRequest)
		return
	}

	if verbose {
		log.Printf("📥 Received %d ping results", len(results))
	}

	// Process results
	for i := range results {
		// Take a pointer to the slice element rather than a range variable so
		// the hop-extraction goroutine below never sees a reused loop variable.
		result := &results[i]

		if err := storeResult(result); err != nil {
			log.Printf("⚠️ Failed to store result for %s: %v", result.IP, err)
			continue
		}

		// Update stats
		statsMux.Lock()
		stats.TotalResults++
		if result.Error != "" {
			stats.FailedPings++
		} else {
			stats.SuccessfulPings++
		}
		stats.LastResultTime = time.Now()
		stats.CurrentDBSize = getFileSize(getCurrentDBFile())
		statsMux.Unlock()

		// Extract and send hops
		if result.Traceroute != nil {
			go extractAndSendHops(result)
		}
	}

	w.Header().Set("Content-Type", "application/json")
	w.WriteHeader(http.StatusOK)
	json.NewEncoder(w).Encode(map[string]interface{}{
		"status":   "ok",
		"received": len(results),
	})
}

func storeResult(result *PingResult) error {
	dbMux.RLock()
	defer dbMux.RUnlock()

	tx, err := db.Begin()
	if err != nil {
		return err
	}
	defer tx.Rollback()

	// Insert ping result
	res, err := tx.Exec(`
		INSERT INTO ping_results (ip, sent, received, packet_loss, avg_rtt, timestamp, error)
		VALUES (?, ?, ?, ?, ?, ?, ?)
	`, result.IP, result.Sent, result.Received, result.PacketLoss, result.AvgRtt, result.Timestamp, result.Error)
	if err != nil {
		return err
	}

	pingID, err := res.LastInsertId()
	if err != nil {
		return err
	}

	// Insert traceroute if present
	if result.Traceroute != nil {
		traceRes, err := tx.Exec(`
			INSERT INTO traceroute_results (ping_result_id, method, completed, error)
			VALUES (?, ?, ?, ?)
		`, pingID, result.Traceroute.Method, result.Traceroute.Completed, result.Traceroute.Error)
		if err != nil {
			return err
		}

		traceID, err := traceRes.LastInsertId()
		if err != nil {
			return err
		}

		// Insert hops
		for _, hop := range result.Traceroute.Hops {
			_, err := tx.Exec(`
				INSERT INTO traceroute_hops (traceroute_id, ttl, ip, rtt, timeout)
				VALUES (?, ?, ?, ?, ?)
			`, traceID, hop.TTL, hop.IP, hop.Rtt, hop.Timeout)
			if err != nil {
				return err
			}
		}
	}

	return tx.Commit()
}

func extractAndSendHops(result *PingResult) {
	if result.Traceroute == nil {
		return
	}

	var newHops []string
	sentHopsMux.Lock()
	for _, hop := range result.Traceroute.Hops {
		if hop.IP != "" && !hop.Timeout && hop.IP != "*" {
			if !sentHops[hop.IP] {
				newHops = append(newHops, hop.IP)
				sentHops[hop.IP] = true

				statsMux.Lock()
				stats.HopsDiscovered++
				statsMux.Unlock()
			}
		}
	}
	sentHopsMux.Unlock()

	if len(newHops) == 0 {
		return
	}

	// Send to input service
	payload := map[string]interface{}{
		"hops": newHops,
	}

	jsonData, err := json.Marshal(payload)
	if err != nil {
		log.Printf("⚠️ Failed to marshal hops: %v", err)
		return
	}

	// Use a client with a timeout so a stalled input service cannot leak goroutines.
	client := &http.Client{Timeout: 10 * time.Second}
	resp, err := client.Post(config.InputServiceURL, "application/json", bytes.NewBuffer(jsonData))
	if err != nil {
		if verbose {
			log.Printf("⚠️ Failed to send hops to input service: %v", err)
		}
		return
	}
	defer resp.Body.Close()

	if resp.StatusCode == http.StatusOK {
		statsMux.Lock()
		stats.HopsSent += int64(len(newHops))
		statsMux.Unlock()

		if verbose {
			log.Printf("✅ Sent %d new hops to input service", len(newHops))
		}
	} else if verbose {
		log.Printf("⚠️ Input service returned status %d", resp.StatusCode)
	}
}

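// Illustrative hop-submission payload POSTed to config.InputServiceURL by
// extractAndSendHops; the addresses are hypothetical:
//
//	{"hops": ["192.0.2.254", "198.51.100.1"]}
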
func handleRotate(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodPost {
		http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
		return
	}

	log.Println("🔄 Manual rotation triggered")
	if err := rotateDB(); err != nil {
		http.Error(w, fmt.Sprintf("Rotation failed: %v", err), http.StatusInternalServerError)
		return
	}

	// Read the current file name under the stats lock to avoid racing with writers.
	statsMux.RLock()
	currentFile := stats.CurrentDBFile
	statsMux.RUnlock()

	w.Header().Set("Content-Type", "application/json")
	json.NewEncoder(w).Encode(map[string]string{
		"status": "rotated",
		"file":   currentFile,
	})
}

func handleDump(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodGet {
		http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
		return
	}

	dbMux.RLock()
	currentFile := getCurrentDBFile()
	dbMux.RUnlock()

	// Set headers for file download
	w.Header().Set("Content-Type", "application/x-sqlite3")
	w.Header().Set("Content-Disposition", fmt.Sprintf("attachment; filename=%s", filepath.Base(currentFile)))

	// Stream the file
	file, err := os.Open(currentFile)
	if err != nil {
		http.Error(w, "Failed to open database", http.StatusInternalServerError)
		return
	}
	defer file.Close()

	if _, err := io.Copy(w, file); err != nil {
		log.Printf("⚠️ Failed to stream database: %v", err)
	}

	if verbose {
		log.Printf("📤 Database dump sent: %s", filepath.Base(currentFile))
	}
}

// ServiceInfo represents service metadata for discovery
type ServiceInfo struct {
	ServiceType  string   `json:"service_type"`
	Version      string   `json:"version"`
	Name         string   `json:"name"`
	InstanceID   string   `json:"instance_id"`
	Capabilities []string `json:"capabilities"`
}

func handleServiceInfo(w http.ResponseWriter, r *http.Request) {
	hostname, _ := os.Hostname()
	if hostname == "" {
		hostname = "unknown"
	}

	info := ServiceInfo{
		ServiceType:  "output",
		Version:      VERSION,
		Name:         "output_service",
		InstanceID:   hostname,
		Capabilities: []string{"result_storage", "hop_extraction", "database_rotation"},
	}

	w.Header().Set("Content-Type", "application/json")
	json.NewEncoder(w).Encode(info)
}

func handleHealth(w http.ResponseWriter, r *http.Request) {
	statsMux.RLock()
	defer statsMux.RUnlock()

	health := map[string]interface{}{
		"status":  "healthy",
		"version": VERSION,
		"uptime":  time.Since(startTime).String(),
		"stats":   stats,
	}

	w.Header().Set("Content-Type", "application/json")
	json.NewEncoder(w).Encode(health)
}

func handleReady(w http.ResponseWriter, r *http.Request) {
	dbMux.RLock()
	defer dbMux.RUnlock()

	if db == nil {
		http.Error(w, "Database not ready", http.StatusServiceUnavailable)
		return
	}

	if err := db.Ping(); err != nil {
		http.Error(w, "Database not responding", http.StatusServiceUnavailable)
		return
	}

	w.Header().Set("Content-Type", "application/json")
	json.NewEncoder(w).Encode(map[string]string{"status": "ready"})
}

func handleMetrics(w http.ResponseWriter, r *http.Request) {
	statsMux.RLock()
	defer statsMux.RUnlock()

	// Prometheus-style metrics
	metrics := fmt.Sprintf(`# HELP output_service_total_results Total number of results processed
# TYPE output_service_total_results counter
output_service_total_results %d

# HELP output_service_successful_pings Total successful pings
# TYPE output_service_successful_pings counter
output_service_successful_pings %d

# HELP output_service_failed_pings Total failed pings
# TYPE output_service_failed_pings counter
output_service_failed_pings %d

# HELP output_service_hops_discovered Total hops discovered
# TYPE output_service_hops_discovered counter
output_service_hops_discovered %d

# HELP output_service_hops_sent Total hops sent to input service
# TYPE output_service_hops_sent counter
output_service_hops_sent %d

# HELP output_service_db_size_bytes Current database size in bytes
# TYPE output_service_db_size_bytes gauge
output_service_db_size_bytes %d
`,
		stats.TotalResults,
		stats.SuccessfulPings,
		stats.FailedPings,
		stats.HopsDiscovered,
		stats.HopsSent,
		stats.CurrentDBSize,
	)

	w.Header().Set("Content-Type", "text/plain")
	w.Write([]byte(metrics))
}

func handleStats(w http.ResponseWriter, r *http.Request) {
	statsMux.RLock()
	defer statsMux.RUnlock()

	w.Header().Set("Content-Type", "application/json")
	json.NewEncoder(w).Encode(stats)
}

func handleRecent(w http.ResponseWriter, r *http.Request) {
	// Parse query parameters
	limitStr := r.URL.Query().Get("limit")
	limit := 100
	if limitStr != "" {
		if n, err := fmt.Sscanf(limitStr, "%d", &limit); err != nil || n != 1 {
			limit = 100
		}
		// Clamp to a sane range so callers cannot disable the LIMIT clause.
		if limit < 1 {
			limit = 1
		}
		if limit > 1000 {
			limit = 1000
		}
	}

	ipFilter := r.URL.Query().Get("ip")

	dbMux.RLock()
	defer dbMux.RUnlock()

	query := `
		SELECT id, ip, sent, received, packet_loss, avg_rtt, timestamp, error
		FROM ping_results
	`
	args := []interface{}{}

	if ipFilter != "" {
		query += " WHERE ip = ?"
		args = append(args, ipFilter)
	}

	query += " ORDER BY timestamp DESC LIMIT ?"
	args = append(args, limit)

	rows, err := db.Query(query, args...)
	if err != nil {
		http.Error(w, "Query failed", http.StatusInternalServerError)
		return
	}
	defer rows.Close()

	var results []map[string]interface{}
	for rows.Next() {
		var id int
		var ip, errorMsg string
		var sent, received int
		var packetLoss float64
		var avgRtt int64
		var timestamp time.Time

		if err := rows.Scan(&id, &ip, &sent, &received, &packetLoss, &avgRtt, &timestamp, &errorMsg); err != nil {
			continue
		}

		result := map[string]interface{}{
			"id":          id,
			"ip":          ip,
			"sent":        sent,
			"received":    received,
			"packet_loss": packetLoss,
			"avg_rtt":     avgRtt,
			"timestamp":   timestamp,
		}

		if errorMsg != "" {
			result["error"] = errorMsg
		}

		results = append(results, result)
	}

	w.Header().Set("Content-Type", "application/json")
	json.NewEncoder(w).Encode(results)
}
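
// Illustrative requests against the two listeners (default ports shown; the
// IP filter value is hypothetical):
//
//	curl http://localhost:8091/stats
//	curl "http://localhost:8091/recent?limit=10&ip=192.0.2.1"
//	curl -X POST http://localhost:8081/rotate
//	curl -o results.db http://localhost:8081/dump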