package server

import (
	"encoding/json"
	"errors"
	"fmt"
	"net"
	"net/http"
	"strconv"
	"strings"
	"time"

	"github.com/dgraph-io/badger/v3"
	"github.com/google/uuid"
	"github.com/gorilla/mux"
	"github.com/sirupsen/logrus"

	"github.com/kalzu/kvs/types"
)

// healthHandler writes a JSON object containing a fixed "ok" status, the
// current mode, the number of known members and this node's ID.
func (s *Server) healthHandler(w http.ResponseWriter, r *http.Request) {
	mode := s.getMode()
	memberCount := len(s.getMembers())

	health := map[string]interface{}{
		"status":       "ok",
		"mode":         mode,
		"member_count": memberCount,
		"node_id":      s.config.NodeID,
	}

	w.Header().Set("Content-Type", "application/json")
	if err := json.NewEncoder(w).Encode(health); err != nil {
		s.logger.WithError(err).Error("Failed to encode health response")
	}
}

// getKVHandler looks up the value stored under the "path" route variable and
// writes it as JSON. It responds 404 when the key does not exist and 500 on
// any other read or decode failure.
func (s *Server) getKVHandler(w http.ResponseWriter, r *http.Request) {
	vars := mux.Vars(r)
	path := vars["path"]

	var storedValue types.StoredValue
	err := s.db.View(func(txn *badger.Txn) error {
		item, err := txn.Get([]byte(path))
		if err != nil {
			return err
		}
		return item.Value(func(val []byte) error {
			return json.Unmarshal(val, &storedValue)
		})
	})

	if errors.Is(err, badger.ErrKeyNotFound) {
		http.Error(w, "Not Found", http.StatusNotFound)
		return
	}
	if err != nil {
		s.logger.WithError(err).WithField("path", path).Error("Failed to get value")
		http.Error(w, "Internal Server Error", http.StatusInternalServerError)
		return
	}

	w.Header().Set("Content-Type", "application/json")
	if err := json.NewEncoder(w).Encode(storedValue); err != nil {
		s.logger.WithError(err).WithField("path", path).Error("Failed to encode stored value")
	}
}

// putKVHandler stores the JSON request body under the "path" route variable.
//
// The write is rejected with 503 while the node is in "syncing" mode and with
// 403 in "read-only" mode unless the request comes from a cluster member.
// Each write gets a fresh UUID and millisecond timestamp, and a
// "_ts:{timestamp}:{path}" index entry is written in the same transaction.
// The response is 201 for a new key and 200 when an existing key was replaced.
func (s *Server) putKVHandler(w http.ResponseWriter, r *http.Request) {
	vars := mux.Vars(r)
	path := vars["path"]

	mode := s.getMode()
	if mode == "syncing" {
		http.Error(w, "Service Unavailable", http.StatusServiceUnavailable)
		return
	}
	if mode == "read-only" && !s.isClusterMember(r.RemoteAddr) {
		http.Error(w, "Forbidden", http.StatusForbidden)
		return
	}

	var data json.RawMessage
	if err := json.NewDecoder(r.Body).Decode(&data); err != nil {
		http.Error(w, "Bad Request", http.StatusBadRequest)
		return
	}

	now := time.Now().UnixMilli()
	newUUID := uuid.New().String()

	storedValue := types.StoredValue{
		UUID:      newUUID,
		Timestamp: now,
		Data:      data,
	}

	valueBytes, err := json.Marshal(storedValue)
	if err != nil {
		s.logger.WithError(err).Error("Failed to marshal stored value")
		http.Error(w, "Internal Server Error", http.StatusInternalServerError)
		return
	}

	var isUpdate bool
	err = s.db.Update(func(txn *badger.Txn) error {
		key := []byte(path)

		// Determine whether the key already exists. Only ErrKeyNotFound means
		// "new key"; any other lookup failure aborts the transaction instead
		// of being mistaken for a missing key.
		item, getErr := txn.Get(key)
		switch {
		case getErr == nil:
			isUpdate = true

			// Remove the index entry of the value being replaced so the
			// timestamp index does not accumulate stale entries for this path.
			var previous types.StoredValue
			valueErr := item.Value(func(val []byte) error {
				return json.Unmarshal(val, &previous)
			})
			if valueErr != nil {
				// Without the old timestamp the old index key is unknown.
				// Still allow the overwrite so an unreadable value can be
				// replaced.
				s.logger.WithError(valueErr).WithField("path", path).
					Warn("Failed to read previous value; stale timestamp index entry not removed")
			} else {
				oldIndexKey := fmt.Sprintf("_ts:%020d:%s", previous.Timestamp, path)
				if err := txn.Delete([]byte(oldIndexKey)); err != nil {
					return err
				}
			}
		case errors.Is(getErr, badger.ErrKeyNotFound):
			isUpdate = false
		default:
			return getErr
		}

		// Store main data.
		if err := txn.Set(key, valueBytes); err != nil {
			return err
		}

		// Store timestamp index; the timestamp is zero-padded to 20 digits.
		indexKey := fmt.Sprintf("_ts:%020d:%s", now, path)
		return txn.Set([]byte(indexKey), []byte(newUUID))
	})
	if err != nil {
		s.logger.WithError(err).WithField("path", path).Error("Failed to store value")
		http.Error(w, "Internal Server Error", http.StatusInternalServerError)
		return
	}

	response := types.PutResponse{
		UUID:      newUUID,
		Timestamp: now,
	}

	status := http.StatusCreated
	if isUpdate {
		status = http.StatusOK
	}

	w.Header().Set("Content-Type", "application/json")
	w.WriteHeader(status)
	if err := json.NewEncoder(w).Encode(response); err != nil {
		s.logger.WithError(err).WithField("path", path).Error("Failed to encode put response")
	}

	s.logger.WithFields(logrus.Fields{
		"path":      path,
		"uuid":      newUUID,
		"timestamp": now,
		"is_update": isUpdate,
	}).Info("Value stored")
}

// deleteKVHandler removes the value stored under the "path" route variable
// together with its "_ts:{timestamp}:{path}" index entry.
//
// It applies the same mode checks as putKVHandler (503 while "syncing", 403
// in "read-only" mode for non-members), responds 404 when the key does not
// exist and 204 on success.
func (s *Server) deleteKVHandler(w http.ResponseWriter, r *http.Request) {
	vars := mux.Vars(r)
	path := vars["path"]

	mode := s.getMode()
	if mode == "syncing" {
		http.Error(w, "Service Unavailable", http.StatusServiceUnavailable)
		return
	}
	if mode == "read-only" && !s.isClusterMember(r.RemoteAddr) {
		http.Error(w, "Forbidden", http.StatusForbidden)
		return
	}

	var found bool
	err := s.db.Update(func(txn *badger.Txn) error {
		// Check if key exists and get timestamp for index cleanup.
		item, err := txn.Get([]byte(path))
		if errors.Is(err, badger.ErrKeyNotFound) {
			return nil
		}
		if err != nil {
			return err
		}
		found = true

		var storedValue types.StoredValue
		err = item.Value(func(val []byte) error {
			return json.Unmarshal(val, &storedValue)
		})
		if err != nil {
			return err
		}

		// Delete main data.
		if err := txn.Delete([]byte(path)); err != nil {
			return err
		}

		// Delete timestamp index.
		indexKey := fmt.Sprintf("_ts:%020d:%s", storedValue.Timestamp, path)
		return txn.Delete([]byte(indexKey))
	})
	if err != nil {
		s.logger.WithError(err).WithField("path", path).Error("Failed to delete value")
		http.Error(w, "Internal Server Error", http.StatusInternalServerError)
		return
	}

	if !found {
		http.Error(w, "Not Found", http.StatusNotFound)
		return
	}

	w.WriteHeader(http.StatusNoContent)

	s.logger.WithField("path", path).Info("Value deleted")
}

// isClusterMember reports whether the host part of remoteAddr equals the host
// part of any member address returned by the gossip service. Addresses that
// cannot be split into host and port never match.
func (s *Server) isClusterMember(remoteAddr string) bool {
	host, _, err := net.SplitHostPort(remoteAddr)
	if err != nil {
		return false
	}

	members := s.gossipService.GetMembers()
	for _, member := range members {
		memberHost, _, err := net.SplitHostPort(member.Address)
		if err == nil && memberHost == host {
			return true
		}
	}

	return false
}

// getMembersHandler writes the current member list as JSON.
func (s *Server) getMembersHandler(w http.ResponseWriter, r *http.Request) {
	members := s.getMembers()

	w.Header().Set("Content-Type", "application/json")
	if err := json.NewEncoder(w).Encode(members); err != nil {
		s.logger.WithError(err).Error("Failed to encode member list")
	}
}

// joinMemberHandler decodes a join request, registers the sender as a member
// with LastSeen set to the current time, and replies with the member list.
// A body that is not valid JSON yields 400.
func (s *Server) joinMemberHandler(w http.ResponseWriter, r *http.Request) {
	var req types.JoinRequest
	if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
		http.Error(w, "Bad Request", http.StatusBadRequest)
		return
	}

	now := time.Now().UnixMilli()
	member := &types.Member{
		ID:              req.ID,
		Address:         req.Address,
		LastSeen:        now,
		JoinedTimestamp: req.JoinedTimestamp,
	}

	s.addMember(member)

	// Return current member list.
	members := s.getMembers()

	w.Header().Set("Content-Type", "application/json")
	if err := json.NewEncoder(w).Encode(members); err != nil {
		s.logger.WithError(err).Error("Failed to encode member list")
	}
}

// leaveMemberHandler decodes a leave request and removes the member with the
// given ID, replying 204. A body that is not valid JSON yields 400.
func (s *Server) leaveMemberHandler(w http.ResponseWriter, r *http.Request) {
	var req types.LeaveRequest
	if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
		http.Error(w, "Bad Request", http.StatusBadRequest)
		return
	}

	s.removeMember(req.ID)
	w.WriteHeader(http.StatusNoContent)
}

// pairsByTimeHandler lists entries from the "_ts:" timestamp index, optionally
// filtered by a timestamp range (start inclusive, end exclusive) and by a path
// prefix. At most req.Limit entries are returned; a non-positive limit
// defaults to 15. It replies 204 when nothing matches.
func (s *Server) pairsByTimeHandler(w http.ResponseWriter, r *http.Request) {
	var req types.PairsByTimeRequest
	if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
		http.Error(w, "Bad Request", http.StatusBadRequest)
		return
	}

	// Default limit to 15 as per spec.
	if req.Limit <= 0 {
		req.Limit = 15
	}

	var pairs []types.PairsByTimeResponse

	err := s.db.View(func(txn *badger.Txn) error {
		opts := badger.DefaultIteratorOptions
		opts.PrefetchSize = req.Limit
		it := txn.NewIterator(opts)
		defer it.Close()

		prefix := []byte("_ts:")

		// The original logic for prefix filtering here was incomplete.
		// For Merkle tree sync, this handler is no longer used for core sync.
		// It remains as a client-facing API.
		for it.Seek(prefix); it.ValidForPrefix(prefix) && len(pairs) < req.Limit; it.Next() {
			item := it.Item()
			key := string(item.Key())

			// Parse timestamp index key: "_ts:{timestamp}:{path}". Splitting
			// into at most three parts keeps any ':' inside the path intact.
			parts := strings.SplitN(key, ":", 3)
			if len(parts) != 3 {
				continue
			}

			timestamp, err := strconv.ParseInt(parts[1], 10, 64)
			if err != nil {
				continue
			}

			// Filter by timestamp range.
			if req.StartTimestamp > 0 && timestamp < req.StartTimestamp {
				continue
			}
			if req.EndTimestamp > 0 && timestamp >= req.EndTimestamp {
				continue
			}

			path := parts[2]

			// Filter by prefix if specified.
			if req.Prefix != "" && !strings.HasPrefix(path, req.Prefix) {
				continue
			}

			// The index value is the UUID of the write. The local name avoids
			// shadowing the imported uuid package.
			var storedUUID string
			err = item.Value(func(val []byte) error {
				storedUUID = string(val)
				return nil
			})
			if err != nil {
				continue
			}

			pairs = append(pairs, types.PairsByTimeResponse{
				Path:      path,
				UUID:      storedUUID,
				Timestamp: timestamp,
			})
		}

		return nil
	})
	if err != nil {
		s.logger.WithError(err).Error("Failed to query pairs by time")
		http.Error(w, "Internal Server Error", http.StatusInternalServerError)
		return
	}

	if len(pairs) == 0 {
		w.WriteHeader(http.StatusNoContent)
		return
	}

	w.Header().Set("Content-Type", "application/json")
	if err := json.NewEncoder(w).Encode(pairs); err != nil {
		s.logger.WithError(err).Error("Failed to encode pairs response")
	}
}

// gossipHandler decodes a remote member list, merges it through the gossip
// service, and replies with the local member list plus an entry for this node.
// A body that is not valid JSON yields 400.
func (s *Server) gossipHandler(w http.ResponseWriter, r *http.Request) {
	var remoteMemberList []types.Member
	if err := json.NewDecoder(r.Body).Decode(&remoteMemberList); err != nil {
		http.Error(w, "Bad Request", http.StatusBadRequest)
		return
	}

	// Merge the received member list using cluster service.
	s.gossipService.MergeMemberList(remoteMemberList, s.config.NodeID)

	// Respond with our current member list.
	localMembers := s.gossipService.GetMembers()
	gossipResponse := make([]types.Member, len(localMembers))

	for i, member := range localMembers {
		gossipResponse[i] = *member
	}

	// Add ourselves to the response.
	selfMember := types.Member{
		ID:              s.config.NodeID,
		Address:         fmt.Sprintf("%s:%d", s.config.BindAddress, s.config.Port),
		LastSeen:        time.Now().UnixMilli(),
		JoinedTimestamp: s.getJoinedTimestamp(),
	}
	gossipResponse = append(gossipResponse, selfMember)

	w.Header().Set("Content-Type", "application/json")
	if err := json.NewEncoder(w).Encode(gossipResponse); err != nil {
		s.logger.WithError(err).Error("Failed to encode gossip response")
	}

	s.logger.WithField("remote_members", len(remoteMemberList)).Debug("Processed gossip request")
}

// getBackupStatusHandler writes the current backup status as JSON.
func (s *Server) getBackupStatusHandler(w http.ResponseWriter, r *http.Request) {
	status := s.getBackupStatus()

	w.Header().Set("Content-Type", "application/json")
	if err := json.NewEncoder(w).Encode(status); err != nil {
		s.logger.WithError(err).Error("Failed to encode backup status")
	}
}