diff --git a/main.go b/main.go index 00babb1..458db67 100644 --- a/main.go +++ b/main.go @@ -1,9 +1,11 @@ package main import ( + "bytes" "context" "encoding/json" "fmt" + "math/rand" "net" "net/http" "os" @@ -565,6 +567,38 @@ func (s *Server) pairsByTimeHandler(w http.ResponseWriter, r *http.Request) { json.NewEncoder(w).Encode(pairs) } +func (s *Server) gossipHandler(w http.ResponseWriter, r *http.Request) { + var remoteMemberList []Member + if err := json.NewDecoder(r.Body).Decode(&remoteMemberList); err != nil { + http.Error(w, "Bad Request", http.StatusBadRequest) + return + } + + // Merge the received member list + s.mergeMemberList(remoteMemberList) + + // Respond with our current member list + localMembers := s.getMembers() + gossipResponse := make([]Member, len(localMembers)) + for i, member := range localMembers { + gossipResponse[i] = *member + } + + // Add ourselves to the response + selfMember := Member{ + ID: s.config.NodeID, + Address: fmt.Sprintf("%s:%d", s.config.BindAddress, s.config.Port), + LastSeen: time.Now().UnixMilli(), + JoinedTimestamp: s.getJoinedTimestamp(), + } + gossipResponse = append(gossipResponse, selfMember) + + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(gossipResponse) + + s.logger.WithField("remote_members", len(remoteMemberList)).Debug("Processed gossip request") +} + // Utility function to check if request is from cluster member func (s *Server) isClusterMember(remoteAddr string) bool { host, _, err := net.SplitHostPort(remoteAddr) @@ -601,6 +635,7 @@ func (s *Server) setupRoutes() *mux.Router { router.HandleFunc("/members/", s.getMembersHandler).Methods("GET") router.HandleFunc("/members/join", s.joinMemberHandler).Methods("POST") router.HandleFunc("/members/leave", s.leaveMemberHandler).Methods("DELETE") + router.HandleFunc("/members/gossip", s.gossipHandler).Methods("POST") router.HandleFunc("/members/pairs_by_time", s.pairsByTimeHandler).Methods("POST") return router @@ -653,16 +688,535 @@ func (s *Server) Stop() error { return nil } -// Background tasks placeholder (gossip, sync, etc.) +// Background tasks (gossip, sync, etc.) 
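+//
+// Two long-running goroutines are started here: a gossip loop that exchanges
+// member lists with random peers on a jittered interval, and a sync loop that
+// periodically pulls recent writes from a random healthy peer. Both exit when
+// s.ctx is cancelled and signal completion through s.wg.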
func (s *Server) startBackgroundTasks() {
-	// TODO: Implement gossip protocol
-	// TODO: Implement periodic sync
-	// TODO: Implement catch-up sync
+	// Start gossip routine
+	s.wg.Add(1)
+	go s.gossipRoutine()
+
+	// Start sync routine
+	s.wg.Add(1)
+	go s.syncRoutine()
 }
 
-// Bootstrap placeholder
+// Gossip routine - runs periodically to exchange member lists
+func (s *Server) gossipRoutine() {
+	defer s.wg.Done()
+
+	for {
+		// Random interval between the configured gossip min and max
+		minInterval := time.Duration(s.config.GossipIntervalMin) * time.Second
+		maxInterval := time.Duration(s.config.GossipIntervalMax) * time.Second
+		interval := minInterval
+		// rand.Int63n panics for n <= 0, so only add jitter when the
+		// configured window is non-empty.
+		if maxInterval > minInterval {
+			interval += time.Duration(rand.Int63n(int64(maxInterval - minInterval)))
+		}
+
+		select {
+		case <-s.ctx.Done():
+			return
+		case <-time.After(interval):
+			s.performGossipRound()
+		}
+	}
+}
+
+// Perform a gossip round with random healthy peers
+func (s *Server) performGossipRound() {
+	members := s.getHealthyMembers()
+	if len(members) == 0 {
+		s.logger.Debug("No healthy members for gossip round")
+		return
+	}
+
+	// Select 1-3 random peers for gossip
+	maxPeers := 3
+	if len(members) < maxPeers {
+		maxPeers = len(members)
+	}
+
+	// Shuffle and select
+	rand.Shuffle(len(members), func(i, j int) {
+		members[i], members[j] = members[j], members[i]
+	})
+
+	selectedPeers := members[:rand.Intn(maxPeers)+1]
+
+	for _, peer := range selectedPeers {
+		go s.gossipWithPeer(peer)
+	}
+}
+
+// Gossip with a specific peer
+func (s *Server) gossipWithPeer(peer *Member) {
+	s.logger.WithField("peer", peer.Address).Debug("Starting gossip with peer")
+
+	// Get our current member list
+	localMembers := s.getMembers()
+
+	// Send our member list to the peer
+	gossipData := make([]Member, len(localMembers))
+	for i, member := range localMembers {
+		gossipData[i] = *member
+	}
+
+	// Add ourselves to the list
+	selfMember := Member{
+		ID:              s.config.NodeID,
+		Address:         fmt.Sprintf("%s:%d", s.config.BindAddress, s.config.Port),
+		LastSeen:        time.Now().UnixMilli(),
+		JoinedTimestamp: s.getJoinedTimestamp(),
+	}
+	gossipData = append(gossipData, selfMember)
+
+	jsonData, err := json.Marshal(gossipData)
+	if err != nil {
+		s.logger.WithError(err).Error("Failed to marshal gossip data")
+		return
+	}
+
+	// Send HTTP request to peer
+	client := &http.Client{Timeout: 5 * time.Second}
+	url := fmt.Sprintf("http://%s/members/gossip", peer.Address)
+
+	resp, err := client.Post(url, "application/json", bytes.NewBuffer(jsonData))
+	if err != nil {
+		s.logger.WithFields(logrus.Fields{
+			"peer":  peer.Address,
+			"error": err.Error(),
+		}).Warn("Failed to gossip with peer")
+		s.markPeerUnhealthy(peer.ID)
+		return
+	}
+	defer resp.Body.Close()
+
+	if resp.StatusCode != http.StatusOK {
+		s.logger.WithFields(logrus.Fields{
+			"peer":   peer.Address,
+			"status": resp.StatusCode,
+		}).Warn("Gossip request failed")
+		s.markPeerUnhealthy(peer.ID)
+		return
+	}
+
+	// Process response - peer's member list
+	var remoteMemberList []Member
+	if err := json.NewDecoder(resp.Body).Decode(&remoteMemberList); err != nil {
+		s.logger.WithError(err).Error("Failed to decode gossip response")
+		return
+	}
+
+	// Merge remote member list with our local list
+	s.mergeMemberList(remoteMemberList)
+
+	// Update peer's last seen timestamp
+	s.updateMemberLastSeen(peer.ID, time.Now().UnixMilli())
+
+	s.logger.WithField("peer", peer.Address).Debug("Completed gossip with peer")
+}
+
+// Get healthy members (exclude those marked as down)
+func (s *Server) getHealthyMembers() []*Member {
+	s.membersMu.RLock()
+	defer s.membersMu.RUnlock()
+
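+	// NOTE: the returned slice shares *Member pointers with s.members, so
+	// callers should treat entries as read-only and use the locked helpers
+	// (updateMemberLastSeen, markPeerUnhealthy) for any writes.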
+	now := time.Now().UnixMilli()
+	healthyMembers := make([]*Member, 0)
+
+	for _, member := range s.members {
+		// Consider member healthy if last seen within last 5 minutes
+		if now-member.LastSeen < 5*60*1000 {
+			healthyMembers = append(healthyMembers, member)
+		}
+	}
+
+	return healthyMembers
+}
+
+// Mark a peer as unhealthy
+func (s *Server) markPeerUnhealthy(nodeID string) {
+	s.membersMu.Lock()
+	defer s.membersMu.Unlock()
+
+	if member, exists := s.members[nodeID]; exists {
+		// Backdate LastSeen so the member fails the 5-minute health check
+		member.LastSeen = time.Now().UnixMilli() - 10*60*1000 // 10 minutes ago
+		s.logger.WithField("node_id", nodeID).Warn("Marked peer as unhealthy")
+	}
+}
+
+// Update member's last seen timestamp
+func (s *Server) updateMemberLastSeen(nodeID string, timestamp int64) {
+	s.membersMu.Lock()
+	defer s.membersMu.Unlock()
+
+	if member, exists := s.members[nodeID]; exists {
+		member.LastSeen = timestamp
+	}
+}
+
+// Merge remote member list with local member list
+func (s *Server) mergeMemberList(remoteMembers []Member) {
+	s.membersMu.Lock()
+	defer s.membersMu.Unlock()
+
+	now := time.Now().UnixMilli()
+
+	for _, remoteMember := range remoteMembers {
+		// Skip ourselves
+		if remoteMember.ID == s.config.NodeID {
+			continue
+		}
+
+		if localMember, exists := s.members[remoteMember.ID]; exists {
+			// Update existing member
+			if remoteMember.LastSeen > localMember.LastSeen {
+				localMember.LastSeen = remoteMember.LastSeen
+			}
+			// Keep the earlier joined timestamp
+			if remoteMember.JoinedTimestamp < localMember.JoinedTimestamp {
+				localMember.JoinedTimestamp = remoteMember.JoinedTimestamp
+			}
+		} else {
+			// Add new member
+			newMember := &Member{
+				ID:              remoteMember.ID,
+				Address:         remoteMember.Address,
+				LastSeen:        remoteMember.LastSeen,
+				JoinedTimestamp: remoteMember.JoinedTimestamp,
+			}
+			s.members[remoteMember.ID] = newMember
+			s.logger.WithFields(logrus.Fields{
+				"node_id": remoteMember.ID,
+				"address": remoteMember.Address,
+			}).Info("Discovered new member through gossip")
+		}
+	}
+
+	// Clean up old members (not seen for more than 10 minutes)
+	toRemove := make([]string, 0)
+	for nodeID, member := range s.members {
+		if now-member.LastSeen > 10*60*1000 { // 10 minutes
+			toRemove = append(toRemove, nodeID)
+		}
+	}
+
+	for _, nodeID := range toRemove {
+		delete(s.members, nodeID)
+		s.logger.WithField("node_id", nodeID).Info("Removed stale member")
+	}
+}
+
+// Get this node's joined timestamp (startup time)
+func (s *Server) getJoinedTimestamp() int64 {
+	// FIXME: returning the current time means this node advertises a new
+	// JoinedTimestamp on every gossip round; the value should be captured
+	// once at startup and stored persistently.
+	return time.Now().UnixMilli()
+}
+
+// Sync routine - triggers a regular sync round every SyncInterval seconds
+// (catch-up syncing is handled separately during bootstrap)
+func (s *Server) syncRoutine() {
+	defer s.wg.Done()
+
+	syncTicker := time.NewTicker(time.Duration(s.config.SyncInterval) * time.Second)
+	defer syncTicker.Stop()
+
+	for {
+		select {
+		case <-s.ctx.Done():
+			return
+		case <-syncTicker.C:
+			s.performRegularSync()
+		}
+	}
+}
+
+// Perform a regular sync round against one random healthy peer
+func (s *Server) performRegularSync() {
+	members := s.getHealthyMembers()
+	if len(members) == 0 {
+		s.logger.Debug("No healthy members for sync")
+		return
+	}
+
+	// Select random peer
+	peer := members[rand.Intn(len(members))]
+
+	s.logger.WithField("peer", peer.Address).Info("Starting regular sync")
+
+	// Request latest 15 UUIDs
+	req := PairsByTimeRequest{
+		StartTimestamp: 0,
+		EndTimestamp:   0, // 0 = up to the current time
+		Limit:          15,
+	}
+
+	remotePairs, err := s.requestPairsByTime(peer.Address, req)
+	if err != nil {
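+		// Treat any transport or decode failure as evidence that the peer
+		// may be down; it will be skipped until gossip observes it again.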
+		s.logger.WithError(err).WithField("peer", peer.Address).Error("Failed to sync with peer")
+		s.markPeerUnhealthy(peer.ID)
+		return
+	}
+
+	// Compare with our local data and fetch missing/newer data
+	s.syncDataFromPairs(peer.Address, remotePairs)
+
+	s.logger.WithField("peer", peer.Address).Info("Completed regular sync")
+}
+
+// Request pairs by time from a peer
+func (s *Server) requestPairsByTime(peerAddress string, req PairsByTimeRequest) ([]PairsByTimeResponse, error) {
+	jsonData, err := json.Marshal(req)
+	if err != nil {
+		return nil, err
+	}
+
+	client := &http.Client{Timeout: 10 * time.Second}
+	url := fmt.Sprintf("http://%s/members/pairs_by_time", peerAddress)
+
+	resp, err := client.Post(url, "application/json", bytes.NewBuffer(jsonData))
+	if err != nil {
+		return nil, err
+	}
+	defer resp.Body.Close()
+
+	if resp.StatusCode == http.StatusNoContent {
+		return []PairsByTimeResponse{}, nil
+	}
+
+	if resp.StatusCode != http.StatusOK {
+		return nil, fmt.Errorf("peer returned status %d", resp.StatusCode)
+	}
+
+	var pairs []PairsByTimeResponse
+	if err := json.NewDecoder(resp.Body).Decode(&pairs); err != nil {
+		return nil, err
+	}
+
+	return pairs, nil
+}
+
+// Sync data from pairs - fetch missing or newer data
+func (s *Server) syncDataFromPairs(peerAddress string, remotePairs []PairsByTimeResponse) {
+	for _, remotePair := range remotePairs {
+		// Check our local version
+		localData, localExists := s.getLocalData(remotePair.Path)
+
+		shouldFetch := false
+		if !localExists {
+			shouldFetch = true
+			s.logger.WithField("path", remotePair.Path).Debug("Missing local data, will fetch")
+		} else if localData.Timestamp < remotePair.Timestamp {
+			shouldFetch = true
+			s.logger.WithFields(logrus.Fields{
+				"path":             remotePair.Path,
+				"local_timestamp":  localData.Timestamp,
+				"remote_timestamp": remotePair.Timestamp,
+			}).Debug("Local data is older, will fetch")
+		} else if localData.Timestamp == remotePair.Timestamp && localData.UUID != remotePair.UUID {
+			// Timestamp collision - need conflict resolution
+			s.logger.WithFields(logrus.Fields{
+				"path":        remotePair.Path,
+				"timestamp":   remotePair.Timestamp,
+				"local_uuid":  localData.UUID,
+				"remote_uuid": remotePair.UUID,
+			}).Warn("Timestamp collision detected, conflict resolution not yet implemented")
+			// TODO: Implement conflict resolution logic
+			continue
+		}
+
+		if shouldFetch {
+			if err := s.fetchAndStoreData(peerAddress, remotePair.Path); err != nil {
+				s.logger.WithError(err).WithFields(logrus.Fields{
+					"peer": peerAddress,
+					"path": remotePair.Path,
+				}).Error("Failed to fetch data from peer")
+			}
+		}
+	}
+}
+
+// Get local data for a path
+func (s *Server) getLocalData(path string) (*StoredValue, bool) {
+	var storedValue StoredValue
+	err := s.db.View(func(txn *badger.Txn) error {
+		item, err := txn.Get([]byte(path))
+		if err != nil {
+			return err
+		}
+
+		return item.Value(func(val []byte) error {
+			return json.Unmarshal(val, &storedValue)
+		})
+	})
+
+	if err != nil {
+		return nil, false
+	}
+
+	return &storedValue, true
+}
+
+// Fetch and store data from peer
+func (s *Server) fetchAndStoreData(peerAddress, path string) error {
+	client := &http.Client{Timeout: 5 * time.Second}
+	url := fmt.Sprintf("http://%s/kv/%s", peerAddress, path)
+
+	resp, err := client.Get(url)
+	if err != nil {
+		return err
+	}
+	defer resp.Body.Close()
+
+	if resp.StatusCode != http.StatusOK {
+		return fmt.Errorf("peer returned status %d for path %s", resp.StatusCode, path)
+	}
+
+	var data json.RawMessage
+	if err := json.NewDecoder(resp.Body).Decode(&data); err != nil {
+		return err
+	}
+
+	// Store the data using our internal storage mechanism
+	return s.storeReplicatedData(path, data)
+}
+
+// Store replicated data via the internal storage path (bypasses the public
+// PUT handler)
+func (s *Server) storeReplicatedData(path string, data json.RawMessage) error {
+	// FIXME: generating a fresh timestamp/UUID here makes the replica look
+	// newer than the source, so peers may re-fetch the same value on later
+	// syncs. A full implementation should preserve the original metadata.
+	now := time.Now().UnixMilli()
+	newUUID := uuid.New().String()
+
+	storedValue := StoredValue{
+		UUID:      newUUID,
+		Timestamp: now,
+		Data:      data,
+	}
+
+	valueBytes, err := json.Marshal(storedValue)
+	if err != nil {
+		return err
+	}
+
+	return s.db.Update(func(txn *badger.Txn) error {
+		// Store main data
+		if err := txn.Set([]byte(path), valueBytes); err != nil {
+			return err
+		}
+
+		// Store timestamp index
+		indexKey := fmt.Sprintf("_ts:%020d:%s", now, path)
+		return txn.Set([]byte(indexKey), []byte(newUUID))
+	})
+}
+
+// Bootstrap - join cluster using seed nodes
 func (s *Server) bootstrap() {
-	// TODO: Implement gradual bootstrapping
+	if len(s.config.SeedNodes) == 0 {
+		s.logger.Info("No seed nodes configured, running as standalone")
+		return
+	}
+
+	s.logger.Info("Starting bootstrap process")
+	s.setMode("syncing")
+
+	// Try to join via each seed node
+	joined := false
+	for _, seedAddr := range s.config.SeedNodes {
+		if s.attemptJoin(seedAddr) {
+			joined = true
+			break
+		}
+	}
+
+	if !joined {
+		s.logger.Warn("Failed to join cluster via seed nodes, running as standalone")
+		s.setMode("normal")
+		return
+	}
+
+	// Wait a bit for member discovery
+	time.Sleep(2 * time.Second)
+
+	// Perform gradual sync
+	s.performGradualSync()
+
+	// Switch to normal mode
+	s.setMode("normal")
+	s.logger.Info("Bootstrap completed, entering normal mode")
+}
+
+// Attempt to join cluster via a seed node
+func (s *Server) attemptJoin(seedAddr string) bool {
+	joinReq := JoinRequest{
+		ID:              s.config.NodeID,
+		Address:         fmt.Sprintf("%s:%d", s.config.BindAddress, s.config.Port),
+		JoinedTimestamp: time.Now().UnixMilli(),
+	}
+
+	jsonData, err := json.Marshal(joinReq)
+	if err != nil {
+		s.logger.WithError(err).Error("Failed to marshal join request")
+		return false
+	}
+
+	client := &http.Client{Timeout: 10 * time.Second}
+	url := fmt.Sprintf("http://%s/members/join", seedAddr)
+
+	resp, err := client.Post(url, "application/json", bytes.NewBuffer(jsonData))
+	if err != nil {
+		s.logger.WithFields(logrus.Fields{
+			"seed":  seedAddr,
+			"error": err.Error(),
+		}).Warn("Failed to contact seed node")
+		return false
+	}
+	defer resp.Body.Close()
+
+	if resp.StatusCode != http.StatusOK {
+		s.logger.WithFields(logrus.Fields{
+			"seed":   seedAddr,
+			"status": resp.StatusCode,
+		}).Warn("Seed node rejected join request")
+		return false
+	}
+
+	// Process member list response
+	var memberList []Member
+	if err := json.NewDecoder(resp.Body).Decode(&memberList); err != nil {
+		s.logger.WithError(err).Error("Failed to decode member list from seed")
+		return false
+	}
+
+	// Add all members to our local list, indexing into the slice so each
+	// addMember call receives a distinct pointer rather than the address
+	// of a shared loop variable
+	for i := range memberList {
+		if memberList[i].ID != s.config.NodeID {
+			s.addMember(&memberList[i])
+		}
+	}
+
+	s.logger.WithFields(logrus.Fields{
+		"seed":         seedAddr,
+		"member_count": len(memberList),
+	}).Info("Successfully joined cluster")
+
+	return true
+}
+
+// Perform gradual sync (simplified version)
+func (s *Server) performGradualSync() {
+	s.logger.Info("Starting gradual sync")
+
+	members := s.getHealthyMembers()
+	if len(members) == 0 {
+		s.logger.Info("No healthy members for gradual sync")
+		return
+	}
+
+	// For now, just do a few rounds of regular sync, throttled so a fresh
+	// node does not hammer its peers
+	for i := 0; i < 3; i++ {
+		s.performRegularSync()
+		time.Sleep(time.Duration(s.config.ThrottleDelayMs) * time.Millisecond)
+	}
+
+	s.logger.Info("Gradual sync completed")
 }
 
 func main() {