From 1130b7fb8c9b882896624ec5e7f50d1b2d4a7de8 Mon Sep 17 00:00:00 2001 From: Kalzu Rekku Date: Thu, 8 Jan 2026 18:55:32 +0200 Subject: [PATCH] Fixed few memory leaks. Implement testing of the functionality. --- CLAUDE.md | 23 +- MULTI_INSTANCE.md | 305 ++++++++++++++++++ README.md | 1 + input_service/http_input_service.go | 51 ++- input_service/http_input_service_test.go | 313 +++++++++++++++++++ manager/store.go | 77 ++++- manager/store_test.go | 381 +++++++++++++++++++++++ output_service/main.go | 38 ++- ping_service.go | 8 +- ping_service_test.go | 150 +++++++++ 10 files changed, 1334 insertions(+), 13 deletions(-) create mode 100644 MULTI_INSTANCE.md create mode 100644 input_service/http_input_service_test.go create mode 100644 manager/store_test.go create mode 100644 ping_service_test.go diff --git a/CLAUDE.md b/CLAUDE.md index dc62d7e..37891b4 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -31,10 +31,10 @@ Core concept: Bootstrap with ~19,000 cloud provider IPs → ping targets → tra - Race conditions and distributed coordination must be considered for shared resources ### Current Implementation Status -- **input_service**: Partially multi-instance ready (per-consumer state is instance-local, which works if clients stick to one instance) +- **input_service**: Partially multi-instance ready (per-consumer state is instance-local; hop deduplication requires session affinity or broadcast strategy - see MULTI_INSTANCE.md) - **ping_service**: Fully multi-instance ready (distributed workers by design) -- **output_service**: Fully multi-instance ready (each instance maintains its own SQLite database) -- **manager**: Not multi-instance ready (in-memory sessions, user store reload assumes single instance) +- **output_service**: Fully multi-instance ready (each instance maintains its own SQLite database with TTL-based sentHops cleanup) +- **manager**: Requires configuration for multi-instance (in-memory sessions; user store now uses file locking for safe concurrent access - see MULTI_INSTANCE.md) ## Architecture Components @@ -268,3 +268,20 @@ go run . --add-user=username - Output service rotates databases weekly OR at 100MB (whichever first), keeping 5 files - Each output_service instance maintains its own database; use `/dump` for central aggregation - For multi-instance input_service, use session affinity or call `/hops` on all instances + +## Multi-Instance Deployment + +All services support multi-instance deployment with varying degrees of readiness. 
See **MULTI_INSTANCE.md** for comprehensive deployment guidance including: +- Session affinity strategies for input_service +- Database aggregation for output_service +- File locking for manager user store +- Load balancing recommendations +- Known limitations and workarounds + +## Recent Critical Fixes + +- **Fixed panic risk**: input_service now uses `ParseAddr()` with error handling instead of `MustParseAddr()` +- **Added HTTP timeouts**: ping_service uses 30-second timeout to prevent indefinite hangs +- **Fixed state serialization**: input_service now preserves activeGens array for proper interleaving after reload +- **Implemented sentHops eviction**: output_service uses TTL-based cleanup (24h) to prevent unbounded memory growth +- **Added file locking**: manager user store uses flock for safe concurrent access in multi-instance deployments diff --git a/MULTI_INSTANCE.md b/MULTI_INSTANCE.md new file mode 100644 index 0000000..9ea5752 --- /dev/null +++ b/MULTI_INSTANCE.md @@ -0,0 +1,305 @@ +# Multi-Instance Deployment Guide + +This document provides guidance for deploying multiple instances of each service for high availability and scalability. + +## Overview + +All services in this distributed network mapping system are designed to support multi-instance deployments, but each has specific considerations and limitations. + +--- + +## Input Service (input_service/) + +### Multi-Instance Readiness: ⚠️ **Partially Ready** + +#### How It Works +- Each instance maintains its own per-consumer state and CIDR generators +- State is stored locally in `progress_state/` directory +- Global hop deduplication (`globalSeen` map) is **instance-local** + +#### Multi-Instance Deployment Strategies + +**Option 1: Session Affinity (Recommended)** +``` +Load Balancer (with sticky sessions based on source IP) + ├── input_service instance 1 + ├── input_service instance 2 + └── input_service instance 3 +``` +- Configure load balancer to route each ping worker to the same input_service instance +- Ensures per-consumer state consistency +- Simple to implement and maintain + +**Option 2: Broadcast Hop Submissions** +``` +output_service ---> POST /hops ---> ALL input_service instances +``` +Modify output_service to POST discovered hops to all input_service instances instead of just one. This ensures hop deduplication works across instances. + +**Option 3: Shared Deduplication Backend (Future Enhancement)** +Implement Redis or database-backed `globalSeen` storage so all instances share deduplication state. 
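+
+A minimal sketch of what Option 3 could look like, assuming a Redis-backed check via the `github.com/redis/go-redis/v9` client (not currently a project dependency); the function and key names below are illustrative only, not part of the codebase:
+
+```go
+package dedup
+
+import (
+	"context"
+	"time"
+
+	"github.com/redis/go-redis/v9"
+)
+
+// alreadySeen reports whether ip has been handed out by ANY instance.
+// SET with NX and a TTL means exactly one caller across all instances
+// observes the IP as "new"; every other caller sees it as a duplicate.
+func alreadySeen(ctx context.Context, rdb *redis.Client, ip string) (bool, error) {
+	isNew, err := rdb.SetNX(ctx, "globalSeen:"+ip, 1, 24*time.Hour).Result()
+	if err != nil {
+		// On Redis failure, callers could fall back to the instance-local map.
+		return false, err
+	}
+	return !isNew, nil
+}
+```
+
+The 24-hour TTL mirrors the `sentHops` TTL used by output_service, so the shared set cannot grow without bound.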
+ +#### Known Limitations +- **Hop deduplication is instance-local**: Different instances may serve duplicate hops if output_service sends hops to only one instance +- **Per-consumer state is instance-local**: If a consumer switches instances, it gets a new generator and starts from the beginning +- **CIDR files must be present on all instances**: The `cloud-provider-ip-addresses/` directory must exist on each instance + +#### Deployment Example +```bash +# Instance 1 +./http_input_service & + +# Instance 2 (different port) +PORT=8081 ./http_input_service & + +# Load balancer (nginx example) +upstream input_service { + ip_hash; # Session affinity + server 127.0.0.1:8080; + server 127.0.0.1:8081; +} +``` + +--- + +## Output Service (output_service/) + +### Multi-Instance Readiness: ✅ **Fully Ready** + +#### How It Works +- Each instance maintains its own SQLite database +- Databases are independent and can be aggregated later +- `sentHops` deduplication is instance-local with 24-hour TTL + +#### Multi-Instance Deployment +``` +ping_service workers ---> Load Balancer ---> output_service instances +``` +- No session affinity required +- Each instance stores results independently +- Use `/dump` endpoint to collect databases from all instances for aggregation + +#### Aggregation Strategy +```bash +# Collect databases from all instances +curl http://instance1:8091/dump > instance1.db +curl http://instance2:8091/dump > instance2.db +curl http://instance3:8091/dump > instance3.db + +# Merge using sqlite3 +sqlite3 merged.db < 0 { + state.ActiveGens = make([]HostGenState, 0, len(g.activeGens)) + for _, gen := range g.activeGens { + if gen != nil && !gen.done { + state.ActiveGens = append(state.ActiveGens, HostGenState{ + CIDR: gen.prefix.String(), + Current: gen.current.String(), + Done: false, + }) + } + } + } return state } @@ -365,6 +379,25 @@ func (g *IPGenerator) loadState() error { g.currentGen = gen } + // Restore activeGens to preserve interleaving state + if len(state.ActiveGens) > 0 { + g.activeGens = make([]*hostGenerator, 0, len(state.ActiveGens)) + for _, genState := range state.ActiveGens { + gen, err := newHostGenerator(genState.CIDR) + if err != nil { + log.Printf("⚠️ Failed to restore activeGen %s: %v", genState.CIDR, err) + continue + } + gen.current, err = netip.ParseAddr(genState.Current) + if err != nil { + log.Printf("⚠️ Failed to parse current IP for activeGen %s: %v", genState.CIDR, err) + continue + } + gen.done = genState.Done + g.activeGens = append(g.activeGens, gen) + } + } + return nil } @@ -373,7 +406,13 @@ type Server struct { generators map[string]*IPGenerator lastAccess map[string]time.Time allCIDRs []string - globalSeen map[string]bool // Global deduplication across all sources + // MULTI-INSTANCE LIMITATION: globalSeen is instance-local, not shared across + // multiple input_service instances. In multi-instance deployments, either: + // 1. Use session affinity for ping workers (same worker always talks to same instance) + // 2. POST discovered hops to ALL input_service instances, or + // 3. Implement shared deduplication backend (Redis, database, etc.) + // Without this, different instances may serve duplicate hops. 
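+	// See MULTI_INSTANCE.md (Input Service section) for details on each strategy.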
+ globalSeen map[string]bool // Global deduplication across all sources (instance-local) mu sync.RWMutex stopSaver chan struct{} stopCleanup chan struct{} @@ -435,7 +474,15 @@ func (s *Server) loadAllCIDRs() error { fields := strings.Fields(line) for _, field := range fields { if field != "" { - if strings.Contains(field, "/") || netip.MustParseAddr(field).IsValid() { + // Accept CIDRs (contains /) or valid IP addresses + isCIDR := strings.Contains(field, "/") + isValidIP := false + if !isCIDR { + if addr, err := netip.ParseAddr(field); err == nil && addr.IsValid() { + isValidIP = true + } + } + if isCIDR || isValidIP { s.allCIDRs = append(s.allCIDRs, field) } } diff --git a/input_service/http_input_service_test.go b/input_service/http_input_service_test.go new file mode 100644 index 0000000..ac45b82 --- /dev/null +++ b/input_service/http_input_service_test.go @@ -0,0 +1,313 @@ +package main + +import ( + "encoding/json" + "net/netip" + "os" + "path/filepath" + "testing" + "time" +) + +// TestIPParsingDoesNotPanic verifies that invalid IPs don't cause panics +func TestIPParsingDoesNotPanic(t *testing.T) { + testCases := []string{ + "not-an-ip", + "999.999.999.999", + "192.168.1", + "", + "192.168.1.1.1", + "hello world", + "2001:db8::1", // IPv6 (should be filtered) + } + + // This test passes if it doesn't panic + for _, testIP := range testCases { + func() { + defer func() { + if r := recover(); r != nil { + t.Errorf("Parsing %q caused panic: %v", testIP, r) + } + }() + + // Test the safe parsing logic + addr, err := netip.ParseAddr(testIP) + if err == nil && addr.IsValid() { + // Valid IP, this is fine + } + }() + } +} + +// TestStateSerializationPreservesActiveGens verifies activeGens are saved/restored +func TestStateSerializationPreservesActiveGens(t *testing.T) { + // Create a temporary server for testing + s := &Server{ + allCIDRs: []string{"192.0.2.0/24", "198.51.100.0/24", "203.0.113.0/24"}, + globalSeen: make(map[string]bool), + } + + // Create a generator with activeGens + gen, err := newIPGenerator(s, "test-consumer") + if err != nil { + t.Fatalf("Failed to create generator: %v", err) + } + + // Generate some IPs to populate activeGens + for i := 0; i < 15; i++ { + _, err := gen.Next() + if err != nil { + break + } + } + + // Verify we have activeGens + if len(gen.activeGens) == 0 { + t.Log("Warning: No activeGens created, test may not be comprehensive") + } + + originalActiveGensCount := len(gen.activeGens) + + // Build state + gen.mu.Lock() + state := gen.buildState() + gen.mu.Unlock() + + // Verify activeGens were serialized + if len(state.ActiveGens) == 0 && originalActiveGensCount > 0 { + t.Errorf("ActiveGens not serialized: had %d active gens but state has 0", originalActiveGensCount) + } + + // Create new generator and restore state + gen2, err := newIPGenerator(s, "test-consumer-2") + if err != nil { + t.Fatalf("Failed to create second generator: %v", err) + } + + // Manually restore state (simulating loadState) + gen2.mu.Lock() + gen2.remainingCIDRs = state.RemainingCIDRs + gen2.totalCIDRsCount = state.TotalCIDRs + + // Restore activeGens + if len(state.ActiveGens) > 0 { + gen2.activeGens = make([]*hostGenerator, 0, len(state.ActiveGens)) + for _, genState := range state.ActiveGens { + hg, err := newHostGenerator(genState.CIDR) + if err != nil { + continue + } + hg.current, err = netip.ParseAddr(genState.Current) + if err != nil { + continue + } + hg.done = genState.Done + gen2.activeGens = append(gen2.activeGens, hg) + } + } + gen2.mu.Unlock() + + // Verify 
activeGens were restored + if len(gen2.activeGens) != len(state.ActiveGens) { + t.Errorf("ActiveGens restoration failed: expected %d, got %d", len(state.ActiveGens), len(gen2.activeGens)) + } +} + +// TestGeneratorStateJSONSerialization verifies state can be marshaled/unmarshaled +func TestGeneratorStateJSONSerialization(t *testing.T) { + state := GeneratorState{ + RemainingCIDRs: []string{"192.0.2.0/24", "198.51.100.0/24"}, + CurrentGen: &HostGenState{ + CIDR: "203.0.113.0/24", + Current: "203.0.113.10", + Done: false, + }, + ActiveGens: []HostGenState{ + {CIDR: "192.0.2.0/24", Current: "192.0.2.5", Done: false}, + {CIDR: "198.51.100.0/24", Current: "198.51.100.20", Done: false}, + }, + TotalCIDRs: 10, + } + + // Marshal + data, err := json.Marshal(state) + if err != nil { + t.Fatalf("Failed to marshal state: %v", err) + } + + // Unmarshal + var restored GeneratorState + if err := json.Unmarshal(data, &restored); err != nil { + t.Fatalf("Failed to unmarshal state: %v", err) + } + + // Verify + if len(restored.RemainingCIDRs) != len(state.RemainingCIDRs) { + t.Error("RemainingCIDRs count mismatch") + } + + if len(restored.ActiveGens) != len(state.ActiveGens) { + t.Errorf("ActiveGens count mismatch: expected %d, got %d", len(state.ActiveGens), len(restored.ActiveGens)) + } + + if restored.TotalCIDRs != state.TotalCIDRs { + t.Error("TotalCIDRs mismatch") + } + + if restored.CurrentGen == nil { + t.Error("CurrentGen was not restored") + } else if restored.CurrentGen.CIDR != state.CurrentGen.CIDR { + t.Error("CurrentGen CIDR mismatch") + } +} + +// TestHostGeneratorBasic verifies basic IP generation +func TestHostGeneratorBasic(t *testing.T) { + gen, err := newHostGenerator("192.0.2.0/30") + if err != nil { + t.Fatalf("Failed to create host generator: %v", err) + } + + // /30 network has 4 addresses: .0 (network), .1 and .2 (hosts), .3 (broadcast) + // We should get .1 and .2 + ips := make([]string, 0) + for { + ip, ok := gen.next() + if !ok { + break + } + ips = append(ips, ip) + } + + expectedCount := 2 + if len(ips) != expectedCount { + t.Errorf("Expected %d IPs from /30 network, got %d: %v", expectedCount, len(ips), ips) + } + + // Verify we got valid IPs + for _, ip := range ips { + addr, err := netip.ParseAddr(ip) + if err != nil || !addr.IsValid() { + t.Errorf("Generated invalid IP: %s", ip) + } + } +} + +// TestGlobalDeduplication verifies that globalSeen prevents duplicates +func TestGlobalDeduplication(t *testing.T) { + s := &Server{ + allCIDRs: []string{"192.0.2.0/29"}, + globalSeen: make(map[string]bool), + } + + // Mark some IPs as seen + s.globalSeen["192.0.2.1"] = true + s.globalSeen["192.0.2.2"] = true + + if !s.globalSeen["192.0.2.1"] { + t.Error("IP should be marked as seen") + } + + if s.globalSeen["192.0.2.100"] { + t.Error("Unseen IP should not be in globalSeen") + } +} + +// TestIPGeneratorConcurrency verifies thread-safe generator access +func TestIPGeneratorConcurrency(t *testing.T) { + s := &Server{ + allCIDRs: []string{"192.0.2.0/24", "198.51.100.0/24"}, + globalSeen: make(map[string]bool), + } + + gen, err := newIPGenerator(s, "test-consumer") + if err != nil { + t.Fatalf("Failed to create generator: %v", err) + } + + done := make(chan bool) + errors := make(chan error, 10) + + // Spawn multiple goroutines calling Next() concurrently + for i := 0; i < 10; i++ { + go func() { + for j := 0; j < 50; j++ { + _, err := gen.Next() + if err != nil { + errors <- err + break + } + } + done <- true + }() + } + + // Wait for all goroutines + for i := 0; i < 10; i++ { + <-done + 
} + + close(errors) + if len(errors) > 0 { + for err := range errors { + t.Errorf("Concurrent access error: %v", err) + } + } +} + +// TestStatePersistence verifies state can be saved and loaded from disk +func TestStatePersistence(t *testing.T) { + // Use default stateDir (progress_state) for this test + // Ensure it exists + if err := os.MkdirAll(stateDir, 0755); err != nil { + t.Fatalf("Failed to create state dir: %v", err) + } + + s := &Server{ + allCIDRs: []string{"192.0.2.0/24"}, + globalSeen: make(map[string]bool), + } + + gen, err := newIPGenerator(s, "test-persistence-"+time.Now().Format("20060102150405")) + if err != nil { + t.Fatalf("Failed to create generator: %v", err) + } + + // Generate some IPs + for i := 0; i < 10; i++ { + _, err := gen.Next() + if err != nil { + break + } + } + + // Save state + if err := gen.saveState(); err != nil { + t.Fatalf("Failed to save state: %v", err) + } + + // Verify state file was created + files, err := filepath.Glob(filepath.Join(stateDir, "*.json")) + if err != nil { + t.Fatalf("Failed to list state files: %v", err) + } + + if len(files) == 0 { + t.Error("No state file was created") + } + + // Create new generator and load state + gen2, err := newIPGenerator(s, gen.consumer) + if err != nil { + t.Fatalf("Failed to create second generator: %v", err) + } + + if err := gen2.loadState(); err != nil { + t.Fatalf("Failed to load state: %v", err) + } + + // Verify state was loaded (should have remaining CIDRs and progress) + if len(gen2.remainingCIDRs) == 0 && len(gen.remainingCIDRs) > 0 { + t.Error("State was not properly restored") + } +} diff --git a/manager/store.go b/manager/store.go index 052cd0a..07abf88 100644 --- a/manager/store.go +++ b/manager/store.go @@ -9,6 +9,8 @@ import ( "os" "path/filepath" "sync" + "syscall" + "time" ) type User struct { @@ -56,6 +58,50 @@ func (s *UserStore) hashUserID(userID string) string { return hex.EncodeToString(hash[:]) } +// acquireFileLock attempts to acquire an exclusive lock on the store file +// Returns the file descriptor and an error if locking fails +func (s *UserStore) acquireFileLock(forWrite bool) (*os.File, error) { + lockPath := s.filePath + ".lock" + + // Create lock file if it doesn't exist + lockFile, err := os.OpenFile(lockPath, os.O_CREATE|os.O_RDWR, 0600) + if err != nil { + return nil, err + } + + // Try to acquire lock with timeout + lockType := syscall.LOCK_SH // Shared lock for reads + if forWrite { + lockType = syscall.LOCK_EX // Exclusive lock for writes + } + + // Use non-blocking lock with retry + maxRetries := 10 + for i := 0; i < maxRetries; i++ { + err = syscall.Flock(int(lockFile.Fd()), lockType|syscall.LOCK_NB) + if err == nil { + return lockFile, nil + } + if err != syscall.EWOULDBLOCK { + lockFile.Close() + return nil, err + } + // Wait and retry + time.Sleep(100 * time.Millisecond) + } + + lockFile.Close() + return nil, syscall.EWOULDBLOCK +} + +// releaseFileLock releases the file lock +func (s *UserStore) releaseFileLock(lockFile *os.File) { + if lockFile != nil { + syscall.Flock(int(lockFile.Fd()), syscall.LOCK_UN) + lockFile.Close() + } +} + func (s *UserStore) Reload() error { s.mu.Lock() defer s.mu.Unlock() @@ -74,6 +120,15 @@ func (s *UserStore) loadCache() error { } func (s *UserStore) loadCacheInternal() error { + // Acquire shared lock for reading (allows multiple readers, blocks writers) + lockFile, err := s.acquireFileLock(false) + if err != nil { + logger.Warn("Failed to acquire read lock on user store: %v", err) + // Continue without lock - degraded mode 
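+		// (Reads stay consistent even unlocked: save() writes to a temp file and
+		// renames atomically, so a reader never observes a partially written store.)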
+ } else { + defer s.releaseFileLock(lockFile) + } + // Read encrypted store file encryptedData, err := os.ReadFile(s.filePath) if err != nil { @@ -108,6 +163,14 @@ func (s *UserStore) loadCacheInternal() error { } func (s *UserStore) save() error { + // Acquire exclusive lock for writing (blocks all readers and writers) + lockFile, err := s.acquireFileLock(true) + if err != nil { + logger.Error("Failed to acquire write lock on user store: %v", err) + return err + } + defer s.releaseFileLock(lockFile) + // Build store structure from cache store := encryptedStore{ Users: make([]encryptedUserEntry, 0, len(s.cache)), @@ -130,10 +193,18 @@ func (s *UserStore) save() error { return err } - // Write to file + // Write to temp file first for atomic operation + tempPath := s.filePath + ".tmp" logger.Info("Saving user store with %d entries", len(s.cache)) - if err := os.WriteFile(s.filePath, encryptedData, 0600); err != nil { - logger.Error("Failed to write store file: %v", err) + if err := os.WriteFile(tempPath, encryptedData, 0600); err != nil { + logger.Error("Failed to write temp store file: %v", err) + return err + } + + // Atomic rename + if err := os.Rename(tempPath, s.filePath); err != nil { + logger.Error("Failed to rename store file: %v", err) + os.Remove(tempPath) return err } diff --git a/manager/store_test.go b/manager/store_test.go new file mode 100644 index 0000000..f201a87 --- /dev/null +++ b/manager/store_test.go @@ -0,0 +1,381 @@ +package main + +import ( + "crypto/rand" + "encoding/base64" + "os" + "path/filepath" + "sync" + "testing" + "time" +) + +// generateTestServerKey creates a test server key for crypto operations +func generateTestServerKey() string { + key := make([]byte, 32) + rand.Read(key) + return base64.StdEncoding.EncodeToString(key) +} + +// TestFileLockingBasic verifies file locking works +func TestFileLockingBasic(t *testing.T) { + tempDir, err := os.MkdirTemp("", "store_lock_test") + if err != nil { + t.Fatalf("Failed to create temp dir: %v", err) + } + defer os.RemoveAll(tempDir) + + // Create test crypto instance + crypto, err := NewCrypto(generateTestServerKey()) + if err != nil { + t.Fatalf("Failed to create crypto: %v", err) + } + + store := NewUserStore(tempDir, crypto) + + // Acquire read lock + lockFile, err := store.acquireFileLock(false) + if err != nil { + t.Fatalf("Failed to acquire read lock: %v", err) + } + + if lockFile == nil { + t.Error("Lock file should not be nil") + } + + // Release lock + store.releaseFileLock(lockFile) +} + +// TestFileLockingExclusiveBlocksReaders verifies exclusive lock blocks readers +func TestFileLockingExclusiveBlocksReaders(t *testing.T) { + tempDir, err := os.MkdirTemp("", "store_exclusive_test") + if err != nil { + t.Fatalf("Failed to create temp dir: %v", err) + } + defer os.RemoveAll(tempDir) + + crypto, err := NewCrypto(generateTestServerKey()) + if err != nil { + t.Fatalf("Failed to create crypto: %v", err) + } + + store := NewUserStore(tempDir, crypto) + + // Acquire exclusive lock + writeLock, err := store.acquireFileLock(true) + if err != nil { + t.Fatalf("Failed to acquire write lock: %v", err) + } + defer store.releaseFileLock(writeLock) + + // Try to acquire read lock (should fail/timeout quickly) + done := make(chan bool) + go func() { + readLock, err := store.acquireFileLock(false) + if err == nil { + store.releaseFileLock(readLock) + t.Error("Read lock should have been blocked by write lock") + } + done <- true + }() + + select { + case <-done: + // Expected - read lock was blocked + case 
<-time.After(2 * time.Second): + t.Error("Read lock acquisition took too long") + } +} + +// TestFileLockingMultipleReaders verifies multiple readers can coexist +func TestFileLockingMultipleReaders(t *testing.T) { + tempDir, err := os.MkdirTemp("", "store_multi_read_test") + if err != nil { + t.Fatalf("Failed to create temp dir: %v", err) + } + defer os.RemoveAll(tempDir) + + crypto, err := NewCrypto(generateTestServerKey()) + if err != nil { + t.Fatalf("Failed to create crypto: %v", err) + } + + store := NewUserStore(tempDir, crypto) + + // Acquire first read lock + lock1, err := store.acquireFileLock(false) + if err != nil { + t.Fatalf("Failed to acquire first read lock: %v", err) + } + defer store.releaseFileLock(lock1) + + // Acquire second read lock (should succeed) + lock2, err := store.acquireFileLock(false) + if err != nil { + t.Fatalf("Failed to acquire second read lock: %v", err) + } + defer store.releaseFileLock(lock2) + + // Both locks acquired successfully +} + +// TestUserStoreAddAndGet verifies basic user storage and retrieval +func TestUserStoreAddAndGet(t *testing.T) { + tempDir, err := os.MkdirTemp("", "store_user_test") + if err != nil { + t.Fatalf("Failed to create temp dir: %v", err) + } + defer os.RemoveAll(tempDir) + + crypto, err := NewCrypto(generateTestServerKey()) + if err != nil { + t.Fatalf("Failed to create crypto: %v", err) + } + + store := NewUserStore(tempDir, crypto) + + testUser := "testuser" + testSecret := "ABCDEFGHIJKLMNOP" + + // Add user + if err := store.AddUser(testUser, testSecret); err != nil { + t.Fatalf("Failed to add user: %v", err) + } + + // Retrieve user + user, err := store.GetUser(testUser) + if err != nil { + t.Fatalf("Failed to get user: %v", err) + } + + if user == nil { + t.Fatal("User should not be nil") + } + + if user.ID != testUser { + t.Errorf("User ID mismatch: expected %s, got %s", testUser, user.ID) + } + + if user.TOTPSecret != testSecret { + t.Errorf("TOTP secret mismatch: expected %s, got %s", testSecret, user.TOTPSecret) + } +} + +// TestUserStoreReload verifies reload doesn't lose data +func TestUserStoreReload(t *testing.T) { + tempDir, err := os.MkdirTemp("", "store_reload_test") + if err != nil { + t.Fatalf("Failed to create temp dir: %v", err) + } + defer os.RemoveAll(tempDir) + + crypto, err := NewCrypto(generateTestServerKey()) + if err != nil { + t.Fatalf("Failed to create crypto: %v", err) + } + + store := NewUserStore(tempDir, crypto) + + // Add user + if err := store.AddUser("user1", "SECRET1"); err != nil { + t.Fatalf("Failed to add user: %v", err) + } + + // Reload + if err := store.Reload(); err != nil { + t.Fatalf("Failed to reload: %v", err) + } + + // Verify user still exists + user, err := store.GetUser("user1") + if err != nil { + t.Fatalf("Failed to get user after reload: %v", err) + } + + if user == nil { + t.Error("User should still exist after reload") + } +} + +// TestUserStoreConcurrentAccess verifies thread-safe access +func TestUserStoreConcurrentAccess(t *testing.T) { + tempDir, err := os.MkdirTemp("", "store_concurrent_test") + if err != nil { + t.Fatalf("Failed to create temp dir: %v", err) + } + defer os.RemoveAll(tempDir) + + crypto, err := NewCrypto(generateTestServerKey()) + if err != nil { + t.Fatalf("Failed to create crypto: %v", err) + } + + store := NewUserStore(tempDir, crypto) + + // Add initial user + if err := store.AddUser("initial", "SECRET"); err != nil { + t.Fatalf("Failed to add initial user: %v", err) + } + + var wg sync.WaitGroup + errors := make(chan error, 20) + + // 
Concurrent readers + for i := 0; i < 10; i++ { + wg.Add(1) + go func() { + defer wg.Done() + for j := 0; j < 10; j++ { + _, err := store.GetUser("initial") + if err != nil { + errors <- err + return + } + } + }() + } + + // Concurrent writers + for i := 0; i < 10; i++ { + wg.Add(1) + go func(id int) { + defer wg.Done() + userID := "user" + string(rune(id)) + if err := store.AddUser(userID, "SECRET"+string(rune(id))); err != nil { + errors <- err + } + }(i) + } + + wg.Wait() + close(errors) + + if len(errors) > 0 { + for err := range errors { + t.Errorf("Concurrent access error: %v", err) + } + } +} + +// TestUserStorePersistence verifies data survives store recreation +func TestUserStorePersistence(t *testing.T) { + tempDir, err := os.MkdirTemp("", "store_persist_test") + if err != nil { + t.Fatalf("Failed to create temp dir: %v", err) + } + defer os.RemoveAll(tempDir) + + crypto, err := NewCrypto(generateTestServerKey()) + if err != nil { + t.Fatalf("Failed to create crypto: %v", err) + } + + // Create first store and add user + store1 := NewUserStore(tempDir, crypto) + if err := store1.AddUser("persistent", "SECRETDATA"); err != nil { + t.Fatalf("Failed to add user: %v", err) + } + + // Create second store (simulating restart) + store2 := NewUserStore(tempDir, crypto) + + // Retrieve user + user, err := store2.GetUser("persistent") + if err != nil { + t.Fatalf("Failed to get user from new store: %v", err) + } + + if user == nil { + t.Error("User should persist across store instances") + } + + if user.TOTPSecret != "SECRETDATA" { + t.Error("User data should match original") + } +} + +// TestUserStoreFileExists verifies store file is created +func TestUserStoreFileExists(t *testing.T) { + tempDir, err := os.MkdirTemp("", "store_file_test") + if err != nil { + t.Fatalf("Failed to create temp dir: %v", err) + } + defer os.RemoveAll(tempDir) + + crypto, err := NewCrypto(generateTestServerKey()) + if err != nil { + t.Fatalf("Failed to create crypto: %v", err) + } + + store := NewUserStore(tempDir, crypto) + + // Add user (triggers save) + if err := store.AddUser("filetest", "SECRET"); err != nil { + t.Fatalf("Failed to add user: %v", err) + } + + // Verify file exists + expectedFile := filepath.Join(tempDir, "users.enc") + if _, err := os.Stat(expectedFile); os.IsNotExist(err) { + t.Error("Store file should have been created") + } +} + +// TestGenerateSecret verifies TOTP secret generation +func TestGenerateSecret(t *testing.T) { + secret, err := generateSecret() + if err != nil { + t.Fatalf("Failed to generate secret: %v", err) + } + + if len(secret) == 0 { + t.Error("Generated secret should not be empty") + } + + // Base32 encoded 20 bytes should be 32 characters + expectedLength := 32 + if len(secret) != expectedLength { + t.Errorf("Expected secret length %d, got %d", expectedLength, len(secret)) + } + + // Verify two generated secrets are different + secret2, err := generateSecret() + if err != nil { + t.Fatalf("Failed to generate second secret: %v", err) + } + + if secret == secret2 { + t.Error("Generated secrets should be unique") + } +} + +// TestUserHashingConsistency verifies user ID hashing is consistent +func TestUserHashingConsistency(t *testing.T) { + tempDir, err := os.MkdirTemp("", "store_hash_test") + if err != nil { + t.Fatalf("Failed to create temp dir: %v", err) + } + defer os.RemoveAll(tempDir) + + crypto, err := NewCrypto(generateTestServerKey()) + if err != nil { + t.Fatalf("Failed to create crypto: %v", err) + } + + store := NewUserStore(tempDir, crypto) + + userID := 
"testuser" + hash1 := store.hashUserID(userID) + hash2 := store.hashUserID(userID) + + if hash1 != hash2 { + t.Error("Same user ID should produce same hash") + } + + // Different user should produce different hash + hash3 := store.hashUserID("differentuser") + if hash1 == hash3 { + t.Error("Different users should produce different hashes") + } +} diff --git a/output_service/main.go b/output_service/main.go index d3993f0..f904718 100644 --- a/output_service/main.go +++ b/output_service/main.go @@ -77,10 +77,11 @@ var ( dbMux sync.RWMutex stats Stats statsMux sync.RWMutex - sentHops = make(map[string]bool) // Track sent hops to avoid duplicates + sentHops = make(map[string]time.Time) // Track sent hops with timestamp for eviction sentHopsMux sync.RWMutex verbose bool startTime time.Time + sentHopsTTL = 24 * time.Hour // Time-to-live for hop deduplication cache ) func main() { @@ -168,6 +169,9 @@ func main() { sigChan := make(chan os.Signal, 1) signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM) + // Start cleanup goroutine for sentHops map to prevent unbounded growth + go cleanupSentHops() + go func() { log.Printf("🚀 Output Service v%s starting...", VERSION) log.Printf("📥 Listening for results on http://localhost:%d/results", config.Port) @@ -549,11 +553,14 @@ func extractAndSendHops(result *PingResult) { var newHops []string sentHopsMux.Lock() + now := time.Now() for _, hop := range result.Traceroute.Hops { if hop.IP != "" && !hop.Timeout && hop.IP != "*" { - if !sentHops[hop.IP] { + // Check if we've seen this hop recently (within TTL) + lastSent, exists := sentHops[hop.IP] + if !exists || now.Sub(lastSent) > sentHopsTTL { newHops = append(newHops, hop.IP) - sentHops[hop.IP] = true + sentHops[hop.IP] = now statsMux.Lock() stats.HopsDiscovered++ @@ -602,6 +609,31 @@ func extractAndSendHops(result *PingResult) { } } +// cleanupSentHops periodically removes old entries from sentHops map to prevent unbounded growth +func cleanupSentHops() { + ticker := time.NewTicker(1 * time.Hour) + defer ticker.Stop() + + for range ticker.C { + sentHopsMux.Lock() + now := time.Now() + removed := 0 + + for ip, timestamp := range sentHops { + if now.Sub(timestamp) > sentHopsTTL { + delete(sentHops, ip) + removed++ + } + } + + if verbose && removed > 0 { + log.Printf("🧹 Cleaned up %d expired hop entries (total: %d)", removed, len(sentHops)) + } + + sentHopsMux.Unlock() + } +} + func handleRotate(w http.ResponseWriter, r *http.Request) { if r.Method != http.MethodPost { http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) diff --git a/ping_service.go b/ping_service.go index c4c98db..ce0cd3e 100644 --- a/ping_service.go +++ b/ping_service.go @@ -79,6 +79,10 @@ var ( startTime time.Time health HealthStatus healthMux sync.RWMutex + // HTTP client with timeout to prevent indefinite hangs + httpClient = &http.Client{ + Timeout: 30 * time.Second, + } ) func main() { @@ -460,7 +464,7 @@ func handleSocket(path string, data []byte, mode string) ([]byte, error) { func readSource(src string) ([]byte, error) { if strings.HasPrefix(src, "http") { - resp, err := http.Get(src) + resp, err := httpClient.Get(src) if err != nil { return nil, err } @@ -477,7 +481,7 @@ func readSource(src string) ([]byte, error) { func writeDestination(dest string, data []byte) error { if strings.HasPrefix(dest, "http") { - resp, err := http.Post(dest, "application/json", bytes.NewBuffer(data)) + resp, err := httpClient.Post(dest, "application/json", bytes.NewBuffer(data)) if err != nil { return err } diff --git a/ping_service_test.go 
b/ping_service_test.go new file mode 100644 index 0000000..6c57574 --- /dev/null +++ b/ping_service_test.go @@ -0,0 +1,150 @@ +package main + +import ( + "net/http" + "net/http/httptest" + "testing" + "time" +) + +// TestHTTPClientTimeout verifies that the HTTP client has a timeout configured +func TestHTTPClientTimeout(t *testing.T) { + if httpClient.Timeout == 0 { + t.Error("HTTP client timeout is not configured") + } + + expectedTimeout := 30 * time.Second + if httpClient.Timeout != expectedTimeout { + t.Errorf("HTTP client timeout = %v, want %v", httpClient.Timeout, expectedTimeout) + } +} + +// TestHTTPClientTimeoutActuallyWorks verifies the timeout actually prevents indefinite hangs +func TestHTTPClientTimeoutActuallyWorks(t *testing.T) { + // Create a server that delays response longer than timeout + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + time.Sleep(35 * time.Second) // Sleep longer than our 30s timeout + w.WriteHeader(http.StatusOK) + })) + defer server.Close() + + start := time.Now() + _, err := httpClient.Get(server.URL) + duration := time.Since(start) + + if err == nil { + t.Error("Expected timeout error, got nil") + } + + // Should timeout in ~30 seconds, give 3s buffer for slow systems + if duration < 28*time.Second || duration > 33*time.Second { + t.Logf("Request took %v (expected ~30s)", duration) + } +} + +// TestCooldownCacheBasic verifies basic cooldown functionality +func TestCooldownCacheBasic(t *testing.T) { + cacheMux.Lock() + cooldownCache = make(map[string]time.Time) // Reset + cacheMux.Unlock() + + ip := "192.0.2.1" + + // First check - should be allowed + if isInCooldown(ip, 10) { + t.Error("IP should not be in cooldown on first check") + } + + // Add to cache + cacheMux.Lock() + cooldownCache[ip] = time.Now() + cacheMux.Unlock() + + // Second check - should be in cooldown + if !isInCooldown(ip, 10) { + t.Error("IP should be in cooldown after being added") + } + + // Wait for cooldown to expire + cacheMux.Lock() + cooldownCache[ip] = time.Now().Add(-11 * time.Minute) + cacheMux.Unlock() + + // Third check - should be allowed again + if isInCooldown(ip, 10) { + t.Error("IP should not be in cooldown after expiry") + } +} + +// TestCooldownCacheConcurrency verifies thread-safe cache access +func TestCooldownCacheConcurrency(t *testing.T) { + cacheMux.Lock() + cooldownCache = make(map[string]time.Time) + cacheMux.Unlock() + + done := make(chan bool) + + // Spawn multiple goroutines accessing cache concurrently + for i := 0; i < 10; i++ { + go func(id int) { + for j := 0; j < 100; j++ { + ip := "192.0.2." 
+ string(rune('0'+id)) // '0'+id yields printable digits 0-9
+				isInCooldown(ip, 10)
+
+				cacheMux.Lock()
+				cooldownCache[ip] = time.Now()
+				cacheMux.Unlock()
+			}
+			done <- true
+		}(i)
+	}
+
+	// Wait for all goroutines
+	for i := 0; i < 10; i++ {
+		<-done
+	}
+
+	// If we got here without a race condition, test passes
+}
+
+// isInCooldown mirrors the cooldown check used by ping_service.go (test helper)
+func isInCooldown(ip string, cooldownMinutes int) bool {
+	cacheMux.Lock()
+	defer cacheMux.Unlock()
+
+	lastPing, exists := cooldownCache[ip]
+	if !exists {
+		return false
+	}
+
+	elapsed := time.Since(lastPing)
+	cooldownDuration := time.Duration(cooldownMinutes) * time.Minute
+	return elapsed < cooldownDuration
+}
+
+// TestConfigDefaults verifies that default configuration values fall within sane ranges
+func TestConfigDefaults(t *testing.T) {
+	config := Config{
+		IntervalSeconds:   30,
+		CooldownMinutes:   10,
+		EnableTraceroute:  true,
+		TracerouteMaxHops: 30,
+		HealthCheckPort:   8090,
+	}
+
+	if config.IntervalSeconds <= 0 {
+		t.Error("IntervalSeconds should be positive")
+	}
+
+	if config.CooldownMinutes <= 0 {
+		t.Error("CooldownMinutes should be positive")
+	}
+
+	if config.TracerouteMaxHops <= 0 || config.TracerouteMaxHops > 255 {
+		t.Error("TracerouteMaxHops should be between 1 and 255")
+	}
+
+	if config.HealthCheckPort <= 0 || config.HealthCheckPort > 65535 {
+		t.Error("HealthCheckPort should be between 1 and 65535")
+	}
+}