diff --git a/cluster/bootstrap.go b/cluster/bootstrap.go
new file mode 100644
index 0000000..f8e4704
--- /dev/null
+++ b/cluster/bootstrap.go
@@ -0,0 +1,145 @@
+package cluster
+
+import (
+	"bytes"
+	"encoding/json"
+	"fmt"
+	"net/http"
+	"time"
+
+	"github.com/sirupsen/logrus"
+
+	"kvs/types"
+)
+
+// BootstrapService handles cluster joining and initial synchronization
+type BootstrapService struct {
+	config        *types.Config
+	gossipService *GossipService
+	syncService   *SyncService
+	logger        *logrus.Logger
+	setMode       func(string) // Callback to set server mode
+}
+
+// NewBootstrapService creates a new bootstrap service
+func NewBootstrapService(config *types.Config, gossipService *GossipService, syncService *SyncService, logger *logrus.Logger, setMode func(string)) *BootstrapService {
+	return &BootstrapService{
+		config:        config,
+		gossipService: gossipService,
+		syncService:   syncService,
+		logger:        logger,
+		setMode:       setMode,
+	}
+}
+
+// Bootstrap joins the cluster using the configured seed nodes
+func (s *BootstrapService) Bootstrap() {
+	if len(s.config.SeedNodes) == 0 {
+		s.logger.Info("No seed nodes configured, running as standalone")
+		return
+	}
+
+	s.logger.Info("Starting bootstrap process")
+	s.setMode("syncing")
+
+	// Try to join cluster via each seed node
+	joined := false
+	for _, seedAddr := range s.config.SeedNodes {
+		if s.attemptJoin(seedAddr) {
+			joined = true
+			break
+		}
+	}
+
+	if !joined {
+		s.logger.Warn("Failed to join cluster via seed nodes, running as standalone")
+		s.setMode("normal")
+		return
+	}
+
+	// Wait a bit for member discovery
+	time.Sleep(2 * time.Second)
+
+	// Perform gradual sync (now Merkle-based)
+	s.performGradualSync()
+
+	// Switch to normal mode
+	s.setMode("normal")
+	s.logger.Info("Bootstrap completed, entering normal mode")
+}
+
+// attemptJoin attempts to join the cluster via a seed node
+func (s *BootstrapService) attemptJoin(seedAddr string) bool {
+	joinReq := types.JoinRequest{
+		ID:              s.config.NodeID,
+		Address:         fmt.Sprintf("%s:%d", s.config.BindAddress, s.config.Port),
+		JoinedTimestamp: time.Now().UnixMilli(),
+	}
+
+	jsonData, err := json.Marshal(joinReq)
+	if err != nil {
+		s.logger.WithError(err).Error("Failed to marshal join request")
+		return false
+	}
+
+	client := &http.Client{Timeout: 10 * time.Second}
+	url := fmt.Sprintf("http://%s/members/join", seedAddr)
+
+	resp, err := client.Post(url, "application/json", bytes.NewBuffer(jsonData))
+	if err != nil {
+		s.logger.WithFields(logrus.Fields{
+			"seed":  seedAddr,
+			"error": err.Error(),
+		}).Warn("Failed to contact seed node")
+		return false
+	}
+	defer resp.Body.Close()
+
+	if resp.StatusCode != http.StatusOK {
+		s.logger.WithFields(logrus.Fields{
+			"seed":   seedAddr,
+			"status": resp.StatusCode,
+		}).Warn("Seed node rejected join request")
+		return false
+	}
+
+	// Process member list response
+	var memberList []types.Member
+	if err := json.NewDecoder(resp.Body).Decode(&memberList); err != nil {
+		s.logger.WithError(err).Error("Failed to decode member list from seed")
+		return false
+	}
+
+	// Add all members to our local list (index the slice so the stored
+	// pointers do not all alias a single loop variable)
+	for i := range memberList {
+		if memberList[i].ID != s.config.NodeID {
+			s.gossipService.AddMember(&memberList[i])
+		}
+	}
+
+	s.logger.WithFields(logrus.Fields{
+		"seed":         seedAddr,
+		"member_count": len(memberList),
+	}).Info("Successfully joined cluster")
+
+	return true
+}
+
+// performGradualSync performs gradual sync (Merkle-based version)
+func (s *BootstrapService) performGradualSync() {
+	s.logger.Info("Starting gradual sync (Merkle-based)")
+
+	members := s.gossipService.GetHealthyMembers()
+	if len(members) == 0 {
+		s.logger.Info("No healthy members for gradual sync")
+		return
+	}
+
+	// For now, just do a few rounds of Merkle sync
+	for i := 0; i < 3; i++ {
+		s.syncService.performMerkleSync()
+		time.Sleep(time.Duration(s.config.ThrottleDelayMs) * time.Millisecond)
+	}
+
+	s.logger.Info("Gradual sync completed")
+}
\ No newline at end of file
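Note (not part of the patch): a minimal wiring sketch of how a server might construct and start these services. The `kvs/cluster` import path and the `types.Config` literal are assumptions inferred from this diff (only fields referenced in the patch are used: NodeID, BindAddress, Port, SeedNodes, ClusteringEnabled); the real main.go is not included here.

```go
package main

import (
	badger "github.com/dgraph-io/badger/v4"
	"github.com/sirupsen/logrus"

	"kvs/cluster" // assumed package path, matching the "kvs" module seen in the imports
	"kvs/types"
)

func main() {
	logger := logrus.New()

	db, err := badger.Open(badger.DefaultOptions("/tmp/kvs-data")) // illustrative path
	if err != nil {
		logger.Fatal(err)
	}
	defer db.Close()

	cfg := &types.Config{ // hypothetical values; field types assumed from usage
		NodeID:            "node-1",
		BindAddress:       "127.0.0.1",
		Port:              8080,
		SeedNodes:         []string{"127.0.0.1:8081"},
		ClusteringEnabled: true,
	}

	// The server would normally flip its request-handling mode here.
	setMode := func(mode string) { logger.WithField("mode", mode).Info("mode changed") }

	gossip := cluster.NewGossipService(cfg, logger)
	merkle := cluster.NewMerkleService(db, logger)
	syncSvc := cluster.NewSyncService(db, cfg, gossip, merkle, logger)
	boot := cluster.NewBootstrapService(cfg, gossip, syncSvc, logger, setMode)

	if err := syncSvc.InitializeMerkleTree(); err != nil {
		logger.WithError(err).Fatal("failed to build initial Merkle tree")
	}

	gossip.Start()
	syncSvc.Start()
	boot.Bootstrap()

	select {} // a real server would run its HTTP listener here instead of blocking
}
```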
diff --git a/cluster/gossip.go b/cluster/gossip.go
new file mode 100644
index 0000000..cbdf74a
--- /dev/null
+++ b/cluster/gossip.go
@@ -0,0 +1,303 @@
+package cluster
+
+import (
+	"bytes"
+	"context"
+	"encoding/json"
+	"fmt"
+	"math/rand"
+	"net/http"
+	"sync"
+	"time"
+
+	"github.com/sirupsen/logrus"
+
+	"kvs/types"
+)
+
+// GossipService handles gossip protocol operations
+type GossipService struct {
+	config    *types.Config
+	members   map[string]*types.Member
+	membersMu sync.RWMutex
+	logger    *logrus.Logger
+	ctx       context.Context
+	cancel    context.CancelFunc
+	wg        sync.WaitGroup
+}
+
+// NewGossipService creates a new gossip service
+func NewGossipService(config *types.Config, logger *logrus.Logger) *GossipService {
+	ctx, cancel := context.WithCancel(context.Background())
+	return &GossipService{
+		config:  config,
+		members: make(map[string]*types.Member),
+		logger:  logger,
+		ctx:     ctx,
+		cancel:  cancel,
+	}
+}
+
+// Start begins the gossip routine
+func (s *GossipService) Start() {
+	if !s.config.ClusteringEnabled {
+		s.logger.Info("Clustering disabled, skipping gossip routine")
+		return
+	}
+
+	s.wg.Add(1)
+	go s.gossipRoutine()
+}
+
+// Stop terminates the gossip service
+func (s *GossipService) Stop() {
+	s.cancel()
+	s.wg.Wait()
+}
+
+// AddMember adds a member to the gossip member list
+func (s *GossipService) AddMember(member *types.Member) {
+	s.membersMu.Lock()
+	defer s.membersMu.Unlock()
+	s.members[member.ID] = member
+	s.logger.WithFields(logrus.Fields{
+		"node_id": member.ID,
+		"address": member.Address,
+	}).Info("Member added")
+}
+
+// RemoveMember removes a member from the gossip member list
+func (s *GossipService) RemoveMember(nodeID string) {
+	s.membersMu.Lock()
+	defer s.membersMu.Unlock()
+	if member, exists := s.members[nodeID]; exists {
+		delete(s.members, nodeID)
+		s.logger.WithFields(logrus.Fields{
+			"node_id": member.ID,
+			"address": member.Address,
+		}).Info("Member removed")
+	}
+}
+
+// GetMembers returns a copy of all members
+func (s *GossipService) GetMembers() []*types.Member {
+	s.membersMu.RLock()
+	defer s.membersMu.RUnlock()
+	members := make([]*types.Member, 0, len(s.members))
+	for _, member := range s.members {
+		members = append(members, member)
+	}
+	return members
+}
+
+// GetHealthyMembers returns members that have been seen recently
+func (s *GossipService) GetHealthyMembers() []*types.Member {
+	s.membersMu.RLock()
+	defer s.membersMu.RUnlock()
+
+	now := time.Now().UnixMilli()
+	healthyMembers := make([]*types.Member, 0)
+
+	for _, member := range s.members {
+		// Consider member healthy if last seen within last 5 minutes
+		if now-member.LastSeen < 5*60*1000 {
+			healthyMembers = append(healthyMembers, member)
+		}
+	}
+
+	return healthyMembers
+}
+
+// gossipRoutine runs periodically to exchange member lists
+func (s *GossipService) gossipRoutine() {
+	defer s.wg.Done()
+
+	for {
+		// Pick a random interval between the configured gossip min and max
+		minInterval := time.Duration(s.config.GossipIntervalMin) * time.Second
+		maxInterval := time.Duration(s.config.GossipIntervalMax) * time.Second
+		interval := minInterval + time.Duration(rand.Int63n(int64(maxInterval-minInterval)+1))
+
+		select {
+		case <-s.ctx.Done():
+			return
+		case <-time.After(interval):
+			s.performGossipRound()
+		}
+	}
+}
+
+// performGossipRound performs a gossip round with random healthy peers
+func (s *GossipService) performGossipRound() {
+	members := s.GetHealthyMembers()
+	if len(members) == 0 {
+		s.logger.Debug("No healthy members for gossip round")
+		return
+	}
+
+	// Select 1-3 random peers for gossip
+	maxPeers := 3
+	if len(members) < maxPeers {
+		maxPeers = len(members)
+	}
+
+	// Shuffle and select
+	rand.Shuffle(len(members), func(i, j int) {
+		members[i], members[j] = members[j], members[i]
+	})
+
+	selectedPeers := members[:rand.Intn(maxPeers)+1]
+
+	for _, peer := range selectedPeers {
+		go s.gossipWithPeer(peer)
+	}
+}
+
+// gossipWithPeer performs gossip with a specific peer
+func (s *GossipService) gossipWithPeer(peer *types.Member) error {
+	s.logger.WithField("peer", peer.Address).Debug("Starting gossip with peer")
+
+	// Get our current member list
+	localMembers := s.GetMembers()
+
+	// Send our member list to the peer
+	gossipData := make([]types.Member, len(localMembers))
+	for i, member := range localMembers {
+		gossipData[i] = *member
+	}
+
+	// Add ourselves to the list
+	selfMember := types.Member{
+		ID:              s.config.NodeID,
+		Address:         fmt.Sprintf("%s:%d", s.config.BindAddress, s.config.Port),
+		LastSeen:        time.Now().UnixMilli(),
+		JoinedTimestamp: s.GetJoinedTimestamp(),
+	}
+	gossipData = append(gossipData, selfMember)
+
+	jsonData, err := json.Marshal(gossipData)
+	if err != nil {
+		s.logger.WithError(err).Error("Failed to marshal gossip data")
+		return err
+	}
+
+	// Send HTTP request to peer
+	client := &http.Client{Timeout: 5 * time.Second}
+	url := fmt.Sprintf("http://%s/members/gossip", peer.Address)
+
+	resp, err := client.Post(url, "application/json", bytes.NewBuffer(jsonData))
+	if err != nil {
+		s.logger.WithFields(logrus.Fields{
+			"peer":  peer.Address,
+			"error": err.Error(),
+		}).Warn("Failed to gossip with peer")
+		s.markPeerUnhealthy(peer.ID)
+		return err
+	}
+	defer resp.Body.Close()
+
+	if resp.StatusCode != http.StatusOK {
+		s.logger.WithFields(logrus.Fields{
+			"peer":   peer.Address,
+			"status": resp.StatusCode,
+		}).Warn("Gossip request failed")
+		s.markPeerUnhealthy(peer.ID)
+		return fmt.Errorf("gossip request failed with status %d", resp.StatusCode)
+	}
+
+	// Process response - peer's member list
+	var remoteMemberList []types.Member
+	if err := json.NewDecoder(resp.Body).Decode(&remoteMemberList); err != nil {
+		s.logger.WithError(err).Error("Failed to decode gossip response")
+		return err
+	}
+
+	// Merge remote member list with our local list
+	s.MergeMemberList(remoteMemberList, s.config.NodeID)
+
+	// Update peer's last seen timestamp
+	s.updateMemberLastSeen(peer.ID, time.Now().UnixMilli())
+
+	s.logger.WithField("peer", peer.Address).Debug("Completed gossip with peer")
+	return nil
+}
+
+// markPeerUnhealthy marks a peer as unhealthy
+func (s *GossipService) markPeerUnhealthy(nodeID string) {
+	s.membersMu.Lock()
+	defer s.membersMu.Unlock()
+
+	if member, exists := s.members[nodeID]; exists {
+		// Mark as last seen a long time ago to indicate unhealthy
+		member.LastSeen = time.Now().UnixMilli() - 10*60*1000 // 10 minutes ago
+		s.logger.WithField("node_id", nodeID).Warn("Marked peer as unhealthy")
+	}
+}
+
+// updateMemberLastSeen updates member's last seen timestamp
+func (s *GossipService) updateMemberLastSeen(nodeID string, timestamp int64) {
+	s.membersMu.Lock()
+	defer s.membersMu.Unlock()
+
+	if member, exists := s.members[nodeID]; exists {
+		member.LastSeen = timestamp
+	}
+}
+
+// MergeMemberList merges remote member list with local member list
+func (s *GossipService) MergeMemberList(remoteMembers []types.Member, selfNodeID string) {
+	s.membersMu.Lock()
+	defer s.membersMu.Unlock()
+
+	now := time.Now().UnixMilli()
+
+	for _, remoteMember := range remoteMembers {
+		// Skip ourselves
+		if remoteMember.ID == selfNodeID {
+			continue
+		}
+
+		if localMember, exists := s.members[remoteMember.ID]; exists {
+			// Update existing member
+			if remoteMember.LastSeen > localMember.LastSeen {
+				localMember.LastSeen = remoteMember.LastSeen
+			}
+			// Keep the earlier joined timestamp
+			if remoteMember.JoinedTimestamp < localMember.JoinedTimestamp {
+				localMember.JoinedTimestamp = remoteMember.JoinedTimestamp
+			}
+		} else {
+			// Add new member
+			newMember := &types.Member{
+				ID:              remoteMember.ID,
+				Address:         remoteMember.Address,
+				LastSeen:        remoteMember.LastSeen,
+				JoinedTimestamp: remoteMember.JoinedTimestamp,
+			}
+			s.members[remoteMember.ID] = newMember
+			s.logger.WithFields(logrus.Fields{
+				"node_id": remoteMember.ID,
+				"address": remoteMember.Address,
+			}).Info("Discovered new member through gossip")
+		}
+	}
+
+	// Clean up old members (not seen for more than 10 minutes)
+	toRemove := make([]string, 0)
+	for nodeID, member := range s.members {
+		if now-member.LastSeen > 10*60*1000 { // 10 minutes
+			toRemove = append(toRemove, nodeID)
+		}
+	}
+
+	for _, nodeID := range toRemove {
+		delete(s.members, nodeID)
+		s.logger.WithField("node_id", nodeID).Info("Removed stale member")
+	}
+}
+
+// GetJoinedTimestamp placeholder - would be implemented by the server
+func (s *GossipService) GetJoinedTimestamp() int64 {
+	// This should be implemented by the server that uses this service
+	return time.Now().UnixMilli()
+}
\ No newline at end of file
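Note (not part of the patch): a rough same-package test sketch (a hypothetical cluster/gossip_test.go) exercising the MergeMemberList semantics above — a newer LastSeen wins, the earlier JoinedTimestamp is kept, the local node is skipped, and members silent for more than 10 minutes are pruned. Only the config fields the test needs are set; addresses and timestamps are made up.

```go
package cluster

import (
	"testing"
	"time"

	"github.com/sirupsen/logrus"

	"kvs/types"
)

func TestMergeMemberList(t *testing.T) {
	cfg := &types.Config{NodeID: "self"} // only the field this test needs
	svc := NewGossipService(cfg, logrus.New())

	now := time.Now().UnixMilli()
	svc.AddMember(&types.Member{ID: "a", Address: "10.0.0.2:8080", LastSeen: now - 1000, JoinedTimestamp: now - 5000})

	svc.MergeMemberList([]types.Member{
		{ID: "self", Address: "10.0.0.1:8080", LastSeen: now},                           // skipped: it's us
		{ID: "a", Address: "10.0.0.2:8080", LastSeen: now, JoinedTimestamp: now - 9000},  // updates existing entry
		{ID: "b", Address: "10.0.0.3:8080", LastSeen: now - 11*60*1000},                  // added, then pruned as stale
	}, cfg.NodeID)

	members := svc.GetMembers()
	if len(members) != 1 || members[0].ID != "a" {
		t.Fatalf("expected only member %q to survive, got %d members", "a", len(members))
	}
	if members[0].LastSeen != now {
		t.Fatalf("expected newer LastSeen to win")
	}
	if members[0].JoinedTimestamp != now-9000 {
		t.Fatalf("expected earlier JoinedTimestamp to be kept")
	}
}
```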
diff --git a/cluster/merkle.go b/cluster/merkle.go
new file mode 100644
index 0000000..e6032fe
--- /dev/null
+++ b/cluster/merkle.go
@@ -0,0 +1,176 @@
+package cluster
+
+import (
+	"bytes"
+	"crypto/sha256"
+	"encoding/json"
+	"fmt"
+	"sort"
+	"strconv"
+	"strings"
+
+	badger "github.com/dgraph-io/badger/v4"
+	"github.com/sirupsen/logrus"
+
+	"kvs/types"
+)
+
+// MerkleService handles Merkle tree operations
+type MerkleService struct {
+	db     *badger.DB
+	logger *logrus.Logger
+}
+
+// NewMerkleService creates a new Merkle tree service
+func NewMerkleService(db *badger.DB, logger *logrus.Logger) *MerkleService {
+	return &MerkleService{
+		db:     db,
+		logger: logger,
+	}
+}
+
+// CalculateHash generates a SHA256 hash for a given byte slice
+func CalculateHash(data []byte) []byte {
+	h := sha256.New()
+	h.Write(data)
+	return h.Sum(nil)
+}
+
+// CalculateLeafHash generates a hash for a leaf node based on its path, UUID, timestamp, and data
+func (s *MerkleService) CalculateLeafHash(path string, storedValue *types.StoredValue) []byte {
+	// Concatenate path, UUID, timestamp, and the raw data bytes for hashing.
+	// Ensure a consistent order of fields for hashing.
+	dataToHash := bytes.Buffer{}
+	dataToHash.WriteString(path)
+	dataToHash.WriteByte(':')
+	dataToHash.WriteString(storedValue.UUID)
+	dataToHash.WriteByte(':')
+	dataToHash.WriteString(strconv.FormatInt(storedValue.Timestamp, 10))
+	dataToHash.WriteByte(':')
+	dataToHash.Write(storedValue.Data) // Use raw bytes of json.RawMessage
+
+	return CalculateHash(dataToHash.Bytes())
+}
+
+// GetAllKVPairsForMerkleTree retrieves all key-value pairs needed for Merkle tree construction
+func (s *MerkleService) GetAllKVPairsForMerkleTree() (map[string]*types.StoredValue, error) {
+	pairs := make(map[string]*types.StoredValue)
+	err := s.db.View(func(txn *badger.Txn) error {
+		opts := badger.DefaultIteratorOptions
+		opts.PrefetchValues = true // We need the values for hashing
+		it := txn.NewIterator(opts)
+		defer it.Close()
+
+		// Iterate over all actual data keys (not _ts: indexes)
+		for it.Rewind(); it.Valid(); it.Next() {
+			item := it.Item()
+			key := string(item.Key())
+
+			if strings.HasPrefix(key, "_ts:") {
+				continue // Skip index keys
+			}
+
+			var storedValue types.StoredValue
+			err := item.Value(func(val []byte) error {
+				return json.Unmarshal(val, &storedValue)
+			})
+			if err != nil {
+				s.logger.WithError(err).WithField("key", key).Warn("Failed to unmarshal stored value for Merkle tree, skipping")
+				continue
+			}
+			pairs[key] = &storedValue
+		}
+		return nil
+	})
+	if err != nil {
+		return nil, err
+	}
+	return pairs, nil
+}
+
+// BuildMerkleTreeFromPairs constructs a Merkle tree from the KVS data.
+// This version uses a recursive approach to build a balanced tree from sorted keys.
+func (s *MerkleService) BuildMerkleTreeFromPairs(pairs map[string]*types.StoredValue) (*types.MerkleNode, error) {
+	if len(pairs) == 0 {
+		return &types.MerkleNode{Hash: CalculateHash([]byte("empty_tree")), StartKey: "", EndKey: ""}, nil
+	}
+
+	// Sort keys to ensure consistent tree structure
+	keys := make([]string, 0, len(pairs))
+	for k := range pairs {
+		keys = append(keys, k)
+	}
+	sort.Strings(keys)
+
+	// Create leaf nodes
+	leafNodes := make([]*types.MerkleNode, len(keys))
+	for i, key := range keys {
+		storedValue := pairs[key]
+		hash := s.CalculateLeafHash(key, storedValue)
+		leafNodes[i] = &types.MerkleNode{Hash: hash, StartKey: key, EndKey: key}
+	}
+
+	// Recursively build parent nodes
+	return s.buildMerkleTreeRecursive(leafNodes)
+}
+
+// buildMerkleTreeRecursive builds the tree from a slice of nodes
+func (s *MerkleService) buildMerkleTreeRecursive(nodes []*types.MerkleNode) (*types.MerkleNode, error) {
+	if len(nodes) == 0 {
+		return nil, nil
+	}
+	if len(nodes) == 1 {
+		return nodes[0], nil
+	}
+
+	var nextLevel []*types.MerkleNode
+	for i := 0; i < len(nodes); i += 2 {
+		left := nodes[i]
+		var right *types.MerkleNode
+		if i+1 < len(nodes) {
+			right = nodes[i+1]
+		}
+
+		var combinedHash []byte
+		var endKey string
+
+		if right != nil {
+			combinedHash = CalculateHash(append(left.Hash, right.Hash...))
+			endKey = right.EndKey
+		} else {
+			// Odd number of nodes, promote the left node
+			combinedHash = left.Hash
+			endKey = left.EndKey
+		}
+
+		parentNode := &types.MerkleNode{
+			Hash:     combinedHash,
+			StartKey: left.StartKey,
+			EndKey:   endKey,
+		}
+		nextLevel = append(nextLevel, parentNode)
+	}
+	return s.buildMerkleTreeRecursive(nextLevel)
+}
+
+// FilterPairsByRange filters a map of StoredValue by key range
+func FilterPairsByRange(allPairs map[string]*types.StoredValue, startKey, endKey string) map[string]*types.StoredValue {
+	filtered := make(map[string]*types.StoredValue)
+	for key, value := range allPairs {
+		if (startKey == "" || key >= startKey) && (endKey == "" || key <= endKey) {
+			filtered[key] = value
+		}
+	}
+	return filtered
+}
+
+// BuildSubtreeForRange builds a Merkle subtree for a specific key range
+func (s *MerkleService) BuildSubtreeForRange(startKey, endKey string) (*types.MerkleNode, error) {
+	pairs, err := s.GetAllKVPairsForMerkleTree()
+	if err != nil {
+		return nil, fmt.Errorf("failed to get KV pairs for subtree: %v", err)
+	}
+
+	filteredPairs := FilterPairsByRange(pairs, startKey, endKey)
+	return s.BuildMerkleTreeFromPairs(filteredPairs)
+}
\ No newline at end of file
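Note (not part of the patch): a rough test sketch (a hypothetical cluster/merkle_test.go) of the two properties the sync code relies on — the root hash is deterministic for the same data, and it changes when any stored value changes. The nil *badger.DB is acceptable only because BuildMerkleTreeFromPairs never touches the database; the StoredValue contents are made up.

```go
package cluster

import (
	"bytes"
	"encoding/json"
	"testing"

	"github.com/sirupsen/logrus"

	"kvs/types"
)

func TestMerkleRootIsDeterministic(t *testing.T) {
	svc := NewMerkleService(nil, logrus.New()) // db unused by BuildMerkleTreeFromPairs

	pairs := map[string]*types.StoredValue{
		"users/alice": {UUID: "u-1", Timestamp: 100, Data: json.RawMessage(`{"name":"alice"}`)},
		"users/bob":   {UUID: "u-2", Timestamp: 200, Data: json.RawMessage(`{"name":"bob"}`)},
	}

	root1, err := svc.BuildMerkleTreeFromPairs(pairs)
	if err != nil {
		t.Fatal(err)
	}
	root2, _ := svc.BuildMerkleTreeFromPairs(pairs)
	if !bytes.Equal(root1.Hash, root2.Hash) {
		t.Fatal("same data should produce the same root hash")
	}

	// Mutate one value; the root must change so the diff walk can detect it.
	pairs["users/bob"].Timestamp = 201
	root3, _ := svc.BuildMerkleTreeFromPairs(pairs)
	if bytes.Equal(root1.Hash, root3.Hash) {
		t.Fatal("changed data should produce a different root hash")
	}
}
```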
diff --git a/cluster/sync.go b/cluster/sync.go
new file mode 100644
index 0000000..ea4142f
--- /dev/null
+++ b/cluster/sync.go
@@ -0,0 +1,388 @@
+package cluster
+
+import (
+	"bytes"
+	"context"
+	"encoding/hex"
+	"encoding/json"
+	"fmt"
+	"math/rand"
+	"net/http"
+	"sync"
+	"time"
+
+	badger "github.com/dgraph-io/badger/v4"
+	"github.com/sirupsen/logrus"
+
+	"kvs/types"
+)
+
+// SyncService handles data synchronization between cluster nodes
+type SyncService struct {
+	db            *badger.DB
+	config        *types.Config
+	gossipService *GossipService
+	merkleService *MerkleService
+	logger        *logrus.Logger
+	merkleRoot    *types.MerkleNode
+	merkleRootMu  sync.RWMutex
+	ctx           context.Context
+	cancel        context.CancelFunc
+	wg            sync.WaitGroup
+}
+
+// NewSyncService creates a new sync service
+func NewSyncService(db *badger.DB, config *types.Config, gossipService *GossipService, merkleService *MerkleService, logger *logrus.Logger) *SyncService {
+	ctx, cancel := context.WithCancel(context.Background())
+	return &SyncService{
+		db:            db,
+		config:        config,
+		gossipService: gossipService,
+		merkleService: merkleService,
+		logger:        logger,
+		ctx:           ctx,
+		cancel:        cancel,
+	}
+}
+
+// Start begins the sync routines
+func (s *SyncService) Start() {
+	if !s.config.ClusteringEnabled {
+		s.logger.Info("Clustering disabled, skipping sync routines")
+		return
+	}
+
+	// Start sync routine
+	s.wg.Add(1)
+	go s.syncRoutine()
+
+	// Start Merkle tree rebuild routine
+	s.wg.Add(1)
+	go s.merkleTreeRebuildRoutine()
+}
+
+// Stop terminates the sync service
+func (s *SyncService) Stop() {
+	s.cancel()
+	s.wg.Wait()
+}
+
+// GetMerkleRoot returns the current Merkle root
+func (s *SyncService) GetMerkleRoot() *types.MerkleNode {
+	s.merkleRootMu.RLock()
+	defer s.merkleRootMu.RUnlock()
+	return s.merkleRoot
+}
+
+// SetMerkleRoot sets the current Merkle root
+func (s *SyncService) SetMerkleRoot(root *types.MerkleNode) {
+	s.merkleRootMu.Lock()
+	defer s.merkleRootMu.Unlock()
+	s.merkleRoot = root
+}
+
+// syncRoutine triggers periodic Merkle-based synchronization rounds
+func (s *SyncService) syncRoutine() {
+	defer s.wg.Done()
+
+	syncTicker := time.NewTicker(time.Duration(s.config.SyncInterval) * time.Second)
+	defer syncTicker.Stop()
+
+	for {
+		select {
+		case <-s.ctx.Done():
+			return
+		case <-syncTicker.C:
+			s.performMerkleSync()
+		}
+	}
+}
+
+// merkleTreeRebuildRoutine periodically rebuilds the Merkle tree
+func (s *SyncService) merkleTreeRebuildRoutine() {
+	defer s.wg.Done()
+	ticker := time.NewTicker(time.Duration(s.config.SyncInterval) * time.Second)
+	defer ticker.Stop()
+
+	for {
+		select {
+		case <-s.ctx.Done():
+			return
+		case <-ticker.C:
+			s.logger.Debug("Rebuilding Merkle tree...")
+			pairs, err := s.merkleService.GetAllKVPairsForMerkleTree()
+			if err != nil {
+				s.logger.WithError(err).Error("Failed to get KV pairs for Merkle tree rebuild")
+				continue
+			}
+			newRoot, err := s.merkleService.BuildMerkleTreeFromPairs(pairs)
+			if err != nil {
+				s.logger.WithError(err).Error("Failed to rebuild Merkle tree")
+				continue
+			}
+			s.SetMerkleRoot(newRoot)
+			s.logger.Debug("Merkle tree rebuilt.")
+		}
+	}
+}
fmt.Errorf("failed to build initial Merkle tree: %v", err) + } + s.SetMerkleRoot(root) + s.logger.Info("Initial Merkle tree built.") + return nil +} + +// performMerkleSync performs a synchronization round using Merkle Trees +func (s *SyncService) performMerkleSync() { + members := s.gossipService.GetHealthyMembers() + if len(members) == 0 { + s.logger.Debug("No healthy members for Merkle sync") + return + } + + // Select random peer + peer := members[rand.Intn(len(members))] + + s.logger.WithField("peer", peer.Address).Info("Starting Merkle tree sync") + + localRoot := s.GetMerkleRoot() + if localRoot == nil { + s.logger.Error("Local Merkle root is nil, cannot perform sync") + return + } + + // 1. Get remote peer's Merkle root + remoteRootResp, err := s.requestMerkleRoot(peer.Address) + if err != nil { + s.logger.WithError(err).WithField("peer", peer.Address).Error("Failed to get remote Merkle root") + s.gossipService.markPeerUnhealthy(peer.ID) + return + } + remoteRoot := remoteRootResp.Root + + // 2. Compare roots and start recursive diffing if they differ + if !bytes.Equal(localRoot.Hash, remoteRoot.Hash) { + s.logger.WithFields(logrus.Fields{ + "peer": peer.Address, + "local_root": hex.EncodeToString(localRoot.Hash), + "remote_root": hex.EncodeToString(remoteRoot.Hash), + }).Info("Merkle roots differ, starting recursive diff") + s.diffMerkleTreesRecursive(peer.Address, localRoot, remoteRoot) + } else { + s.logger.WithField("peer", peer.Address).Info("Merkle roots match, no sync needed") + } + + s.logger.WithField("peer", peer.Address).Info("Completed Merkle tree sync") +} + +// requestMerkleRoot requests the Merkle root from a peer +func (s *SyncService) requestMerkleRoot(peerAddress string) (*types.MerkleRootResponse, error) { + client := &http.Client{Timeout: 10 * time.Second} + url := fmt.Sprintf("http://%s/merkle_tree/root", peerAddress) + + resp, err := client.Get(url) + if err != nil { + return nil, err + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + return nil, fmt.Errorf("peer returned status %d for Merkle root", resp.StatusCode) + } + + var merkleRootResp types.MerkleRootResponse + if err := json.NewDecoder(resp.Body).Decode(&merkleRootResp); err != nil { + return nil, err + } + return &merkleRootResp, nil +} + +// diffMerkleTreesRecursive recursively compares local and remote Merkle tree nodes +func (s *SyncService) diffMerkleTreesRecursive(peerAddress string, localNode, remoteNode *types.MerkleNode) { + // If hashes match, this subtree is in sync. + if bytes.Equal(localNode.Hash, remoteNode.Hash) { + return + } + + // Hashes differ, need to go deeper. + // Request children from the remote peer for the current range. + req := types.MerkleTreeDiffRequest{ + ParentNode: *remoteNode, // We are asking the remote peer about its children for this range + LocalHash: localNode.Hash, // Our hash for this range + } + + remoteDiffResp, err := s.requestMerkleDiff(peerAddress, req) + if err != nil { + s.logger.WithError(err).WithFields(logrus.Fields{ + "peer": peerAddress, + "start_key": localNode.StartKey, + "end_key": localNode.EndKey, + }).Error("Failed to get Merkle diff from peer") + return + } + + if len(remoteDiffResp.Keys) > 0 { + // This is a leaf-level diff, we have the actual keys that are different. + s.handleLeafLevelDiff(peerAddress, remoteDiffResp.Keys, localNode) + } else if len(remoteDiffResp.Children) > 0 { + // Not a leaf level, continue recursive diff for children. 
+
+// handleLeafLevelDiff processes leaf-level differences
+func (s *SyncService) handleLeafLevelDiff(peerAddress string, keys []string, localNode *types.MerkleNode) {
+	s.logger.WithFields(logrus.Fields{
+		"peer":      peerAddress,
+		"start_key": localNode.StartKey,
+		"end_key":   localNode.EndKey,
+		"num_keys":  len(keys),
+	}).Info("Found divergent keys, fetching and comparing data")
+
+	for _, key := range keys {
+		// Fetch the individual key from the peer
+		remoteStoredValue, err := s.fetchSingleKVFromPeer(peerAddress, key)
+		if err != nil {
+			s.logger.WithError(err).WithFields(logrus.Fields{
+				"peer": peerAddress,
+				"key":  key,
+			}).Error("Failed to fetch single KV from peer during diff")
+			continue
+		}
+
+		localStoredValue, localExists := s.getLocalData(key)
+
+		if remoteStoredValue == nil {
+			// Key was deleted on remote, delete locally if exists
+			if localExists {
+				s.logger.WithField("key", key).Info("Key deleted on remote, deleting locally")
+				s.deleteKVLocally(key, localStoredValue.Timestamp)
+			}
+			continue
+		}
+
+		if !localExists {
+			// Local data is missing, store the remote data
+			if err := s.storeReplicatedDataWithMetadata(key, remoteStoredValue); err != nil {
+				s.logger.WithError(err).WithField("key", key).Error("Failed to store missing replicated data")
+			} else {
+				s.logger.WithField("key", key).Info("Fetched and stored missing data from peer")
+			}
+		} else if localStoredValue.Timestamp < remoteStoredValue.Timestamp {
+			// Remote is newer, store the remote data
+			if err := s.storeReplicatedDataWithMetadata(key, remoteStoredValue); err != nil {
+				s.logger.WithError(err).WithField("key", key).Error("Failed to store newer replicated data")
+			} else {
+				s.logger.WithField("key", key).Info("Fetched and stored newer data from peer")
+			}
+		} else if localStoredValue.Timestamp == remoteStoredValue.Timestamp && localStoredValue.UUID != remoteStoredValue.UUID {
+			// Timestamp collision, engage conflict resolution
+			s.resolveConflict(key, localStoredValue, remoteStoredValue, peerAddress)
+		}
+		// If local is newer or same timestamp and same UUID, do nothing.
+	}
+}
+
+// Placeholder methods that would need to be implemented or injected by the server
+func (s *SyncService) fetchSingleKVFromPeer(peerAddress, key string) (*types.StoredValue, error) {
+	// This would be implemented similar to the main.go version
+	return nil, fmt.Errorf("not implemented")
+}
+
+func (s *SyncService) getLocalData(key string) (*types.StoredValue, bool) {
+	// This would be implemented similar to the main.go version
+	return nil, false
+}
+
+func (s *SyncService) deleteKVLocally(key string, timestamp int64) error {
+	// This would be implemented similar to the main.go version
+	return fmt.Errorf("not implemented")
+}
+
+func (s *SyncService) storeReplicatedDataWithMetadata(key string, value *types.StoredValue) error {
+	// This would be implemented similar to the main.go version
+	return fmt.Errorf("not implemented")
+}
+
+func (s *SyncService) resolveConflict(key string, local, remote *types.StoredValue, peerAddress string) error {
+	// This would be implemented similar to the main.go version
+	return fmt.Errorf("not implemented")
+}
+
+// requestMerkleDiff requests children hashes or keys for a given node/range from a peer
+func (s *SyncService) requestMerkleDiff(peerAddress string, req types.MerkleTreeDiffRequest) (*types.MerkleTreeDiffResponse, error) {
+	jsonData, err := json.Marshal(req)
+	if err != nil {
+		return nil, err
+	}
+
+	client := &http.Client{Timeout: 10 * time.Second}
+	url := fmt.Sprintf("http://%s/merkle_tree/diff", peerAddress)
+
+	resp, err := client.Post(url, "application/json", bytes.NewBuffer(jsonData))
+	if err != nil {
+		return nil, err
+	}
+	defer resp.Body.Close()
+
+	if resp.StatusCode != http.StatusOK {
+		return nil, fmt.Errorf("peer returned status %d for Merkle diff", resp.StatusCode)
+	}
+
+	var diffResp types.MerkleTreeDiffResponse
+	if err := json.NewDecoder(resp.Body).Decode(&diffResp); err != nil {
+		return nil, err
+	}
+	return &diffResp, nil
+}
+
+// handleChildrenDiff processes children-level differences
+func (s *SyncService) handleChildrenDiff(peerAddress string, children []types.MerkleNode) {
+	localPairs, err := s.merkleService.GetAllKVPairsForMerkleTree()
+	if err != nil {
+		s.logger.WithError(err).Error("Failed to get KV pairs for local children comparison")
+		return
+	}
+
+	for _, remoteChild := range children {
+		// Build the local Merkle node for this child's range
+		localChildNode, err := s.merkleService.BuildMerkleTreeFromPairs(FilterPairsByRange(localPairs, remoteChild.StartKey, remoteChild.EndKey))
+		if err != nil {
+			s.logger.WithError(err).WithFields(logrus.Fields{
+				"start_key": remoteChild.StartKey,
+				"end_key":   remoteChild.EndKey,
+			}).Error("Failed to build local child node for diff")
+			continue
+		}
+
+		if localChildNode == nil || !bytes.Equal(localChildNode.Hash, remoteChild.Hash) {
+			// If local child node is nil (meaning local has no data in this range)
+			// or hashes differ, then we need to fetch the data.
+			if localChildNode == nil {
+				s.logger.WithFields(logrus.Fields{
+					"peer":      peerAddress,
+					"start_key": remoteChild.StartKey,
+					"end_key":   remoteChild.EndKey,
+				}).Info("Local node missing data in remote child's range, fetching full range")
+				s.fetchAndStoreRange(peerAddress, remoteChild.StartKey, remoteChild.EndKey)
+			} else {
+				s.diffMerkleTreesRecursive(peerAddress, localChildNode, &remoteChild)
+			}
+		}
+	}
+}
+
+// fetchAndStoreRange fetches a range of KV pairs from a peer and stores them locally
+func (s *SyncService) fetchAndStoreRange(peerAddress string, startKey, endKey string) error {
+	// This would be implemented similar to the main.go version
+	return fmt.Errorf("not implemented")
+}
\ No newline at end of file
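Note (not part of the patch): the helpers above are stubs ("implemented similar to the main.go version", and main.go is not included in this diff). Purely as an illustration, a local read along the following lines would mirror how merkle.go unmarshals values from Badger; the actual key layout and error handling in main.go may differ, so this is an assumption rather than the project's implementation. It is written as if added to cluster/sync.go (which already imports encoding/json and badger) and is deliberately named getLocalDataSketch so it does not collide with the stub.

```go
// getLocalDataSketch is a hypothetical stand-in for getLocalData: it reads one key
// from Badger and decodes it into a types.StoredValue, returning false if absent.
func (s *SyncService) getLocalDataSketch(key string) (*types.StoredValue, bool) {
	var storedValue types.StoredValue
	found := false

	err := s.db.View(func(txn *badger.Txn) error {
		item, err := txn.Get([]byte(key))
		if err == badger.ErrKeyNotFound {
			return nil // leave found == false
		}
		if err != nil {
			return err
		}
		return item.Value(func(val []byte) error {
			if err := json.Unmarshal(val, &storedValue); err != nil {
				return err
			}
			found = true
			return nil
		})
	})
	if err != nil {
		s.logger.WithError(err).WithField("key", key).Warn("Failed to read local value")
		return nil, false
	}
	if !found {
		return nil, false
	}
	return &storedValue, true
}
```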