Compare commits

13 Commits: 5ab03331fc ... bd1d1c2c7c

SHA1:
bd1d1c2c7c
eaed6e76e4
6cdc561e42
b6332d7ff5
85f3aa69d2
a5ea869b28
5223438ddf
9f12f3dbcb
c273b836be
83777fe5a2
b1d5423108
f9965c8f9c
7d7e6e412a
config.yaml (new file, 33 lines)
@@ -0,0 +1,33 @@
node_id: GALACTICA
bind_address: 127.0.0.1
port: 8080
data_dir: ./data
seed_nodes: []
read_only: false
log_level: info
gossip_interval_min: 60
gossip_interval_max: 120
sync_interval: 300
catchup_interval: 120
bootstrap_max_age_hours: 720
throttle_delay_ms: 100
fetch_delay_ms: 50
compression_enabled: true
compression_level: 3
default_ttl: "0"
max_json_size: 1048576
rate_limit_requests: 100
rate_limit_window: 1m
tamper_log_actions:
  - data_write
  - user_create
  - auth_failure
backup_enabled: true
backup_schedule: 0 0 * * *
backup_path: ./backups
backup_retention: 7
auth_enabled: true
tamper_logging_enabled: true
clustering_enabled: true
rate_limiting_enabled: true
revision_history_enabled: true
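For orientation, a minimal sketch of layering this file over the defaults from config/config.go at the end of this diff. It assumes types.Config carries matching yaml struct tags, which are not shown in this diff.

	// Sketch only: load config.yaml over Default(); yaml tags on types.Config are assumed.
	cfg := config.Default()
	if data, err := os.ReadFile("config.yaml"); err == nil {
		if err := yaml.Unmarshal(data, cfg); err != nil {
			log.Fatalf("parse config.yaml: %v", err)
		}
	}
	log.Printf("node %s listening on %s:%d", cfg.NodeID, cfg.BindAddress, cfg.Port)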
.gitignore (vendored, +2)
@@ -4,3 +4,5 @@ data*/
*.yaml
!config.yaml
kvs
*.log
extract_*.py
auth/auth.go (new file, 205 lines)
@@ -0,0 +1,205 @@
package auth

import (
	"context"
	"encoding/json"
	"fmt"
	"net/http"
	"strings"
	"time"

	badger "github.com/dgraph-io/badger/v4"
	"github.com/sirupsen/logrus"

	"kvs/types"
	"kvs/utils"
)

// AuthContext holds authentication information for a request
type AuthContext struct {
	UserUUID string   `json:"user_uuid"`
	Scopes   []string `json:"scopes"`
	Groups   []string `json:"groups"`
}

// AuthService handles authentication operations
type AuthService struct {
	db     *badger.DB
	logger *logrus.Logger
}

// NewAuthService creates a new authentication service
func NewAuthService(db *badger.DB, logger *logrus.Logger) *AuthService {
	return &AuthService{
		db:     db,
		logger: logger,
	}
}

// StoreAPIToken stores an API token in BadgerDB with TTL
func (s *AuthService) StoreAPIToken(tokenString string, userUUID string, scopes []string, expiresAt int64) error {
	tokenHash := utils.HashToken(tokenString)

	apiToken := types.APIToken{
		TokenHash: tokenHash,
		UserUUID:  userUUID,
		Scopes:    scopes,
		IssuedAt:  time.Now().Unix(),
		ExpiresAt: expiresAt,
	}

	tokenData, err := json.Marshal(apiToken)
	if err != nil {
		return err
	}

	return s.db.Update(func(txn *badger.Txn) error {
		entry := badger.NewEntry([]byte(TokenStorageKey(tokenHash)), tokenData)

		// Set TTL to the token expiration time
		ttl := time.Until(time.Unix(expiresAt, 0))
		if ttl > 0 {
			entry = entry.WithTTL(ttl)
		}

		return txn.SetEntry(entry)
	})
}

// GetAPIToken retrieves an API token from BadgerDB by hash
func (s *AuthService) GetAPIToken(tokenHash string) (*types.APIToken, error) {
	var apiToken types.APIToken

	err := s.db.View(func(txn *badger.Txn) error {
		item, err := txn.Get([]byte(TokenStorageKey(tokenHash)))
		if err != nil {
			return err
		}

		return item.Value(func(val []byte) error {
			return json.Unmarshal(val, &apiToken)
		})
	})

	if err != nil {
		return nil, err
	}

	return &apiToken, nil
}

// ExtractTokenFromHeader extracts the Bearer token from the Authorization header
func ExtractTokenFromHeader(r *http.Request) (string, error) {
	authHeader := r.Header.Get("Authorization")
	if authHeader == "" {
		return "", fmt.Errorf("missing authorization header")
	}

	parts := strings.Split(authHeader, " ")
	if len(parts) != 2 || strings.ToLower(parts[0]) != "bearer" {
		return "", fmt.Errorf("invalid authorization header format")
	}

	return parts[1], nil
}

// GetUserGroups retrieves all groups that a user belongs to
func (s *AuthService) GetUserGroups(userUUID string) ([]string, error) {
	var user types.User
	err := s.db.View(func(txn *badger.Txn) error {
		item, err := txn.Get([]byte(UserStorageKey(userUUID)))
		if err != nil {
			return err
		}

		return item.Value(func(val []byte) error {
			return json.Unmarshal(val, &user)
		})
	})

	if err != nil {
		return nil, err
	}

	return user.Groups, nil
}

// AuthenticateRequest validates the JWT token and returns authentication context
func (s *AuthService) AuthenticateRequest(r *http.Request) (*AuthContext, error) {
	// Extract token from header
	tokenString, err := ExtractTokenFromHeader(r)
	if err != nil {
		return nil, err
	}

	// Validate JWT token
	claims, err := ValidateJWT(tokenString)
	if err != nil {
		return nil, fmt.Errorf("invalid token: %v", err)
	}

	// Verify token exists in our database (not revoked)
	tokenHash := utils.HashToken(tokenString)
	_, err = s.GetAPIToken(tokenHash)
	if err == badger.ErrKeyNotFound {
		return nil, fmt.Errorf("token not found or revoked")
	}
	if err != nil {
		return nil, fmt.Errorf("failed to verify token: %v", err)
	}

	// Get user's groups
	groups, err := s.GetUserGroups(claims.UserUUID)
	if err != nil {
		s.logger.WithError(err).WithField("user_uuid", claims.UserUUID).Warn("Failed to get user groups")
		groups = []string{} // Continue with empty groups on error
	}

	return &AuthContext{
		UserUUID: claims.UserUUID,
		Scopes:   claims.Scopes,
		Groups:   groups,
	}, nil
}

// CheckResourcePermission checks if a user has permission to perform an operation on a resource
func (s *AuthService) CheckResourcePermission(authCtx *AuthContext, resourceKey string, operation string) bool {
	// Get resource metadata
	var metadata types.ResourceMetadata
	err := s.db.View(func(txn *badger.Txn) error {
		item, err := txn.Get([]byte(ResourceMetadataKey(resourceKey)))
		if err != nil {
			return err
		}

		return item.Value(func(val []byte) error {
			return json.Unmarshal(val, &metadata)
		})
	})

	// If no metadata exists, use default permissions
	if err == badger.ErrKeyNotFound {
		metadata = types.ResourceMetadata{
			OwnerUUID:   authCtx.UserUUID, // Treat requester as owner for new resources
			GroupUUID:   "",
			Permissions: types.DefaultPermissions,
		}
	} else if err != nil {
		s.logger.WithError(err).WithField("resource_key", resourceKey).Warn("Failed to get resource metadata")
		return false
	}

	// Check user relationship to resource
	isOwner, isGroupMember := CheckUserResourceRelationship(authCtx.UserUUID, &metadata, authCtx.Groups)

	// Check permission
	return CheckPermission(metadata.Permissions, operation, isOwner, isGroupMember)
}

// GetAuthContext retrieves auth context from request context
func GetAuthContext(ctx context.Context) *AuthContext {
	if authCtx, ok := ctx.Value("auth").(*AuthContext); ok {
		return authCtx
	}
	return nil
}
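A minimal usage sketch for the service above. The handler is hypothetical; the permission bit checks and the storage key helpers it relies on appear later in this diff.

	// Sketch: authenticate a request, then gate a write on resource permissions.
	func putHandler(svc *auth.AuthService) http.HandlerFunc {
		return func(w http.ResponseWriter, r *http.Request) {
			authCtx, err := svc.AuthenticateRequest(r)
			if err != nil {
				http.Error(w, "Unauthorized", http.StatusUnauthorized)
				return
			}
			key := strings.TrimPrefix(r.URL.Path, "/kv/")
			if !svc.CheckResourcePermission(authCtx, key, "write") {
				http.Error(w, "Forbidden", http.StatusForbidden)
				return
			}
			// ... perform the write ...
		}
	}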
auth/jwt.go (new file, 67 lines)
@@ -0,0 +1,67 @@
package auth

import (
	"fmt"
	"time"

	"github.com/golang-jwt/jwt/v4"
)

// JWT signing key (should be configurable in production)
var jwtSigningKey = []byte("your-secret-signing-key-change-this-in-production")

// JWTClaims represents the custom claims for our JWT tokens
type JWTClaims struct {
	UserUUID string   `json:"user_uuid"`
	Scopes   []string `json:"scopes"`
	jwt.RegisteredClaims
}

// GenerateJWT creates a new JWT token for a user with specified scopes
func GenerateJWT(userUUID string, scopes []string, expirationHours int) (string, int64, error) {
	if expirationHours <= 0 {
		expirationHours = 1 // Default to 1 hour
	}

	now := time.Now()
	expiresAt := now.Add(time.Duration(expirationHours) * time.Hour)

	claims := JWTClaims{
		UserUUID: userUUID,
		Scopes:   scopes,
		RegisteredClaims: jwt.RegisteredClaims{
			IssuedAt:  jwt.NewNumericDate(now),
			ExpiresAt: jwt.NewNumericDate(expiresAt),
			Issuer:    "kvs-server",
		},
	}

	token := jwt.NewWithClaims(jwt.SigningMethodHS256, claims)
	tokenString, err := token.SignedString(jwtSigningKey)
	if err != nil {
		return "", 0, err
	}

	return tokenString, expiresAt.Unix(), nil
}

// ValidateJWT validates a JWT token and returns the claims if valid
func ValidateJWT(tokenString string) (*JWTClaims, error) {
	token, err := jwt.ParseWithClaims(tokenString, &JWTClaims{}, func(token *jwt.Token) (interface{}, error) {
		// Validate signing method
		if _, ok := token.Method.(*jwt.SigningMethodHMAC); !ok {
			return nil, fmt.Errorf("unexpected signing method: %v", token.Header["alg"])
		}
		return jwtSigningKey, nil
	})

	if err != nil {
		return nil, err
	}

	if claims, ok := token.Claims.(*JWTClaims); ok && token.Valid {
		return claims, nil
	}

	return nil, fmt.Errorf("invalid token")
}
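A sketch of the issue-store-validate round trip, combining GenerateJWT with the AuthService from auth/auth.go. The user UUID and scopes are placeholders.

	// Sketch: issue a 24-hour token, persist its hash, then validate it.
	tok, expiresAt, err := auth.GenerateJWT("user-1234", []string{"read", "write"}, 24)
	if err != nil {
		log.Fatal(err)
	}
	if err := authService.StoreAPIToken(tok, "user-1234", []string{"read", "write"}, expiresAt); err != nil {
		log.Fatal(err)
	}
	claims, err := auth.ValidateJWT(tok)
	// On success, claims.UserUUID == "user-1234" and claims.Scopes == []string{"read", "write"}.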
auth/middleware.go (new file, 157 lines)
@@ -0,0 +1,157 @@
package auth

import (
	"context"
	"net/http"
	"strconv"

	"github.com/sirupsen/logrus"

	"kvs/types"
)

// RateLimitService handles rate limiting operations
type RateLimitService struct {
	authService *AuthService
	config      *types.Config
}

// NewRateLimitService creates a new rate limiting service
func NewRateLimitService(authService *AuthService, config *types.Config) *RateLimitService {
	return &RateLimitService{
		authService: authService,
		config:      config,
	}
}

// Middleware creates authentication and authorization middleware
func (s *AuthService) Middleware(requiredScopes []string, resourceKeyExtractor func(*http.Request) string, operation string) func(http.HandlerFunc) http.HandlerFunc {
	return func(next http.HandlerFunc) http.HandlerFunc {
		return func(w http.ResponseWriter, r *http.Request) {
			// Skip authentication if disabled
			if !s.isAuthEnabled() {
				next(w, r)
				return
			}

			// Authenticate request
			authCtx, err := s.AuthenticateRequest(r)
			if err != nil {
				s.logger.WithError(err).WithField("path", r.URL.Path).Info("Authentication failed")
				http.Error(w, "Unauthorized", http.StatusUnauthorized)
				return
			}

			// Check required scopes
			if len(requiredScopes) > 0 {
				hasRequiredScope := false
				for _, required := range requiredScopes {
					for _, scope := range authCtx.Scopes {
						if scope == required {
							hasRequiredScope = true
							break
						}
					}
					if hasRequiredScope {
						break
					}
				}

				if !hasRequiredScope {
					s.logger.WithFields(logrus.Fields{
						"user_uuid":       authCtx.UserUUID,
						"user_scopes":     authCtx.Scopes,
						"required_scopes": requiredScopes,
					}).Info("Insufficient scopes")
					http.Error(w, "Forbidden", http.StatusForbidden)
					return
				}
			}

			// Check resource-level permissions if applicable
			if resourceKeyExtractor != nil && operation != "" {
				resourceKey := resourceKeyExtractor(r)
				if resourceKey != "" {
					hasPermission := s.CheckResourcePermission(authCtx, resourceKey, operation)
					if !hasPermission {
						s.logger.WithFields(logrus.Fields{
							"user_uuid":    authCtx.UserUUID,
							"resource_key": resourceKey,
							"operation":    operation,
						}).Info("Permission denied")
						http.Error(w, "Forbidden", http.StatusForbidden)
						return
					}
				}
			}

			// Store auth context in request context for use in handlers
			ctx := context.WithValue(r.Context(), "auth", authCtx)
			r = r.WithContext(ctx)

			next(w, r)
		}
	}
}

// RateLimitMiddleware enforces rate limiting
func (s *RateLimitService) RateLimitMiddleware(next http.HandlerFunc) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		// Skip rate limiting if disabled
		if !s.config.RateLimitingEnabled {
			next(w, r)
			return
		}

		// Extract auth context to get user UUID
		authCtx := GetAuthContext(r.Context())
		if authCtx == nil {
			// No auth context, skip rate limiting (unauthenticated requests)
			next(w, r)
			return
		}

		// Check rate limit
		allowed, err := s.checkRateLimit(authCtx.UserUUID)
		if err != nil {
			s.authService.logger.WithError(err).WithField("user_uuid", authCtx.UserUUID).Error("Failed to check rate limit")
			http.Error(w, "Internal Server Error", http.StatusInternalServerError)
			return
		}

		if !allowed {
			s.authService.logger.WithFields(logrus.Fields{
				"user_uuid": authCtx.UserUUID,
				"limit":     s.config.RateLimitRequests,
				"window":    s.config.RateLimitWindow,
			}).Info("Rate limit exceeded")

			// Set rate limit headers
			w.Header().Set("X-Rate-Limit-Limit", strconv.Itoa(s.config.RateLimitRequests))
			w.Header().Set("X-Rate-Limit-Window", s.config.RateLimitWindow)

			http.Error(w, "Rate limit exceeded", http.StatusTooManyRequests)
			return
		}

		next(w, r)
	}
}

// isAuthEnabled checks if authentication is enabled (would be passed from config)
func (s *AuthService) isAuthEnabled() bool {
	// This would normally be injected from config, but for now we'll assume enabled
	// TODO: Inject config dependency
	return true
}

// Helper method to check rate limits (simplified version)
func (s *RateLimitService) checkRateLimit(userUUID string) (bool, error) {
	if s.config.RateLimitRequests <= 0 {
		return true, nil // Rate limiting disabled
	}

	// Simplified rate limiting - in practice this would use the full implementation
	// that was in main.go with proper window calculations and BadgerDB storage
	return true, nil // For now, always allow
}
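Since RateLimitMiddleware reads the auth context that Middleware stores in the request, the rate limiter has to sit inside the auth wrapper. A wiring sketch (the handler and service variable names are hypothetical):

	// Sketch: auth runs first and populates the request context; rate limiting runs inside it.
	extractKey := func(r *http.Request) string { return strings.TrimPrefix(r.URL.Path, "/kv/") }
	handler := authService.Middleware([]string{"write"}, extractKey, "write")(
		rateLimitService.RateLimitMiddleware(putHandler),
	)
	mux.HandleFunc("/kv/", handler)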
auth/permissions.go (new file, 65 lines)
@@ -0,0 +1,65 @@
package auth

import (
	"kvs/types"
)

// CheckPermission checks if a user has permission to perform an operation on a resource
func CheckPermission(permissions int, operation string, isOwner, isGroupMember bool) bool {
	switch operation {
	case "create":
		if isOwner {
			return (permissions & types.PermOwnerCreate) != 0
		}
		if isGroupMember {
			return (permissions & types.PermGroupCreate) != 0
		}
		return (permissions & types.PermOthersCreate) != 0

	case "delete":
		if isOwner {
			return (permissions & types.PermOwnerDelete) != 0
		}
		if isGroupMember {
			return (permissions & types.PermGroupDelete) != 0
		}
		return (permissions & types.PermOthersDelete) != 0

	case "write":
		if isOwner {
			return (permissions & types.PermOwnerWrite) != 0
		}
		if isGroupMember {
			return (permissions & types.PermGroupWrite) != 0
		}
		return (permissions & types.PermOthersWrite) != 0

	case "read":
		if isOwner {
			return (permissions & types.PermOwnerRead) != 0
		}
		if isGroupMember {
			return (permissions & types.PermGroupRead) != 0
		}
		return (permissions & types.PermOthersRead) != 0

	default:
		return false
	}
}

// CheckUserResourceRelationship determines user relationship to resource
func CheckUserResourceRelationship(userUUID string, metadata *types.ResourceMetadata, userGroups []string) (isOwner, isGroupMember bool) {
	isOwner = (userUUID == metadata.OwnerUUID)

	if metadata.GroupUUID != "" {
		for _, groupUUID := range userGroups {
			if groupUUID == metadata.GroupUUID {
				isGroupMember = true
				break
			}
		}
	}

	return isOwner, isGroupMember
}
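The Perm* constants live in the types package and their values are not shown in this diff; assuming they are distinct bit flags, the check behaves like this:

	// Sketch, assuming types defines the Perm* constants as distinct bit flags.
	perms := types.PermOwnerRead | types.PermOwnerWrite | types.PermGroupRead

	auth.CheckPermission(perms, "read", true, false)  // owner read  -> true
	auth.CheckPermission(perms, "write", false, true) // group write -> false (PermGroupWrite not set)
	auth.CheckPermission(perms, "read", false, false) // others read -> false (PermOthersRead not set)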
auth/storage.go (new file, 19 lines)
@@ -0,0 +1,19 @@
package auth

// Storage key generation utilities for authentication data

func UserStorageKey(userUUID string) string {
	return "user:" + userUUID
}

func GroupStorageKey(groupUUID string) string {
	return "group:" + groupUUID
}

func TokenStorageKey(tokenHash string) string {
	return "token:" + tokenHash
}

func ResourceMetadataKey(resourceKey string) string {
	return resourceKey + ":metadata"
}
cluster/bootstrap.go (new file, 145 lines)
@@ -0,0 +1,145 @@
package cluster

import (
	"bytes"
	"encoding/json"
	"fmt"
	"net/http"
	"time"

	"github.com/sirupsen/logrus"

	"kvs/types"
)

// BootstrapService handles cluster joining and initial synchronization
type BootstrapService struct {
	config        *types.Config
	gossipService *GossipService
	syncService   *SyncService
	logger        *logrus.Logger
	setMode       func(string) // Callback to set server mode
}

// NewBootstrapService creates a new bootstrap service
func NewBootstrapService(config *types.Config, gossipService *GossipService, syncService *SyncService, logger *logrus.Logger, setMode func(string)) *BootstrapService {
	return &BootstrapService{
		config:        config,
		gossipService: gossipService,
		syncService:   syncService,
		logger:        logger,
		setMode:       setMode,
	}
}

// Bootstrap joins cluster using seed nodes
func (s *BootstrapService) Bootstrap() {
	if len(s.config.SeedNodes) == 0 {
		s.logger.Info("No seed nodes configured, running as standalone")
		return
	}

	s.logger.Info("Starting bootstrap process")
	s.setMode("syncing")

	// Try to join cluster via each seed node
	joined := false
	for _, seedAddr := range s.config.SeedNodes {
		if s.attemptJoin(seedAddr) {
			joined = true
			break
		}
	}

	if !joined {
		s.logger.Warn("Failed to join cluster via seed nodes, running as standalone")
		s.setMode("normal")
		return
	}

	// Wait a bit for member discovery
	time.Sleep(2 * time.Second)

	// Perform gradual sync (now Merkle-based)
	s.performGradualSync()

	// Switch to normal mode
	s.setMode("normal")
	s.logger.Info("Bootstrap completed, entering normal mode")
}

// attemptJoin attempts to join cluster via a seed node
func (s *BootstrapService) attemptJoin(seedAddr string) bool {
	joinReq := types.JoinRequest{
		ID:              s.config.NodeID,
		Address:         fmt.Sprintf("%s:%d", s.config.BindAddress, s.config.Port),
		JoinedTimestamp: time.Now().UnixMilli(),
	}

	jsonData, err := json.Marshal(joinReq)
	if err != nil {
		s.logger.WithError(err).Error("Failed to marshal join request")
		return false
	}

	client := &http.Client{Timeout: 10 * time.Second}
	url := fmt.Sprintf("http://%s/members/join", seedAddr)

	resp, err := client.Post(url, "application/json", bytes.NewBuffer(jsonData))
	if err != nil {
		s.logger.WithFields(logrus.Fields{
			"seed":  seedAddr,
			"error": err.Error(),
		}).Warn("Failed to contact seed node")
		return false
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		s.logger.WithFields(logrus.Fields{
			"seed":   seedAddr,
			"status": resp.StatusCode,
		}).Warn("Seed node rejected join request")
		return false
	}

	// Process member list response
	var memberList []types.Member
	if err := json.NewDecoder(resp.Body).Decode(&memberList); err != nil {
		s.logger.WithError(err).Error("Failed to decode member list from seed")
		return false
	}

	// Add all members to our local list
	for _, member := range memberList {
		if member.ID != s.config.NodeID {
			member := member // copy: AddMember retains the pointer across iterations
			s.gossipService.AddMember(&member)
		}
	}

	s.logger.WithFields(logrus.Fields{
		"seed":         seedAddr,
		"member_count": len(memberList),
	}).Info("Successfully joined cluster")

	return true
}

// performGradualSync performs gradual sync (Merkle-based version)
func (s *BootstrapService) performGradualSync() {
	s.logger.Info("Starting gradual sync (Merkle-based)")

	members := s.gossipService.GetHealthyMembers()
	if len(members) == 0 {
		s.logger.Info("No healthy members for gradual sync")
		return
	}

	// For now, just do a few rounds of Merkle sync
	for i := 0; i < 3; i++ {
		s.syncService.performMerkleSync()
		time.Sleep(time.Duration(s.config.ThrottleDelayMs) * time.Millisecond)
	}

	s.logger.Info("Gradual sync completed")
}
cluster/gossip.go (new file, 303 lines)
@@ -0,0 +1,303 @@
package cluster

import (
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"math/rand"
	"net/http"
	"sync"
	"time"

	"github.com/sirupsen/logrus"

	"kvs/types"
)

// GossipService handles gossip protocol operations
type GossipService struct {
	config    *types.Config
	members   map[string]*types.Member
	membersMu sync.RWMutex
	logger    *logrus.Logger
	ctx       context.Context
	cancel    context.CancelFunc
	wg        sync.WaitGroup
}

// NewGossipService creates a new gossip service
func NewGossipService(config *types.Config, logger *logrus.Logger) *GossipService {
	ctx, cancel := context.WithCancel(context.Background())
	return &GossipService{
		config:  config,
		members: make(map[string]*types.Member),
		logger:  logger,
		ctx:     ctx,
		cancel:  cancel,
	}
}

// Start begins the gossip routine
func (s *GossipService) Start() {
	if !s.config.ClusteringEnabled {
		s.logger.Info("Clustering disabled, skipping gossip routine")
		return
	}

	s.wg.Add(1)
	go s.gossipRoutine()
}

// Stop terminates the gossip service
func (s *GossipService) Stop() {
	s.cancel()
	s.wg.Wait()
}

// AddMember adds a member to the gossip member list
func (s *GossipService) AddMember(member *types.Member) {
	s.membersMu.Lock()
	defer s.membersMu.Unlock()
	s.members[member.ID] = member
	s.logger.WithFields(logrus.Fields{
		"node_id": member.ID,
		"address": member.Address,
	}).Info("Member added")
}

// RemoveMember removes a member from the gossip member list
func (s *GossipService) RemoveMember(nodeID string) {
	s.membersMu.Lock()
	defer s.membersMu.Unlock()
	if member, exists := s.members[nodeID]; exists {
		delete(s.members, nodeID)
		s.logger.WithFields(logrus.Fields{
			"node_id": member.ID,
			"address": member.Address,
		}).Info("Member removed")
	}
}

// GetMembers returns a copy of all members
func (s *GossipService) GetMembers() []*types.Member {
	s.membersMu.RLock()
	defer s.membersMu.RUnlock()
	members := make([]*types.Member, 0, len(s.members))
	for _, member := range s.members {
		members = append(members, member)
	}
	return members
}

// GetHealthyMembers returns members that have been seen recently
func (s *GossipService) GetHealthyMembers() []*types.Member {
	s.membersMu.RLock()
	defer s.membersMu.RUnlock()

	now := time.Now().UnixMilli()
	healthyMembers := make([]*types.Member, 0)

	for _, member := range s.members {
		// Consider member healthy if last seen within last 5 minutes
		if now-member.LastSeen < 5*60*1000 {
			healthyMembers = append(healthyMembers, member)
		}
	}

	return healthyMembers
}

// gossipRoutine runs periodically to exchange member lists
func (s *GossipService) gossipRoutine() {
	defer s.wg.Done()

	for {
		// Random interval between the configured min and max (1-2 minutes by default)
		minInterval := time.Duration(s.config.GossipIntervalMin) * time.Second
		maxInterval := time.Duration(s.config.GossipIntervalMax) * time.Second
		interval := minInterval + time.Duration(rand.Int63n(int64(maxInterval-minInterval)))

		select {
		case <-s.ctx.Done():
			return
		case <-time.After(interval):
			s.performGossipRound()
		}
	}
}

// performGossipRound performs a gossip round with random healthy peers
func (s *GossipService) performGossipRound() {
	members := s.GetHealthyMembers()
	if len(members) == 0 {
		s.logger.Debug("No healthy members for gossip round")
		return
	}

	// Select 1-3 random peers for gossip
	maxPeers := 3
	if len(members) < maxPeers {
		maxPeers = len(members)
	}

	// Shuffle and select
	rand.Shuffle(len(members), func(i, j int) {
		members[i], members[j] = members[j], members[i]
	})

	selectedPeers := members[:rand.Intn(maxPeers)+1]

	for _, peer := range selectedPeers {
		go s.gossipWithPeer(peer)
	}
}

// gossipWithPeer performs gossip with a specific peer
func (s *GossipService) gossipWithPeer(peer *types.Member) error {
	s.logger.WithField("peer", peer.Address).Debug("Starting gossip with peer")

	// Get our current member list
	localMembers := s.GetMembers()

	// Send our member list to the peer
	gossipData := make([]types.Member, len(localMembers))
	for i, member := range localMembers {
		gossipData[i] = *member
	}

	// Add ourselves to the list
	selfMember := types.Member{
		ID:              s.config.NodeID,
		Address:         fmt.Sprintf("%s:%d", s.config.BindAddress, s.config.Port),
		LastSeen:        time.Now().UnixMilli(),
		JoinedTimestamp: s.GetJoinedTimestamp(),
	}
	gossipData = append(gossipData, selfMember)

	jsonData, err := json.Marshal(gossipData)
	if err != nil {
		s.logger.WithError(err).Error("Failed to marshal gossip data")
		return err
	}

	// Send HTTP request to peer
	client := &http.Client{Timeout: 5 * time.Second}
	url := fmt.Sprintf("http://%s/members/gossip", peer.Address)

	resp, err := client.Post(url, "application/json", bytes.NewBuffer(jsonData))
	if err != nil {
		s.logger.WithFields(logrus.Fields{
			"peer":  peer.Address,
			"error": err.Error(),
		}).Warn("Failed to gossip with peer")
		s.markPeerUnhealthy(peer.ID)
		return err
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		s.logger.WithFields(logrus.Fields{
			"peer":   peer.Address,
			"status": resp.StatusCode,
		}).Warn("Gossip request failed")
		s.markPeerUnhealthy(peer.ID)
		return fmt.Errorf("gossip request failed with status %d", resp.StatusCode)
	}

	// Process response - peer's member list
	var remoteMemberList []types.Member
	if err := json.NewDecoder(resp.Body).Decode(&remoteMemberList); err != nil {
		s.logger.WithError(err).Error("Failed to decode gossip response")
		return err
	}

	// Merge remote member list with our local list
	s.MergeMemberList(remoteMemberList, s.config.NodeID)

	// Update peer's last seen timestamp
	s.updateMemberLastSeen(peer.ID, time.Now().UnixMilli())

	s.logger.WithField("peer", peer.Address).Debug("Completed gossip with peer")
	return nil
}

// markPeerUnhealthy marks a peer as unhealthy
func (s *GossipService) markPeerUnhealthy(nodeID string) {
	s.membersMu.Lock()
	defer s.membersMu.Unlock()

	if member, exists := s.members[nodeID]; exists {
		// Mark as last seen a long time ago to indicate unhealthy
		member.LastSeen = time.Now().UnixMilli() - 10*60*1000 // 10 minutes ago
		s.logger.WithField("node_id", nodeID).Warn("Marked peer as unhealthy")
	}
}

// updateMemberLastSeen updates member's last seen timestamp
func (s *GossipService) updateMemberLastSeen(nodeID string, timestamp int64) {
	s.membersMu.Lock()
	defer s.membersMu.Unlock()

	if member, exists := s.members[nodeID]; exists {
		member.LastSeen = timestamp
	}
}

// MergeMemberList merges remote member list with local member list
func (s *GossipService) MergeMemberList(remoteMembers []types.Member, selfNodeID string) {
	s.membersMu.Lock()
	defer s.membersMu.Unlock()

	now := time.Now().UnixMilli()

	for _, remoteMember := range remoteMembers {
		// Skip ourselves
		if remoteMember.ID == selfNodeID {
			continue
		}

		if localMember, exists := s.members[remoteMember.ID]; exists {
			// Update existing member
			if remoteMember.LastSeen > localMember.LastSeen {
				localMember.LastSeen = remoteMember.LastSeen
			}
			// Keep the earlier joined timestamp
			if remoteMember.JoinedTimestamp < localMember.JoinedTimestamp {
				localMember.JoinedTimestamp = remoteMember.JoinedTimestamp
			}
		} else {
			// Add new member
			newMember := &types.Member{
				ID:              remoteMember.ID,
				Address:         remoteMember.Address,
				LastSeen:        remoteMember.LastSeen,
				JoinedTimestamp: remoteMember.JoinedTimestamp,
			}
			s.members[remoteMember.ID] = newMember
			s.logger.WithFields(logrus.Fields{
				"node_id": remoteMember.ID,
				"address": remoteMember.Address,
			}).Info("Discovered new member through gossip")
		}
	}

	// Clean up old members (not seen for more than 10 minutes)
	toRemove := make([]string, 0)
	for nodeID, member := range s.members {
		if now-member.LastSeen > 10*60*1000 { // 10 minutes
			toRemove = append(toRemove, nodeID)
		}
	}

	for _, nodeID := range toRemove {
		delete(s.members, nodeID)
		s.logger.WithField("node_id", nodeID).Info("Removed stale member")
	}
}

// GetJoinedTimestamp placeholder - would be implemented by the server
func (s *GossipService) GetJoinedTimestamp() int64 {
	// This should be implemented by the server that uses this service
	return time.Now().UnixMilli()
}
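A small sketch of seeding the member list and reading back healthy peers; the node ID and address are placeholders, and "healthy" means seen within the last five minutes per GetHealthyMembers.

	// Sketch: register a peer, then list peers seen within the last 5 minutes.
	gossip := cluster.NewGossipService(cfg, logger)
	gossip.AddMember(&types.Member{
		ID:              "node-b",
		Address:         "10.0.0.2:8080",
		LastSeen:        time.Now().UnixMilli(),
		JoinedTimestamp: time.Now().UnixMilli(),
	})
	for _, m := range gossip.GetHealthyMembers() {
		fmt.Println(m.ID, m.Address)
	}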
cluster/merkle.go (new file, 176 lines)
@@ -0,0 +1,176 @@
package cluster

import (
	"bytes"
	"crypto/sha256"
	"encoding/json"
	"fmt"
	"sort"
	"strconv"
	"strings"

	badger "github.com/dgraph-io/badger/v4"
	"github.com/sirupsen/logrus"

	"kvs/types"
)

// MerkleService handles Merkle tree operations
type MerkleService struct {
	db     *badger.DB
	logger *logrus.Logger
}

// NewMerkleService creates a new Merkle tree service
func NewMerkleService(db *badger.DB, logger *logrus.Logger) *MerkleService {
	return &MerkleService{
		db:     db,
		logger: logger,
	}
}

// CalculateHash generates a SHA256 hash for a given byte slice
func CalculateHash(data []byte) []byte {
	h := sha256.New()
	h.Write(data)
	return h.Sum(nil)
}

// CalculateLeafHash generates a hash for a leaf node based on its path, UUID, timestamp, and data
func (s *MerkleService) CalculateLeafHash(path string, storedValue *types.StoredValue) []byte {
	// Concatenate path, UUID, timestamp, and the raw data bytes for hashing.
	// Ensure a consistent order of fields for hashing.
	dataToHash := bytes.Buffer{}
	dataToHash.WriteString(path)
	dataToHash.WriteByte(':')
	dataToHash.WriteString(storedValue.UUID)
	dataToHash.WriteByte(':')
	dataToHash.WriteString(strconv.FormatInt(storedValue.Timestamp, 10))
	dataToHash.WriteByte(':')
	dataToHash.Write(storedValue.Data) // Use raw bytes of json.RawMessage

	return CalculateHash(dataToHash.Bytes())
}

// GetAllKVPairsForMerkleTree retrieves all key-value pairs needed for Merkle tree construction
func (s *MerkleService) GetAllKVPairsForMerkleTree() (map[string]*types.StoredValue, error) {
	pairs := make(map[string]*types.StoredValue)
	err := s.db.View(func(txn *badger.Txn) error {
		opts := badger.DefaultIteratorOptions
		opts.PrefetchValues = true // We need the values for hashing
		it := txn.NewIterator(opts)
		defer it.Close()

		// Iterate over all actual data keys (not _ts: indexes)
		for it.Rewind(); it.Valid(); it.Next() {
			item := it.Item()
			key := string(item.Key())

			if strings.HasPrefix(key, "_ts:") {
				continue // Skip index keys
			}

			var storedValue types.StoredValue
			err := item.Value(func(val []byte) error {
				return json.Unmarshal(val, &storedValue)
			})
			if err != nil {
				s.logger.WithError(err).WithField("key", key).Warn("Failed to unmarshal stored value for Merkle tree, skipping")
				continue
			}
			pairs[key] = &storedValue
		}
		return nil
	})
	if err != nil {
		return nil, err
	}
	return pairs, nil
}

// BuildMerkleTreeFromPairs constructs a Merkle Tree from the KVS data.
// This version uses a recursive approach to build a balanced tree from sorted keys.
func (s *MerkleService) BuildMerkleTreeFromPairs(pairs map[string]*types.StoredValue) (*types.MerkleNode, error) {
	if len(pairs) == 0 {
		return &types.MerkleNode{Hash: CalculateHash([]byte("empty_tree")), StartKey: "", EndKey: ""}, nil
	}

	// Sort keys to ensure consistent tree structure
	keys := make([]string, 0, len(pairs))
	for k := range pairs {
		keys = append(keys, k)
	}
	sort.Strings(keys)

	// Create leaf nodes
	leafNodes := make([]*types.MerkleNode, len(keys))
	for i, key := range keys {
		storedValue := pairs[key]
		hash := s.CalculateLeafHash(key, storedValue)
		leafNodes[i] = &types.MerkleNode{Hash: hash, StartKey: key, EndKey: key}
	}

	// Recursively build parent nodes
	return s.buildMerkleTreeRecursive(leafNodes)
}

// buildMerkleTreeRecursive builds the tree from a slice of nodes
func (s *MerkleService) buildMerkleTreeRecursive(nodes []*types.MerkleNode) (*types.MerkleNode, error) {
	if len(nodes) == 0 {
		return nil, nil
	}
	if len(nodes) == 1 {
		return nodes[0], nil
	}

	var nextLevel []*types.MerkleNode
	for i := 0; i < len(nodes); i += 2 {
		left := nodes[i]
		var right *types.MerkleNode
		if i+1 < len(nodes) {
			right = nodes[i+1]
		}

		var combinedHash []byte
		var endKey string

		if right != nil {
			combinedHash = CalculateHash(append(left.Hash, right.Hash...))
			endKey = right.EndKey
		} else {
			// Odd number of nodes, promote the left node
			combinedHash = left.Hash
			endKey = left.EndKey
		}

		parentNode := &types.MerkleNode{
			Hash:     combinedHash,
			StartKey: left.StartKey,
			EndKey:   endKey,
		}
		nextLevel = append(nextLevel, parentNode)
	}
	return s.buildMerkleTreeRecursive(nextLevel)
}

// FilterPairsByRange filters a map of StoredValue by key range
func FilterPairsByRange(allPairs map[string]*types.StoredValue, startKey, endKey string) map[string]*types.StoredValue {
	filtered := make(map[string]*types.StoredValue)
	for key, value := range allPairs {
		if (startKey == "" || key >= startKey) && (endKey == "" || key <= endKey) {
			filtered[key] = value
		}
	}
	return filtered
}

// BuildSubtreeForRange builds a Merkle subtree for a specific key range
func (s *MerkleService) BuildSubtreeForRange(startKey, endKey string) (*types.MerkleNode, error) {
	pairs, err := s.GetAllKVPairsForMerkleTree()
	if err != nil {
		return nil, fmt.Errorf("failed to get KV pairs for subtree: %v", err)
	}

	filteredPairs := FilterPairsByRange(pairs, startKey, endKey)
	return s.BuildMerkleTreeFromPairs(filteredPairs)
}
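Because keys are sorted before the tree is built, two nodes holding identical data compute identical root hashes, which is what performMerkleSync in cluster/sync.go relies on. A quick sketch:

	// Sketch: build the tree from the local store and print its root.
	ms := cluster.NewMerkleService(db, logger)
	pairs, err := ms.GetAllKVPairsForMerkleTree()
	if err != nil {
		log.Fatal(err)
	}
	root, _ := ms.BuildMerkleTreeFromPairs(pairs)
	fmt.Printf("root=%s range=[%q, %q]\n", hex.EncodeToString(root.Hash), root.StartKey, root.EndKey)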
564
cluster/sync.go
Normal file
564
cluster/sync.go
Normal file
@@ -0,0 +1,564 @@
|
||||
package cluster
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/hex"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"net/http"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
badger "github.com/dgraph-io/badger/v4"
|
||||
"github.com/sirupsen/logrus"
|
||||
|
||||
"kvs/types"
|
||||
)
|
||||
|
||||
// SyncService handles data synchronization between cluster nodes
|
||||
type SyncService struct {
|
||||
db *badger.DB
|
||||
config *types.Config
|
||||
gossipService *GossipService
|
||||
merkleService *MerkleService
|
||||
logger *logrus.Logger
|
||||
merkleRoot *types.MerkleNode
|
||||
merkleRootMu sync.RWMutex
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
wg sync.WaitGroup
|
||||
}
|
||||
|
||||
// NewSyncService creates a new sync service
|
||||
func NewSyncService(db *badger.DB, config *types.Config, gossipService *GossipService, merkleService *MerkleService, logger *logrus.Logger) *SyncService {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
return &SyncService{
|
||||
db: db,
|
||||
config: config,
|
||||
gossipService: gossipService,
|
||||
merkleService: merkleService,
|
||||
logger: logger,
|
||||
ctx: ctx,
|
||||
cancel: cancel,
|
||||
}
|
||||
}
|
||||
|
||||
// Start begins the sync routines
|
||||
func (s *SyncService) Start() {
|
||||
if !s.config.ClusteringEnabled {
|
||||
s.logger.Info("Clustering disabled, skipping sync routines")
|
||||
return
|
||||
}
|
||||
|
||||
// Start sync routine
|
||||
s.wg.Add(1)
|
||||
go s.syncRoutine()
|
||||
|
||||
// Start Merkle tree rebuild routine
|
||||
s.wg.Add(1)
|
||||
go s.merkleTreeRebuildRoutine()
|
||||
}
|
||||
|
||||
// Stop terminates the sync service
|
||||
func (s *SyncService) Stop() {
|
||||
s.cancel()
|
||||
s.wg.Wait()
|
||||
}
|
||||
|
||||
// GetMerkleRoot returns the current Merkle root
|
||||
func (s *SyncService) GetMerkleRoot() *types.MerkleNode {
|
||||
s.merkleRootMu.RLock()
|
||||
defer s.merkleRootMu.RUnlock()
|
||||
return s.merkleRoot
|
||||
}
|
||||
|
||||
// SetMerkleRoot sets the current Merkle root
|
||||
func (s *SyncService) SetMerkleRoot(root *types.MerkleNode) {
|
||||
s.merkleRootMu.Lock()
|
||||
defer s.merkleRootMu.Unlock()
|
||||
s.merkleRoot = root
|
||||
}
|
||||
|
||||
// syncRoutine handles regular and catch-up syncing
|
||||
func (s *SyncService) syncRoutine() {
|
||||
defer s.wg.Done()
|
||||
|
||||
syncTicker := time.NewTicker(time.Duration(s.config.SyncInterval) * time.Second)
|
||||
defer syncTicker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-s.ctx.Done():
|
||||
return
|
||||
case <-syncTicker.C:
|
||||
s.performMerkleSync()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// merkleTreeRebuildRoutine periodically rebuilds the Merkle tree
|
||||
func (s *SyncService) merkleTreeRebuildRoutine() {
|
||||
defer s.wg.Done()
|
||||
ticker := time.NewTicker(time.Duration(s.config.SyncInterval) * time.Second)
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-s.ctx.Done():
|
||||
return
|
||||
case <-ticker.C:
|
||||
s.logger.Debug("Rebuilding Merkle tree...")
|
||||
pairs, err := s.merkleService.GetAllKVPairsForMerkleTree()
|
||||
if err != nil {
|
||||
s.logger.WithError(err).Error("Failed to get KV pairs for Merkle tree rebuild")
|
||||
continue
|
||||
}
|
||||
newRoot, err := s.merkleService.BuildMerkleTreeFromPairs(pairs)
|
||||
if err != nil {
|
||||
s.logger.WithError(err).Error("Failed to rebuild Merkle tree")
|
||||
continue
|
||||
}
|
||||
s.SetMerkleRoot(newRoot)
|
||||
s.logger.Debug("Merkle tree rebuilt.")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// InitializeMerkleTree builds the initial Merkle tree
|
||||
func (s *SyncService) InitializeMerkleTree() error {
|
||||
pairs, err := s.merkleService.GetAllKVPairsForMerkleTree()
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to get all KV pairs for initial Merkle tree: %v", err)
|
||||
}
|
||||
root, err := s.merkleService.BuildMerkleTreeFromPairs(pairs)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to build initial Merkle tree: %v", err)
|
||||
}
|
||||
s.SetMerkleRoot(root)
|
||||
s.logger.Info("Initial Merkle tree built.")
|
||||
return nil
|
||||
}
|
||||
|
||||
// performMerkleSync performs a synchronization round using Merkle Trees
|
||||
func (s *SyncService) performMerkleSync() {
|
||||
members := s.gossipService.GetHealthyMembers()
|
||||
if len(members) == 0 {
|
||||
s.logger.Debug("No healthy members for Merkle sync")
|
||||
return
|
||||
}
|
||||
|
||||
// Select random peer
|
||||
peer := members[rand.Intn(len(members))]
|
||||
|
||||
s.logger.WithField("peer", peer.Address).Info("Starting Merkle tree sync")
|
||||
|
||||
localRoot := s.GetMerkleRoot()
|
||||
if localRoot == nil {
|
||||
s.logger.Error("Local Merkle root is nil, cannot perform sync")
|
||||
return
|
||||
}
|
||||
|
||||
// 1. Get remote peer's Merkle root
|
||||
remoteRootResp, err := s.requestMerkleRoot(peer.Address)
|
||||
if err != nil {
|
||||
s.logger.WithError(err).WithField("peer", peer.Address).Error("Failed to get remote Merkle root")
|
||||
s.gossipService.markPeerUnhealthy(peer.ID)
|
||||
return
|
||||
}
|
||||
remoteRoot := remoteRootResp.Root
|
||||
|
||||
// 2. Compare roots and start recursive diffing if they differ
|
||||
if !bytes.Equal(localRoot.Hash, remoteRoot.Hash) {
|
||||
s.logger.WithFields(logrus.Fields{
|
||||
"peer": peer.Address,
|
||||
"local_root": hex.EncodeToString(localRoot.Hash),
|
||||
"remote_root": hex.EncodeToString(remoteRoot.Hash),
|
||||
}).Info("Merkle roots differ, starting recursive diff")
|
||||
s.diffMerkleTreesRecursive(peer.Address, localRoot, remoteRoot)
|
||||
} else {
|
||||
s.logger.WithField("peer", peer.Address).Info("Merkle roots match, no sync needed")
|
||||
}
|
||||
|
||||
s.logger.WithField("peer", peer.Address).Info("Completed Merkle tree sync")
|
||||
}
|
||||
|
||||
// requestMerkleRoot requests the Merkle root from a peer
|
||||
func (s *SyncService) requestMerkleRoot(peerAddress string) (*types.MerkleRootResponse, error) {
|
||||
client := &http.Client{Timeout: 10 * time.Second}
|
||||
url := fmt.Sprintf("http://%s/merkle_tree/root", peerAddress)
|
||||
|
||||
resp, err := client.Get(url)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
return nil, fmt.Errorf("peer returned status %d for Merkle root", resp.StatusCode)
|
||||
}
|
||||
|
||||
var merkleRootResp types.MerkleRootResponse
|
||||
if err := json.NewDecoder(resp.Body).Decode(&merkleRootResp); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &merkleRootResp, nil
|
||||
}
|
||||
|
||||
// diffMerkleTreesRecursive recursively compares local and remote Merkle tree nodes
|
||||
func (s *SyncService) diffMerkleTreesRecursive(peerAddress string, localNode, remoteNode *types.MerkleNode) {
|
||||
// If hashes match, this subtree is in sync.
|
||||
if bytes.Equal(localNode.Hash, remoteNode.Hash) {
|
||||
return
|
||||
}
|
||||
|
||||
// Hashes differ, need to go deeper.
|
||||
// Request children from the remote peer for the current range.
|
||||
req := types.MerkleTreeDiffRequest{
|
||||
ParentNode: *remoteNode, // We are asking the remote peer about its children for this range
|
||||
LocalHash: localNode.Hash, // Our hash for this range
|
||||
}
|
||||
|
||||
remoteDiffResp, err := s.requestMerkleDiff(peerAddress, req)
|
||||
if err != nil {
|
||||
s.logger.WithError(err).WithFields(logrus.Fields{
|
||||
"peer": peerAddress,
|
||||
"start_key": localNode.StartKey,
|
||||
"end_key": localNode.EndKey,
|
||||
}).Error("Failed to get Merkle diff from peer")
|
||||
return
|
||||
}
|
||||
|
||||
if len(remoteDiffResp.Keys) > 0 {
|
||||
// This is a leaf-level diff, we have the actual keys that are different.
|
||||
s.handleLeafLevelDiff(peerAddress, remoteDiffResp.Keys, localNode)
|
||||
} else if len(remoteDiffResp.Children) > 0 {
|
||||
// Not a leaf level, continue recursive diff for children.
|
||||
s.handleChildrenDiff(peerAddress, remoteDiffResp.Children)
|
||||
}
|
||||
}
|
||||
|
||||
// handleLeafLevelDiff processes leaf-level differences
|
||||
func (s *SyncService) handleLeafLevelDiff(peerAddress string, keys []string, localNode *types.MerkleNode) {
|
||||
s.logger.WithFields(logrus.Fields{
|
||||
"peer": peerAddress,
|
||||
"start_key": localNode.StartKey,
|
||||
"end_key": localNode.EndKey,
|
||||
"num_keys": len(keys),
|
||||
}).Info("Found divergent keys, fetching and comparing data")
|
||||
|
||||
for _, key := range keys {
|
||||
// Fetch the individual key from the peer
|
||||
remoteStoredValue, err := s.fetchSingleKVFromPeer(peerAddress, key)
|
||||
if err != nil {
|
||||
s.logger.WithError(err).WithFields(logrus.Fields{
|
||||
"peer": peerAddress,
|
||||
"key": key,
|
||||
}).Error("Failed to fetch single KV from peer during diff")
|
||||
continue
|
||||
}
|
||||
|
||||
localStoredValue, localExists := s.getLocalData(key)
|
||||
|
||||
if remoteStoredValue == nil {
|
||||
// Key was deleted on remote, delete locally if exists
|
||||
if localExists {
|
||||
s.logger.WithField("key", key).Info("Key deleted on remote, deleting locally")
|
||||
s.deleteKVLocally(key, localStoredValue.Timestamp)
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
if !localExists {
|
||||
// Local data is missing, store the remote data
|
||||
if err := s.storeReplicatedDataWithMetadata(key, remoteStoredValue); err != nil {
|
||||
s.logger.WithError(err).WithField("key", key).Error("Failed to store missing replicated data")
|
||||
} else {
|
||||
s.logger.WithField("key", key).Info("Fetched and stored missing data from peer")
|
||||
}
|
||||
} else if localStoredValue.Timestamp < remoteStoredValue.Timestamp {
|
||||
// Remote is newer, store the remote data
|
||||
if err := s.storeReplicatedDataWithMetadata(key, remoteStoredValue); err != nil {
|
||||
s.logger.WithError(err).WithField("key", key).Error("Failed to store newer replicated data")
|
||||
} else {
|
||||
s.logger.WithField("key", key).Info("Fetched and stored newer data from peer")
|
||||
}
|
||||
} else if localStoredValue.Timestamp == remoteStoredValue.Timestamp && localStoredValue.UUID != remoteStoredValue.UUID {
|
||||
// Timestamp collision, engage conflict resolution
|
||||
s.resolveConflict(key, localStoredValue, remoteStoredValue, peerAddress)
|
||||
}
|
||||
// If local is newer or same timestamp and same UUID, do nothing.
|
||||
}
|
||||
}
|
||||
|
||||
// fetchSingleKVFromPeer fetches a single KV pair from a peer
|
||||
func (s *SyncService) fetchSingleKVFromPeer(peerAddress, path string) (*types.StoredValue, error) {
|
||||
client := &http.Client{Timeout: 5 * time.Second}
|
||||
url := fmt.Sprintf("http://%s/kv/%s", peerAddress, path)
|
||||
|
||||
resp, err := client.Get(url)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
if resp.StatusCode == http.StatusNotFound {
|
||||
return nil, nil // Key might have been deleted on the peer
|
||||
}
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
return nil, fmt.Errorf("peer returned status %d for path %s", resp.StatusCode, path)
|
||||
}
|
||||
|
||||
var storedValue types.StoredValue
|
||||
if err := json.NewDecoder(resp.Body).Decode(&storedValue); err != nil {
|
||||
return nil, fmt.Errorf("failed to decode types.StoredValue from peer: %v", err)
|
||||
}
|
||||
|
||||
return &storedValue, nil
|
||||
}
|
||||
|
||||
// getLocalData is a utility to retrieve a types.StoredValue from local DB
|
||||
func (s *SyncService) getLocalData(path string) (*types.StoredValue, bool) {
|
||||
var storedValue types.StoredValue
|
||||
err := s.db.View(func(txn *badger.Txn) error {
|
||||
item, err := txn.Get([]byte(path))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return item.Value(func(val []byte) error {
|
||||
return json.Unmarshal(val, &storedValue)
|
||||
})
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return nil, false
|
||||
}
|
||||
|
||||
return &storedValue, true
|
||||
}
|
||||
|
||||
// deleteKVLocally deletes a key-value pair and its associated timestamp index locally
|
||||
func (s *SyncService) deleteKVLocally(key string, timestamp int64) error {
|
||||
return s.db.Update(func(txn *badger.Txn) error {
|
||||
// Delete the main key
|
||||
if err := txn.Delete([]byte(key)); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Delete the timestamp index
|
||||
indexKey := fmt.Sprintf("_ts:%d:%s", timestamp, key)
|
||||
return txn.Delete([]byte(indexKey))
|
||||
})
|
||||
}
|
||||
|
||||
// storeReplicatedDataWithMetadata stores replicated data preserving its original metadata
|
||||
func (s *SyncService) storeReplicatedDataWithMetadata(path string, storedValue *types.StoredValue) error {
|
||||
valueBytes, err := json.Marshal(storedValue)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return s.db.Update(func(txn *badger.Txn) error {
|
||||
// Store main data
|
||||
if err := txn.Set([]byte(path), valueBytes); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Store timestamp index
|
||||
indexKey := fmt.Sprintf("_ts:%020d:%s", storedValue.Timestamp, path)
|
||||
return txn.Set([]byte(indexKey), []byte(storedValue.UUID))
|
||||
})
|
||||
}
|
||||
|
||||
// resolveConflict performs sophisticated conflict resolution with majority vote and oldest-node tie-breaking
|
||||
func (s *SyncService) resolveConflict(key string, local, remote *types.StoredValue, peerAddress string) error {
|
||||
s.logger.WithFields(logrus.Fields{
|
||||
"key": key,
|
||||
"local_ts": local.Timestamp,
|
||||
"remote_ts": remote.Timestamp,
|
||||
"local_uuid": local.UUID,
|
||||
"remote_uuid": remote.UUID,
|
||||
"peer": peerAddress,
|
||||
}).Info("Resolving timestamp collision conflict")
|
||||
|
||||
if remote.Timestamp > local.Timestamp {
|
||||
// Remote is newer, store it
|
||||
err := s.storeReplicatedDataWithMetadata(key, remote)
|
||||
if err == nil {
|
||||
s.logger.WithField("key", key).Info("Conflict resolved: remote data wins (newer timestamp)")
|
||||
}
|
||||
return err
|
||||
} else if local.Timestamp > remote.Timestamp {
|
||||
// Local is newer, keep local data
|
||||
s.logger.WithField("key", key).Info("Conflict resolved: local data wins (newer timestamp)")
|
||||
return nil
|
||||
}
|
||||
|
||||
// Timestamps are equal - need sophisticated conflict resolution
|
||||
s.logger.WithField("key", key).Info("Timestamp collision detected, applying oldest-node rule")
|
||||
|
||||
// Get cluster members to determine which node is older
|
||||
members := s.gossipService.GetMembers()
|
||||
|
||||
// Find the local node and the remote node in membership
|
||||
var localMember, remoteMember *types.Member
|
||||
localNodeID := s.config.NodeID
|
||||
|
||||
for _, member := range members {
|
||||
if member.ID == localNodeID {
|
||||
localMember = member
|
||||
}
|
||||
if member.Address == peerAddress {
|
||||
remoteMember = member
|
||||
}
|
||||
}
|
||||
|
||||
// If we can't find membership info, fall back to UUID comparison for deterministic result
|
||||
if localMember == nil || remoteMember == nil {
|
||||
s.logger.WithField("key", key).Warn("Could not find membership info for conflict resolution, using UUID comparison")
|
||||
if remote.UUID < local.UUID {
|
||||
// Remote UUID lexically smaller (deterministic choice)
|
||||
err := s.storeReplicatedDataWithMetadata(key, remote)
|
||||
if err == nil {
|
||||
s.logger.WithField("key", key).Info("Conflict resolved: remote data wins (UUID tie-breaker)")
|
||||
}
|
||||
return err
|
||||
}
|
||||
s.logger.WithField("key", key).Info("Conflict resolved: local data wins (UUID tie-breaker)")
|
||||
return nil
|
||||
}
|
||||
|
||||
// Apply oldest-node rule: node with earliest joined_timestamp wins
|
||||
if remoteMember.JoinedTimestamp < localMember.JoinedTimestamp {
|
||||
// Remote node is older, its data wins
|
||||
err := s.storeReplicatedDataWithMetadata(key, remote)
|
||||
if err == nil {
|
||||
s.logger.WithFields(logrus.Fields{
|
||||
"key": key,
|
||||
"local_joined": localMember.JoinedTimestamp,
|
||||
"remote_joined": remoteMember.JoinedTimestamp,
|
||||
}).Info("Conflict resolved: remote data wins (oldest-node rule)")
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// Local node is older or equal, keep local data
|
||||
s.logger.WithFields(logrus.Fields{
|
||||
"key": key,
|
||||
"local_joined": localMember.JoinedTimestamp,
|
||||
"remote_joined": remoteMember.JoinedTimestamp,
|
||||
}).Info("Conflict resolved: local data wins (oldest-node rule)")
|
||||
return nil
|
||||
}
|
||||
|
||||
// requestMerkleDiff requests children hashes or keys for a given node/range from a peer
|
||||
func (s *SyncService) requestMerkleDiff(peerAddress string, req types.MerkleTreeDiffRequest) (*types.MerkleTreeDiffResponse, error) {
|
||||
jsonData, err := json.Marshal(req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
client := &http.Client{Timeout: 10 * time.Second}
|
||||
url := fmt.Sprintf("http://%s/merkle_tree/diff", peerAddress)
|
||||
|
||||
resp, err := client.Post(url, "application/json", bytes.NewBuffer(jsonData))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
return nil, fmt.Errorf("peer returned status %d for Merkle diff", resp.StatusCode)
|
||||
}
|
||||
|
||||
var diffResp types.MerkleTreeDiffResponse
|
||||
if err := json.NewDecoder(resp.Body).Decode(&diffResp); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &diffResp, nil
|
||||
}

// handleChildrenDiff processes children-level differences
func (s *SyncService) handleChildrenDiff(peerAddress string, children []types.MerkleNode) {
	localPairs, err := s.merkleService.GetAllKVPairsForMerkleTree()
	if err != nil {
		s.logger.WithError(err).Error("Failed to get KV pairs for local children comparison")
		return
	}

	for _, remoteChild := range children {
		// Build the local Merkle node for this child's range
		localChildNode, err := s.merkleService.BuildMerkleTreeFromPairs(FilterPairsByRange(localPairs, remoteChild.StartKey, remoteChild.EndKey))
		if err != nil {
			s.logger.WithError(err).WithFields(logrus.Fields{
				"start_key": remoteChild.StartKey,
				"end_key":   remoteChild.EndKey,
			}).Error("Failed to build local child node for diff")
			continue
		}

		if localChildNode == nil || !bytes.Equal(localChildNode.Hash, remoteChild.Hash) {
			// If the local child node is nil (meaning local has no data in this range)
			// or hashes differ, then we need to fetch the data.
			if localChildNode == nil {
				s.logger.WithFields(logrus.Fields{
					"peer":      peerAddress,
					"start_key": remoteChild.StartKey,
					"end_key":   remoteChild.EndKey,
				}).Info("Local node missing data in remote child's range, fetching full range")
				s.fetchAndStoreRange(peerAddress, remoteChild.StartKey, remoteChild.EndKey)
			} else {
				s.diffMerkleTreesRecursive(peerAddress, localChildNode, &remoteChild)
			}
		}
	}
}
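
// Illustration (not part of the original change): FilterPairsByRange is
// defined elsewhere in this package. As a sketch only, assuming each pair
// carries its key in a Path field and that keys compare lexically, it
// might look like this:
type sketchPair struct{ Path string }

func filterPairsByRangeSketch(pairs []sketchPair, startKey, endKey string) []sketchPair {
	var out []sketchPair
	for _, p := range pairs {
		if p.Path >= startKey && p.Path <= endKey {
			out = append(out, p)
		}
	}
	return out
}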

// fetchAndStoreRange fetches a range of KV pairs from a peer and stores them locally
func (s *SyncService) fetchAndStoreRange(peerAddress string, startKey, endKey string) error {
	req := types.KVRangeRequest{
		StartKey: startKey,
		EndKey:   endKey,
		Limit:    0, // No limit
	}
	jsonData, err := json.Marshal(req)
	if err != nil {
		return err
	}

	client := &http.Client{Timeout: 30 * time.Second} // Longer timeout for range fetches
	url := fmt.Sprintf("http://%s/kv_range", peerAddress)

	resp, err := client.Post(url, "application/json", bytes.NewBuffer(jsonData))
	if err != nil {
		return err
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		return fmt.Errorf("peer returned status %d for KV range fetch", resp.StatusCode)
	}

	var rangeResp types.KVRangeResponse
	if err := json.NewDecoder(resp.Body).Decode(&rangeResp); err != nil {
		return err
	}

	for _, pair := range rangeResp.Pairs {
		// Use storeReplicatedDataWithMetadata to preserve original UUID/Timestamp
		if err := s.storeReplicatedDataWithMetadata(pair.Path, &pair.StoredValue); err != nil {
			s.logger.WithError(err).WithFields(logrus.Fields{
				"peer": peerAddress,
				"path": pair.Path,
			}).Error("Failed to store fetched range data")
		} else {
			s.logger.WithFields(logrus.Fields{
				"peer": peerAddress,
				"path": pair.Path,
			}).Debug("Stored data from fetched range")
		}
	}
	return nil
}
94
config/config.go
Normal file
@@ -0,0 +1,94 @@
package config

import (
	"fmt"
	"os"
	"path/filepath"

	"kvs/types"

	"gopkg.in/yaml.v3"
)

// Default returns the default configuration
func Default() *types.Config {
	hostname, _ := os.Hostname()
	return &types.Config{
		NodeID:               hostname,
		BindAddress:          "127.0.0.1",
		Port:                 8080,
		DataDir:              "./data",
		SeedNodes:            []string{},
		ReadOnly:             false,
		LogLevel:             "info",
		GossipIntervalMin:    60,  // 1 minute
		GossipIntervalMax:    120, // 2 minutes
		SyncInterval:         300, // 5 minutes
		CatchupInterval:      120, // 2 minutes
		BootstrapMaxAgeHours: 720, // 30 days
		ThrottleDelayMs:      100,
		FetchDelayMs:         50,

		// Default compression settings
		CompressionEnabled: true,
		CompressionLevel:   3, // Balance between performance and compression ratio

		// Default TTL and size limit settings
		DefaultTTL:  "0",     // No default TTL
		MaxJSONSize: 1048576, // 1MB default max JSON size

		// Default rate limiting settings
		RateLimitRequests: 100,  // 100 requests per window
		RateLimitWindow:   "1m", // 1 minute window

		// Default tamper-evident logging settings
		TamperLogActions: []string{"data_write", "user_create", "auth_failure"},

		// Default backup system settings
		BackupEnabled:   true,
		BackupSchedule:  "0 0 * * *", // Daily at midnight
		BackupPath:      "./backups",
		BackupRetention: 7, // Keep backups for 7 days

		// Default feature toggle settings (all enabled by default)
		AuthEnabled:            true,
		TamperLoggingEnabled:   true,
		ClusteringEnabled:      true,
		RateLimitingEnabled:    true,
		RevisionHistoryEnabled: true,
	}
}

// Load reads the configuration from a file, creating a default file if none exists
func Load(configPath string) (*types.Config, error) {
	config := Default()

	if _, err := os.Stat(configPath); os.IsNotExist(err) {
		// Create default config file
		if err := os.MkdirAll(filepath.Dir(configPath), 0755); err != nil {
			return nil, fmt.Errorf("failed to create config directory: %v", err)
		}

		data, err := yaml.Marshal(config)
		if err != nil {
			return nil, fmt.Errorf("failed to marshal default config: %v", err)
		}

		if err := os.WriteFile(configPath, data, 0644); err != nil {
			return nil, fmt.Errorf("failed to write default config: %v", err)
		}

		fmt.Printf("Created default configuration at %s\n", configPath)
		return config, nil
	}

	data, err := os.ReadFile(configPath)
	if err != nil {
		return nil, fmt.Errorf("failed to read config file: %v", err)
	}

	if err := yaml.Unmarshal(data, config); err != nil {
		return nil, fmt.Errorf("failed to parse config file: %v", err)
	}

	return config, nil
}
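
For illustration, a minimal sketch of loading this configuration at startup; the config path here is an assumption, not a fixed convention:

package main

import (
	"fmt"
	"os"

	"kvs/config"
)

func main() {
	// Loads ./config.yaml, creating it with defaults on first run.
	cfg, err := config.Load("./config.yaml")
	if err != nil {
		fmt.Fprintf(os.Stderr, "config error: %v\n", err)
		os.Exit(1)
	}
	fmt.Printf("node %s listening on %s:%d\n", cfg.NodeID, cfg.BindAddress, cfg.Port)
}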
102
features/auth.go
Normal file
@@ -0,0 +1,102 @@
package features

import (
	"fmt"
	"net/http"
	"strings"

	"github.com/gorilla/mux"

	"kvs/types"
)

// AuthContext holds authentication information for a request
type AuthContext struct {
	UserUUID string   `json:"user_uuid"`
	Scopes   []string `json:"scopes"`
	Groups   []string `json:"groups"`
}

// CheckPermission validates if a user has permission to perform an operation
func CheckPermission(permissions int, operation string, isOwner, isGroupMember bool) bool {
	switch operation {
	case "create":
		if isOwner {
			return (permissions & types.PermOwnerCreate) != 0
		}
		if isGroupMember {
			return (permissions & types.PermGroupCreate) != 0
		}
		return (permissions & types.PermOthersCreate) != 0

	case "delete":
		if isOwner {
			return (permissions & types.PermOwnerDelete) != 0
		}
		if isGroupMember {
			return (permissions & types.PermGroupDelete) != 0
		}
		return (permissions & types.PermOthersDelete) != 0

	case "write":
		if isOwner {
			return (permissions & types.PermOwnerWrite) != 0
		}
		if isGroupMember {
			return (permissions & types.PermGroupWrite) != 0
		}
		return (permissions & types.PermOthersWrite) != 0

	case "read":
		if isOwner {
			return (permissions & types.PermOwnerRead) != 0
		}
		if isGroupMember {
			return (permissions & types.PermGroupRead) != 0
		}
		return (permissions & types.PermOthersRead) != 0

	default:
		return false
	}
}

// CheckUserResourceRelationship determines the user's relationship to a resource
func CheckUserResourceRelationship(userUUID string, metadata *types.ResourceMetadata, userGroups []string) (isOwner, isGroupMember bool) {
	isOwner = (userUUID == metadata.OwnerUUID)

	if metadata.GroupUUID != "" {
		for _, groupUUID := range userGroups {
			if groupUUID == metadata.GroupUUID {
				isGroupMember = true
				break
			}
		}
	}

	return isOwner, isGroupMember
}

// ExtractTokenFromHeader extracts the Bearer token from the Authorization header
func ExtractTokenFromHeader(r *http.Request) (string, error) {
	authHeader := r.Header.Get("Authorization")
	if authHeader == "" {
		return "", fmt.Errorf("missing authorization header")
	}

	parts := strings.Split(authHeader, " ")
	if len(parts) != 2 || strings.ToLower(parts[0]) != "bearer" {
		return "", fmt.Errorf("invalid authorization header format")
	}

	return parts[1], nil
}

// ExtractKVResourceKey extracts the KV resource key from a request
func ExtractKVResourceKey(r *http.Request) string {
	vars := mux.Vars(r)
	if path, ok := vars["path"]; ok {
		return path
	}
	return ""
}
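
A small sketch of the permission check in action, using the DefaultPermissions mask defined in kvs/types (owner full access, group write/read, others read):

package main

import (
	"fmt"

	"kvs/features"
	"kvs/types"
)

func main() {
	perms := types.DefaultPermissions

	// Owner can write; others can only read under the default mask.
	fmt.Println(features.CheckPermission(perms, "write", true, false))  // true
	fmt.Println(features.CheckPermission(perms, "write", false, false)) // false
	fmt.Println(features.CheckPermission(perms, "read", false, false))  // true
}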
11
features/backup.go
Normal file
@@ -0,0 +1,11 @@
package features

import (
	"fmt"
	"time"
)

// GetBackupFilename generates a filename for a backup
func GetBackupFilename(timestamp time.Time) string {
	return fmt.Sprintf("kvs-backup-%s.zstd", timestamp.Format("2006-01-02"))
}
4
features/features.go
Normal file
@@ -0,0 +1,4 @@
// Package features provides utility functions for KVS authentication, validation,
// logging, backup, and other operational features. These functions were extracted
// from main.go to improve code organization and maintainability.
package features
8
features/ratelimit.go
Normal file
@@ -0,0 +1,8 @@
package features

import "fmt"

// GetRateLimitKey generates the storage key for rate limiting
func GetRateLimitKey(userUUID string, windowStart int64) string {
	return fmt.Sprintf("ratelimit:%s:%d", userUUID, windowStart)
}
8
features/revision.go
Normal file
@@ -0,0 +1,8 @@
package features

import "fmt"

// GetRevisionKey generates the storage key for a specific revision
func GetRevisionKey(baseKey string, revision int) string {
	return fmt.Sprintf("%s:rev:%d", baseKey, revision)
}
24
features/tamperlog.go
Normal file
@@ -0,0 +1,24 @@
package features

import (
	"fmt"

	"kvs/utils"
)

// GetTamperLogKey generates the storage key for a tamper log entry
func GetTamperLogKey(timestamp string, entryUUID string) string {
	return fmt.Sprintf("log:%s:%s", timestamp, entryUUID)
}

// GetMerkleLogKey generates the storage key for hourly Merkle tree roots
func GetMerkleLogKey(timestamp string) string {
	return fmt.Sprintf("log:merkle:%s", timestamp)
}

// GenerateLogSignature creates a SHA3-512 signature for a log entry
func GenerateLogSignature(timestamp, action, userUUID, resource string) string {
	// Concatenate all fields in a deterministic order
	data := fmt.Sprintf("%s|%s|%s|%s", timestamp, action, userUUID, resource)
	return utils.HashSHA3512(data)
}
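
Because the signature is a deterministic SHA3-512 over the ordered fields, verification is just recomputation. A hedged sketch, assuming a types.TamperLogEntry as defined in this change:

package main

import (
	"fmt"

	"kvs/features"
	"kvs/types"
)

// verifyLogEntry recomputes the signature and compares it to the stored
// one; mutating any field changes the hash and fails verification.
func verifyLogEntry(e types.TamperLogEntry) bool {
	return features.GenerateLogSignature(e.Timestamp, e.Action, e.UserUUID, e.Resource) == e.Signature
}

func main() {
	e := types.TamperLogEntry{Timestamp: "2025-01-01T00:00:00Z", Action: "data_write", UserUUID: "u-1", Resource: "kv/foo"}
	e.Signature = features.GenerateLogSignature(e.Timestamp, e.Action, e.UserUUID, e.Resource)
	fmt.Println(verifyLogEntry(e)) // true

	e.Action = "user_create" // tamper with a field
	fmt.Println(verifyLogEntry(e)) // false
}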
24
features/validation.go
Normal file
@@ -0,0 +1,24 @@
package features

import (
	"fmt"
	"time"
)

// ParseTTL converts a Go duration string to time.Duration
func ParseTTL(ttlString string) (time.Duration, error) {
	if ttlString == "" || ttlString == "0" {
		return 0, nil // No TTL
	}

	duration, err := time.ParseDuration(ttlString)
	if err != nil {
		return 0, fmt.Errorf("invalid TTL format: %v", err)
	}

	if duration < 0 {
		return 0, fmt.Errorf("TTL cannot be negative")
	}

	return duration, nil
}
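
ParseTTL accepts any Go duration string; a quick illustration of accepted and rejected inputs:

package main

import (
	"fmt"

	"kvs/features"
)

func main() {
	for _, s := range []string{"", "0", "90s", "24h", "-5m", "never"} {
		d, err := features.ParseTTL(s)
		fmt.Printf("%-7q -> %v, err=%v\n", s, d, err)
	}
	// "" and "0" mean no TTL; "-5m" and "never" are rejected.
}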
68
refactor.md
Normal file
@@ -0,0 +1,68 @@
# Refactoring Proposal for KVS Main.go

After analyzing the 3,990-line main.go file, I've identified clear functional areas that can be separated into manageable modules. Here is the proposed refactoring:

## Proposed File Structure

kvs/
├── main.go                  # Entry point + minimal server setup
├── config/
│   └── config.go            # Configuration structures and loading
├── types/
│   └── types.go             # All data structures and type definitions
├── auth/
│   ├── auth.go              # Authentication & authorization logic
│   ├── jwt.go               # JWT token management
│   ├── middleware.go        # Auth middleware
│   └── permissions.go       # Permission checking utilities
├── storage/
│   ├── storage.go           # BadgerDB operations and utilities
│   ├── compression.go       # ZSTD compression/decompression
│   ├── ttl.go               # TTL and metadata management
│   └── revision.go          # Revision history system
├── cluster/
│   ├── gossip.go            # Gossip protocol implementation
│   ├── members.go           # Member management
│   ├── sync.go              # Data synchronization
│   └── merkle.go            # Merkle tree operations
├── server/
│   ├── server.go            # Server struct and core methods
│   ├── handlers.go          # HTTP request handlers
│   ├── routes.go            # Route setup
│   └── lifecycle.go         # Server startup/shutdown logic
├── features/
│   ├── ratelimit.go         # Rate limiting middleware and utilities
│   ├── tamperlog.go         # Tamper-evident logging
│   └── backup.go            # Backup system
└── utils/
    └── hash.go              # Hashing utilities (SHA3, etc.)

## Key Benefits

1. Clear Separation of Concerns: Each package handles a specific responsibility
2. Better Testability: Smaller, focused functions are easier to unit test
3. Improved Maintainability: Changes to one feature don't affect others
4. Go Best Practices: Follows standard Go project layout conventions
5. Reduced Coupling: Clear interfaces between components

## Functional Areas Identified

1. Configuration (~100 lines): Config structs, defaults, loading
2. Types (~400 lines): All data structures and constants
3. Authentication (~800 lines): User/Group/Token management, JWT, middleware
4. Storage (~600 lines): BadgerDB operations, compression, TTL, revisions
5. Clustering (~1,200 lines): Gossip, members, sync, Merkle trees
6. Server (~600 lines): Server struct, handlers, routes, lifecycle
7. Features (~200 lines): Rate limiting, tamper logging, backup
8. Utilities (~90 lines): Hashing and other utilities

## Migration Strategy

1. Start with the most independent modules (types, config, utils)
2. Move storage and authentication components
3. Extract clustering logic
4. Refactor server components last
5. Create commits for each major module migration

The refactoring introduces no functional changes; it is a purely structural reorganization for better code organization and maintainability.
1273
server/handlers.go
Normal file
File diff suppressed because it is too large
79
server/lifecycle.go
Normal file
@@ -0,0 +1,79 @@
package server

import (
	"context"
	"fmt"
	"net/http"
	"time"

	"github.com/sirupsen/logrus"
)

// Start the server and initialize all services
func (s *Server) Start() error {
	router := s.setupRoutes()

	addr := fmt.Sprintf("%s:%d", s.config.BindAddress, s.config.Port)
	s.httpServer = &http.Server{
		Addr:    addr,
		Handler: router,
	}

	s.logger.WithFields(logrus.Fields{
		"node_id": s.config.NodeID,
		"address": addr,
	}).Info("Starting KVS server")

	// Start gossip and sync routines
	s.startBackgroundTasks()

	// Try to join cluster if seed nodes are configured and clustering is enabled
	if s.config.ClusteringEnabled && len(s.config.SeedNodes) > 0 {
		go s.bootstrap()
	}

	return s.httpServer.ListenAndServe()
}

// Stop the server gracefully
func (s *Server) Stop() error {
	s.logger.Info("Shutting down KVS server")

	// Stop cluster services
	s.gossipService.Stop()
	s.syncService.Stop()

	// Close storage services
	if s.storageService != nil {
		s.storageService.Close()
	}

	s.cancel()
	s.wg.Wait()

	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	if err := s.httpServer.Shutdown(ctx); err != nil {
		s.logger.WithError(err).Error("HTTP server shutdown error")
	}

	if err := s.db.Close(); err != nil {
		s.logger.WithError(err).Error("BadgerDB close error")
	}

	return nil
}

// startBackgroundTasks initializes and starts cluster services
func (s *Server) startBackgroundTasks() {
	// Start cluster services
	s.gossipService.Start()
	s.syncService.Start()
}

// bootstrap joins the cluster using seed nodes via the bootstrap service
func (s *Server) bootstrap() {
	s.bootstrapService.Bootstrap()
}
54
server/routes.go
Normal file
@@ -0,0 +1,54 @@
package server

import (
	"github.com/gorilla/mux"
)

// setupRoutes configures all HTTP routes and their handlers
func (s *Server) setupRoutes() *mux.Router {
	router := mux.NewRouter()

	// Health endpoint
	router.HandleFunc("/health", s.healthHandler).Methods("GET")

	// KV endpoints
	router.HandleFunc("/kv/{path:.+}", s.getKVHandler).Methods("GET")
	router.HandleFunc("/kv/{path:.+}", s.putKVHandler).Methods("PUT")
	router.HandleFunc("/kv/{path:.+}", s.deleteKVHandler).Methods("DELETE")

	// Member endpoints
	router.HandleFunc("/members/", s.getMembersHandler).Methods("GET")
	router.HandleFunc("/members/join", s.joinMemberHandler).Methods("POST")
	router.HandleFunc("/members/leave", s.leaveMemberHandler).Methods("DELETE")
	router.HandleFunc("/members/gossip", s.gossipHandler).Methods("POST")
	router.HandleFunc("/members/pairs_by_time", s.pairsByTimeHandler).Methods("POST") // Still available for clients

	// Merkle Tree endpoints
	router.HandleFunc("/merkle_tree/root", s.getMerkleRootHandler).Methods("GET")
	router.HandleFunc("/merkle_tree/diff", s.getMerkleDiffHandler).Methods("POST")
	router.HandleFunc("/kv_range", s.getKVRangeHandler).Methods("POST") // New endpoint for fetching ranges

	// User Management endpoints
	router.HandleFunc("/api/users", s.createUserHandler).Methods("POST")
	router.HandleFunc("/api/users/{uuid}", s.getUserHandler).Methods("GET")
	router.HandleFunc("/api/users/{uuid}", s.updateUserHandler).Methods("PUT")
	router.HandleFunc("/api/users/{uuid}", s.deleteUserHandler).Methods("DELETE")

	// Group Management endpoints
	router.HandleFunc("/api/groups", s.createGroupHandler).Methods("POST")
	router.HandleFunc("/api/groups/{uuid}", s.getGroupHandler).Methods("GET")
	router.HandleFunc("/api/groups/{uuid}", s.updateGroupHandler).Methods("PUT")
	router.HandleFunc("/api/groups/{uuid}", s.deleteGroupHandler).Methods("DELETE")

	// Token Management endpoints
	router.HandleFunc("/api/tokens", s.createTokenHandler).Methods("POST")

	// Revision History endpoints
	router.HandleFunc("/api/data/{key}/history", s.getRevisionHistoryHandler).Methods("GET")
	router.HandleFunc("/api/data/{key}/history/{revision}", s.getSpecificRevisionHandler).Methods("GET")

	// Backup Status endpoint
	router.HandleFunc("/api/backup/status", s.getBackupStatusHandler).Methods("GET")

	return router
}
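
Any HTTP client can exercise these routes; a minimal sketch that probes the /health endpoint of a node running on the default bind address (the response body shape is not assumed here):

package main

import (
	"fmt"
	"io"
	"net/http"
	"time"
)

func main() {
	client := &http.Client{Timeout: 5 * time.Second}

	// Probe the health endpoint registered in setupRoutes.
	resp, err := client.Get("http://127.0.0.1:8080/health")
	if err != nil {
		fmt.Println("node unreachable:", err)
		return
	}
	defer resp.Body.Close()

	body, _ := io.ReadAll(resp.Body)
	fmt.Println(resp.StatusCode, string(body))
}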
184
server/server.go
Normal file
@@ -0,0 +1,184 @@
package server

import (
	"context"
	"fmt"
	"net/http"
	"os"
	"path/filepath"
	"sync"
	"time"

	"github.com/dgraph-io/badger/v4"
	"github.com/robfig/cron/v3"
	"github.com/sirupsen/logrus"

	"kvs/auth"
	"kvs/cluster"
	"kvs/storage"
	"kvs/types"
)

// Server represents the KVS node
type Server struct {
	config     *types.Config
	db         *badger.DB
	mode       string // "normal", "read-only", "syncing"
	modeMu     sync.RWMutex
	logger     *logrus.Logger
	httpServer *http.Server
	ctx        context.Context
	cancel     context.CancelFunc
	wg         sync.WaitGroup

	// Cluster services
	gossipService    *cluster.GossipService
	syncService      *cluster.SyncService
	merkleService    *cluster.MerkleService
	bootstrapService *cluster.BootstrapService

	// Storage services
	storageService  *storage.StorageService
	revisionService *storage.RevisionService

	// Backup system
	cronScheduler *cron.Cron         // Cron scheduler for backups
	backupStatus  types.BackupStatus // Current backup status
	backupMu      sync.RWMutex       // Protects backup status

	// Authentication service
	authService *auth.AuthService
}

// NewServer initializes and returns a new Server instance
func NewServer(config *types.Config) (*Server, error) {
	logger := logrus.New()
	logger.SetFormatter(&logrus.JSONFormatter{})

	level, err := logrus.ParseLevel(config.LogLevel)
	if err != nil {
		level = logrus.InfoLevel
	}
	logger.SetLevel(level)

	// Create data directory
	if err := os.MkdirAll(config.DataDir, 0755); err != nil {
		return nil, fmt.Errorf("failed to create data directory: %v", err)
	}

	// Open BadgerDB
	opts := badger.DefaultOptions(filepath.Join(config.DataDir, "badger"))
	opts.Logger = nil // Disable badger's internal logging
	db, err := badger.Open(opts)
	if err != nil {
		return nil, fmt.Errorf("failed to open BadgerDB: %v", err)
	}

	ctx, cancel := context.WithCancel(context.Background())

	// Initialize cluster services
	merkleService := cluster.NewMerkleService(db, logger)
	gossipService := cluster.NewGossipService(config, logger)
	syncService := cluster.NewSyncService(db, config, gossipService, merkleService, logger)
	var server *Server // Captured by the mode-change callback below
	bootstrapService := cluster.NewBootstrapService(config, gossipService, syncService, logger, func(mode string) {
		if server != nil {
			server.setMode(mode)
		}
	})

	server = &Server{
		config:           config,
		db:               db,
		mode:             "normal",
		logger:           logger,
		ctx:              ctx,
		cancel:           cancel,
		gossipService:    gossipService,
		syncService:      syncService,
		merkleService:    merkleService,
		bootstrapService: bootstrapService,
	}

	if config.ReadOnly {
		server.setMode("read-only")
	}

	// Initialize storage services
	storageService, err := storage.NewStorageService(db, config, logger)
	if err != nil {
		return nil, fmt.Errorf("failed to initialize storage service: %v", err)
	}
	server.storageService = storageService

	// Initialize revision service
	server.revisionService = storage.NewRevisionService(storageService)

	// Initialize authentication service
	server.authService = auth.NewAuthService(db, logger)

	// Initialize Merkle tree using cluster service
	if err := server.syncService.InitializeMerkleTree(); err != nil {
		return nil, fmt.Errorf("failed to initialize Merkle tree: %v", err)
	}

	return server, nil
}

// getMode returns the current server mode
func (s *Server) getMode() string {
	s.modeMu.RLock()
	defer s.modeMu.RUnlock()
	return s.mode
}

// setMode sets the server mode
func (s *Server) setMode(mode string) {
	s.modeMu.Lock()
	defer s.modeMu.Unlock()
	oldMode := s.mode
	s.mode = mode
	s.logger.WithFields(logrus.Fields{
		"old_mode": oldMode,
		"new_mode": mode,
	}).Info("Mode changed")
}

// addMember adds a member using the cluster service
func (s *Server) addMember(member *types.Member) {
	s.gossipService.AddMember(member)
}

// removeMember removes a member using the cluster service
func (s *Server) removeMember(nodeID string) {
	s.gossipService.RemoveMember(nodeID)
}

// getMembers returns all cluster members
func (s *Server) getMembers() []*types.Member {
	return s.gossipService.GetMembers()
}

// getJoinedTimestamp returns this node's joined timestamp (startup time)
func (s *Server) getJoinedTimestamp() int64 {
	// For now, use a simple approach - this should be stored persistently
	return time.Now().UnixMilli()
}

// getBackupStatus returns the current backup status
func (s *Server) getBackupStatus() types.BackupStatus {
	s.backupMu.RLock()
	defer s.backupMu.RUnlock()

	status := s.backupStatus

	// Calculate next backup time if the scheduler is running
	if s.cronScheduler != nil && len(s.cronScheduler.Entries()) > 0 {
		nextRun := s.cronScheduler.Entries()[0].Next
		if !nextRun.IsZero() {
			status.NextBackupTime = nextRun.Unix()
		}
	}

	return status
}
60
storage/compression.go
Normal file
@@ -0,0 +1,60 @@
package storage

import (
	"fmt"

	"github.com/klauspost/compress/zstd"
)

// CompressionService handles ZSTD compression and decompression
type CompressionService struct {
	compressor   *zstd.Encoder
	decompressor *zstd.Decoder
}

// NewCompressionService creates a new compression service
func NewCompressionService() (*CompressionService, error) {
	// Initialize ZSTD compressor
	compressor, err := zstd.NewWriter(nil)
	if err != nil {
		return nil, fmt.Errorf("failed to initialize ZSTD compressor: %v", err)
	}

	// Initialize ZSTD decompressor
	decompressor, err := zstd.NewReader(nil)
	if err != nil {
		compressor.Close()
		return nil, fmt.Errorf("failed to initialize ZSTD decompressor: %v", err)
	}

	return &CompressionService{
		compressor:   compressor,
		decompressor: decompressor,
	}, nil
}

// Close closes the compression and decompression resources
func (c *CompressionService) Close() {
	if c.compressor != nil {
		c.compressor.Close()
	}
	if c.decompressor != nil {
		c.decompressor.Close()
	}
}

// CompressData compresses data using ZSTD
func (c *CompressionService) CompressData(data []byte) ([]byte, error) {
	if c.compressor == nil {
		return nil, fmt.Errorf("compressor not initialized")
	}
	return c.compressor.EncodeAll(data, make([]byte, 0, len(data))), nil
}

// DecompressData decompresses ZSTD-compressed data
func (c *CompressionService) DecompressData(compressedData []byte) ([]byte, error) {
	if c.decompressor == nil {
		return nil, fmt.Errorf("decompressor not initialized")
	}
	return c.decompressor.DecodeAll(compressedData, nil)
}
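
A round-trip sketch of the compression service; the input data is arbitrary but repetitive, so it compresses well:

package main

import (
	"bytes"
	"fmt"

	"kvs/storage"
)

func main() {
	svc, err := storage.NewCompressionService()
	if err != nil {
		panic(err)
	}
	defer svc.Close()

	original := bytes.Repeat([]byte("kvs "), 1000) // compressible input

	compressed, err := svc.CompressData(original)
	if err != nil {
		panic(err)
	}
	restored, err := svc.DecompressData(compressed)
	if err != nil {
		panic(err)
	}

	fmt.Printf("original=%d compressed=%d roundtrip_ok=%v\n",
		len(original), len(compressed), bytes.Equal(original, restored))
}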
214
storage/revision.go
Normal file
@@ -0,0 +1,214 @@
package storage

import (
	"encoding/json"
	"fmt"
	"strconv"
	"strings"
	"time"

	badger "github.com/dgraph-io/badger/v4"

	"kvs/auth"
	"kvs/types"
)

// RevisionService handles revision history management
type RevisionService struct {
	storage *StorageService
}

// NewRevisionService creates a new revision service
func NewRevisionService(storage *StorageService) *RevisionService {
	return &RevisionService{
		storage: storage,
	}
}

// GetRevisionKey generates the storage key for a specific revision
func GetRevisionKey(baseKey string, revision int) string {
	return fmt.Sprintf("%s:rev:%d", baseKey, revision)
}

// StoreRevisionHistory stores a value and manages revision history (up to 3 revisions)
func (r *RevisionService) StoreRevisionHistory(txn *badger.Txn, key string, storedValue types.StoredValue, ttl time.Duration) error {
	// Get existing metadata to check current revisions
	metadataKey := auth.ResourceMetadataKey(key)

	var metadata types.ResourceMetadata
	var currentRevisions []int

	// Try to get existing metadata
	metadataData, err := r.storage.RetrieveWithDecompression(txn, []byte(metadataKey))
	if err == badger.ErrKeyNotFound {
		// No existing metadata, this is a new key
		metadata = types.ResourceMetadata{
			OwnerUUID:   "", // Will be set by caller if needed
			GroupUUID:   "",
			Permissions: types.DefaultPermissions,
			TTL:         "",
			CreatedAt:   time.Now().Unix(),
			UpdatedAt:   time.Now().Unix(),
		}
		currentRevisions = []int{}
	} else if err != nil {
		// Error reading metadata
		return fmt.Errorf("failed to read metadata: %v", err)
	} else {
		// Parse existing metadata
		err = json.Unmarshal(metadataData, &metadata)
		if err != nil {
			return fmt.Errorf("failed to unmarshal metadata: %v", err)
		}

		// Extract current revisions (we store them as a custom field)
		if metadata.TTL == "" {
			currentRevisions = []int{}
		} else {
			// For now, we'll manage revisions separately - let's create a new metadata field
			currentRevisions = []int{1, 2, 3} // Assume all revisions exist for existing keys
		}
	}

	// Revision rotation logic: shift existing revisions
	if len(currentRevisions) >= 3 {
		// Delete oldest revision (rev:3)
		oldestRevKey := GetRevisionKey(key, 3)
		txn.Delete([]byte(oldestRevKey))

		// Shift rev:2 → rev:3
		rev2Key := GetRevisionKey(key, 2)
		rev2Data, err := r.storage.RetrieveWithDecompression(txn, []byte(rev2Key))
		if err == nil {
			rev3Key := GetRevisionKey(key, 3)
			r.storage.StoreWithTTL(txn, []byte(rev3Key), rev2Data, ttl)
		}

		// Shift rev:1 → rev:2
		rev1Key := GetRevisionKey(key, 1)
		rev1Data, err := r.storage.RetrieveWithDecompression(txn, []byte(rev1Key))
		if err == nil {
			rev2Key := GetRevisionKey(key, 2)
			r.storage.StoreWithTTL(txn, []byte(rev2Key), rev1Data, ttl)
		}
	}

	// Store current value as rev:1
	currentValueBytes, err := json.Marshal(storedValue)
	if err != nil {
		return fmt.Errorf("failed to marshal current value for revision: %v", err)
	}

	rev1Key := GetRevisionKey(key, 1)
	err = r.storage.StoreWithTTL(txn, []byte(rev1Key), currentValueBytes, ttl)
	if err != nil {
		return fmt.Errorf("failed to store revision 1: %v", err)
	}

	// Update metadata with new revision count
	metadata.UpdatedAt = time.Now().Unix()
	metadataBytes, err := json.Marshal(metadata)
	if err != nil {
		return fmt.Errorf("failed to marshal metadata: %v", err)
	}

	return r.storage.StoreWithTTL(txn, []byte(metadataKey), metadataBytes, ttl)
}

// GetRevisionHistory retrieves all available revisions for a given key
func (r *RevisionService) GetRevisionHistory(key string) ([]map[string]interface{}, error) {
	var revisions []map[string]interface{}

	err := r.storage.db.View(func(txn *badger.Txn) error {
		// Check revisions 1, 2, 3
		for rev := 1; rev <= 3; rev++ {
			revKey := GetRevisionKey(key, rev)

			revData, err := r.storage.RetrieveWithDecompression(txn, []byte(revKey))
			if err == badger.ErrKeyNotFound {
				continue // Skip missing revisions
			} else if err != nil {
				return fmt.Errorf("failed to retrieve revision %d: %v", rev, err)
			}

			var storedValue types.StoredValue
			err = json.Unmarshal(revData, &storedValue)
			if err != nil {
				return fmt.Errorf("failed to unmarshal revision %d: %v", rev, err)
			}

			var data interface{}
			err = json.Unmarshal(storedValue.Data, &data)
			if err != nil {
				return fmt.Errorf("failed to unmarshal revision %d data: %v", rev, err)
			}

			revision := map[string]interface{}{
				"revision":  rev,
				"uuid":      storedValue.UUID,
				"timestamp": storedValue.Timestamp,
				"data":      data,
			}

			revisions = append(revisions, revision)
		}

		return nil
	})

	if err != nil {
		return nil, err
	}

	// Sort revisions by revision number (newest first)
	// Note: they're already in order since we iterate 1->3, so just reverse for newest first
	for i, j := 0, len(revisions)-1; i < j; i, j = i+1, j-1 {
		revisions[i], revisions[j] = revisions[j], revisions[i]
	}

	return revisions, nil
}

// GetSpecificRevision retrieves a specific revision of a key
func (r *RevisionService) GetSpecificRevision(key string, revision int) (*types.StoredValue, error) {
	if revision < 1 || revision > 3 {
		return nil, fmt.Errorf("invalid revision number: %d (must be 1-3)", revision)
	}

	var storedValue types.StoredValue
	err := r.storage.db.View(func(txn *badger.Txn) error {
		revKey := GetRevisionKey(key, revision)

		revData, err := r.storage.RetrieveWithDecompression(txn, []byte(revKey))
		if err != nil {
			return err
		}

		return json.Unmarshal(revData, &storedValue)
	})

	if err != nil {
		return nil, err
	}

	return &storedValue, nil
}

// GetRevisionFromPath extracts the revision number from a path like "key/data/rev/2"
func GetRevisionFromPath(path string) (string, int, error) {
	parts := strings.Split(path, "/")
	if len(parts) < 4 || parts[len(parts)-2] != "rev" {
		return "", 0, fmt.Errorf("invalid revision path format")
	}

	revisionStr := parts[len(parts)-1]
	revision, err := strconv.Atoi(revisionStr)
	if err != nil {
		return "", 0, fmt.Errorf("invalid revision number: %s", revisionStr)
	}

	// Reconstruct the base key without the "/rev/N" suffix
	baseKey := strings.Join(parts[:len(parts)-2], "/")

	return baseKey, revision, nil
}
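
GetRevisionFromPath splits a revision path into its base key and revision number; a short usage sketch:

package main

import (
	"fmt"

	"kvs/storage"
)

func main() {
	base, rev, err := storage.GetRevisionFromPath("users/alice/data/rev/2")
	fmt.Println(base, rev, err) // users/alice/data 2 <nil>

	_, _, err = storage.GetRevisionFromPath("users/alice/data")
	fmt.Println(err) // invalid revision path format
}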
112
storage/storage.go
Normal file
@@ -0,0 +1,112 @@
package storage

import (
	"fmt"
	"time"

	badger "github.com/dgraph-io/badger/v4"
	"github.com/sirupsen/logrus"

	"kvs/types"
)

// StorageService handles all BadgerDB operations and data management
type StorageService struct {
	db             *badger.DB
	config         *types.Config
	compressionSvc *CompressionService
	logger         *logrus.Logger
}

// NewStorageService creates a new storage service
func NewStorageService(db *badger.DB, config *types.Config, logger *logrus.Logger) (*StorageService, error) {
	var compressionSvc *CompressionService
	var err error

	// Initialize compression if enabled
	if config.CompressionEnabled {
		compressionSvc, err = NewCompressionService()
		if err != nil {
			return nil, fmt.Errorf("failed to initialize compression: %v", err)
		}
	}

	return &StorageService{
		db:             db,
		config:         config,
		compressionSvc: compressionSvc,
		logger:         logger,
	}, nil
}

// Close closes the storage service and its resources
func (s *StorageService) Close() {
	if s.compressionSvc != nil {
		s.compressionSvc.Close()
	}
}

// StoreWithTTL stores data with optional TTL and compression
func (s *StorageService) StoreWithTTL(txn *badger.Txn, key []byte, data []byte, ttl time.Duration) error {
	var finalData []byte
	var err error

	// Compress data if compression is enabled
	if s.config.CompressionEnabled && s.compressionSvc != nil {
		finalData, err = s.compressionSvc.CompressData(data)
		if err != nil {
			return fmt.Errorf("failed to compress data: %v", err)
		}
	} else {
		finalData = data
	}

	entry := badger.NewEntry(key, finalData)

	// Apply TTL if specified
	if ttl > 0 {
		entry = entry.WithTTL(ttl)
	}

	return txn.SetEntry(entry)
}

// RetrieveWithDecompression retrieves and decompresses data from BadgerDB
func (s *StorageService) RetrieveWithDecompression(txn *badger.Txn, key []byte) ([]byte, error) {
	item, err := txn.Get(key)
	if err != nil {
		return nil, err
	}

	var compressedData []byte
	err = item.Value(func(val []byte) error {
		compressedData = append(compressedData, val...)
		return nil
	})
	if err != nil {
		return nil, err
	}

	// Decompress data if compression is enabled
	if s.config.CompressionEnabled && s.compressionSvc != nil {
		return s.compressionSvc.DecompressData(compressedData)
	}

	return compressedData, nil
}

// CompressData compresses data using the compression service
func (s *StorageService) CompressData(data []byte) ([]byte, error) {
	if !s.config.CompressionEnabled || s.compressionSvc == nil {
		return data, nil
	}
	return s.compressionSvc.CompressData(data)
}

// DecompressData decompresses data using the compression service
func (s *StorageService) DecompressData(compressedData []byte) ([]byte, error) {
	if !s.config.CompressionEnabled || s.compressionSvc == nil {
		return compressedData, nil
	}
	return s.compressionSvc.DecompressData(compressedData)
}
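
Writes flow through a BadgerDB transaction; here is a sketch of a compressed write with a one-hour TTL via the service (the temp data directory and the minimal Config are assumptions for the demo):

package main

import (
	"fmt"
	"time"

	badger "github.com/dgraph-io/badger/v4"
	"github.com/sirupsen/logrus"

	"kvs/storage"
	"kvs/types"
)

func main() {
	opts := badger.DefaultOptions("/tmp/kvs-demo")
	opts.Logger = nil
	db, err := badger.Open(opts)
	if err != nil {
		panic(err)
	}
	defer db.Close()

	cfg := &types.Config{CompressionEnabled: true}
	svc, err := storage.NewStorageService(db, cfg, logrus.New())
	if err != nil {
		panic(err)
	}
	defer svc.Close()

	err = db.Update(func(txn *badger.Txn) error {
		// Compressed write that expires after one hour.
		return svc.StoreWithTTL(txn, []byte("demo/key"), []byte(`{"v":1}`), time.Hour)
	})
	fmt.Println("write err:", err)
}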
test_conflict.go
@@ -1,3 +1,4 @@
//go:build ignore
// +build ignore

package main
@@ -24,33 +25,33 @@ func createConflictingData(dataDir1, dataDir2 string) error {
	// Same timestamp, different UUIDs
	timestamp := time.Now().UnixMilli()
	path := "test/conflict/data"

	// Data for node1
	data1 := json.RawMessage(`{"message": "from node1", "value": 100}`)
	uuid1 := uuid.New().String()

	// Data for node2 (same timestamp, different UUID and content)
	data2 := json.RawMessage(`{"message": "from node2", "value": 200}`)
	uuid2 := uuid.New().String()

	// Store in node1's database
	err := storeConflictData(dataDir1, path, timestamp, uuid1, data1)
	if err != nil {
		return fmt.Errorf("failed to store in node1: %v", err)
	}

	// Store in node2's database
	err = storeConflictData(dataDir2, path, timestamp, uuid2, data2)
	if err != nil {
		return fmt.Errorf("failed to store in node2: %v", err)
	}

	fmt.Printf("Created conflict scenario:\n")
	fmt.Printf("Path: %s\n", path)
	fmt.Printf("Timestamp: %d\n", timestamp)
	fmt.Printf("Node1 UUID: %s, Data: %s\n", uuid1, string(data1))
	fmt.Printf("Node2 UUID: %s, Data: %s\n", uuid2, string(data2))

	return nil
}

@@ -62,24 +63,24 @@ func storeConflictData(dataDir, path string, timestamp int64, uuid string, data
		return err
	}
	defer db.Close()

	storedValue := StoredValue{
		UUID:      uuid,
		Timestamp: timestamp,
		Data:      data,
	}

	valueBytes, err := json.Marshal(storedValue)
	if err != nil {
		return err
	}

	return db.Update(func(txn *badger.Txn) error {
		// Store main data
		if err := txn.Set([]byte(path), valueBytes); err != nil {
			return err
		}

		// Store timestamp index
		indexKey := fmt.Sprintf("_ts:%020d:%s", timestamp, path)
		return txn.Set([]byte(indexKey), []byte(uuid))
@@ -91,13 +92,13 @@ func main() {
		fmt.Println("Usage: go run test_conflict.go <data_dir1> <data_dir2>")
		os.Exit(1)
	}

	err := createConflictingData(os.Args[1], os.Args[2])
	if err != nil {
		fmt.Printf("Error: %v\n", err)
		os.Exit(1)
	}

	fmt.Println("Conflict data created successfully!")
	fmt.Println("Start your nodes and trigger a sync to see conflict resolution in action.")
}
276
types/types.go
Normal file
@@ -0,0 +1,276 @@
package types

import "encoding/json"

// Core data structures
type StoredValue struct {
	UUID      string          `json:"uuid"`
	Timestamp int64           `json:"timestamp"`
	Data      json.RawMessage `json:"data"`
}

// Authentication & Authorization data structures

// User represents a system user
type User struct {
	UUID         string   `json:"uuid"`          // Server-generated UUID
	NicknameHash string   `json:"nickname_hash"` // SHA3-512 hash of nickname
	Groups       []string `json:"groups"`        // List of group UUIDs this user belongs to
	CreatedAt    int64    `json:"created_at"`    // Unix timestamp
	UpdatedAt    int64    `json:"updated_at"`    // Unix timestamp
}

// Group represents a user group
type Group struct {
	UUID      string   `json:"uuid"`       // Server-generated UUID
	NameHash  string   `json:"name_hash"`  // SHA3-512 hash of group name
	Members   []string `json:"members"`    // List of user UUIDs in this group
	CreatedAt int64    `json:"created_at"` // Unix timestamp
	UpdatedAt int64    `json:"updated_at"` // Unix timestamp
}

// APIToken represents a JWT authentication token
type APIToken struct {
	TokenHash string   `json:"token_hash"` // SHA3-512 hash of JWT token
	UserUUID  string   `json:"user_uuid"`  // UUID of the user who owns this token
	Scopes    []string `json:"scopes"`     // List of permitted scopes (e.g., "read", "write")
	IssuedAt  int64    `json:"issued_at"`  // Unix timestamp when token was issued
	ExpiresAt int64    `json:"expires_at"` // Unix timestamp when token expires
}

// ResourceMetadata contains ownership and permission information for stored resources
type ResourceMetadata struct {
	OwnerUUID   string `json:"owner_uuid"`  // UUID of the resource owner
	GroupUUID   string `json:"group_uuid"`  // UUID of the resource group
	Permissions int    `json:"permissions"` // 12-bit permission mask (POSIX-inspired)
	TTL         string `json:"ttl"`         // Time-to-live duration (Go format)
	CreatedAt   int64  `json:"created_at"`  // Unix timestamp when resource was created
	UpdatedAt   int64  `json:"updated_at"`  // Unix timestamp when resource was last updated
}

// Permission constants for POSIX-inspired ACL
const (
	// Owner permissions (bits 11-8)
	PermOwnerCreate = 1 << 11
	PermOwnerDelete = 1 << 10
	PermOwnerWrite  = 1 << 9
	PermOwnerRead   = 1 << 8

	// Group permissions (bits 7-4)
	PermGroupCreate = 1 << 7
	PermGroupDelete = 1 << 6
	PermGroupWrite  = 1 << 5
	PermGroupRead   = 1 << 4

	// Others permissions (bits 3-0)
	PermOthersCreate = 1 << 3
	PermOthersDelete = 1 << 2
	PermOthersWrite  = 1 << 1
	PermOthersRead   = 1 << 0

	// Default permissions: Owner(1111), Group(0011), Others(0001)
	DefaultPermissions = (PermOwnerCreate | PermOwnerDelete | PermOwnerWrite | PermOwnerRead) |
		(PermGroupWrite | PermGroupRead) |
		(PermOthersRead)
)
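
// Worked value (illustration): DefaultPermissions sets owner
// create/delete/write/read, group write/read, and others read, which is
// binary 1111 0011 0001 = 0xF31 = 3889. A one-line check:
//
//	fmt.Printf("%012b %x %d\n", types.DefaultPermissions, types.DefaultPermissions, types.DefaultPermissions)
//	// Output: 111100110001 f31 3889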

// API request/response structures for authentication endpoints

// User Management API structures
type CreateUserRequest struct {
	Nickname string `json:"nickname"`
}

type CreateUserResponse struct {
	UUID string `json:"uuid"`
}

type UpdateUserRequest struct {
	Nickname string   `json:"nickname,omitempty"`
	Groups   []string `json:"groups,omitempty"`
}

type GetUserResponse struct {
	UUID         string   `json:"uuid"`
	NicknameHash string   `json:"nickname_hash"`
	Groups       []string `json:"groups"`
	CreatedAt    int64    `json:"created_at"`
	UpdatedAt    int64    `json:"updated_at"`
}

// Group Management API structures
type CreateGroupRequest struct {
	Groupname string   `json:"groupname"`
	Members   []string `json:"members,omitempty"`
}

type CreateGroupResponse struct {
	UUID string `json:"uuid"`
}

type UpdateGroupRequest struct {
	Members []string `json:"members"`
}

type GetGroupResponse struct {
	UUID      string   `json:"uuid"`
	NameHash  string   `json:"name_hash"`
	Members   []string `json:"members"`
	CreatedAt int64    `json:"created_at"`
	UpdatedAt int64    `json:"updated_at"`
}

// Token Management API structures
type CreateTokenRequest struct {
	UserUUID string   `json:"user_uuid"`
	Scopes   []string `json:"scopes"`
}

type CreateTokenResponse struct {
	Token     string `json:"token"`
	ExpiresAt int64  `json:"expires_at"`
}

// Cluster and member management types
type Member struct {
	ID              string `json:"id"`
	Address         string `json:"address"`
	LastSeen        int64  `json:"last_seen"`
	JoinedTimestamp int64  `json:"joined_timestamp"`
}

type JoinRequest struct {
	ID              string `json:"id"`
	Address         string `json:"address"`
	JoinedTimestamp int64  `json:"joined_timestamp"`
}

type LeaveRequest struct {
	ID string `json:"id"`
}

type PairsByTimeRequest struct {
	StartTimestamp int64  `json:"start_timestamp"`
	EndTimestamp   int64  `json:"end_timestamp"`
	Limit          int    `json:"limit"`
	Prefix         string `json:"prefix,omitempty"`
}

type PairsByTimeResponse struct {
	Path      string `json:"path"`
	UUID      string `json:"uuid"`
	Timestamp int64  `json:"timestamp"`
}

type PutResponse struct {
	UUID      string `json:"uuid"`
	Timestamp int64  `json:"timestamp"`
}

// TTL-enabled PUT request structure
type PutWithTTLRequest struct {
	Data json.RawMessage `json:"data"`
	TTL  string          `json:"ttl,omitempty"` // Go duration format
}

// Tamper-evident logging data structures
type TamperLogEntry struct {
	Timestamp string `json:"timestamp"` // RFC3339 format
	Action    string `json:"action"`    // Type of action
	UserUUID  string `json:"user_uuid"` // User who performed the action
	Resource  string `json:"resource"`  // Resource affected
	Signature string `json:"signature"` // SHA3-512 hash of all fields
}

// Backup system data structures
type BackupStatus struct {
	LastBackupTime    int64  `json:"last_backup_time"`    // Unix timestamp
	LastBackupSuccess bool   `json:"last_backup_success"` // Whether the last backup succeeded
	LastBackupPath    string `json:"last_backup_path"`    // Path to the last backup file
	NextBackupTime    int64  `json:"next_backup_time"`    // Unix timestamp of the next scheduled backup
	BackupsRunning    int    `json:"backups_running"`     // Number of backups currently running
}

// Merkle Tree specific data structures
type MerkleNode struct {
	Hash     []byte `json:"hash"`
	StartKey string `json:"start_key"` // The first key in this node's range
	EndKey   string `json:"end_key"`   // The last key in this node's range
}

// MerkleRootResponse is the response for getting the root hash
type MerkleRootResponse struct {
	Root *MerkleNode `json:"root"`
}

// MerkleTreeDiffRequest is used to request children hashes for a given key range
type MerkleTreeDiffRequest struct {
	ParentNode MerkleNode `json:"parent_node"` // The node whose children we want to compare (from the remote peer's perspective)
	LocalHash  []byte     `json:"local_hash"`  // The local hash of this node/range (from the requesting peer's perspective)
}

// MerkleTreeDiffResponse returns the remote children nodes, or the actual keys if it's a leaf level
type MerkleTreeDiffResponse struct {
	Children []MerkleNode `json:"children,omitempty"` // Children of the remote node
	Keys     []string     `json:"keys,omitempty"`     // Actual keys if this is a leaf-level diff
}

// For fetching a range of KV pairs
type KVRangeRequest struct {
	StartKey string `json:"start_key"`
	EndKey   string `json:"end_key"`
	Limit    int    `json:"limit"` // Max number of items to return
}

type KVRangeResponse struct {
	Pairs []struct {
		Path        string      `json:"path"`
		StoredValue StoredValue `json:"stored_value"`
	} `json:"pairs"`
}

// Configuration
type Config struct {
	NodeID               string   `yaml:"node_id"`
	BindAddress          string   `yaml:"bind_address"`
	Port                 int      `yaml:"port"`
	DataDir              string   `yaml:"data_dir"`
	SeedNodes            []string `yaml:"seed_nodes"`
	ReadOnly             bool     `yaml:"read_only"`
	LogLevel             string   `yaml:"log_level"`
	GossipIntervalMin    int      `yaml:"gossip_interval_min"`
	GossipIntervalMax    int      `yaml:"gossip_interval_max"`
	SyncInterval         int      `yaml:"sync_interval"`
	CatchupInterval      int      `yaml:"catchup_interval"`
	BootstrapMaxAgeHours int      `yaml:"bootstrap_max_age_hours"`
	ThrottleDelayMs      int      `yaml:"throttle_delay_ms"`
	FetchDelayMs         int      `yaml:"fetch_delay_ms"`

	// Database compression configuration
	CompressionEnabled bool `yaml:"compression_enabled"`
	CompressionLevel   int  `yaml:"compression_level"`

	// TTL configuration
	DefaultTTL  string `yaml:"default_ttl"`   // Go duration format, "0" means no default TTL
	MaxJSONSize int    `yaml:"max_json_size"` // Maximum JSON size in bytes

	// Rate limiting configuration
	RateLimitRequests int    `yaml:"rate_limit_requests"` // Max requests per window
	RateLimitWindow   string `yaml:"rate_limit_window"`   // Window duration (Go format)

	// Tamper-evident logging configuration
	TamperLogActions []string `yaml:"tamper_log_actions"` // Actions to log

	// Backup system configuration
	BackupEnabled   bool   `yaml:"backup_enabled"`   // Enable/disable automated backups
	BackupSchedule  string `yaml:"backup_schedule"`  // Cron schedule format
	BackupPath      string `yaml:"backup_path"`      // Directory to store backups
	BackupRetention int    `yaml:"backup_retention"` // Days to keep backups

	// Feature toggles for optional functionalities
	AuthEnabled            bool `yaml:"auth_enabled"`             // Enable/disable authentication system
	TamperLoggingEnabled   bool `yaml:"tamper_logging_enabled"`   // Enable/disable tamper-evident logging
	ClusteringEnabled      bool `yaml:"clustering_enabled"`       // Enable/disable clustering/gossip
	RateLimitingEnabled    bool `yaml:"rate_limiting_enabled"`    // Enable/disable rate limiting
	RevisionHistoryEnabled bool `yaml:"revision_history_enabled"` // Enable/disable revision history
}
25
utils/hash.go
Normal file
@@ -0,0 +1,25 @@
package utils

import (
	"encoding/hex"

	"golang.org/x/crypto/sha3"
)

// SHA3-512 hashing utilities for authentication
func HashSHA3512(input string) string {
	hasher := sha3.New512()
	hasher.Write([]byte(input))
	return hex.EncodeToString(hasher.Sum(nil))
}

func HashUserNickname(nickname string) string {
	return HashSHA3512(nickname)
}

func HashGroupName(groupname string) string {
	return HashSHA3512(groupname)
}

func HashToken(token string) string {
	return HashSHA3512(token)
}