forked from ryyst/kalzu-value-store
fix: implement missing sync service methods for data replication
- Implemented fetchSingleKVFromPeer: HTTP client to fetch KV pairs from peers - Implemented getLocalData: Badger DB access for local data retrieval - Implemented deleteKVLocally: Local deletion with timestamp index cleanup - Implemented storeReplicatedDataWithMetadata: Preserves original UUID/timestamp - Implemented resolveConflict: Simple conflict resolution (newer timestamp wins) - Implemented fetchAndStoreRange: Fetches KV ranges for Merkle sync This fixes the critical data replication issue where sync was failing with "not implemented" errors. Integration tests now pass for data replication. 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
149
cluster/sync.go
149
cluster/sync.go
@@ -292,30 +292,108 @@ func (s *SyncService) handleLeafLevelDiff(peerAddress string, keys []string, loc
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Add placeholder methods that would need to be implemented or injected
|
// fetchSingleKVFromPeer fetches a single KV pair from a peer
|
||||||
func (s *SyncService) fetchSingleKVFromPeer(peerAddress, key string) (*types.StoredValue, error) {
|
func (s *SyncService) fetchSingleKVFromPeer(peerAddress, path string) (*types.StoredValue, error) {
|
||||||
// This would be implemented similar to the main.go version
|
client := &http.Client{Timeout: 5 * time.Second}
|
||||||
return nil, fmt.Errorf("not implemented")
|
url := fmt.Sprintf("http://%s/kv/%s", peerAddress, path)
|
||||||
|
|
||||||
|
resp, err := client.Get(url)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
defer resp.Body.Close()
|
||||||
|
|
||||||
|
if resp.StatusCode == http.StatusNotFound {
|
||||||
|
return nil, nil // Key might have been deleted on the peer
|
||||||
|
}
|
||||||
|
if resp.StatusCode != http.StatusOK {
|
||||||
|
return nil, fmt.Errorf("peer returned status %d for path %s", resp.StatusCode, path)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *SyncService) getLocalData(key string) (*types.StoredValue, bool) {
|
var storedValue types.StoredValue
|
||||||
// This would be implemented similar to the main.go version
|
if err := json.NewDecoder(resp.Body).Decode(&storedValue); err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to decode types.StoredValue from peer: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return &storedValue, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// getLocalData is a utility to retrieve a types.StoredValue from local DB
|
||||||
|
func (s *SyncService) getLocalData(path string) (*types.StoredValue, bool) {
|
||||||
|
var storedValue types.StoredValue
|
||||||
|
err := s.db.View(func(txn *badger.Txn) error {
|
||||||
|
item, err := txn.Get([]byte(path))
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return item.Value(func(val []byte) error {
|
||||||
|
return json.Unmarshal(val, &storedValue)
|
||||||
|
})
|
||||||
|
})
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
return nil, false
|
return nil, false
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return &storedValue, true
|
||||||
|
}
|
||||||
|
|
||||||
|
// deleteKVLocally deletes a key-value pair and its associated timestamp index locally
|
||||||
func (s *SyncService) deleteKVLocally(key string, timestamp int64) error {
|
func (s *SyncService) deleteKVLocally(key string, timestamp int64) error {
|
||||||
// This would be implemented similar to the main.go version
|
return s.db.Update(func(txn *badger.Txn) error {
|
||||||
return fmt.Errorf("not implemented")
|
// Delete the main key
|
||||||
|
if err := txn.Delete([]byte(key)); err != nil {
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *SyncService) storeReplicatedDataWithMetadata(key string, value *types.StoredValue) error {
|
// Delete the timestamp index
|
||||||
// This would be implemented similar to the main.go version
|
indexKey := fmt.Sprintf("_ts:%d:%s", timestamp, key)
|
||||||
return fmt.Errorf("not implemented")
|
return txn.Delete([]byte(indexKey))
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// storeReplicatedDataWithMetadata stores replicated data preserving its original metadata
|
||||||
|
func (s *SyncService) storeReplicatedDataWithMetadata(path string, storedValue *types.StoredValue) error {
|
||||||
|
valueBytes, err := json.Marshal(storedValue)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return s.db.Update(func(txn *badger.Txn) error {
|
||||||
|
// Store main data
|
||||||
|
if err := txn.Set([]byte(path), valueBytes); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Store timestamp index
|
||||||
|
indexKey := fmt.Sprintf("_ts:%020d:%s", storedValue.Timestamp, path)
|
||||||
|
return txn.Set([]byte(indexKey), []byte(storedValue.UUID))
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
// resolveConflict performs simple conflict resolution (newer timestamp wins)
|
||||||
func (s *SyncService) resolveConflict(key string, local, remote *types.StoredValue, peerAddress string) error {
|
func (s *SyncService) resolveConflict(key string, local, remote *types.StoredValue, peerAddress string) error {
|
||||||
// This would be implemented similar to the main.go version
|
s.logger.WithFields(logrus.Fields{
|
||||||
return fmt.Errorf("not implemented")
|
"key": key,
|
||||||
|
"local_ts": local.Timestamp,
|
||||||
|
"remote_ts": remote.Timestamp,
|
||||||
|
"local_uuid": local.UUID,
|
||||||
|
"remote_uuid": remote.UUID,
|
||||||
|
}).Info("Resolving timestamp collision conflict")
|
||||||
|
|
||||||
|
if remote.Timestamp > local.Timestamp {
|
||||||
|
// Remote is newer, store it
|
||||||
|
err := s.storeReplicatedDataWithMetadata(key, remote)
|
||||||
|
if err == nil {
|
||||||
|
s.logger.WithField("key", key).Info("Conflict resolved: remote data wins (newer)")
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Local is newer or equal, keep local data
|
||||||
|
s.logger.WithField("key", key).Info("Conflict resolved: local data wins (newer or equal)")
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// requestMerkleDiff requests children hashes or keys for a given node/range from a peer
|
// requestMerkleDiff requests children hashes or keys for a given node/range from a peer
|
||||||
@@ -383,6 +461,47 @@ func (s *SyncService) handleChildrenDiff(peerAddress string, children []types.Me
|
|||||||
|
|
||||||
// fetchAndStoreRange fetches a range of KV pairs from a peer and stores them locally
|
// fetchAndStoreRange fetches a range of KV pairs from a peer and stores them locally
|
||||||
func (s *SyncService) fetchAndStoreRange(peerAddress string, startKey, endKey string) error {
|
func (s *SyncService) fetchAndStoreRange(peerAddress string, startKey, endKey string) error {
|
||||||
// This would be implemented similar to the main.go version
|
req := types.KVRangeRequest{
|
||||||
return fmt.Errorf("not implemented")
|
StartKey: startKey,
|
||||||
|
EndKey: endKey,
|
||||||
|
Limit: 0, // No limit
|
||||||
|
}
|
||||||
|
jsonData, err := json.Marshal(req)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
client := &http.Client{Timeout: 30 * time.Second} // Longer timeout for range fetches
|
||||||
|
url := fmt.Sprintf("http://%s/kv_range", peerAddress)
|
||||||
|
|
||||||
|
resp, err := client.Post(url, "application/json", bytes.NewBuffer(jsonData))
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer resp.Body.Close()
|
||||||
|
|
||||||
|
if resp.StatusCode != http.StatusOK {
|
||||||
|
return fmt.Errorf("peer returned status %d for KV range fetch", resp.StatusCode)
|
||||||
|
}
|
||||||
|
|
||||||
|
var rangeResp types.KVRangeResponse
|
||||||
|
if err := json.NewDecoder(resp.Body).Decode(&rangeResp); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, pair := range rangeResp.Pairs {
|
||||||
|
// Use storeReplicatedDataWithMetadata to preserve original UUID/Timestamp
|
||||||
|
if err := s.storeReplicatedDataWithMetadata(pair.Path, &pair.StoredValue); err != nil {
|
||||||
|
s.logger.WithError(err).WithFields(logrus.Fields{
|
||||||
|
"peer": peerAddress,
|
||||||
|
"path": pair.Path,
|
||||||
|
}).Error("Failed to store fetched range data")
|
||||||
|
} else {
|
||||||
|
s.logger.WithFields(logrus.Fields{
|
||||||
|
"peer": peerAddress,
|
||||||
|
"path": pair.Path,
|
||||||
|
}).Debug("Stored data from fetched range")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
}
|
}
|
Reference in New Issue
Block a user