From ec9fec5ce3216d75a64c963c5f65914a5d00d5f4 Mon Sep 17 00:00:00 2001 From: Kalzu Rekku Date: Wed, 31 Dec 2025 21:52:38 +0200 Subject: [PATCH] Add input service state export and import functionality. Add /status endpoint. Add persistent state (write state to a file). --- input_service/http_input_service.go | 373 +++++++++++++++++++++++++++- 1 file changed, 366 insertions(+), 7 deletions(-) diff --git a/input_service/http_input_service.go b/input_service/http_input_service.go index f6bbe9f..16ec888 100644 --- a/input_service/http_input_service.go +++ b/input_service/http_input_service.go @@ -3,6 +3,7 @@ package main import ( "bufio" "context" + "encoding/json" "fmt" "log" "math/rand" @@ -18,10 +19,26 @@ import ( ) const ( - repoDir = "cloud-provider-ip-addresses" - port = 8080 + repoDir = "cloud-provider-ip-addresses" + port = 8080 + stateDir = "progress_state" + saveInterval = 30 * time.Second ) +// GeneratorState represents the serializable state of a generator +type GeneratorState struct { + CurrentFile int `json:"current_file"` + CurrentCIDRs []string `json:"current_cidrs"` + ActiveGenStates []HostGenState `json:"active_gen_states"` + CIDRFiles []string `json:"cidr_files"` +} + +type HostGenState struct { + CIDR string `json:"cidr"` + Current string `json:"current"` + Done bool `json:"done"` +} + // IPGenerator generates IPs from CIDR ranges lazily type IPGenerator struct { mu sync.Mutex @@ -31,9 +48,12 @@ type IPGenerator struct { activeGens []*hostGenerator rng *rand.Rand totalCIDRsCount int + consumer string + dirty bool } type hostGenerator struct { + cidr string network *net.IPNet current net.IP done bool @@ -58,6 +78,7 @@ func newHostGenerator(cidr string) (*hostGenerator, error) { ones, bits := network.Mask.Size() hg := &hostGenerator{ + cidr: cidr, network: network, current: make(net.IP, len(network.IP)), } @@ -131,11 +152,27 @@ func (hg *hostGenerator) next() (string, bool) { return ip, true } -func newIPGenerator() (*IPGenerator, error) { +func (hg *hostGenerator) getState() HostGenState { + return HostGenState{ + CIDR: hg.cidr, + Current: hg.current.String(), + Done: hg.done, + } +} + +func newIPGenerator(consumer string) (*IPGenerator, error) { gen := &IPGenerator{ - rng: rand.New(rand.NewSource(time.Now().UnixNano())), + rng: rand.New(rand.NewSource(time.Now().UnixNano())), + consumer: consumer, } + // Try to load existing state + if err := gen.loadState(); err == nil { + log.Printf("📂 Loaded saved state for consumer: %s", consumer) + return gen, nil + } + + // No saved state, initialize fresh // Find all IP files err := filepath.Walk(repoDir, func(path string, info os.FileInfo, err error) error { if err != nil { @@ -160,6 +197,7 @@ func newIPGenerator() (*IPGenerator, error) { return nil, err } + log.Printf("🆕 New generator for %s: %d IP files, %d CIDRs", consumer, len(gen.cidrFiles), gen.totalCIDRsCount) log.Printf("📁 Found %d IP files", len(gen.cidrFiles)) log.Printf("📊 Total CIDRs discovered: %d", gen.totalCIDRsCount) @@ -230,6 +268,7 @@ func (g *IPGenerator) loadNextFile() error { g.activeGens = append(g.activeGens, gen) } + g.dirty = true return nil } @@ -256,23 +295,179 @@ func (g *IPGenerator) Next() (string, error) { if !ok { // This generator is exhausted, remove it g.activeGens = append(g.activeGens[:idx], g.activeGens[idx+1:]...) + g.dirty = true continue } + g.dirty = true return ip, nil } } +func (g *IPGenerator) getState() GeneratorState { + g.mu.Lock() + defer g.mu.Unlock() + + activeStates := make([]HostGenState, len(g.activeGens)) + for i, gen := range g.activeGens { + activeStates[i] = gen.getState() + } + + return GeneratorState{ + CurrentFile: g.currentFile, + CurrentCIDRs: g.currentCIDRs, + ActiveGenStates: activeStates, + CIDRFiles: g.cidrFiles, + } +} + +func (g *IPGenerator) saveState() error { + if !g.dirty { + return nil + } + + state := g.getState() + + // Ensure state directory exists + if err := os.MkdirAll(stateDir, 0755); err != nil { + return fmt.Errorf("failed to create state directory: %w", err) + } + + // Use consumer as filename (sanitize for filesystem) + filename := strings.ReplaceAll(g.consumer, ":", "_") + filename = strings.ReplaceAll(filename, "/", "_") + filepath := filepath.Join(stateDir, filename+".json") + + // Write to temp file first, then rename for atomic write + tempPath := filepath + ".tmp" + file, err := os.Create(tempPath) + if err != nil { + return fmt.Errorf("failed to create temp state file: %w", err) + } + + encoder := json.NewEncoder(file) + encoder.SetIndent("", " ") + if err := encoder.Encode(state); err != nil { + file.Close() + os.Remove(tempPath) + return fmt.Errorf("failed to encode state: %w", err) + } + + if err := file.Close(); err != nil { + os.Remove(tempPath) + return fmt.Errorf("failed to close temp state file: %w", err) + } + + if err := os.Rename(tempPath, filepath); err != nil { + os.Remove(tempPath) + return fmt.Errorf("failed to rename state file: %w", err) + } + + g.dirty = false + return nil +} + +func (g *IPGenerator) loadState() error { + // Use consumer as filename (sanitize for filesystem) + filename := strings.ReplaceAll(g.consumer, ":", "_") + filename = strings.ReplaceAll(filename, "/", "_") + filepath := filepath.Join(stateDir, filename+".json") + + file, err := os.Open(filepath) + if err != nil { + return err + } + defer file.Close() + + var state GeneratorState + if err := json.NewDecoder(file).Decode(&state); err != nil { + return fmt.Errorf("failed to decode state: %w", err) + } + + // Restore state + g.cidrFiles = state.CIDRFiles + g.currentFile = state.CurrentFile + g.currentCIDRs = state.CurrentCIDRs + g.totalCIDRsCount = len(state.CurrentCIDRs) + + // Rebuild active generators from state + g.activeGens = make([]*hostGenerator, 0, len(state.ActiveGenStates)) + for _, genState := range state.ActiveGenStates { + gen, err := newHostGenerator(genState.CIDR) + if err != nil { + continue + } + + // Restore current IP position + gen.current = net.ParseIP(genState.Current) + if gen.current == nil { + continue + } + gen.done = genState.Done + + g.activeGens = append(g.activeGens, gen) + } + + return nil +} + // Server holds per-consumer generators type Server struct { generators map[string]*IPGenerator mu sync.RWMutex + stopSaver chan struct{} + wg sync.WaitGroup } func newServer() *Server { - return &Server{ + s := &Server{ generators: make(map[string]*IPGenerator), + stopSaver: make(chan struct{}), } + + s.startPeriodicSaver() + return s +} + +func (s *Server) startPeriodicSaver() { + s.wg.Add(1) + go func() { + defer s.wg.Done() + ticker := time.NewTicker(saveInterval) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + s.saveAllStates() + case <-s.stopSaver: + // Final save before shutdown + s.saveAllStates() + return + } + } + }() +} + +func (s *Server) saveAllStates() { + s.mu.RLock() + generators := make([]*IPGenerator, 0, len(s.generators)) + for _, gen := range s.generators { + generators = append(generators, gen) + } + s.mu.RUnlock() + + for _, gen := range generators { + if err := gen.saveState(); err != nil { + log.Printf("⚠️ Failed to save state for %s: %v", gen.consumer, err) + } + } +} + +func (s *Server) shutdown() { + close(s.stopSaver) + s.wg.Wait() + log.Println("💾 All states saved") } func (s *Server) getGenerator(consumer string) (*IPGenerator, error) { @@ -293,7 +488,7 @@ func (s *Server) getGenerator(consumer string) (*IPGenerator, error) { return gen, nil } - newGen, err := newIPGenerator() + newGen, err := newIPGenerator(consumer) if err != nil { return nil, err } @@ -335,6 +530,160 @@ func (s *Server) handleRequest(w http.ResponseWriter, r *http.Request) { fmt.Fprintf(w, "%s\n", ip) } +type ConsumerStatus struct { + Consumer string `json:"consumer"` + CurrentFile int `json:"current_file"` + TotalFiles int `json:"total_files"` + ActiveCIDRs int `json:"active_cidrs"` + TotalCIDRs int `json:"total_cidrs_discovered"` + CurrentFilePath string `json:"current_file_path,omitempty"` +} + +type StatusResponse struct { + TotalConsumers int `json:"total_consumers"` + Consumers []ConsumerStatus `json:"consumers"` + StateDirectory string `json:"state_directory"` + SaveInterval string `json:"save_interval"` +} + +func (s *Server) handleStatus(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodGet { + http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) + return + } + + s.mu.RLock() + defer s.mu.RUnlock() + + response := StatusResponse{ + TotalConsumers: len(s.generators), + Consumers: make([]ConsumerStatus, 0, len(s.generators)), + StateDirectory: stateDir, + SaveInterval: saveInterval.String(), + } + + for consumer, gen := range s.generators { + gen.mu.Lock() + status := ConsumerStatus{ + Consumer: consumer, + CurrentFile: gen.currentFile, + TotalFiles: len(gen.cidrFiles), + ActiveCIDRs: len(gen.activeGens), + TotalCIDRs: gen.totalCIDRsCount, + } + if gen.currentFile > 0 && gen.currentFile <= len(gen.cidrFiles) { + status.CurrentFilePath = gen.cidrFiles[gen.currentFile-1] + } + gen.mu.Unlock() + + response.Consumers = append(response.Consumers, status) + } + + w.Header().Set("Content-Type", "application/json") + encoder := json.NewEncoder(w) + encoder.SetIndent("", " ") + if err := encoder.Encode(response); err != nil { + log.Printf("❌ Failed to encode status response: %v", err) + } +} + +type ExportResponse struct { + ExportedAt time.Time `json:"exported_at"` + States map[string]GeneratorState `json:"states"` +} + +func (s *Server) handleExport(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodGet { + http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) + return + } + + // Force save all current states first + s.saveAllStates() + + s.mu.RLock() + defer s.mu.RUnlock() + + response := ExportResponse{ + ExportedAt: time.Now(), + States: make(map[string]GeneratorState), + } + + for consumer, gen := range s.generators { + response.States[consumer] = gen.getState() + } + + w.Header().Set("Content-Type", "application/json") + w.Header().Set("Content-Disposition", fmt.Sprintf("attachment; filename=state-export-%s.json", + time.Now().Format("2006-01-02-150405"))) + + encoder := json.NewEncoder(w) + encoder.SetIndent("", " ") + if err := encoder.Encode(response); err != nil { + log.Printf("❌ Failed to encode export response: %v", err) + } +} + +func (s *Server) handleImport(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost { + http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) + return + } + + var exportData ExportResponse + if err := json.NewDecoder(r.Body).Decode(&exportData); err != nil { + http.Error(w, fmt.Sprintf("Failed to decode import data: %v", err), http.StatusBadRequest) + return + } + + // Ensure state directory exists + if err := os.MkdirAll(stateDir, 0755); err != nil { + http.Error(w, fmt.Sprintf("Failed to create state directory: %v", err), http.StatusInternalServerError) + return + } + + imported := 0 + failed := 0 + + for consumer, state := range exportData.States { + // Sanitize consumer name for filename + filename := strings.ReplaceAll(consumer, ":", "_") + filename = strings.ReplaceAll(filename, "/", "_") + filepath := filepath.Join(stateDir, filename+".json") + + file, err := os.Create(filepath) + if err != nil { + log.Printf("⚠️ Failed to create state file for %s: %v", consumer, err) + failed++ + continue + } + + encoder := json.NewEncoder(file) + encoder.SetIndent("", " ") + if err := encoder.Encode(state); err != nil { + file.Close() + log.Printf("⚠️ Failed to encode state for %s: %v", consumer, err) + failed++ + continue + } + + file.Close() + imported++ + } + + response := map[string]interface{}{ + "imported": imported, + "failed": failed, + "total": len(exportData.States), + "message": fmt.Sprintf("Successfully imported %d consumer states", imported), + } + + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(response) + + log.Printf("📥 Imported %d consumer states (%d failed)", imported, failed) +} + func main() { // Check if repo directory exists if _, err := os.Stat(repoDir); os.IsNotExist(err) { @@ -345,6 +694,9 @@ func main() { mux := http.NewServeMux() mux.HandleFunc("/", server.handleRequest) + mux.HandleFunc("/status", server.handleStatus) + mux.HandleFunc("/export", server.handleExport) + mux.HandleFunc("/import", server.handleImport) httpServer := &http.Server{ Addr: fmt.Sprintf(":%d", port), @@ -365,6 +717,9 @@ func main() { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() + // Stop periodic saver and save final state + server.shutdown() + if err := httpServer.Shutdown(ctx); err != nil { log.Printf("❌ Error during shutdown: %v", err) } @@ -373,6 +728,10 @@ func main() { log.Printf("🌐 HTTP Input Server running on http://localhost:%d", port) log.Printf(" Serving individual IPv4 host addresses lazily") log.Printf(" In highly mixed random order per consumer") + log.Printf(" 💾 Progress saved every %v to '%s' directory", saveInterval, stateDir) + log.Printf(" 📊 Status endpoint: http://localhost:%d/status", port) + log.Printf(" 📤 Export endpoint: http://localhost:%d/export", port) + log.Printf(" 📥 Import endpoint: http://localhost:%d/import (POST)", port) log.Printf(" Press Ctrl+C to stop") if err := httpServer.ListenAndServe(); err != nil && err != http.ErrServerClosed { @@ -380,4 +739,4 @@ func main() { } log.Println("✅ Server stopped cleanly") -} \ No newline at end of file +}