package main import ( "context" "encoding/json" "fmt" "net" "net/http" "os" "os/signal" "path/filepath" "strconv" "strings" "sync" "syscall" "time" badger "github.com/dgraph-io/badger/v4" "github.com/google/uuid" "github.com/gorilla/mux" "github.com/sirupsen/logrus" "gopkg.in/yaml.v3" ) // Core data structures type StoredValue struct { UUID string `json:"uuid"` Timestamp int64 `json:"timestamp"` Data json.RawMessage `json:"data"` } type Member struct { ID string `json:"id"` Address string `json:"address"` LastSeen int64 `json:"last_seen"` JoinedTimestamp int64 `json:"joined_timestamp"` } type JoinRequest struct { ID string `json:"id"` Address string `json:"address"` JoinedTimestamp int64 `json:"joined_timestamp"` } type LeaveRequest struct { ID string `json:"id"` } type PairsByTimeRequest struct { StartTimestamp int64 `json:"start_timestamp"` EndTimestamp int64 `json:"end_timestamp"` Limit int `json:"limit"` Prefix string `json:"prefix,omitempty"` } type PairsByTimeResponse struct { Path string `json:"path"` UUID string `json:"uuid"` Timestamp int64 `json:"timestamp"` } type PutResponse struct { UUID string `json:"uuid"` Timestamp int64 `json:"timestamp"` } // Configuration type Config struct { NodeID string `yaml:"node_id"` BindAddress string `yaml:"bind_address"` Port int `yaml:"port"` DataDir string `yaml:"data_dir"` SeedNodes []string `yaml:"seed_nodes"` ReadOnly bool `yaml:"read_only"` LogLevel string `yaml:"log_level"` GossipIntervalMin int `yaml:"gossip_interval_min"` GossipIntervalMax int `yaml:"gossip_interval_max"` SyncInterval int `yaml:"sync_interval"` CatchupInterval int `yaml:"catchup_interval"` BootstrapMaxAgeHours int `yaml:"bootstrap_max_age_hours"` ThrottleDelayMs int `yaml:"throttle_delay_ms"` FetchDelayMs int `yaml:"fetch_delay_ms"` } // Server represents the KVS node type Server struct { config *Config db *badger.DB members map[string]*Member membersMu sync.RWMutex mode string // "normal", "read-only", "syncing" modeMu sync.RWMutex logger *logrus.Logger httpServer *http.Server ctx context.Context cancel context.CancelFunc wg sync.WaitGroup } // Default configuration func defaultConfig() *Config { hostname, _ := os.Hostname() return &Config{ NodeID: hostname, BindAddress: "127.0.0.1", Port: 8080, DataDir: "./data", SeedNodes: []string{}, ReadOnly: false, LogLevel: "info", GossipIntervalMin: 60, // 1 minute GossipIntervalMax: 120, // 2 minutes SyncInterval: 300, // 5 minutes CatchupInterval: 120, // 2 minutes BootstrapMaxAgeHours: 720, // 30 days ThrottleDelayMs: 100, FetchDelayMs: 50, } } // Load configuration from file or create default func loadConfig(configPath string) (*Config, error) { config := defaultConfig() if _, err := os.Stat(configPath); os.IsNotExist(err) { // Create default config file if err := os.MkdirAll(filepath.Dir(configPath), 0755); err != nil { return nil, fmt.Errorf("failed to create config directory: %v", err) } data, err := yaml.Marshal(config) if err != nil { return nil, fmt.Errorf("failed to marshal default config: %v", err) } if err := os.WriteFile(configPath, data, 0644); err != nil { return nil, fmt.Errorf("failed to write default config: %v", err) } fmt.Printf("Created default configuration at %s\n", configPath) return config, nil } data, err := os.ReadFile(configPath) if err != nil { return nil, fmt.Errorf("failed to read config file: %v", err) } if err := yaml.Unmarshal(data, config); err != nil { return nil, fmt.Errorf("failed to parse config file: %v", err) } return config, nil } // Initialize server func NewServer(config *Config) (*Server, error) { logger := logrus.New() logger.SetFormatter(&logrus.JSONFormatter{}) level, err := logrus.ParseLevel(config.LogLevel) if err != nil { level = logrus.InfoLevel } logger.SetLevel(level) // Create data directory if err := os.MkdirAll(config.DataDir, 0755); err != nil { return nil, fmt.Errorf("failed to create data directory: %v", err) } // Open BadgerDB opts := badger.DefaultOptions(filepath.Join(config.DataDir, "badger")) opts.Logger = nil // Disable badger's internal logging db, err := badger.Open(opts) if err != nil { return nil, fmt.Errorf("failed to open BadgerDB: %v", err) } ctx, cancel := context.WithCancel(context.Background()) server := &Server{ config: config, db: db, members: make(map[string]*Member), mode: "normal", logger: logger, ctx: ctx, cancel: cancel, } if config.ReadOnly { server.setMode("read-only") } return server, nil } // Mode management func (s *Server) getMode() string { s.modeMu.RLock() defer s.modeMu.RUnlock() return s.mode } func (s *Server) setMode(mode string) { s.modeMu.Lock() defer s.modeMu.Unlock() oldMode := s.mode s.mode = mode s.logger.WithFields(logrus.Fields{ "old_mode": oldMode, "new_mode": mode, }).Info("Mode changed") } // Member management func (s *Server) addMember(member *Member) { s.membersMu.Lock() defer s.membersMu.Unlock() s.members[member.ID] = member s.logger.WithFields(logrus.Fields{ "node_id": member.ID, "address": member.Address, }).Info("Member added") } func (s *Server) removeMember(nodeID string) { s.membersMu.Lock() defer s.membersMu.Unlock() if member, exists := s.members[nodeID]; exists { delete(s.members, nodeID) s.logger.WithFields(logrus.Fields{ "node_id": member.ID, "address": member.Address, }).Info("Member removed") } } func (s *Server) getMembers() []*Member { s.membersMu.RLock() defer s.membersMu.RUnlock() members := make([]*Member, 0, len(s.members)) for _, member := range s.members { members = append(members, member) } return members } // HTTP Handlers func (s *Server) healthHandler(w http.ResponseWriter, r *http.Request) { mode := s.getMode() memberCount := len(s.getMembers()) health := map[string]interface{}{ "status": "ok", "mode": mode, "member_count": memberCount, "node_id": s.config.NodeID, } w.Header().Set("Content-Type", "application/json") json.NewEncoder(w).Encode(health) } func (s *Server) getKVHandler(w http.ResponseWriter, r *http.Request) { vars := mux.Vars(r) path := vars["path"] var storedValue StoredValue err := s.db.View(func(txn *badger.Txn) error { item, err := txn.Get([]byte(path)) if err != nil { return err } return item.Value(func(val []byte) error { return json.Unmarshal(val, &storedValue) }) }) if err == badger.ErrKeyNotFound { http.Error(w, "Not Found", http.StatusNotFound) return } if err != nil { s.logger.WithError(err).WithField("path", path).Error("Failed to get value") http.Error(w, "Internal Server Error", http.StatusInternalServerError) return } w.Header().Set("Content-Type", "application/json") w.Write(storedValue.Data) } func (s *Server) putKVHandler(w http.ResponseWriter, r *http.Request) { vars := mux.Vars(r) path := vars["path"] mode := s.getMode() if mode == "syncing" { http.Error(w, "Service Unavailable", http.StatusServiceUnavailable) return } if mode == "read-only" && !s.isClusterMember(r.RemoteAddr) { http.Error(w, "Forbidden", http.StatusForbidden) return } var data json.RawMessage if err := json.NewDecoder(r.Body).Decode(&data); err != nil { http.Error(w, "Bad Request", http.StatusBadRequest) return } now := time.Now().UnixMilli() newUUID := uuid.New().String() storedValue := StoredValue{ UUID: newUUID, Timestamp: now, Data: data, } valueBytes, err := json.Marshal(storedValue) if err != nil { s.logger.WithError(err).Error("Failed to marshal stored value") http.Error(w, "Internal Server Error", http.StatusInternalServerError) return } var isUpdate bool err = s.db.Update(func(txn *badger.Txn) error { // Check if key exists _, err := txn.Get([]byte(path)) isUpdate = (err == nil) // Store main data if err := txn.Set([]byte(path), valueBytes); err != nil { return err } // Store timestamp index indexKey := fmt.Sprintf("_ts:%020d:%s", now, path) return txn.Set([]byte(indexKey), []byte(newUUID)) }) if err != nil { s.logger.WithError(err).WithField("path", path).Error("Failed to store value") http.Error(w, "Internal Server Error", http.StatusInternalServerError) return } response := PutResponse{ UUID: newUUID, Timestamp: now, } status := http.StatusCreated if isUpdate { status = http.StatusOK } w.Header().Set("Content-Type", "application/json") w.WriteHeader(status) json.NewEncoder(w).Encode(response) s.logger.WithFields(logrus.Fields{ "path": path, "uuid": newUUID, "timestamp": now, "is_update": isUpdate, }).Info("Value stored") } func (s *Server) deleteKVHandler(w http.ResponseWriter, r *http.Request) { vars := mux.Vars(r) path := vars["path"] mode := s.getMode() if mode == "syncing" { http.Error(w, "Service Unavailable", http.StatusServiceUnavailable) return } if mode == "read-only" && !s.isClusterMember(r.RemoteAddr) { http.Error(w, "Forbidden", http.StatusForbidden) return } var found bool err := s.db.Update(func(txn *badger.Txn) error { // Check if key exists and get timestamp for index cleanup item, err := txn.Get([]byte(path)) if err == badger.ErrKeyNotFound { return nil } if err != nil { return err } found = true var storedValue StoredValue err = item.Value(func(val []byte) error { return json.Unmarshal(val, &storedValue) }) if err != nil { return err } // Delete main data if err := txn.Delete([]byte(path)); err != nil { return err } // Delete timestamp index indexKey := fmt.Sprintf("_ts:%020d:%s", storedValue.Timestamp, path) return txn.Delete([]byte(indexKey)) }) if err != nil { s.logger.WithError(err).WithField("path", path).Error("Failed to delete value") http.Error(w, "Internal Server Error", http.StatusInternalServerError) return } if !found { http.Error(w, "Not Found", http.StatusNotFound) return } w.WriteHeader(http.StatusNoContent) s.logger.WithField("path", path).Info("Value deleted") } func (s *Server) getMembersHandler(w http.ResponseWriter, r *http.Request) { members := s.getMembers() w.Header().Set("Content-Type", "application/json") json.NewEncoder(w).Encode(members) } func (s *Server) joinMemberHandler(w http.ResponseWriter, r *http.Request) { var req JoinRequest if err := json.NewDecoder(r.Body).Decode(&req); err != nil { http.Error(w, "Bad Request", http.StatusBadRequest) return } now := time.Now().UnixMilli() member := &Member{ ID: req.ID, Address: req.Address, LastSeen: now, JoinedTimestamp: req.JoinedTimestamp, } s.addMember(member) // Return current member list members := s.getMembers() w.Header().Set("Content-Type", "application/json") json.NewEncoder(w).Encode(members) } func (s *Server) leaveMemberHandler(w http.ResponseWriter, r *http.Request) { var req LeaveRequest if err := json.NewDecoder(r.Body).Decode(&req); err != nil { http.Error(w, "Bad Request", http.StatusBadRequest) return } s.removeMember(req.ID) w.WriteHeader(http.StatusNoContent) } func (s *Server) pairsByTimeHandler(w http.ResponseWriter, r *http.Request) { var req PairsByTimeRequest if err := json.NewDecoder(r.Body).Decode(&req); err != nil { http.Error(w, "Bad Request", http.StatusBadRequest) return } // Default limit to 15 as per spec if req.Limit <= 0 { req.Limit = 15 } var pairs []PairsByTimeResponse err := s.db.View(func(txn *badger.Txn) error { opts := badger.DefaultIteratorOptions opts.PrefetchSize = req.Limit it := txn.NewIterator(opts) defer it.Close() prefix := []byte("_ts:") if req.Prefix != "" { // We need to scan through timestamp entries and filter by path prefix } for it.Seek(prefix); it.ValidForPrefix(prefix) && len(pairs) < req.Limit; it.Next() { item := it.Item() key := string(item.Key()) // Parse timestamp index key: "_ts:{timestamp}:{path}" parts := strings.SplitN(key, ":", 3) if len(parts) != 3 { continue } timestamp, err := strconv.ParseInt(parts[1], 10, 64) if err != nil { continue } // Filter by timestamp range if req.StartTimestamp > 0 && timestamp < req.StartTimestamp { continue } if req.EndTimestamp > 0 && timestamp >= req.EndTimestamp { continue } path := parts[2] // Filter by prefix if specified if req.Prefix != "" && !strings.HasPrefix(path, req.Prefix) { continue } var uuid string err = item.Value(func(val []byte) error { uuid = string(val) return nil }) if err != nil { continue } pairs = append(pairs, PairsByTimeResponse{ Path: path, UUID: uuid, Timestamp: timestamp, }) } return nil }) if err != nil { s.logger.WithError(err).Error("Failed to query pairs by time") http.Error(w, "Internal Server Error", http.StatusInternalServerError) return } if len(pairs) == 0 { w.WriteHeader(http.StatusNoContent) return } w.Header().Set("Content-Type", "application/json") json.NewEncoder(w).Encode(pairs) } // Utility function to check if request is from cluster member func (s *Server) isClusterMember(remoteAddr string) bool { host, _, err := net.SplitHostPort(remoteAddr) if err != nil { return false } s.membersMu.RLock() defer s.membersMu.RUnlock() for _, member := range s.members { memberHost, _, err := net.SplitHostPort(member.Address) if err == nil && memberHost == host { return true } } return false } // Setup HTTP routes func (s *Server) setupRoutes() *mux.Router { router := mux.NewRouter() // Health endpoint router.HandleFunc("/health", s.healthHandler).Methods("GET") // KV endpoints router.HandleFunc("/kv/{path:.+}", s.getKVHandler).Methods("GET") router.HandleFunc("/kv/{path:.+}", s.putKVHandler).Methods("PUT") router.HandleFunc("/kv/{path:.+}", s.deleteKVHandler).Methods("DELETE") // Member endpoints router.HandleFunc("/members/", s.getMembersHandler).Methods("GET") router.HandleFunc("/members/join", s.joinMemberHandler).Methods("POST") router.HandleFunc("/members/leave", s.leaveMemberHandler).Methods("DELETE") router.HandleFunc("/members/pairs_by_time", s.pairsByTimeHandler).Methods("POST") return router } // Start the server func (s *Server) Start() error { router := s.setupRoutes() addr := fmt.Sprintf("%s:%d", s.config.BindAddress, s.config.Port) s.httpServer = &http.Server{ Addr: addr, Handler: router, } s.logger.WithFields(logrus.Fields{ "node_id": s.config.NodeID, "address": addr, }).Info("Starting KVS server") // Start gossip and sync routines s.startBackgroundTasks() // Try to join cluster if seed nodes are configured if len(s.config.SeedNodes) > 0 { go s.bootstrap() } return s.httpServer.ListenAndServe() } // Stop the server gracefully func (s *Server) Stop() error { s.logger.Info("Shutting down KVS server") s.cancel() s.wg.Wait() ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() if err := s.httpServer.Shutdown(ctx); err != nil { s.logger.WithError(err).Error("HTTP server shutdown error") } if err := s.db.Close(); err != nil { s.logger.WithError(err).Error("BadgerDB close error") } return nil } // Background tasks placeholder (gossip, sync, etc.) func (s *Server) startBackgroundTasks() { // TODO: Implement gossip protocol // TODO: Implement periodic sync // TODO: Implement catch-up sync } // Bootstrap placeholder func (s *Server) bootstrap() { // TODO: Implement gradual bootstrapping } func main() { configPath := "./config.yaml" // Simple CLI argument parsing if len(os.Args) > 1 { configPath = os.Args[1] } config, err := loadConfig(configPath) if err != nil { fmt.Fprintf(os.Stderr, "Failed to load configuration: %v\n", err) os.Exit(1) } server, err := NewServer(config) if err != nil { fmt.Fprintf(os.Stderr, "Failed to create server: %v\n", err) os.Exit(1) } // Handle graceful shutdown sigCh := make(chan os.Signal, 1) signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM) go func() { <-sigCh server.Stop() }() if err := server.Start(); err != nil && err != http.ErrServerClosed { fmt.Fprintf(os.Stderr, "Server error: %v\n", err) os.Exit(1) } }