Compare commits
1 Commits
secure-clu
...
0b7af92761
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
0b7af92761 |
155
CLAUDE.md
155
CLAUDE.md
@@ -1,155 +0,0 @@
|
||||
# CLAUDE.md
|
||||
|
||||
This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository.
|
||||
|
||||
## Commands for Development
|
||||
|
||||
### Build and Test Commands
|
||||
```bash
|
||||
# Build the binary
|
||||
go build -o kvs .
|
||||
|
||||
# Run with default config (auto-generates config.yaml)
|
||||
./kvs
|
||||
|
||||
# Run with custom config
|
||||
./kvs /path/to/config.yaml
|
||||
|
||||
# Run comprehensive integration tests
|
||||
./integration_test.sh
|
||||
|
||||
# Create test conflict data for debugging
|
||||
go run test_conflict.go data1 data2
|
||||
|
||||
# Build and test in one go
|
||||
go build -o kvs . && ./integration_test.sh
|
||||
```
|
||||
|
||||
### Development Workflow
|
||||
```bash
|
||||
# Format and check code
|
||||
go fmt ./...
|
||||
go vet ./...
|
||||
|
||||
# Run dependencies management
|
||||
go mod tidy
|
||||
|
||||
# Check build without artifacts
|
||||
go build .
|
||||
|
||||
# Test specific cluster scenarios
|
||||
./kvs node1.yaml & # Terminal 1
|
||||
./kvs node2.yaml & # Terminal 2
|
||||
curl -X PUT http://localhost:8081/kv/test/data -H "Content-Type: application/json" -d '{"test":"data"}'
|
||||
curl http://localhost:8082/kv/test/data # Should replicate within ~30 seconds
|
||||
pkill kvs
|
||||
```
|
||||
|
||||
## Architecture Overview
|
||||
|
||||
### High-Level Structure
|
||||
KVS is a **distributed, eventually consistent key-value store** built around three core systems:
|
||||
|
||||
1. **Gossip Protocol** (`cluster/gossip.go`) - Decentralized membership management and failure detection
|
||||
2. **Merkle Tree Sync** (`cluster/sync.go`, `cluster/merkle.go`) - Efficient data synchronization and conflict resolution
|
||||
3. **Modular Server** (`server/`) - HTTP API with pluggable feature modules
|
||||
|
||||
### Key Architectural Patterns
|
||||
|
||||
#### Modular Package Design
|
||||
- **`auth/`** - Complete JWT authentication system with POSIX-inspired permissions
|
||||
- **`cluster/`** - Distributed systems logic (gossip, sync, merkle trees)
|
||||
- **`storage/`** - BadgerDB abstraction with compression and revision history
|
||||
- **`server/`** - HTTP handlers, routing, and lifecycle management
|
||||
- **`features/`** - Utility functions for TTL, rate limiting, tamper logging, backup
|
||||
- **`types/`** - Centralized type definitions for all components
|
||||
- **`config/`** - Configuration loading with auto-generation
|
||||
- **`utils/`** - Cryptographic hashing utilities
|
||||
|
||||
#### Core Data Model
|
||||
```go
|
||||
// Primary storage format
|
||||
type StoredValue struct {
|
||||
UUID string `json:"uuid"` // Unique version identifier
|
||||
Timestamp int64 `json:"timestamp"` // Unix timestamp (milliseconds)
|
||||
Data json.RawMessage `json:"data"` // Actual user JSON payload
|
||||
}
|
||||
```
|
||||
|
||||
#### Critical System Interactions
|
||||
|
||||
**Conflict Resolution Flow:**
|
||||
1. Merkle trees detect divergent data between nodes (`cluster/merkle.go`)
|
||||
2. Sync service fetches conflicting keys (`cluster/sync.go:fetchAndCompareData`)
|
||||
3. Sophisticated conflict resolution logic in `resolveConflict()`:
|
||||
- Same timestamp → Apply "oldest-node rule" (earliest `joined_timestamp` wins)
|
||||
- Tie-breaker → UUID comparison for deterministic results
|
||||
- Winner's data automatically replicated to losing nodes
|
||||
|
||||
**Authentication & Authorization:**
|
||||
- JWT tokens with scoped permissions (`auth/jwt.go`)
|
||||
- POSIX-inspired 12-bit permission system (`types/types.go:52-75`)
|
||||
- Resource ownership metadata with TTL support (`types/ResourceMetadata`)
|
||||
|
||||
**Storage Strategy:**
|
||||
- **Main keys**: Direct path mapping (`users/john/profile`)
|
||||
- **Index keys**: `_ts:{timestamp}:{path}` for time-based queries
|
||||
- **Compression**: Optional ZSTD compression (`storage/compression.go`)
|
||||
- **Revisions**: Optional revision history (`storage/revision.go`)
|
||||
|
||||
### Configuration Architecture
|
||||
|
||||
The system uses feature toggles extensively (`types/Config:271-280`):
|
||||
```yaml
|
||||
auth_enabled: true # JWT authentication system
|
||||
tamper_logging_enabled: true # Cryptographic audit trail
|
||||
clustering_enabled: true # Gossip protocol and sync
|
||||
rate_limiting_enabled: true # Per-client rate limiting
|
||||
revision_history_enabled: true # Automatic versioning
|
||||
|
||||
# Anonymous access control (Issue #5 - when auth_enabled: true)
|
||||
allow_anonymous_read: false # Allow unauthenticated read access to KV endpoints
|
||||
allow_anonymous_write: false # Allow unauthenticated write access to KV endpoints
|
||||
```
|
||||
|
||||
**Security Note**: DELETE operations always require authentication when `auth_enabled: true`, regardless of anonymous access settings.
|
||||
|
||||
### Testing Strategy
|
||||
|
||||
#### Integration Test Suite (`integration_test.sh`)
|
||||
- **Build verification** - Ensures binary compiles correctly
|
||||
- **Basic functionality** - Single-node CRUD operations
|
||||
- **Cluster formation** - 2-node gossip protocol and data replication
|
||||
- **Conflict resolution** - Automated conflict detection and resolution using `test_conflict.go`
|
||||
- **Authentication middleware** - Comprehensive security testing (Issue #4):
|
||||
- Admin endpoints properly reject unauthenticated requests
|
||||
- Admin endpoints work with valid JWT tokens
|
||||
- KV endpoints respect anonymous access configuration
|
||||
- Automatic root account creation and token extraction
|
||||
|
||||
The test suite uses sophisticated retry logic and timing to handle the eventually consistent nature of the system.
|
||||
|
||||
#### Conflict Testing Utility (`test_conflict.go`)
|
||||
Creates two BadgerDB instances with intentionally conflicting data (same path, same timestamp, different UUIDs) to test the conflict resolution algorithm.
|
||||
|
||||
### Development Notes
|
||||
|
||||
#### Key Constraints
|
||||
- **Eventually Consistent**: All operations succeed locally first, then replicate
|
||||
- **Local-First Truth**: Nodes operate independently and sync in background
|
||||
- **No Transactions**: Each key operation is atomic and independent
|
||||
- **Hierarchical Keys**: Support for path-like structures (`/home/room/closet/socks`)
|
||||
|
||||
#### Critical Timing Considerations
|
||||
- **Gossip intervals**: 1-2 minutes for membership updates
|
||||
- **Sync intervals**: 5 minutes for regular data sync, 2 minutes for catch-up
|
||||
- **Conflict resolution**: Typically resolves within 10-30 seconds after detection
|
||||
- **Bootstrap sync**: Up to 30 days of historical data for new nodes
|
||||
|
||||
#### Main Entry Point Flow
|
||||
1. `main.go` loads config (auto-generates default if missing)
|
||||
2. `server.NewServer()` initializes all subsystems
|
||||
3. Graceful shutdown handling with `SIGINT`/`SIGTERM`
|
||||
4. All business logic delegated to modular packages
|
||||
|
||||
This architecture enables easy feature addition, comprehensive testing, and reliable operation in distributed environments while maintaining simplicity for single-node deployments.
|
||||
368
README.md
368
README.md
@@ -6,14 +6,12 @@ A minimalistic, clustered key-value database system written in Go that prioritiz
|
||||
|
||||
- **Hierarchical Keys**: Support for structured paths (e.g., `/home/room/closet/socks`)
|
||||
- **Eventual Consistency**: Local operations are fast, replication happens in background
|
||||
- **Merkle Tree Sync**: Efficient data synchronization with cryptographic integrity
|
||||
- **Sophisticated Conflict Resolution**: Oldest-node rule with UUID tie-breaking
|
||||
- **JWT Authentication**: Full authentication system with POSIX-inspired permissions
|
||||
- **Gossip Protocol**: Decentralized node discovery and failure detection
|
||||
- **Sophisticated Conflict Resolution**: Majority vote with oldest-node tie-breaking
|
||||
- **Local-First Truth**: All operations work locally first, sync globally later
|
||||
- **Read-Only Mode**: Configurable mode for reducing write load
|
||||
- **Modular Architecture**: Clean separation of concerns with feature toggles
|
||||
- **Comprehensive Features**: TTL support, rate limiting, tamper logging, automated backups
|
||||
- **Zero External Dependencies**: Single binary with embedded BadgerDB storage
|
||||
- **Gradual Bootstrapping**: New nodes integrate smoothly without overwhelming cluster
|
||||
- **Zero Dependencies**: Single binary with embedded BadgerDB storage
|
||||
|
||||
## 🏗️ Architecture
|
||||
|
||||
@@ -23,36 +21,24 @@ A minimalistic, clustered key-value database system written in Go that prioritiz
|
||||
│ (Go Service) │ │ (Go Service) │ │ (Go Service) │
|
||||
│ │ │ │ │ │
|
||||
│ ┌─────────────┐ │ │ ┌─────────────┐ │ │ ┌─────────────┐ │
|
||||
│ │HTTP API+Auth│ │◄──►│ │HTTP API+Auth│ │◄──►│ │HTTP API+Auth│ │
|
||||
│ │ HTTP Server │ │◄──►│ │ HTTP Server │ │◄──►│ │ HTTP Server │ │
|
||||
│ │ (API) │ │ │ │ (API) │ │ │ │ (API) │ │
|
||||
│ └─────────────┘ │ │ └─────────────┘ │ │ └─────────────┘ │
|
||||
│ ┌─────────────┐ │ │ ┌─────────────┐ │ │ ┌─────────────┐ │
|
||||
│ │ Gossip │ │◄──►│ │ Gossip │ │◄──►│ │ Gossip │ │
|
||||
│ │ Protocol │ │ │ │ Protocol │ │ │ │ Protocol │ │
|
||||
│ └─────────────┘ │ │ └─────────────┘ │ │ └─────────────┘ │
|
||||
│ ┌─────────────┐ │ │ ┌─────────────┐ │ │ ┌─────────────┐ │
|
||||
│ │Merkle Sync │ │◄──►│ │Merkle Sync │ │◄──►│ │Merkle Sync │ │
|
||||
│ │& Conflict │ │ │ │& Conflict │ │ │ │& Conflict │ │
|
||||
│ │ Resolution │ │ │ │ Resolution │ │ │ │ Resolution │ │
|
||||
│ └─────────────┘ │ │ └─────────────┘ │ │ └─────────────┘ │
|
||||
│ ┌─────────────┐ │ │ ┌─────────────┐ │ │ ┌─────────────┐ │
|
||||
│ │Storage+ │ │ │ │Storage+ │ │ │ │Storage+ │ │
|
||||
│ │Features │ │ │ │Features │ │ │ │Features │ │
|
||||
│ │(BadgerDB) │ │ │ │(BadgerDB) │ │ │ │(BadgerDB) │ │
|
||||
│ │ BadgerDB │ │ │ │ BadgerDB │ │ │ │ BadgerDB │ │
|
||||
│ │ (Local KV) │ │ │ │ (Local KV) │ │ │ │ (Local KV) │ │
|
||||
│ └─────────────┘ │ │ └─────────────┘ │ │ └─────────────┘ │
|
||||
└─────────────────┘ └─────────────────┘ └─────────────────┘
|
||||
▲
|
||||
│
|
||||
External Clients (JWT Auth)
|
||||
External Clients
|
||||
```
|
||||
|
||||
### Modular Design
|
||||
KVS features a clean modular architecture with dedicated packages:
|
||||
- **`auth/`** - JWT authentication and POSIX-inspired permissions
|
||||
- **`cluster/`** - Gossip protocol, Merkle tree sync, and conflict resolution
|
||||
- **`storage/`** - BadgerDB abstraction with compression and revisions
|
||||
- **`server/`** - HTTP API, routing, and lifecycle management
|
||||
- **`features/`** - TTL, rate limiting, tamper logging, backup utilities
|
||||
- **`config/`** - Configuration management with auto-generation
|
||||
Each node is fully autonomous and communicates with peers via HTTP REST API for both external client requests and internal cluster operations.
|
||||
|
||||
## 📦 Installation
|
||||
|
||||
@@ -81,47 +67,20 @@ curl http://localhost:8080/health
|
||||
KVS uses YAML configuration files. On first run, a default `config.yaml` is automatically generated:
|
||||
|
||||
```yaml
|
||||
node_id: "hostname" # Unique node identifier
|
||||
bind_address: "127.0.0.1" # IP address to bind to
|
||||
port: 8080 # HTTP port
|
||||
data_dir: "./data" # Directory for BadgerDB storage
|
||||
seed_nodes: [] # List of seed nodes for cluster joining
|
||||
read_only: false # Enable read-only mode
|
||||
log_level: "info" # Logging level (debug, info, warn, error)
|
||||
|
||||
# Cluster timing configuration
|
||||
gossip_interval_min: 60 # Min gossip interval (seconds)
|
||||
gossip_interval_max: 120 # Max gossip interval (seconds)
|
||||
sync_interval: 300 # Regular sync interval (seconds)
|
||||
catchup_interval: 120 # Catch-up sync interval (seconds)
|
||||
bootstrap_max_age_hours: 720 # Max age for bootstrap sync (hours)
|
||||
throttle_delay_ms: 100 # Delay between sync requests (ms)
|
||||
fetch_delay_ms: 50 # Delay between data fetches (ms)
|
||||
|
||||
# Feature configuration
|
||||
compression_enabled: true # Enable ZSTD compression
|
||||
compression_level: 3 # Compression level (1-19)
|
||||
default_ttl: "0" # Default TTL ("0" = no expiry)
|
||||
max_json_size: 1048576 # Max JSON payload size (1MB)
|
||||
rate_limit_requests: 100 # Requests per window
|
||||
rate_limit_window: "1m" # Rate limit window
|
||||
|
||||
# Feature toggles
|
||||
auth_enabled: true # JWT authentication system
|
||||
tamper_logging_enabled: true # Cryptographic audit trail
|
||||
clustering_enabled: true # Gossip protocol and sync
|
||||
rate_limiting_enabled: true # Rate limiting
|
||||
revision_history_enabled: true # Automatic versioning
|
||||
|
||||
# Anonymous access control (when auth_enabled: true)
|
||||
allow_anonymous_read: false # Allow unauthenticated read access to KV endpoints
|
||||
allow_anonymous_write: false # Allow unauthenticated write access to KV endpoints
|
||||
|
||||
# Backup configuration
|
||||
backup_enabled: true # Automated backups
|
||||
backup_schedule: "0 0 * * *" # Daily at midnight (cron format)
|
||||
backup_path: "./backups" # Backup directory
|
||||
backup_retention: 7 # Days to keep backups
|
||||
node_id: "hostname" # Unique node identifier
|
||||
bind_address: "127.0.0.1" # IP address to bind to
|
||||
port: 8080 # HTTP port
|
||||
data_dir: "./data" # Directory for BadgerDB storage
|
||||
seed_nodes: [] # List of seed nodes for cluster joining
|
||||
read_only: false # Enable read-only mode
|
||||
log_level: "info" # Logging level (debug, info, warn, error)
|
||||
gossip_interval_min: 60 # Min gossip interval (seconds)
|
||||
gossip_interval_max: 120 # Max gossip interval (seconds)
|
||||
sync_interval: 300 # Regular sync interval (seconds)
|
||||
catchup_interval: 120 # Catch-up sync interval (seconds)
|
||||
bootstrap_max_age_hours: 720 # Max age for bootstrap sync (hours)
|
||||
throttle_delay_ms: 100 # Delay between sync requests (ms)
|
||||
fetch_delay_ms: 50 # Delay between data fetches (ms)
|
||||
```
|
||||
|
||||
### Custom Configuration
|
||||
@@ -138,20 +97,11 @@ backup_retention: 7 # Days to keep backups
|
||||
```bash
|
||||
PUT /kv/{path}
|
||||
Content-Type: application/json
|
||||
Authorization: Bearer <jwt-token> # Required if auth_enabled && !allow_anonymous_write
|
||||
|
||||
# Basic storage
|
||||
curl -X PUT http://localhost:8080/kv/users/john/profile \
|
||||
-H "Content-Type: application/json" \
|
||||
-H "Authorization: Bearer eyJ..." \
|
||||
-d '{"name":"John Doe","age":30,"email":"john@example.com"}'
|
||||
|
||||
# Storage with TTL
|
||||
curl -X PUT http://localhost:8080/kv/cache/session/abc123 \
|
||||
-H "Content-Type: application/json" \
|
||||
-H "Authorization: Bearer eyJ..." \
|
||||
-d '{"data":{"user_id":"john"}, "ttl":"1h"}'
|
||||
|
||||
# Response
|
||||
{
|
||||
"uuid": "a1b2c3d4-e5f6-7890-1234-567890abcdef",
|
||||
@@ -162,62 +112,25 @@ curl -X PUT http://localhost:8080/kv/cache/session/abc123 \
|
||||
#### Retrieve Data
|
||||
```bash
|
||||
GET /kv/{path}
|
||||
Authorization: Bearer <jwt-token> # Required if auth_enabled && !allow_anonymous_read
|
||||
|
||||
curl -H "Authorization: Bearer eyJ..." http://localhost:8080/kv/users/john/profile
|
||||
curl http://localhost:8080/kv/users/john/profile
|
||||
|
||||
# Response (full StoredValue format)
|
||||
# Response
|
||||
{
|
||||
"uuid": "a1b2c3d4-e5f6-7890-1234-567890abcdef",
|
||||
"timestamp": 1672531200000,
|
||||
"data": {
|
||||
"name": "John Doe",
|
||||
"age": 30,
|
||||
"email": "john@example.com"
|
||||
}
|
||||
"name": "John Doe",
|
||||
"age": 30,
|
||||
"email": "john@example.com"
|
||||
}
|
||||
```
|
||||
|
||||
#### Delete Data
|
||||
```bash
|
||||
DELETE /kv/{path}
|
||||
Authorization: Bearer <jwt-token> # Always required when auth_enabled (no anonymous delete)
|
||||
|
||||
curl -X DELETE -H "Authorization: Bearer eyJ..." http://localhost:8080/kv/users/john/profile
|
||||
curl -X DELETE http://localhost:8080/kv/users/john/profile
|
||||
# Returns: 204 No Content
|
||||
```
|
||||
|
||||
### Authentication Operations (`/auth/`)
|
||||
|
||||
#### Create User
|
||||
```bash
|
||||
POST /auth/users
|
||||
Content-Type: application/json
|
||||
|
||||
curl -X POST http://localhost:8080/auth/users \
|
||||
-H "Content-Type: application/json" \
|
||||
-d '{"nickname":"john"}'
|
||||
|
||||
# Response
|
||||
{"uuid": "user-abc123"}
|
||||
```
|
||||
|
||||
#### Create API Token
|
||||
```bash
|
||||
POST /auth/tokens
|
||||
Content-Type: application/json
|
||||
|
||||
curl -X POST http://localhost:8080/auth/tokens \
|
||||
-H "Content-Type: application/json" \
|
||||
-d '{"user_uuid":"user-abc123", "scopes":["read","write"]}'
|
||||
|
||||
# Response
|
||||
{
|
||||
"token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9...",
|
||||
"expires_at": 1672617600000
|
||||
}
|
||||
```
|
||||
|
||||
### Cluster Operations (`/members/`)
|
||||
|
||||
#### View Cluster Members
|
||||
@@ -236,6 +149,12 @@ curl http://localhost:8080/members/
|
||||
]
|
||||
```
|
||||
|
||||
#### Join Cluster (Internal)
|
||||
```bash
|
||||
POST /members/join
|
||||
# Used internally during bootstrap process
|
||||
```
|
||||
|
||||
#### Health Check
|
||||
```bash
|
||||
GET /health
|
||||
@@ -250,20 +169,6 @@ curl http://localhost:8080/health
|
||||
}
|
||||
```
|
||||
|
||||
### Merkle Tree Operations (`/sync/`)
|
||||
|
||||
#### Get Merkle Root
|
||||
```bash
|
||||
GET /sync/merkle/root
|
||||
# Used internally for data synchronization
|
||||
```
|
||||
|
||||
#### Range Queries
|
||||
```bash
|
||||
GET /kv/_range?start_key=users/&end_key=users/z&limit=100
|
||||
# Fetch key ranges for synchronization
|
||||
```
|
||||
|
||||
## 🏘️ Cluster Setup
|
||||
|
||||
### Single Node (Standalone)
|
||||
@@ -282,8 +187,6 @@ seed_nodes: [] # Empty = standalone mode
|
||||
node_id: "node1"
|
||||
port: 8081
|
||||
seed_nodes: [] # First node, no seeds needed
|
||||
auth_enabled: true
|
||||
clustering_enabled: true
|
||||
```
|
||||
|
||||
#### Node 2 (Joins via Node 1)
|
||||
@@ -292,8 +195,6 @@ clustering_enabled: true
|
||||
node_id: "node2"
|
||||
port: 8082
|
||||
seed_nodes: ["127.0.0.1:8081"] # Points to node1
|
||||
auth_enabled: true
|
||||
clustering_enabled: true
|
||||
```
|
||||
|
||||
#### Node 3 (Joins via Node 1 & 2)
|
||||
@@ -302,8 +203,6 @@ clustering_enabled: true
|
||||
node_id: "node3"
|
||||
port: 8083
|
||||
seed_nodes: ["127.0.0.1:8081", "127.0.0.1:8082"] # Multiple seeds for reliability
|
||||
auth_enabled: true
|
||||
clustering_enabled: true
|
||||
```
|
||||
|
||||
#### Start the Cluster
|
||||
@@ -316,9 +215,6 @@ clustering_enabled: true
|
||||
|
||||
# Terminal 3 (wait a few seconds)
|
||||
./kvs node3.yaml
|
||||
|
||||
# Verify cluster formation
|
||||
curl http://localhost:8081/members/ # Should show all 3 nodes
|
||||
```
|
||||
|
||||
## 🔄 How It Works
|
||||
@@ -328,30 +224,20 @@ curl http://localhost:8081/members/ # Should show all 3 nodes
|
||||
- Failed nodes are detected via timeout (5 minutes) and removed (10 minutes)
|
||||
- New members are automatically discovered and added to local member lists
|
||||
|
||||
### Merkle Tree Synchronization
|
||||
- **Merkle Trees**: Each node builds cryptographic trees of their data for efficient comparison
|
||||
- **Regular Sync**: Every 5 minutes, nodes compare Merkle roots and sync divergent branches
|
||||
### Data Synchronization
|
||||
- **Regular Sync**: Every 5 minutes, nodes compare their latest 15 data items with a random peer
|
||||
- **Catch-up Sync**: Every 2 minutes when nodes detect they're significantly behind
|
||||
- **Bootstrap Sync**: New nodes gradually fetch historical data up to 30 days old
|
||||
- **Efficient Detection**: Only synchronizes actual differences, not entire datasets
|
||||
|
||||
### Sophisticated Conflict Resolution
|
||||
### Conflict Resolution
|
||||
When two nodes have different data for the same key with identical timestamps:
|
||||
|
||||
1. **Detection**: Merkle tree comparison identifies conflicting keys
|
||||
2. **Oldest-Node Rule**: The version from the node with earliest `joined_timestamp` wins
|
||||
3. **UUID Tie-Breaker**: If join times are identical, lexicographically smaller UUID wins
|
||||
4. **Automatic Resolution**: Losing nodes automatically fetch and store the winning version
|
||||
5. **Consistency**: All nodes converge to the same data within seconds
|
||||
|
||||
### Authentication & Authorization
|
||||
- **JWT Tokens**: Secure API access with scoped permissions
|
||||
- **POSIX-Inspired ACLs**: 12-bit permission system (owner/group/others with create/delete/write/read)
|
||||
- **Resource Metadata**: Each stored item has ownership and permission information
|
||||
- **Feature Toggle**: Can be completely disabled for simpler deployments
|
||||
1. **Majority Vote**: Query all healthy cluster members for their version
|
||||
2. **Tie-Breaker**: If votes are tied, the version from the oldest node (earliest `joined_timestamp`) wins
|
||||
3. **Automatic Resolution**: Losing nodes automatically fetch and store the winning version
|
||||
|
||||
### Operational Modes
|
||||
- **Normal**: Full read/write capabilities with all features
|
||||
- **Normal**: Full read/write capabilities
|
||||
- **Read-Only**: Rejects external writes but accepts internal replication
|
||||
- **Syncing**: Temporary mode during bootstrap, rejects external writes
|
||||
|
||||
@@ -359,146 +245,57 @@ When two nodes have different data for the same key with identical timestamps:
|
||||
|
||||
### Running Tests
|
||||
```bash
|
||||
# Build and run comprehensive integration tests
|
||||
# Basic functionality test
|
||||
go build -o kvs .
|
||||
./integration_test.sh
|
||||
|
||||
# Manual basic functionality test
|
||||
./kvs &
|
||||
curl http://localhost:8080/health
|
||||
pkill kvs
|
||||
|
||||
# Manual cluster test (requires creating configs)
|
||||
echo 'node_id: "test1"
|
||||
port: 8081
|
||||
seed_nodes: []
|
||||
auth_enabled: false' > test1.yaml
|
||||
# Cluster test with provided configs
|
||||
./kvs node1.yaml &
|
||||
./kvs node2.yaml &
|
||||
./kvs node3.yaml &
|
||||
|
||||
echo 'node_id: "test2"
|
||||
port: 8082
|
||||
seed_nodes: ["127.0.0.1:8081"]
|
||||
auth_enabled: false' > test2.yaml
|
||||
|
||||
./kvs test1.yaml &
|
||||
./kvs test2.yaml &
|
||||
|
||||
# Test data replication (wait for cluster formation)
|
||||
sleep 10
|
||||
# Test data replication
|
||||
curl -X PUT http://localhost:8081/kv/test/data \
|
||||
-H "Content-Type: application/json" \
|
||||
-d '{"message":"hello world"}'
|
||||
|
||||
# Wait for Merkle sync, then check replication
|
||||
sleep 30
|
||||
# Wait 30+ seconds for sync, then check other nodes
|
||||
curl http://localhost:8082/kv/test/data
|
||||
curl http://localhost:8083/kv/test/data
|
||||
|
||||
# Cleanup
|
||||
pkill kvs
|
||||
rm test1.yaml test2.yaml
|
||||
```
|
||||
|
||||
### Conflict Resolution Testing
|
||||
```bash
|
||||
# Create conflicting data scenario using utility
|
||||
go run test_conflict.go /tmp/conflict1 /tmp/conflict2
|
||||
|
||||
# Create configs for conflict test
|
||||
echo 'node_id: "conflict1"
|
||||
port: 9111
|
||||
data_dir: "/tmp/conflict1"
|
||||
seed_nodes: []
|
||||
auth_enabled: false
|
||||
log_level: "debug"' > conflict1.yaml
|
||||
|
||||
echo 'node_id: "conflict2"
|
||||
port: 9112
|
||||
data_dir: "/tmp/conflict2"
|
||||
seed_nodes: ["127.0.0.1:9111"]
|
||||
auth_enabled: false
|
||||
log_level: "debug"' > conflict2.yaml
|
||||
# Create conflicting data scenario
|
||||
rm -rf data1 data2
|
||||
mkdir data1 data2
|
||||
go run test_conflict.go data1 data2
|
||||
|
||||
# Start nodes with conflicting data
|
||||
./kvs conflict1.yaml &
|
||||
./kvs conflict2.yaml &
|
||||
./kvs node1.yaml &
|
||||
./kvs node2.yaml &
|
||||
|
||||
# Watch logs for conflict resolution
|
||||
# Both nodes will converge within ~10-30 seconds
|
||||
# Check final state
|
||||
sleep 30
|
||||
curl http://localhost:9111/kv/test/conflict/data
|
||||
curl http://localhost:9112/kv/test/conflict/data
|
||||
|
||||
pkill kvs
|
||||
rm conflict1.yaml conflict2.yaml
|
||||
```
|
||||
|
||||
### Code Quality
|
||||
```bash
|
||||
# Format and lint
|
||||
go fmt ./...
|
||||
go vet ./...
|
||||
|
||||
# Dependency management
|
||||
go mod tidy
|
||||
go mod verify
|
||||
|
||||
# Build verification
|
||||
go build .
|
||||
# Both nodes will converge to same data within ~30 seconds
|
||||
```
|
||||
|
||||
### Project Structure
|
||||
```
|
||||
kvs/
|
||||
├── main.go # Main application entry point
|
||||
├── config.yaml # Default configuration (auto-generated)
|
||||
├── integration_test.sh # Comprehensive test suite
|
||||
├── test_conflict.go # Conflict resolution testing utility
|
||||
├── CLAUDE.md # Development guidance for Claude Code
|
||||
├── go.mod # Go module dependencies
|
||||
├── go.sum # Go module checksums
|
||||
├── README.md # This documentation
|
||||
│
|
||||
├── auth/ # Authentication & authorization
|
||||
│ ├── auth.go # Main auth logic
|
||||
│ ├── jwt.go # JWT token management
|
||||
│ ├── middleware.go # HTTP middleware
|
||||
│ ├── permissions.go # POSIX-inspired ACL system
|
||||
│ └── storage.go # Auth data storage
|
||||
│
|
||||
├── cluster/ # Distributed systems components
|
||||
│ ├── bootstrap.go # New node integration
|
||||
│ ├── gossip.go # Membership protocol
|
||||
│ ├── merkle.go # Merkle tree implementation
|
||||
│ └── sync.go # Data synchronization & conflict resolution
|
||||
│
|
||||
├── config/ # Configuration management
|
||||
│ └── config.go # Config loading & defaults
|
||||
│
|
||||
├── features/ # Utility features
|
||||
│ ├── auth.go # Auth utilities
|
||||
│ ├── backup.go # Backup system
|
||||
│ ├── features.go # Feature toggles
|
||||
│ ├── ratelimit.go # Rate limiting
|
||||
│ ├── revision.go # Revision history
|
||||
│ ├── tamperlog.go # Tamper-evident logging
|
||||
│ └── validation.go # TTL parsing
|
||||
│
|
||||
├── server/ # HTTP server & API
|
||||
│ ├── handlers.go # Request handlers
|
||||
│ ├── lifecycle.go # Server lifecycle
|
||||
│ ├── routes.go # Route definitions
|
||||
│ └── server.go # Server setup
|
||||
│
|
||||
├── storage/ # Data storage abstraction
|
||||
│ ├── compression.go # ZSTD compression
|
||||
│ ├── revision.go # Revision history
|
||||
│ └── storage.go # BadgerDB interface
|
||||
│
|
||||
├── types/ # Shared type definitions
|
||||
│ └── types.go # All data structures
|
||||
│
|
||||
└── utils/ # Utilities
|
||||
└── hash.go # Cryptographic hashing
|
||||
├── main.go # Main application with all functionality
|
||||
├── config.yaml # Default configuration (auto-generated)
|
||||
├── test_conflict.go # Conflict resolution testing utility
|
||||
├── node1.yaml # Example cluster node config
|
||||
├── node2.yaml # Example cluster node config
|
||||
├── node3.yaml # Example cluster node config
|
||||
├── go.mod # Go module dependencies
|
||||
├── go.sum # Go module checksums
|
||||
└── README.md # This documentation
|
||||
```
|
||||
|
||||
### Key Data Structures
|
||||
@@ -521,7 +318,6 @@ type StoredValue struct {
|
||||
|
||||
| Setting | Description | Default | Notes |
|
||||
|---------|-------------|---------|-------|
|
||||
| **Core Settings** |
|
||||
| `node_id` | Unique identifier for this node | hostname | Must be unique across cluster |
|
||||
| `bind_address` | IP address to bind HTTP server | "127.0.0.1" | Use 0.0.0.0 for external access |
|
||||
| `port` | HTTP port for API and cluster communication | 8080 | Must be accessible to peers |
|
||||
@@ -529,20 +325,8 @@ type StoredValue struct {
|
||||
| `seed_nodes` | List of initial cluster nodes | [] | Empty = standalone mode |
|
||||
| `read_only` | Enable read-only mode | false | Accepts replication, rejects client writes |
|
||||
| `log_level` | Logging verbosity | "info" | debug/info/warn/error |
|
||||
| **Cluster Timing** |
|
||||
| `gossip_interval_min/max` | Gossip frequency range | 60-120 sec | Randomized interval |
|
||||
| `sync_interval` | Regular Merkle sync frequency | 300 sec | How often to sync with peers |
|
||||
| `catchup_interval` | Catch-up sync frequency | 120 sec | Faster sync when behind |
|
||||
| `bootstrap_max_age_hours` | Max historical data to sync | 720 hours | 30 days default |
|
||||
| **Feature Toggles** |
|
||||
| `auth_enabled` | JWT authentication system | true | Complete auth/authz system |
|
||||
| `allow_anonymous_read` | Allow unauthenticated read access | false | When auth_enabled, controls KV GET endpoints |
|
||||
| `allow_anonymous_write` | Allow unauthenticated write access | false | When auth_enabled, controls KV PUT endpoints |
|
||||
| `clustering_enabled` | Gossip protocol and sync | true | Distributed mode |
|
||||
| `compression_enabled` | ZSTD compression | true | Reduces storage size |
|
||||
| `rate_limiting_enabled` | Rate limiting | true | Per-client limits |
|
||||
| `tamper_logging_enabled` | Cryptographic audit trail | true | Security logging |
|
||||
| `revision_history_enabled` | Automatic versioning | true | Data history tracking |
|
||||
| `sync_interval` | Regular sync frequency | 300 sec | How often to sync with peers |
|
||||
| `catchup_interval` | Catch-up sync frequency | 120 sec | Faster sync when behind |
|
||||
| `bootstrap_max_age_hours` | Max historical data to sync | 720 hours | 30 days default |
|
||||
| `throttle_delay_ms` | Delay between sync requests | 100 ms | Prevents overwhelming peers |
|
||||
@@ -562,20 +346,18 @@ type StoredValue struct {
|
||||
- IPv4 private networks supported (IPv6 not tested)
|
||||
|
||||
### Limitations
|
||||
- No authentication/authorization (planned for future releases)
|
||||
- No encryption in transit (use reverse proxy for TLS)
|
||||
- No cross-key transactions or ACID guarantees
|
||||
- No cross-key transactions
|
||||
- No complex queries (key-based lookups only)
|
||||
- No automatic data sharding (single keyspace per cluster)
|
||||
- No multi-datacenter replication
|
||||
- No data compression (planned for future releases)
|
||||
|
||||
### Performance Characteristics
|
||||
- **Read Latency**: ~1ms (local BadgerDB lookup)
|
||||
- **Write Latency**: ~5ms (local write + indexing + optional compression)
|
||||
- **Replication Lag**: 10-30 seconds with Merkle tree sync
|
||||
- **Memory Usage**: Minimal (BadgerDB + Merkle tree caching)
|
||||
- **Disk Usage**: Raw JSON + metadata + optional compression (10-50% savings)
|
||||
- **Conflict Resolution**: Sub-second convergence time
|
||||
- **Cluster Formation**: ~10-20 seconds for gossip stabilization
|
||||
- **Write Latency**: ~5ms (local write + timestamp indexing)
|
||||
- **Replication Lag**: 30 seconds - 5 minutes depending on sync cycles
|
||||
- **Memory Usage**: Minimal (BadgerDB handles caching efficiently)
|
||||
- **Disk Usage**: Raw JSON + metadata overhead (~20-30%)
|
||||
|
||||
## 🛡️ Production Considerations
|
||||
|
||||
|
||||
26
auth/auth.go
26
auth/auth.go
@@ -26,15 +26,13 @@ type AuthContext struct {
|
||||
type AuthService struct {
|
||||
db *badger.DB
|
||||
logger *logrus.Logger
|
||||
config *types.Config
|
||||
}
|
||||
|
||||
// NewAuthService creates a new authentication service
|
||||
func NewAuthService(db *badger.DB, logger *logrus.Logger, config *types.Config) *AuthService {
|
||||
func NewAuthService(db *badger.DB, logger *logrus.Logger) *AuthService {
|
||||
return &AuthService{
|
||||
db: db,
|
||||
logger: logger,
|
||||
config: config,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -208,23 +206,17 @@ func GetAuthContext(ctx context.Context) *AuthContext {
|
||||
|
||||
// HasUsers checks if any users exist in the database
|
||||
func (s *AuthService) HasUsers() (bool, error) {
|
||||
var hasUsers bool
|
||||
|
||||
var has bool
|
||||
err := s.db.View(func(txn *badger.Txn) error {
|
||||
opts := badger.DefaultIteratorOptions
|
||||
opts.PrefetchValues = false // We only need to check if keys exist
|
||||
iterator := txn.NewIterator(opts)
|
||||
defer iterator.Close()
|
||||
|
||||
// Look for any key starting with "user:"
|
||||
prefix := []byte("user:")
|
||||
for iterator.Seek(prefix); iterator.ValidForPrefix(prefix); iterator.Next() {
|
||||
hasUsers = true
|
||||
return nil // Found at least one user, can exit early
|
||||
}
|
||||
opts.PrefetchValues = false // Only need keys
|
||||
it := txn.NewIterator(opts)
|
||||
defer it.Close()
|
||||
|
||||
prefix := []byte("user:") // Adjust if UserStorageKey uses a different prefix
|
||||
it.Seek(prefix)
|
||||
has = it.ValidForPrefix(prefix)
|
||||
return nil
|
||||
})
|
||||
|
||||
return hasUsers, err
|
||||
return has, err
|
||||
}
|
||||
|
||||
@@ -1,77 +0,0 @@
|
||||
package auth
|
||||
|
||||
import (
|
||||
"net/http"
|
||||
|
||||
"github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
// ClusterAuthService handles authentication for inter-cluster communication
|
||||
type ClusterAuthService struct {
|
||||
clusterSecret string
|
||||
logger *logrus.Logger
|
||||
}
|
||||
|
||||
// NewClusterAuthService creates a new cluster authentication service
|
||||
func NewClusterAuthService(clusterSecret string, logger *logrus.Logger) *ClusterAuthService {
|
||||
return &ClusterAuthService{
|
||||
clusterSecret: clusterSecret,
|
||||
logger: logger,
|
||||
}
|
||||
}
|
||||
|
||||
// Middleware validates cluster authentication headers
|
||||
func (s *ClusterAuthService) Middleware(next http.Handler) http.Handler {
|
||||
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
// Extract authentication headers
|
||||
clusterSecret := r.Header.Get("X-Cluster-Secret")
|
||||
nodeID := r.Header.Get("X-Node-ID")
|
||||
|
||||
// Log authentication attempt
|
||||
s.logger.WithFields(logrus.Fields{
|
||||
"node_id": nodeID,
|
||||
"remote_addr": r.RemoteAddr,
|
||||
"path": r.URL.Path,
|
||||
"method": r.Method,
|
||||
}).Debug("Cluster authentication attempt")
|
||||
|
||||
// Validate cluster secret
|
||||
if clusterSecret == "" {
|
||||
s.logger.WithFields(logrus.Fields{
|
||||
"node_id": nodeID,
|
||||
"remote_addr": r.RemoteAddr,
|
||||
"path": r.URL.Path,
|
||||
}).Warn("Missing X-Cluster-Secret header")
|
||||
http.Error(w, "Unauthorized: Missing cluster secret", http.StatusUnauthorized)
|
||||
return
|
||||
}
|
||||
|
||||
if clusterSecret != s.clusterSecret {
|
||||
s.logger.WithFields(logrus.Fields{
|
||||
"node_id": nodeID,
|
||||
"remote_addr": r.RemoteAddr,
|
||||
"path": r.URL.Path,
|
||||
}).Warn("Invalid cluster secret")
|
||||
http.Error(w, "Unauthorized: Invalid cluster secret", http.StatusUnauthorized)
|
||||
return
|
||||
}
|
||||
|
||||
// Validate node ID is present
|
||||
if nodeID == "" {
|
||||
s.logger.WithFields(logrus.Fields{
|
||||
"remote_addr": r.RemoteAddr,
|
||||
"path": r.URL.Path,
|
||||
}).Warn("Missing X-Node-ID header")
|
||||
http.Error(w, "Unauthorized: Missing node ID", http.StatusUnauthorized)
|
||||
return
|
||||
}
|
||||
|
||||
// Authentication successful
|
||||
s.logger.WithFields(logrus.Fields{
|
||||
"node_id": nodeID,
|
||||
"path": r.URL.Path,
|
||||
}).Debug("Cluster authentication successful")
|
||||
|
||||
next.ServeHTTP(w, r)
|
||||
})
|
||||
}
|
||||
@@ -64,4 +64,4 @@ func ValidateJWT(tokenString string) (*JWTClaims, error) {
|
||||
}
|
||||
|
||||
return nil, fmt.Errorf("invalid token")
|
||||
}
|
||||
}
|
||||
@@ -33,7 +33,7 @@ func (s *AuthService) Middleware(requiredScopes []string, resourceKeyExtractor f
|
||||
next(w, r)
|
||||
return
|
||||
}
|
||||
|
||||
|
||||
// Authenticate request
|
||||
authCtx, err := s.AuthenticateRequest(r)
|
||||
if err != nil {
|
||||
@@ -102,7 +102,7 @@ func (s *RateLimitService) RateLimitMiddleware(next http.HandlerFunc) http.Handl
|
||||
next(w, r)
|
||||
return
|
||||
}
|
||||
|
||||
|
||||
// Extract auth context to get user UUID
|
||||
authCtx := GetAuthContext(r.Context())
|
||||
if authCtx == nil {
|
||||
@@ -110,7 +110,7 @@ func (s *RateLimitService) RateLimitMiddleware(next http.HandlerFunc) http.Handl
|
||||
next(w, r)
|
||||
return
|
||||
}
|
||||
|
||||
|
||||
// Check rate limit
|
||||
allowed, err := s.checkRateLimit(authCtx.UserUUID)
|
||||
if err != nil {
|
||||
@@ -118,32 +118,31 @@ func (s *RateLimitService) RateLimitMiddleware(next http.HandlerFunc) http.Handl
|
||||
http.Error(w, "Internal Server Error", http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
|
||||
if !allowed {
|
||||
s.authService.logger.WithFields(logrus.Fields{
|
||||
"user_uuid": authCtx.UserUUID,
|
||||
"limit": s.config.RateLimitRequests,
|
||||
"window": s.config.RateLimitWindow,
|
||||
}).Info("Rate limit exceeded")
|
||||
|
||||
|
||||
// Set rate limit headers
|
||||
w.Header().Set("X-Rate-Limit-Limit", strconv.Itoa(s.config.RateLimitRequests))
|
||||
w.Header().Set("X-Rate-Limit-Window", s.config.RateLimitWindow)
|
||||
|
||||
|
||||
http.Error(w, "Rate limit exceeded", http.StatusTooManyRequests)
|
||||
return
|
||||
}
|
||||
|
||||
|
||||
next(w, r)
|
||||
}
|
||||
}
|
||||
|
||||
// isAuthEnabled checks if authentication is enabled from config
|
||||
// isAuthEnabled checks if authentication is enabled (would be passed from config)
|
||||
func (s *AuthService) isAuthEnabled() bool {
|
||||
if s.config != nil {
|
||||
return s.config.AuthEnabled
|
||||
}
|
||||
return true // Default to enabled if no config
|
||||
// This would normally be injected from config, but for now we'll assume enabled
|
||||
// TODO: Inject config dependency
|
||||
return true
|
||||
}
|
||||
|
||||
// Helper method to check rate limits (simplified version)
|
||||
@@ -151,8 +150,8 @@ func (s *RateLimitService) checkRateLimit(userUUID string) (bool, error) {
|
||||
if s.config.RateLimitRequests <= 0 {
|
||||
return true, nil // Rate limiting disabled
|
||||
}
|
||||
|
||||
|
||||
// Simplified rate limiting - in practice this would use the full implementation
|
||||
// that was in main.go with proper window calculations and BadgerDB storage
|
||||
return true, nil // For now, always allow
|
||||
}
|
||||
}
|
||||
@@ -15,7 +15,7 @@ func CheckPermission(permissions int, operation string, isOwner, isGroupMember b
|
||||
return (permissions & types.PermGroupCreate) != 0
|
||||
}
|
||||
return (permissions & types.PermOthersCreate) != 0
|
||||
|
||||
|
||||
case "delete":
|
||||
if isOwner {
|
||||
return (permissions & types.PermOwnerDelete) != 0
|
||||
@@ -24,7 +24,7 @@ func CheckPermission(permissions int, operation string, isOwner, isGroupMember b
|
||||
return (permissions & types.PermGroupDelete) != 0
|
||||
}
|
||||
return (permissions & types.PermOthersDelete) != 0
|
||||
|
||||
|
||||
case "write":
|
||||
if isOwner {
|
||||
return (permissions & types.PermOwnerWrite) != 0
|
||||
@@ -33,7 +33,7 @@ func CheckPermission(permissions int, operation string, isOwner, isGroupMember b
|
||||
return (permissions & types.PermGroupWrite) != 0
|
||||
}
|
||||
return (permissions & types.PermOthersWrite) != 0
|
||||
|
||||
|
||||
case "read":
|
||||
if isOwner {
|
||||
return (permissions & types.PermOwnerRead) != 0
|
||||
@@ -42,7 +42,7 @@ func CheckPermission(permissions int, operation string, isOwner, isGroupMember b
|
||||
return (permissions & types.PermGroupRead) != 0
|
||||
}
|
||||
return (permissions & types.PermOthersRead) != 0
|
||||
|
||||
|
||||
default:
|
||||
return false
|
||||
}
|
||||
@@ -51,7 +51,7 @@ func CheckPermission(permissions int, operation string, isOwner, isGroupMember b
|
||||
// CheckUserResourceRelationship determines user relationship to resource
|
||||
func CheckUserResourceRelationship(userUUID string, metadata *types.ResourceMetadata, userGroups []string) (isOwner, isGroupMember bool) {
|
||||
isOwner = (userUUID == metadata.OwnerUUID)
|
||||
|
||||
|
||||
if metadata.GroupUUID != "" {
|
||||
for _, groupUUID := range userGroups {
|
||||
if groupUUID == metadata.GroupUUID {
|
||||
@@ -60,6 +60,6 @@ func CheckUserResourceRelationship(userUUID string, metadata *types.ResourceMeta
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
return isOwner, isGroupMember
|
||||
}
|
||||
}
|
||||
@@ -16,4 +16,4 @@ func TokenStorageKey(tokenHash string) string {
|
||||
|
||||
func ResourceMetadataKey(resourceKey string) string {
|
||||
return resourceKey + ":metadata"
|
||||
}
|
||||
}
|
||||
@@ -82,19 +82,10 @@ func (s *BootstrapService) attemptJoin(seedAddr string) bool {
|
||||
return false
|
||||
}
|
||||
|
||||
client := NewAuthenticatedHTTPClient(s.config, 10*time.Second)
|
||||
protocol := GetProtocol(s.config)
|
||||
url := fmt.Sprintf("%s://%s/members/join", protocol, seedAddr)
|
||||
client := &http.Client{Timeout: 10 * time.Second}
|
||||
url := fmt.Sprintf("http://%s/members/join", seedAddr)
|
||||
|
||||
req, err := http.NewRequest("POST", url, bytes.NewBuffer(jsonData))
|
||||
if err != nil {
|
||||
s.logger.WithError(err).Error("Failed to create join request")
|
||||
return false
|
||||
}
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
AddClusterAuthHeaders(req, s.config)
|
||||
|
||||
resp, err := client.Do(req)
|
||||
resp, err := client.Post(url, "application/json", bytes.NewBuffer(jsonData))
|
||||
if err != nil {
|
||||
s.logger.WithFields(logrus.Fields{
|
||||
"seed": seedAddr,
|
||||
@@ -151,4 +142,4 @@ func (s *BootstrapService) performGradualSync() {
|
||||
}
|
||||
|
||||
s.logger.Info("Gradual sync completed")
|
||||
}
|
||||
}
|
||||
@@ -17,13 +17,13 @@ import (
|
||||
|
||||
// GossipService handles gossip protocol operations
|
||||
type GossipService struct {
|
||||
config *types.Config
|
||||
members map[string]*types.Member
|
||||
membersMu sync.RWMutex
|
||||
logger *logrus.Logger
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
wg sync.WaitGroup
|
||||
config *types.Config
|
||||
members map[string]*types.Member
|
||||
membersMu sync.RWMutex
|
||||
logger *logrus.Logger
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
wg sync.WaitGroup
|
||||
}
|
||||
|
||||
// NewGossipService creates a new gossip service
|
||||
@@ -44,7 +44,7 @@ func (s *GossipService) Start() {
|
||||
s.logger.Info("Clustering disabled, skipping gossip routine")
|
||||
return
|
||||
}
|
||||
|
||||
|
||||
s.wg.Add(1)
|
||||
go s.gossipRoutine()
|
||||
}
|
||||
@@ -181,20 +181,11 @@ func (s *GossipService) gossipWithPeer(peer *types.Member) error {
|
||||
return err
|
||||
}
|
||||
|
||||
// Send HTTP request to peer with cluster authentication
|
||||
client := NewAuthenticatedHTTPClient(s.config, 5*time.Second)
|
||||
protocol := GetProtocol(s.config)
|
||||
url := fmt.Sprintf("%s://%s/members/gossip", protocol, peer.Address)
|
||||
// Send HTTP request to peer
|
||||
client := &http.Client{Timeout: 5 * time.Second}
|
||||
url := fmt.Sprintf("http://%s/members/gossip", peer.Address)
|
||||
|
||||
req, err := http.NewRequest("POST", url, bytes.NewBuffer(jsonData))
|
||||
if err != nil {
|
||||
s.logger.WithError(err).Error("Failed to create gossip request")
|
||||
return err
|
||||
}
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
AddClusterAuthHeaders(req, s.config)
|
||||
|
||||
resp, err := client.Do(req)
|
||||
resp, err := client.Post(url, "application/json", bytes.NewBuffer(jsonData))
|
||||
if err != nil {
|
||||
s.logger.WithFields(logrus.Fields{
|
||||
"peer": peer.Address,
|
||||
@@ -309,4 +300,4 @@ func (s *GossipService) MergeMemberList(remoteMembers []types.Member, selfNodeID
|
||||
func (s *GossipService) GetJoinedTimestamp() int64 {
|
||||
// This should be implemented by the server that uses this service
|
||||
return time.Now().UnixMilli()
|
||||
}
|
||||
}
|
||||
@@ -1,43 +0,0 @@
|
||||
package cluster
|
||||
|
||||
import (
|
||||
"crypto/tls"
|
||||
"net/http"
|
||||
"time"
|
||||
|
||||
"kvs/types"
|
||||
)
|
||||
|
||||
// NewAuthenticatedHTTPClient creates an HTTP client configured for cluster authentication
|
||||
func NewAuthenticatedHTTPClient(config *types.Config, timeout time.Duration) *http.Client {
|
||||
client := &http.Client{
|
||||
Timeout: timeout,
|
||||
}
|
||||
|
||||
// Configure TLS if enabled
|
||||
if config.ClusterTLSEnabled {
|
||||
tlsConfig := &tls.Config{
|
||||
InsecureSkipVerify: config.ClusterTLSSkipVerify,
|
||||
}
|
||||
|
||||
client.Transport = &http.Transport{
|
||||
TLSClientConfig: tlsConfig,
|
||||
}
|
||||
}
|
||||
|
||||
return client
|
||||
}
|
||||
|
||||
// AddClusterAuthHeaders adds authentication headers to an HTTP request
|
||||
func AddClusterAuthHeaders(req *http.Request, config *types.Config) {
|
||||
req.Header.Set("X-Cluster-Secret", config.ClusterSecret)
|
||||
req.Header.Set("X-Node-ID", config.NodeID)
|
||||
}
|
||||
|
||||
// GetProtocol returns the appropriate protocol (http or https) based on TLS configuration
|
||||
func GetProtocol(config *types.Config) string {
|
||||
if config.ClusterTLSEnabled {
|
||||
return "https"
|
||||
}
|
||||
return "http"
|
||||
}
|
||||
@@ -170,7 +170,7 @@ func (s *MerkleService) BuildSubtreeForRange(startKey, endKey string) (*types.Me
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to get KV pairs for subtree: %v", err)
|
||||
}
|
||||
|
||||
|
||||
filteredPairs := FilterPairsByRange(pairs, startKey, endKey)
|
||||
return s.BuildMerkleTreeFromPairs(filteredPairs)
|
||||
}
|
||||
}
|
||||
112
cluster/sync.go
112
cluster/sync.go
@@ -51,11 +51,11 @@ func (s *SyncService) Start() {
|
||||
s.logger.Info("Clustering disabled, skipping sync routines")
|
||||
return
|
||||
}
|
||||
|
||||
|
||||
// Start sync routine
|
||||
s.wg.Add(1)
|
||||
go s.syncRoutine()
|
||||
|
||||
|
||||
// Start Merkle tree rebuild routine
|
||||
s.wg.Add(1)
|
||||
go s.merkleTreeRebuildRoutine()
|
||||
@@ -172,9 +172,9 @@ func (s *SyncService) performMerkleSync() {
|
||||
// 2. Compare roots and start recursive diffing if they differ
|
||||
if !bytes.Equal(localRoot.Hash, remoteRoot.Hash) {
|
||||
s.logger.WithFields(logrus.Fields{
|
||||
"peer": peer.Address,
|
||||
"local_root": hex.EncodeToString(localRoot.Hash),
|
||||
"remote_root": hex.EncodeToString(remoteRoot.Hash),
|
||||
"peer": peer.Address,
|
||||
"local_root": hex.EncodeToString(localRoot.Hash),
|
||||
"remote_root": hex.EncodeToString(remoteRoot.Hash),
|
||||
}).Info("Merkle roots differ, starting recursive diff")
|
||||
s.diffMerkleTreesRecursive(peer.Address, localRoot, remoteRoot)
|
||||
} else {
|
||||
@@ -186,17 +186,10 @@ func (s *SyncService) performMerkleSync() {
|
||||
|
||||
// requestMerkleRoot requests the Merkle root from a peer
|
||||
func (s *SyncService) requestMerkleRoot(peerAddress string) (*types.MerkleRootResponse, error) {
|
||||
client := NewAuthenticatedHTTPClient(s.config, 10*time.Second)
|
||||
protocol := GetProtocol(s.config)
|
||||
url := fmt.Sprintf("%s://%s/merkle_tree/root", protocol, peerAddress)
|
||||
client := &http.Client{Timeout: 10 * time.Second}
|
||||
url := fmt.Sprintf("http://%s/merkle_tree/root", peerAddress)
|
||||
|
||||
req, err := http.NewRequest("GET", url, nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
AddClusterAuthHeaders(req, s.config)
|
||||
|
||||
resp, err := client.Do(req)
|
||||
resp, err := client.Get(url)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -223,7 +216,7 @@ func (s *SyncService) diffMerkleTreesRecursive(peerAddress string, localNode, re
|
||||
// Hashes differ, need to go deeper.
|
||||
// Request children from the remote peer for the current range.
|
||||
req := types.MerkleTreeDiffRequest{
|
||||
ParentNode: *remoteNode, // We are asking the remote peer about its children for this range
|
||||
ParentNode: *remoteNode, // We are asking the remote peer about its children for this range
|
||||
LocalHash: localNode.Hash, // Our hash for this range
|
||||
}
|
||||
|
||||
@@ -301,17 +294,10 @@ func (s *SyncService) handleLeafLevelDiff(peerAddress string, keys []string, loc
|
||||
|
||||
// fetchSingleKVFromPeer fetches a single KV pair from a peer
|
||||
func (s *SyncService) fetchSingleKVFromPeer(peerAddress, path string) (*types.StoredValue, error) {
|
||||
client := NewAuthenticatedHTTPClient(s.config, 5*time.Second)
|
||||
protocol := GetProtocol(s.config)
|
||||
url := fmt.Sprintf("%s://%s/kv/%s", protocol, peerAddress, path)
|
||||
client := &http.Client{Timeout: 5 * time.Second}
|
||||
url := fmt.Sprintf("http://%s/kv/%s", peerAddress, path)
|
||||
|
||||
req, err := http.NewRequest("GET", url, nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
AddClusterAuthHeaders(req, s.config)
|
||||
|
||||
resp, err := client.Do(req)
|
||||
resp, err := client.Get(url)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -412,14 +398,14 @@ func (s *SyncService) resolveConflict(key string, local, remote *types.StoredVal
|
||||
|
||||
// Timestamps are equal - need sophisticated conflict resolution
|
||||
s.logger.WithField("key", key).Info("Timestamp collision detected, applying oldest-node rule")
|
||||
|
||||
|
||||
// Get cluster members to determine which node is older
|
||||
members := s.gossipService.GetMembers()
|
||||
|
||||
|
||||
// Find the local node and the remote node in membership
|
||||
var localMember, remoteMember *types.Member
|
||||
localNodeID := s.config.NodeID
|
||||
|
||||
|
||||
for _, member := range members {
|
||||
if member.ID == localNodeID {
|
||||
localMember = member
|
||||
@@ -428,16 +414,16 @@ func (s *SyncService) resolveConflict(key string, local, remote *types.StoredVal
|
||||
remoteMember = member
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
// If we can't find membership info, fall back to UUID comparison for deterministic result
|
||||
if localMember == nil || remoteMember == nil {
|
||||
s.logger.WithFields(logrus.Fields{
|
||||
"key": key,
|
||||
"peerAddress": peerAddress,
|
||||
"localNodeID": localNodeID,
|
||||
"localMember": localMember != nil,
|
||||
"remoteMember": remoteMember != nil,
|
||||
"totalMembers": len(members),
|
||||
"key": key,
|
||||
"peerAddress": peerAddress,
|
||||
"localNodeID": localNodeID,
|
||||
"localMember": localMember != nil,
|
||||
"remoteMember": remoteMember != nil,
|
||||
"totalMembers": len(members),
|
||||
}).Warn("Could not find membership info for conflict resolution, using UUID comparison")
|
||||
if remote.UUID < local.UUID {
|
||||
// Remote UUID lexically smaller (deterministic choice)
|
||||
@@ -450,49 +436,41 @@ func (s *SyncService) resolveConflict(key string, local, remote *types.StoredVal
|
||||
s.logger.WithField("key", key).Info("Conflict resolved: local data wins (UUID tie-breaker)")
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
// Apply oldest-node rule: node with earliest joined_timestamp wins
|
||||
if remoteMember.JoinedTimestamp < localMember.JoinedTimestamp {
|
||||
// Remote node is older, its data wins
|
||||
err := s.storeReplicatedDataWithMetadata(key, remote)
|
||||
if err == nil {
|
||||
s.logger.WithFields(logrus.Fields{
|
||||
"key": key,
|
||||
"local_joined": localMember.JoinedTimestamp,
|
||||
"remote_joined": remoteMember.JoinedTimestamp,
|
||||
"key": key,
|
||||
"local_joined": localMember.JoinedTimestamp,
|
||||
"remote_joined": remoteMember.JoinedTimestamp,
|
||||
}).Info("Conflict resolved: remote data wins (oldest-node rule)")
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
|
||||
// Local node is older or equal, keep local data
|
||||
s.logger.WithFields(logrus.Fields{
|
||||
"key": key,
|
||||
"local_joined": localMember.JoinedTimestamp,
|
||||
"remote_joined": remoteMember.JoinedTimestamp,
|
||||
"key": key,
|
||||
"local_joined": localMember.JoinedTimestamp,
|
||||
"remote_joined": remoteMember.JoinedTimestamp,
|
||||
}).Info("Conflict resolved: local data wins (oldest-node rule)")
|
||||
return nil
|
||||
}
|
||||
|
||||
// requestMerkleDiff requests children hashes or keys for a given node/range from a peer
|
||||
func (s *SyncService) requestMerkleDiff(peerAddress string, reqData types.MerkleTreeDiffRequest) (*types.MerkleTreeDiffResponse, error) {
|
||||
jsonData, err := json.Marshal(reqData)
|
||||
func (s *SyncService) requestMerkleDiff(peerAddress string, req types.MerkleTreeDiffRequest) (*types.MerkleTreeDiffResponse, error) {
|
||||
jsonData, err := json.Marshal(req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
client := NewAuthenticatedHTTPClient(s.config, 10*time.Second)
|
||||
protocol := GetProtocol(s.config)
|
||||
url := fmt.Sprintf("%s://%s/merkle_tree/diff", protocol, peerAddress)
|
||||
client := &http.Client{Timeout: 10 * time.Second}
|
||||
url := fmt.Sprintf("http://%s/merkle_tree/diff", peerAddress)
|
||||
|
||||
req, err := http.NewRequest("POST", url, bytes.NewBuffer(jsonData))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
AddClusterAuthHeaders(req, s.config)
|
||||
|
||||
resp, err := client.Do(req)
|
||||
resp, err := client.Post(url, "application/json", bytes.NewBuffer(jsonData))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -547,28 +525,20 @@ func (s *SyncService) handleChildrenDiff(peerAddress string, children []types.Me
|
||||
|
||||
// fetchAndStoreRange fetches a range of KV pairs from a peer and stores them locally
|
||||
func (s *SyncService) fetchAndStoreRange(peerAddress string, startKey, endKey string) error {
|
||||
reqData := types.KVRangeRequest{
|
||||
req := types.KVRangeRequest{
|
||||
StartKey: startKey,
|
||||
EndKey: endKey,
|
||||
Limit: 0, // No limit
|
||||
}
|
||||
jsonData, err := json.Marshal(reqData)
|
||||
jsonData, err := json.Marshal(req)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
client := NewAuthenticatedHTTPClient(s.config, 30*time.Second) // Longer timeout for range fetches
|
||||
protocol := GetProtocol(s.config)
|
||||
url := fmt.Sprintf("%s://%s/kv_range", protocol, peerAddress)
|
||||
client := &http.Client{Timeout: 30 * time.Second} // Longer timeout for range fetches
|
||||
url := fmt.Sprintf("http://%s/kv_range", peerAddress)
|
||||
|
||||
req, err := http.NewRequest("POST", url, bytes.NewBuffer(jsonData))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
AddClusterAuthHeaders(req, s.config)
|
||||
|
||||
resp, err := client.Do(req)
|
||||
resp, err := client.Post(url, "application/json", bytes.NewBuffer(jsonData))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -598,4 +568,4 @@ func (s *SyncService) fetchAndStoreRange(peerAddress string, startKey, endKey st
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
}
|
||||
@@ -1,14 +1,12 @@
|
||||
package config
|
||||
|
||||
import (
|
||||
"crypto/rand"
|
||||
"encoding/base64"
|
||||
"fmt"
|
||||
"os"
|
||||
"path/filepath"
|
||||
|
||||
"gopkg.in/yaml.v3"
|
||||
"kvs/types"
|
||||
"gopkg.in/yaml.v3"
|
||||
)
|
||||
|
||||
// Default configuration
|
||||
@@ -29,61 +27,37 @@ func Default() *types.Config {
|
||||
BootstrapMaxAgeHours: 720, // 30 days
|
||||
ThrottleDelayMs: 100,
|
||||
FetchDelayMs: 50,
|
||||
|
||||
|
||||
// Default compression settings
|
||||
CompressionEnabled: true,
|
||||
CompressionLevel: 3, // Balance between performance and compression ratio
|
||||
|
||||
|
||||
// Default TTL and size limit settings
|
||||
DefaultTTL: "0", // No default TTL
|
||||
MaxJSONSize: 1048576, // 1MB default max JSON size
|
||||
|
||||
DefaultTTL: "0", // No default TTL
|
||||
MaxJSONSize: 1048576, // 1MB default max JSON size
|
||||
|
||||
// Default rate limiting settings
|
||||
RateLimitRequests: 100, // 100 requests per window
|
||||
RateLimitWindow: "1m", // 1 minute window
|
||||
|
||||
|
||||
// Default tamper-evident logging settings
|
||||
TamperLogActions: []string{"data_write", "user_create", "auth_failure"},
|
||||
|
||||
|
||||
// Default backup system settings
|
||||
BackupEnabled: true,
|
||||
BackupSchedule: "0 0 * * *", // Daily at midnight
|
||||
BackupPath: "./backups",
|
||||
BackupRetention: 7, // Keep backups for 7 days
|
||||
|
||||
|
||||
// Default feature toggle settings (all enabled by default)
|
||||
AuthEnabled: true,
|
||||
TamperLoggingEnabled: true,
|
||||
ClusteringEnabled: true,
|
||||
RateLimitingEnabled: true,
|
||||
RevisionHistoryEnabled: true,
|
||||
|
||||
// Default anonymous access settings (both disabled by default for security)
|
||||
AllowAnonymousRead: false,
|
||||
AllowAnonymousWrite: false,
|
||||
|
||||
// Default cluster authentication settings (Issue #13)
|
||||
ClusterSecret: generateClusterSecret(),
|
||||
ClusterTLSEnabled: false,
|
||||
ClusterTLSCertFile: "",
|
||||
ClusterTLSKeyFile: "",
|
||||
ClusterTLSSkipVerify: false,
|
||||
}
|
||||
}
|
||||
|
||||
// generateClusterSecret generates a cryptographically secure random cluster secret
|
||||
func generateClusterSecret() string {
|
||||
// Generate 32 bytes (256 bits) of random data
|
||||
randomBytes := make([]byte, 32)
|
||||
if _, err := rand.Read(randomBytes); err != nil {
|
||||
// Fallback to a warning - this should never happen in practice
|
||||
fmt.Fprintf(os.Stderr, "Warning: Failed to generate secure cluster secret: %v\n", err)
|
||||
return ""
|
||||
}
|
||||
// Encode as base64 for easy configuration file storage
|
||||
return base64.StdEncoding.EncodeToString(randomBytes)
|
||||
}
|
||||
|
||||
// Load configuration from file or create default
|
||||
func Load(configPath string) (*types.Config, error) {
|
||||
config := Default()
|
||||
@@ -116,13 +90,5 @@ func Load(configPath string) (*types.Config, error) {
|
||||
return nil, fmt.Errorf("failed to parse config file: %v", err)
|
||||
}
|
||||
|
||||
// Generate cluster secret if not provided and clustering is enabled (Issue #13)
|
||||
if config.ClusteringEnabled && config.ClusterSecret == "" {
|
||||
config.ClusterSecret = generateClusterSecret()
|
||||
fmt.Printf("Warning: No cluster_secret configured. Generated a random secret.\n")
|
||||
fmt.Printf(" To share this secret with other nodes, add it to your config:\n")
|
||||
fmt.Printf(" cluster_secret: %s\n", config.ClusterSecret)
|
||||
}
|
||||
|
||||
return config, nil
|
||||
}
|
||||
}
|
||||
@@ -99,4 +99,4 @@ func ExtractKVResourceKey(r *http.Request) string {
|
||||
return path
|
||||
}
|
||||
return ""
|
||||
}
|
||||
}
|
||||
@@ -8,4 +8,4 @@ import (
|
||||
// GetBackupFilename generates a filename for a backup
|
||||
func GetBackupFilename(timestamp time.Time) string {
|
||||
return fmt.Sprintf("kvs-backup-%s.zstd", timestamp.Format("2006-01-02"))
|
||||
}
|
||||
}
|
||||
@@ -1,4 +1,4 @@
|
||||
// Package features provides utility functions for KVS authentication, validation,
|
||||
// logging, backup, and other operational features. These functions were extracted
|
||||
// from main.go to improve code organization and maintainability.
|
||||
package features
|
||||
package features
|
||||
@@ -5,4 +5,4 @@ import "fmt"
|
||||
// GetRateLimitKey generates the storage key for rate limiting
|
||||
func GetRateLimitKey(userUUID string, windowStart int64) string {
|
||||
return fmt.Sprintf("ratelimit:%s:%d", userUUID, windowStart)
|
||||
}
|
||||
}
|
||||
@@ -5,4 +5,4 @@ import "fmt"
|
||||
// GetRevisionKey generates the storage key for a specific revision
|
||||
func GetRevisionKey(baseKey string, revision int) string {
|
||||
return fmt.Sprintf("%s:rev:%d", baseKey, revision)
|
||||
}
|
||||
}
|
||||
@@ -21,4 +21,4 @@ func GenerateLogSignature(timestamp, action, userUUID, resource string) string {
|
||||
// Concatenate all fields in a deterministic order
|
||||
data := fmt.Sprintf("%s|%s|%s|%s", timestamp, action, userUUID, resource)
|
||||
return utils.HashSHA3512(data)
|
||||
}
|
||||
}
|
||||
@@ -21,4 +21,4 @@ func ParseTTL(ttlString string) (time.Duration, error) {
|
||||
}
|
||||
|
||||
return duration, nil
|
||||
}
|
||||
}
|
||||
@@ -91,8 +91,6 @@ port: 8090
|
||||
data_dir: "./basic_data"
|
||||
seed_nodes: []
|
||||
log_level: "error"
|
||||
allow_anonymous_read: true
|
||||
allow_anonymous_write: true
|
||||
EOF
|
||||
|
||||
# Start node
|
||||
@@ -124,10 +122,7 @@ EOF
|
||||
# Test 3: Cluster formation
|
||||
test_cluster_formation() {
|
||||
test_start "2-node cluster formation and Merkle Tree replication"
|
||||
|
||||
# Shared cluster secret for authentication (Issue #13)
|
||||
local CLUSTER_SECRET="test-cluster-secret-12345678901234567890"
|
||||
|
||||
|
||||
# Node 1 config
|
||||
cat > cluster1.yaml <<EOF
|
||||
node_id: "cluster-1"
|
||||
@@ -139,11 +134,8 @@ log_level: "error"
|
||||
gossip_interval_min: 5
|
||||
gossip_interval_max: 10
|
||||
sync_interval: 10
|
||||
allow_anonymous_read: true
|
||||
allow_anonymous_write: true
|
||||
cluster_secret: "$CLUSTER_SECRET"
|
||||
EOF
|
||||
|
||||
|
||||
# Node 2 config
|
||||
cat > cluster2.yaml <<EOF
|
||||
node_id: "cluster-2"
|
||||
@@ -155,9 +147,6 @@ log_level: "error"
|
||||
gossip_interval_min: 5
|
||||
gossip_interval_max: 10
|
||||
sync_interval: 10
|
||||
allow_anonymous_read: true
|
||||
allow_anonymous_write: true
|
||||
cluster_secret: "$CLUSTER_SECRET"
|
||||
EOF
|
||||
|
||||
# Start nodes
|
||||
@@ -235,18 +224,15 @@ EOF
|
||||
# but same path. The Merkle tree sync should then trigger conflict resolution.
|
||||
test_conflict_resolution() {
|
||||
test_start "Conflict resolution test (Merkle Tree based)"
|
||||
|
||||
|
||||
# Create conflicting data using our utility
|
||||
rm -rf conflict1_data conflict2_data 2>/dev/null || true
|
||||
mkdir -p conflict1_data conflict2_data
|
||||
|
||||
|
||||
cd "$SCRIPT_DIR"
|
||||
if go run test_conflict.go "$TEST_DIR/conflict1_data" "$TEST_DIR/conflict2_data"; then
|
||||
cd "$TEST_DIR"
|
||||
|
||||
# Shared cluster secret for authentication (Issue #13)
|
||||
local CLUSTER_SECRET="conflict-cluster-secret-1234567890123"
|
||||
|
||||
|
||||
# Create configs
|
||||
cat > conflict1.yaml <<EOF
|
||||
node_id: "conflict-1"
|
||||
@@ -256,11 +242,8 @@ data_dir: "./conflict1_data"
|
||||
seed_nodes: []
|
||||
log_level: "info"
|
||||
sync_interval: 3
|
||||
allow_anonymous_read: true
|
||||
allow_anonymous_write: true
|
||||
cluster_secret: "$CLUSTER_SECRET"
|
||||
EOF
|
||||
|
||||
|
||||
cat > conflict2.yaml <<EOF
|
||||
node_id: "conflict-2"
|
||||
bind_address: "127.0.0.1"
|
||||
@@ -269,9 +252,6 @@ data_dir: "./conflict2_data"
|
||||
seed_nodes: ["127.0.0.1:8111"]
|
||||
log_level: "info"
|
||||
sync_interval: 3
|
||||
allow_anonymous_read: true
|
||||
allow_anonymous_write: true
|
||||
cluster_secret: "$CLUSTER_SECRET"
|
||||
EOF
|
||||
|
||||
# Start nodes
|
||||
@@ -371,79 +351,6 @@ EOF
|
||||
fi
|
||||
}
|
||||
|
||||
# Test 5: Authentication middleware (Issue #4)
|
||||
test_authentication_middleware() {
|
||||
test_start "Authentication middleware test (Issue #4)"
|
||||
|
||||
# Create auth test config
|
||||
cat > auth_test.yaml <<EOF
|
||||
node_id: "auth-test"
|
||||
bind_address: "127.0.0.1"
|
||||
port: 8095
|
||||
data_dir: "./auth_test_data"
|
||||
seed_nodes: []
|
||||
log_level: "error"
|
||||
auth_enabled: true
|
||||
allow_anonymous_read: false
|
||||
allow_anonymous_write: false
|
||||
EOF
|
||||
|
||||
# Start node
|
||||
$BINARY auth_test.yaml >auth_test.log 2>&1 &
|
||||
local pid=$!
|
||||
|
||||
if wait_for_service 8095; then
|
||||
sleep 2 # Allow root account creation
|
||||
|
||||
# Extract the token from logs
|
||||
local token=$(grep "Token:" auth_test.log | sed 's/.*Token: //' | tr -d '\n\r')
|
||||
|
||||
if [ -z "$token" ]; then
|
||||
log_error "Failed to extract authentication token from logs"
|
||||
kill $pid 2>/dev/null || true
|
||||
return
|
||||
fi
|
||||
|
||||
# Test 1: Admin endpoints should fail without authentication
|
||||
local no_auth_response=$(curl -s -X POST http://localhost:8095/api/users -H "Content-Type: application/json" -d '{"nickname":"test","password":"test"}')
|
||||
if echo "$no_auth_response" | grep -q "Unauthorized"; then
|
||||
log_success "Admin endpoints properly reject unauthenticated requests"
|
||||
else
|
||||
log_error "Admin endpoints should reject unauthenticated requests, got: $no_auth_response"
|
||||
fi
|
||||
|
||||
# Test 2: Admin endpoints should work with valid authentication
|
||||
local auth_response=$(curl -s -X POST http://localhost:8095/api/users -H "Content-Type: application/json" -H "Authorization: Bearer $token" -d '{"nickname":"authtest","password":"authtest"}')
|
||||
if echo "$auth_response" | grep -q "uuid"; then
|
||||
log_success "Admin endpoints work with valid authentication"
|
||||
else
|
||||
log_error "Admin endpoints should work with authentication, got: $auth_response"
|
||||
fi
|
||||
|
||||
# Test 3: KV endpoints should require auth when anonymous access is disabled
|
||||
local kv_no_auth=$(curl -s -X PUT http://localhost:8095/kv/test/auth -H "Content-Type: application/json" -d '{"test":"auth"}')
|
||||
if echo "$kv_no_auth" | grep -q "Unauthorized"; then
|
||||
log_success "KV endpoints properly require authentication when anonymous access disabled"
|
||||
else
|
||||
log_error "KV endpoints should require auth when anonymous access disabled, got: $kv_no_auth"
|
||||
fi
|
||||
|
||||
# Test 4: KV endpoints should work with valid authentication
|
||||
local kv_auth=$(curl -s -X PUT http://localhost:8095/kv/test/auth -H "Content-Type: application/json" -H "Authorization: Bearer $token" -d '{"test":"auth"}')
|
||||
if echo "$kv_auth" | grep -q "uuid\|timestamp" || [ -z "$kv_auth" ]; then
|
||||
log_success "KV endpoints work with valid authentication"
|
||||
else
|
||||
log_error "KV endpoints should work with authentication, got: $kv_auth"
|
||||
fi
|
||||
|
||||
kill $pid 2>/dev/null || true
|
||||
sleep 2
|
||||
else
|
||||
log_error "Auth test node failed to start"
|
||||
kill $pid 2>/dev/null || true
|
||||
fi
|
||||
}
|
||||
|
||||
# Main test execution
|
||||
main() {
|
||||
echo "=================================================="
|
||||
@@ -461,7 +368,6 @@ main() {
|
||||
test_basic_functionality
|
||||
test_cluster_formation
|
||||
test_conflict_resolution
|
||||
test_authentication_middleware
|
||||
|
||||
# Results
|
||||
echo "=================================================="
|
||||
|
||||
65
issues/2.md
65
issues/2.md
@@ -1,65 +0,0 @@
|
||||
# Issue #2: Update README.md
|
||||
|
||||
**Status:** ✅ **COMPLETED** *(updated during this session)*
|
||||
**Author:** MrKalzu
|
||||
**Created:** 2025-09-12 22:01:34 +03:00
|
||||
**Repository:** https://git.rauhala.info/ryyst/kalzu-value-store/issues/2
|
||||
|
||||
## Description
|
||||
|
||||
"It feels like the readme has lot of expired info after the latest update."
|
||||
|
||||
## Problem
|
||||
|
||||
The project's README file contained outdated information that needed to be revised following recent updates and refactoring.
|
||||
|
||||
## Resolution Status
|
||||
|
||||
**✅ COMPLETED** - The README.md has been comprehensively updated to reflect the current state of the codebase.
|
||||
|
||||
## Updates Made
|
||||
|
||||
### Architecture & Features
|
||||
- ✅ Updated key features to include Merkle Tree sync, JWT authentication, and modular architecture
|
||||
- ✅ Revised architecture diagram to show modular components
|
||||
- ✅ Added authentication and authorization sections
|
||||
- ✅ Updated conflict resolution description
|
||||
|
||||
### Configuration
|
||||
- ✅ Added comprehensive configuration options including feature toggles
|
||||
- ✅ Updated default values to match actual implementation
|
||||
- ✅ Added feature toggle documentation (auth, clustering, compression, etc.)
|
||||
- ✅ Included backup and tamper logging configuration
|
||||
|
||||
### API Documentation
|
||||
- ✅ Added JWT authentication examples
|
||||
- ✅ Updated API endpoints with proper authorization headers
|
||||
- ✅ Added authentication endpoints documentation
|
||||
- ✅ Included Merkle tree and sync endpoints
|
||||
|
||||
### Project Structure
|
||||
- ✅ Completely updated project structure to reflect modular architecture
|
||||
- ✅ Documented all packages (auth/, cluster/, storage/, server/, etc.)
|
||||
- ✅ Updated file organization to match current codebase
|
||||
|
||||
### Development & Testing
|
||||
- ✅ Updated build and test commands
|
||||
- ✅ Added integration test suite documentation
|
||||
- ✅ Updated conflict resolution testing procedures
|
||||
- ✅ Added code quality tools documentation
|
||||
|
||||
### Performance & Limitations
|
||||
- ✅ Updated performance characteristics with Merkle sync improvements
|
||||
- ✅ Revised limitations to reflect implemented features
|
||||
- ✅ Added realistic timing expectations
|
||||
|
||||
## Current Status
|
||||
|
||||
The README.md now accurately reflects:
|
||||
- Current modular architecture
|
||||
- All implemented features and capabilities
|
||||
- Proper configuration options
|
||||
- Updated development workflow
|
||||
- Comprehensive API documentation
|
||||
|
||||
**This issue has been resolved.**
|
||||
71
issues/3.md
71
issues/3.md
@@ -1,71 +0,0 @@
|
||||
# Issue #3: Implement Autogenerated Root Account for Initial Setup
|
||||
|
||||
**Status:** ✅ **COMPLETED**
|
||||
**Author:** MrKalzu
|
||||
**Created:** 2025-09-12 22:17:12 +03:00
|
||||
**Repository:** https://git.rauhala.info/ryyst/kalzu-value-store/issues/3
|
||||
|
||||
## Problem Statement
|
||||
|
||||
The KVS server lacks a mechanism to create an initial administrative user when starting with an empty database and no seed nodes. This makes it impossible to interact with authentication-protected endpoints during initial setup.
|
||||
|
||||
## Current Challenge
|
||||
|
||||
- Empty database + no seed nodes = no way to authenticate
|
||||
- No existing users means no way to create API tokens
|
||||
- Authentication-protected endpoints become inaccessible
|
||||
- Manual database seeding required for initial setup
|
||||
|
||||
## Proposed Solution
|
||||
|
||||
### 1. Detection Logic
|
||||
- Detect empty database condition
|
||||
- Verify no seed nodes are configured
|
||||
- Only trigger on initial startup with empty state
|
||||
|
||||
### 2. Root Account Generation
|
||||
Create a default "root" user with:
|
||||
- **Server-generated UUID**
|
||||
- **Hashed nickname** (e.g., "root")
|
||||
- **Assigned to default "admin" group**
|
||||
- **Full administrative privileges**
|
||||
|
||||
### 3. API Token Creation
|
||||
- Generate API token with administrative scopes
|
||||
- Include all necessary permissions for initial setup
|
||||
- Set reasonable expiration time
|
||||
|
||||
### 4. Secure Token Distribution
|
||||
- **Securely log the token to console** (one-time display)
|
||||
- **Persist user and token in BadgerDB**
|
||||
- **Clear token from memory after logging**
|
||||
|
||||
## Implementation Details
|
||||
|
||||
### Relevant Code Sections
|
||||
- `NewServer` function - Add initialization logic
|
||||
- `User`, `Group`, `APIToken` structs - Use existing data structures
|
||||
- Hashing and storage key functions - Leverage existing auth system
|
||||
|
||||
### Proposed Changes (from MrKalzu's comment)
|
||||
- **Added `HasUsers() (bool, error)`** to `auth/auth.go`
|
||||
- **Added "Initial root account setup for empty DB with no seeds"** to `server/server.go`
|
||||
- **Diff file attached** with implementation details
|
||||
|
||||
## Security Considerations
|
||||
|
||||
- Token should be displayed only once during startup
|
||||
- Token should have reasonable expiration
|
||||
- Root account should be clearly identified in logs
|
||||
- Consider forcing password change on first use (future enhancement)
|
||||
|
||||
## Benefits
|
||||
|
||||
- Enables zero-configuration initial setup
|
||||
- Provides secure bootstrap process
|
||||
- Eliminates manual database seeding
|
||||
- Supports automated deployment scenarios
|
||||
|
||||
## Dependencies
|
||||
|
||||
This issue blocks **Issue #4** (securing administrative endpoints), as it provides the mechanism for initial administrative access.
|
||||
59
issues/4.md
59
issues/4.md
@@ -1,59 +0,0 @@
|
||||
# Issue #4: Secure User and Group Management Endpoints with Authentication Middleware
|
||||
|
||||
**Status:** Open
|
||||
**Author:** MrKalzu
|
||||
**Created:** 2025-09-12
|
||||
**Assignee:** ryyst
|
||||
**Repository:** https://git.rauhala.info/ryyst/kalzu-value-store/issues/4
|
||||
|
||||
## Description
|
||||
|
||||
**Security Vulnerability:** User, group, and token management API endpoints are currently exposed without authentication, creating a significant security risk.
|
||||
|
||||
## Current Problem
|
||||
|
||||
The following administrative endpoints are accessible without authentication:
|
||||
- User management endpoints (`createUserHandler`, `getUserHandler`, etc.)
|
||||
- Group management endpoints
|
||||
- Token management endpoints
|
||||
|
||||
## Proposed Solution
|
||||
|
||||
### 1. Define Granular Administrative Scopes
|
||||
|
||||
Create specific administrative scopes for fine-grained access control:
|
||||
- `admin:users:create` - Create new users
|
||||
- `admin:users:read` - View user information
|
||||
- `admin:users:update` - Modify user data
|
||||
- `admin:users:delete` - Remove users
|
||||
- `admin:groups:create` - Create new groups
|
||||
- `admin:groups:read` - View group information
|
||||
- `admin:groups:update` - Modify group membership
|
||||
- `admin:groups:delete` - Remove groups
|
||||
- `admin:tokens:create` - Generate API tokens
|
||||
- `admin:tokens:revoke` - Revoke API tokens
|
||||
|
||||
### 2. Apply Authentication Middleware
|
||||
|
||||
Wrap all administrative handlers with `authMiddleware` and specific scope requirements:
|
||||
|
||||
```go
|
||||
// Example implementation
|
||||
router.Handle("/auth/users", authMiddleware("admin:users:create")(createUserHandler))
|
||||
router.Handle("/auth/users/{id}", authMiddleware("admin:users:read")(getUserHandler))
|
||||
```
|
||||
|
||||
## Dependencies
|
||||
|
||||
- **Depends on Issue #3**: Requires implementation of autogenerated root account for initial setup
|
||||
|
||||
## Security Benefits
|
||||
|
||||
- Prevents unauthorized administrative access
|
||||
- Implements principle of least privilege
|
||||
- Provides audit trail for administrative operations
|
||||
- Protects against privilege escalation attacks
|
||||
|
||||
## Implementation Priority
|
||||
|
||||
**High Priority** - This addresses a critical security vulnerability that could allow unauthorized access to administrative functions.
|
||||
47
issues/5.md
47
issues/5.md
@@ -1,47 +0,0 @@
|
||||
# Issue #5: Add Configuration for Anonymous Read and Write Access to KV Endpoints
|
||||
|
||||
**Status:** Open
|
||||
**Author:** MrKalzu
|
||||
**Created:** 2025-09-12
|
||||
**Repository:** https://git.rauhala.info/ryyst/kalzu-value-store/issues/5
|
||||
|
||||
## Description
|
||||
|
||||
Currently, KV endpoints are publicly accessible without authentication. This issue proposes adding granular control over public access to key-value store functionality.
|
||||
|
||||
## Proposed Configuration Parameters
|
||||
|
||||
Add two new configuration parameters to the `Config` struct:
|
||||
|
||||
1. **`AllowAnonymousRead`** (boolean, default `false`)
|
||||
- Controls whether unauthenticated users can read data
|
||||
|
||||
2. **`AllowAnonymousWrite`** (boolean, default `false`)
|
||||
- Controls whether unauthenticated users can write data
|
||||
|
||||
## Proposed Implementation Changes
|
||||
|
||||
### Modify `setupRoutes` Function
|
||||
- Conditionally apply authentication middleware based on configuration flags
|
||||
|
||||
### Specific Handler Changes
|
||||
- **`getKVHandler`**: Apply auth middleware with "read" scope if `AllowAnonymousRead` is `false`
|
||||
- **`putKVHandler`**: Apply auth middleware with "write" scope if `AllowAnonymousWrite` is `false`
|
||||
- **`deleteKVHandler`**: Always require authentication (no anonymous delete)
|
||||
|
||||
## Goal
|
||||
|
||||
Provide granular control over public access to key-value store functionality while maintaining security for sensitive operations.
|
||||
|
||||
## Use Cases
|
||||
|
||||
- **Public read-only deployments**: Allow anonymous reading for public data
|
||||
- **Public write scenarios**: Allow anonymous data submission (like forms or logs)
|
||||
- **Secure deployments**: Require authentication for all operations
|
||||
- **Mixed access patterns**: Different permissions for read vs write operations
|
||||
|
||||
## Security Considerations
|
||||
|
||||
- Delete operations should always require authentication
|
||||
- Consider rate limiting for anonymous access
|
||||
- Audit logging should track anonymous operations differently
|
||||
46
issues/6.md
46
issues/6.md
@@ -1,46 +0,0 @@
|
||||
# Issue #6: Configuration Options to Disable Optional Functionalities
|
||||
|
||||
**Status:** ✅ **COMPLETED**
|
||||
**Author:** MrKalzu
|
||||
**Created:** 2025-09-12
|
||||
**Repository:** https://git.rauhala.info/ryyst/kalzu-value-store/issues/6
|
||||
|
||||
## Description
|
||||
|
||||
Proposes adding configuration options to disable advanced features in the KVS (Key-Value Store) server to allow more flexible and lightweight deployment scenarios.
|
||||
|
||||
## Suggested Disablement Options
|
||||
|
||||
1. **Authentication System** - Disable JWT authentication entirely
|
||||
2. **Tamper-Evident Logging** - Disable cryptographic audit trails
|
||||
3. **Clustering** - Disable gossip protocol and distributed features
|
||||
4. **Rate Limiting** - Disable per-client rate limiting
|
||||
5. **Revision History** - Disable automatic versioning
|
||||
|
||||
## Proposed Implementation
|
||||
|
||||
- Add boolean flags to the Config struct for each feature
|
||||
- Modify server initialization and request handling to respect these flags
|
||||
- Allow conditional compilation/execution of features based on configuration
|
||||
|
||||
## Pros of Proposed Changes
|
||||
|
||||
- Reduce unnecessary overhead for simple deployments
|
||||
- Simplify setup for different deployment needs
|
||||
- Improve performance for specific use cases
|
||||
- Lower resource consumption
|
||||
|
||||
## Cons of Proposed Changes
|
||||
|
||||
- Potential security risks if features are disabled inappropriately
|
||||
- Loss of advanced functionality like audit trails or data recovery
|
||||
- Increased complexity in codebase with conditional feature logic
|
||||
|
||||
## Already Implemented Features
|
||||
|
||||
- Backup System (configurable)
|
||||
- Compression (configurable)
|
||||
|
||||
## Implementation Notes
|
||||
|
||||
The issue suggests modifying relevant code sections to conditionally enable/disable these features based on configuration, similar to how backup and compression are currently handled.
|
||||
1
main.go
1
main.go
@@ -11,6 +11,7 @@ import (
|
||||
"kvs/server"
|
||||
)
|
||||
|
||||
|
||||
func main() {
|
||||
configPath := "./config.yaml"
|
||||
|
||||
|
||||
@@ -22,6 +22,8 @@ import (
|
||||
"kvs/utils"
|
||||
)
|
||||
|
||||
|
||||
|
||||
// healthHandler returns server health status
|
||||
func (s *Server) healthHandler(w http.ResponseWriter, r *http.Request) {
|
||||
mode := s.getMode()
|
||||
@@ -1269,29 +1271,3 @@ func (s *Server) getRevisionHistory(key string) ([]map[string]interface{}, error
|
||||
func (s *Server) getSpecificRevision(key string, revision int) (*types.StoredValue, error) {
|
||||
return s.revisionService.GetSpecificRevision(key, revision)
|
||||
}
|
||||
|
||||
// clusterBootstrapHandler provides the cluster secret to authenticated administrators (Issue #13)
|
||||
func (s *Server) clusterBootstrapHandler(w http.ResponseWriter, r *http.Request) {
|
||||
// Ensure clustering is enabled
|
||||
if !s.config.ClusteringEnabled {
|
||||
http.Error(w, "Clustering is disabled", http.StatusServiceUnavailable)
|
||||
return
|
||||
}
|
||||
|
||||
// Ensure cluster secret is configured
|
||||
if s.config.ClusterSecret == "" {
|
||||
s.logger.Error("Cluster secret is not configured")
|
||||
http.Error(w, "Cluster secret is not configured", http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
// Return the cluster secret for secure bootstrap
|
||||
response := map[string]string{
|
||||
"cluster_secret": s.config.ClusterSecret,
|
||||
}
|
||||
|
||||
s.logger.WithField("remote_addr", r.RemoteAddr).Info("Cluster secret retrieved for bootstrap")
|
||||
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
json.NewEncoder(w).Encode(response)
|
||||
}
|
||||
|
||||
140
server/routes.go
140
server/routes.go
@@ -1,8 +1,6 @@
|
||||
package server
|
||||
|
||||
import (
|
||||
"net/http"
|
||||
|
||||
"github.com/gorilla/mux"
|
||||
)
|
||||
|
||||
@@ -10,122 +8,46 @@ import (
|
||||
func (s *Server) setupRoutes() *mux.Router {
|
||||
router := mux.NewRouter()
|
||||
|
||||
// Health endpoint (always available)
|
||||
// Health endpoint
|
||||
router.HandleFunc("/health", s.healthHandler).Methods("GET")
|
||||
|
||||
// KV endpoints (with conditional authentication based on anonymous access settings)
|
||||
// GET endpoint - require auth if anonymous read is disabled
|
||||
if s.config.AuthEnabled && !s.config.AllowAnonymousRead {
|
||||
router.Handle("/kv/{path:.+}", s.authService.Middleware(
|
||||
[]string{"read"}, nil, "",
|
||||
)(s.getKVHandler)).Methods("GET")
|
||||
} else {
|
||||
router.HandleFunc("/kv/{path:.+}", s.getKVHandler).Methods("GET")
|
||||
}
|
||||
// KV endpoints
|
||||
router.HandleFunc("/kv/{path:.+}", s.getKVHandler).Methods("GET")
|
||||
router.HandleFunc("/kv/{path:.+}", s.putKVHandler).Methods("PUT")
|
||||
router.HandleFunc("/kv/{path:.+}", s.deleteKVHandler).Methods("DELETE")
|
||||
|
||||
// PUT endpoint - require auth if anonymous write is disabled
|
||||
if s.config.AuthEnabled && !s.config.AllowAnonymousWrite {
|
||||
router.Handle("/kv/{path:.+}", s.authService.Middleware(
|
||||
[]string{"write"}, nil, "",
|
||||
)(s.putKVHandler)).Methods("PUT")
|
||||
} else {
|
||||
router.HandleFunc("/kv/{path:.+}", s.putKVHandler).Methods("PUT")
|
||||
}
|
||||
// Member endpoints
|
||||
router.HandleFunc("/members/", s.getMembersHandler).Methods("GET")
|
||||
router.HandleFunc("/members/join", s.joinMemberHandler).Methods("POST")
|
||||
router.HandleFunc("/members/leave", s.leaveMemberHandler).Methods("DELETE")
|
||||
router.HandleFunc("/members/gossip", s.gossipHandler).Methods("POST")
|
||||
router.HandleFunc("/members/pairs_by_time", s.pairsByTimeHandler).Methods("POST") // Still available for clients
|
||||
|
||||
// DELETE endpoint - always require authentication (no anonymous delete)
|
||||
if s.config.AuthEnabled {
|
||||
router.Handle("/kv/{path:.+}", s.authService.Middleware(
|
||||
[]string{"delete"}, nil, "",
|
||||
)(s.deleteKVHandler)).Methods("DELETE")
|
||||
} else {
|
||||
router.HandleFunc("/kv/{path:.+}", s.deleteKVHandler).Methods("DELETE")
|
||||
}
|
||||
// Merkle Tree endpoints
|
||||
router.HandleFunc("/merkle_tree/root", s.getMerkleRootHandler).Methods("GET")
|
||||
router.HandleFunc("/merkle_tree/diff", s.getMerkleDiffHandler).Methods("POST")
|
||||
router.HandleFunc("/kv_range", s.getKVRangeHandler).Methods("POST") // New endpoint for fetching ranges
|
||||
|
||||
// Member endpoints (available when clustering is enabled)
|
||||
if s.config.ClusteringEnabled {
|
||||
// GET /members/ is unprotected for monitoring/inspection
|
||||
router.HandleFunc("/members/", s.getMembersHandler).Methods("GET")
|
||||
// User Management endpoints
|
||||
router.HandleFunc("/api/users", s.createUserHandler).Methods("POST")
|
||||
router.HandleFunc("/api/users/{uuid}", s.getUserHandler).Methods("GET")
|
||||
router.HandleFunc("/api/users/{uuid}", s.updateUserHandler).Methods("PUT")
|
||||
router.HandleFunc("/api/users/{uuid}", s.deleteUserHandler).Methods("DELETE")
|
||||
|
||||
// Apply cluster authentication middleware to all cluster communication endpoints
|
||||
if s.clusterAuthService != nil {
|
||||
router.Handle("/members/join", s.clusterAuthService.Middleware(http.HandlerFunc(s.joinMemberHandler))).Methods("POST")
|
||||
router.Handle("/members/leave", s.clusterAuthService.Middleware(http.HandlerFunc(s.leaveMemberHandler))).Methods("DELETE")
|
||||
router.Handle("/members/gossip", s.clusterAuthService.Middleware(http.HandlerFunc(s.gossipHandler))).Methods("POST")
|
||||
router.Handle("/members/pairs_by_time", s.clusterAuthService.Middleware(http.HandlerFunc(s.pairsByTimeHandler))).Methods("POST")
|
||||
// Group Management endpoints
|
||||
router.HandleFunc("/api/groups", s.createGroupHandler).Methods("POST")
|
||||
router.HandleFunc("/api/groups/{uuid}", s.getGroupHandler).Methods("GET")
|
||||
router.HandleFunc("/api/groups/{uuid}", s.updateGroupHandler).Methods("PUT")
|
||||
router.HandleFunc("/api/groups/{uuid}", s.deleteGroupHandler).Methods("DELETE")
|
||||
|
||||
// Merkle Tree endpoints (clustering feature)
|
||||
router.Handle("/merkle_tree/root", s.clusterAuthService.Middleware(http.HandlerFunc(s.getMerkleRootHandler))).Methods("GET")
|
||||
router.Handle("/merkle_tree/diff", s.clusterAuthService.Middleware(http.HandlerFunc(s.getMerkleDiffHandler))).Methods("POST")
|
||||
router.Handle("/kv_range", s.clusterAuthService.Middleware(http.HandlerFunc(s.getKVRangeHandler))).Methods("POST")
|
||||
} else {
|
||||
// Fallback to unprotected endpoints (for backwards compatibility)
|
||||
router.HandleFunc("/members/join", s.joinMemberHandler).Methods("POST")
|
||||
router.HandleFunc("/members/leave", s.leaveMemberHandler).Methods("DELETE")
|
||||
router.HandleFunc("/members/gossip", s.gossipHandler).Methods("POST")
|
||||
router.HandleFunc("/members/pairs_by_time", s.pairsByTimeHandler).Methods("POST")
|
||||
// Token Management endpoints
|
||||
router.HandleFunc("/api/tokens", s.createTokenHandler).Methods("POST")
|
||||
|
||||
// Merkle Tree endpoints (clustering feature)
|
||||
router.HandleFunc("/merkle_tree/root", s.getMerkleRootHandler).Methods("GET")
|
||||
router.HandleFunc("/merkle_tree/diff", s.getMerkleDiffHandler).Methods("POST")
|
||||
router.HandleFunc("/kv_range", s.getKVRangeHandler).Methods("POST")
|
||||
}
|
||||
}
|
||||
// Revision History endpoints
|
||||
router.HandleFunc("/api/data/{key}/history", s.getRevisionHistoryHandler).Methods("GET")
|
||||
router.HandleFunc("/api/data/{key}/history/{revision}", s.getSpecificRevisionHandler).Methods("GET")
|
||||
|
||||
// Authentication and user management endpoints (available when auth is enabled)
|
||||
if s.config.AuthEnabled {
|
||||
// User Management endpoints (with authentication middleware)
|
||||
router.Handle("/api/users", s.authService.Middleware(
|
||||
[]string{"admin:users:create"}, nil, "",
|
||||
)(s.createUserHandler)).Methods("POST")
|
||||
|
||||
router.Handle("/api/users/{uuid}", s.authService.Middleware(
|
||||
[]string{"admin:users:read"}, nil, "",
|
||||
)(s.getUserHandler)).Methods("GET")
|
||||
|
||||
router.Handle("/api/users/{uuid}", s.authService.Middleware(
|
||||
[]string{"admin:users:update"}, nil, "",
|
||||
)(s.updateUserHandler)).Methods("PUT")
|
||||
|
||||
router.Handle("/api/users/{uuid}", s.authService.Middleware(
|
||||
[]string{"admin:users:delete"}, nil, "",
|
||||
)(s.deleteUserHandler)).Methods("DELETE")
|
||||
|
||||
// Group Management endpoints (with authentication middleware)
|
||||
router.Handle("/api/groups", s.authService.Middleware(
|
||||
[]string{"admin:groups:create"}, nil, "",
|
||||
)(s.createGroupHandler)).Methods("POST")
|
||||
|
||||
router.Handle("/api/groups/{uuid}", s.authService.Middleware(
|
||||
[]string{"admin:groups:read"}, nil, "",
|
||||
)(s.getGroupHandler)).Methods("GET")
|
||||
|
||||
router.Handle("/api/groups/{uuid}", s.authService.Middleware(
|
||||
[]string{"admin:groups:update"}, nil, "",
|
||||
)(s.updateGroupHandler)).Methods("PUT")
|
||||
|
||||
router.Handle("/api/groups/{uuid}", s.authService.Middleware(
|
||||
[]string{"admin:groups:delete"}, nil, "",
|
||||
)(s.deleteGroupHandler)).Methods("DELETE")
|
||||
|
||||
// Token Management endpoints (with authentication middleware)
|
||||
router.Handle("/api/tokens", s.authService.Middleware(
|
||||
[]string{"admin:tokens:create"}, nil, "",
|
||||
)(s.createTokenHandler)).Methods("POST")
|
||||
|
||||
// Cluster Bootstrap endpoint (Issue #13) - Protected by JWT authentication
|
||||
// Allows authenticated administrators to retrieve the cluster secret for new nodes
|
||||
router.Handle("/auth/cluster-bootstrap", s.authService.Middleware(
|
||||
[]string{"admin:tokens:create"}, nil, "",
|
||||
)(s.clusterBootstrapHandler)).Methods("GET")
|
||||
}
|
||||
|
||||
// Revision History endpoints (available when revision history is enabled)
|
||||
if s.config.RevisionHistoryEnabled {
|
||||
router.HandleFunc("/api/data/{key}/history", s.getRevisionHistoryHandler).Methods("GET")
|
||||
router.HandleFunc("/api/data/{key}/history/{revision}", s.getSpecificRevisionHandler).Methods("GET")
|
||||
}
|
||||
|
||||
// Backup Status endpoint (always available if backup is enabled)
|
||||
// Backup Status endpoint
|
||||
router.HandleFunc("/api/backup/status", s.getBackupStatusHandler).Methods("GET")
|
||||
|
||||
return router
|
||||
|
||||
231
server/server.go
231
server/server.go
@@ -2,18 +2,18 @@ package server
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
"encoding/json"
|
||||
|
||||
"github.com/dgraph-io/badger/v4"
|
||||
"github.com/robfig/cron/v3"
|
||||
"github.com/sirupsen/logrus"
|
||||
"github.com/google/uuid"
|
||||
|
||||
"kvs/auth"
|
||||
"kvs/cluster"
|
||||
@@ -50,8 +50,7 @@ type Server struct {
|
||||
backupMu sync.RWMutex // Protects backup status
|
||||
|
||||
// Authentication service
|
||||
authService *auth.AuthService
|
||||
clusterAuthService *auth.ClusterAuthService
|
||||
authService *auth.AuthService
|
||||
}
|
||||
|
||||
// NewServer initializes and returns a new Server instance
|
||||
@@ -119,17 +118,88 @@ func NewServer(config *types.Config) (*Server, error) {
|
||||
server.revisionService = storage.NewRevisionService(storageService)
|
||||
|
||||
// Initialize authentication service
|
||||
server.authService = auth.NewAuthService(db, logger, config)
|
||||
server.authService = auth.NewAuthService(db, logger)
|
||||
|
||||
// Initialize cluster authentication service (Issue #13)
|
||||
if config.ClusteringEnabled {
|
||||
server.clusterAuthService = auth.NewClusterAuthService(config.ClusterSecret, logger)
|
||||
}
|
||||
// New: Initial root account setup for empty DB with no seeds
|
||||
if len(config.SeedNodes) == 0 {
|
||||
hasUsers, err := server.authService.HasUsers()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to check for existing users: %v", err)
|
||||
}
|
||||
if !hasUsers {
|
||||
server.logger.Info("Detected empty database with no seed nodes; creating initial root account")
|
||||
|
||||
// Setup initial root account if needed (Issue #3)
|
||||
if config.AuthEnabled {
|
||||
if err := server.setupRootAccount(); err != nil {
|
||||
return nil, fmt.Errorf("failed to setup root account: %v", err)
|
||||
now := time.Now().Unix()
|
||||
|
||||
// Create admin group
|
||||
adminGroupUUID := uuid.NewString()
|
||||
hashedGroupName := utils.HashGroupName("admin") // Adjust if function name differs
|
||||
adminGroup := types.Group{
|
||||
UUID: adminGroupUUID,
|
||||
Name: hashedGroupName,
|
||||
CreatedAt: now, // Add if field exists; remove otherwise
|
||||
// Members: []string{}, // Add if needed; e.g., add root later
|
||||
}
|
||||
groupData, err := json.Marshal(adminGroup)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to marshal admin group: %v", err)
|
||||
}
|
||||
err = db.Update(func(txn *badger.Txn) error {
|
||||
return txn.Set([]byte(GroupStorageKey(adminGroupUUID)), groupData)
|
||||
})
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to store admin group: %v", err)
|
||||
}
|
||||
|
||||
// Create root user
|
||||
rootUUID := uuid.NewString()
|
||||
hashedNickname := utils.HashUserNickname("root") // Adjust if function name differs
|
||||
rootUser := types.User{
|
||||
UUID: rootUUID,
|
||||
Nickname: hashedNickname,
|
||||
Groups: []string{adminGroupUUID},
|
||||
CreatedAt: now, // Add if field exists; remove otherwise
|
||||
}
|
||||
userData, err := json.Marshal(rootUser)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to marshal root user: %v", err)
|
||||
}
|
||||
err = db.Update(func(txn *badger.Txn) error {
|
||||
return txn.Set([]byte(UserStorageKey(rootUUID)), userData)
|
||||
})
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to store root user: %v", err)
|
||||
}
|
||||
|
||||
// Optionally update group members if bidirectional
|
||||
// adminGroup.Members = append(adminGroup.Members, rootUUID)
|
||||
// Update group in DB if needed...
|
||||
|
||||
// Generate and store API token
|
||||
scopes := []string{"admin", "read", "write", "create", "delete"}
|
||||
expirationHours := 8760 // 1 year
|
||||
tokenString, expiresAt, err := auth.GenerateJWT(rootUUID, scopes, expirationHours)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to generate JWT: %v", err)
|
||||
}
|
||||
err = server.authService.StoreAPIToken(tokenString, rootUUID, scopes, expiresAt)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to store API token: %v", err)
|
||||
}
|
||||
|
||||
// Log the details securely (only once, to stderr)
|
||||
fmt.Fprintf(os.Stderr, `
|
||||
***************************************************************************
|
||||
WARNING: Initial root user created for new server instance.
|
||||
Save this information securely—it will not be shown again.
|
||||
|
||||
Root User UUID: %s
|
||||
API Token (Bearer): %s
|
||||
Expires At: %s (UTC)
|
||||
|
||||
Use this token for authentication in API requests. Change or revoke it immediately via the API for security.
|
||||
***************************************************************************
|
||||
`, rootUUID, tokenString, time.Unix(expiresAt, 0).UTC())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -198,138 +268,3 @@ func (s *Server) getBackupStatus() types.BackupStatus {
|
||||
|
||||
return status
|
||||
}
|
||||
|
||||
// setupRootAccount creates an initial root account if no users exist and no seed nodes are configured
|
||||
func (s *Server) setupRootAccount() error {
|
||||
// Only create root account if:
|
||||
// 1. No users exist in the database
|
||||
// 2. No seed nodes are configured (standalone mode)
|
||||
hasUsers, err := s.authService.HasUsers()
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to check if users exist: %v", err)
|
||||
}
|
||||
|
||||
// If users already exist or we have seed nodes, no need to create root account
|
||||
if hasUsers || len(s.config.SeedNodes) > 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
s.logger.Info("Creating initial root account for empty database with no seed nodes")
|
||||
|
||||
// Import required packages for user creation
|
||||
// Note: We need these imports at the top of the file
|
||||
return s.createRootUserAndToken()
|
||||
}
|
||||
|
||||
// createRootUserAndToken creates the root user, admin group, and initial token
|
||||
func (s *Server) createRootUserAndToken() error {
|
||||
rootNickname := "root"
|
||||
adminGroupName := "admin"
|
||||
|
||||
// Generate UUIDs
|
||||
rootUserUUID := "root-" + time.Now().Format("20060102-150405")
|
||||
adminGroupUUID := "admin-" + time.Now().Format("20060102-150405")
|
||||
now := time.Now().Unix()
|
||||
|
||||
// Create admin group
|
||||
adminGroup := types.Group{
|
||||
UUID: adminGroupUUID,
|
||||
NameHash: hashGroupName(adminGroupName),
|
||||
Members: []string{rootUserUUID},
|
||||
CreatedAt: now,
|
||||
UpdatedAt: now,
|
||||
}
|
||||
|
||||
// Create root user
|
||||
rootUser := types.User{
|
||||
UUID: rootUserUUID,
|
||||
NicknameHash: hashUserNickname(rootNickname),
|
||||
Groups: []string{adminGroupUUID},
|
||||
CreatedAt: now,
|
||||
UpdatedAt: now,
|
||||
}
|
||||
|
||||
// Store group and user in database
|
||||
if err := s.storeUserAndGroup(&rootUser, &adminGroup); err != nil {
|
||||
return fmt.Errorf("failed to store root user and admin group: %v", err)
|
||||
}
|
||||
|
||||
// Create API token with full administrative scopes
|
||||
adminScopes := []string{
|
||||
"admin:users:create", "admin:users:read", "admin:users:update", "admin:users:delete",
|
||||
"admin:groups:create", "admin:groups:read", "admin:groups:update", "admin:groups:delete",
|
||||
"admin:tokens:create", "admin:tokens:revoke",
|
||||
"read", "write", "delete",
|
||||
}
|
||||
|
||||
// Generate token with 24 hour expiration for initial setup
|
||||
tokenString, expiresAt, err := auth.GenerateJWT(rootUserUUID, adminScopes, 24)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to generate root token: %v", err)
|
||||
}
|
||||
|
||||
// Store token in database
|
||||
if err := s.storeAPIToken(tokenString, rootUserUUID, adminScopes, expiresAt); err != nil {
|
||||
return fmt.Errorf("failed to store root token: %v", err)
|
||||
}
|
||||
|
||||
// Log the token securely (one-time display)
|
||||
s.logger.WithFields(logrus.Fields{
|
||||
"user_uuid": rootUserUUID,
|
||||
"group_uuid": adminGroupUUID,
|
||||
"expires_at": time.Unix(expiresAt, 0).Format(time.RFC3339),
|
||||
"expires_in": "24 hours",
|
||||
}).Warn("Root account created - SAVE THIS TOKEN:")
|
||||
|
||||
// Display token prominently
|
||||
fmt.Printf("\n" + strings.Repeat("=", 80) + "\n")
|
||||
fmt.Printf("🔐 ROOT ACCOUNT CREATED - INITIAL SETUP TOKEN\n")
|
||||
fmt.Printf("===========================================\n")
|
||||
fmt.Printf("User UUID: %s\n", rootUserUUID)
|
||||
fmt.Printf("Group UUID: %s\n", adminGroupUUID)
|
||||
fmt.Printf("Token: %s\n", tokenString)
|
||||
fmt.Printf("Expires: %s (24 hours)\n", time.Unix(expiresAt, 0).Format(time.RFC3339))
|
||||
fmt.Printf("\n⚠️ IMPORTANT: Save this token immediately!\n")
|
||||
fmt.Printf(" This is the only time it will be displayed.\n")
|
||||
fmt.Printf(" Use this token to authenticate and create additional users.\n")
|
||||
fmt.Printf(strings.Repeat("=", 80) + "\n\n")
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// hashUserNickname creates a hash of the user nickname (similar to handlers.go)
|
||||
func hashUserNickname(nickname string) string {
|
||||
return utils.HashSHA3512(nickname)
|
||||
}
|
||||
|
||||
// hashGroupName creates a hash of the group name (similar to handlers.go)
|
||||
func hashGroupName(groupname string) string {
|
||||
return utils.HashSHA3512(groupname)
|
||||
}
|
||||
|
||||
// storeUserAndGroup stores both user and group in the database
|
||||
func (s *Server) storeUserAndGroup(user *types.User, group *types.Group) error {
|
||||
return s.db.Update(func(txn *badger.Txn) error {
|
||||
// Store user
|
||||
userData, err := json.Marshal(user)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to marshal user data: %v", err)
|
||||
}
|
||||
|
||||
if err := txn.Set([]byte(auth.UserStorageKey(user.UUID)), userData); err != nil {
|
||||
return fmt.Errorf("failed to store user: %v", err)
|
||||
}
|
||||
|
||||
// Store group
|
||||
groupData, err := json.Marshal(group)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to marshal group data: %v", err)
|
||||
}
|
||||
|
||||
if err := txn.Set([]byte(auth.GroupStorageKey(group.UUID)), groupData); err != nil {
|
||||
return fmt.Errorf("failed to store group: %v", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
@@ -2,7 +2,7 @@ package storage
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
|
||||
"github.com/klauspost/compress/zstd"
|
||||
)
|
||||
|
||||
@@ -57,4 +57,4 @@ func (c *CompressionService) DecompressData(compressedData []byte) ([]byte, erro
|
||||
return nil, fmt.Errorf("decompressor not initialized")
|
||||
}
|
||||
return c.decompressor.DecodeAll(compressedData, nil)
|
||||
}
|
||||
}
|
||||
@@ -34,10 +34,10 @@ func GetRevisionKey(baseKey string, revision int) string {
|
||||
func (r *RevisionService) StoreRevisionHistory(txn *badger.Txn, key string, storedValue types.StoredValue, ttl time.Duration) error {
|
||||
// Get existing metadata to check current revisions
|
||||
metadataKey := auth.ResourceMetadataKey(key)
|
||||
|
||||
|
||||
var metadata types.ResourceMetadata
|
||||
var currentRevisions []int
|
||||
|
||||
|
||||
// Try to get existing metadata
|
||||
metadataData, err := r.storage.RetrieveWithDecompression(txn, []byte(metadataKey))
|
||||
if err == badger.ErrKeyNotFound {
|
||||
@@ -60,7 +60,7 @@ func (r *RevisionService) StoreRevisionHistory(txn *badger.Txn, key string, stor
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to unmarshal metadata: %v", err)
|
||||
}
|
||||
|
||||
|
||||
// Extract current revisions (we store them as a custom field)
|
||||
if metadata.TTL == "" {
|
||||
currentRevisions = []int{}
|
||||
@@ -69,13 +69,13 @@ func (r *RevisionService) StoreRevisionHistory(txn *badger.Txn, key string, stor
|
||||
currentRevisions = []int{1, 2, 3} // Assume all revisions exist for existing keys
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
// Revision rotation logic: shift existing revisions
|
||||
if len(currentRevisions) >= 3 {
|
||||
// Delete oldest revision (rev:3)
|
||||
oldestRevKey := GetRevisionKey(key, 3)
|
||||
txn.Delete([]byte(oldestRevKey))
|
||||
|
||||
|
||||
// Shift rev:2 → rev:3
|
||||
rev2Key := GetRevisionKey(key, 2)
|
||||
rev2Data, err := r.storage.RetrieveWithDecompression(txn, []byte(rev2Key))
|
||||
@@ -83,8 +83,8 @@ func (r *RevisionService) StoreRevisionHistory(txn *badger.Txn, key string, stor
|
||||
rev3Key := GetRevisionKey(key, 3)
|
||||
r.storage.StoreWithTTL(txn, []byte(rev3Key), rev2Data, ttl)
|
||||
}
|
||||
|
||||
// Shift rev:1 → rev:2
|
||||
|
||||
// Shift rev:1 → rev:2
|
||||
rev1Key := GetRevisionKey(key, 1)
|
||||
rev1Data, err := r.storage.RetrieveWithDecompression(txn, []byte(rev1Key))
|
||||
if err == nil {
|
||||
@@ -92,80 +92,80 @@ func (r *RevisionService) StoreRevisionHistory(txn *badger.Txn, key string, stor
|
||||
r.storage.StoreWithTTL(txn, []byte(rev2Key), rev1Data, ttl)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
// Store current value as rev:1
|
||||
currentValueBytes, err := json.Marshal(storedValue)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to marshal current value for revision: %v", err)
|
||||
}
|
||||
|
||||
|
||||
rev1Key := GetRevisionKey(key, 1)
|
||||
err = r.storage.StoreWithTTL(txn, []byte(rev1Key), currentValueBytes, ttl)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to store revision 1: %v", err)
|
||||
}
|
||||
|
||||
|
||||
// Update metadata with new revision count
|
||||
metadata.UpdatedAt = time.Now().Unix()
|
||||
metadataBytes, err := json.Marshal(metadata)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to marshal metadata: %v", err)
|
||||
}
|
||||
|
||||
|
||||
return r.storage.StoreWithTTL(txn, []byte(metadataKey), metadataBytes, ttl)
|
||||
}
|
||||
|
||||
// GetRevisionHistory retrieves all available revisions for a given key
|
||||
func (r *RevisionService) GetRevisionHistory(key string) ([]map[string]interface{}, error) {
|
||||
var revisions []map[string]interface{}
|
||||
|
||||
|
||||
err := r.storage.db.View(func(txn *badger.Txn) error {
|
||||
// Check revisions 1, 2, 3
|
||||
for rev := 1; rev <= 3; rev++ {
|
||||
revKey := GetRevisionKey(key, rev)
|
||||
|
||||
|
||||
revData, err := r.storage.RetrieveWithDecompression(txn, []byte(revKey))
|
||||
if err == badger.ErrKeyNotFound {
|
||||
continue // Skip missing revisions
|
||||
} else if err != nil {
|
||||
return fmt.Errorf("failed to retrieve revision %d: %v", rev, err)
|
||||
}
|
||||
|
||||
|
||||
var storedValue types.StoredValue
|
||||
err = json.Unmarshal(revData, &storedValue)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to unmarshal revision %d: %v", rev, err)
|
||||
}
|
||||
|
||||
|
||||
var data interface{}
|
||||
err = json.Unmarshal(storedValue.Data, &data)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to unmarshal revision %d data: %v", rev, err)
|
||||
}
|
||||
|
||||
|
||||
revision := map[string]interface{}{
|
||||
"revision": rev,
|
||||
"uuid": storedValue.UUID,
|
||||
"timestamp": storedValue.Timestamp,
|
||||
"data": data,
|
||||
}
|
||||
|
||||
|
||||
revisions = append(revisions, revision)
|
||||
}
|
||||
|
||||
|
||||
return nil
|
||||
})
|
||||
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
|
||||
// Sort revisions by revision number (newest first)
|
||||
// Note: they're already in order since we iterate 1->3, but reverse for newest first
|
||||
for i, j := 0, len(revisions)-1; i < j; i, j = i+1, j-1 {
|
||||
revisions[i], revisions[j] = revisions[j], revisions[i]
|
||||
}
|
||||
|
||||
|
||||
return revisions, nil
|
||||
}
|
||||
|
||||
@@ -174,23 +174,23 @@ func (r *RevisionService) GetSpecificRevision(key string, revision int) (*types.
|
||||
if revision < 1 || revision > 3 {
|
||||
return nil, fmt.Errorf("invalid revision number: %d (must be 1-3)", revision)
|
||||
}
|
||||
|
||||
|
||||
var storedValue types.StoredValue
|
||||
err := r.storage.db.View(func(txn *badger.Txn) error {
|
||||
revKey := GetRevisionKey(key, revision)
|
||||
|
||||
|
||||
revData, err := r.storage.RetrieveWithDecompression(txn, []byte(revKey))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
|
||||
return json.Unmarshal(revData, &storedValue)
|
||||
})
|
||||
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
|
||||
return &storedValue, nil
|
||||
}
|
||||
|
||||
@@ -200,15 +200,15 @@ func GetRevisionFromPath(path string) (string, int, error) {
|
||||
if len(parts) < 4 || parts[len(parts)-2] != "rev" {
|
||||
return "", 0, fmt.Errorf("invalid revision path format")
|
||||
}
|
||||
|
||||
|
||||
revisionStr := parts[len(parts)-1]
|
||||
revision, err := strconv.Atoi(revisionStr)
|
||||
if err != nil {
|
||||
return "", 0, fmt.Errorf("invalid revision number: %s", revisionStr)
|
||||
}
|
||||
|
||||
|
||||
// Reconstruct the base key without the "/rev/N" suffix
|
||||
baseKey := strings.Join(parts[:len(parts)-2], "/")
|
||||
|
||||
|
||||
return baseKey, revision, nil
|
||||
}
|
||||
}
|
||||
@@ -12,17 +12,17 @@ import (
|
||||
|
||||
// StorageService handles all BadgerDB operations and data management
|
||||
type StorageService struct {
|
||||
db *badger.DB
|
||||
config *types.Config
|
||||
compressionSvc *CompressionService
|
||||
logger *logrus.Logger
|
||||
db *badger.DB
|
||||
config *types.Config
|
||||
compressionSvc *CompressionService
|
||||
logger *logrus.Logger
|
||||
}
|
||||
|
||||
// NewStorageService creates a new storage service
|
||||
func NewStorageService(db *badger.DB, config *types.Config, logger *logrus.Logger) (*StorageService, error) {
|
||||
var compressionSvc *CompressionService
|
||||
var err error
|
||||
|
||||
|
||||
// Initialize compression if enabled
|
||||
if config.CompressionEnabled {
|
||||
compressionSvc, err = NewCompressionService()
|
||||
@@ -50,7 +50,7 @@ func (s *StorageService) Close() {
|
||||
func (s *StorageService) StoreWithTTL(txn *badger.Txn, key []byte, data []byte, ttl time.Duration) error {
|
||||
var finalData []byte
|
||||
var err error
|
||||
|
||||
|
||||
// Compress data if compression is enabled
|
||||
if s.config.CompressionEnabled && s.compressionSvc != nil {
|
||||
finalData, err = s.compressionSvc.CompressData(data)
|
||||
@@ -60,14 +60,14 @@ func (s *StorageService) StoreWithTTL(txn *badger.Txn, key []byte, data []byte,
|
||||
} else {
|
||||
finalData = data
|
||||
}
|
||||
|
||||
|
||||
entry := badger.NewEntry(key, finalData)
|
||||
|
||||
|
||||
// Apply TTL if specified
|
||||
if ttl > 0 {
|
||||
entry = entry.WithTTL(ttl)
|
||||
}
|
||||
|
||||
|
||||
return txn.SetEntry(entry)
|
||||
}
|
||||
|
||||
@@ -77,7 +77,7 @@ func (s *StorageService) RetrieveWithDecompression(txn *badger.Txn, key []byte)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
|
||||
var compressedData []byte
|
||||
err = item.Value(func(val []byte) error {
|
||||
compressedData = append(compressedData, val...)
|
||||
@@ -86,12 +86,12 @@ func (s *StorageService) RetrieveWithDecompression(txn *badger.Txn, key []byte)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
|
||||
// Decompress data if compression is enabled
|
||||
if s.config.CompressionEnabled && s.compressionSvc != nil {
|
||||
return s.compressionSvc.DecompressData(compressedData)
|
||||
}
|
||||
|
||||
|
||||
return compressedData, nil
|
||||
}
|
||||
|
||||
@@ -109,4 +109,4 @@ func (s *StorageService) DecompressData(compressedData []byte) ([]byte, error) {
|
||||
return compressedData, nil
|
||||
}
|
||||
return s.compressionSvc.DecompressData(compressedData)
|
||||
}
|
||||
}
|
||||
101
types/types.go
101
types/types.go
@@ -13,20 +13,20 @@ type StoredValue struct {
|
||||
|
||||
// User represents a system user
|
||||
type User struct {
|
||||
UUID string `json:"uuid"` // Server-generated UUID
|
||||
UUID string `json:"uuid"` // Server-generated UUID
|
||||
NicknameHash string `json:"nickname_hash"` // SHA3-512 hash of nickname
|
||||
Groups []string `json:"groups"` // List of group UUIDs this user belongs to
|
||||
CreatedAt int64 `json:"created_at"` // Unix timestamp
|
||||
UpdatedAt int64 `json:"updated_at"` // Unix timestamp
|
||||
Groups []string `json:"groups"` // List of group UUIDs this user belongs to
|
||||
CreatedAt int64 `json:"created_at"` // Unix timestamp
|
||||
UpdatedAt int64 `json:"updated_at"` // Unix timestamp
|
||||
}
|
||||
|
||||
// Group represents a user group
|
||||
type Group struct {
|
||||
UUID string `json:"uuid"` // Server-generated UUID
|
||||
NameHash string `json:"name_hash"` // SHA3-512 hash of group name
|
||||
Members []string `json:"members"` // List of user UUIDs in this group
|
||||
CreatedAt int64 `json:"created_at"` // Unix timestamp
|
||||
UpdatedAt int64 `json:"updated_at"` // Unix timestamp
|
||||
UUID string `json:"uuid"` // Server-generated UUID
|
||||
NameHash string `json:"name_hash"` // SHA3-512 hash of group name
|
||||
Members []string `json:"members"` // List of user UUIDs in this group
|
||||
CreatedAt int64 `json:"created_at"` // Unix timestamp
|
||||
UpdatedAt int64 `json:"updated_at"` // Unix timestamp
|
||||
}
|
||||
|
||||
// APIToken represents a JWT authentication token
|
||||
@@ -40,12 +40,12 @@ type APIToken struct {
|
||||
|
||||
// ResourceMetadata contains ownership and permission information for stored resources
|
||||
type ResourceMetadata struct {
|
||||
OwnerUUID string `json:"owner_uuid"` // UUID of the resource owner
|
||||
GroupUUID string `json:"group_uuid"` // UUID of the resource group
|
||||
Permissions int `json:"permissions"` // 12-bit permission mask (POSIX-inspired)
|
||||
TTL string `json:"ttl"` // Time-to-live duration (Go format)
|
||||
CreatedAt int64 `json:"created_at"` // Unix timestamp when resource was created
|
||||
UpdatedAt int64 `json:"updated_at"` // Unix timestamp when resource was last updated
|
||||
OwnerUUID string `json:"owner_uuid"` // UUID of the resource owner
|
||||
GroupUUID string `json:"group_uuid"` // UUID of the resource group
|
||||
Permissions int `json:"permissions"` // 12-bit permission mask (POSIX-inspired)
|
||||
TTL string `json:"ttl"` // Time-to-live duration (Go format)
|
||||
CreatedAt int64 `json:"created_at"` // Unix timestamp when resource was created
|
||||
UpdatedAt int64 `json:"updated_at"` // Unix timestamp when resource was last updated
|
||||
}
|
||||
|
||||
// Permission constants for POSIX-inspired ACL
|
||||
@@ -55,19 +55,19 @@ const (
|
||||
PermOwnerDelete = 1 << 10
|
||||
PermOwnerWrite = 1 << 9
|
||||
PermOwnerRead = 1 << 8
|
||||
|
||||
|
||||
// Group permissions (bits 7-4)
|
||||
PermGroupCreate = 1 << 7
|
||||
PermGroupDelete = 1 << 6
|
||||
PermGroupWrite = 1 << 5
|
||||
PermGroupRead = 1 << 4
|
||||
|
||||
|
||||
// Others permissions (bits 3-0)
|
||||
PermOthersCreate = 1 << 3
|
||||
PermOthersDelete = 1 << 2
|
||||
PermOthersWrite = 1 << 1
|
||||
PermOthersRead = 1 << 0
|
||||
|
||||
|
||||
// Default permissions: Owner(1111), Group(0110), Others(0010)
|
||||
DefaultPermissions = (PermOwnerCreate | PermOwnerDelete | PermOwnerWrite | PermOwnerRead) |
|
||||
(PermGroupWrite | PermGroupRead) |
|
||||
@@ -231,57 +231,46 @@ type KVRangeResponse struct {
|
||||
|
||||
// Configuration
|
||||
type Config struct {
|
||||
NodeID string `yaml:"node_id"`
|
||||
BindAddress string `yaml:"bind_address"`
|
||||
Port int `yaml:"port"`
|
||||
DataDir string `yaml:"data_dir"`
|
||||
SeedNodes []string `yaml:"seed_nodes"`
|
||||
ReadOnly bool `yaml:"read_only"`
|
||||
LogLevel string `yaml:"log_level"`
|
||||
GossipIntervalMin int `yaml:"gossip_interval_min"`
|
||||
GossipIntervalMax int `yaml:"gossip_interval_max"`
|
||||
SyncInterval int `yaml:"sync_interval"`
|
||||
CatchupInterval int `yaml:"catchup_interval"`
|
||||
BootstrapMaxAgeHours int `yaml:"bootstrap_max_age_hours"`
|
||||
ThrottleDelayMs int `yaml:"throttle_delay_ms"`
|
||||
FetchDelayMs int `yaml:"fetch_delay_ms"`
|
||||
|
||||
NodeID string `yaml:"node_id"`
|
||||
BindAddress string `yaml:"bind_address"`
|
||||
Port int `yaml:"port"`
|
||||
DataDir string `yaml:"data_dir"`
|
||||
SeedNodes []string `yaml:"seed_nodes"`
|
||||
ReadOnly bool `yaml:"read_only"`
|
||||
LogLevel string `yaml:"log_level"`
|
||||
GossipIntervalMin int `yaml:"gossip_interval_min"`
|
||||
GossipIntervalMax int `yaml:"gossip_interval_max"`
|
||||
SyncInterval int `yaml:"sync_interval"`
|
||||
CatchupInterval int `yaml:"catchup_interval"`
|
||||
BootstrapMaxAgeHours int `yaml:"bootstrap_max_age_hours"`
|
||||
ThrottleDelayMs int `yaml:"throttle_delay_ms"`
|
||||
FetchDelayMs int `yaml:"fetch_delay_ms"`
|
||||
|
||||
// Database compression configuration
|
||||
CompressionEnabled bool `yaml:"compression_enabled"`
|
||||
CompressionLevel int `yaml:"compression_level"`
|
||||
|
||||
|
||||
// TTL configuration
|
||||
DefaultTTL string `yaml:"default_ttl"` // Go duration format, "0" means no default TTL
|
||||
MaxJSONSize int `yaml:"max_json_size"` // Maximum JSON size in bytes
|
||||
|
||||
DefaultTTL string `yaml:"default_ttl"` // Go duration format, "0" means no default TTL
|
||||
MaxJSONSize int `yaml:"max_json_size"` // Maximum JSON size in bytes
|
||||
|
||||
// Rate limiting configuration
|
||||
RateLimitRequests int `yaml:"rate_limit_requests"` // Max requests per window
|
||||
RateLimitWindow string `yaml:"rate_limit_window"` // Window duration (Go format)
|
||||
|
||||
|
||||
// Tamper-evident logging configuration
|
||||
TamperLogActions []string `yaml:"tamper_log_actions"` // Actions to log
|
||||
|
||||
|
||||
// Backup system configuration
|
||||
BackupEnabled bool `yaml:"backup_enabled"` // Enable/disable automated backups
|
||||
BackupSchedule string `yaml:"backup_schedule"` // Cron schedule format
|
||||
BackupPath string `yaml:"backup_path"` // Directory to store backups
|
||||
BackupRetention int `yaml:"backup_retention"` // Days to keep backups
|
||||
|
||||
BackupEnabled bool `yaml:"backup_enabled"` // Enable/disable automated backups
|
||||
BackupSchedule string `yaml:"backup_schedule"` // Cron schedule format
|
||||
BackupPath string `yaml:"backup_path"` // Directory to store backups
|
||||
BackupRetention int `yaml:"backup_retention"` // Days to keep backups
|
||||
|
||||
// Feature toggles for optional functionalities
|
||||
AuthEnabled bool `yaml:"auth_enabled"` // Enable/disable authentication system
|
||||
TamperLoggingEnabled bool `yaml:"tamper_logging_enabled"` // Enable/disable tamper-evident logging
|
||||
ClusteringEnabled bool `yaml:"clustering_enabled"` // Enable/disable clustering/gossip
|
||||
RateLimitingEnabled bool `yaml:"rate_limiting_enabled"` // Enable/disable rate limiting
|
||||
RevisionHistoryEnabled bool `yaml:"revision_history_enabled"` // Enable/disable revision history
|
||||
|
||||
// Anonymous access control (Issue #5)
|
||||
AllowAnonymousRead bool `yaml:"allow_anonymous_read"` // Allow unauthenticated read access to KV endpoints
|
||||
AllowAnonymousWrite bool `yaml:"allow_anonymous_write"` // Allow unauthenticated write access to KV endpoints
|
||||
|
||||
// Cluster authentication (Issue #13)
|
||||
ClusterSecret string `yaml:"cluster_secret"` // Shared secret for cluster authentication (auto-generated if empty)
|
||||
ClusterTLSEnabled bool `yaml:"cluster_tls_enabled"` // Require TLS for inter-node communication
|
||||
ClusterTLSCertFile string `yaml:"cluster_tls_cert_file"` // Path to TLS certificate file
|
||||
ClusterTLSKeyFile string `yaml:"cluster_tls_key_file"` // Path to TLS private key file
|
||||
ClusterTLSSkipVerify bool `yaml:"cluster_tls_skip_verify"` // Skip TLS verification (insecure, for testing only)
|
||||
}
|
||||
}
|
||||
@@ -22,4 +22,4 @@ func HashGroupName(groupname string) string {
|
||||
|
||||
func HashToken(token string) string {
|
||||
return HashSHA3512(token)
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user