package cluster

import (
	"bytes"
	"encoding/json"
	"fmt"
	"net/http"
	"time"

	"github.com/sirupsen/logrus"

	"kvs/types"
)

// BootstrapService handles cluster joining and initial synchronization.
type BootstrapService struct {
	config        *types.Config
	gossipService *GossipService
	syncService   *SyncService
	logger        *logrus.Logger
	setMode       func(string) // Callback to set server mode ("syncing" / "normal" are used here).
}

// NewBootstrapService creates a new bootstrap service from its collaborators.
// The arguments are stored as-is; no validation or copying is performed.
func NewBootstrapService(config *types.Config, gossipService *GossipService, syncService *SyncService, logger *logrus.Logger, setMode func(string)) *BootstrapService {
	return &BootstrapService{
		config:        config,
		gossipService: gossipService,
		syncService:   syncService,
		logger:        logger,
		setMode:       setMode,
	}
}

// Bootstrap joins the cluster using the configured seed nodes.
//
// With no seed nodes configured it logs and returns immediately without
// touching the server mode. Otherwise it switches the mode to "syncing",
// tries each seed in order until one accepts the join, waits two seconds,
// runs the gradual sync and finally switches the mode to "normal". If every
// seed fails, the mode is set back to "normal" and the node keeps running
// standalone.
func (s *BootstrapService) Bootstrap() {
	if len(s.config.SeedNodes) == 0 {
		s.logger.Info("No seed nodes configured, running as standalone")
		return
	}

	s.logger.Info("Starting bootstrap process")
	s.setMode("syncing")

	// Try to join cluster via each seed node; the first success wins.
	joined := false
	for _, seedAddr := range s.config.SeedNodes {
		if s.attemptJoin(seedAddr) {
			joined = true
			break
		}
	}

	if !joined {
		s.logger.Warn("Failed to join cluster via seed nodes, running as standalone")
		s.setMode("normal")
		return
	}

	// Wait a bit for member discovery.
	time.Sleep(2 * time.Second)

	// Perform gradual sync (now Merkle-based).
	s.performGradualSync()

	// Switch to normal mode.
	s.setMode("normal")
	s.logger.Info("Bootstrap completed, entering normal mode")
}

// attemptJoin attempts to join the cluster via a single seed node.
//
// It POSTs a JSON-encoded types.JoinRequest to <protocol>://<seedAddr>/members/join,
// expects HTTP 200 with a JSON array of types.Member, and passes every member
// whose ID differs from this node's ID to the gossip service. It returns true
// only when the request succeeded and the member list was decoded; every
// failure is logged and reported as false.
func (s *BootstrapService) attemptJoin(seedAddr string) bool {
	joinReq := types.JoinRequest{
		ID:              s.config.NodeID,
		Address:         fmt.Sprintf("%s:%d", s.config.BindAddress, s.config.Port),
		JoinedTimestamp: time.Now().UnixMilli(),
	}

	jsonData, err := json.Marshal(joinReq)
	if err != nil {
		s.logger.WithError(err).Error("Failed to marshal join request")
		return false
	}

	// NOTE(review): the 10s argument is presumably the client timeout — confirm
	// against NewAuthenticatedHTTPClient.
	client := NewAuthenticatedHTTPClient(s.config, 10*time.Second)
	protocol := GetProtocol(s.config)
	url := fmt.Sprintf("%s://%s/members/join", protocol, seedAddr)

	req, err := http.NewRequest(http.MethodPost, url, bytes.NewBuffer(jsonData))
	if err != nil {
		s.logger.WithError(err).Error("Failed to create join request")
		return false
	}
	req.Header.Set("Content-Type", "application/json")
	AddClusterAuthHeaders(req, s.config)

	resp, err := client.Do(req)
	if err != nil {
		s.logger.WithFields(logrus.Fields{
			"seed":  seedAddr,
			"error": err.Error(),
		}).Warn("Failed to contact seed node")
		return false
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		s.logger.WithFields(logrus.Fields{
			"seed":   seedAddr,
			"status": resp.StatusCode,
		}).Warn("Seed node rejected join request")
		return false
	}

	// Process member list response.
	var memberList []types.Member
	if err := json.NewDecoder(resp.Body).Decode(&memberList); err != nil {
		s.logger.WithError(err).Error("Failed to decode member list from seed")
		return false
	}

	// Add all members except ourselves to our local list.
	//
	// Index into the slice instead of taking the address of a range variable:
	// before Go 1.22 the range variable is one variable reused by every
	// iteration, so passing its address would hand AddMember the same pointer
	// each time (and, if AddMember keeps the pointer, every stored member would
	// end up aliasing the last element).
	for i := range memberList {
		member := &memberList[i]
		if member.ID != s.config.NodeID {
			s.gossipService.AddMember(member)
		}
	}

	s.logger.WithFields(logrus.Fields{
		"seed":         seedAddr,
		"member_count": len(memberList),
	}).Info("Successfully joined cluster")

	return true
}

// performGradualSync performs gradual sync (Merkle-based version).
//
// It returns early when the gossip service reports no healthy members.
// Otherwise it runs a fixed number of Merkle sync rounds, sleeping
// config.ThrottleDelayMs milliseconds after each round.
func (s *BootstrapService) performGradualSync() {
	// For now, just do a few rounds of Merkle sync.
	const rounds = 3

	s.logger.Info("Starting gradual sync (Merkle-based)")

	members := s.gossipService.GetHealthyMembers()
	if len(members) == 0 {
		s.logger.Info("No healthy members for gradual sync")
		return
	}

	for i := 0; i < rounds; i++ {
		s.syncService.performMerkleSync()
		time.Sleep(time.Duration(s.config.ThrottleDelayMs) * time.Millisecond)
	}

	s.logger.Info("Gradual sync completed")
}