package main import ( "crypto/tls" "encoding/json" "fmt" "io" "net/http" "os" "sync" "time" ) // WorkerType represents the type of service type WorkerType string const ( WorkerTypeInput WorkerType = "input" WorkerTypePing WorkerType = "ping" WorkerTypeOutput WorkerType = "output" ) // WorkerInstance represents a registered service instance type WorkerInstance struct { ID string `json:"id"` Name string `json:"name"` Type WorkerType `json:"type"` URL string `json:"url"` // Base URL (e.g., http://10.0.0.5:8080) Location string `json:"location,omitempty"` Description string `json:"description,omitempty"` AddedAt time.Time `json:"added_at"` // Health status (updated by poller) Healthy bool `json:"healthy"` LastCheck time.Time `json:"last_check"` LastError string `json:"last_error,omitempty"` ResponseTime int64 `json:"response_time_ms,omitempty"` // Service-specific stats (from health endpoints) Stats map[string]interface{} `json:"stats,omitempty"` } // WorkerStore manages worker instances type WorkerStore struct { workers map[string]*WorkerInstance mu sync.RWMutex file string } func NewWorkerStore(filename string) *WorkerStore { ws := &WorkerStore{ workers: make(map[string]*WorkerInstance), file: filename, } ws.load() return ws } func (ws *WorkerStore) Add(worker *WorkerInstance) error { ws.mu.Lock() defer ws.mu.Unlock() if worker.ID == "" { worker.ID = fmt.Sprintf("%s-%d", worker.Type, time.Now().Unix()) } if worker.AddedAt.IsZero() { worker.AddedAt = time.Now() } ws.workers[worker.ID] = worker return ws.save() } func (ws *WorkerStore) Remove(id string) error { ws.mu.Lock() defer ws.mu.Unlock() delete(ws.workers, id) return ws.save() } func (ws *WorkerStore) Get(id string) (*WorkerInstance, bool) { ws.mu.RLock() defer ws.mu.RUnlock() worker, ok := ws.workers[id] return worker, ok } func (ws *WorkerStore) List() []*WorkerInstance { ws.mu.RLock() defer ws.mu.RUnlock() list := make([]*WorkerInstance, 0, len(ws.workers)) for _, worker := range ws.workers { list = append(list, worker) } return list } func (ws *WorkerStore) UpdateHealth(id string, healthy bool, responseTime int64, err error, stats map[string]interface{}) { ws.mu.Lock() defer ws.mu.Unlock() worker, ok := ws.workers[id] if !ok { return } worker.Healthy = healthy worker.LastCheck = time.Now() worker.ResponseTime = responseTime worker.Stats = stats if err != nil { worker.LastError = err.Error() } else { worker.LastError = "" } } func (ws *WorkerStore) save() error { data, err := json.MarshalIndent(ws.workers, "", " ") if err != nil { return err } return os.WriteFile(ws.file, data, 0600) } func (ws *WorkerStore) load() error { data, err := os.ReadFile(ws.file) if err != nil { if os.IsNotExist(err) { return nil // File doesn't exist yet, that's okay } return err } return json.Unmarshal(data, &ws.workers) } // HealthPoller periodically checks worker health type HealthPoller struct { store *WorkerStore interval time.Duration stop chan struct{} wg sync.WaitGroup } func NewHealthPoller(store *WorkerStore, interval time.Duration) *HealthPoller { return &HealthPoller{ store: store, interval: interval, stop: make(chan struct{}), } } func (hp *HealthPoller) Start() { hp.wg.Add(1) go func() { defer hp.wg.Done() // Initial check hp.checkAll() ticker := time.NewTicker(hp.interval) defer ticker.Stop() for { select { case <-ticker.C: hp.checkAll() case <-hp.stop: return } } }() } func (hp *HealthPoller) Stop() { close(hp.stop) hp.wg.Wait() } func (hp *HealthPoller) checkAll() { workers := hp.store.List() for _, worker := range workers { go hp.checkWorker(worker) } } func (hp *HealthPoller) checkWorker(worker *WorkerInstance) { start := time.Now() // Determine health endpoint based on worker type var healthURL string switch worker.Type { case WorkerTypeInput: healthURL = fmt.Sprintf("%s/status", worker.URL) case WorkerTypePing: healthURL = fmt.Sprintf("%s/health", worker.URL) case WorkerTypeOutput: healthURL = fmt.Sprintf("%s/health", worker.URL) default: healthURL = fmt.Sprintf("%s/health", worker.URL) } // Create HTTP client with TLS skip verify (for self-signed certs) transport := &http.Transport{ TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, } client := &http.Client{ Timeout: 10 * time.Second, Transport: transport, } resp, err := client.Get(healthURL) responseTime := time.Since(start).Milliseconds() if err != nil { hp.store.UpdateHealth(worker.ID, false, responseTime, err, nil) logger.Warn("Health check failed for %s (%s): %v", worker.Name, worker.ID, err) return } defer resp.Body.Close() // Read response body, err := io.ReadAll(resp.Body) if err != nil { hp.store.UpdateHealth(worker.ID, false, responseTime, err, nil) return } // Check status code if resp.StatusCode != 200 { err := fmt.Errorf("HTTP %d", resp.StatusCode) hp.store.UpdateHealth(worker.ID, false, responseTime, err, nil) return } // Try to parse stats from response var stats map[string]interface{} if err := json.Unmarshal(body, &stats); err == nil { hp.store.UpdateHealth(worker.ID, true, responseTime, nil, stats) } else { // If not JSON, just mark as healthy hp.store.UpdateHealth(worker.ID, true, responseTime, nil, nil) } } // GetDashboardStats aggregates statistics for the dashboard func (ws *WorkerStore) GetDashboardStats() map[string]interface{} { ws.mu.RLock() defer ws.mu.RUnlock() stats := map[string]interface{}{ "total_workers": len(ws.workers), "by_type": make(map[WorkerType]int), "healthy": 0, "unhealthy": 0, "total_pings": int64(0), "total_results": int64(0), } byType := stats["by_type"].(map[WorkerType]int) for _, worker := range ws.workers { byType[worker.Type]++ if worker.Healthy { stats["healthy"] = stats["healthy"].(int) + 1 } else { stats["unhealthy"] = stats["unhealthy"].(int) + 1 } // Aggregate service-specific stats if worker.Stats != nil { if worker.Type == WorkerTypePing { if totalPings, ok := worker.Stats["total_pings"].(float64); ok { stats["total_pings"] = stats["total_pings"].(int64) + int64(totalPings) } } else if worker.Type == WorkerTypeOutput { if totalResults, ok := worker.Stats["total_results"].(float64); ok { stats["total_results"] = stats["total_results"].(int64) + int64(totalResults) } } } } return stats }