package main

import (
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"math/rand"
	"net"
	"net/http"
	"os"
	"os/signal"
	"path/filepath"
	"strconv"
	"strings"
	"sync"
	"syscall"
	"time"

	badger "github.com/dgraph-io/badger/v4"
	"github.com/google/uuid"
	"github.com/gorilla/mux"
	"github.com/sirupsen/logrus"
	"gopkg.in/yaml.v3"
)

// Core data structures

type StoredValue struct {
	UUID      string          `json:"uuid"`
	Timestamp int64           `json:"timestamp"`
	Data      json.RawMessage `json:"data"`
}

type Member struct {
	ID              string `json:"id"`
	Address         string `json:"address"`
	LastSeen        int64  `json:"last_seen"`
	JoinedTimestamp int64  `json:"joined_timestamp"`
}

type JoinRequest struct {
	ID              string `json:"id"`
	Address         string `json:"address"`
	JoinedTimestamp int64  `json:"joined_timestamp"`
}

type LeaveRequest struct {
	ID string `json:"id"`
}

type PairsByTimeRequest struct {
	StartTimestamp int64  `json:"start_timestamp"`
	EndTimestamp   int64  `json:"end_timestamp"`
	Limit          int    `json:"limit"`
	Prefix         string `json:"prefix,omitempty"`
}

type PairsByTimeResponse struct {
	Path      string `json:"path"`
	UUID      string `json:"uuid"`
	Timestamp int64  `json:"timestamp"`
}

type PutResponse struct {
	UUID      string `json:"uuid"`
	Timestamp int64  `json:"timestamp"`
}

// Configuration

type Config struct {
	NodeID               string   `yaml:"node_id"`
	BindAddress          string   `yaml:"bind_address"`
	Port                 int      `yaml:"port"`
	DataDir              string   `yaml:"data_dir"`
	SeedNodes            []string `yaml:"seed_nodes"`
	ReadOnly             bool     `yaml:"read_only"`
	LogLevel             string   `yaml:"log_level"`
	GossipIntervalMin    int      `yaml:"gossip_interval_min"`
	GossipIntervalMax    int      `yaml:"gossip_interval_max"`
	SyncInterval         int      `yaml:"sync_interval"`
	CatchupInterval      int      `yaml:"catchup_interval"`
	BootstrapMaxAgeHours int      `yaml:"bootstrap_max_age_hours"`
	ThrottleDelayMs      int      `yaml:"throttle_delay_ms"`
	FetchDelayMs         int      `yaml:"fetch_delay_ms"`
}

// Server represents the KVS node.
type Server struct {
	config     *Config
	db         *badger.DB
	members    map[string]*Member
	membersMu  sync.RWMutex
	mode       string // "normal", "read-only", or "syncing"
	modeMu     sync.RWMutex
	logger     *logrus.Logger
	httpServer *http.Server
	ctx        context.Context
	cancel     context.CancelFunc
	wg         sync.WaitGroup
}

// defaultConfig returns the built-in defaults; intervals are in seconds.
func defaultConfig() *Config {
	hostname, _ := os.Hostname()
	return &Config{
		NodeID:               hostname,
		BindAddress:          "127.0.0.1",
		Port:                 8080,
		DataDir:              "./data",
		SeedNodes:            []string{},
		ReadOnly:             false,
		LogLevel:             "info",
		GossipIntervalMin:    60,  // 1 minute
		GossipIntervalMax:    120, // 2 minutes
		SyncInterval:         300, // 5 minutes
		CatchupInterval:      120, // 2 minutes
		BootstrapMaxAgeHours: 720, // 30 days
		ThrottleDelayMs:      100,
		FetchDelayMs:         50,
	}
}

// loadConfig reads the configuration file, creating one with defaults if it
// does not exist yet.
func loadConfig(configPath string) (*Config, error) {
	config := defaultConfig()

	if _, err := os.Stat(configPath); os.IsNotExist(err) {
		// Create a default config file
		if err := os.MkdirAll(filepath.Dir(configPath), 0755); err != nil {
			return nil, fmt.Errorf("failed to create config directory: %v", err)
		}
		data, err := yaml.Marshal(config)
		if err != nil {
			return nil, fmt.Errorf("failed to marshal default config: %v", err)
		}
		if err := os.WriteFile(configPath, data, 0644); err != nil {
			return nil, fmt.Errorf("failed to write default config: %v", err)
		}
		fmt.Printf("Created default configuration at %s\n", configPath)
		return config, nil
	}

	data, err := os.ReadFile(configPath)
	if err != nil {
		return nil, fmt.Errorf("failed to read config file: %v", err)
	}
	if err := yaml.Unmarshal(data, config); err != nil {
		return nil, fmt.Errorf("failed to parse config file: %v", err)
	}
	return config, nil
}
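// An illustrative config.yaml mirroring the defaults above (node_id falls
// back to the hostname; "node-1" here is just a placeholder):
//
//	node_id: "node-1"
//	bind_address: "127.0.0.1"
//	port: 8080
//	data_dir: "./data"
//	seed_nodes: []   # entries are "host:port" strings, e.g. "10.0.0.2:8080"
//	read_only: false
//	log_level: "info"
//	gossip_interval_min: 60
//	gossip_interval_max: 120
//	sync_interval: 300
//	catchup_interval: 120
//	bootstrap_max_age_hours: 720
//	throttle_delay_ms: 100
//	fetch_delay_ms: 50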
// Initialize server
func NewServer(config *Config) (*Server, error) {
	logger := logrus.New()
	logger.SetFormatter(&logrus.JSONFormatter{})
	level, err := logrus.ParseLevel(config.LogLevel)
	if err != nil {
		level = logrus.InfoLevel
	}
	logger.SetLevel(level)

	// Create data directory
	if err := os.MkdirAll(config.DataDir, 0755); err != nil {
		return nil, fmt.Errorf("failed to create data directory: %v", err)
	}

	// Open BadgerDB
	opts := badger.DefaultOptions(filepath.Join(config.DataDir, "badger"))
	opts.Logger = nil // disable Badger's internal logging
	db, err := badger.Open(opts)
	if err != nil {
		return nil, fmt.Errorf("failed to open BadgerDB: %v", err)
	}

	ctx, cancel := context.WithCancel(context.Background())
	server := &Server{
		config:  config,
		db:      db,
		members: make(map[string]*Member),
		mode:    "normal",
		logger:  logger,
		ctx:     ctx,
		cancel:  cancel,
	}
	if config.ReadOnly {
		server.setMode("read-only")
	}
	return server, nil
}

// Mode management

func (s *Server) getMode() string {
	s.modeMu.RLock()
	defer s.modeMu.RUnlock()
	return s.mode
}

func (s *Server) setMode(mode string) {
	s.modeMu.Lock()
	defer s.modeMu.Unlock()
	oldMode := s.mode
	s.mode = mode
	s.logger.WithFields(logrus.Fields{
		"old_mode": oldMode,
		"new_mode": mode,
	}).Info("Mode changed")
}

// Member management

func (s *Server) addMember(member *Member) {
	s.membersMu.Lock()
	defer s.membersMu.Unlock()
	s.members[member.ID] = member
	s.logger.WithFields(logrus.Fields{
		"node_id": member.ID,
		"address": member.Address,
	}).Info("Member added")
}

func (s *Server) removeMember(nodeID string) {
	s.membersMu.Lock()
	defer s.membersMu.Unlock()
	if member, exists := s.members[nodeID]; exists {
		delete(s.members, nodeID)
		s.logger.WithFields(logrus.Fields{
			"node_id": member.ID,
			"address": member.Address,
		}).Info("Member removed")
	}
}

func (s *Server) getMembers() []*Member {
	s.membersMu.RLock()
	defer s.membersMu.RUnlock()
	members := make([]*Member, 0, len(s.members))
	for _, member := range s.members {
		members = append(members, member)
	}
	return members
}

// HTTP handlers

func (s *Server) healthHandler(w http.ResponseWriter, r *http.Request) {
	health := map[string]interface{}{
		"status":       "ok",
		"mode":         s.getMode(),
		"member_count": len(s.getMembers()),
		"node_id":      s.config.NodeID,
	}
	w.Header().Set("Content-Type", "application/json")
	json.NewEncoder(w).Encode(health)
}

func (s *Server) getKVHandler(w http.ResponseWriter, r *http.Request) {
	vars := mux.Vars(r)
	path := vars["path"]

	var storedValue StoredValue
	err := s.db.View(func(txn *badger.Txn) error {
		item, err := txn.Get([]byte(path))
		if err != nil {
			return err
		}
		return item.Value(func(val []byte) error {
			return json.Unmarshal(val, &storedValue)
		})
	})
	if err == badger.ErrKeyNotFound {
		http.Error(w, "Not Found", http.StatusNotFound)
		return
	}
	if err != nil {
		s.logger.WithError(err).WithField("path", path).Error("Failed to get value")
		http.Error(w, "Internal Server Error", http.StatusInternalServerError)
		return
	}

	w.Header().Set("Content-Type", "application/json")
	w.Write(storedValue.Data)
}
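// Illustrative client calls for the KV endpoints (paths and payloads are
// made-up examples; the status codes follow the handlers in this file):
//
//	curl -X PUT -d '{"name":"alice"}' http://127.0.0.1:8080/kv/users/alice
//	        -> 201 Created (200 OK on update) with {"uuid":"...","timestamp":...}
//	curl http://127.0.0.1:8080/kv/users/alice
//	        -> 200 OK with the stored JSON document (404 if absent)
//	curl -X DELETE http://127.0.0.1:8080/kv/users/alice
//	        -> 204 No Content (404 if absent)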
func (s *Server) putKVHandler(w http.ResponseWriter, r *http.Request) {
	vars := mux.Vars(r)
	path := vars["path"]

	mode := s.getMode()
	if mode == "syncing" {
		http.Error(w, "Service Unavailable", http.StatusServiceUnavailable)
		return
	}
	if mode == "read-only" && !s.isClusterMember(r.RemoteAddr) {
		http.Error(w, "Forbidden", http.StatusForbidden)
		return
	}

	var data json.RawMessage
	if err := json.NewDecoder(r.Body).Decode(&data); err != nil {
		http.Error(w, "Bad Request", http.StatusBadRequest)
		return
	}

	now := time.Now().UnixMilli()
	newUUID := uuid.New().String()
	storedValue := StoredValue{
		UUID:      newUUID,
		Timestamp: now,
		Data:      data,
	}
	valueBytes, err := json.Marshal(storedValue)
	if err != nil {
		s.logger.WithError(err).Error("Failed to marshal stored value")
		http.Error(w, "Internal Server Error", http.StatusInternalServerError)
		return
	}

	var isUpdate bool
	err = s.db.Update(func(txn *badger.Txn) error {
		// If the key already exists, delete its old timestamp index entry so
		// updates do not leave stale index records behind.
		item, err := txn.Get([]byte(path))
		if err == nil {
			isUpdate = true
			var old StoredValue
			if uerr := item.Value(func(val []byte) error {
				return json.Unmarshal(val, &old)
			}); uerr == nil {
				oldIndexKey := fmt.Sprintf("_ts:%020d:%s", old.Timestamp, path)
				if derr := txn.Delete([]byte(oldIndexKey)); derr != nil {
					return derr
				}
			}
		} else if err != badger.ErrKeyNotFound {
			return err
		}
		// Store main data
		if err := txn.Set([]byte(path), valueBytes); err != nil {
			return err
		}
		// Store timestamp index
		indexKey := fmt.Sprintf("_ts:%020d:%s", now, path)
		return txn.Set([]byte(indexKey), []byte(newUUID))
	})
	if err != nil {
		s.logger.WithError(err).WithField("path", path).Error("Failed to store value")
		http.Error(w, "Internal Server Error", http.StatusInternalServerError)
		return
	}

	status := http.StatusCreated
	if isUpdate {
		status = http.StatusOK
	}
	w.Header().Set("Content-Type", "application/json")
	w.WriteHeader(status)
	json.NewEncoder(w).Encode(PutResponse{UUID: newUUID, Timestamp: now})

	s.logger.WithFields(logrus.Fields{
		"path":      path,
		"uuid":      newUUID,
		"timestamp": now,
		"is_update": isUpdate,
	}).Info("Value stored")
}

func (s *Server) deleteKVHandler(w http.ResponseWriter, r *http.Request) {
	vars := mux.Vars(r)
	path := vars["path"]

	mode := s.getMode()
	if mode == "syncing" {
		http.Error(w, "Service Unavailable", http.StatusServiceUnavailable)
		return
	}
	if mode == "read-only" && !s.isClusterMember(r.RemoteAddr) {
		http.Error(w, "Forbidden", http.StatusForbidden)
		return
	}

	var found bool
	err := s.db.Update(func(txn *badger.Txn) error {
		// Fetch the existing value so its timestamp index can be cleaned up.
		item, err := txn.Get([]byte(path))
		if err == badger.ErrKeyNotFound {
			return nil
		}
		if err != nil {
			return err
		}
		found = true

		var storedValue StoredValue
		if err := item.Value(func(val []byte) error {
			return json.Unmarshal(val, &storedValue)
		}); err != nil {
			return err
		}

		// Delete main data
		if err := txn.Delete([]byte(path)); err != nil {
			return err
		}
		// Delete timestamp index
		indexKey := fmt.Sprintf("_ts:%020d:%s", storedValue.Timestamp, path)
		return txn.Delete([]byte(indexKey))
	})
	if err != nil {
		s.logger.WithError(err).WithField("path", path).Error("Failed to delete value")
		http.Error(w, "Internal Server Error", http.StatusInternalServerError)
		return
	}
	if !found {
		http.Error(w, "Not Found", http.StatusNotFound)
		return
	}

	w.WriteHeader(http.StatusNoContent)
	s.logger.WithField("path", path).Info("Value deleted")
}

func (s *Server) getMembersHandler(w http.ResponseWriter, r *http.Request) {
	w.Header().Set("Content-Type", "application/json")
	json.NewEncoder(w).Encode(s.getMembers())
}

func (s *Server) joinMemberHandler(w http.ResponseWriter, r *http.Request) {
	var req JoinRequest
	if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
		http.Error(w, "Bad Request", http.StatusBadRequest)
		return
	}

	s.addMember(&Member{
		ID:              req.ID,
		Address:         req.Address,
		LastSeen:        time.Now().UnixMilli(),
		JoinedTimestamp: req.JoinedTimestamp,
	})

	// Respond with the current member list so the joiner can seed its view.
	w.Header().Set("Content-Type", "application/json")
	json.NewEncoder(w).Encode(s.getMembers())
}

func (s *Server) leaveMemberHandler(w http.ResponseWriter, r *http.Request) {
	var req LeaveRequest
	if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
		http.Error(w, "Bad Request", http.StatusBadRequest)
		return
	}
	s.removeMember(req.ID)
	w.WriteHeader(http.StatusNoContent)
}
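// The KV handlers above inline the "_ts:{timestamp}:{path}" index key. The
// timestamp is zero-padded to 20 digits so BadgerDB's lexicographic key order
// matches chronological order, which pairsByTimeHandler below relies on. A
// helper like this could centralize the format (a sketch; the handlers do not
// call it yet):
func tsIndexKey(timestamp int64, path string) []byte {
	return []byte(fmt.Sprintf("_ts:%020d:%s", timestamp, path))
}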
func (s *Server) pairsByTimeHandler(w http.ResponseWriter, r *http.Request) {
	var req PairsByTimeRequest
	if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
		http.Error(w, "Bad Request", http.StatusBadRequest)
		return
	}

	// Default limit is 15, per the spec.
	if req.Limit <= 0 {
		req.Limit = 15
	}

	var pairs []PairsByTimeResponse
	err := s.db.View(func(txn *badger.Txn) error {
		opts := badger.DefaultIteratorOptions
		opts.PrefetchSize = req.Limit
		it := txn.NewIterator(opts)
		defer it.Close()

		// Scan the timestamp index; path-prefix filtering happens per entry
		// below, since index keys are ordered by timestamp, not by path.
		prefix := []byte("_ts:")
		for it.Seek(prefix); it.ValidForPrefix(prefix) && len(pairs) < req.Limit; it.Next() {
			item := it.Item()
			key := string(item.Key())

			// Parse the index key: "_ts:{timestamp}:{path}"
			parts := strings.SplitN(key, ":", 3)
			if len(parts) != 3 {
				continue
			}
			timestamp, err := strconv.ParseInt(parts[1], 10, 64)
			if err != nil {
				continue
			}

			// Filter by timestamp range
			if req.StartTimestamp > 0 && timestamp < req.StartTimestamp {
				continue
			}
			if req.EndTimestamp > 0 && timestamp >= req.EndTimestamp {
				continue
			}

			// Filter by path prefix if specified
			path := parts[2]
			if req.Prefix != "" && !strings.HasPrefix(path, req.Prefix) {
				continue
			}

			var entryUUID string
			if err := item.Value(func(val []byte) error {
				entryUUID = string(val)
				return nil
			}); err != nil {
				continue
			}

			pairs = append(pairs, PairsByTimeResponse{
				Path:      path,
				UUID:      entryUUID,
				Timestamp: timestamp,
			})
		}
		return nil
	})
	if err != nil {
		s.logger.WithError(err).Error("Failed to query pairs by time")
		http.Error(w, "Internal Server Error", http.StatusInternalServerError)
		return
	}

	if len(pairs) == 0 {
		w.WriteHeader(http.StatusNoContent)
		return
	}
	w.Header().Set("Content-Type", "application/json")
	json.NewEncoder(w).Encode(pairs)
}
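// The handler above scans the whole "_ts:" range and filters in Go. Because
// index keys sort chronologically, the iterator could instead seek directly
// to the first eligible entry. A sketch under that assumption:
//
//	seekKey := []byte(fmt.Sprintf("_ts:%020d:", req.StartTimestamp))
//	for it.Seek(seekKey); it.ValidForPrefix([]byte("_ts:")) && len(pairs) < req.Limit; it.Next() {
//		// entries arrive in ascending timestamp order from StartTimestamp,
//		// so the StartTimestamp filter becomes unnecessary
//	}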
func (s *Server) gossipHandler(w http.ResponseWriter, r *http.Request) {
	var remoteMemberList []Member
	if err := json.NewDecoder(r.Body).Decode(&remoteMemberList); err != nil {
		http.Error(w, "Bad Request", http.StatusBadRequest)
		return
	}

	// Merge the received member list
	s.mergeMemberList(remoteMemberList)

	// Respond with our current member list, including ourselves
	localMembers := s.getMembers()
	gossipResponse := make([]Member, len(localMembers))
	for i, member := range localMembers {
		gossipResponse[i] = *member
	}
	gossipResponse = append(gossipResponse, Member{
		ID:              s.config.NodeID,
		Address:         fmt.Sprintf("%s:%d", s.config.BindAddress, s.config.Port),
		LastSeen:        time.Now().UnixMilli(),
		JoinedTimestamp: s.getJoinedTimestamp(),
	})

	w.Header().Set("Content-Type", "application/json")
	json.NewEncoder(w).Encode(gossipResponse)

	s.logger.WithField("remote_members", len(remoteMemberList)).Debug("Processed gossip request")
}

// isClusterMember reports whether the request originates from a known member.
func (s *Server) isClusterMember(remoteAddr string) bool {
	host, _, err := net.SplitHostPort(remoteAddr)
	if err != nil {
		return false
	}

	s.membersMu.RLock()
	defer s.membersMu.RUnlock()
	for _, member := range s.members {
		memberHost, _, err := net.SplitHostPort(member.Address)
		if err == nil && memberHost == host {
			return true
		}
	}
	return false
}

// Setup HTTP routes
func (s *Server) setupRoutes() *mux.Router {
	router := mux.NewRouter()

	// Health endpoint
	router.HandleFunc("/health", s.healthHandler).Methods("GET")

	// KV endpoints
	router.HandleFunc("/kv/{path:.+}", s.getKVHandler).Methods("GET")
	router.HandleFunc("/kv/{path:.+}", s.putKVHandler).Methods("PUT")
	router.HandleFunc("/kv/{path:.+}", s.deleteKVHandler).Methods("DELETE")

	// Member endpoints
	router.HandleFunc("/members/", s.getMembersHandler).Methods("GET")
	router.HandleFunc("/members/join", s.joinMemberHandler).Methods("POST")
	router.HandleFunc("/members/leave", s.leaveMemberHandler).Methods("DELETE")
	router.HandleFunc("/members/gossip", s.gossipHandler).Methods("POST")
	router.HandleFunc("/members/pairs_by_time", s.pairsByTimeHandler).Methods("POST")

	return router
}

// Start the server
func (s *Server) Start() error {
	router := s.setupRoutes()
	addr := fmt.Sprintf("%s:%d", s.config.BindAddress, s.config.Port)
	s.httpServer = &http.Server{
		Addr:    addr,
		Handler: router,
	}

	s.logger.WithFields(logrus.Fields{
		"node_id": s.config.NodeID,
		"address": addr,
	}).Info("Starting KVS server")

	// Start gossip and sync routines
	s.startBackgroundTasks()

	// Try to join the cluster if seed nodes are configured
	if len(s.config.SeedNodes) > 0 {
		go s.bootstrap()
	}

	return s.httpServer.ListenAndServe()
}

// Stop the server gracefully
func (s *Server) Stop() error {
	s.logger.Info("Shutting down KVS server")
	s.cancel()
	s.wg.Wait()

	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()
	if err := s.httpServer.Shutdown(ctx); err != nil {
		s.logger.WithError(err).Error("HTTP server shutdown error")
	}
	if err := s.db.Close(); err != nil {
		s.logger.WithError(err).Error("BadgerDB close error")
	}
	return nil
}

// Background tasks (gossip, sync, etc.)
func (s *Server) startBackgroundTasks() {
	s.wg.Add(1)
	go s.gossipRoutine()

	s.wg.Add(1)
	go s.syncRoutine()
}

// Gossip routine - periodically exchanges member lists with peers.
func (s *Server) gossipRoutine() {
	defer s.wg.Done()
	for {
		// Sleep a random interval between the configured min and max.
		minInterval := time.Duration(s.config.GossipIntervalMin) * time.Second
		maxInterval := time.Duration(s.config.GossipIntervalMax) * time.Second
		interval := minInterval
		if maxInterval > minInterval {
			// Guard: rand.Int63n panics on non-positive arguments.
			interval += time.Duration(rand.Int63n(int64(maxInterval - minInterval)))
		}

		select {
		case <-s.ctx.Done():
			return
		case <-time.After(interval):
			s.performGossipRound()
		}
	}
}

// Perform a gossip round with 1-3 random healthy peers.
func (s *Server) performGossipRound() {
	members := s.getHealthyMembers()
	if len(members) == 0 {
		s.logger.Debug("No healthy members for gossip round")
		return
	}

	maxPeers := 3
	if len(members) < maxPeers {
		maxPeers = len(members)
	}

	// Shuffle, then gossip with a randomly sized selection.
	rand.Shuffle(len(members), func(i, j int) {
		members[i], members[j] = members[j], members[i]
	})
	for _, peer := range members[:rand.Intn(maxPeers)+1] {
		go s.gossipWithPeer(peer)
	}
}
// Gossip with a specific peer
func (s *Server) gossipWithPeer(peer *Member) {
	s.logger.WithField("peer", peer.Address).Debug("Starting gossip with peer")

	// Send our member list (including ourselves) to the peer.
	localMembers := s.getMembers()
	gossipData := make([]Member, len(localMembers))
	for i, member := range localMembers {
		gossipData[i] = *member
	}
	gossipData = append(gossipData, Member{
		ID:              s.config.NodeID,
		Address:         fmt.Sprintf("%s:%d", s.config.BindAddress, s.config.Port),
		LastSeen:        time.Now().UnixMilli(),
		JoinedTimestamp: s.getJoinedTimestamp(),
	})

	jsonData, err := json.Marshal(gossipData)
	if err != nil {
		s.logger.WithError(err).Error("Failed to marshal gossip data")
		return
	}

	client := &http.Client{Timeout: 5 * time.Second}
	url := fmt.Sprintf("http://%s/members/gossip", peer.Address)
	resp, err := client.Post(url, "application/json", bytes.NewBuffer(jsonData))
	if err != nil {
		s.logger.WithFields(logrus.Fields{
			"peer":  peer.Address,
			"error": err.Error(),
		}).Warn("Failed to gossip with peer")
		s.markPeerUnhealthy(peer.ID)
		return
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		s.logger.WithFields(logrus.Fields{
			"peer":   peer.Address,
			"status": resp.StatusCode,
		}).Warn("Gossip request failed")
		s.markPeerUnhealthy(peer.ID)
		return
	}

	// Merge the peer's member list into ours.
	var remoteMemberList []Member
	if err := json.NewDecoder(resp.Body).Decode(&remoteMemberList); err != nil {
		s.logger.WithError(err).Error("Failed to decode gossip response")
		return
	}
	s.mergeMemberList(remoteMemberList)
	s.updateMemberLastSeen(peer.ID, time.Now().UnixMilli())

	s.logger.WithField("peer", peer.Address).Debug("Completed gossip with peer")
}

// Get healthy members (exclude those considered down).
func (s *Server) getHealthyMembers() []*Member {
	s.membersMu.RLock()
	defer s.membersMu.RUnlock()

	now := time.Now().UnixMilli()
	healthyMembers := make([]*Member, 0)
	for _, member := range s.members {
		// A member is healthy if seen within the last 5 minutes.
		if now-member.LastSeen < 5*60*1000 {
			healthyMembers = append(healthyMembers, member)
		}
	}
	return healthyMembers
}

// Mark a peer as unhealthy by backdating its last-seen timestamp.
func (s *Server) markPeerUnhealthy(nodeID string) {
	s.membersMu.Lock()
	defer s.membersMu.Unlock()
	if member, exists := s.members[nodeID]; exists {
		member.LastSeen = time.Now().UnixMilli() - 10*60*1000 // 10 minutes ago
		s.logger.WithField("node_id", nodeID).Warn("Marked peer as unhealthy")
	}
}

// Update a member's last-seen timestamp.
func (s *Server) updateMemberLastSeen(nodeID string, timestamp int64) {
	s.membersMu.Lock()
	defer s.membersMu.Unlock()
	if member, exists := s.members[nodeID]; exists {
		member.LastSeen = timestamp
	}
}

// Merge a remote member list into the local one.
func (s *Server) mergeMemberList(remoteMembers []Member) {
	s.membersMu.Lock()
	defer s.membersMu.Unlock()

	now := time.Now().UnixMilli()
	for _, remoteMember := range remoteMembers {
		// Skip ourselves
		if remoteMember.ID == s.config.NodeID {
			continue
		}

		if localMember, exists := s.members[remoteMember.ID]; exists {
			// Keep the freshest last-seen and the earliest joined timestamp.
			if remoteMember.LastSeen > localMember.LastSeen {
				localMember.LastSeen = remoteMember.LastSeen
			}
			if remoteMember.JoinedTimestamp < localMember.JoinedTimestamp {
				localMember.JoinedTimestamp = remoteMember.JoinedTimestamp
			}
		} else {
			// Add new member
			s.members[remoteMember.ID] = &Member{
				ID:              remoteMember.ID,
				Address:         remoteMember.Address,
				LastSeen:        remoteMember.LastSeen,
				JoinedTimestamp: remoteMember.JoinedTimestamp,
			}
			s.logger.WithFields(logrus.Fields{
				"node_id": remoteMember.ID,
				"address": remoteMember.Address,
			}).Info("Discovered new member through gossip")
		}
	}

	// Clean up members not seen for more than 10 minutes.
	toRemove := make([]string, 0)
	for nodeID, member := range s.members {
		if now-member.LastSeen > 10*60*1000 {
			toRemove = append(toRemove, nodeID)
		}
	}
	for _, nodeID := range toRemove {
		delete(s.members, nodeID)
		s.logger.WithField("node_id", nodeID).Info("Removed stale member")
	}
}

// Get this node's joined timestamp.
// TODO: persist this across restarts; returning the current time on every
// call means the advertised joined timestamp drifts.
func (s *Server) getJoinedTimestamp() int64 {
	return time.Now().UnixMilli()
}
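// One way to address the TODO above: persist the joined timestamp in BadgerDB
// under a reserved meta key on first boot and reuse it afterwards. A sketch
// (the key name "_meta:joined_timestamp" is an assumption, and nothing calls
// this yet):
func (s *Server) loadOrStoreJoinedTimestamp() (int64, error) {
	key := []byte("_meta:joined_timestamp")
	var ts int64
	err := s.db.Update(func(txn *badger.Txn) error {
		item, err := txn.Get(key)
		if err == badger.ErrKeyNotFound {
			// First boot: record the current time.
			ts = time.Now().UnixMilli()
			return txn.Set(key, []byte(strconv.FormatInt(ts, 10)))
		}
		if err != nil {
			return err
		}
		return item.Value(func(val []byte) error {
			parsed, perr := strconv.ParseInt(string(val), 10, 64)
			if perr != nil {
				return perr
			}
			ts = parsed
			return nil
		})
	})
	return ts, err
}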
// Sync routine - handles the regular sync schedule.
func (s *Server) syncRoutine() {
	defer s.wg.Done()
	syncTicker := time.NewTicker(time.Duration(s.config.SyncInterval) * time.Second)
	defer syncTicker.Stop()

	for {
		select {
		case <-s.ctx.Done():
			return
		case <-syncTicker.C:
			s.performRegularSync()
		}
	}
}

// Perform the regular (default five-minute) sync against one random peer.
func (s *Server) performRegularSync() {
	members := s.getHealthyMembers()
	if len(members) == 0 {
		s.logger.Debug("No healthy members for sync")
		return
	}

	// Select a random peer
	peer := members[rand.Intn(len(members))]
	s.logger.WithField("peer", peer.Address).Info("Starting regular sync")

	// Request the latest 15 path/UUID pairs.
	req := PairsByTimeRequest{
		StartTimestamp: 0,
		EndTimestamp:   0, // 0 = up to the current time
		Limit:          15,
	}
	remotePairs, err := s.requestPairsByTime(peer.Address, req)
	if err != nil {
		s.logger.WithError(err).WithField("peer", peer.Address).Error("Failed to sync with peer")
		s.markPeerUnhealthy(peer.ID)
		return
	}

	// Compare with local data and fetch anything missing or newer.
	s.syncDataFromPairs(peer.Address, remotePairs)
	s.logger.WithField("peer", peer.Address).Info("Completed regular sync")
}

// Request pairs by time from a peer.
func (s *Server) requestPairsByTime(peerAddress string, req PairsByTimeRequest) ([]PairsByTimeResponse, error) {
	jsonData, err := json.Marshal(req)
	if err != nil {
		return nil, err
	}

	client := &http.Client{Timeout: 10 * time.Second}
	url := fmt.Sprintf("http://%s/members/pairs_by_time", peerAddress)
	resp, err := client.Post(url, "application/json", bytes.NewBuffer(jsonData))
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()

	if resp.StatusCode == http.StatusNoContent {
		return []PairsByTimeResponse{}, nil
	}
	if resp.StatusCode != http.StatusOK {
		return nil, fmt.Errorf("peer returned status %d", resp.StatusCode)
	}

	var pairs []PairsByTimeResponse
	if err := json.NewDecoder(resp.Body).Decode(&pairs); err != nil {
		return nil, err
	}
	return pairs, nil
}

// Sync data from pairs - fetch entries that are missing locally or newer remotely.
func (s *Server) syncDataFromPairs(peerAddress string, remotePairs []PairsByTimeResponse) {
	for _, remotePair := range remotePairs {
		localData, localExists := s.getLocalData(remotePair.Path)

		shouldFetch := false
		if !localExists {
			shouldFetch = true
			s.logger.WithField("path", remotePair.Path).Debug("Missing local data, will fetch")
		} else if localData.Timestamp < remotePair.Timestamp {
			shouldFetch = true
			s.logger.WithFields(logrus.Fields{
				"path":             remotePair.Path,
				"local_timestamp":  localData.Timestamp,
				"remote_timestamp": remotePair.Timestamp,
			}).Debug("Local data is older, will fetch")
		} else if localData.Timestamp == remotePair.Timestamp && localData.UUID != remotePair.UUID {
			// Same timestamp, different writers: a conflict.
			s.logger.WithFields(logrus.Fields{
				"path":        remotePair.Path,
				"timestamp":   remotePair.Timestamp,
				"local_uuid":  localData.UUID,
				"remote_uuid": remotePair.UUID,
			}).Warn("Timestamp collision detected")
			// TODO: implement conflict resolution logic
			continue
		}

		if shouldFetch {
			if err := s.fetchAndStoreData(peerAddress, remotePair.Path); err != nil {
				s.logger.WithError(err).WithFields(logrus.Fields{
					"peer": peerAddress,
					"path": remotePair.Path,
				}).Error("Failed to fetch data from peer")
			}
		}
	}
}
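// A possible resolution for the timestamp collisions logged above: break ties
// deterministically by UUID so every node converges on the same winner
// without coordination. This is one option, not the original design:
func remoteWins(localUUID, remoteUUID string) bool {
	// The lexicographically greater UUID wins the tie.
	return strings.Compare(remoteUUID, localUUID) > 0
}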
// Get local data for a path.
func (s *Server) getLocalData(path string) (*StoredValue, bool) {
	var storedValue StoredValue
	err := s.db.View(func(txn *badger.Txn) error {
		item, err := txn.Get([]byte(path))
		if err != nil {
			return err
		}
		return item.Value(func(val []byte) error {
			return json.Unmarshal(val, &storedValue)
		})
	})
	if err != nil {
		return nil, false
	}
	return &storedValue, true
}

// Fetch a value from a peer and store it locally.
func (s *Server) fetchAndStoreData(peerAddress, path string) error {
	client := &http.Client{Timeout: 5 * time.Second}
	url := fmt.Sprintf("http://%s/kv/%s", peerAddress, path)
	resp, err := client.Get(url)
	if err != nil {
		return err
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		return fmt.Errorf("peer returned status %d for path %s", resp.StatusCode, path)
	}

	var data json.RawMessage
	if err := json.NewDecoder(resp.Body).Decode(&data); err != nil {
		return err
	}
	return s.storeReplicatedData(path, data)
}

// Store replicated data. NOTE: this currently re-stamps the value with a new
// timestamp and UUID; a full implementation would preserve the original
// metadata from the source node.
func (s *Server) storeReplicatedData(path string, data json.RawMessage) error {
	now := time.Now().UnixMilli()
	newUUID := uuid.New().String()
	storedValue := StoredValue{
		UUID:      newUUID,
		Timestamp: now,
		Data:      data,
	}
	valueBytes, err := json.Marshal(storedValue)
	if err != nil {
		return err
	}

	return s.db.Update(func(txn *badger.Txn) error {
		// Store main data
		if err := txn.Set([]byte(path), valueBytes); err != nil {
			return err
		}
		// Store timestamp index
		indexKey := fmt.Sprintf("_ts:%020d:%s", now, path)
		return txn.Set([]byte(indexKey), []byte(newUUID))
	})
}
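// A metadata-preserving variant of storeReplicatedData, keeping the source's
// UUID and timestamp so replicas stay identical across nodes (a sketch; it
// assumes the fetch path is extended to return the full StoredValue, and
// nothing calls it yet):
func (s *Server) storeWithSourceMetadata(path string, sv StoredValue) error {
	valueBytes, err := json.Marshal(sv)
	if err != nil {
		return err
	}
	return s.db.Update(func(txn *badger.Txn) error {
		if err := txn.Set([]byte(path), valueBytes); err != nil {
			return err
		}
		// Index under the source timestamp so pairs_by_time agrees across nodes.
		indexKey := fmt.Sprintf("_ts:%020d:%s", sv.Timestamp, path)
		return txn.Set([]byte(indexKey), []byte(sv.UUID))
	})
}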
// Bootstrap - join the cluster via seed nodes.
func (s *Server) bootstrap() {
	if len(s.config.SeedNodes) == 0 {
		s.logger.Info("No seed nodes configured, running as standalone")
		return
	}

	s.logger.Info("Starting bootstrap process")
	s.setMode("syncing")

	// Try each seed node until one accepts the join request.
	joined := false
	for _, seedAddr := range s.config.SeedNodes {
		if s.attemptJoin(seedAddr) {
			joined = true
			break
		}
	}
	if !joined {
		s.logger.Warn("Failed to join cluster via seed nodes, running as standalone")
		s.setMode("normal")
		return
	}

	// Give member discovery a moment to settle.
	time.Sleep(2 * time.Second)

	// Perform a gradual sync, then enter normal operation.
	s.performGradualSync()
	s.setMode("normal")
	s.logger.Info("Bootstrap completed, entering normal mode")
}

// Attempt to join the cluster via a seed node.
func (s *Server) attemptJoin(seedAddr string) bool {
	joinReq := JoinRequest{
		ID:              s.config.NodeID,
		Address:         fmt.Sprintf("%s:%d", s.config.BindAddress, s.config.Port),
		JoinedTimestamp: time.Now().UnixMilli(),
	}
	jsonData, err := json.Marshal(joinReq)
	if err != nil {
		s.logger.WithError(err).Error("Failed to marshal join request")
		return false
	}

	client := &http.Client{Timeout: 10 * time.Second}
	url := fmt.Sprintf("http://%s/members/join", seedAddr)
	resp, err := client.Post(url, "application/json", bytes.NewBuffer(jsonData))
	if err != nil {
		s.logger.WithFields(logrus.Fields{
			"seed":  seedAddr,
			"error": err.Error(),
		}).Warn("Failed to contact seed node")
		return false
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		s.logger.WithFields(logrus.Fields{
			"seed":   seedAddr,
			"status": resp.StatusCode,
		}).Warn("Seed node rejected join request")
		return false
	}

	var memberList []Member
	if err := json.NewDecoder(resp.Body).Decode(&memberList); err != nil {
		s.logger.WithError(err).Error("Failed to decode member list from seed")
		return false
	}

	// Add all returned members to our local list. Copy the loop variable so
	// each stored pointer refers to a distinct Member value (required before
	// Go 1.22).
	for _, member := range memberList {
		if member.ID != s.config.NodeID {
			m := member
			s.addMember(&m)
		}
	}

	s.logger.WithFields(logrus.Fields{
		"seed":         seedAddr,
		"member_count": len(memberList),
	}).Info("Successfully joined cluster")
	return true
}

// Perform gradual sync (simplified version).
func (s *Server) performGradualSync() {
	s.logger.Info("Starting gradual sync")

	members := s.getHealthyMembers()
	if len(members) == 0 {
		s.logger.Info("No healthy members for gradual sync")
		return
	}

	// For now, run a few throttled rounds of the regular sync.
	for i := 0; i < 3; i++ {
		s.performRegularSync()
		time.Sleep(time.Duration(s.config.ThrottleDelayMs) * time.Millisecond)
	}
	s.logger.Info("Gradual sync completed")
}

func main() {
	// Simple CLI argument parsing: the only argument is the config path.
	configPath := "./config.yaml"
	if len(os.Args) > 1 {
		configPath = os.Args[1]
	}

	config, err := loadConfig(configPath)
	if err != nil {
		fmt.Fprintf(os.Stderr, "Failed to load configuration: %v\n", err)
		os.Exit(1)
	}

	server, err := NewServer(config)
	if err != nil {
		fmt.Fprintf(os.Stderr, "Failed to create server: %v\n", err)
		os.Exit(1)
	}

	// Handle graceful shutdown on SIGINT/SIGTERM.
	sigCh := make(chan os.Signal, 1)
	signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
	go func() {
		<-sigCh
		server.Stop()
	}()

	if err := server.Start(); err != nil && err != http.ErrServerClosed {
		fmt.Fprintf(os.Stderr, "Server error: %v\n", err)
		os.Exit(1)
	}
}
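// Building and running (the config path defaults to ./config.yaml, and a
// default file is written on first start; the output name "kvs" is arbitrary):
//
//	go build -o kvs .
//	./kvs /path/to/config.yaml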