3 Commits

Author SHA1 Message Date
377af163f0 feat: implement resource metadata management API (issue #12)
Add API endpoints to manage ResourceMetadata (ownership, groups, permissions)
for KV resources. This enables administrators to configure granular access
control for stored data.

Changes:
- Add GetResourceMetadataResponse and UpdateResourceMetadataRequest types
- Add GetResourceMetadata and SetResourceMetadata methods to AuthService
- Add GET /kv/{path}/metadata endpoint (requires admin:users:read)
- Add PUT /kv/{path}/metadata endpoint (requires admin:users:update)
- Both endpoints protected by JWT authentication
- Metadata routes registered before general KV routes to prevent pattern conflicts

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-03 00:06:14 +03:00
852275945c fix: update bootstrap service and routes for cluster authentication
- Updated bootstrap service to use authenticated HTTP client with cluster auth headers
- Made GET /members/ endpoint unprotected for monitoring/inspection purposes
- All other cluster communication endpoints remain protected by cluster auth middleware

This ensures proper cluster formation while maintaining security for inter-node communication.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-02 22:27:15 +03:00
c7dcebb894 feat: implement secure cluster authentication (issue #13)
Implemented a comprehensive secure authentication mechanism for inter-node
cluster communication with the following features:

1. Global Cluster Secret (GCS)
   - Auto-generated cryptographically secure random secret (256-bit)
   - Configurable via YAML config file
   - Shared across all cluster nodes for authentication

2. Cluster Authentication Middleware
   - Validates X-Cluster-Secret and X-Node-ID headers
   - Applied to all cluster endpoints (/members/*, /merkle_tree/*, /kv_range)
   - Comprehensive logging of authentication attempts

3. Authenticated HTTP Client
   - Custom HTTP client with cluster auth headers
   - TLS support with configurable certificate verification
   - Protocol-aware (http/https based on TLS settings)

4. Secure Bootstrap Endpoint
   - New /auth/cluster-bootstrap endpoint
   - Protected by JWT authentication with admin scope
   - Allows new nodes to securely obtain cluster secret

5. Updated Cluster Communication
   - All gossip protocol requests include auth headers
   - All Merkle tree sync requests include auth headers
   - All data replication requests include auth headers

6. Configuration
   - cluster_secret: Shared secret (auto-generated if not provided)
   - cluster_tls_enabled: Enable TLS for inter-node communication
   - cluster_tls_cert_file: Path to TLS certificate
   - cluster_tls_key_file: Path to TLS private key
   - cluster_tls_skip_verify: Skip TLS verification (testing only)

This implementation addresses the security vulnerability of unprotected
cluster endpoints and provides a flexible, secure approach to protecting
internal cluster communication while allowing for automated node bootstrapping.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-02 22:19:40 +03:00
29 changed files with 741 additions and 433 deletions

View File

@@ -198,6 +198,40 @@ func (s *AuthService) CheckResourcePermission(authCtx *AuthContext, resourceKey
return CheckPermission(metadata.Permissions, operation, isOwner, isGroupMember)
}
// GetResourceMetadata retrieves metadata for a resource
func (s *AuthService) GetResourceMetadata(resourceKey string) (*types.ResourceMetadata, error) {
var metadata types.ResourceMetadata
err := s.db.View(func(txn *badger.Txn) error {
item, err := txn.Get([]byte(ResourceMetadataKey(resourceKey)))
if err != nil {
return err
}
return item.Value(func(val []byte) error {
return json.Unmarshal(val, &metadata)
})
})
if err != nil {
return nil, err
}
return &metadata, nil
}
// SetResourceMetadata stores metadata for a resource
func (s *AuthService) SetResourceMetadata(resourceKey string, metadata *types.ResourceMetadata) error {
metadataBytes, err := json.Marshal(metadata)
if err != nil {
return fmt.Errorf("failed to marshal metadata: %v", err)
}
return s.db.Update(func(txn *badger.Txn) error {
return txn.Set([]byte(ResourceMetadataKey(resourceKey)), metadataBytes)
})
}
// GetAuthContext retrieves auth context from request context
func GetAuthContext(ctx context.Context) *AuthContext {
if authCtx, ok := ctx.Value("auth").(*AuthContext); ok {
@@ -228,43 +262,3 @@ func (s *AuthService) HasUsers() (bool, error) {
return hasUsers, err
}
// StoreResourceMetadata stores or updates resource metadata in BadgerDB
func (s *AuthService) StoreResourceMetadata(path string, metadata *types.ResourceMetadata) error {
now := time.Now().Unix()
if metadata.CreatedAt == 0 {
metadata.CreatedAt = now
}
metadata.UpdatedAt = now
metadataData, err := json.Marshal(metadata)
if err != nil {
return err
}
return s.db.Update(func(txn *badger.Txn) error {
return txn.Set([]byte(ResourceMetadataKey(path)), metadataData)
})
}
// GetResourceMetadata retrieves resource metadata from BadgerDB
func (s *AuthService) GetResourceMetadata(path string) (*types.ResourceMetadata, error) {
var metadata types.ResourceMetadata
err := s.db.View(func(txn *badger.Txn) error {
item, err := txn.Get([]byte(ResourceMetadataKey(path)))
if err != nil {
return err
}
return item.Value(func(val []byte) error {
return json.Unmarshal(val, &metadata)
})
})
if err != nil {
return nil, err
}
return &metadata, nil
}

77
auth/cluster.go Normal file
View File

@@ -0,0 +1,77 @@
package auth
import (
"net/http"
"github.com/sirupsen/logrus"
)
// ClusterAuthService handles authentication for inter-cluster communication
type ClusterAuthService struct {
clusterSecret string
logger *logrus.Logger
}
// NewClusterAuthService creates a new cluster authentication service
func NewClusterAuthService(clusterSecret string, logger *logrus.Logger) *ClusterAuthService {
return &ClusterAuthService{
clusterSecret: clusterSecret,
logger: logger,
}
}
// Middleware validates cluster authentication headers
func (s *ClusterAuthService) Middleware(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// Extract authentication headers
clusterSecret := r.Header.Get("X-Cluster-Secret")
nodeID := r.Header.Get("X-Node-ID")
// Log authentication attempt
s.logger.WithFields(logrus.Fields{
"node_id": nodeID,
"remote_addr": r.RemoteAddr,
"path": r.URL.Path,
"method": r.Method,
}).Debug("Cluster authentication attempt")
// Validate cluster secret
if clusterSecret == "" {
s.logger.WithFields(logrus.Fields{
"node_id": nodeID,
"remote_addr": r.RemoteAddr,
"path": r.URL.Path,
}).Warn("Missing X-Cluster-Secret header")
http.Error(w, "Unauthorized: Missing cluster secret", http.StatusUnauthorized)
return
}
if clusterSecret != s.clusterSecret {
s.logger.WithFields(logrus.Fields{
"node_id": nodeID,
"remote_addr": r.RemoteAddr,
"path": r.URL.Path,
}).Warn("Invalid cluster secret")
http.Error(w, "Unauthorized: Invalid cluster secret", http.StatusUnauthorized)
return
}
// Validate node ID is present
if nodeID == "" {
s.logger.WithFields(logrus.Fields{
"remote_addr": r.RemoteAddr,
"path": r.URL.Path,
}).Warn("Missing X-Node-ID header")
http.Error(w, "Unauthorized: Missing node ID", http.StatusUnauthorized)
return
}
// Authentication successful
s.logger.WithFields(logrus.Fields{
"node_id": nodeID,
"path": r.URL.Path,
}).Debug("Cluster authentication successful")
next.ServeHTTP(w, r)
})
}

View File

@@ -82,10 +82,19 @@ func (s *BootstrapService) attemptJoin(seedAddr string) bool {
return false
}
client := &http.Client{Timeout: 10 * time.Second}
url := fmt.Sprintf("http://%s/members/join", seedAddr)
client := NewAuthenticatedHTTPClient(s.config, 10*time.Second)
protocol := GetProtocol(s.config)
url := fmt.Sprintf("%s://%s/members/join", protocol, seedAddr)
resp, err := client.Post(url, "application/json", bytes.NewBuffer(jsonData))
req, err := http.NewRequest("POST", url, bytes.NewBuffer(jsonData))
if err != nil {
s.logger.WithError(err).Error("Failed to create join request")
return false
}
req.Header.Set("Content-Type", "application/json")
AddClusterAuthHeaders(req, s.config)
resp, err := client.Do(req)
if err != nil {
s.logger.WithFields(logrus.Fields{
"seed": seedAddr,

View File

@@ -181,11 +181,20 @@ func (s *GossipService) gossipWithPeer(peer *types.Member) error {
return err
}
// Send HTTP request to peer
client := &http.Client{Timeout: 5 * time.Second}
url := fmt.Sprintf("http://%s/members/gossip", peer.Address)
// Send HTTP request to peer with cluster authentication
client := NewAuthenticatedHTTPClient(s.config, 5*time.Second)
protocol := GetProtocol(s.config)
url := fmt.Sprintf("%s://%s/members/gossip", protocol, peer.Address)
resp, err := client.Post(url, "application/json", bytes.NewBuffer(jsonData))
req, err := http.NewRequest("POST", url, bytes.NewBuffer(jsonData))
if err != nil {
s.logger.WithError(err).Error("Failed to create gossip request")
return err
}
req.Header.Set("Content-Type", "application/json")
AddClusterAuthHeaders(req, s.config)
resp, err := client.Do(req)
if err != nil {
s.logger.WithFields(logrus.Fields{
"peer": peer.Address,

43
cluster/http_client.go Normal file
View File

@@ -0,0 +1,43 @@
package cluster
import (
"crypto/tls"
"net/http"
"time"
"kvs/types"
)
// NewAuthenticatedHTTPClient creates an HTTP client configured for cluster authentication
func NewAuthenticatedHTTPClient(config *types.Config, timeout time.Duration) *http.Client {
client := &http.Client{
Timeout: timeout,
}
// Configure TLS if enabled
if config.ClusterTLSEnabled {
tlsConfig := &tls.Config{
InsecureSkipVerify: config.ClusterTLSSkipVerify,
}
client.Transport = &http.Transport{
TLSClientConfig: tlsConfig,
}
}
return client
}
// AddClusterAuthHeaders adds authentication headers to an HTTP request
func AddClusterAuthHeaders(req *http.Request, config *types.Config) {
req.Header.Set("X-Cluster-Secret", config.ClusterSecret)
req.Header.Set("X-Node-ID", config.NodeID)
}
// GetProtocol returns the appropriate protocol (http or https) based on TLS configuration
func GetProtocol(config *types.Config) string {
if config.ClusterTLSEnabled {
return "https"
}
return "http"
}

View File

@@ -186,10 +186,17 @@ func (s *SyncService) performMerkleSync() {
// requestMerkleRoot requests the Merkle root from a peer
func (s *SyncService) requestMerkleRoot(peerAddress string) (*types.MerkleRootResponse, error) {
client := &http.Client{Timeout: 10 * time.Second}
url := fmt.Sprintf("http://%s/merkle_tree/root", peerAddress)
client := NewAuthenticatedHTTPClient(s.config, 10*time.Second)
protocol := GetProtocol(s.config)
url := fmt.Sprintf("%s://%s/merkle_tree/root", protocol, peerAddress)
resp, err := client.Get(url)
req, err := http.NewRequest("GET", url, nil)
if err != nil {
return nil, err
}
AddClusterAuthHeaders(req, s.config)
resp, err := client.Do(req)
if err != nil {
return nil, err
}
@@ -294,10 +301,17 @@ func (s *SyncService) handleLeafLevelDiff(peerAddress string, keys []string, loc
// fetchSingleKVFromPeer fetches a single KV pair from a peer
func (s *SyncService) fetchSingleKVFromPeer(peerAddress, path string) (*types.StoredValue, error) {
client := &http.Client{Timeout: 5 * time.Second}
url := fmt.Sprintf("http://%s/kv/%s", peerAddress, path)
client := NewAuthenticatedHTTPClient(s.config, 5*time.Second)
protocol := GetProtocol(s.config)
url := fmt.Sprintf("%s://%s/kv/%s", protocol, peerAddress, path)
resp, err := client.Get(url)
req, err := http.NewRequest("GET", url, nil)
if err != nil {
return nil, err
}
AddClusterAuthHeaders(req, s.config)
resp, err := client.Do(req)
if err != nil {
return nil, err
}
@@ -461,16 +475,24 @@ func (s *SyncService) resolveConflict(key string, local, remote *types.StoredVal
}
// requestMerkleDiff requests children hashes or keys for a given node/range from a peer
func (s *SyncService) requestMerkleDiff(peerAddress string, req types.MerkleTreeDiffRequest) (*types.MerkleTreeDiffResponse, error) {
jsonData, err := json.Marshal(req)
func (s *SyncService) requestMerkleDiff(peerAddress string, reqData types.MerkleTreeDiffRequest) (*types.MerkleTreeDiffResponse, error) {
jsonData, err := json.Marshal(reqData)
if err != nil {
return nil, err
}
client := &http.Client{Timeout: 10 * time.Second}
url := fmt.Sprintf("http://%s/merkle_tree/diff", peerAddress)
client := NewAuthenticatedHTTPClient(s.config, 10*time.Second)
protocol := GetProtocol(s.config)
url := fmt.Sprintf("%s://%s/merkle_tree/diff", protocol, peerAddress)
resp, err := client.Post(url, "application/json", bytes.NewBuffer(jsonData))
req, err := http.NewRequest("POST", url, bytes.NewBuffer(jsonData))
if err != nil {
return nil, err
}
req.Header.Set("Content-Type", "application/json")
AddClusterAuthHeaders(req, s.config)
resp, err := client.Do(req)
if err != nil {
return nil, err
}
@@ -525,20 +547,28 @@ func (s *SyncService) handleChildrenDiff(peerAddress string, children []types.Me
// fetchAndStoreRange fetches a range of KV pairs from a peer and stores them locally
func (s *SyncService) fetchAndStoreRange(peerAddress string, startKey, endKey string) error {
req := types.KVRangeRequest{
reqData := types.KVRangeRequest{
StartKey: startKey,
EndKey: endKey,
Limit: 0, // No limit
}
jsonData, err := json.Marshal(req)
jsonData, err := json.Marshal(reqData)
if err != nil {
return err
}
client := &http.Client{Timeout: 30 * time.Second} // Longer timeout for range fetches
url := fmt.Sprintf("http://%s/kv_range", peerAddress)
client := NewAuthenticatedHTTPClient(s.config, 30*time.Second) // Longer timeout for range fetches
protocol := GetProtocol(s.config)
url := fmt.Sprintf("%s://%s/kv_range", protocol, peerAddress)
resp, err := client.Post(url, "application/json", bytes.NewBuffer(jsonData))
req, err := http.NewRequest("POST", url, bytes.NewBuffer(jsonData))
if err != nil {
return err
}
req.Header.Set("Content-Type", "application/json")
AddClusterAuthHeaders(req, s.config)
resp, err := client.Do(req)
if err != nil {
return err
}

View File

@@ -1,12 +1,14 @@
package config
import (
"crypto/rand"
"encoding/base64"
"fmt"
"os"
"path/filepath"
"kvs/types"
"gopkg.in/yaml.v3"
"kvs/types"
)
// Default configuration
@@ -59,9 +61,29 @@ func Default() *types.Config {
// Default anonymous access settings (both disabled by default for security)
AllowAnonymousRead: false,
AllowAnonymousWrite: false,
// Default cluster authentication settings (Issue #13)
ClusterSecret: generateClusterSecret(),
ClusterTLSEnabled: false,
ClusterTLSCertFile: "",
ClusterTLSKeyFile: "",
ClusterTLSSkipVerify: false,
}
}
// generateClusterSecret generates a cryptographically secure random cluster secret
func generateClusterSecret() string {
// Generate 32 bytes (256 bits) of random data
randomBytes := make([]byte, 32)
if _, err := rand.Read(randomBytes); err != nil {
// Fallback to a warning - this should never happen in practice
fmt.Fprintf(os.Stderr, "Warning: Failed to generate secure cluster secret: %v\n", err)
return ""
}
// Encode as base64 for easy configuration file storage
return base64.StdEncoding.EncodeToString(randomBytes)
}
// Load configuration from file or create default
func Load(configPath string) (*types.Config, error) {
config := Default()
@@ -94,5 +116,13 @@ func Load(configPath string) (*types.Config, error) {
return nil, fmt.Errorf("failed to parse config file: %v", err)
}
// Generate cluster secret if not provided and clustering is enabled (Issue #13)
if config.ClusteringEnabled && config.ClusterSecret == "" {
config.ClusterSecret = generateClusterSecret()
fmt.Printf("Warning: No cluster_secret configured. Generated a random secret.\n")
fmt.Printf(" To share this secret with other nodes, add it to your config:\n")
fmt.Printf(" cluster_secret: %s\n", config.ClusterSecret)
}
return config, nil
}

View File

@@ -125,6 +125,9 @@ EOF
test_cluster_formation() {
test_start "2-node cluster formation and Merkle Tree replication"
# Shared cluster secret for authentication (Issue #13)
local CLUSTER_SECRET="test-cluster-secret-12345678901234567890"
# Node 1 config
cat > cluster1.yaml <<EOF
node_id: "cluster-1"
@@ -138,6 +141,7 @@ gossip_interval_max: 10
sync_interval: 10
allow_anonymous_read: true
allow_anonymous_write: true
cluster_secret: "$CLUSTER_SECRET"
EOF
# Node 2 config
@@ -153,6 +157,7 @@ gossip_interval_max: 10
sync_interval: 10
allow_anonymous_read: true
allow_anonymous_write: true
cluster_secret: "$CLUSTER_SECRET"
EOF
# Start nodes
@@ -239,6 +244,9 @@ test_conflict_resolution() {
if go run test_conflict.go "$TEST_DIR/conflict1_data" "$TEST_DIR/conflict2_data"; then
cd "$TEST_DIR"
# Shared cluster secret for authentication (Issue #13)
local CLUSTER_SECRET="conflict-cluster-secret-1234567890123"
# Create configs
cat > conflict1.yaml <<EOF
node_id: "conflict-1"
@@ -250,6 +258,7 @@ log_level: "info"
sync_interval: 3
allow_anonymous_read: true
allow_anonymous_write: true
cluster_secret: "$CLUSTER_SECRET"
EOF
cat > conflict2.yaml <<EOF
@@ -262,6 +271,7 @@ log_level: "info"
sync_interval: 3
allow_anonymous_read: true
allow_anonymous_write: true
cluster_secret: "$CLUSTER_SECRET"
EOF
# Start nodes
@@ -434,6 +444,95 @@ EOF
fi
}
# Test 6: Resource Metadata Management (Issue #12)
test_metadata_management() {
test_start "Resource Metadata Management test (Issue #12)"
# Create metadata test config
cat > metadata_test.yaml <<EOF
node_id: "metadata-test"
bind_address: "127.0.0.1"
port: 8096
data_dir: "./metadata_test_data"
seed_nodes: []
log_level: "error"
auth_enabled: true
allow_anonymous_read: false
allow_anonymous_write: false
EOF
# Start node
$BINARY metadata_test.yaml >metadata_test.log 2>&1 &
local pid=$!
if wait_for_service 8096; then
sleep 2 # Allow root account creation
# Extract the token from logs
local token=$(grep "Token:" metadata_test.log | sed 's/.*Token: //' | tr -d '\n\r')
if [ -z "$token" ]; then
log_error "Failed to extract authentication token from logs"
kill $pid 2>/dev/null || true
return
fi
# First, create a KV resource
curl -s -X PUT http://localhost:8096/kv/test/resource -H "Content-Type: application/json" -H "Authorization: Bearer $token" -d '{"data":"test"}' >/dev/null
sleep 1
# Test 1: Get metadata should fail for non-existent metadata (initially no metadata exists)
local get_response=$(curl -s -w "\n%{http_code}" -X GET http://localhost:8096/kv/test/resource/metadata -H "Authorization: Bearer $token")
local get_body=$(echo "$get_response" | head -n -1)
local get_code=$(echo "$get_response" | tail -n 1)
if [ "$get_code" = "404" ]; then
log_success "GET metadata returns 404 for non-existent metadata"
else
log_error "GET metadata should return 404 for non-existent metadata, got code: $get_code, body: $get_body"
fi
# Test 2: Update metadata should create new metadata
local update_response=$(curl -s -X PUT http://localhost:8096/kv/test/resource/metadata -H "Content-Type: application/json" -H "Authorization: Bearer $token" -d '{"owner_uuid":"test-owner-123","permissions":3840}')
if echo "$update_response" | grep -q "owner_uuid"; then
log_success "PUT metadata creates metadata successfully"
else
log_error "PUT metadata should create metadata, got: $update_response"
fi
# Test 3: Get metadata should now return the created metadata
local get_response2=$(curl -s -X GET http://localhost:8096/kv/test/resource/metadata -H "Authorization: Bearer $token")
if echo "$get_response2" | grep -q "test-owner-123" && echo "$get_response2" | grep -q "3840"; then
log_success "GET metadata returns created metadata"
else
log_error "GET metadata should return created metadata, got: $get_response2"
fi
# Test 4: Update metadata should modify existing metadata
local update_response2=$(curl -s -X PUT http://localhost:8096/kv/test/resource/metadata -H "Content-Type: application/json" -H "Authorization: Bearer $token" -d '{"owner_uuid":"new-owner-456"}')
if echo "$update_response2" | grep -q "new-owner-456"; then
log_success "PUT metadata updates existing metadata"
else
log_error "PUT metadata should update metadata, got: $update_response2"
fi
# Test 5: Metadata endpoints should require authentication
local no_auth=$(curl -s -w "\n%{http_code}" -X GET http://localhost:8096/kv/test/resource/metadata)
local no_auth_code=$(echo "$no_auth" | tail -n 1)
if [ "$no_auth_code" = "401" ]; then
log_success "Metadata endpoints properly require authentication"
else
log_error "Metadata endpoints should require authentication, got code: $no_auth_code"
fi
kill $pid 2>/dev/null || true
sleep 2
else
log_error "Metadata test node failed to start"
kill $pid 2>/dev/null || true
fi
}
# Main test execution
main() {
echo "=================================================="
@@ -452,6 +551,7 @@ main() {
test_cluster_formation
test_conflict_resolution
test_authentication_middleware
test_metadata_management
# Results
echo "=================================================="

View File

@@ -11,7 +11,6 @@ import (
"kvs/server"
)
func main() {
configPath := "./config.yaml"

View File

@@ -22,8 +22,6 @@ import (
"kvs/utils"
)
// healthHandler returns server health status
func (s *Server) healthHandler(w http.ResponseWriter, r *http.Request) {
mode := s.getMode()
@@ -215,6 +213,104 @@ func (s *Server) deleteKVHandler(w http.ResponseWriter, r *http.Request) {
s.logger.WithField("path", path).Info("Value deleted")
}
// getResourceMetadataHandler retrieves metadata for a KV resource
func (s *Server) getResourceMetadataHandler(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
path := vars["path"]
// Get metadata from storage
metadata, err := s.authService.GetResourceMetadata(path)
if err == badger.ErrKeyNotFound {
http.Error(w, "Not Found: No metadata exists for this resource", http.StatusNotFound)
return
}
if err != nil {
s.logger.WithError(err).WithField("path", path).Error("Failed to get resource metadata")
http.Error(w, "Internal Server Error", http.StatusInternalServerError)
return
}
response := types.GetResourceMetadataResponse{
OwnerUUID: metadata.OwnerUUID,
GroupUUID: metadata.GroupUUID,
Permissions: metadata.Permissions,
TTL: metadata.TTL,
CreatedAt: metadata.CreatedAt,
UpdatedAt: metadata.UpdatedAt,
}
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(response)
}
// updateResourceMetadataHandler updates metadata for a KV resource
func (s *Server) updateResourceMetadataHandler(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
path := vars["path"]
// Parse request body
var req types.UpdateResourceMetadataRequest
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
http.Error(w, "Bad Request: Invalid JSON", http.StatusBadRequest)
return
}
// Get existing metadata or create new one
metadata, err := s.authService.GetResourceMetadata(path)
if err == badger.ErrKeyNotFound {
// Create new metadata with defaults
metadata = &types.ResourceMetadata{
OwnerUUID: "",
GroupUUID: "",
Permissions: types.DefaultPermissions,
TTL: "",
CreatedAt: time.Now().Unix(),
UpdatedAt: time.Now().Unix(),
}
} else if err != nil {
s.logger.WithError(err).WithField("path", path).Error("Failed to get resource metadata")
http.Error(w, "Internal Server Error", http.StatusInternalServerError)
return
}
// Update only provided fields
if req.OwnerUUID != nil {
metadata.OwnerUUID = *req.OwnerUUID
}
if req.GroupUUID != nil {
metadata.GroupUUID = *req.GroupUUID
}
if req.Permissions != nil {
metadata.Permissions = *req.Permissions
}
metadata.UpdatedAt = time.Now().Unix()
// Store updated metadata
if err := s.authService.SetResourceMetadata(path, metadata); err != nil {
s.logger.WithError(err).WithField("path", path).Error("Failed to update resource metadata")
http.Error(w, "Internal Server Error", http.StatusInternalServerError)
return
}
response := types.GetResourceMetadataResponse{
OwnerUUID: metadata.OwnerUUID,
GroupUUID: metadata.GroupUUID,
Permissions: metadata.Permissions,
TTL: metadata.TTL,
CreatedAt: metadata.CreatedAt,
UpdatedAt: metadata.UpdatedAt,
}
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(response)
s.logger.WithFields(logrus.Fields{
"path": path,
"owner_uuid": metadata.OwnerUUID,
"group_uuid": metadata.GroupUUID,
}).Info("Resource metadata updated")
}
// isClusterMember checks if request is from a cluster member
func (s *Server) isClusterMember(remoteAddr string) bool {
host, _, err := net.SplitHostPort(remoteAddr)
@@ -1272,141 +1368,28 @@ func (s *Server) getSpecificRevision(key string, revision int) (*types.StoredVal
return s.revisionService.GetSpecificRevision(key, revision)
}
// getResourceMetadataHandler retrieves metadata for a resource path
func (s *Server) getResourceMetadataHandler(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
path := vars["path"]
authCtx := auth.GetAuthContext(r.Context())
if authCtx == nil {
http.Error(w, "Unauthorized", http.StatusUnauthorized)
// clusterBootstrapHandler provides the cluster secret to authenticated administrators (Issue #13)
func (s *Server) clusterBootstrapHandler(w http.ResponseWriter, r *http.Request) {
// Ensure clustering is enabled
if !s.config.ClusteringEnabled {
http.Error(w, "Clustering is disabled", http.StatusServiceUnavailable)
return
}
// Check read permission on the resource
if !s.authService.CheckResourcePermission(authCtx, path, "read") {
http.Error(w, "Forbidden", http.StatusForbidden)
// Ensure cluster secret is configured
if s.config.ClusterSecret == "" {
s.logger.Error("Cluster secret is not configured")
http.Error(w, "Cluster secret is not configured", http.StatusInternalServerError)
return
}
metadata, err := s.authService.GetResourceMetadata(path)
if err == badger.ErrKeyNotFound {
// Return default metadata if not found
defaultMetadata := types.ResourceMetadata{
OwnerUUID: authCtx.UserUUID,
GroupUUID: "",
Permissions: types.DefaultPermissions,
CreatedAt: time.Now().Unix(),
UpdatedAt: time.Now().Unix(),
}
metadata = &defaultMetadata
} else if err != nil {
s.logger.WithError(err).WithField("path", path).Error("Failed to get resource metadata")
http.Error(w, "Internal Server Error", http.StatusInternalServerError)
return
// Return the cluster secret for secure bootstrap
response := map[string]string{
"cluster_secret": s.config.ClusterSecret,
}
response := types.GetResourceMetadataResponse{
OwnerUUID: metadata.OwnerUUID,
GroupUUID: metadata.GroupUUID,
Permissions: metadata.Permissions,
TTL: metadata.TTL,
CreatedAt: metadata.CreatedAt,
UpdatedAt: metadata.UpdatedAt,
}
s.logger.WithField("remote_addr", r.RemoteAddr).Info("Cluster secret retrieved for bootstrap")
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(response)
}
// updateResourceMetadataHandler updates metadata for a resource path
func (s *Server) updateResourceMetadataHandler(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
path := vars["path"]
authCtx := auth.GetAuthContext(r.Context())
if authCtx == nil {
http.Error(w, "Unauthorized", http.StatusUnauthorized)
return
}
// Check write permission on the resource (owner write required for metadata changes)
if !s.authService.CheckResourcePermission(authCtx, path, "write") {
http.Error(w, "Forbidden", http.StatusForbidden)
return
}
var req types.UpdateResourceMetadataRequest
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
http.Error(w, "Bad Request", http.StatusBadRequest)
return
}
// Get current metadata (or default if not exists)
currentMetadata, err := s.authService.GetResourceMetadata(path)
if err == badger.ErrKeyNotFound {
currentMetadata = &types.ResourceMetadata{
OwnerUUID: authCtx.UserUUID,
GroupUUID: "",
Permissions: types.DefaultPermissions,
CreatedAt: time.Now().Unix(),
UpdatedAt: time.Now().Unix(),
}
} else if err != nil {
s.logger.WithError(err).WithField("path", path).Error("Failed to get current resource metadata")
http.Error(w, "Internal Server Error", http.StatusInternalServerError)
return
}
// Apply updates only to provided fields
updated := false
if req.OwnerUUID != "" {
currentMetadata.OwnerUUID = req.OwnerUUID
updated = true
}
if req.GroupUUID != "" {
currentMetadata.GroupUUID = req.GroupUUID
updated = true
}
if req.Permissions != 0 {
currentMetadata.Permissions = req.Permissions
updated = true
}
if req.TTL != "" {
currentMetadata.TTL = req.TTL
updated = true
}
if !updated {
http.Error(w, "No fields provided for update", http.StatusBadRequest)
return
}
// Store updated metadata
if err := s.authService.StoreResourceMetadata(path, currentMetadata); err != nil {
s.logger.WithError(err).WithField("path", path).Error("Failed to store resource metadata")
http.Error(w, "Internal Server Error", http.StatusInternalServerError)
return
}
response := types.GetResourceMetadataResponse{
OwnerUUID: currentMetadata.OwnerUUID,
GroupUUID: currentMetadata.GroupUUID,
Permissions: currentMetadata.Permissions,
TTL: currentMetadata.TTL,
CreatedAt: currentMetadata.CreatedAt,
UpdatedAt: currentMetadata.UpdatedAt,
}
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
json.NewEncoder(w).Encode(response)
s.logger.WithFields(logrus.Fields{
"path": path,
"user_uuid": authCtx.UserUUID,
"owner_uuid": currentMetadata.OwnerUUID,
"group_uuid": currentMetadata.GroupUUID,
"permissions": currentMetadata.Permissions,
}).Info("Resource metadata updated")
}

View File

@@ -1,6 +1,8 @@
package server
import (
"net/http"
"github.com/gorilla/mux"
)
@@ -11,6 +13,18 @@ func (s *Server) setupRoutes() *mux.Router {
// Health endpoint (always available)
router.HandleFunc("/health", s.healthHandler).Methods("GET")
// Resource Metadata Management endpoints (Issue #12) - Must come BEFORE general KV routes
// These need to be registered first to prevent /kv/{path:.+} from matching metadata paths
if s.config.AuthEnabled {
router.Handle("/kv/{path:.+}/metadata", s.authService.Middleware(
[]string{"admin:users:read"}, nil, "",
)(s.getResourceMetadataHandler)).Methods("GET")
router.Handle("/kv/{path:.+}/metadata", s.authService.Middleware(
[]string{"admin:users:update"}, nil, "",
)(s.updateResourceMetadataHandler)).Methods("PUT")
}
// KV endpoints (with conditional authentication based on anonymous access settings)
// GET endpoint - require auth if anonymous read is disabled
if s.config.AuthEnabled && !s.config.AllowAnonymousRead {
@@ -39,22 +53,24 @@ func (s *Server) setupRoutes() *mux.Router {
router.HandleFunc("/kv/{path:.+}", s.deleteKVHandler).Methods("DELETE")
}
// Resource Metadata endpoints (available when auth is enabled)
if s.config.AuthEnabled {
// GET metadata - require read permission
router.Handle("/kv/{path:.+}/metadata", s.authService.Middleware(
[]string{"read"}, func(r *http.Request) string { return mux.Vars(r)["path"] }, "read",
)(s.getResourceMetadataHandler)).Methods("GET")
// PUT metadata - require write permission (owner write)
router.Handle("/kv/{path:.+}/metadata", s.authService.Middleware(
[]string{"write"}, func(r *http.Request) string { return mux.Vars(r)["path"] }, "write",
)(s.updateResourceMetadataHandler)).Methods("PUT")
}
// Member endpoints (available when clustering is enabled)
if s.config.ClusteringEnabled {
// GET /members/ is unprotected for monitoring/inspection
router.HandleFunc("/members/", s.getMembersHandler).Methods("GET")
// Apply cluster authentication middleware to all cluster communication endpoints
if s.clusterAuthService != nil {
router.Handle("/members/join", s.clusterAuthService.Middleware(http.HandlerFunc(s.joinMemberHandler))).Methods("POST")
router.Handle("/members/leave", s.clusterAuthService.Middleware(http.HandlerFunc(s.leaveMemberHandler))).Methods("DELETE")
router.Handle("/members/gossip", s.clusterAuthService.Middleware(http.HandlerFunc(s.gossipHandler))).Methods("POST")
router.Handle("/members/pairs_by_time", s.clusterAuthService.Middleware(http.HandlerFunc(s.pairsByTimeHandler))).Methods("POST")
// Merkle Tree endpoints (clustering feature)
router.Handle("/merkle_tree/root", s.clusterAuthService.Middleware(http.HandlerFunc(s.getMerkleRootHandler))).Methods("GET")
router.Handle("/merkle_tree/diff", s.clusterAuthService.Middleware(http.HandlerFunc(s.getMerkleDiffHandler))).Methods("POST")
router.Handle("/kv_range", s.clusterAuthService.Middleware(http.HandlerFunc(s.getKVRangeHandler))).Methods("POST")
} else {
// Fallback to unprotected endpoints (for backwards compatibility)
router.HandleFunc("/members/join", s.joinMemberHandler).Methods("POST")
router.HandleFunc("/members/leave", s.leaveMemberHandler).Methods("DELETE")
router.HandleFunc("/members/gossip", s.gossipHandler).Methods("POST")
@@ -65,6 +81,7 @@ func (s *Server) setupRoutes() *mux.Router {
router.HandleFunc("/merkle_tree/diff", s.getMerkleDiffHandler).Methods("POST")
router.HandleFunc("/kv_range", s.getKVRangeHandler).Methods("POST")
}
}
// Authentication and user management endpoints (available when auth is enabled)
if s.config.AuthEnabled {
@@ -106,6 +123,12 @@ func (s *Server) setupRoutes() *mux.Router {
router.Handle("/api/tokens", s.authService.Middleware(
[]string{"admin:tokens:create"}, nil, "",
)(s.createTokenHandler)).Methods("POST")
// Cluster Bootstrap endpoint (Issue #13) - Protected by JWT authentication
// Allows authenticated administrators to retrieve the cluster secret for new nodes
router.Handle("/auth/cluster-bootstrap", s.authService.Middleware(
[]string{"admin:tokens:create"}, nil, "",
)(s.clusterBootstrapHandler)).Methods("GET")
}
// Revision History endpoints (available when revision history is enabled)

View File

@@ -51,6 +51,7 @@ type Server struct {
// Authentication service
authService *auth.AuthService
clusterAuthService *auth.ClusterAuthService
}
// NewServer initializes and returns a new Server instance
@@ -120,6 +121,11 @@ func NewServer(config *types.Config) (*Server, error) {
// Initialize authentication service
server.authService = auth.NewAuthService(db, logger, config)
// Initialize cluster authentication service (Issue #13)
if config.ClusteringEnabled {
server.clusterAuthService = auth.NewClusterAuthService(config.ClusterSecret, logger)
}
// Setup initial root account if needed (Issue #3)
if config.AuthEnabled {
if err := server.setupRootAccount(); err != nil {
@@ -327,4 +333,3 @@ func (s *Server) storeUserAndGroup(user *types.User, group *types.Group) error {
return nil
})
}

View File

@@ -131,14 +131,7 @@ type CreateTokenResponse struct {
ExpiresAt int64 `json:"expires_at"`
}
// Resource Metadata Management API structures
type UpdateResourceMetadataRequest struct {
OwnerUUID string `json:"owner_uuid,omitempty"`
GroupUUID string `json:"group_uuid,omitempty"`
Permissions int `json:"permissions,omitempty"`
TTL string `json:"ttl,omitempty"`
}
// Resource Metadata Management API structures (Issue #12)
type GetResourceMetadataResponse struct {
OwnerUUID string `json:"owner_uuid"`
GroupUUID string `json:"group_uuid"`
@@ -148,6 +141,12 @@ type GetResourceMetadataResponse struct {
UpdatedAt int64 `json:"updated_at"`
}
type UpdateResourceMetadataRequest struct {
OwnerUUID *string `json:"owner_uuid,omitempty"`
GroupUUID *string `json:"group_uuid,omitempty"`
Permissions *int `json:"permissions,omitempty"`
}
// Cluster and member management types
type Member struct {
ID string `json:"id"`
@@ -294,4 +293,11 @@ type Config struct {
// Anonymous access control (Issue #5)
AllowAnonymousRead bool `yaml:"allow_anonymous_read"` // Allow unauthenticated read access to KV endpoints
AllowAnonymousWrite bool `yaml:"allow_anonymous_write"` // Allow unauthenticated write access to KV endpoints
// Cluster authentication (Issue #13)
ClusterSecret string `yaml:"cluster_secret"` // Shared secret for cluster authentication (auto-generated if empty)
ClusterTLSEnabled bool `yaml:"cluster_tls_enabled"` // Require TLS for inter-node communication
ClusterTLSCertFile string `yaml:"cluster_tls_cert_file"` // Path to TLS certificate file
ClusterTLSKeyFile string `yaml:"cluster_tls_key_file"` // Path to TLS private key file
ClusterTLSSkipVerify bool `yaml:"cluster_tls_skip_verify"` // Skip TLS verification (insecure, for testing only)
}