9 Commits

Author SHA1 Message Date
64680a6ece docs: document daemon process management commands
Update README.md and CLAUDE.md to document new process management:
- Add "Process Management" section with daemon commands
- Update all examples to use `./kvs start/stop/status` instead of `&` and `pkill`
- Document global PID/log directories (~/.kvs/)
- Update cluster setup examples
- Update development workflow
- Add daemon package to project structure

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-05 23:10:25 +03:00
4c3fcbc45a test: refactor integration tests to use daemon commands
Update integration_test.sh to use new daemon management commands
instead of manual background processes and PIDs:
- Replace `kvs config.yaml &` with `kvs start config.yaml`
- Replace `kill $pid` with `kvs stop config.yaml`
- Update log file paths to use ~/.kvs/logs/
- Add integration_test/ directory to gitignore

All tests now use clean daemon lifecycle management.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-05 22:59:25 +03:00
a41e0d625c feat: add process management commands for daemon control
Add systemd-style subcommands for managing KVS instances:
- start <config>  - Daemonize and run in background
- stop <config>   - Gracefully stop daemon
- restart <config> - Restart daemon
- status [config] - Show status of all or specific instances

Key features:
- PID files stored in ~/.kvs/pids/ (global across all directories)
- Logs stored in ~/.kvs/logs/
- Config names support both 'node1' and 'node1.yaml' formats
- Backward compatible: 'kvs config.yaml' still runs in foreground
- Proper stale PID detection and cleanup

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-05 22:56:16 +03:00
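The stale PID detection mentioned in this commit typically amounts to sending signal 0 to the recorded PID. A minimal sketch of that check, assuming a Unix target; the helper name is illustrative, not the repository's actual implementation:

```go
package daemon

import (
	"os"
	"syscall"
)

// processAlive is a hypothetical helper: it reports whether a PID read from a
// PID file still refers to a running process. Signal 0 performs the
// existence/permission check without actually delivering a signal.
func processAlive(pid int) bool {
	proc, err := os.FindProcess(pid) // always succeeds on Unix
	if err != nil {
		return false
	}
	return proc.Signal(syscall.Signal(0)) == nil
}
```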
377af163f0 feat: implement resource metadata management API (issue #12)
Add API endpoints to manage ResourceMetadata (ownership, groups, permissions)
for KV resources. This enables administrators to configure granular access
control for stored data.

Changes:
- Add GetResourceMetadataResponse and UpdateResourceMetadataRequest types
- Add GetResourceMetadata and SetResourceMetadata methods to AuthService
- Add GET /kv/{path}/metadata endpoint (requires admin:users:read)
- Add PUT /kv/{path}/metadata endpoint (requires admin:users:update)
- Both endpoints protected by JWT authentication
- Metadata routes registered before general KV routes to prevent pattern conflicts

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-03 00:06:14 +03:00
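For illustration, reading metadata through the new endpoint might look like the sketch below. The path pattern and the admin-scope requirement come from the commit message; the response body shape is not shown in this diff, so it is printed raw rather than decoded into assumed fields.

```go
package main

import (
	"fmt"
	"io"
	"net/http"
)

// Sketch of an admin client fetching ResourceMetadata for a stored key.
func main() {
	req, _ := http.NewRequest("GET", "http://localhost:8080/kv/users/john/profile/metadata", nil)
	req.Header.Set("Authorization", "Bearer "+"<admin-jwt>") // placeholder token with admin:users:read

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	body, _ := io.ReadAll(resp.Body)
	fmt.Println(resp.Status, string(body))
}
```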
852275945c fix: update bootstrap service and routes for cluster authentication
- Updated bootstrap service to use authenticated HTTP client with cluster auth headers
- Made GET /members/ endpoint unprotected for monitoring/inspection purposes
- All other cluster communication endpoints remain protected by cluster auth middleware

This ensures proper cluster formation while maintaining security for inter-node communication.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-02 22:27:15 +03:00
c7dcebb894 feat: implement secure cluster authentication (issue #13)
Implemented a comprehensive secure authentication mechanism for inter-node
cluster communication with the following features:

1. Global Cluster Secret (GCS)
   - Auto-generated cryptographically secure random secret (256-bit)
   - Configurable via YAML config file
   - Shared across all cluster nodes for authentication

2. Cluster Authentication Middleware
   - Validates X-Cluster-Secret and X-Node-ID headers
   - Applied to all cluster endpoints (/members/*, /merkle_tree/*, /kv_range)
   - Comprehensive logging of authentication attempts

3. Authenticated HTTP Client
   - Custom HTTP client with cluster auth headers
   - TLS support with configurable certificate verification
   - Protocol-aware (http/https based on TLS settings)

4. Secure Bootstrap Endpoint
   - New /auth/cluster-bootstrap endpoint
   - Protected by JWT authentication with admin scope
   - Allows new nodes to securely obtain cluster secret

5. Updated Cluster Communication
   - All gossip protocol requests include auth headers
   - All Merkle tree sync requests include auth headers
   - All data replication requests include auth headers

6. Configuration
   - cluster_secret: Shared secret (auto-generated if not provided)
   - cluster_tls_enabled: Enable TLS for inter-node communication
   - cluster_tls_cert_file: Path to TLS certificate
   - cluster_tls_key_file: Path to TLS private key
   - cluster_tls_skip_verify: Skip TLS verification (testing only)

This implementation addresses the security vulnerability of unprotected
cluster endpoints and provides a flexible, secure approach to protecting
internal cluster communication while allowing for automated node bootstrapping.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-02 22:19:40 +03:00
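A rough sketch of how a new node might obtain the cluster secret from the bootstrap endpoint described above. The endpoint path and the JWT/admin-scope requirement are taken from the commit message; the HTTP method and the `cluster_secret` response field name are assumptions.

```go
package main

import (
	"encoding/json"
	"fmt"
	"net/http"
)

// fetchClusterSecret is illustrative only; it mirrors the described flow of a
// node presenting an admin JWT to a seed node to receive the shared secret.
func fetchClusterSecret(seedAddr, adminJWT string) (string, error) {
	req, err := http.NewRequest("POST", "http://"+seedAddr+"/auth/cluster-bootstrap", nil)
	if err != nil {
		return "", err
	}
	req.Header.Set("Authorization", "Bearer "+adminJWT)

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return "", err
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		return "", fmt.Errorf("bootstrap failed: %s", resp.Status)
	}

	var out struct {
		ClusterSecret string `json:"cluster_secret"` // assumed field name
	}
	if err := json.NewDecoder(resp.Body).Decode(&out); err != nil {
		return "", err
	}
	return out.ClusterSecret, nil
}
```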
2431d3cfb0 test: add comprehensive authentication middleware test (issue #4)
- Add Test 5 to integration_test.sh for authentication verification
- Test admin endpoints reject unauthorized requests properly
- Test admin endpoints work with valid JWT tokens
- Test KV endpoints respect anonymous access configuration
- Extract and use auto-generated root account tokens

docs: update README and CLAUDE.md for recent security features

- Document allow_anonymous_read and allow_anonymous_write config options
- Update API documentation with authentication requirements
- Add security notes about DELETE operations always requiring auth
- Update configuration table with new anonymous access settings
- Document new authentication test coverage in CLAUDE.md

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-09-21 12:34:15 +03:00
b4f57b3604 feat: add anonymous access configuration for KV endpoints (issue #5)
- Add AllowAnonymousRead and AllowAnonymousWrite config parameters
- Set both to false by default for security
- Apply conditional authentication middleware to KV endpoints:
  - GET requires auth if AllowAnonymousRead is false
  - PUT requires auth if AllowAnonymousWrite is false
  - DELETE always requires authentication (no anonymous delete)
- Update integration tests to enable anonymous access for testing
- Maintain backward compatibility when AuthEnabled is false

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-09-21 12:22:14 +03:00
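The conditional wrapping described above can be pictured roughly as follows. This is a sketch, not the repository's actual route setup: the package, handler, and middleware names are placeholders, and the method-prefixed patterns assume Go 1.22+ `net/http`. Only the config fields (`AuthEnabled`, `AllowAnonymousRead`, `AllowAnonymousWrite`) and the "DELETE always requires auth" rule come from the commit.

```go
package server

import (
	"net/http"

	"kvs/types"
)

// registerKVRoutes shows the intended decision logic: reads and writes may be
// anonymous only when explicitly allowed; deletes are always authenticated.
func registerKVRoutes(mux *http.ServeMux, cfg *types.Config,
	requireAuth func(http.HandlerFunc) http.HandlerFunc,
	get, put, del http.HandlerFunc) {

	if cfg.AuthEnabled && !cfg.AllowAnonymousRead {
		get = requireAuth(get)
	}
	if cfg.AuthEnabled && !cfg.AllowAnonymousWrite {
		put = requireAuth(put)
	}
	if cfg.AuthEnabled {
		del = requireAuth(del) // DELETE never allows anonymous access
	}

	mux.HandleFunc("GET /kv/", get)
	mux.HandleFunc("PUT /kv/", put)
	mux.HandleFunc("DELETE /kv/", del)
}
```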
e6d87d025f fix: secure admin endpoints with authentication middleware (issue #4)
- Add config parameter to AuthService constructor
- Implement proper config-based auth checks in middleware
- Wrap all admin endpoints (users, groups, tokens) with authentication
- Apply granular scopes: admin:users:*, admin:groups:*, admin:tokens:*
- Maintain backward compatibility when config is nil

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-09-21 12:15:38 +03:00

34 changed files with 1664 additions and 327 deletions

.gitignore

@@ -1,6 +1,8 @@
 .claude/
+.kvs/
 data/
 data*/
+integration_test/
 *.yaml
 !config.yaml
 kvs

CLAUDE.md

@@ -10,10 +10,16 @@ This file provides guidance to Claude Code (claude.ai/code) when working with co
 go build -o kvs .
 # Run with default config (auto-generates config.yaml)
-./kvs
+./kvs start config.yaml
 # Run with custom config
-./kvs /path/to/config.yaml
+./kvs start /path/to/config.yaml
+# Check running instances
+./kvs status
+# Stop instance
+./kvs stop config
 # Run comprehensive integration tests
 ./integration_test.sh
@@ -25,6 +31,32 @@ go run test_conflict.go data1 data2
 go build -o kvs . && ./integration_test.sh
 ```
+### Process Management Commands
+```bash
+# Start as background daemon
+./kvs start <config.yaml>    # .yaml extension optional
+# Stop daemon
+./kvs stop <config>          # Graceful SIGTERM shutdown
+# Restart daemon
+./kvs restart <config>       # Stop then start
+# Show status
+./kvs status                 # All instances
+./kvs status <config>        # Specific instance
+# Run in foreground (for debugging)
+./kvs <config.yaml>          # Logs to stdout, blocks terminal
+# View daemon logs
+tail -f ~/.kvs/logs/kvs_<config>.yaml.log
+# Global state directories
+~/.kvs/pids/   # PID files (works from any directory)
+~/.kvs/logs/   # Daemon log files
+```
 ### Development Workflow
 ```bash
 # Format and check code
@@ -38,11 +70,25 @@ go mod tidy
 go build .
 # Test specific cluster scenarios
-./kvs node1.yaml &   # Terminal 1
-./kvs node2.yaml &   # Terminal 2
+./kvs start node1.yaml
+./kvs start node2.yaml
+# Wait for cluster formation
+sleep 5
+# Test data operations
 curl -X PUT http://localhost:8081/kv/test/data -H "Content-Type: application/json" -d '{"test":"data"}'
 curl http://localhost:8082/kv/test/data  # Should replicate within ~30 seconds
-pkill kvs
+# Check daemon status
+./kvs status
+# View logs
+tail -f ~/.kvs/logs/kvs_node1.yaml.log
+# Cleanup
+./kvs stop node1
+./kvs stop node2
 ```
 ## Architecture Overview
@@ -58,7 +104,8 @@ KVS is a **distributed, eventually consistent key-value store** built around thr
 #### Modular Package Design
 - **`auth/`** - Complete JWT authentication system with POSIX-inspired permissions
 - **`cluster/`** - Distributed systems logic (gossip, sync, merkle trees)
+- **`daemon/`** - Process management (daemonization, PID files, lifecycle)
 - **`storage/`** - BadgerDB abstraction with compression and revision history
 - **`server/`** - HTTP handlers, routing, and lifecycle management
 - **`features/`** - Utility functions for TTL, rate limiting, tamper logging, backup
@@ -99,15 +146,21 @@ type StoredValue struct {
 ### Configuration Architecture
-The system uses feature toggles extensively (`types/Config:271-276`):
+The system uses feature toggles extensively (`types/Config:271-280`):
 ```yaml
 auth_enabled: true              # JWT authentication system
 tamper_logging_enabled: true    # Cryptographic audit trail
 clustering_enabled: true        # Gossip protocol and sync
 rate_limiting_enabled: true     # Per-client rate limiting
 revision_history_enabled: true  # Automatic versioning
+# Anonymous access control (Issue #5 - when auth_enabled: true)
+allow_anonymous_read: false     # Allow unauthenticated read access to KV endpoints
+allow_anonymous_write: false    # Allow unauthenticated write access to KV endpoints
 ```
+**Security Note**: DELETE operations always require authentication when `auth_enabled: true`, regardless of anonymous access settings.
 ### Testing Strategy
 #### Integration Test Suite (`integration_test.sh`)
@@ -115,6 +168,11 @@ revision_history_enabled: true # Automatic versioning
 - **Basic functionality** - Single-node CRUD operations
 - **Cluster formation** - 2-node gossip protocol and data replication
 - **Conflict resolution** - Automated conflict detection and resolution using `test_conflict.go`
+- **Authentication middleware** - Comprehensive security testing (Issue #4):
+  - Admin endpoints properly reject unauthenticated requests
+  - Admin endpoints work with valid JWT tokens
+  - KV endpoints respect anonymous access configuration
+  - Automatic root account creation and token extraction
 The test suite uses sophisticated retry logic and timing to handle the eventually consistent nature of the system.
@@ -136,9 +194,18 @@ Creates two BadgerDB instances with intentionally conflicting data (same path, s
 - **Bootstrap sync**: Up to 30 days of historical data for new nodes
 #### Main Entry Point Flow
-1. `main.go` loads config (auto-generates default if missing)
-2. `server.NewServer()` initializes all subsystems
-3. Graceful shutdown handling with `SIGINT`/`SIGTERM`
-4. All business logic delegated to modular packages
+1. `main.go` parses command-line arguments for subcommands (`start`, `stop`, `status`, `restart`)
+2. For daemon mode: `daemon.Daemonize()` spawns background process and manages PID files
+3. For server mode: loads config (auto-generates default if missing)
+4. `server.NewServer()` initializes all subsystems
+5. Graceful shutdown handling with `SIGINT`/`SIGTERM`
+6. All business logic delegated to modular packages
+#### Daemon Architecture
+- **PID Management**: Global PID files stored in `~/.kvs/pids/` for cross-directory access
+- **Logging**: Daemon logs written to `~/.kvs/logs/{config-name}.log`
+- **Process Lifecycle**: Spawns detached process via `exec.Command()` with `Setsid: true`
+- **Config Normalization**: Supports both `node1` and `node1.yaml` formats
+- **Stale PID Detection**: Checks process existence via `Signal(0)` before operations
 This architecture enables easy feature addition, comprehensive testing, and reliable operation in distributed environments while maintaining simplicity for single-node deployments.

README.md

@@ -69,11 +69,67 @@ go build -o kvs .
 ### Quick Test
 ```bash
-# Start standalone node
-./kvs
+# Start standalone node (uses config.yaml if it exists, or creates it)
+./kvs start config.yaml
 # Test the API
 curl http://localhost:8080/health
+# Check status
+./kvs status
+# Stop when done
+./kvs stop config
+```
+## 🎮 Process Management
+KVS includes systemd-style daemon commands for easy process management:
+```bash
+# Start as background daemon
+./kvs start config.yaml   # or just: ./kvs start config
+./kvs start node1.yaml    # Start with custom config
+# Check status
+./kvs status              # Show all running instances
+./kvs status node1        # Show specific instance
+# Stop daemon
+./kvs stop node1          # Graceful shutdown
+# Restart daemon
+./kvs restart node1       # Stop and start
+# Run in foreground (traditional)
+./kvs node1.yaml          # Logs to stdout
+```
+### Daemon Features
+- **Global PID tracking**: PID files stored in `~/.kvs/pids/` (works from any directory)
+- **Automatic logging**: Logs written to `~/.kvs/logs/{config-name}.log`
+- **Flexible naming**: Config extension optional (`node1` or `node1.yaml` both work)
+- **Graceful shutdown**: SIGTERM sent for clean shutdown
+- **Stale PID cleanup**: Automatically detects and cleans dead processes
+- **Multi-instance**: Run multiple KVS instances on same machine
+### Example Workflow
+```bash
+# Start 3-node cluster as daemons
+./kvs start node1.yaml
+./kvs start node2.yaml
+./kvs start node3.yaml
+# Check cluster status
+./kvs status
+# View logs
+tail -f ~/.kvs/logs/kvs_node1.yaml.log
+# Stop entire cluster
+./kvs stop node1
+./kvs stop node2
+./kvs stop node3
 ```
 ## ⚙️ Configuration
@@ -113,6 +169,10 @@ clustering_enabled: true # Gossip protocol and sync
 rate_limiting_enabled: true      # Rate limiting
 revision_history_enabled: true   # Automatic versioning
+# Anonymous access control (when auth_enabled: true)
+allow_anonymous_read: false      # Allow unauthenticated read access to KV endpoints
+allow_anonymous_write: false     # Allow unauthenticated write access to KV endpoints
 # Backup configuration
 backup_enabled: true             # Automated backups
 backup_schedule: "0 0 * * *"     # Daily at midnight (cron format)
@@ -134,7 +194,7 @@ backup_retention: 7 # Days to keep backups
 ```bash
 PUT /kv/{path}
 Content-Type: application/json
-Authorization: Bearer <jwt-token>  # Required if auth_enabled
+Authorization: Bearer <jwt-token>  # Required if auth_enabled && !allow_anonymous_write
 # Basic storage
 curl -X PUT http://localhost:8080/kv/users/john/profile \
@@ -158,7 +218,7 @@ curl -X PUT http://localhost:8080/kv/cache/session/abc123 \
 #### Retrieve Data
 ```bash
 GET /kv/{path}
-Authorization: Bearer <jwt-token>  # Required if auth_enabled
+Authorization: Bearer <jwt-token>  # Required if auth_enabled && !allow_anonymous_read
 curl -H "Authorization: Bearer eyJ..." http://localhost:8080/kv/users/john/profile
@@ -177,7 +237,7 @@ curl -H "Authorization: Bearer eyJ..." http://localhost:8080/kv/users/john/profi
 #### Delete Data
 ```bash
 DELETE /kv/{path}
-Authorization: Bearer <jwt-token>  # Required if auth_enabled
+Authorization: Bearer <jwt-token>  # Always required when auth_enabled (no anonymous delete)
 curl -X DELETE -H "Authorization: Bearer eyJ..." http://localhost:8080/kv/users/john/profile
 # Returns: 204 No Content
@@ -304,17 +364,23 @@ clustering_enabled: true
 #### Start the Cluster
 ```bash
-# Terminal 1
-./kvs node1.yaml
-# Terminal 2 (wait a few seconds)
-./kvs node2.yaml
-# Terminal 3 (wait a few seconds)
-./kvs node3.yaml
+# Start as daemons
+./kvs start node1.yaml
+sleep 2
+./kvs start node2.yaml
+sleep 2
+./kvs start node3.yaml
 # Verify cluster formation
 curl http://localhost:8081/members/  # Should show all 3 nodes
+# Check daemon status
+./kvs status
+# Stop cluster when done
+./kvs stop node1
+./kvs stop node2
+./kvs stop node3
 ```
 ## 🔄 How It Works
@@ -360,9 +426,10 @@ go build -o kvs .
 ./integration_test.sh
 # Manual basic functionality test
-./kvs &
+./kvs start config.yaml
+sleep 2
 curl http://localhost:8080/health
-pkill kvs
+./kvs stop config
 # Manual cluster test (requires creating configs)
 echo 'node_id: "test1"
@@ -375,8 +442,9 @@ port: 8082
 seed_nodes: ["127.0.0.1:8081"]
 auth_enabled: false' > test2.yaml
-./kvs test1.yaml &
-./kvs test2.yaml &
+./kvs start test1.yaml
+sleep 2
+./kvs start test2.yaml
 # Test data replication (wait for cluster formation)
 sleep 10
@@ -389,7 +457,8 @@ sleep 30
 curl http://localhost:8082/kv/test/data
 # Cleanup
-pkill kvs
+./kvs stop test1
+./kvs stop test2
 rm test1.yaml test2.yaml
 ```
@@ -414,17 +483,22 @@ auth_enabled: false
 log_level: "debug"' > conflict2.yaml
 # Start nodes with conflicting data
-./kvs conflict1.yaml &
-./kvs conflict2.yaml &
+./kvs start conflict1.yaml
+sleep 2
+./kvs start conflict2.yaml
 # Watch logs for conflict resolution
+tail -f ~/.kvs/logs/kvs_conflict1.yaml.log ~/.kvs/logs/kvs_conflict2.yaml.log &
 # Both nodes will converge within ~10-30 seconds
 # Check final state
 sleep 30
 curl http://localhost:9111/kv/test/conflict/data
 curl http://localhost:9112/kv/test/conflict/data
-pkill kvs
+# Cleanup
+./kvs stop conflict1
+./kvs stop conflict2
 rm conflict1.yaml conflict2.yaml
 ```
@@ -470,6 +544,10 @@ kvs/
 ├── config/          # Configuration management
 │   └── config.go    # Config loading & defaults
+├── daemon/          # Process management
+│   ├── daemonize.go # Background process spawning
+│   └── pid.go       # PID file management
 ├── features/        # Utility features
 │   ├── auth.go      # Auth utilities
 │   ├── backup.go    # Backup system
@@ -532,6 +610,8 @@ type StoredValue struct {
 | `bootstrap_max_age_hours` | Max historical data to sync | 720 hours | 30 days default |
 | **Feature Toggles** |
 | `auth_enabled` | JWT authentication system | true | Complete auth/authz system |
+| `allow_anonymous_read` | Allow unauthenticated read access | false | When auth_enabled, controls KV GET endpoints |
+| `allow_anonymous_write` | Allow unauthenticated write access | false | When auth_enabled, controls KV PUT endpoints |
 | `clustering_enabled` | Gossip protocol and sync | true | Distributed mode |
 | `compression_enabled` | ZSTD compression | true | Reduces storage size |
 | `rate_limiting_enabled` | Rate limiting | true | Per-client limits |
@@ -574,8 +654,9 @@ type StoredValue struct {
 ## 🛡️ Production Considerations
 ### Deployment
-- Use systemd or similar for process management
-- Configure log rotation for JSON logs
+- Built-in daemon commands (`start`/`stop`/`restart`/`status`) for process management
+- Alternatively, use systemd or similar for advanced orchestration
+- Logs automatically written to `~/.kvs/logs/` (configure log rotation)
 - Set up monitoring for `/health` endpoint
 - Use reverse proxy (nginx/traefik) for TLS and load balancing


@@ -26,20 +26,22 @@ type AuthContext struct {
 type AuthService struct {
 	db     *badger.DB
 	logger *logrus.Logger
+	config *types.Config
 }
 // NewAuthService creates a new authentication service
-func NewAuthService(db *badger.DB, logger *logrus.Logger) *AuthService {
+func NewAuthService(db *badger.DB, logger *logrus.Logger, config *types.Config) *AuthService {
 	return &AuthService{
 		db:     db,
 		logger: logger,
+		config: config,
 	}
 }
 // StoreAPIToken stores an API token in BadgerDB with TTL
 func (s *AuthService) StoreAPIToken(tokenString string, userUUID string, scopes []string, expiresAt int64) error {
 	tokenHash := utils.HashToken(tokenString)
 	apiToken := types.APIToken{
 		TokenHash: tokenHash,
 		UserUUID:  userUUID,
@@ -55,13 +57,13 @@ func (s *AuthService) StoreAPIToken(tokenString string, userUUID string, scopes
 	return s.db.Update(func(txn *badger.Txn) error {
 		entry := badger.NewEntry([]byte(TokenStorageKey(tokenHash)), tokenData)
 		// Set TTL to the token expiration time
 		ttl := time.Until(time.Unix(expiresAt, 0))
 		if ttl > 0 {
 			entry = entry.WithTTL(ttl)
 		}
 		return txn.SetEntry(entry)
 	})
 }
@@ -69,7 +71,7 @@ func (s *AuthService) StoreAPIToken(tokenString string, userUUID string, scopes
 // GetAPIToken retrieves an API token from BadgerDB by hash
 func (s *AuthService) GetAPIToken(tokenHash string) (*types.APIToken, error) {
 	var apiToken types.APIToken
 	err := s.db.View(func(txn *badger.Txn) error {
 		item, err := txn.Get([]byte(TokenStorageKey(tokenHash)))
 		if err != nil {
@@ -196,6 +198,40 @@ func (s *AuthService) CheckResourcePermission(authCtx *AuthContext, resourceKey
 	return CheckPermission(metadata.Permissions, operation, isOwner, isGroupMember)
 }
+// GetResourceMetadata retrieves metadata for a resource
+func (s *AuthService) GetResourceMetadata(resourceKey string) (*types.ResourceMetadata, error) {
+	var metadata types.ResourceMetadata
+	err := s.db.View(func(txn *badger.Txn) error {
+		item, err := txn.Get([]byte(ResourceMetadataKey(resourceKey)))
+		if err != nil {
+			return err
+		}
+		return item.Value(func(val []byte) error {
+			return json.Unmarshal(val, &metadata)
+		})
+	})
+	if err != nil {
+		return nil, err
+	}
+	return &metadata, nil
+}
+// SetResourceMetadata stores metadata for a resource
+func (s *AuthService) SetResourceMetadata(resourceKey string, metadata *types.ResourceMetadata) error {
+	metadataBytes, err := json.Marshal(metadata)
+	if err != nil {
+		return fmt.Errorf("failed to marshal metadata: %v", err)
+	}
+	return s.db.Update(func(txn *badger.Txn) error {
+		return txn.Set([]byte(ResourceMetadataKey(resourceKey)), metadataBytes)
+	})
+}
 // GetAuthContext retrieves auth context from request context
 func GetAuthContext(ctx context.Context) *AuthContext {
 	if authCtx, ok := ctx.Value("auth").(*AuthContext); ok {
@@ -207,22 +243,22 @@ func GetAuthContext(ctx context.Context) *AuthContext {
 // HasUsers checks if any users exist in the database
 func (s *AuthService) HasUsers() (bool, error) {
 	var hasUsers bool
 	err := s.db.View(func(txn *badger.Txn) error {
 		opts := badger.DefaultIteratorOptions
 		opts.PrefetchValues = false // We only need to check if keys exist
 		iterator := txn.NewIterator(opts)
 		defer iterator.Close()
 		// Look for any key starting with "user:"
 		prefix := []byte("user:")
 		for iterator.Seek(prefix); iterator.ValidForPrefix(prefix); iterator.Next() {
 			hasUsers = true
 			return nil // Found at least one user, can exit early
 		}
 		return nil
 	})
 	return hasUsers, err
 }

auth/cluster.go (new file)

@@ -0,0 +1,77 @@
package auth
import (
"net/http"
"github.com/sirupsen/logrus"
)
// ClusterAuthService handles authentication for inter-cluster communication
type ClusterAuthService struct {
clusterSecret string
logger *logrus.Logger
}
// NewClusterAuthService creates a new cluster authentication service
func NewClusterAuthService(clusterSecret string, logger *logrus.Logger) *ClusterAuthService {
return &ClusterAuthService{
clusterSecret: clusterSecret,
logger: logger,
}
}
// Middleware validates cluster authentication headers
func (s *ClusterAuthService) Middleware(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// Extract authentication headers
clusterSecret := r.Header.Get("X-Cluster-Secret")
nodeID := r.Header.Get("X-Node-ID")
// Log authentication attempt
s.logger.WithFields(logrus.Fields{
"node_id": nodeID,
"remote_addr": r.RemoteAddr,
"path": r.URL.Path,
"method": r.Method,
}).Debug("Cluster authentication attempt")
// Validate cluster secret
if clusterSecret == "" {
s.logger.WithFields(logrus.Fields{
"node_id": nodeID,
"remote_addr": r.RemoteAddr,
"path": r.URL.Path,
}).Warn("Missing X-Cluster-Secret header")
http.Error(w, "Unauthorized: Missing cluster secret", http.StatusUnauthorized)
return
}
if clusterSecret != s.clusterSecret {
s.logger.WithFields(logrus.Fields{
"node_id": nodeID,
"remote_addr": r.RemoteAddr,
"path": r.URL.Path,
}).Warn("Invalid cluster secret")
http.Error(w, "Unauthorized: Invalid cluster secret", http.StatusUnauthorized)
return
}
// Validate node ID is present
if nodeID == "" {
s.logger.WithFields(logrus.Fields{
"remote_addr": r.RemoteAddr,
"path": r.URL.Path,
}).Warn("Missing X-Node-ID header")
http.Error(w, "Unauthorized: Missing node ID", http.StatusUnauthorized)
return
}
// Authentication successful
s.logger.WithFields(logrus.Fields{
"node_id": nodeID,
"path": r.URL.Path,
}).Debug("Cluster authentication successful")
next.ServeHTTP(w, r)
})
}
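For orientation, the middleware above would be attached to the cluster endpoints roughly as sketched below. The route paths and the "GET /members/ stays unprotected for monitoring" rule come from the commit messages; the actual registration lives in the server package, which is not shown in this diff, so names, the chosen routes, and the Go 1.22+ method patterns are assumptions.

```go
package auth

import "net/http"

// wireClusterRoutes is an illustrative sketch of applying ClusterAuthService
// to inter-node endpoints while leaving member inspection open.
func wireClusterRoutes(mux *http.ServeMux, clusterAuth *ClusterAuthService,
	membersList, gossip, merkleRoot http.Handler) {
	mux.Handle("GET /members/", membersList) // deliberately unprotected for monitoring
	mux.Handle("POST /members/gossip", clusterAuth.Middleware(gossip))
	mux.Handle("GET /merkle_tree/root", clusterAuth.Middleware(merkleRoot))
}
```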


@@ -64,4 +64,4 @@ func ValidateJWT(tokenString string) (*JWTClaims, error) {
 	}
 	return nil, fmt.Errorf("invalid token")
 }


@@ -33,7 +33,7 @@ func (s *AuthService) Middleware(requiredScopes []string, resourceKeyExtractor f
 		next(w, r)
 		return
 	}
 	// Authenticate request
 	authCtx, err := s.AuthenticateRequest(r)
 	if err != nil {
@@ -102,7 +102,7 @@ func (s *RateLimitService) RateLimitMiddleware(next http.HandlerFunc) http.Handl
 		next(w, r)
 		return
 	}
 	// Extract auth context to get user UUID
 	authCtx := GetAuthContext(r.Context())
 	if authCtx == nil {
@@ -110,7 +110,7 @@ func (s *RateLimitService) RateLimitMiddleware(next http.HandlerFunc) http.Handl
 		next(w, r)
 		return
 	}
 	// Check rate limit
 	allowed, err := s.checkRateLimit(authCtx.UserUUID)
 	if err != nil {
@@ -118,31 +118,32 @@ func (s *RateLimitService) RateLimitMiddleware(next http.HandlerFunc) http.Handl
 		http.Error(w, "Internal Server Error", http.StatusInternalServerError)
 		return
 	}
 	if !allowed {
 		s.authService.logger.WithFields(logrus.Fields{
 			"user_uuid": authCtx.UserUUID,
 			"limit":     s.config.RateLimitRequests,
 			"window":    s.config.RateLimitWindow,
 		}).Info("Rate limit exceeded")
 		// Set rate limit headers
 		w.Header().Set("X-Rate-Limit-Limit", strconv.Itoa(s.config.RateLimitRequests))
 		w.Header().Set("X-Rate-Limit-Window", s.config.RateLimitWindow)
 		http.Error(w, "Rate limit exceeded", http.StatusTooManyRequests)
 		return
 	}
 	next(w, r)
 	}
 }
-// isAuthEnabled checks if authentication is enabled (would be passed from config)
+// isAuthEnabled checks if authentication is enabled from config
 func (s *AuthService) isAuthEnabled() bool {
-	// This would normally be injected from config, but for now we'll assume enabled
-	// TODO: Inject config dependency
-	return true
+	if s.config != nil {
+		return s.config.AuthEnabled
+	}
+	return true // Default to enabled if no config
 }
 // Helper method to check rate limits (simplified version)
@@ -150,8 +151,8 @@ func (s *RateLimitService) checkRateLimit(userUUID string) (bool, error) {
 	if s.config.RateLimitRequests <= 0 {
 		return true, nil // Rate limiting disabled
 	}
 	// Simplified rate limiting - in practice this would use the full implementation
 	// that was in main.go with proper window calculations and BadgerDB storage
 	return true, nil // For now, always allow
 }


@@ -15,7 +15,7 @@ func CheckPermission(permissions int, operation string, isOwner, isGroupMember b
 			return (permissions & types.PermGroupCreate) != 0
 		}
 		return (permissions & types.PermOthersCreate) != 0
 	case "delete":
 		if isOwner {
 			return (permissions & types.PermOwnerDelete) != 0
@@ -24,7 +24,7 @@ func CheckPermission(permissions int, operation string, isOwner, isGroupMember b
 			return (permissions & types.PermGroupDelete) != 0
 		}
 		return (permissions & types.PermOthersDelete) != 0
 	case "write":
 		if isOwner {
 			return (permissions & types.PermOwnerWrite) != 0
@@ -33,7 +33,7 @@ func CheckPermission(permissions int, operation string, isOwner, isGroupMember b
 			return (permissions & types.PermGroupWrite) != 0
 		}
 		return (permissions & types.PermOthersWrite) != 0
 	case "read":
 		if isOwner {
 			return (permissions & types.PermOwnerRead) != 0
@@ -42,7 +42,7 @@ func CheckPermission(permissions int, operation string, isOwner, isGroupMember b
 			return (permissions & types.PermGroupRead) != 0
 		}
 		return (permissions & types.PermOthersRead) != 0
 	default:
 		return false
 	}
@@ -51,7 +51,7 @@ func CheckPermission(permissions int, operation string, isOwner, isGroupMember b
 // CheckUserResourceRelationship determines user relationship to resource
 func CheckUserResourceRelationship(userUUID string, metadata *types.ResourceMetadata, userGroups []string) (isOwner, isGroupMember bool) {
 	isOwner = (userUUID == metadata.OwnerUUID)
 	if metadata.GroupUUID != "" {
 		for _, groupUUID := range userGroups {
 			if groupUUID == metadata.GroupUUID {
@@ -60,6 +60,6 @@ func CheckUserResourceRelationship(userUUID string, metadata *types.ResourceMeta
 			}
 		}
 	}
 	return isOwner, isGroupMember
 }


@@ -16,4 +16,4 @@ func TokenStorageKey(tokenHash string) string {
 func ResourceMetadataKey(resourceKey string) string {
 	return resourceKey + ":metadata"
 }


@@ -82,10 +82,19 @@ func (s *BootstrapService) attemptJoin(seedAddr string) bool {
 		return false
 	}
-	client := &http.Client{Timeout: 10 * time.Second}
-	url := fmt.Sprintf("http://%s/members/join", seedAddr)
-	resp, err := client.Post(url, "application/json", bytes.NewBuffer(jsonData))
+	client := NewAuthenticatedHTTPClient(s.config, 10*time.Second)
+	protocol := GetProtocol(s.config)
+	url := fmt.Sprintf("%s://%s/members/join", protocol, seedAddr)
+	req, err := http.NewRequest("POST", url, bytes.NewBuffer(jsonData))
+	if err != nil {
+		s.logger.WithError(err).Error("Failed to create join request")
+		return false
+	}
+	req.Header.Set("Content-Type", "application/json")
+	AddClusterAuthHeaders(req, s.config)
+	resp, err := client.Do(req)
 	if err != nil {
 		s.logger.WithFields(logrus.Fields{
 			"seed": seedAddr,
@@ -142,4 +151,4 @@ func (s *BootstrapService) performGradualSync() {
 	}
 	s.logger.Info("Gradual sync completed")
 }


@@ -17,13 +17,13 @@ import (
 // GossipService handles gossip protocol operations
 type GossipService struct {
 	config    *types.Config
 	members   map[string]*types.Member
 	membersMu sync.RWMutex
 	logger    *logrus.Logger
 	ctx       context.Context
 	cancel    context.CancelFunc
 	wg        sync.WaitGroup
 }
 // NewGossipService creates a new gossip service
@@ -44,7 +44,7 @@ func (s *GossipService) Start() {
 		s.logger.Info("Clustering disabled, skipping gossip routine")
 		return
 	}
 	s.wg.Add(1)
 	go s.gossipRoutine()
 }
@@ -181,11 +181,20 @@ func (s *GossipService) gossipWithPeer(peer *types.Member) error {
 		return err
 	}
-	// Send HTTP request to peer
-	client := &http.Client{Timeout: 5 * time.Second}
-	url := fmt.Sprintf("http://%s/members/gossip", peer.Address)
-	resp, err := client.Post(url, "application/json", bytes.NewBuffer(jsonData))
+	// Send HTTP request to peer with cluster authentication
+	client := NewAuthenticatedHTTPClient(s.config, 5*time.Second)
+	protocol := GetProtocol(s.config)
+	url := fmt.Sprintf("%s://%s/members/gossip", protocol, peer.Address)
+	req, err := http.NewRequest("POST", url, bytes.NewBuffer(jsonData))
+	if err != nil {
+		s.logger.WithError(err).Error("Failed to create gossip request")
+		return err
+	}
+	req.Header.Set("Content-Type", "application/json")
+	AddClusterAuthHeaders(req, s.config)
+	resp, err := client.Do(req)
 	if err != nil {
 		s.logger.WithFields(logrus.Fields{
 			"peer": peer.Address,
@@ -300,4 +309,4 @@ func (s *GossipService) MergeMemberList(remoteMembers []types.Member, selfNodeID
 func (s *GossipService) GetJoinedTimestamp() int64 {
 	// This should be implemented by the server that uses this service
 	return time.Now().UnixMilli()
 }

cluster/http_client.go (new file)

@@ -0,0 +1,43 @@
package cluster
import (
"crypto/tls"
"net/http"
"time"
"kvs/types"
)
// NewAuthenticatedHTTPClient creates an HTTP client configured for cluster authentication
func NewAuthenticatedHTTPClient(config *types.Config, timeout time.Duration) *http.Client {
client := &http.Client{
Timeout: timeout,
}
// Configure TLS if enabled
if config.ClusterTLSEnabled {
tlsConfig := &tls.Config{
InsecureSkipVerify: config.ClusterTLSSkipVerify,
}
client.Transport = &http.Transport{
TLSClientConfig: tlsConfig,
}
}
return client
}
// AddClusterAuthHeaders adds authentication headers to an HTTP request
func AddClusterAuthHeaders(req *http.Request, config *types.Config) {
req.Header.Set("X-Cluster-Secret", config.ClusterSecret)
req.Header.Set("X-Node-ID", config.NodeID)
}
// GetProtocol returns the appropriate protocol (http or https) based on TLS configuration
func GetProtocol(config *types.Config) string {
if config.ClusterTLSEnabled {
return "https"
}
return "http"
}


@@ -170,7 +170,7 @@ func (s *MerkleService) BuildSubtreeForRange(startKey, endKey string) (*types.Me
 	if err != nil {
 		return nil, fmt.Errorf("failed to get KV pairs for subtree: %v", err)
 	}
 	filteredPairs := FilterPairsByRange(pairs, startKey, endKey)
 	return s.BuildMerkleTreeFromPairs(filteredPairs)
 }


@@ -51,11 +51,11 @@ func (s *SyncService) Start() {
s.logger.Info("Clustering disabled, skipping sync routines") s.logger.Info("Clustering disabled, skipping sync routines")
return return
} }
// Start sync routine // Start sync routine
s.wg.Add(1) s.wg.Add(1)
go s.syncRoutine() go s.syncRoutine()
// Start Merkle tree rebuild routine // Start Merkle tree rebuild routine
s.wg.Add(1) s.wg.Add(1)
go s.merkleTreeRebuildRoutine() go s.merkleTreeRebuildRoutine()
@@ -172,9 +172,9 @@ func (s *SyncService) performMerkleSync() {
// 2. Compare roots and start recursive diffing if they differ // 2. Compare roots and start recursive diffing if they differ
if !bytes.Equal(localRoot.Hash, remoteRoot.Hash) { if !bytes.Equal(localRoot.Hash, remoteRoot.Hash) {
s.logger.WithFields(logrus.Fields{ s.logger.WithFields(logrus.Fields{
"peer": peer.Address, "peer": peer.Address,
"local_root": hex.EncodeToString(localRoot.Hash), "local_root": hex.EncodeToString(localRoot.Hash),
"remote_root": hex.EncodeToString(remoteRoot.Hash), "remote_root": hex.EncodeToString(remoteRoot.Hash),
}).Info("Merkle roots differ, starting recursive diff") }).Info("Merkle roots differ, starting recursive diff")
s.diffMerkleTreesRecursive(peer.Address, localRoot, remoteRoot) s.diffMerkleTreesRecursive(peer.Address, localRoot, remoteRoot)
} else { } else {
@@ -186,10 +186,17 @@ func (s *SyncService) performMerkleSync() {
// requestMerkleRoot requests the Merkle root from a peer // requestMerkleRoot requests the Merkle root from a peer
func (s *SyncService) requestMerkleRoot(peerAddress string) (*types.MerkleRootResponse, error) { func (s *SyncService) requestMerkleRoot(peerAddress string) (*types.MerkleRootResponse, error) {
client := &http.Client{Timeout: 10 * time.Second} client := NewAuthenticatedHTTPClient(s.config, 10*time.Second)
url := fmt.Sprintf("http://%s/merkle_tree/root", peerAddress) protocol := GetProtocol(s.config)
url := fmt.Sprintf("%s://%s/merkle_tree/root", protocol, peerAddress)
resp, err := client.Get(url) req, err := http.NewRequest("GET", url, nil)
if err != nil {
return nil, err
}
AddClusterAuthHeaders(req, s.config)
resp, err := client.Do(req)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@@ -216,7 +223,7 @@ func (s *SyncService) diffMerkleTreesRecursive(peerAddress string, localNode, re
// Hashes differ, need to go deeper. // Hashes differ, need to go deeper.
// Request children from the remote peer for the current range. // Request children from the remote peer for the current range.
req := types.MerkleTreeDiffRequest{ req := types.MerkleTreeDiffRequest{
ParentNode: *remoteNode, // We are asking the remote peer about its children for this range ParentNode: *remoteNode, // We are asking the remote peer about its children for this range
LocalHash: localNode.Hash, // Our hash for this range LocalHash: localNode.Hash, // Our hash for this range
} }
@@ -294,10 +301,17 @@ func (s *SyncService) handleLeafLevelDiff(peerAddress string, keys []string, loc
// fetchSingleKVFromPeer fetches a single KV pair from a peer // fetchSingleKVFromPeer fetches a single KV pair from a peer
func (s *SyncService) fetchSingleKVFromPeer(peerAddress, path string) (*types.StoredValue, error) { func (s *SyncService) fetchSingleKVFromPeer(peerAddress, path string) (*types.StoredValue, error) {
client := &http.Client{Timeout: 5 * time.Second} client := NewAuthenticatedHTTPClient(s.config, 5*time.Second)
url := fmt.Sprintf("http://%s/kv/%s", peerAddress, path) protocol := GetProtocol(s.config)
url := fmt.Sprintf("%s://%s/kv/%s", protocol, peerAddress, path)
resp, err := client.Get(url) req, err := http.NewRequest("GET", url, nil)
if err != nil {
return nil, err
}
AddClusterAuthHeaders(req, s.config)
resp, err := client.Do(req)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@@ -398,14 +412,14 @@ func (s *SyncService) resolveConflict(key string, local, remote *types.StoredVal
// Timestamps are equal - need sophisticated conflict resolution // Timestamps are equal - need sophisticated conflict resolution
s.logger.WithField("key", key).Info("Timestamp collision detected, applying oldest-node rule") s.logger.WithField("key", key).Info("Timestamp collision detected, applying oldest-node rule")
// Get cluster members to determine which node is older // Get cluster members to determine which node is older
members := s.gossipService.GetMembers() members := s.gossipService.GetMembers()
// Find the local node and the remote node in membership // Find the local node and the remote node in membership
var localMember, remoteMember *types.Member var localMember, remoteMember *types.Member
localNodeID := s.config.NodeID localNodeID := s.config.NodeID
for _, member := range members { for _, member := range members {
if member.ID == localNodeID { if member.ID == localNodeID {
localMember = member localMember = member
@@ -414,16 +428,16 @@ func (s *SyncService) resolveConflict(key string, local, remote *types.StoredVal
remoteMember = member remoteMember = member
} }
} }
// If we can't find membership info, fall back to UUID comparison for deterministic result // If we can't find membership info, fall back to UUID comparison for deterministic result
if localMember == nil || remoteMember == nil { if localMember == nil || remoteMember == nil {
s.logger.WithFields(logrus.Fields{ s.logger.WithFields(logrus.Fields{
"key": key, "key": key,
"peerAddress": peerAddress, "peerAddress": peerAddress,
"localNodeID": localNodeID, "localNodeID": localNodeID,
"localMember": localMember != nil, "localMember": localMember != nil,
"remoteMember": remoteMember != nil, "remoteMember": remoteMember != nil,
"totalMembers": len(members), "totalMembers": len(members),
}).Warn("Could not find membership info for conflict resolution, using UUID comparison") }).Warn("Could not find membership info for conflict resolution, using UUID comparison")
if remote.UUID < local.UUID { if remote.UUID < local.UUID {
// Remote UUID lexically smaller (deterministic choice) // Remote UUID lexically smaller (deterministic choice)
@@ -436,41 +450,49 @@ func (s *SyncService) resolveConflict(key string, local, remote *types.StoredVal
s.logger.WithField("key", key).Info("Conflict resolved: local data wins (UUID tie-breaker)") s.logger.WithField("key", key).Info("Conflict resolved: local data wins (UUID tie-breaker)")
return nil return nil
} }
// Apply oldest-node rule: node with earliest joined_timestamp wins // Apply oldest-node rule: node with earliest joined_timestamp wins
if remoteMember.JoinedTimestamp < localMember.JoinedTimestamp { if remoteMember.JoinedTimestamp < localMember.JoinedTimestamp {
// Remote node is older, its data wins // Remote node is older, its data wins
err := s.storeReplicatedDataWithMetadata(key, remote) err := s.storeReplicatedDataWithMetadata(key, remote)
if err == nil { if err == nil {
s.logger.WithFields(logrus.Fields{ s.logger.WithFields(logrus.Fields{
"key": key, "key": key,
"local_joined": localMember.JoinedTimestamp, "local_joined": localMember.JoinedTimestamp,
"remote_joined": remoteMember.JoinedTimestamp, "remote_joined": remoteMember.JoinedTimestamp,
}).Info("Conflict resolved: remote data wins (oldest-node rule)") }).Info("Conflict resolved: remote data wins (oldest-node rule)")
} }
return err return err
} }
// Local node is older or equal, keep local data // Local node is older or equal, keep local data
s.logger.WithFields(logrus.Fields{ s.logger.WithFields(logrus.Fields{
"key": key, "key": key,
"local_joined": localMember.JoinedTimestamp, "local_joined": localMember.JoinedTimestamp,
"remote_joined": remoteMember.JoinedTimestamp, "remote_joined": remoteMember.JoinedTimestamp,
}).Info("Conflict resolved: local data wins (oldest-node rule)") }).Info("Conflict resolved: local data wins (oldest-node rule)")
return nil return nil
} }
// requestMerkleDiff requests children hashes or keys for a given node/range from a peer // requestMerkleDiff requests children hashes or keys for a given node/range from a peer
func (s *SyncService) requestMerkleDiff(peerAddress string, req types.MerkleTreeDiffRequest) (*types.MerkleTreeDiffResponse, error) { func (s *SyncService) requestMerkleDiff(peerAddress string, reqData types.MerkleTreeDiffRequest) (*types.MerkleTreeDiffResponse, error) {
jsonData, err := json.Marshal(req) jsonData, err := json.Marshal(reqData)
if err != nil { if err != nil {
return nil, err return nil, err
} }
client := &http.Client{Timeout: 10 * time.Second} client := NewAuthenticatedHTTPClient(s.config, 10*time.Second)
url := fmt.Sprintf("http://%s/merkle_tree/diff", peerAddress) protocol := GetProtocol(s.config)
url := fmt.Sprintf("%s://%s/merkle_tree/diff", protocol, peerAddress)
resp, err := client.Post(url, "application/json", bytes.NewBuffer(jsonData)) req, err := http.NewRequest("POST", url, bytes.NewBuffer(jsonData))
if err != nil {
return nil, err
}
req.Header.Set("Content-Type", "application/json")
AddClusterAuthHeaders(req, s.config)
resp, err := client.Do(req)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@@ -525,20 +547,28 @@ func (s *SyncService) handleChildrenDiff(peerAddress string, children []types.Me
// fetchAndStoreRange fetches a range of KV pairs from a peer and stores them locally // fetchAndStoreRange fetches a range of KV pairs from a peer and stores them locally
func (s *SyncService) fetchAndStoreRange(peerAddress string, startKey, endKey string) error { func (s *SyncService) fetchAndStoreRange(peerAddress string, startKey, endKey string) error {
req := types.KVRangeRequest{ reqData := types.KVRangeRequest{
StartKey: startKey, StartKey: startKey,
EndKey: endKey, EndKey: endKey,
Limit: 0, // No limit Limit: 0, // No limit
} }
jsonData, err := json.Marshal(req) jsonData, err := json.Marshal(reqData)
if err != nil { if err != nil {
return err return err
} }
client := &http.Client{Timeout: 30 * time.Second} // Longer timeout for range fetches client := NewAuthenticatedHTTPClient(s.config, 30*time.Second) // Longer timeout for range fetches
url := fmt.Sprintf("http://%s/kv_range", peerAddress) protocol := GetProtocol(s.config)
url := fmt.Sprintf("%s://%s/kv_range", protocol, peerAddress)
resp, err := client.Post(url, "application/json", bytes.NewBuffer(jsonData)) req, err := http.NewRequest("POST", url, bytes.NewBuffer(jsonData))
if err != nil {
return err
}
req.Header.Set("Content-Type", "application/json")
AddClusterAuthHeaders(req, s.config)
resp, err := client.Do(req)
if err != nil { if err != nil {
return err return err
} }
@@ -568,4 +598,4 @@ func (s *SyncService) fetchAndStoreRange(peerAddress string, startKey, endKey st
} }
} }
return nil return nil
} }


@@ -1,12 +1,14 @@
package config package config
import ( import (
"crypto/rand"
"encoding/base64"
"fmt" "fmt"
"os" "os"
"path/filepath" "path/filepath"
"kvs/types"
"gopkg.in/yaml.v3" "gopkg.in/yaml.v3"
"kvs/types"
) )
// Default configuration // Default configuration
@@ -27,37 +29,61 @@ func Default() *types.Config {
BootstrapMaxAgeHours: 720, // 30 days BootstrapMaxAgeHours: 720, // 30 days
ThrottleDelayMs: 100, ThrottleDelayMs: 100,
FetchDelayMs: 50, FetchDelayMs: 50,
// Default compression settings // Default compression settings
CompressionEnabled: true, CompressionEnabled: true,
CompressionLevel: 3, // Balance between performance and compression ratio CompressionLevel: 3, // Balance between performance and compression ratio
// Default TTL and size limit settings // Default TTL and size limit settings
DefaultTTL: "0", // No default TTL DefaultTTL: "0", // No default TTL
MaxJSONSize: 1048576, // 1MB default max JSON size MaxJSONSize: 1048576, // 1MB default max JSON size
// Default rate limiting settings // Default rate limiting settings
RateLimitRequests: 100, // 100 requests per window RateLimitRequests: 100, // 100 requests per window
RateLimitWindow: "1m", // 1 minute window RateLimitWindow: "1m", // 1 minute window
// Default tamper-evident logging settings // Default tamper-evident logging settings
TamperLogActions: []string{"data_write", "user_create", "auth_failure"}, TamperLogActions: []string{"data_write", "user_create", "auth_failure"},
// Default backup system settings // Default backup system settings
BackupEnabled: true, BackupEnabled: true,
BackupSchedule: "0 0 * * *", // Daily at midnight BackupSchedule: "0 0 * * *", // Daily at midnight
BackupPath: "./backups", BackupPath: "./backups",
BackupRetention: 7, // Keep backups for 7 days BackupRetention: 7, // Keep backups for 7 days
// Default feature toggle settings (all enabled by default) // Default feature toggle settings (all enabled by default)
AuthEnabled: true, AuthEnabled: true,
TamperLoggingEnabled: true, TamperLoggingEnabled: true,
ClusteringEnabled: true, ClusteringEnabled: true,
RateLimitingEnabled: true, RateLimitingEnabled: true,
RevisionHistoryEnabled: true, RevisionHistoryEnabled: true,
// Default anonymous access settings (both disabled by default for security)
AllowAnonymousRead: false,
AllowAnonymousWrite: false,
// Default cluster authentication settings (Issue #13)
ClusterSecret: generateClusterSecret(),
ClusterTLSEnabled: false,
ClusterTLSCertFile: "",
ClusterTLSKeyFile: "",
ClusterTLSSkipVerify: false,
} }
} }
// generateClusterSecret generates a cryptographically secure random cluster secret
func generateClusterSecret() string {
// Generate 32 bytes (256 bits) of random data
randomBytes := make([]byte, 32)
if _, err := rand.Read(randomBytes); err != nil {
// Fallback to a warning - this should never happen in practice
fmt.Fprintf(os.Stderr, "Warning: Failed to generate secure cluster secret: %v\n", err)
return ""
}
// Encode as base64 for easy configuration file storage
return base64.StdEncoding.EncodeToString(randomBytes)
}
// Load configuration from file or create default // Load configuration from file or create default
func Load(configPath string) (*types.Config, error) { func Load(configPath string) (*types.Config, error) {
config := Default() config := Default()
@@ -90,5 +116,13 @@ func Load(configPath string) (*types.Config, error) {
return nil, fmt.Errorf("failed to parse config file: %v", err) return nil, fmt.Errorf("failed to parse config file: %v", err)
} }
// Generate cluster secret if not provided and clustering is enabled (Issue #13)
if config.ClusteringEnabled && config.ClusterSecret == "" {
config.ClusterSecret = generateClusterSecret()
fmt.Printf("Warning: No cluster_secret configured. Generated a random secret.\n")
fmt.Printf(" To share this secret with other nodes, add it to your config:\n")
fmt.Printf(" cluster_secret: %s\n", config.ClusterSecret)
}
return config, nil
}
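Note: a minimal sketch (not part of the commit) of how a caller sees the generated secret after config.Load; the config file name and the "kvs/config" import path are assumptions based on the imports shown in main.go below.

package main

import (
	"fmt"
	"os"

	"kvs/config" // assumed module path, matching main.go's imports
)

func main() {
	cfg, err := config.Load("./node1.yaml") // hypothetical config file
	if err != nil {
		fmt.Fprintf(os.Stderr, "failed to load configuration: %v\n", err)
		os.Exit(1)
	}
	// With clustering enabled and no cluster_secret in the file, Load fills in
	// a freshly generated base64-encoded 256-bit secret and prints the hint above.
	fmt.Printf("cluster_secret (%d base64 chars): %s\n", len(cfg.ClusterSecret), cfg.ClusterSecret)
}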

daemon/daemonize.go Normal file

@@ -0,0 +1,87 @@
package daemon
import (
"fmt"
"os"
"os/exec"
"path/filepath"
"syscall"
)
// GetLogFilePath returns the log file path for a given config file
func GetLogFilePath(configPath string) (string, error) {
logDir, err := getLogDir()
if err != nil {
return "", err
}
absConfigPath, err := filepath.Abs(configPath)
if err != nil {
return "", fmt.Errorf("failed to get absolute config path: %w", err)
}
basename := filepath.Base(configPath)
name := filepath.Base(filepath.Dir(absConfigPath)) + "_" + basename
return filepath.Join(logDir, name+".log"), nil
}
// Daemonize spawns the process as a daemon and returns
func Daemonize(configPath string) error {
// Get absolute path to the current executable
executable, err := os.Executable()
if err != nil {
return fmt.Errorf("failed to get executable path: %w", err)
}
// Get absolute path to config
absConfigPath, err := filepath.Abs(configPath)
if err != nil {
return fmt.Errorf("failed to get absolute config path: %w", err)
}
// Check if already running
_, running, err := ReadPID(configPath)
if err != nil {
return fmt.Errorf("failed to check if instance is running: %w", err)
}
if running {
return fmt.Errorf("instance is already running")
}
// Spawn the process in background with --daemon flag
cmd := exec.Command(executable, "--daemon", absConfigPath)
cmd.SysProcAttr = &syscall.SysProcAttr{
Setsid: true, // Create new session
}
// Redirect stdout/stderr to log file
logDir, err := getLogDir()
if err != nil {
return fmt.Errorf("failed to get log directory: %w", err)
}
if err := os.MkdirAll(logDir, 0755); err != nil {
return fmt.Errorf("failed to create log directory: %w", err)
}
basename := filepath.Base(configPath)
name := filepath.Base(filepath.Dir(absConfigPath)) + "_" + basename
logFile := filepath.Join(logDir, name+".log")
f, err := os.OpenFile(logFile, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644)
if err != nil {
return fmt.Errorf("failed to open log file: %w", err)
}
defer f.Close()
cmd.Stdout = f
cmd.Stderr = f
if err := cmd.Start(); err != nil {
return fmt.Errorf("failed to start daemon: %w", err)
}
fmt.Printf("Started KVS instance '%s' (PID will be written by daemon)\n", filepath.Base(configPath))
fmt.Printf("Logs: %s\n", logFile)
return nil
}
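A small illustrative sketch (not part of the daemon package) of the log-file naming rule used by GetLogFilePath and Daemonize above: logs land in ~/.kvs/logs/<parent-dir>_<config-basename>.log, so two configs with the same basename in different directories do not collide.

package main

import (
	"fmt"
	"os"
	"path/filepath"
)

// logPathFor mirrors the naming rule shown above; it is a hypothetical helper
// for illustration only.
func logPathFor(configPath string) (string, error) {
	home, err := os.UserHomeDir()
	if err != nil {
		return "", err
	}
	abs, err := filepath.Abs(configPath)
	if err != nil {
		return "", err
	}
	name := filepath.Base(filepath.Dir(abs)) + "_" + filepath.Base(configPath)
	return filepath.Join(home, ".kvs", "logs", name+".log"), nil
}

func main() {
	p, _ := logPathFor("./cluster/node1.yaml")
	fmt.Println(p) // e.g. /home/user/.kvs/logs/cluster_node1.yaml.log
}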

daemon/pid.go Normal file

@@ -0,0 +1,171 @@
package daemon
import (
"fmt"
"os"
"path/filepath"
"strconv"
"strings"
"syscall"
)
// getPIDDir returns the absolute path to the PID directory
func getPIDDir() (string, error) {
homeDir, err := os.UserHomeDir()
if err != nil {
return "", fmt.Errorf("failed to get user home directory: %w", err)
}
return filepath.Join(homeDir, ".kvs", "pids"), nil
}
// getLogDir returns the absolute path to the log directory
func getLogDir() (string, error) {
homeDir, err := os.UserHomeDir()
if err != nil {
return "", fmt.Errorf("failed to get user home directory: %w", err)
}
return filepath.Join(homeDir, ".kvs", "logs"), nil
}
// GetPIDFilePath returns the PID file path for a given config file
func GetPIDFilePath(configPath string) string {
pidDir, err := getPIDDir()
if err != nil {
// Fallback to local directory
pidDir = ".kvs/pids"
}
// Extract basename without extension
basename := filepath.Base(configPath)
name := strings.TrimSuffix(basename, filepath.Ext(basename))
return filepath.Join(pidDir, name+".pid")
}
// EnsurePIDDir creates the PID directory if it doesn't exist
func EnsurePIDDir() error {
pidDir, err := getPIDDir()
if err != nil {
return err
}
return os.MkdirAll(pidDir, 0755)
}
// WritePID writes the current process PID to a file
func WritePID(configPath string) error {
if err := EnsurePIDDir(); err != nil {
return fmt.Errorf("failed to create PID directory: %w", err)
}
pidFile := GetPIDFilePath(configPath)
pid := os.Getpid()
return os.WriteFile(pidFile, []byte(fmt.Sprintf("%d\n", pid)), 0644)
}
// ReadPID reads the PID from a file and checks if the process is running
func ReadPID(configPath string) (int, bool, error) {
pidFile := GetPIDFilePath(configPath)
data, err := os.ReadFile(pidFile)
if err != nil {
if os.IsNotExist(err) {
return 0, false, nil
}
return 0, false, fmt.Errorf("failed to read PID file: %w", err)
}
pidStr := strings.TrimSpace(string(data))
pid, err := strconv.Atoi(pidStr)
if err != nil {
return 0, false, fmt.Errorf("invalid PID in file: %w", err)
}
// Check if process is actually running
process, err := os.FindProcess(pid)
if err != nil {
return pid, false, nil
}
// Send signal 0 to check if process exists
err = process.Signal(syscall.Signal(0))
if err != nil {
return pid, false, nil
}
return pid, true, nil
}
// RemovePID removes the PID file
func RemovePID(configPath string) error {
pidFile := GetPIDFilePath(configPath)
err := os.Remove(pidFile)
if err != nil && !os.IsNotExist(err) {
return fmt.Errorf("failed to remove PID file: %w", err)
}
return nil
}
// ListRunningInstances returns a list of running KVS instances
func ListRunningInstances() ([]InstanceInfo, error) {
var instances []InstanceInfo
pidDir, err := getPIDDir()
if err != nil {
return nil, err
}
// Check if PID directory exists
if _, err := os.Stat(pidDir); os.IsNotExist(err) {
return instances, nil
}
entries, err := os.ReadDir(pidDir)
if err != nil {
return nil, fmt.Errorf("failed to read PID directory: %w", err)
}
for _, entry := range entries {
if entry.IsDir() || !strings.HasSuffix(entry.Name(), ".pid") {
continue
}
name := strings.TrimSuffix(entry.Name(), ".pid")
configPath := name + ".yaml" // Assume .yaml extension
pid, running, err := ReadPID(configPath)
if err != nil {
continue
}
instances = append(instances, InstanceInfo{
Name: name,
PID: pid,
Running: running,
})
}
return instances, nil
}
// InstanceInfo holds information about a KVS instance
type InstanceInfo struct {
Name string
PID int
Running bool
}
// StopProcess stops a process by PID
func StopProcess(pid int) error {
process, err := os.FindProcess(pid)
if err != nil {
return fmt.Errorf("failed to find process: %w", err)
}
// Try graceful shutdown first (SIGTERM)
if err := process.Signal(syscall.SIGTERM); err != nil {
return fmt.Errorf("failed to send SIGTERM: %w", err)
}
return nil
}
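StopProcess only sends SIGTERM and returns immediately; liveness is checked separately through ReadPID's signal-0 probe. A hedged sketch (assuming the module path "kvs/daemon") of a stop-and-wait helper built on the exported functions above, similar to what cmdStop in main.go does with a fixed one-second sleep:

package main

import (
	"fmt"
	"time"

	"kvs/daemon" // assumed module path, as used in main.go
)

// stopAndWait is an illustrative helper, not part of the daemon package:
// it sends SIGTERM via StopProcess, then polls ReadPID until the process
// exits or the timeout elapses.
func stopAndWait(configPath string, timeout time.Duration) error {
	pid, running, err := daemon.ReadPID(configPath)
	if err != nil || !running {
		return err
	}
	if err := daemon.StopProcess(pid); err != nil {
		return err
	}
	deadline := time.Now().Add(timeout)
	for time.Now().Before(deadline) {
		if _, still, _ := daemon.ReadPID(configPath); !still {
			return daemon.RemovePID(configPath)
		}
		time.Sleep(200 * time.Millisecond)
	}
	return fmt.Errorf("process %d did not exit within %s", pid, timeout)
}

func main() {
	if err := stopAndWait("node1.yaml", 5*time.Second); err != nil {
		fmt.Println(err)
	}
}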


@@ -99,4 +99,4 @@ func ExtractKVResourceKey(r *http.Request) string {
return path
}
return ""
}


@@ -8,4 +8,4 @@ import (
// GetBackupFilename generates a filename for a backup
func GetBackupFilename(timestamp time.Time) string {
return fmt.Sprintf("kvs-backup-%s.zstd", timestamp.Format("2006-01-02"))
}


@@ -1,4 +1,4 @@
// Package features provides utility functions for KVS authentication, validation,
// logging, backup, and other operational features. These functions were extracted
// from main.go to improve code organization and maintainability.
package features


@@ -5,4 +5,4 @@ import "fmt"
// GetRateLimitKey generates the storage key for rate limiting
func GetRateLimitKey(userUUID string, windowStart int64) string {
return fmt.Sprintf("ratelimit:%s:%d", userUUID, windowStart)
}


@@ -5,4 +5,4 @@ import "fmt"
// GetRevisionKey generates the storage key for a specific revision
func GetRevisionKey(baseKey string, revision int) string {
return fmt.Sprintf("%s:rev:%d", baseKey, revision)
}


@@ -21,4 +21,4 @@ func GenerateLogSignature(timestamp, action, userUUID, resource string) string {
// Concatenate all fields in a deterministic order
data := fmt.Sprintf("%s|%s|%s|%s", timestamp, action, userUUID, resource)
return utils.HashSHA3512(data)
}


@@ -21,4 +21,4 @@ func ParseTTL(ttlString string) (time.Duration, error) {
}
return duration, nil
}


@@ -45,6 +45,7 @@ cleanup() {
log_info "Cleaning up test environment..." log_info "Cleaning up test environment..."
pkill -f "$BINARY" 2>/dev/null || true pkill -f "$BINARY" 2>/dev/null || true
rm -rf "$TEST_DIR" 2>/dev/null || true rm -rf "$TEST_DIR" 2>/dev/null || true
rm -rf "$HOME/.kvs" 2>/dev/null || true # Clean up PID and log files from home dir
sleep 2 # Allow processes to fully terminate sleep 2 # Allow processes to fully terminate
} }
@@ -53,7 +54,7 @@ wait_for_service() {
local port=$1
local timeout=${2:-30}
local count=0
while [ $count -lt $timeout ]; do
if curl -s "http://localhost:$port/health" >/dev/null 2>&1; then
return 0
@@ -64,6 +65,15 @@ wait_for_service() {
return 1
}
# Get log file path for a config file (matches daemon naming convention)
get_log_file() {
local config=$1
local abs_path=$(realpath "$config")
local basename=$(basename "$config")
local dirname=$(basename $(dirname "$abs_path"))
echo "$HOME/.kvs/logs/${dirname}_${basename}.log"
}
# Test 1: Build verification
test_build() {
test_start "Binary build verification"
@@ -82,7 +92,7 @@ test_build() {
# Test 2: Basic functionality
test_basic_functionality() {
test_start "Basic functionality test"
# Create basic config
cat > basic.yaml <<EOF
node_id: "basic-test"
@@ -91,21 +101,23 @@ port: 8090
data_dir: "./basic_data" data_dir: "./basic_data"
seed_nodes: [] seed_nodes: []
log_level: "error" log_level: "error"
allow_anonymous_read: true
allow_anonymous_write: true
EOF
# Start node using daemon command
$BINARY start basic.yaml >/dev/null 2>&1
sleep 2
if wait_for_service 8090; then
# Test basic CRUD
local put_result=$(curl -s -X PUT http://localhost:8090/kv/test/basic \
-H "Content-Type: application/json" \
-d '{"message":"hello world"}')
local get_result=$(curl -s http://localhost:8090/kv/test/basic)
local message=$(echo "$get_result" | jq -r '.data.message' 2>/dev/null)
if [ "$message" = "hello world" ]; then
log_success "Basic CRUD operations work"
else
@@ -114,15 +126,18 @@ EOF
else
log_error "Basic test node failed to start"
fi
$BINARY stop basic.yaml >/dev/null 2>&1
sleep 1
}
# Test 3: Cluster formation
test_cluster_formation() {
test_start "2-node cluster formation and Merkle Tree replication"
# Shared cluster secret for authentication (Issue #13)
local CLUSTER_SECRET="test-cluster-secret-12345678901234567890"
# Node 1 config
cat > cluster1.yaml <<EOF
node_id: "cluster-1"
@@ -134,8 +149,11 @@ log_level: "error"
gossip_interval_min: 5
gossip_interval_max: 10
sync_interval: 10
allow_anonymous_read: true
allow_anonymous_write: true
cluster_secret: "$CLUSTER_SECRET"
EOF
# Node 2 config
cat > cluster2.yaml <<EOF
node_id: "cluster-2"
@@ -147,25 +165,27 @@ log_level: "error"
gossip_interval_min: 5
gossip_interval_max: 10
sync_interval: 10
allow_anonymous_read: true
allow_anonymous_write: true
cluster_secret: "$CLUSTER_SECRET"
EOF
# Start nodes using daemon commands
$BINARY start cluster1.yaml >/dev/null 2>&1
sleep 2
if ! wait_for_service 8101; then
log_error "Cluster node 1 failed to start"
$BINARY stop cluster1.yaml >/dev/null 2>&1
return 1
fi
$BINARY start cluster2.yaml >/dev/null 2>&1
sleep 2
if ! wait_for_service 8102; then
log_error "Cluster node 2 failed to start"
$BINARY stop cluster1.yaml cluster2.yaml >/dev/null 2>&1
return 1
fi
@@ -213,9 +233,9 @@ EOF
else
log_error "Cluster formation failed (N1 members: $node1_members, N2 members: $node2_members)"
fi
$BINARY stop cluster1.yaml cluster2.yaml >/dev/null 2>&1
sleep 1
}
# Test 4: Conflict resolution (Merkle Tree based)
@@ -224,15 +244,18 @@ EOF
# but same path. The Merkle tree sync should then trigger conflict resolution.
test_conflict_resolution() {
test_start "Conflict resolution test (Merkle Tree based)"
# Create conflicting data using our utility
rm -rf conflict1_data conflict2_data 2>/dev/null || true
mkdir -p conflict1_data conflict2_data
cd "$SCRIPT_DIR"
if go run test_conflict.go "$TEST_DIR/conflict1_data" "$TEST_DIR/conflict2_data"; then
cd "$TEST_DIR"
# Shared cluster secret for authentication (Issue #13)
local CLUSTER_SECRET="conflict-cluster-secret-1234567890123"
# Create configs
cat > conflict1.yaml <<EOF
node_id: "conflict-1"
@@ -242,8 +265,11 @@ data_dir: "./conflict1_data"
seed_nodes: []
log_level: "info"
sync_interval: 3
allow_anonymous_read: true
allow_anonymous_write: true
cluster_secret: "$CLUSTER_SECRET"
EOF
cat > conflict2.yaml <<EOF
node_id: "conflict-2"
bind_address: "127.0.0.1"
@@ -252,18 +278,20 @@ data_dir: "./conflict2_data"
seed_nodes: ["127.0.0.1:8111"] seed_nodes: ["127.0.0.1:8111"]
log_level: "info" log_level: "info"
sync_interval: 3 sync_interval: 3
allow_anonymous_read: true
allow_anonymous_write: true
cluster_secret: "$CLUSTER_SECRET"
EOF
# Start nodes using daemon commands
# Node 1 started first, making it "older" for tie-breaker if timestamps are equal
$BINARY start conflict1.yaml >/dev/null 2>&1
sleep 2
if wait_for_service 8111; then
$BINARY start conflict2.yaml >/dev/null 2>&1
sleep 2
if wait_for_service 8112; then
# Get initial data (full StoredValue)
local node1_initial_full=$(curl -s http://localhost:8111/kv/test/conflict/data)
@@ -324,8 +352,10 @@ EOF
log_error "Resolved data has inconsistent UUID/Timestamp: N1_UUID=$node1_final_uuid, N1_TS=$node1_final_timestamp, N2_UUID=$node2_final_uuid, N2_TS=$node2_final_timestamp" log_error "Resolved data has inconsistent UUID/Timestamp: N1_UUID=$node1_final_uuid, N1_TS=$node1_final_timestamp, N2_UUID=$node2_final_uuid, N2_TS=$node2_final_timestamp"
fi fi
# Optionally, check logs for conflict resolution messages # Check logs for conflict resolution messages
if grep -q "Conflict resolved" conflict1.log conflict2.log 2>/dev/null; then local log1=$(get_log_file conflict1.yaml)
local log2=$(get_log_file conflict2.yaml)
if grep -q "Conflict resolved" "$log1" "$log2" 2>/dev/null; then
log_success "Conflict resolution messages found in logs" log_success "Conflict resolution messages found in logs"
else else
log_error "No 'Conflict resolved' messages found in logs, but data converged." log_error "No 'Conflict resolved' messages found in logs, but data converged."
@@ -337,20 +367,244 @@ EOF
else
log_error "Conflict node 2 failed to start"
fi
$BINARY stop conflict2.yaml >/dev/null 2>&1
else
log_error "Conflict node 1 failed to start"
fi
$BINARY stop conflict1.yaml >/dev/null 2>&1
sleep 1
else
cd "$TEST_DIR"
log_error "Failed to create conflict test data. Ensure test_conflict.go is correct."
fi
}
# Test 5: Authentication middleware (Issue #4)
test_authentication_middleware() {
test_start "Authentication middleware test (Issue #4)"
# Create auth test config
cat > auth_test.yaml <<EOF
node_id: "auth-test"
bind_address: "127.0.0.1"
port: 8095
data_dir: "./auth_test_data"
seed_nodes: []
log_level: "error"
auth_enabled: true
allow_anonymous_read: false
allow_anonymous_write: false
EOF
# Start node using daemon command
$BINARY start auth_test.yaml >/dev/null 2>&1
sleep 3 # Allow daemon to start and root account creation
if wait_for_service 8095; then
# Extract the token from logs
local log_file=$(get_log_file auth_test.yaml)
local token=$(grep "Token:" "$log_file" | sed 's/.*Token: //' | tr -d '\n\r')
if [ -z "$token" ]; then
log_error "Failed to extract authentication token from logs"
$BINARY stop auth_test.yaml >/dev/null 2>&1
return
fi
# Test 1: Admin endpoints should fail without authentication
local no_auth_response=$(curl -s -X POST http://localhost:8095/api/users -H "Content-Type: application/json" -d '{"nickname":"test","password":"test"}')
if echo "$no_auth_response" | grep -q "Unauthorized"; then
log_success "Admin endpoints properly reject unauthenticated requests"
else
log_error "Admin endpoints should reject unauthenticated requests, got: $no_auth_response"
fi
# Test 2: Admin endpoints should work with valid authentication
local auth_response=$(curl -s -X POST http://localhost:8095/api/users -H "Content-Type: application/json" -H "Authorization: Bearer $token" -d '{"nickname":"authtest","password":"authtest"}')
if echo "$auth_response" | grep -q "uuid"; then
log_success "Admin endpoints work with valid authentication"
else
log_error "Admin endpoints should work with authentication, got: $auth_response"
fi
# Test 3: KV endpoints should require auth when anonymous access is disabled
local kv_no_auth=$(curl -s -X PUT http://localhost:8095/kv/test/auth -H "Content-Type: application/json" -d '{"test":"auth"}')
if echo "$kv_no_auth" | grep -q "Unauthorized"; then
log_success "KV endpoints properly require authentication when anonymous access disabled"
else
log_error "KV endpoints should require auth when anonymous access disabled, got: $kv_no_auth"
fi
# Test 4: KV endpoints should work with valid authentication
local kv_auth=$(curl -s -X PUT http://localhost:8095/kv/test/auth -H "Content-Type: application/json" -H "Authorization: Bearer $token" -d '{"test":"auth"}')
if echo "$kv_auth" | grep -q "uuid\|timestamp" || [ -z "$kv_auth" ]; then
log_success "KV endpoints work with valid authentication"
else
log_error "KV endpoints should work with authentication, got: $kv_auth"
fi
$BINARY stop auth_test.yaml >/dev/null 2>&1
sleep 1
else
log_error "Auth test node failed to start"
$BINARY stop auth_test.yaml >/dev/null 2>&1
fi
}
# Test 6: Resource Metadata Management (Issue #12)
test_metadata_management() {
test_start "Resource Metadata Management test (Issue #12)"
# Create metadata test config
cat > metadata_test.yaml <<EOF
node_id: "metadata-test"
bind_address: "127.0.0.1"
port: 8096
data_dir: "./metadata_test_data"
seed_nodes: []
log_level: "error"
auth_enabled: true
allow_anonymous_read: false
allow_anonymous_write: false
EOF
# Start node using daemon command
$BINARY start metadata_test.yaml >/dev/null 2>&1
sleep 3 # Allow daemon to start and root account creation
if wait_for_service 8096; then
# Extract the token from logs
local log_file=$(get_log_file metadata_test.yaml)
local token=$(grep "Token:" "$log_file" | sed 's/.*Token: //' | tr -d '\n\r')
if [ -z "$token" ]; then
log_error "Failed to extract authentication token from logs"
$BINARY stop metadata_test.yaml >/dev/null 2>&1
return
fi
# First, create a KV resource
curl -s -X PUT http://localhost:8096/kv/test/resource -H "Content-Type: application/json" -H "Authorization: Bearer $token" -d '{"data":"test"}' >/dev/null
sleep 1
# Test 1: Get metadata should fail for non-existent metadata (initially no metadata exists)
local get_response=$(curl -s -w "\n%{http_code}" -X GET http://localhost:8096/kv/test/resource/metadata -H "Authorization: Bearer $token")
local get_body=$(echo "$get_response" | head -n -1)
local get_code=$(echo "$get_response" | tail -n 1)
if [ "$get_code" = "404" ]; then
log_success "GET metadata returns 404 for non-existent metadata"
else
log_error "GET metadata should return 404 for non-existent metadata, got code: $get_code, body: $get_body"
fi
# Test 2: Update metadata should create new metadata
local update_response=$(curl -s -X PUT http://localhost:8096/kv/test/resource/metadata -H "Content-Type: application/json" -H "Authorization: Bearer $token" -d '{"owner_uuid":"test-owner-123","permissions":3840}')
if echo "$update_response" | grep -q "owner_uuid"; then
log_success "PUT metadata creates metadata successfully"
else
log_error "PUT metadata should create metadata, got: $update_response"
fi
# Test 3: Get metadata should now return the created metadata
local get_response2=$(curl -s -X GET http://localhost:8096/kv/test/resource/metadata -H "Authorization: Bearer $token")
if echo "$get_response2" | grep -q "test-owner-123" && echo "$get_response2" | grep -q "3840"; then
log_success "GET metadata returns created metadata"
else
log_error "GET metadata should return created metadata, got: $get_response2"
fi
# Test 4: Update metadata should modify existing metadata
local update_response2=$(curl -s -X PUT http://localhost:8096/kv/test/resource/metadata -H "Content-Type: application/json" -H "Authorization: Bearer $token" -d '{"owner_uuid":"new-owner-456"}')
if echo "$update_response2" | grep -q "new-owner-456"; then
log_success "PUT metadata updates existing metadata"
else
log_error "PUT metadata should update metadata, got: $update_response2"
fi
# Test 5: Metadata endpoints should require authentication
local no_auth=$(curl -s -w "\n%{http_code}" -X GET http://localhost:8096/kv/test/resource/metadata)
local no_auth_code=$(echo "$no_auth" | tail -n 1)
if [ "$no_auth_code" = "401" ]; then
log_success "Metadata endpoints properly require authentication"
else
log_error "Metadata endpoints should require authentication, got code: $no_auth_code"
fi
$BINARY stop metadata_test.yaml >/dev/null 2>&1
sleep 1
else
log_error "Metadata test node failed to start"
$BINARY stop metadata_test.yaml >/dev/null 2>&1
fi
}
# Test 7: Daemon commands (start, stop, status, restart)
test_daemon_commands() {
test_start "Daemon command tests (start, stop, status, restart)"
# Create daemon test config
cat > daemon_test.yaml <<EOF
node_id: "daemon-test"
bind_address: "127.0.0.1"
port: 8097
data_dir: "./daemon_test_data"
seed_nodes: []
log_level: "error"
allow_anonymous_read: true
allow_anonymous_write: true
EOF
# Test 1: Start command
$BINARY start daemon_test.yaml >/dev/null 2>&1
sleep 3 # Allow daemon to start
if wait_for_service 8097 5; then
log_success "Daemon 'start' command works"
# Test 2: Status command shows running
local status_output=$($BINARY status daemon_test.yaml 2>&1)
if echo "$status_output" | grep -q "RUNNING"; then
log_success "Daemon 'status' command shows RUNNING"
else
log_error "Daemon 'status' should show RUNNING, got: $status_output"
fi
# Test 3: Stop command
$BINARY stop daemon_test.yaml >/dev/null 2>&1
sleep 2
# Check that service is actually stopped
if ! curl -s "http://localhost:8097/health" >/dev/null 2>&1; then
log_success "Daemon 'stop' command works"
else
log_error "Daemon should be stopped but is still responding"
fi
# Test 4: Restart command
$BINARY restart daemon_test.yaml >/dev/null 2>&1
sleep 3
if wait_for_service 8097 5; then
log_success "Daemon 'restart' command works"
# Clean up
$BINARY stop daemon_test.yaml >/dev/null 2>&1
sleep 1
else
log_error "Daemon 'restart' failed to start service"
fi
else
log_error "Daemon 'start' command failed"
fi
# Ensure cleanup
pkill -f "daemon_test.yaml" 2>/dev/null || true
sleep 1
}
# Main test execution
main() {
echo "=================================================="
@@ -368,7 +622,10 @@ main() {
test_basic_functionality
test_cluster_formation
test_conflict_resolution
test_authentication_middleware
test_metadata_management
test_daemon_commands
# Results
echo "=================================================="
echo " Test Results"

main.go

@@ -6,26 +6,90 @@ import (
"os" "os"
"os/signal" "os/signal"
"syscall" "syscall"
"time"
"path/filepath"
"strings"
"kvs/config" "kvs/config"
"kvs/daemon"
"kvs/server" "kvs/server"
) )
func main() { func main() {
configPath := "./config.yaml" if len(os.Args) < 2 {
// No arguments - run in foreground with default config
runServer("./config.yaml", false)
return
}
// Check if this is a daemon spawn
if os.Args[1] == "--daemon" {
if len(os.Args) < 3 {
fmt.Fprintf(os.Stderr, "Error: --daemon flag requires config path\n")
os.Exit(1)
}
runServer(os.Args[2], true)
return
}
// Parse subcommand
command := os.Args[1]
switch command {
case "start":
if len(os.Args) < 3 {
fmt.Fprintf(os.Stderr, "Usage: kvs start <config>\n")
os.Exit(1)
}
cmdStart(normalizeConfigPath(os.Args[2]))
case "stop":
if len(os.Args) < 3 {
fmt.Fprintf(os.Stderr, "Usage: kvs stop <config>\n")
os.Exit(1)
}
cmdStop(normalizeConfigPath(os.Args[2]))
case "restart":
if len(os.Args) < 3 {
fmt.Fprintf(os.Stderr, "Usage: kvs restart <config>\n")
os.Exit(1)
}
cmdRestart(normalizeConfigPath(os.Args[2]))
case "status":
if len(os.Args) > 2 {
cmdStatusSingle(normalizeConfigPath(os.Args[2]))
} else {
cmdStatusAll()
}
case "help", "--help", "-h":
printHelp()
default:
// Backward compatibility: assume it's a config file path
runServer(command, false)
}
}
func runServer(configPath string, isDaemon bool) {
cfg, err := config.Load(configPath)
if err != nil {
fmt.Fprintf(os.Stderr, "Failed to load configuration: %v\n", err)
os.Exit(1)
}
// Write PID file if running as daemon
if isDaemon {
if err := daemon.WritePID(configPath); err != nil {
fmt.Fprintf(os.Stderr, "Failed to write PID file: %v\n", err)
os.Exit(1)
}
defer daemon.RemovePID(configPath)
}
kvServer, err := server.NewServer(cfg)
if err != nil {
fmt.Fprintf(os.Stderr, "Failed to create server: %v\n", err)
@@ -46,3 +110,135 @@ func main() {
os.Exit(1)
}
}
func cmdStart(configPath string) {
if err := daemon.Daemonize(configPath); err != nil {
fmt.Fprintf(os.Stderr, "Failed to start: %v\n", err)
os.Exit(1)
}
}
func cmdStop(configPath string) {
pid, running, err := daemon.ReadPID(configPath)
if err != nil {
fmt.Fprintf(os.Stderr, "Failed to read PID: %v\n", err)
os.Exit(1)
}
if !running {
fmt.Printf("Instance '%s' is not running\n", configPath)
// Clean up stale PID file
daemon.RemovePID(configPath)
return
}
fmt.Printf("Stopping instance '%s' (PID %d)...\n", configPath, pid)
if err := daemon.StopProcess(pid); err != nil {
fmt.Fprintf(os.Stderr, "Failed to stop process: %v\n", err)
os.Exit(1)
}
// Wait a bit and verify it stopped
time.Sleep(1 * time.Second)
_, stillRunning, _ := daemon.ReadPID(configPath)
if stillRunning {
fmt.Printf("Warning: Process may still be running\n")
} else {
daemon.RemovePID(configPath)
fmt.Printf("Stopped successfully\n")
}
}
func cmdRestart(configPath string) {
// Check if running
_, running, err := daemon.ReadPID(configPath)
if err != nil {
fmt.Fprintf(os.Stderr, "Failed to check status: %v\n", err)
os.Exit(1)
}
if running {
cmdStop(configPath)
// Wait a bit for clean shutdown
time.Sleep(2 * time.Second)
}
cmdStart(configPath)
}
func cmdStatusSingle(configPath string) {
pid, running, err := daemon.ReadPID(configPath)
if err != nil {
fmt.Fprintf(os.Stderr, "Failed to read PID: %v\n", err)
os.Exit(1)
}
if running {
fmt.Printf("Instance '%s': RUNNING (PID %d)\n", configPath, pid)
} else if pid > 0 {
fmt.Printf("Instance '%s': STOPPED (stale PID %d)\n", configPath, pid)
} else {
fmt.Printf("Instance '%s': STOPPED\n", configPath)
}
}
func cmdStatusAll() {
instances, err := daemon.ListRunningInstances()
if err != nil {
fmt.Fprintf(os.Stderr, "Failed to list instances: %v\n", err)
os.Exit(1)
}
if len(instances) == 0 {
fmt.Println("No KVS instances found")
return
}
fmt.Println("KVS Instances:")
for _, inst := range instances {
status := "STOPPED"
if inst.Running {
status = "RUNNING"
}
fmt.Printf(" %-20s %s (PID %d)\n", inst.Name, status, inst.PID)
}
}
// normalizeConfigPath ensures config path has .yaml extension if not specified
func normalizeConfigPath(path string) string {
// If path doesn't have an extension, add .yaml
if filepath.Ext(path) == "" {
return path + ".yaml"
}
return path
}
// getConfigIdentifier returns the identifier for a config (basename without extension)
// This is used for PID files and status display
func getConfigIdentifier(path string) string {
basename := filepath.Base(path)
return strings.TrimSuffix(basename, filepath.Ext(basename))
}
func printHelp() {
help := `KVS - Distributed Key-Value Store
Usage:
kvs [config.yaml] Run in foreground (default: ./config.yaml)
kvs start <config> Start as daemon (.yaml extension optional)
kvs stop <config> Stop daemon (.yaml extension optional)
kvs restart <config> Restart daemon (.yaml extension optional)
kvs status [config] Show status (all instances if no config given)
kvs help Show this help
Examples:
kvs # Run with ./config.yaml in foreground
kvs node1.yaml # Run with node1.yaml in foreground
kvs start node1 # Start node1.yaml as daemon
kvs start node1.yaml # Same as above
kvs stop node1 # Stop node1 daemon
kvs status # Show all running instances
kvs status node1 # Show status of node1
`
fmt.Print(help)
}
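For illustration, a standalone sketch of the normalization rule used by normalizeConfigPath (which is unexported, so the logic is mirrored here rather than imported): bare instance names gain a .yaml extension, explicit extensions are kept.

package main

import (
	"fmt"
	"path/filepath"
)

// normalize mirrors the normalizeConfigPath rule shown in main.go above;
// it is a hypothetical copy for demonstration only.
func normalize(path string) string {
	if filepath.Ext(path) == "" {
		return path + ".yaml"
	}
	return path
}

func main() {
	for _, in := range []string{"node1", "node1.yaml", "configs/node2"} {
		fmt.Printf("%-14s -> %s\n", in, normalize(in))
	}
}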


@@ -22,8 +22,6 @@ import (
"kvs/utils" "kvs/utils"
) )
// healthHandler returns server health status // healthHandler returns server health status
func (s *Server) healthHandler(w http.ResponseWriter, r *http.Request) { func (s *Server) healthHandler(w http.ResponseWriter, r *http.Request) {
mode := s.getMode() mode := s.getMode()
@@ -215,6 +213,104 @@ func (s *Server) deleteKVHandler(w http.ResponseWriter, r *http.Request) {
s.logger.WithField("path", path).Info("Value deleted") s.logger.WithField("path", path).Info("Value deleted")
} }
// getResourceMetadataHandler retrieves metadata for a KV resource
func (s *Server) getResourceMetadataHandler(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
path := vars["path"]
// Get metadata from storage
metadata, err := s.authService.GetResourceMetadata(path)
if err == badger.ErrKeyNotFound {
http.Error(w, "Not Found: No metadata exists for this resource", http.StatusNotFound)
return
}
if err != nil {
s.logger.WithError(err).WithField("path", path).Error("Failed to get resource metadata")
http.Error(w, "Internal Server Error", http.StatusInternalServerError)
return
}
response := types.GetResourceMetadataResponse{
OwnerUUID: metadata.OwnerUUID,
GroupUUID: metadata.GroupUUID,
Permissions: metadata.Permissions,
TTL: metadata.TTL,
CreatedAt: metadata.CreatedAt,
UpdatedAt: metadata.UpdatedAt,
}
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(response)
}
// updateResourceMetadataHandler updates metadata for a KV resource
func (s *Server) updateResourceMetadataHandler(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
path := vars["path"]
// Parse request body
var req types.UpdateResourceMetadataRequest
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
http.Error(w, "Bad Request: Invalid JSON", http.StatusBadRequest)
return
}
// Get existing metadata or create new one
metadata, err := s.authService.GetResourceMetadata(path)
if err == badger.ErrKeyNotFound {
// Create new metadata with defaults
metadata = &types.ResourceMetadata{
OwnerUUID: "",
GroupUUID: "",
Permissions: types.DefaultPermissions,
TTL: "",
CreatedAt: time.Now().Unix(),
UpdatedAt: time.Now().Unix(),
}
} else if err != nil {
s.logger.WithError(err).WithField("path", path).Error("Failed to get resource metadata")
http.Error(w, "Internal Server Error", http.StatusInternalServerError)
return
}
// Update only provided fields
if req.OwnerUUID != nil {
metadata.OwnerUUID = *req.OwnerUUID
}
if req.GroupUUID != nil {
metadata.GroupUUID = *req.GroupUUID
}
if req.Permissions != nil {
metadata.Permissions = *req.Permissions
}
metadata.UpdatedAt = time.Now().Unix()
// Store updated metadata
if err := s.authService.SetResourceMetadata(path, metadata); err != nil {
s.logger.WithError(err).WithField("path", path).Error("Failed to update resource metadata")
http.Error(w, "Internal Server Error", http.StatusInternalServerError)
return
}
response := types.GetResourceMetadataResponse{
OwnerUUID: metadata.OwnerUUID,
GroupUUID: metadata.GroupUUID,
Permissions: metadata.Permissions,
TTL: metadata.TTL,
CreatedAt: metadata.CreatedAt,
UpdatedAt: metadata.UpdatedAt,
}
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(response)
s.logger.WithFields(logrus.Fields{
"path": path,
"owner_uuid": metadata.OwnerUUID,
"group_uuid": metadata.GroupUUID,
}).Info("Resource metadata updated")
}
// isClusterMember checks if request is from a cluster member // isClusterMember checks if request is from a cluster member
func (s *Server) isClusterMember(remoteAddr string) bool { func (s *Server) isClusterMember(remoteAddr string) bool {
host, _, err := net.SplitHostPort(remoteAddr) host, _, err := net.SplitHostPort(remoteAddr)
@@ -1271,3 +1367,29 @@ func (s *Server) getRevisionHistory(key string) ([]map[string]interface{}, error
func (s *Server) getSpecificRevision(key string, revision int) (*types.StoredValue, error) { func (s *Server) getSpecificRevision(key string, revision int) (*types.StoredValue, error) {
return s.revisionService.GetSpecificRevision(key, revision) return s.revisionService.GetSpecificRevision(key, revision)
} }
// clusterBootstrapHandler provides the cluster secret to authenticated administrators (Issue #13)
func (s *Server) clusterBootstrapHandler(w http.ResponseWriter, r *http.Request) {
// Ensure clustering is enabled
if !s.config.ClusteringEnabled {
http.Error(w, "Clustering is disabled", http.StatusServiceUnavailable)
return
}
// Ensure cluster secret is configured
if s.config.ClusterSecret == "" {
s.logger.Error("Cluster secret is not configured")
http.Error(w, "Cluster secret is not configured", http.StatusInternalServerError)
return
}
// Return the cluster secret for secure bootstrap
response := map[string]string{
"cluster_secret": s.config.ClusterSecret,
}
s.logger.WithField("remote_addr", r.RemoteAddr).Info("Cluster secret retrieved for bootstrap")
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(response)
}
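A hedged client-side sketch of the two metadata endpoints added above; the host, port, key path, and token are placeholders. The PUT body may contain any subset of owner_uuid, group_uuid, and permissions, since only fields present in the request are applied.

package main

import (
	"bytes"
	"fmt"
	"io"
	"net/http"
)

func main() {
	base := "http://localhost:8080"        // placeholder address
	token := "REPLACE_WITH_ADMIN_TOKEN"    // admin JWT with admin:users:* scopes

	// Partial update: only owner_uuid is sent; other metadata fields are left untouched.
	body := bytes.NewBufferString(`{"owner_uuid":"some-user-uuid"}`)
	req, _ := http.NewRequest(http.MethodPut, base+"/kv/app/config/metadata", body)
	req.Header.Set("Content-Type", "application/json")
	req.Header.Set("Authorization", "Bearer "+token)
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		fmt.Println(err)
		return
	}
	defer resp.Body.Close()
	out, _ := io.ReadAll(resp.Body)
	fmt.Println(resp.Status, string(out))

	// Read it back; a 404 here means no metadata has been created for the path yet.
	get, _ := http.NewRequest(http.MethodGet, base+"/kv/app/config/metadata", nil)
	get.Header.Set("Authorization", "Bearer "+token)
	resp2, err := http.DefaultClient.Do(get)
	if err != nil {
		fmt.Println(err)
		return
	}
	defer resp2.Body.Close()
	out2, _ := io.ReadAll(resp2.Body)
	fmt.Println(resp2.Status, string(out2))
}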


@@ -1,6 +1,8 @@
package server
import (
"net/http"
"github.com/gorilla/mux"
)
@@ -11,41 +13,122 @@ func (s *Server) setupRoutes() *mux.Router {
// Health endpoint (always available)
router.HandleFunc("/health", s.healthHandler).Methods("GET")
// Resource Metadata Management endpoints (Issue #12) - Must come BEFORE general KV routes
// These need to be registered first to prevent /kv/{path:.+} from matching metadata paths
if s.config.AuthEnabled {
router.Handle("/kv/{path:.+}/metadata", s.authService.Middleware(
[]string{"admin:users:read"}, nil, "",
)(s.getResourceMetadataHandler)).Methods("GET")
router.Handle("/kv/{path:.+}/metadata", s.authService.Middleware(
[]string{"admin:users:update"}, nil, "",
)(s.updateResourceMetadataHandler)).Methods("PUT")
}
// KV endpoints (with conditional authentication based on anonymous access settings)
// GET endpoint - require auth if anonymous read is disabled
if s.config.AuthEnabled && !s.config.AllowAnonymousRead {
router.Handle("/kv/{path:.+}", s.authService.Middleware(
[]string{"read"}, nil, "",
)(s.getKVHandler)).Methods("GET")
} else {
router.HandleFunc("/kv/{path:.+}", s.getKVHandler).Methods("GET")
}
// PUT endpoint - require auth if anonymous write is disabled
if s.config.AuthEnabled && !s.config.AllowAnonymousWrite {
router.Handle("/kv/{path:.+}", s.authService.Middleware(
[]string{"write"}, nil, "",
)(s.putKVHandler)).Methods("PUT")
} else {
router.HandleFunc("/kv/{path:.+}", s.putKVHandler).Methods("PUT")
}
// DELETE endpoint - always require authentication (no anonymous delete)
if s.config.AuthEnabled {
router.Handle("/kv/{path:.+}", s.authService.Middleware(
[]string{"delete"}, nil, "",
)(s.deleteKVHandler)).Methods("DELETE")
} else {
router.HandleFunc("/kv/{path:.+}", s.deleteKVHandler).Methods("DELETE")
}
// Member endpoints (available when clustering is enabled)
if s.config.ClusteringEnabled {
// GET /members/ is unprotected for monitoring/inspection
router.HandleFunc("/members/", s.getMembersHandler).Methods("GET")
// Apply cluster authentication middleware to all cluster communication endpoints
if s.clusterAuthService != nil {
router.Handle("/members/join", s.clusterAuthService.Middleware(http.HandlerFunc(s.joinMemberHandler))).Methods("POST")
router.Handle("/members/leave", s.clusterAuthService.Middleware(http.HandlerFunc(s.leaveMemberHandler))).Methods("DELETE")
router.Handle("/members/gossip", s.clusterAuthService.Middleware(http.HandlerFunc(s.gossipHandler))).Methods("POST")
router.Handle("/members/pairs_by_time", s.clusterAuthService.Middleware(http.HandlerFunc(s.pairsByTimeHandler))).Methods("POST")
// Merkle Tree endpoints (clustering feature)
router.Handle("/merkle_tree/root", s.clusterAuthService.Middleware(http.HandlerFunc(s.getMerkleRootHandler))).Methods("GET")
router.Handle("/merkle_tree/diff", s.clusterAuthService.Middleware(http.HandlerFunc(s.getMerkleDiffHandler))).Methods("POST")
router.Handle("/kv_range", s.clusterAuthService.Middleware(http.HandlerFunc(s.getKVRangeHandler))).Methods("POST")
} else {
// Fallback to unprotected endpoints (for backwards compatibility)
router.HandleFunc("/members/join", s.joinMemberHandler).Methods("POST")
router.HandleFunc("/members/leave", s.leaveMemberHandler).Methods("DELETE")
router.HandleFunc("/members/gossip", s.gossipHandler).Methods("POST")
router.HandleFunc("/members/pairs_by_time", s.pairsByTimeHandler).Methods("POST")
// Merkle Tree endpoints (clustering feature)
router.HandleFunc("/merkle_tree/root", s.getMerkleRootHandler).Methods("GET")
router.HandleFunc("/merkle_tree/diff", s.getMerkleDiffHandler).Methods("POST")
router.HandleFunc("/kv_range", s.getKVRangeHandler).Methods("POST")
}
}
// Authentication and user management endpoints (available when auth is enabled)
if s.config.AuthEnabled {
// User Management endpoints (with authentication middleware)
router.Handle("/api/users", s.authService.Middleware(
[]string{"admin:users:create"}, nil, "",
)(s.createUserHandler)).Methods("POST")
router.Handle("/api/users/{uuid}", s.authService.Middleware(
[]string{"admin:users:read"}, nil, "",
)(s.getUserHandler)).Methods("GET")
router.Handle("/api/users/{uuid}", s.authService.Middleware(
[]string{"admin:users:update"}, nil, "",
)(s.updateUserHandler)).Methods("PUT")
router.Handle("/api/users/{uuid}", s.authService.Middleware(
[]string{"admin:users:delete"}, nil, "",
)(s.deleteUserHandler)).Methods("DELETE")
// Group Management endpoints (with authentication middleware)
router.Handle("/api/groups", s.authService.Middleware(
[]string{"admin:groups:create"}, nil, "",
)(s.createGroupHandler)).Methods("POST")
router.Handle("/api/groups/{uuid}", s.authService.Middleware(
[]string{"admin:groups:read"}, nil, "",
)(s.getGroupHandler)).Methods("GET")
router.Handle("/api/groups/{uuid}", s.authService.Middleware(
[]string{"admin:groups:update"}, nil, "",
)(s.updateGroupHandler)).Methods("PUT")
router.Handle("/api/groups/{uuid}", s.authService.Middleware(
[]string{"admin:groups:delete"}, nil, "",
)(s.deleteGroupHandler)).Methods("DELETE")
// Token Management endpoints (with authentication middleware)
router.Handle("/api/tokens", s.authService.Middleware(
[]string{"admin:tokens:create"}, nil, "",
)(s.createTokenHandler)).Methods("POST")
// Cluster Bootstrap endpoint (Issue #13) - Protected by JWT authentication
// Allows authenticated administrators to retrieve the cluster secret for new nodes
router.Handle("/auth/cluster-bootstrap", s.authService.Middleware(
[]string{"admin:tokens:create"}, nil, "",
)(s.clusterBootstrapHandler)).Methods("GET")
} }
// Revision History endpoints (available when revision history is enabled)
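The ordering comment above matters because gorilla/mux matches routes in registration order: a catch-all /kv/{path:.+} registered first would also match .../metadata. A small self-contained sketch (using httptest and stub handlers, not the KVS ones) of that behaviour:

package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"

	"github.com/gorilla/mux"
)

func main() {
	r := mux.NewRouter()
	// Register the more specific metadata route first, as routes.go does;
	// gorilla/mux tries routes in the order they were added.
	r.HandleFunc("/kv/{path:.+}/metadata", func(w http.ResponseWriter, _ *http.Request) {
		fmt.Fprint(w, "metadata handler")
	}).Methods("GET")
	r.HandleFunc("/kv/{path:.+}", func(w http.ResponseWriter, _ *http.Request) {
		fmt.Fprint(w, "kv handler")
	}).Methods("GET")

	req := httptest.NewRequest("GET", "/kv/app/config/metadata", nil)
	rec := httptest.NewRecorder()
	r.ServeHTTP(rec, req)
	fmt.Println(rec.Body.String()) // "metadata handler"; reversing the registration order would print "kv handler"
}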


@@ -50,7 +50,8 @@ type Server struct {
backupMu sync.RWMutex // Protects backup status
// Authentication service
authService *auth.AuthService
clusterAuthService *auth.ClusterAuthService
}
// NewServer initializes and returns a new Server instance
@@ -118,7 +119,12 @@ func NewServer(config *types.Config) (*Server, error) {
server.revisionService = storage.NewRevisionService(storageService)
// Initialize authentication service
server.authService = auth.NewAuthService(db, logger, config)
// Initialize cluster authentication service (Issue #13)
if config.ClusteringEnabled {
server.clusterAuthService = auth.NewClusterAuthService(config.ClusterSecret, logger)
}
// Setup initial root account if needed (Issue #3)
if config.AuthEnabled {
@@ -219,7 +225,7 @@ func (s *Server) setupRootAccount() error {
func (s *Server) createRootUserAndToken() error {
rootNickname := "root"
adminGroupName := "admin"
// Generate UUIDs
rootUserUUID := "root-" + time.Now().Format("20060102-150405")
adminGroupUUID := "admin-" + time.Now().Format("20060102-150405")
@@ -234,7 +240,7 @@ func (s *Server) createRootUserAndToken() error {
UpdatedAt: now,
}
// Create root user
rootUser := types.User{
UUID: rootUserUUID,
NicknameHash: hashUserNickname(rootNickname),
@@ -251,7 +257,7 @@ func (s *Server) createRootUserAndToken() error {
// Create API token with full administrative scopes
adminScopes := []string{
"admin:users:create", "admin:users:read", "admin:users:update", "admin:users:delete",
"admin:groups:create", "admin:groups:read", "admin:groups:update", "admin:groups:delete",
"admin:tokens:create", "admin:tokens:revoke",
"read", "write", "delete",
}
@@ -269,13 +275,13 @@ func (s *Server) createRootUserAndToken() error {
// Log the token securely (one-time display)
s.logger.WithFields(logrus.Fields{
"user_uuid": rootUserUUID,
"group_uuid": adminGroupUUID,
"expires_at": time.Unix(expiresAt, 0).Format(time.RFC3339),
"expires_in": "24 hours",
}).Warn("Root account created - SAVE THIS TOKEN:")
// Display token prominently
fmt.Printf("\n" + strings.Repeat("=", 80) + "\n")
fmt.Printf("🔐 ROOT ACCOUNT CREATED - INITIAL SETUP TOKEN\n")
fmt.Printf("===========================================\n")
@@ -309,7 +315,7 @@ func (s *Server) storeUserAndGroup(user *types.User, group *types.Group) error {
if err != nil {
return fmt.Errorf("failed to marshal user data: %v", err)
}
if err := txn.Set([]byte(auth.UserStorageKey(user.UUID)), userData); err != nil {
return fmt.Errorf("failed to store user: %v", err)
}
@@ -319,7 +325,7 @@ func (s *Server) storeUserAndGroup(user *types.User, group *types.Group) error {
if err != nil {
return fmt.Errorf("failed to marshal group data: %v", err)
}
if err := txn.Set([]byte(auth.GroupStorageKey(group.UUID)), groupData); err != nil {
return fmt.Errorf("failed to store group: %v", err)
}
@@ -327,4 +333,3 @@ func (s *Server) storeUserAndGroup(user *types.User, group *types.Group) error {
return nil
})
}


@@ -2,7 +2,7 @@ package storage
import (
"fmt"
"github.com/klauspost/compress/zstd"
)
@@ -57,4 +57,4 @@ func (c *CompressionService) DecompressData(compressedData []byte) ([]byte, erro
return nil, fmt.Errorf("decompressor not initialized") return nil, fmt.Errorf("decompressor not initialized")
} }
return c.decompressor.DecodeAll(compressedData, nil) return c.decompressor.DecodeAll(compressedData, nil)
} }


@@ -34,10 +34,10 @@ func GetRevisionKey(baseKey string, revision int) string {
func (r *RevisionService) StoreRevisionHistory(txn *badger.Txn, key string, storedValue types.StoredValue, ttl time.Duration) error {
// Get existing metadata to check current revisions
metadataKey := auth.ResourceMetadataKey(key)
var metadata types.ResourceMetadata
var currentRevisions []int
// Try to get existing metadata
metadataData, err := r.storage.RetrieveWithDecompression(txn, []byte(metadataKey))
if err == badger.ErrKeyNotFound {
@@ -60,7 +60,7 @@ func (r *RevisionService) StoreRevisionHistory(txn *badger.Txn, key string, stor
if err != nil {
return fmt.Errorf("failed to unmarshal metadata: %v", err)
}
// Extract current revisions (we store them as a custom field)
if metadata.TTL == "" {
currentRevisions = []int{}
@@ -69,13 +69,13 @@ func (r *RevisionService) StoreRevisionHistory(txn *badger.Txn, key string, stor
currentRevisions = []int{1, 2, 3} // Assume all revisions exist for existing keys
}
}
// Revision rotation logic: shift existing revisions
if len(currentRevisions) >= 3 {
// Delete oldest revision (rev:3)
oldestRevKey := GetRevisionKey(key, 3)
txn.Delete([]byte(oldestRevKey))
// Shift rev:2 → rev:3
rev2Key := GetRevisionKey(key, 2)
rev2Data, err := r.storage.RetrieveWithDecompression(txn, []byte(rev2Key))
@@ -83,8 +83,8 @@ func (r *RevisionService) StoreRevisionHistory(txn *badger.Txn, key string, stor
rev3Key := GetRevisionKey(key, 3)
r.storage.StoreWithTTL(txn, []byte(rev3Key), rev2Data, ttl)
}
// Shift rev:1 → rev:2
rev1Key := GetRevisionKey(key, 1)
rev1Data, err := r.storage.RetrieveWithDecompression(txn, []byte(rev1Key))
if err == nil {
@@ -92,80 +92,80 @@ func (r *RevisionService) StoreRevisionHistory(txn *badger.Txn, key string, stor
r.storage.StoreWithTTL(txn, []byte(rev2Key), rev1Data, ttl)
}
}
// Store current value as rev:1
currentValueBytes, err := json.Marshal(storedValue)
if err != nil {
return fmt.Errorf("failed to marshal current value for revision: %v", err)
}
rev1Key := GetRevisionKey(key, 1)
err = r.storage.StoreWithTTL(txn, []byte(rev1Key), currentValueBytes, ttl)
if err != nil {
return fmt.Errorf("failed to store revision 1: %v", err)
}
// Update metadata with new revision count
metadata.UpdatedAt = time.Now().Unix()
metadataBytes, err := json.Marshal(metadata)
if err != nil {
return fmt.Errorf("failed to marshal metadata: %v", err)
}
return r.storage.StoreWithTTL(txn, []byte(metadataKey), metadataBytes, ttl)
}
// GetRevisionHistory retrieves all available revisions for a given key
func (r *RevisionService) GetRevisionHistory(key string) ([]map[string]interface{}, error) {
var revisions []map[string]interface{}
err := r.storage.db.View(func(txn *badger.Txn) error {
// Check revisions 1, 2, 3
for rev := 1; rev <= 3; rev++ {
revKey := GetRevisionKey(key, rev)
revData, err := r.storage.RetrieveWithDecompression(txn, []byte(revKey))
if err == badger.ErrKeyNotFound {
continue // Skip missing revisions
} else if err != nil {
return fmt.Errorf("failed to retrieve revision %d: %v", rev, err)
}
var storedValue types.StoredValue
err = json.Unmarshal(revData, &storedValue)
if err != nil {
return fmt.Errorf("failed to unmarshal revision %d: %v", rev, err)
}
var data interface{}
err = json.Unmarshal(storedValue.Data, &data)
if err != nil {
return fmt.Errorf("failed to unmarshal revision %d data: %v", rev, err)
}
revision := map[string]interface{}{
"revision": rev,
"uuid": storedValue.UUID,
"timestamp": storedValue.Timestamp,
"data": data,
}
revisions = append(revisions, revision)
}
return nil
})
if err != nil {
return nil, err
}
// Sort revisions by revision number (newest first)
// Note: they're already in order since we iterate 1->3, but reverse for newest first
for i, j := 0, len(revisions)-1; i < j; i, j = i+1, j-1 {
revisions[i], revisions[j] = revisions[j], revisions[i]
}
return revisions, nil
}
@@ -174,23 +174,23 @@ func (r *RevisionService) GetSpecificRevision(key string, revision int) (*types.
    if revision < 1 || revision > 3 {
        return nil, fmt.Errorf("invalid revision number: %d (must be 1-3)", revision)
    }
    var storedValue types.StoredValue
    err := r.storage.db.View(func(txn *badger.Txn) error {
        revKey := GetRevisionKey(key, revision)
        revData, err := r.storage.RetrieveWithDecompression(txn, []byte(revKey))
        if err != nil {
            return err
        }
        return json.Unmarshal(revData, &storedValue)
    })
    if err != nil {
        return nil, err
    }
    return &storedValue, nil
}
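
For orientation, here is a minimal usage sketch of the two read paths above. The wrapper function, the revSvc parameter name, and the key "app/config" are illustrative, and fmt is assumed to be imported; error handling is abbreviated.

    // Illustrative only: assumes an initialized *RevisionService.
    func printRevisions(revSvc *RevisionService) error {
        history, err := revSvc.GetRevisionHistory("app/config")
        if err != nil {
            return err
        }
        for _, rev := range history {
            fmt.Printf("rev %v at %v: %v\n", rev["revision"], rev["timestamp"], rev["data"])
        }
        // A single retained revision can also be fetched directly (1-3).
        prev, err := revSvc.GetSpecificRevision("app/config", 2)
        if err != nil {
            return err
        }
        fmt.Printf("revision 2: uuid=%v timestamp=%v\n", prev.UUID, prev.Timestamp)
        return nil
    }
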
@@ -200,15 +200,15 @@ func GetRevisionFromPath(path string) (string, int, error) {
    if len(parts) < 4 || parts[len(parts)-2] != "rev" {
        return "", 0, fmt.Errorf("invalid revision path format")
    }
    revisionStr := parts[len(parts)-1]
    revision, err := strconv.Atoi(revisionStr)
    if err != nil {
        return "", 0, fmt.Errorf("invalid revision number: %s", revisionStr)
    }
    // Reconstruct the base key without the "/rev/N" suffix
    baseKey := strings.Join(parts[:len(parts)-2], "/")
    return baseKey, revision, nil
}
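
As a concrete example, a request path of the form <key>/rev/<n> splits into the base key and the revision number; the path value below is illustrative.

    // Illustrative only.
    baseKey, rev, err := GetRevisionFromPath("app/config/rev/2")
    if err == nil {
        fmt.Println(baseKey, rev) // prints: app/config 2
    }
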

View File

@@ -12,17 +12,17 @@ import (
// StorageService handles all BadgerDB operations and data management
type StorageService struct {
    db             *badger.DB
    config         *types.Config
    compressionSvc *CompressionService
    logger         *logrus.Logger
}
// NewStorageService creates a new storage service
func NewStorageService(db *badger.DB, config *types.Config, logger *logrus.Logger) (*StorageService, error) {
    var compressionSvc *CompressionService
    var err error
    // Initialize compression if enabled
    if config.CompressionEnabled {
        compressionSvc, err = NewCompressionService()
@@ -50,7 +50,7 @@ func (s *StorageService) Close() {
func (s *StorageService) StoreWithTTL(txn *badger.Txn, key []byte, data []byte, ttl time.Duration) error {
    var finalData []byte
    var err error
    // Compress data if compression is enabled
    if s.config.CompressionEnabled && s.compressionSvc != nil {
        finalData, err = s.compressionSvc.CompressData(data)
@@ -60,14 +60,14 @@ func (s *StorageService) StoreWithTTL(txn *badger.Txn, key []byte, data []byte,
    } else {
        finalData = data
    }
    entry := badger.NewEntry(key, finalData)
    // Apply TTL if specified
    if ttl > 0 {
        entry = entry.WithTTL(ttl)
    }
    return txn.SetEntry(entry)
}
@@ -77,7 +77,7 @@ func (s *StorageService) RetrieveWithDecompression(txn *badger.Txn, key []byte)
    if err != nil {
        return nil, err
    }
    var compressedData []byte
    err = item.Value(func(val []byte) error {
        compressedData = append(compressedData, val...)
@@ -86,12 +86,12 @@ func (s *StorageService) RetrieveWithDecompression(txn *badger.Txn, key []byte)
    if err != nil {
        return nil, err
    }
    // Decompress data if compression is enabled
    if s.config.CompressionEnabled && s.compressionSvc != nil {
        return s.compressionSvc.DecompressData(compressedData)
    }
    return compressedData, nil
}
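
To show how the two helpers above compose, here is a minimal write/read round trip. The wrapper function, key, payload, and 10-minute TTL are illustrative; it assumes an initialized *StorageService and a handle to the same *badger.DB, with fmt and time imported.

    // Illustrative only: compression (if enabled) and TTL are applied inside the helpers.
    func roundTrip(db *badger.DB, storageSvc *StorageService) error {
        err := db.Update(func(txn *badger.Txn) error {
            return storageSvc.StoreWithTTL(txn, []byte("kv:app/config"), []byte(`{"debug":true}`), 10*time.Minute)
        })
        if err != nil {
            return err
        }
        return db.View(func(txn *badger.Txn) error {
            data, err := storageSvc.RetrieveWithDecompression(txn, []byte("kv:app/config"))
            if err != nil {
                return err
            }
            fmt.Printf("payload: %s\n", data)
            return nil
        })
    }
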
@@ -109,4 +109,4 @@ func (s *StorageService) DecompressData(compressedData []byte) ([]byte, error) {
        return compressedData, nil
    }
    return s.compressionSvc.DecompressData(compressedData)
}

View File

@@ -13,20 +13,20 @@ type StoredValue struct {
// User represents a system user
type User struct {
    UUID         string   `json:"uuid"`          // Server-generated UUID
    NicknameHash string   `json:"nickname_hash"` // SHA3-512 hash of nickname
    Groups       []string `json:"groups"`        // List of group UUIDs this user belongs to
    CreatedAt    int64    `json:"created_at"`    // Unix timestamp
    UpdatedAt    int64    `json:"updated_at"`    // Unix timestamp
}
// Group represents a user group
type Group struct {
    UUID      string   `json:"uuid"`       // Server-generated UUID
    NameHash  string   `json:"name_hash"`  // SHA3-512 hash of group name
    Members   []string `json:"members"`    // List of user UUIDs in this group
    CreatedAt int64    `json:"created_at"` // Unix timestamp
    UpdatedAt int64    `json:"updated_at"` // Unix timestamp
}
// APIToken represents a JWT authentication token
@@ -40,12 +40,12 @@ type APIToken struct {
// ResourceMetadata contains ownership and permission information for stored resources
type ResourceMetadata struct {
    OwnerUUID   string `json:"owner_uuid"`  // UUID of the resource owner
    GroupUUID   string `json:"group_uuid"`  // UUID of the resource group
    Permissions int    `json:"permissions"` // 12-bit permission mask (POSIX-inspired)
    TTL         string `json:"ttl"`         // Time-to-live duration (Go format)
    CreatedAt   int64  `json:"created_at"`  // Unix timestamp when resource was created
    UpdatedAt   int64  `json:"updated_at"`  // Unix timestamp when resource was last updated
}
// Permission constants for POSIX-inspired ACL
@@ -55,19 +55,19 @@ const (
    PermOwnerDelete = 1 << 10
    PermOwnerWrite  = 1 << 9
    PermOwnerRead   = 1 << 8
    // Group permissions (bits 7-4)
    PermGroupCreate = 1 << 7
    PermGroupDelete = 1 << 6
    PermGroupWrite  = 1 << 5
    PermGroupRead   = 1 << 4
    // Others permissions (bits 3-0)
    PermOthersCreate = 1 << 3
    PermOthersDelete = 1 << 2
    PermOthersWrite  = 1 << 1
    PermOthersRead   = 1 << 0
    // Default permissions: Owner(1111), Group(0110), Others(0010)
    DefaultPermissions = (PermOwnerCreate | PermOwnerDelete | PermOwnerWrite | PermOwnerRead) |
        (PermGroupWrite | PermGroupRead) |
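
Because a 12-bit mask is hard to read at a glance, here is a small composition-and-check sketch using only the constants above. The chosen policy (owner full access, group read-only, others nothing) is an example, not the project default.

    // Illustrative only.
    perms := PermOwnerCreate | PermOwnerDelete | PermOwnerWrite | PermOwnerRead | PermGroupRead // 0xF10
    groupCanRead := perms&PermGroupRead != 0     // true
    othersCanWrite := perms&PermOthersWrite != 0 // false
    fmt.Println(groupCanRead, othersCanWrite)
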
@@ -131,6 +131,22 @@ type CreateTokenResponse struct {
    ExpiresAt int64 `json:"expires_at"`
}
// Resource Metadata Management API structures (Issue #12)
type GetResourceMetadataResponse struct {
    OwnerUUID   string `json:"owner_uuid"`
    GroupUUID   string `json:"group_uuid"`
    Permissions int    `json:"permissions"`
    TTL         string `json:"ttl"`
    CreatedAt   int64  `json:"created_at"`
    UpdatedAt   int64  `json:"updated_at"`
}
type UpdateResourceMetadataRequest struct {
    OwnerUUID   *string `json:"owner_uuid,omitempty"`
    GroupUUID   *string `json:"group_uuid,omitempty"`
    Permissions *int    `json:"permissions,omitempty"`
}
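
The pointer fields plus omitempty give the update request partial-update semantics: only the fields that are set are changed. A minimal sketch, assuming these types and the permission constants are in scope, encoding/json is imported, and the UUID value is made up.

    // Illustrative only: change the group and permissions, leave the owner untouched.
    newGroup := "3f8e2c54-6d1a-4b9f-9a52-1c2d3e4f5a6b"
    newPerms := PermOwnerCreate | PermOwnerDelete | PermOwnerWrite | PermOwnerRead | PermGroupRead
    req := UpdateResourceMetadataRequest{
        GroupUUID:   &newGroup,
        Permissions: &newPerms,
    }
    body, _ := json.Marshal(req) // {"group_uuid":"3f8e...","permissions":3856}
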
// Cluster and member management types
type Member struct {
    ID string `json:"id"`
@@ -231,46 +247,57 @@ type KVRangeResponse struct {
// Configuration
type Config struct {
    NodeID               string   `yaml:"node_id"`
    BindAddress          string   `yaml:"bind_address"`
    Port                 int      `yaml:"port"`
    DataDir              string   `yaml:"data_dir"`
    SeedNodes            []string `yaml:"seed_nodes"`
    ReadOnly             bool     `yaml:"read_only"`
    LogLevel             string   `yaml:"log_level"`
    GossipIntervalMin    int      `yaml:"gossip_interval_min"`
    GossipIntervalMax    int      `yaml:"gossip_interval_max"`
    SyncInterval         int      `yaml:"sync_interval"`
    CatchupInterval      int      `yaml:"catchup_interval"`
    BootstrapMaxAgeHours int      `yaml:"bootstrap_max_age_hours"`
    ThrottleDelayMs      int      `yaml:"throttle_delay_ms"`
    FetchDelayMs         int      `yaml:"fetch_delay_ms"`
    // Database compression configuration
    CompressionEnabled bool `yaml:"compression_enabled"`
    CompressionLevel   int  `yaml:"compression_level"`
    // TTL configuration
    DefaultTTL  string `yaml:"default_ttl"`   // Go duration format, "0" means no default TTL
    MaxJSONSize int    `yaml:"max_json_size"` // Maximum JSON size in bytes
    // Rate limiting configuration
    RateLimitRequests int    `yaml:"rate_limit_requests"` // Max requests per window
    RateLimitWindow   string `yaml:"rate_limit_window"`   // Window duration (Go format)
    // Tamper-evident logging configuration
    TamperLogActions []string `yaml:"tamper_log_actions"` // Actions to log
    // Backup system configuration
    BackupEnabled   bool   `yaml:"backup_enabled"`   // Enable/disable automated backups
    BackupSchedule  string `yaml:"backup_schedule"`  // Cron schedule format
    BackupPath      string `yaml:"backup_path"`      // Directory to store backups
    BackupRetention int    `yaml:"backup_retention"` // Days to keep backups
    // Feature toggles for optional functionalities
    AuthEnabled            bool `yaml:"auth_enabled"`             // Enable/disable authentication system
    TamperLoggingEnabled   bool `yaml:"tamper_logging_enabled"`   // Enable/disable tamper-evident logging
    ClusteringEnabled      bool `yaml:"clustering_enabled"`       // Enable/disable clustering/gossip
    RateLimitingEnabled    bool `yaml:"rate_limiting_enabled"`    // Enable/disable rate limiting
    RevisionHistoryEnabled bool `yaml:"revision_history_enabled"` // Enable/disable revision history
}
    // Anonymous access control (Issue #5)
    AllowAnonymousRead  bool `yaml:"allow_anonymous_read"`  // Allow unauthenticated read access to KV endpoints
    AllowAnonymousWrite bool `yaml:"allow_anonymous_write"` // Allow unauthenticated write access to KV endpoints
    // Cluster authentication (Issue #13)
    ClusterSecret        string `yaml:"cluster_secret"`          // Shared secret for cluster authentication (auto-generated if empty)
    ClusterTLSEnabled    bool   `yaml:"cluster_tls_enabled"`     // Require TLS for inter-node communication
    ClusterTLSCertFile   string `yaml:"cluster_tls_cert_file"`   // Path to TLS certificate file
    ClusterTLSKeyFile    string `yaml:"cluster_tls_key_file"`    // Path to TLS private key file
    ClusterTLSSkipVerify bool   `yaml:"cluster_tls_skip_verify"` // Skip TLS verification (insecure, for testing only)
}
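
For reference, a minimal YAML sketch of the newly added settings as they would appear in a node's config file; all values are illustrative examples, and the keys follow the yaml tags above.

    # Anonymous access control (Issue #5)
    allow_anonymous_read: true
    allow_anonymous_write: false
    # Cluster authentication (Issue #13)
    cluster_secret: ""                 # empty: a secret is auto-generated at startup
    cluster_tls_enabled: true
    cluster_tls_cert_file: "/etc/kvs/tls/node1.crt"
    cluster_tls_key_file: "/etc/kvs/tls/node1.key"
    cluster_tls_skip_verify: false     # insecure; enable only for testing
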

View File

@@ -22,4 +22,4 @@ func HashGroupName(groupname string) string {
func HashToken(token string) string {
    return HashSHA3512(token)
}
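
HashToken simply reuses the SHA3-512 helper; a one-line usage sketch with a hypothetical raw token value:

    // Illustrative only.
    digest := HashToken("raw-api-token-value")
    _ = digest // assumed usage: store or compare the digest rather than the raw token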