Files
kalzu-value-store/server/handlers.go
ryyst 377af163f0 feat: implement resource metadata management API (issue #12)
Add API endpoints to manage ResourceMetadata (ownership, groups, permissions)
for KV resources. This enables administrators to configure granular access
control for stored data.

Changes:
- Add GetResourceMetadataResponse and UpdateResourceMetadataRequest types
- Add GetResourceMetadata and SetResourceMetadata methods to AuthService
- Add GET /kv/{path}/metadata endpoint (requires admin:users:read)
- Add PUT /kv/{path}/metadata endpoint (requires admin:users:update)
- Both endpoints protected by JWT authentication
- Metadata routes registered before general KV routes to prevent pattern conflicts

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-03 00:06:14 +03:00

1396 lines
37 KiB
Go

package server
import (
"bytes"
"crypto/sha256"
"encoding/json"
"fmt"
"net"
"net/http"
"sort"
"strconv"
"strings"
"time"
"github.com/dgraph-io/badger/v4"
"github.com/google/uuid"
"github.com/gorilla/mux"
"github.com/sirupsen/logrus"
"kvs/auth"
"kvs/types"
"kvs/utils"
)
// healthHandler writes a JSON object describing this node: a fixed "ok"
// status, the current mode, the number of known members and the node ID.
func (s *Server) healthHandler(w http.ResponseWriter, r *http.Request) {
	payload := map[string]interface{}{
		"status":       "ok",
		"mode":         s.getMode(),
		"member_count": len(s.getMembers()),
		"node_id":      s.config.NodeID,
	}

	w.Header().Set("Content-Type", "application/json")
	json.NewEncoder(w).Encode(payload)
}
// getKVHandler looks up the value stored under the {path} route variable and
// writes it as JSON. It answers 404 when the key does not exist and 500 on any
// other read or decode failure.
func (s *Server) getKVHandler(w http.ResponseWriter, r *http.Request) {
	path := mux.Vars(r)["path"]

	var stored types.StoredValue
	readErr := s.db.View(func(txn *badger.Txn) error {
		item, getErr := txn.Get([]byte(path))
		if getErr != nil {
			return getErr
		}
		return item.Value(func(raw []byte) error {
			return json.Unmarshal(raw, &stored)
		})
	})

	switch {
	case readErr == badger.ErrKeyNotFound:
		http.Error(w, "Not Found", http.StatusNotFound)
		return
	case readErr != nil:
		s.logger.WithError(readErr).WithField("path", path).Error("Failed to get value")
		http.Error(w, "Internal Server Error", http.StatusInternalServerError)
		return
	}

	w.Header().Set("Content-Type", "application/json")
	json.NewEncoder(w).Encode(stored)
}
// putKVHandler stores the JSON request body under the {path} route variable.
//
// Writes are refused with 503 while the node is in "syncing" mode, and with 403
// in "read-only" mode unless the request originates from a cluster member. On
// success the handler answers 201 for a new key or 200 for an overwrite, and
// returns the generated UUID and millisecond timestamp.
//
// Every write also maintains a "_ts:{timestamp}:{path}" index entry. When an
// existing key is overwritten, the index entry that belonged to the previous
// version is deleted in the same transaction; otherwise it would remain in the
// index forever, because deleteKVHandler only removes the entry for the
// current timestamp.
func (s *Server) putKVHandler(w http.ResponseWriter, r *http.Request) {
	vars := mux.Vars(r)
	path := vars["path"]

	mode := s.getMode()
	if mode == "syncing" {
		http.Error(w, "Service Unavailable", http.StatusServiceUnavailable)
		return
	}
	if mode == "read-only" && !s.isClusterMember(r.RemoteAddr) {
		http.Error(w, "Forbidden", http.StatusForbidden)
		return
	}

	var data json.RawMessage
	if err := json.NewDecoder(r.Body).Decode(&data); err != nil {
		http.Error(w, "Bad Request", http.StatusBadRequest)
		return
	}

	now := time.Now().UnixMilli()
	newUUID := uuid.New().String()
	storedValue := types.StoredValue{
		UUID:      newUUID,
		Timestamp: now,
		Data:      data,
	}
	valueBytes, err := json.Marshal(storedValue)
	if err != nil {
		s.logger.WithError(err).Error("Failed to marshal stored value")
		http.Error(w, "Internal Server Error", http.StatusInternalServerError)
		return
	}

	key := []byte(path)
	var isUpdate bool
	err = s.db.Update(func(txn *badger.Txn) error {
		item, err := txn.Get(key)
		switch {
		case err == nil:
			isUpdate = true
			// Drop the index entry of the version being replaced. This is
			// best effort: a previous value that cannot be decoded must not
			// block the overwrite, so a decode failure only skips the cleanup.
			var previous types.StoredValue
			decodeErr := item.Value(func(val []byte) error {
				return json.Unmarshal(val, &previous)
			})
			if decodeErr == nil {
				staleIndexKey := fmt.Sprintf("_ts:%020d:%s", previous.Timestamp, path)
				if err := txn.Delete([]byte(staleIndexKey)); err != nil {
					return err
				}
			}
		case err != badger.ErrKeyNotFound:
			// A real storage error: abort instead of silently treating the
			// key as new.
			return err
		}

		// Store main data
		if err := txn.Set(key, valueBytes); err != nil {
			return err
		}
		// Store timestamp index (written after the cleanup above, so it
		// survives even if the old and new timestamps are equal).
		indexKey := fmt.Sprintf("_ts:%020d:%s", now, path)
		return txn.Set([]byte(indexKey), []byte(newUUID))
	})
	if err != nil {
		s.logger.WithError(err).WithField("path", path).Error("Failed to store value")
		http.Error(w, "Internal Server Error", http.StatusInternalServerError)
		return
	}

	response := types.PutResponse{
		UUID:      newUUID,
		Timestamp: now,
	}
	status := http.StatusCreated
	if isUpdate {
		status = http.StatusOK
	}
	w.Header().Set("Content-Type", "application/json")
	w.WriteHeader(status)
	json.NewEncoder(w).Encode(response)

	s.logger.WithFields(logrus.Fields{
		"path":      path,
		"uuid":      newUUID,
		"timestamp": now,
		"is_update": isUpdate,
	}).Info("Value stored")
}
// deleteKVHandler deletes the value stored under the {path} route variable
// together with its "_ts:{timestamp}:{path}" index entry.
//
// It answers 503 in "syncing" mode, 403 in "read-only" mode for callers that
// are not cluster members, 404 when the key does not exist, and 204 on
// success.
func (s *Server) deleteKVHandler(w http.ResponseWriter, r *http.Request) {
	path := mux.Vars(r)["path"]

	mode := s.getMode()
	if mode == "syncing" {
		http.Error(w, "Service Unavailable", http.StatusServiceUnavailable)
		return
	}
	if mode == "read-only" && !s.isClusterMember(r.RemoteAddr) {
		http.Error(w, "Forbidden", http.StatusForbidden)
		return
	}

	key := []byte(path)
	existed := false
	txnErr := s.db.Update(func(txn *badger.Txn) error {
		item, getErr := txn.Get(key)
		if getErr == badger.ErrKeyNotFound {
			// Nothing to delete; reported as 404 after the transaction.
			return nil
		}
		if getErr != nil {
			return getErr
		}
		existed = true

		// The stored timestamp is needed to rebuild the index key.
		var current types.StoredValue
		decodeErr := item.Value(func(raw []byte) error {
			return json.Unmarshal(raw, &current)
		})
		if decodeErr != nil {
			return decodeErr
		}

		if delErr := txn.Delete(key); delErr != nil {
			return delErr
		}
		indexKey := fmt.Sprintf("_ts:%020d:%s", current.Timestamp, path)
		return txn.Delete([]byte(indexKey))
	})
	if txnErr != nil {
		s.logger.WithError(txnErr).WithField("path", path).Error("Failed to delete value")
		http.Error(w, "Internal Server Error", http.StatusInternalServerError)
		return
	}
	if !existed {
		http.Error(w, "Not Found", http.StatusNotFound)
		return
	}

	w.WriteHeader(http.StatusNoContent)
	s.logger.WithField("path", path).Info("Value deleted")
}
// getResourceMetadataHandler returns, as JSON, the metadata record the auth
// service holds for the {path} route variable. It answers 404 when the lookup
// reports badger.ErrKeyNotFound and 500 on any other lookup error.
func (s *Server) getResourceMetadataHandler(w http.ResponseWriter, r *http.Request) {
	path := mux.Vars(r)["path"]

	meta, lookupErr := s.authService.GetResourceMetadata(path)
	switch {
	case lookupErr == badger.ErrKeyNotFound:
		http.Error(w, "Not Found: No metadata exists for this resource", http.StatusNotFound)
		return
	case lookupErr != nil:
		s.logger.WithError(lookupErr).WithField("path", path).Error("Failed to get resource metadata")
		http.Error(w, "Internal Server Error", http.StatusInternalServerError)
		return
	}

	w.Header().Set("Content-Type", "application/json")
	json.NewEncoder(w).Encode(types.GetResourceMetadataResponse{
		OwnerUUID:   meta.OwnerUUID,
		GroupUUID:   meta.GroupUUID,
		Permissions: meta.Permissions,
		TTL:         meta.TTL,
		CreatedAt:   meta.CreatedAt,
		UpdatedAt:   meta.UpdatedAt,
	})
}
// updateResourceMetadataHandler applies a partial update to the metadata of
// the resource named by the {path} route variable.
//
// If no metadata exists yet, a record with empty owner/group, the default
// permissions and the current time is used as the starting point. Only the
// owner, group and permissions fields present in the request overwrite the
// record; UpdatedAt is always refreshed. The resulting record is stored and
// echoed back as JSON.
func (s *Server) updateResourceMetadataHandler(w http.ResponseWriter, r *http.Request) {
	path := mux.Vars(r)["path"]

	var patch types.UpdateResourceMetadataRequest
	if decodeErr := json.NewDecoder(r.Body).Decode(&patch); decodeErr != nil {
		http.Error(w, "Bad Request: Invalid JSON", http.StatusBadRequest)
		return
	}

	meta, lookupErr := s.authService.GetResourceMetadata(path)
	switch {
	case lookupErr == badger.ErrKeyNotFound:
		// No record yet: start from defaults.
		meta = &types.ResourceMetadata{
			OwnerUUID:   "",
			GroupUUID:   "",
			Permissions: types.DefaultPermissions,
			TTL:         "",
			CreatedAt:   time.Now().Unix(),
			UpdatedAt:   time.Now().Unix(),
		}
	case lookupErr != nil:
		s.logger.WithError(lookupErr).WithField("path", path).Error("Failed to get resource metadata")
		http.Error(w, "Internal Server Error", http.StatusInternalServerError)
		return
	}

	// Nil pointers mean "field not supplied"; leave those untouched.
	if owner := patch.OwnerUUID; owner != nil {
		meta.OwnerUUID = *owner
	}
	if group := patch.GroupUUID; group != nil {
		meta.GroupUUID = *group
	}
	if perms := patch.Permissions; perms != nil {
		meta.Permissions = *perms
	}
	meta.UpdatedAt = time.Now().Unix()

	if storeErr := s.authService.SetResourceMetadata(path, meta); storeErr != nil {
		s.logger.WithError(storeErr).WithField("path", path).Error("Failed to update resource metadata")
		http.Error(w, "Internal Server Error", http.StatusInternalServerError)
		return
	}

	w.Header().Set("Content-Type", "application/json")
	json.NewEncoder(w).Encode(types.GetResourceMetadataResponse{
		OwnerUUID:   meta.OwnerUUID,
		GroupUUID:   meta.GroupUUID,
		Permissions: meta.Permissions,
		TTL:         meta.TTL,
		CreatedAt:   meta.CreatedAt,
		UpdatedAt:   meta.UpdatedAt,
	})

	s.logger.WithFields(logrus.Fields{
		"path":       path,
		"owner_uuid": meta.OwnerUUID,
		"group_uuid": meta.GroupUUID,
	}).Info("Resource metadata updated")
}
// isClusterMember reports whether the host part of remoteAddr equals the host
// part of any member address known to the gossip service. Ports are ignored.
// An address that cannot be split into host and port never matches.
func (s *Server) isClusterMember(remoteAddr string) bool {
	callerHost, _, splitErr := net.SplitHostPort(remoteAddr)
	if splitErr != nil {
		return false
	}

	for _, m := range s.gossipService.GetMembers() {
		peerHost, _, peerErr := net.SplitHostPort(m.Address)
		if peerErr != nil {
			continue
		}
		if peerHost == callerHost {
			return true
		}
	}
	return false
}
// getMembersHandler writes the current member list as JSON.
func (s *Server) getMembersHandler(w http.ResponseWriter, r *http.Request) {
	current := s.getMembers()

	w.Header().Set("Content-Type", "application/json")
	json.NewEncoder(w).Encode(current)
}
// joinMemberHandler registers the member described by the JSON request body,
// stamping LastSeen with the current time in milliseconds, and replies with
// the member list as it stands after the addition. A body that cannot be
// decoded yields 400.
func (s *Server) joinMemberHandler(w http.ResponseWriter, r *http.Request) {
	var join types.JoinRequest
	if decodeErr := json.NewDecoder(r.Body).Decode(&join); decodeErr != nil {
		http.Error(w, "Bad Request", http.StatusBadRequest)
		return
	}

	s.addMember(&types.Member{
		ID:              join.ID,
		Address:         join.Address,
		LastSeen:        time.Now().UnixMilli(),
		JoinedTimestamp: join.JoinedTimestamp,
	})

	// Reply with the member list, which now includes the joiner.
	w.Header().Set("Content-Type", "application/json")
	json.NewEncoder(w).Encode(s.getMembers())
}
// leaveMemberHandler removes the member whose ID is given in the JSON request
// body and answers 204. A body that cannot be decoded yields 400.
func (s *Server) leaveMemberHandler(w http.ResponseWriter, r *http.Request) {
	var leave types.LeaveRequest
	if decodeErr := json.NewDecoder(r.Body).Decode(&leave); decodeErr != nil {
		http.Error(w, "Bad Request", http.StatusBadRequest)
		return
	}

	s.removeMember(leave.ID)
	w.WriteHeader(http.StatusNoContent)
}
// pairsByTimeHandler lists entries of the "_ts:{timestamp}:{path}" index that
// match the JSON query in the request body.
//
// An entry is returned when its timestamp lies in [StartTimestamp,
// EndTimestamp) (a bound of zero or less is not applied) and its path starts
// with Prefix (when non-empty). At most Limit entries are returned; a
// non-positive Limit defaults to 15. The handler answers 204 when nothing
// matches.
func (s *Server) pairsByTimeHandler(w http.ResponseWriter, r *http.Request) {
	var query types.PairsByTimeRequest
	if decodeErr := json.NewDecoder(r.Body).Decode(&query); decodeErr != nil {
		http.Error(w, "Bad Request", http.StatusBadRequest)
		return
	}
	// Default limit to 15 as per spec
	if query.Limit <= 0 {
		query.Limit = 15
	}

	var results []types.PairsByTimeResponse
	scanErr := s.db.View(func(txn *badger.Txn) error {
		iterOpts := badger.DefaultIteratorOptions
		iterOpts.PrefetchSize = query.Limit
		it := txn.NewIterator(iterOpts)
		defer it.Close()

		indexPrefix := []byte("_ts:")
		for it.Seek(indexPrefix); it.ValidForPrefix(indexPrefix); it.Next() {
			if len(results) >= query.Limit {
				break
			}
			item := it.Item()

			// Index keys look like "_ts:{timestamp}:{path}"; the path itself
			// may contain colons, hence the split into at most three parts.
			fields := strings.SplitN(string(item.Key()), ":", 3)
			if len(fields) != 3 {
				continue
			}
			ts, parseErr := strconv.ParseInt(fields[1], 10, 64)
			if parseErr != nil {
				continue
			}

			tooEarly := query.StartTimestamp > 0 && ts < query.StartTimestamp
			tooLate := query.EndTimestamp > 0 && ts >= query.EndTimestamp
			if tooEarly || tooLate {
				continue
			}

			entryPath := fields[2]
			if query.Prefix != "" && !strings.HasPrefix(entryPath, query.Prefix) {
				continue
			}

			// The index value is the UUID of the stored version.
			var entryUUID string
			valueErr := item.Value(func(raw []byte) error {
				entryUUID = string(raw)
				return nil
			})
			if valueErr != nil {
				continue
			}

			results = append(results, types.PairsByTimeResponse{
				Path:      entryPath,
				UUID:      entryUUID,
				Timestamp: ts,
			})
		}
		return nil
	})
	if scanErr != nil {
		s.logger.WithError(scanErr).Error("Failed to query pairs by time")
		http.Error(w, "Internal Server Error", http.StatusInternalServerError)
		return
	}

	if len(results) == 0 {
		w.WriteHeader(http.StatusNoContent)
		return
	}
	w.Header().Set("Content-Type", "application/json")
	json.NewEncoder(w).Encode(results)
}
// gossipHandler merges the member list received in the JSON request body into
// the gossip service and replies with the local member list followed by an
// entry describing this node itself.
func (s *Server) gossipHandler(w http.ResponseWriter, r *http.Request) {
	var incoming []types.Member
	if decodeErr := json.NewDecoder(r.Body).Decode(&incoming); decodeErr != nil {
		http.Error(w, "Bad Request", http.StatusBadRequest)
		return
	}

	s.gossipService.MergeMemberList(incoming, s.config.NodeID)

	// Copy the known members by value and leave room for our own entry.
	known := s.gossipService.GetMembers()
	reply := make([]types.Member, 0, len(known)+1)
	for _, m := range known {
		reply = append(reply, *m)
	}
	reply = append(reply, types.Member{
		ID:              s.config.NodeID,
		Address:         fmt.Sprintf("%s:%d", s.config.BindAddress, s.config.Port),
		LastSeen:        time.Now().UnixMilli(),
		JoinedTimestamp: s.getJoinedTimestamp(),
	})

	w.Header().Set("Content-Type", "application/json")
	json.NewEncoder(w).Encode(reply)

	s.logger.WithField("remote_members", len(incoming)).Debug("Processed gossip request")
}
// getBackupStatusHandler writes the value returned by getBackupStatus as JSON.
func (s *Server) getBackupStatusHandler(w http.ResponseWriter, r *http.Request) {
	backupStatus := s.getBackupStatus()

	w.Header().Set("Content-Type", "application/json")
	json.NewEncoder(w).Encode(backupStatus)
}
// getMerkleRootHandler writes the sync service's current Merkle root as JSON.
// It answers 500 when the sync service has no root to offer.
func (s *Server) getMerkleRootHandler(w http.ResponseWriter, r *http.Request) {
	merkleRoot := s.syncService.GetMerkleRoot()
	if merkleRoot == nil {
		http.Error(w, "Merkle tree not initialized", http.StatusInternalServerError)
		return
	}

	w.Header().Set("Content-Type", "application/json")
	json.NewEncoder(w).Encode(types.MerkleRootResponse{Root: merkleRoot})
}
// getMerkleDiffHandler answers a Merkle diff request for the key range
// described by req.ParentNode.
//
// It hashes the local pairs that fall inside the range and compares the result
// with req.LocalHash:
//   - equal hashes produce an empty response (nothing to sync);
//   - otherwise, if the range holds at most diffLeafThreshold keys, the keys
//     themselves are returned in resp.Keys;
//   - otherwise the sorted keys are split in two halves and a Merkle node for
//     each half is returned in resp.Children.
//
// The set of pairs inside the requested range is computed once and reused for
// hashing, key listing and building the child nodes.
func (s *Server) getMerkleDiffHandler(w http.ResponseWriter, r *http.Request) {
	var req types.MerkleTreeDiffRequest
	if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
		http.Error(w, "Bad Request", http.StatusBadRequest)
		return
	}

	localPairs, err := s.getAllKVPairsForMerkleTree()
	if err != nil {
		s.logger.WithError(err).Error("Failed to get KV pairs for Merkle diff")
		http.Error(w, "Internal Server Error", http.StatusInternalServerError)
		return
	}

	// Restrict to the requested range once; everything below works on this set.
	rangePairs := s.filterPairsByRange(localPairs, req.ParentNode.StartKey, req.ParentNode.EndKey)

	// Build the local Merkle node for the requested range to compare with the
	// remote's hash.
	localSubTreeRoot, err := s.buildMerkleTreeFromPairs(rangePairs)
	if err != nil {
		s.logger.WithError(err).Error("Failed to build sub-Merkle tree for diff request")
		http.Error(w, "Internal Server Error", http.StatusInternalServerError)
		return
	}
	if localSubTreeRoot == nil { // Defensive: treat a missing node as an empty range
		localSubTreeRoot = &types.MerkleNode{
			Hash:     calculateHash([]byte("empty_tree")),
			StartKey: req.ParentNode.StartKey,
			EndKey:   req.ParentNode.EndKey,
		}
	}

	resp := types.MerkleTreeDiffResponse{}

	// If hashes match, no need to send children or keys
	if bytes.Equal(req.LocalHash, localSubTreeRoot.Hash) {
		w.Header().Set("Content-Type", "application/json")
		json.NewEncoder(w).Encode(resp)
		return
	}

	// Hashes differ, so we need to provide more detail: collect the local keys
	// of the range in sorted order.
	var keysInRange []string
	for key := range rangePairs {
		keysInRange = append(keysInRange, key)
	}
	sort.Strings(keysInRange)

	const diffLeafThreshold = 10 // If a range has <= 10 keys, we consider it a leaf-level diff

	if len(keysInRange) <= diffLeafThreshold {
		// This is a leaf-level diff, return the actual keys in the range
		resp.Keys = keysInRange
	} else {
		// Group keys into two sub-ranges (the two halves of the sorted key
		// list) and return a Merkle node for each.
		mid := len(keysInRange) / 2
		halves := []struct {
			side string
			keys []string
		}{
			{side: "left", keys: keysInRange[:mid]},
			{side: "right", keys: keysInRange[mid:]},
		}
		for _, half := range halves {
			if len(half.keys) == 0 {
				continue
			}
			// Keys are sorted, so first/last bound the half's range.
			halfPairs := s.filterPairsByRange(rangePairs, half.keys[0], half.keys[len(half.keys)-1])
			node, err := s.buildMerkleTreeFromPairs(halfPairs)
			if err != nil {
				s.logger.WithError(err).Error("Failed to build " + half.side + " child node for diff")
				http.Error(w, "Internal Server Error", http.StatusInternalServerError)
				return
			}
			if node != nil {
				resp.Children = append(resp.Children, *node)
			}
		}
	}

	w.Header().Set("Content-Type", "application/json")
	json.NewEncoder(w).Encode(resp)
}
// getKVRangeHandler returns the stored values whose keys lie between StartKey
// and EndKey (inclusive, EndKey optional) from the JSON request body, in
// iterator order. Keys beginning with "_ts:" are skipped, values that fail to
// decode are logged and skipped, and a positive Limit caps the number of pairs
// returned.
func (s *Server) getKVRangeHandler(w http.ResponseWriter, r *http.Request) {
	var query types.KVRangeRequest
	if decodeErr := json.NewDecoder(r.Body).Decode(&query); decodeErr != nil {
		http.Error(w, "Bad Request", http.StatusBadRequest)
		return
	}

	// Alias (not a new named type) so the slice stays assignable to the
	// response's Pairs field.
	type rangePair = struct {
		Path        string            `json:"path"`
		StoredValue types.StoredValue `json:"stored_value"`
	}
	var collected []rangePair

	scanErr := s.db.View(func(txn *badger.Txn) error {
		iterOpts := badger.DefaultIteratorOptions
		iterOpts.PrefetchValues = true
		it := txn.NewIterator(iterOpts)
		defer it.Close()

		for it.Seek([]byte(query.StartKey)); it.Valid(); it.Next() {
			item := it.Item()
			key := string(item.Key())

			if strings.HasPrefix(key, "_ts:") {
				continue // index entry, not user data
			}
			if query.EndKey != "" && key > query.EndKey {
				break // past the requested range
			}
			if query.Limit > 0 && len(collected) >= query.Limit {
				break // enough pairs gathered
			}

			var value types.StoredValue
			valueErr := item.Value(func(raw []byte) error {
				return json.Unmarshal(raw, &value)
			})
			if valueErr != nil {
				s.logger.WithError(valueErr).WithField("key", key).Warn("Failed to unmarshal stored value in KV range, skipping")
				continue
			}
			collected = append(collected, rangePair{Path: key, StoredValue: value})
		}
		return nil
	})
	if scanErr != nil {
		s.logger.WithError(scanErr).Error("Failed to query KV range")
		http.Error(w, "Internal Server Error", http.StatusInternalServerError)
		return
	}

	w.Header().Set("Content-Type", "application/json")
	json.NewEncoder(w).Encode(types.KVRangeResponse{Pairs: collected})
}
// createUserHandler creates a user from the JSON request body.
//
// The body must carry a non-empty nickname (400 otherwise). The user is stored
// under auth.UserStorageKey with a freshly generated UUID, a hash of the
// nickname, an empty group list and matching created/updated timestamps in
// Unix seconds. The new UUID is returned as JSON.
func (s *Server) createUserHandler(w http.ResponseWriter, r *http.Request) {
	var req types.CreateUserRequest
	if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
		http.Error(w, "Bad Request", http.StatusBadRequest)
		return
	}
	if req.Nickname == "" {
		http.Error(w, "Nickname is required", http.StatusBadRequest)
		return
	}

	// Generate UUID for the user
	userUUID := uuid.New().String()
	now := time.Now().Unix()
	user := types.User{
		UUID:         userUUID,
		NicknameHash: utils.HashUserNickname(req.Nickname),
		Groups:       []string{}, // non-nil so JSON encodes [] rather than null
		CreatedAt:    now,
		UpdatedAt:    now,
	}

	// Store user in BadgerDB
	userData, err := json.Marshal(user)
	if err != nil {
		s.logger.WithError(err).Error("Failed to marshal user data")
		http.Error(w, "Internal Server Error", http.StatusInternalServerError)
		return
	}
	err = s.db.Update(func(txn *badger.Txn) error {
		return txn.Set([]byte(auth.UserStorageKey(userUUID)), userData)
	})
	if err != nil {
		s.logger.WithError(err).Error("Failed to store user")
		http.Error(w, "Internal Server Error", http.StatusInternalServerError)
		return
	}

	s.logger.WithField("user_uuid", userUUID).Info("User created successfully")

	response := types.CreateUserResponse{UUID: userUUID}
	w.Header().Set("Content-Type", "application/json")
	json.NewEncoder(w).Encode(response)
}
// getUserHandler returns, as JSON, the user stored under the {uuid} route
// variable. It answers 400 when the UUID is empty, 404 when no such user
// exists and 500 on any other read or decode failure.
func (s *Server) getUserHandler(w http.ResponseWriter, r *http.Request) {
	vars := mux.Vars(r)
	userUUID := vars["uuid"]
	if userUUID == "" {
		http.Error(w, "User UUID is required", http.StatusBadRequest)
		return
	}

	var user types.User
	err := s.db.View(func(txn *badger.Txn) error {
		item, err := txn.Get([]byte(auth.UserStorageKey(userUUID)))
		if err != nil {
			return err
		}
		return item.Value(func(val []byte) error {
			return json.Unmarshal(val, &user)
		})
	})
	if err == badger.ErrKeyNotFound {
		http.Error(w, "User not found", http.StatusNotFound)
		return
	}
	if err != nil {
		s.logger.WithError(err).Error("Failed to get user")
		http.Error(w, "Internal Server Error", http.StatusInternalServerError)
		return
	}

	response := types.GetUserResponse{
		UUID:         user.UUID,
		NicknameHash: user.NicknameHash,
		Groups:       user.Groups,
		CreatedAt:    user.CreatedAt,
		UpdatedAt:    user.UpdatedAt,
	}
	w.Header().Set("Content-Type", "application/json")
	json.NewEncoder(w).Encode(response)
}
// updateUserHandler updates the user stored under the {uuid} route variable
// from the JSON request body.
//
// A non-empty nickname replaces the stored nickname hash and a non-nil group
// list replaces the stored groups; UpdatedAt is always refreshed. The read,
// modification and write happen in a single transaction. It answers 400 for an
// empty UUID or undecodable body, 404 when the user does not exist, 500 on
// other failures and 200 on success.
func (s *Server) updateUserHandler(w http.ResponseWriter, r *http.Request) {
	vars := mux.Vars(r)
	userUUID := vars["uuid"]
	if userUUID == "" {
		http.Error(w, "User UUID is required", http.StatusBadRequest)
		return
	}

	var req types.UpdateUserRequest
	if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
		http.Error(w, "Bad Request", http.StatusBadRequest)
		return
	}

	err := s.db.Update(func(txn *badger.Txn) error {
		// Get existing user
		item, err := txn.Get([]byte(auth.UserStorageKey(userUUID)))
		if err != nil {
			return err
		}
		var user types.User
		err = item.Value(func(val []byte) error {
			return json.Unmarshal(val, &user)
		})
		if err != nil {
			return err
		}

		// Update fields if provided
		now := time.Now().Unix()
		user.UpdatedAt = now
		if req.Nickname != "" {
			user.NicknameHash = utils.HashUserNickname(req.Nickname)
		}
		if req.Groups != nil {
			user.Groups = req.Groups
		}

		// Store updated user
		userData, err := json.Marshal(user)
		if err != nil {
			return err
		}
		return txn.Set([]byte(auth.UserStorageKey(userUUID)), userData)
	})
	if err == badger.ErrKeyNotFound {
		http.Error(w, "User not found", http.StatusNotFound)
		return
	}
	if err != nil {
		s.logger.WithError(err).Error("Failed to update user")
		http.Error(w, "Internal Server Error", http.StatusInternalServerError)
		return
	}

	s.logger.WithField("user_uuid", userUUID).Info("User updated successfully")
	w.WriteHeader(http.StatusOK)
}
// deleteUserHandler deletes the user stored under the {uuid} route variable.
// It answers 400 for an empty UUID, 404 when the user does not exist, 500 on
// other storage failures and 200 on success.
func (s *Server) deleteUserHandler(w http.ResponseWriter, r *http.Request) {
	vars := mux.Vars(r)
	userUUID := vars["uuid"]
	if userUUID == "" {
		http.Error(w, "User UUID is required", http.StatusBadRequest)
		return
	}

	err := s.db.Update(func(txn *badger.Txn) error {
		// Check if user exists first so a missing user is reported as 404
		// instead of silently succeeding.
		_, err := txn.Get([]byte(auth.UserStorageKey(userUUID)))
		if err != nil {
			return err
		}
		// Delete the user
		return txn.Delete([]byte(auth.UserStorageKey(userUUID)))
	})
	if err == badger.ErrKeyNotFound {
		http.Error(w, "User not found", http.StatusNotFound)
		return
	}
	if err != nil {
		s.logger.WithError(err).Error("Failed to delete user")
		http.Error(w, "Internal Server Error", http.StatusInternalServerError)
		return
	}

	s.logger.WithField("user_uuid", userUUID).Info("User deleted successfully")
	w.WriteHeader(http.StatusOK)
}
// createGroupHandler creates a group from the JSON request body.
//
// The body must carry a non-empty group name (400 otherwise). The group is
// stored under auth.GroupStorageKey with a freshly generated UUID, a hash of
// the name, the supplied members (an empty list when none are given) and
// matching created/updated timestamps in Unix seconds. The new UUID is
// returned as JSON.
func (s *Server) createGroupHandler(w http.ResponseWriter, r *http.Request) {
	var req types.CreateGroupRequest
	if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
		http.Error(w, "Bad Request", http.StatusBadRequest)
		return
	}
	if req.Groupname == "" {
		http.Error(w, "Groupname is required", http.StatusBadRequest)
		return
	}

	// Generate UUID for the group
	groupUUID := uuid.New().String()
	now := time.Now().Unix()
	group := types.Group{
		UUID:      groupUUID,
		NameHash:  utils.HashGroupName(req.Groupname),
		Members:   req.Members,
		CreatedAt: now,
		UpdatedAt: now,
	}
	if group.Members == nil {
		group.Members = []string{} // non-nil so JSON encodes [] rather than null
	}

	// Store group in BadgerDB
	groupData, err := json.Marshal(group)
	if err != nil {
		s.logger.WithError(err).Error("Failed to marshal group data")
		http.Error(w, "Internal Server Error", http.StatusInternalServerError)
		return
	}
	err = s.db.Update(func(txn *badger.Txn) error {
		return txn.Set([]byte(auth.GroupStorageKey(groupUUID)), groupData)
	})
	if err != nil {
		s.logger.WithError(err).Error("Failed to store group")
		http.Error(w, "Internal Server Error", http.StatusInternalServerError)
		return
	}

	s.logger.WithField("group_uuid", groupUUID).Info("Group created successfully")

	response := types.CreateGroupResponse{UUID: groupUUID}
	w.Header().Set("Content-Type", "application/json")
	json.NewEncoder(w).Encode(response)
}
// getGroupHandler returns, as JSON, the group stored under the {uuid} route
// variable. It answers 400 when the UUID is empty, 404 when no such group
// exists and 500 on any other read or decode failure.
func (s *Server) getGroupHandler(w http.ResponseWriter, r *http.Request) {
	vars := mux.Vars(r)
	groupUUID := vars["uuid"]
	if groupUUID == "" {
		http.Error(w, "Group UUID is required", http.StatusBadRequest)
		return
	}

	var group types.Group
	err := s.db.View(func(txn *badger.Txn) error {
		item, err := txn.Get([]byte(auth.GroupStorageKey(groupUUID)))
		if err != nil {
			return err
		}
		return item.Value(func(val []byte) error {
			return json.Unmarshal(val, &group)
		})
	})
	if err == badger.ErrKeyNotFound {
		http.Error(w, "Group not found", http.StatusNotFound)
		return
	}
	if err != nil {
		s.logger.WithError(err).Error("Failed to get group")
		http.Error(w, "Internal Server Error", http.StatusInternalServerError)
		return
	}

	response := types.GetGroupResponse{
		UUID:      group.UUID,
		NameHash:  group.NameHash,
		Members:   group.Members,
		CreatedAt: group.CreatedAt,
		UpdatedAt: group.UpdatedAt,
	}
	w.Header().Set("Content-Type", "application/json")
	json.NewEncoder(w).Encode(response)
}
// updateGroupHandler replaces the member list of the group stored under the
// {uuid} route variable with the list from the JSON request body.
//
// The member list is always overwritten (a missing list becomes an empty one)
// and UpdatedAt is refreshed; the read, modification and write happen in a
// single transaction. It answers 400 for an empty UUID or undecodable body,
// 404 when the group does not exist, 500 on other failures and 200 on success.
func (s *Server) updateGroupHandler(w http.ResponseWriter, r *http.Request) {
	vars := mux.Vars(r)
	groupUUID := vars["uuid"]
	if groupUUID == "" {
		http.Error(w, "Group UUID is required", http.StatusBadRequest)
		return
	}

	var req types.UpdateGroupRequest
	if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
		http.Error(w, "Bad Request", http.StatusBadRequest)
		return
	}

	err := s.db.Update(func(txn *badger.Txn) error {
		// Get existing group
		item, err := txn.Get([]byte(auth.GroupStorageKey(groupUUID)))
		if err != nil {
			return err
		}
		var group types.Group
		err = item.Value(func(val []byte) error {
			return json.Unmarshal(val, &group)
		})
		if err != nil {
			return err
		}

		// Update fields
		now := time.Now().Unix()
		group.UpdatedAt = now
		group.Members = req.Members
		if group.Members == nil {
			group.Members = []string{} // non-nil so JSON encodes [] rather than null
		}

		// Store updated group
		groupData, err := json.Marshal(group)
		if err != nil {
			return err
		}
		return txn.Set([]byte(auth.GroupStorageKey(groupUUID)), groupData)
	})
	if err == badger.ErrKeyNotFound {
		http.Error(w, "Group not found", http.StatusNotFound)
		return
	}
	if err != nil {
		s.logger.WithError(err).Error("Failed to update group")
		http.Error(w, "Internal Server Error", http.StatusInternalServerError)
		return
	}

	s.logger.WithField("group_uuid", groupUUID).Info("Group updated successfully")
	w.WriteHeader(http.StatusOK)
}
// deleteGroupHandler deletes the group stored under the {uuid} route variable.
// It answers 400 for an empty UUID, 404 when the group does not exist, 500 on
// other storage failures and 200 on success.
func (s *Server) deleteGroupHandler(w http.ResponseWriter, r *http.Request) {
	vars := mux.Vars(r)
	groupUUID := vars["uuid"]
	if groupUUID == "" {
		http.Error(w, "Group UUID is required", http.StatusBadRequest)
		return
	}

	err := s.db.Update(func(txn *badger.Txn) error {
		// Check if group exists first so a missing group is reported as 404
		// instead of silently succeeding.
		_, err := txn.Get([]byte(auth.GroupStorageKey(groupUUID)))
		if err != nil {
			return err
		}
		// Delete the group
		return txn.Delete([]byte(auth.GroupStorageKey(groupUUID)))
	})
	if err == badger.ErrKeyNotFound {
		http.Error(w, "Group not found", http.StatusNotFound)
		return
	}
	if err != nil {
		s.logger.WithError(err).Error("Failed to delete group")
		http.Error(w, "Internal Server Error", http.StatusInternalServerError)
		return
	}

	s.logger.WithField("group_uuid", groupUUID).Info("Group deleted successfully")
	w.WriteHeader(http.StatusOK)
}
// createTokenHandler issues a JWT for an existing user.
//
// The JSON request body must name a user UUID and at least one scope (400
// otherwise), and the user must exist in storage (404 otherwise). The token is
// generated with auth.GenerateJWT, persisted via storeAPIToken and returned as
// JSON together with its expiry.
func (s *Server) createTokenHandler(w http.ResponseWriter, r *http.Request) {
	var req types.CreateTokenRequest
	if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
		http.Error(w, "Bad Request", http.StatusBadRequest)
		return
	}
	if req.UserUUID == "" {
		http.Error(w, "User UUID is required", http.StatusBadRequest)
		return
	}
	if len(req.Scopes) == 0 {
		http.Error(w, "At least one scope is required", http.StatusBadRequest)
		return
	}

	// Verify user exists
	err := s.db.View(func(txn *badger.Txn) error {
		_, err := txn.Get([]byte(auth.UserStorageKey(req.UserUUID)))
		return err
	})
	if err == badger.ErrKeyNotFound {
		http.Error(w, "User not found", http.StatusNotFound)
		return
	}
	if err != nil {
		s.logger.WithError(err).Error("Failed to verify user")
		http.Error(w, "Internal Server Error", http.StatusInternalServerError)
		return
	}

	// Generate JWT token.
	// NOTE(review): the third argument is presumably the lifetime in hours
	// ("1 hour default" per the original comment) — confirm against
	// auth.GenerateJWT.
	tokenString, expiresAt, err := auth.GenerateJWT(req.UserUUID, req.Scopes, 1)
	if err != nil {
		s.logger.WithError(err).Error("Failed to generate JWT token")
		http.Error(w, "Internal Server Error", http.StatusInternalServerError)
		return
	}

	// Store token in BadgerDB
	err = s.storeAPIToken(tokenString, req.UserUUID, req.Scopes, expiresAt)
	if err != nil {
		s.logger.WithError(err).Error("Failed to store API token")
		http.Error(w, "Internal Server Error", http.StatusInternalServerError)
		return
	}

	s.logger.WithFields(logrus.Fields{
		"user_uuid":  req.UserUUID,
		"scopes":     req.Scopes,
		"expires_at": expiresAt,
	}).Info("API token created successfully")

	response := types.CreateTokenResponse{
		Token:     tokenString,
		ExpiresAt: expiresAt,
	}
	w.Header().Set("Content-Type", "application/json")
	json.NewEncoder(w).Encode(response)
}
// getRevisionHistoryHandler returns all revisions recorded for the {key} route
// variable, wrapped in a JSON object under "revisions". It answers 503 when
// revision history is disabled in the configuration, 400 for an empty key, 404
// when no revisions exist and 500 when the lookup fails.
func (s *Server) getRevisionHistoryHandler(w http.ResponseWriter, r *http.Request) {
	if !s.config.RevisionHistoryEnabled {
		http.Error(w, "Revision history is disabled", http.StatusServiceUnavailable)
		return
	}

	key := mux.Vars(r)["key"]
	if key == "" {
		http.Error(w, "Key is required", http.StatusBadRequest)
		return
	}

	history, lookupErr := s.getRevisionHistory(key)
	switch {
	case lookupErr != nil:
		s.logger.WithError(lookupErr).WithField("key", key).Error("Failed to get revision history")
		http.Error(w, "Internal Server Error", http.StatusInternalServerError)
		return
	case len(history) == 0:
		http.Error(w, "No revisions found", http.StatusNotFound)
		return
	}

	w.Header().Set("Content-Type", "application/json")
	json.NewEncoder(w).Encode(map[string]interface{}{
		"revisions": history,
	})
}
// getSpecificRevisionHandler returns, as JSON, the stored value for the
// {revision} of {key} named in the route. It answers 503 when revision history
// is disabled, 400 for a missing key, missing revision or non-integer
// revision, 404 when the revision does not exist and 500 on other failures.
func (s *Server) getSpecificRevisionHandler(w http.ResponseWriter, r *http.Request) {
	if !s.config.RevisionHistoryEnabled {
		http.Error(w, "Revision history is disabled", http.StatusServiceUnavailable)
		return
	}

	routeVars := mux.Vars(r)
	key, rawRevision := routeVars["key"], routeVars["revision"]
	switch {
	case key == "":
		http.Error(w, "Key is required", http.StatusBadRequest)
		return
	case rawRevision == "":
		http.Error(w, "Revision is required", http.StatusBadRequest)
		return
	}

	revision, parseErr := strconv.Atoi(rawRevision)
	if parseErr != nil {
		http.Error(w, "Invalid revision number", http.StatusBadRequest)
		return
	}

	value, lookupErr := s.getSpecificRevision(key, revision)
	switch {
	case lookupErr == badger.ErrKeyNotFound:
		http.Error(w, "Revision not found", http.StatusNotFound)
		return
	case lookupErr != nil:
		s.logger.WithError(lookupErr).WithFields(logrus.Fields{
			"key":      key,
			"revision": revision,
		}).Error("Failed to get specific revision")
		http.Error(w, "Internal Server Error", http.StatusInternalServerError)
		return
	}

	w.Header().Set("Content-Type", "application/json")
	json.NewEncoder(w).Encode(value)
}
// calculateHash returns the 32-byte SHA-256 digest of data as a freshly
// allocated slice.
func calculateHash(data []byte) []byte {
	digest := sha256.Sum256(data)
	return digest[:]
}
// getAllKVPairsForMerkleTree scans the whole database and returns every entry
// as a path → stored value map. Keys starting with "_ts:" are skipped, and
// values that cannot be decoded are logged and left out. A non-nil error means
// the read transaction itself failed.
func (s *Server) getAllKVPairsForMerkleTree() (map[string]*types.StoredValue, error) {
	collected := make(map[string]*types.StoredValue)

	scanErr := s.db.View(func(txn *badger.Txn) error {
		iterOpts := badger.DefaultIteratorOptions
		iterOpts.PrefetchValues = true // values are needed for hashing
		it := txn.NewIterator(iterOpts)
		defer it.Close()

		for it.Rewind(); it.Valid(); it.Next() {
			item := it.Item()
			key := string(item.Key())
			if strings.HasPrefix(key, "_ts:") {
				continue // timestamp index entry, not data
			}

			// A fresh value per iteration so each map entry owns its pointer.
			value := new(types.StoredValue)
			decodeErr := item.Value(func(raw []byte) error {
				return json.Unmarshal(raw, value)
			})
			if decodeErr != nil {
				s.logger.WithError(decodeErr).WithField("key", key).Warn("Failed to unmarshal stored value for Merkle tree, skipping")
				continue
			}
			collected[key] = value
		}
		return nil
	})
	if scanErr != nil {
		return nil, scanErr
	}
	return collected, nil
}
// buildMerkleTreeFromPairs builds a Merkle tree over pairs and returns its
// root. An empty input yields a single node whose hash is that of the literal
// "empty_tree" and whose key range is empty. Otherwise one leaf per key is
// created in sorted key order (so the tree shape is deterministic) and the
// leaves are handed to buildMerkleTreeRecursive.
func (s *Server) buildMerkleTreeFromPairs(pairs map[string]*types.StoredValue) (*types.MerkleNode, error) {
	if len(pairs) == 0 {
		return &types.MerkleNode{Hash: calculateHash([]byte("empty_tree")), StartKey: "", EndKey: ""}, nil
	}

	sortedKeys := make([]string, 0, len(pairs))
	for key := range pairs {
		sortedKeys = append(sortedKeys, key)
	}
	sort.Strings(sortedKeys)

	leaves := make([]*types.MerkleNode, 0, len(sortedKeys))
	for _, key := range sortedKeys {
		leaves = append(leaves, &types.MerkleNode{
			Hash:     s.calculateLeafHash(key, pairs[key]),
			StartKey: key,
			EndKey:   key,
		})
	}

	return s.buildMerkleTreeRecursive(leaves)
}
// filterPairsByRange returns a new map holding the entries of allPairs whose
// key lies in the inclusive range [startKey, endKey], compared as strings.
// An empty startKey or endKey leaves that side of the range unbounded.
// The input map is not modified; values are shared, not copied.
func (s *Server) filterPairsByRange(allPairs map[string]*types.StoredValue, startKey, endKey string) map[string]*types.StoredValue {
	inRange := make(map[string]*types.StoredValue)
	for name, stored := range allPairs {
		if startKey != "" && name < startKey {
			continue // below the lower bound
		}
		if endKey != "" && name > endKey {
			continue // above the upper bound
		}
		inRange[name] = stored
	}
	return inRange
}
// calculateLeafHash returns the hash of a single Merkle leaf.
//
// The hashed message is "<path>:<UUID>:<Timestamp>:" followed by the raw bytes
// of storedValue.Data, with the timestamp rendered in base 10. The field order
// and separators are fixed so that equal inputs always hash identically.
func (s *Server) calculateLeafHash(path string, storedValue *types.StoredValue) []byte {
	var msg bytes.Buffer

	// Each textual field is followed by a ':' separator.
	textFields := []string{
		path,
		storedValue.UUID,
		strconv.FormatInt(storedValue.Timestamp, 10),
	}
	for _, field := range textFields {
		msg.WriteString(field)
		msg.WriteByte(':')
	}

	// The payload goes last, as raw bytes with no trailing separator.
	msg.Write(storedValue.Data)

	return calculateHash(msg.Bytes())
}
// buildMerkleTreeRecursive reduces a level of Merkle nodes to a single root.
//
// Adjacent nodes are paired left to right. Each pair becomes a parent whose
// hash is the hash of the left hash followed by the right hash, and whose key
// range runs from the left node's StartKey to the right node's EndKey. A
// trailing node without a partner is promoted: its parent carries the same
// hash and key range. The function then recurses on the parent level.
//
// It returns (nil, nil) for an empty slice and the node itself for a
// single-element slice. The returned error is always nil.
func (s *Server) buildMerkleTreeRecursive(nodes []*types.MerkleNode) (*types.MerkleNode, error) {
	switch len(nodes) {
	case 0:
		return nil, nil
	case 1:
		return nodes[0], nil
	}

	// The parent level holds ceil(len(nodes)/2) entries.
	nextLevel := make([]*types.MerkleNode, 0, (len(nodes)+1)/2)
	for i := 0; i < len(nodes); i += 2 {
		left := nodes[i]

		// Start as a promotion of the left node; overwritten below when a
		// right sibling exists.
		parent := &types.MerkleNode{
			Hash:     left.Hash,
			StartKey: left.StartKey,
			EndKey:   left.EndKey,
		}

		var right *types.MerkleNode
		if i+1 < len(nodes) {
			right = nodes[i+1]
		}
		if right != nil {
			// Concatenate into a fresh buffer. Appending directly onto
			// left.Hash could write into that slice's backing array when it
			// has spare capacity, touching memory owned by the child node.
			combined := make([]byte, 0, len(left.Hash)+len(right.Hash))
			combined = append(combined, left.Hash...)
			combined = append(combined, right.Hash...)
			parent.Hash = calculateHash(combined)
			parent.EndKey = right.EndKey
		}

		nextLevel = append(nextLevel, parent)
	}

	return s.buildMerkleTreeRecursive(nextLevel)
}
// storeAPIToken writes a types.APIToken record for tokenString to the database.
//
// The record is JSON-encoded and stored under auth.TokenStorageKey of the
// token's hash (utils.HashToken); IssuedAt is set to the current Unix time.
// When expiresAt (Unix seconds) lies in the future, the entry gets a TTL equal
// to the remaining time; otherwise it is stored without a TTL.
// It returns any error from JSON encoding or from the database update.
func (s *Server) storeAPIToken(tokenString string, userUUID string, scopes []string, expiresAt int64) error {
	hashed := utils.HashToken(tokenString)

	payload, err := json.Marshal(types.APIToken{
		TokenHash: hashed,
		UserUUID:  userUUID,
		Scopes:    scopes,
		IssuedAt:  time.Now().Unix(),
		ExpiresAt: expiresAt,
	})
	if err != nil {
		return err
	}

	persist := func(txn *badger.Txn) error {
		record := badger.NewEntry([]byte(auth.TokenStorageKey(hashed)), payload)
		// Expire the entry together with the token when it has time left.
		if remaining := time.Until(time.Unix(expiresAt, 0)); remaining > 0 {
			record = record.WithTTL(remaining)
		}
		return txn.SetEntry(record)
	}
	return s.db.Update(persist)
}
// getRevisionHistory returns the revision history recorded for key by
// delegating to s.revisionService.GetRevisionHistory; its result and error are
// returned unchanged.
func (s *Server) getRevisionHistory(key string) ([]map[string]interface{}, error) {
	history, err := s.revisionService.GetRevisionHistory(key)
	return history, err
}
// getSpecificRevision returns the stored value of key at the given revision by
// delegating to s.revisionService.GetSpecificRevision; its result and error are
// returned unchanged.
func (s *Server) getSpecificRevision(key string, revision int) (*types.StoredValue, error) {
	stored, err := s.revisionService.GetSpecificRevision(key, revision)
	return stored, err
}
// clusterBootstrapHandler returns the configured cluster secret as JSON so a
// new node can be bootstrapped into the cluster (Issue #13).
//
// Responses:
//   - 503 Service Unavailable when clustering is disabled in the config.
//   - 500 Internal Server Error when no cluster secret is configured.
//   - 200 with {"cluster_secret": "..."} otherwise.
//
// Each successful retrieval is logged together with the caller's remote
// address. This handler performs no authentication or authorization itself.
// NOTE(review): access control is presumably enforced by middleware where the
// route is registered — confirm.
func (s *Server) clusterBootstrapHandler(w http.ResponseWriter, r *http.Request) {
	// Ensure clustering is enabled
	if !s.config.ClusteringEnabled {
		http.Error(w, "Clustering is disabled", http.StatusServiceUnavailable)
		return
	}

	// Ensure cluster secret is configured
	if s.config.ClusterSecret == "" {
		s.logger.Error("Cluster secret is not configured")
		http.Error(w, "Cluster secret is not configured", http.StatusInternalServerError)
		return
	}

	// Return the cluster secret for secure bootstrap
	response := map[string]string{
		"cluster_secret": s.config.ClusterSecret,
	}

	s.logger.WithField("remote_addr", r.RemoteAddr).Info("Cluster secret retrieved for bootstrap")

	w.Header().Set("Content-Type", "application/json")
	if err := json.NewEncoder(w).Encode(response); err != nil {
		// The status line may already be on the wire, so the failure can only
		// be recorded, not reported to the client.
		s.logger.WithError(err).WithField("remote_addr", r.RemoteAddr).Error("Failed to write cluster bootstrap response")
	}
}