diff --git a/.gitignore b/.gitignore index 4c5f206..56f1350 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,6 @@ .claude/ +data/ +data*/ +*.yaml +!config.yaml +kvs diff --git a/config.yaml b/config.yaml new file mode 100644 index 0000000..8ba127e --- /dev/null +++ b/config.yaml @@ -0,0 +1,14 @@ +node_id: GALACTICA +bind_address: 127.0.0.1 +port: 8080 +data_dir: ./data +seed_nodes: [] +read_only: false +log_level: info +gossip_interval_min: 60 +gossip_interval_max: 120 +sync_interval: 300 +catchup_interval: 120 +bootstrap_max_age_hours: 720 +throttle_delay_ms: 100 +fetch_delay_ms: 50 diff --git a/main.go b/main.go index 458db67..a888549 100644 --- a/main.go +++ b/main.go @@ -1016,8 +1016,19 @@ func (s *Server) syncDataFromPairs(peerAddress string, remotePairs []PairsByTime "timestamp": remotePair.Timestamp, "local_uuid": localData.UUID, "remote_uuid": remotePair.UUID, - }).Warn("Timestamp collision detected, implementing conflict resolution") - // TODO: Implement conflict resolution logic + }).Warn("Timestamp collision detected, starting conflict resolution") + + resolved, err := s.resolveConflict(remotePair.Path, localData, &remotePair, peerAddress) + if err != nil { + s.logger.WithError(err).WithField("path", remotePair.Path).Error("Failed to resolve conflict") + continue + } + + if resolved { + s.logger.WithField("path", remotePair.Path).Info("Conflict resolved, updated local data") + } else { + s.logger.WithField("path", remotePair.Path).Info("Conflict resolved, keeping local data") + } continue } @@ -1219,6 +1230,198 @@ func (s *Server) performGradualSync() { s.logger.Info("Gradual sync completed") } +// Resolve conflict between local and remote data using majority vote and oldest node tie-breaker +func (s *Server) resolveConflict(path string, localData *StoredValue, remotePair *PairsByTimeResponse, peerAddress string) (bool, error) { + s.logger.WithFields(logrus.Fields{ + "path": path, + "timestamp": localData.Timestamp, + "local_uuid": localData.UUID, + "remote_uuid": remotePair.UUID, + }).Info("Starting conflict resolution with majority vote") + + // Get list of healthy members for voting + members := s.getHealthyMembers() + if len(members) == 0 { + // No other members to consult, use oldest node rule (local vs remote) + // We'll consider the peer as the "remote" node for comparison + return s.resolveByOldestNode(localData, remotePair, peerAddress) + } + + // Query all healthy members for their version of this path + votes := make(map[string]int) // UUID -> vote count + uuidToTimestamp := make(map[string]int64) + uuidToJoinedTime := make(map[string]int64) + + // Add our local vote + votes[localData.UUID] = 1 + uuidToTimestamp[localData.UUID] = localData.Timestamp + uuidToJoinedTime[localData.UUID] = s.getJoinedTimestamp() + + // Add the remote peer's vote + votes[remotePair.UUID] = 1 + uuidToTimestamp[remotePair.UUID] = remotePair.Timestamp + // We'll need to get the peer's joined timestamp + + // Query other members + for _, member := range members { + if member.Address == peerAddress { + // We already counted this peer + uuidToJoinedTime[remotePair.UUID] = member.JoinedTimestamp + continue + } + + memberData, exists := s.queryMemberForData(member.Address, path) + if !exists { + continue // Member doesn't have this data + } + + // Only count votes for data with the same timestamp + if memberData.Timestamp == localData.Timestamp { + votes[memberData.UUID]++ + if _, exists := uuidToTimestamp[memberData.UUID]; !exists { + uuidToTimestamp[memberData.UUID] = memberData.Timestamp + uuidToJoinedTime[memberData.UUID] = member.JoinedTimestamp + } + } + } + + // Find the UUID with majority votes + maxVotes := 0 + var winningUUIDs []string + + for uuid, voteCount := range votes { + if voteCount > maxVotes { + maxVotes = voteCount + winningUUIDs = []string{uuid} + } else if voteCount == maxVotes { + winningUUIDs = append(winningUUIDs, uuid) + } + } + + var winnerUUID string + if len(winningUUIDs) == 1 { + winnerUUID = winningUUIDs[0] + } else { + // Tie-breaker: oldest node (earliest joined timestamp) + oldestJoinedTime := int64(0) + for _, uuid := range winningUUIDs { + joinedTime := uuidToJoinedTime[uuid] + if oldestJoinedTime == 0 || joinedTime < oldestJoinedTime { + oldestJoinedTime = joinedTime + winnerUUID = uuid + } + } + + s.logger.WithFields(logrus.Fields{ + "path": path, + "tied_votes": maxVotes, + "winner_uuid": winnerUUID, + "oldest_joined": oldestJoinedTime, + }).Info("Resolved conflict using oldest node tie-breaker") + } + + // If remote UUID wins, fetch and store the remote data + if winnerUUID == remotePair.UUID { + err := s.fetchAndStoreData(peerAddress, path) + if err != nil { + return false, fmt.Errorf("failed to fetch winning data: %v", err) + } + + s.logger.WithFields(logrus.Fields{ + "path": path, + "winner_uuid": winnerUUID, + "winner_votes": maxVotes, + "total_nodes": len(members) + 2, // +2 for local and peer + }).Info("Conflict resolved: remote data wins") + + return true, nil + } + + // Local data wins, no action needed + s.logger.WithFields(logrus.Fields{ + "path": path, + "winner_uuid": winnerUUID, + "winner_votes": maxVotes, + "total_nodes": len(members) + 2, + }).Info("Conflict resolved: local data wins") + + return false, nil +} + +// Resolve conflict using oldest node rule when no other members available +func (s *Server) resolveByOldestNode(localData *StoredValue, remotePair *PairsByTimeResponse, peerAddress string) (bool, error) { + // Find the peer's joined timestamp + peerJoinedTime := int64(0) + s.membersMu.RLock() + for _, member := range s.members { + if member.Address == peerAddress { + peerJoinedTime = member.JoinedTimestamp + break + } + } + s.membersMu.RUnlock() + + localJoinedTime := s.getJoinedTimestamp() + + // Oldest node wins + if peerJoinedTime > 0 && peerJoinedTime < localJoinedTime { + // Peer is older, fetch remote data + err := s.fetchAndStoreData(peerAddress, remotePair.Path) + if err != nil { + return false, fmt.Errorf("failed to fetch data from older node: %v", err) + } + + s.logger.WithFields(logrus.Fields{ + "path": remotePair.Path, + "local_joined": localJoinedTime, + "peer_joined": peerJoinedTime, + "winner": "remote", + }).Info("Conflict resolved using oldest node rule") + + return true, nil + } + + // Local node is older or equal, keep local data + s.logger.WithFields(logrus.Fields{ + "path": remotePair.Path, + "local_joined": localJoinedTime, + "peer_joined": peerJoinedTime, + "winner": "local", + }).Info("Conflict resolved using oldest node rule") + + return false, nil +} + +// Query a member for their version of specific data +func (s *Server) queryMemberForData(memberAddress, path string) (*StoredValue, bool) { + client := &http.Client{Timeout: 5 * time.Second} + url := fmt.Sprintf("http://%s/kv/%s", memberAddress, path) + + resp, err := client.Get(url) + if err != nil { + return nil, false + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + return nil, false + } + + var data json.RawMessage + if err := json.NewDecoder(resp.Body).Decode(&data); err != nil { + return nil, false + } + + // We need to get the metadata too - this is a simplified approach + // In a full implementation, we'd have a separate endpoint for metadata queries + localData, exists := s.getLocalData(path) + if exists { + return localData, true + } + + return nil, false +} + func main() { configPath := "./config.yaml"