From b6332d7ff533a2177134b97fc5aa1b543bb93b0d Mon Sep 17 00:00:00 2001 From: ryyst Date: Sat, 20 Sep 2025 18:01:58 +0300 Subject: [PATCH] fix: implement missing sync service methods for data replication MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Implemented fetchSingleKVFromPeer: HTTP client to fetch KV pairs from peers - Implemented getLocalData: Badger DB access for local data retrieval - Implemented deleteKVLocally: Local deletion with timestamp index cleanup - Implemented storeReplicatedDataWithMetadata: Preserves original UUID/timestamp - Implemented resolveConflict: Simple conflict resolution (newer timestamp wins) - Implemented fetchAndStoreRange: Fetches KV ranges for Merkle sync This fixes the critical data replication issue where sync was failing with "not implemented" errors. Integration tests now pass for data replication. 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude --- cluster/sync.go | 151 +++++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 135 insertions(+), 16 deletions(-) diff --git a/cluster/sync.go b/cluster/sync.go index ed6786d..c0e48d9 100644 --- a/cluster/sync.go +++ b/cluster/sync.go @@ -292,30 +292,108 @@ func (s *SyncService) handleLeafLevelDiff(peerAddress string, keys []string, loc } } -// Add placeholder methods that would need to be implemented or injected -func (s *SyncService) fetchSingleKVFromPeer(peerAddress, key string) (*types.StoredValue, error) { - // This would be implemented similar to the main.go version - return nil, fmt.Errorf("not implemented") +// fetchSingleKVFromPeer fetches a single KV pair from a peer +func (s *SyncService) fetchSingleKVFromPeer(peerAddress, path string) (*types.StoredValue, error) { + client := &http.Client{Timeout: 5 * time.Second} + url := fmt.Sprintf("http://%s/kv/%s", peerAddress, path) + + resp, err := client.Get(url) + if err != nil { + return nil, err + } + defer resp.Body.Close() + + if resp.StatusCode == http.StatusNotFound { + return nil, nil // Key might have been deleted on the peer + } + if resp.StatusCode != http.StatusOK { + return nil, fmt.Errorf("peer returned status %d for path %s", resp.StatusCode, path) + } + + var storedValue types.StoredValue + if err := json.NewDecoder(resp.Body).Decode(&storedValue); err != nil { + return nil, fmt.Errorf("failed to decode types.StoredValue from peer: %v", err) + } + + return &storedValue, nil } -func (s *SyncService) getLocalData(key string) (*types.StoredValue, bool) { - // This would be implemented similar to the main.go version - return nil, false +// getLocalData is a utility to retrieve a types.StoredValue from local DB +func (s *SyncService) getLocalData(path string) (*types.StoredValue, bool) { + var storedValue types.StoredValue + err := s.db.View(func(txn *badger.Txn) error { + item, err := txn.Get([]byte(path)) + if err != nil { + return err + } + + return item.Value(func(val []byte) error { + return json.Unmarshal(val, &storedValue) + }) + }) + + if err != nil { + return nil, false + } + + return &storedValue, true } +// deleteKVLocally deletes a key-value pair and its associated timestamp index locally func (s *SyncService) deleteKVLocally(key string, timestamp int64) error { - // This would be implemented similar to the main.go version - return fmt.Errorf("not implemented") + return s.db.Update(func(txn *badger.Txn) error { + // Delete the main key + if err := txn.Delete([]byte(key)); err != nil { + return err + } + + // Delete the timestamp index + indexKey := fmt.Sprintf("_ts:%d:%s", timestamp, key) + return txn.Delete([]byte(indexKey)) + }) } -func (s *SyncService) storeReplicatedDataWithMetadata(key string, value *types.StoredValue) error { - // This would be implemented similar to the main.go version - return fmt.Errorf("not implemented") +// storeReplicatedDataWithMetadata stores replicated data preserving its original metadata +func (s *SyncService) storeReplicatedDataWithMetadata(path string, storedValue *types.StoredValue) error { + valueBytes, err := json.Marshal(storedValue) + if err != nil { + return err + } + + return s.db.Update(func(txn *badger.Txn) error { + // Store main data + if err := txn.Set([]byte(path), valueBytes); err != nil { + return err + } + + // Store timestamp index + indexKey := fmt.Sprintf("_ts:%020d:%s", storedValue.Timestamp, path) + return txn.Set([]byte(indexKey), []byte(storedValue.UUID)) + }) } +// resolveConflict performs simple conflict resolution (newer timestamp wins) func (s *SyncService) resolveConflict(key string, local, remote *types.StoredValue, peerAddress string) error { - // This would be implemented similar to the main.go version - return fmt.Errorf("not implemented") + s.logger.WithFields(logrus.Fields{ + "key": key, + "local_ts": local.Timestamp, + "remote_ts": remote.Timestamp, + "local_uuid": local.UUID, + "remote_uuid": remote.UUID, + }).Info("Resolving timestamp collision conflict") + + if remote.Timestamp > local.Timestamp { + // Remote is newer, store it + err := s.storeReplicatedDataWithMetadata(key, remote) + if err == nil { + s.logger.WithField("key", key).Info("Conflict resolved: remote data wins (newer)") + } + return err + } + + // Local is newer or equal, keep local data + s.logger.WithField("key", key).Info("Conflict resolved: local data wins (newer or equal)") + return nil } // requestMerkleDiff requests children hashes or keys for a given node/range from a peer @@ -383,6 +461,47 @@ func (s *SyncService) handleChildrenDiff(peerAddress string, children []types.Me // fetchAndStoreRange fetches a range of KV pairs from a peer and stores them locally func (s *SyncService) fetchAndStoreRange(peerAddress string, startKey, endKey string) error { - // This would be implemented similar to the main.go version - return fmt.Errorf("not implemented") + req := types.KVRangeRequest{ + StartKey: startKey, + EndKey: endKey, + Limit: 0, // No limit + } + jsonData, err := json.Marshal(req) + if err != nil { + return err + } + + client := &http.Client{Timeout: 30 * time.Second} // Longer timeout for range fetches + url := fmt.Sprintf("http://%s/kv_range", peerAddress) + + resp, err := client.Post(url, "application/json", bytes.NewBuffer(jsonData)) + if err != nil { + return err + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + return fmt.Errorf("peer returned status %d for KV range fetch", resp.StatusCode) + } + + var rangeResp types.KVRangeResponse + if err := json.NewDecoder(resp.Body).Decode(&rangeResp); err != nil { + return err + } + + for _, pair := range rangeResp.Pairs { + // Use storeReplicatedDataWithMetadata to preserve original UUID/Timestamp + if err := s.storeReplicatedDataWithMetadata(pair.Path, &pair.StoredValue); err != nil { + s.logger.WithError(err).WithFields(logrus.Fields{ + "peer": peerAddress, + "path": pair.Path, + }).Error("Failed to store fetched range data") + } else { + s.logger.WithFields(logrus.Fields{ + "peer": peerAddress, + "path": pair.Path, + }).Debug("Stored data from fetched range") + } + } + return nil } \ No newline at end of file