Fixed a few memory leaks. Implemented tests for the functionality.

This commit is contained in:
Kalzu Rekku
2026-01-08 18:55:32 +02:00
parent c663ec0431
commit 1130b7fb8c
10 changed files with 1334 additions and 13 deletions

View File

@@ -31,10 +31,10 @@ Core concept: Bootstrap with ~19,000 cloud provider IPs → ping targets → tra
- Race conditions and distributed coordination must be considered for shared resources
### Current Implementation Status
- **input_service**: Partially multi-instance ready (per-consumer state is instance-local, which works if clients stick to one instance)
- **input_service**: Partially multi-instance ready (per-consumer state is instance-local; hop deduplication requires session affinity or broadcast strategy - see MULTI_INSTANCE.md)
- **ping_service**: Fully multi-instance ready (distributed workers by design)
- **output_service**: Fully multi-instance ready (each instance maintains its own SQLite database)
- **manager**: Not multi-instance ready (in-memory sessions, user store reload assumes single instance)
- **output_service**: Fully multi-instance ready (each instance maintains its own SQLite database with TTL-based sentHops cleanup)
- **manager**: Requires configuration for multi-instance (in-memory sessions; user store now uses file locking for safe concurrent access - see MULTI_INSTANCE.md)
## Architecture Components
@@ -268,3 +268,20 @@ go run . --add-user=username
- Output service rotates databases weekly OR at 100MB (whichever first), keeping 5 files
- Each output_service instance maintains its own database; use `/dump` for central aggregation
- For multi-instance input_service, use session affinity or call `/hops` on all instances
## Multi-Instance Deployment
All services support multi-instance deployment with varying degrees of readiness. See **MULTI_INSTANCE.md** for comprehensive deployment guidance including:
- Session affinity strategies for input_service
- Database aggregation for output_service
- File locking for manager user store
- Load balancing recommendations
- Known limitations and workarounds
## Recent Critical Fixes
- **Fixed panic risk**: input_service now uses `ParseAddr()` with error handling instead of `MustParseAddr()`
- **Added HTTP timeouts**: ping_service uses a 30-second timeout to prevent indefinite hangs
- **Fixed state serialization**: input_service now preserves activeGens array for proper interleaving after reload
- **Implemented sentHops eviction**: output_service uses TTL-based cleanup (24h) to prevent unbounded memory growth
- **Added file locking**: manager user store uses flock for safe concurrent access in multi-instance deployments

MULTI_INSTANCE.md Normal file
View File

@@ -0,0 +1,305 @@
# Multi-Instance Deployment Guide
This document provides guidance for deploying multiple instances of each service for high availability and scalability.
## Overview
All services in this distributed network mapping system are designed to support multi-instance deployments, but each has specific considerations and limitations.
---
## Input Service (input_service/)
### Multi-Instance Readiness: ⚠️ **Partially Ready**
#### How It Works
- Each instance maintains its own per-consumer state and CIDR generators
- State is stored locally in `progress_state/` directory
- Global hop deduplication (`globalSeen` map) is **instance-local**
#### Multi-Instance Deployment Strategies
**Option 1: Session Affinity (Recommended)**
```
Load Balancer (with sticky sessions based on source IP)
├── input_service instance 1
├── input_service instance 2
└── input_service instance 3
```
- Configure load balancer to route each ping worker to the same input_service instance
- Ensures per-consumer state consistency
- Simple to implement and maintain
**Option 2: Broadcast Hop Submissions**
```
output_service ---> POST /hops ---> ALL input_service instances
```
Modify output_service to POST discovered hops to all input_service instances instead of just one. This ensures hop deduplication works across instances.
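As a rough illustration, the broadcast could look like the sketch below on the output_service side. The instance URL list, the `/hops` path, and the JSON payload shape are assumptions for illustration only, not the current implementation.
```go
// Hypothetical sketch: POST discovered hops to every input_service instance
// so each one can update its local deduplication state. Instance URLs and
// the payload shape are assumptions.
package main

import (
	"bytes"
	"encoding/json"
	"log"
	"net/http"
	"time"
)

var inputInstances = []string{"http://input1:8080", "http://input2:8080"}

var hopClient = &http.Client{Timeout: 10 * time.Second}

// broadcastHops sends the same hop list to every configured instance.
func broadcastHops(hops []string) {
	body, err := json.Marshal(map[string][]string{"hops": hops})
	if err != nil {
		log.Printf("marshal hops: %v", err)
		return
	}
	for _, base := range inputInstances {
		resp, err := hopClient.Post(base+"/hops", "application/json", bytes.NewReader(body))
		if err != nil {
			log.Printf("broadcast to %s failed: %v", base, err)
			continue // best effort: remaining instances still receive the hops
		}
		resp.Body.Close()
	}
}

func main() {
	broadcastHops([]string{"198.51.100.7", "203.0.113.9"})
}
```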
**Option 3: Shared Deduplication Backend (Future Enhancement)**
Implement Redis or database-backed `globalSeen` storage so all instances share deduplication state.
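A minimal sketch of what a Redis-backed `globalSeen` could look like, assuming the `github.com/redis/go-redis/v9` client and a `seen:<ip>` key scheme with a TTL; all of this is an assumption, not existing code.
```go
// Hypothetical sketch of a shared deduplication backend using Redis SETNX.
// The client library, key prefix, and TTL are assumptions.
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/redis/go-redis/v9"
)

type sharedSeen struct {
	rdb *redis.Client
	ttl time.Duration
}

// markSeen reports whether this IP is new across ALL instances: SETNX lets
// only the first writer of the key succeed.
func (s *sharedSeen) markSeen(ctx context.Context, ip string) (bool, error) {
	return s.rdb.SetNX(ctx, "seen:"+ip, 1, s.ttl).Result()
}

func main() {
	seen := &sharedSeen{
		rdb: redis.NewClient(&redis.Options{Addr: "localhost:6379"}),
		ttl: 24 * time.Hour,
	}
	isNew, err := seen.markSeen(context.Background(), "203.0.113.9")
	fmt.Println(isNew, err)
}
```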
#### Known Limitations
- **Hop deduplication is instance-local**: Different instances may serve duplicate hops if output_service sends hops to only one instance
- **Per-consumer state is instance-local**: If a consumer switches instances, it gets a new generator and starts from the beginning
- **CIDR files must be present on all instances**: The `cloud-provider-ip-addresses/` directory must exist on each instance
#### Deployment Example
```bash
# Instance 1
./http_input_service &
# Instance 2 (different port)
PORT=8081 ./http_input_service &
# Load balancer (nginx example)
upstream input_service {
    ip_hash;  # Session affinity
    server 127.0.0.1:8080;
    server 127.0.0.1:8081;
}
```
---
## Output Service (output_service/)
### Multi-Instance Readiness: ✅ **Fully Ready**
#### How It Works
- Each instance maintains its own SQLite database
- Databases are independent and can be aggregated later
- `sentHops` deduplication is instance-local with 24-hour TTL
#### Multi-Instance Deployment
```
ping_service workers ---> Load Balancer ---> output_service instances
```
- No session affinity required
- Each instance stores results independently
- Use `/dump` endpoint to collect databases from all instances for aggregation
#### Aggregation Strategy
```bash
# Collect databases from all instances
curl http://instance1:8091/dump > instance1.db
curl http://instance2:8091/dump > instance2.db
curl http://instance3:8091/dump > instance3.db
# Merge using sqlite3
sqlite3 merged.db <<EOF
ATTACH 'instance1.db' AS db1;
ATTACH 'instance2.db' AS db2;
ATTACH 'instance3.db' AS db3;
INSERT INTO ping_results SELECT * FROM db1.ping_results;
INSERT INTO ping_results SELECT * FROM db2.ping_results;
INSERT INTO ping_results SELECT * FROM db3.ping_results;
INSERT INTO traceroute_hops SELECT * FROM db1.traceroute_hops;
INSERT INTO traceroute_hops SELECT * FROM db2.traceroute_hops;
INSERT INTO traceroute_hops SELECT * FROM db3.traceroute_hops;
EOF
```
#### Deployment Example
```bash
# Instance 1
./output_service --port=8081 --health-port=8091 --db-dir=/data/output1 &
# Instance 2
./output_service --port=8082 --health-port=8092 --db-dir=/data/output2 &
# Instance 3
./output_service --port=8083 --health-port=8093 --db-dir=/data/output3 &
```
---
## Ping Service (ping_service/)
### Multi-Instance Readiness: ✅ **Fully Ready**
#### How It Works
- Designed from the ground up for distributed operation
- Each worker independently polls input_service and submits results
- Cooldown cache is instance-local (intentional - distributed workers coordinate via cooldown duration)
#### Multi-Instance Deployment
```
input_service <--- ping_service workers (many instances)
                              |
                              v
                        output_service
```
- Deploy as many workers as needed across different networks/locations
- Workers can run on Raspberry Pis, VPS, cloud instances, etc.
- No coordination required between workers
#### Deployment Example
```bash
# Worker 1 (local network)
./ping_service -config config.yaml &
# Worker 2 (VPS)
ssh vps1 "./ping_service -config config.yaml" &
# Worker 3 (different geographic location)
ssh vps2 "./ping_service -config config.yaml" &
```
---
## Manager (manager/)
### Multi-Instance Readiness: ⚠️ **Requires Configuration**
#### How It Works
- Session store is **in-memory** (not shared across instances)
- User store uses file-based storage with file locking (multi-instance safe as of latest update)
- Worker registry is instance-local
#### Multi-Instance Deployment Strategies
**Option 1: Active-Passive with Failover**
```
Load Balancer (active-passive)
├── manager instance 1 (active)
└── manager instance 2 (standby)
```
- Only one instance active at a time
- Failover on primary failure
- Simplest approach, no session coordination needed
**Option 2: Shared Session Store (Recommended for Active-Active)**
Implement Redis or database-backed session storage to enable true active-active multi-instance deployment.
**Required Changes for Active-Active:**
```go
// Replace in-memory sessions (main.go:31-34) with Redis
var sessions = redis.NewSessionStore(redisClient)
```
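The `redis.NewSessionStore` call above is pseudocode. Below is a minimal sketch of what such a store could look like, assuming `github.com/redis/go-redis/v9` and a simple JSON-encoded session; the field names are illustrative, not the manager's actual types.
```go
// Hypothetical sketch of a Redis-backed session store; session fields and
// key layout are assumptions, not the manager's current implementation.
package main

import (
	"context"
	"encoding/json"
	"time"

	"github.com/redis/go-redis/v9"
)

type Session struct {
	UserID  string    `json:"user_id"`
	Expires time.Time `json:"expires"`
}

type SessionStore struct {
	rdb *redis.Client
}

// Put stores the session under its token, expiring when the session does.
func (s *SessionStore) Put(ctx context.Context, token string, sess Session) error {
	data, err := json.Marshal(sess)
	if err != nil {
		return err
	}
	return s.rdb.Set(ctx, "session:"+token, data, time.Until(sess.Expires)).Err()
}

// Get loads a session by token; go-redis returns redis.Nil if it is missing.
func (s *SessionStore) Get(ctx context.Context, token string) (*Session, error) {
	data, err := s.rdb.Get(ctx, "session:"+token).Bytes()
	if err != nil {
		return nil, err
	}
	var sess Session
	if err := json.Unmarshal(data, &sess); err != nil {
		return nil, err
	}
	return &sess, nil
}

func main() {
	store := &SessionStore{rdb: redis.NewClient(&redis.Options{Addr: "localhost:6379"})}
	_ = store.Put(context.Background(), "token123",
		Session{UserID: "alice", Expires: time.Now().Add(1 * time.Hour)})
}
```
With a shared store like this, a user authenticated on instance A can be served by instance B, removing the need for session affinity.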
#### Current Limitations
- **Sessions are not shared**: User authenticated on instance A cannot access instance B
- **Worker registry is not shared**: Each instance maintains its own worker list
- **dy.fi updates may conflict**: Multiple instances updating the same domain simultaneously
#### User Store File Locking (✅ Fixed)
As of the latest update, the user store uses file locking to prevent race conditions:
- **Shared locks** for reads (multiple readers allowed)
- **Exclusive locks** for writes (blocks all readers and writers)
- **Atomic write-then-rename** prevents corruption
- Safe for multi-instance deployment when instances share the same filesystem
#### Deployment Example (Active-Passive)
```bash
# Primary instance
./manager --port=8080 --domain=manager.dy.fi &
# Secondary instance (standby)
MANAGER_PORT=8081 ./manager &
# Load balancer health check both, route to active only
```
---
## General Multi-Instance Recommendations
### Health Checks
All services expose `/health` and `/ready` endpoints. Configure your load balancer to:
- Route traffic only to healthy instances
- Remove failed instances from rotation automatically
- Monitor `/metrics` endpoint for Prometheus integration
### Monitoring
Add `instance_id` labels to metrics for per-instance monitoring:
```go
// Recommended enhancement for all services
var instanceID, _ = os.Hostname()
```
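A hedged sketch of how the label could be attached, assuming the services use `github.com/prometheus/client_golang` for their `/metrics` endpoint; the metric name and port below are made up for illustration.
```go
// Hypothetical sketch: expose a metric with a constant instance_id label.
// Assumes client_golang is (or would be) used; the metric name is made up.
package main

import (
	"log"
	"net/http"
	"os"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promhttp"
)

func main() {
	instanceID, err := os.Hostname()
	if err != nil {
		instanceID = "unknown"
	}
	requests := prometheus.NewCounter(prometheus.CounterOpts{
		Name:        "requests_total",
		Help:        "Requests handled by this instance.",
		ConstLabels: prometheus.Labels{"instance_id": instanceID},
	})
	prometheus.MustRegister(requests)

	http.Handle("/metrics", promhttp.Handler())
	http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
		requests.Inc() // every sample from this process carries instance_id
		w.WriteHeader(http.StatusOK)
	})
	log.Fatal(http.ListenAndServe(":2112", nil))
}
```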
### File Locking
Services that write to shared storage should use file locking (like manager user store) to prevent corruption:
```go
syscall.Flock(fd, syscall.LOCK_EX) // Exclusive lock
syscall.Flock(fd, syscall.LOCK_SH) // Shared lock
```
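A minimal, self-contained sketch of that pattern is shown below; the lock-file path is an example, and production code should also retry on `EWOULDBLOCK` with non-blocking locks, as the manager user store does.
```go
// Minimal sketch of the flock pattern for a service writing shared state.
// The path is an example; this uses a blocking lock for brevity.
package main

import (
	"log"
	"os"
	"syscall"
)

// withExclusiveLock runs fn while holding an exclusive lock on path+".lock".
func withExclusiveLock(path string, fn func() error) error {
	lockFile, err := os.OpenFile(path+".lock", os.O_CREATE|os.O_RDWR, 0600)
	if err != nil {
		return err
	}
	defer lockFile.Close()

	if err := syscall.Flock(int(lockFile.Fd()), syscall.LOCK_EX); err != nil {
		return err
	}
	defer syscall.Flock(int(lockFile.Fd()), syscall.LOCK_UN)

	return fn()
}

func main() {
	err := withExclusiveLock("/var/lib/service/state.json", func() error {
		// Write the shared file here while the exclusive lock is held.
		return nil
	})
	if err != nil {
		log.Fatal(err)
	}
}
```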
### Network Considerations
- **Latency**: Place input_service close to ping workers to minimize polling latency
- **Bandwidth**: output_service should have sufficient bandwidth for result ingestion
- **NAT Traversal**: Use manager gateway mode for ping workers behind NAT
---
## Troubleshooting Multi-Instance Deployments
### Input Service: Duplicate Hops Served
**Symptom**: Same hop appears multiple times in different workers
**Cause**: Hop deduplication is instance-local
**Solution**: Implement session affinity or broadcast hop submissions
### Manager: Sessions Lost After Reconnect
**Symptom**: User logged out when load balancer switches instances
**Cause**: Sessions are in-memory, not shared
**Solution**: Use session affinity in load balancer or implement shared session store
### Output Service: Database Conflicts
**Symptom**: Database file corruption or lock timeouts
**Cause**: Multiple instances writing to same database file
**Solution**: Each instance MUST have its own `--db-dir`, then aggregate later
### Ping Service: Excessive Pinging
**Symptom**: Same IP pinged too frequently
**Cause**: Too many workers with short cooldown period
**Solution**: Increase `cooldown_minutes` in config.yaml
---
## Production Deployment Checklist
- [ ] Input service: Configure session affinity or hop broadcast
- [ ] Output service: Each instance has unique `--db-dir`
- [ ] Ping service: Cooldown duration accounts for total worker count
- [ ] Manager: Decide active-passive or implement shared sessions
- [ ] All services: Health check endpoints configured in load balancer
- [ ] All services: Metrics exported to monitoring system
- [ ] All services: Logs aggregated to central logging system
- [ ] File-based state: Shared filesystem or backup/sync strategy
- [ ] Database rotation: Automated collection of output service dumps
---
## Future Enhancements
### High Priority
1. **Shared session store for manager** (Redis/database)
2. **Shared hop deduplication for input_service** (Redis)
3. **Distributed worker coordination** for ping_service cooldowns
### Medium Priority
4. **Instance ID labels in metrics** for better observability
5. **Graceful shutdown coordination** to prevent data loss
6. **Health check improvements** to verify actual functionality
### Low Priority
7. **Automated database aggregation** for output_service
8. **Service mesh integration** (Consul, etcd) for discovery
9. **Horizontal autoscaling** based on load metrics
---
## Summary Table
| Service | Multi-Instance Ready | Session Affinity Needed | Shared Storage Needed | Notes |
|---------|---------------------|------------------------|---------------------|-------|
| input_service | ⚠️ Partial | ✅ Yes (recommended) | ❌ No | Hop dedup is instance-local |
| output_service | ✅ Full | ❌ No | ❌ No | Each instance has own DB |
| ping_service | ✅ Full | ❌ No | ❌ No | Fully distributed by design |
| manager | ⚠️ Requires config | ✅ Yes (sessions) | ✅ Yes (user store) | Sessions in-memory; user store file-locked |
---
For questions or issues with multi-instance deployments, refer to the service-specific README files or open an issue in the project repository.

View File

@@ -302,6 +302,7 @@ Each service exposes health endpoints for monitoring:
## Documentation
- `CLAUDE.md` - Comprehensive project documentation and guidance
- `MULTI_INSTANCE.md` - Multi-instance deployment guide with production strategies
- `ping_service_README.md` - Ping service details
- `input_service/README.md` - Input service details
- `output_service/README.md` - Output service details

View File

@@ -38,6 +38,7 @@ const (
type GeneratorState struct {
RemainingCIDRs []string `json:"remaining_cidrs"`
CurrentGen *HostGenState `json:"current_gen,omitempty"`
ActiveGens []HostGenState `json:"active_gens,omitempty"`
TotalCIDRs int `json:"total_cidrs"`
}
@@ -279,6 +280,19 @@ func (g *IPGenerator) buildState() GeneratorState {
Done: false,
}
}
// Save activeGens to preserve interleaving state
if len(g.activeGens) > 0 {
state.ActiveGens = make([]HostGenState, 0, len(g.activeGens))
for _, gen := range g.activeGens {
if gen != nil && !gen.done {
state.ActiveGens = append(state.ActiveGens, HostGenState{
CIDR: gen.prefix.String(),
Current: gen.current.String(),
Done: false,
})
}
}
}
return state
}
@@ -365,6 +379,25 @@ func (g *IPGenerator) loadState() error {
g.currentGen = gen
}
// Restore activeGens to preserve interleaving state
if len(state.ActiveGens) > 0 {
g.activeGens = make([]*hostGenerator, 0, len(state.ActiveGens))
for _, genState := range state.ActiveGens {
gen, err := newHostGenerator(genState.CIDR)
if err != nil {
log.Printf("⚠️ Failed to restore activeGen %s: %v", genState.CIDR, err)
continue
}
gen.current, err = netip.ParseAddr(genState.Current)
if err != nil {
log.Printf("⚠️ Failed to parse current IP for activeGen %s: %v", genState.CIDR, err)
continue
}
gen.done = genState.Done
g.activeGens = append(g.activeGens, gen)
}
}
return nil
}
@@ -373,7 +406,13 @@ type Server struct {
generators map[string]*IPGenerator
lastAccess map[string]time.Time
allCIDRs []string
globalSeen map[string]bool // Global deduplication across all sources
// MULTI-INSTANCE LIMITATION: globalSeen is instance-local, not shared across
// multiple input_service instances. In multi-instance deployments, either:
// 1. Use session affinity for ping workers (same worker always talks to same instance)
// 2. POST discovered hops to ALL input_service instances, or
// 3. Implement shared deduplication backend (Redis, database, etc.)
// Without this, different instances may serve duplicate hops.
globalSeen map[string]bool // Global deduplication across all sources (instance-local)
mu sync.RWMutex
stopSaver chan struct{}
stopCleanup chan struct{}
@@ -435,7 +474,15 @@ func (s *Server) loadAllCIDRs() error {
fields := strings.Fields(line)
for _, field := range fields {
if field != "" {
if strings.Contains(field, "/") || netip.MustParseAddr(field).IsValid() {
// Accept CIDRs (contains /) or valid IP addresses
isCIDR := strings.Contains(field, "/")
isValidIP := false
if !isCIDR {
if addr, err := netip.ParseAddr(field); err == nil && addr.IsValid() {
isValidIP = true
}
}
if isCIDR || isValidIP {
s.allCIDRs = append(s.allCIDRs, field)
}
}

View File

@@ -0,0 +1,313 @@
package main
import (
"encoding/json"
"net/netip"
"os"
"path/filepath"
"testing"
"time"
)
// TestIPParsingDoesNotPanic verifies that invalid IPs don't cause panics
func TestIPParsingDoesNotPanic(t *testing.T) {
testCases := []string{
"not-an-ip",
"999.999.999.999",
"192.168.1",
"",
"192.168.1.1.1",
"hello world",
"2001:db8::1", // IPv6 (should be filtered)
}
// This test passes if it doesn't panic
for _, testIP := range testCases {
func() {
defer func() {
if r := recover(); r != nil {
t.Errorf("Parsing %q caused panic: %v", testIP, r)
}
}()
// Test the safe parsing logic
addr, err := netip.ParseAddr(testIP)
if err == nil && addr.IsValid() {
// Valid IP, this is fine
}
}()
}
}
// TestStateSerializationPreservesActiveGens verifies activeGens are saved/restored
func TestStateSerializationPreservesActiveGens(t *testing.T) {
// Create a temporary server for testing
s := &Server{
allCIDRs: []string{"192.0.2.0/24", "198.51.100.0/24", "203.0.113.0/24"},
globalSeen: make(map[string]bool),
}
// Create a generator with activeGens
gen, err := newIPGenerator(s, "test-consumer")
if err != nil {
t.Fatalf("Failed to create generator: %v", err)
}
// Generate some IPs to populate activeGens
for i := 0; i < 15; i++ {
_, err := gen.Next()
if err != nil {
break
}
}
// Verify we have activeGens
if len(gen.activeGens) == 0 {
t.Log("Warning: No activeGens created, test may not be comprehensive")
}
originalActiveGensCount := len(gen.activeGens)
// Build state
gen.mu.Lock()
state := gen.buildState()
gen.mu.Unlock()
// Verify activeGens were serialized
if len(state.ActiveGens) == 0 && originalActiveGensCount > 0 {
t.Errorf("ActiveGens not serialized: had %d active gens but state has 0", originalActiveGensCount)
}
// Create new generator and restore state
gen2, err := newIPGenerator(s, "test-consumer-2")
if err != nil {
t.Fatalf("Failed to create second generator: %v", err)
}
// Manually restore state (simulating loadState)
gen2.mu.Lock()
gen2.remainingCIDRs = state.RemainingCIDRs
gen2.totalCIDRsCount = state.TotalCIDRs
// Restore activeGens
if len(state.ActiveGens) > 0 {
gen2.activeGens = make([]*hostGenerator, 0, len(state.ActiveGens))
for _, genState := range state.ActiveGens {
hg, err := newHostGenerator(genState.CIDR)
if err != nil {
continue
}
hg.current, err = netip.ParseAddr(genState.Current)
if err != nil {
continue
}
hg.done = genState.Done
gen2.activeGens = append(gen2.activeGens, hg)
}
}
gen2.mu.Unlock()
// Verify activeGens were restored
if len(gen2.activeGens) != len(state.ActiveGens) {
t.Errorf("ActiveGens restoration failed: expected %d, got %d", len(state.ActiveGens), len(gen2.activeGens))
}
}
// TestGeneratorStateJSONSerialization verifies state can be marshaled/unmarshaled
func TestGeneratorStateJSONSerialization(t *testing.T) {
state := GeneratorState{
RemainingCIDRs: []string{"192.0.2.0/24", "198.51.100.0/24"},
CurrentGen: &HostGenState{
CIDR: "203.0.113.0/24",
Current: "203.0.113.10",
Done: false,
},
ActiveGens: []HostGenState{
{CIDR: "192.0.2.0/24", Current: "192.0.2.5", Done: false},
{CIDR: "198.51.100.0/24", Current: "198.51.100.20", Done: false},
},
TotalCIDRs: 10,
}
// Marshal
data, err := json.Marshal(state)
if err != nil {
t.Fatalf("Failed to marshal state: %v", err)
}
// Unmarshal
var restored GeneratorState
if err := json.Unmarshal(data, &restored); err != nil {
t.Fatalf("Failed to unmarshal state: %v", err)
}
// Verify
if len(restored.RemainingCIDRs) != len(state.RemainingCIDRs) {
t.Error("RemainingCIDRs count mismatch")
}
if len(restored.ActiveGens) != len(state.ActiveGens) {
t.Errorf("ActiveGens count mismatch: expected %d, got %d", len(state.ActiveGens), len(restored.ActiveGens))
}
if restored.TotalCIDRs != state.TotalCIDRs {
t.Error("TotalCIDRs mismatch")
}
if restored.CurrentGen == nil {
t.Error("CurrentGen was not restored")
} else if restored.CurrentGen.CIDR != state.CurrentGen.CIDR {
t.Error("CurrentGen CIDR mismatch")
}
}
// TestHostGeneratorBasic verifies basic IP generation
func TestHostGeneratorBasic(t *testing.T) {
gen, err := newHostGenerator("192.0.2.0/30")
if err != nil {
t.Fatalf("Failed to create host generator: %v", err)
}
// /30 network has 4 addresses: .0 (network), .1 and .2 (hosts), .3 (broadcast)
// We should get .1 and .2
ips := make([]string, 0)
for {
ip, ok := gen.next()
if !ok {
break
}
ips = append(ips, ip)
}
expectedCount := 2
if len(ips) != expectedCount {
t.Errorf("Expected %d IPs from /30 network, got %d: %v", expectedCount, len(ips), ips)
}
// Verify we got valid IPs
for _, ip := range ips {
addr, err := netip.ParseAddr(ip)
if err != nil || !addr.IsValid() {
t.Errorf("Generated invalid IP: %s", ip)
}
}
}
// TestGlobalDeduplication verifies that globalSeen prevents duplicates
func TestGlobalDeduplication(t *testing.T) {
s := &Server{
allCIDRs: []string{"192.0.2.0/29"},
globalSeen: make(map[string]bool),
}
// Mark some IPs as seen
s.globalSeen["192.0.2.1"] = true
s.globalSeen["192.0.2.2"] = true
if !s.globalSeen["192.0.2.1"] {
t.Error("IP should be marked as seen")
}
if s.globalSeen["192.0.2.100"] {
t.Error("Unseen IP should not be in globalSeen")
}
}
// TestIPGeneratorConcurrency verifies thread-safe generator access
func TestIPGeneratorConcurrency(t *testing.T) {
s := &Server{
allCIDRs: []string{"192.0.2.0/24", "198.51.100.0/24"},
globalSeen: make(map[string]bool),
}
gen, err := newIPGenerator(s, "test-consumer")
if err != nil {
t.Fatalf("Failed to create generator: %v", err)
}
done := make(chan bool)
errors := make(chan error, 10)
// Spawn multiple goroutines calling Next() concurrently
for i := 0; i < 10; i++ {
go func() {
for j := 0; j < 50; j++ {
_, err := gen.Next()
if err != nil {
errors <- err
break
}
}
done <- true
}()
}
// Wait for all goroutines
for i := 0; i < 10; i++ {
<-done
}
close(errors)
if len(errors) > 0 {
for err := range errors {
t.Errorf("Concurrent access error: %v", err)
}
}
}
// TestStatePersistence verifies state can be saved and loaded from disk
func TestStatePersistence(t *testing.T) {
// Use default stateDir (progress_state) for this test
// Ensure it exists
if err := os.MkdirAll(stateDir, 0755); err != nil {
t.Fatalf("Failed to create state dir: %v", err)
}
s := &Server{
allCIDRs: []string{"192.0.2.0/24"},
globalSeen: make(map[string]bool),
}
gen, err := newIPGenerator(s, "test-persistence-"+time.Now().Format("20060102150405"))
if err != nil {
t.Fatalf("Failed to create generator: %v", err)
}
// Generate some IPs
for i := 0; i < 10; i++ {
_, err := gen.Next()
if err != nil {
break
}
}
// Save state
if err := gen.saveState(); err != nil {
t.Fatalf("Failed to save state: %v", err)
}
// Verify state file was created
files, err := filepath.Glob(filepath.Join(stateDir, "*.json"))
if err != nil {
t.Fatalf("Failed to list state files: %v", err)
}
if len(files) == 0 {
t.Error("No state file was created")
}
// Create new generator and load state
gen2, err := newIPGenerator(s, gen.consumer)
if err != nil {
t.Fatalf("Failed to create second generator: %v", err)
}
if err := gen2.loadState(); err != nil {
t.Fatalf("Failed to load state: %v", err)
}
// Verify state was loaded (should have remaining CIDRs and progress)
if len(gen2.remainingCIDRs) == 0 && len(gen.remainingCIDRs) > 0 {
t.Error("State was not properly restored")
}
}

View File

@@ -9,6 +9,8 @@ import (
"os"
"path/filepath"
"sync"
"syscall"
"time"
)
type User struct {
@@ -56,6 +58,50 @@ func (s *UserStore) hashUserID(userID string) string {
return hex.EncodeToString(hash[:])
}
// acquireFileLock acquires a shared (read) or exclusive (write) lock on the store's lock file
// and returns the open lock file, or an error if the lock cannot be acquired
func (s *UserStore) acquireFileLock(forWrite bool) (*os.File, error) {
lockPath := s.filePath + ".lock"
// Create lock file if it doesn't exist
lockFile, err := os.OpenFile(lockPath, os.O_CREATE|os.O_RDWR, 0600)
if err != nil {
return nil, err
}
// Try to acquire lock with timeout
lockType := syscall.LOCK_SH // Shared lock for reads
if forWrite {
lockType = syscall.LOCK_EX // Exclusive lock for writes
}
// Use non-blocking lock with retry
maxRetries := 10
for i := 0; i < maxRetries; i++ {
err = syscall.Flock(int(lockFile.Fd()), lockType|syscall.LOCK_NB)
if err == nil {
return lockFile, nil
}
if err != syscall.EWOULDBLOCK {
lockFile.Close()
return nil, err
}
// Wait and retry
time.Sleep(100 * time.Millisecond)
}
lockFile.Close()
return nil, syscall.EWOULDBLOCK
}
// releaseFileLock releases the file lock
func (s *UserStore) releaseFileLock(lockFile *os.File) {
if lockFile != nil {
syscall.Flock(int(lockFile.Fd()), syscall.LOCK_UN)
lockFile.Close()
}
}
func (s *UserStore) Reload() error {
s.mu.Lock()
defer s.mu.Unlock()
@@ -74,6 +120,15 @@ func (s *UserStore) loadCache() error {
}
func (s *UserStore) loadCacheInternal() error {
// Acquire shared lock for reading (allows multiple readers, blocks writers)
lockFile, err := s.acquireFileLock(false)
if err != nil {
logger.Warn("Failed to acquire read lock on user store: %v", err)
// Continue without lock - degraded mode
} else {
defer s.releaseFileLock(lockFile)
}
// Read encrypted store file
encryptedData, err := os.ReadFile(s.filePath)
if err != nil {
@@ -108,6 +163,14 @@ func (s *UserStore) loadCacheInternal() error {
}
func (s *UserStore) save() error {
// Acquire exclusive lock for writing (blocks all readers and writers)
lockFile, err := s.acquireFileLock(true)
if err != nil {
logger.Error("Failed to acquire write lock on user store: %v", err)
return err
}
defer s.releaseFileLock(lockFile)
// Build store structure from cache
store := encryptedStore{
Users: make([]encryptedUserEntry, 0, len(s.cache)),
@@ -130,10 +193,18 @@ func (s *UserStore) save() error {
return err
}
// Write to file
// Write to temp file first for atomic operation
tempPath := s.filePath + ".tmp"
logger.Info("Saving user store with %d entries", len(s.cache))
if err := os.WriteFile(s.filePath, encryptedData, 0600); err != nil {
logger.Error("Failed to write store file: %v", err)
if err := os.WriteFile(tempPath, encryptedData, 0600); err != nil {
logger.Error("Failed to write temp store file: %v", err)
return err
}
// Atomic rename
if err := os.Rename(tempPath, s.filePath); err != nil {
logger.Error("Failed to rename store file: %v", err)
os.Remove(tempPath)
return err
}

manager/store_test.go Normal file
View File

@@ -0,0 +1,381 @@
package main
import (
"crypto/rand"
"encoding/base64"
"os"
"path/filepath"
"sync"
"testing"
"time"
)
// generateTestServerKey creates a test server key for crypto operations
func generateTestServerKey() string {
key := make([]byte, 32)
rand.Read(key)
return base64.StdEncoding.EncodeToString(key)
}
// TestFileLockingBasic verifies file locking works
func TestFileLockingBasic(t *testing.T) {
tempDir, err := os.MkdirTemp("", "store_lock_test")
if err != nil {
t.Fatalf("Failed to create temp dir: %v", err)
}
defer os.RemoveAll(tempDir)
// Create test crypto instance
crypto, err := NewCrypto(generateTestServerKey())
if err != nil {
t.Fatalf("Failed to create crypto: %v", err)
}
store := NewUserStore(tempDir, crypto)
// Acquire read lock
lockFile, err := store.acquireFileLock(false)
if err != nil {
t.Fatalf("Failed to acquire read lock: %v", err)
}
if lockFile == nil {
t.Error("Lock file should not be nil")
}
// Release lock
store.releaseFileLock(lockFile)
}
// TestFileLockingExclusiveBlocksReaders verifies exclusive lock blocks readers
func TestFileLockingExclusiveBlocksReaders(t *testing.T) {
tempDir, err := os.MkdirTemp("", "store_exclusive_test")
if err != nil {
t.Fatalf("Failed to create temp dir: %v", err)
}
defer os.RemoveAll(tempDir)
crypto, err := NewCrypto(generateTestServerKey())
if err != nil {
t.Fatalf("Failed to create crypto: %v", err)
}
store := NewUserStore(tempDir, crypto)
// Acquire exclusive lock
writeLock, err := store.acquireFileLock(true)
if err != nil {
t.Fatalf("Failed to acquire write lock: %v", err)
}
defer store.releaseFileLock(writeLock)
// Try to acquire read lock (should fail/timeout quickly)
done := make(chan bool)
go func() {
readLock, err := store.acquireFileLock(false)
if err == nil {
store.releaseFileLock(readLock)
t.Error("Read lock should have been blocked by write lock")
}
done <- true
}()
select {
case <-done:
// Expected - read lock was blocked
case <-time.After(2 * time.Second):
t.Error("Read lock acquisition took too long")
}
}
// TestFileLockingMultipleReaders verifies multiple readers can coexist
func TestFileLockingMultipleReaders(t *testing.T) {
tempDir, err := os.MkdirTemp("", "store_multi_read_test")
if err != nil {
t.Fatalf("Failed to create temp dir: %v", err)
}
defer os.RemoveAll(tempDir)
crypto, err := NewCrypto(generateTestServerKey())
if err != nil {
t.Fatalf("Failed to create crypto: %v", err)
}
store := NewUserStore(tempDir, crypto)
// Acquire first read lock
lock1, err := store.acquireFileLock(false)
if err != nil {
t.Fatalf("Failed to acquire first read lock: %v", err)
}
defer store.releaseFileLock(lock1)
// Acquire second read lock (should succeed)
lock2, err := store.acquireFileLock(false)
if err != nil {
t.Fatalf("Failed to acquire second read lock: %v", err)
}
defer store.releaseFileLock(lock2)
// Both locks acquired successfully
}
// TestUserStoreAddAndGet verifies basic user storage and retrieval
func TestUserStoreAddAndGet(t *testing.T) {
tempDir, err := os.MkdirTemp("", "store_user_test")
if err != nil {
t.Fatalf("Failed to create temp dir: %v", err)
}
defer os.RemoveAll(tempDir)
crypto, err := NewCrypto(generateTestServerKey())
if err != nil {
t.Fatalf("Failed to create crypto: %v", err)
}
store := NewUserStore(tempDir, crypto)
testUser := "testuser"
testSecret := "ABCDEFGHIJKLMNOP"
// Add user
if err := store.AddUser(testUser, testSecret); err != nil {
t.Fatalf("Failed to add user: %v", err)
}
// Retrieve user
user, err := store.GetUser(testUser)
if err != nil {
t.Fatalf("Failed to get user: %v", err)
}
if user == nil {
t.Fatal("User should not be nil")
}
if user.ID != testUser {
t.Errorf("User ID mismatch: expected %s, got %s", testUser, user.ID)
}
if user.TOTPSecret != testSecret {
t.Errorf("TOTP secret mismatch: expected %s, got %s", testSecret, user.TOTPSecret)
}
}
// TestUserStoreReload verifies reload doesn't lose data
func TestUserStoreReload(t *testing.T) {
tempDir, err := os.MkdirTemp("", "store_reload_test")
if err != nil {
t.Fatalf("Failed to create temp dir: %v", err)
}
defer os.RemoveAll(tempDir)
crypto, err := NewCrypto(generateTestServerKey())
if err != nil {
t.Fatalf("Failed to create crypto: %v", err)
}
store := NewUserStore(tempDir, crypto)
// Add user
if err := store.AddUser("user1", "SECRET1"); err != nil {
t.Fatalf("Failed to add user: %v", err)
}
// Reload
if err := store.Reload(); err != nil {
t.Fatalf("Failed to reload: %v", err)
}
// Verify user still exists
user, err := store.GetUser("user1")
if err != nil {
t.Fatalf("Failed to get user after reload: %v", err)
}
if user == nil {
t.Error("User should still exist after reload")
}
}
// TestUserStoreConcurrentAccess verifies thread-safe access
func TestUserStoreConcurrentAccess(t *testing.T) {
tempDir, err := os.MkdirTemp("", "store_concurrent_test")
if err != nil {
t.Fatalf("Failed to create temp dir: %v", err)
}
defer os.RemoveAll(tempDir)
crypto, err := NewCrypto(generateTestServerKey())
if err != nil {
t.Fatalf("Failed to create crypto: %v", err)
}
store := NewUserStore(tempDir, crypto)
// Add initial user
if err := store.AddUser("initial", "SECRET"); err != nil {
t.Fatalf("Failed to add initial user: %v", err)
}
var wg sync.WaitGroup
errors := make(chan error, 20)
// Concurrent readers
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for j := 0; j < 10; j++ {
_, err := store.GetUser("initial")
if err != nil {
errors <- err
return
}
}
}()
}
// Concurrent writers
for i := 0; i < 10; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
userID := "user" + string(rune(id))
if err := store.AddUser(userID, "SECRET"+string(rune(id))); err != nil {
errors <- err
}
}(i)
}
wg.Wait()
close(errors)
if len(errors) > 0 {
for err := range errors {
t.Errorf("Concurrent access error: %v", err)
}
}
}
// TestUserStorePersistence verifies data survives store recreation
func TestUserStorePersistence(t *testing.T) {
tempDir, err := os.MkdirTemp("", "store_persist_test")
if err != nil {
t.Fatalf("Failed to create temp dir: %v", err)
}
defer os.RemoveAll(tempDir)
crypto, err := NewCrypto(generateTestServerKey())
if err != nil {
t.Fatalf("Failed to create crypto: %v", err)
}
// Create first store and add user
store1 := NewUserStore(tempDir, crypto)
if err := store1.AddUser("persistent", "SECRETDATA"); err != nil {
t.Fatalf("Failed to add user: %v", err)
}
// Create second store (simulating restart)
store2 := NewUserStore(tempDir, crypto)
// Retrieve user
user, err := store2.GetUser("persistent")
if err != nil {
t.Fatalf("Failed to get user from new store: %v", err)
}
if user == nil {
t.Error("User should persist across store instances")
}
if user.TOTPSecret != "SECRETDATA" {
t.Error("User data should match original")
}
}
// TestUserStoreFileExists verifies store file is created
func TestUserStoreFileExists(t *testing.T) {
tempDir, err := os.MkdirTemp("", "store_file_test")
if err != nil {
t.Fatalf("Failed to create temp dir: %v", err)
}
defer os.RemoveAll(tempDir)
crypto, err := NewCrypto(generateTestServerKey())
if err != nil {
t.Fatalf("Failed to create crypto: %v", err)
}
store := NewUserStore(tempDir, crypto)
// Add user (triggers save)
if err := store.AddUser("filetest", "SECRET"); err != nil {
t.Fatalf("Failed to add user: %v", err)
}
// Verify file exists
expectedFile := filepath.Join(tempDir, "users.enc")
if _, err := os.Stat(expectedFile); os.IsNotExist(err) {
t.Error("Store file should have been created")
}
}
// TestGenerateSecret verifies TOTP secret generation
func TestGenerateSecret(t *testing.T) {
secret, err := generateSecret()
if err != nil {
t.Fatalf("Failed to generate secret: %v", err)
}
if len(secret) == 0 {
t.Error("Generated secret should not be empty")
}
// Base32 encoded 20 bytes should be 32 characters
expectedLength := 32
if len(secret) != expectedLength {
t.Errorf("Expected secret length %d, got %d", expectedLength, len(secret))
}
// Verify two generated secrets are different
secret2, err := generateSecret()
if err != nil {
t.Fatalf("Failed to generate second secret: %v", err)
}
if secret == secret2 {
t.Error("Generated secrets should be unique")
}
}
// TestUserHashingConsistency verifies user ID hashing is consistent
func TestUserHashingConsistency(t *testing.T) {
tempDir, err := os.MkdirTemp("", "store_hash_test")
if err != nil {
t.Fatalf("Failed to create temp dir: %v", err)
}
defer os.RemoveAll(tempDir)
crypto, err := NewCrypto(generateTestServerKey())
if err != nil {
t.Fatalf("Failed to create crypto: %v", err)
}
store := NewUserStore(tempDir, crypto)
userID := "testuser"
hash1 := store.hashUserID(userID)
hash2 := store.hashUserID(userID)
if hash1 != hash2 {
t.Error("Same user ID should produce same hash")
}
// Different user should produce different hash
hash3 := store.hashUserID("differentuser")
if hash1 == hash3 {
t.Error("Different users should produce different hashes")
}
}

View File

@@ -77,10 +77,11 @@ var (
dbMux sync.RWMutex
stats Stats
statsMux sync.RWMutex
sentHops = make(map[string]bool) // Track sent hops to avoid duplicates
sentHops = make(map[string]time.Time) // Track sent hops with timestamp for eviction
sentHopsMux sync.RWMutex
verbose bool
startTime time.Time
sentHopsTTL = 24 * time.Hour // Time-to-live for hop deduplication cache
)
func main() {
@@ -168,6 +169,9 @@ func main() {
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM)
// Start cleanup goroutine for sentHops map to prevent unbounded growth
go cleanupSentHops()
go func() {
log.Printf("🚀 Output Service v%s starting...", VERSION)
log.Printf("📥 Listening for results on http://localhost:%d/results", config.Port)
@@ -549,11 +553,14 @@ func extractAndSendHops(result *PingResult) {
var newHops []string
sentHopsMux.Lock()
now := time.Now()
for _, hop := range result.Traceroute.Hops {
if hop.IP != "" && !hop.Timeout && hop.IP != "*" {
if !sentHops[hop.IP] {
// Check if we've seen this hop recently (within TTL)
lastSent, exists := sentHops[hop.IP]
if !exists || now.Sub(lastSent) > sentHopsTTL {
newHops = append(newHops, hop.IP)
sentHops[hop.IP] = true
sentHops[hop.IP] = now
statsMux.Lock()
stats.HopsDiscovered++
@@ -602,6 +609,31 @@ func extractAndSendHops(result *PingResult) {
}
}
// cleanupSentHops periodically removes old entries from sentHops map to prevent unbounded growth
func cleanupSentHops() {
ticker := time.NewTicker(1 * time.Hour)
defer ticker.Stop()
for range ticker.C {
sentHopsMux.Lock()
now := time.Now()
removed := 0
for ip, timestamp := range sentHops {
if now.Sub(timestamp) > sentHopsTTL {
delete(sentHops, ip)
removed++
}
}
if verbose && removed > 0 {
log.Printf("🧹 Cleaned up %d expired hop entries (total: %d)", removed, len(sentHops))
}
sentHopsMux.Unlock()
}
}
func handleRotate(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)

View File

@@ -79,6 +79,10 @@ var (
startTime time.Time
health HealthStatus
healthMux sync.RWMutex
// HTTP client with timeout to prevent indefinite hangs
httpClient = &http.Client{
Timeout: 30 * time.Second,
}
)
func main() {
@@ -460,7 +464,7 @@ func handleSocket(path string, data []byte, mode string) ([]byte, error) {
func readSource(src string) ([]byte, error) {
if strings.HasPrefix(src, "http") {
resp, err := http.Get(src)
resp, err := httpClient.Get(src)
if err != nil {
return nil, err
}
@@ -477,7 +481,7 @@ func readSource(src string) ([]byte, error) {
func writeDestination(dest string, data []byte) error {
if strings.HasPrefix(dest, "http") {
resp, err := http.Post(dest, "application/json", bytes.NewBuffer(data))
resp, err := httpClient.Post(dest, "application/json", bytes.NewBuffer(data))
if err != nil {
return err
}

ping_service_test.go Normal file
View File

@@ -0,0 +1,150 @@
package main
import (
"net/http"
"net/http/httptest"
"testing"
"time"
)
// TestHTTPClientTimeout verifies that the HTTP client has a timeout configured
func TestHTTPClientTimeout(t *testing.T) {
if httpClient.Timeout == 0 {
t.Error("HTTP client timeout is not configured")
}
expectedTimeout := 30 * time.Second
if httpClient.Timeout != expectedTimeout {
t.Errorf("HTTP client timeout = %v, want %v", httpClient.Timeout, expectedTimeout)
}
}
// TestHTTPClientTimeoutActuallyWorks verifies the timeout actually prevents indefinite hangs
func TestHTTPClientTimeoutActuallyWorks(t *testing.T) {
// Create a server that delays response longer than timeout
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
time.Sleep(35 * time.Second) // Sleep longer than our 30s timeout
w.WriteHeader(http.StatusOK)
}))
defer server.Close()
start := time.Now()
_, err := httpClient.Get(server.URL)
duration := time.Since(start)
if err == nil {
t.Error("Expected timeout error, got nil")
}
// Should timeout in ~30 seconds, give 3s buffer for slow systems
if duration < 28*time.Second || duration > 33*time.Second {
t.Logf("Request took %v (expected ~30s)", duration)
}
}
// TestCooldownCacheBasic verifies basic cooldown functionality
func TestCooldownCacheBasic(t *testing.T) {
cacheMux.Lock()
cooldownCache = make(map[string]time.Time) // Reset
cacheMux.Unlock()
ip := "192.0.2.1"
// First check - should be allowed
if isInCooldown(ip, 10) {
t.Error("IP should not be in cooldown on first check")
}
// Add to cache
cacheMux.Lock()
cooldownCache[ip] = time.Now()
cacheMux.Unlock()
// Second check - should be in cooldown
if !isInCooldown(ip, 10) {
t.Error("IP should be in cooldown after being added")
}
// Wait for cooldown to expire
cacheMux.Lock()
cooldownCache[ip] = time.Now().Add(-11 * time.Minute)
cacheMux.Unlock()
// Third check - should be allowed again
if isInCooldown(ip, 10) {
t.Error("IP should not be in cooldown after expiry")
}
}
// TestCooldownCacheConcurrency verifies thread-safe cache access
func TestCooldownCacheConcurrency(t *testing.T) {
cacheMux.Lock()
cooldownCache = make(map[string]time.Time)
cacheMux.Unlock()
done := make(chan bool)
// Spawn multiple goroutines accessing cache concurrently
for i := 0; i < 10; i++ {
go func(id int) {
for j := 0; j < 100; j++ {
ip := "192.0.2." + string(rune(id))
isInCooldown(ip, 10)
cacheMux.Lock()
cooldownCache[ip] = time.Now()
cacheMux.Unlock()
}
done <- true
}(i)
}
// Wait for all goroutines
for i := 0; i < 10; i++ {
<-done
}
// If we got here without a race condition, test passes
}
// isInCooldown mirrors the cooldown-check logic used in ping_service.go
func isInCooldown(ip string, cooldownMinutes int) bool {
cacheMux.Lock()
defer cacheMux.Unlock()
lastPing, exists := cooldownCache[ip]
if !exists {
return false
}
elapsed := time.Since(lastPing)
cooldownDuration := time.Duration(cooldownMinutes) * time.Minute
return elapsed < cooldownDuration
}
// TestConfigDefaults verifies that typical config values pass the expected sanity checks
func TestConfigDefaults(t *testing.T) {
config := Config{
IntervalSeconds: 30,
CooldownMinutes: 10,
EnableTraceroute: true,
TracerouteMaxHops: 30,
HealthCheckPort: 8090,
}
if config.IntervalSeconds <= 0 {
t.Error("IntervalSeconds should be positive")
}
if config.CooldownMinutes <= 0 {
t.Error("CooldownMinutes should be positive")
}
if config.TracerouteMaxHops <= 0 || config.TracerouteMaxHops > 255 {
t.Error("TracerouteMaxHops should be between 1 and 255")
}
if config.HealthCheckPort <= 0 || config.HealthCheckPort > 65535 {
t.Error("HealthCheckPort should be between 1 and 65535")
}
}