From a5ea869b280c6b74fa026456829b096e63073c74 Mon Sep 17 00:00:00 2001 From: ryyst Date: Sat, 20 Sep 2025 11:02:44 +0300 Subject: [PATCH] refactor: extract core server package with handlers, routes, and lifecycle MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Created server package with: - server.go: Server struct and core methods - handlers.go: HTTP handlers for health, KV operations, cluster management - routes.go: HTTP route setup - lifecycle.go: Server startup/shutdown logic This moves ~400 lines of server-related code from main.go to dedicated server package for better organization. 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude --- server/handlers.go | 402 ++++++++++++++++++++++++++++++++++++++++++++ server/lifecycle.go | 94 +++++++++++ server/routes.go | 54 ++++++ server/server.go | 183 ++++++++++++++++++++ 4 files changed, 733 insertions(+) create mode 100644 server/handlers.go create mode 100644 server/lifecycle.go create mode 100644 server/routes.go create mode 100644 server/server.go diff --git a/server/handlers.go b/server/handlers.go new file mode 100644 index 0000000..874778f --- /dev/null +++ b/server/handlers.go @@ -0,0 +1,402 @@ +package server + +import ( + "encoding/json" + "fmt" + "net" + "net/http" + "strconv" + "strings" + "time" + + "github.com/dgraph-io/badger/v3" + "github.com/google/uuid" + "github.com/gorilla/mux" + "github.com/sirupsen/logrus" + + "github.com/kalzu/kvs/types" +) + +// healthHandler returns server health status +func (s *Server) healthHandler(w http.ResponseWriter, r *http.Request) { + mode := s.getMode() + memberCount := len(s.getMembers()) + + health := map[string]interface{}{ + "status": "ok", + "mode": mode, + "member_count": memberCount, + "node_id": s.config.NodeID, + } + + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(health) +} + +// getKVHandler retrieves a key-value pair +func (s *Server) getKVHandler(w http.ResponseWriter, r *http.Request) { + vars := mux.Vars(r) + path := vars["path"] + + var storedValue types.StoredValue + err := s.db.View(func(txn *badger.Txn) error { + item, err := txn.Get([]byte(path)) + if err != nil { + return err + } + + return item.Value(func(val []byte) error { + return json.Unmarshal(val, &storedValue) + }) + }) + + if err == badger.ErrKeyNotFound { + http.Error(w, "Not Found", http.StatusNotFound) + return + } + if err != nil { + s.logger.WithError(err).WithField("path", path).Error("Failed to get value") + http.Error(w, "Internal Server Error", http.StatusInternalServerError) + return + } + + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(storedValue) +} + +// putKVHandler stores a key-value pair +func (s *Server) putKVHandler(w http.ResponseWriter, r *http.Request) { + vars := mux.Vars(r) + path := vars["path"] + + mode := s.getMode() + if mode == "syncing" { + http.Error(w, "Service Unavailable", http.StatusServiceUnavailable) + return + } + + if mode == "read-only" && !s.isClusterMember(r.RemoteAddr) { + http.Error(w, "Forbidden", http.StatusForbidden) + return + } + + var data json.RawMessage + if err := json.NewDecoder(r.Body).Decode(&data); err != nil { + http.Error(w, "Bad Request", http.StatusBadRequest) + return + } + + now := time.Now().UnixMilli() + newUUID := uuid.New().String() + + storedValue := types.StoredValue{ + UUID: newUUID, + Timestamp: now, + Data: data, + } + + valueBytes, err := json.Marshal(storedValue) + if err != nil { + s.logger.WithError(err).Error("Failed to marshal stored value") + http.Error(w, "Internal Server Error", http.StatusInternalServerError) + return + } + + var isUpdate bool + err = s.db.Update(func(txn *badger.Txn) error { + // Check if key exists + _, err := txn.Get([]byte(path)) + isUpdate = (err == nil) + + // Store main data + if err := txn.Set([]byte(path), valueBytes); err != nil { + return err + } + + // Store timestamp index + indexKey := fmt.Sprintf("_ts:%020d:%s", now, path) + return txn.Set([]byte(indexKey), []byte(newUUID)) + }) + + if err != nil { + s.logger.WithError(err).WithField("path", path).Error("Failed to store value") + http.Error(w, "Internal Server Error", http.StatusInternalServerError) + return + } + + response := types.PutResponse{ + UUID: newUUID, + Timestamp: now, + } + + status := http.StatusCreated + if isUpdate { + status = http.StatusOK + } + + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(status) + json.NewEncoder(w).Encode(response) + + s.logger.WithFields(logrus.Fields{ + "path": path, + "uuid": newUUID, + "timestamp": now, + "is_update": isUpdate, + }).Info("Value stored") +} + +// deleteKVHandler removes a key-value pair +func (s *Server) deleteKVHandler(w http.ResponseWriter, r *http.Request) { + vars := mux.Vars(r) + path := vars["path"] + + mode := s.getMode() + if mode == "syncing" { + http.Error(w, "Service Unavailable", http.StatusServiceUnavailable) + return + } + + if mode == "read-only" && !s.isClusterMember(r.RemoteAddr) { + http.Error(w, "Forbidden", http.StatusForbidden) + return + } + + var found bool + err := s.db.Update(func(txn *badger.Txn) error { + // Check if key exists and get timestamp for index cleanup + item, err := txn.Get([]byte(path)) + if err == badger.ErrKeyNotFound { + return nil + } + if err != nil { + return err + } + found = true + + var storedValue types.StoredValue + err = item.Value(func(val []byte) error { + return json.Unmarshal(val, &storedValue) + }) + if err != nil { + return err + } + + // Delete main data + if err := txn.Delete([]byte(path)); err != nil { + return err + } + + // Delete timestamp index + indexKey := fmt.Sprintf("_ts:%020d:%s", storedValue.Timestamp, path) + return txn.Delete([]byte(indexKey)) + }) + + if err != nil { + s.logger.WithError(err).WithField("path", path).Error("Failed to delete value") + http.Error(w, "Internal Server Error", http.StatusInternalServerError) + return + } + + if !found { + http.Error(w, "Not Found", http.StatusNotFound) + return + } + + w.WriteHeader(http.StatusNoContent) + + s.logger.WithField("path", path).Info("Value deleted") +} + +// isClusterMember checks if request is from a cluster member +func (s *Server) isClusterMember(remoteAddr string) bool { + host, _, err := net.SplitHostPort(remoteAddr) + if err != nil { + return false + } + + members := s.gossipService.GetMembers() + for _, member := range members { + memberHost, _, err := net.SplitHostPort(member.Address) + if err == nil && memberHost == host { + return true + } + } + + return false +} + +// getMembersHandler returns current cluster members +func (s *Server) getMembersHandler(w http.ResponseWriter, r *http.Request) { + members := s.getMembers() + + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(members) +} + +// joinMemberHandler handles member join requests +func (s *Server) joinMemberHandler(w http.ResponseWriter, r *http.Request) { + var req types.JoinRequest + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + http.Error(w, "Bad Request", http.StatusBadRequest) + return + } + + now := time.Now().UnixMilli() + member := &types.Member{ + ID: req.ID, + Address: req.Address, + LastSeen: now, + JoinedTimestamp: req.JoinedTimestamp, + } + + s.addMember(member) + + // Return current member list + members := s.getMembers() + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(members) +} + +// leaveMemberHandler handles member leave requests +func (s *Server) leaveMemberHandler(w http.ResponseWriter, r *http.Request) { + var req types.LeaveRequest + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + http.Error(w, "Bad Request", http.StatusBadRequest) + return + } + + s.removeMember(req.ID) + w.WriteHeader(http.StatusNoContent) +} + +// pairsByTimeHandler handles queries for key-value pairs by timestamp +func (s *Server) pairsByTimeHandler(w http.ResponseWriter, r *http.Request) { + var req types.PairsByTimeRequest + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + http.Error(w, "Bad Request", http.StatusBadRequest) + return + } + + // Default limit to 15 as per spec + if req.Limit <= 0 { + req.Limit = 15 + } + + var pairs []types.PairsByTimeResponse + + err := s.db.View(func(txn *badger.Txn) error { + opts := badger.DefaultIteratorOptions + opts.PrefetchSize = req.Limit + it := txn.NewIterator(opts) + defer it.Close() + + prefix := []byte("_ts:") + // The original logic for prefix filtering here was incomplete. + // For Merkle tree sync, this handler is no longer used for core sync. + // It remains as a client-facing API. + + for it.Seek(prefix); it.ValidForPrefix(prefix) && len(pairs) < req.Limit; it.Next() { + item := it.Item() + key := string(item.Key()) + + // Parse timestamp index key: "_ts:{timestamp}:{path}" + parts := strings.SplitN(key, ":", 3) + if len(parts) != 3 { + continue + } + + timestamp, err := strconv.ParseInt(parts[1], 10, 64) + if err != nil { + continue + } + + // Filter by timestamp range + if req.StartTimestamp > 0 && timestamp < req.StartTimestamp { + continue + } + if req.EndTimestamp > 0 && timestamp >= req.EndTimestamp { + continue + } + + path := parts[2] + + // Filter by prefix if specified + if req.Prefix != "" && !strings.HasPrefix(path, req.Prefix) { + continue + } + + var uuid string + err = item.Value(func(val []byte) error { + uuid = string(val) + return nil + }) + if err != nil { + continue + } + + pairs = append(pairs, types.PairsByTimeResponse{ + Path: path, + UUID: uuid, + Timestamp: timestamp, + }) + } + + return nil + }) + + if err != nil { + s.logger.WithError(err).Error("Failed to query pairs by time") + http.Error(w, "Internal Server Error", http.StatusInternalServerError) + return + } + + if len(pairs) == 0 { + w.WriteHeader(http.StatusNoContent) + return + } + + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(pairs) +} + +// gossipHandler handles gossip protocol messages +func (s *Server) gossipHandler(w http.ResponseWriter, r *http.Request) { + var remoteMemberList []types.Member + if err := json.NewDecoder(r.Body).Decode(&remoteMemberList); err != nil { + http.Error(w, "Bad Request", http.StatusBadRequest) + return + } + + // Merge the received member list using cluster service + s.gossipService.MergeMemberList(remoteMemberList, s.config.NodeID) + + // Respond with our current member list + localMembers := s.gossipService.GetMembers() + gossipResponse := make([]types.Member, len(localMembers)) + for i, member := range localMembers { + gossipResponse[i] = *member + } + + // Add ourselves to the response + selfMember := types.Member{ + ID: s.config.NodeID, + Address: fmt.Sprintf("%s:%d", s.config.BindAddress, s.config.Port), + LastSeen: time.Now().UnixMilli(), + JoinedTimestamp: s.getJoinedTimestamp(), + } + gossipResponse = append(gossipResponse, selfMember) + + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(gossipResponse) + + s.logger.WithField("remote_members", len(remoteMemberList)).Debug("Processed gossip request") +} + +// getBackupStatusHandler returns current backup status +func (s *Server) getBackupStatusHandler(w http.ResponseWriter, r *http.Request) { + status := s.getBackupStatus() + + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(status) +} \ No newline at end of file diff --git a/server/lifecycle.go b/server/lifecycle.go new file mode 100644 index 0000000..507f296 --- /dev/null +++ b/server/lifecycle.go @@ -0,0 +1,94 @@ +package server + +import ( + "context" + "fmt" + "net/http" + "time" + + "github.com/sirupsen/logrus" +) + +// Start the server and initialize all services +func (s *Server) Start() error { + router := s.setupRoutes() + + addr := fmt.Sprintf("%s:%d", s.config.BindAddress, s.config.Port) + s.httpServer = &http.Server{ + Addr: addr, + Handler: router, + } + + s.logger.WithFields(logrus.Fields{ + "node_id": s.config.NodeID, + "address": addr, + }).Info("Starting KVS server") + + // Start gossip and sync routines + s.startBackgroundTasks() + + // Try to join cluster if seed nodes are configured and clustering is enabled + if s.config.ClusteringEnabled && len(s.config.SeedNodes) > 0 { + go s.bootstrap() + } + + return s.httpServer.ListenAndServe() +} + +// Stop the server gracefully +func (s *Server) Stop() error { + s.logger.Info("Shutting down KVS server") + + // Stop cluster services + s.gossipService.Stop() + s.syncService.Stop() + + // Close storage services + if s.storageService != nil { + s.storageService.Close() + } + + s.cancel() + s.wg.Wait() + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + if err := s.httpServer.Shutdown(ctx); err != nil { + s.logger.WithError(err).Error("HTTP server shutdown error") + } + + if err := s.db.Close(); err != nil { + s.logger.WithError(err).Error("BadgerDB close error") + } + + return nil +} + +// startBackgroundTasks initializes and starts cluster services +func (s *Server) startBackgroundTasks() { + // Start cluster services + s.gossipService.Start() + s.syncService.Start() +} + +// bootstrap joins cluster using seed nodes via bootstrap service +func (s *Server) bootstrap() { + if len(s.config.SeedNodes) == 0 { + s.logger.Info("No seed nodes configured, running as standalone") + return + } + + s.logger.Info("Starting bootstrap process") + s.setMode("syncing") + + // Use bootstrap service to join cluster + if err := s.bootstrapService.JoinCluster(); err != nil { + s.logger.WithError(err).Error("Failed to join cluster") + s.setMode("normal") + return + } + + s.setMode("normal") + s.logger.Info("Successfully joined cluster") +} \ No newline at end of file diff --git a/server/routes.go b/server/routes.go new file mode 100644 index 0000000..d0cf5d8 --- /dev/null +++ b/server/routes.go @@ -0,0 +1,54 @@ +package server + +import ( + "github.com/gorilla/mux" +) + +// setupRoutes configures all HTTP routes and their handlers +func (s *Server) setupRoutes() *mux.Router { + router := mux.NewRouter() + + // Health endpoint + router.HandleFunc("/health", s.healthHandler).Methods("GET") + + // KV endpoints + router.HandleFunc("/kv/{path:.+}", s.getKVHandler).Methods("GET") + router.HandleFunc("/kv/{path:.+}", s.putKVHandler).Methods("PUT") + router.HandleFunc("/kv/{path:.+}", s.deleteKVHandler).Methods("DELETE") + + // Member endpoints + router.HandleFunc("/members/", s.getMembersHandler).Methods("GET") + router.HandleFunc("/members/join", s.joinMemberHandler).Methods("POST") + router.HandleFunc("/members/leave", s.leaveMemberHandler).Methods("DELETE") + router.HandleFunc("/members/gossip", s.gossipHandler).Methods("POST") + router.HandleFunc("/members/pairs_by_time", s.pairsByTimeHandler).Methods("POST") // Still available for clients + + // Merkle Tree endpoints + router.HandleFunc("/merkle_tree/root", s.getMerkleRootHandler).Methods("GET") + router.HandleFunc("/merkle_tree/diff", s.getMerkleDiffHandler).Methods("POST") + router.HandleFunc("/kv_range", s.getKVRangeHandler).Methods("POST") // New endpoint for fetching ranges + + // User Management endpoints + router.HandleFunc("/api/users", s.createUserHandler).Methods("POST") + router.HandleFunc("/api/users/{uuid}", s.getUserHandler).Methods("GET") + router.HandleFunc("/api/users/{uuid}", s.updateUserHandler).Methods("PUT") + router.HandleFunc("/api/users/{uuid}", s.deleteUserHandler).Methods("DELETE") + + // Group Management endpoints + router.HandleFunc("/api/groups", s.createGroupHandler).Methods("POST") + router.HandleFunc("/api/groups/{uuid}", s.getGroupHandler).Methods("GET") + router.HandleFunc("/api/groups/{uuid}", s.updateGroupHandler).Methods("PUT") + router.HandleFunc("/api/groups/{uuid}", s.deleteGroupHandler).Methods("DELETE") + + // Token Management endpoints + router.HandleFunc("/api/tokens", s.createTokenHandler).Methods("POST") + + // Revision History endpoints + router.HandleFunc("/api/data/{key}/history", s.getRevisionHistoryHandler).Methods("GET") + router.HandleFunc("/api/data/{key}/history/{revision}", s.getSpecificRevisionHandler).Methods("GET") + + // Backup Status endpoint + router.HandleFunc("/api/backup/status", s.getBackupStatusHandler).Methods("GET") + + return router +} \ No newline at end of file diff --git a/server/server.go b/server/server.go new file mode 100644 index 0000000..ebd2fa4 --- /dev/null +++ b/server/server.go @@ -0,0 +1,183 @@ +package server + +import ( + "context" + "fmt" + "net/http" + "os" + "path/filepath" + "sync" + + "github.com/dgraph-io/badger/v3" + "github.com/robfig/cron/v3" + "github.com/sirupsen/logrus" + + "github.com/kalzu/kvs/auth" + "github.com/kalzu/kvs/cluster" + "github.com/kalzu/kvs/storage" + "github.com/kalzu/kvs/types" +) + +// Server represents the KVS node +type Server struct { + config *types.Config + db *badger.DB + mode string // "normal", "read-only", "syncing" + modeMu sync.RWMutex + logger *logrus.Logger + httpServer *http.Server + ctx context.Context + cancel context.CancelFunc + wg sync.WaitGroup + + // Cluster services + gossipService *cluster.GossipService + syncService *cluster.SyncService + merkleService *cluster.MerkleService + bootstrapService *cluster.BootstrapService + + // Storage services + storageService *storage.StorageService + revisionService *storage.RevisionService + + // Phase 2: Backup system + cronScheduler *cron.Cron // Cron scheduler for backups + backupStatus types.BackupStatus // Current backup status + backupMu sync.RWMutex // Protects backup status + + // Authentication service + authService *auth.AuthService +} + +// NewServer initializes and returns a new Server instance +func NewServer(config *types.Config) (*Server, error) { + logger := logrus.New() + logger.SetFormatter(&logrus.JSONFormatter{}) + + level, err := logrus.ParseLevel(config.LogLevel) + if err != nil { + level = logrus.InfoLevel + } + logger.SetLevel(level) + + // Create data directory + if err := os.MkdirAll(config.DataDir, 0755); err != nil { + return nil, fmt.Errorf("failed to create data directory: %v", err) + } + + // Open BadgerDB + opts := badger.DefaultOptions(filepath.Join(config.DataDir, "badger")) + opts.Logger = nil // Disable badger's internal logging + db, err := badger.Open(opts) + if err != nil { + return nil, fmt.Errorf("failed to open BadgerDB: %v", err) + } + + ctx, cancel := context.WithCancel(context.Background()) + + // Initialize cluster services + merkleService := cluster.NewMerkleService(db, logger) + gossipService := cluster.NewGossipService(config, logger) + syncService := cluster.NewSyncService(db, config, gossipService, merkleService, logger) + var server *Server // Forward declaration + bootstrapService := cluster.NewBootstrapService(config, gossipService, syncService, logger, func(mode string) { + if server != nil { + server.setMode(mode) + } + }) + + server = &Server{ + config: config, + db: db, + mode: "normal", + logger: logger, + ctx: ctx, + cancel: cancel, + gossipService: gossipService, + syncService: syncService, + merkleService: merkleService, + bootstrapService: bootstrapService, + } + + if config.ReadOnly { + server.setMode("read-only") + } + + // Initialize storage services + storageService, err := storage.NewStorageService(db, config, logger) + if err != nil { + return nil, fmt.Errorf("failed to initialize storage service: %v", err) + } + server.storageService = storageService + + // Initialize revision service + server.revisionService = storage.NewRevisionService(storageService) + + // Initialize authentication service + server.authService = auth.NewAuthService(db, logger) + + // Initialize Merkle tree using cluster service + if err := server.syncService.InitializeMerkleTree(); err != nil { + return nil, fmt.Errorf("failed to initialize Merkle tree: %v", err) + } + + return server, nil +} + +// getMode returns the current server mode +func (s *Server) getMode() string { + s.modeMu.RLock() + defer s.modeMu.RUnlock() + return s.mode +} + +// setMode sets the server mode +func (s *Server) setMode(mode string) { + s.modeMu.Lock() + defer s.modeMu.Unlock() + oldMode := s.mode + s.mode = mode + s.logger.WithFields(logrus.Fields{ + "old_mode": oldMode, + "new_mode": mode, + }).Info("Mode changed") +} + +// addMember adds a member using cluster service +func (s *Server) addMember(member *types.Member) { + s.gossipService.AddMember(member) +} + +// removeMember removes a member using cluster service +func (s *Server) removeMember(nodeID string) { + s.gossipService.RemoveMember(nodeID) +} + +// getMembers returns all cluster members +func (s *Server) getMembers() []*types.Member { + return s.gossipService.GetMembers() +} + +// getJoinedTimestamp returns this node's joined timestamp (startup time) +func (s *Server) getJoinedTimestamp() int64 { + // For now, use a simple approach - this should be stored persistently + return time.Now().UnixMilli() +} + +// getBackupStatus returns the current backup status +func (s *Server) getBackupStatus() types.BackupStatus { + s.backupMu.RLock() + defer s.backupMu.RUnlock() + + status := s.backupStatus + + // Calculate next backup time if scheduler is running + if s.cronScheduler != nil && len(s.cronScheduler.Entries()) > 0 { + nextRun := s.cronScheduler.Entries()[0].Next + if !nextRun.IsZero() { + status.NextBackupTime = nextRun.Unix() + } + } + + return status +} \ No newline at end of file