6 Commits

Author SHA1 Message Date
64680a6ece docs: document daemon process management commands
Update README.md and CLAUDE.md to document new process management:
- Add "Process Management" section with daemon commands
- Update all examples to use `./kvs start/stop/status` instead of `&` and `pkill`
- Document global PID/log directories (~/.kvs/)
- Update cluster setup examples
- Update development workflow
- Add daemon package to project structure

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-05 23:10:25 +03:00
4c3fcbc45a test: refactor integration tests to use daemon commands
Update integration_test.sh to use new daemon management commands
instead of manual background processes and PIDs:
- Replace `kvs config.yaml &` with `kvs start config.yaml`
- Replace `kill $pid` with `kvs stop config.yaml`
- Update log file paths to use ~/.kvs/logs/
- Add integration_test/ directory to gitignore

All tests now use clean daemon lifecycle management.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-05 22:59:25 +03:00
a41e0d625c feat: add process management commands for daemon control
Add systemd-style subcommands for managing KVS instances:
- start <config>  - Daemonize and run in background
- stop <config>   - Gracefully stop daemon
- restart <config> - Restart daemon
- status [config] - Show status of all or specific instances

Key features:
- PID files stored in ~/.kvs/pids/ (global across all directories)
- Logs stored in ~/.kvs/logs/
- Config names support both 'node1' and 'node1.yaml' formats
- Backward compatible: 'kvs config.yaml' still runs in foreground
- Proper stale PID detection and cleanup

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-05 22:56:16 +03:00
377af163f0 feat: implement resource metadata management API (issue #12)
Add API endpoints to manage ResourceMetadata (ownership, groups, permissions)
for KV resources. This enables administrators to configure granular access
control for stored data.

Changes:
- Add GetResourceMetadataResponse and UpdateResourceMetadataRequest types
- Add GetResourceMetadata and SetResourceMetadata methods to AuthService
- Add GET /kv/{path}/metadata endpoint (requires admin:users:read)
- Add PUT /kv/{path}/metadata endpoint (requires admin:users:update)
- Both endpoints protected by JWT authentication
- Metadata routes registered before general KV routes to prevent pattern conflicts

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-03 00:06:14 +03:00
852275945c fix: update bootstrap service and routes for cluster authentication
- Updated bootstrap service to use authenticated HTTP client with cluster auth headers
- Made GET /members/ endpoint unprotected for monitoring/inspection purposes
- All other cluster communication endpoints remain protected by cluster auth middleware

This ensures proper cluster formation while maintaining security for inter-node communication.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-02 22:27:15 +03:00
c7dcebb894 feat: implement secure cluster authentication (issue #13)
Implemented a comprehensive secure authentication mechanism for inter-node
cluster communication with the following features:

1. Global Cluster Secret (GCS)
   - Auto-generated cryptographically secure random secret (256-bit)
   - Configurable via YAML config file
   - Shared across all cluster nodes for authentication

2. Cluster Authentication Middleware
   - Validates X-Cluster-Secret and X-Node-ID headers
   - Applied to all cluster endpoints (/members/*, /merkle_tree/*, /kv_range)
   - Comprehensive logging of authentication attempts

3. Authenticated HTTP Client
   - Custom HTTP client with cluster auth headers
   - TLS support with configurable certificate verification
   - Protocol-aware (http/https based on TLS settings)

4. Secure Bootstrap Endpoint
   - New /auth/cluster-bootstrap endpoint
   - Protected by JWT authentication with admin scope
   - Allows new nodes to securely obtain cluster secret

5. Updated Cluster Communication
   - All gossip protocol requests include auth headers
   - All Merkle tree sync requests include auth headers
   - All data replication requests include auth headers

6. Configuration
   - cluster_secret: Shared secret (auto-generated if not provided)
   - cluster_tls_enabled: Enable TLS for inter-node communication
   - cluster_tls_cert_file: Path to TLS certificate
   - cluster_tls_key_file: Path to TLS private key
   - cluster_tls_skip_verify: Skip TLS verification (testing only)

This implementation addresses the security vulnerability of unprotected
cluster endpoints and provides a flexible, secure approach to protecting
internal cluster communication while allowing for automated node bootstrapping.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-02 22:19:40 +03:00
34 changed files with 1510 additions and 332 deletions

2
.gitignore vendored
View File

@@ -1,6 +1,8 @@
.claude/
.kvs/
data/
data*/
integration_test/
*.yaml
!config.yaml
kvs

View File

@@ -10,10 +10,16 @@ This file provides guidance to Claude Code (claude.ai/code) when working with co
go build -o kvs .
# Run with default config (auto-generates config.yaml)
./kvs
./kvs start config.yaml
# Run with custom config
./kvs /path/to/config.yaml
./kvs start /path/to/config.yaml
# Check running instances
./kvs status
# Stop instance
./kvs stop config
# Run comprehensive integration tests
./integration_test.sh
@@ -25,6 +31,32 @@ go run test_conflict.go data1 data2
go build -o kvs . && ./integration_test.sh
```
### Process Management Commands
```bash
# Start as background daemon
./kvs start <config.yaml> # .yaml extension optional
# Stop daemon
./kvs stop <config> # Graceful SIGTERM shutdown
# Restart daemon
./kvs restart <config> # Stop then start
# Show status
./kvs status # All instances
./kvs status <config> # Specific instance
# Run in foreground (for debugging)
./kvs <config.yaml> # Logs to stdout, blocks terminal
# View daemon logs
tail -f ~/.kvs/logs/kvs_<config>.yaml.log
# Global state directories
~/.kvs/pids/ # PID files (works from any directory)
~/.kvs/logs/ # Daemon log files
```
### Development Workflow
```bash
# Format and check code
@@ -38,11 +70,25 @@ go mod tidy
go build .
# Test specific cluster scenarios
./kvs node1.yaml & # Terminal 1
./kvs node2.yaml & # Terminal 2
./kvs start node1.yaml
./kvs start node2.yaml
# Wait for cluster formation
sleep 5
# Test data operations
curl -X PUT http://localhost:8081/kv/test/data -H "Content-Type: application/json" -d '{"test":"data"}'
curl http://localhost:8082/kv/test/data # Should replicate within ~30 seconds
pkill kvs
# Check daemon status
./kvs status
# View logs
tail -f ~/.kvs/logs/kvs_node1.yaml.log
# Cleanup
./kvs stop node1
./kvs stop node2
```
## Architecture Overview
@@ -59,6 +105,7 @@ KVS is a **distributed, eventually consistent key-value store** built around thr
#### Modular Package Design
- **`auth/`** - Complete JWT authentication system with POSIX-inspired permissions
- **`cluster/`** - Distributed systems logic (gossip, sync, merkle trees)
- **`daemon/`** - Process management (daemonization, PID files, lifecycle)
- **`storage/`** - BadgerDB abstraction with compression and revision history
- **`server/`** - HTTP handlers, routing, and lifecycle management
- **`features/`** - Utility functions for TTL, rate limiting, tamper logging, backup
@@ -147,9 +194,18 @@ Creates two BadgerDB instances with intentionally conflicting data (same path, s
- **Bootstrap sync**: Up to 30 days of historical data for new nodes
#### Main Entry Point Flow
1. `main.go` loads config (auto-generates default if missing)
2. `server.NewServer()` initializes all subsystems
3. Graceful shutdown handling with `SIGINT`/`SIGTERM`
4. All business logic delegated to modular packages
1. `main.go` parses command-line arguments for subcommands (`start`, `stop`, `status`, `restart`)
2. For daemon mode: `daemon.Daemonize()` spawns background process and manages PID files
3. For server mode: loads config (auto-generates default if missing)
4. `server.NewServer()` initializes all subsystems
5. Graceful shutdown handling with `SIGINT`/`SIGTERM`
6. All business logic delegated to modular packages
#### Daemon Architecture
- **PID Management**: Global PID files stored in `~/.kvs/pids/` for cross-directory access
- **Logging**: Daemon logs written to `~/.kvs/logs/{config-name}.log`
- **Process Lifecycle**: Spawns detached process via `exec.Command()` with `Setsid: true`
- **Config Normalization**: Supports both `node1` and `node1.yaml` formats
- **Stale PID Detection**: Checks process existence via `Signal(0)` before operations
This architecture enables easy feature addition, comprehensive testing, and reliable operation in distributed environments while maintaining simplicity for single-node deployments.

115
README.md
View File

@@ -69,11 +69,67 @@ go build -o kvs .
### Quick Test
```bash
# Start standalone node
./kvs
# Start standalone node (uses config.yaml if it exists, or creates it)
./kvs start config.yaml
# Test the API
curl http://localhost:8080/health
# Check status
./kvs status
# Stop when done
./kvs stop config
```
## 🎮 Process Management
KVS includes systemd-style daemon commands for easy process management:
```bash
# Start as background daemon
./kvs start config.yaml # or just: ./kvs start config
./kvs start node1.yaml # Start with custom config
# Check status
./kvs status # Show all running instances
./kvs status node1 # Show specific instance
# Stop daemon
./kvs stop node1 # Graceful shutdown
# Restart daemon
./kvs restart node1 # Stop and start
# Run in foreground (traditional)
./kvs node1.yaml # Logs to stdout
```
### Daemon Features
- **Global PID tracking**: PID files stored in `~/.kvs/pids/` (works from any directory)
- **Automatic logging**: Logs written to `~/.kvs/logs/{config-name}.log`
- **Flexible naming**: Config extension optional (`node1` or `node1.yaml` both work)
- **Graceful shutdown**: SIGTERM sent for clean shutdown
- **Stale PID cleanup**: Automatically detects and cleans dead processes
- **Multi-instance**: Run multiple KVS instances on same machine
### Example Workflow
```bash
# Start 3-node cluster as daemons
./kvs start node1.yaml
./kvs start node2.yaml
./kvs start node3.yaml
# Check cluster status
./kvs status
# View logs
tail -f ~/.kvs/logs/kvs_node1.yaml.log
# Stop entire cluster
./kvs stop node1
./kvs stop node2
./kvs stop node3
```
## ⚙️ Configuration
@@ -308,17 +364,23 @@ clustering_enabled: true
#### Start the Cluster
```bash
# Terminal 1
./kvs node1.yaml
# Terminal 2 (wait a few seconds)
./kvs node2.yaml
# Terminal 3 (wait a few seconds)
./kvs node3.yaml
# Start as daemons
./kvs start node1.yaml
sleep 2
./kvs start node2.yaml
sleep 2
./kvs start node3.yaml
# Verify cluster formation
curl http://localhost:8081/members/ # Should show all 3 nodes
# Check daemon status
./kvs status
# Stop cluster when done
./kvs stop node1
./kvs stop node2
./kvs stop node3
```
## 🔄 How It Works
@@ -364,9 +426,10 @@ go build -o kvs .
./integration_test.sh
# Manual basic functionality test
./kvs &
./kvs start config.yaml
sleep 2
curl http://localhost:8080/health
pkill kvs
./kvs stop config
# Manual cluster test (requires creating configs)
echo 'node_id: "test1"
@@ -379,8 +442,9 @@ port: 8082
seed_nodes: ["127.0.0.1:8081"]
auth_enabled: false' > test2.yaml
./kvs test1.yaml &
./kvs test2.yaml &
./kvs start test1.yaml
sleep 2
./kvs start test2.yaml
# Test data replication (wait for cluster formation)
sleep 10
@@ -393,7 +457,8 @@ sleep 30
curl http://localhost:8082/kv/test/data
# Cleanup
pkill kvs
./kvs stop test1
./kvs stop test2
rm test1.yaml test2.yaml
```
@@ -418,17 +483,22 @@ auth_enabled: false
log_level: "debug"' > conflict2.yaml
# Start nodes with conflicting data
./kvs conflict1.yaml &
./kvs conflict2.yaml &
./kvs start conflict1.yaml
sleep 2
./kvs start conflict2.yaml
# Watch logs for conflict resolution
tail -f ~/.kvs/logs/kvs_conflict1.yaml.log ~/.kvs/logs/kvs_conflict2.yaml.log &
# Both nodes will converge within ~10-30 seconds
# Check final state
sleep 30
curl http://localhost:9111/kv/test/conflict/data
curl http://localhost:9112/kv/test/conflict/data
pkill kvs
# Cleanup
./kvs stop conflict1
./kvs stop conflict2
rm conflict1.yaml conflict2.yaml
```
@@ -474,6 +544,10 @@ kvs/
├── config/ # Configuration management
│ └── config.go # Config loading & defaults
├── daemon/ # Process management
│ ├── daemonize.go # Background process spawning
│ └── pid.go # PID file management
├── features/ # Utility features
│ ├── auth.go # Auth utilities
│ ├── backup.go # Backup system
@@ -580,8 +654,9 @@ type StoredValue struct {
## 🛡️ Production Considerations
### Deployment
- Use systemd or similar for process management
- Configure log rotation for JSON logs
- Built-in daemon commands (`start`/`stop`/`restart`/`status`) for process management
- Alternatively, use systemd or similar for advanced orchestration
- Logs automatically written to `~/.kvs/logs/` (configure log rotation)
- Set up monitoring for `/health` endpoint
- Use reverse proxy (nginx/traefik) for TLS and load balancing

View File

@@ -198,6 +198,40 @@ func (s *AuthService) CheckResourcePermission(authCtx *AuthContext, resourceKey
return CheckPermission(metadata.Permissions, operation, isOwner, isGroupMember)
}
// GetResourceMetadata retrieves metadata for a resource
func (s *AuthService) GetResourceMetadata(resourceKey string) (*types.ResourceMetadata, error) {
var metadata types.ResourceMetadata
err := s.db.View(func(txn *badger.Txn) error {
item, err := txn.Get([]byte(ResourceMetadataKey(resourceKey)))
if err != nil {
return err
}
return item.Value(func(val []byte) error {
return json.Unmarshal(val, &metadata)
})
})
if err != nil {
return nil, err
}
return &metadata, nil
}
// SetResourceMetadata stores metadata for a resource
func (s *AuthService) SetResourceMetadata(resourceKey string, metadata *types.ResourceMetadata) error {
metadataBytes, err := json.Marshal(metadata)
if err != nil {
return fmt.Errorf("failed to marshal metadata: %v", err)
}
return s.db.Update(func(txn *badger.Txn) error {
return txn.Set([]byte(ResourceMetadataKey(resourceKey)), metadataBytes)
})
}
// GetAuthContext retrieves auth context from request context
func GetAuthContext(ctx context.Context) *AuthContext {
if authCtx, ok := ctx.Value("auth").(*AuthContext); ok {

77
auth/cluster.go Normal file
View File

@@ -0,0 +1,77 @@
package auth
import (
"net/http"
"github.com/sirupsen/logrus"
)
// ClusterAuthService handles authentication for inter-cluster communication
type ClusterAuthService struct {
clusterSecret string
logger *logrus.Logger
}
// NewClusterAuthService creates a new cluster authentication service
func NewClusterAuthService(clusterSecret string, logger *logrus.Logger) *ClusterAuthService {
return &ClusterAuthService{
clusterSecret: clusterSecret,
logger: logger,
}
}
// Middleware validates cluster authentication headers
func (s *ClusterAuthService) Middleware(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// Extract authentication headers
clusterSecret := r.Header.Get("X-Cluster-Secret")
nodeID := r.Header.Get("X-Node-ID")
// Log authentication attempt
s.logger.WithFields(logrus.Fields{
"node_id": nodeID,
"remote_addr": r.RemoteAddr,
"path": r.URL.Path,
"method": r.Method,
}).Debug("Cluster authentication attempt")
// Validate cluster secret
if clusterSecret == "" {
s.logger.WithFields(logrus.Fields{
"node_id": nodeID,
"remote_addr": r.RemoteAddr,
"path": r.URL.Path,
}).Warn("Missing X-Cluster-Secret header")
http.Error(w, "Unauthorized: Missing cluster secret", http.StatusUnauthorized)
return
}
if clusterSecret != s.clusterSecret {
s.logger.WithFields(logrus.Fields{
"node_id": nodeID,
"remote_addr": r.RemoteAddr,
"path": r.URL.Path,
}).Warn("Invalid cluster secret")
http.Error(w, "Unauthorized: Invalid cluster secret", http.StatusUnauthorized)
return
}
// Validate node ID is present
if nodeID == "" {
s.logger.WithFields(logrus.Fields{
"remote_addr": r.RemoteAddr,
"path": r.URL.Path,
}).Warn("Missing X-Node-ID header")
http.Error(w, "Unauthorized: Missing node ID", http.StatusUnauthorized)
return
}
// Authentication successful
s.logger.WithFields(logrus.Fields{
"node_id": nodeID,
"path": r.URL.Path,
}).Debug("Cluster authentication successful")
next.ServeHTTP(w, r)
})
}

View File

@@ -82,10 +82,19 @@ func (s *BootstrapService) attemptJoin(seedAddr string) bool {
return false
}
client := &http.Client{Timeout: 10 * time.Second}
url := fmt.Sprintf("http://%s/members/join", seedAddr)
client := NewAuthenticatedHTTPClient(s.config, 10*time.Second)
protocol := GetProtocol(s.config)
url := fmt.Sprintf("%s://%s/members/join", protocol, seedAddr)
resp, err := client.Post(url, "application/json", bytes.NewBuffer(jsonData))
req, err := http.NewRequest("POST", url, bytes.NewBuffer(jsonData))
if err != nil {
s.logger.WithError(err).Error("Failed to create join request")
return false
}
req.Header.Set("Content-Type", "application/json")
AddClusterAuthHeaders(req, s.config)
resp, err := client.Do(req)
if err != nil {
s.logger.WithFields(logrus.Fields{
"seed": seedAddr,

View File

@@ -17,13 +17,13 @@ import (
// GossipService handles gossip protocol operations
type GossipService struct {
config *types.Config
members map[string]*types.Member
membersMu sync.RWMutex
logger *logrus.Logger
ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
config *types.Config
members map[string]*types.Member
membersMu sync.RWMutex
logger *logrus.Logger
ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
}
// NewGossipService creates a new gossip service
@@ -181,11 +181,20 @@ func (s *GossipService) gossipWithPeer(peer *types.Member) error {
return err
}
// Send HTTP request to peer
client := &http.Client{Timeout: 5 * time.Second}
url := fmt.Sprintf("http://%s/members/gossip", peer.Address)
// Send HTTP request to peer with cluster authentication
client := NewAuthenticatedHTTPClient(s.config, 5*time.Second)
protocol := GetProtocol(s.config)
url := fmt.Sprintf("%s://%s/members/gossip", protocol, peer.Address)
resp, err := client.Post(url, "application/json", bytes.NewBuffer(jsonData))
req, err := http.NewRequest("POST", url, bytes.NewBuffer(jsonData))
if err != nil {
s.logger.WithError(err).Error("Failed to create gossip request")
return err
}
req.Header.Set("Content-Type", "application/json")
AddClusterAuthHeaders(req, s.config)
resp, err := client.Do(req)
if err != nil {
s.logger.WithFields(logrus.Fields{
"peer": peer.Address,

43
cluster/http_client.go Normal file
View File

@@ -0,0 +1,43 @@
package cluster
import (
"crypto/tls"
"net/http"
"time"
"kvs/types"
)
// NewAuthenticatedHTTPClient creates an HTTP client configured for cluster authentication
func NewAuthenticatedHTTPClient(config *types.Config, timeout time.Duration) *http.Client {
client := &http.Client{
Timeout: timeout,
}
// Configure TLS if enabled
if config.ClusterTLSEnabled {
tlsConfig := &tls.Config{
InsecureSkipVerify: config.ClusterTLSSkipVerify,
}
client.Transport = &http.Transport{
TLSClientConfig: tlsConfig,
}
}
return client
}
// AddClusterAuthHeaders adds authentication headers to an HTTP request
func AddClusterAuthHeaders(req *http.Request, config *types.Config) {
req.Header.Set("X-Cluster-Secret", config.ClusterSecret)
req.Header.Set("X-Node-ID", config.NodeID)
}
// GetProtocol returns the appropriate protocol (http or https) based on TLS configuration
func GetProtocol(config *types.Config) string {
if config.ClusterTLSEnabled {
return "https"
}
return "http"
}

View File

@@ -172,9 +172,9 @@ func (s *SyncService) performMerkleSync() {
// 2. Compare roots and start recursive diffing if they differ
if !bytes.Equal(localRoot.Hash, remoteRoot.Hash) {
s.logger.WithFields(logrus.Fields{
"peer": peer.Address,
"local_root": hex.EncodeToString(localRoot.Hash),
"remote_root": hex.EncodeToString(remoteRoot.Hash),
"peer": peer.Address,
"local_root": hex.EncodeToString(localRoot.Hash),
"remote_root": hex.EncodeToString(remoteRoot.Hash),
}).Info("Merkle roots differ, starting recursive diff")
s.diffMerkleTreesRecursive(peer.Address, localRoot, remoteRoot)
} else {
@@ -186,10 +186,17 @@ func (s *SyncService) performMerkleSync() {
// requestMerkleRoot requests the Merkle root from a peer
func (s *SyncService) requestMerkleRoot(peerAddress string) (*types.MerkleRootResponse, error) {
client := &http.Client{Timeout: 10 * time.Second}
url := fmt.Sprintf("http://%s/merkle_tree/root", peerAddress)
client := NewAuthenticatedHTTPClient(s.config, 10*time.Second)
protocol := GetProtocol(s.config)
url := fmt.Sprintf("%s://%s/merkle_tree/root", protocol, peerAddress)
resp, err := client.Get(url)
req, err := http.NewRequest("GET", url, nil)
if err != nil {
return nil, err
}
AddClusterAuthHeaders(req, s.config)
resp, err := client.Do(req)
if err != nil {
return nil, err
}
@@ -216,7 +223,7 @@ func (s *SyncService) diffMerkleTreesRecursive(peerAddress string, localNode, re
// Hashes differ, need to go deeper.
// Request children from the remote peer for the current range.
req := types.MerkleTreeDiffRequest{
ParentNode: *remoteNode, // We are asking the remote peer about its children for this range
ParentNode: *remoteNode, // We are asking the remote peer about its children for this range
LocalHash: localNode.Hash, // Our hash for this range
}
@@ -294,10 +301,17 @@ func (s *SyncService) handleLeafLevelDiff(peerAddress string, keys []string, loc
// fetchSingleKVFromPeer fetches a single KV pair from a peer
func (s *SyncService) fetchSingleKVFromPeer(peerAddress, path string) (*types.StoredValue, error) {
client := &http.Client{Timeout: 5 * time.Second}
url := fmt.Sprintf("http://%s/kv/%s", peerAddress, path)
client := NewAuthenticatedHTTPClient(s.config, 5*time.Second)
protocol := GetProtocol(s.config)
url := fmt.Sprintf("%s://%s/kv/%s", protocol, peerAddress, path)
resp, err := client.Get(url)
req, err := http.NewRequest("GET", url, nil)
if err != nil {
return nil, err
}
AddClusterAuthHeaders(req, s.config)
resp, err := client.Do(req)
if err != nil {
return nil, err
}
@@ -418,12 +432,12 @@ func (s *SyncService) resolveConflict(key string, local, remote *types.StoredVal
// If we can't find membership info, fall back to UUID comparison for deterministic result
if localMember == nil || remoteMember == nil {
s.logger.WithFields(logrus.Fields{
"key": key,
"peerAddress": peerAddress,
"localNodeID": localNodeID,
"localMember": localMember != nil,
"remoteMember": remoteMember != nil,
"totalMembers": len(members),
"key": key,
"peerAddress": peerAddress,
"localNodeID": localNodeID,
"localMember": localMember != nil,
"remoteMember": remoteMember != nil,
"totalMembers": len(members),
}).Warn("Could not find membership info for conflict resolution, using UUID comparison")
if remote.UUID < local.UUID {
// Remote UUID lexically smaller (deterministic choice)
@@ -443,9 +457,9 @@ func (s *SyncService) resolveConflict(key string, local, remote *types.StoredVal
err := s.storeReplicatedDataWithMetadata(key, remote)
if err == nil {
s.logger.WithFields(logrus.Fields{
"key": key,
"local_joined": localMember.JoinedTimestamp,
"remote_joined": remoteMember.JoinedTimestamp,
"key": key,
"local_joined": localMember.JoinedTimestamp,
"remote_joined": remoteMember.JoinedTimestamp,
}).Info("Conflict resolved: remote data wins (oldest-node rule)")
}
return err
@@ -453,24 +467,32 @@ func (s *SyncService) resolveConflict(key string, local, remote *types.StoredVal
// Local node is older or equal, keep local data
s.logger.WithFields(logrus.Fields{
"key": key,
"local_joined": localMember.JoinedTimestamp,
"remote_joined": remoteMember.JoinedTimestamp,
"key": key,
"local_joined": localMember.JoinedTimestamp,
"remote_joined": remoteMember.JoinedTimestamp,
}).Info("Conflict resolved: local data wins (oldest-node rule)")
return nil
}
// requestMerkleDiff requests children hashes or keys for a given node/range from a peer
func (s *SyncService) requestMerkleDiff(peerAddress string, req types.MerkleTreeDiffRequest) (*types.MerkleTreeDiffResponse, error) {
jsonData, err := json.Marshal(req)
func (s *SyncService) requestMerkleDiff(peerAddress string, reqData types.MerkleTreeDiffRequest) (*types.MerkleTreeDiffResponse, error) {
jsonData, err := json.Marshal(reqData)
if err != nil {
return nil, err
}
client := &http.Client{Timeout: 10 * time.Second}
url := fmt.Sprintf("http://%s/merkle_tree/diff", peerAddress)
client := NewAuthenticatedHTTPClient(s.config, 10*time.Second)
protocol := GetProtocol(s.config)
url := fmt.Sprintf("%s://%s/merkle_tree/diff", protocol, peerAddress)
resp, err := client.Post(url, "application/json", bytes.NewBuffer(jsonData))
req, err := http.NewRequest("POST", url, bytes.NewBuffer(jsonData))
if err != nil {
return nil, err
}
req.Header.Set("Content-Type", "application/json")
AddClusterAuthHeaders(req, s.config)
resp, err := client.Do(req)
if err != nil {
return nil, err
}
@@ -525,20 +547,28 @@ func (s *SyncService) handleChildrenDiff(peerAddress string, children []types.Me
// fetchAndStoreRange fetches a range of KV pairs from a peer and stores them locally
func (s *SyncService) fetchAndStoreRange(peerAddress string, startKey, endKey string) error {
req := types.KVRangeRequest{
reqData := types.KVRangeRequest{
StartKey: startKey,
EndKey: endKey,
Limit: 0, // No limit
}
jsonData, err := json.Marshal(req)
jsonData, err := json.Marshal(reqData)
if err != nil {
return err
}
client := &http.Client{Timeout: 30 * time.Second} // Longer timeout for range fetches
url := fmt.Sprintf("http://%s/kv_range", peerAddress)
client := NewAuthenticatedHTTPClient(s.config, 30*time.Second) // Longer timeout for range fetches
protocol := GetProtocol(s.config)
url := fmt.Sprintf("%s://%s/kv_range", protocol, peerAddress)
resp, err := client.Post(url, "application/json", bytes.NewBuffer(jsonData))
req, err := http.NewRequest("POST", url, bytes.NewBuffer(jsonData))
if err != nil {
return err
}
req.Header.Set("Content-Type", "application/json")
AddClusterAuthHeaders(req, s.config)
resp, err := client.Do(req)
if err != nil {
return err
}

View File

@@ -1,12 +1,14 @@
package config
import (
"crypto/rand"
"encoding/base64"
"fmt"
"os"
"path/filepath"
"kvs/types"
"gopkg.in/yaml.v3"
"kvs/types"
)
// Default configuration
@@ -33,8 +35,8 @@ func Default() *types.Config {
CompressionLevel: 3, // Balance between performance and compression ratio
// Default TTL and size limit settings
DefaultTTL: "0", // No default TTL
MaxJSONSize: 1048576, // 1MB default max JSON size
DefaultTTL: "0", // No default TTL
MaxJSONSize: 1048576, // 1MB default max JSON size
// Default rate limiting settings
RateLimitRequests: 100, // 100 requests per window
@@ -57,11 +59,31 @@ func Default() *types.Config {
RevisionHistoryEnabled: true,
// Default anonymous access settings (both disabled by default for security)
AllowAnonymousRead: false,
AllowAnonymousWrite: false,
AllowAnonymousRead: false,
AllowAnonymousWrite: false,
// Default cluster authentication settings (Issue #13)
ClusterSecret: generateClusterSecret(),
ClusterTLSEnabled: false,
ClusterTLSCertFile: "",
ClusterTLSKeyFile: "",
ClusterTLSSkipVerify: false,
}
}
// generateClusterSecret generates a cryptographically secure random cluster secret
func generateClusterSecret() string {
// Generate 32 bytes (256 bits) of random data
randomBytes := make([]byte, 32)
if _, err := rand.Read(randomBytes); err != nil {
// Fallback to a warning - this should never happen in practice
fmt.Fprintf(os.Stderr, "Warning: Failed to generate secure cluster secret: %v\n", err)
return ""
}
// Encode as base64 for easy configuration file storage
return base64.StdEncoding.EncodeToString(randomBytes)
}
// Load configuration from file or create default
func Load(configPath string) (*types.Config, error) {
config := Default()
@@ -94,5 +116,13 @@ func Load(configPath string) (*types.Config, error) {
return nil, fmt.Errorf("failed to parse config file: %v", err)
}
// Generate cluster secret if not provided and clustering is enabled (Issue #13)
if config.ClusteringEnabled && config.ClusterSecret == "" {
config.ClusterSecret = generateClusterSecret()
fmt.Printf("Warning: No cluster_secret configured. Generated a random secret.\n")
fmt.Printf(" To share this secret with other nodes, add it to your config:\n")
fmt.Printf(" cluster_secret: %s\n", config.ClusterSecret)
}
return config, nil
}

87
daemon/daemonize.go Normal file
View File

@@ -0,0 +1,87 @@
package daemon
import (
"fmt"
"os"
"os/exec"
"path/filepath"
"syscall"
)
// GetLogFilePath returns the log file path for a given config file
func GetLogFilePath(configPath string) (string, error) {
logDir, err := getLogDir()
if err != nil {
return "", err
}
absConfigPath, err := filepath.Abs(configPath)
if err != nil {
return "", fmt.Errorf("failed to get absolute config path: %w", err)
}
basename := filepath.Base(configPath)
name := filepath.Base(filepath.Dir(absConfigPath)) + "_" + basename
return filepath.Join(logDir, name+".log"), nil
}
// Daemonize spawns the process as a daemon and returns
func Daemonize(configPath string) error {
// Get absolute path to the current executable
executable, err := os.Executable()
if err != nil {
return fmt.Errorf("failed to get executable path: %w", err)
}
// Get absolute path to config
absConfigPath, err := filepath.Abs(configPath)
if err != nil {
return fmt.Errorf("failed to get absolute config path: %w", err)
}
// Check if already running
_, running, err := ReadPID(configPath)
if err != nil {
return fmt.Errorf("failed to check if instance is running: %w", err)
}
if running {
return fmt.Errorf("instance is already running")
}
// Spawn the process in background with --daemon flag
cmd := exec.Command(executable, "--daemon", absConfigPath)
cmd.SysProcAttr = &syscall.SysProcAttr{
Setsid: true, // Create new session
}
// Redirect stdout/stderr to log file
logDir, err := getLogDir()
if err != nil {
return fmt.Errorf("failed to get log directory: %w", err)
}
if err := os.MkdirAll(logDir, 0755); err != nil {
return fmt.Errorf("failed to create log directory: %w", err)
}
basename := filepath.Base(configPath)
name := filepath.Base(filepath.Dir(absConfigPath)) + "_" + basename
logFile := filepath.Join(logDir, name+".log")
f, err := os.OpenFile(logFile, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644)
if err != nil {
return fmt.Errorf("failed to open log file: %w", err)
}
defer f.Close()
cmd.Stdout = f
cmd.Stderr = f
if err := cmd.Start(); err != nil {
return fmt.Errorf("failed to start daemon: %w", err)
}
fmt.Printf("Started KVS instance '%s' (PID will be written by daemon)\n", filepath.Base(configPath))
fmt.Printf("Logs: %s\n", logFile)
return nil
}

171
daemon/pid.go Normal file
View File

@@ -0,0 +1,171 @@
package daemon
import (
"fmt"
"os"
"path/filepath"
"strconv"
"strings"
"syscall"
)
// getPIDDir returns the absolute path to the PID directory
func getPIDDir() (string, error) {
homeDir, err := os.UserHomeDir()
if err != nil {
return "", fmt.Errorf("failed to get user home directory: %w", err)
}
return filepath.Join(homeDir, ".kvs", "pids"), nil
}
// getLogDir returns the absolute path to the log directory
func getLogDir() (string, error) {
homeDir, err := os.UserHomeDir()
if err != nil {
return "", fmt.Errorf("failed to get user home directory: %w", err)
}
return filepath.Join(homeDir, ".kvs", "logs"), nil
}
// GetPIDFilePath returns the PID file path for a given config file
func GetPIDFilePath(configPath string) string {
pidDir, err := getPIDDir()
if err != nil {
// Fallback to local directory
pidDir = ".kvs/pids"
}
// Extract basename without extension
basename := filepath.Base(configPath)
name := strings.TrimSuffix(basename, filepath.Ext(basename))
return filepath.Join(pidDir, name+".pid")
}
// EnsurePIDDir creates the PID directory if it doesn't exist
func EnsurePIDDir() error {
pidDir, err := getPIDDir()
if err != nil {
return err
}
return os.MkdirAll(pidDir, 0755)
}
// WritePID writes the current process PID to a file
func WritePID(configPath string) error {
if err := EnsurePIDDir(); err != nil {
return fmt.Errorf("failed to create PID directory: %w", err)
}
pidFile := GetPIDFilePath(configPath)
pid := os.Getpid()
return os.WriteFile(pidFile, []byte(fmt.Sprintf("%d\n", pid)), 0644)
}
// ReadPID reads the PID from a file and checks if the process is running
func ReadPID(configPath string) (int, bool, error) {
pidFile := GetPIDFilePath(configPath)
data, err := os.ReadFile(pidFile)
if err != nil {
if os.IsNotExist(err) {
return 0, false, nil
}
return 0, false, fmt.Errorf("failed to read PID file: %w", err)
}
pidStr := strings.TrimSpace(string(data))
pid, err := strconv.Atoi(pidStr)
if err != nil {
return 0, false, fmt.Errorf("invalid PID in file: %w", err)
}
// Check if process is actually running
process, err := os.FindProcess(pid)
if err != nil {
return pid, false, nil
}
// Send signal 0 to check if process exists
err = process.Signal(syscall.Signal(0))
if err != nil {
return pid, false, nil
}
return pid, true, nil
}
// RemovePID removes the PID file
func RemovePID(configPath string) error {
pidFile := GetPIDFilePath(configPath)
err := os.Remove(pidFile)
if err != nil && !os.IsNotExist(err) {
return fmt.Errorf("failed to remove PID file: %w", err)
}
return nil
}
// ListRunningInstances returns a list of running KVS instances
func ListRunningInstances() ([]InstanceInfo, error) {
var instances []InstanceInfo
pidDir, err := getPIDDir()
if err != nil {
return nil, err
}
// Check if PID directory exists
if _, err := os.Stat(pidDir); os.IsNotExist(err) {
return instances, nil
}
entries, err := os.ReadDir(pidDir)
if err != nil {
return nil, fmt.Errorf("failed to read PID directory: %w", err)
}
for _, entry := range entries {
if entry.IsDir() || !strings.HasSuffix(entry.Name(), ".pid") {
continue
}
name := strings.TrimSuffix(entry.Name(), ".pid")
configPath := name + ".yaml" // Assume .yaml extension
pid, running, err := ReadPID(configPath)
if err != nil {
continue
}
instances = append(instances, InstanceInfo{
Name: name,
PID: pid,
Running: running,
})
}
return instances, nil
}
// InstanceInfo holds information about a KVS instance
type InstanceInfo struct {
Name string
PID int
Running bool
}
// StopProcess stops a process by PID
func StopProcess(pid int) error {
process, err := os.FindProcess(pid)
if err != nil {
return fmt.Errorf("failed to find process: %w", err)
}
// Try graceful shutdown first (SIGTERM)
if err := process.Signal(syscall.SIGTERM); err != nil {
return fmt.Errorf("failed to send SIGTERM: %w", err)
}
return nil
}

View File

@@ -45,6 +45,7 @@ cleanup() {
log_info "Cleaning up test environment..."
pkill -f "$BINARY" 2>/dev/null || true
rm -rf "$TEST_DIR" 2>/dev/null || true
rm -rf "$HOME/.kvs" 2>/dev/null || true # Clean up PID and log files from home dir
sleep 2 # Allow processes to fully terminate
}
@@ -64,6 +65,15 @@ wait_for_service() {
return 1
}
# Get log file path for a config file (matches daemon naming convention)
get_log_file() {
local config=$1
local abs_path=$(realpath "$config")
local basename=$(basename "$config")
local dirname=$(basename $(dirname "$abs_path"))
echo "$HOME/.kvs/logs/${dirname}_${basename}.log"
}
# Test 1: Build verification
test_build() {
test_start "Binary build verification"
@@ -95,9 +105,9 @@ allow_anonymous_read: true
allow_anonymous_write: true
EOF
# Start node
$BINARY basic.yaml >/dev/null 2>&1 &
local pid=$!
# Start node using daemon command
$BINARY start basic.yaml >/dev/null 2>&1
sleep 2
if wait_for_service 8090; then
# Test basic CRUD
@@ -106,7 +116,7 @@ EOF
-d '{"message":"hello world"}')
local get_result=$(curl -s http://localhost:8090/kv/test/basic)
local message=$(echo "$get_result" | jq -r '.data.message' 2>/dev/null) # Adjusted jq path
local message=$(echo "$get_result" | jq -r '.data.message' 2>/dev/null)
if [ "$message" = "hello world" ]; then
log_success "Basic CRUD operations work"
@@ -117,14 +127,17 @@ EOF
log_error "Basic test node failed to start"
fi
kill $pid 2>/dev/null || true
sleep 2
$BINARY stop basic.yaml >/dev/null 2>&1
sleep 1
}
# Test 3: Cluster formation
test_cluster_formation() {
test_start "2-node cluster formation and Merkle Tree replication"
# Shared cluster secret for authentication (Issue #13)
local CLUSTER_SECRET="test-cluster-secret-12345678901234567890"
# Node 1 config
cat > cluster1.yaml <<EOF
node_id: "cluster-1"
@@ -138,6 +151,7 @@ gossip_interval_max: 10
sync_interval: 10
allow_anonymous_read: true
allow_anonymous_write: true
cluster_secret: "$CLUSTER_SECRET"
EOF
# Node 2 config
@@ -153,25 +167,25 @@ gossip_interval_max: 10
sync_interval: 10
allow_anonymous_read: true
allow_anonymous_write: true
cluster_secret: "$CLUSTER_SECRET"
EOF
# Start nodes
$BINARY cluster1.yaml >/dev/null 2>&1 &
local pid1=$!
# Start nodes using daemon commands
$BINARY start cluster1.yaml >/dev/null 2>&1
sleep 2
if ! wait_for_service 8101; then
log_error "Cluster node 1 failed to start"
kill $pid1 2>/dev/null || true
$BINARY stop cluster1.yaml >/dev/null 2>&1
return 1
fi
sleep 2 # Give node 1 a moment to fully initialize
$BINARY cluster2.yaml >/dev/null 2>&1 &
local pid2=$!
$BINARY start cluster2.yaml >/dev/null 2>&1
sleep 2
if ! wait_for_service 8102; then
log_error "Cluster node 2 failed to start"
kill $pid1 $pid2 2>/dev/null || true
$BINARY stop cluster1.yaml cluster2.yaml >/dev/null 2>&1
return 1
fi
@@ -220,8 +234,8 @@ EOF
log_error "Cluster formation failed (N1 members: $node1_members, N2 members: $node2_members)"
fi
kill $pid1 $pid2 2>/dev/null || true
sleep 2
$BINARY stop cluster1.yaml cluster2.yaml >/dev/null 2>&1
sleep 1
}
# Test 4: Conflict resolution (Merkle Tree based)
@@ -239,6 +253,9 @@ test_conflict_resolution() {
if go run test_conflict.go "$TEST_DIR/conflict1_data" "$TEST_DIR/conflict2_data"; then
cd "$TEST_DIR"
# Shared cluster secret for authentication (Issue #13)
local CLUSTER_SECRET="conflict-cluster-secret-1234567890123"
# Create configs
cat > conflict1.yaml <<EOF
node_id: "conflict-1"
@@ -250,6 +267,7 @@ log_level: "info"
sync_interval: 3
allow_anonymous_read: true
allow_anonymous_write: true
cluster_secret: "$CLUSTER_SECRET"
EOF
cat > conflict2.yaml <<EOF
@@ -262,17 +280,17 @@ log_level: "info"
sync_interval: 3
allow_anonymous_read: true
allow_anonymous_write: true
cluster_secret: "$CLUSTER_SECRET"
EOF
# Start nodes
# Start nodes using daemon commands
# Node 1 started first, making it "older" for tie-breaker if timestamps are equal
"$BINARY" conflict1.yaml >conflict1.log 2>&1 &
local pid1=$!
$BINARY start conflict1.yaml >/dev/null 2>&1
sleep 2
if wait_for_service 8111; then
$BINARY start conflict2.yaml >/dev/null 2>&1
sleep 2
$BINARY conflict2.yaml >conflict2.log 2>&1 &
local pid2=$!
if wait_for_service 8112; then
# Get initial data (full StoredValue)
@@ -334,8 +352,10 @@ EOF
log_error "Resolved data has inconsistent UUID/Timestamp: N1_UUID=$node1_final_uuid, N1_TS=$node1_final_timestamp, N2_UUID=$node2_final_uuid, N2_TS=$node2_final_timestamp"
fi
# Optionally, check logs for conflict resolution messages
if grep -q "Conflict resolved" conflict1.log conflict2.log 2>/dev/null; then
# Check logs for conflict resolution messages
local log1=$(get_log_file conflict1.yaml)
local log2=$(get_log_file conflict2.yaml)
if grep -q "Conflict resolved" "$log1" "$log2" 2>/dev/null; then
log_success "Conflict resolution messages found in logs"
else
log_error "No 'Conflict resolved' messages found in logs, but data converged."
@@ -348,13 +368,13 @@ EOF
log_error "Conflict node 2 failed to start"
fi
kill $pid2 2>/dev/null || true
$BINARY stop conflict2.yaml >/dev/null 2>&1
else
log_error "Conflict node 1 failed to start"
fi
kill $pid1 2>/dev/null || true
sleep 2
$BINARY stop conflict1.yaml >/dev/null 2>&1
sleep 1
else
cd "$TEST_DIR"
log_error "Failed to create conflict test data. Ensure test_conflict.go is correct."
@@ -378,19 +398,18 @@ allow_anonymous_read: false
allow_anonymous_write: false
EOF
# Start node
$BINARY auth_test.yaml >auth_test.log 2>&1 &
local pid=$!
# Start node using daemon command
$BINARY start auth_test.yaml >/dev/null 2>&1
sleep 3 # Allow daemon to start and root account creation
if wait_for_service 8095; then
sleep 2 # Allow root account creation
# Extract the token from logs
local token=$(grep "Token:" auth_test.log | sed 's/.*Token: //' | tr -d '\n\r')
local log_file=$(get_log_file auth_test.yaml)
local token=$(grep "Token:" "$log_file" | sed 's/.*Token: //' | tr -d '\n\r')
if [ -z "$token" ]; then
log_error "Failed to extract authentication token from logs"
kill $pid 2>/dev/null || true
$BINARY stop auth_test.yaml >/dev/null 2>&1
return
fi
@@ -426,14 +445,166 @@ EOF
log_error "KV endpoints should work with authentication, got: $kv_auth"
fi
kill $pid 2>/dev/null || true
sleep 2
$BINARY stop auth_test.yaml >/dev/null 2>&1
sleep 1
else
log_error "Auth test node failed to start"
kill $pid 2>/dev/null || true
$BINARY stop auth_test.yaml >/dev/null 2>&1
fi
}
# Test 6: Resource Metadata Management (Issue #12)
test_metadata_management() {
test_start "Resource Metadata Management test (Issue #12)"
# Create metadata test config
cat > metadata_test.yaml <<EOF
node_id: "metadata-test"
bind_address: "127.0.0.1"
port: 8096
data_dir: "./metadata_test_data"
seed_nodes: []
log_level: "error"
auth_enabled: true
allow_anonymous_read: false
allow_anonymous_write: false
EOF
# Start node using daemon command
$BINARY start metadata_test.yaml >/dev/null 2>&1
sleep 3 # Allow daemon to start and root account creation
if wait_for_service 8096; then
# Extract the token from logs
local log_file=$(get_log_file metadata_test.yaml)
local token=$(grep "Token:" "$log_file" | sed 's/.*Token: //' | tr -d '\n\r')
if [ -z "$token" ]; then
log_error "Failed to extract authentication token from logs"
$BINARY stop metadata_test.yaml >/dev/null 2>&1
return
fi
# First, create a KV resource
curl -s -X PUT http://localhost:8096/kv/test/resource -H "Content-Type: application/json" -H "Authorization: Bearer $token" -d '{"data":"test"}' >/dev/null
sleep 1
# Test 1: Get metadata should fail for non-existent metadata (initially no metadata exists)
local get_response=$(curl -s -w "\n%{http_code}" -X GET http://localhost:8096/kv/test/resource/metadata -H "Authorization: Bearer $token")
local get_body=$(echo "$get_response" | head -n -1)
local get_code=$(echo "$get_response" | tail -n 1)
if [ "$get_code" = "404" ]; then
log_success "GET metadata returns 404 for non-existent metadata"
else
log_error "GET metadata should return 404 for non-existent metadata, got code: $get_code, body: $get_body"
fi
# Test 2: Update metadata should create new metadata
local update_response=$(curl -s -X PUT http://localhost:8096/kv/test/resource/metadata -H "Content-Type: application/json" -H "Authorization: Bearer $token" -d '{"owner_uuid":"test-owner-123","permissions":3840}')
if echo "$update_response" | grep -q "owner_uuid"; then
log_success "PUT metadata creates metadata successfully"
else
log_error "PUT metadata should create metadata, got: $update_response"
fi
# Test 3: Get metadata should now return the created metadata
local get_response2=$(curl -s -X GET http://localhost:8096/kv/test/resource/metadata -H "Authorization: Bearer $token")
if echo "$get_response2" | grep -q "test-owner-123" && echo "$get_response2" | grep -q "3840"; then
log_success "GET metadata returns created metadata"
else
log_error "GET metadata should return created metadata, got: $get_response2"
fi
# Test 4: Update metadata should modify existing metadata
local update_response2=$(curl -s -X PUT http://localhost:8096/kv/test/resource/metadata -H "Content-Type: application/json" -H "Authorization: Bearer $token" -d '{"owner_uuid":"new-owner-456"}')
if echo "$update_response2" | grep -q "new-owner-456"; then
log_success "PUT metadata updates existing metadata"
else
log_error "PUT metadata should update metadata, got: $update_response2"
fi
# Test 5: Metadata endpoints should require authentication
local no_auth=$(curl -s -w "\n%{http_code}" -X GET http://localhost:8096/kv/test/resource/metadata)
local no_auth_code=$(echo "$no_auth" | tail -n 1)
if [ "$no_auth_code" = "401" ]; then
log_success "Metadata endpoints properly require authentication"
else
log_error "Metadata endpoints should require authentication, got code: $no_auth_code"
fi
$BINARY stop metadata_test.yaml >/dev/null 2>&1
sleep 1
else
log_error "Metadata test node failed to start"
$BINARY stop metadata_test.yaml >/dev/null 2>&1
fi
}
# Test 7: Daemon commands (start, stop, status, restart)
test_daemon_commands() {
test_start "Daemon command tests (start, stop, status, restart)"
# Create daemon test config
cat > daemon_test.yaml <<EOF
node_id: "daemon-test"
bind_address: "127.0.0.1"
port: 8097
data_dir: "./daemon_test_data"
seed_nodes: []
log_level: "error"
allow_anonymous_read: true
allow_anonymous_write: true
EOF
# Test 1: Start command
$BINARY start daemon_test.yaml >/dev/null 2>&1
sleep 3 # Allow daemon to start
if wait_for_service 8097 5; then
log_success "Daemon 'start' command works"
# Test 2: Status command shows running
local status_output=$($BINARY status daemon_test.yaml 2>&1)
if echo "$status_output" | grep -q "RUNNING"; then
log_success "Daemon 'status' command shows RUNNING"
else
log_error "Daemon 'status' should show RUNNING, got: $status_output"
fi
# Test 3: Stop command
$BINARY stop daemon_test.yaml >/dev/null 2>&1
sleep 2
# Check that service is actually stopped
if ! curl -s "http://localhost:8097/health" >/dev/null 2>&1; then
log_success "Daemon 'stop' command works"
else
log_error "Daemon should be stopped but is still responding"
fi
# Test 4: Restart command
$BINARY restart daemon_test.yaml >/dev/null 2>&1
sleep 3
if wait_for_service 8097 5; then
log_success "Daemon 'restart' command works"
# Clean up
$BINARY stop daemon_test.yaml >/dev/null 2>&1
sleep 1
else
log_error "Daemon 'restart' failed to start service"
fi
else
log_error "Daemon 'start' command failed"
fi
# Ensure cleanup
pkill -f "daemon_test.yaml" 2>/dev/null || true
sleep 1
}
# Main test execution
main() {
echo "=================================================="
@@ -452,6 +623,8 @@ main() {
test_cluster_formation
test_conflict_resolution
test_authentication_middleware
test_metadata_management
test_daemon_commands
# Results
echo "=================================================="

208
main.go
View File

@@ -6,26 +6,90 @@ import (
"os"
"os/signal"
"syscall"
"time"
"path/filepath"
"strings"
"kvs/config"
"kvs/daemon"
"kvs/server"
)
func main() {
configPath := "./config.yaml"
// Simple CLI argument parsing
if len(os.Args) > 1 {
configPath = os.Args[1]
if len(os.Args) < 2 {
// No arguments - run in foreground with default config
runServer("./config.yaml", false)
return
}
// Check if this is a daemon spawn
if os.Args[1] == "--daemon" {
if len(os.Args) < 3 {
fmt.Fprintf(os.Stderr, "Error: --daemon flag requires config path\n")
os.Exit(1)
}
runServer(os.Args[2], true)
return
}
// Parse subcommand
command := os.Args[1]
switch command {
case "start":
if len(os.Args) < 3 {
fmt.Fprintf(os.Stderr, "Usage: kvs start <config>\n")
os.Exit(1)
}
cmdStart(normalizeConfigPath(os.Args[2]))
case "stop":
if len(os.Args) < 3 {
fmt.Fprintf(os.Stderr, "Usage: kvs stop <config>\n")
os.Exit(1)
}
cmdStop(normalizeConfigPath(os.Args[2]))
case "restart":
if len(os.Args) < 3 {
fmt.Fprintf(os.Stderr, "Usage: kvs restart <config>\n")
os.Exit(1)
}
cmdRestart(normalizeConfigPath(os.Args[2]))
case "status":
if len(os.Args) > 2 {
cmdStatusSingle(normalizeConfigPath(os.Args[2]))
} else {
cmdStatusAll()
}
case "help", "--help", "-h":
printHelp()
default:
// Backward compatibility: assume it's a config file path
runServer(command, false)
}
}
func runServer(configPath string, isDaemon bool) {
cfg, err := config.Load(configPath)
if err != nil {
fmt.Fprintf(os.Stderr, "Failed to load configuration: %v\n", err)
os.Exit(1)
}
// Write PID file if running as daemon
if isDaemon {
if err := daemon.WritePID(configPath); err != nil {
fmt.Fprintf(os.Stderr, "Failed to write PID file: %v\n", err)
os.Exit(1)
}
defer daemon.RemovePID(configPath)
}
kvServer, err := server.NewServer(cfg)
if err != nil {
fmt.Fprintf(os.Stderr, "Failed to create server: %v\n", err)
@@ -46,3 +110,135 @@ func main() {
os.Exit(1)
}
}
func cmdStart(configPath string) {
if err := daemon.Daemonize(configPath); err != nil {
fmt.Fprintf(os.Stderr, "Failed to start: %v\n", err)
os.Exit(1)
}
}
func cmdStop(configPath string) {
pid, running, err := daemon.ReadPID(configPath)
if err != nil {
fmt.Fprintf(os.Stderr, "Failed to read PID: %v\n", err)
os.Exit(1)
}
if !running {
fmt.Printf("Instance '%s' is not running\n", configPath)
// Clean up stale PID file
daemon.RemovePID(configPath)
return
}
fmt.Printf("Stopping instance '%s' (PID %d)...\n", configPath, pid)
if err := daemon.StopProcess(pid); err != nil {
fmt.Fprintf(os.Stderr, "Failed to stop process: %v\n", err)
os.Exit(1)
}
// Wait a bit and verify it stopped
time.Sleep(1 * time.Second)
_, stillRunning, _ := daemon.ReadPID(configPath)
if stillRunning {
fmt.Printf("Warning: Process may still be running\n")
} else {
daemon.RemovePID(configPath)
fmt.Printf("Stopped successfully\n")
}
}
func cmdRestart(configPath string) {
// Check if running
_, running, err := daemon.ReadPID(configPath)
if err != nil {
fmt.Fprintf(os.Stderr, "Failed to check status: %v\n", err)
os.Exit(1)
}
if running {
cmdStop(configPath)
// Wait a bit for clean shutdown
time.Sleep(2 * time.Second)
}
cmdStart(configPath)
}
func cmdStatusSingle(configPath string) {
pid, running, err := daemon.ReadPID(configPath)
if err != nil {
fmt.Fprintf(os.Stderr, "Failed to read PID: %v\n", err)
os.Exit(1)
}
if running {
fmt.Printf("Instance '%s': RUNNING (PID %d)\n", configPath, pid)
} else if pid > 0 {
fmt.Printf("Instance '%s': STOPPED (stale PID %d)\n", configPath, pid)
} else {
fmt.Printf("Instance '%s': STOPPED\n", configPath)
}
}
func cmdStatusAll() {
instances, err := daemon.ListRunningInstances()
if err != nil {
fmt.Fprintf(os.Stderr, "Failed to list instances: %v\n", err)
os.Exit(1)
}
if len(instances) == 0 {
fmt.Println("No KVS instances found")
return
}
fmt.Println("KVS Instances:")
for _, inst := range instances {
status := "STOPPED"
if inst.Running {
status = "RUNNING"
}
fmt.Printf(" %-20s %s (PID %d)\n", inst.Name, status, inst.PID)
}
}
// normalizeConfigPath ensures config path has .yaml extension if not specified
func normalizeConfigPath(path string) string {
// If path doesn't have an extension, add .yaml
if filepath.Ext(path) == "" {
return path + ".yaml"
}
return path
}
// getConfigIdentifier returns the identifier for a config (basename without extension)
// This is used for PID files and status display
func getConfigIdentifier(path string) string {
basename := filepath.Base(path)
return strings.TrimSuffix(basename, filepath.Ext(basename))
}
func printHelp() {
help := `KVS - Distributed Key-Value Store
Usage:
kvs [config.yaml] Run in foreground (default: ./config.yaml)
kvs start <config> Start as daemon (.yaml extension optional)
kvs stop <config> Stop daemon (.yaml extension optional)
kvs restart <config> Restart daemon (.yaml extension optional)
kvs status [config] Show status (all instances if no config given)
kvs help Show this help
Examples:
kvs # Run with ./config.yaml in foreground
kvs node1.yaml # Run with node1.yaml in foreground
kvs start node1 # Start node1.yaml as daemon
kvs start node1.yaml # Same as above
kvs stop node1 # Stop node1 daemon
kvs status # Show all running instances
kvs status node1 # Show status of node1
`
fmt.Print(help)
}

View File

@@ -22,8 +22,6 @@ import (
"kvs/utils"
)
// healthHandler returns server health status
func (s *Server) healthHandler(w http.ResponseWriter, r *http.Request) {
mode := s.getMode()
@@ -215,6 +213,104 @@ func (s *Server) deleteKVHandler(w http.ResponseWriter, r *http.Request) {
s.logger.WithField("path", path).Info("Value deleted")
}
// getResourceMetadataHandler retrieves metadata for a KV resource
func (s *Server) getResourceMetadataHandler(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
path := vars["path"]
// Get metadata from storage
metadata, err := s.authService.GetResourceMetadata(path)
if err == badger.ErrKeyNotFound {
http.Error(w, "Not Found: No metadata exists for this resource", http.StatusNotFound)
return
}
if err != nil {
s.logger.WithError(err).WithField("path", path).Error("Failed to get resource metadata")
http.Error(w, "Internal Server Error", http.StatusInternalServerError)
return
}
response := types.GetResourceMetadataResponse{
OwnerUUID: metadata.OwnerUUID,
GroupUUID: metadata.GroupUUID,
Permissions: metadata.Permissions,
TTL: metadata.TTL,
CreatedAt: metadata.CreatedAt,
UpdatedAt: metadata.UpdatedAt,
}
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(response)
}
// updateResourceMetadataHandler updates metadata for a KV resource
func (s *Server) updateResourceMetadataHandler(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
path := vars["path"]
// Parse request body
var req types.UpdateResourceMetadataRequest
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
http.Error(w, "Bad Request: Invalid JSON", http.StatusBadRequest)
return
}
// Get existing metadata or create new one
metadata, err := s.authService.GetResourceMetadata(path)
if err == badger.ErrKeyNotFound {
// Create new metadata with defaults
metadata = &types.ResourceMetadata{
OwnerUUID: "",
GroupUUID: "",
Permissions: types.DefaultPermissions,
TTL: "",
CreatedAt: time.Now().Unix(),
UpdatedAt: time.Now().Unix(),
}
} else if err != nil {
s.logger.WithError(err).WithField("path", path).Error("Failed to get resource metadata")
http.Error(w, "Internal Server Error", http.StatusInternalServerError)
return
}
// Update only provided fields
if req.OwnerUUID != nil {
metadata.OwnerUUID = *req.OwnerUUID
}
if req.GroupUUID != nil {
metadata.GroupUUID = *req.GroupUUID
}
if req.Permissions != nil {
metadata.Permissions = *req.Permissions
}
metadata.UpdatedAt = time.Now().Unix()
// Store updated metadata
if err := s.authService.SetResourceMetadata(path, metadata); err != nil {
s.logger.WithError(err).WithField("path", path).Error("Failed to update resource metadata")
http.Error(w, "Internal Server Error", http.StatusInternalServerError)
return
}
response := types.GetResourceMetadataResponse{
OwnerUUID: metadata.OwnerUUID,
GroupUUID: metadata.GroupUUID,
Permissions: metadata.Permissions,
TTL: metadata.TTL,
CreatedAt: metadata.CreatedAt,
UpdatedAt: metadata.UpdatedAt,
}
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(response)
s.logger.WithFields(logrus.Fields{
"path": path,
"owner_uuid": metadata.OwnerUUID,
"group_uuid": metadata.GroupUUID,
}).Info("Resource metadata updated")
}
// isClusterMember checks if request is from a cluster member
func (s *Server) isClusterMember(remoteAddr string) bool {
host, _, err := net.SplitHostPort(remoteAddr)
@@ -1271,3 +1367,29 @@ func (s *Server) getRevisionHistory(key string) ([]map[string]interface{}, error
func (s *Server) getSpecificRevision(key string, revision int) (*types.StoredValue, error) {
return s.revisionService.GetSpecificRevision(key, revision)
}
// clusterBootstrapHandler provides the cluster secret to authenticated administrators (Issue #13)
func (s *Server) clusterBootstrapHandler(w http.ResponseWriter, r *http.Request) {
// Ensure clustering is enabled
if !s.config.ClusteringEnabled {
http.Error(w, "Clustering is disabled", http.StatusServiceUnavailable)
return
}
// Ensure cluster secret is configured
if s.config.ClusterSecret == "" {
s.logger.Error("Cluster secret is not configured")
http.Error(w, "Cluster secret is not configured", http.StatusInternalServerError)
return
}
// Return the cluster secret for secure bootstrap
response := map[string]string{
"cluster_secret": s.config.ClusterSecret,
}
s.logger.WithField("remote_addr", r.RemoteAddr).Info("Cluster secret retrieved for bootstrap")
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(response)
}

View File

@@ -1,6 +1,8 @@
package server
import (
"net/http"
"github.com/gorilla/mux"
)
@@ -11,6 +13,18 @@ func (s *Server) setupRoutes() *mux.Router {
// Health endpoint (always available)
router.HandleFunc("/health", s.healthHandler).Methods("GET")
// Resource Metadata Management endpoints (Issue #12) - Must come BEFORE general KV routes
// These need to be registered first to prevent /kv/{path:.+} from matching metadata paths
if s.config.AuthEnabled {
router.Handle("/kv/{path:.+}/metadata", s.authService.Middleware(
[]string{"admin:users:read"}, nil, "",
)(s.getResourceMetadataHandler)).Methods("GET")
router.Handle("/kv/{path:.+}/metadata", s.authService.Middleware(
[]string{"admin:users:update"}, nil, "",
)(s.updateResourceMetadataHandler)).Methods("PUT")
}
// KV endpoints (with conditional authentication based on anonymous access settings)
// GET endpoint - require auth if anonymous read is disabled
if s.config.AuthEnabled && !s.config.AllowAnonymousRead {
@@ -41,16 +55,32 @@ func (s *Server) setupRoutes() *mux.Router {
// Member endpoints (available when clustering is enabled)
if s.config.ClusteringEnabled {
// GET /members/ is unprotected for monitoring/inspection
router.HandleFunc("/members/", s.getMembersHandler).Methods("GET")
router.HandleFunc("/members/join", s.joinMemberHandler).Methods("POST")
router.HandleFunc("/members/leave", s.leaveMemberHandler).Methods("DELETE")
router.HandleFunc("/members/gossip", s.gossipHandler).Methods("POST")
router.HandleFunc("/members/pairs_by_time", s.pairsByTimeHandler).Methods("POST")
// Merkle Tree endpoints (clustering feature)
router.HandleFunc("/merkle_tree/root", s.getMerkleRootHandler).Methods("GET")
router.HandleFunc("/merkle_tree/diff", s.getMerkleDiffHandler).Methods("POST")
router.HandleFunc("/kv_range", s.getKVRangeHandler).Methods("POST")
// Apply cluster authentication middleware to all cluster communication endpoints
if s.clusterAuthService != nil {
router.Handle("/members/join", s.clusterAuthService.Middleware(http.HandlerFunc(s.joinMemberHandler))).Methods("POST")
router.Handle("/members/leave", s.clusterAuthService.Middleware(http.HandlerFunc(s.leaveMemberHandler))).Methods("DELETE")
router.Handle("/members/gossip", s.clusterAuthService.Middleware(http.HandlerFunc(s.gossipHandler))).Methods("POST")
router.Handle("/members/pairs_by_time", s.clusterAuthService.Middleware(http.HandlerFunc(s.pairsByTimeHandler))).Methods("POST")
// Merkle Tree endpoints (clustering feature)
router.Handle("/merkle_tree/root", s.clusterAuthService.Middleware(http.HandlerFunc(s.getMerkleRootHandler))).Methods("GET")
router.Handle("/merkle_tree/diff", s.clusterAuthService.Middleware(http.HandlerFunc(s.getMerkleDiffHandler))).Methods("POST")
router.Handle("/kv_range", s.clusterAuthService.Middleware(http.HandlerFunc(s.getKVRangeHandler))).Methods("POST")
} else {
// Fallback to unprotected endpoints (for backwards compatibility)
router.HandleFunc("/members/join", s.joinMemberHandler).Methods("POST")
router.HandleFunc("/members/leave", s.leaveMemberHandler).Methods("DELETE")
router.HandleFunc("/members/gossip", s.gossipHandler).Methods("POST")
router.HandleFunc("/members/pairs_by_time", s.pairsByTimeHandler).Methods("POST")
// Merkle Tree endpoints (clustering feature)
router.HandleFunc("/merkle_tree/root", s.getMerkleRootHandler).Methods("GET")
router.HandleFunc("/merkle_tree/diff", s.getMerkleDiffHandler).Methods("POST")
router.HandleFunc("/kv_range", s.getKVRangeHandler).Methods("POST")
}
}
// Authentication and user management endpoints (available when auth is enabled)
@@ -93,6 +123,12 @@ func (s *Server) setupRoutes() *mux.Router {
router.Handle("/api/tokens", s.authService.Middleware(
[]string{"admin:tokens:create"}, nil, "",
)(s.createTokenHandler)).Methods("POST")
// Cluster Bootstrap endpoint (Issue #13) - Protected by JWT authentication
// Allows authenticated administrators to retrieve the cluster secret for new nodes
router.Handle("/auth/cluster-bootstrap", s.authService.Middleware(
[]string{"admin:tokens:create"}, nil, "",
)(s.clusterBootstrapHandler)).Methods("GET")
}
// Revision History endpoints (available when revision history is enabled)

View File

@@ -50,7 +50,8 @@ type Server struct {
backupMu sync.RWMutex // Protects backup status
// Authentication service
authService *auth.AuthService
authService *auth.AuthService
clusterAuthService *auth.ClusterAuthService
}
// NewServer initializes and returns a new Server instance
@@ -120,6 +121,11 @@ func NewServer(config *types.Config) (*Server, error) {
// Initialize authentication service
server.authService = auth.NewAuthService(db, logger, config)
// Initialize cluster authentication service (Issue #13)
if config.ClusteringEnabled {
server.clusterAuthService = auth.NewClusterAuthService(config.ClusterSecret, logger)
}
// Setup initial root account if needed (Issue #3)
if config.AuthEnabled {
if err := server.setupRootAccount(); err != nil {
@@ -269,10 +275,10 @@ func (s *Server) createRootUserAndToken() error {
// Log the token securely (one-time display)
s.logger.WithFields(logrus.Fields{
"user_uuid": rootUserUUID,
"group_uuid": adminGroupUUID,
"expires_at": time.Unix(expiresAt, 0).Format(time.RFC3339),
"expires_in": "24 hours",
"user_uuid": rootUserUUID,
"group_uuid": adminGroupUUID,
"expires_at": time.Unix(expiresAt, 0).Format(time.RFC3339),
"expires_in": "24 hours",
}).Warn("Root account created - SAVE THIS TOKEN:")
// Display token prominently
@@ -327,4 +333,3 @@ func (s *Server) storeUserAndGroup(user *types.User, group *types.Group) error {
return nil
})
}

View File

@@ -12,10 +12,10 @@ import (
// StorageService handles all BadgerDB operations and data management
type StorageService struct {
db *badger.DB
config *types.Config
compressionSvc *CompressionService
logger *logrus.Logger
db *badger.DB
config *types.Config
compressionSvc *CompressionService
logger *logrus.Logger
}
// NewStorageService creates a new storage service

View File

@@ -13,20 +13,20 @@ type StoredValue struct {
// User represents a system user
type User struct {
UUID string `json:"uuid"` // Server-generated UUID
UUID string `json:"uuid"` // Server-generated UUID
NicknameHash string `json:"nickname_hash"` // SHA3-512 hash of nickname
Groups []string `json:"groups"` // List of group UUIDs this user belongs to
CreatedAt int64 `json:"created_at"` // Unix timestamp
UpdatedAt int64 `json:"updated_at"` // Unix timestamp
Groups []string `json:"groups"` // List of group UUIDs this user belongs to
CreatedAt int64 `json:"created_at"` // Unix timestamp
UpdatedAt int64 `json:"updated_at"` // Unix timestamp
}
// Group represents a user group
type Group struct {
UUID string `json:"uuid"` // Server-generated UUID
NameHash string `json:"name_hash"` // SHA3-512 hash of group name
Members []string `json:"members"` // List of user UUIDs in this group
CreatedAt int64 `json:"created_at"` // Unix timestamp
UpdatedAt int64 `json:"updated_at"` // Unix timestamp
UUID string `json:"uuid"` // Server-generated UUID
NameHash string `json:"name_hash"` // SHA3-512 hash of group name
Members []string `json:"members"` // List of user UUIDs in this group
CreatedAt int64 `json:"created_at"` // Unix timestamp
UpdatedAt int64 `json:"updated_at"` // Unix timestamp
}
// APIToken represents a JWT authentication token
@@ -40,12 +40,12 @@ type APIToken struct {
// ResourceMetadata contains ownership and permission information for stored resources
type ResourceMetadata struct {
OwnerUUID string `json:"owner_uuid"` // UUID of the resource owner
GroupUUID string `json:"group_uuid"` // UUID of the resource group
Permissions int `json:"permissions"` // 12-bit permission mask (POSIX-inspired)
TTL string `json:"ttl"` // Time-to-live duration (Go format)
CreatedAt int64 `json:"created_at"` // Unix timestamp when resource was created
UpdatedAt int64 `json:"updated_at"` // Unix timestamp when resource was last updated
OwnerUUID string `json:"owner_uuid"` // UUID of the resource owner
GroupUUID string `json:"group_uuid"` // UUID of the resource group
Permissions int `json:"permissions"` // 12-bit permission mask (POSIX-inspired)
TTL string `json:"ttl"` // Time-to-live duration (Go format)
CreatedAt int64 `json:"created_at"` // Unix timestamp when resource was created
UpdatedAt int64 `json:"updated_at"` // Unix timestamp when resource was last updated
}
// Permission constants for POSIX-inspired ACL
@@ -131,6 +131,22 @@ type CreateTokenResponse struct {
ExpiresAt int64 `json:"expires_at"`
}
// Resource Metadata Management API structures (Issue #12)
type GetResourceMetadataResponse struct {
OwnerUUID string `json:"owner_uuid"`
GroupUUID string `json:"group_uuid"`
Permissions int `json:"permissions"`
TTL string `json:"ttl"`
CreatedAt int64 `json:"created_at"`
UpdatedAt int64 `json:"updated_at"`
}
type UpdateResourceMetadataRequest struct {
OwnerUUID *string `json:"owner_uuid,omitempty"`
GroupUUID *string `json:"group_uuid,omitempty"`
Permissions *int `json:"permissions,omitempty"`
}
// Cluster and member management types
type Member struct {
ID string `json:"id"`
@@ -231,28 +247,28 @@ type KVRangeResponse struct {
// Configuration
type Config struct {
NodeID string `yaml:"node_id"`
BindAddress string `yaml:"bind_address"`
Port int `yaml:"port"`
DataDir string `yaml:"data_dir"`
SeedNodes []string `yaml:"seed_nodes"`
ReadOnly bool `yaml:"read_only"`
LogLevel string `yaml:"log_level"`
GossipIntervalMin int `yaml:"gossip_interval_min"`
GossipIntervalMax int `yaml:"gossip_interval_max"`
SyncInterval int `yaml:"sync_interval"`
CatchupInterval int `yaml:"catchup_interval"`
BootstrapMaxAgeHours int `yaml:"bootstrap_max_age_hours"`
ThrottleDelayMs int `yaml:"throttle_delay_ms"`
FetchDelayMs int `yaml:"fetch_delay_ms"`
NodeID string `yaml:"node_id"`
BindAddress string `yaml:"bind_address"`
Port int `yaml:"port"`
DataDir string `yaml:"data_dir"`
SeedNodes []string `yaml:"seed_nodes"`
ReadOnly bool `yaml:"read_only"`
LogLevel string `yaml:"log_level"`
GossipIntervalMin int `yaml:"gossip_interval_min"`
GossipIntervalMax int `yaml:"gossip_interval_max"`
SyncInterval int `yaml:"sync_interval"`
CatchupInterval int `yaml:"catchup_interval"`
BootstrapMaxAgeHours int `yaml:"bootstrap_max_age_hours"`
ThrottleDelayMs int `yaml:"throttle_delay_ms"`
FetchDelayMs int `yaml:"fetch_delay_ms"`
// Database compression configuration
CompressionEnabled bool `yaml:"compression_enabled"`
CompressionLevel int `yaml:"compression_level"`
// TTL configuration
DefaultTTL string `yaml:"default_ttl"` // Go duration format, "0" means no default TTL
MaxJSONSize int `yaml:"max_json_size"` // Maximum JSON size in bytes
DefaultTTL string `yaml:"default_ttl"` // Go duration format, "0" means no default TTL
MaxJSONSize int `yaml:"max_json_size"` // Maximum JSON size in bytes
// Rate limiting configuration
RateLimitRequests int `yaml:"rate_limit_requests"` // Max requests per window
@@ -262,10 +278,10 @@ type Config struct {
TamperLogActions []string `yaml:"tamper_log_actions"` // Actions to log
// Backup system configuration
BackupEnabled bool `yaml:"backup_enabled"` // Enable/disable automated backups
BackupSchedule string `yaml:"backup_schedule"` // Cron schedule format
BackupPath string `yaml:"backup_path"` // Directory to store backups
BackupRetention int `yaml:"backup_retention"` // Days to keep backups
BackupEnabled bool `yaml:"backup_enabled"` // Enable/disable automated backups
BackupSchedule string `yaml:"backup_schedule"` // Cron schedule format
BackupPath string `yaml:"backup_path"` // Directory to store backups
BackupRetention int `yaml:"backup_retention"` // Days to keep backups
// Feature toggles for optional functionalities
AuthEnabled bool `yaml:"auth_enabled"` // Enable/disable authentication system
@@ -275,6 +291,13 @@ type Config struct {
RevisionHistoryEnabled bool `yaml:"revision_history_enabled"` // Enable/disable revision history
// Anonymous access control (Issue #5)
AllowAnonymousRead bool `yaml:"allow_anonymous_read"` // Allow unauthenticated read access to KV endpoints
AllowAnonymousWrite bool `yaml:"allow_anonymous_write"` // Allow unauthenticated write access to KV endpoints
AllowAnonymousRead bool `yaml:"allow_anonymous_read"` // Allow unauthenticated read access to KV endpoints
AllowAnonymousWrite bool `yaml:"allow_anonymous_write"` // Allow unauthenticated write access to KV endpoints
// Cluster authentication (Issue #13)
ClusterSecret string `yaml:"cluster_secret"` // Shared secret for cluster authentication (auto-generated if empty)
ClusterTLSEnabled bool `yaml:"cluster_tls_enabled"` // Require TLS for inter-node communication
ClusterTLSCertFile string `yaml:"cluster_tls_cert_file"` // Path to TLS certificate file
ClusterTLSKeyFile string `yaml:"cluster_tls_key_file"` // Path to TLS private key file
ClusterTLSSkipVerify bool `yaml:"cluster_tls_skip_verify"` // Skip TLS verification (insecure, for testing only)
}