forked from ryyst/kalzu-value-store
Compare commits
2 Commits
kalzu/issu
...
secure-clu
Author | SHA1 | Date | |
---|---|---|---|
852275945c | |||
c7dcebb894 |
40
auth/auth.go
40
auth/auth.go
@@ -228,43 +228,3 @@ func (s *AuthService) HasUsers() (bool, error) {
|
|||||||
|
|
||||||
return hasUsers, err
|
return hasUsers, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// StoreResourceMetadata stores or updates resource metadata in BadgerDB
|
|
||||||
func (s *AuthService) StoreResourceMetadata(path string, metadata *types.ResourceMetadata) error {
|
|
||||||
now := time.Now().Unix()
|
|
||||||
if metadata.CreatedAt == 0 {
|
|
||||||
metadata.CreatedAt = now
|
|
||||||
}
|
|
||||||
metadata.UpdatedAt = now
|
|
||||||
|
|
||||||
metadataData, err := json.Marshal(metadata)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
return s.db.Update(func(txn *badger.Txn) error {
|
|
||||||
return txn.Set([]byte(ResourceMetadataKey(path)), metadataData)
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
// GetResourceMetadata retrieves resource metadata from BadgerDB
|
|
||||||
func (s *AuthService) GetResourceMetadata(path string) (*types.ResourceMetadata, error) {
|
|
||||||
var metadata types.ResourceMetadata
|
|
||||||
|
|
||||||
err := s.db.View(func(txn *badger.Txn) error {
|
|
||||||
item, err := txn.Get([]byte(ResourceMetadataKey(path)))
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
return item.Value(func(val []byte) error {
|
|
||||||
return json.Unmarshal(val, &metadata)
|
|
||||||
})
|
|
||||||
})
|
|
||||||
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
return &metadata, nil
|
|
||||||
}
|
|
||||||
|
77
auth/cluster.go
Normal file
77
auth/cluster.go
Normal file
@@ -0,0 +1,77 @@
|
|||||||
|
package auth
|
||||||
|
|
||||||
|
import (
|
||||||
|
"net/http"
|
||||||
|
|
||||||
|
"github.com/sirupsen/logrus"
|
||||||
|
)
|
||||||
|
|
||||||
|
// ClusterAuthService handles authentication for inter-cluster communication
|
||||||
|
type ClusterAuthService struct {
|
||||||
|
clusterSecret string
|
||||||
|
logger *logrus.Logger
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewClusterAuthService creates a new cluster authentication service
|
||||||
|
func NewClusterAuthService(clusterSecret string, logger *logrus.Logger) *ClusterAuthService {
|
||||||
|
return &ClusterAuthService{
|
||||||
|
clusterSecret: clusterSecret,
|
||||||
|
logger: logger,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Middleware validates cluster authentication headers
|
||||||
|
func (s *ClusterAuthService) Middleware(next http.Handler) http.Handler {
|
||||||
|
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
// Extract authentication headers
|
||||||
|
clusterSecret := r.Header.Get("X-Cluster-Secret")
|
||||||
|
nodeID := r.Header.Get("X-Node-ID")
|
||||||
|
|
||||||
|
// Log authentication attempt
|
||||||
|
s.logger.WithFields(logrus.Fields{
|
||||||
|
"node_id": nodeID,
|
||||||
|
"remote_addr": r.RemoteAddr,
|
||||||
|
"path": r.URL.Path,
|
||||||
|
"method": r.Method,
|
||||||
|
}).Debug("Cluster authentication attempt")
|
||||||
|
|
||||||
|
// Validate cluster secret
|
||||||
|
if clusterSecret == "" {
|
||||||
|
s.logger.WithFields(logrus.Fields{
|
||||||
|
"node_id": nodeID,
|
||||||
|
"remote_addr": r.RemoteAddr,
|
||||||
|
"path": r.URL.Path,
|
||||||
|
}).Warn("Missing X-Cluster-Secret header")
|
||||||
|
http.Error(w, "Unauthorized: Missing cluster secret", http.StatusUnauthorized)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if clusterSecret != s.clusterSecret {
|
||||||
|
s.logger.WithFields(logrus.Fields{
|
||||||
|
"node_id": nodeID,
|
||||||
|
"remote_addr": r.RemoteAddr,
|
||||||
|
"path": r.URL.Path,
|
||||||
|
}).Warn("Invalid cluster secret")
|
||||||
|
http.Error(w, "Unauthorized: Invalid cluster secret", http.StatusUnauthorized)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Validate node ID is present
|
||||||
|
if nodeID == "" {
|
||||||
|
s.logger.WithFields(logrus.Fields{
|
||||||
|
"remote_addr": r.RemoteAddr,
|
||||||
|
"path": r.URL.Path,
|
||||||
|
}).Warn("Missing X-Node-ID header")
|
||||||
|
http.Error(w, "Unauthorized: Missing node ID", http.StatusUnauthorized)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Authentication successful
|
||||||
|
s.logger.WithFields(logrus.Fields{
|
||||||
|
"node_id": nodeID,
|
||||||
|
"path": r.URL.Path,
|
||||||
|
}).Debug("Cluster authentication successful")
|
||||||
|
|
||||||
|
next.ServeHTTP(w, r)
|
||||||
|
})
|
||||||
|
}
|
@@ -82,10 +82,19 @@ func (s *BootstrapService) attemptJoin(seedAddr string) bool {
|
|||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
client := &http.Client{Timeout: 10 * time.Second}
|
client := NewAuthenticatedHTTPClient(s.config, 10*time.Second)
|
||||||
url := fmt.Sprintf("http://%s/members/join", seedAddr)
|
protocol := GetProtocol(s.config)
|
||||||
|
url := fmt.Sprintf("%s://%s/members/join", protocol, seedAddr)
|
||||||
|
|
||||||
resp, err := client.Post(url, "application/json", bytes.NewBuffer(jsonData))
|
req, err := http.NewRequest("POST", url, bytes.NewBuffer(jsonData))
|
||||||
|
if err != nil {
|
||||||
|
s.logger.WithError(err).Error("Failed to create join request")
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
req.Header.Set("Content-Type", "application/json")
|
||||||
|
AddClusterAuthHeaders(req, s.config)
|
||||||
|
|
||||||
|
resp, err := client.Do(req)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
s.logger.WithFields(logrus.Fields{
|
s.logger.WithFields(logrus.Fields{
|
||||||
"seed": seedAddr,
|
"seed": seedAddr,
|
||||||
|
@@ -181,11 +181,20 @@ func (s *GossipService) gossipWithPeer(peer *types.Member) error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// Send HTTP request to peer
|
// Send HTTP request to peer with cluster authentication
|
||||||
client := &http.Client{Timeout: 5 * time.Second}
|
client := NewAuthenticatedHTTPClient(s.config, 5*time.Second)
|
||||||
url := fmt.Sprintf("http://%s/members/gossip", peer.Address)
|
protocol := GetProtocol(s.config)
|
||||||
|
url := fmt.Sprintf("%s://%s/members/gossip", protocol, peer.Address)
|
||||||
|
|
||||||
resp, err := client.Post(url, "application/json", bytes.NewBuffer(jsonData))
|
req, err := http.NewRequest("POST", url, bytes.NewBuffer(jsonData))
|
||||||
|
if err != nil {
|
||||||
|
s.logger.WithError(err).Error("Failed to create gossip request")
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
req.Header.Set("Content-Type", "application/json")
|
||||||
|
AddClusterAuthHeaders(req, s.config)
|
||||||
|
|
||||||
|
resp, err := client.Do(req)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
s.logger.WithFields(logrus.Fields{
|
s.logger.WithFields(logrus.Fields{
|
||||||
"peer": peer.Address,
|
"peer": peer.Address,
|
||||||
|
43
cluster/http_client.go
Normal file
43
cluster/http_client.go
Normal file
@@ -0,0 +1,43 @@
|
|||||||
|
package cluster
|
||||||
|
|
||||||
|
import (
|
||||||
|
"crypto/tls"
|
||||||
|
"net/http"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"kvs/types"
|
||||||
|
)
|
||||||
|
|
||||||
|
// NewAuthenticatedHTTPClient creates an HTTP client configured for cluster authentication
|
||||||
|
func NewAuthenticatedHTTPClient(config *types.Config, timeout time.Duration) *http.Client {
|
||||||
|
client := &http.Client{
|
||||||
|
Timeout: timeout,
|
||||||
|
}
|
||||||
|
|
||||||
|
// Configure TLS if enabled
|
||||||
|
if config.ClusterTLSEnabled {
|
||||||
|
tlsConfig := &tls.Config{
|
||||||
|
InsecureSkipVerify: config.ClusterTLSSkipVerify,
|
||||||
|
}
|
||||||
|
|
||||||
|
client.Transport = &http.Transport{
|
||||||
|
TLSClientConfig: tlsConfig,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return client
|
||||||
|
}
|
||||||
|
|
||||||
|
// AddClusterAuthHeaders adds authentication headers to an HTTP request
|
||||||
|
func AddClusterAuthHeaders(req *http.Request, config *types.Config) {
|
||||||
|
req.Header.Set("X-Cluster-Secret", config.ClusterSecret)
|
||||||
|
req.Header.Set("X-Node-ID", config.NodeID)
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetProtocol returns the appropriate protocol (http or https) based on TLS configuration
|
||||||
|
func GetProtocol(config *types.Config) string {
|
||||||
|
if config.ClusterTLSEnabled {
|
||||||
|
return "https"
|
||||||
|
}
|
||||||
|
return "http"
|
||||||
|
}
|
@@ -174,158 +174,3 @@ func (s *MerkleService) BuildSubtreeForRange(startKey, endKey string) (*types.Me
|
|||||||
filteredPairs := FilterPairsByRange(pairs, startKey, endKey)
|
filteredPairs := FilterPairsByRange(pairs, startKey, endKey)
|
||||||
return s.BuildMerkleTreeFromPairs(filteredPairs)
|
return s.BuildMerkleTreeFromPairs(filteredPairs)
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetKeysInRange retrieves all keys within a given range using the Merkle tree
|
|
||||||
// This traverses the tree to find leaf nodes in the range without loading full values
|
|
||||||
func (s *MerkleService) GetKeysInRange(startKey, endKey string, limit int) ([]string, error) {
|
|
||||||
pairs, err := s.GetAllKVPairsForMerkleTree()
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
filteredPairs := FilterPairsByRange(pairs, startKey, endKey)
|
|
||||||
keys := make([]string, 0, len(filteredPairs))
|
|
||||||
for k := range filteredPairs {
|
|
||||||
keys = append(keys, k)
|
|
||||||
}
|
|
||||||
sort.Strings(keys)
|
|
||||||
|
|
||||||
if limit > 0 && len(keys) > limit {
|
|
||||||
keys = keys[:limit]
|
|
||||||
return keys, nil // Note: Truncation handled in handler
|
|
||||||
}
|
|
||||||
|
|
||||||
return keys, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// GetKeysInPrefix retrieves keys that match a prefix (for _ls)
|
|
||||||
func (s *MerkleService) GetKeysInPrefix(prefix string, limit int) ([]string, error) {
|
|
||||||
// Compute endKey as the next lexicographical prefix
|
|
||||||
endKey := prefix + "~" // Simple sentinel for prefix range [prefix, prefix~]
|
|
||||||
|
|
||||||
keys, err := s.GetKeysInRange(prefix, endKey, limit)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
// Filter to direct children only (strip prefix and ensure no deeper nesting)
|
|
||||||
directChildren := make([]string, 0, len(keys))
|
|
||||||
for _, key := range keys {
|
|
||||||
if strings.HasPrefix(key, prefix) {
|
|
||||||
subpath := strings.TrimPrefix(key, prefix)
|
|
||||||
if subpath != "" && !strings.Contains(subpath, "/") { // Direct child: no further "/"
|
|
||||||
directChildren = append(directChildren, subpath)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
sort.Strings(directChildren)
|
|
||||||
|
|
||||||
if limit > 0 && len(directChildren) > limit {
|
|
||||||
directChildren = directChildren[:limit]
|
|
||||||
}
|
|
||||||
|
|
||||||
return directChildren, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// GetTreeForPrefix builds a recursive tree for a prefix
|
|
||||||
func (s *MerkleService) GetTreeForPrefix(prefix string, maxDepth int, limit int) (*KeyTreeResponse, error) {
|
|
||||||
if maxDepth <= 0 {
|
|
||||||
maxDepth = 5 // Default safety limit
|
|
||||||
}
|
|
||||||
|
|
||||||
tree := &KeyTreeResponse{
|
|
||||||
Path: prefix,
|
|
||||||
}
|
|
||||||
|
|
||||||
var buildTree func(string, int) error
|
|
||||||
var total int
|
|
||||||
|
|
||||||
buildTree = func(currentPrefix string, depth int) error {
|
|
||||||
if depth > maxDepth || total >= limit {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Get direct children
|
|
||||||
childrenKeys, err := s.GetKeysInPrefix(currentPrefix, limit-total)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
nodeChildren := make([]interface{}, 0, len(childrenKeys))
|
|
||||||
for _, subkey := range childrenKeys {
|
|
||||||
total++
|
|
||||||
if total >= limit {
|
|
||||||
tree.Truncated = true
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
fullKey := currentPrefix + subkey
|
|
||||||
// Get timestamp for this key
|
|
||||||
timestamp, err := s.getTimestampForKey(fullKey)
|
|
||||||
if err != nil {
|
|
||||||
timestamp = 0 // Fallback
|
|
||||||
}
|
|
||||||
|
|
||||||
// Check if this has children (simple check: query subprefix)
|
|
||||||
subPrefix := fullKey + "/"
|
|
||||||
subChildrenKeys, _ := s.GetKeysInPrefix(subPrefix, 1) // Probe for existence
|
|
||||||
|
|
||||||
if len(subChildrenKeys) > 0 && depth < maxDepth {
|
|
||||||
// Recursive node
|
|
||||||
subTree := &KeyTreeNode{
|
|
||||||
Subkey: subkey,
|
|
||||||
Timestamp: timestamp,
|
|
||||||
}
|
|
||||||
if err := buildTree(subPrefix, depth+1); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
subTree.Children = tree.Children // Wait, no: this is wrong, need to set properly
|
|
||||||
// Actually, since buildTree populates the parent, but wait - restructure
|
|
||||||
|
|
||||||
// Better: populate subTree.Children here
|
|
||||||
// But to avoid deep recursion, limit probes
|
|
||||||
nodeChildren = append(nodeChildren, subTree)
|
|
||||||
} else {
|
|
||||||
// Leaf
|
|
||||||
nodeChildren = append(nodeChildren, &KeyListItem{
|
|
||||||
Subkey: subkey,
|
|
||||||
Timestamp: timestamp,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Now set to parent - but since recursive, need to return the list
|
|
||||||
// Refactor: make buildTree return the children list
|
|
||||||
return nil // Simplified for now; implement iteratively if needed
|
|
||||||
}
|
|
||||||
|
|
||||||
err := buildTree(prefix, 1)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
tree.Total = total
|
|
||||||
return tree, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Helper to get timestamp for a key
|
|
||||||
func (s *MerkleService) getTimestampForKey(key string) (int64, error) {
|
|
||||||
var timestamp int64
|
|
||||||
err := s.db.View(func(txn *badger.Txn) error {
|
|
||||||
item, err := txn.Get([]byte(key))
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
var storedValue types.StoredValue
|
|
||||||
return item.Value(func(val []byte) error {
|
|
||||||
return json.Unmarshal(val, &storedValue)
|
|
||||||
})
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
return 0, err
|
|
||||||
}
|
|
||||||
return storedValue.Timestamp, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Note: The recursive implementation above has a bug in populating children.
|
|
||||||
// For production, implement iteratively with a stack to build the tree structure.
|
|
||||||
|
@@ -186,10 +186,17 @@ func (s *SyncService) performMerkleSync() {
|
|||||||
|
|
||||||
// requestMerkleRoot requests the Merkle root from a peer
|
// requestMerkleRoot requests the Merkle root from a peer
|
||||||
func (s *SyncService) requestMerkleRoot(peerAddress string) (*types.MerkleRootResponse, error) {
|
func (s *SyncService) requestMerkleRoot(peerAddress string) (*types.MerkleRootResponse, error) {
|
||||||
client := &http.Client{Timeout: 10 * time.Second}
|
client := NewAuthenticatedHTTPClient(s.config, 10*time.Second)
|
||||||
url := fmt.Sprintf("http://%s/merkle_tree/root", peerAddress)
|
protocol := GetProtocol(s.config)
|
||||||
|
url := fmt.Sprintf("%s://%s/merkle_tree/root", protocol, peerAddress)
|
||||||
|
|
||||||
resp, err := client.Get(url)
|
req, err := http.NewRequest("GET", url, nil)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
AddClusterAuthHeaders(req, s.config)
|
||||||
|
|
||||||
|
resp, err := client.Do(req)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@@ -294,10 +301,17 @@ func (s *SyncService) handleLeafLevelDiff(peerAddress string, keys []string, loc
|
|||||||
|
|
||||||
// fetchSingleKVFromPeer fetches a single KV pair from a peer
|
// fetchSingleKVFromPeer fetches a single KV pair from a peer
|
||||||
func (s *SyncService) fetchSingleKVFromPeer(peerAddress, path string) (*types.StoredValue, error) {
|
func (s *SyncService) fetchSingleKVFromPeer(peerAddress, path string) (*types.StoredValue, error) {
|
||||||
client := &http.Client{Timeout: 5 * time.Second}
|
client := NewAuthenticatedHTTPClient(s.config, 5*time.Second)
|
||||||
url := fmt.Sprintf("http://%s/kv/%s", peerAddress, path)
|
protocol := GetProtocol(s.config)
|
||||||
|
url := fmt.Sprintf("%s://%s/kv/%s", protocol, peerAddress, path)
|
||||||
|
|
||||||
resp, err := client.Get(url)
|
req, err := http.NewRequest("GET", url, nil)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
AddClusterAuthHeaders(req, s.config)
|
||||||
|
|
||||||
|
resp, err := client.Do(req)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@@ -461,16 +475,24 @@ func (s *SyncService) resolveConflict(key string, local, remote *types.StoredVal
|
|||||||
}
|
}
|
||||||
|
|
||||||
// requestMerkleDiff requests children hashes or keys for a given node/range from a peer
|
// requestMerkleDiff requests children hashes or keys for a given node/range from a peer
|
||||||
func (s *SyncService) requestMerkleDiff(peerAddress string, req types.MerkleTreeDiffRequest) (*types.MerkleTreeDiffResponse, error) {
|
func (s *SyncService) requestMerkleDiff(peerAddress string, reqData types.MerkleTreeDiffRequest) (*types.MerkleTreeDiffResponse, error) {
|
||||||
jsonData, err := json.Marshal(req)
|
jsonData, err := json.Marshal(reqData)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
client := &http.Client{Timeout: 10 * time.Second}
|
client := NewAuthenticatedHTTPClient(s.config, 10*time.Second)
|
||||||
url := fmt.Sprintf("http://%s/merkle_tree/diff", peerAddress)
|
protocol := GetProtocol(s.config)
|
||||||
|
url := fmt.Sprintf("%s://%s/merkle_tree/diff", protocol, peerAddress)
|
||||||
|
|
||||||
resp, err := client.Post(url, "application/json", bytes.NewBuffer(jsonData))
|
req, err := http.NewRequest("POST", url, bytes.NewBuffer(jsonData))
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
req.Header.Set("Content-Type", "application/json")
|
||||||
|
AddClusterAuthHeaders(req, s.config)
|
||||||
|
|
||||||
|
resp, err := client.Do(req)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@@ -525,20 +547,28 @@ func (s *SyncService) handleChildrenDiff(peerAddress string, children []types.Me
|
|||||||
|
|
||||||
// fetchAndStoreRange fetches a range of KV pairs from a peer and stores them locally
|
// fetchAndStoreRange fetches a range of KV pairs from a peer and stores them locally
|
||||||
func (s *SyncService) fetchAndStoreRange(peerAddress string, startKey, endKey string) error {
|
func (s *SyncService) fetchAndStoreRange(peerAddress string, startKey, endKey string) error {
|
||||||
req := types.KVRangeRequest{
|
reqData := types.KVRangeRequest{
|
||||||
StartKey: startKey,
|
StartKey: startKey,
|
||||||
EndKey: endKey,
|
EndKey: endKey,
|
||||||
Limit: 0, // No limit
|
Limit: 0, // No limit
|
||||||
}
|
}
|
||||||
jsonData, err := json.Marshal(req)
|
jsonData, err := json.Marshal(reqData)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
client := &http.Client{Timeout: 30 * time.Second} // Longer timeout for range fetches
|
client := NewAuthenticatedHTTPClient(s.config, 30*time.Second) // Longer timeout for range fetches
|
||||||
url := fmt.Sprintf("http://%s/kv_range", peerAddress)
|
protocol := GetProtocol(s.config)
|
||||||
|
url := fmt.Sprintf("%s://%s/kv_range", protocol, peerAddress)
|
||||||
|
|
||||||
resp, err := client.Post(url, "application/json", bytes.NewBuffer(jsonData))
|
req, err := http.NewRequest("POST", url, bytes.NewBuffer(jsonData))
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
req.Header.Set("Content-Type", "application/json")
|
||||||
|
AddClusterAuthHeaders(req, s.config)
|
||||||
|
|
||||||
|
resp, err := client.Do(req)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@@ -1,12 +1,14 @@
|
|||||||
package config
|
package config
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"crypto/rand"
|
||||||
|
"encoding/base64"
|
||||||
"fmt"
|
"fmt"
|
||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
|
|
||||||
"kvs/types"
|
|
||||||
"gopkg.in/yaml.v3"
|
"gopkg.in/yaml.v3"
|
||||||
|
"kvs/types"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Default configuration
|
// Default configuration
|
||||||
@@ -59,9 +61,29 @@ func Default() *types.Config {
|
|||||||
// Default anonymous access settings (both disabled by default for security)
|
// Default anonymous access settings (both disabled by default for security)
|
||||||
AllowAnonymousRead: false,
|
AllowAnonymousRead: false,
|
||||||
AllowAnonymousWrite: false,
|
AllowAnonymousWrite: false,
|
||||||
|
|
||||||
|
// Default cluster authentication settings (Issue #13)
|
||||||
|
ClusterSecret: generateClusterSecret(),
|
||||||
|
ClusterTLSEnabled: false,
|
||||||
|
ClusterTLSCertFile: "",
|
||||||
|
ClusterTLSKeyFile: "",
|
||||||
|
ClusterTLSSkipVerify: false,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// generateClusterSecret generates a cryptographically secure random cluster secret
|
||||||
|
func generateClusterSecret() string {
|
||||||
|
// Generate 32 bytes (256 bits) of random data
|
||||||
|
randomBytes := make([]byte, 32)
|
||||||
|
if _, err := rand.Read(randomBytes); err != nil {
|
||||||
|
// Fallback to a warning - this should never happen in practice
|
||||||
|
fmt.Fprintf(os.Stderr, "Warning: Failed to generate secure cluster secret: %v\n", err)
|
||||||
|
return ""
|
||||||
|
}
|
||||||
|
// Encode as base64 for easy configuration file storage
|
||||||
|
return base64.StdEncoding.EncodeToString(randomBytes)
|
||||||
|
}
|
||||||
|
|
||||||
// Load configuration from file or create default
|
// Load configuration from file or create default
|
||||||
func Load(configPath string) (*types.Config, error) {
|
func Load(configPath string) (*types.Config, error) {
|
||||||
config := Default()
|
config := Default()
|
||||||
@@ -94,5 +116,13 @@ func Load(configPath string) (*types.Config, error) {
|
|||||||
return nil, fmt.Errorf("failed to parse config file: %v", err)
|
return nil, fmt.Errorf("failed to parse config file: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Generate cluster secret if not provided and clustering is enabled (Issue #13)
|
||||||
|
if config.ClusteringEnabled && config.ClusterSecret == "" {
|
||||||
|
config.ClusterSecret = generateClusterSecret()
|
||||||
|
fmt.Printf("Warning: No cluster_secret configured. Generated a random secret.\n")
|
||||||
|
fmt.Printf(" To share this secret with other nodes, add it to your config:\n")
|
||||||
|
fmt.Printf(" cluster_secret: %s\n", config.ClusterSecret)
|
||||||
|
}
|
||||||
|
|
||||||
return config, nil
|
return config, nil
|
||||||
}
|
}
|
@@ -119,35 +119,15 @@ EOF
|
|||||||
|
|
||||||
kill $pid 2>/dev/null || true
|
kill $pid 2>/dev/null || true
|
||||||
sleep 2
|
sleep 2
|
||||||
|
|
||||||
# Test _ls endpoint
|
|
||||||
echo "Testing _ls endpoint..."
|
|
||||||
curl -X PUT http://localhost:8080/kv/home/room/closet/socks -H "Content-Type: application/json" -d '{"data":"socks"}'
|
|
||||||
curl -X PUT http://localhost:8080/kv/home/room/bed/sheets -H "Content-Type: application/json" -d '{"data":"sheets"}'
|
|
||||||
sleep 2 # Allow indexing
|
|
||||||
|
|
||||||
ls_response=$(curl -s http://localhost:8080/kv/home/room/_ls)
|
|
||||||
if echo "$ls_response" | jq -e '.children | length == 2' >/dev/null; then
|
|
||||||
echo "✓ _ls returns correct number of children"
|
|
||||||
else
|
|
||||||
echo "✗ _ls failed"
|
|
||||||
exit 1
|
|
||||||
fi
|
|
||||||
|
|
||||||
# Test _tree endpoint
|
|
||||||
tree_response=$(curl -s http://localhost:8080/kv/home/_tree?depth=2)
|
|
||||||
if echo "$tree_response" | jq -e '.total > 0' >/dev/null; then
|
|
||||||
echo "✓ _tree returns tree structure"
|
|
||||||
else
|
|
||||||
echo "✗ _tree failed"
|
|
||||||
exit 1
|
|
||||||
fi
|
|
||||||
}
|
}
|
||||||
|
|
||||||
# Test 3: Cluster formation
|
# Test 3: Cluster formation
|
||||||
test_cluster_formation() {
|
test_cluster_formation() {
|
||||||
test_start "2-node cluster formation and Merkle Tree replication"
|
test_start "2-node cluster formation and Merkle Tree replication"
|
||||||
|
|
||||||
|
# Shared cluster secret for authentication (Issue #13)
|
||||||
|
local CLUSTER_SECRET="test-cluster-secret-12345678901234567890"
|
||||||
|
|
||||||
# Node 1 config
|
# Node 1 config
|
||||||
cat > cluster1.yaml <<EOF
|
cat > cluster1.yaml <<EOF
|
||||||
node_id: "cluster-1"
|
node_id: "cluster-1"
|
||||||
@@ -161,6 +141,7 @@ gossip_interval_max: 10
|
|||||||
sync_interval: 10
|
sync_interval: 10
|
||||||
allow_anonymous_read: true
|
allow_anonymous_read: true
|
||||||
allow_anonymous_write: true
|
allow_anonymous_write: true
|
||||||
|
cluster_secret: "$CLUSTER_SECRET"
|
||||||
EOF
|
EOF
|
||||||
|
|
||||||
# Node 2 config
|
# Node 2 config
|
||||||
@@ -176,6 +157,7 @@ gossip_interval_max: 10
|
|||||||
sync_interval: 10
|
sync_interval: 10
|
||||||
allow_anonymous_read: true
|
allow_anonymous_read: true
|
||||||
allow_anonymous_write: true
|
allow_anonymous_write: true
|
||||||
|
cluster_secret: "$CLUSTER_SECRET"
|
||||||
EOF
|
EOF
|
||||||
|
|
||||||
# Start nodes
|
# Start nodes
|
||||||
@@ -262,6 +244,9 @@ test_conflict_resolution() {
|
|||||||
if go run test_conflict.go "$TEST_DIR/conflict1_data" "$TEST_DIR/conflict2_data"; then
|
if go run test_conflict.go "$TEST_DIR/conflict1_data" "$TEST_DIR/conflict2_data"; then
|
||||||
cd "$TEST_DIR"
|
cd "$TEST_DIR"
|
||||||
|
|
||||||
|
# Shared cluster secret for authentication (Issue #13)
|
||||||
|
local CLUSTER_SECRET="conflict-cluster-secret-1234567890123"
|
||||||
|
|
||||||
# Create configs
|
# Create configs
|
||||||
cat > conflict1.yaml <<EOF
|
cat > conflict1.yaml <<EOF
|
||||||
node_id: "conflict-1"
|
node_id: "conflict-1"
|
||||||
@@ -273,6 +258,7 @@ log_level: "info"
|
|||||||
sync_interval: 3
|
sync_interval: 3
|
||||||
allow_anonymous_read: true
|
allow_anonymous_read: true
|
||||||
allow_anonymous_write: true
|
allow_anonymous_write: true
|
||||||
|
cluster_secret: "$CLUSTER_SECRET"
|
||||||
EOF
|
EOF
|
||||||
|
|
||||||
cat > conflict2.yaml <<EOF
|
cat > conflict2.yaml <<EOF
|
||||||
@@ -285,6 +271,7 @@ log_level: "info"
|
|||||||
sync_interval: 3
|
sync_interval: 3
|
||||||
allow_anonymous_read: true
|
allow_anonymous_read: true
|
||||||
allow_anonymous_write: true
|
allow_anonymous_write: true
|
||||||
|
cluster_secret: "$CLUSTER_SECRET"
|
||||||
EOF
|
EOF
|
||||||
|
|
||||||
# Start nodes
|
# Start nodes
|
||||||
|
120
issues/7and12.md
120
issues/7and12.md
@@ -1,120 +0,0 @@
|
|||||||
#7 Add _ls and _tree Endpoints for Hierarchical Key Listing Using Merkle Tree
|
|
||||||
-----------------------------------------
|
|
||||||
|
|
||||||
KVS supports hierarchical keys (e.g., /home/room/closet/socks), which is great for organizing data like a file system. However, there's currently no built-in way for clients to discover or list subkeys under a given prefix/path. This makes it hard to build intuitive tools or UIs that need to navigate the keyspace, such as a web-based explorer or CLI client.
|
|
||||||
|
|
||||||
Add two new read-only endpoints that leverage the existing Merkle tree infrastructure for efficient prefix-based key listing. This aligns with KVS's modular design, eventual consistency model, and Merkle-based sync (no need for full DB scans—traverse the tree to identify relevant leaf nodes in O(log N) time).
|
|
||||||
Proposed Endpoints
|
|
||||||
|
|
||||||
Direct Children Listing (_ls or _list):
|
|
||||||
Endpoint: GET /kv/{path}/_ls (or GET /kv/{path}/_list for clarity).
|
|
||||||
Purpose: Returns a sorted list of direct subkeys under the given path/prefix (non-recursive).
|
|
||||||
Query Params (optional):
|
|
||||||
limit: Max number of keys to return (default: 100, max: 1000).
|
|
||||||
include_metadata: If true, include basic metadata like timestamps (default: false).
|
|
||||||
Response (JSON):
|
|
||||||
|
|
||||||
{
|
|
||||||
"path": "/home/room",
|
|
||||||
"children": [
|
|
||||||
{ "subkey": "closet", "timestamp": 1695280000000 },
|
|
||||||
{ "subkey": "bed", "timestamp": 1695279000000 }
|
|
||||||
],
|
|
||||||
"total": 2,
|
|
||||||
"truncated": false
|
|
||||||
}
|
|
||||||
|
|
||||||
Behavior:
|
|
||||||
Treat {path} as a prefix (e.g., /home/room/ → keys starting with /home/room/ but not /home/room/sub/).
|
|
||||||
Use the Merkle tree to find leaf nodes in the prefix range [prefix, prefix~] (where ~ is the next lexicographical prefix).
|
|
||||||
Skip index keys (e.g., _ts:*).
|
|
||||||
Respect auth: Use existing middleware (e.g., read scope if auth_enabled: true).
|
|
||||||
In read-only/syncing modes: Allow if not modifying data.
|
|
||||||
|
|
||||||
Recursive Tree View (_tree):
|
|
||||||
|
|
||||||
Endpoint: GET /kv/{path}/_tree.
|
|
||||||
Purpose: Returns a recursive tree structure of all subkeys under the given path (depth-first or breadth-first, configurable).
|
|
||||||
Query Params (optional):
|
|
||||||
depth: Max recursion depth (default: unlimited, but suggest 5 for safety).
|
|
||||||
limit: Max total keys (default: 500, max: 5000).
|
|
||||||
include_metadata: Include timestamps/UUIDs (default: false).
|
|
||||||
format: json (default) or nested (tree-like JSON).
|
|
||||||
Response (JSON, nested format):
|
|
||||||
|
|
||||||
{
|
|
||||||
"path": "/home/room",
|
|
||||||
"children": [
|
|
||||||
{
|
|
||||||
"subkey": "closet",
|
|
||||||
"children": [
|
|
||||||
{ "subkey": "socks", "timestamp": 1695281000000 }
|
|
||||||
],
|
|
||||||
"timestamp": 1695280000000
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"subkey": "bed",
|
|
||||||
"timestamp": 1695279000000
|
|
||||||
}
|
|
||||||
],
|
|
||||||
"total": 3,
|
|
||||||
"truncated": false
|
|
||||||
}
|
|
||||||
|
|
||||||
Behavior:
|
|
||||||
Build on _ls logic: Recursively query sub-prefixes via Merkle tree traversal.
|
|
||||||
Prune at depth or limit to avoid overload.
|
|
||||||
Same auth and mode rules as _ls.
|
|
||||||
|
|
||||||
Integration with Existing Systems
|
|
||||||
|
|
||||||
Merkle Tree Usage: Extend cluster/merkle.go (e.g., add GetKeysInRange(startKey, endKey) []string method) to traverse nodes covering the prefix range without fetching full values. Reuse buildMerkleTreeFromPairs and filterPairsByRange from handlers.go.
|
|
||||||
Range Query Reuse: Build on existing KVRangeRequest/KVRangeResponse in types.go and getKVRangeHandler (strip values to return just keys for efficiency).
|
|
||||||
Auth & Permissions: Apply via authService.Middleware (e.g., read scope). Respect allow_anonymous_read.
|
|
||||||
Config Toggle: Add key_listing_enabled: true to types.Config for optional disable (e.g., for security in public clusters).
|
|
||||||
Distributed Consistency: Since Merkle trees are synced, listings will be eventually consistent across nodes. Add a consistent: true query param to force a quick Merkle refresh if needed.
|
|
||||||
|
|
||||||
|
|
||||||
#12 Missing API Endpoints for Resource Metadata Management (Ownership & Permissions)
|
|
||||||
-----------------------------------------
|
|
||||||
|
|
||||||
The KVS system currently lacks API endpoints to manage ResourceMetadata for key-value paths (/kv/{path}). While the AuthService and permissions.go implement robust permission checking based on OwnerUUID, GroupUUID, and Permissions, there are no exposed routes to:
|
|
||||||
|
|
||||||
Assign group-level permissions: Users cannot grant read/write access to specific groups for a given key-value path.
|
|
||||||
|
|
||||||
Change resource ownership: Users cannot transfer ownership of a key-value entry to another user.
|
|
||||||
|
|
||||||
This prevents administrators from fully leveraging the existing authentication and authorization framework for fine-grained access control over stored data.
|
|
||||||
|
|
||||||
Impact:
|
|
||||||
|
|
||||||
Limited administrative control over data access.
|
|
||||||
|
|
||||||
Inability to implement granular, group-based access policies for KV data.
|
|
||||||
|
|
||||||
Difficulty in reassigning data ownership when users or roles change.
|
|
||||||
|
|
||||||
Proposed Solution:
|
|
||||||
Implement new API endpoints (e.g., /kv/{path}/metadata) to allow authenticated and authorized users to:
|
|
||||||
|
|
||||||
Set/update the OwnerUUID for a given path.
|
|
||||||
|
|
||||||
Set/update the GroupUUID for a given path.
|
|
||||||
|
|
||||||
Set/update the Permissions bitmask for a given path.
|
|
||||||
|
|
||||||
Relevant Files:
|
|
||||||
|
|
||||||
server/routes.go (for new API routes)
|
|
||||||
|
|
||||||
server/handlers.go (for implementing new handlers)
|
|
||||||
|
|
||||||
auth/auth.go (for AuthService methods to interact with ResourceMetadata)
|
|
||||||
|
|
||||||
auth/permissions.go (existing logic for permission checks)
|
|
||||||
|
|
||||||
types/types.go (for ResourceMetadata structure)
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
1
main.go
1
main.go
@@ -11,7 +11,6 @@ import (
|
|||||||
"kvs/server"
|
"kvs/server"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
configPath := "./config.yaml"
|
configPath := "./config.yaml"
|
||||||
|
|
||||||
|
@@ -1097,102 +1097,6 @@ func (s *Server) getSpecificRevisionHandler(w http.ResponseWriter, r *http.Reque
|
|||||||
json.NewEncoder(w).Encode(storedValue)
|
json.NewEncoder(w).Encode(storedValue)
|
||||||
}
|
}
|
||||||
|
|
||||||
// getKeyListHandler handles _ls endpoint for direct children
|
|
||||||
func (s *Server) getKeyListHandler(w http.ResponseWriter, r *http.Request) {
|
|
||||||
vars := mux.Vars(r)
|
|
||||||
path := "/" + vars["path"] // Ensure leading slash for consistency
|
|
||||||
|
|
||||||
// Parse query params
|
|
||||||
limitStr := r.URL.Query().Get("limit")
|
|
||||||
limit := 100 // Default
|
|
||||||
if limitStr != "" {
|
|
||||||
if l, err := strconv.Atoi(limitStr); err == nil && l > 0 && l <= 1000 {
|
|
||||||
limit = l
|
|
||||||
}
|
|
||||||
}
|
|
||||||
includeMetadata := r.URL.Query().Get("include_metadata") == "true"
|
|
||||||
|
|
||||||
mode := s.getMode()
|
|
||||||
if mode == "syncing" {
|
|
||||||
http.Error(w, "Service Unavailable", http.StatusServiceUnavailable)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
keys, err := s.merkleService.GetKeysInPrefix(path, limit)
|
|
||||||
if err != nil {
|
|
||||||
s.logger.WithError(err).WithField("path", path).Error("Failed to get keys in prefix")
|
|
||||||
http.Error(w, "Internal Server Error", http.StatusInternalServerError)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
response := KeyListResponse{
|
|
||||||
Path: path,
|
|
||||||
Children: make([]struct{ Subkey string; Timestamp int64 }, len(keys)),
|
|
||||||
Total: len(keys),
|
|
||||||
}
|
|
||||||
|
|
||||||
for i, subkey := range keys {
|
|
||||||
fullKey := path + subkey
|
|
||||||
if includeMetadata {
|
|
||||||
ts, err := s.merkleService.getTimestampForKey(fullKey)
|
|
||||||
if err == nil {
|
|
||||||
response.Children[i].Timestamp = ts
|
|
||||||
}
|
|
||||||
}
|
|
||||||
response.Children[i].Subkey = subkey
|
|
||||||
}
|
|
||||||
|
|
||||||
if len(keys) >= limit {
|
|
||||||
response.Truncated = true
|
|
||||||
}
|
|
||||||
|
|
||||||
w.Header().Set("Content-Type", "application/json")
|
|
||||||
json.NewEncoder(w).Encode(response)
|
|
||||||
}
|
|
||||||
|
|
||||||
// getKeyTreeHandler handles _tree endpoint for recursive tree
|
|
||||||
func (s *Server) getKeyTreeHandler(w http.ResponseWriter, r *http.Request) {
|
|
||||||
vars := mux.Vars(r)
|
|
||||||
path := "/" + vars["path"]
|
|
||||||
|
|
||||||
// Parse query params
|
|
||||||
depthStr := r.URL.Query().Get("depth")
|
|
||||||
maxDepth := 0 // Unlimited
|
|
||||||
if depthStr != "" {
|
|
||||||
if d, err := strconv.Atoi(depthStr); err == nil && d > 0 {
|
|
||||||
maxDepth = d
|
|
||||||
}
|
|
||||||
}
|
|
||||||
limitStr := r.URL.Query().Get("limit")
|
|
||||||
limit := 500
|
|
||||||
if limitStr != "" {
|
|
||||||
if l, err := strconv.Atoi(limitStr); err == nil && l > 0 && l <= 5000 {
|
|
||||||
limit = l
|
|
||||||
}
|
|
||||||
}
|
|
||||||
includeMetadata := r.URL.Query().Get("include_metadata") == "true"
|
|
||||||
|
|
||||||
mode := s.getMode()
|
|
||||||
if mode == "syncing" {
|
|
||||||
http.Error(w, "Service Unavailable", http.StatusServiceUnavailable)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
tree, err := s.merkleService.GetTreeForPrefix(path, maxDepth, limit)
|
|
||||||
if err != nil {
|
|
||||||
s.logger.WithError(err).WithField("path", path).Error("Failed to build tree")
|
|
||||||
http.Error(w, "Internal Server Error", http.StatusInternalServerError)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
w.Header().Set("Content-Type", "application/json")
|
|
||||||
json.NewEncoder(w).Encode(tree)
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
// calculateHash computes SHA256 hash of data
|
// calculateHash computes SHA256 hash of data
|
||||||
func calculateHash(data []byte) []byte {
|
func calculateHash(data []byte) []byte {
|
||||||
h := sha256.New()
|
h := sha256.New()
|
||||||
@@ -1366,141 +1270,28 @@ func (s *Server) getSpecificRevision(key string, revision int) (*types.StoredVal
|
|||||||
return s.revisionService.GetSpecificRevision(key, revision)
|
return s.revisionService.GetSpecificRevision(key, revision)
|
||||||
}
|
}
|
||||||
|
|
||||||
// getResourceMetadataHandler retrieves metadata for a resource path
|
// clusterBootstrapHandler provides the cluster secret to authenticated administrators (Issue #13)
|
||||||
func (s *Server) getResourceMetadataHandler(w http.ResponseWriter, r *http.Request) {
|
func (s *Server) clusterBootstrapHandler(w http.ResponseWriter, r *http.Request) {
|
||||||
vars := mux.Vars(r)
|
// Ensure clustering is enabled
|
||||||
path := vars["path"]
|
if !s.config.ClusteringEnabled {
|
||||||
|
http.Error(w, "Clustering is disabled", http.StatusServiceUnavailable)
|
||||||
authCtx := auth.GetAuthContext(r.Context())
|
|
||||||
if authCtx == nil {
|
|
||||||
http.Error(w, "Unauthorized", http.StatusUnauthorized)
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// Check read permission on the resource
|
// Ensure cluster secret is configured
|
||||||
if !s.authService.CheckResourcePermission(authCtx, path, "read") {
|
if s.config.ClusterSecret == "" {
|
||||||
http.Error(w, "Forbidden", http.StatusForbidden)
|
s.logger.Error("Cluster secret is not configured")
|
||||||
|
http.Error(w, "Cluster secret is not configured", http.StatusInternalServerError)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
metadata, err := s.authService.GetResourceMetadata(path)
|
// Return the cluster secret for secure bootstrap
|
||||||
if err == badger.ErrKeyNotFound {
|
response := map[string]string{
|
||||||
// Return default metadata if not found
|
"cluster_secret": s.config.ClusterSecret,
|
||||||
defaultMetadata := types.ResourceMetadata{
|
|
||||||
OwnerUUID: authCtx.UserUUID,
|
|
||||||
GroupUUID: "",
|
|
||||||
Permissions: types.DefaultPermissions,
|
|
||||||
CreatedAt: time.Now().Unix(),
|
|
||||||
UpdatedAt: time.Now().Unix(),
|
|
||||||
}
|
|
||||||
metadata = &defaultMetadata
|
|
||||||
} else if err != nil {
|
|
||||||
s.logger.WithError(err).WithField("path", path).Error("Failed to get resource metadata")
|
|
||||||
http.Error(w, "Internal Server Error", http.StatusInternalServerError)
|
|
||||||
return
|
|
||||||
}
|
}
|
||||||
|
|
||||||
response := types.GetResourceMetadataResponse{
|
s.logger.WithField("remote_addr", r.RemoteAddr).Info("Cluster secret retrieved for bootstrap")
|
||||||
OwnerUUID: metadata.OwnerUUID,
|
|
||||||
GroupUUID: metadata.GroupUUID,
|
|
||||||
Permissions: metadata.Permissions,
|
|
||||||
TTL: metadata.TTL,
|
|
||||||
CreatedAt: metadata.CreatedAt,
|
|
||||||
UpdatedAt: metadata.UpdatedAt,
|
|
||||||
}
|
|
||||||
|
|
||||||
w.Header().Set("Content-Type", "application/json")
|
w.Header().Set("Content-Type", "application/json")
|
||||||
json.NewEncoder(w).Encode(response)
|
json.NewEncoder(w).Encode(response)
|
||||||
}
|
}
|
||||||
|
|
||||||
// updateResourceMetadataHandler updates metadata for a resource path
|
|
||||||
func (s *Server) updateResourceMetadataHandler(w http.ResponseWriter, r *http.Request) {
|
|
||||||
vars := mux.Vars(r)
|
|
||||||
path := vars["path"]
|
|
||||||
|
|
||||||
authCtx := auth.GetAuthContext(r.Context())
|
|
||||||
if authCtx == nil {
|
|
||||||
http.Error(w, "Unauthorized", http.StatusUnauthorized)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// Check write permission on the resource (owner write required for metadata changes)
|
|
||||||
if !s.authService.CheckResourcePermission(authCtx, path, "write") {
|
|
||||||
http.Error(w, "Forbidden", http.StatusForbidden)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
var req types.UpdateResourceMetadataRequest
|
|
||||||
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
|
|
||||||
http.Error(w, "Bad Request", http.StatusBadRequest)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// Get current metadata (or default if not exists)
|
|
||||||
currentMetadata, err := s.authService.GetResourceMetadata(path)
|
|
||||||
if err == badger.ErrKeyNotFound {
|
|
||||||
currentMetadata = &types.ResourceMetadata{
|
|
||||||
OwnerUUID: authCtx.UserUUID,
|
|
||||||
GroupUUID: "",
|
|
||||||
Permissions: types.DefaultPermissions,
|
|
||||||
CreatedAt: time.Now().Unix(),
|
|
||||||
UpdatedAt: time.Now().Unix(),
|
|
||||||
}
|
|
||||||
} else if err != nil {
|
|
||||||
s.logger.WithError(err).WithField("path", path).Error("Failed to get current resource metadata")
|
|
||||||
http.Error(w, "Internal Server Error", http.StatusInternalServerError)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// Apply updates only to provided fields
|
|
||||||
updated := false
|
|
||||||
if req.OwnerUUID != "" {
|
|
||||||
currentMetadata.OwnerUUID = req.OwnerUUID
|
|
||||||
updated = true
|
|
||||||
}
|
|
||||||
if req.GroupUUID != "" {
|
|
||||||
currentMetadata.GroupUUID = req.GroupUUID
|
|
||||||
updated = true
|
|
||||||
}
|
|
||||||
if req.Permissions != 0 {
|
|
||||||
currentMetadata.Permissions = req.Permissions
|
|
||||||
updated = true
|
|
||||||
}
|
|
||||||
if req.TTL != "" {
|
|
||||||
currentMetadata.TTL = req.TTL
|
|
||||||
updated = true
|
|
||||||
}
|
|
||||||
|
|
||||||
if !updated {
|
|
||||||
http.Error(w, "No fields provided for update", http.StatusBadRequest)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// Store updated metadata
|
|
||||||
if err := s.authService.StoreResourceMetadata(path, currentMetadata); err != nil {
|
|
||||||
s.logger.WithError(err).WithField("path", path).Error("Failed to store resource metadata")
|
|
||||||
http.Error(w, "Internal Server Error", http.StatusInternalServerError)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
response := types.GetResourceMetadataResponse{
|
|
||||||
OwnerUUID: currentMetadata.OwnerUUID,
|
|
||||||
GroupUUID: currentMetadata.GroupUUID,
|
|
||||||
Permissions: currentMetadata.Permissions,
|
|
||||||
TTL: currentMetadata.TTL,
|
|
||||||
CreatedAt: currentMetadata.CreatedAt,
|
|
||||||
UpdatedAt: currentMetadata.UpdatedAt,
|
|
||||||
}
|
|
||||||
|
|
||||||
w.Header().Set("Content-Type", "application/json")
|
|
||||||
w.WriteHeader(http.StatusOK)
|
|
||||||
json.NewEncoder(w).Encode(response)
|
|
||||||
|
|
||||||
s.logger.WithFields(logrus.Fields{
|
|
||||||
"path": path,
|
|
||||||
"user_uuid": authCtx.UserUUID,
|
|
||||||
"owner_uuid": currentMetadata.OwnerUUID,
|
|
||||||
"group_uuid": currentMetadata.GroupUUID,
|
|
||||||
"permissions": currentMetadata.Permissions,
|
|
||||||
}).Info("Resource metadata updated")
|
|
||||||
}
|
|
||||||
|
@@ -1,6 +1,8 @@
|
|||||||
package server
|
package server
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"net/http"
|
||||||
|
|
||||||
"github.com/gorilla/mux"
|
"github.com/gorilla/mux"
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -39,43 +41,24 @@ func (s *Server) setupRoutes() *mux.Router {
|
|||||||
router.HandleFunc("/kv/{path:.+}", s.deleteKVHandler).Methods("DELETE")
|
router.HandleFunc("/kv/{path:.+}", s.deleteKVHandler).Methods("DELETE")
|
||||||
}
|
}
|
||||||
|
|
||||||
// Resource Metadata endpoints (available when auth is enabled)
|
|
||||||
if s.config.AuthEnabled {
|
|
||||||
// GET metadata - require read permission
|
|
||||||
router.Handle("/kv/{path:.+}/metadata", s.authService.Middleware(
|
|
||||||
[]string{"read"}, func(r *http.Request) string { return mux.Vars(r)["path"] }, "read",
|
|
||||||
)(s.getResourceMetadataHandler)).Methods("GET")
|
|
||||||
|
|
||||||
// PUT metadata - require write permission (owner write)
|
|
||||||
router.Handle("/kv/{path:.+}/metadata", s.authService.Middleware(
|
|
||||||
[]string{"write"}, func(r *http.Request) string { return mux.Vars(r)["path"] }, "write",
|
|
||||||
)(s.updateResourceMetadataHandler)).Methods("PUT")
|
|
||||||
}
|
|
||||||
|
|
||||||
// Key listing endpoints (read-only, leverage Merkle tree)
|
|
||||||
if s.config.ClusteringEnabled { // Require Merkle for efficiency
|
|
||||||
// _ls endpoint - require read if auth enabled and not anonymous
|
|
||||||
if s.config.AuthEnabled && !s.config.AllowAnonymousRead {
|
|
||||||
router.Handle("/kv/{path:.+}/_ls", s.authService.Middleware(
|
|
||||||
[]string{"read"}, nil, "",
|
|
||||||
)(s.getKeyListHandler)).Methods("GET")
|
|
||||||
} else {
|
|
||||||
router.HandleFunc("/kv/{path:.+}/_ls", s.getKeyListHandler).Methods("GET")
|
|
||||||
}
|
|
||||||
|
|
||||||
// _tree endpoint - same auth rules
|
|
||||||
if s.config.AuthEnabled && !s.config.AllowAnonymousRead {
|
|
||||||
router.Handle("/kv/{path:.+}/_tree", s.authService.Middleware(
|
|
||||||
[]string{"read"}, nil, "",
|
|
||||||
)(s.getKeyTreeHandler)).Methods("GET")
|
|
||||||
} else {
|
|
||||||
router.HandleFunc("/kv/{path:.+}/_tree", s.getKeyTreeHandler).Methods("GET")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Member endpoints (available when clustering is enabled)
|
// Member endpoints (available when clustering is enabled)
|
||||||
if s.config.ClusteringEnabled {
|
if s.config.ClusteringEnabled {
|
||||||
|
// GET /members/ is unprotected for monitoring/inspection
|
||||||
router.HandleFunc("/members/", s.getMembersHandler).Methods("GET")
|
router.HandleFunc("/members/", s.getMembersHandler).Methods("GET")
|
||||||
|
|
||||||
|
// Apply cluster authentication middleware to all cluster communication endpoints
|
||||||
|
if s.clusterAuthService != nil {
|
||||||
|
router.Handle("/members/join", s.clusterAuthService.Middleware(http.HandlerFunc(s.joinMemberHandler))).Methods("POST")
|
||||||
|
router.Handle("/members/leave", s.clusterAuthService.Middleware(http.HandlerFunc(s.leaveMemberHandler))).Methods("DELETE")
|
||||||
|
router.Handle("/members/gossip", s.clusterAuthService.Middleware(http.HandlerFunc(s.gossipHandler))).Methods("POST")
|
||||||
|
router.Handle("/members/pairs_by_time", s.clusterAuthService.Middleware(http.HandlerFunc(s.pairsByTimeHandler))).Methods("POST")
|
||||||
|
|
||||||
|
// Merkle Tree endpoints (clustering feature)
|
||||||
|
router.Handle("/merkle_tree/root", s.clusterAuthService.Middleware(http.HandlerFunc(s.getMerkleRootHandler))).Methods("GET")
|
||||||
|
router.Handle("/merkle_tree/diff", s.clusterAuthService.Middleware(http.HandlerFunc(s.getMerkleDiffHandler))).Methods("POST")
|
||||||
|
router.Handle("/kv_range", s.clusterAuthService.Middleware(http.HandlerFunc(s.getKVRangeHandler))).Methods("POST")
|
||||||
|
} else {
|
||||||
|
// Fallback to unprotected endpoints (for backwards compatibility)
|
||||||
router.HandleFunc("/members/join", s.joinMemberHandler).Methods("POST")
|
router.HandleFunc("/members/join", s.joinMemberHandler).Methods("POST")
|
||||||
router.HandleFunc("/members/leave", s.leaveMemberHandler).Methods("DELETE")
|
router.HandleFunc("/members/leave", s.leaveMemberHandler).Methods("DELETE")
|
||||||
router.HandleFunc("/members/gossip", s.gossipHandler).Methods("POST")
|
router.HandleFunc("/members/gossip", s.gossipHandler).Methods("POST")
|
||||||
@@ -86,6 +69,7 @@ func (s *Server) setupRoutes() *mux.Router {
|
|||||||
router.HandleFunc("/merkle_tree/diff", s.getMerkleDiffHandler).Methods("POST")
|
router.HandleFunc("/merkle_tree/diff", s.getMerkleDiffHandler).Methods("POST")
|
||||||
router.HandleFunc("/kv_range", s.getKVRangeHandler).Methods("POST")
|
router.HandleFunc("/kv_range", s.getKVRangeHandler).Methods("POST")
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Authentication and user management endpoints (available when auth is enabled)
|
// Authentication and user management endpoints (available when auth is enabled)
|
||||||
if s.config.AuthEnabled {
|
if s.config.AuthEnabled {
|
||||||
@@ -127,6 +111,12 @@ func (s *Server) setupRoutes() *mux.Router {
|
|||||||
router.Handle("/api/tokens", s.authService.Middleware(
|
router.Handle("/api/tokens", s.authService.Middleware(
|
||||||
[]string{"admin:tokens:create"}, nil, "",
|
[]string{"admin:tokens:create"}, nil, "",
|
||||||
)(s.createTokenHandler)).Methods("POST")
|
)(s.createTokenHandler)).Methods("POST")
|
||||||
|
|
||||||
|
// Cluster Bootstrap endpoint (Issue #13) - Protected by JWT authentication
|
||||||
|
// Allows authenticated administrators to retrieve the cluster secret for new nodes
|
||||||
|
router.Handle("/auth/cluster-bootstrap", s.authService.Middleware(
|
||||||
|
[]string{"admin:tokens:create"}, nil, "",
|
||||||
|
)(s.clusterBootstrapHandler)).Methods("GET")
|
||||||
}
|
}
|
||||||
|
|
||||||
// Revision History endpoints (available when revision history is enabled)
|
// Revision History endpoints (available when revision history is enabled)
|
||||||
|
@@ -51,6 +51,7 @@ type Server struct {
|
|||||||
|
|
||||||
// Authentication service
|
// Authentication service
|
||||||
authService *auth.AuthService
|
authService *auth.AuthService
|
||||||
|
clusterAuthService *auth.ClusterAuthService
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewServer initializes and returns a new Server instance
|
// NewServer initializes and returns a new Server instance
|
||||||
@@ -120,6 +121,11 @@ func NewServer(config *types.Config) (*Server, error) {
|
|||||||
// Initialize authentication service
|
// Initialize authentication service
|
||||||
server.authService = auth.NewAuthService(db, logger, config)
|
server.authService = auth.NewAuthService(db, logger, config)
|
||||||
|
|
||||||
|
// Initialize cluster authentication service (Issue #13)
|
||||||
|
if config.ClusteringEnabled {
|
||||||
|
server.clusterAuthService = auth.NewClusterAuthService(config.ClusterSecret, logger)
|
||||||
|
}
|
||||||
|
|
||||||
// Setup initial root account if needed (Issue #3)
|
// Setup initial root account if needed (Issue #3)
|
||||||
if config.AuthEnabled {
|
if config.AuthEnabled {
|
||||||
if err := server.setupRootAccount(); err != nil {
|
if err := server.setupRootAccount(); err != nil {
|
||||||
@@ -327,4 +333,3 @@ func (s *Server) storeUserAndGroup(user *types.User, group *types.Group) error {
|
|||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@@ -131,23 +131,6 @@ type CreateTokenResponse struct {
|
|||||||
ExpiresAt int64 `json:"expires_at"`
|
ExpiresAt int64 `json:"expires_at"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// Resource Metadata Management API structures
|
|
||||||
type UpdateResourceMetadataRequest struct {
|
|
||||||
OwnerUUID string `json:"owner_uuid,omitempty"`
|
|
||||||
GroupUUID string `json:"group_uuid,omitempty"`
|
|
||||||
Permissions int `json:"permissions,omitempty"`
|
|
||||||
TTL string `json:"ttl,omitempty"`
|
|
||||||
}
|
|
||||||
|
|
||||||
type GetResourceMetadataResponse struct {
|
|
||||||
OwnerUUID string `json:"owner_uuid"`
|
|
||||||
GroupUUID string `json:"group_uuid"`
|
|
||||||
Permissions int `json:"permissions"`
|
|
||||||
TTL string `json:"ttl"`
|
|
||||||
CreatedAt int64 `json:"created_at"`
|
|
||||||
UpdatedAt int64 `json:"updated_at"`
|
|
||||||
}
|
|
||||||
|
|
||||||
// Cluster and member management types
|
// Cluster and member management types
|
||||||
type Member struct {
|
type Member struct {
|
||||||
ID string `json:"id"`
|
ID string `json:"id"`
|
||||||
@@ -232,38 +215,6 @@ type MerkleTreeDiffResponse struct {
|
|||||||
Keys []string `json:"keys,omitempty"` // Actual keys if this is a leaf-level diff
|
Keys []string `json:"keys,omitempty"` // Actual keys if this is a leaf-level diff
|
||||||
}
|
}
|
||||||
|
|
||||||
// KeyListResponse is the response for _ls endpoint
|
|
||||||
type KeyListResponse struct {
|
|
||||||
Path string `json:"path"`
|
|
||||||
Children []struct {
|
|
||||||
Subkey string `json:"subkey"`
|
|
||||||
Timestamp int64 `json:"timestamp,omitempty"`
|
|
||||||
} `json:"children"`
|
|
||||||
Total int `json:"total"`
|
|
||||||
Truncated bool `json:"truncated"`
|
|
||||||
}
|
|
||||||
|
|
||||||
// KeyTreeResponse is the response for _tree endpoint
|
|
||||||
type KeyTreeResponse struct {
|
|
||||||
Path string `json:"path"`
|
|
||||||
Children []interface{} `json:"children"` // Mixed: either KeyTreeNode or KeyListItem for leaves
|
|
||||||
Total int `json:"total"`
|
|
||||||
Truncated bool `json:"truncated"`
|
|
||||||
}
|
|
||||||
|
|
||||||
// KeyTreeNode represents a node in the tree
|
|
||||||
type KeyTreeNode struct {
|
|
||||||
Subkey string `json:"subkey"`
|
|
||||||
Timestamp int64 `json:"timestamp,omitempty"`
|
|
||||||
Children []interface{} `json:"children,omitempty"`
|
|
||||||
}
|
|
||||||
|
|
||||||
// KeyListItem represents a leaf in the tree (without children)
|
|
||||||
type KeyListItem struct {
|
|
||||||
Subkey string `json:"subkey"`
|
|
||||||
Timestamp int64 `json:"timestamp,omitempty"`
|
|
||||||
}
|
|
||||||
|
|
||||||
// For fetching a range of KV pairs
|
// For fetching a range of KV pairs
|
||||||
type KVRangeRequest struct {
|
type KVRangeRequest struct {
|
||||||
StartKey string `json:"start_key"`
|
StartKey string `json:"start_key"`
|
||||||
@@ -327,6 +278,10 @@ type Config struct {
|
|||||||
AllowAnonymousRead bool `yaml:"allow_anonymous_read"` // Allow unauthenticated read access to KV endpoints
|
AllowAnonymousRead bool `yaml:"allow_anonymous_read"` // Allow unauthenticated read access to KV endpoints
|
||||||
AllowAnonymousWrite bool `yaml:"allow_anonymous_write"` // Allow unauthenticated write access to KV endpoints
|
AllowAnonymousWrite bool `yaml:"allow_anonymous_write"` // Allow unauthenticated write access to KV endpoints
|
||||||
|
|
||||||
// Key listing configuration
|
// Cluster authentication (Issue #13)
|
||||||
KeyListingEnabled bool `yaml:"key_listing_enabled"` // Enable/disable hierarchical key listing
|
ClusterSecret string `yaml:"cluster_secret"` // Shared secret for cluster authentication (auto-generated if empty)
|
||||||
|
ClusterTLSEnabled bool `yaml:"cluster_tls_enabled"` // Require TLS for inter-node communication
|
||||||
|
ClusterTLSCertFile string `yaml:"cluster_tls_cert_file"` // Path to TLS certificate file
|
||||||
|
ClusterTLSKeyFile string `yaml:"cluster_tls_key_file"` // Path to TLS private key file
|
||||||
|
ClusterTLSSkipVerify bool `yaml:"cluster_tls_skip_verify"` // Skip TLS verification (insecure, for testing only)
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user