Files
kalzu-value-store/cluster/gossip.go
ryyst 9f12f3dbcb refactor: extract clustering system to cluster package
- Create cluster/merkle.go with Merkle tree operations
- Create cluster/gossip.go with gossip protocol implementation
- Create cluster/sync.go with data synchronization logic
- Create cluster/bootstrap.go with cluster joining functionality

Major clustering functionality now properly separated:
* MerkleService: Tree building, hashing, filtering
* GossipService: Member discovery, health checking, list merging
* SyncService: Merkle-based synchronization between nodes
* BootstrapService: Seed node joining and initial sync

Build tested and verified working. Ready for main.go integration.

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-09-18 18:53:52 +03:00

303 lines
8.1 KiB
Go

package cluster
import (
"bytes"
"context"
"encoding/json"
"fmt"
"math/rand"
"net/http"
"sync"
"time"
"github.com/sirupsen/logrus"
"kvs/types"
)
// GossipService handles gossip protocol operations: it holds this node's view
// of cluster membership and, once started, periodically exchanges that view
// with peers over HTTP.
type GossipService struct {
	config    *types.Config
	members   map[string]*types.Member // keyed by Member.ID; guarded by membersMu
	membersMu sync.RWMutex             // protects members and the records it points to
	logger    *logrus.Logger
	ctx       context.Context    // cancelled by Stop to end background gossip
	cancel    context.CancelFunc // cancels ctx
	wg        sync.WaitGroup     // tracks goroutines that Stop waits for
}
// NewGossipService builds a GossipService with an empty member list and its
// own cancellable context (derived from context.Background) that Stop later
// cancels. No goroutines are started until Start is called.
func NewGossipService(config *types.Config, logger *logrus.Logger) *GossipService {
	svc := &GossipService{
		config:  config,
		members: make(map[string]*types.Member),
		logger:  logger,
	}
	svc.ctx, svc.cancel = context.WithCancel(context.Background())
	return svc
}
// Start launches the background gossip loop. If clustering is disabled in the
// configuration, it only logs that fact and returns without starting anything.
func (s *GossipService) Start() {
	if s.config.ClusteringEnabled {
		s.wg.Add(1)
		go s.gossipRoutine()
		return
	}
	s.logger.Info("Clustering disabled, skipping gossip routine")
}
// Stop cancels the service context and then blocks until every goroutine
// registered on the service's WaitGroup has returned.
func (s *GossipService) Stop() {
	// Deferred so the wait happens only after cancellation has been signalled.
	defer s.wg.Wait()
	s.cancel()
}
// AddMember stores member in the member list under member.ID, replacing any
// existing entry with that ID, and logs the addition. The pointer itself is
// stored, not a copy.
func (s *GossipService) AddMember(member *types.Member) {
	s.membersMu.Lock()
	defer s.membersMu.Unlock()

	id, addr := member.ID, member.Address
	s.members[id] = member

	s.logger.WithFields(logrus.Fields{
		"node_id": id,
		"address": addr,
	}).Info("Member added")
}
// RemoveMember deletes the member with the given node ID and logs the removal.
// Unknown IDs are ignored silently.
func (s *GossipService) RemoveMember(nodeID string) {
	s.membersMu.Lock()
	defer s.membersMu.Unlock()

	removed, found := s.members[nodeID]
	if !found {
		return
	}

	delete(s.members, nodeID)
	s.logger.WithFields(logrus.Fields{
		"node_id": removed.ID,
		"address": removed.Address,
	}).Info("Member removed")
}
// GetMembers returns a newly allocated slice holding every known member, in
// unspecified (map iteration) order.
//
// Only the slice is new: its elements point at the service's live Member
// records, whose LastSeen/JoinedTimestamp fields other methods update while
// holding membersMu. Callers should treat the records as read-only, and
// reading those fields without the lock may race with concurrent gossip.
func (s *GossipService) GetMembers() []*types.Member {
	s.membersMu.RLock()
	defer s.membersMu.RUnlock()

	out := make([]*types.Member, len(s.members))
	i := 0
	for _, m := range s.members {
		out[i] = m
		i++
	}
	return out
}
// GetHealthyMembers returns the members whose LastSeen timestamp (Unix
// milliseconds) is less than five minutes old. The result is never nil; the
// elements are pointers to the live member records.
func (s *GossipService) GetHealthyMembers() []*types.Member {
	const healthyWindowMs = 5 * 60 * 1000 // five minutes in milliseconds

	s.membersMu.RLock()
	defer s.membersMu.RUnlock()

	// now-LastSeen < window  <=>  LastSeen > now-window
	cutoff := time.Now().UnixMilli() - healthyWindowMs
	healthy := make([]*types.Member, 0)
	for _, m := range s.members {
		if m.LastSeen > cutoff {
			healthy = append(healthy, m)
		}
	}
	return healthy
}
// gossipRoutine runs until the service context is cancelled. After each
// randomized delay (see gossipInterval) it performs one gossip round. It is
// started by Start, which has already called s.wg.Add(1); it signals s.wg on
// exit.
func (s *GossipService) gossipRoutine() {
	defer s.wg.Done()

	for {
		select {
		case <-s.ctx.Done():
			return
		case <-time.After(s.gossipInterval()):
			s.performGossipRound()
		}
	}
}

// gossipInterval returns a random delay in [GossipIntervalMin,
// GossipIntervalMax), with both config bounds interpreted as seconds.
//
// rand.Int63n panics when its argument is <= 0, which the previous inline
// computation hit whenever max <= min (for example, identical bounds). For
// such configs, this falls back to the minimum bound. If that bound is also
// non-positive, it falls back to one second, so the loop cannot spin.
func (s *GossipService) gossipInterval() time.Duration {
	minInterval := time.Duration(s.config.GossipIntervalMin) * time.Second
	maxInterval := time.Duration(s.config.GossipIntervalMax) * time.Second

	spread := maxInterval - minInterval
	if spread <= 0 {
		if minInterval <= 0 {
			return time.Second
		}
		return minInterval
	}
	return minInterval + time.Duration(rand.Int63n(int64(spread)))
}
// performGossipRound starts gossip exchanges with between one and three
// randomly chosen healthy members, never more than are available. It does
// nothing if there are no healthy members or the service is shutting down.
//
// Each exchange runs in its own goroutine registered on s.wg, so Stop waits
// for in-flight exchanges instead of returning while they still touch the
// member list. This is only called from gossipRoutine, which itself holds a
// WaitGroup slot, so these Add calls never race with a Wait on a zero counter.
func (s *GossipService) performGossipRound() {
	if s.ctx.Err() != nil {
		// Shutdown raced with the gossip timer; don't start new work.
		return
	}

	members := s.GetHealthyMembers()
	if len(members) == 0 {
		s.logger.Debug("No healthy members for gossip round")
		return
	}

	// Cap the fan-out at three peers.
	maxPeers := 3
	if len(members) < maxPeers {
		maxPeers = len(members)
	}

	// Shuffle the local slice (not the shared records) and take a random
	// prefix of length 1..maxPeers.
	rand.Shuffle(len(members), func(i, j int) {
		members[i], members[j] = members[j], members[i]
	})
	selectedPeers := members[:rand.Intn(maxPeers)+1]

	for _, peer := range selectedPeers {
		s.wg.Add(1)
		go func(p *types.Member) {
			defer s.wg.Done()
			// gossipWithPeer logs its own failures; nothing more to do here.
			_ = s.gossipWithPeer(p)
		}(peer)
	}
}
// gossipWithPeer exchanges member lists with a single peer over HTTP.
//
// Steps:
//   - POST this node's view of the cluster as JSON to
//     http://<peer.Address>/members/gossip. The view is every known member
//     followed by an entry for this node itself.
//   - Expect a 200 response whose body is the peer's member list.
//   - Merge that list into the local one and refresh the peer's LastSeen.
//
// Transport errors and non-200 responses mark the peer unhealthy. The request
// is bound to the service context, so Stop aborts an in-flight exchange, and
// an abort caused by that cancellation is not held against the peer.
// Failures are logged here; the returned error is informational.
func (s *GossipService) gossipWithPeer(peer *types.Member) error {
	s.logger.WithField("peer", peer.Address).Debug("Starting gossip with peer")

	ctx := s.ctx
	if ctx == nil {
		// Defensive: a GossipService not built by NewGossipService has no context.
		ctx = context.Background()
	}

	// Value copies taken under the read lock. Dereferencing the shared
	// *Member pointers without the lock would race with concurrent exchanges
	// updating LastSeen via updateMemberLastSeen/markPeerUnhealthy.
	gossipData := s.gossipSnapshot()
	gossipData = append(gossipData, types.Member{
		ID:              s.config.NodeID,
		Address:         fmt.Sprintf("%s:%d", s.config.BindAddress, s.config.Port),
		LastSeen:        time.Now().UnixMilli(),
		JoinedTimestamp: s.GetJoinedTimestamp(),
	})

	jsonData, err := json.Marshal(gossipData)
	if err != nil {
		s.logger.WithError(err).Error("Failed to marshal gossip data")
		return err
	}

	resp, err := postGossip(ctx, peer.Address, jsonData)
	if err != nil {
		if ctx.Err() != nil {
			// We are shutting down; the failure says nothing about the peer.
			return err
		}
		s.logger.WithFields(logrus.Fields{
			"peer":  peer.Address,
			"error": err.Error(),
		}).Warn("Failed to gossip with peer")
		s.markPeerUnhealthy(peer.ID)
		return err
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		s.logger.WithFields(logrus.Fields{
			"peer":   peer.Address,
			"status": resp.StatusCode,
		}).Warn("Gossip request failed")
		s.markPeerUnhealthy(peer.ID)
		return fmt.Errorf("gossip request failed with status %d", resp.StatusCode)
	}

	// The response body is the peer's member list.
	var remoteMemberList []types.Member
	if err := json.NewDecoder(resp.Body).Decode(&remoteMemberList); err != nil {
		s.logger.WithError(err).Error("Failed to decode gossip response")
		return err
	}

	s.MergeMemberList(remoteMemberList, s.config.NodeID)
	s.updateMemberLastSeen(peer.ID, time.Now().UnixMilli())

	s.logger.WithField("peer", peer.Address).Debug("Completed gossip with peer")
	return nil
}

// gossipSnapshot returns value copies of all known members, taken while
// holding the read lock, with one spare slot of capacity for the local
// node's own entry.
func (s *GossipService) gossipSnapshot() []types.Member {
	s.membersMu.RLock()
	defer s.membersMu.RUnlock()

	snapshot := make([]types.Member, 0, len(s.members)+1)
	for _, m := range s.members {
		snapshot = append(snapshot, *m)
	}
	return snapshot
}

// postGossip POSTs body as application/json to the /members/gossip endpoint
// at address. The request is bound to ctx and to a 5-second client timeout.
// Errors building the request are returned like transport errors, so callers
// handle both the same way.
func postGossip(ctx context.Context, address string, body []byte) (*http.Response, error) {
	url := fmt.Sprintf("http://%s/members/gossip", address)
	req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(body))
	if err != nil {
		return nil, err
	}
	req.Header.Set("Content-Type", "application/json")

	client := &http.Client{Timeout: 5 * time.Second}
	return client.Do(req)
}
// markPeerUnhealthy backdates a known peer's LastSeen to ten minutes before
// now. That drops the peer out of GetHealthyMembers (five-minute window) and
// makes it a candidate for the stale-member cleanup in MergeMemberList.
// Unknown IDs are ignored.
func (s *GossipService) markPeerUnhealthy(nodeID string) {
	s.membersMu.Lock()
	defer s.membersMu.Unlock()

	member, ok := s.members[nodeID]
	if !ok {
		return
	}

	const backdateMs = 10 * 60 * 1000 // 10 minutes in milliseconds
	member.LastSeen = time.Now().UnixMilli() - backdateMs
	s.logger.WithField("node_id", nodeID).Warn("Marked peer as unhealthy")
}
// updateMemberLastSeen overwrites the LastSeen value of a known member with
// timestamp (callers in this file pass time.Now().UnixMilli()). Unknown IDs
// are ignored.
func (s *GossipService) updateMemberLastSeen(nodeID string, timestamp int64) {
	s.membersMu.Lock()
	defer s.membersMu.Unlock()

	member, ok := s.members[nodeID]
	if !ok {
		return
	}
	member.LastSeen = timestamp
}
// MergeMemberList folds a member list received from a peer into the local
// member list, then evicts stale members.
//
// Each remote entry is handled as follows:
//   - Entries for selfNodeID are skipped.
//   - Malformed entries (empty ID or Address) are skipped.
//   - For a known member, the local record takes the newer LastSeen and the
//     earlier JoinedTimestamp.
//   - An unknown member is added as a new record built from the entry's ID,
//     Address, LastSeen and JoinedTimestamp.
//
// Afterwards, every member whose LastSeen is more than ten minutes older
// than time.Now().UnixMilli() is removed.
func (s *GossipService) MergeMemberList(remoteMembers []types.Member, selfNodeID string) {
	const staleAfterMs = 10 * 60 * 1000 // 10 minutes in milliseconds

	s.membersMu.Lock()
	defer s.membersMu.Unlock()

	now := time.Now().UnixMilli()

	for _, remote := range remoteMembers {
		if remote.ID == selfNodeID {
			continue
		}
		// The list arrives over the network. Drop entries we could neither key
		// (empty ID) nor contact (empty Address) instead of storing them and
		// gossiping them onward to other nodes.
		if remote.ID == "" || remote.Address == "" {
			s.logger.WithFields(logrus.Fields{
				"node_id": remote.ID,
				"address": remote.Address,
			}).Debug("Ignoring malformed member in gossip data")
			continue
		}

		if local, exists := s.members[remote.ID]; exists {
			if remote.LastSeen > local.LastSeen {
				local.LastSeen = remote.LastSeen
			}
			// Keep the earlier joined timestamp.
			if remote.JoinedTimestamp < local.JoinedTimestamp {
				local.JoinedTimestamp = remote.JoinedTimestamp
			}
			continue
		}

		s.members[remote.ID] = &types.Member{
			ID:              remote.ID,
			Address:         remote.Address,
			LastSeen:        remote.LastSeen,
			JoinedTimestamp: remote.JoinedTimestamp,
		}
		s.logger.WithFields(logrus.Fields{
			"node_id": remote.ID,
			"address": remote.Address,
		}).Info("Discovered new member through gossip")
	}

	// Deleting the current key while ranging over a map is permitted by the
	// Go spec, so no separate removal list is needed.
	for nodeID, member := range s.members {
		if now-member.LastSeen > staleAfterMs {
			delete(s.members, nodeID)
			s.logger.WithField("node_id", nodeID).Info("Removed stale member")
		}
	}
}
// GetJoinedTimestamp returns the join time (Unix milliseconds) that this node
// advertises for itself in outgoing gossip data.
//
// NOTE(review): this is a placeholder meant to be supplied by the owning
// server. It returns the current time on every call, so the advertised value
// moves forward each round. Peers running MergeMemberList keep the earliest
// value they have seen, which masks the drift.
func (s *GossipService) GetJoinedTimestamp() int64 {
	now := time.Now()
	return now.UnixMilli()
}