package cluster

import (
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"math/rand"
	"net/http"
	"sync"
	"time"

	"github.com/sirupsen/logrus"

	"kvs/types"
)

// GossipService handles gossip protocol operations
type GossipService struct {
	config    *types.Config
	members   map[string]*types.Member
	membersMu sync.RWMutex
	logger    *logrus.Logger
	ctx       context.Context
	cancel    context.CancelFunc
	wg        sync.WaitGroup
}

// NewGossipService creates a new gossip service
func NewGossipService(config *types.Config, logger *logrus.Logger) *GossipService {
	ctx, cancel := context.WithCancel(context.Background())
	return &GossipService{
		config:  config,
		members: make(map[string]*types.Member),
		logger:  logger,
		ctx:     ctx,
		cancel:  cancel,
	}
}

// Start begins the gossip routine
func (s *GossipService) Start() {
	if !s.config.ClusteringEnabled {
		s.logger.Info("Clustering disabled, skipping gossip routine")
		return
	}
	s.wg.Add(1)
	go s.gossipRoutine()
}

// Stop terminates the gossip service
func (s *GossipService) Stop() {
	s.cancel()
	s.wg.Wait()
}

// AddMember adds a member to the gossip member list
func (s *GossipService) AddMember(member *types.Member) {
	s.membersMu.Lock()
	defer s.membersMu.Unlock()

	s.members[member.ID] = member
	s.logger.WithFields(logrus.Fields{
		"node_id": member.ID,
		"address": member.Address,
	}).Info("Member added")
}

// RemoveMember removes a member from the gossip member list
func (s *GossipService) RemoveMember(nodeID string) {
	s.membersMu.Lock()
	defer s.membersMu.Unlock()

	if member, exists := s.members[nodeID]; exists {
		delete(s.members, nodeID)
		s.logger.WithFields(logrus.Fields{
			"node_id": member.ID,
			"address": member.Address,
		}).Info("Member removed")
	}
}

// GetMembers returns a copy of all members
func (s *GossipService) GetMembers() []*types.Member {
	s.membersMu.RLock()
	defer s.membersMu.RUnlock()

	members := make([]*types.Member, 0, len(s.members))
	for _, member := range s.members {
		members = append(members, member)
	}
	return members
}

// GetHealthyMembers returns members that have been seen recently
func (s *GossipService) GetHealthyMembers() []*types.Member {
	s.membersMu.RLock()
	defer s.membersMu.RUnlock()

	now := time.Now().UnixMilli()
	healthyMembers := make([]*types.Member, 0)
	for _, member := range s.members {
		// Consider a member healthy if it was seen within the last 5 minutes
		if now-member.LastSeen < 5*60*1000 {
			healthyMembers = append(healthyMembers, member)
		}
	}
	return healthyMembers
}

// gossipRoutine runs periodically to exchange member lists
func (s *GossipService) gossipRoutine() {
	defer s.wg.Done()

	for {
		// Pick a random interval between the configured min and max
		// (e.g. 1-2 minutes with the default settings)
		minInterval := time.Duration(s.config.GossipIntervalMin) * time.Second
		maxInterval := time.Duration(s.config.GossipIntervalMax) * time.Second
		interval := minInterval
		if maxInterval > minInterval {
			// Guard against rand.Int63n panicking when min == max
			interval += time.Duration(rand.Int63n(int64(maxInterval - minInterval)))
		}

		select {
		case <-s.ctx.Done():
			return
		case <-time.After(interval):
			s.performGossipRound()
		}
	}
}

// performGossipRound performs a gossip round with random healthy peers
func (s *GossipService) performGossipRound() {
	members := s.GetHealthyMembers()
	if len(members) == 0 {
		s.logger.Debug("No healthy members for gossip round")
		return
	}

	// Select 1-3 random peers for gossip
	maxPeers := 3
	if len(members) < maxPeers {
		maxPeers = len(members)
	}

	// Shuffle and select
	rand.Shuffle(len(members), func(i, j int) {
		members[i], members[j] = members[j], members[i]
	})
	selectedPeers := members[:rand.Intn(maxPeers)+1]

	for _, peer := range selectedPeers {
		go s.gossipWithPeer(peer)
	}
}

// gossipWithPeer performs gossip with a specific peer
func (s *GossipService) gossipWithPeer(peer *types.Member) error {
	s.logger.WithField("peer", peer.Address).Debug("Starting gossip with peer")

	// Get our current member list
	localMembers := s.GetMembers()

	// Send our member list to the peer
	gossipData := make([]types.Member, len(localMembers))
	for i, member := range localMembers {
		gossipData[i] = *member
	}

	// Add ourselves to the list
	selfMember := types.Member{
		ID:              s.config.NodeID,
		Address:         fmt.Sprintf("%s:%d", s.config.BindAddress, s.config.Port),
		LastSeen:        time.Now().UnixMilli(),
		JoinedTimestamp: s.GetJoinedTimestamp(),
	}
	gossipData = append(gossipData, selfMember)

	jsonData, err := json.Marshal(gossipData)
	if err != nil {
		s.logger.WithError(err).Error("Failed to marshal gossip data")
		return err
	}

	// Send HTTP request to peer with cluster authentication
	client := NewAuthenticatedHTTPClient(s.config, 5*time.Second)
	protocol := GetProtocol(s.config)
	url := fmt.Sprintf("%s://%s/members/gossip", protocol, peer.Address)

	req, err := http.NewRequest("POST", url, bytes.NewBuffer(jsonData))
	if err != nil {
		s.logger.WithError(err).Error("Failed to create gossip request")
		return err
	}
	req.Header.Set("Content-Type", "application/json")
	AddClusterAuthHeaders(req, s.config)

	resp, err := client.Do(req)
	if err != nil {
		s.logger.WithFields(logrus.Fields{
			"peer":  peer.Address,
			"error": err.Error(),
		}).Warn("Failed to gossip with peer")
		s.markPeerUnhealthy(peer.ID)
		return err
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		s.logger.WithFields(logrus.Fields{
			"peer":   peer.Address,
			"status": resp.StatusCode,
		}).Warn("Gossip request failed")
		s.markPeerUnhealthy(peer.ID)
		return fmt.Errorf("gossip request failed with status %d", resp.StatusCode)
	}

	// Process response - the peer's member list
	var remoteMemberList []types.Member
	if err := json.NewDecoder(resp.Body).Decode(&remoteMemberList); err != nil {
		s.logger.WithError(err).Error("Failed to decode gossip response")
		return err
	}

	// Merge the remote member list with our local list
	s.MergeMemberList(remoteMemberList, s.config.NodeID)

	// Update the peer's last seen timestamp
	s.updateMemberLastSeen(peer.ID, time.Now().UnixMilli())

	s.logger.WithField("peer", peer.Address).Debug("Completed gossip with peer")
	return nil
}

// markPeerUnhealthy marks a peer as unhealthy
func (s *GossipService) markPeerUnhealthy(nodeID string) {
	s.membersMu.Lock()
	defer s.membersMu.Unlock()

	if member, exists := s.members[nodeID]; exists {
		// Mark as last seen a long time ago so the member fails the health check
		member.LastSeen = time.Now().UnixMilli() - 10*60*1000 // 10 minutes ago
		s.logger.WithField("node_id", nodeID).Warn("Marked peer as unhealthy")
	}
}

// updateMemberLastSeen updates a member's last seen timestamp
func (s *GossipService) updateMemberLastSeen(nodeID string, timestamp int64) {
	s.membersMu.Lock()
	defer s.membersMu.Unlock()

	if member, exists := s.members[nodeID]; exists {
		member.LastSeen = timestamp
	}
}

// MergeMemberList merges a remote member list into the local member list
func (s *GossipService) MergeMemberList(remoteMembers []types.Member, selfNodeID string) {
	s.membersMu.Lock()
	defer s.membersMu.Unlock()

	now := time.Now().UnixMilli()

	for _, remoteMember := range remoteMembers {
		// Skip ourselves
		if remoteMember.ID == selfNodeID {
			continue
		}

		if localMember, exists := s.members[remoteMember.ID]; exists {
			// Update existing member with the most recent sighting
			if remoteMember.LastSeen > localMember.LastSeen {
				localMember.LastSeen = remoteMember.LastSeen
			}
			// Keep the earlier joined timestamp
			if remoteMember.JoinedTimestamp < localMember.JoinedTimestamp {
				localMember.JoinedTimestamp = remoteMember.JoinedTimestamp
			}
		} else {
			// Add new member
			newMember := &types.Member{
				ID:              remoteMember.ID,
				Address:         remoteMember.Address,
				LastSeen:        remoteMember.LastSeen,
				JoinedTimestamp: remoteMember.JoinedTimestamp,
			}
			s.members[remoteMember.ID] = newMember
			s.logger.WithFields(logrus.Fields{
				"node_id": remoteMember.ID,
				"address": remoteMember.Address,
			}).Info("Discovered new member through gossip")
		}
	}

	// Clean up old members (not seen for more than 10 minutes)
	toRemove := make([]string, 0)
	for nodeID, member := range s.members {
		if now-member.LastSeen > 10*60*1000 { // 10 minutes
			toRemove = append(toRemove, nodeID)
		}
	}
	for _, nodeID := range toRemove {
		delete(s.members, nodeID)
		s.logger.WithField("node_id", nodeID).Info("Removed stale member")
	}
}

// GetJoinedTimestamp placeholder - would be implemented by the server
func (s *GossipService) GetJoinedTimestamp() int64 {
	// This should be implemented by the server that uses this service
	return time.Now().UnixMilli()
}
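
// exampleGossipWiring is an illustrative sketch, not called anywhere in this
// package, of how a server might wire up the gossip service: construct it,
// seed the member list with a known peer, start the background routine, and
// stop it on shutdown. The peer ID and address below are hypothetical
// placeholders; real values would come from configuration or a join request.
func exampleGossipWiring(config *types.Config) {
	logger := logrus.New()
	gossip := NewGossipService(config, logger)

	// Seed the member list with one known peer; subsequent gossip rounds
	// discover the rest of the cluster from it.
	now := time.Now().UnixMilli()
	gossip.AddMember(&types.Member{
		ID:              "peer-1",        // hypothetical node ID
		Address:         "10.0.0.2:8080", // hypothetical peer address
		LastSeen:        now,
		JoinedTimestamp: now,
	})

	// Start the periodic gossip routine (a no-op if clustering is disabled).
	gossip.Start()

	// ... serve requests ...

	// On shutdown, cancel the routine and wait for it to exit.
	gossip.Stop()
}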