Moved go routines control to use sync/atomic library.
This commit is contained in:
15
go.mod
Normal file
15
go.mod
Normal file
@@ -0,0 +1,15 @@
|
||||
module ping-service
|
||||
|
||||
go 1.25.0
|
||||
|
||||
require (
|
||||
github.com/go-ping/ping v1.2.0
|
||||
gopkg.in/yaml.v3 v3.0.1
|
||||
)
|
||||
|
||||
require (
|
||||
github.com/google/uuid v1.2.0 // indirect
|
||||
golang.org/x/net v0.0.0-20210316092652-d523dce5a7f4 // indirect
|
||||
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c // indirect
|
||||
golang.org/x/sys v0.0.0-20210315160823-c6e025ad8005 // indirect
|
||||
)
|
||||
@@ -17,6 +17,7 @@ import (
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
@@ -24,7 +25,7 @@ import (
|
||||
"gopkg.in/yaml.v3"
|
||||
)
|
||||
|
||||
const VERSION = "1.0.0"
|
||||
const VERSION = "0.0.4"
|
||||
|
||||
type Config struct {
|
||||
InputFile string `yaml:"input_file"`
|
||||
@@ -132,6 +133,9 @@ func main() {
|
||||
|
||||
// Start health check server
|
||||
go startHealthCheckServer(config.HealthCheckPort)
|
||||
|
||||
// FIX: Start the cache cleanup goroutine
|
||||
go cacheJanitor(config.CooldownMinutes)
|
||||
|
||||
// Setup graceful shutdown
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
@@ -171,6 +175,31 @@ func main() {
|
||||
}
|
||||
}
|
||||
|
||||
// Function to prune the cache periodically
|
||||
func cacheJanitor(cooldownMinutes int) {
|
||||
// Run cleanup every hour (or more frequently if memory is tight)
|
||||
ticker := time.NewTicker(1 * time.Hour)
|
||||
for range ticker.C {
|
||||
cacheMux.Lock()
|
||||
count := 0
|
||||
threshold := time.Now().Add(-time.Duration(cooldownMinutes) * time.Minute)
|
||||
|
||||
for ip, lastPing := range cooldownCache {
|
||||
if lastPing.Before(threshold) {
|
||||
delete(cooldownCache, ip)
|
||||
count++
|
||||
}
|
||||
}
|
||||
|
||||
if verbose && count > 0 {
|
||||
log.Printf("Janitor: Cleaned up %d expired entries from cooldown cache", count)
|
||||
}
|
||||
cacheMux.Unlock()
|
||||
}
|
||||
}
|
||||
|
||||
// ... [rest of the logic remains the same: process, readSource, runPing, etc.]
|
||||
|
||||
func startHealthCheckServer(port int) {
|
||||
http.HandleFunc("/health", healthCheckHandler)
|
||||
http.HandleFunc("/ready", readinessHandler)
|
||||
@@ -242,13 +271,13 @@ func updateHealth(status string) {
|
||||
health.Status = status
|
||||
}
|
||||
|
||||
func updateStats(lastRun time.Time, successful, failed int) {
|
||||
func updateStats(lastRun time.Time, successful, failed int64) {
|
||||
healthMux.Lock()
|
||||
defer healthMux.Unlock()
|
||||
health.LastRun = lastRun
|
||||
health.TotalPings += int64(successful + failed)
|
||||
health.SuccessfulPings += int64(successful)
|
||||
health.FailedPings += int64(failed)
|
||||
health.TotalPings += (successful + failed)
|
||||
health.SuccessfulPings += successful
|
||||
health.FailedPings += failed
|
||||
}
|
||||
|
||||
func loadConfig(path string) *Config {
|
||||
@@ -265,7 +294,7 @@ func loadConfig(path string) *Config {
|
||||
}
|
||||
|
||||
func process(cfg *Config) {
|
||||
startTime := time.Now()
|
||||
processStart := time.Now()
|
||||
|
||||
// 1. Read IPs
|
||||
data, err := readSource(cfg.InputFile)
|
||||
@@ -310,8 +339,9 @@ func process(cfg *Config) {
|
||||
// 2. Perform Pings
|
||||
var wg sync.WaitGroup
|
||||
results := make([]PingResult, len(ipsToPing))
|
||||
successful := 0
|
||||
failed := 0
|
||||
|
||||
var successful int64
|
||||
var failed int64
|
||||
|
||||
for i, ip := range ipsToPing {
|
||||
wg.Add(1)
|
||||
@@ -320,9 +350,9 @@ func process(cfg *Config) {
|
||||
results[idx] = runPing(targetIP)
|
||||
|
||||
if results[idx].Error == "" && results[idx].Received > 0 {
|
||||
successful++
|
||||
atomic.AddInt64(&successful, 1)
|
||||
} else {
|
||||
failed++
|
||||
atomic.AddInt64(&failed, 1)
|
||||
}
|
||||
|
||||
// If ping successful and traceroute enabled, do traceroute
|
||||
@@ -349,16 +379,14 @@ func process(cfg *Config) {
|
||||
}
|
||||
wg.Wait()
|
||||
|
||||
// Update stats
|
||||
updateStats(time.Now(), successful, failed)
|
||||
|
||||
// 3. Write Output
|
||||
updateStats(time.Now(), successful, failed)
|
||||
outputData, _ := json.MarshalIndent(results, "", " ")
|
||||
err = writeDestination(cfg.OutputFile, outputData)
|
||||
_ = writeDestination(cfg.OutputFile, outputData)
|
||||
if err != nil {
|
||||
log.Printf("Output Error: %v", err)
|
||||
} else if verbose {
|
||||
log.Printf("Wrote results to %s (took %v)", cfg.OutputFile, time.Since(startTime))
|
||||
log.Printf("Cycle complete. Took %v", time.Since(processStart))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -483,7 +511,7 @@ func runTraceroute(ip string, maxHops int) *TracerouteResult {
|
||||
}
|
||||
}
|
||||
|
||||
result.Error = "traceroute failed (both ICMP and TCP)"
|
||||
result.Error = "traceroute failed"
|
||||
return result
|
||||
}
|
||||
|
||||
@@ -534,7 +562,6 @@ func parseTracerouteOutput(output string, method string) *TracerouteResult {
|
||||
}
|
||||
|
||||
lines := strings.Split(output, "\n")
|
||||
|
||||
hopRegex := regexp.MustCompile(`^\s*(\d+)\s+([0-9.]+)\s+([0-9.]+)\s*ms`)
|
||||
timeoutRegex := regexp.MustCompile(`^\s*(\d+)\s+\*`)
|
||||
|
||||
@@ -569,4 +596,4 @@ func parseTracerouteOutput(output string, method string) *TracerouteResult {
|
||||
func fileExists(path string) bool {
|
||||
_, err := os.Stat(path)
|
||||
return err == nil
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user