forked from ryyst/kalzu-value-store
		
	Compare commits
	
		
			2 Commits
		
	
	
		
			2431d3cfb0
			...
			secure-clu
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
| 852275945c | |||
| c7dcebb894 | 
							
								
								
									
										77
									
								
								auth/cluster.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										77
									
								
								auth/cluster.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,77 @@ | ||||
| package auth | ||||
|  | ||||
| import ( | ||||
| 	"net/http" | ||||
|  | ||||
| 	"github.com/sirupsen/logrus" | ||||
| ) | ||||
|  | ||||
| // ClusterAuthService handles authentication for inter-cluster communication | ||||
| type ClusterAuthService struct { | ||||
| 	clusterSecret string | ||||
| 	logger        *logrus.Logger | ||||
| } | ||||
|  | ||||
| // NewClusterAuthService creates a new cluster authentication service | ||||
| func NewClusterAuthService(clusterSecret string, logger *logrus.Logger) *ClusterAuthService { | ||||
| 	return &ClusterAuthService{ | ||||
| 		clusterSecret: clusterSecret, | ||||
| 		logger:        logger, | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // Middleware validates cluster authentication headers | ||||
| func (s *ClusterAuthService) Middleware(next http.Handler) http.Handler { | ||||
| 	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { | ||||
| 		// Extract authentication headers | ||||
| 		clusterSecret := r.Header.Get("X-Cluster-Secret") | ||||
| 		nodeID := r.Header.Get("X-Node-ID") | ||||
|  | ||||
| 		// Log authentication attempt | ||||
| 		s.logger.WithFields(logrus.Fields{ | ||||
| 			"node_id":     nodeID, | ||||
| 			"remote_addr": r.RemoteAddr, | ||||
| 			"path":        r.URL.Path, | ||||
| 			"method":      r.Method, | ||||
| 		}).Debug("Cluster authentication attempt") | ||||
|  | ||||
| 		// Validate cluster secret | ||||
| 		if clusterSecret == "" { | ||||
| 			s.logger.WithFields(logrus.Fields{ | ||||
| 				"node_id":     nodeID, | ||||
| 				"remote_addr": r.RemoteAddr, | ||||
| 				"path":        r.URL.Path, | ||||
| 			}).Warn("Missing X-Cluster-Secret header") | ||||
| 			http.Error(w, "Unauthorized: Missing cluster secret", http.StatusUnauthorized) | ||||
| 			return | ||||
| 		} | ||||
|  | ||||
| 		if clusterSecret != s.clusterSecret { | ||||
| 			s.logger.WithFields(logrus.Fields{ | ||||
| 				"node_id":     nodeID, | ||||
| 				"remote_addr": r.RemoteAddr, | ||||
| 				"path":        r.URL.Path, | ||||
| 			}).Warn("Invalid cluster secret") | ||||
| 			http.Error(w, "Unauthorized: Invalid cluster secret", http.StatusUnauthorized) | ||||
| 			return | ||||
| 		} | ||||
|  | ||||
| 		// Validate node ID is present | ||||
| 		if nodeID == "" { | ||||
| 			s.logger.WithFields(logrus.Fields{ | ||||
| 				"remote_addr": r.RemoteAddr, | ||||
| 				"path":        r.URL.Path, | ||||
| 			}).Warn("Missing X-Node-ID header") | ||||
| 			http.Error(w, "Unauthorized: Missing node ID", http.StatusUnauthorized) | ||||
| 			return | ||||
| 		} | ||||
|  | ||||
| 		// Authentication successful | ||||
| 		s.logger.WithFields(logrus.Fields{ | ||||
| 			"node_id": nodeID, | ||||
| 			"path":    r.URL.Path, | ||||
| 		}).Debug("Cluster authentication successful") | ||||
|  | ||||
| 		next.ServeHTTP(w, r) | ||||
| 	}) | ||||
| } | ||||
| @@ -82,10 +82,19 @@ func (s *BootstrapService) attemptJoin(seedAddr string) bool { | ||||
| 		return false | ||||
| 	} | ||||
|  | ||||
| 	client := &http.Client{Timeout: 10 * time.Second} | ||||
| 	url := fmt.Sprintf("http://%s/members/join", seedAddr) | ||||
| 	client := NewAuthenticatedHTTPClient(s.config, 10*time.Second) | ||||
| 	protocol := GetProtocol(s.config) | ||||
| 	url := fmt.Sprintf("%s://%s/members/join", protocol, seedAddr) | ||||
|  | ||||
| 	resp, err := client.Post(url, "application/json", bytes.NewBuffer(jsonData)) | ||||
| 	req, err := http.NewRequest("POST", url, bytes.NewBuffer(jsonData)) | ||||
| 	if err != nil { | ||||
| 		s.logger.WithError(err).Error("Failed to create join request") | ||||
| 		return false | ||||
| 	} | ||||
| 	req.Header.Set("Content-Type", "application/json") | ||||
| 	AddClusterAuthHeaders(req, s.config) | ||||
|  | ||||
| 	resp, err := client.Do(req) | ||||
| 	if err != nil { | ||||
| 		s.logger.WithFields(logrus.Fields{ | ||||
| 			"seed":  seedAddr, | ||||
|   | ||||
| @@ -17,13 +17,13 @@ import ( | ||||
|  | ||||
| // GossipService handles gossip protocol operations | ||||
| type GossipService struct { | ||||
| 	config      *types.Config | ||||
| 	members     map[string]*types.Member | ||||
| 	membersMu   sync.RWMutex | ||||
| 	logger      *logrus.Logger | ||||
| 	ctx         context.Context | ||||
| 	cancel      context.CancelFunc | ||||
| 	wg          sync.WaitGroup | ||||
| 	config    *types.Config | ||||
| 	members   map[string]*types.Member | ||||
| 	membersMu sync.RWMutex | ||||
| 	logger    *logrus.Logger | ||||
| 	ctx       context.Context | ||||
| 	cancel    context.CancelFunc | ||||
| 	wg        sync.WaitGroup | ||||
| } | ||||
|  | ||||
| // NewGossipService creates a new gossip service | ||||
| @@ -181,11 +181,20 @@ func (s *GossipService) gossipWithPeer(peer *types.Member) error { | ||||
| 		return err | ||||
| 	} | ||||
|  | ||||
| 	// Send HTTP request to peer | ||||
| 	client := &http.Client{Timeout: 5 * time.Second} | ||||
| 	url := fmt.Sprintf("http://%s/members/gossip", peer.Address) | ||||
| 	// Send HTTP request to peer with cluster authentication | ||||
| 	client := NewAuthenticatedHTTPClient(s.config, 5*time.Second) | ||||
| 	protocol := GetProtocol(s.config) | ||||
| 	url := fmt.Sprintf("%s://%s/members/gossip", protocol, peer.Address) | ||||
|  | ||||
| 	resp, err := client.Post(url, "application/json", bytes.NewBuffer(jsonData)) | ||||
| 	req, err := http.NewRequest("POST", url, bytes.NewBuffer(jsonData)) | ||||
| 	if err != nil { | ||||
| 		s.logger.WithError(err).Error("Failed to create gossip request") | ||||
| 		return err | ||||
| 	} | ||||
| 	req.Header.Set("Content-Type", "application/json") | ||||
| 	AddClusterAuthHeaders(req, s.config) | ||||
|  | ||||
| 	resp, err := client.Do(req) | ||||
| 	if err != nil { | ||||
| 		s.logger.WithFields(logrus.Fields{ | ||||
| 			"peer":  peer.Address, | ||||
|   | ||||
							
								
								
									
										43
									
								
								cluster/http_client.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										43
									
								
								cluster/http_client.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,43 @@ | ||||
| package cluster | ||||
|  | ||||
| import ( | ||||
| 	"crypto/tls" | ||||
| 	"net/http" | ||||
| 	"time" | ||||
|  | ||||
| 	"kvs/types" | ||||
| ) | ||||
|  | ||||
| // NewAuthenticatedHTTPClient creates an HTTP client configured for cluster authentication | ||||
| func NewAuthenticatedHTTPClient(config *types.Config, timeout time.Duration) *http.Client { | ||||
| 	client := &http.Client{ | ||||
| 		Timeout: timeout, | ||||
| 	} | ||||
|  | ||||
| 	// Configure TLS if enabled | ||||
| 	if config.ClusterTLSEnabled { | ||||
| 		tlsConfig := &tls.Config{ | ||||
| 			InsecureSkipVerify: config.ClusterTLSSkipVerify, | ||||
| 		} | ||||
|  | ||||
| 		client.Transport = &http.Transport{ | ||||
| 			TLSClientConfig: tlsConfig, | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	return client | ||||
| } | ||||
|  | ||||
| // AddClusterAuthHeaders adds authentication headers to an HTTP request | ||||
| func AddClusterAuthHeaders(req *http.Request, config *types.Config) { | ||||
| 	req.Header.Set("X-Cluster-Secret", config.ClusterSecret) | ||||
| 	req.Header.Set("X-Node-ID", config.NodeID) | ||||
| } | ||||
|  | ||||
| // GetProtocol returns the appropriate protocol (http or https) based on TLS configuration | ||||
| func GetProtocol(config *types.Config) string { | ||||
| 	if config.ClusterTLSEnabled { | ||||
| 		return "https" | ||||
| 	} | ||||
| 	return "http" | ||||
| } | ||||
| @@ -172,9 +172,9 @@ func (s *SyncService) performMerkleSync() { | ||||
| 	// 2. Compare roots and start recursive diffing if they differ | ||||
| 	if !bytes.Equal(localRoot.Hash, remoteRoot.Hash) { | ||||
| 		s.logger.WithFields(logrus.Fields{ | ||||
| 			"peer":          peer.Address, | ||||
| 			"local_root":    hex.EncodeToString(localRoot.Hash), | ||||
| 			"remote_root":   hex.EncodeToString(remoteRoot.Hash), | ||||
| 			"peer":        peer.Address, | ||||
| 			"local_root":  hex.EncodeToString(localRoot.Hash), | ||||
| 			"remote_root": hex.EncodeToString(remoteRoot.Hash), | ||||
| 		}).Info("Merkle roots differ, starting recursive diff") | ||||
| 		s.diffMerkleTreesRecursive(peer.Address, localRoot, remoteRoot) | ||||
| 	} else { | ||||
| @@ -186,10 +186,17 @@ func (s *SyncService) performMerkleSync() { | ||||
|  | ||||
| // requestMerkleRoot requests the Merkle root from a peer | ||||
| func (s *SyncService) requestMerkleRoot(peerAddress string) (*types.MerkleRootResponse, error) { | ||||
| 	client := &http.Client{Timeout: 10 * time.Second} | ||||
| 	url := fmt.Sprintf("http://%s/merkle_tree/root", peerAddress) | ||||
| 	client := NewAuthenticatedHTTPClient(s.config, 10*time.Second) | ||||
| 	protocol := GetProtocol(s.config) | ||||
| 	url := fmt.Sprintf("%s://%s/merkle_tree/root", protocol, peerAddress) | ||||
|  | ||||
| 	resp, err := client.Get(url) | ||||
| 	req, err := http.NewRequest("GET", url, nil) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| 	AddClusterAuthHeaders(req, s.config) | ||||
|  | ||||
| 	resp, err := client.Do(req) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| @@ -216,7 +223,7 @@ func (s *SyncService) diffMerkleTreesRecursive(peerAddress string, localNode, re | ||||
| 	// Hashes differ, need to go deeper. | ||||
| 	// Request children from the remote peer for the current range. | ||||
| 	req := types.MerkleTreeDiffRequest{ | ||||
| 		ParentNode: *remoteNode, // We are asking the remote peer about its children for this range | ||||
| 		ParentNode: *remoteNode,    // We are asking the remote peer about its children for this range | ||||
| 		LocalHash:  localNode.Hash, // Our hash for this range | ||||
| 	} | ||||
|  | ||||
| @@ -294,10 +301,17 @@ func (s *SyncService) handleLeafLevelDiff(peerAddress string, keys []string, loc | ||||
|  | ||||
| // fetchSingleKVFromPeer fetches a single KV pair from a peer | ||||
| func (s *SyncService) fetchSingleKVFromPeer(peerAddress, path string) (*types.StoredValue, error) { | ||||
| 	client := &http.Client{Timeout: 5 * time.Second} | ||||
| 	url := fmt.Sprintf("http://%s/kv/%s", peerAddress, path) | ||||
| 	client := NewAuthenticatedHTTPClient(s.config, 5*time.Second) | ||||
| 	protocol := GetProtocol(s.config) | ||||
| 	url := fmt.Sprintf("%s://%s/kv/%s", protocol, peerAddress, path) | ||||
|  | ||||
| 	resp, err := client.Get(url) | ||||
| 	req, err := http.NewRequest("GET", url, nil) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| 	AddClusterAuthHeaders(req, s.config) | ||||
|  | ||||
| 	resp, err := client.Do(req) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| @@ -418,12 +432,12 @@ func (s *SyncService) resolveConflict(key string, local, remote *types.StoredVal | ||||
| 	// If we can't find membership info, fall back to UUID comparison for deterministic result | ||||
| 	if localMember == nil || remoteMember == nil { | ||||
| 		s.logger.WithFields(logrus.Fields{ | ||||
| 			"key":             key, | ||||
| 			"peerAddress":     peerAddress, | ||||
| 			"localNodeID":     localNodeID, | ||||
| 			"localMember":     localMember != nil, | ||||
| 			"remoteMember":    remoteMember != nil, | ||||
| 			"totalMembers":    len(members), | ||||
| 			"key":          key, | ||||
| 			"peerAddress":  peerAddress, | ||||
| 			"localNodeID":  localNodeID, | ||||
| 			"localMember":  localMember != nil, | ||||
| 			"remoteMember": remoteMember != nil, | ||||
| 			"totalMembers": len(members), | ||||
| 		}).Warn("Could not find membership info for conflict resolution, using UUID comparison") | ||||
| 		if remote.UUID < local.UUID { | ||||
| 			// Remote UUID lexically smaller (deterministic choice) | ||||
| @@ -443,9 +457,9 @@ func (s *SyncService) resolveConflict(key string, local, remote *types.StoredVal | ||||
| 		err := s.storeReplicatedDataWithMetadata(key, remote) | ||||
| 		if err == nil { | ||||
| 			s.logger.WithFields(logrus.Fields{ | ||||
| 				"key":                key, | ||||
| 				"local_joined":       localMember.JoinedTimestamp, | ||||
| 				"remote_joined":      remoteMember.JoinedTimestamp, | ||||
| 				"key":           key, | ||||
| 				"local_joined":  localMember.JoinedTimestamp, | ||||
| 				"remote_joined": remoteMember.JoinedTimestamp, | ||||
| 			}).Info("Conflict resolved: remote data wins (oldest-node rule)") | ||||
| 		} | ||||
| 		return err | ||||
| @@ -453,24 +467,32 @@ func (s *SyncService) resolveConflict(key string, local, remote *types.StoredVal | ||||
|  | ||||
| 	// Local node is older or equal, keep local data | ||||
| 	s.logger.WithFields(logrus.Fields{ | ||||
| 		"key":                key, | ||||
| 		"local_joined":       localMember.JoinedTimestamp, | ||||
| 		"remote_joined":      remoteMember.JoinedTimestamp, | ||||
| 		"key":           key, | ||||
| 		"local_joined":  localMember.JoinedTimestamp, | ||||
| 		"remote_joined": remoteMember.JoinedTimestamp, | ||||
| 	}).Info("Conflict resolved: local data wins (oldest-node rule)") | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| // requestMerkleDiff requests children hashes or keys for a given node/range from a peer | ||||
| func (s *SyncService) requestMerkleDiff(peerAddress string, req types.MerkleTreeDiffRequest) (*types.MerkleTreeDiffResponse, error) { | ||||
| 	jsonData, err := json.Marshal(req) | ||||
| func (s *SyncService) requestMerkleDiff(peerAddress string, reqData types.MerkleTreeDiffRequest) (*types.MerkleTreeDiffResponse, error) { | ||||
| 	jsonData, err := json.Marshal(reqData) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
|  | ||||
| 	client := &http.Client{Timeout: 10 * time.Second} | ||||
| 	url := fmt.Sprintf("http://%s/merkle_tree/diff", peerAddress) | ||||
| 	client := NewAuthenticatedHTTPClient(s.config, 10*time.Second) | ||||
| 	protocol := GetProtocol(s.config) | ||||
| 	url := fmt.Sprintf("%s://%s/merkle_tree/diff", protocol, peerAddress) | ||||
|  | ||||
| 	resp, err := client.Post(url, "application/json", bytes.NewBuffer(jsonData)) | ||||
| 	req, err := http.NewRequest("POST", url, bytes.NewBuffer(jsonData)) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| 	req.Header.Set("Content-Type", "application/json") | ||||
| 	AddClusterAuthHeaders(req, s.config) | ||||
|  | ||||
| 	resp, err := client.Do(req) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| @@ -525,20 +547,28 @@ func (s *SyncService) handleChildrenDiff(peerAddress string, children []types.Me | ||||
|  | ||||
| // fetchAndStoreRange fetches a range of KV pairs from a peer and stores them locally | ||||
| func (s *SyncService) fetchAndStoreRange(peerAddress string, startKey, endKey string) error { | ||||
| 	req := types.KVRangeRequest{ | ||||
| 	reqData := types.KVRangeRequest{ | ||||
| 		StartKey: startKey, | ||||
| 		EndKey:   endKey, | ||||
| 		Limit:    0, // No limit | ||||
| 	} | ||||
| 	jsonData, err := json.Marshal(req) | ||||
| 	jsonData, err := json.Marshal(reqData) | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
|  | ||||
| 	client := &http.Client{Timeout: 30 * time.Second} // Longer timeout for range fetches | ||||
| 	url := fmt.Sprintf("http://%s/kv_range", peerAddress) | ||||
| 	client := NewAuthenticatedHTTPClient(s.config, 30*time.Second) // Longer timeout for range fetches | ||||
| 	protocol := GetProtocol(s.config) | ||||
| 	url := fmt.Sprintf("%s://%s/kv_range", protocol, peerAddress) | ||||
|  | ||||
| 	resp, err := client.Post(url, "application/json", bytes.NewBuffer(jsonData)) | ||||
| 	req, err := http.NewRequest("POST", url, bytes.NewBuffer(jsonData)) | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| 	req.Header.Set("Content-Type", "application/json") | ||||
| 	AddClusterAuthHeaders(req, s.config) | ||||
|  | ||||
| 	resp, err := client.Do(req) | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
|   | ||||
| @@ -1,12 +1,14 @@ | ||||
| package config | ||||
|  | ||||
| import ( | ||||
| 	"crypto/rand" | ||||
| 	"encoding/base64" | ||||
| 	"fmt" | ||||
| 	"os" | ||||
| 	"path/filepath" | ||||
|  | ||||
| 	"kvs/types" | ||||
| 	"gopkg.in/yaml.v3" | ||||
| 	"kvs/types" | ||||
| ) | ||||
|  | ||||
| // Default configuration | ||||
| @@ -33,8 +35,8 @@ func Default() *types.Config { | ||||
| 		CompressionLevel:   3, // Balance between performance and compression ratio | ||||
|  | ||||
| 		// Default TTL and size limit settings | ||||
| 		DefaultTTL:  "0",       // No default TTL | ||||
| 		MaxJSONSize: 1048576,   // 1MB default max JSON size | ||||
| 		DefaultTTL:  "0",     // No default TTL | ||||
| 		MaxJSONSize: 1048576, // 1MB default max JSON size | ||||
|  | ||||
| 		// Default rate limiting settings | ||||
| 		RateLimitRequests: 100,  // 100 requests per window | ||||
| @@ -57,11 +59,31 @@ func Default() *types.Config { | ||||
| 		RevisionHistoryEnabled: true, | ||||
|  | ||||
| 		// Default anonymous access settings (both disabled by default for security) | ||||
| 		AllowAnonymousRead:     false, | ||||
| 		AllowAnonymousWrite:    false, | ||||
| 		AllowAnonymousRead:  false, | ||||
| 		AllowAnonymousWrite: false, | ||||
|  | ||||
| 		// Default cluster authentication settings (Issue #13) | ||||
| 		ClusterSecret:        generateClusterSecret(), | ||||
| 		ClusterTLSEnabled:    false, | ||||
| 		ClusterTLSCertFile:   "", | ||||
| 		ClusterTLSKeyFile:    "", | ||||
| 		ClusterTLSSkipVerify: false, | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // generateClusterSecret generates a cryptographically secure random cluster secret | ||||
| func generateClusterSecret() string { | ||||
| 	// Generate 32 bytes (256 bits) of random data | ||||
| 	randomBytes := make([]byte, 32) | ||||
| 	if _, err := rand.Read(randomBytes); err != nil { | ||||
| 		// Fallback to a warning - this should never happen in practice | ||||
| 		fmt.Fprintf(os.Stderr, "Warning: Failed to generate secure cluster secret: %v\n", err) | ||||
| 		return "" | ||||
| 	} | ||||
| 	// Encode as base64 for easy configuration file storage | ||||
| 	return base64.StdEncoding.EncodeToString(randomBytes) | ||||
| } | ||||
|  | ||||
| // Load configuration from file or create default | ||||
| func Load(configPath string) (*types.Config, error) { | ||||
| 	config := Default() | ||||
| @@ -94,5 +116,13 @@ func Load(configPath string) (*types.Config, error) { | ||||
| 		return nil, fmt.Errorf("failed to parse config file: %v", err) | ||||
| 	} | ||||
|  | ||||
| 	// Generate cluster secret if not provided and clustering is enabled (Issue #13) | ||||
| 	if config.ClusteringEnabled && config.ClusterSecret == "" { | ||||
| 		config.ClusterSecret = generateClusterSecret() | ||||
| 		fmt.Printf("Warning: No cluster_secret configured. Generated a random secret.\n") | ||||
| 		fmt.Printf("         To share this secret with other nodes, add it to your config:\n") | ||||
| 		fmt.Printf("         cluster_secret: %s\n", config.ClusterSecret) | ||||
| 	} | ||||
|  | ||||
| 	return config, nil | ||||
| } | ||||
| @@ -125,6 +125,9 @@ EOF | ||||
| test_cluster_formation() { | ||||
|     test_start "2-node cluster formation and Merkle Tree replication" | ||||
|  | ||||
|     # Shared cluster secret for authentication (Issue #13) | ||||
|     local CLUSTER_SECRET="test-cluster-secret-12345678901234567890" | ||||
|  | ||||
|     # Node 1 config | ||||
|     cat > cluster1.yaml <<EOF | ||||
| node_id: "cluster-1" | ||||
| @@ -138,6 +141,7 @@ gossip_interval_max: 10 | ||||
| sync_interval: 10 | ||||
| allow_anonymous_read: true | ||||
| allow_anonymous_write: true | ||||
| cluster_secret: "$CLUSTER_SECRET" | ||||
| EOF | ||||
|  | ||||
|     # Node 2 config | ||||
| @@ -153,6 +157,7 @@ gossip_interval_max: 10 | ||||
| sync_interval: 10 | ||||
| allow_anonymous_read: true | ||||
| allow_anonymous_write: true | ||||
| cluster_secret: "$CLUSTER_SECRET" | ||||
| EOF | ||||
|      | ||||
|     # Start nodes | ||||
| @@ -239,6 +244,9 @@ test_conflict_resolution() { | ||||
|     if go run test_conflict.go "$TEST_DIR/conflict1_data" "$TEST_DIR/conflict2_data"; then | ||||
|         cd "$TEST_DIR" | ||||
|  | ||||
|         # Shared cluster secret for authentication (Issue #13) | ||||
|         local CLUSTER_SECRET="conflict-cluster-secret-1234567890123" | ||||
|  | ||||
|         # Create configs | ||||
|         cat > conflict1.yaml <<EOF | ||||
| node_id: "conflict-1" | ||||
| @@ -250,6 +258,7 @@ log_level: "info" | ||||
| sync_interval: 3 | ||||
| allow_anonymous_read: true | ||||
| allow_anonymous_write: true | ||||
| cluster_secret: "$CLUSTER_SECRET" | ||||
| EOF | ||||
|  | ||||
|         cat > conflict2.yaml <<EOF | ||||
| @@ -262,6 +271,7 @@ log_level: "info" | ||||
| sync_interval: 3 | ||||
| allow_anonymous_read: true | ||||
| allow_anonymous_write: true | ||||
| cluster_secret: "$CLUSTER_SECRET" | ||||
| EOF | ||||
|          | ||||
|         # Start nodes | ||||
|   | ||||
							
								
								
									
										1
									
								
								main.go
									
									
									
									
									
								
							
							
						
						
									
										1
									
								
								main.go
									
									
									
									
									
								
							| @@ -11,7 +11,6 @@ import ( | ||||
| 	"kvs/server" | ||||
| ) | ||||
|  | ||||
|  | ||||
| func main() { | ||||
| 	configPath := "./config.yaml" | ||||
|  | ||||
|   | ||||
| @@ -22,8 +22,6 @@ import ( | ||||
| 	"kvs/utils" | ||||
| ) | ||||
|  | ||||
|  | ||||
|  | ||||
| // healthHandler returns server health status | ||||
| func (s *Server) healthHandler(w http.ResponseWriter, r *http.Request) { | ||||
| 	mode := s.getMode() | ||||
| @@ -1271,3 +1269,29 @@ func (s *Server) getRevisionHistory(key string) ([]map[string]interface{}, error | ||||
| func (s *Server) getSpecificRevision(key string, revision int) (*types.StoredValue, error) { | ||||
| 	return s.revisionService.GetSpecificRevision(key, revision) | ||||
| } | ||||
|  | ||||
| // clusterBootstrapHandler provides the cluster secret to authenticated administrators (Issue #13) | ||||
| func (s *Server) clusterBootstrapHandler(w http.ResponseWriter, r *http.Request) { | ||||
| 	// Ensure clustering is enabled | ||||
| 	if !s.config.ClusteringEnabled { | ||||
| 		http.Error(w, "Clustering is disabled", http.StatusServiceUnavailable) | ||||
| 		return | ||||
| 	} | ||||
|  | ||||
| 	// Ensure cluster secret is configured | ||||
| 	if s.config.ClusterSecret == "" { | ||||
| 		s.logger.Error("Cluster secret is not configured") | ||||
| 		http.Error(w, "Cluster secret is not configured", http.StatusInternalServerError) | ||||
| 		return | ||||
| 	} | ||||
|  | ||||
| 	// Return the cluster secret for secure bootstrap | ||||
| 	response := map[string]string{ | ||||
| 		"cluster_secret": s.config.ClusterSecret, | ||||
| 	} | ||||
|  | ||||
| 	s.logger.WithField("remote_addr", r.RemoteAddr).Info("Cluster secret retrieved for bootstrap") | ||||
|  | ||||
| 	w.Header().Set("Content-Type", "application/json") | ||||
| 	json.NewEncoder(w).Encode(response) | ||||
| } | ||||
|   | ||||
| @@ -1,6 +1,8 @@ | ||||
| package server | ||||
|  | ||||
| import ( | ||||
| 	"net/http" | ||||
|  | ||||
| 	"github.com/gorilla/mux" | ||||
| ) | ||||
|  | ||||
| @@ -41,16 +43,32 @@ func (s *Server) setupRoutes() *mux.Router { | ||||
|  | ||||
| 	// Member endpoints (available when clustering is enabled) | ||||
| 	if s.config.ClusteringEnabled { | ||||
| 		// GET /members/ is unprotected for monitoring/inspection | ||||
| 		router.HandleFunc("/members/", s.getMembersHandler).Methods("GET") | ||||
| 		router.HandleFunc("/members/join", s.joinMemberHandler).Methods("POST") | ||||
| 		router.HandleFunc("/members/leave", s.leaveMemberHandler).Methods("DELETE") | ||||
| 		router.HandleFunc("/members/gossip", s.gossipHandler).Methods("POST") | ||||
| 		router.HandleFunc("/members/pairs_by_time", s.pairsByTimeHandler).Methods("POST") | ||||
|  | ||||
| 		// Merkle Tree endpoints (clustering feature) | ||||
| 		router.HandleFunc("/merkle_tree/root", s.getMerkleRootHandler).Methods("GET") | ||||
| 		router.HandleFunc("/merkle_tree/diff", s.getMerkleDiffHandler).Methods("POST") | ||||
| 		router.HandleFunc("/kv_range", s.getKVRangeHandler).Methods("POST") | ||||
| 		// Apply cluster authentication middleware to all cluster communication endpoints | ||||
| 		if s.clusterAuthService != nil { | ||||
| 			router.Handle("/members/join", s.clusterAuthService.Middleware(http.HandlerFunc(s.joinMemberHandler))).Methods("POST") | ||||
| 			router.Handle("/members/leave", s.clusterAuthService.Middleware(http.HandlerFunc(s.leaveMemberHandler))).Methods("DELETE") | ||||
| 			router.Handle("/members/gossip", s.clusterAuthService.Middleware(http.HandlerFunc(s.gossipHandler))).Methods("POST") | ||||
| 			router.Handle("/members/pairs_by_time", s.clusterAuthService.Middleware(http.HandlerFunc(s.pairsByTimeHandler))).Methods("POST") | ||||
|  | ||||
| 			// Merkle Tree endpoints (clustering feature) | ||||
| 			router.Handle("/merkle_tree/root", s.clusterAuthService.Middleware(http.HandlerFunc(s.getMerkleRootHandler))).Methods("GET") | ||||
| 			router.Handle("/merkle_tree/diff", s.clusterAuthService.Middleware(http.HandlerFunc(s.getMerkleDiffHandler))).Methods("POST") | ||||
| 			router.Handle("/kv_range", s.clusterAuthService.Middleware(http.HandlerFunc(s.getKVRangeHandler))).Methods("POST") | ||||
| 		} else { | ||||
| 			// Fallback to unprotected endpoints (for backwards compatibility) | ||||
| 			router.HandleFunc("/members/join", s.joinMemberHandler).Methods("POST") | ||||
| 			router.HandleFunc("/members/leave", s.leaveMemberHandler).Methods("DELETE") | ||||
| 			router.HandleFunc("/members/gossip", s.gossipHandler).Methods("POST") | ||||
| 			router.HandleFunc("/members/pairs_by_time", s.pairsByTimeHandler).Methods("POST") | ||||
|  | ||||
| 			// Merkle Tree endpoints (clustering feature) | ||||
| 			router.HandleFunc("/merkle_tree/root", s.getMerkleRootHandler).Methods("GET") | ||||
| 			router.HandleFunc("/merkle_tree/diff", s.getMerkleDiffHandler).Methods("POST") | ||||
| 			router.HandleFunc("/kv_range", s.getKVRangeHandler).Methods("POST") | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	// Authentication and user management endpoints (available when auth is enabled) | ||||
| @@ -93,6 +111,12 @@ func (s *Server) setupRoutes() *mux.Router { | ||||
| 		router.Handle("/api/tokens", s.authService.Middleware( | ||||
| 			[]string{"admin:tokens:create"}, nil, "", | ||||
| 		)(s.createTokenHandler)).Methods("POST") | ||||
|  | ||||
| 		// Cluster Bootstrap endpoint (Issue #13) - Protected by JWT authentication | ||||
| 		// Allows authenticated administrators to retrieve the cluster secret for new nodes | ||||
| 		router.Handle("/auth/cluster-bootstrap", s.authService.Middleware( | ||||
| 			[]string{"admin:tokens:create"}, nil, "", | ||||
| 		)(s.clusterBootstrapHandler)).Methods("GET") | ||||
| 	} | ||||
|  | ||||
| 	// Revision History endpoints (available when revision history is enabled) | ||||
|   | ||||
| @@ -50,7 +50,8 @@ type Server struct { | ||||
| 	backupMu      sync.RWMutex       // Protects backup status | ||||
|  | ||||
| 	// Authentication service | ||||
| 	authService *auth.AuthService | ||||
| 	authService        *auth.AuthService | ||||
| 	clusterAuthService *auth.ClusterAuthService | ||||
| } | ||||
|  | ||||
| // NewServer initializes and returns a new Server instance | ||||
| @@ -120,6 +121,11 @@ func NewServer(config *types.Config) (*Server, error) { | ||||
| 	// Initialize authentication service | ||||
| 	server.authService = auth.NewAuthService(db, logger, config) | ||||
|  | ||||
| 	// Initialize cluster authentication service (Issue #13) | ||||
| 	if config.ClusteringEnabled { | ||||
| 		server.clusterAuthService = auth.NewClusterAuthService(config.ClusterSecret, logger) | ||||
| 	} | ||||
|  | ||||
| 	// Setup initial root account if needed (Issue #3) | ||||
| 	if config.AuthEnabled { | ||||
| 		if err := server.setupRootAccount(); err != nil { | ||||
| @@ -269,10 +275,10 @@ func (s *Server) createRootUserAndToken() error { | ||||
|  | ||||
| 	// Log the token securely (one-time display) | ||||
| 	s.logger.WithFields(logrus.Fields{ | ||||
| 		"user_uuid":    rootUserUUID, | ||||
| 		"group_uuid":   adminGroupUUID, | ||||
| 		"expires_at":   time.Unix(expiresAt, 0).Format(time.RFC3339), | ||||
| 		"expires_in":   "24 hours", | ||||
| 		"user_uuid":  rootUserUUID, | ||||
| 		"group_uuid": adminGroupUUID, | ||||
| 		"expires_at": time.Unix(expiresAt, 0).Format(time.RFC3339), | ||||
| 		"expires_in": "24 hours", | ||||
| 	}).Warn("Root account created - SAVE THIS TOKEN:") | ||||
|  | ||||
| 	// Display token prominently | ||||
| @@ -327,4 +333,3 @@ func (s *Server) storeUserAndGroup(user *types.User, group *types.Group) error { | ||||
| 		return nil | ||||
| 	}) | ||||
| } | ||||
|  | ||||
|   | ||||
| @@ -12,10 +12,10 @@ import ( | ||||
|  | ||||
| // StorageService handles all BadgerDB operations and data management | ||||
| type StorageService struct { | ||||
| 	db               *badger.DB | ||||
| 	config           *types.Config | ||||
| 	compressionSvc   *CompressionService | ||||
| 	logger           *logrus.Logger | ||||
| 	db             *badger.DB | ||||
| 	config         *types.Config | ||||
| 	compressionSvc *CompressionService | ||||
| 	logger         *logrus.Logger | ||||
| } | ||||
|  | ||||
| // NewStorageService creates a new storage service | ||||
|   | ||||
| @@ -13,20 +13,20 @@ type StoredValue struct { | ||||
|  | ||||
| // User represents a system user | ||||
| type User struct { | ||||
| 	UUID         string   `json:"uuid"`         // Server-generated UUID | ||||
| 	UUID         string   `json:"uuid"`          // Server-generated UUID | ||||
| 	NicknameHash string   `json:"nickname_hash"` // SHA3-512 hash of nickname | ||||
| 	Groups       []string `json:"groups"`       // List of group UUIDs this user belongs to | ||||
| 	CreatedAt    int64    `json:"created_at"`   // Unix timestamp | ||||
| 	UpdatedAt    int64    `json:"updated_at"`   // Unix timestamp | ||||
| 	Groups       []string `json:"groups"`        // List of group UUIDs this user belongs to | ||||
| 	CreatedAt    int64    `json:"created_at"`    // Unix timestamp | ||||
| 	UpdatedAt    int64    `json:"updated_at"`    // Unix timestamp | ||||
| } | ||||
|  | ||||
| // Group represents a user group | ||||
| type Group struct { | ||||
| 	UUID        string   `json:"uuid"`        // Server-generated UUID | ||||
| 	NameHash    string   `json:"name_hash"`   // SHA3-512 hash of group name | ||||
| 	Members     []string `json:"members"`     // List of user UUIDs in this group | ||||
| 	CreatedAt   int64    `json:"created_at"`  // Unix timestamp | ||||
| 	UpdatedAt   int64    `json:"updated_at"`  // Unix timestamp | ||||
| 	UUID      string   `json:"uuid"`       // Server-generated UUID | ||||
| 	NameHash  string   `json:"name_hash"`  // SHA3-512 hash of group name | ||||
| 	Members   []string `json:"members"`    // List of user UUIDs in this group | ||||
| 	CreatedAt int64    `json:"created_at"` // Unix timestamp | ||||
| 	UpdatedAt int64    `json:"updated_at"` // Unix timestamp | ||||
| } | ||||
|  | ||||
| // APIToken represents a JWT authentication token | ||||
| @@ -40,12 +40,12 @@ type APIToken struct { | ||||
|  | ||||
| // ResourceMetadata contains ownership and permission information for stored resources | ||||
| type ResourceMetadata struct { | ||||
| 	OwnerUUID   string `json:"owner_uuid"`   // UUID of the resource owner | ||||
| 	GroupUUID   string `json:"group_uuid"`   // UUID of the resource group | ||||
| 	Permissions int    `json:"permissions"`  // 12-bit permission mask (POSIX-inspired) | ||||
| 	TTL         string `json:"ttl"`          // Time-to-live duration (Go format) | ||||
| 	CreatedAt   int64  `json:"created_at"`   // Unix timestamp when resource was created | ||||
| 	UpdatedAt   int64  `json:"updated_at"`   // Unix timestamp when resource was last updated | ||||
| 	OwnerUUID   string `json:"owner_uuid"`  // UUID of the resource owner | ||||
| 	GroupUUID   string `json:"group_uuid"`  // UUID of the resource group | ||||
| 	Permissions int    `json:"permissions"` // 12-bit permission mask (POSIX-inspired) | ||||
| 	TTL         string `json:"ttl"`         // Time-to-live duration (Go format) | ||||
| 	CreatedAt   int64  `json:"created_at"`  // Unix timestamp when resource was created | ||||
| 	UpdatedAt   int64  `json:"updated_at"`  // Unix timestamp when resource was last updated | ||||
| } | ||||
|  | ||||
| // Permission constants for POSIX-inspired ACL | ||||
| @@ -231,28 +231,28 @@ type KVRangeResponse struct { | ||||
|  | ||||
| // Configuration | ||||
| type Config struct { | ||||
| 	NodeID             string   `yaml:"node_id"` | ||||
| 	BindAddress        string   `yaml:"bind_address"` | ||||
| 	Port               int      `yaml:"port"` | ||||
| 	DataDir            string   `yaml:"data_dir"` | ||||
| 	SeedNodes          []string `yaml:"seed_nodes"` | ||||
| 	ReadOnly           bool     `yaml:"read_only"` | ||||
| 	LogLevel           string   `yaml:"log_level"` | ||||
| 	GossipIntervalMin  int      `yaml:"gossip_interval_min"` | ||||
| 	GossipIntervalMax  int      `yaml:"gossip_interval_max"` | ||||
| 	SyncInterval       int      `yaml:"sync_interval"` | ||||
| 	CatchupInterval    int      `yaml:"catchup_interval"` | ||||
| 	BootstrapMaxAgeHours int    `yaml:"bootstrap_max_age_hours"` | ||||
| 	ThrottleDelayMs    int      `yaml:"throttle_delay_ms"` | ||||
| 	FetchDelayMs       int      `yaml:"fetch_delay_ms"` | ||||
| 	NodeID               string   `yaml:"node_id"` | ||||
| 	BindAddress          string   `yaml:"bind_address"` | ||||
| 	Port                 int      `yaml:"port"` | ||||
| 	DataDir              string   `yaml:"data_dir"` | ||||
| 	SeedNodes            []string `yaml:"seed_nodes"` | ||||
| 	ReadOnly             bool     `yaml:"read_only"` | ||||
| 	LogLevel             string   `yaml:"log_level"` | ||||
| 	GossipIntervalMin    int      `yaml:"gossip_interval_min"` | ||||
| 	GossipIntervalMax    int      `yaml:"gossip_interval_max"` | ||||
| 	SyncInterval         int      `yaml:"sync_interval"` | ||||
| 	CatchupInterval      int      `yaml:"catchup_interval"` | ||||
| 	BootstrapMaxAgeHours int      `yaml:"bootstrap_max_age_hours"` | ||||
| 	ThrottleDelayMs      int      `yaml:"throttle_delay_ms"` | ||||
| 	FetchDelayMs         int      `yaml:"fetch_delay_ms"` | ||||
|  | ||||
| 	// Database compression configuration | ||||
| 	CompressionEnabled bool `yaml:"compression_enabled"` | ||||
| 	CompressionLevel   int  `yaml:"compression_level"` | ||||
|  | ||||
| 	// TTL configuration | ||||
| 	DefaultTTL    string `yaml:"default_ttl"`      // Go duration format, "0" means no default TTL | ||||
| 	MaxJSONSize   int    `yaml:"max_json_size"`    // Maximum JSON size in bytes | ||||
| 	DefaultTTL  string `yaml:"default_ttl"`   // Go duration format, "0" means no default TTL | ||||
| 	MaxJSONSize int    `yaml:"max_json_size"` // Maximum JSON size in bytes | ||||
|  | ||||
| 	// Rate limiting configuration | ||||
| 	RateLimitRequests int    `yaml:"rate_limit_requests"` // Max requests per window | ||||
| @@ -262,10 +262,10 @@ type Config struct { | ||||
| 	TamperLogActions []string `yaml:"tamper_log_actions"` // Actions to log | ||||
|  | ||||
| 	// Backup system configuration | ||||
| 	BackupEnabled   bool   `yaml:"backup_enabled"`    // Enable/disable automated backups | ||||
| 	BackupSchedule  string `yaml:"backup_schedule"`   // Cron schedule format | ||||
| 	BackupPath      string `yaml:"backup_path"`       // Directory to store backups | ||||
| 	BackupRetention int    `yaml:"backup_retention"`  // Days to keep backups | ||||
| 	BackupEnabled   bool   `yaml:"backup_enabled"`   // Enable/disable automated backups | ||||
| 	BackupSchedule  string `yaml:"backup_schedule"`  // Cron schedule format | ||||
| 	BackupPath      string `yaml:"backup_path"`      // Directory to store backups | ||||
| 	BackupRetention int    `yaml:"backup_retention"` // Days to keep backups | ||||
|  | ||||
| 	// Feature toggles for optional functionalities | ||||
| 	AuthEnabled            bool `yaml:"auth_enabled"`             // Enable/disable authentication system | ||||
| @@ -275,6 +275,13 @@ type Config struct { | ||||
| 	RevisionHistoryEnabled bool `yaml:"revision_history_enabled"` // Enable/disable revision history | ||||
|  | ||||
| 	// Anonymous access control (Issue #5) | ||||
| 	AllowAnonymousRead     bool `yaml:"allow_anonymous_read"`     // Allow unauthenticated read access to KV endpoints | ||||
| 	AllowAnonymousWrite    bool `yaml:"allow_anonymous_write"`    // Allow unauthenticated write access to KV endpoints | ||||
| 	AllowAnonymousRead  bool `yaml:"allow_anonymous_read"`  // Allow unauthenticated read access to KV endpoints | ||||
| 	AllowAnonymousWrite bool `yaml:"allow_anonymous_write"` // Allow unauthenticated write access to KV endpoints | ||||
|  | ||||
| 	// Cluster authentication (Issue #13) | ||||
| 	ClusterSecret        string `yaml:"cluster_secret"`          // Shared secret for cluster authentication (auto-generated if empty) | ||||
| 	ClusterTLSEnabled    bool   `yaml:"cluster_tls_enabled"`     // Require TLS for inter-node communication | ||||
| 	ClusterTLSCertFile   string `yaml:"cluster_tls_cert_file"`   // Path to TLS certificate file | ||||
| 	ClusterTLSKeyFile    string `yaml:"cluster_tls_key_file"`    // Path to TLS private key file | ||||
| 	ClusterTLSSkipVerify bool   `yaml:"cluster_tls_skip_verify"` // Skip TLS verification (insecure, for testing only) | ||||
| } | ||||
		Reference in New Issue
	
	Block a user