diff --git a/go.mod b/go.mod index e1f5611..b624e76 100644 --- a/go.mod +++ b/go.mod @@ -4,9 +4,13 @@ go 1.21 require ( github.com/dgraph-io/badger/v4 v4.2.0 + github.com/golang-jwt/jwt/v4 v4.5.2 github.com/google/uuid v1.4.0 github.com/gorilla/mux v1.8.1 + github.com/klauspost/compress v1.17.4 + github.com/robfig/cron/v3 v3.0.1 github.com/sirupsen/logrus v1.9.3 + golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9 gopkg.in/yaml.v3 v3.0.1 ) @@ -20,11 +24,10 @@ require ( github.com/golang/protobuf v1.5.2 // indirect github.com/golang/snappy v0.0.3 // indirect github.com/google/flatbuffers v1.12.1 // indirect - github.com/klauspost/compress v1.12.3 // indirect github.com/kr/text v0.2.0 // indirect github.com/pkg/errors v0.9.1 // indirect go.opencensus.io v0.22.5 // indirect - golang.org/x/net v0.7.0 // indirect - golang.org/x/sys v0.5.0 // indirect + golang.org/x/net v0.10.0 // indirect + golang.org/x/sys v0.14.0 // indirect google.golang.org/protobuf v1.28.1 // indirect ) diff --git a/go.sum b/go.sum index 49f01d3..e38ea93 100644 --- a/go.sum +++ b/go.sum @@ -18,6 +18,8 @@ github.com/dustin/go-humanize v1.0.0 h1:VSnTsYCnlFHaM2/igO1h6X3HA71jcobQuxemgkq4 github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= +github.com/golang-jwt/jwt/v4 v4.5.2 h1:YtQM7lnr8iZ+j5q71MGKkNw9Mn7AjHM68uc9g5fXeUI= +github.com/golang-jwt/jwt/v4 v4.5.2/go.mod h1:m21LjoU+eqJr34lmDMbreY2eSTRJ1cv77w39/MY0Ch0= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= github.com/golang/glog v1.0.0 h1:nfP3RFugxnNRyKgeWd4oI1nYvXpxrx8ck8ZrcizshdQ= github.com/golang/glog v1.0.0/go.mod h1:EWib/APOK0SL3dFbYqvxE3UYd8E6s1ouQ7iEp/0LWV4= @@ -42,8 +44,8 @@ github.com/gorilla/mux v1.8.1 h1:TuBL49tXwgrFYWhqrNgrUNEY92u81SPhu7sTdzQEiWY= github.com/gorilla/mux v1.8.1/go.mod h1:AKf9I4AEqPTmMytcMc0KkNouC66V3BtZ4qD5fmWSiMQ= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= -github.com/klauspost/compress v1.12.3 h1:G5AfA94pHPysR56qqrkO2pxEexdDzrpFJ6yt/VqWxVU= -github.com/klauspost/compress v1.12.3/go.mod h1:8dP1Hq4DHOhN9w426knH3Rhby4rFm6D8eO+e+Dq5Gzg= +github.com/klauspost/compress v1.17.4 h1:Ej5ixsIri7BrIjBkRZLTo6ghwrEtHFk7ijlczPW4fZ4= +github.com/klauspost/compress v1.17.4/go.mod h1:/dCuZOvVtNoHsyb+cuJD3itjs3NbnF6KH9zAO4BDxPM= github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= @@ -52,6 +54,8 @@ github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs= +github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro= github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ= github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ= 
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= @@ -64,6 +68,7 @@ go.opencensus.io v0.22.5 h1:dntmOdLpSpHlVqbW5Eay97DelsZHe+55D+xC6i0dDS0= go.opencensus.io v0.22.5/go.mod h1:5pWMHQbX5EPX2/62yrJeAkowc+lfs/XD7Uxpq3pI6kk= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9 h1:psW17arqaxU48Z5kZ0CQnkZWQJsqcURM6tKiBApRjXI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= @@ -79,8 +84,8 @@ golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= -golang.org/x/net v0.7.0 h1:rJrUqqhjsgNp7KqAIc25s9pZnjU7TUcSY7HcVZjdn1g= -golang.org/x/net v0.7.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= +golang.org/x/net v0.10.0 h1:X2//UzNDwYmtCLn7To6G58Wr6f5ahEAQgKNzv9Y951M= +golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -95,8 +100,8 @@ golang.org/x/sys v0.0.0-20190502145724-3ef323f4f1fd/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20221010170243-090e33056c14/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.5.0 h1:MUK/U/4lj1t1oPg0HfuXDN/Z1wv31ZJ/YcPiGccS4DU= -golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.14.0 h1:Vz7Qs629MkJkGyHxUlRHizWJRG2j8fbQKjELVSNhy7Q= +golang.org/x/sys v0.14.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= diff --git a/main.go b/main.go index ea5b07e..a4d417d 100644 --- a/main.go +++ b/main.go @@ -21,9 +21,13 @@ import ( "time" badger "github.com/dgraph-io/badger/v4" + "github.com/golang-jwt/jwt/v4" "github.com/google/uuid" "github.com/gorilla/mux" + "github.com/klauspost/compress/zstd" + "github.com/robfig/cron/v3" "github.com/sirupsen/logrus" + "golang.org/x/crypto/sha3" "gopkg.in/yaml.v3" ) @@ -34,6 +38,128 @@ type StoredValue struct { Data json.RawMessage `json:"data"` } +// Phase 2: Authentication & Authorization data structures + +// User represents a system user +type User struct { + 
UUID string `json:"uuid"` // Server-generated UUID + NicknameHash string `json:"nickname_hash"` // SHA3-512 hash of nickname + Groups []string `json:"groups"` // List of group UUIDs this user belongs to + CreatedAt int64 `json:"created_at"` // Unix timestamp + UpdatedAt int64 `json:"updated_at"` // Unix timestamp +} + +// Group represents a user group +type Group struct { + UUID string `json:"uuid"` // Server-generated UUID + NameHash string `json:"name_hash"` // SHA3-512 hash of group name + Members []string `json:"members"` // List of user UUIDs in this group + CreatedAt int64 `json:"created_at"` // Unix timestamp + UpdatedAt int64 `json:"updated_at"` // Unix timestamp +} + +// APIToken represents a JWT authentication token +type APIToken struct { + TokenHash string `json:"token_hash"` // SHA3-512 hash of JWT token + UserUUID string `json:"user_uuid"` // UUID of the user who owns this token + Scopes []string `json:"scopes"` // List of permitted scopes (e.g., "read", "write") + IssuedAt int64 `json:"issued_at"` // Unix timestamp when token was issued + ExpiresAt int64 `json:"expires_at"` // Unix timestamp when token expires +} + +// ResourceMetadata contains ownership and permission information for stored resources +type ResourceMetadata struct { + OwnerUUID string `json:"owner_uuid"` // UUID of the resource owner + GroupUUID string `json:"group_uuid"` // UUID of the resource group + Permissions int `json:"permissions"` // 12-bit permission mask (POSIX-inspired) + TTL string `json:"ttl"` // Time-to-live duration (Go format) + CreatedAt int64 `json:"created_at"` // Unix timestamp when resource was created + UpdatedAt int64 `json:"updated_at"` // Unix timestamp when resource was last updated +} + +// Permission constants for POSIX-inspired ACL +const ( + // Owner permissions (bits 11-8) + PermOwnerCreate = 1 << 11 + PermOwnerDelete = 1 << 10 + PermOwnerWrite = 1 << 9 + PermOwnerRead = 1 << 8 + + // Group permissions (bits 7-4) + PermGroupCreate = 1 << 7 + PermGroupDelete = 1 << 6 + PermGroupWrite = 1 << 5 + PermGroupRead = 1 << 4 + + // Others permissions (bits 3-0) + PermOthersCreate = 1 << 3 + PermOthersDelete = 1 << 2 + PermOthersWrite = 1 << 1 + PermOthersRead = 1 << 0 + + // Default permissions: Owner(1111), Group(0110), Others(0010) + DefaultPermissions = (PermOwnerCreate | PermOwnerDelete | PermOwnerWrite | PermOwnerRead) | + (PermGroupWrite | PermGroupRead) | + (PermOthersRead) +) + +// Phase 2: API request/response structures for authentication endpoints + +// User Management API structures +type CreateUserRequest struct { + Nickname string `json:"nickname"` +} + +type CreateUserResponse struct { + UUID string `json:"uuid"` +} + +type UpdateUserRequest struct { + Nickname string `json:"nickname,omitempty"` + Groups []string `json:"groups,omitempty"` +} + +type GetUserResponse struct { + UUID string `json:"uuid"` + NicknameHash string `json:"nickname_hash"` + Groups []string `json:"groups"` + CreatedAt int64 `json:"created_at"` + UpdatedAt int64 `json:"updated_at"` +} + +// Group Management API structures +type CreateGroupRequest struct { + Groupname string `json:"groupname"` + Members []string `json:"members,omitempty"` +} + +type CreateGroupResponse struct { + UUID string `json:"uuid"` +} + +type UpdateGroupRequest struct { + Members []string `json:"members"` +} + +type GetGroupResponse struct { + UUID string `json:"uuid"` + NameHash string `json:"name_hash"` + Members []string `json:"members"` + CreatedAt int64 `json:"created_at"` + UpdatedAt int64 `json:"updated_at"` +} + +// 
Token Management API structures +type CreateTokenRequest struct { + UserUUID string `json:"user_uuid"` + Scopes []string `json:"scopes"` +} + +type CreateTokenResponse struct { + Token string `json:"token"` + ExpiresAt int64 `json:"expires_at"` +} + type Member struct { ID string `json:"id"` Address string `json:"address"` @@ -69,6 +195,30 @@ type PutResponse struct { Timestamp int64 `json:"timestamp"` } +// Phase 2: TTL-enabled PUT request structure +type PutWithTTLRequest struct { + Data json.RawMessage `json:"data"` + TTL string `json:"ttl,omitempty"` // Go duration format +} + +// Phase 2: Tamper-evident logging data structures +type TamperLogEntry struct { + Timestamp string `json:"timestamp"` // RFC3339 format + Action string `json:"action"` // Type of action + UserUUID string `json:"user_uuid"` // User who performed the action + Resource string `json:"resource"` // Resource affected + Signature string `json:"signature"` // SHA3-512 hash of all fields +} + +// Phase 2: Backup system data structures +type BackupStatus struct { + LastBackupTime int64 `json:"last_backup_time"` // Unix timestamp + LastBackupSuccess bool `json:"last_backup_success"` // Whether last backup succeeded + LastBackupPath string `json:"last_backup_path"` // Path to last backup file + NextBackupTime int64 `json:"next_backup_time"` // Unix timestamp of next scheduled backup + BackupsRunning int `json:"backups_running"` // Number of backups currently running +} + // Merkle Tree specific data structures type MerkleNode struct { Hash []byte `json:"hash"` @@ -123,6 +273,27 @@ type Config struct { BootstrapMaxAgeHours int `yaml:"bootstrap_max_age_hours"` ThrottleDelayMs int `yaml:"throttle_delay_ms"` FetchDelayMs int `yaml:"fetch_delay_ms"` + + // Phase 2: Database compression configuration + CompressionEnabled bool `yaml:"compression_enabled"` + CompressionLevel int `yaml:"compression_level"` + + // Phase 2: TTL configuration + DefaultTTL string `yaml:"default_ttl"` // Go duration format, "0" means no default TTL + MaxJSONSize int `yaml:"max_json_size"` // Maximum JSON size in bytes + + // Phase 2: Rate limiting configuration + RateLimitRequests int `yaml:"rate_limit_requests"` // Max requests per window + RateLimitWindow string `yaml:"rate_limit_window"` // Window duration (Go format) + + // Phase 2: Tamper-evident logging configuration + TamperLogActions []string `yaml:"tamper_log_actions"` // Actions to log + + // Phase 2: Backup system configuration + BackupEnabled bool `yaml:"backup_enabled"` // Enable/disable automated backups + BackupSchedule string `yaml:"backup_schedule"` // Cron schedule format + BackupPath string `yaml:"backup_path"` // Directory to store backups + BackupRetention int `yaml:"backup_retention"` // Days to keep backups } // Server represents the KVS node @@ -140,6 +311,1140 @@ type Server struct { wg sync.WaitGroup merkleRoot *MerkleNode // Added for Merkle Tree merkleRootMu sync.RWMutex // Protects merkleRoot + + // Phase 2: ZSTD compression + compressor *zstd.Encoder // ZSTD compressor + decompressor *zstd.Decoder // ZSTD decompressor + + // Phase 2: Backup system + cronScheduler *cron.Cron // Cron scheduler for backups + backupStatus BackupStatus // Current backup status + backupMu sync.RWMutex // Protects backup status +} + +// SHA3-512 hashing utilities for Phase 2 authentication +func hashSHA3512(input string) string { + hasher := sha3.New512() + hasher.Write([]byte(input)) + return hex.EncodeToString(hasher.Sum(nil)) +} + +func hashUserNickname(nickname string) string { + return 
hashSHA3512(nickname) +} + +func hashGroupName(groupname string) string { + return hashSHA3512(groupname) +} + +func hashToken(token string) string { + return hashSHA3512(token) +} + +// Phase 2: Storage key generation utilities +func userStorageKey(userUUID string) string { + return "user:" + userUUID +} + +func groupStorageKey(groupUUID string) string { + return "group:" + groupUUID +} + +func tokenStorageKey(tokenHash string) string { + return "token:" + tokenHash +} + +func resourceMetadataKey(resourceKey string) string { + return resourceKey + ":metadata" +} + +// Phase 2: Permission checking utilities +func checkPermission(permissions int, operation string, isOwner, isGroupMember bool) bool { + switch operation { + case "create": + if isOwner { + return (permissions & PermOwnerCreate) != 0 + } + if isGroupMember { + return (permissions & PermGroupCreate) != 0 + } + return (permissions & PermOthersCreate) != 0 + + case "delete": + if isOwner { + return (permissions & PermOwnerDelete) != 0 + } + if isGroupMember { + return (permissions & PermGroupDelete) != 0 + } + return (permissions & PermOthersDelete) != 0 + + case "write": + if isOwner { + return (permissions & PermOwnerWrite) != 0 + } + if isGroupMember { + return (permissions & PermGroupWrite) != 0 + } + return (permissions & PermOthersWrite) != 0 + + case "read": + if isOwner { + return (permissions & PermOwnerRead) != 0 + } + if isGroupMember { + return (permissions & PermGroupRead) != 0 + } + return (permissions & PermOthersRead) != 0 + + default: + return false + } +} + +// Helper function to determine user relationship to resource +func checkUserResourceRelationship(userUUID string, metadata *ResourceMetadata, userGroups []string) (isOwner, isGroupMember bool) { + isOwner = (userUUID == metadata.OwnerUUID) + + if metadata.GroupUUID != "" { + for _, groupUUID := range userGroups { + if groupUUID == metadata.GroupUUID { + isGroupMember = true + break + } + } + } + + return isOwner, isGroupMember +} + +// Phase 2: JWT token management utilities + +// JWT signing key (should be configurable in production) +var jwtSigningKey = []byte("your-secret-signing-key-change-this-in-production") + +// JWTClaims represents the custom claims for our JWT tokens +type JWTClaims struct { + UserUUID string `json:"user_uuid"` + Scopes []string `json:"scopes"` + jwt.RegisteredClaims +} + +// generateJWT creates a new JWT token for a user with specified scopes +func generateJWT(userUUID string, scopes []string, expirationHours int) (string, int64, error) { + if expirationHours <= 0 { + expirationHours = 1 // Default to 1 hour + } + + now := time.Now() + expiresAt := now.Add(time.Duration(expirationHours) * time.Hour) + + claims := JWTClaims{ + UserUUID: userUUID, + Scopes: scopes, + RegisteredClaims: jwt.RegisteredClaims{ + IssuedAt: jwt.NewNumericDate(now), + ExpiresAt: jwt.NewNumericDate(expiresAt), + Issuer: "kvs-server", + }, + } + + token := jwt.NewWithClaims(jwt.SigningMethodHS256, claims) + tokenString, err := token.SignedString(jwtSigningKey) + if err != nil { + return "", 0, err + } + + return tokenString, expiresAt.Unix(), nil +} + +// validateJWT validates a JWT token and returns the claims if valid +func validateJWT(tokenString string) (*JWTClaims, error) { + token, err := jwt.ParseWithClaims(tokenString, &JWTClaims{}, func(token *jwt.Token) (interface{}, error) { + // Validate signing method + if _, ok := token.Method.(*jwt.SigningMethodHMAC); !ok { + return nil, fmt.Errorf("unexpected signing method: %v", token.Header["alg"]) + } + 
return jwtSigningKey, nil + }) + + if err != nil { + return nil, err + } + + if claims, ok := token.Claims.(*JWTClaims); ok && token.Valid { + return claims, nil + } + + return nil, fmt.Errorf("invalid token") +} + +// storeAPIToken stores an API token in BadgerDB with TTL +func (s *Server) storeAPIToken(tokenString string, userUUID string, scopes []string, expiresAt int64) error { + tokenHash := hashToken(tokenString) + + apiToken := APIToken{ + TokenHash: tokenHash, + UserUUID: userUUID, + Scopes: scopes, + IssuedAt: time.Now().Unix(), + ExpiresAt: expiresAt, + } + + tokenData, err := json.Marshal(apiToken) + if err != nil { + return err + } + + return s.db.Update(func(txn *badger.Txn) error { + entry := badger.NewEntry([]byte(tokenStorageKey(tokenHash)), tokenData) + + // Set TTL to the token expiration time + ttl := time.Until(time.Unix(expiresAt, 0)) + if ttl > 0 { + entry = entry.WithTTL(ttl) + } + + return txn.SetEntry(entry) + }) +} + +// getAPIToken retrieves an API token from BadgerDB by hash +func (s *Server) getAPIToken(tokenHash string) (*APIToken, error) { + var apiToken APIToken + + err := s.db.View(func(txn *badger.Txn) error { + item, err := txn.Get([]byte(tokenStorageKey(tokenHash))) + if err != nil { + return err + } + + return item.Value(func(val []byte) error { + return json.Unmarshal(val, &apiToken) + }) + }) + + if err != nil { + return nil, err + } + + return &apiToken, nil +} + +// Phase 2: Authorization middleware + +// AuthContext holds authentication information for a request +type AuthContext struct { + UserUUID string `json:"user_uuid"` + Scopes []string `json:"scopes"` + Groups []string `json:"groups"` +} + +// extractTokenFromHeader extracts the Bearer token from the Authorization header +func extractTokenFromHeader(r *http.Request) (string, error) { + authHeader := r.Header.Get("Authorization") + if authHeader == "" { + return "", fmt.Errorf("missing authorization header") + } + + parts := strings.Split(authHeader, " ") + if len(parts) != 2 || strings.ToLower(parts[0]) != "bearer" { + return "", fmt.Errorf("invalid authorization header format") + } + + return parts[1], nil +} + +// getUserGroups retrieves all groups that a user belongs to +func (s *Server) getUserGroups(userUUID string) ([]string, error) { + var user User + err := s.db.View(func(txn *badger.Txn) error { + item, err := txn.Get([]byte(userStorageKey(userUUID))) + if err != nil { + return err + } + + return item.Value(func(val []byte) error { + return json.Unmarshal(val, &user) + }) + }) + + if err != nil { + return nil, err + } + + return user.Groups, nil +} + +// authenticateRequest validates the JWT token and returns authentication context +func (s *Server) authenticateRequest(r *http.Request) (*AuthContext, error) { + // Extract token from header + tokenString, err := extractTokenFromHeader(r) + if err != nil { + return nil, err + } + + // Validate JWT token + claims, err := validateJWT(tokenString) + if err != nil { + return nil, fmt.Errorf("invalid token: %v", err) + } + + // Verify token exists in our database (not revoked) + tokenHash := hashToken(tokenString) + _, err = s.getAPIToken(tokenHash) + if err == badger.ErrKeyNotFound { + return nil, fmt.Errorf("token not found or revoked") + } + if err != nil { + return nil, fmt.Errorf("failed to verify token: %v", err) + } + + // Get user's groups + groups, err := s.getUserGroups(claims.UserUUID) + if err != nil { + s.logger.WithError(err).WithField("user_uuid", claims.UserUUID).Warn("Failed to get user groups") + groups = []string{} // 
Continue with empty groups on error + } + + return &AuthContext{ + UserUUID: claims.UserUUID, + Scopes: claims.Scopes, + Groups: groups, + }, nil +} + +// checkResourcePermission checks if a user has permission to perform an operation on a resource +func (s *Server) checkResourcePermission(authCtx *AuthContext, resourceKey string, operation string) bool { + // Get resource metadata + var metadata ResourceMetadata + err := s.db.View(func(txn *badger.Txn) error { + item, err := txn.Get([]byte(resourceMetadataKey(resourceKey))) + if err != nil { + return err + } + + return item.Value(func(val []byte) error { + return json.Unmarshal(val, &metadata) + }) + }) + + // If no metadata exists, use default permissions + if err == badger.ErrKeyNotFound { + metadata = ResourceMetadata{ + OwnerUUID: authCtx.UserUUID, // Treat requester as owner for new resources + GroupUUID: "", + Permissions: DefaultPermissions, + } + } else if err != nil { + s.logger.WithError(err).WithField("resource_key", resourceKey).Warn("Failed to get resource metadata") + return false + } + + // Check user relationship to resource + isOwner, isGroupMember := checkUserResourceRelationship(authCtx.UserUUID, &metadata, authCtx.Groups) + + // Check permission + return checkPermission(metadata.Permissions, operation, isOwner, isGroupMember) +} + +// authMiddleware is the HTTP middleware that enforces authentication and authorization +func (s *Server) authMiddleware(requiredScopes []string, resourceKeyExtractor func(*http.Request) string, operation string) func(http.HandlerFunc) http.HandlerFunc { + return func(next http.HandlerFunc) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + // Authenticate request + authCtx, err := s.authenticateRequest(r) + if err != nil { + s.logger.WithError(err).WithField("path", r.URL.Path).Info("Authentication failed") + http.Error(w, "Unauthorized", http.StatusUnauthorized) + return + } + + // Check required scopes + if len(requiredScopes) > 0 { + hasRequiredScope := false + for _, required := range requiredScopes { + for _, scope := range authCtx.Scopes { + if scope == required { + hasRequiredScope = true + break + } + } + if hasRequiredScope { + break + } + } + + if !hasRequiredScope { + s.logger.WithFields(logrus.Fields{ + "user_uuid": authCtx.UserUUID, + "user_scopes": authCtx.Scopes, + "required_scopes": requiredScopes, + }).Info("Insufficient scopes") + http.Error(w, "Forbidden", http.StatusForbidden) + return + } + } + + // Check resource-level permissions if applicable + if resourceKeyExtractor != nil && operation != "" { + resourceKey := resourceKeyExtractor(r) + if resourceKey != "" { + hasPermission := s.checkResourcePermission(authCtx, resourceKey, operation) + if !hasPermission { + s.logger.WithFields(logrus.Fields{ + "user_uuid": authCtx.UserUUID, + "resource_key": resourceKey, + "operation": operation, + }).Info("Permission denied") + http.Error(w, "Forbidden", http.StatusForbidden) + return + } + } + } + + // Store auth context in request context for use in handlers + ctx := context.WithValue(r.Context(), "auth", authCtx) + r = r.WithContext(ctx) + + next(w, r) + } + } +} + +// Helper function to extract KV resource key from request +func extractKVResourceKey(r *http.Request) string { + vars := mux.Vars(r) + if path, ok := vars["path"]; ok { + return path + } + return "" +} + +// Phase 2: ZSTD compression utilities + +// compressData compresses JSON data using ZSTD if compression is enabled +func (s *Server) compressData(data []byte) ([]byte, error) { + if 
!s.config.CompressionEnabled || s.compressor == nil { + return data, nil + } + + return s.compressor.EncodeAll(data, make([]byte, 0, len(data))), nil +} + +// decompressData decompresses ZSTD-compressed data if compression is enabled +func (s *Server) decompressData(compressedData []byte) ([]byte, error) { + if !s.config.CompressionEnabled || s.decompressor == nil { + return compressedData, nil + } + + return s.decompressor.DecodeAll(compressedData, nil) +} + +// Phase 2: TTL and size validation utilities + +// parseTTL converts a Go duration string to time.Duration +func parseTTL(ttlString string) (time.Duration, error) { + if ttlString == "" || ttlString == "0" { + return 0, nil // No TTL + } + + duration, err := time.ParseDuration(ttlString) + if err != nil { + return 0, fmt.Errorf("invalid TTL format: %v", err) + } + + if duration < 0 { + return 0, fmt.Errorf("TTL cannot be negative") + } + + return duration, nil +} + +// validateJSONSize checks if JSON data exceeds maximum allowed size +func (s *Server) validateJSONSize(data []byte) error { + if s.config.MaxJSONSize > 0 && len(data) > s.config.MaxJSONSize { + return fmt.Errorf("JSON size (%d bytes) exceeds maximum allowed size (%d bytes)", + len(data), s.config.MaxJSONSize) + } + return nil +} + +// createResourceMetadata creates metadata for a new resource with TTL and permissions +func (s *Server) createResourceMetadata(ownerUUID, groupUUID, ttlString string, permissions int) (*ResourceMetadata, error) { + now := time.Now().Unix() + + metadata := &ResourceMetadata{ + OwnerUUID: ownerUUID, + GroupUUID: groupUUID, + Permissions: permissions, + TTL: ttlString, + CreatedAt: now, + UpdatedAt: now, + } + + return metadata, nil +} + +// storeWithTTL stores data in BadgerDB with optional TTL +func (s *Server) storeWithTTL(txn *badger.Txn, key []byte, data []byte, ttl time.Duration) error { + // Compress data if compression is enabled + compressedData, err := s.compressData(data) + if err != nil { + return fmt.Errorf("failed to compress data: %v", err) + } + + entry := badger.NewEntry(key, compressedData) + + // Apply TTL if specified + if ttl > 0 { + entry = entry.WithTTL(ttl) + } + + return txn.SetEntry(entry) +} + +// retrieveWithDecompression retrieves and decompresses data from BadgerDB +func (s *Server) retrieveWithDecompression(txn *badger.Txn, key []byte) ([]byte, error) { + item, err := txn.Get(key) + if err != nil { + return nil, err + } + + var compressedData []byte + err = item.Value(func(val []byte) error { + compressedData = append(compressedData, val...) 
+ return nil + }) + if err != nil { + return nil, err + } + + // Decompress data if compression is enabled + return s.decompressData(compressedData) +} + +// Phase 2: Revision history system utilities + +// getRevisionKey generates the storage key for a specific revision +func getRevisionKey(baseKey string, revision int) string { + return fmt.Sprintf("%s:rev:%d", baseKey, revision) +} + +// storeRevisionHistory stores a value and manages revision history (up to 3 revisions) +func (s *Server) storeRevisionHistory(txn *badger.Txn, key string, storedValue StoredValue, ttl time.Duration) error { + // Get existing metadata to check current revisions + metadataKey := resourceMetadataKey(key) + + var metadata ResourceMetadata + var currentRevisions []int + + // Try to get existing metadata + metadataData, err := s.retrieveWithDecompression(txn, []byte(metadataKey)) + if err == badger.ErrKeyNotFound { + // No existing metadata, this is a new key + metadata = ResourceMetadata{ + OwnerUUID: "", // Will be set by caller if needed + GroupUUID: "", + Permissions: DefaultPermissions, + TTL: "", + CreatedAt: time.Now().Unix(), + UpdatedAt: time.Now().Unix(), + } + currentRevisions = []int{} + } else if err != nil { + // Error reading metadata + return fmt.Errorf("failed to read metadata: %v", err) + } else { + // Parse existing metadata + err = json.Unmarshal(metadataData, &metadata) + if err != nil { + return fmt.Errorf("failed to unmarshal metadata: %v", err) + } + + // Extract current revisions (we store them as a custom field) + if metadata.TTL == "" { + currentRevisions = []int{} + } else { + // For now, we'll manage revisions separately - let's create a new metadata field + currentRevisions = []int{1, 2, 3} // Assume all revisions exist for existing keys + } + } + + // Revision rotation logic: shift existing revisions + if len(currentRevisions) >= 3 { + // Delete oldest revision (rev:3) + oldestRevKey := getRevisionKey(key, 3) + txn.Delete([]byte(oldestRevKey)) + + // Shift rev:2 → rev:3 + rev2Key := getRevisionKey(key, 2) + rev2Data, err := s.retrieveWithDecompression(txn, []byte(rev2Key)) + if err == nil { + rev3Key := getRevisionKey(key, 3) + s.storeWithTTL(txn, []byte(rev3Key), rev2Data, ttl) + } + + // Shift rev:1 → rev:2 + rev1Key := getRevisionKey(key, 1) + rev1Data, err := s.retrieveWithDecompression(txn, []byte(rev1Key)) + if err == nil { + rev2Key := getRevisionKey(key, 2) + s.storeWithTTL(txn, []byte(rev2Key), rev1Data, ttl) + } + + currentRevisions = []int{1, 2, 3} + } else if len(currentRevisions) == 2 { + // Shift rev:1 → rev:2 + rev1Key := getRevisionKey(key, 1) + rev1Data, err := s.retrieveWithDecompression(txn, []byte(rev1Key)) + if err == nil { + rev2Key := getRevisionKey(key, 2) + s.storeWithTTL(txn, []byte(rev2Key), rev1Data, ttl) + } + currentRevisions = []int{1, 2, 3} + } else if len(currentRevisions) == 1 { + currentRevisions = []int{1, 2} + } else { + currentRevisions = []int{1} + } + + // Store new value as rev:1 + storedValueData, err := json.Marshal(storedValue) + if err != nil { + return fmt.Errorf("failed to marshal stored value: %v", err) + } + + rev1Key := getRevisionKey(key, 1) + err = s.storeWithTTL(txn, []byte(rev1Key), storedValueData, ttl) + if err != nil { + return fmt.Errorf("failed to store revision 1: %v", err) + } + + // Store main data (current version) + err = s.storeWithTTL(txn, []byte(key), storedValueData, ttl) + if err != nil { + return fmt.Errorf("failed to store main data: %v", err) + } + + // Update metadata with revision information + 
metadata.UpdatedAt = time.Now().Unix() + // Store revision numbers in a custom metadata field (we'll extend metadata later) + + metadataData, err = json.Marshal(metadata) + if err != nil { + return fmt.Errorf("failed to marshal metadata: %v", err) + } + + return s.storeWithTTL(txn, []byte(metadataKey), metadataData, ttl) +} + +// getRevisionHistory retrieves all available revisions for a key +func (s *Server) getRevisionHistory(key string) ([]map[string]interface{}, error) { + var revisions []map[string]interface{} + + err := s.db.View(func(txn *badger.Txn) error { + // Check revisions 1, 2, 3 + for i := 1; i <= 3; i++ { + revKey := getRevisionKey(key, i) + revData, err := s.retrieveWithDecompression(txn, []byte(revKey)) + if err == badger.ErrKeyNotFound { + continue // This revision doesn't exist + } + if err != nil { + return fmt.Errorf("failed to retrieve revision %d: %v", i, err) + } + + var storedValue StoredValue + err = json.Unmarshal(revData, &storedValue) + if err != nil { + return fmt.Errorf("failed to unmarshal revision %d: %v", i, err) + } + + revision := map[string]interface{}{ + "number": i, + "uuid": storedValue.UUID, + "timestamp": storedValue.Timestamp, + } + + revisions = append(revisions, revision) + } + + return nil + }) + + if err != nil { + return nil, err + } + + return revisions, nil +} + +// getSpecificRevision retrieves a specific revision of a key +func (s *Server) getSpecificRevision(key string, revision int) (*StoredValue, error) { + if revision < 1 || revision > 3 { + return nil, fmt.Errorf("invalid revision number: %d (must be 1-3)", revision) + } + + var storedValue StoredValue + + err := s.db.View(func(txn *badger.Txn) error { + revKey := getRevisionKey(key, revision) + revData, err := s.retrieveWithDecompression(txn, []byte(revKey)) + if err != nil { + return err + } + + return json.Unmarshal(revData, &storedValue) + }) + + if err != nil { + return nil, err + } + + return &storedValue, nil +} + +// Phase 2: Rate limiting utilities + +// getRateLimitKey generates the storage key for rate limiting counters +func getRateLimitKey(userUUID string, windowStart int64) string { + return fmt.Sprintf("ratelimit:%s:%d", userUUID, windowStart) +} + +// getCurrentWindow calculates the current rate limiting window start time +func (s *Server) getCurrentWindow() (int64, time.Duration, error) { + windowDuration, err := time.ParseDuration(s.config.RateLimitWindow) + if err != nil { + return 0, 0, fmt.Errorf("invalid rate limit window: %v", err) + } + + now := time.Now() + windowStart := now.Truncate(windowDuration).Unix() + + return windowStart, windowDuration, nil +} + +// checkRateLimit checks if a user has exceeded the rate limit +func (s *Server) checkRateLimit(userUUID string) (bool, error) { + if s.config.RateLimitRequests <= 0 { + return true, nil // Rate limiting disabled + } + + windowStart, windowDuration, err := s.getCurrentWindow() + if err != nil { + return false, err + } + + rateLimitKey := getRateLimitKey(userUUID, windowStart) + + var currentCount int + + err = s.db.Update(func(txn *badger.Txn) error { + // Try to get current counter + item, err := txn.Get([]byte(rateLimitKey)) + if err == badger.ErrKeyNotFound { + // No counter exists, create one + currentCount = 1 + } else if err != nil { + return fmt.Errorf("failed to get rate limit counter: %v", err) + } else { + // Counter exists, increment it + err = item.Value(func(val []byte) error { + count, err := strconv.Atoi(string(val)) + if err != nil { + return fmt.Errorf("failed to parse rate limit counter: 
%v", err) + } + currentCount = count + 1 + return nil + }) + if err != nil { + return err + } + } + + // Store updated counter with TTL + counterData := []byte(strconv.Itoa(currentCount)) + entry := badger.NewEntry([]byte(rateLimitKey), counterData) + entry = entry.WithTTL(windowDuration) + + return txn.SetEntry(entry) + }) + + if err != nil { + return false, err + } + + // Check if rate limit is exceeded + return currentCount <= s.config.RateLimitRequests, nil +} + +// rateLimitMiddleware is the HTTP middleware that enforces rate limiting +func (s *Server) rateLimitMiddleware(next http.HandlerFunc) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + // Extract auth context to get user UUID + authCtx, ok := r.Context().Value("auth").(*AuthContext) + if !ok || authCtx == nil { + // No auth context, skip rate limiting (unauthenticated requests) + next(w, r) + return + } + + // Check rate limit + allowed, err := s.checkRateLimit(authCtx.UserUUID) + if err != nil { + s.logger.WithError(err).WithField("user_uuid", authCtx.UserUUID).Error("Failed to check rate limit") + http.Error(w, "Internal Server Error", http.StatusInternalServerError) + return + } + + if !allowed { + s.logger.WithFields(logrus.Fields{ + "user_uuid": authCtx.UserUUID, + "limit": s.config.RateLimitRequests, + "window": s.config.RateLimitWindow, + }).Info("Rate limit exceeded") + + // Set rate limit headers + w.Header().Set("X-Rate-Limit-Limit", strconv.Itoa(s.config.RateLimitRequests)) + w.Header().Set("X-Rate-Limit-Window", s.config.RateLimitWindow) + + http.Error(w, "Rate limit exceeded", http.StatusTooManyRequests) + return + } + + next(w, r) + } +} + +// Phase 2: Tamper-evident logging utilities + +// getTamperLogKey generates the storage key for a tamper log entry +func getTamperLogKey(timestamp string, entryUUID string) string { + return fmt.Sprintf("log:%s:%s", timestamp, entryUUID) +} + +// getMerkleLogKey generates the storage key for hourly Merkle tree roots +func getMerkleLogKey(timestamp string) string { + return fmt.Sprintf("log:merkle:%s", timestamp) +} + +// generateLogSignature creates a SHA3-512 signature for a log entry +func generateLogSignature(timestamp, action, userUUID, resource string) string { + // Concatenate all fields in a deterministic order + data := fmt.Sprintf("%s|%s|%s|%s", timestamp, action, userUUID, resource) + return hashSHA3512(data) +} + +// isActionLogged checks if a specific action should be logged +func (s *Server) isActionLogged(action string) bool { + for _, loggedAction := range s.config.TamperLogActions { + if loggedAction == action { + return true + } + } + return false +} + +// createTamperLogEntry creates a new tamper-evident log entry +func (s *Server) createTamperLogEntry(action, userUUID, resource string) *TamperLogEntry { + if !s.isActionLogged(action) { + return nil // Action not configured for logging + } + + timestamp := time.Now().UTC().Format(time.RFC3339) + signature := generateLogSignature(timestamp, action, userUUID, resource) + + return &TamperLogEntry{ + Timestamp: timestamp, + Action: action, + UserUUID: userUUID, + Resource: resource, + Signature: signature, + } +} + +// storeTamperLogEntry stores a tamper-evident log entry in BadgerDB +func (s *Server) storeTamperLogEntry(logEntry *TamperLogEntry) error { + if logEntry == nil { + return nil // No log entry to store + } + + // Generate UUID for this log entry + entryUUID := uuid.New().String() + logKey := getTamperLogKey(logEntry.Timestamp, entryUUID) + + // Marshal log entry + logData, 
err := json.Marshal(logEntry) + if err != nil { + return fmt.Errorf("failed to marshal log entry: %v", err) + } + + // Store log entry with compression + return s.db.Update(func(txn *badger.Txn) error { + // No TTL for log entries - they should be permanent for audit purposes + return s.storeWithTTL(txn, []byte(logKey), logData, 0) + }) +} + +// logTamperEvent logs a tamper-evident event if the action is configured for logging +func (s *Server) logTamperEvent(action, userUUID, resource string) { + logEntry := s.createTamperLogEntry(action, userUUID, resource) + if logEntry == nil { + return // Action not configured for logging + } + + err := s.storeTamperLogEntry(logEntry) + if err != nil { + s.logger.WithError(err).WithFields(logrus.Fields{ + "action": action, + "user_uuid": userUUID, + "resource": resource, + }).Error("Failed to store tamper log entry") + } else { + s.logger.WithFields(logrus.Fields{ + "action": action, + "user_uuid": userUUID, + "resource": resource, + "timestamp": logEntry.Timestamp, + }).Debug("Tamper log entry created") + } +} + +// getTamperLogs retrieves tamper log entries within a time range (for auditing) +func (s *Server) getTamperLogs(startTime, endTime time.Time, limit int) ([]*TamperLogEntry, error) { + var logEntries []*TamperLogEntry + + err := s.db.View(func(txn *badger.Txn) error { + opts := badger.DefaultIteratorOptions + opts.PrefetchValues = true + it := txn.NewIterator(opts) + defer it.Close() + + prefix := []byte("log:") + count := 0 + + // Iterate only over keys that share the "log:" prefix + for it.Seek(prefix); it.ValidForPrefix(prefix); it.Next() { + + if limit > 0 && count >= limit { + break + } + + key := string(it.Item().Key()) + if !strings.HasPrefix(key, "log:") || strings.HasPrefix(key, "log:merkle:") { + continue // Skip non-log entries and merkle roots + } + + // Extract timestamp from key (format "log:<RFC3339>:<entry uuid>"); the RFC3339 timestamp itself contains colons, so cut at the last separator before the UUID + rest := strings.TrimPrefix(key, "log:") + sep := strings.LastIndex(rest, ":") + if sep < 0 { + continue + } + + // Parse timestamp from key + entryTime, err := time.Parse(time.RFC3339, rest[:sep]) + if err != nil { + continue // Skip entries with invalid timestamps + } + + if entryTime.Before(startTime) || entryTime.After(endTime) { + continue // Skip entries outside time range + } + + // Retrieve and decompress log entry + logData, err := s.retrieveWithDecompression(txn, it.Item().Key()) + if err != nil { + continue // Skip entries that can't be read + } + + var logEntry TamperLogEntry + err = json.Unmarshal(logData, &logEntry) + if err != nil { + continue // Skip entries that can't be parsed + } + + logEntries = append(logEntries, &logEntry) + count++ + } + + return nil + }) + + return logEntries, err +} + +// Phase 2: Backup system utilities + +// getBackupFilename generates a filename for a backup +func getBackupFilename(timestamp time.Time) string { + return fmt.Sprintf("kvs-backup-%s.zstd", timestamp.Format("2006-01-02")) +} + +// createBackup creates a compressed backup of the BadgerDB database +func (s *Server) createBackup() error { + s.backupMu.Lock() + s.backupStatus.BackupsRunning++ + s.backupMu.Unlock() + + defer func() { + s.backupMu.Lock() + s.backupStatus.BackupsRunning-- + s.backupMu.Unlock() + }() + + now := time.Now() + backupFilename := getBackupFilename(now) + backupPath := filepath.Join(s.config.BackupPath, backupFilename) + + // Create backup directory if it doesn't exist + if err := os.MkdirAll(s.config.BackupPath, 0755); err != nil { + return fmt.Errorf("failed to create backup directory: %v", err) + } + + // Create 
temporary file for backup + tempPath := backupPath + ".tmp" + tempFile, err := os.Create(tempPath) + if err != nil { + return fmt.Errorf("failed to create temporary backup file: %v", err) + } + defer tempFile.Close() + + // Create ZSTD compressor for backup file + compressor, err := zstd.NewWriter(tempFile, zstd.WithEncoderLevel(zstd.EncoderLevelFromZstd(s.config.CompressionLevel))) + if err != nil { + os.Remove(tempPath) + return fmt.Errorf("failed to create backup compressor: %v", err) + } + defer compressor.Close() + + // Create BadgerDB backup stream + since := uint64(0) // Full backup + _, err = s.db.Backup(compressor, since) + if err != nil { + compressor.Close() + tempFile.Close() + os.Remove(tempPath) + return fmt.Errorf("failed to create database backup: %v", err) + } + + // Close compressor and temp file + compressor.Close() + tempFile.Close() + + // Move temporary file to final backup path + if err := os.Rename(tempPath, backupPath); err != nil { + os.Remove(tempPath) + return fmt.Errorf("failed to finalize backup file: %v", err) + } + + // Update backup status + s.backupMu.Lock() + s.backupStatus.LastBackupTime = now.Unix() + s.backupStatus.LastBackupSuccess = true + s.backupStatus.LastBackupPath = backupPath + s.backupMu.Unlock() + + s.logger.WithFields(logrus.Fields{ + "backup_path": backupPath, + "timestamp": now.Format(time.RFC3339), + }).Info("Database backup completed successfully") + + // Clean up old backups + s.cleanupOldBackups() + + return nil +} + +// cleanupOldBackups removes backup files older than the retention period +func (s *Server) cleanupOldBackups() { + if s.config.BackupRetention <= 0 { + return // No cleanup if retention is disabled + } + + cutoffTime := time.Now().AddDate(0, 0, -s.config.BackupRetention) + + entries, err := os.ReadDir(s.config.BackupPath) + if err != nil { + s.logger.WithError(err).Error("Failed to read backup directory for cleanup") + return + } + + for _, entry := range entries { + if !strings.HasPrefix(entry.Name(), "kvs-backup-") || !strings.HasSuffix(entry.Name(), ".zstd") { + continue // Skip non-backup files + } + + info, err := entry.Info() + if err != nil { + continue + } + + if info.ModTime().Before(cutoffTime) { + backupPath := filepath.Join(s.config.BackupPath, entry.Name()) + if err := os.Remove(backupPath); err != nil { + s.logger.WithError(err).WithField("backup_path", backupPath).Warn("Failed to remove old backup") + } else { + s.logger.WithField("backup_path", backupPath).Info("Removed old backup") + } + } + } +} + +// initializeBackupScheduler sets up the cron scheduler for automated backups +func (s *Server) initializeBackupScheduler() error { + if !s.config.BackupEnabled { + s.logger.Info("Backup system disabled") + return nil + } + + s.cronScheduler = cron.New() + + _, err := s.cronScheduler.AddFunc(s.config.BackupSchedule, func() { + s.logger.Info("Starting scheduled backup") + + if err := s.createBackup(); err != nil { + s.logger.WithError(err).Error("Scheduled backup failed") + + // Update backup status on failure + s.backupMu.Lock() + s.backupStatus.LastBackupTime = time.Now().Unix() + s.backupStatus.LastBackupSuccess = false + s.backupMu.Unlock() + } + }) + + if err != nil { + return fmt.Errorf("failed to schedule backup: %v", err) + } + + s.cronScheduler.Start() + + s.logger.WithFields(logrus.Fields{ + "schedule": s.config.BackupSchedule, + "path": s.config.BackupPath, + "retention": s.config.BackupRetention, + }).Info("Backup scheduler initialized") + + return nil +} + +// getBackupStatus returns the current 
backup status +func (s *Server) getBackupStatus() BackupStatus { + s.backupMu.RLock() + defer s.backupMu.RUnlock() + + status := s.backupStatus + + // Calculate next backup time if scheduler is running + if s.cronScheduler != nil && len(s.cronScheduler.Entries()) > 0 { + nextRun := s.cronScheduler.Entries()[0].Next + if !nextRun.IsZero() { + status.NextBackupTime = nextRun.Unix() + } + } + + return status } // Default configuration @@ -160,6 +1465,27 @@ func defaultConfig() *Config { BootstrapMaxAgeHours: 720, // 30 days ThrottleDelayMs: 100, FetchDelayMs: 50, + + // Phase 2: Default compression settings + CompressionEnabled: true, + CompressionLevel: 3, // Balance between performance and compression ratio + + // Phase 2: Default TTL and size limit settings + DefaultTTL: "0", // No default TTL + MaxJSONSize: 1048576, // 1MB default max JSON size + + // Phase 2: Default rate limiting settings + RateLimitRequests: 100, // 100 requests per window + RateLimitWindow: "1m", // 1 minute window + + // Phase 2: Default tamper-evident logging settings + TamperLogActions: []string{"data_write", "user_create", "auth_failure"}, + + // Phase 2: Default backup system settings + BackupEnabled: true, + BackupSchedule: "0 0 * * *", // Daily at midnight + BackupPath: "./backups", + BackupRetention: 7, // Keep backups for 7 days } } @@ -250,6 +1576,38 @@ func NewServer(config *Config) (*Server, error) { server.setMerkleRoot(root) server.logger.Info("Initial Merkle tree built.") + // Phase 2: Initialize ZSTD compression if enabled + if config.CompressionEnabled { + // Validate compression level + if config.CompressionLevel < 1 || config.CompressionLevel > 19 { + config.CompressionLevel = 3 // Default to level 3 + } + + // Create encoder with specified compression level + encoder, err := zstd.NewWriter(nil, zstd.WithEncoderLevel(zstd.EncoderLevelFromZstd(config.CompressionLevel))) + if err != nil { + return nil, fmt.Errorf("failed to create ZSTD encoder: %v", err) + } + server.compressor = encoder + + // Create decoder for decompression + decoder, err := zstd.NewReader(nil) + if err != nil { + return nil, fmt.Errorf("failed to create ZSTD decoder: %v", err) + } + server.decompressor = decoder + + server.logger.WithField("compression_level", config.CompressionLevel).Info("ZSTD compression enabled") + } else { + server.logger.Info("ZSTD compression disabled") + } + + // Phase 2: Initialize backup scheduler + err = server.initializeBackupScheduler() + if err != nil { + return nil, fmt.Errorf("failed to initialize backup scheduler: %v", err) + } + return server, nil } @@ -699,6 +2057,28 @@ func (s *Server) setupRoutes() *mux.Router { router.HandleFunc("/merkle_tree/diff", s.getMerkleDiffHandler).Methods("POST") router.HandleFunc("/kv_range", s.getKVRangeHandler).Methods("POST") // New endpoint for fetching ranges + // Phase 2: User Management endpoints + router.HandleFunc("/api/users", s.createUserHandler).Methods("POST") + router.HandleFunc("/api/users/{uuid}", s.getUserHandler).Methods("GET") + router.HandleFunc("/api/users/{uuid}", s.updateUserHandler).Methods("PUT") + router.HandleFunc("/api/users/{uuid}", s.deleteUserHandler).Methods("DELETE") + + // Phase 2: Group Management endpoints + router.HandleFunc("/api/groups", s.createGroupHandler).Methods("POST") + router.HandleFunc("/api/groups/{uuid}", s.getGroupHandler).Methods("GET") + router.HandleFunc("/api/groups/{uuid}", s.updateGroupHandler).Methods("PUT") + router.HandleFunc("/api/groups/{uuid}", s.deleteGroupHandler).Methods("DELETE") + + // Phase 2: 
Token Management endpoints + router.HandleFunc("/api/tokens", s.createTokenHandler).Methods("POST") + + // Phase 2: Revision History endpoints + router.HandleFunc("/api/data/{key}/history", s.getRevisionHistoryHandler).Methods("GET") + router.HandleFunc("/api/data/{key}/history/{revision}", s.getSpecificRevisionHandler).Methods("GET") + + // Phase 2: Backup Status endpoint + router.HandleFunc("/api/backup/status", s.getBackupStatusHandler).Methods("GET") + return router } @@ -1341,6 +2721,558 @@ func (s *Server) getKVRangeHandler(w http.ResponseWriter, r *http.Request) { json.NewEncoder(w).Encode(KVRangeResponse{Pairs: pairs}) } +// Phase 2: User Management API Handlers + +// createUserHandler handles POST /api/users +func (s *Server) createUserHandler(w http.ResponseWriter, r *http.Request) { + var req CreateUserRequest + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + http.Error(w, "Bad Request", http.StatusBadRequest) + return + } + + if req.Nickname == "" { + http.Error(w, "Nickname is required", http.StatusBadRequest) + return + } + + // Generate UUID for the user + userUUID := uuid.New().String() + now := time.Now().Unix() + + user := User{ + UUID: userUUID, + NicknameHash: hashUserNickname(req.Nickname), + Groups: []string{}, + CreatedAt: now, + UpdatedAt: now, + } + + // Store user in BadgerDB + userData, err := json.Marshal(user) + if err != nil { + s.logger.WithError(err).Error("Failed to marshal user data") + http.Error(w, "Internal Server Error", http.StatusInternalServerError) + return + } + + err = s.db.Update(func(txn *badger.Txn) error { + return txn.Set([]byte(userStorageKey(userUUID)), userData) + }) + + if err != nil { + s.logger.WithError(err).Error("Failed to store user") + http.Error(w, "Internal Server Error", http.StatusInternalServerError) + return + } + + s.logger.WithField("user_uuid", userUUID).Info("User created successfully") + + response := CreateUserResponse{UUID: userUUID} + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(response) +} + +// getUserHandler handles GET /api/users/{uuid} +func (s *Server) getUserHandler(w http.ResponseWriter, r *http.Request) { + vars := mux.Vars(r) + userUUID := vars["uuid"] + + if userUUID == "" { + http.Error(w, "User UUID is required", http.StatusBadRequest) + return + } + + var user User + err := s.db.View(func(txn *badger.Txn) error { + item, err := txn.Get([]byte(userStorageKey(userUUID))) + if err != nil { + return err + } + + return item.Value(func(val []byte) error { + return json.Unmarshal(val, &user) + }) + }) + + if err == badger.ErrKeyNotFound { + http.Error(w, "User not found", http.StatusNotFound) + return + } + + if err != nil { + s.logger.WithError(err).Error("Failed to get user") + http.Error(w, "Internal Server Error", http.StatusInternalServerError) + return + } + + response := GetUserResponse{ + UUID: user.UUID, + NicknameHash: user.NicknameHash, + Groups: user.Groups, + CreatedAt: user.CreatedAt, + UpdatedAt: user.UpdatedAt, + } + + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(response) +} + +// updateUserHandler handles PUT /api/users/{uuid} +func (s *Server) updateUserHandler(w http.ResponseWriter, r *http.Request) { + vars := mux.Vars(r) + userUUID := vars["uuid"] + + if userUUID == "" { + http.Error(w, "User UUID is required", http.StatusBadRequest) + return + } + + var req UpdateUserRequest + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + http.Error(w, "Bad Request", http.StatusBadRequest) + return + } + 
+ err := s.db.Update(func(txn *badger.Txn) error { + // Get existing user + item, err := txn.Get([]byte(userStorageKey(userUUID))) + if err != nil { + return err + } + + var user User + err = item.Value(func(val []byte) error { + return json.Unmarshal(val, &user) + }) + if err != nil { + return err + } + + // Update fields if provided + now := time.Now().Unix() + user.UpdatedAt = now + + if req.Nickname != "" { + user.NicknameHash = hashUserNickname(req.Nickname) + } + + if req.Groups != nil { + user.Groups = req.Groups + } + + // Store updated user + userData, err := json.Marshal(user) + if err != nil { + return err + } + + return txn.Set([]byte(userStorageKey(userUUID)), userData) + }) + + if err == badger.ErrKeyNotFound { + http.Error(w, "User not found", http.StatusNotFound) + return + } + + if err != nil { + s.logger.WithError(err).Error("Failed to update user") + http.Error(w, "Internal Server Error", http.StatusInternalServerError) + return + } + + s.logger.WithField("user_uuid", userUUID).Info("User updated successfully") + w.WriteHeader(http.StatusOK) +} + +// deleteUserHandler handles DELETE /api/users/{uuid} +func (s *Server) deleteUserHandler(w http.ResponseWriter, r *http.Request) { + vars := mux.Vars(r) + userUUID := vars["uuid"] + + if userUUID == "" { + http.Error(w, "User UUID is required", http.StatusBadRequest) + return + } + + err := s.db.Update(func(txn *badger.Txn) error { + // Check if user exists first + _, err := txn.Get([]byte(userStorageKey(userUUID))) + if err != nil { + return err + } + + // Delete the user + return txn.Delete([]byte(userStorageKey(userUUID))) + }) + + if err == badger.ErrKeyNotFound { + http.Error(w, "User not found", http.StatusNotFound) + return + } + + if err != nil { + s.logger.WithError(err).Error("Failed to delete user") + http.Error(w, "Internal Server Error", http.StatusInternalServerError) + return + } + + s.logger.WithField("user_uuid", userUUID).Info("User deleted successfully") + w.WriteHeader(http.StatusOK) +} + +// Phase 2: Group Management API Handlers + +// createGroupHandler handles POST /api/groups +func (s *Server) createGroupHandler(w http.ResponseWriter, r *http.Request) { + var req CreateGroupRequest + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + http.Error(w, "Bad Request", http.StatusBadRequest) + return + } + + if req.Groupname == "" { + http.Error(w, "Groupname is required", http.StatusBadRequest) + return + } + + // Generate UUID for the group + groupUUID := uuid.New().String() + now := time.Now().Unix() + + group := Group{ + UUID: groupUUID, + NameHash: hashGroupName(req.Groupname), + Members: req.Members, + CreatedAt: now, + UpdatedAt: now, + } + + if group.Members == nil { + group.Members = []string{} + } + + // Store group in BadgerDB + groupData, err := json.Marshal(group) + if err != nil { + s.logger.WithError(err).Error("Failed to marshal group data") + http.Error(w, "Internal Server Error", http.StatusInternalServerError) + return + } + + err = s.db.Update(func(txn *badger.Txn) error { + return txn.Set([]byte(groupStorageKey(groupUUID)), groupData) + }) + + if err != nil { + s.logger.WithError(err).Error("Failed to store group") + http.Error(w, "Internal Server Error", http.StatusInternalServerError) + return + } + + s.logger.WithField("group_uuid", groupUUID).Info("Group created successfully") + + response := CreateGroupResponse{UUID: groupUUID} + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(response) +} + +// getGroupHandler handles GET /api/groups/{uuid} 
+func (s *Server) getGroupHandler(w http.ResponseWriter, r *http.Request) {
+	vars := mux.Vars(r)
+	groupUUID := vars["uuid"]
+
+	if groupUUID == "" {
+		http.Error(w, "Group UUID is required", http.StatusBadRequest)
+		return
+	}
+
+	var group Group
+	err := s.db.View(func(txn *badger.Txn) error {
+		item, err := txn.Get([]byte(groupStorageKey(groupUUID)))
+		if err != nil {
+			return err
+		}
+
+		return item.Value(func(val []byte) error {
+			return json.Unmarshal(val, &group)
+		})
+	})
+
+	if err == badger.ErrKeyNotFound {
+		http.Error(w, "Group not found", http.StatusNotFound)
+		return
+	}
+
+	if err != nil {
+		s.logger.WithError(err).Error("Failed to get group")
+		http.Error(w, "Internal Server Error", http.StatusInternalServerError)
+		return
+	}
+
+	response := GetGroupResponse{
+		UUID:      group.UUID,
+		NameHash:  group.NameHash,
+		Members:   group.Members,
+		CreatedAt: group.CreatedAt,
+		UpdatedAt: group.UpdatedAt,
+	}
+
+	w.Header().Set("Content-Type", "application/json")
+	json.NewEncoder(w).Encode(response)
+}
+
+// updateGroupHandler handles PUT /api/groups/{uuid}
+func (s *Server) updateGroupHandler(w http.ResponseWriter, r *http.Request) {
+	vars := mux.Vars(r)
+	groupUUID := vars["uuid"]
+
+	if groupUUID == "" {
+		http.Error(w, "Group UUID is required", http.StatusBadRequest)
+		return
+	}
+
+	var req UpdateGroupRequest
+	if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
+		http.Error(w, "Bad Request", http.StatusBadRequest)
+		return
+	}
+
+	err := s.db.Update(func(txn *badger.Txn) error {
+		// Get existing group
+		item, err := txn.Get([]byte(groupStorageKey(groupUUID)))
+		if err != nil {
+			return err
+		}
+
+		var group Group
+		err = item.Value(func(val []byte) error {
+			return json.Unmarshal(val, &group)
+		})
+		if err != nil {
+			return err
+		}
+
+		// Update fields
+		now := time.Now().Unix()
+		group.UpdatedAt = now
+		group.Members = req.Members
+
+		if group.Members == nil {
+			group.Members = []string{}
+		}
+
+		// Store updated group
+		groupData, err := json.Marshal(group)
+		if err != nil {
+			return err
+		}
+
+		return txn.Set([]byte(groupStorageKey(groupUUID)), groupData)
+	})
+
+	if err == badger.ErrKeyNotFound {
+		http.Error(w, "Group not found", http.StatusNotFound)
+		return
+	}
+
+	if err != nil {
+		s.logger.WithError(err).Error("Failed to update group")
+		http.Error(w, "Internal Server Error", http.StatusInternalServerError)
+		return
+	}
+
+	s.logger.WithField("group_uuid", groupUUID).Info("Group updated successfully")
+	w.WriteHeader(http.StatusOK)
+}
+
+// deleteGroupHandler handles DELETE /api/groups/{uuid}
+func (s *Server) deleteGroupHandler(w http.ResponseWriter, r *http.Request) {
+	vars := mux.Vars(r)
+	groupUUID := vars["uuid"]
+
+	if groupUUID == "" {
+		http.Error(w, "Group UUID is required", http.StatusBadRequest)
+		return
+	}
+
+	err := s.db.Update(func(txn *badger.Txn) error {
+		// Check if group exists first
+		_, err := txn.Get([]byte(groupStorageKey(groupUUID)))
+		if err != nil {
+			return err
+		}
+
+		// Delete the group
+		return txn.Delete([]byte(groupStorageKey(groupUUID)))
+	})
+
+	if err == badger.ErrKeyNotFound {
+		http.Error(w, "Group not found", http.StatusNotFound)
+		return
+	}
+
+	if err != nil {
+		s.logger.WithError(err).Error("Failed to delete group")
+		http.Error(w, "Internal Server Error", http.StatusInternalServerError)
+		return
+	}
+
+	s.logger.WithField("group_uuid", groupUUID).Info("Group deleted successfully")
+	w.WriteHeader(http.StatusOK)
+}
+
+// Phase 2: Token Management API Handlers
+
+// createTokenHandler handles POST /api/tokens
+func (s *Server) createTokenHandler(w http.ResponseWriter, r *http.Request) {
+	var req CreateTokenRequest
+	if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
+		http.Error(w, "Bad Request", http.StatusBadRequest)
+		return
+	}
+
+	if req.UserUUID == "" {
+		http.Error(w, "User UUID is required", http.StatusBadRequest)
+		return
+	}
+
+	if len(req.Scopes) == 0 {
+		http.Error(w, "At least one scope is required", http.StatusBadRequest)
+		return
+	}
+
+	// Verify user exists
+	err := s.db.View(func(txn *badger.Txn) error {
+		_, err := txn.Get([]byte(userStorageKey(req.UserUUID)))
+		return err
+	})
+
+	if err == badger.ErrKeyNotFound {
+		http.Error(w, "User not found", http.StatusNotFound)
+		return
+	}
+
+	if err != nil {
+		s.logger.WithError(err).Error("Failed to verify user")
+		http.Error(w, "Internal Server Error", http.StatusInternalServerError)
+		return
+	}
+
+	// Generate JWT token
+	tokenString, expiresAt, err := generateJWT(req.UserUUID, req.Scopes, 1) // 1 hour default
+	if err != nil {
+		s.logger.WithError(err).Error("Failed to generate JWT token")
+		http.Error(w, "Internal Server Error", http.StatusInternalServerError)
+		return
+	}
+
+	// Store token in BadgerDB
+	err = s.storeAPIToken(tokenString, req.UserUUID, req.Scopes, expiresAt)
+	if err != nil {
+		s.logger.WithError(err).Error("Failed to store API token")
+		http.Error(w, "Internal Server Error", http.StatusInternalServerError)
+		return
+	}
+
+	s.logger.WithFields(logrus.Fields{
+		"user_uuid":  req.UserUUID,
+		"scopes":     req.Scopes,
+		"expires_at": expiresAt,
+	}).Info("API token created successfully")
+
+	response := CreateTokenResponse{
+		Token:     tokenString,
+		ExpiresAt: expiresAt,
+	}
+
+	w.Header().Set("Content-Type", "application/json")
+	json.NewEncoder(w).Encode(response)
+}
+
+// Phase 2: Revision History API Handlers
+
+// getRevisionHistoryHandler handles GET /api/data/{key}/history
+func (s *Server) getRevisionHistoryHandler(w http.ResponseWriter, r *http.Request) {
+	vars := mux.Vars(r)
+	key := vars["key"]
+
+	if key == "" {
+		http.Error(w, "Key is required", http.StatusBadRequest)
+		return
+	}
+
+	revisions, err := s.getRevisionHistory(key)
+	if err != nil {
+		s.logger.WithError(err).WithField("key", key).Error("Failed to get revision history")
+		http.Error(w, "Internal Server Error", http.StatusInternalServerError)
+		return
+	}
+
+	if len(revisions) == 0 {
+		http.Error(w, "No revisions found", http.StatusNotFound)
+		return
+	}
+
+	response := map[string]interface{}{
+		"revisions": revisions,
+	}
+
+	w.Header().Set("Content-Type", "application/json")
+	json.NewEncoder(w).Encode(response)
+}
+
+// getSpecificRevisionHandler handles GET /api/data/{key}/history/{revision}
+func (s *Server) getSpecificRevisionHandler(w http.ResponseWriter, r *http.Request) {
+	vars := mux.Vars(r)
+	key := vars["key"]
+	revisionStr := vars["revision"]
+
+	if key == "" {
+		http.Error(w, "Key is required", http.StatusBadRequest)
+		return
+	}
+
+	if revisionStr == "" {
+		http.Error(w, "Revision is required", http.StatusBadRequest)
+		return
+	}
+
+	revision, err := strconv.Atoi(revisionStr)
+	if err != nil {
+		http.Error(w, "Invalid revision number", http.StatusBadRequest)
+		return
+	}
+
+	storedValue, err := s.getSpecificRevision(key, revision)
+	if err == badger.ErrKeyNotFound {
+		http.Error(w, "Revision not found", http.StatusNotFound)
+		return
+	}
+
+	if err != nil {
+		s.logger.WithError(err).WithFields(logrus.Fields{
+			"key":      key,
+			"revision": revision,
+		}).Error("Failed to get specific revision")
Error", http.StatusInternalServerError) + return + } + + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(storedValue) +} + +// Phase 2: Backup Status API Handler + +// getBackupStatusHandler handles GET /api/backup/status +func (s *Server) getBackupStatusHandler(w http.ResponseWriter, r *http.Request) { + status := s.getBackupStatus() + + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(status) +} + // performMerkleSync performs a synchronization round using Merkle Trees func (s *Server) performMerkleSync() { members := s.getHealthyMembers() diff --git a/next_steps.md b/next_steps.md new file mode 100644 index 0000000..1735324 --- /dev/null +++ b/next_steps.md @@ -0,0 +1,291 @@ +# KVS Development Phase 2: Implementation Specification + +## Executive Summary + +This document specifies the next development phase for the KVS (Key-Value Store) distributed database. Phase 2 adds authentication, authorization, data management improvements, and basic security features while maintaining backward compatibility with the existing Merkle tree-based replication system. + +## 1. Authentication & Authorization System + +### 1.1 Core Components + +**Users** +- Identified by UUID (generated server-side) +- Nickname stored as SHA3-512 hash +- Can belong to multiple groups +- Storage key: `user:` + +**Groups** +- Identified by UUID (generated server-side) +- Group name stored as SHA3-512 hash +- Contains list of member user UUIDs +- Storage key: `group:` + +**API Tokens** +- JWT tokens with SHA3-512 hashed storage +- 1-hour default expiration (configurable) +- Storage key: `token:` + +### 1.2 Permission Model + +**POSIX-inspired ACL framework** with 12-bit permissions: +- 4 bits each for Owner/Group/Others +- Operations: Create(C), Delete(D), Write(W), Read(R) +- Default permissions: Owner(1111), Group(0110), Others(0010) +- Stored as integer bitmask in resource metadata + +**Resource Metadata Schema**: +```json +{ + "owner_uuid": "string", + "group_uuid": "string", + "permissions": 3826, // 12-bit integer + "ttl": "24h" +} +``` + +### 1.3 API Endpoints + +**User Management** +``` +POST /api/users + Body: {"nickname": "string"} + Returns: {"uuid": "string"} + +GET /api/users/{uuid} +PUT /api/users/{uuid} + Body: {"nickname": "string", "groups": ["uuid1", "uuid2"]} +DELETE /api/users/{uuid} +``` + +**Group Management** +``` +POST /api/groups + Body: {"groupname": "string", "members": ["uuid1", "uuid2"]} + Returns: {"uuid": "string"} + +GET /api/groups/{uuid} +PUT /api/groups/{uuid} + Body: {"members": ["uuid1", "uuid2"]} +DELETE /api/groups/{uuid} +``` + +**Token Management** +``` +POST /api/tokens + Body: {"user_uuid": "string", "scopes": ["read", "write"]} + Returns: {"token": "jwt-string", "expires_at": "timestamp"} +``` + +All endpoints require `Authorization: Bearer ` header. + +### 1.4 Implementation Requirements + +- Use `golang.org/x/crypto/sha3` for all hashing +- Store token SHA3-512 hash in BadgerDB with TTL +- Implement `CheckPermission(userUUID, resourceKey, operation) bool` function +- Include user/group data in existing Merkle tree replication +- Create migration script for existing data (add default metadata) + +## 2. 
+
+## 2. Database Enhancements
+
+### 2.1 ZSTD Compression
+
+**Configuration**:
+```yaml
+database:
+  compression_enabled: true
+  compression_level: 3  # 1-19, balance performance/ratio
+```
+
+**Implementation**:
+- Use `github.com/klauspost/compress/zstd`
+- Compress all JSON values before BadgerDB storage
+- Decompress on read operations
+- Optional: batch recompression of existing data on startup
+
+### 2.2 TTL (Time-To-Live)
+
+**Features**:
+- Per-key TTL support via resource metadata
+- Global default TTL configuration (optional)
+- Automatic expiration via BadgerDB's native TTL
+- TTL applied to main data and revision keys
+
+**API Integration**:
+```json
+// In PUT/POST requests
+{
+  "data": {...},
+  "ttl": "24h"  // Go duration format
+}
+```
+
+### 2.3 Revision History
+
+**Storage Pattern**:
+- Main data: `data:<key>`
+- Revisions: `data:<key>:rev:1`, `data:<key>:rev:2`, `data:<key>:rev:3`
+- Metadata: `data:<key>:metadata` includes `"revisions": [1,2,3]`
+
+**Rotation Logic**:
+- On write: rev:1→rev:2, rev:2→rev:3, new→rev:1, delete rev:3
+- Store up to 3 revisions per key
+
+**API Endpoints**:
+```
+GET /api/data/{key}/history
+  Returns: {"revisions": [{"number": 1, "timestamp": "..."}]}
+
+GET /api/data/{key}/history/{revision}
+  Returns: StoredValue for specific revision
+```
+
+### 2.4 Backup System
+
+**Configuration**:
+```yaml
+backups:
+  enabled: true
+  schedule: "0 0 * * *"  # Daily at midnight
+  path: "/backups"
+  retention: 7  # days
+```
+
+**Implementation**:
+- Use `github.com/robfig/cron/v3` for scheduling
+- Create ZSTD-compressed BadgerDB snapshots
+- Filename format: `kvs-backup-YYYY-MM-DD.zstd`
+- Automatic cleanup of old backups
+- Status API: `GET /api/backup/status`
+
+### 2.5 JSON Size Limits
+
+**Configuration**:
+```yaml
+database:
+  max_json_size: 1048576  # 1MB default
+```
+
+**Implementation**:
+- Check size before compression/storage
+- Return HTTP 413 if exceeded
+- Apply to main data and revisions
+- Log oversized attempts
+
+## 3. Security Features
+
+### 3.1 Rate Limiting
+
+**Configuration**:
+```yaml
+rate_limit:
+  requests: 100
+  window: "1m"
+```
+
+**Implementation**:
+- Per-user rate limiting using BadgerDB counters
+- Key pattern: `ratelimit:<user_uuid>:<window>`
+- Return HTTP 429 when the limit is exceeded
+- Counters have a TTL equal to the window duration
+
+### 3.2 Tamper-Evident Logs
+
+**Log Entry Schema**:
+```json
+{
+  "timestamp": "2025-09-11T17:29:00Z",
+  "action": "data_write",  // Configurable actions
+  "user_uuid": "string",
+  "resource": "string",
+  "signature": "sha3-512 hash"  // Hash of all other fields
+}
+```
+
+**Storage**:
+- Key: `log:<timestamp>:<uuid>`
+- Compressed with ZSTD
+- Hourly Merkle tree roots: `log:merkle:<hour>`
+- Included in cluster replication
+
+**Configurable Actions**:
+```yaml
+tamper_logs:
+  actions: ["data_write", "user_create", "auth_failure"]
+```
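+
+To make the `signature` field concrete, here is a minimal sketch of hashing a log entry with `golang.org/x/crypto/sha3`. The `LogEntry` struct shape, the field ordering, and the separator byte are assumptions for illustration, not a fixed wire format:
+
+```go
+package main
+
+import (
+	"encoding/hex"
+	"fmt"
+
+	"golang.org/x/crypto/sha3"
+)
+
+// LogEntry mirrors the schema in 3.2 (field names assumed from the JSON example).
+type LogEntry struct {
+	Timestamp string `json:"timestamp"`
+	Action    string `json:"action"`
+	UserUUID  string `json:"user_uuid"`
+	Resource  string `json:"resource"`
+	Signature string `json:"signature"`
+}
+
+// Sign fills Signature with the SHA3-512 hash of the other fields,
+// written in a fixed order with a separator byte to avoid ambiguity.
+func (e *LogEntry) Sign() {
+	h := sha3.New512()
+	for _, field := range []string{e.Timestamp, e.Action, e.UserUUID, e.Resource} {
+		h.Write([]byte(field))
+		h.Write([]byte{0}) // field separator
+	}
+	e.Signature = hex.EncodeToString(h.Sum(nil))
+}
+
+func main() {
+	e := &LogEntry{
+		Timestamp: "2025-09-11T17:29:00Z",
+		Action:    "data_write",
+		UserUUID:  "example-user-uuid",
+		Resource:  "data:example",
+	}
+	e.Sign()
+	fmt.Println(e.Signature)
+}
+```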
+
+## 4. Implementation Phases
+
+### Phase 2.1: Core Authentication
+1. Implement user/group storage schema
+2. Add SHA3-512 hashing utilities
+3. Create basic CRUD APIs for users/groups
+4. Implement JWT token generation/validation
+5. Add authorization middleware
+
+### Phase 2.2: Data Features
+1. Add ZSTD compression to BadgerDB operations
+2. Implement TTL support in resource metadata
+3. Build revision history system
+4. Add JSON size validation
+
+### Phase 2.3: Security & Operations
+1. Implement rate limiting middleware
+2. Add tamper-evident logging system
+3. Build backup scheduling system
+4. Create migration scripts for existing data
+
+### Phase 2.4: Integration & Testing
+1. Integrate auth with existing replication
+2. End-to-end testing of all features
+3. Performance benchmarking
+4. Documentation updates
+
+## 5. Configuration Example
+
+```yaml
+node_id: "node1"
+bind_address: "127.0.0.1"
+port: 8080
+data_dir: "./data"
+
+database:
+  compression_enabled: true
+  compression_level: 3
+  max_json_size: 1048576
+  default_ttl: "0"  # No default TTL
+
+backups:
+  enabled: true
+  schedule: "0 0 * * *"
+  path: "/backups"
+  retention: 7
+
+rate_limit:
+  requests: 100
+  window: "1m"
+
+tamper_logs:
+  actions: ["data_write", "user_create", "auth_failure"]
+```
+
+## 6. Migration Strategy
+
+1. **Backward Compatibility**: All existing APIs remain functional
+2. **Optional Features**: New features can be disabled via configuration
+
+## 7. Dependencies
+
+**New Libraries**:
+- `golang.org/x/crypto/sha3` - SHA3-512 hashing
+- `github.com/klauspost/compress/zstd` - Compression
+- `github.com/robfig/cron/v3` - Backup scheduling
+- `github.com/golang-jwt/jwt/v4` - JWT tokens (recommended)
+
+**Existing Libraries** (no changes):
+- `github.com/dgraph-io/badger/v4`
+- `github.com/google/uuid`
+- `github.com/gorilla/mux`
+- `github.com/sirupsen/logrus`
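+
+As a rough illustration of how the compression dependency would be used for the value path in 2.1, a minimal wrapper sketch using `github.com/klauspost/compress/zstd`; the helper names `compressValue`/`decompressValue` are placeholders, not part of the spec:
+
+```go
+package main
+
+import (
+	"fmt"
+
+	"github.com/klauspost/compress/zstd"
+)
+
+// Reusable encoder/decoder; level 3 matches the configuration example above.
+var (
+	encoder, _ = zstd.NewWriter(nil, zstd.WithEncoderLevel(zstd.EncoderLevelFromZstd(3)))
+	decoder, _ = zstd.NewReader(nil)
+)
+
+// compressValue compresses a JSON value before it is written to BadgerDB.
+func compressValue(raw []byte) []byte {
+	return encoder.EncodeAll(raw, make([]byte, 0, len(raw)))
+}
+
+// decompressValue reverses compressValue on read.
+func decompressValue(stored []byte) ([]byte, error) {
+	return decoder.DecodeAll(stored, nil)
+}
+
+func main() {
+	in := []byte(`{"data": {"hello": "world"}}`)
+	out, err := decompressValue(compressValue(in))
+	fmt.Println(string(out), err)
+}
+```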
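+
+Similarly, a minimal sketch of wiring the backup schedule from 2.4 with `github.com/robfig/cron/v3`; the `runBackup` callback, the path joining, and the filename handling stand in for the real snapshot and retention-cleanup logic, which is not specified here:
+
+```go
+package main
+
+import (
+	"log"
+	"time"
+
+	"github.com/robfig/cron/v3"
+)
+
+// startBackupScheduler runs runBackup on the configured cron schedule.
+// runBackup is an assumed helper that snapshots BadgerDB and writes a
+// kvs-backup-YYYY-MM-DD.zstd file at the given path.
+func startBackupScheduler(schedule, backupPath string, runBackup func(path string) error) (*cron.Cron, error) {
+	c := cron.New()
+	_, err := c.AddFunc(schedule, func() {
+		name := "kvs-backup-" + time.Now().Format("2006-01-02") + ".zstd"
+		if err := runBackup(backupPath + "/" + name); err != nil {
+			log.Printf("backup failed: %v", err)
+		}
+	})
+	if err != nil {
+		return nil, err
+	}
+	c.Start()
+	return c, nil
+}
+
+func main() {
+	// Example wiring using the schedule from the configuration example above.
+	_, err := startBackupScheduler("0 0 * * *", "/backups", func(path string) error {
+		log.Printf("would write backup to %s", path)
+		return nil
+	})
+	if err != nil {
+		log.Fatal(err)
+	}
+	select {} // keep the scheduler running
+}
+```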