package main import ( "bytes" "context" "database/sql" "encoding/json" "flag" "fmt" "io" "log" "net/http" "os" "os/signal" "path/filepath" "sort" "sync" "syscall" "time" _ "github.com/mattn/go-sqlite3" ) const VERSION = "0.0.1" type Config struct { Port int `json:"port"` InputServiceURL string `json:"input_service_url"` DBDir string `json:"db_dir"` MaxDBSizeMB int64 `json:"max_db_size_mb"` RotationDays int `json:"rotation_days"` KeepFiles int `json:"keep_files"` HealthCheckPort int `json:"health_check_port"` } // Data structures matching ping_service output type PingResult struct { IP string `json:"ip"` Sent int `json:"sent"` Received int `json:"received"` PacketLoss float64 `json:"packet_loss"` AvgRtt int64 `json:"avg_rtt"` // nanoseconds Timestamp time.Time `json:"timestamp"` Error string `json:"error,omitempty"` Traceroute *TracerouteResult `json:"traceroute,omitempty"` } type TracerouteResult struct { Method string `json:"method"` Hops []TracerouteHop `json:"hops"` Completed bool `json:"completed"` Error string `json:"error,omitempty"` } type TracerouteHop struct { TTL int `json:"ttl"` IP string `json:"ip"` Rtt int64 `json:"rtt,omitempty"` // nanoseconds Timeout bool `json:"timeout,omitempty"` } type Stats struct { TotalResults int64 `json:"total_results"` SuccessfulPings int64 `json:"successful_pings"` FailedPings int64 `json:"failed_pings"` HopsDiscovered int64 `json:"hops_discovered"` HopsSent int64 `json:"hops_sent"` LastResultTime time.Time `json:"last_result_time"` CurrentDBFile string `json:"current_db_file"` CurrentDBSize int64 `json:"current_db_size"` LastRotation time.Time `json:"last_rotation"` } var ( config Config db *sql.DB dbMux sync.RWMutex stats Stats statsMux sync.RWMutex sentHops = make(map[string]time.Time) // Track sent hops with timestamp for eviction sentHopsMux sync.RWMutex verbose bool startTime time.Time sentHopsTTL = 24 * time.Hour // Time-to-live for hop deduplication cache ) func main() { // CLI flags port := flag.Int("port", 8081, "Port to listen on") healthPort := flag.Int("health-port", 8091, "Health check port") inputURL := flag.String("input-url", "http://localhost:8080/hops", "Input service URL for hop submission") dbDir := flag.String("db-dir", "./output_data", "Directory to store database files") maxSize := flag.Int64("max-size-mb", 100, "Maximum database size in MB before rotation") rotationDays := flag.Int("rotation-days", 7, "Rotate database after this many days") keepFiles := flag.Int("keep-files", 5, "Number of database files to keep") verboseFlag := flag.Bool("v", false, "Enable verbose logging") flag.BoolVar(verboseFlag, "verbose", false, "Enable verbose logging") versionFlag := flag.Bool("version", false, "Show version") help := flag.Bool("help", false, "Show help message") flag.Parse() if *versionFlag { fmt.Printf("output-service version %s\n", VERSION) os.Exit(0) } if *help { fmt.Println("Output Service - Receive and store ping/traceroute results") fmt.Printf("Version: %s\n\n", VERSION) fmt.Println("Flags:") flag.PrintDefaults() os.Exit(0) } verbose = *verboseFlag startTime = time.Now() config = Config{ Port: *port, InputServiceURL: *inputURL, DBDir: *dbDir, MaxDBSizeMB: *maxSize, RotationDays: *rotationDays, KeepFiles: *keepFiles, HealthCheckPort: *healthPort, } // Create database directory if it doesn't exist if err := os.MkdirAll(config.DBDir, 0755); err != nil { log.Fatalf("Failed to create database directory: %v", err) } // Initialize database if err := initDB(); err != nil { log.Fatalf("Failed to initialize database: %v", err) } defer closeDB() // Start background rotation checker go rotationChecker() // Setup HTTP handlers mux := http.NewServeMux() mux.HandleFunc("/results", handleResults) mux.HandleFunc("/rotate", handleRotate) mux.HandleFunc("/dump", handleDump) // Health check handlers healthMux := http.NewServeMux() healthMux.HandleFunc("/health", handleHealth) healthMux.HandleFunc("/service-info", handleServiceInfo) healthMux.HandleFunc("/ready", handleReady) healthMux.HandleFunc("/metrics", handleMetrics) healthMux.HandleFunc("/stats", handleStats) healthMux.HandleFunc("/recent", handleRecent) // Create servers server := &http.Server{ Addr: fmt.Sprintf(":%d", config.Port), Handler: mux, } healthServer := &http.Server{ Addr: fmt.Sprintf(":%d", config.HealthCheckPort), Handler: healthMux, } // Graceful shutdown handling sigChan := make(chan os.Signal, 1) signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM) // Start cleanup goroutine for sentHops map to prevent unbounded growth go cleanupSentHops() go func() { log.Printf("๐Ÿš€ Output Service v%s starting...", VERSION) log.Printf("๐Ÿ“ฅ Listening for results on http://localhost:%d/results", config.Port) log.Printf("๐Ÿฅ Health checks on http://localhost:%d", config.HealthCheckPort) log.Printf("๐Ÿ’พ Database directory: %s", config.DBDir) log.Printf("๐Ÿ”„ Rotation: %d days OR %d MB, keeping %d files", config.RotationDays, config.MaxDBSizeMB, config.KeepFiles) if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed { log.Fatalf("Server error: %v", err) } }() go func() { if err := healthServer.ListenAndServe(); err != nil && err != http.ErrServerClosed { log.Fatalf("Health server error: %v", err) } }() // Wait for shutdown signal <-sigChan log.Println("\n๐Ÿ›‘ Shutting down gracefully...") ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() if err := server.Shutdown(ctx); err != nil { log.Printf("Server shutdown error: %v", err) } if err := healthServer.Shutdown(ctx); err != nil { log.Printf("Health server shutdown error: %v", err) } log.Println("โœ… Shutdown complete") } func initDB() error { dbMux.Lock() defer dbMux.Unlock() // Find or create current database file dbFile := getCurrentDBFile() var err error db, err = sql.Open("sqlite3", dbFile) if err != nil { return fmt.Errorf("failed to open database: %w", err) } // Create tables schema := ` CREATE TABLE IF NOT EXISTS ping_results ( id INTEGER PRIMARY KEY AUTOINCREMENT, ip TEXT NOT NULL, sent INTEGER, received INTEGER, packet_loss REAL, avg_rtt INTEGER, timestamp DATETIME, error TEXT, created_at DATETIME DEFAULT CURRENT_TIMESTAMP ); CREATE TABLE IF NOT EXISTS traceroute_results ( id INTEGER PRIMARY KEY AUTOINCREMENT, ping_result_id INTEGER, method TEXT, completed BOOLEAN, error TEXT, FOREIGN KEY(ping_result_id) REFERENCES ping_results(id) ); CREATE TABLE IF NOT EXISTS traceroute_hops ( id INTEGER PRIMARY KEY AUTOINCREMENT, traceroute_id INTEGER, ttl INTEGER, ip TEXT, rtt INTEGER, timeout BOOLEAN, FOREIGN KEY(traceroute_id) REFERENCES traceroute_results(id) ); CREATE INDEX IF NOT EXISTS idx_ping_ip ON ping_results(ip); CREATE INDEX IF NOT EXISTS idx_ping_timestamp ON ping_results(timestamp); CREATE INDEX IF NOT EXISTS idx_hop_ip ON traceroute_hops(ip); ` if _, err := db.Exec(schema); err != nil { return fmt.Errorf("failed to create schema: %w", err) } // Update stats statsMux.Lock() stats.CurrentDBFile = filepath.Base(dbFile) stats.CurrentDBSize = getFileSize(dbFile) statsMux.Unlock() log.Printf("๐Ÿ“‚ Database initialized: %s", filepath.Base(dbFile)) return nil } func closeDB() { dbMux.Lock() defer dbMux.Unlock() if db != nil { db.Close() } } func getCurrentDBFile() string { // Check for most recent database file files, err := filepath.Glob(filepath.Join(config.DBDir, "results_*.db")) if err != nil || len(files) == 0 { // Create new file with current date return filepath.Join(config.DBDir, fmt.Sprintf("results_%s.db", time.Now().Format("2006-01-02"))) } // Sort and return most recent sort.Strings(files) return files[len(files)-1] } func getFileSize(path string) int64 { info, err := os.Stat(path) if err != nil { return 0 } return info.Size() } func rotationChecker() { ticker := time.NewTicker(1 * time.Minute) defer ticker.Stop() for range ticker.C { checkAndRotate() } } func checkAndRotate() { dbMux.RLock() currentFile := getCurrentDBFile() dbMux.RUnlock() // Check size size := getFileSize(currentFile) sizeMB := size / (1024 * 1024) // Check age fileInfo, err := os.Stat(currentFile) if err != nil { return } age := time.Since(fileInfo.ModTime()) ageDays := int(age.Hours() / 24) if sizeMB >= config.MaxDBSizeMB { log.Printf("๐Ÿ”„ Database size (%d MB) exceeds limit (%d MB), rotating...", sizeMB, config.MaxDBSizeMB) rotateDB() } else if ageDays >= config.RotationDays { log.Printf("๐Ÿ”„ Database age (%d days) exceeds limit (%d days), rotating...", ageDays, config.RotationDays) rotateDB() } } func rotateDB() error { dbMux.Lock() defer dbMux.Unlock() // Close current database if db != nil { db.Close() } // Create new database file newFile := filepath.Join(config.DBDir, fmt.Sprintf("results_%s.db", time.Now().Format("2006-01-02_15-04-05"))) var err error db, err = sql.Open("sqlite3", newFile) if err != nil { return fmt.Errorf("failed to open new database: %w", err) } // Create schema in new database schema := ` CREATE TABLE ping_results ( id INTEGER PRIMARY KEY AUTOINCREMENT, ip TEXT NOT NULL, sent INTEGER, received INTEGER, packet_loss REAL, avg_rtt INTEGER, timestamp DATETIME, error TEXT, created_at DATETIME DEFAULT CURRENT_TIMESTAMP ); CREATE TABLE traceroute_results ( id INTEGER PRIMARY KEY AUTOINCREMENT, ping_result_id INTEGER, method TEXT, completed BOOLEAN, error TEXT, FOREIGN KEY(ping_result_id) REFERENCES ping_results(id) ); CREATE TABLE traceroute_hops ( id INTEGER PRIMARY KEY AUTOINCREMENT, traceroute_id INTEGER, ttl INTEGER, ip TEXT, rtt INTEGER, timeout BOOLEAN, FOREIGN KEY(traceroute_id) REFERENCES traceroute_results(id) ); CREATE INDEX idx_ping_ip ON ping_results(ip); CREATE INDEX idx_ping_timestamp ON ping_results(timestamp); CREATE INDEX idx_hop_ip ON traceroute_hops(ip); ` if _, err := db.Exec(schema); err != nil { return fmt.Errorf("failed to create schema in new database: %w", err) } // Update stats statsMux.Lock() stats.CurrentDBFile = filepath.Base(newFile) stats.CurrentDBSize = 0 stats.LastRotation = time.Now() statsMux.Unlock() // Cleanup old files cleanupOldDBFiles() log.Printf("โœ… Rotated to new database: %s", filepath.Base(newFile)) return nil } func cleanupOldDBFiles() { files, err := filepath.Glob(filepath.Join(config.DBDir, "results_*.db")) if err != nil || len(files) <= config.KeepFiles { return } // Sort by name (chronological due to timestamp format) sort.Strings(files) // Remove oldest files toRemove := len(files) - config.KeepFiles for i := 0; i < toRemove; i++ { if err := os.Remove(files[i]); err != nil { log.Printf("โš ๏ธ Failed to remove old database %s: %v", files[i], err) } else { log.Printf("๐Ÿ—‘๏ธ Removed old database: %s", filepath.Base(files[i])) } } } // HTTP Handlers func handleResults(w http.ResponseWriter, r *http.Request) { if r.Method != http.MethodPost { http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) return } body, err := io.ReadAll(r.Body) if err != nil { http.Error(w, "Failed to read body", http.StatusBadRequest) return } defer r.Body.Close() var results []PingResult if err := json.Unmarshal(body, &results); err != nil { http.Error(w, "Invalid JSON", http.StatusBadRequest) return } if verbose { log.Printf("๐Ÿ“ฅ Received %d ping results", len(results)) } // Process results for _, result := range results { if err := storeResult(&result); err != nil { log.Printf("โš ๏ธ Failed to store result for %s: %v", result.IP, err) continue } // Update stats statsMux.Lock() stats.TotalResults++ if result.Error != "" { stats.FailedPings++ } else { stats.SuccessfulPings++ } stats.LastResultTime = time.Now() stats.CurrentDBSize = getFileSize(getCurrentDBFile()) statsMux.Unlock() // Extract and send hops if result.Traceroute != nil { go extractAndSendHops(&result) } } w.Header().Set("Content-Type", "application/json") w.WriteHeader(http.StatusOK) json.NewEncoder(w).Encode(map[string]interface{}{ "status": "ok", "received": len(results), }) } func storeResult(result *PingResult) error { dbMux.RLock() defer dbMux.RUnlock() tx, err := db.Begin() if err != nil { return err } defer tx.Rollback() // Insert ping result res, err := tx.Exec(` INSERT INTO ping_results (ip, sent, received, packet_loss, avg_rtt, timestamp, error) VALUES (?, ?, ?, ?, ?, ?, ?) `, result.IP, result.Sent, result.Received, result.PacketLoss, result.AvgRtt, result.Timestamp, result.Error) if err != nil { return err } pingID, err := res.LastInsertId() if err != nil { return err } // Insert traceroute if present if result.Traceroute != nil { traceRes, err := tx.Exec(` INSERT INTO traceroute_results (ping_result_id, method, completed, error) VALUES (?, ?, ?, ?) `, pingID, result.Traceroute.Method, result.Traceroute.Completed, result.Traceroute.Error) if err != nil { return err } traceID, err := traceRes.LastInsertId() if err != nil { return err } // Insert hops for _, hop := range result.Traceroute.Hops { _, err := tx.Exec(` INSERT INTO traceroute_hops (traceroute_id, ttl, ip, rtt, timeout) VALUES (?, ?, ?, ?, ?) `, traceID, hop.TTL, hop.IP, hop.Rtt, hop.Timeout) if err != nil { return err } } } return tx.Commit() } func extractAndSendHops(result *PingResult) { if result.Traceroute == nil { return } var newHops []string sentHopsMux.Lock() now := time.Now() for _, hop := range result.Traceroute.Hops { if hop.IP != "" && !hop.Timeout && hop.IP != "*" { // Check if we've seen this hop recently (within TTL) lastSent, exists := sentHops[hop.IP] if !exists || now.Sub(lastSent) > sentHopsTTL { newHops = append(newHops, hop.IP) sentHops[hop.IP] = now statsMux.Lock() stats.HopsDiscovered++ statsMux.Unlock() } } } sentHopsMux.Unlock() if len(newHops) == 0 { return } // Send to input service payload := map[string]interface{}{ "hops": newHops, } jsonData, err := json.Marshal(payload) if err != nil { log.Printf("โš ๏ธ Failed to marshal hops: %v", err) return } resp, err := http.Post(config.InputServiceURL, "application/json", bytes.NewBuffer(jsonData)) if err != nil { if verbose { log.Printf("โš ๏ธ Failed to send hops to input service: %v", err) } return } defer resp.Body.Close() if resp.StatusCode == http.StatusOK { statsMux.Lock() stats.HopsSent += int64(len(newHops)) statsMux.Unlock() if verbose { log.Printf("โœ… Sent %d new hops to input service", len(newHops)) } } else { if verbose { log.Printf("โš ๏ธ Input service returned status %d", resp.StatusCode) } } } // cleanupSentHops periodically removes old entries from sentHops map to prevent unbounded growth func cleanupSentHops() { ticker := time.NewTicker(1 * time.Hour) defer ticker.Stop() for range ticker.C { sentHopsMux.Lock() now := time.Now() removed := 0 for ip, timestamp := range sentHops { if now.Sub(timestamp) > sentHopsTTL { delete(sentHops, ip) removed++ } } if verbose && removed > 0 { log.Printf("๐Ÿงน Cleaned up %d expired hop entries (total: %d)", removed, len(sentHops)) } sentHopsMux.Unlock() } } func handleRotate(w http.ResponseWriter, r *http.Request) { if r.Method != http.MethodPost { http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) return } log.Println("๐Ÿ”„ Manual rotation triggered") if err := rotateDB(); err != nil { http.Error(w, fmt.Sprintf("Rotation failed: %v", err), http.StatusInternalServerError) return } w.Header().Set("Content-Type", "application/json") json.NewEncoder(w).Encode(map[string]string{ "status": "rotated", "file": stats.CurrentDBFile, }) } func handleDump(w http.ResponseWriter, r *http.Request) { if r.Method != http.MethodGet { http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) return } dbMux.RLock() currentFile := getCurrentDBFile() dbMux.RUnlock() // Set headers for file download w.Header().Set("Content-Type", "application/x-sqlite3") w.Header().Set("Content-Disposition", fmt.Sprintf("attachment; filename=%s", filepath.Base(currentFile))) // Stream the file file, err := os.Open(currentFile) if err != nil { http.Error(w, "Failed to open database", http.StatusInternalServerError) return } defer file.Close() if _, err := io.Copy(w, file); err != nil { log.Printf("โš ๏ธ Failed to stream database: %v", err) } if verbose { log.Printf("๐Ÿ“ค Database dump sent: %s", filepath.Base(currentFile)) } } // ServiceInfo represents service metadata for discovery type ServiceInfo struct { ServiceType string `json:"service_type"` Version string `json:"version"` Name string `json:"name"` InstanceID string `json:"instance_id"` Capabilities []string `json:"capabilities"` } func handleServiceInfo(w http.ResponseWriter, r *http.Request) { hostname, _ := os.Hostname() if hostname == "" { hostname = "unknown" } info := ServiceInfo{ ServiceType: "output", Version: VERSION, Name: "output_service", InstanceID: hostname, Capabilities: []string{"result_storage", "hop_extraction", "database_rotation"}, } w.Header().Set("Content-Type", "application/json") json.NewEncoder(w).Encode(info) } func handleHealth(w http.ResponseWriter, r *http.Request) { statsMux.RLock() defer statsMux.RUnlock() health := map[string]interface{}{ "status": "healthy", "version": VERSION, "uptime": time.Since(startTime).String(), "stats": stats, } w.Header().Set("Content-Type", "application/json") json.NewEncoder(w).Encode(health) } func handleReady(w http.ResponseWriter, r *http.Request) { dbMux.RLock() defer dbMux.RUnlock() if db == nil { http.Error(w, "Database not ready", http.StatusServiceUnavailable) return } if err := db.Ping(); err != nil { http.Error(w, "Database not responding", http.StatusServiceUnavailable) return } w.Header().Set("Content-Type", "application/json") json.NewEncoder(w).Encode(map[string]string{"status": "ready"}) } func handleMetrics(w http.ResponseWriter, r *http.Request) { statsMux.RLock() defer statsMux.RUnlock() // Prometheus-style metrics metrics := fmt.Sprintf(`# HELP output_service_total_results Total number of results processed # TYPE output_service_total_results counter output_service_total_results %d # HELP output_service_successful_pings Total successful pings # TYPE output_service_successful_pings counter output_service_successful_pings %d # HELP output_service_failed_pings Total failed pings # TYPE output_service_failed_pings counter output_service_failed_pings %d # HELP output_service_hops_discovered Total hops discovered # TYPE output_service_hops_discovered counter output_service_hops_discovered %d # HELP output_service_hops_sent Total hops sent to input service # TYPE output_service_hops_sent counter output_service_hops_sent %d # HELP output_service_db_size_bytes Current database size in bytes # TYPE output_service_db_size_bytes gauge output_service_db_size_bytes %d `, stats.TotalResults, stats.SuccessfulPings, stats.FailedPings, stats.HopsDiscovered, stats.HopsSent, stats.CurrentDBSize, ) w.Header().Set("Content-Type", "text/plain") w.Write([]byte(metrics)) } func handleStats(w http.ResponseWriter, r *http.Request) { statsMux.RLock() defer statsMux.RUnlock() w.Header().Set("Content-Type", "application/json") json.NewEncoder(w).Encode(stats) } func handleRecent(w http.ResponseWriter, r *http.Request) { // Parse query parameters limitStr := r.URL.Query().Get("limit") limit := 100 if limitStr != "" { if l, err := fmt.Sscanf(limitStr, "%d", &limit); err == nil && l == 1 { if limit > 1000 { limit = 1000 } } } ipFilter := r.URL.Query().Get("ip") dbMux.RLock() defer dbMux.RUnlock() query := ` SELECT id, ip, sent, received, packet_loss, avg_rtt, timestamp, error FROM ping_results ` args := []interface{}{} if ipFilter != "" { query += " WHERE ip = ?" args = append(args, ipFilter) } query += " ORDER BY timestamp DESC LIMIT ?" args = append(args, limit) rows, err := db.Query(query, args...) if err != nil { http.Error(w, "Query failed", http.StatusInternalServerError) return } defer rows.Close() var results []map[string]interface{} for rows.Next() { var id int var ip, errorMsg string var sent, received int var packetLoss float64 var avgRtt int64 var timestamp time.Time if err := rows.Scan(&id, &ip, &sent, &received, &packetLoss, &avgRtt, ×tamp, &errorMsg); err != nil { continue } result := map[string]interface{}{ "id": id, "ip": ip, "sent": sent, "received": received, "packet_loss": packetLoss, "avg_rtt": avgRtt, "timestamp": timestamp, } if errorMsg != "" { result["error"] = errorMsg } results = append(results, result) } w.Header().Set("Content-Type", "application/json") json.NewEncoder(w).Encode(results) }