Add input service state export and import functionality. Add /status endpoint. Add persistent state (write state to a file).

This commit is contained in:
Kalzu Rekku
2025-12-31 21:52:38 +02:00
parent 5dbc0392f0
commit ec9fec5ce3

View File

@@ -3,6 +3,7 @@ package main
import (
"bufio"
"context"
"encoding/json"
"fmt"
"log"
"math/rand"
@@ -20,8 +21,24 @@ import (
const (
repoDir = "cloud-provider-ip-addresses"
port = 8080
stateDir = "progress_state"
saveInterval = 30 * time.Second
)
// GeneratorState represents the serializable state of a generator
type GeneratorState struct {
CurrentFile int `json:"current_file"`
CurrentCIDRs []string `json:"current_cidrs"`
ActiveGenStates []HostGenState `json:"active_gen_states"`
CIDRFiles []string `json:"cidr_files"`
}
type HostGenState struct {
CIDR string `json:"cidr"`
Current string `json:"current"`
Done bool `json:"done"`
}
// IPGenerator generates IPs from CIDR ranges lazily
type IPGenerator struct {
mu sync.Mutex
@@ -31,9 +48,12 @@ type IPGenerator struct {
activeGens []*hostGenerator
rng *rand.Rand
totalCIDRsCount int
consumer string
dirty bool
}
type hostGenerator struct {
cidr string
network *net.IPNet
current net.IP
done bool
@@ -58,6 +78,7 @@ func newHostGenerator(cidr string) (*hostGenerator, error) {
ones, bits := network.Mask.Size()
hg := &hostGenerator{
cidr: cidr,
network: network,
current: make(net.IP, len(network.IP)),
}
@@ -131,11 +152,27 @@ func (hg *hostGenerator) next() (string, bool) {
return ip, true
}
func newIPGenerator() (*IPGenerator, error) {
func (hg *hostGenerator) getState() HostGenState {
return HostGenState{
CIDR: hg.cidr,
Current: hg.current.String(),
Done: hg.done,
}
}
func newIPGenerator(consumer string) (*IPGenerator, error) {
gen := &IPGenerator{
rng: rand.New(rand.NewSource(time.Now().UnixNano())),
consumer: consumer,
}
// Try to load existing state
if err := gen.loadState(); err == nil {
log.Printf("📂 Loaded saved state for consumer: %s", consumer)
return gen, nil
}
// No saved state, initialize fresh
// Find all IP files
err := filepath.Walk(repoDir, func(path string, info os.FileInfo, err error) error {
if err != nil {
@@ -160,6 +197,7 @@ func newIPGenerator() (*IPGenerator, error) {
return nil, err
}
log.Printf("🆕 New generator for %s: %d IP files, %d CIDRs", consumer, len(gen.cidrFiles), gen.totalCIDRsCount)
log.Printf("📁 Found %d IP files", len(gen.cidrFiles))
log.Printf("📊 Total CIDRs discovered: %d", gen.totalCIDRsCount)
@@ -230,6 +268,7 @@ func (g *IPGenerator) loadNextFile() error {
g.activeGens = append(g.activeGens, gen)
}
g.dirty = true
return nil
}
@@ -256,23 +295,179 @@ func (g *IPGenerator) Next() (string, error) {
if !ok {
// This generator is exhausted, remove it
g.activeGens = append(g.activeGens[:idx], g.activeGens[idx+1:]...)
g.dirty = true
continue
}
g.dirty = true
return ip, nil
}
}
func (g *IPGenerator) getState() GeneratorState {
g.mu.Lock()
defer g.mu.Unlock()
activeStates := make([]HostGenState, len(g.activeGens))
for i, gen := range g.activeGens {
activeStates[i] = gen.getState()
}
return GeneratorState{
CurrentFile: g.currentFile,
CurrentCIDRs: g.currentCIDRs,
ActiveGenStates: activeStates,
CIDRFiles: g.cidrFiles,
}
}
func (g *IPGenerator) saveState() error {
if !g.dirty {
return nil
}
state := g.getState()
// Ensure state directory exists
if err := os.MkdirAll(stateDir, 0755); err != nil {
return fmt.Errorf("failed to create state directory: %w", err)
}
// Use consumer as filename (sanitize for filesystem)
filename := strings.ReplaceAll(g.consumer, ":", "_")
filename = strings.ReplaceAll(filename, "/", "_")
filepath := filepath.Join(stateDir, filename+".json")
// Write to temp file first, then rename for atomic write
tempPath := filepath + ".tmp"
file, err := os.Create(tempPath)
if err != nil {
return fmt.Errorf("failed to create temp state file: %w", err)
}
encoder := json.NewEncoder(file)
encoder.SetIndent("", " ")
if err := encoder.Encode(state); err != nil {
file.Close()
os.Remove(tempPath)
return fmt.Errorf("failed to encode state: %w", err)
}
if err := file.Close(); err != nil {
os.Remove(tempPath)
return fmt.Errorf("failed to close temp state file: %w", err)
}
if err := os.Rename(tempPath, filepath); err != nil {
os.Remove(tempPath)
return fmt.Errorf("failed to rename state file: %w", err)
}
g.dirty = false
return nil
}
func (g *IPGenerator) loadState() error {
// Use consumer as filename (sanitize for filesystem)
filename := strings.ReplaceAll(g.consumer, ":", "_")
filename = strings.ReplaceAll(filename, "/", "_")
filepath := filepath.Join(stateDir, filename+".json")
file, err := os.Open(filepath)
if err != nil {
return err
}
defer file.Close()
var state GeneratorState
if err := json.NewDecoder(file).Decode(&state); err != nil {
return fmt.Errorf("failed to decode state: %w", err)
}
// Restore state
g.cidrFiles = state.CIDRFiles
g.currentFile = state.CurrentFile
g.currentCIDRs = state.CurrentCIDRs
g.totalCIDRsCount = len(state.CurrentCIDRs)
// Rebuild active generators from state
g.activeGens = make([]*hostGenerator, 0, len(state.ActiveGenStates))
for _, genState := range state.ActiveGenStates {
gen, err := newHostGenerator(genState.CIDR)
if err != nil {
continue
}
// Restore current IP position
gen.current = net.ParseIP(genState.Current)
if gen.current == nil {
continue
}
gen.done = genState.Done
g.activeGens = append(g.activeGens, gen)
}
return nil
}
// Server holds per-consumer generators
type Server struct {
generators map[string]*IPGenerator
mu sync.RWMutex
stopSaver chan struct{}
wg sync.WaitGroup
}
func newServer() *Server {
return &Server{
s := &Server{
generators: make(map[string]*IPGenerator),
stopSaver: make(chan struct{}),
}
s.startPeriodicSaver()
return s
}
func (s *Server) startPeriodicSaver() {
s.wg.Add(1)
go func() {
defer s.wg.Done()
ticker := time.NewTicker(saveInterval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
s.saveAllStates()
case <-s.stopSaver:
// Final save before shutdown
s.saveAllStates()
return
}
}
}()
}
func (s *Server) saveAllStates() {
s.mu.RLock()
generators := make([]*IPGenerator, 0, len(s.generators))
for _, gen := range s.generators {
generators = append(generators, gen)
}
s.mu.RUnlock()
for _, gen := range generators {
if err := gen.saveState(); err != nil {
log.Printf("⚠️ Failed to save state for %s: %v", gen.consumer, err)
}
}
}
func (s *Server) shutdown() {
close(s.stopSaver)
s.wg.Wait()
log.Println("💾 All states saved")
}
func (s *Server) getGenerator(consumer string) (*IPGenerator, error) {
@@ -293,7 +488,7 @@ func (s *Server) getGenerator(consumer string) (*IPGenerator, error) {
return gen, nil
}
newGen, err := newIPGenerator()
newGen, err := newIPGenerator(consumer)
if err != nil {
return nil, err
}
@@ -335,6 +530,160 @@ func (s *Server) handleRequest(w http.ResponseWriter, r *http.Request) {
fmt.Fprintf(w, "%s\n", ip)
}
type ConsumerStatus struct {
Consumer string `json:"consumer"`
CurrentFile int `json:"current_file"`
TotalFiles int `json:"total_files"`
ActiveCIDRs int `json:"active_cidrs"`
TotalCIDRs int `json:"total_cidrs_discovered"`
CurrentFilePath string `json:"current_file_path,omitempty"`
}
type StatusResponse struct {
TotalConsumers int `json:"total_consumers"`
Consumers []ConsumerStatus `json:"consumers"`
StateDirectory string `json:"state_directory"`
SaveInterval string `json:"save_interval"`
}
func (s *Server) handleStatus(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodGet {
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
return
}
s.mu.RLock()
defer s.mu.RUnlock()
response := StatusResponse{
TotalConsumers: len(s.generators),
Consumers: make([]ConsumerStatus, 0, len(s.generators)),
StateDirectory: stateDir,
SaveInterval: saveInterval.String(),
}
for consumer, gen := range s.generators {
gen.mu.Lock()
status := ConsumerStatus{
Consumer: consumer,
CurrentFile: gen.currentFile,
TotalFiles: len(gen.cidrFiles),
ActiveCIDRs: len(gen.activeGens),
TotalCIDRs: gen.totalCIDRsCount,
}
if gen.currentFile > 0 && gen.currentFile <= len(gen.cidrFiles) {
status.CurrentFilePath = gen.cidrFiles[gen.currentFile-1]
}
gen.mu.Unlock()
response.Consumers = append(response.Consumers, status)
}
w.Header().Set("Content-Type", "application/json")
encoder := json.NewEncoder(w)
encoder.SetIndent("", " ")
if err := encoder.Encode(response); err != nil {
log.Printf("❌ Failed to encode status response: %v", err)
}
}
type ExportResponse struct {
ExportedAt time.Time `json:"exported_at"`
States map[string]GeneratorState `json:"states"`
}
func (s *Server) handleExport(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodGet {
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
return
}
// Force save all current states first
s.saveAllStates()
s.mu.RLock()
defer s.mu.RUnlock()
response := ExportResponse{
ExportedAt: time.Now(),
States: make(map[string]GeneratorState),
}
for consumer, gen := range s.generators {
response.States[consumer] = gen.getState()
}
w.Header().Set("Content-Type", "application/json")
w.Header().Set("Content-Disposition", fmt.Sprintf("attachment; filename=state-export-%s.json",
time.Now().Format("2006-01-02-150405")))
encoder := json.NewEncoder(w)
encoder.SetIndent("", " ")
if err := encoder.Encode(response); err != nil {
log.Printf("❌ Failed to encode export response: %v", err)
}
}
func (s *Server) handleImport(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
return
}
var exportData ExportResponse
if err := json.NewDecoder(r.Body).Decode(&exportData); err != nil {
http.Error(w, fmt.Sprintf("Failed to decode import data: %v", err), http.StatusBadRequest)
return
}
// Ensure state directory exists
if err := os.MkdirAll(stateDir, 0755); err != nil {
http.Error(w, fmt.Sprintf("Failed to create state directory: %v", err), http.StatusInternalServerError)
return
}
imported := 0
failed := 0
for consumer, state := range exportData.States {
// Sanitize consumer name for filename
filename := strings.ReplaceAll(consumer, ":", "_")
filename = strings.ReplaceAll(filename, "/", "_")
filepath := filepath.Join(stateDir, filename+".json")
file, err := os.Create(filepath)
if err != nil {
log.Printf("⚠️ Failed to create state file for %s: %v", consumer, err)
failed++
continue
}
encoder := json.NewEncoder(file)
encoder.SetIndent("", " ")
if err := encoder.Encode(state); err != nil {
file.Close()
log.Printf("⚠️ Failed to encode state for %s: %v", consumer, err)
failed++
continue
}
file.Close()
imported++
}
response := map[string]interface{}{
"imported": imported,
"failed": failed,
"total": len(exportData.States),
"message": fmt.Sprintf("Successfully imported %d consumer states", imported),
}
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(response)
log.Printf("📥 Imported %d consumer states (%d failed)", imported, failed)
}
func main() {
// Check if repo directory exists
if _, err := os.Stat(repoDir); os.IsNotExist(err) {
@@ -345,6 +694,9 @@ func main() {
mux := http.NewServeMux()
mux.HandleFunc("/", server.handleRequest)
mux.HandleFunc("/status", server.handleStatus)
mux.HandleFunc("/export", server.handleExport)
mux.HandleFunc("/import", server.handleImport)
httpServer := &http.Server{
Addr: fmt.Sprintf(":%d", port),
@@ -365,6 +717,9 @@ func main() {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
// Stop periodic saver and save final state
server.shutdown()
if err := httpServer.Shutdown(ctx); err != nil {
log.Printf("❌ Error during shutdown: %v", err)
}
@@ -373,6 +728,10 @@ func main() {
log.Printf("🌐 HTTP Input Server running on http://localhost:%d", port)
log.Printf(" Serving individual IPv4 host addresses lazily")
log.Printf(" In highly mixed random order per consumer")
log.Printf(" 💾 Progress saved every %v to '%s' directory", saveInterval, stateDir)
log.Printf(" 📊 Status endpoint: http://localhost:%d/status", port)
log.Printf(" 📤 Export endpoint: http://localhost:%d/export", port)
log.Printf(" 📥 Import endpoint: http://localhost:%d/import (POST)", port)
log.Printf(" Press Ctrl+C to stop")
if err := httpServer.ListenAndServe(); err != nil && err != http.ErrServerClosed {