5 Commits

Author SHA1 Message Date
852275945c fix: update bootstrap service and routes for cluster authentication
- Updated bootstrap service to use authenticated HTTP client with cluster auth headers
- Made GET /members/ endpoint unprotected for monitoring/inspection purposes
- All other cluster communication endpoints remain protected by cluster auth middleware

This ensures proper cluster formation while maintaining security for inter-node communication.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-02 22:27:15 +03:00
c7dcebb894 feat: implement secure cluster authentication (issue #13)
Implemented a comprehensive secure authentication mechanism for inter-node
cluster communication with the following features:

1. Global Cluster Secret (GCS)
   - Auto-generated cryptographically secure random secret (256-bit)
   - Configurable via YAML config file
   - Shared across all cluster nodes for authentication

2. Cluster Authentication Middleware
   - Validates X-Cluster-Secret and X-Node-ID headers
   - Applied to all cluster endpoints (/members/*, /merkle_tree/*, /kv_range)
   - Comprehensive logging of authentication attempts

3. Authenticated HTTP Client
   - Custom HTTP client with cluster auth headers
   - TLS support with configurable certificate verification
   - Protocol-aware (http/https based on TLS settings)

4. Secure Bootstrap Endpoint
   - New /auth/cluster-bootstrap endpoint
   - Protected by JWT authentication with admin scope
   - Allows new nodes to securely obtain cluster secret

5. Updated Cluster Communication
   - All gossip protocol requests include auth headers
   - All Merkle tree sync requests include auth headers
   - All data replication requests include auth headers

6. Configuration
   - cluster_secret: Shared secret (auto-generated if not provided)
   - cluster_tls_enabled: Enable TLS for inter-node communication
   - cluster_tls_cert_file: Path to TLS certificate
   - cluster_tls_key_file: Path to TLS private key
   - cluster_tls_skip_verify: Skip TLS verification (testing only)

This implementation addresses the security vulnerability of unprotected
cluster endpoints and provides a flexible, secure approach to protecting
internal cluster communication while allowing for automated node bootstrapping.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-02 22:19:40 +03:00
2431d3cfb0 test: add comprehensive authentication middleware test (issue #4)
- Add Test 5 to integration_test.sh for authentication verification
- Test admin endpoints reject unauthorized requests properly
- Test admin endpoints work with valid JWT tokens
- Test KV endpoints respect anonymous access configuration
- Extract and use auto-generated root account tokens

docs: update README and CLAUDE.md for recent security features

- Document allow_anonymous_read and allow_anonymous_write config options
- Update API documentation with authentication requirements
- Add security notes about DELETE operations always requiring auth
- Update configuration table with new anonymous access settings
- Document new authentication test coverage in CLAUDE.md

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-09-21 12:34:15 +03:00
b4f57b3604 feat: add anonymous access configuration for KV endpoints (issue #5)
- Add AllowAnonymousRead and AllowAnonymousWrite config parameters
- Set both to false by default for security
- Apply conditional authentication middleware to KV endpoints:
  - GET requires auth if AllowAnonymousRead is false
  - PUT requires auth if AllowAnonymousWrite is false
  - DELETE always requires authentication (no anonymous delete)
- Update integration tests to enable anonymous access for testing
- Maintain backward compatibility when AuthEnabled is false

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-09-21 12:22:14 +03:00
e6d87d025f fix: secure admin endpoints with authentication middleware (issue #4)
- Add config parameter to AuthService constructor
- Implement proper config-based auth checks in middleware
- Wrap all admin endpoints (users, groups, tokens) with authentication
- Apply granular scopes: admin:users:*, admin:groups:*, admin:tokens:*
- Maintain backward compatibility when config is nil

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-09-21 12:15:38 +03:00
31 changed files with 676 additions and 250 deletions

View File

@@ -99,15 +99,21 @@ type StoredValue struct {
### Configuration Architecture ### Configuration Architecture
The system uses feature toggles extensively (`types/Config:271-276`): The system uses feature toggles extensively (`types/Config:271-280`):
```yaml ```yaml
auth_enabled: true # JWT authentication system auth_enabled: true # JWT authentication system
tamper_logging_enabled: true # Cryptographic audit trail tamper_logging_enabled: true # Cryptographic audit trail
clustering_enabled: true # Gossip protocol and sync clustering_enabled: true # Gossip protocol and sync
rate_limiting_enabled: true # Per-client rate limiting rate_limiting_enabled: true # Per-client rate limiting
revision_history_enabled: true # Automatic versioning revision_history_enabled: true # Automatic versioning
# Anonymous access control (Issue #5 - when auth_enabled: true)
allow_anonymous_read: false # Allow unauthenticated read access to KV endpoints
allow_anonymous_write: false # Allow unauthenticated write access to KV endpoints
``` ```
**Security Note**: DELETE operations always require authentication when `auth_enabled: true`, regardless of anonymous access settings.
### Testing Strategy ### Testing Strategy
#### Integration Test Suite (`integration_test.sh`) #### Integration Test Suite (`integration_test.sh`)
@@ -115,6 +121,11 @@ revision_history_enabled: true # Automatic versioning
- **Basic functionality** - Single-node CRUD operations - **Basic functionality** - Single-node CRUD operations
- **Cluster formation** - 2-node gossip protocol and data replication - **Cluster formation** - 2-node gossip protocol and data replication
- **Conflict resolution** - Automated conflict detection and resolution using `test_conflict.go` - **Conflict resolution** - Automated conflict detection and resolution using `test_conflict.go`
- **Authentication middleware** - Comprehensive security testing (Issue #4):
- Admin endpoints properly reject unauthenticated requests
- Admin endpoints work with valid JWT tokens
- KV endpoints respect anonymous access configuration
- Automatic root account creation and token extraction
The test suite uses sophisticated retry logic and timing to handle the eventually consistent nature of the system. The test suite uses sophisticated retry logic and timing to handle the eventually consistent nature of the system.

View File

@@ -113,6 +113,10 @@ clustering_enabled: true # Gossip protocol and sync
rate_limiting_enabled: true # Rate limiting rate_limiting_enabled: true # Rate limiting
revision_history_enabled: true # Automatic versioning revision_history_enabled: true # Automatic versioning
# Anonymous access control (when auth_enabled: true)
allow_anonymous_read: false # Allow unauthenticated read access to KV endpoints
allow_anonymous_write: false # Allow unauthenticated write access to KV endpoints
# Backup configuration # Backup configuration
backup_enabled: true # Automated backups backup_enabled: true # Automated backups
backup_schedule: "0 0 * * *" # Daily at midnight (cron format) backup_schedule: "0 0 * * *" # Daily at midnight (cron format)
@@ -134,7 +138,7 @@ backup_retention: 7 # Days to keep backups
```bash ```bash
PUT /kv/{path} PUT /kv/{path}
Content-Type: application/json Content-Type: application/json
Authorization: Bearer <jwt-token> # Required if auth_enabled Authorization: Bearer <jwt-token> # Required if auth_enabled && !allow_anonymous_write
# Basic storage # Basic storage
curl -X PUT http://localhost:8080/kv/users/john/profile \ curl -X PUT http://localhost:8080/kv/users/john/profile \
@@ -158,7 +162,7 @@ curl -X PUT http://localhost:8080/kv/cache/session/abc123 \
#### Retrieve Data #### Retrieve Data
```bash ```bash
GET /kv/{path} GET /kv/{path}
Authorization: Bearer <jwt-token> # Required if auth_enabled Authorization: Bearer <jwt-token> # Required if auth_enabled && !allow_anonymous_read
curl -H "Authorization: Bearer eyJ..." http://localhost:8080/kv/users/john/profile curl -H "Authorization: Bearer eyJ..." http://localhost:8080/kv/users/john/profile
@@ -177,7 +181,7 @@ curl -H "Authorization: Bearer eyJ..." http://localhost:8080/kv/users/john/profi
#### Delete Data #### Delete Data
```bash ```bash
DELETE /kv/{path} DELETE /kv/{path}
Authorization: Bearer <jwt-token> # Required if auth_enabled Authorization: Bearer <jwt-token> # Always required when auth_enabled (no anonymous delete)
curl -X DELETE -H "Authorization: Bearer eyJ..." http://localhost:8080/kv/users/john/profile curl -X DELETE -H "Authorization: Bearer eyJ..." http://localhost:8080/kv/users/john/profile
# Returns: 204 No Content # Returns: 204 No Content
@@ -532,6 +536,8 @@ type StoredValue struct {
| `bootstrap_max_age_hours` | Max historical data to sync | 720 hours | 30 days default | | `bootstrap_max_age_hours` | Max historical data to sync | 720 hours | 30 days default |
| **Feature Toggles** | | **Feature Toggles** |
| `auth_enabled` | JWT authentication system | true | Complete auth/authz system | | `auth_enabled` | JWT authentication system | true | Complete auth/authz system |
| `allow_anonymous_read` | Allow unauthenticated read access | false | When auth_enabled, controls KV GET endpoints |
| `allow_anonymous_write` | Allow unauthenticated write access | false | When auth_enabled, controls KV PUT endpoints |
| `clustering_enabled` | Gossip protocol and sync | true | Distributed mode | | `clustering_enabled` | Gossip protocol and sync | true | Distributed mode |
| `compression_enabled` | ZSTD compression | true | Reduces storage size | | `compression_enabled` | ZSTD compression | true | Reduces storage size |
| `rate_limiting_enabled` | Rate limiting | true | Per-client limits | | `rate_limiting_enabled` | Rate limiting | true | Per-client limits |

View File

@@ -26,13 +26,15 @@ type AuthContext struct {
type AuthService struct { type AuthService struct {
db *badger.DB db *badger.DB
logger *logrus.Logger logger *logrus.Logger
config *types.Config
} }
// NewAuthService creates a new authentication service // NewAuthService creates a new authentication service
func NewAuthService(db *badger.DB, logger *logrus.Logger) *AuthService { func NewAuthService(db *badger.DB, logger *logrus.Logger, config *types.Config) *AuthService {
return &AuthService{ return &AuthService{
db: db, db: db,
logger: logger, logger: logger,
config: config,
} }
} }

77
auth/cluster.go Normal file
View File

@@ -0,0 +1,77 @@
package auth
import (
"net/http"
"github.com/sirupsen/logrus"
)
// ClusterAuthService handles authentication for inter-cluster communication
type ClusterAuthService struct {
clusterSecret string
logger *logrus.Logger
}
// NewClusterAuthService creates a new cluster authentication service
func NewClusterAuthService(clusterSecret string, logger *logrus.Logger) *ClusterAuthService {
return &ClusterAuthService{
clusterSecret: clusterSecret,
logger: logger,
}
}
// Middleware validates cluster authentication headers
func (s *ClusterAuthService) Middleware(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// Extract authentication headers
clusterSecret := r.Header.Get("X-Cluster-Secret")
nodeID := r.Header.Get("X-Node-ID")
// Log authentication attempt
s.logger.WithFields(logrus.Fields{
"node_id": nodeID,
"remote_addr": r.RemoteAddr,
"path": r.URL.Path,
"method": r.Method,
}).Debug("Cluster authentication attempt")
// Validate cluster secret
if clusterSecret == "" {
s.logger.WithFields(logrus.Fields{
"node_id": nodeID,
"remote_addr": r.RemoteAddr,
"path": r.URL.Path,
}).Warn("Missing X-Cluster-Secret header")
http.Error(w, "Unauthorized: Missing cluster secret", http.StatusUnauthorized)
return
}
if clusterSecret != s.clusterSecret {
s.logger.WithFields(logrus.Fields{
"node_id": nodeID,
"remote_addr": r.RemoteAddr,
"path": r.URL.Path,
}).Warn("Invalid cluster secret")
http.Error(w, "Unauthorized: Invalid cluster secret", http.StatusUnauthorized)
return
}
// Validate node ID is present
if nodeID == "" {
s.logger.WithFields(logrus.Fields{
"remote_addr": r.RemoteAddr,
"path": r.URL.Path,
}).Warn("Missing X-Node-ID header")
http.Error(w, "Unauthorized: Missing node ID", http.StatusUnauthorized)
return
}
// Authentication successful
s.logger.WithFields(logrus.Fields{
"node_id": nodeID,
"path": r.URL.Path,
}).Debug("Cluster authentication successful")
next.ServeHTTP(w, r)
})
}

View File

@@ -138,11 +138,12 @@ func (s *RateLimitService) RateLimitMiddleware(next http.HandlerFunc) http.Handl
} }
} }
// isAuthEnabled checks if authentication is enabled (would be passed from config) // isAuthEnabled checks if authentication is enabled from config
func (s *AuthService) isAuthEnabled() bool { func (s *AuthService) isAuthEnabled() bool {
// This would normally be injected from config, but for now we'll assume enabled if s.config != nil {
// TODO: Inject config dependency return s.config.AuthEnabled
return true }
return true // Default to enabled if no config
} }
// Helper method to check rate limits (simplified version) // Helper method to check rate limits (simplified version)

View File

@@ -82,10 +82,19 @@ func (s *BootstrapService) attemptJoin(seedAddr string) bool {
return false return false
} }
client := &http.Client{Timeout: 10 * time.Second} client := NewAuthenticatedHTTPClient(s.config, 10*time.Second)
url := fmt.Sprintf("http://%s/members/join", seedAddr) protocol := GetProtocol(s.config)
url := fmt.Sprintf("%s://%s/members/join", protocol, seedAddr)
resp, err := client.Post(url, "application/json", bytes.NewBuffer(jsonData)) req, err := http.NewRequest("POST", url, bytes.NewBuffer(jsonData))
if err != nil {
s.logger.WithError(err).Error("Failed to create join request")
return false
}
req.Header.Set("Content-Type", "application/json")
AddClusterAuthHeaders(req, s.config)
resp, err := client.Do(req)
if err != nil { if err != nil {
s.logger.WithFields(logrus.Fields{ s.logger.WithFields(logrus.Fields{
"seed": seedAddr, "seed": seedAddr,

View File

@@ -181,11 +181,20 @@ func (s *GossipService) gossipWithPeer(peer *types.Member) error {
return err return err
} }
// Send HTTP request to peer // Send HTTP request to peer with cluster authentication
client := &http.Client{Timeout: 5 * time.Second} client := NewAuthenticatedHTTPClient(s.config, 5*time.Second)
url := fmt.Sprintf("http://%s/members/gossip", peer.Address) protocol := GetProtocol(s.config)
url := fmt.Sprintf("%s://%s/members/gossip", protocol, peer.Address)
resp, err := client.Post(url, "application/json", bytes.NewBuffer(jsonData)) req, err := http.NewRequest("POST", url, bytes.NewBuffer(jsonData))
if err != nil {
s.logger.WithError(err).Error("Failed to create gossip request")
return err
}
req.Header.Set("Content-Type", "application/json")
AddClusterAuthHeaders(req, s.config)
resp, err := client.Do(req)
if err != nil { if err != nil {
s.logger.WithFields(logrus.Fields{ s.logger.WithFields(logrus.Fields{
"peer": peer.Address, "peer": peer.Address,

43
cluster/http_client.go Normal file
View File

@@ -0,0 +1,43 @@
package cluster
import (
"crypto/tls"
"net/http"
"time"
"kvs/types"
)
// NewAuthenticatedHTTPClient creates an HTTP client configured for cluster authentication
func NewAuthenticatedHTTPClient(config *types.Config, timeout time.Duration) *http.Client {
client := &http.Client{
Timeout: timeout,
}
// Configure TLS if enabled
if config.ClusterTLSEnabled {
tlsConfig := &tls.Config{
InsecureSkipVerify: config.ClusterTLSSkipVerify,
}
client.Transport = &http.Transport{
TLSClientConfig: tlsConfig,
}
}
return client
}
// AddClusterAuthHeaders adds authentication headers to an HTTP request
func AddClusterAuthHeaders(req *http.Request, config *types.Config) {
req.Header.Set("X-Cluster-Secret", config.ClusterSecret)
req.Header.Set("X-Node-ID", config.NodeID)
}
// GetProtocol returns the appropriate protocol (http or https) based on TLS configuration
func GetProtocol(config *types.Config) string {
if config.ClusterTLSEnabled {
return "https"
}
return "http"
}

View File

@@ -186,10 +186,17 @@ func (s *SyncService) performMerkleSync() {
// requestMerkleRoot requests the Merkle root from a peer // requestMerkleRoot requests the Merkle root from a peer
func (s *SyncService) requestMerkleRoot(peerAddress string) (*types.MerkleRootResponse, error) { func (s *SyncService) requestMerkleRoot(peerAddress string) (*types.MerkleRootResponse, error) {
client := &http.Client{Timeout: 10 * time.Second} client := NewAuthenticatedHTTPClient(s.config, 10*time.Second)
url := fmt.Sprintf("http://%s/merkle_tree/root", peerAddress) protocol := GetProtocol(s.config)
url := fmt.Sprintf("%s://%s/merkle_tree/root", protocol, peerAddress)
resp, err := client.Get(url) req, err := http.NewRequest("GET", url, nil)
if err != nil {
return nil, err
}
AddClusterAuthHeaders(req, s.config)
resp, err := client.Do(req)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@@ -294,10 +301,17 @@ func (s *SyncService) handleLeafLevelDiff(peerAddress string, keys []string, loc
// fetchSingleKVFromPeer fetches a single KV pair from a peer // fetchSingleKVFromPeer fetches a single KV pair from a peer
func (s *SyncService) fetchSingleKVFromPeer(peerAddress, path string) (*types.StoredValue, error) { func (s *SyncService) fetchSingleKVFromPeer(peerAddress, path string) (*types.StoredValue, error) {
client := &http.Client{Timeout: 5 * time.Second} client := NewAuthenticatedHTTPClient(s.config, 5*time.Second)
url := fmt.Sprintf("http://%s/kv/%s", peerAddress, path) protocol := GetProtocol(s.config)
url := fmt.Sprintf("%s://%s/kv/%s", protocol, peerAddress, path)
resp, err := client.Get(url) req, err := http.NewRequest("GET", url, nil)
if err != nil {
return nil, err
}
AddClusterAuthHeaders(req, s.config)
resp, err := client.Do(req)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@@ -461,16 +475,24 @@ func (s *SyncService) resolveConflict(key string, local, remote *types.StoredVal
} }
// requestMerkleDiff requests children hashes or keys for a given node/range from a peer // requestMerkleDiff requests children hashes or keys for a given node/range from a peer
func (s *SyncService) requestMerkleDiff(peerAddress string, req types.MerkleTreeDiffRequest) (*types.MerkleTreeDiffResponse, error) { func (s *SyncService) requestMerkleDiff(peerAddress string, reqData types.MerkleTreeDiffRequest) (*types.MerkleTreeDiffResponse, error) {
jsonData, err := json.Marshal(req) jsonData, err := json.Marshal(reqData)
if err != nil { if err != nil {
return nil, err return nil, err
} }
client := &http.Client{Timeout: 10 * time.Second} client := NewAuthenticatedHTTPClient(s.config, 10*time.Second)
url := fmt.Sprintf("http://%s/merkle_tree/diff", peerAddress) protocol := GetProtocol(s.config)
url := fmt.Sprintf("%s://%s/merkle_tree/diff", protocol, peerAddress)
resp, err := client.Post(url, "application/json", bytes.NewBuffer(jsonData)) req, err := http.NewRequest("POST", url, bytes.NewBuffer(jsonData))
if err != nil {
return nil, err
}
req.Header.Set("Content-Type", "application/json")
AddClusterAuthHeaders(req, s.config)
resp, err := client.Do(req)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@@ -525,20 +547,28 @@ func (s *SyncService) handleChildrenDiff(peerAddress string, children []types.Me
// fetchAndStoreRange fetches a range of KV pairs from a peer and stores them locally // fetchAndStoreRange fetches a range of KV pairs from a peer and stores them locally
func (s *SyncService) fetchAndStoreRange(peerAddress string, startKey, endKey string) error { func (s *SyncService) fetchAndStoreRange(peerAddress string, startKey, endKey string) error {
req := types.KVRangeRequest{ reqData := types.KVRangeRequest{
StartKey: startKey, StartKey: startKey,
EndKey: endKey, EndKey: endKey,
Limit: 0, // No limit Limit: 0, // No limit
} }
jsonData, err := json.Marshal(req) jsonData, err := json.Marshal(reqData)
if err != nil { if err != nil {
return err return err
} }
client := &http.Client{Timeout: 30 * time.Second} // Longer timeout for range fetches client := NewAuthenticatedHTTPClient(s.config, 30*time.Second) // Longer timeout for range fetches
url := fmt.Sprintf("http://%s/kv_range", peerAddress) protocol := GetProtocol(s.config)
url := fmt.Sprintf("%s://%s/kv_range", protocol, peerAddress)
resp, err := client.Post(url, "application/json", bytes.NewBuffer(jsonData)) req, err := http.NewRequest("POST", url, bytes.NewBuffer(jsonData))
if err != nil {
return err
}
req.Header.Set("Content-Type", "application/json")
AddClusterAuthHeaders(req, s.config)
resp, err := client.Do(req)
if err != nil { if err != nil {
return err return err
} }

View File

@@ -1,12 +1,14 @@
package config package config
import ( import (
"crypto/rand"
"encoding/base64"
"fmt" "fmt"
"os" "os"
"path/filepath" "path/filepath"
"kvs/types"
"gopkg.in/yaml.v3" "gopkg.in/yaml.v3"
"kvs/types"
) )
// Default configuration // Default configuration
@@ -55,9 +57,33 @@ func Default() *types.Config {
ClusteringEnabled: true, ClusteringEnabled: true,
RateLimitingEnabled: true, RateLimitingEnabled: true,
RevisionHistoryEnabled: true, RevisionHistoryEnabled: true,
// Default anonymous access settings (both disabled by default for security)
AllowAnonymousRead: false,
AllowAnonymousWrite: false,
// Default cluster authentication settings (Issue #13)
ClusterSecret: generateClusterSecret(),
ClusterTLSEnabled: false,
ClusterTLSCertFile: "",
ClusterTLSKeyFile: "",
ClusterTLSSkipVerify: false,
} }
} }
// generateClusterSecret generates a cryptographically secure random cluster secret
func generateClusterSecret() string {
// Generate 32 bytes (256 bits) of random data
randomBytes := make([]byte, 32)
if _, err := rand.Read(randomBytes); err != nil {
// Fallback to a warning - this should never happen in practice
fmt.Fprintf(os.Stderr, "Warning: Failed to generate secure cluster secret: %v\n", err)
return ""
}
// Encode as base64 for easy configuration file storage
return base64.StdEncoding.EncodeToString(randomBytes)
}
// Load configuration from file or create default // Load configuration from file or create default
func Load(configPath string) (*types.Config, error) { func Load(configPath string) (*types.Config, error) {
config := Default() config := Default()
@@ -90,5 +116,13 @@ func Load(configPath string) (*types.Config, error) {
return nil, fmt.Errorf("failed to parse config file: %v", err) return nil, fmt.Errorf("failed to parse config file: %v", err)
} }
// Generate cluster secret if not provided and clustering is enabled (Issue #13)
if config.ClusteringEnabled && config.ClusterSecret == "" {
config.ClusterSecret = generateClusterSecret()
fmt.Printf("Warning: No cluster_secret configured. Generated a random secret.\n")
fmt.Printf(" To share this secret with other nodes, add it to your config:\n")
fmt.Printf(" cluster_secret: %s\n", config.ClusterSecret)
}
return config, nil return config, nil
} }

View File

@@ -91,6 +91,8 @@ port: 8090
data_dir: "./basic_data" data_dir: "./basic_data"
seed_nodes: [] seed_nodes: []
log_level: "error" log_level: "error"
allow_anonymous_read: true
allow_anonymous_write: true
EOF EOF
# Start node # Start node
@@ -123,6 +125,9 @@ EOF
test_cluster_formation() { test_cluster_formation() {
test_start "2-node cluster formation and Merkle Tree replication" test_start "2-node cluster formation and Merkle Tree replication"
# Shared cluster secret for authentication (Issue #13)
local CLUSTER_SECRET="test-cluster-secret-12345678901234567890"
# Node 1 config # Node 1 config
cat > cluster1.yaml <<EOF cat > cluster1.yaml <<EOF
node_id: "cluster-1" node_id: "cluster-1"
@@ -134,6 +139,9 @@ log_level: "error"
gossip_interval_min: 5 gossip_interval_min: 5
gossip_interval_max: 10 gossip_interval_max: 10
sync_interval: 10 sync_interval: 10
allow_anonymous_read: true
allow_anonymous_write: true
cluster_secret: "$CLUSTER_SECRET"
EOF EOF
# Node 2 config # Node 2 config
@@ -147,6 +155,9 @@ log_level: "error"
gossip_interval_min: 5 gossip_interval_min: 5
gossip_interval_max: 10 gossip_interval_max: 10
sync_interval: 10 sync_interval: 10
allow_anonymous_read: true
allow_anonymous_write: true
cluster_secret: "$CLUSTER_SECRET"
EOF EOF
# Start nodes # Start nodes
@@ -233,6 +244,9 @@ test_conflict_resolution() {
if go run test_conflict.go "$TEST_DIR/conflict1_data" "$TEST_DIR/conflict2_data"; then if go run test_conflict.go "$TEST_DIR/conflict1_data" "$TEST_DIR/conflict2_data"; then
cd "$TEST_DIR" cd "$TEST_DIR"
# Shared cluster secret for authentication (Issue #13)
local CLUSTER_SECRET="conflict-cluster-secret-1234567890123"
# Create configs # Create configs
cat > conflict1.yaml <<EOF cat > conflict1.yaml <<EOF
node_id: "conflict-1" node_id: "conflict-1"
@@ -242,6 +256,9 @@ data_dir: "./conflict1_data"
seed_nodes: [] seed_nodes: []
log_level: "info" log_level: "info"
sync_interval: 3 sync_interval: 3
allow_anonymous_read: true
allow_anonymous_write: true
cluster_secret: "$CLUSTER_SECRET"
EOF EOF
cat > conflict2.yaml <<EOF cat > conflict2.yaml <<EOF
@@ -252,6 +269,9 @@ data_dir: "./conflict2_data"
seed_nodes: ["127.0.0.1:8111"] seed_nodes: ["127.0.0.1:8111"]
log_level: "info" log_level: "info"
sync_interval: 3 sync_interval: 3
allow_anonymous_read: true
allow_anonymous_write: true
cluster_secret: "$CLUSTER_SECRET"
EOF EOF
# Start nodes # Start nodes
@@ -351,6 +371,79 @@ EOF
fi fi
} }
# Test 5: Authentication middleware (Issue #4)
test_authentication_middleware() {
test_start "Authentication middleware test (Issue #4)"
# Create auth test config
cat > auth_test.yaml <<EOF
node_id: "auth-test"
bind_address: "127.0.0.1"
port: 8095
data_dir: "./auth_test_data"
seed_nodes: []
log_level: "error"
auth_enabled: true
allow_anonymous_read: false
allow_anonymous_write: false
EOF
# Start node
$BINARY auth_test.yaml >auth_test.log 2>&1 &
local pid=$!
if wait_for_service 8095; then
sleep 2 # Allow root account creation
# Extract the token from logs
local token=$(grep "Token:" auth_test.log | sed 's/.*Token: //' | tr -d '\n\r')
if [ -z "$token" ]; then
log_error "Failed to extract authentication token from logs"
kill $pid 2>/dev/null || true
return
fi
# Test 1: Admin endpoints should fail without authentication
local no_auth_response=$(curl -s -X POST http://localhost:8095/api/users -H "Content-Type: application/json" -d '{"nickname":"test","password":"test"}')
if echo "$no_auth_response" | grep -q "Unauthorized"; then
log_success "Admin endpoints properly reject unauthenticated requests"
else
log_error "Admin endpoints should reject unauthenticated requests, got: $no_auth_response"
fi
# Test 2: Admin endpoints should work with valid authentication
local auth_response=$(curl -s -X POST http://localhost:8095/api/users -H "Content-Type: application/json" -H "Authorization: Bearer $token" -d '{"nickname":"authtest","password":"authtest"}')
if echo "$auth_response" | grep -q "uuid"; then
log_success "Admin endpoints work with valid authentication"
else
log_error "Admin endpoints should work with authentication, got: $auth_response"
fi
# Test 3: KV endpoints should require auth when anonymous access is disabled
local kv_no_auth=$(curl -s -X PUT http://localhost:8095/kv/test/auth -H "Content-Type: application/json" -d '{"test":"auth"}')
if echo "$kv_no_auth" | grep -q "Unauthorized"; then
log_success "KV endpoints properly require authentication when anonymous access disabled"
else
log_error "KV endpoints should require auth when anonymous access disabled, got: $kv_no_auth"
fi
# Test 4: KV endpoints should work with valid authentication
local kv_auth=$(curl -s -X PUT http://localhost:8095/kv/test/auth -H "Content-Type: application/json" -H "Authorization: Bearer $token" -d '{"test":"auth"}')
if echo "$kv_auth" | grep -q "uuid\|timestamp" || [ -z "$kv_auth" ]; then
log_success "KV endpoints work with valid authentication"
else
log_error "KV endpoints should work with authentication, got: $kv_auth"
fi
kill $pid 2>/dev/null || true
sleep 2
else
log_error "Auth test node failed to start"
kill $pid 2>/dev/null || true
fi
}
# Main test execution # Main test execution
main() { main() {
echo "==================================================" echo "=================================================="
@@ -368,6 +461,7 @@ main() {
test_basic_functionality test_basic_functionality
test_cluster_formation test_cluster_formation
test_conflict_resolution test_conflict_resolution
test_authentication_middleware
# Results # Results
echo "==================================================" echo "=================================================="

View File

@@ -11,7 +11,6 @@ import (
"kvs/server" "kvs/server"
) )
func main() { func main() {
configPath := "./config.yaml" configPath := "./config.yaml"

View File

@@ -22,8 +22,6 @@ import (
"kvs/utils" "kvs/utils"
) )
// healthHandler returns server health status // healthHandler returns server health status
func (s *Server) healthHandler(w http.ResponseWriter, r *http.Request) { func (s *Server) healthHandler(w http.ResponseWriter, r *http.Request) {
mode := s.getMode() mode := s.getMode()
@@ -1271,3 +1269,29 @@ func (s *Server) getRevisionHistory(key string) ([]map[string]interface{}, error
func (s *Server) getSpecificRevision(key string, revision int) (*types.StoredValue, error) { func (s *Server) getSpecificRevision(key string, revision int) (*types.StoredValue, error) {
return s.revisionService.GetSpecificRevision(key, revision) return s.revisionService.GetSpecificRevision(key, revision)
} }
// clusterBootstrapHandler provides the cluster secret to authenticated administrators (Issue #13)
func (s *Server) clusterBootstrapHandler(w http.ResponseWriter, r *http.Request) {
// Ensure clustering is enabled
if !s.config.ClusteringEnabled {
http.Error(w, "Clustering is disabled", http.StatusServiceUnavailable)
return
}
// Ensure cluster secret is configured
if s.config.ClusterSecret == "" {
s.logger.Error("Cluster secret is not configured")
http.Error(w, "Cluster secret is not configured", http.StatusInternalServerError)
return
}
// Return the cluster secret for secure bootstrap
response := map[string]string{
"cluster_secret": s.config.ClusterSecret,
}
s.logger.WithField("remote_addr", r.RemoteAddr).Info("Cluster secret retrieved for bootstrap")
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(response)
}

View File

@@ -1,6 +1,8 @@
package server package server
import ( import (
"net/http"
"github.com/gorilla/mux" "github.com/gorilla/mux"
) )
@@ -11,14 +13,52 @@ func (s *Server) setupRoutes() *mux.Router {
// Health endpoint (always available) // Health endpoint (always available)
router.HandleFunc("/health", s.healthHandler).Methods("GET") router.HandleFunc("/health", s.healthHandler).Methods("GET")
// KV endpoints (always available - see issue #5 for anonymous access control) // KV endpoints (with conditional authentication based on anonymous access settings)
// GET endpoint - require auth if anonymous read is disabled
if s.config.AuthEnabled && !s.config.AllowAnonymousRead {
router.Handle("/kv/{path:.+}", s.authService.Middleware(
[]string{"read"}, nil, "",
)(s.getKVHandler)).Methods("GET")
} else {
router.HandleFunc("/kv/{path:.+}", s.getKVHandler).Methods("GET") router.HandleFunc("/kv/{path:.+}", s.getKVHandler).Methods("GET")
}
// PUT endpoint - require auth if anonymous write is disabled
if s.config.AuthEnabled && !s.config.AllowAnonymousWrite {
router.Handle("/kv/{path:.+}", s.authService.Middleware(
[]string{"write"}, nil, "",
)(s.putKVHandler)).Methods("PUT")
} else {
router.HandleFunc("/kv/{path:.+}", s.putKVHandler).Methods("PUT") router.HandleFunc("/kv/{path:.+}", s.putKVHandler).Methods("PUT")
}
// DELETE endpoint - always require authentication (no anonymous delete)
if s.config.AuthEnabled {
router.Handle("/kv/{path:.+}", s.authService.Middleware(
[]string{"delete"}, nil, "",
)(s.deleteKVHandler)).Methods("DELETE")
} else {
router.HandleFunc("/kv/{path:.+}", s.deleteKVHandler).Methods("DELETE") router.HandleFunc("/kv/{path:.+}", s.deleteKVHandler).Methods("DELETE")
}
// Member endpoints (available when clustering is enabled) // Member endpoints (available when clustering is enabled)
if s.config.ClusteringEnabled { if s.config.ClusteringEnabled {
// GET /members/ is unprotected for monitoring/inspection
router.HandleFunc("/members/", s.getMembersHandler).Methods("GET") router.HandleFunc("/members/", s.getMembersHandler).Methods("GET")
// Apply cluster authentication middleware to all cluster communication endpoints
if s.clusterAuthService != nil {
router.Handle("/members/join", s.clusterAuthService.Middleware(http.HandlerFunc(s.joinMemberHandler))).Methods("POST")
router.Handle("/members/leave", s.clusterAuthService.Middleware(http.HandlerFunc(s.leaveMemberHandler))).Methods("DELETE")
router.Handle("/members/gossip", s.clusterAuthService.Middleware(http.HandlerFunc(s.gossipHandler))).Methods("POST")
router.Handle("/members/pairs_by_time", s.clusterAuthService.Middleware(http.HandlerFunc(s.pairsByTimeHandler))).Methods("POST")
// Merkle Tree endpoints (clustering feature)
router.Handle("/merkle_tree/root", s.clusterAuthService.Middleware(http.HandlerFunc(s.getMerkleRootHandler))).Methods("GET")
router.Handle("/merkle_tree/diff", s.clusterAuthService.Middleware(http.HandlerFunc(s.getMerkleDiffHandler))).Methods("POST")
router.Handle("/kv_range", s.clusterAuthService.Middleware(http.HandlerFunc(s.getKVRangeHandler))).Methods("POST")
} else {
// Fallback to unprotected endpoints (for backwards compatibility)
router.HandleFunc("/members/join", s.joinMemberHandler).Methods("POST") router.HandleFunc("/members/join", s.joinMemberHandler).Methods("POST")
router.HandleFunc("/members/leave", s.leaveMemberHandler).Methods("DELETE") router.HandleFunc("/members/leave", s.leaveMemberHandler).Methods("DELETE")
router.HandleFunc("/members/gossip", s.gossipHandler).Methods("POST") router.HandleFunc("/members/gossip", s.gossipHandler).Methods("POST")
@@ -29,23 +69,54 @@ func (s *Server) setupRoutes() *mux.Router {
router.HandleFunc("/merkle_tree/diff", s.getMerkleDiffHandler).Methods("POST") router.HandleFunc("/merkle_tree/diff", s.getMerkleDiffHandler).Methods("POST")
router.HandleFunc("/kv_range", s.getKVRangeHandler).Methods("POST") router.HandleFunc("/kv_range", s.getKVRangeHandler).Methods("POST")
} }
}
// Authentication and user management endpoints (available when auth is enabled) // Authentication and user management endpoints (available when auth is enabled)
if s.config.AuthEnabled { if s.config.AuthEnabled {
// User Management endpoints // User Management endpoints (with authentication middleware)
router.HandleFunc("/api/users", s.createUserHandler).Methods("POST") router.Handle("/api/users", s.authService.Middleware(
router.HandleFunc("/api/users/{uuid}", s.getUserHandler).Methods("GET") []string{"admin:users:create"}, nil, "",
router.HandleFunc("/api/users/{uuid}", s.updateUserHandler).Methods("PUT") )(s.createUserHandler)).Methods("POST")
router.HandleFunc("/api/users/{uuid}", s.deleteUserHandler).Methods("DELETE")
// Group Management endpoints router.Handle("/api/users/{uuid}", s.authService.Middleware(
router.HandleFunc("/api/groups", s.createGroupHandler).Methods("POST") []string{"admin:users:read"}, nil, "",
router.HandleFunc("/api/groups/{uuid}", s.getGroupHandler).Methods("GET") )(s.getUserHandler)).Methods("GET")
router.HandleFunc("/api/groups/{uuid}", s.updateGroupHandler).Methods("PUT")
router.HandleFunc("/api/groups/{uuid}", s.deleteGroupHandler).Methods("DELETE")
// Token Management endpoints router.Handle("/api/users/{uuid}", s.authService.Middleware(
router.HandleFunc("/api/tokens", s.createTokenHandler).Methods("POST") []string{"admin:users:update"}, nil, "",
)(s.updateUserHandler)).Methods("PUT")
router.Handle("/api/users/{uuid}", s.authService.Middleware(
[]string{"admin:users:delete"}, nil, "",
)(s.deleteUserHandler)).Methods("DELETE")
// Group Management endpoints (with authentication middleware)
router.Handle("/api/groups", s.authService.Middleware(
[]string{"admin:groups:create"}, nil, "",
)(s.createGroupHandler)).Methods("POST")
router.Handle("/api/groups/{uuid}", s.authService.Middleware(
[]string{"admin:groups:read"}, nil, "",
)(s.getGroupHandler)).Methods("GET")
router.Handle("/api/groups/{uuid}", s.authService.Middleware(
[]string{"admin:groups:update"}, nil, "",
)(s.updateGroupHandler)).Methods("PUT")
router.Handle("/api/groups/{uuid}", s.authService.Middleware(
[]string{"admin:groups:delete"}, nil, "",
)(s.deleteGroupHandler)).Methods("DELETE")
// Token Management endpoints (with authentication middleware)
router.Handle("/api/tokens", s.authService.Middleware(
[]string{"admin:tokens:create"}, nil, "",
)(s.createTokenHandler)).Methods("POST")
// Cluster Bootstrap endpoint (Issue #13) - Protected by JWT authentication
// Allows authenticated administrators to retrieve the cluster secret for new nodes
router.Handle("/auth/cluster-bootstrap", s.authService.Middleware(
[]string{"admin:tokens:create"}, nil, "",
)(s.clusterBootstrapHandler)).Methods("GET")
} }
// Revision History endpoints (available when revision history is enabled) // Revision History endpoints (available when revision history is enabled)

View File

@@ -51,6 +51,7 @@ type Server struct {
// Authentication service // Authentication service
authService *auth.AuthService authService *auth.AuthService
clusterAuthService *auth.ClusterAuthService
} }
// NewServer initializes and returns a new Server instance // NewServer initializes and returns a new Server instance
@@ -118,7 +119,12 @@ func NewServer(config *types.Config) (*Server, error) {
server.revisionService = storage.NewRevisionService(storageService) server.revisionService = storage.NewRevisionService(storageService)
// Initialize authentication service // Initialize authentication service
server.authService = auth.NewAuthService(db, logger) server.authService = auth.NewAuthService(db, logger, config)
// Initialize cluster authentication service (Issue #13)
if config.ClusteringEnabled {
server.clusterAuthService = auth.NewClusterAuthService(config.ClusterSecret, logger)
}
// Setup initial root account if needed (Issue #3) // Setup initial root account if needed (Issue #3)
if config.AuthEnabled { if config.AuthEnabled {
@@ -327,4 +333,3 @@ func (s *Server) storeUserAndGroup(user *types.User, group *types.Group) error {
return nil return nil
}) })
} }

View File

@@ -273,4 +273,15 @@ type Config struct {
ClusteringEnabled bool `yaml:"clustering_enabled"` // Enable/disable clustering/gossip ClusteringEnabled bool `yaml:"clustering_enabled"` // Enable/disable clustering/gossip
RateLimitingEnabled bool `yaml:"rate_limiting_enabled"` // Enable/disable rate limiting RateLimitingEnabled bool `yaml:"rate_limiting_enabled"` // Enable/disable rate limiting
RevisionHistoryEnabled bool `yaml:"revision_history_enabled"` // Enable/disable revision history RevisionHistoryEnabled bool `yaml:"revision_history_enabled"` // Enable/disable revision history
// Anonymous access control (Issue #5)
AllowAnonymousRead bool `yaml:"allow_anonymous_read"` // Allow unauthenticated read access to KV endpoints
AllowAnonymousWrite bool `yaml:"allow_anonymous_write"` // Allow unauthenticated write access to KV endpoints
// Cluster authentication (Issue #13)
ClusterSecret string `yaml:"cluster_secret"` // Shared secret for cluster authentication (auto-generated if empty)
ClusterTLSEnabled bool `yaml:"cluster_tls_enabled"` // Require TLS for inter-node communication
ClusterTLSCertFile string `yaml:"cluster_tls_cert_file"` // Path to TLS certificate file
ClusterTLSKeyFile string `yaml:"cluster_tls_key_file"` // Path to TLS private key file
ClusterTLSSkipVerify bool `yaml:"cluster_tls_skip_verify"` // Skip TLS verification (insecure, for testing only)
} }