secure-cluster-communication #14
77
auth/cluster.go
Normal file
77
auth/cluster.go
Normal file
@@ -0,0 +1,77 @@
|
|||||||
|
package auth
|
||||||
|
|
||||||
|
import (
|
||||||
|
"net/http"
|
||||||
|
|
||||||
|
"github.com/sirupsen/logrus"
|
||||||
|
)
|
||||||
|
|
||||||
|
// ClusterAuthService handles authentication for inter-cluster communication
|
||||||
|
type ClusterAuthService struct {
|
||||||
|
clusterSecret string
|
||||||
|
logger *logrus.Logger
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewClusterAuthService creates a new cluster authentication service
|
||||||
|
func NewClusterAuthService(clusterSecret string, logger *logrus.Logger) *ClusterAuthService {
|
||||||
|
return &ClusterAuthService{
|
||||||
|
clusterSecret: clusterSecret,
|
||||||
|
logger: logger,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Middleware validates cluster authentication headers
|
||||||
|
func (s *ClusterAuthService) Middleware(next http.Handler) http.Handler {
|
||||||
|
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
// Extract authentication headers
|
||||||
|
clusterSecret := r.Header.Get("X-Cluster-Secret")
|
||||||
|
nodeID := r.Header.Get("X-Node-ID")
|
||||||
|
|
||||||
|
// Log authentication attempt
|
||||||
|
s.logger.WithFields(logrus.Fields{
|
||||||
|
"node_id": nodeID,
|
||||||
|
"remote_addr": r.RemoteAddr,
|
||||||
|
"path": r.URL.Path,
|
||||||
|
"method": r.Method,
|
||||||
|
}).Debug("Cluster authentication attempt")
|
||||||
|
|
||||||
|
// Validate cluster secret
|
||||||
|
if clusterSecret == "" {
|
||||||
|
s.logger.WithFields(logrus.Fields{
|
||||||
|
"node_id": nodeID,
|
||||||
|
"remote_addr": r.RemoteAddr,
|
||||||
|
"path": r.URL.Path,
|
||||||
|
}).Warn("Missing X-Cluster-Secret header")
|
||||||
|
http.Error(w, "Unauthorized: Missing cluster secret", http.StatusUnauthorized)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if clusterSecret != s.clusterSecret {
|
||||||
|
s.logger.WithFields(logrus.Fields{
|
||||||
|
"node_id": nodeID,
|
||||||
|
"remote_addr": r.RemoteAddr,
|
||||||
|
"path": r.URL.Path,
|
||||||
|
}).Warn("Invalid cluster secret")
|
||||||
|
http.Error(w, "Unauthorized: Invalid cluster secret", http.StatusUnauthorized)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Validate node ID is present
|
||||||
|
if nodeID == "" {
|
||||||
|
s.logger.WithFields(logrus.Fields{
|
||||||
|
"remote_addr": r.RemoteAddr,
|
||||||
|
"path": r.URL.Path,
|
||||||
|
}).Warn("Missing X-Node-ID header")
|
||||||
|
http.Error(w, "Unauthorized: Missing node ID", http.StatusUnauthorized)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Authentication successful
|
||||||
|
s.logger.WithFields(logrus.Fields{
|
||||||
|
"node_id": nodeID,
|
||||||
|
"path": r.URL.Path,
|
||||||
|
}).Debug("Cluster authentication successful")
|
||||||
|
|
||||||
|
next.ServeHTTP(w, r)
|
||||||
|
})
|
||||||
|
}
|
@@ -17,13 +17,13 @@ import (
|
|||||||
|
|
||||||
// GossipService handles gossip protocol operations
|
// GossipService handles gossip protocol operations
|
||||||
type GossipService struct {
|
type GossipService struct {
|
||||||
config *types.Config
|
config *types.Config
|
||||||
members map[string]*types.Member
|
members map[string]*types.Member
|
||||||
membersMu sync.RWMutex
|
membersMu sync.RWMutex
|
||||||
logger *logrus.Logger
|
logger *logrus.Logger
|
||||||
ctx context.Context
|
ctx context.Context
|
||||||
cancel context.CancelFunc
|
cancel context.CancelFunc
|
||||||
wg sync.WaitGroup
|
wg sync.WaitGroup
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewGossipService creates a new gossip service
|
// NewGossipService creates a new gossip service
|
||||||
@@ -181,11 +181,20 @@ func (s *GossipService) gossipWithPeer(peer *types.Member) error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// Send HTTP request to peer
|
// Send HTTP request to peer with cluster authentication
|
||||||
client := &http.Client{Timeout: 5 * time.Second}
|
client := NewAuthenticatedHTTPClient(s.config, 5*time.Second)
|
||||||
url := fmt.Sprintf("http://%s/members/gossip", peer.Address)
|
protocol := GetProtocol(s.config)
|
||||||
|
url := fmt.Sprintf("%s://%s/members/gossip", protocol, peer.Address)
|
||||||
|
|
||||||
resp, err := client.Post(url, "application/json", bytes.NewBuffer(jsonData))
|
req, err := http.NewRequest("POST", url, bytes.NewBuffer(jsonData))
|
||||||
|
if err != nil {
|
||||||
|
s.logger.WithError(err).Error("Failed to create gossip request")
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
req.Header.Set("Content-Type", "application/json")
|
||||||
|
AddClusterAuthHeaders(req, s.config)
|
||||||
|
|
||||||
|
resp, err := client.Do(req)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
s.logger.WithFields(logrus.Fields{
|
s.logger.WithFields(logrus.Fields{
|
||||||
"peer": peer.Address,
|
"peer": peer.Address,
|
||||||
|
43
cluster/http_client.go
Normal file
43
cluster/http_client.go
Normal file
@@ -0,0 +1,43 @@
|
|||||||
|
package cluster
|
||||||
|
|
||||||
|
import (
|
||||||
|
"crypto/tls"
|
||||||
|
"net/http"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"kvs/types"
|
||||||
|
)
|
||||||
|
|
||||||
|
// NewAuthenticatedHTTPClient creates an HTTP client configured for cluster authentication
|
||||||
|
func NewAuthenticatedHTTPClient(config *types.Config, timeout time.Duration) *http.Client {
|
||||||
|
client := &http.Client{
|
||||||
|
Timeout: timeout,
|
||||||
|
}
|
||||||
|
|
||||||
|
// Configure TLS if enabled
|
||||||
|
if config.ClusterTLSEnabled {
|
||||||
|
tlsConfig := &tls.Config{
|
||||||
|
InsecureSkipVerify: config.ClusterTLSSkipVerify,
|
||||||
|
}
|
||||||
|
|
||||||
|
client.Transport = &http.Transport{
|
||||||
|
TLSClientConfig: tlsConfig,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return client
|
||||||
|
}
|
||||||
|
|
||||||
|
// AddClusterAuthHeaders adds authentication headers to an HTTP request
|
||||||
|
func AddClusterAuthHeaders(req *http.Request, config *types.Config) {
|
||||||
|
req.Header.Set("X-Cluster-Secret", config.ClusterSecret)
|
||||||
|
req.Header.Set("X-Node-ID", config.NodeID)
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetProtocol returns the appropriate protocol (http or https) based on TLS configuration
|
||||||
|
func GetProtocol(config *types.Config) string {
|
||||||
|
if config.ClusterTLSEnabled {
|
||||||
|
return "https"
|
||||||
|
}
|
||||||
|
return "http"
|
||||||
|
}
|
@@ -172,9 +172,9 @@ func (s *SyncService) performMerkleSync() {
|
|||||||
// 2. Compare roots and start recursive diffing if they differ
|
// 2. Compare roots and start recursive diffing if they differ
|
||||||
if !bytes.Equal(localRoot.Hash, remoteRoot.Hash) {
|
if !bytes.Equal(localRoot.Hash, remoteRoot.Hash) {
|
||||||
s.logger.WithFields(logrus.Fields{
|
s.logger.WithFields(logrus.Fields{
|
||||||
"peer": peer.Address,
|
"peer": peer.Address,
|
||||||
"local_root": hex.EncodeToString(localRoot.Hash),
|
"local_root": hex.EncodeToString(localRoot.Hash),
|
||||||
"remote_root": hex.EncodeToString(remoteRoot.Hash),
|
"remote_root": hex.EncodeToString(remoteRoot.Hash),
|
||||||
}).Info("Merkle roots differ, starting recursive diff")
|
}).Info("Merkle roots differ, starting recursive diff")
|
||||||
s.diffMerkleTreesRecursive(peer.Address, localRoot, remoteRoot)
|
s.diffMerkleTreesRecursive(peer.Address, localRoot, remoteRoot)
|
||||||
} else {
|
} else {
|
||||||
@@ -186,10 +186,17 @@ func (s *SyncService) performMerkleSync() {
|
|||||||
|
|
||||||
// requestMerkleRoot requests the Merkle root from a peer
|
// requestMerkleRoot requests the Merkle root from a peer
|
||||||
func (s *SyncService) requestMerkleRoot(peerAddress string) (*types.MerkleRootResponse, error) {
|
func (s *SyncService) requestMerkleRoot(peerAddress string) (*types.MerkleRootResponse, error) {
|
||||||
client := &http.Client{Timeout: 10 * time.Second}
|
client := NewAuthenticatedHTTPClient(s.config, 10*time.Second)
|
||||||
url := fmt.Sprintf("http://%s/merkle_tree/root", peerAddress)
|
protocol := GetProtocol(s.config)
|
||||||
|
url := fmt.Sprintf("%s://%s/merkle_tree/root", protocol, peerAddress)
|
||||||
|
|
||||||
resp, err := client.Get(url)
|
req, err := http.NewRequest("GET", url, nil)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
AddClusterAuthHeaders(req, s.config)
|
||||||
|
|
||||||
|
resp, err := client.Do(req)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@@ -216,7 +223,7 @@ func (s *SyncService) diffMerkleTreesRecursive(peerAddress string, localNode, re
|
|||||||
// Hashes differ, need to go deeper.
|
// Hashes differ, need to go deeper.
|
||||||
// Request children from the remote peer for the current range.
|
// Request children from the remote peer for the current range.
|
||||||
req := types.MerkleTreeDiffRequest{
|
req := types.MerkleTreeDiffRequest{
|
||||||
ParentNode: *remoteNode, // We are asking the remote peer about its children for this range
|
ParentNode: *remoteNode, // We are asking the remote peer about its children for this range
|
||||||
LocalHash: localNode.Hash, // Our hash for this range
|
LocalHash: localNode.Hash, // Our hash for this range
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -294,10 +301,17 @@ func (s *SyncService) handleLeafLevelDiff(peerAddress string, keys []string, loc
|
|||||||
|
|
||||||
// fetchSingleKVFromPeer fetches a single KV pair from a peer
|
// fetchSingleKVFromPeer fetches a single KV pair from a peer
|
||||||
func (s *SyncService) fetchSingleKVFromPeer(peerAddress, path string) (*types.StoredValue, error) {
|
func (s *SyncService) fetchSingleKVFromPeer(peerAddress, path string) (*types.StoredValue, error) {
|
||||||
client := &http.Client{Timeout: 5 * time.Second}
|
client := NewAuthenticatedHTTPClient(s.config, 5*time.Second)
|
||||||
url := fmt.Sprintf("http://%s/kv/%s", peerAddress, path)
|
protocol := GetProtocol(s.config)
|
||||||
|
url := fmt.Sprintf("%s://%s/kv/%s", protocol, peerAddress, path)
|
||||||
|
|
||||||
resp, err := client.Get(url)
|
req, err := http.NewRequest("GET", url, nil)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
AddClusterAuthHeaders(req, s.config)
|
||||||
|
|
||||||
|
resp, err := client.Do(req)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@@ -418,12 +432,12 @@ func (s *SyncService) resolveConflict(key string, local, remote *types.StoredVal
|
|||||||
// If we can't find membership info, fall back to UUID comparison for deterministic result
|
// If we can't find membership info, fall back to UUID comparison for deterministic result
|
||||||
if localMember == nil || remoteMember == nil {
|
if localMember == nil || remoteMember == nil {
|
||||||
s.logger.WithFields(logrus.Fields{
|
s.logger.WithFields(logrus.Fields{
|
||||||
"key": key,
|
"key": key,
|
||||||
"peerAddress": peerAddress,
|
"peerAddress": peerAddress,
|
||||||
"localNodeID": localNodeID,
|
"localNodeID": localNodeID,
|
||||||
"localMember": localMember != nil,
|
"localMember": localMember != nil,
|
||||||
"remoteMember": remoteMember != nil,
|
"remoteMember": remoteMember != nil,
|
||||||
"totalMembers": len(members),
|
"totalMembers": len(members),
|
||||||
}).Warn("Could not find membership info for conflict resolution, using UUID comparison")
|
}).Warn("Could not find membership info for conflict resolution, using UUID comparison")
|
||||||
if remote.UUID < local.UUID {
|
if remote.UUID < local.UUID {
|
||||||
// Remote UUID lexically smaller (deterministic choice)
|
// Remote UUID lexically smaller (deterministic choice)
|
||||||
@@ -443,9 +457,9 @@ func (s *SyncService) resolveConflict(key string, local, remote *types.StoredVal
|
|||||||
err := s.storeReplicatedDataWithMetadata(key, remote)
|
err := s.storeReplicatedDataWithMetadata(key, remote)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
s.logger.WithFields(logrus.Fields{
|
s.logger.WithFields(logrus.Fields{
|
||||||
"key": key,
|
"key": key,
|
||||||
"local_joined": localMember.JoinedTimestamp,
|
"local_joined": localMember.JoinedTimestamp,
|
||||||
"remote_joined": remoteMember.JoinedTimestamp,
|
"remote_joined": remoteMember.JoinedTimestamp,
|
||||||
}).Info("Conflict resolved: remote data wins (oldest-node rule)")
|
}).Info("Conflict resolved: remote data wins (oldest-node rule)")
|
||||||
}
|
}
|
||||||
return err
|
return err
|
||||||
@@ -453,24 +467,32 @@ func (s *SyncService) resolveConflict(key string, local, remote *types.StoredVal
|
|||||||
|
|
||||||
// Local node is older or equal, keep local data
|
// Local node is older or equal, keep local data
|
||||||
s.logger.WithFields(logrus.Fields{
|
s.logger.WithFields(logrus.Fields{
|
||||||
"key": key,
|
"key": key,
|
||||||
"local_joined": localMember.JoinedTimestamp,
|
"local_joined": localMember.JoinedTimestamp,
|
||||||
"remote_joined": remoteMember.JoinedTimestamp,
|
"remote_joined": remoteMember.JoinedTimestamp,
|
||||||
}).Info("Conflict resolved: local data wins (oldest-node rule)")
|
}).Info("Conflict resolved: local data wins (oldest-node rule)")
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// requestMerkleDiff requests children hashes or keys for a given node/range from a peer
|
// requestMerkleDiff requests children hashes or keys for a given node/range from a peer
|
||||||
func (s *SyncService) requestMerkleDiff(peerAddress string, req types.MerkleTreeDiffRequest) (*types.MerkleTreeDiffResponse, error) {
|
func (s *SyncService) requestMerkleDiff(peerAddress string, reqData types.MerkleTreeDiffRequest) (*types.MerkleTreeDiffResponse, error) {
|
||||||
jsonData, err := json.Marshal(req)
|
jsonData, err := json.Marshal(reqData)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
client := &http.Client{Timeout: 10 * time.Second}
|
client := NewAuthenticatedHTTPClient(s.config, 10*time.Second)
|
||||||
url := fmt.Sprintf("http://%s/merkle_tree/diff", peerAddress)
|
protocol := GetProtocol(s.config)
|
||||||
|
url := fmt.Sprintf("%s://%s/merkle_tree/diff", protocol, peerAddress)
|
||||||
|
|
||||||
resp, err := client.Post(url, "application/json", bytes.NewBuffer(jsonData))
|
req, err := http.NewRequest("POST", url, bytes.NewBuffer(jsonData))
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
req.Header.Set("Content-Type", "application/json")
|
||||||
|
AddClusterAuthHeaders(req, s.config)
|
||||||
|
|
||||||
|
resp, err := client.Do(req)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@@ -525,20 +547,28 @@ func (s *SyncService) handleChildrenDiff(peerAddress string, children []types.Me
|
|||||||
|
|
||||||
// fetchAndStoreRange fetches a range of KV pairs from a peer and stores them locally
|
// fetchAndStoreRange fetches a range of KV pairs from a peer and stores them locally
|
||||||
func (s *SyncService) fetchAndStoreRange(peerAddress string, startKey, endKey string) error {
|
func (s *SyncService) fetchAndStoreRange(peerAddress string, startKey, endKey string) error {
|
||||||
req := types.KVRangeRequest{
|
reqData := types.KVRangeRequest{
|
||||||
StartKey: startKey,
|
StartKey: startKey,
|
||||||
EndKey: endKey,
|
EndKey: endKey,
|
||||||
Limit: 0, // No limit
|
Limit: 0, // No limit
|
||||||
}
|
}
|
||||||
jsonData, err := json.Marshal(req)
|
jsonData, err := json.Marshal(reqData)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
client := &http.Client{Timeout: 30 * time.Second} // Longer timeout for range fetches
|
client := NewAuthenticatedHTTPClient(s.config, 30*time.Second) // Longer timeout for range fetches
|
||||||
url := fmt.Sprintf("http://%s/kv_range", peerAddress)
|
protocol := GetProtocol(s.config)
|
||||||
|
url := fmt.Sprintf("%s://%s/kv_range", protocol, peerAddress)
|
||||||
|
|
||||||
resp, err := client.Post(url, "application/json", bytes.NewBuffer(jsonData))
|
req, err := http.NewRequest("POST", url, bytes.NewBuffer(jsonData))
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
req.Header.Set("Content-Type", "application/json")
|
||||||
|
AddClusterAuthHeaders(req, s.config)
|
||||||
|
|
||||||
|
resp, err := client.Do(req)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@@ -1,12 +1,14 @@
|
|||||||
package config
|
package config
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"crypto/rand"
|
||||||
|
"encoding/base64"
|
||||||
"fmt"
|
"fmt"
|
||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
|
|
||||||
"kvs/types"
|
|
||||||
"gopkg.in/yaml.v3"
|
"gopkg.in/yaml.v3"
|
||||||
|
"kvs/types"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Default configuration
|
// Default configuration
|
||||||
@@ -33,8 +35,8 @@ func Default() *types.Config {
|
|||||||
CompressionLevel: 3, // Balance between performance and compression ratio
|
CompressionLevel: 3, // Balance between performance and compression ratio
|
||||||
|
|
||||||
// Default TTL and size limit settings
|
// Default TTL and size limit settings
|
||||||
DefaultTTL: "0", // No default TTL
|
DefaultTTL: "0", // No default TTL
|
||||||
MaxJSONSize: 1048576, // 1MB default max JSON size
|
MaxJSONSize: 1048576, // 1MB default max JSON size
|
||||||
|
|
||||||
// Default rate limiting settings
|
// Default rate limiting settings
|
||||||
RateLimitRequests: 100, // 100 requests per window
|
RateLimitRequests: 100, // 100 requests per window
|
||||||
@@ -57,11 +59,31 @@ func Default() *types.Config {
|
|||||||
RevisionHistoryEnabled: true,
|
RevisionHistoryEnabled: true,
|
||||||
|
|
||||||
// Default anonymous access settings (both disabled by default for security)
|
// Default anonymous access settings (both disabled by default for security)
|
||||||
AllowAnonymousRead: false,
|
AllowAnonymousRead: false,
|
||||||
AllowAnonymousWrite: false,
|
AllowAnonymousWrite: false,
|
||||||
|
|
||||||
|
// Default cluster authentication settings (Issue #13)
|
||||||
|
ClusterSecret: generateClusterSecret(),
|
||||||
|
ClusterTLSEnabled: false,
|
||||||
|
ClusterTLSCertFile: "",
|
||||||
|
ClusterTLSKeyFile: "",
|
||||||
|
ClusterTLSSkipVerify: false,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// generateClusterSecret generates a cryptographically secure random cluster secret
|
||||||
|
func generateClusterSecret() string {
|
||||||
|
// Generate 32 bytes (256 bits) of random data
|
||||||
|
randomBytes := make([]byte, 32)
|
||||||
|
if _, err := rand.Read(randomBytes); err != nil {
|
||||||
|
// Fallback to a warning - this should never happen in practice
|
||||||
|
fmt.Fprintf(os.Stderr, "Warning: Failed to generate secure cluster secret: %v\n", err)
|
||||||
|
return ""
|
||||||
|
}
|
||||||
|
// Encode as base64 for easy configuration file storage
|
||||||
|
return base64.StdEncoding.EncodeToString(randomBytes)
|
||||||
|
}
|
||||||
|
|
||||||
// Load configuration from file or create default
|
// Load configuration from file or create default
|
||||||
func Load(configPath string) (*types.Config, error) {
|
func Load(configPath string) (*types.Config, error) {
|
||||||
config := Default()
|
config := Default()
|
||||||
@@ -94,5 +116,13 @@ func Load(configPath string) (*types.Config, error) {
|
|||||||
return nil, fmt.Errorf("failed to parse config file: %v", err)
|
return nil, fmt.Errorf("failed to parse config file: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Generate cluster secret if not provided and clustering is enabled (Issue #13)
|
||||||
|
if config.ClusteringEnabled && config.ClusterSecret == "" {
|
||||||
|
config.ClusterSecret = generateClusterSecret()
|
||||||
|
fmt.Printf("Warning: No cluster_secret configured. Generated a random secret.\n")
|
||||||
|
fmt.Printf(" To share this secret with other nodes, add it to your config:\n")
|
||||||
|
fmt.Printf(" cluster_secret: %s\n", config.ClusterSecret)
|
||||||
|
}
|
||||||
|
|
||||||
return config, nil
|
return config, nil
|
||||||
}
|
}
|
1
main.go
1
main.go
@@ -11,7 +11,6 @@ import (
|
|||||||
"kvs/server"
|
"kvs/server"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
configPath := "./config.yaml"
|
configPath := "./config.yaml"
|
||||||
|
|
||||||
|
@@ -22,8 +22,6 @@ import (
|
|||||||
"kvs/utils"
|
"kvs/utils"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
// healthHandler returns server health status
|
// healthHandler returns server health status
|
||||||
func (s *Server) healthHandler(w http.ResponseWriter, r *http.Request) {
|
func (s *Server) healthHandler(w http.ResponseWriter, r *http.Request) {
|
||||||
mode := s.getMode()
|
mode := s.getMode()
|
||||||
@@ -1271,3 +1269,29 @@ func (s *Server) getRevisionHistory(key string) ([]map[string]interface{}, error
|
|||||||
func (s *Server) getSpecificRevision(key string, revision int) (*types.StoredValue, error) {
|
func (s *Server) getSpecificRevision(key string, revision int) (*types.StoredValue, error) {
|
||||||
return s.revisionService.GetSpecificRevision(key, revision)
|
return s.revisionService.GetSpecificRevision(key, revision)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// clusterBootstrapHandler provides the cluster secret to authenticated administrators (Issue #13)
|
||||||
|
func (s *Server) clusterBootstrapHandler(w http.ResponseWriter, r *http.Request) {
|
||||||
|
// Ensure clustering is enabled
|
||||||
|
if !s.config.ClusteringEnabled {
|
||||||
|
http.Error(w, "Clustering is disabled", http.StatusServiceUnavailable)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Ensure cluster secret is configured
|
||||||
|
if s.config.ClusterSecret == "" {
|
||||||
|
s.logger.Error("Cluster secret is not configured")
|
||||||
|
http.Error(w, "Cluster secret is not configured", http.StatusInternalServerError)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Return the cluster secret for secure bootstrap
|
||||||
|
response := map[string]string{
|
||||||
|
"cluster_secret": s.config.ClusterSecret,
|
||||||
|
}
|
||||||
|
|
||||||
|
s.logger.WithField("remote_addr", r.RemoteAddr).Info("Cluster secret retrieved for bootstrap")
|
||||||
|
|
||||||
|
w.Header().Set("Content-Type", "application/json")
|
||||||
|
json.NewEncoder(w).Encode(response)
|
||||||
|
}
|
||||||
|
@@ -1,6 +1,8 @@
|
|||||||
package server
|
package server
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"net/http"
|
||||||
|
|
||||||
"github.com/gorilla/mux"
|
"github.com/gorilla/mux"
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -41,16 +43,31 @@ func (s *Server) setupRoutes() *mux.Router {
|
|||||||
|
|
||||||
// Member endpoints (available when clustering is enabled)
|
// Member endpoints (available when clustering is enabled)
|
||||||
if s.config.ClusteringEnabled {
|
if s.config.ClusteringEnabled {
|
||||||
router.HandleFunc("/members/", s.getMembersHandler).Methods("GET")
|
// Apply cluster authentication middleware if cluster secret is configured
|
||||||
router.HandleFunc("/members/join", s.joinMemberHandler).Methods("POST")
|
if s.clusterAuthService != nil {
|
||||||
router.HandleFunc("/members/leave", s.leaveMemberHandler).Methods("DELETE")
|
router.Handle("/members/", s.clusterAuthService.Middleware(http.HandlerFunc(s.getMembersHandler))).Methods("GET")
|
||||||
router.HandleFunc("/members/gossip", s.gossipHandler).Methods("POST")
|
router.Handle("/members/join", s.clusterAuthService.Middleware(http.HandlerFunc(s.joinMemberHandler))).Methods("POST")
|
||||||
router.HandleFunc("/members/pairs_by_time", s.pairsByTimeHandler).Methods("POST")
|
router.Handle("/members/leave", s.clusterAuthService.Middleware(http.HandlerFunc(s.leaveMemberHandler))).Methods("DELETE")
|
||||||
|
router.Handle("/members/gossip", s.clusterAuthService.Middleware(http.HandlerFunc(s.gossipHandler))).Methods("POST")
|
||||||
|
router.Handle("/members/pairs_by_time", s.clusterAuthService.Middleware(http.HandlerFunc(s.pairsByTimeHandler))).Methods("POST")
|
||||||
|
|
||||||
// Merkle Tree endpoints (clustering feature)
|
// Merkle Tree endpoints (clustering feature)
|
||||||
router.HandleFunc("/merkle_tree/root", s.getMerkleRootHandler).Methods("GET")
|
router.Handle("/merkle_tree/root", s.clusterAuthService.Middleware(http.HandlerFunc(s.getMerkleRootHandler))).Methods("GET")
|
||||||
router.HandleFunc("/merkle_tree/diff", s.getMerkleDiffHandler).Methods("POST")
|
router.Handle("/merkle_tree/diff", s.clusterAuthService.Middleware(http.HandlerFunc(s.getMerkleDiffHandler))).Methods("POST")
|
||||||
router.HandleFunc("/kv_range", s.getKVRangeHandler).Methods("POST")
|
router.Handle("/kv_range", s.clusterAuthService.Middleware(http.HandlerFunc(s.getKVRangeHandler))).Methods("POST")
|
||||||
|
} else {
|
||||||
|
// Fallback to unprotected endpoints (for backwards compatibility)
|
||||||
|
router.HandleFunc("/members/", s.getMembersHandler).Methods("GET")
|
||||||
|
router.HandleFunc("/members/join", s.joinMemberHandler).Methods("POST")
|
||||||
|
router.HandleFunc("/members/leave", s.leaveMemberHandler).Methods("DELETE")
|
||||||
|
router.HandleFunc("/members/gossip", s.gossipHandler).Methods("POST")
|
||||||
|
router.HandleFunc("/members/pairs_by_time", s.pairsByTimeHandler).Methods("POST")
|
||||||
|
|
||||||
|
// Merkle Tree endpoints (clustering feature)
|
||||||
|
router.HandleFunc("/merkle_tree/root", s.getMerkleRootHandler).Methods("GET")
|
||||||
|
router.HandleFunc("/merkle_tree/diff", s.getMerkleDiffHandler).Methods("POST")
|
||||||
|
router.HandleFunc("/kv_range", s.getKVRangeHandler).Methods("POST")
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Authentication and user management endpoints (available when auth is enabled)
|
// Authentication and user management endpoints (available when auth is enabled)
|
||||||
@@ -93,6 +110,12 @@ func (s *Server) setupRoutes() *mux.Router {
|
|||||||
router.Handle("/api/tokens", s.authService.Middleware(
|
router.Handle("/api/tokens", s.authService.Middleware(
|
||||||
[]string{"admin:tokens:create"}, nil, "",
|
[]string{"admin:tokens:create"}, nil, "",
|
||||||
)(s.createTokenHandler)).Methods("POST")
|
)(s.createTokenHandler)).Methods("POST")
|
||||||
|
|
||||||
|
// Cluster Bootstrap endpoint (Issue #13) - Protected by JWT authentication
|
||||||
|
// Allows authenticated administrators to retrieve the cluster secret for new nodes
|
||||||
|
router.Handle("/auth/cluster-bootstrap", s.authService.Middleware(
|
||||||
|
[]string{"admin:tokens:create"}, nil, "",
|
||||||
|
)(s.clusterBootstrapHandler)).Methods("GET")
|
||||||
}
|
}
|
||||||
|
|
||||||
// Revision History endpoints (available when revision history is enabled)
|
// Revision History endpoints (available when revision history is enabled)
|
||||||
|
@@ -50,7 +50,8 @@ type Server struct {
|
|||||||
backupMu sync.RWMutex // Protects backup status
|
backupMu sync.RWMutex // Protects backup status
|
||||||
|
|
||||||
// Authentication service
|
// Authentication service
|
||||||
authService *auth.AuthService
|
authService *auth.AuthService
|
||||||
|
clusterAuthService *auth.ClusterAuthService
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewServer initializes and returns a new Server instance
|
// NewServer initializes and returns a new Server instance
|
||||||
@@ -120,6 +121,11 @@ func NewServer(config *types.Config) (*Server, error) {
|
|||||||
// Initialize authentication service
|
// Initialize authentication service
|
||||||
server.authService = auth.NewAuthService(db, logger, config)
|
server.authService = auth.NewAuthService(db, logger, config)
|
||||||
|
|
||||||
|
// Initialize cluster authentication service (Issue #13)
|
||||||
|
if config.ClusteringEnabled {
|
||||||
|
server.clusterAuthService = auth.NewClusterAuthService(config.ClusterSecret, logger)
|
||||||
|
}
|
||||||
|
|
||||||
// Setup initial root account if needed (Issue #3)
|
// Setup initial root account if needed (Issue #3)
|
||||||
if config.AuthEnabled {
|
if config.AuthEnabled {
|
||||||
if err := server.setupRootAccount(); err != nil {
|
if err := server.setupRootAccount(); err != nil {
|
||||||
@@ -269,10 +275,10 @@ func (s *Server) createRootUserAndToken() error {
|
|||||||
|
|
||||||
// Log the token securely (one-time display)
|
// Log the token securely (one-time display)
|
||||||
s.logger.WithFields(logrus.Fields{
|
s.logger.WithFields(logrus.Fields{
|
||||||
"user_uuid": rootUserUUID,
|
"user_uuid": rootUserUUID,
|
||||||
"group_uuid": adminGroupUUID,
|
"group_uuid": adminGroupUUID,
|
||||||
"expires_at": time.Unix(expiresAt, 0).Format(time.RFC3339),
|
"expires_at": time.Unix(expiresAt, 0).Format(time.RFC3339),
|
||||||
"expires_in": "24 hours",
|
"expires_in": "24 hours",
|
||||||
}).Warn("Root account created - SAVE THIS TOKEN:")
|
}).Warn("Root account created - SAVE THIS TOKEN:")
|
||||||
|
|
||||||
// Display token prominently
|
// Display token prominently
|
||||||
@@ -327,4 +333,3 @@ func (s *Server) storeUserAndGroup(user *types.User, group *types.Group) error {
|
|||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@@ -12,10 +12,10 @@ import (
|
|||||||
|
|
||||||
// StorageService handles all BadgerDB operations and data management
|
// StorageService handles all BadgerDB operations and data management
|
||||||
type StorageService struct {
|
type StorageService struct {
|
||||||
db *badger.DB
|
db *badger.DB
|
||||||
config *types.Config
|
config *types.Config
|
||||||
compressionSvc *CompressionService
|
compressionSvc *CompressionService
|
||||||
logger *logrus.Logger
|
logger *logrus.Logger
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewStorageService creates a new storage service
|
// NewStorageService creates a new storage service
|
||||||
|
@@ -13,20 +13,20 @@ type StoredValue struct {
|
|||||||
|
|
||||||
// User represents a system user
|
// User represents a system user
|
||||||
type User struct {
|
type User struct {
|
||||||
UUID string `json:"uuid"` // Server-generated UUID
|
UUID string `json:"uuid"` // Server-generated UUID
|
||||||
NicknameHash string `json:"nickname_hash"` // SHA3-512 hash of nickname
|
NicknameHash string `json:"nickname_hash"` // SHA3-512 hash of nickname
|
||||||
Groups []string `json:"groups"` // List of group UUIDs this user belongs to
|
Groups []string `json:"groups"` // List of group UUIDs this user belongs to
|
||||||
CreatedAt int64 `json:"created_at"` // Unix timestamp
|
CreatedAt int64 `json:"created_at"` // Unix timestamp
|
||||||
UpdatedAt int64 `json:"updated_at"` // Unix timestamp
|
UpdatedAt int64 `json:"updated_at"` // Unix timestamp
|
||||||
}
|
}
|
||||||
|
|
||||||
// Group represents a user group
|
// Group represents a user group
|
||||||
type Group struct {
|
type Group struct {
|
||||||
UUID string `json:"uuid"` // Server-generated UUID
|
UUID string `json:"uuid"` // Server-generated UUID
|
||||||
NameHash string `json:"name_hash"` // SHA3-512 hash of group name
|
NameHash string `json:"name_hash"` // SHA3-512 hash of group name
|
||||||
Members []string `json:"members"` // List of user UUIDs in this group
|
Members []string `json:"members"` // List of user UUIDs in this group
|
||||||
CreatedAt int64 `json:"created_at"` // Unix timestamp
|
CreatedAt int64 `json:"created_at"` // Unix timestamp
|
||||||
UpdatedAt int64 `json:"updated_at"` // Unix timestamp
|
UpdatedAt int64 `json:"updated_at"` // Unix timestamp
|
||||||
}
|
}
|
||||||
|
|
||||||
// APIToken represents a JWT authentication token
|
// APIToken represents a JWT authentication token
|
||||||
@@ -40,12 +40,12 @@ type APIToken struct {
|
|||||||
|
|
||||||
// ResourceMetadata contains ownership and permission information for stored resources
|
// ResourceMetadata contains ownership and permission information for stored resources
|
||||||
type ResourceMetadata struct {
|
type ResourceMetadata struct {
|
||||||
OwnerUUID string `json:"owner_uuid"` // UUID of the resource owner
|
OwnerUUID string `json:"owner_uuid"` // UUID of the resource owner
|
||||||
GroupUUID string `json:"group_uuid"` // UUID of the resource group
|
GroupUUID string `json:"group_uuid"` // UUID of the resource group
|
||||||
Permissions int `json:"permissions"` // 12-bit permission mask (POSIX-inspired)
|
Permissions int `json:"permissions"` // 12-bit permission mask (POSIX-inspired)
|
||||||
TTL string `json:"ttl"` // Time-to-live duration (Go format)
|
TTL string `json:"ttl"` // Time-to-live duration (Go format)
|
||||||
CreatedAt int64 `json:"created_at"` // Unix timestamp when resource was created
|
CreatedAt int64 `json:"created_at"` // Unix timestamp when resource was created
|
||||||
UpdatedAt int64 `json:"updated_at"` // Unix timestamp when resource was last updated
|
UpdatedAt int64 `json:"updated_at"` // Unix timestamp when resource was last updated
|
||||||
}
|
}
|
||||||
|
|
||||||
// Permission constants for POSIX-inspired ACL
|
// Permission constants for POSIX-inspired ACL
|
||||||
@@ -231,28 +231,28 @@ type KVRangeResponse struct {
|
|||||||
|
|
||||||
// Configuration
|
// Configuration
|
||||||
type Config struct {
|
type Config struct {
|
||||||
NodeID string `yaml:"node_id"`
|
NodeID string `yaml:"node_id"`
|
||||||
BindAddress string `yaml:"bind_address"`
|
BindAddress string `yaml:"bind_address"`
|
||||||
Port int `yaml:"port"`
|
Port int `yaml:"port"`
|
||||||
DataDir string `yaml:"data_dir"`
|
DataDir string `yaml:"data_dir"`
|
||||||
SeedNodes []string `yaml:"seed_nodes"`
|
SeedNodes []string `yaml:"seed_nodes"`
|
||||||
ReadOnly bool `yaml:"read_only"`
|
ReadOnly bool `yaml:"read_only"`
|
||||||
LogLevel string `yaml:"log_level"`
|
LogLevel string `yaml:"log_level"`
|
||||||
GossipIntervalMin int `yaml:"gossip_interval_min"`
|
GossipIntervalMin int `yaml:"gossip_interval_min"`
|
||||||
GossipIntervalMax int `yaml:"gossip_interval_max"`
|
GossipIntervalMax int `yaml:"gossip_interval_max"`
|
||||||
SyncInterval int `yaml:"sync_interval"`
|
SyncInterval int `yaml:"sync_interval"`
|
||||||
CatchupInterval int `yaml:"catchup_interval"`
|
CatchupInterval int `yaml:"catchup_interval"`
|
||||||
BootstrapMaxAgeHours int `yaml:"bootstrap_max_age_hours"`
|
BootstrapMaxAgeHours int `yaml:"bootstrap_max_age_hours"`
|
||||||
ThrottleDelayMs int `yaml:"throttle_delay_ms"`
|
ThrottleDelayMs int `yaml:"throttle_delay_ms"`
|
||||||
FetchDelayMs int `yaml:"fetch_delay_ms"`
|
FetchDelayMs int `yaml:"fetch_delay_ms"`
|
||||||
|
|
||||||
// Database compression configuration
|
// Database compression configuration
|
||||||
CompressionEnabled bool `yaml:"compression_enabled"`
|
CompressionEnabled bool `yaml:"compression_enabled"`
|
||||||
CompressionLevel int `yaml:"compression_level"`
|
CompressionLevel int `yaml:"compression_level"`
|
||||||
|
|
||||||
// TTL configuration
|
// TTL configuration
|
||||||
DefaultTTL string `yaml:"default_ttl"` // Go duration format, "0" means no default TTL
|
DefaultTTL string `yaml:"default_ttl"` // Go duration format, "0" means no default TTL
|
||||||
MaxJSONSize int `yaml:"max_json_size"` // Maximum JSON size in bytes
|
MaxJSONSize int `yaml:"max_json_size"` // Maximum JSON size in bytes
|
||||||
|
|
||||||
// Rate limiting configuration
|
// Rate limiting configuration
|
||||||
RateLimitRequests int `yaml:"rate_limit_requests"` // Max requests per window
|
RateLimitRequests int `yaml:"rate_limit_requests"` // Max requests per window
|
||||||
@@ -262,10 +262,10 @@ type Config struct {
|
|||||||
TamperLogActions []string `yaml:"tamper_log_actions"` // Actions to log
|
TamperLogActions []string `yaml:"tamper_log_actions"` // Actions to log
|
||||||
|
|
||||||
// Backup system configuration
|
// Backup system configuration
|
||||||
BackupEnabled bool `yaml:"backup_enabled"` // Enable/disable automated backups
|
BackupEnabled bool `yaml:"backup_enabled"` // Enable/disable automated backups
|
||||||
BackupSchedule string `yaml:"backup_schedule"` // Cron schedule format
|
BackupSchedule string `yaml:"backup_schedule"` // Cron schedule format
|
||||||
BackupPath string `yaml:"backup_path"` // Directory to store backups
|
BackupPath string `yaml:"backup_path"` // Directory to store backups
|
||||||
BackupRetention int `yaml:"backup_retention"` // Days to keep backups
|
BackupRetention int `yaml:"backup_retention"` // Days to keep backups
|
||||||
|
|
||||||
// Feature toggles for optional functionalities
|
// Feature toggles for optional functionalities
|
||||||
AuthEnabled bool `yaml:"auth_enabled"` // Enable/disable authentication system
|
AuthEnabled bool `yaml:"auth_enabled"` // Enable/disable authentication system
|
||||||
@@ -275,6 +275,13 @@ type Config struct {
|
|||||||
RevisionHistoryEnabled bool `yaml:"revision_history_enabled"` // Enable/disable revision history
|
RevisionHistoryEnabled bool `yaml:"revision_history_enabled"` // Enable/disable revision history
|
||||||
|
|
||||||
// Anonymous access control (Issue #5)
|
// Anonymous access control (Issue #5)
|
||||||
AllowAnonymousRead bool `yaml:"allow_anonymous_read"` // Allow unauthenticated read access to KV endpoints
|
AllowAnonymousRead bool `yaml:"allow_anonymous_read"` // Allow unauthenticated read access to KV endpoints
|
||||||
AllowAnonymousWrite bool `yaml:"allow_anonymous_write"` // Allow unauthenticated write access to KV endpoints
|
AllowAnonymousWrite bool `yaml:"allow_anonymous_write"` // Allow unauthenticated write access to KV endpoints
|
||||||
|
|
||||||
|
// Cluster authentication (Issue #13)
|
||||||
|
ClusterSecret string `yaml:"cluster_secret"` // Shared secret for cluster authentication (auto-generated if empty)
|
||||||
|
ClusterTLSEnabled bool `yaml:"cluster_tls_enabled"` // Require TLS for inter-node communication
|
||||||
|
ClusterTLSCertFile string `yaml:"cluster_tls_cert_file"` // Path to TLS certificate file
|
||||||
|
ClusterTLSKeyFile string `yaml:"cluster_tls_key_file"` // Path to TLS private key file
|
||||||
|
ClusterTLSSkipVerify bool `yaml:"cluster_tls_skip_verify"` // Skip TLS verification (insecure, for testing only)
|
||||||
}
|
}
|
Reference in New Issue
Block a user