- Updated bootstrap service to use authenticated HTTP client with cluster auth headers - Made GET /members/ endpoint unprotected for monitoring/inspection purposes - All other cluster communication endpoints remain protected by cluster auth middleware This ensures proper cluster formation while maintaining security for inter-node communication. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
155 lines
3.9 KiB
Go
155 lines
3.9 KiB
Go
package cluster
|
|
|
|
import (
|
|
"bytes"
|
|
"encoding/json"
|
|
"fmt"
|
|
"net/http"
|
|
"time"
|
|
|
|
"github.com/sirupsen/logrus"
|
|
|
|
"kvs/types"
|
|
)
|
|
|
|
// BootstrapService handles cluster joining and initial synchronization
|
|
type BootstrapService struct {
|
|
config *types.Config
|
|
gossipService *GossipService
|
|
syncService *SyncService
|
|
logger *logrus.Logger
|
|
setMode func(string) // Callback to set server mode
|
|
}
|
|
|
|
// NewBootstrapService creates a new bootstrap service
|
|
func NewBootstrapService(config *types.Config, gossipService *GossipService, syncService *SyncService, logger *logrus.Logger, setMode func(string)) *BootstrapService {
|
|
return &BootstrapService{
|
|
config: config,
|
|
gossipService: gossipService,
|
|
syncService: syncService,
|
|
logger: logger,
|
|
setMode: setMode,
|
|
}
|
|
}
|
|
|
|
// Bootstrap joins cluster using seed nodes
|
|
func (s *BootstrapService) Bootstrap() {
|
|
if len(s.config.SeedNodes) == 0 {
|
|
s.logger.Info("No seed nodes configured, running as standalone")
|
|
return
|
|
}
|
|
|
|
s.logger.Info("Starting bootstrap process")
|
|
s.setMode("syncing")
|
|
|
|
// Try to join cluster via each seed node
|
|
joined := false
|
|
for _, seedAddr := range s.config.SeedNodes {
|
|
if s.attemptJoin(seedAddr) {
|
|
joined = true
|
|
break
|
|
}
|
|
}
|
|
|
|
if !joined {
|
|
s.logger.Warn("Failed to join cluster via seed nodes, running as standalone")
|
|
s.setMode("normal")
|
|
return
|
|
}
|
|
|
|
// Wait a bit for member discovery
|
|
time.Sleep(2 * time.Second)
|
|
|
|
// Perform gradual sync (now Merkle-based)
|
|
s.performGradualSync()
|
|
|
|
// Switch to normal mode
|
|
s.setMode("normal")
|
|
s.logger.Info("Bootstrap completed, entering normal mode")
|
|
}
|
|
|
|
// attemptJoin attempts to join cluster via a seed node
|
|
func (s *BootstrapService) attemptJoin(seedAddr string) bool {
|
|
joinReq := types.JoinRequest{
|
|
ID: s.config.NodeID,
|
|
Address: fmt.Sprintf("%s:%d", s.config.BindAddress, s.config.Port),
|
|
JoinedTimestamp: time.Now().UnixMilli(),
|
|
}
|
|
|
|
jsonData, err := json.Marshal(joinReq)
|
|
if err != nil {
|
|
s.logger.WithError(err).Error("Failed to marshal join request")
|
|
return false
|
|
}
|
|
|
|
client := NewAuthenticatedHTTPClient(s.config, 10*time.Second)
|
|
protocol := GetProtocol(s.config)
|
|
url := fmt.Sprintf("%s://%s/members/join", protocol, seedAddr)
|
|
|
|
req, err := http.NewRequest("POST", url, bytes.NewBuffer(jsonData))
|
|
if err != nil {
|
|
s.logger.WithError(err).Error("Failed to create join request")
|
|
return false
|
|
}
|
|
req.Header.Set("Content-Type", "application/json")
|
|
AddClusterAuthHeaders(req, s.config)
|
|
|
|
resp, err := client.Do(req)
|
|
if err != nil {
|
|
s.logger.WithFields(logrus.Fields{
|
|
"seed": seedAddr,
|
|
"error": err.Error(),
|
|
}).Warn("Failed to contact seed node")
|
|
return false
|
|
}
|
|
defer resp.Body.Close()
|
|
|
|
if resp.StatusCode != http.StatusOK {
|
|
s.logger.WithFields(logrus.Fields{
|
|
"seed": seedAddr,
|
|
"status": resp.StatusCode,
|
|
}).Warn("Seed node rejected join request")
|
|
return false
|
|
}
|
|
|
|
// Process member list response
|
|
var memberList []types.Member
|
|
if err := json.NewDecoder(resp.Body).Decode(&memberList); err != nil {
|
|
s.logger.WithError(err).Error("Failed to decode member list from seed")
|
|
return false
|
|
}
|
|
|
|
// Add all members to our local list
|
|
for _, member := range memberList {
|
|
if member.ID != s.config.NodeID {
|
|
s.gossipService.AddMember(&member)
|
|
}
|
|
}
|
|
|
|
s.logger.WithFields(logrus.Fields{
|
|
"seed": seedAddr,
|
|
"member_count": len(memberList),
|
|
}).Info("Successfully joined cluster")
|
|
|
|
return true
|
|
}
|
|
|
|
// performGradualSync performs gradual sync (Merkle-based version)
|
|
func (s *BootstrapService) performGradualSync() {
|
|
s.logger.Info("Starting gradual sync (Merkle-based)")
|
|
|
|
members := s.gossipService.GetHealthyMembers()
|
|
if len(members) == 0 {
|
|
s.logger.Info("No healthy members for gradual sync")
|
|
return
|
|
}
|
|
|
|
// For now, just do a few rounds of Merkle sync
|
|
for i := 0; i < 3; i++ {
|
|
s.syncService.performMerkleSync()
|
|
time.Sleep(time.Duration(s.config.ThrottleDelayMs) * time.Millisecond)
|
|
}
|
|
|
|
s.logger.Info("Gradual sync completed")
|
|
}
|