Files
ping_service/ping_service.go
2025-12-31 21:06:35 +02:00

600 lines
15 KiB
Go

package main
import (
"bytes"
"context"
"encoding/json"
"flag"
"fmt"
"io"
"log"
"net"
"net/http"
"os"
"os/exec"
"os/signal"
"regexp"
"strconv"
"strings"
"sync"
"sync/atomic"
"syscall"
"time"
"github.com/go-ping/ping"
"gopkg.in/yaml.v3"
)
// VERSION is the release identifier of the ping service. It is printed by the
// -version and -help flags, logged at startup and reported in the health
// status.
const VERSION = "0.0.4"
// Config holds the service settings decoded from the YAML config file.
type Config struct {
// InputFile names the source of the IP list. readSource treats it as an HTTP
// URL when it starts with "http", as a Unix socket when it ends in ".sock",
// and as a regular file otherwise.
InputFile string `yaml:"input_file"`
// OutputFile names the destination of the JSON results; writeDestination
// applies the same URL / socket / file rules as for InputFile.
OutputFile string `yaml:"output_file"`
// IntervalSeconds is the period, in seconds, of the main processing loop.
IntervalSeconds int `yaml:"interval_seconds"`
// CooldownMinutes is the minimum time between two pings of the same IP.
// main replaces a zero value with 10.
CooldownMinutes int `yaml:"cooldown_minutes"`
// EnableTraceroute turns on a traceroute after each successful ping.
EnableTraceroute bool `yaml:"enable_traceroute"`
// TracerouteMaxHops is the maximum TTL passed to traceroute.
// main replaces a zero value with 30.
TracerouteMaxHops int `yaml:"traceroute_max_hops"`
// HealthCheckPort is the TCP port of the health-check HTTP server.
// main replaces a zero value with 8090.
HealthCheckPort int `yaml:"health_check_port"`
}
// PingResult is the outcome of pinging one IP; a slice of these is what the
// service serializes to the output destination.
type PingResult struct {
// IP is the target exactly as it appeared in the input.
IP string `json:"ip"`
// Sent and Received are the packet counts reported by the pinger.
Sent int `json:"sent"`
Received int `json:"received"`
// PacketLoss is the loss figure reported by the pinger statistics
// (logged as a percentage).
PacketLoss float64 `json:"packet_loss"`
// AvgRtt is the average round-trip time. As a time.Duration it is an
// int64 nanosecond count, which is how encoding/json writes it.
AvgRtt time.Duration `json:"avg_rtt"`
// Timestamp is taken in runPing right after the pinger is created.
Timestamp time.Time `json:"timestamp"`
// Error holds the error text when creating or running the pinger failed.
Error string `json:"error,omitempty"`
// Traceroute is set only when traceroute is enabled and the ping succeeded.
Traceroute *TracerouteResult `json:"traceroute,omitempty"`
}
// TracerouteResult describes one traceroute run towards a target.
type TracerouteResult struct {
// Method is the label of the probe that produced the result: "icmp" or
// "tcp/<port>".
Method string `json:"method"`
// Hops lists the parsed hops in the order they appear in the tool output.
Hops []TracerouteHop `json:"hops"`
// Completed is set by parseTracerouteOutput when at least one hop was parsed.
Completed bool `json:"completed"`
// Error is "traceroute failed" when every attempted method failed.
Error string `json:"error,omitempty"`
}
// TracerouteHop is a single line of traceroute output.
type TracerouteHop struct {
// TTL is the hop number printed at the start of the line.
TTL int `json:"ttl"`
// IP is the responding address; it is left empty for timed-out hops.
IP string `json:"ip"`
// Rtt is the round-trip time parsed from the "<n> ms" column.
Rtt time.Duration `json:"rtt,omitempty"`
// Timeout is true when the hop was printed as "*" (no reply).
Timeout bool `json:"timeout,omitempty"`
}
// HealthStatus is the document served by the /health endpoint and the source
// of the counters exposed on /metrics.
type HealthStatus struct {
// Status is set through updateHealth; main uses "starting", "running" and
// "shutting_down".
Status string `json:"status"`
// Version is the service version (VERSION).
Version string `json:"version"`
// Uptime is the formatted time since startTime, filled in by the /health
// handler when a request is served.
Uptime string `json:"uptime"`
// LastRun and the ping counters below are maintained by updateStats.
LastRun time.Time `json:"last_run"`
TotalPings int64 `json:"total_pings"`
SuccessfulPings int64 `json:"successful_pings"`
FailedPings int64 `json:"failed_pings"`
}
// Package-level state shared between the main loop, the cache janitor and the
// HTTP handlers.
var (
// cooldownCache maps an IP to the time it was last scheduled for a ping.
cooldownCache = make(map[string]time.Time)
// cacheMux guards cooldownCache.
cacheMux sync.Mutex
// verbose enables the extra log output; it is set once from the CLI flags.
verbose bool
// startTime is when the service started; uptime is derived from it.
startTime time.Time
// health is the current health status and ping counters.
health HealthStatus
// healthMux guards health.
healthMux sync.RWMutex
)
// main is the entry point of the ping service. It parses the command-line
// flags, loads the YAML configuration, applies defaults, starts the
// health-check HTTP server and the cooldown-cache janitor in the background,
// and then runs one processing cycle per ticker interval until SIGINT or
// SIGTERM is received.
func main() {
	// CLI flags. -v and -verbose are bound to the same variable, so either
	// spelling enables verbose logging.
	configPath := flag.String("config", "config.yaml", "Path to config file")
	verboseFlag := flag.Bool("v", false, "Enable verbose logging")
	flag.BoolVar(verboseFlag, "verbose", false, "Enable verbose logging")
	versionFlag := flag.Bool("version", false, "Show version")
	help := flag.Bool("help", false, "Show help message")
	flag.Parse()

	if *versionFlag {
		fmt.Printf("ping-service version %s\n", VERSION)
		os.Exit(0)
	}
	if *help {
		fmt.Println("Ping Service - Monitor network endpoints via ping and traceroute")
		fmt.Printf("Version: %s\n\n", VERSION)
		fmt.Println("Usage:")
		flag.PrintDefaults()
		fmt.Println("\nConfig file format (YAML):")
		fmt.Println(" input_file: Path/URL to file containing IPs (one per line)")
		fmt.Println(" output_file: Path/URL/socket for JSON results")
		fmt.Println(" interval_seconds: How often to check for new IPs")
		fmt.Println(" cooldown_minutes: Minimum time between pings for same IP")
		fmt.Println(" enable_traceroute: Enable traceroute after successful ping")
		fmt.Println(" traceroute_max_hops: Maximum TTL for traceroute (default: 30)")
		fmt.Println(" health_check_port: HTTP port for health checks (default: 8090)")
		os.Exit(0)
	}

	verbose = *verboseFlag
	startTime = time.Now()

	// Initialize health status
	health = HealthStatus{
		Status:  "starting",
		Version: VERSION,
	}

	config := loadConfig(*configPath)

	// Defaults for settings left out of the config file.
	if config.CooldownMinutes == 0 {
		config.CooldownMinutes = 10
	}
	if config.TracerouteMaxHops == 0 {
		config.TracerouteMaxHops = 30
	}
	if config.HealthCheckPort == 0 {
		config.HealthCheckPort = 8090
	}
	// time.NewTicker panics on a non-positive interval, so a missing or
	// invalid interval_seconds is rejected here with a clear message instead.
	if config.IntervalSeconds <= 0 {
		log.Fatalf("Invalid config: interval_seconds must be a positive integer (got %d)", config.IntervalSeconds)
	}

	// Start health check server
	go startHealthCheckServer(config.HealthCheckPort)

	// Start the goroutine that prunes expired entries from the cooldown cache.
	go cacheJanitor(config.CooldownMinutes)

	// Setup graceful shutdown: the first SIGINT/SIGTERM cancels ctx, which
	// ends the main loop below.
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	sigChan := make(chan os.Signal, 1)
	signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM)
	go func() {
		sig := <-sigChan
		log.Printf("Received signal %v, shutting down gracefully...", sig)
		updateHealth("shutting_down")
		cancel()
	}()

	ticker := time.NewTicker(time.Duration(config.IntervalSeconds) * time.Second)
	defer ticker.Stop()

	log.Printf("App started. Version: %s", VERSION)
	log.Printf("Cooldown set to %d mins. Polling every %ds...", config.CooldownMinutes, config.IntervalSeconds)
	if config.EnableTraceroute {
		log.Printf("Traceroute enabled (max %d hops)", config.TracerouteMaxHops)
	}
	log.Printf("Health check available at http://localhost:%d/health", config.HealthCheckPort)
	updateHealth("running")

	// Main loop: the first cycle runs after the first tick, not immediately.
	for {
		select {
		case <-ctx.Done():
			log.Println("Shutdown complete")
			return
		case <-ticker.C:
			process(config)
		}
	}
}
// cacheJanitor evicts entries from the cooldown cache whose last ping is
// older than cooldownMinutes. It sweeps once per hour for as long as its
// ticker fires, so it is meant to run in its own goroutine.
func cacheJanitor(cooldownMinutes int) {
	sweep := time.NewTicker(time.Hour)
	window := time.Duration(cooldownMinutes) * time.Minute

	for range sweep.C {
		cacheMux.Lock()
		removed := 0
		cutoff := time.Now().Add(-window)
		for addr, seen := range cooldownCache {
			if !seen.Before(cutoff) {
				continue
			}
			delete(cooldownCache, addr)
			removed++
		}
		if removed > 0 && verbose {
			log.Printf("Janitor: Cleaned up %d expired entries from cooldown cache", removed)
		}
		cacheMux.Unlock()
	}
}
// startHealthCheckServer registers the /health, /ready and /metrics handlers
// on the default HTTP mux and serves them on the given port with 5-second
// read and write timeouts. It blocks while the server is running, so callers
// start it in a goroutine. Any error other than http.ErrServerClosed is
// logged.
func startHealthCheckServer(port int) {
	routes := []struct {
		pattern string
		handler func(http.ResponseWriter, *http.Request)
	}{
		{"/health", healthCheckHandler},
		{"/ready", readinessHandler},
		{"/metrics", metricsHandler},
	}
	for _, rt := range routes {
		http.HandleFunc(rt.pattern, rt.handler)
	}

	listenAddr := fmt.Sprintf(":%d", port)
	log.Printf("Starting health check server on %s", listenAddr)

	srv := &http.Server{
		Addr:         listenAddr,
		ReadTimeout:  5 * time.Second,
		WriteTimeout: 5 * time.Second,
	}
	err := srv.ListenAndServe()
	if err == nil || err == http.ErrServerClosed {
		return
	}
	log.Printf("Health check server error: %v", err)
}
// healthCheckHandler serves the current health status as JSON, with Uptime
// computed at request time from startTime.
func healthCheckHandler(w http.ResponseWriter, r *http.Request) {
	// Copy the shared status while holding the read lock and fill in Uptime on
	// the copy. Writing the field on the shared value would be a data race,
	// because a read lock does not exclude other concurrent requests.
	healthMux.RLock()
	snapshot := health
	healthMux.RUnlock()
	snapshot.Uptime = time.Since(startTime).String()

	w.Header().Set("Content-Type", "application/json")
	if err := json.NewEncoder(w).Encode(snapshot); err != nil {
		log.Printf("Health check encode error: %v", err)
	}
}
// readinessHandler answers 200 with {"ready": true} while the service status
// is "running", and 503 with {"ready": false} for any other status.
func readinessHandler(w http.ResponseWriter, r *http.Request) {
	healthMux.RLock()
	defer healthMux.RUnlock()

	code, body := http.StatusServiceUnavailable, `{"ready": false}`
	if health.Status == "running" {
		code, body = http.StatusOK, `{"ready": true}`
	}
	w.WriteHeader(code)
	w.Write([]byte(body))
}
// metricsHandler writes the ping counters and the uptime in the Prometheus
// text exposition format.
func metricsHandler(w http.ResponseWriter, r *http.Request) {
	healthMux.RLock()
	defer healthMux.RUnlock()

	w.Header().Set("Content-Type", "text/plain")

	// Assemble the whole document first and send it with a single write.
	var out bytes.Buffer

	out.WriteString("# HELP ping_service_total_pings Total number of pings\n")
	out.WriteString("# TYPE ping_service_total_pings counter\n")
	fmt.Fprintf(&out, "ping_service_total_pings %d\n", health.TotalPings)

	out.WriteString("# HELP ping_service_successful_pings Successful pings\n")
	out.WriteString("# TYPE ping_service_successful_pings counter\n")
	fmt.Fprintf(&out, "ping_service_successful_pings %d\n", health.SuccessfulPings)

	out.WriteString("# HELP ping_service_failed_pings Failed pings\n")
	out.WriteString("# TYPE ping_service_failed_pings counter\n")
	fmt.Fprintf(&out, "ping_service_failed_pings %d\n", health.FailedPings)

	out.WriteString("# HELP ping_service_uptime_seconds Uptime in seconds\n")
	out.WriteString("# TYPE ping_service_uptime_seconds gauge\n")
	fmt.Fprintf(&out, "ping_service_uptime_seconds %.0f\n", time.Since(startTime).Seconds())

	w.Write(out.Bytes())
}
// updateHealth replaces the service status string under the health write lock.
func updateHealth(status string) {
	healthMux.Lock()
	health.Status = status
	healthMux.Unlock()
}
// updateStats records the time of the latest cycle and adds that cycle's
// successful and failed ping counts to the running totals.
func updateStats(lastRun time.Time, successful, failed int64) {
	healthMux.Lock()
	defer healthMux.Unlock()

	cycleTotal := successful + failed
	health.LastRun = lastRun
	health.SuccessfulPings += successful
	health.FailedPings += failed
	health.TotalPings += cycleTotal
}
// loadConfig reads the YAML file at path and decodes it into a Config.
// A read or parse failure is fatal: it is logged and the process exits.
func loadConfig(path string) *Config {
	raw, readErr := os.ReadFile(path)
	if readErr != nil {
		log.Fatalf("Error reading config: %v", readErr)
	}

	cfg := new(Config)
	if parseErr := yaml.Unmarshal(raw, cfg); parseErr != nil {
		log.Fatalf("Error parsing config: %v", parseErr)
	}
	return cfg
}
// process runs one polling cycle: it reads the IP list from cfg.InputFile,
// drops IPs that are still inside their cooldown window, pings the remaining
// ones concurrently (followed by a traceroute for reachable targets when
// enabled), updates the health counters and writes the results as JSON to
// cfg.OutputFile. Input and output failures are logged and end the cycle.
func process(cfg *Config) {
	processStart := time.Now()

	// 1. Read IPs
	data, err := readSource(cfg.InputFile)
	if err != nil {
		log.Printf("Input Error: %v", err)
		return
	}
	rawIps := strings.Fields(string(data))
	if verbose {
		log.Printf("Read %d IPs from %s", len(rawIps), cfg.InputFile)
	}

	// Filter IPs based on cooldown. An accepted IP is stamped right away, so a
	// duplicate later in the same list is skipped as well.
	cooldown := time.Duration(cfg.CooldownMinutes) * time.Minute
	var ipsToPing []string
	cacheMux.Lock()
	for _, ip := range rawIps {
		if last, ok := cooldownCache[ip]; ok {
			if elapsed := time.Since(last); elapsed < cooldown {
				if verbose {
					log.Printf("Skipping %s (cooldown: %v remaining)", ip, cooldown-elapsed)
				}
				continue
			}
		}
		ipsToPing = append(ipsToPing, ip)
		cooldownCache[ip] = time.Now()
	}
	cacheMux.Unlock()

	if len(ipsToPing) == 0 {
		if verbose {
			log.Println("No IPs to ping (all in cooldown)")
		}
		return
	}
	if verbose {
		log.Printf("Pinging %d IPs: %v", len(ipsToPing), ipsToPing)
	}

	// 2. Perform Pings. Each goroutine writes only its own slot of results,
	// so the slice needs no lock; the counters are updated atomically.
	var wg sync.WaitGroup
	results := make([]PingResult, len(ipsToPing))
	var successful int64
	var failed int64
	for i, ip := range ipsToPing {
		wg.Add(1)
		go func(idx int, targetIP string) {
			defer wg.Done()

			res := runPing(targetIP)
			reachable := res.Error == "" && res.Received > 0
			if reachable {
				atomic.AddInt64(&successful, 1)
			} else {
				atomic.AddInt64(&failed, 1)
			}

			// If ping successful and traceroute enabled, do traceroute
			if cfg.EnableTraceroute && reachable {
				if verbose {
					log.Printf("Running traceroute to %s...", targetIP)
				}
				res.Traceroute = runTraceroute(targetIP, cfg.TracerouteMaxHops)
			}

			if verbose {
				if res.Error != "" {
					log.Printf("Pinged %s: ERROR - %s", res.IP, res.Error)
				} else {
					log.Printf("Pinged %s: %d/%d packets, %.2f%% loss, avg RTT: %v",
						res.IP, res.Received, res.Sent, res.PacketLoss, res.AvgRtt)
					if res.Traceroute != nil {
						log.Printf(" Traceroute: %d hops via %s", len(res.Traceroute.Hops), res.Traceroute.Method)
					}
				}
			}
			results[idx] = res
		}(i, ip)
	}
	wg.Wait()

	// 3. Write Output
	updateStats(time.Now(), successful, failed)

	// Marshalling can fail (encoding/json rejects NaN/Inf floats, for
	// example), so the error is reported instead of writing an empty payload.
	outputData, err := json.MarshalIndent(results, "", " ")
	if err != nil {
		log.Printf("Output Error: %v", err)
		return
	}
	if err := writeDestination(cfg.OutputFile, outputData); err != nil {
		log.Printf("Output Error: %v", err)
		return
	}
	if verbose {
		log.Printf("Cycle complete. Took %v", time.Since(processStart))
	}
}
// handleSocket exchanges data over the Unix domain socket at path.
//
// If path exists and something is listening on it, the function connects as
// a client. If path exists but cannot be dialed, it is treated as a stale
// socket file and removed. When there is nothing to connect to, the function
// listens on path itself and waits up to 10 seconds for one peer.
//
// With mode "read" everything the peer sends is returned; with any other
// mode data is written to the peer and the returned slice is nil.
func handleSocket(path string, data []byte, mode string) ([]byte, error) {
	if _, err := os.Stat(path); err == nil {
		conn, dialErr := net.DialTimeout("unix", path, 2*time.Second)
		if dialErr == nil {
			defer conn.Close()
			return exchangeOverConn(conn, data, mode)
		}
		// Nobody is listening: remove the stale entry and fall through to the
		// listening branch. A failed removal is reported to the caller,
		// because retrying would repeat the same failure without end.
		if rmErr := os.Remove(path); rmErr != nil && !os.IsNotExist(rmErr) {
			return nil, fmt.Errorf("removing stale socket %q: %w", path, rmErr)
		}
	}

	l, err := net.Listen("unix", path)
	if err != nil {
		return nil, err
	}
	defer l.Close()
	defer os.Remove(path)

	// Bound the wait for a peer so one cycle cannot block forever.
	if ul, ok := l.(*net.UnixListener); ok {
		if err := ul.SetDeadline(time.Now().Add(10 * time.Second)); err != nil {
			return nil, err
		}
	}
	conn, err := l.Accept()
	if err != nil {
		return nil, fmt.Errorf("timeout waiting for socket connection: %w", err)
	}
	defer conn.Close()
	return exchangeOverConn(conn, data, mode)
}

// exchangeOverConn reads everything from conn when mode is "read" and writes
// data to conn otherwise.
func exchangeOverConn(conn net.Conn, data []byte, mode string) ([]byte, error) {
	if mode == "read" {
		return io.ReadAll(conn)
	}
	_, err := conn.Write(data)
	return nil, err
}
// readSource returns the raw contents of src. An http:// or https:// URL is
// fetched with a GET request, a path ending in ".sock" is read through
// handleSocket, and anything else is read as a regular file.
func readSource(src string) ([]byte, error) {
	if strings.HasPrefix(src, "http://") || strings.HasPrefix(src, "https://") {
		// A bounded client keeps an unresponsive server from stalling the
		// polling loop indefinitely.
		client := &http.Client{Timeout: 30 * time.Second}
		resp, err := client.Get(src)
		if err != nil {
			return nil, err
		}
		defer resp.Body.Close()
		// An error page must not be mistaken for a list of IPs.
		if resp.StatusCode < 200 || resp.StatusCode > 299 {
			return nil, fmt.Errorf("fetching %s: unexpected HTTP status %s", src, resp.Status)
		}
		return io.ReadAll(resp.Body)
	}
	if strings.HasSuffix(src, ".sock") {
		return handleSocket(src, nil, "read")
	}
	return os.ReadFile(src)
}
// writeDestination delivers data to dest. An http:// or https:// URL receives
// it as the body of a JSON POST, a path ending in ".sock" is written through
// handleSocket, and anything else is written as a regular file with mode 0644.
func writeDestination(dest string, data []byte) error {
	if strings.HasPrefix(dest, "http://") || strings.HasPrefix(dest, "https://") {
		// A bounded client keeps an unresponsive server from stalling the
		// polling loop indefinitely.
		client := &http.Client{Timeout: 30 * time.Second}
		resp, err := client.Post(dest, "application/json", bytes.NewReader(data))
		if err != nil {
			return err
		}
		defer resp.Body.Close()
		// Drain the body so the underlying connection can be reused; a failure
		// here does not change whether the POST itself was accepted.
		_, _ = io.Copy(io.Discard, resp.Body)
		if resp.StatusCode < 200 || resp.StatusCode > 299 {
			return fmt.Errorf("posting results to %s: unexpected HTTP status %s", dest, resp.Status)
		}
		return nil
	}
	if strings.HasSuffix(dest, ".sock") {
		_, err := handleSocket(dest, data, "write")
		return err
	}
	return os.WriteFile(dest, data, 0644)
}
// runPing sends three echo requests to ip with an overall 5-second timeout
// and returns the collected statistics. When the pinger cannot be created or
// run, only IP, Timestamp and Error are set in the result.
func runPing(ip string) PingResult {
	pinger, err := ping.NewPinger(ip)
	// The timestamp is taken after the pinger has been constructed.
	out := PingResult{IP: ip, Timestamp: time.Now()}

	if err == nil {
		pinger.Count = 3
		pinger.Timeout = 5 * time.Second
		err = pinger.Run()
	}
	if err != nil {
		out.Error = err.Error()
		return out
	}

	st := pinger.Statistics()
	out.Sent, out.Received = st.PacketsSent, st.PacketsRecv
	out.PacketLoss, out.AvgRtt = st.PacketLoss, st.AvgRtt
	return out
}
// runTraceroute traces the route to ip. It tries the plain traceroute first
// and, if that yields nothing, TCP traceroutes to ports 80, 443 and 22 in
// that order, returning the first result obtained. When every attempt fails
// it returns a result with no hops and Error set to "traceroute failed".
func runTraceroute(ip string, maxHops int) *TracerouteResult {
	if icmp := tryICMPTraceroute(ip, maxHops); icmp != nil {
		return icmp
	}
	if verbose {
		log.Printf("ICMP traceroute failed for %s, trying TCP...", ip)
	}

	tcpPorts := [...]int{80, 443, 22}
	for i := 0; i < len(tcpPorts); i++ {
		tcp := tryTCPTraceroute(ip, tcpPorts[i], maxHops)
		if tcp == nil {
			continue
		}
		tcp.Method = fmt.Sprintf("tcp/%d", tcpPorts[i])
		return tcp
	}

	return &TracerouteResult{
		Method: "icmp",
		Hops:   []TracerouteHop{},
		Error:  "traceroute failed",
	}
}
// tryICMPTraceroute runs the system traceroute towards ip with at most
// maxHops hops, numeric output (-n) and one probe per hop (-q 1), and returns
// the parsed result labelled "icmp". It returns nil when no traceroute binary
// can be found or the command exits with an error.
//
// NOTE(review): no probe-type flag (such as -I) is passed, so the packets
// actually sent are whatever the installed traceroute uses by default.
func tryICMPTraceroute(ip string, maxHops int) *TracerouteResult {
	// Resolve through PATH first. If that fails, fall back to the usual
	// absolute locations so the tool is still found when the service runs
	// with a minimal PATH.
	binary, err := exec.LookPath("traceroute")
	if err != nil {
		binary = ""
		for _, candidate := range []string{"/usr/bin/traceroute", "/usr/sbin/traceroute", "/bin/traceroute"} {
			if fileExists(candidate) {
				binary = candidate
				break
			}
		}
		if binary == "" {
			return nil
		}
	}

	cmd := exec.Command(binary, "-m", strconv.Itoa(maxHops), "-n", "-q", "1", ip)
	output, err := cmd.CombinedOutput()
	if err != nil {
		return nil
	}
	return parseTracerouteOutput(string(output), "icmp")
}
// tryTCPTraceroute traces the route to ip with TCP probes to port. It uses
// tcptraceroute when that binary is present under /usr/bin or /usr/sbin and
// "traceroute -T" otherwise. It returns the parsed result labelled
// "tcp/<port>", or nil when the command exits with an error.
func tryTCPTraceroute(ip string, port int, maxHops int) *TracerouteResult {
	hopsArg := strconv.Itoa(maxHops)
	portArg := strconv.Itoa(port)

	var (
		tool string
		args []string
	)
	switch {
	case fileExists("/usr/bin/tcptraceroute"), fileExists("/usr/sbin/tcptraceroute"):
		tool = "tcptraceroute"
		args = []string{"-m", hopsArg, "-n", "-q", "1", ip, portArg}
	default:
		tool = "traceroute"
		args = []string{"-T", "-p", portArg, "-m", hopsArg, "-n", "-q", "1", ip}
	}

	raw, runErr := exec.Command(tool, args...).CombinedOutput()
	if runErr != nil {
		return nil
	}
	return parseTracerouteOutput(string(raw), fmt.Sprintf("tcp/%d", port))
}
// Patterns for one line of numeric, single-probe traceroute output. They are
// compiled once at package scope instead of on every call.
var (
	// tracerouteHopRe matches "<ttl>  <address>  <rtt> ms"; the address may be
	// IPv4 or IPv6.
	tracerouteHopRe = regexp.MustCompile(`^\s*(\d+)\s+([0-9a-fA-F:.]+)\s+([0-9.]+)\s*ms`)
	// tracerouteTimeoutRe matches "<ttl>  *", a hop that did not answer.
	tracerouteTimeoutRe = regexp.MustCompile(`^\s*(\d+)\s+\*`)
)

// parseTracerouteOutput converts the text printed by traceroute or
// tcptraceroute into a TracerouteResult labelled with method. Lines that
// match neither a hop nor a timeout (such as the header line) are ignored.
// Completed is true when at least one hop line was recognised.
func parseTracerouteOutput(output string, method string) *TracerouteResult {
	result := &TracerouteResult{
		Method: method,
		Hops:   []TracerouteHop{},
	}

	for _, line := range strings.Split(output, "\n") {
		if m := tracerouteHopRe.FindStringSubmatch(line); m != nil {
			// The pattern admits only digits (and dots for the RTT), so a
			// conversion can fail only on overflow or a malformed number. In
			// that case the zero value is kept rather than dropping the hop.
			ttl, _ := strconv.Atoi(m[1])
			rttMs, _ := strconv.ParseFloat(m[3], 64)
			result.Hops = append(result.Hops, TracerouteHop{
				TTL: ttl,
				IP:  m[2],
				Rtt: time.Duration(rttMs * float64(time.Millisecond)),
			})
		} else if m := tracerouteTimeoutRe.FindStringSubmatch(line); m != nil {
			ttl, _ := strconv.Atoi(m[1])
			result.Hops = append(result.Hops, TracerouteHop{
				TTL:     ttl,
				Timeout: true,
			})
		}
	}

	result.Completed = len(result.Hops) > 0
	return result
}
// fileExists reports whether os.Stat succeeds for path. Any Stat error, not
// only "does not exist", yields false.
func fileExists(path string) bool {
	if _, statErr := os.Stat(path); statErr != nil {
		return false
	}
	return true
}