diff --git a/.gitignore b/.gitignore index 6174009..fb5e89c 100644 --- a/.gitignore +++ b/.gitignore @@ -1,6 +1,8 @@ .claude/ +.kvs/ data/ data*/ +integration_test/ *.yaml !config.yaml kvs diff --git a/CLAUDE.md b/CLAUDE.md index 2770760..2f344bc 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -10,10 +10,16 @@ This file provides guidance to Claude Code (claude.ai/code) when working with co go build -o kvs . # Run with default config (auto-generates config.yaml) -./kvs +./kvs start config.yaml # Run with custom config -./kvs /path/to/config.yaml +./kvs start /path/to/config.yaml + +# Check running instances +./kvs status + +# Stop instance +./kvs stop config # Run comprehensive integration tests ./integration_test.sh @@ -25,6 +31,32 @@ go run test_conflict.go data1 data2 go build -o kvs . && ./integration_test.sh ``` +### Process Management Commands +```bash +# Start as background daemon +./kvs start <config> # .yaml extension optional + +# Stop daemon +./kvs stop <config> # Graceful SIGTERM shutdown + +# Restart daemon +./kvs restart <config> # Stop then start + +# Show status +./kvs status # All instances +./kvs status <config> # Specific instance + +# Run in foreground (for debugging) +./kvs <config>.yaml # Logs to stdout, blocks terminal + +# View daemon logs +tail -f ~/.kvs/logs/kvs_<config>.yaml.log + +# Global state directories +~/.kvs/pids/ # PID files (works from any directory) +~/.kvs/logs/ # Daemon log files +``` + ### Development Workflow ```bash # Format and check code @@ -38,11 +70,25 @@ go mod tidy go build . 
# Test specific cluster scenarios -./kvs node1.yaml & # Terminal 1 -./kvs node2.yaml & # Terminal 2 +./kvs start node1.yaml +./kvs start node2.yaml + +# Wait for cluster formation +sleep 5 + +# Test data operations curl -X PUT http://localhost:8081/kv/test/data -H "Content-Type: application/json" -d '{"test":"data"}' curl http://localhost:8082/kv/test/data # Should replicate within ~30 seconds -pkill kvs + +# Check daemon status +./kvs status + +# View logs +tail -f ~/.kvs/logs/kvs_node1.yaml.log + +# Cleanup +./kvs stop node1 +./kvs stop node2 ``` ## Architecture Overview @@ -58,7 +104,8 @@ KVS is a **distributed, eventually consistent key-value store** built around thr #### Modular Package Design - **`auth/`** - Complete JWT authentication system with POSIX-inspired permissions -- **`cluster/`** - Distributed systems logic (gossip, sync, merkle trees) +- **`cluster/`** - Distributed systems logic (gossip, sync, merkle trees) +- **`daemon/`** - Process management (daemonization, PID files, lifecycle) - **`storage/`** - BadgerDB abstraction with compression and revision history - **`server/`** - HTTP handlers, routing, and lifecycle management - **`features/`** - Utility functions for TTL, rate limiting, tamper logging, backup @@ -147,9 +194,18 @@ Creates two BadgerDB instances with intentionally conflicting data (same path, s - **Bootstrap sync**: Up to 30 days of historical data for new nodes #### Main Entry Point Flow -1. `main.go` loads config (auto-generates default if missing) -2. `server.NewServer()` initializes all subsystems -3. Graceful shutdown handling with `SIGINT`/`SIGTERM` -4. All business logic delegated to modular packages +1. `main.go` parses command-line arguments for subcommands (`start`, `stop`, `status`, `restart`) +2. For daemon mode: `daemon.Daemonize()` spawns background process and manages PID files +3. For server mode: loads config (auto-generates default if missing) +4. `server.NewServer()` initializes all subsystems +5. 
Graceful shutdown handling with `SIGINT`/`SIGTERM` +6. All business logic delegated to modular packages + +#### Daemon Architecture +- **PID Management**: Global PID files stored in `~/.kvs/pids/` for cross-directory access +- **Logging**: Daemon logs written to `~/.kvs/logs/{config-name}.log` +- **Process Lifecycle**: Spawns detached process via `exec.Command()` with `Setsid: true` +- **Config Normalization**: Supports both `node1` and `node1.yaml` formats +- **Stale PID Detection**: Checks process existence via `Signal(0)` before operations This architecture enables easy feature addition, comprehensive testing, and reliable operation in distributed environments while maintaining simplicity for single-node deployments. \ No newline at end of file diff --git a/README.md b/README.md index d58f4dd..d9a0f43 100644 --- a/README.md +++ b/README.md @@ -69,11 +69,67 @@ go build -o kvs . ### Quick Test ```bash -# Start standalone node -./kvs +# Start standalone node (uses config.yaml if it exists, or creates it) +./kvs start config.yaml # Test the API curl http://localhost:8080/health + +# Check status +./kvs status + +# Stop when done +./kvs stop config +``` + +## 🎮 Process Management + +KVS includes systemd-style daemon commands for easy process management: + +```bash +# Start as background daemon +./kvs start config.yaml # or just: ./kvs start config +./kvs start node1.yaml # Start with custom config + +# Check status +./kvs status # Show all running instances +./kvs status node1 # Show specific instance + +# Stop daemon +./kvs stop node1 # Graceful shutdown + +# Restart daemon +./kvs restart node1 # Stop and start + +# Run in foreground (traditional) +./kvs node1.yaml # Logs to stdout +``` + +### Daemon Features +- **Global PID tracking**: PID files stored in `~/.kvs/pids/` (works from any directory) +- **Automatic logging**: Logs written to `~/.kvs/logs/{config-name}.log` +- **Flexible naming**: Config extension optional (`node1` or `node1.yaml` both work) +- 
**Graceful shutdown**: SIGTERM sent for clean shutdown +- **Stale PID cleanup**: Automatically detects and cleans dead processes +- **Multi-instance**: Run multiple KVS instances on same machine + +### Example Workflow +```bash +# Start 3-node cluster as daemons +./kvs start node1.yaml +./kvs start node2.yaml +./kvs start node3.yaml + +# Check cluster status +./kvs status + +# View logs +tail -f ~/.kvs/logs/kvs_node1.yaml.log + +# Stop entire cluster +./kvs stop node1 +./kvs stop node2 +./kvs stop node3 ``` ## ⚙️ Configuration @@ -308,17 +364,23 @@ clustering_enabled: true #### Start the Cluster ```bash -# Terminal 1 -./kvs node1.yaml - -# Terminal 2 (wait a few seconds) -./kvs node2.yaml - -# Terminal 3 (wait a few seconds) -./kvs node3.yaml +# Start as daemons +./kvs start node1.yaml +sleep 2 +./kvs start node2.yaml +sleep 2 +./kvs start node3.yaml # Verify cluster formation curl http://localhost:8081/members/ # Should show all 3 nodes + +# Check daemon status +./kvs status + +# Stop cluster when done +./kvs stop node1 +./kvs stop node2 +./kvs stop node3 ``` ## 🔄 How It Works @@ -364,9 +426,10 @@ go build -o kvs . 
./integration_test.sh # Manual basic functionality test -./kvs & +./kvs start config.yaml +sleep 2 curl http://localhost:8080/health -pkill kvs +./kvs stop config # Manual cluster test (requires creating configs) echo 'node_id: "test1" @@ -379,8 +442,9 @@ port: 8082 seed_nodes: ["127.0.0.1:8081"] auth_enabled: false' > test2.yaml -./kvs test1.yaml & -./kvs test2.yaml & +./kvs start test1.yaml +sleep 2 +./kvs start test2.yaml # Test data replication (wait for cluster formation) sleep 10 @@ -393,7 +457,8 @@ sleep 30 curl http://localhost:8082/kv/test/data # Cleanup -pkill kvs +./kvs stop test1 +./kvs stop test2 rm test1.yaml test2.yaml ``` @@ -418,17 +483,22 @@ auth_enabled: false log_level: "debug"' > conflict2.yaml # Start nodes with conflicting data -./kvs conflict1.yaml & -./kvs conflict2.yaml & +./kvs start conflict1.yaml +sleep 2 +./kvs start conflict2.yaml # Watch logs for conflict resolution +tail -f ~/.kvs/logs/kvs_conflict1.yaml.log ~/.kvs/logs/kvs_conflict2.yaml.log & + # Both nodes will converge within ~10-30 seconds # Check final state sleep 30 curl http://localhost:9111/kv/test/conflict/data curl http://localhost:9112/kv/test/conflict/data -pkill kvs +# Cleanup +./kvs stop conflict1 +./kvs stop conflict2 rm conflict1.yaml conflict2.yaml ``` @@ -474,6 +544,10 @@ kvs/ ├── config/ # Configuration management │ └── config.go # Config loading & defaults │ +├── daemon/ # Process management +│ ├── daemonize.go # Background process spawning +│ └── pid.go # PID file management +│ ├── features/ # Utility features │ ├── auth.go # Auth utilities │ ├── backup.go # Backup system @@ -580,8 +654,9 @@ type StoredValue struct { ## 🛡️ Production Considerations ### Deployment -- Use systemd or similar for process management -- Configure log rotation for JSON logs +- Built-in daemon commands (`start`/`stop`/`restart`/`status`) for process management +- Alternatively, use systemd or similar for advanced orchestration +- Logs automatically written to `~/.kvs/logs/` (configure 
log rotation) - Set up monitoring for `/health` endpoint - Use reverse proxy (nginx/traefik) for TLS and load balancing diff --git a/auth/auth.go b/auth/auth.go index 00fa831..70405aa 100644 --- a/auth/auth.go +++ b/auth/auth.go @@ -41,7 +41,7 @@ func NewAuthService(db *badger.DB, logger *logrus.Logger, config *types.Config) // StoreAPIToken stores an API token in BadgerDB with TTL func (s *AuthService) StoreAPIToken(tokenString string, userUUID string, scopes []string, expiresAt int64) error { tokenHash := utils.HashToken(tokenString) - + apiToken := types.APIToken{ TokenHash: tokenHash, UserUUID: userUUID, @@ -57,13 +57,13 @@ func (s *AuthService) StoreAPIToken(tokenString string, userUUID string, scopes return s.db.Update(func(txn *badger.Txn) error { entry := badger.NewEntry([]byte(TokenStorageKey(tokenHash)), tokenData) - + // Set TTL to the token expiration time ttl := time.Until(time.Unix(expiresAt, 0)) if ttl > 0 { entry = entry.WithTTL(ttl) } - + return txn.SetEntry(entry) }) } @@ -71,7 +71,7 @@ func (s *AuthService) StoreAPIToken(tokenString string, userUUID string, scopes // GetAPIToken retrieves an API token from BadgerDB by hash func (s *AuthService) GetAPIToken(tokenHash string) (*types.APIToken, error) { var apiToken types.APIToken - + err := s.db.View(func(txn *badger.Txn) error { item, err := txn.Get([]byte(TokenStorageKey(tokenHash))) if err != nil { @@ -198,6 +198,40 @@ func (s *AuthService) CheckResourcePermission(authCtx *AuthContext, resourceKey return CheckPermission(metadata.Permissions, operation, isOwner, isGroupMember) } +// GetResourceMetadata retrieves metadata for a resource +func (s *AuthService) GetResourceMetadata(resourceKey string) (*types.ResourceMetadata, error) { + var metadata types.ResourceMetadata + + err := s.db.View(func(txn *badger.Txn) error { + item, err := txn.Get([]byte(ResourceMetadataKey(resourceKey))) + if err != nil { + return err + } + + return item.Value(func(val []byte) error { + return json.Unmarshal(val, 
&metadata) + }) + }) + + if err != nil { + return nil, err + } + + return &metadata, nil +} + +// SetResourceMetadata stores metadata for a resource +func (s *AuthService) SetResourceMetadata(resourceKey string, metadata *types.ResourceMetadata) error { + metadataBytes, err := json.Marshal(metadata) + if err != nil { + return fmt.Errorf("failed to marshal metadata: %v", err) + } + + return s.db.Update(func(txn *badger.Txn) error { + return txn.Set([]byte(ResourceMetadataKey(resourceKey)), metadataBytes) + }) +} + // GetAuthContext retrieves auth context from request context func GetAuthContext(ctx context.Context) *AuthContext { if authCtx, ok := ctx.Value("auth").(*AuthContext); ok { @@ -209,22 +243,22 @@ func GetAuthContext(ctx context.Context) *AuthContext { // HasUsers checks if any users exist in the database func (s *AuthService) HasUsers() (bool, error) { var hasUsers bool - + err := s.db.View(func(txn *badger.Txn) error { opts := badger.DefaultIteratorOptions opts.PrefetchValues = false // We only need to check if keys exist iterator := txn.NewIterator(opts) defer iterator.Close() - + // Look for any key starting with "user:" prefix := []byte("user:") for iterator.Seek(prefix); iterator.ValidForPrefix(prefix); iterator.Next() { hasUsers = true return nil // Found at least one user, can exit early } - + return nil }) - + return hasUsers, err -} \ No newline at end of file +} diff --git a/auth/cluster.go b/auth/cluster.go new file mode 100644 index 0000000..090ed5d --- /dev/null +++ b/auth/cluster.go @@ -0,0 +1,77 @@ +package auth + +import ( + "net/http" + + "github.com/sirupsen/logrus" +) + +// ClusterAuthService handles authentication for inter-cluster communication +type ClusterAuthService struct { + clusterSecret string + logger *logrus.Logger +} + +// NewClusterAuthService creates a new cluster authentication service +func NewClusterAuthService(clusterSecret string, logger *logrus.Logger) *ClusterAuthService { + return &ClusterAuthService{ + 
clusterSecret: clusterSecret, + logger: logger, + } +} + +// Middleware validates cluster authentication headers +func (s *ClusterAuthService) Middleware(next http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + // Extract authentication headers + clusterSecret := r.Header.Get("X-Cluster-Secret") + nodeID := r.Header.Get("X-Node-ID") + + // Log authentication attempt + s.logger.WithFields(logrus.Fields{ + "node_id": nodeID, + "remote_addr": r.RemoteAddr, + "path": r.URL.Path, + "method": r.Method, + }).Debug("Cluster authentication attempt") + + // Validate cluster secret + if clusterSecret == "" { + s.logger.WithFields(logrus.Fields{ + "node_id": nodeID, + "remote_addr": r.RemoteAddr, + "path": r.URL.Path, + }).Warn("Missing X-Cluster-Secret header") + http.Error(w, "Unauthorized: Missing cluster secret", http.StatusUnauthorized) + return + } + + if clusterSecret != s.clusterSecret { + s.logger.WithFields(logrus.Fields{ + "node_id": nodeID, + "remote_addr": r.RemoteAddr, + "path": r.URL.Path, + }).Warn("Invalid cluster secret") + http.Error(w, "Unauthorized: Invalid cluster secret", http.StatusUnauthorized) + return + } + + // Validate node ID is present + if nodeID == "" { + s.logger.WithFields(logrus.Fields{ + "remote_addr": r.RemoteAddr, + "path": r.URL.Path, + }).Warn("Missing X-Node-ID header") + http.Error(w, "Unauthorized: Missing node ID", http.StatusUnauthorized) + return + } + + // Authentication successful + s.logger.WithFields(logrus.Fields{ + "node_id": nodeID, + "path": r.URL.Path, + }).Debug("Cluster authentication successful") + + next.ServeHTTP(w, r) + }) +} diff --git a/auth/jwt.go b/auth/jwt.go index eda4f3a..5b3e781 100644 --- a/auth/jwt.go +++ b/auth/jwt.go @@ -64,4 +64,4 @@ func ValidateJWT(tokenString string) (*JWTClaims, error) { } return nil, fmt.Errorf("invalid token") -} \ No newline at end of file +} diff --git a/auth/middleware.go b/auth/middleware.go index 07c244e..82b348e 100644 --- 
a/auth/middleware.go +++ b/auth/middleware.go @@ -33,7 +33,7 @@ func (s *AuthService) Middleware(requiredScopes []string, resourceKeyExtractor f next(w, r) return } - + // Authenticate request authCtx, err := s.AuthenticateRequest(r) if err != nil { @@ -102,7 +102,7 @@ func (s *RateLimitService) RateLimitMiddleware(next http.HandlerFunc) http.Handl next(w, r) return } - + // Extract auth context to get user UUID authCtx := GetAuthContext(r.Context()) if authCtx == nil { @@ -110,7 +110,7 @@ func (s *RateLimitService) RateLimitMiddleware(next http.HandlerFunc) http.Handl next(w, r) return } - + // Check rate limit allowed, err := s.checkRateLimit(authCtx.UserUUID) if err != nil { @@ -118,22 +118,22 @@ func (s *RateLimitService) RateLimitMiddleware(next http.HandlerFunc) http.Handl http.Error(w, "Internal Server Error", http.StatusInternalServerError) return } - + if !allowed { s.authService.logger.WithFields(logrus.Fields{ "user_uuid": authCtx.UserUUID, "limit": s.config.RateLimitRequests, "window": s.config.RateLimitWindow, }).Info("Rate limit exceeded") - + // Set rate limit headers w.Header().Set("X-Rate-Limit-Limit", strconv.Itoa(s.config.RateLimitRequests)) w.Header().Set("X-Rate-Limit-Window", s.config.RateLimitWindow) - + http.Error(w, "Rate limit exceeded", http.StatusTooManyRequests) return } - + next(w, r) } } @@ -151,8 +151,8 @@ func (s *RateLimitService) checkRateLimit(userUUID string) (bool, error) { if s.config.RateLimitRequests <= 0 { return true, nil // Rate limiting disabled } - + // Simplified rate limiting - in practice this would use the full implementation // that was in main.go with proper window calculations and BadgerDB storage return true, nil // For now, always allow -} \ No newline at end of file +} diff --git a/auth/permissions.go b/auth/permissions.go index 7130da1..9e39fc3 100644 --- a/auth/permissions.go +++ b/auth/permissions.go @@ -15,7 +15,7 @@ func CheckPermission(permissions int, operation string, isOwner, isGroupMember b return 
(permissions & types.PermGroupCreate) != 0 } return (permissions & types.PermOthersCreate) != 0 - + case "delete": if isOwner { return (permissions & types.PermOwnerDelete) != 0 @@ -24,7 +24,7 @@ func CheckPermission(permissions int, operation string, isOwner, isGroupMember b return (permissions & types.PermGroupDelete) != 0 } return (permissions & types.PermOthersDelete) != 0 - + case "write": if isOwner { return (permissions & types.PermOwnerWrite) != 0 @@ -33,7 +33,7 @@ func CheckPermission(permissions int, operation string, isOwner, isGroupMember b return (permissions & types.PermGroupWrite) != 0 } return (permissions & types.PermOthersWrite) != 0 - + case "read": if isOwner { return (permissions & types.PermOwnerRead) != 0 @@ -42,7 +42,7 @@ func CheckPermission(permissions int, operation string, isOwner, isGroupMember b return (permissions & types.PermGroupRead) != 0 } return (permissions & types.PermOthersRead) != 0 - + default: return false } @@ -51,7 +51,7 @@ func CheckPermission(permissions int, operation string, isOwner, isGroupMember b // CheckUserResourceRelationship determines user relationship to resource func CheckUserResourceRelationship(userUUID string, metadata *types.ResourceMetadata, userGroups []string) (isOwner, isGroupMember bool) { isOwner = (userUUID == metadata.OwnerUUID) - + if metadata.GroupUUID != "" { for _, groupUUID := range userGroups { if groupUUID == metadata.GroupUUID { @@ -60,6 +60,6 @@ func CheckUserResourceRelationship(userUUID string, metadata *types.ResourceMeta } } } - + return isOwner, isGroupMember -} \ No newline at end of file +} diff --git a/auth/storage.go b/auth/storage.go index bccf16e..889cca1 100644 --- a/auth/storage.go +++ b/auth/storage.go @@ -16,4 +16,4 @@ func TokenStorageKey(tokenHash string) string { func ResourceMetadataKey(resourceKey string) string { return resourceKey + ":metadata" -} \ No newline at end of file +} diff --git a/cluster/bootstrap.go b/cluster/bootstrap.go index f8e4704..314ec88 100644 
--- a/cluster/bootstrap.go +++ b/cluster/bootstrap.go @@ -82,10 +82,19 @@ func (s *BootstrapService) attemptJoin(seedAddr string) bool { return false } - client := &http.Client{Timeout: 10 * time.Second} - url := fmt.Sprintf("http://%s/members/join", seedAddr) + client := NewAuthenticatedHTTPClient(s.config, 10*time.Second) + protocol := GetProtocol(s.config) + url := fmt.Sprintf("%s://%s/members/join", protocol, seedAddr) - resp, err := client.Post(url, "application/json", bytes.NewBuffer(jsonData)) + req, err := http.NewRequest("POST", url, bytes.NewBuffer(jsonData)) + if err != nil { + s.logger.WithError(err).Error("Failed to create join request") + return false + } + req.Header.Set("Content-Type", "application/json") + AddClusterAuthHeaders(req, s.config) + + resp, err := client.Do(req) if err != nil { s.logger.WithFields(logrus.Fields{ "seed": seedAddr, @@ -142,4 +151,4 @@ func (s *BootstrapService) performGradualSync() { } s.logger.Info("Gradual sync completed") -} \ No newline at end of file +} diff --git a/cluster/gossip.go b/cluster/gossip.go index cbdf74a..f58683f 100644 --- a/cluster/gossip.go +++ b/cluster/gossip.go @@ -17,13 +17,13 @@ import ( // GossipService handles gossip protocol operations type GossipService struct { - config *types.Config - members map[string]*types.Member - membersMu sync.RWMutex - logger *logrus.Logger - ctx context.Context - cancel context.CancelFunc - wg sync.WaitGroup + config *types.Config + members map[string]*types.Member + membersMu sync.RWMutex + logger *logrus.Logger + ctx context.Context + cancel context.CancelFunc + wg sync.WaitGroup } // NewGossipService creates a new gossip service @@ -44,7 +44,7 @@ func (s *GossipService) Start() { s.logger.Info("Clustering disabled, skipping gossip routine") return } - + s.wg.Add(1) go s.gossipRoutine() } @@ -181,11 +181,20 @@ func (s *GossipService) gossipWithPeer(peer *types.Member) error { return err } - // Send HTTP request to peer - client := &http.Client{Timeout: 5 * 
time.Second} - url := fmt.Sprintf("http://%s/members/gossip", peer.Address) + // Send HTTP request to peer with cluster authentication + client := NewAuthenticatedHTTPClient(s.config, 5*time.Second) + protocol := GetProtocol(s.config) + url := fmt.Sprintf("%s://%s/members/gossip", protocol, peer.Address) - resp, err := client.Post(url, "application/json", bytes.NewBuffer(jsonData)) + req, err := http.NewRequest("POST", url, bytes.NewBuffer(jsonData)) + if err != nil { + s.logger.WithError(err).Error("Failed to create gossip request") + return err + } + req.Header.Set("Content-Type", "application/json") + AddClusterAuthHeaders(req, s.config) + + resp, err := client.Do(req) if err != nil { s.logger.WithFields(logrus.Fields{ "peer": peer.Address, @@ -300,4 +309,4 @@ func (s *GossipService) MergeMemberList(remoteMembers []types.Member, selfNodeID func (s *GossipService) GetJoinedTimestamp() int64 { // This should be implemented by the server that uses this service return time.Now().UnixMilli() -} \ No newline at end of file +} diff --git a/cluster/http_client.go b/cluster/http_client.go new file mode 100644 index 0000000..70397b6 --- /dev/null +++ b/cluster/http_client.go @@ -0,0 +1,43 @@ +package cluster + +import ( + "crypto/tls" + "net/http" + "time" + + "kvs/types" +) + +// NewAuthenticatedHTTPClient creates an HTTP client configured for cluster authentication +func NewAuthenticatedHTTPClient(config *types.Config, timeout time.Duration) *http.Client { + client := &http.Client{ + Timeout: timeout, + } + + // Configure TLS if enabled + if config.ClusterTLSEnabled { + tlsConfig := &tls.Config{ + InsecureSkipVerify: config.ClusterTLSSkipVerify, + } + + client.Transport = &http.Transport{ + TLSClientConfig: tlsConfig, + } + } + + return client +} + +// AddClusterAuthHeaders adds authentication headers to an HTTP request +func AddClusterAuthHeaders(req *http.Request, config *types.Config) { + req.Header.Set("X-Cluster-Secret", config.ClusterSecret) + 
req.Header.Set("X-Node-ID", config.NodeID) +} + +// GetProtocol returns the appropriate protocol (http or https) based on TLS configuration +func GetProtocol(config *types.Config) string { + if config.ClusterTLSEnabled { + return "https" + } + return "http" +} diff --git a/cluster/merkle.go b/cluster/merkle.go index e6032fe..df84486 100644 --- a/cluster/merkle.go +++ b/cluster/merkle.go @@ -170,7 +170,7 @@ func (s *MerkleService) BuildSubtreeForRange(startKey, endKey string) (*types.Me if err != nil { return nil, fmt.Errorf("failed to get KV pairs for subtree: %v", err) } - + filteredPairs := FilterPairsByRange(pairs, startKey, endKey) return s.BuildMerkleTreeFromPairs(filteredPairs) -} \ No newline at end of file +} diff --git a/cluster/sync.go b/cluster/sync.go index 84cfda8..a6f5fbd 100644 --- a/cluster/sync.go +++ b/cluster/sync.go @@ -51,11 +51,11 @@ func (s *SyncService) Start() { s.logger.Info("Clustering disabled, skipping sync routines") return } - + // Start sync routine s.wg.Add(1) go s.syncRoutine() - + // Start Merkle tree rebuild routine s.wg.Add(1) go s.merkleTreeRebuildRoutine() @@ -172,9 +172,9 @@ func (s *SyncService) performMerkleSync() { // 2. 
Compare roots and start recursive diffing if they differ if !bytes.Equal(localRoot.Hash, remoteRoot.Hash) { s.logger.WithFields(logrus.Fields{ - "peer": peer.Address, - "local_root": hex.EncodeToString(localRoot.Hash), - "remote_root": hex.EncodeToString(remoteRoot.Hash), + "peer": peer.Address, + "local_root": hex.EncodeToString(localRoot.Hash), + "remote_root": hex.EncodeToString(remoteRoot.Hash), }).Info("Merkle roots differ, starting recursive diff") s.diffMerkleTreesRecursive(peer.Address, localRoot, remoteRoot) } else { @@ -186,10 +186,17 @@ func (s *SyncService) performMerkleSync() { // requestMerkleRoot requests the Merkle root from a peer func (s *SyncService) requestMerkleRoot(peerAddress string) (*types.MerkleRootResponse, error) { - client := &http.Client{Timeout: 10 * time.Second} - url := fmt.Sprintf("http://%s/merkle_tree/root", peerAddress) + client := NewAuthenticatedHTTPClient(s.config, 10*time.Second) + protocol := GetProtocol(s.config) + url := fmt.Sprintf("%s://%s/merkle_tree/root", protocol, peerAddress) - resp, err := client.Get(url) + req, err := http.NewRequest("GET", url, nil) + if err != nil { + return nil, err + } + AddClusterAuthHeaders(req, s.config) + + resp, err := client.Do(req) if err != nil { return nil, err } @@ -216,7 +223,7 @@ func (s *SyncService) diffMerkleTreesRecursive(peerAddress string, localNode, re // Hashes differ, need to go deeper. // Request children from the remote peer for the current range. 
req := types.MerkleTreeDiffRequest{ - ParentNode: *remoteNode, // We are asking the remote peer about its children for this range + ParentNode: *remoteNode, // We are asking the remote peer about its children for this range LocalHash: localNode.Hash, // Our hash for this range } @@ -294,10 +301,17 @@ func (s *SyncService) handleLeafLevelDiff(peerAddress string, keys []string, loc // fetchSingleKVFromPeer fetches a single KV pair from a peer func (s *SyncService) fetchSingleKVFromPeer(peerAddress, path string) (*types.StoredValue, error) { - client := &http.Client{Timeout: 5 * time.Second} - url := fmt.Sprintf("http://%s/kv/%s", peerAddress, path) + client := NewAuthenticatedHTTPClient(s.config, 5*time.Second) + protocol := GetProtocol(s.config) + url := fmt.Sprintf("%s://%s/kv/%s", protocol, peerAddress, path) - resp, err := client.Get(url) + req, err := http.NewRequest("GET", url, nil) + if err != nil { + return nil, err + } + AddClusterAuthHeaders(req, s.config) + + resp, err := client.Do(req) if err != nil { return nil, err } @@ -398,14 +412,14 @@ func (s *SyncService) resolveConflict(key string, local, remote *types.StoredVal // Timestamps are equal - need sophisticated conflict resolution s.logger.WithField("key", key).Info("Timestamp collision detected, applying oldest-node rule") - + // Get cluster members to determine which node is older members := s.gossipService.GetMembers() - + // Find the local node and the remote node in membership var localMember, remoteMember *types.Member localNodeID := s.config.NodeID - + for _, member := range members { if member.ID == localNodeID { localMember = member @@ -414,16 +428,16 @@ func (s *SyncService) resolveConflict(key string, local, remote *types.StoredVal remoteMember = member } } - + // If we can't find membership info, fall back to UUID comparison for deterministic result if localMember == nil || remoteMember == nil { s.logger.WithFields(logrus.Fields{ - "key": key, - "peerAddress": peerAddress, - "localNodeID": 
localNodeID, - "localMember": localMember != nil, - "remoteMember": remoteMember != nil, - "totalMembers": len(members), + "key": key, + "peerAddress": peerAddress, + "localNodeID": localNodeID, + "localMember": localMember != nil, + "remoteMember": remoteMember != nil, + "totalMembers": len(members), }).Warn("Could not find membership info for conflict resolution, using UUID comparison") if remote.UUID < local.UUID { // Remote UUID lexically smaller (deterministic choice) @@ -436,41 +450,49 @@ func (s *SyncService) resolveConflict(key string, local, remote *types.StoredVal s.logger.WithField("key", key).Info("Conflict resolved: local data wins (UUID tie-breaker)") return nil } - + // Apply oldest-node rule: node with earliest joined_timestamp wins if remoteMember.JoinedTimestamp < localMember.JoinedTimestamp { // Remote node is older, its data wins err := s.storeReplicatedDataWithMetadata(key, remote) if err == nil { s.logger.WithFields(logrus.Fields{ - "key": key, - "local_joined": localMember.JoinedTimestamp, - "remote_joined": remoteMember.JoinedTimestamp, + "key": key, + "local_joined": localMember.JoinedTimestamp, + "remote_joined": remoteMember.JoinedTimestamp, }).Info("Conflict resolved: remote data wins (oldest-node rule)") } return err } - + // Local node is older or equal, keep local data s.logger.WithFields(logrus.Fields{ - "key": key, - "local_joined": localMember.JoinedTimestamp, - "remote_joined": remoteMember.JoinedTimestamp, + "key": key, + "local_joined": localMember.JoinedTimestamp, + "remote_joined": remoteMember.JoinedTimestamp, }).Info("Conflict resolved: local data wins (oldest-node rule)") return nil } // requestMerkleDiff requests children hashes or keys for a given node/range from a peer -func (s *SyncService) requestMerkleDiff(peerAddress string, req types.MerkleTreeDiffRequest) (*types.MerkleTreeDiffResponse, error) { - jsonData, err := json.Marshal(req) +func (s *SyncService) requestMerkleDiff(peerAddress string, reqData 
types.MerkleTreeDiffRequest) (*types.MerkleTreeDiffResponse, error) { + jsonData, err := json.Marshal(reqData) if err != nil { return nil, err } - client := &http.Client{Timeout: 10 * time.Second} - url := fmt.Sprintf("http://%s/merkle_tree/diff", peerAddress) + client := NewAuthenticatedHTTPClient(s.config, 10*time.Second) + protocol := GetProtocol(s.config) + url := fmt.Sprintf("%s://%s/merkle_tree/diff", protocol, peerAddress) - resp, err := client.Post(url, "application/json", bytes.NewBuffer(jsonData)) + req, err := http.NewRequest("POST", url, bytes.NewBuffer(jsonData)) + if err != nil { + return nil, err + } + req.Header.Set("Content-Type", "application/json") + AddClusterAuthHeaders(req, s.config) + + resp, err := client.Do(req) if err != nil { return nil, err } @@ -525,20 +547,28 @@ func (s *SyncService) handleChildrenDiff(peerAddress string, children []types.Me // fetchAndStoreRange fetches a range of KV pairs from a peer and stores them locally func (s *SyncService) fetchAndStoreRange(peerAddress string, startKey, endKey string) error { - req := types.KVRangeRequest{ + reqData := types.KVRangeRequest{ StartKey: startKey, EndKey: endKey, Limit: 0, // No limit } - jsonData, err := json.Marshal(req) + jsonData, err := json.Marshal(reqData) if err != nil { return err } - client := &http.Client{Timeout: 30 * time.Second} // Longer timeout for range fetches - url := fmt.Sprintf("http://%s/kv_range", peerAddress) + client := NewAuthenticatedHTTPClient(s.config, 30*time.Second) // Longer timeout for range fetches + protocol := GetProtocol(s.config) + url := fmt.Sprintf("%s://%s/kv_range", protocol, peerAddress) - resp, err := client.Post(url, "application/json", bytes.NewBuffer(jsonData)) + req, err := http.NewRequest("POST", url, bytes.NewBuffer(jsonData)) + if err != nil { + return err + } + req.Header.Set("Content-Type", "application/json") + AddClusterAuthHeaders(req, s.config) + + resp, err := client.Do(req) if err != nil { return err } @@ -568,4 +598,4 @@ 
func (s *SyncService) fetchAndStoreRange(peerAddress string, startKey, endKey st
 		}
 	}
 	return nil
-}
\ No newline at end of file
+}
diff --git a/config/config.go b/config/config.go
index 693331f..43089ab 100644
--- a/config/config.go
+++ b/config/config.go
@@ -1,12 +1,14 @@
 package config
 
 import (
+	"crypto/rand"
+	"encoding/base64"
 	"fmt"
 	"os"
 	"path/filepath"
 
-	"kvs/types"
 	"gopkg.in/yaml.v3"
+
+	"kvs/types"
 )
 
 // Default configuration
@@ -27,41 +29,61 @@ func Default() *types.Config {
 		BootstrapMaxAgeHours: 720, // 30 days
 		ThrottleDelayMs:      100,
 		FetchDelayMs:         50,
-		
+
 		// Default compression settings
 		CompressionEnabled: true,
 		CompressionLevel:   3, // Balance between performance and compression ratio
-		
+
 		// Default TTL and size limit settings
-		DefaultTTL:  "0", // No default TTL
-		MaxJSONSize: 1048576, // 1MB default max JSON size
-		
+		DefaultTTL:  "0",     // No default TTL
+		MaxJSONSize: 1048576, // 1MB default max JSON size
+
 		// Default rate limiting settings
 		RateLimitRequests: 100,  // 100 requests per window
 		RateLimitWindow:   "1m", // 1 minute window
-		
+
 		// Default tamper-evident logging settings
 		TamperLogActions: []string{"data_write", "user_create", "auth_failure"},
-		
+
 		// Default backup system settings
 		BackupEnabled:   true,
 		BackupSchedule:  "0 0 * * *", // Daily at midnight
 		BackupPath:      "./backups",
 		BackupRetention: 7, // Keep backups for 7 days
-		
+
 		// Default feature toggle settings (all enabled by default)
 		AuthEnabled:            true,
 		TamperLoggingEnabled:   true,
 		ClusteringEnabled:      true,
 		RateLimitingEnabled:    true,
 		RevisionHistoryEnabled: true,
-		
+
 		// Default anonymous access settings (both disabled by default for security)
-		AllowAnonymousRead: false,
-		AllowAnonymousWrite: false,
+		AllowAnonymousRead:  false,
+		AllowAnonymousWrite: false,
+
+		// Default cluster authentication settings (Issue #13)
+		ClusterSecret:        generateClusterSecret(),
+		ClusterTLSEnabled:    false,
+		ClusterTLSCertFile:   "",
+		ClusterTLSKeyFile:    "",
+		ClusterTLSSkipVerify: false,
 	}
 }
 
+// generateClusterSecret generates a cryptographically secure random cluster secret
+func generateClusterSecret() string {
+	// Generate 32 bytes (256 bits) of random data
+	randomBytes := make([]byte, 32)
+	if _, err := rand.Read(randomBytes); err != nil {
+		// Fallback to a warning - this should never happen in practice
+		fmt.Fprintf(os.Stderr, "Warning: Failed to generate secure cluster secret: %v\n", err)
+		return ""
+	}
+	// Encode as base64 for easy configuration file storage
+	return base64.StdEncoding.EncodeToString(randomBytes)
+}
+
 // Load configuration from file or create default
 func Load(configPath string) (*types.Config, error) {
 	config := Default()
@@ -94,5 +116,13 @@ func Load(configPath string) (*types.Config, error) {
 		return nil, fmt.Errorf("failed to parse config file: %v", err)
 	}
 
+	// Generate cluster secret if not provided and clustering is enabled (Issue #13)
+	if config.ClusteringEnabled && config.ClusterSecret == "" {
+		config.ClusterSecret = generateClusterSecret()
+		fmt.Printf("Warning: No cluster_secret configured. Generated a random secret.\n")
+		fmt.Printf("   To share this secret with other nodes, add it to your config:\n")
+		fmt.Printf("   cluster_secret: %s\n", config.ClusterSecret)
+	}
+
 	return config, nil
-}
\ No newline at end of file
+}
diff --git a/daemon/daemonize.go b/daemon/daemonize.go
new file mode 100644
index 0000000..b8494bf
--- /dev/null
+++ b/daemon/daemonize.go
@@ -0,0 +1,87 @@
+package daemon
+
+import (
+	"fmt"
+	"os"
+	"os/exec"
+	"path/filepath"
+	"syscall"
+)
+
+// GetLogFilePath returns the log file path for a given config file
+func GetLogFilePath(configPath string) (string, error) {
+	logDir, err := getLogDir()
+	if err != nil {
+		return "", err
+	}
+
+	absConfigPath, err := filepath.Abs(configPath)
+	if err != nil {
+		return "", fmt.Errorf("failed to get absolute config path: %w", err)
+	}
+
+	basename := filepath.Base(configPath)
+	name := filepath.Base(filepath.Dir(absConfigPath)) + "_" + basename
+	return filepath.Join(logDir, name+".log"), nil
+}
+
+// Daemonize spawns the process as a daemon and returns
+func Daemonize(configPath string) error {
+	// Get absolute path to the current executable
+	executable, err := os.Executable()
+	if err != nil {
+		return fmt.Errorf("failed to get executable path: %w", err)
+	}
+
+	// Get absolute path to config
+	absConfigPath, err := filepath.Abs(configPath)
+	if err != nil {
+		return fmt.Errorf("failed to get absolute config path: %w", err)
+	}
+
+	// Check if already running
+	_, running, err := ReadPID(configPath)
+	if err != nil {
+		return fmt.Errorf("failed to check if instance is running: %w", err)
+	}
+	if running {
+		return fmt.Errorf("instance is already running")
+	}
+
+	// Spawn the process in background with --daemon flag
+	cmd := exec.Command(executable, "--daemon", absConfigPath)
+	cmd.SysProcAttr = &syscall.SysProcAttr{
+		Setsid: true, // Create new session
+	}
+
+	// Redirect stdout/stderr to log file
+	logDir, err := getLogDir()
+	if err != nil {
+		return fmt.Errorf("failed to get log directory: %w", err)
+	}
+	if err := os.MkdirAll(logDir, 0755); err != nil {
+		return fmt.Errorf("failed to create log directory: %w", err)
+	}
+
+	basename := filepath.Base(configPath)
+	name := filepath.Base(filepath.Dir(absConfigPath)) + "_" + basename
+	logFile := filepath.Join(logDir, name+".log")
+
+	f, err := os.OpenFile(logFile, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644)
+	if err != nil {
+		return fmt.Errorf("failed to open log file: %w", err)
+	}
+	defer f.Close()
+
+	cmd.Stdout = f
+	cmd.Stderr = f
+
+	if err := cmd.Start(); err != nil {
+		return fmt.Errorf("failed to start daemon: %w", err)
+	}
+
+	fmt.Printf("Started KVS instance '%s' (PID will be written by daemon)\n", filepath.Base(configPath))
+	fmt.Printf("Logs: %s\n", logFile)
+
+	return nil
+}
diff --git a/daemon/pid.go b/daemon/pid.go
new file mode 100644
index 0000000..8e5240d
--- /dev/null
+++ b/daemon/pid.go
@@ -0,0 +1,171 @@
+package daemon
+
+import (
+	"fmt"
+	"os"
+	"path/filepath"
+	"strconv"
+	"strings"
+	"syscall"
+)
+
+// getPIDDir returns the absolute path to the PID directory
+func getPIDDir() (string, error) {
+	homeDir, err := os.UserHomeDir()
+	if err != nil {
+		return "", fmt.Errorf("failed to get user home directory: %w", err)
+	}
+	return filepath.Join(homeDir, ".kvs", "pids"), nil
+}
+
+// getLogDir returns the absolute path to the log directory
+func getLogDir() (string, error) {
+	homeDir, err := os.UserHomeDir()
+	if err != nil {
+		return "", fmt.Errorf("failed to get user home directory: %w", err)
+	}
+	return filepath.Join(homeDir, ".kvs", "logs"), nil
+}
+
+// GetPIDFilePath returns the PID file path for a given config file
+func GetPIDFilePath(configPath string) string {
+	pidDir, err := getPIDDir()
+	if err != nil {
+		// Fallback to local directory
+		pidDir = ".kvs/pids"
+	}
+
+	// Extract basename without extension
+	basename := filepath.Base(configPath)
+	name := strings.TrimSuffix(basename, filepath.Ext(basename))
+
+	return filepath.Join(pidDir, name+".pid")
+}
+
+// EnsurePIDDir creates the PID directory if it doesn't exist
+func EnsurePIDDir() error {
+	pidDir, err := getPIDDir()
+	if err != nil {
+		return err
+	}
+	return os.MkdirAll(pidDir, 0755)
+}
+
+// WritePID writes the current process PID to a file
+func WritePID(configPath string) error {
+	if err := EnsurePIDDir(); err != nil {
+		return fmt.Errorf("failed to create PID directory: %w", err)
+	}
+
+	pidFile := GetPIDFilePath(configPath)
+	pid := os.Getpid()
+
+	return os.WriteFile(pidFile, []byte(fmt.Sprintf("%d\n", pid)), 0644)
+}
+
+// ReadPID reads the PID from a file and checks if the process is running
+func ReadPID(configPath string) (int, bool, error) {
+	pidFile := GetPIDFilePath(configPath)
+
+	data, err := os.ReadFile(pidFile)
+	if err != nil {
+		if os.IsNotExist(err) {
+			return 0, false, nil
+		}
+		return 0, false, fmt.Errorf("failed to read PID file: %w", err)
+	}
+
+	pidStr := strings.TrimSpace(string(data))
+	pid, err := strconv.Atoi(pidStr)
+	if err != nil {
+		return 0, false, fmt.Errorf("invalid PID in file: %w", err)
+	}
+
+	// Check if process is actually running
+	process, err := os.FindProcess(pid)
+	if err != nil {
+		return pid, false, nil
+	}
+
+	// Send signal 0 to check if process exists
+	err = process.Signal(syscall.Signal(0))
+	if err != nil {
+		return pid, false, nil
+	}
+
+	return pid, true, nil
+}
+
+// RemovePID removes the PID file
+func RemovePID(configPath string) error {
+	pidFile := GetPIDFilePath(configPath)
+	err := os.Remove(pidFile)
+	if err != nil && !os.IsNotExist(err) {
+		return fmt.Errorf("failed to remove PID file: %w", err)
+	}
+	return nil
+}
+
+// ListRunningInstances returns a list of running KVS instances
+func ListRunningInstances() ([]InstanceInfo, error) {
+	var instances []InstanceInfo
+
+	pidDir, err := getPIDDir()
+	if err != nil {
+		return nil, err
+	}
+
+	// Check if PID directory exists
+	if _, err := os.Stat(pidDir); os.IsNotExist(err) {
+		return instances, nil
+	}
+
+	entries, err := os.ReadDir(pidDir)
+	if err != nil {
+		return nil, fmt.Errorf("failed to read PID directory: %w", err)
+	}
+
+	for _, entry := range entries {
+		if entry.IsDir() || !strings.HasSuffix(entry.Name(), ".pid") {
+			continue
+		}
+
+		name := strings.TrimSuffix(entry.Name(), ".pid")
+		configPath := name + ".yaml" // Assume .yaml extension
+
+		pid, running, err := ReadPID(configPath)
+		if err != nil {
+			continue
+		}
+
+		instances = append(instances, InstanceInfo{
+			Name:    name,
+			PID:     pid,
+			Running: running,
+		})
+	}
+
+	return instances, nil
+}
+
+// InstanceInfo holds information about a KVS instance
+type InstanceInfo struct {
+	Name    string
+	PID     int
+	Running bool
+}
+
+// StopProcess stops a process by PID
+func StopProcess(pid int) error {
+	process, err := os.FindProcess(pid)
+	if err != nil {
+		return fmt.Errorf("failed to find process: %w", err)
+	}
+
+	// Try graceful shutdown first (SIGTERM)
+	if err := process.Signal(syscall.SIGTERM); err != nil {
+		return fmt.Errorf("failed to send SIGTERM: %w", err)
+	}
+
+	return nil
+}
diff --git a/features/auth.go b/features/auth.go
index 65580a5..8b9ffd6 100644
--- a/features/auth.go
+++ b/features/auth.go
@@ -99,4 +99,4 @@ func ExtractKVResourceKey(r *http.Request) string {
 		return path
 	}
 	return ""
-}
\ No newline at end of file
+}
diff --git a/features/backup.go b/features/backup.go
index 893351b..f73e4da 100644
--- a/features/backup.go
+++ b/features/backup.go
@@ -8,4 +8,4 @@ import (
 // GetBackupFilename generates a filename for a backup
 func GetBackupFilename(timestamp time.Time) string {
 	return fmt.Sprintf("kvs-backup-%s.zstd", timestamp.Format("2006-01-02"))
-}
\ No newline at end of file
+}
diff --git a/features/features.go b/features/features.go
index 6ee027e..e86694c 100644
--- a/features/features.go
+++ b/features/features.go
@@ -1,4 +1,4 @@
 // Package features provides utility functions for KVS authentication, validation,
 // logging, backup, and other operational features. These functions were extracted
 // from main.go to improve code organization and maintainability.
-package features
\ No newline at end of file
+package features
diff --git a/features/ratelimit.go b/features/ratelimit.go
index 99ef49b..5906e0e 100644
--- a/features/ratelimit.go
+++ b/features/ratelimit.go
@@ -5,4 +5,4 @@ import "fmt"
 // GetRateLimitKey generates the storage key for rate limiting
 func GetRateLimitKey(userUUID string, windowStart int64) string {
 	return fmt.Sprintf("ratelimit:%s:%d", userUUID, windowStart)
-}
\ No newline at end of file
+}
diff --git a/features/revision.go b/features/revision.go
index d348010..395df73 100644
--- a/features/revision.go
+++ b/features/revision.go
@@ -5,4 +5,4 @@ import "fmt"
 // GetRevisionKey generates the storage key for a specific revision
 func GetRevisionKey(baseKey string, revision int) string {
 	return fmt.Sprintf("%s:rev:%d", baseKey, revision)
-}
\ No newline at end of file
+}
diff --git a/features/tamperlog.go b/features/tamperlog.go
index 6e57f21..8e5c569 100644
--- a/features/tamperlog.go
+++ b/features/tamperlog.go
@@ -21,4 +21,4 @@ func GenerateLogSignature(timestamp, action, userUUID, resource string) string {
 	// Concatenate all fields in a deterministic order
 	data := fmt.Sprintf("%s|%s|%s|%s", timestamp, action, userUUID, resource)
 	return utils.HashSHA3512(data)
-}
\ No newline at end of file
+}
diff --git a/features/validation.go b/features/validation.go
index 096c3b5..de570df 100644
--- a/features/validation.go
+++ b/features/validation.go
@@ -21,4 +21,4 @@ func ParseTTL(ttlString string) (time.Duration, error) {
 	}
 
 	return duration, nil
-}
\ No newline at end of file
+}
diff --git a/integration_test.sh b/integration_test.sh
index e209cf3..183544f 100755
--- a/integration_test.sh
+++ b/integration_test.sh
@@ -45,6 +45,7 @@ cleanup() {
     log_info "Cleaning up test environment..."
     pkill -f "$BINARY" 2>/dev/null || true
     rm -rf "$TEST_DIR" 2>/dev/null || true
+    rm -rf "$HOME/.kvs" 2>/dev/null || true # Clean up PID and log files from home dir
     sleep 2 # Allow processes to fully terminate
 }
 
@@ -53,7 +54,7 @@ wait_for_service() {
     local port=$1
     local timeout=${2:-30}
     local count=0
-    
+
     while [ $count -lt $timeout ]; do
         if curl -s "http://localhost:$port/health" >/dev/null 2>&1; then
             return 0
@@ -64,6 +65,15 @@
     return 1
 }
 
+# Get log file path for a config file (matches daemon naming convention)
+get_log_file() {
+    local config=$1
+    local abs_path=$(realpath "$config")
+    local basename=$(basename "$config")
+    local dirname=$(basename "$(dirname "$abs_path")")
+    echo "$HOME/.kvs/logs/${dirname}_${basename}.log"
+}
+
 # Test 1: Build verification
 test_build() {
     test_start "Binary build verification"
@@ -82,7 +92,7 @@ test_build() {
 # Test 2: Basic functionality
 test_basic_functionality() {
     test_start "Basic functionality test"
-    
+
     # Create basic config
     cat > basic.yaml </dev/null 2>&1 &
-    local pid=$!
-    
+
+    # Start node using daemon command
+    $BINARY start basic.yaml >/dev/null 2>&1
+    sleep 2
+
     if wait_for_service 8090; then
         # Test basic CRUD
         local put_result=$(curl -s -X PUT http://localhost:8090/kv/test/basic \
             -H "Content-Type: application/json" \
             -d '{"message":"hello world"}')
-        
+
         local get_result=$(curl -s http://localhost:8090/kv/test/basic)
-        local message=$(echo "$get_result" | jq -r '.data.message' 2>/dev/null) # Adjusted jq path
-        
+        local message=$(echo "$get_result" | jq -r '.data.message' 2>/dev/null)
+
         if [ "$message" = "hello world" ]; then
             log_success "Basic CRUD operations work"
         else
@@ -116,15 +126,18 @@ EOF
     else
         log_error "Basic test node failed to start"
     fi
-    
-    kill $pid 2>/dev/null || true
-    sleep 2
+
+    $BINARY stop basic.yaml >/dev/null 2>&1
+    sleep 1
 }
 
 # Test 3: Cluster formation
 test_cluster_formation() {
     test_start "2-node cluster formation and Merkle Tree replication"
-    
+
+    # Shared cluster secret for authentication (Issue #13)
+    local CLUSTER_SECRET="test-cluster-secret-12345678901234567890"
+
     # Node 1 config
     cat > cluster1.yaml < cluster2.yaml </dev/null 2>&1 &
-    local pid1=$!
-    
+    # Start nodes using daemon commands
+    $BINARY start cluster1.yaml >/dev/null 2>&1
+    sleep 2
+
     if ! wait_for_service 8101; then
         log_error "Cluster node 1 failed to start"
-        kill $pid1 2>/dev/null || true
+        $BINARY stop cluster1.yaml >/dev/null 2>&1
         return 1
     fi
-
-    sleep 2 # Give node 1 a moment to fully initialize
-    $BINARY cluster2.yaml >/dev/null 2>&1 &
-    local pid2=$!
-    
+
+    $BINARY start cluster2.yaml >/dev/null 2>&1
+    sleep 2
+
     if ! wait_for_service 8102; then
         log_error "Cluster node 2 failed to start"
-        kill $pid1 $pid2 2>/dev/null || true
+        $BINARY stop cluster1.yaml >/dev/null 2>&1
+        $BINARY stop cluster2.yaml >/dev/null 2>&1
         return 1
     fi
@@ -219,9 +233,9 @@ EOF
     else
         log_error "Cluster formation failed (N1 members: $node1_members, N2 members: $node2_members)"
    fi
-    
-    kill $pid1 $pid2 2>/dev/null || true
-    sleep 2
+
+    $BINARY stop cluster1.yaml >/dev/null 2>&1
+    $BINARY stop cluster2.yaml >/dev/null 2>&1
+    sleep 1
 }
 
 # Test 4: Conflict resolution (Merkle Tree based)
@@ -230,15 +244,18 @@ EOF
 # but same path. The Merkle tree sync should then trigger conflict resolution.
 test_conflict_resolution() {
     test_start "Conflict resolution test (Merkle Tree based)"
-    
+
     # Create conflicting data using our utility
     rm -rf conflict1_data conflict2_data 2>/dev/null || true
     mkdir -p conflict1_data conflict2_data
-    
+
     cd "$SCRIPT_DIR"
     if go run test_conflict.go "$TEST_DIR/conflict1_data" "$TEST_DIR/conflict2_data"; then
         cd "$TEST_DIR"
-        
+
+        # Shared cluster secret for authentication (Issue #13)
+        local CLUSTER_SECRET="conflict-cluster-secret-1234567890123"
+
         # Create configs
         cat > conflict1.yaml < conflict2.yaml <conflict1.log 2>&1 &
-        local pid1=$!
-        
+        $BINARY start conflict1.yaml >/dev/null 2>&1
+        sleep 2
+
         if wait_for_service 8111; then
+            $BINARY start conflict2.yaml >/dev/null 2>&1
             sleep 2
-            $BINARY conflict2.yaml >conflict2.log 2>&1 &
-            local pid2=$!
-            
+
             if wait_for_service 8112; then
                 # Get initial data (full StoredValue)
                 local node1_initial_full=$(curl -s http://localhost:8111/kv/test/conflict/data)
@@ -334,8 +352,10 @@ EOF
                     log_error "Resolved data has inconsistent UUID/Timestamp: N1_UUID=$node1_final_uuid, N1_TS=$node1_final_timestamp, N2_UUID=$node2_final_uuid, N2_TS=$node2_final_timestamp"
                 fi
 
-                # Optionally, check logs for conflict resolution messages
-                if grep -q "Conflict resolved" conflict1.log conflict2.log 2>/dev/null; then
+                # Check logs for conflict resolution messages
+                local log1=$(get_log_file conflict1.yaml)
+                local log2=$(get_log_file conflict2.yaml)
+                if grep -q "Conflict resolved" "$log1" "$log2" 2>/dev/null; then
                     log_success "Conflict resolution messages found in logs"
                 else
                     log_error "No 'Conflict resolved' messages found in logs, but data converged."
@@ -347,14 +367,14 @@ EOF
             else
                 log_error "Conflict node 2 failed to start"
             fi
-            
-            kill $pid2 2>/dev/null || true
+
+            $BINARY stop conflict2.yaml >/dev/null 2>&1
         else
             log_error "Conflict node 1 failed to start"
         fi
-        
-        kill $pid1 2>/dev/null || true
-        sleep 2
+
+        $BINARY stop conflict1.yaml >/dev/null 2>&1
+        sleep 1
     else
         cd "$TEST_DIR"
         log_error "Failed to create conflict test data. Ensure test_conflict.go is correct."
@@ -378,22 +398,21 @@
 allow_anonymous_read: false
 allow_anonymous_write: false
 EOF
 
-    # Start node
-    $BINARY auth_test.yaml >auth_test.log 2>&1 &
-    local pid=$!
-    
+    # Start node using daemon command
+    $BINARY start auth_test.yaml >/dev/null 2>&1
+    sleep 3 # Allow daemon to start and root account creation
+
     if wait_for_service 8095; then
-        sleep 2 # Allow root account creation
-
         # Extract the token from logs
-        local token=$(grep "Token:" auth_test.log | sed 's/.*Token: //' | tr -d '\n\r')
-        
+        local log_file=$(get_log_file auth_test.yaml)
+        local token=$(grep "Token:" "$log_file" | sed 's/.*Token: //' | tr -d '\n\r')
+
         if [ -z "$token" ]; then
             log_error "Failed to extract authentication token from logs"
-            kill $pid 2>/dev/null || true
+            $BINARY stop auth_test.yaml >/dev/null 2>&1
            return
         fi
-        
+
         # Test 1: Admin endpoints should fail without authentication
         local no_auth_response=$(curl -s -X POST http://localhost:8095/api/users -H "Content-Type: application/json" -d '{"nickname":"test","password":"test"}')
         if echo "$no_auth_response" | grep -q "Unauthorized"; then
@@ -401,7 +420,7 @@ EOF
         else
             log_error "Admin endpoints should reject unauthenticated requests, got: $no_auth_response"
         fi
-        
+
         # Test 2: Admin endpoints should work with valid authentication
         local auth_response=$(curl -s -X POST http://localhost:8095/api/users -H "Content-Type: application/json" -H "Authorization: Bearer $token" -d '{"nickname":"authtest","password":"authtest"}')
         if echo "$auth_response" | grep -q "uuid"; then
@@ -409,7 +428,7 @@ EOF
         else
             log_error "Admin endpoints should work with authentication, got: $auth_response"
         fi
-        
+
         # Test 3: KV endpoints should require auth when anonymous access is disabled
         local kv_no_auth=$(curl -s -X PUT http://localhost:8095/kv/test/auth -H "Content-Type: application/json" -d '{"test":"auth"}')
         if echo "$kv_no_auth" | grep -q "Unauthorized"; then
@@ -417,7 +436,7 @@ EOF
         else
             log_error "KV endpoints should require auth when anonymous access disabled, got: $kv_no_auth"
         fi
-        
+
         # Test 4: KV endpoints should work with valid authentication
         local kv_auth=$(curl -s -X PUT http://localhost:8095/kv/test/auth -H "Content-Type: application/json" -H "Authorization: Bearer $token" -d '{"test":"auth"}')
         if echo "$kv_auth" | grep -q "uuid\|timestamp" || [ -z "$kv_auth" ]; then
@@ -425,15 +444,167 @@ EOF
         else
             log_error "KV endpoints should work with authentication, got: $kv_auth"
         fi
-        
-        kill $pid 2>/dev/null || true
-        sleep 2
+
+        $BINARY stop auth_test.yaml >/dev/null 2>&1
+        sleep 1
     else
         log_error "Auth test node failed to start"
-        kill $pid 2>/dev/null || true
+        $BINARY stop auth_test.yaml >/dev/null 2>&1
     fi
 }
 
+# Test 6: Resource Metadata Management (Issue #12)
+test_metadata_management() {
+    test_start "Resource Metadata Management test (Issue #12)"
+
+    # Create metadata test config
+    cat > metadata_test.yaml </dev/null 2>&1
+    sleep 3 # Allow daemon to start and root account creation
+
+    if wait_for_service 8096; then
+        # Extract the token from logs
+        local log_file=$(get_log_file metadata_test.yaml)
+        local token=$(grep "Token:" "$log_file" | sed 's/.*Token: //' | tr -d '\n\r')
+
+        if [ -z "$token" ]; then
+            log_error "Failed to extract authentication token from logs"
+            $BINARY stop metadata_test.yaml >/dev/null 2>&1
+            return
+        fi
+
+        # First, create a KV resource
+        curl -s -X PUT http://localhost:8096/kv/test/resource -H "Content-Type: application/json" -H "Authorization: Bearer $token" -d '{"data":"test"}' >/dev/null
+        sleep 1
+
+        # Test 1: Get metadata should fail for non-existent metadata (initially no metadata exists)
+        local get_response=$(curl -s -w "\n%{http_code}" -X GET http://localhost:8096/kv/test/resource/metadata -H "Authorization: Bearer $token")
+        local get_body=$(echo "$get_response" | head -n -1)
+        local get_code=$(echo "$get_response" | tail -n 1)
+
+        if [ "$get_code" = "404" ]; then
+            log_success "GET metadata returns 404 for non-existent metadata"
+        else
+            log_error "GET metadata should return 404 for non-existent metadata, got code: $get_code, body: $get_body"
+        fi
+
+        # Test 2: Update metadata should create new metadata
+        local update_response=$(curl -s -X PUT http://localhost:8096/kv/test/resource/metadata -H "Content-Type: application/json" -H "Authorization: Bearer $token" -d '{"owner_uuid":"test-owner-123","permissions":3840}')
+        if echo "$update_response" | grep -q "owner_uuid"; then
+            log_success "PUT metadata creates metadata successfully"
+        else
+            log_error "PUT metadata should create metadata, got: $update_response"
+        fi
+
+        # Test 3: Get metadata should now return the created metadata
+        local get_response2=$(curl -s -X GET http://localhost:8096/kv/test/resource/metadata -H "Authorization: Bearer $token")
+        if echo "$get_response2" | grep -q "test-owner-123" && echo "$get_response2" | grep -q "3840"; then
+            log_success "GET metadata returns created metadata"
+        else
+            log_error "GET metadata should return created metadata, got: $get_response2"
+        fi
+
+        # Test 4: Update metadata should modify existing metadata
+        local update_response2=$(curl -s -X PUT http://localhost:8096/kv/test/resource/metadata -H "Content-Type: application/json" -H "Authorization: Bearer $token" -d '{"owner_uuid":"new-owner-456"}')
+        if echo "$update_response2" | grep -q "new-owner-456"; then
+            log_success "PUT metadata updates existing metadata"
+        else
+            log_error "PUT metadata should update metadata, got: $update_response2"
+        fi
+
+        # Test 5: Metadata endpoints should require authentication
+        local no_auth=$(curl -s -w "\n%{http_code}" -X GET http://localhost:8096/kv/test/resource/metadata)
+        local no_auth_code=$(echo "$no_auth" | tail -n 1)
+        if [ "$no_auth_code" = "401" ]; then
+            log_success "Metadata endpoints properly require authentication"
+        else
+            log_error "Metadata endpoints should require authentication, got code: $no_auth_code"
+        fi
+
+        $BINARY stop metadata_test.yaml >/dev/null 2>&1
+        sleep 1
+    else
+        log_error "Metadata test node failed to start"
+        $BINARY stop metadata_test.yaml >/dev/null 2>&1
+    fi
+}
+
+# Test 7: Daemon commands (start, stop, status, restart)
+test_daemon_commands() {
+    test_start "Daemon command tests (start, stop, status, restart)"
+
+    # Create daemon test config
+    cat > daemon_test.yaml </dev/null 2>&1
+    sleep 3 # Allow daemon to start
+
+    if wait_for_service 8097 5; then
+        log_success "Daemon 'start' command works"
+
+        # Test 2: Status command shows running
+        local status_output=$($BINARY status daemon_test.yaml 2>&1)
+        if echo "$status_output" | grep -q "RUNNING"; then
+            log_success "Daemon 'status' command shows RUNNING"
+        else
+            log_error "Daemon 'status' should show RUNNING, got: $status_output"
+        fi
+
+        # Test 3: Stop command
+        $BINARY stop daemon_test.yaml >/dev/null 2>&1
+        sleep 2
+
+        # Check that service is actually stopped
+        if ! curl -s "http://localhost:8097/health" >/dev/null 2>&1; then
+            log_success "Daemon 'stop' command works"
+        else
+            log_error "Daemon should be stopped but is still responding"
+        fi
+
+        # Test 4: Restart command
+        $BINARY restart daemon_test.yaml >/dev/null 2>&1
+        sleep 3
+
+        if wait_for_service 8097 5; then
+            log_success "Daemon 'restart' command works"
+
+            # Clean up
+            $BINARY stop daemon_test.yaml >/dev/null 2>&1
+            sleep 1
+        else
+            log_error "Daemon 'restart' failed to start service"
+        fi
+    else
+        log_error "Daemon 'start' command failed"
+    fi
+
+    # Ensure cleanup
+    pkill -f "daemon_test.yaml" 2>/dev/null || true
+    sleep 1
+}
+
 # Main test execution
 main() {
     echo "=================================================="
@@ -452,7 +623,9 @@ main() {
     test_cluster_formation
     test_conflict_resolution
     test_authentication_middleware
-    
+    test_metadata_management
+    test_daemon_commands
+
     # Results
     echo "=================================================="
     echo "  Test Results"
diff --git a/main.go b/main.go
index ebcc2d9..66c6665 100644
--- a/main.go
+++ b/main.go
@@ -6,26 +6,90 @@ import (
 	"os"
 	"os/signal"
 	"syscall"
+	"time"
+
+	"path/filepath"
+	"strings"
 
 	"kvs/config"
+	"kvs/daemon"
 	"kvs/server"
 )
 
-
 func main() {
-	configPath := "./config.yaml"
-
-	// Simple CLI argument parsing
-	if len(os.Args) > 1 {
-		configPath = os.Args[1]
+	if len(os.Args) < 2 {
+		// No arguments - run in foreground with default config
+		runServer("./config.yaml", false)
+		return
 	}
 
+	// Check if this is a daemon spawn
+	if os.Args[1] == "--daemon" {
+		if len(os.Args) < 3 {
+			fmt.Fprintf(os.Stderr, "Error: --daemon flag requires config path\n")
+			os.Exit(1)
+		}
+		runServer(os.Args[2], true)
+		return
+	}
+
+	// Parse subcommand
+	command := os.Args[1]
+
+	switch command {
+	case "start":
+		if len(os.Args) < 3 {
+			fmt.Fprintf(os.Stderr, "Usage: kvs start <config.yaml>\n")
+			os.Exit(1)
+		}
+		cmdStart(normalizeConfigPath(os.Args[2]))
+
+	case "stop":
+		if len(os.Args) < 3 {
+			fmt.Fprintf(os.Stderr, "Usage: kvs stop <config.yaml>\n")
+			os.Exit(1)
+		}
+		cmdStop(normalizeConfigPath(os.Args[2]))
+
+	case "restart":
+		if len(os.Args) < 3 {
+			fmt.Fprintf(os.Stderr, "Usage: kvs restart <config.yaml>\n")
+			os.Exit(1)
+		}
+		cmdRestart(normalizeConfigPath(os.Args[2]))
+
+	case "status":
+		if len(os.Args) > 2 {
+			cmdStatusSingle(normalizeConfigPath(os.Args[2]))
+		} else {
+			cmdStatusAll()
+		}
+
+	case "help", "--help", "-h":
+		printHelp()
+
+	default:
+		// Backward compatibility: assume it's a config file path
+		runServer(command, false)
+	}
+}
+
+func runServer(configPath string, isDaemon bool) {
 	cfg, err := config.Load(configPath)
 	if err != nil {
 		fmt.Fprintf(os.Stderr, "Failed to load configuration: %v\n", err)
 		os.Exit(1)
 	}
 
+	// Write PID file if running as daemon
+	if isDaemon {
+		if err := daemon.WritePID(configPath); err != nil {
+			fmt.Fprintf(os.Stderr, "Failed to write PID file: %v\n", err)
+			os.Exit(1)
+		}
+		defer daemon.RemovePID(configPath)
+	}
+
 	kvServer, err := server.NewServer(cfg)
 	if err != nil {
 		fmt.Fprintf(os.Stderr, "Failed to create server: %v\n", err)
@@ -46,3 +110,135 @@ func main() {
 		os.Exit(1)
 	}
 }
+
+func cmdStart(configPath string) {
+	if err := daemon.Daemonize(configPath); err != nil {
+		fmt.Fprintf(os.Stderr, "Failed to start: %v\n", err)
+		os.Exit(1)
+	}
+}
+
+func cmdStop(configPath string) {
+	pid, running, err := daemon.ReadPID(configPath)
+	if err != nil {
+		fmt.Fprintf(os.Stderr, "Failed to read PID: %v\n", err)
+		os.Exit(1)
+	}
+
+	if !running {
+		fmt.Printf("Instance '%s' is not running\n", configPath)
+		// Clean up stale PID file
+		daemon.RemovePID(configPath)
+		return
+	}
+
+	fmt.Printf("Stopping instance '%s' (PID %d)...\n", configPath, pid)
+	if err := daemon.StopProcess(pid); err != nil {
+		fmt.Fprintf(os.Stderr, "Failed to stop process: %v\n", err)
+		os.Exit(1)
+	}
+
+	// Wait a bit and verify it stopped
+	time.Sleep(1 * time.Second)
+	_, stillRunning, _ := daemon.ReadPID(configPath)
+	if stillRunning {
+		fmt.Printf("Warning: Process may still be running\n")
+	} else {
+		daemon.RemovePID(configPath)
+		fmt.Printf("Stopped successfully\n")
+	}
+}
+
+func cmdRestart(configPath string) {
+	// Check if running
+	_, running, err := daemon.ReadPID(configPath)
+	if err != nil {
+		fmt.Fprintf(os.Stderr, "Failed to check status: %v\n", err)
+		os.Exit(1)
+	}
+
+	if running {
+		cmdStop(configPath)
+		// Wait a bit for clean shutdown
+		time.Sleep(2 * time.Second)
+	}
+
+	cmdStart(configPath)
+}
+
+func cmdStatusSingle(configPath string) {
+	pid, running, err := daemon.ReadPID(configPath)
+	if err != nil {
+		fmt.Fprintf(os.Stderr, "Failed to read PID: %v\n", err)
+		os.Exit(1)
+	}
+
+	if running {
+		fmt.Printf("Instance '%s': RUNNING (PID %d)\n", configPath, pid)
+	} else if pid > 0 {
+		fmt.Printf("Instance '%s': STOPPED (stale PID %d)\n", configPath, pid)
+	} else {
+		fmt.Printf("Instance '%s': STOPPED\n", configPath)
+	}
+}
+
+func cmdStatusAll() {
+	instances, err := daemon.ListRunningInstances()
+	if err != nil {
+		fmt.Fprintf(os.Stderr, "Failed to list instances: %v\n", err)
+		os.Exit(1)
+	}
+
+	if len(instances) == 0 {
+		fmt.Println("No KVS instances found")
+		return
+	}
+
+	fmt.Println("KVS Instances:")
+	for _, inst := range instances {
+		status := "STOPPED"
+		if inst.Running {
+			status = "RUNNING"
+		}
+		fmt.Printf("  %-20s %s (PID %d)\n", inst.Name, status, inst.PID)
+	}
+}
+
+// normalizeConfigPath ensures config path has .yaml extension if not specified
+func normalizeConfigPath(path string) string {
+	// If path doesn't have an extension, add .yaml
+	if filepath.Ext(path) == "" {
+		return path + ".yaml"
+	}
+	return path
+}
+
+// getConfigIdentifier returns the identifier for a config (basename without extension)
+// This is used for PID files and status display
+func getConfigIdentifier(path string) string {
+	basename := filepath.Base(path)
+	return strings.TrimSuffix(basename, filepath.Ext(basename))
+}
+
+func printHelp() {
+	help := `KVS - Distributed Key-Value Store
+
+Usage:
+  kvs [config.yaml]      Run in foreground (default: ./config.yaml)
+  kvs start <config>     Start as daemon (.yaml extension optional)
+  kvs stop <config>      Stop daemon (.yaml extension optional)
+  kvs restart <config>   Restart daemon (.yaml extension optional)
+  kvs status [config]    Show status (all instances if no config given)
+  kvs help               Show this help
+
+Examples:
+  kvs                    # Run with ./config.yaml in foreground
+  kvs node1.yaml         # Run with node1.yaml in foreground
+  kvs start node1        # Start node1.yaml as daemon
+  kvs start node1.yaml   # Same as above
+  kvs stop node1         # Stop node1 daemon
+  kvs status             # Show all running instances
+  kvs status node1       # Show status of node1
+`
+	fmt.Print(help)
+}
diff --git a/server/handlers.go b/server/handlers.go
index 843f530..1f76c39 100644
--- a/server/handlers.go
+++ b/server/handlers.go
@@ -22,8 +22,6 @@ import (
 	"kvs/utils"
 )
 
-
-
 // healthHandler returns server health status
 func (s *Server) healthHandler(w http.ResponseWriter, r *http.Request) {
 	mode := s.getMode()
@@ -215,6 +213,104 @@ func (s *Server) deleteKVHandler(w http.ResponseWriter, r *http.Request) {
 	s.logger.WithField("path", path).Info("Value deleted")
 }
 
+// getResourceMetadataHandler retrieves metadata for a KV resource
+func (s *Server) getResourceMetadataHandler(w http.ResponseWriter, r *http.Request) {
+	vars := mux.Vars(r)
+	path := vars["path"]
+
+	// Get metadata from storage
+	metadata, err := s.authService.GetResourceMetadata(path)
+	if err == badger.ErrKeyNotFound {
+		http.Error(w, "Not Found: No metadata exists for this resource", http.StatusNotFound)
+		return
+	}
+	if err != nil {
+		s.logger.WithError(err).WithField("path", path).Error("Failed to get resource metadata")
+		http.Error(w, "Internal Server Error", http.StatusInternalServerError)
+		return
+	}
+
+	response := types.GetResourceMetadataResponse{
+		OwnerUUID:   metadata.OwnerUUID,
+		GroupUUID:   metadata.GroupUUID,
+		Permissions: metadata.Permissions,
+		TTL:         metadata.TTL,
+		CreatedAt:   metadata.CreatedAt,
+		UpdatedAt:   metadata.UpdatedAt,
+	}
+
+	w.Header().Set("Content-Type", "application/json")
+	json.NewEncoder(w).Encode(response)
+}
+
+// updateResourceMetadataHandler updates metadata for a KV resource
+func (s *Server) updateResourceMetadataHandler(w http.ResponseWriter, r *http.Request) {
+	vars := mux.Vars(r)
+	path := vars["path"]
+
+	// Parse request body
+	var req types.UpdateResourceMetadataRequest
+	if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
+		http.Error(w, "Bad Request: Invalid JSON", http.StatusBadRequest)
+		return
+	}
+
+	// Get existing metadata or create new one
+	metadata, err := s.authService.GetResourceMetadata(path)
+	if err == badger.ErrKeyNotFound {
+		// Create new metadata with defaults
+		metadata = &types.ResourceMetadata{
+			OwnerUUID:   "",
+			GroupUUID:   "",
+			Permissions: types.DefaultPermissions,
+			TTL:         "",
+			CreatedAt:   time.Now().Unix(),
+			UpdatedAt:   time.Now().Unix(),
+		}
+	} else if err != nil {
+		s.logger.WithError(err).WithField("path", path).Error("Failed to get resource metadata")
+		http.Error(w, "Internal Server Error", http.StatusInternalServerError)
+		return
+	}
+
+	// Update only provided fields
+	if req.OwnerUUID != nil {
+		metadata.OwnerUUID = *req.OwnerUUID
+	}
+	if req.GroupUUID != nil {
+		metadata.GroupUUID = *req.GroupUUID
+	}
+	if req.Permissions != nil {
+		metadata.Permissions = *req.Permissions
+	}
+	metadata.UpdatedAt = time.Now().Unix()
+
+	// Store updated metadata
+	if err := s.authService.SetResourceMetadata(path, metadata); err != nil {
+		s.logger.WithError(err).WithField("path", path).Error("Failed to update resource metadata")
+		http.Error(w, "Internal Server Error", http.StatusInternalServerError)
+		return
+	}
+
+	response := types.GetResourceMetadataResponse{
+		OwnerUUID:   metadata.OwnerUUID,
+		GroupUUID:   metadata.GroupUUID,
+		Permissions: metadata.Permissions,
+		TTL:         metadata.TTL,
+		CreatedAt:   metadata.CreatedAt,
+		UpdatedAt:   metadata.UpdatedAt,
+	}
+
+	w.Header().Set("Content-Type", "application/json")
+	json.NewEncoder(w).Encode(response)
+
+	s.logger.WithFields(logrus.Fields{
+		"path":       path,
+		"owner_uuid": metadata.OwnerUUID,
+		"group_uuid": metadata.GroupUUID,
+	}).Info("Resource metadata updated")
+}
+
 // isClusterMember checks if request is from a cluster member
 func (s *Server) isClusterMember(remoteAddr string) bool {
 	host, _, err := net.SplitHostPort(remoteAddr)
@@ -1271,3 +1367,29 @@ func (s *Server) getRevisionHistory(key string) ([]map[string]interface{}, error
 func (s *Server) getSpecificRevision(key string, revision int) (*types.StoredValue, error) {
 	return s.revisionService.GetSpecificRevision(key, revision)
 }
+
+// clusterBootstrapHandler provides the cluster secret to authenticated administrators (Issue #13)
+func (s *Server) clusterBootstrapHandler(w http.ResponseWriter, r *http.Request) {
+	// Ensure clustering is enabled
+	if !s.config.ClusteringEnabled {
+		http.Error(w, "Clustering is disabled", http.StatusServiceUnavailable)
+		return
+	}
+
+	// Ensure cluster secret is configured
+	if s.config.ClusterSecret == "" {
+		s.logger.Error("Cluster secret is not configured")
+		http.Error(w, "Cluster secret is not configured", http.StatusInternalServerError)
+		return
+	}
+
+	// Return the cluster secret for secure bootstrap
+	response := map[string]string{
+		"cluster_secret": s.config.ClusterSecret,
+	}
+
+	s.logger.WithField("remote_addr", r.RemoteAddr).Info("Cluster secret retrieved for bootstrap")
+
+	w.Header().Set("Content-Type", "application/json")
+	json.NewEncoder(w).Encode(response)
+}
diff --git a/server/routes.go b/server/routes.go
index 814e5f7..a927d76 100644
--- a/server/routes.go
+++ b/server/routes.go
@@ -1,6 +1,8 @@
 package server
 
 import (
+	"net/http"
+
 	"github.com/gorilla/mux"
 )
 
@@ -11,6 +13,18 @@ func (s *Server) setupRoutes() *mux.Router {
 	// Health endpoint (always available)
 	router.HandleFunc("/health", s.healthHandler).Methods("GET")
 
+	// Resource Metadata Management endpoints (Issue #12) - Must come BEFORE general KV routes
+	// These need to be registered first to prevent /kv/{path:.+} from matching metadata paths
+	if s.config.AuthEnabled {
+		router.Handle("/kv/{path:.+}/metadata", s.authService.Middleware(
+			[]string{"admin:users:read"}, nil, "",
+		)(s.getResourceMetadataHandler)).Methods("GET")
+
+		router.Handle("/kv/{path:.+}/metadata", s.authService.Middleware(
+			[]string{"admin:users:update"}, nil, "",
+		)(s.updateResourceMetadataHandler)).Methods("PUT")
+	}
+
 	// KV endpoints (with conditional authentication based on anonymous access settings)
 	// GET endpoint - require auth if anonymous read is disabled
 	if s.config.AuthEnabled && !s.config.AllowAnonymousRead {
@@ -20,7 +34,7 @@ func (s *Server) setupRoutes() *mux.Router {
 	} else {
 		router.HandleFunc("/kv/{path:.+}", s.getKVHandler).Methods("GET")
 	}
-	
+
 	// PUT endpoint - require auth if anonymous write is disabled
 	if s.config.AuthEnabled && !s.config.AllowAnonymousWrite {
 		router.Handle("/kv/{path:.+}", s.authService.Middleware(
@@ -29,7 +43,7 @@ func (s *Server) setupRoutes() *mux.Router {
 	} else {
 		router.HandleFunc("/kv/{path:.+}", s.putKVHandler).Methods("PUT")
 	}
-	
+
 	// DELETE endpoint - always require authentication (no anonymous delete)
 	if s.config.AuthEnabled {
 		router.Handle("/kv/{path:.+}", s.authService.Middleware(
@@ -41,16 +55,32 @@ func (s *Server) setupRoutes() *mux.Router {
 	// Member endpoints (available when
clustering is enabled) if s.config.ClusteringEnabled { + // GET /members/ is unprotected for monitoring/inspection router.HandleFunc("/members/", s.getMembersHandler).Methods("GET") - router.HandleFunc("/members/join", s.joinMemberHandler).Methods("POST") - router.HandleFunc("/members/leave", s.leaveMemberHandler).Methods("DELETE") - router.HandleFunc("/members/gossip", s.gossipHandler).Methods("POST") - router.HandleFunc("/members/pairs_by_time", s.pairsByTimeHandler).Methods("POST") - // Merkle Tree endpoints (clustering feature) - router.HandleFunc("/merkle_tree/root", s.getMerkleRootHandler).Methods("GET") - router.HandleFunc("/merkle_tree/diff", s.getMerkleDiffHandler).Methods("POST") - router.HandleFunc("/kv_range", s.getKVRangeHandler).Methods("POST") + // Apply cluster authentication middleware to all cluster communication endpoints + if s.clusterAuthService != nil { + router.Handle("/members/join", s.clusterAuthService.Middleware(http.HandlerFunc(s.joinMemberHandler))).Methods("POST") + router.Handle("/members/leave", s.clusterAuthService.Middleware(http.HandlerFunc(s.leaveMemberHandler))).Methods("DELETE") + router.Handle("/members/gossip", s.clusterAuthService.Middleware(http.HandlerFunc(s.gossipHandler))).Methods("POST") + router.Handle("/members/pairs_by_time", s.clusterAuthService.Middleware(http.HandlerFunc(s.pairsByTimeHandler))).Methods("POST") + + // Merkle Tree endpoints (clustering feature) + router.Handle("/merkle_tree/root", s.clusterAuthService.Middleware(http.HandlerFunc(s.getMerkleRootHandler))).Methods("GET") + router.Handle("/merkle_tree/diff", s.clusterAuthService.Middleware(http.HandlerFunc(s.getMerkleDiffHandler))).Methods("POST") + router.Handle("/kv_range", s.clusterAuthService.Middleware(http.HandlerFunc(s.getKVRangeHandler))).Methods("POST") + } else { + // Fallback to unprotected endpoints (for backwards compatibility) + router.HandleFunc("/members/join", s.joinMemberHandler).Methods("POST") + router.HandleFunc("/members/leave", 
s.leaveMemberHandler).Methods("DELETE") + router.HandleFunc("/members/gossip", s.gossipHandler).Methods("POST") + router.HandleFunc("/members/pairs_by_time", s.pairsByTimeHandler).Methods("POST") + + // Merkle Tree endpoints (clustering feature) + router.HandleFunc("/merkle_tree/root", s.getMerkleRootHandler).Methods("GET") + router.HandleFunc("/merkle_tree/diff", s.getMerkleDiffHandler).Methods("POST") + router.HandleFunc("/kv_range", s.getKVRangeHandler).Methods("POST") + } } // Authentication and user management endpoints (available when auth is enabled) @@ -59,15 +89,15 @@ func (s *Server) setupRoutes() *mux.Router { router.Handle("/api/users", s.authService.Middleware( []string{"admin:users:create"}, nil, "", )(s.createUserHandler)).Methods("POST") - + router.Handle("/api/users/{uuid}", s.authService.Middleware( []string{"admin:users:read"}, nil, "", )(s.getUserHandler)).Methods("GET") - + router.Handle("/api/users/{uuid}", s.authService.Middleware( []string{"admin:users:update"}, nil, "", )(s.updateUserHandler)).Methods("PUT") - + router.Handle("/api/users/{uuid}", s.authService.Middleware( []string{"admin:users:delete"}, nil, "", )(s.deleteUserHandler)).Methods("DELETE") @@ -76,15 +106,15 @@ func (s *Server) setupRoutes() *mux.Router { router.Handle("/api/groups", s.authService.Middleware( []string{"admin:groups:create"}, nil, "", )(s.createGroupHandler)).Methods("POST") - + router.Handle("/api/groups/{uuid}", s.authService.Middleware( []string{"admin:groups:read"}, nil, "", )(s.getGroupHandler)).Methods("GET") - + router.Handle("/api/groups/{uuid}", s.authService.Middleware( []string{"admin:groups:update"}, nil, "", )(s.updateGroupHandler)).Methods("PUT") - + router.Handle("/api/groups/{uuid}", s.authService.Middleware( []string{"admin:groups:delete"}, nil, "", )(s.deleteGroupHandler)).Methods("DELETE") @@ -93,6 +123,12 @@ func (s *Server) setupRoutes() *mux.Router { router.Handle("/api/tokens", s.authService.Middleware( []string{"admin:tokens:create"}, 
nil, "", )(s.createTokenHandler)).Methods("POST") + + // Cluster Bootstrap endpoint (Issue #13) - Protected by JWT authentication + // Allows authenticated administrators to retrieve the cluster secret for new nodes + router.Handle("/auth/cluster-bootstrap", s.authService.Middleware( + []string{"admin:tokens:create"}, nil, "", + )(s.clusterBootstrapHandler)).Methods("GET") } // Revision History endpoints (available when revision history is enabled) diff --git a/server/server.go b/server/server.go index 22735a9..41b6e37 100644 --- a/server/server.go +++ b/server/server.go @@ -50,7 +50,8 @@ type Server struct { backupMu sync.RWMutex // Protects backup status // Authentication service - authService *auth.AuthService + authService *auth.AuthService + clusterAuthService *auth.ClusterAuthService } // NewServer initializes and returns a new Server instance @@ -120,6 +121,11 @@ func NewServer(config *types.Config) (*Server, error) { // Initialize authentication service server.authService = auth.NewAuthService(db, logger, config) + // Initialize cluster authentication service (Issue #13) + if config.ClusteringEnabled { + server.clusterAuthService = auth.NewClusterAuthService(config.ClusterSecret, logger) + } + // Setup initial root account if needed (Issue #3) if config.AuthEnabled { if err := server.setupRootAccount(); err != nil { @@ -219,7 +225,7 @@ func (s *Server) setupRootAccount() error { func (s *Server) createRootUserAndToken() error { rootNickname := "root" adminGroupName := "admin" - + // Generate UUIDs rootUserUUID := "root-" + time.Now().Format("20060102-150405") adminGroupUUID := "admin-" + time.Now().Format("20060102-150405") @@ -234,7 +240,7 @@ func (s *Server) createRootUserAndToken() error { UpdatedAt: now, } - // Create root user + // Create root user rootUser := types.User{ UUID: rootUserUUID, NicknameHash: hashUserNickname(rootNickname), @@ -251,7 +257,7 @@ func (s *Server) createRootUserAndToken() error { // Create API token with full administrative 
scopes adminScopes := []string{ "admin:users:create", "admin:users:read", "admin:users:update", "admin:users:delete", - "admin:groups:create", "admin:groups:read", "admin:groups:update", "admin:groups:delete", + "admin:groups:create", "admin:groups:read", "admin:groups:update", "admin:groups:delete", "admin:tokens:create", "admin:tokens:revoke", "read", "write", "delete", } @@ -269,13 +275,13 @@ func (s *Server) createRootUserAndToken() error { // Log the token securely (one-time display) s.logger.WithFields(logrus.Fields{ - "user_uuid": rootUserUUID, - "group_uuid": adminGroupUUID, - "expires_at": time.Unix(expiresAt, 0).Format(time.RFC3339), - "expires_in": "24 hours", + "user_uuid": rootUserUUID, + "group_uuid": adminGroupUUID, + "expires_at": time.Unix(expiresAt, 0).Format(time.RFC3339), + "expires_in": "24 hours", }).Warn("Root account created - SAVE THIS TOKEN:") - // Display token prominently + // Display token prominently fmt.Printf("\n" + strings.Repeat("=", 80) + "\n") fmt.Printf("🔐 ROOT ACCOUNT CREATED - INITIAL SETUP TOKEN\n") fmt.Printf("===========================================\n") @@ -309,7 +315,7 @@ func (s *Server) storeUserAndGroup(user *types.User, group *types.Group) error { if err != nil { return fmt.Errorf("failed to marshal user data: %v", err) } - + if err := txn.Set([]byte(auth.UserStorageKey(user.UUID)), userData); err != nil { return fmt.Errorf("failed to store user: %v", err) } @@ -319,7 +325,7 @@ func (s *Server) storeUserAndGroup(user *types.User, group *types.Group) error { if err != nil { return fmt.Errorf("failed to marshal group data: %v", err) } - + if err := txn.Set([]byte(auth.GroupStorageKey(group.UUID)), groupData); err != nil { return fmt.Errorf("failed to store group: %v", err) } @@ -327,4 +333,3 @@ func (s *Server) storeUserAndGroup(user *types.User, group *types.Group) error { return nil }) } - diff --git a/storage/compression.go b/storage/compression.go index 0a635de..487599c 100644 --- a/storage/compression.go +++ 
b/storage/compression.go @@ -2,7 +2,7 @@ package storage import ( "fmt" - + "github.com/klauspost/compress/zstd" ) @@ -57,4 +57,4 @@ func (c *CompressionService) DecompressData(compressedData []byte) ([]byte, erro return nil, fmt.Errorf("decompressor not initialized") } return c.decompressor.DecodeAll(compressedData, nil) -} \ No newline at end of file +} diff --git a/storage/revision.go b/storage/revision.go index 25f35c2..c248701 100644 --- a/storage/revision.go +++ b/storage/revision.go @@ -34,10 +34,10 @@ func GetRevisionKey(baseKey string, revision int) string { func (r *RevisionService) StoreRevisionHistory(txn *badger.Txn, key string, storedValue types.StoredValue, ttl time.Duration) error { // Get existing metadata to check current revisions metadataKey := auth.ResourceMetadataKey(key) - + var metadata types.ResourceMetadata var currentRevisions []int - + // Try to get existing metadata metadataData, err := r.storage.RetrieveWithDecompression(txn, []byte(metadataKey)) if err == badger.ErrKeyNotFound { @@ -60,7 +60,7 @@ func (r *RevisionService) StoreRevisionHistory(txn *badger.Txn, key string, stor if err != nil { return fmt.Errorf("failed to unmarshal metadata: %v", err) } - + // Extract current revisions (we store them as a custom field) if metadata.TTL == "" { currentRevisions = []int{} @@ -69,13 +69,13 @@ func (r *RevisionService) StoreRevisionHistory(txn *badger.Txn, key string, stor currentRevisions = []int{1, 2, 3} // Assume all revisions exist for existing keys } } - + // Revision rotation logic: shift existing revisions if len(currentRevisions) >= 3 { // Delete oldest revision (rev:3) oldestRevKey := GetRevisionKey(key, 3) txn.Delete([]byte(oldestRevKey)) - + // Shift rev:2 → rev:3 rev2Key := GetRevisionKey(key, 2) rev2Data, err := r.storage.RetrieveWithDecompression(txn, []byte(rev2Key)) @@ -83,8 +83,8 @@ func (r *RevisionService) StoreRevisionHistory(txn *badger.Txn, key string, stor rev3Key := GetRevisionKey(key, 3) r.storage.StoreWithTTL(txn, 
[]byte(rev3Key), rev2Data, ttl) } - - // Shift rev:1 → rev:2 + + // Shift rev:1 → rev:2 rev1Key := GetRevisionKey(key, 1) rev1Data, err := r.storage.RetrieveWithDecompression(txn, []byte(rev1Key)) if err == nil { @@ -92,80 +92,80 @@ func (r *RevisionService) StoreRevisionHistory(txn *badger.Txn, key string, stor r.storage.StoreWithTTL(txn, []byte(rev2Key), rev1Data, ttl) } } - + // Store current value as rev:1 currentValueBytes, err := json.Marshal(storedValue) if err != nil { return fmt.Errorf("failed to marshal current value for revision: %v", err) } - + rev1Key := GetRevisionKey(key, 1) err = r.storage.StoreWithTTL(txn, []byte(rev1Key), currentValueBytes, ttl) if err != nil { return fmt.Errorf("failed to store revision 1: %v", err) } - + // Update metadata with new revision count metadata.UpdatedAt = time.Now().Unix() metadataBytes, err := json.Marshal(metadata) if err != nil { return fmt.Errorf("failed to marshal metadata: %v", err) } - + return r.storage.StoreWithTTL(txn, []byte(metadataKey), metadataBytes, ttl) } // GetRevisionHistory retrieves all available revisions for a given key func (r *RevisionService) GetRevisionHistory(key string) ([]map[string]interface{}, error) { var revisions []map[string]interface{} - + err := r.storage.db.View(func(txn *badger.Txn) error { // Check revisions 1, 2, 3 for rev := 1; rev <= 3; rev++ { revKey := GetRevisionKey(key, rev) - + revData, err := r.storage.RetrieveWithDecompression(txn, []byte(revKey)) if err == badger.ErrKeyNotFound { continue // Skip missing revisions } else if err != nil { return fmt.Errorf("failed to retrieve revision %d: %v", rev, err) } - + var storedValue types.StoredValue err = json.Unmarshal(revData, &storedValue) if err != nil { return fmt.Errorf("failed to unmarshal revision %d: %v", rev, err) } - + var data interface{} err = json.Unmarshal(storedValue.Data, &data) if err != nil { return fmt.Errorf("failed to unmarshal revision %d data: %v", rev, err) } - + revision := map[string]interface{}{ 
"revision": rev, "uuid": storedValue.UUID, "timestamp": storedValue.Timestamp, "data": data, } - + revisions = append(revisions, revision) } - + return nil }) - + if err != nil { return nil, err } - + // Sort revisions by revision number (newest first) // Note: they're already in order since we iterate 1->3, but reverse for newest first for i, j := 0, len(revisions)-1; i < j; i, j = i+1, j-1 { revisions[i], revisions[j] = revisions[j], revisions[i] } - + return revisions, nil } @@ -174,23 +174,23 @@ func (r *RevisionService) GetSpecificRevision(key string, revision int) (*types. if revision < 1 || revision > 3 { return nil, fmt.Errorf("invalid revision number: %d (must be 1-3)", revision) } - + var storedValue types.StoredValue err := r.storage.db.View(func(txn *badger.Txn) error { revKey := GetRevisionKey(key, revision) - + revData, err := r.storage.RetrieveWithDecompression(txn, []byte(revKey)) if err != nil { return err } - + return json.Unmarshal(revData, &storedValue) }) - + if err != nil { return nil, err } - + return &storedValue, nil } @@ -200,15 +200,15 @@ func GetRevisionFromPath(path string) (string, int, error) { if len(parts) < 4 || parts[len(parts)-2] != "rev" { return "", 0, fmt.Errorf("invalid revision path format") } - + revisionStr := parts[len(parts)-1] revision, err := strconv.Atoi(revisionStr) if err != nil { return "", 0, fmt.Errorf("invalid revision number: %s", revisionStr) } - + // Reconstruct the base key without the "/rev/N" suffix baseKey := strings.Join(parts[:len(parts)-2], "/") - + return baseKey, revision, nil -} \ No newline at end of file +} diff --git a/storage/storage.go b/storage/storage.go index 952725d..d16ec03 100644 --- a/storage/storage.go +++ b/storage/storage.go @@ -12,17 +12,17 @@ import ( // StorageService handles all BadgerDB operations and data management type StorageService struct { - db *badger.DB - config *types.Config - compressionSvc *CompressionService - logger *logrus.Logger + db *badger.DB + config 
*types.Config + compressionSvc *CompressionService + logger *logrus.Logger } // NewStorageService creates a new storage service func NewStorageService(db *badger.DB, config *types.Config, logger *logrus.Logger) (*StorageService, error) { var compressionSvc *CompressionService var err error - + // Initialize compression if enabled if config.CompressionEnabled { compressionSvc, err = NewCompressionService() @@ -50,7 +50,7 @@ func (s *StorageService) Close() { func (s *StorageService) StoreWithTTL(txn *badger.Txn, key []byte, data []byte, ttl time.Duration) error { var finalData []byte var err error - + // Compress data if compression is enabled if s.config.CompressionEnabled && s.compressionSvc != nil { finalData, err = s.compressionSvc.CompressData(data) @@ -60,14 +60,14 @@ func (s *StorageService) StoreWithTTL(txn *badger.Txn, key []byte, data []byte, } else { finalData = data } - + entry := badger.NewEntry(key, finalData) - + // Apply TTL if specified if ttl > 0 { entry = entry.WithTTL(ttl) } - + return txn.SetEntry(entry) } @@ -77,7 +77,7 @@ func (s *StorageService) RetrieveWithDecompression(txn *badger.Txn, key []byte) if err != nil { return nil, err } - + var compressedData []byte err = item.Value(func(val []byte) error { compressedData = append(compressedData, val...) 
@@ -86,12 +86,12 @@ func (s *StorageService) RetrieveWithDecompression(txn *badger.Txn, key []byte) if err != nil { return nil, err } - + // Decompress data if compression is enabled if s.config.CompressionEnabled && s.compressionSvc != nil { return s.compressionSvc.DecompressData(compressedData) } - + return compressedData, nil } @@ -109,4 +109,4 @@ func (s *StorageService) DecompressData(compressedData []byte) ([]byte, error) { return compressedData, nil } return s.compressionSvc.DecompressData(compressedData) -} \ No newline at end of file +} diff --git a/types/types.go b/types/types.go index bcdf027..0c90b2c 100644 --- a/types/types.go +++ b/types/types.go @@ -13,20 +13,20 @@ type StoredValue struct { // User represents a system user type User struct { - UUID string `json:"uuid"` // Server-generated UUID + UUID string `json:"uuid"` // Server-generated UUID NicknameHash string `json:"nickname_hash"` // SHA3-512 hash of nickname - Groups []string `json:"groups"` // List of group UUIDs this user belongs to - CreatedAt int64 `json:"created_at"` // Unix timestamp - UpdatedAt int64 `json:"updated_at"` // Unix timestamp + Groups []string `json:"groups"` // List of group UUIDs this user belongs to + CreatedAt int64 `json:"created_at"` // Unix timestamp + UpdatedAt int64 `json:"updated_at"` // Unix timestamp } // Group represents a user group type Group struct { - UUID string `json:"uuid"` // Server-generated UUID - NameHash string `json:"name_hash"` // SHA3-512 hash of group name - Members []string `json:"members"` // List of user UUIDs in this group - CreatedAt int64 `json:"created_at"` // Unix timestamp - UpdatedAt int64 `json:"updated_at"` // Unix timestamp + UUID string `json:"uuid"` // Server-generated UUID + NameHash string `json:"name_hash"` // SHA3-512 hash of group name + Members []string `json:"members"` // List of user UUIDs in this group + CreatedAt int64 `json:"created_at"` // Unix timestamp + UpdatedAt int64 `json:"updated_at"` // Unix timestamp } // 
APIToken represents a JWT authentication token @@ -40,12 +40,12 @@ type APIToken struct { // ResourceMetadata contains ownership and permission information for stored resources type ResourceMetadata struct { - OwnerUUID string `json:"owner_uuid"` // UUID of the resource owner - GroupUUID string `json:"group_uuid"` // UUID of the resource group - Permissions int `json:"permissions"` // 12-bit permission mask (POSIX-inspired) - TTL string `json:"ttl"` // Time-to-live duration (Go format) - CreatedAt int64 `json:"created_at"` // Unix timestamp when resource was created - UpdatedAt int64 `json:"updated_at"` // Unix timestamp when resource was last updated + OwnerUUID string `json:"owner_uuid"` // UUID of the resource owner + GroupUUID string `json:"group_uuid"` // UUID of the resource group + Permissions int `json:"permissions"` // 12-bit permission mask (POSIX-inspired) + TTL string `json:"ttl"` // Time-to-live duration (Go format) + CreatedAt int64 `json:"created_at"` // Unix timestamp when resource was created + UpdatedAt int64 `json:"updated_at"` // Unix timestamp when resource was last updated } // Permission constants for POSIX-inspired ACL @@ -55,19 +55,19 @@ const ( PermOwnerDelete = 1 << 10 PermOwnerWrite = 1 << 9 PermOwnerRead = 1 << 8 - + // Group permissions (bits 7-4) PermGroupCreate = 1 << 7 PermGroupDelete = 1 << 6 PermGroupWrite = 1 << 5 PermGroupRead = 1 << 4 - + // Others permissions (bits 3-0) PermOthersCreate = 1 << 3 PermOthersDelete = 1 << 2 PermOthersWrite = 1 << 1 PermOthersRead = 1 << 0 - + // Default permissions: Owner(1111), Group(0011), Others(0010) DefaultPermissions = (PermOwnerCreate | PermOwnerDelete | PermOwnerWrite | PermOwnerRead) | (PermGroupWrite | PermGroupRead) | @@ -131,6 +131,22 @@ type CreateTokenResponse struct { ExpiresAt int64 `json:"expires_at"` } +// Resource Metadata Management API structures (Issue #12) +type GetResourceMetadataResponse struct { + OwnerUUID string `json:"owner_uuid"` + GroupUUID string 
`json:"group_uuid"` + Permissions int `json:"permissions"` + TTL string `json:"ttl"` + CreatedAt int64 `json:"created_at"` + UpdatedAt int64 `json:"updated_at"` +} + +type UpdateResourceMetadataRequest struct { + OwnerUUID *string `json:"owner_uuid,omitempty"` + GroupUUID *string `json:"group_uuid,omitempty"` + Permissions *int `json:"permissions,omitempty"` +} + // Cluster and member management types type Member struct { ID string `json:"id"` @@ -231,50 +247,57 @@ type KVRangeResponse struct { // Configuration type Config struct { - NodeID string `yaml:"node_id"` - BindAddress string `yaml:"bind_address"` - Port int `yaml:"port"` - DataDir string `yaml:"data_dir"` - SeedNodes []string `yaml:"seed_nodes"` - ReadOnly bool `yaml:"read_only"` - LogLevel string `yaml:"log_level"` - GossipIntervalMin int `yaml:"gossip_interval_min"` - GossipIntervalMax int `yaml:"gossip_interval_max"` - SyncInterval int `yaml:"sync_interval"` - CatchupInterval int `yaml:"catchup_interval"` - BootstrapMaxAgeHours int `yaml:"bootstrap_max_age_hours"` - ThrottleDelayMs int `yaml:"throttle_delay_ms"` - FetchDelayMs int `yaml:"fetch_delay_ms"` - + NodeID string `yaml:"node_id"` + BindAddress string `yaml:"bind_address"` + Port int `yaml:"port"` + DataDir string `yaml:"data_dir"` + SeedNodes []string `yaml:"seed_nodes"` + ReadOnly bool `yaml:"read_only"` + LogLevel string `yaml:"log_level"` + GossipIntervalMin int `yaml:"gossip_interval_min"` + GossipIntervalMax int `yaml:"gossip_interval_max"` + SyncInterval int `yaml:"sync_interval"` + CatchupInterval int `yaml:"catchup_interval"` + BootstrapMaxAgeHours int `yaml:"bootstrap_max_age_hours"` + ThrottleDelayMs int `yaml:"throttle_delay_ms"` + FetchDelayMs int `yaml:"fetch_delay_ms"` + // Database compression configuration CompressionEnabled bool `yaml:"compression_enabled"` CompressionLevel int `yaml:"compression_level"` - + // TTL configuration - DefaultTTL string `yaml:"default_ttl"` // Go duration format, "0" means no default TTL - 
MaxJSONSize int `yaml:"max_json_size"` // Maximum JSON size in bytes - + DefaultTTL string `yaml:"default_ttl"` // Go duration format, "0" means no default TTL + MaxJSONSize int `yaml:"max_json_size"` // Maximum JSON size in bytes + // Rate limiting configuration RateLimitRequests int `yaml:"rate_limit_requests"` // Max requests per window RateLimitWindow string `yaml:"rate_limit_window"` // Window duration (Go format) - + // Tamper-evident logging configuration TamperLogActions []string `yaml:"tamper_log_actions"` // Actions to log - + // Backup system configuration - BackupEnabled bool `yaml:"backup_enabled"` // Enable/disable automated backups - BackupSchedule string `yaml:"backup_schedule"` // Cron schedule format - BackupPath string `yaml:"backup_path"` // Directory to store backups - BackupRetention int `yaml:"backup_retention"` // Days to keep backups - + BackupEnabled bool `yaml:"backup_enabled"` // Enable/disable automated backups + BackupSchedule string `yaml:"backup_schedule"` // Cron schedule format + BackupPath string `yaml:"backup_path"` // Directory to store backups + BackupRetention int `yaml:"backup_retention"` // Days to keep backups + // Feature toggles for optional functionalities AuthEnabled bool `yaml:"auth_enabled"` // Enable/disable authentication system TamperLoggingEnabled bool `yaml:"tamper_logging_enabled"` // Enable/disable tamper-evident logging ClusteringEnabled bool `yaml:"clustering_enabled"` // Enable/disable clustering/gossip RateLimitingEnabled bool `yaml:"rate_limiting_enabled"` // Enable/disable rate limiting RevisionHistoryEnabled bool `yaml:"revision_history_enabled"` // Enable/disable revision history - + // Anonymous access control (Issue #5) - AllowAnonymousRead bool `yaml:"allow_anonymous_read"` // Allow unauthenticated read access to KV endpoints - AllowAnonymousWrite bool `yaml:"allow_anonymous_write"` // Allow unauthenticated write access to KV endpoints -} \ No newline at end of file + AllowAnonymousRead bool 
`yaml:"allow_anonymous_read"` // Allow unauthenticated read access to KV endpoints + AllowAnonymousWrite bool `yaml:"allow_anonymous_write"` // Allow unauthenticated write access to KV endpoints + + // Cluster authentication (Issue #13) + ClusterSecret string `yaml:"cluster_secret"` // Shared secret for cluster authentication (auto-generated if empty) + ClusterTLSEnabled bool `yaml:"cluster_tls_enabled"` // Require TLS for inter-node communication + ClusterTLSCertFile string `yaml:"cluster_tls_cert_file"` // Path to TLS certificate file + ClusterTLSKeyFile string `yaml:"cluster_tls_key_file"` // Path to TLS private key file + ClusterTLSSkipVerify bool `yaml:"cluster_tls_skip_verify"` // Skip TLS verification (insecure, for testing only) +} diff --git a/utils/hash.go b/utils/hash.go index 1595b6a..ed23efa 100644 --- a/utils/hash.go +++ b/utils/hash.go @@ -22,4 +22,4 @@ func HashGroupName(groupname string) string { func HashToken(token string) string { return HashSHA3512(token) -} \ No newline at end of file +}