forked from ryyst/kalzu-value-store

Compare commits: kalzu/issu… → metadata-a… (3 commits: 377af163f0, 852275945c, c7dcebb894)
auth/auth.go (108 lines changed)
@@ -41,7 +41,7 @@ func NewAuthService(db *badger.DB, logger *logrus.Logger, config *types.Config)
 // StoreAPIToken stores an API token in BadgerDB with TTL
 func (s *AuthService) StoreAPIToken(tokenString string, userUUID string, scopes []string, expiresAt int64) error {
 	tokenHash := utils.HashToken(tokenString)

 	apiToken := types.APIToken{
 		TokenHash: tokenHash,
 		UserUUID:  userUUID,
@@ -57,13 +57,13 @@ func (s *AuthService) StoreAPIToken(tokenString string, userUUID string, scopes

 	return s.db.Update(func(txn *badger.Txn) error {
 		entry := badger.NewEntry([]byte(TokenStorageKey(tokenHash)), tokenData)

 		// Set TTL to the token expiration time
 		ttl := time.Until(time.Unix(expiresAt, 0))
 		if ttl > 0 {
 			entry = entry.WithTTL(ttl)
 		}

 		return txn.SetEntry(entry)
 	})
 }
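Note on this hunk: BadgerDB drops entries whose TTL has elapsed, so an expired token simply disappears from storage without a cleanup pass. A minimal, runnable sketch of how a caller might derive the `expiresAt` value the service converts back into a TTL (the 24-hour lifetime is illustrative, not from this diff):

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	// A token valid for 24 hours: the Unix timestamp passed to
	// StoreAPIToken, which the service turns back into a TTL via
	// time.Until(time.Unix(expiresAt, 0)).
	expiresAt := time.Now().Add(24 * time.Hour).Unix()
	ttl := time.Until(time.Unix(expiresAt, 0))
	fmt.Printf("expiresAt=%d ttl=%s\n", expiresAt, ttl.Round(time.Second))
}
```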
@@ -71,7 +71,7 @@ func (s *AuthService) StoreAPIToken(tokenString string, userUUID string, scopes
 // GetAPIToken retrieves an API token from BadgerDB by hash
 func (s *AuthService) GetAPIToken(tokenHash string) (*types.APIToken, error) {
 	var apiToken types.APIToken

 	err := s.db.View(func(txn *badger.Txn) error {
 		item, err := txn.Get([]byte(TokenStorageKey(tokenHash)))
 		if err != nil {
@@ -198,61 +198,12 @@ func (s *AuthService) CheckResourcePermission(authCtx *AuthContext, resourceKey
 	return CheckPermission(metadata.Permissions, operation, isOwner, isGroupMember)
 }

-// GetAuthContext retrieves auth context from request context
-func GetAuthContext(ctx context.Context) *AuthContext {
-	if authCtx, ok := ctx.Value("auth").(*AuthContext); ok {
-		return authCtx
-	}
-	return nil
-}
-
-// HasUsers checks if any users exist in the database
-func (s *AuthService) HasUsers() (bool, error) {
-	var hasUsers bool
-
-	err := s.db.View(func(txn *badger.Txn) error {
-		opts := badger.DefaultIteratorOptions
-		opts.PrefetchValues = false // We only need to check if keys exist
-		iterator := txn.NewIterator(opts)
-		defer iterator.Close()
-
-		// Look for any key starting with "user:"
-		prefix := []byte("user:")
-		for iterator.Seek(prefix); iterator.ValidForPrefix(prefix); iterator.Next() {
-			hasUsers = true
-			return nil // Found at least one user, can exit early
-		}
-
-		return nil
-	})
-
-	return hasUsers, err
-}
-
-// StoreResourceMetadata stores or updates resource metadata in BadgerDB
-func (s *AuthService) StoreResourceMetadata(path string, metadata *types.ResourceMetadata) error {
-	now := time.Now().Unix()
-	if metadata.CreatedAt == 0 {
-		metadata.CreatedAt = now
-	}
-	metadata.UpdatedAt = now
-
-	metadataData, err := json.Marshal(metadata)
-	if err != nil {
-		return err
-	}
-
-	return s.db.Update(func(txn *badger.Txn) error {
-		return txn.Set([]byte(ResourceMetadataKey(path)), metadataData)
-	})
-}
-
-// GetResourceMetadata retrieves resource metadata from BadgerDB
-func (s *AuthService) GetResourceMetadata(path string) (*types.ResourceMetadata, error) {
+// GetResourceMetadata retrieves metadata for a resource
+func (s *AuthService) GetResourceMetadata(resourceKey string) (*types.ResourceMetadata, error) {
 	var metadata types.ResourceMetadata

 	err := s.db.View(func(txn *badger.Txn) error {
-		item, err := txn.Get([]byte(ResourceMetadataKey(path)))
+		item, err := txn.Get([]byte(ResourceMetadataKey(resourceKey)))
 		if err != nil {
 			return err
 		}
@@ -268,3 +219,46 @@ func (s *AuthService) GetResourceMetadata(path string) (*types.ResourceMetadata,

 	return &metadata, nil
 }
+
+// SetResourceMetadata stores metadata for a resource
+func (s *AuthService) SetResourceMetadata(resourceKey string, metadata *types.ResourceMetadata) error {
+	metadataBytes, err := json.Marshal(metadata)
+	if err != nil {
+		return fmt.Errorf("failed to marshal metadata: %v", err)
+	}
+
+	return s.db.Update(func(txn *badger.Txn) error {
+		return txn.Set([]byte(ResourceMetadataKey(resourceKey)), metadataBytes)
+	})
+}
+
+// GetAuthContext retrieves auth context from request context
+func GetAuthContext(ctx context.Context) *AuthContext {
+	if authCtx, ok := ctx.Value("auth").(*AuthContext); ok {
+		return authCtx
+	}
+	return nil
+}
+
+// HasUsers checks if any users exist in the database
+func (s *AuthService) HasUsers() (bool, error) {
+	var hasUsers bool
+
+	err := s.db.View(func(txn *badger.Txn) error {
+		opts := badger.DefaultIteratorOptions
+		opts.PrefetchValues = false // We only need to check if keys exist
+		iterator := txn.NewIterator(opts)
+		defer iterator.Close()
+
+		// Look for any key starting with "user:"
+		prefix := []byte("user:")
+		for iterator.Seek(prefix); iterator.ValidForPrefix(prefix); iterator.Next() {
+			hasUsers = true
+			return nil // Found at least one user, can exit early
+		}
+
+		return nil
+	})
+
+	return hasUsers, err
+}
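Taken together, these hunks replace the path-based StoreResourceMetadata/GetResourceMetadata pair with a resourceKey-based SetResourceMetadata/GetResourceMetadata pair (GetAuthContext and HasUsers are only moved, not changed). A hedged usage sketch of the new pair; the import paths and the OwnerUUID field are assumptions inferred from elsewhere in this diff:

```go
package example

import (
	"fmt"
	"log"

	"kvs/auth"  // assumed import path, matching the diff's own "kvs/types"
	"kvs/types"
)

func storeAndLoad(authSvc *auth.AuthService) {
	// OwnerUUID is assumed to exist on ResourceMetadata; it is compared
	// against userUUID in CheckUserResourceRelationship below.
	meta := &types.ResourceMetadata{OwnerUUID: "some-user-uuid"}
	if err := authSvc.SetResourceMetadata("app/settings", meta); err != nil {
		log.Fatalf("store metadata: %v", err)
	}

	got, err := authSvc.GetResourceMetadata("app/settings")
	if err != nil {
		log.Fatalf("load metadata: %v", err)
	}
	fmt.Println(got.OwnerUUID)
}
```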
auth/cluster.go (new file, 77 lines)

@@ -0,0 +1,77 @@
+package auth
+
+import (
+	"net/http"
+
+	"github.com/sirupsen/logrus"
+)
+
+// ClusterAuthService handles authentication for inter-cluster communication
+type ClusterAuthService struct {
+	clusterSecret string
+	logger        *logrus.Logger
+}
+
+// NewClusterAuthService creates a new cluster authentication service
+func NewClusterAuthService(clusterSecret string, logger *logrus.Logger) *ClusterAuthService {
+	return &ClusterAuthService{
+		clusterSecret: clusterSecret,
+		logger:        logger,
+	}
+}
+
+// Middleware validates cluster authentication headers
+func (s *ClusterAuthService) Middleware(next http.Handler) http.Handler {
+	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		// Extract authentication headers
+		clusterSecret := r.Header.Get("X-Cluster-Secret")
+		nodeID := r.Header.Get("X-Node-ID")
+
+		// Log authentication attempt
+		s.logger.WithFields(logrus.Fields{
+			"node_id":     nodeID,
+			"remote_addr": r.RemoteAddr,
+			"path":        r.URL.Path,
+			"method":      r.Method,
+		}).Debug("Cluster authentication attempt")
+
+		// Validate cluster secret
+		if clusterSecret == "" {
+			s.logger.WithFields(logrus.Fields{
+				"node_id":     nodeID,
+				"remote_addr": r.RemoteAddr,
+				"path":        r.URL.Path,
+			}).Warn("Missing X-Cluster-Secret header")
+			http.Error(w, "Unauthorized: Missing cluster secret", http.StatusUnauthorized)
+			return
+		}
+
+		if clusterSecret != s.clusterSecret {
+			s.logger.WithFields(logrus.Fields{
+				"node_id":     nodeID,
+				"remote_addr": r.RemoteAddr,
+				"path":        r.URL.Path,
+			}).Warn("Invalid cluster secret")
+			http.Error(w, "Unauthorized: Invalid cluster secret", http.StatusUnauthorized)
+			return
+		}
+
+		// Validate node ID is present
+		if nodeID == "" {
+			s.logger.WithFields(logrus.Fields{
+				"remote_addr": r.RemoteAddr,
+				"path":        r.URL.Path,
+			}).Warn("Missing X-Node-ID header")
+			http.Error(w, "Unauthorized: Missing node ID", http.StatusUnauthorized)
+			return
+		}
+
+		// Authentication successful
+		s.logger.WithFields(logrus.Fields{
+			"node_id": nodeID,
+			"path":    r.URL.Path,
+		}).Debug("Cluster authentication successful")
+
+		next.ServeHTTP(w, r)
+	})
+}
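A sketch of how this middleware might wrap the cluster endpoints. The route registration is an assumption for illustration; the repository's actual server wiring is not part of this diff:

```go
package main

import (
	"net/http"

	"github.com/sirupsen/logrus"

	"kvs/auth" // assumed import path, consistent with the diff's "kvs/types"
)

func main() {
	logger := logrus.New()
	clusterAuth := auth.NewClusterAuthService("shared-secret-from-config", logger)

	mux := http.NewServeMux()
	mux.HandleFunc("/members/gossip", func(w http.ResponseWriter, r *http.Request) {
		w.WriteHeader(http.StatusOK) // placeholder handler
	})

	// Every cluster route behind the middleware now requires the
	// X-Cluster-Secret and X-Node-ID headers set by peer nodes.
	logger.Fatal(http.ListenAndServe(":8080", clusterAuth.Middleware(mux)))
}
```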
@@ -64,4 +64,4 @@ func ValidateJWT(tokenString string) (*JWTClaims, error) {
 	}

 	return nil, fmt.Errorf("invalid token")
 }
@@ -33,7 +33,7 @@ func (s *AuthService) Middleware(requiredScopes []string, resourceKeyExtractor f
 			next(w, r)
 			return
 		}

 		// Authenticate request
 		authCtx, err := s.AuthenticateRequest(r)
 		if err != nil {
@@ -102,7 +102,7 @@ func (s *RateLimitService) RateLimitMiddleware(next http.HandlerFunc) http.Handl
 			next(w, r)
 			return
 		}

 		// Extract auth context to get user UUID
 		authCtx := GetAuthContext(r.Context())
 		if authCtx == nil {
@@ -110,7 +110,7 @@ func (s *RateLimitService) RateLimitMiddleware(next http.HandlerFunc) http.Handl
 			next(w, r)
 			return
 		}

 		// Check rate limit
 		allowed, err := s.checkRateLimit(authCtx.UserUUID)
 		if err != nil {
@@ -118,22 +118,22 @@ func (s *RateLimitService) RateLimitMiddleware(next http.HandlerFunc) http.Handl
 			http.Error(w, "Internal Server Error", http.StatusInternalServerError)
 			return
 		}

 		if !allowed {
 			s.authService.logger.WithFields(logrus.Fields{
 				"user_uuid": authCtx.UserUUID,
 				"limit":     s.config.RateLimitRequests,
 				"window":    s.config.RateLimitWindow,
 			}).Info("Rate limit exceeded")

 			// Set rate limit headers
 			w.Header().Set("X-Rate-Limit-Limit", strconv.Itoa(s.config.RateLimitRequests))
 			w.Header().Set("X-Rate-Limit-Window", s.config.RateLimitWindow)

 			http.Error(w, "Rate limit exceeded", http.StatusTooManyRequests)
 			return
 		}

 		next(w, r)
 	}
 }
@@ -151,8 +151,8 @@ func (s *RateLimitService) checkRateLimit(userUUID string) (bool, error) {
 	if s.config.RateLimitRequests <= 0 {
 		return true, nil // Rate limiting disabled
 	}

 	// Simplified rate limiting - in practice this would use the full implementation
 	// that was in main.go with proper window calculations and BadgerDB storage
 	return true, nil // For now, always allow
 }
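The stub above notes that the full implementation would do window calculations against BadgerDB. A hedged sketch of what the fixed-window bookkeeping could look like, using the same key shape as the GetRateLimitKey helper that appears later in this diff; the counter-storage step is an assumed detail, not the repository's actual code:

```go
package main

import (
	"fmt"
	"time"
)

// windowStart truncates "now" to the start of the current fixed window,
// producing the windowStart value GetRateLimitKey expects.
func windowStart(now time.Time, window time.Duration) int64 {
	return now.Unix() - now.Unix()%int64(window.Seconds())
}

func main() {
	ws := windowStart(time.Now(), time.Minute)
	key := fmt.Sprintf("ratelimit:%s:%d", "user-uuid", ws) // mirrors GetRateLimitKey
	fmt.Println(key)
	// A full implementation would increment a counter under this key in
	// BadgerDB (with a TTL of roughly one window) and reject the request
	// once the count exceeds config.RateLimitRequests.
}
```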
@@ -15,7 +15,7 @@ func CheckPermission(permissions int, operation string, isOwner, isGroupMember b
 			return (permissions & types.PermGroupCreate) != 0
 		}
 		return (permissions & types.PermOthersCreate) != 0

 	case "delete":
 		if isOwner {
 			return (permissions & types.PermOwnerDelete) != 0
@@ -24,7 +24,7 @@ func CheckPermission(permissions int, operation string, isOwner, isGroupMember b
 			return (permissions & types.PermGroupDelete) != 0
 		}
 		return (permissions & types.PermOthersDelete) != 0

 	case "write":
 		if isOwner {
 			return (permissions & types.PermOwnerWrite) != 0
@@ -33,7 +33,7 @@ func CheckPermission(permissions int, operation string, isOwner, isGroupMember b
 			return (permissions & types.PermGroupWrite) != 0
 		}
 		return (permissions & types.PermOthersWrite) != 0

 	case "read":
 		if isOwner {
 			return (permissions & types.PermOwnerRead) != 0
@@ -42,7 +42,7 @@ func CheckPermission(permissions int, operation string, isOwner, isGroupMember b
 			return (permissions & types.PermGroupRead) != 0
 		}
 		return (permissions & types.PermOthersRead) != 0

 	default:
 		return false
 	}
@@ -51,7 +51,7 @@ func CheckPermission(permissions int, operation string, isOwner, isGroupMember b
 // CheckUserResourceRelationship determines user relationship to resource
 func CheckUserResourceRelationship(userUUID string, metadata *types.ResourceMetadata, userGroups []string) (isOwner, isGroupMember bool) {
 	isOwner = (userUUID == metadata.OwnerUUID)

 	if metadata.GroupUUID != "" {
 		for _, groupUUID := range userGroups {
 			if groupUUID == metadata.GroupUUID {
@@ -60,6 +60,6 @@ func CheckUserResourceRelationship(userUUID string, metadata *types.ResourceMeta
 			}
 		}
 	}

 	return isOwner, isGroupMember
 }
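For reference, the Perm* constants behave like Unix owner/group/others permission bits. A self-contained sketch of the dispatch pattern these hunks show; the numeric bit values below are illustrative assumptions, since only the constant names appear in this diff:

```go
package main

import "fmt"

// Illustrative bit values only; the real constants live in kvs/types and
// their numeric values are not shown in this diff.
const (
	PermOwnerRead  = 1 << 0
	PermOwnerWrite = 1 << 1
	PermGroupRead  = 1 << 2
)

// check mirrors the shape of CheckPermission for the "read" case.
func check(permissions int, operation string, isOwner, isGroupMember bool) bool {
	switch operation {
	case "read":
		if isOwner {
			return permissions&PermOwnerRead != 0
		}
		if isGroupMember {
			return permissions&PermGroupRead != 0
		}
		return false
	default:
		return false
	}
}

func main() {
	perms := PermOwnerRead | PermOwnerWrite
	fmt.Println(check(perms, "read", true, false)) // true: owner read bit set
	fmt.Println(check(perms, "read", false, true)) // false: no group read bit
}
```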
@@ -16,4 +16,4 @@ func TokenStorageKey(tokenHash string) string {

 func ResourceMetadataKey(resourceKey string) string {
 	return resourceKey + ":metadata"
 }
@@ -82,10 +82,19 @@ func (s *BootstrapService) attemptJoin(seedAddr string) bool {
 		return false
 	}

-	client := &http.Client{Timeout: 10 * time.Second}
-	url := fmt.Sprintf("http://%s/members/join", seedAddr)
+	client := NewAuthenticatedHTTPClient(s.config, 10*time.Second)
+	protocol := GetProtocol(s.config)
+	url := fmt.Sprintf("%s://%s/members/join", protocol, seedAddr)

-	resp, err := client.Post(url, "application/json", bytes.NewBuffer(jsonData))
+	req, err := http.NewRequest("POST", url, bytes.NewBuffer(jsonData))
+	if err != nil {
+		s.logger.WithError(err).Error("Failed to create join request")
+		return false
+	}
+	req.Header.Set("Content-Type", "application/json")
+	AddClusterAuthHeaders(req, s.config)
+
+	resp, err := client.Do(req)
 	if err != nil {
 		s.logger.WithFields(logrus.Fields{
 			"seed": seedAddr,
@@ -142,4 +151,4 @@ func (s *BootstrapService) performGradualSync() {
 	}

 	s.logger.Info("Gradual sync completed")
 }
@@ -17,13 +17,13 @@ import (

 // GossipService handles gossip protocol operations
 type GossipService struct {
 	config    *types.Config
 	members   map[string]*types.Member
 	membersMu sync.RWMutex
 	logger    *logrus.Logger
 	ctx       context.Context
 	cancel    context.CancelFunc
 	wg        sync.WaitGroup
 }

 // NewGossipService creates a new gossip service
@@ -44,7 +44,7 @@ func (s *GossipService) Start() {
 		s.logger.Info("Clustering disabled, skipping gossip routine")
 		return
 	}

 	s.wg.Add(1)
 	go s.gossipRoutine()
 }
@@ -181,11 +181,20 @@ func (s *GossipService) gossipWithPeer(peer *types.Member) error {
 		return err
 	}

-	// Send HTTP request to peer
-	client := &http.Client{Timeout: 5 * time.Second}
-	url := fmt.Sprintf("http://%s/members/gossip", peer.Address)
+	// Send HTTP request to peer with cluster authentication
+	client := NewAuthenticatedHTTPClient(s.config, 5*time.Second)
+	protocol := GetProtocol(s.config)
+	url := fmt.Sprintf("%s://%s/members/gossip", protocol, peer.Address)

-	resp, err := client.Post(url, "application/json", bytes.NewBuffer(jsonData))
+	req, err := http.NewRequest("POST", url, bytes.NewBuffer(jsonData))
+	if err != nil {
+		s.logger.WithError(err).Error("Failed to create gossip request")
+		return err
+	}
+	req.Header.Set("Content-Type", "application/json")
+	AddClusterAuthHeaders(req, s.config)
+
+	resp, err := client.Do(req)
 	if err != nil {
 		s.logger.WithFields(logrus.Fields{
 			"peer": peer.Address,
@@ -300,4 +309,4 @@ func (s *GossipService) MergeMemberList(remoteMembers []types.Member, selfNodeID
 func (s *GossipService) GetJoinedTimestamp() int64 {
 	// This should be implemented by the server that uses this service
 	return time.Now().UnixMilli()
 }
cluster/http_client.go (new file, 43 lines)

@@ -0,0 +1,43 @@
+package cluster
+
+import (
+	"crypto/tls"
+	"net/http"
+	"time"
+
+	"kvs/types"
+)
+
+// NewAuthenticatedHTTPClient creates an HTTP client configured for cluster authentication
+func NewAuthenticatedHTTPClient(config *types.Config, timeout time.Duration) *http.Client {
+	client := &http.Client{
+		Timeout: timeout,
+	}
+
+	// Configure TLS if enabled
+	if config.ClusterTLSEnabled {
+		tlsConfig := &tls.Config{
+			InsecureSkipVerify: config.ClusterTLSSkipVerify,
+		}
+
+		client.Transport = &http.Transport{
+			TLSClientConfig: tlsConfig,
+		}
+	}
+
+	return client
+}
+
+// AddClusterAuthHeaders adds authentication headers to an HTTP request
+func AddClusterAuthHeaders(req *http.Request, config *types.Config) {
+	req.Header.Set("X-Cluster-Secret", config.ClusterSecret)
+	req.Header.Set("X-Node-ID", config.NodeID)
+}
+
+// GetProtocol returns the appropriate protocol (http or https) based on TLS configuration
+func GetProtocol(config *types.Config) string {
+	if config.ClusterTLSEnabled {
+		return "https"
+	}
+	return "http"
+}
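The three helpers are meant to be used together, and the sync/gossip hunks below all follow the same calling pattern. A condensed sketch of that pattern (the "kvs/cluster" import path is an assumption matching the module's "kvs/types"):

```go
package example

import (
	"fmt"
	"net/http"
	"time"

	"kvs/cluster" // assumed import path
	"kvs/types"
)

func fetchMerkleRoot(config *types.Config, peerAddr string) (*http.Response, error) {
	client := cluster.NewAuthenticatedHTTPClient(config, 10*time.Second)
	url := fmt.Sprintf("%s://%s/merkle_tree/root", cluster.GetProtocol(config), peerAddr)

	req, err := http.NewRequest("GET", url, nil)
	if err != nil {
		return nil, err
	}
	cluster.AddClusterAuthHeaders(req, config) // sets X-Cluster-Secret and X-Node-ID

	return client.Do(req)
}
```

One design note: ClusterTLSSkipVerify maps onto InsecureSkipVerify, which disables certificate verification entirely; that is only reasonable for self-signed certificates on a trusted network.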
@@ -170,162 +170,7 @@ func (s *MerkleService) BuildSubtreeForRange(startKey, endKey string) (*types.Me
 	if err != nil {
 		return nil, fmt.Errorf("failed to get KV pairs for subtree: %v", err)
 	}

 	filteredPairs := FilterPairsByRange(pairs, startKey, endKey)
 	return s.BuildMerkleTreeFromPairs(filteredPairs)
 }

-// GetKeysInRange retrieves all keys within a given range using the Merkle tree
-// This traverses the tree to find leaf nodes in the range without loading full values
-func (s *MerkleService) GetKeysInRange(startKey, endKey string, limit int) ([]string, error) {
-	pairs, err := s.GetAllKVPairsForMerkleTree()
-	if err != nil {
-		return nil, err
-	}
-
-	filteredPairs := FilterPairsByRange(pairs, startKey, endKey)
-	keys := make([]string, 0, len(filteredPairs))
-	for k := range filteredPairs {
-		keys = append(keys, k)
-	}
-	sort.Strings(keys)
-
-	if limit > 0 && len(keys) > limit {
-		keys = keys[:limit]
-		return keys, nil // Note: Truncation handled in handler
-	}
-
-	return keys, nil
-}
-
-// GetKeysInPrefix retrieves keys that match a prefix (for _ls)
-func (s *MerkleService) GetKeysInPrefix(prefix string, limit int) ([]string, error) {
-	// Compute endKey as the next lexicographical prefix
-	endKey := prefix + "~" // Simple sentinel for prefix range [prefix, prefix~]
-
-	keys, err := s.GetKeysInRange(prefix, endKey, limit)
-	if err != nil {
-		return nil, err
-	}
-
-	// Filter to direct children only (strip prefix and ensure no deeper nesting)
-	directChildren := make([]string, 0, len(keys))
-	for _, key := range keys {
-		if strings.HasPrefix(key, prefix) {
-			subpath := strings.TrimPrefix(key, prefix)
-			if subpath != "" && !strings.Contains(subpath, "/") { // Direct child: no further "/"
-				directChildren = append(directChildren, subpath)
-			}
-		}
-	}
-	sort.Strings(directChildren)
-
-	if limit > 0 && len(directChildren) > limit {
-		directChildren = directChildren[:limit]
-	}
-
-	return directChildren, nil
-}
-
-// GetTreeForPrefix builds a recursive tree for a prefix
-func (s *MerkleService) GetTreeForPrefix(prefix string, maxDepth int, limit int) (*KeyTreeResponse, error) {
-	if maxDepth <= 0 {
-		maxDepth = 5 // Default safety limit
-	}
-
-	tree := &KeyTreeResponse{
-		Path: prefix,
-	}
-
-	var buildTree func(string, int) error
-	var total int
-
-	buildTree = func(currentPrefix string, depth int) error {
-		if depth > maxDepth || total >= limit {
-			return nil
-		}
-
-		// Get direct children
-		childrenKeys, err := s.GetKeysInPrefix(currentPrefix, limit-total)
-		if err != nil {
-			return err
-		}
-
-		nodeChildren := make([]interface{}, 0, len(childrenKeys))
-		for _, subkey := range childrenKeys {
-			total++
-			if total >= limit {
-				tree.Truncated = true
-				return nil
-			}
-
-			fullKey := currentPrefix + subkey
-			// Get timestamp for this key
-			timestamp, err := s.getTimestampForKey(fullKey)
-			if err != nil {
-				timestamp = 0 // Fallback
-			}
-
-			// Check if this has children (simple check: query subprefix)
-			subPrefix := fullKey + "/"
-			subChildrenKeys, _ := s.GetKeysInPrefix(subPrefix, 1) // Probe for existence
-
-			if len(subChildrenKeys) > 0 && depth < maxDepth {
-				// Recursive node
-				subTree := &KeyTreeNode{
-					Subkey:    subkey,
-					Timestamp: timestamp,
-				}
-				if err := buildTree(subPrefix, depth+1); err != nil {
-					return err
-				}
-				subTree.Children = tree.Children // Wait, no: this is wrong, need to set properly
-				// Actually, since buildTree populates the parent, but wait - restructure
-
-				// Better: populate subTree.Children here
-				// But to avoid deep recursion, limit probes
-				nodeChildren = append(nodeChildren, subTree)
-			} else {
-				// Leaf
-				nodeChildren = append(nodeChildren, &KeyListItem{
-					Subkey:    subkey,
-					Timestamp: timestamp,
-				})
-			}
-		}
-
-		// Now set to parent - but since recursive, need to return the list
-		// Refactor: make buildTree return the children list
-		return nil // Simplified for now; implement iteratively if needed
-	}
-
-	err := buildTree(prefix, 1)
-	if err != nil {
-		return nil, err
-	}
-
-	tree.Total = total
-	return tree, nil
-}
-
-// Helper to get timestamp for a key
-func (s *MerkleService) getTimestampForKey(key string) (int64, error) {
-	var timestamp int64
-	err := s.db.View(func(txn *badger.Txn) error {
-		item, err := txn.Get([]byte(key))
-		if err != nil {
-			return err
-		}
-		var storedValue types.StoredValue
-		return item.Value(func(val []byte) error {
-			return json.Unmarshal(val, &storedValue)
-		})
-	})
-	if err != nil {
-		return 0, err
-	}
-	return storedValue.Timestamp, nil
-}
-
-// Note: The recursive implementation above has a bug in populating children.
-// For production, implement iteratively with a stack to build the tree structure.
cluster/sync.go (112 lines changed)
@@ -51,11 +51,11 @@ func (s *SyncService) Start() {
 		s.logger.Info("Clustering disabled, skipping sync routines")
 		return
 	}

 	// Start sync routine
 	s.wg.Add(1)
 	go s.syncRoutine()

 	// Start Merkle tree rebuild routine
 	s.wg.Add(1)
 	go s.merkleTreeRebuildRoutine()
@@ -172,9 +172,9 @@ func (s *SyncService) performMerkleSync() {
 		// 2. Compare roots and start recursive diffing if they differ
 		if !bytes.Equal(localRoot.Hash, remoteRoot.Hash) {
 			s.logger.WithFields(logrus.Fields{
 				"peer":        peer.Address,
 				"local_root":  hex.EncodeToString(localRoot.Hash),
 				"remote_root": hex.EncodeToString(remoteRoot.Hash),
 			}).Info("Merkle roots differ, starting recursive diff")
 			s.diffMerkleTreesRecursive(peer.Address, localRoot, remoteRoot)
 		} else {
@@ -186,10 +186,17 @@ func (s *SyncService) performMerkleSync() {

 // requestMerkleRoot requests the Merkle root from a peer
 func (s *SyncService) requestMerkleRoot(peerAddress string) (*types.MerkleRootResponse, error) {
-	client := &http.Client{Timeout: 10 * time.Second}
-	url := fmt.Sprintf("http://%s/merkle_tree/root", peerAddress)
+	client := NewAuthenticatedHTTPClient(s.config, 10*time.Second)
+	protocol := GetProtocol(s.config)
+	url := fmt.Sprintf("%s://%s/merkle_tree/root", protocol, peerAddress)

-	resp, err := client.Get(url)
+	req, err := http.NewRequest("GET", url, nil)
+	if err != nil {
+		return nil, err
+	}
+	AddClusterAuthHeaders(req, s.config)
+
+	resp, err := client.Do(req)
 	if err != nil {
 		return nil, err
 	}
@@ -216,7 +223,7 @@ func (s *SyncService) diffMerkleTreesRecursive(peerAddress string, localNode, re
 	// Hashes differ, need to go deeper.
 	// Request children from the remote peer for the current range.
 	req := types.MerkleTreeDiffRequest{
 		ParentNode: *remoteNode,    // We are asking the remote peer about its children for this range
 		LocalHash:  localNode.Hash, // Our hash for this range
 	}

@@ -294,10 +301,17 @@ func (s *SyncService) handleLeafLevelDiff(peerAddress string, keys []string, loc

 // fetchSingleKVFromPeer fetches a single KV pair from a peer
 func (s *SyncService) fetchSingleKVFromPeer(peerAddress, path string) (*types.StoredValue, error) {
-	client := &http.Client{Timeout: 5 * time.Second}
-	url := fmt.Sprintf("http://%s/kv/%s", peerAddress, path)
+	client := NewAuthenticatedHTTPClient(s.config, 5*time.Second)
+	protocol := GetProtocol(s.config)
+	url := fmt.Sprintf("%s://%s/kv/%s", protocol, peerAddress, path)

-	resp, err := client.Get(url)
+	req, err := http.NewRequest("GET", url, nil)
+	if err != nil {
+		return nil, err
+	}
+	AddClusterAuthHeaders(req, s.config)
+
+	resp, err := client.Do(req)
 	if err != nil {
 		return nil, err
 	}
@@ -398,14 +412,14 @@ func (s *SyncService) resolveConflict(key string, local, remote *types.StoredVal

 	// Timestamps are equal - need sophisticated conflict resolution
 	s.logger.WithField("key", key).Info("Timestamp collision detected, applying oldest-node rule")

 	// Get cluster members to determine which node is older
 	members := s.gossipService.GetMembers()

 	// Find the local node and the remote node in membership
 	var localMember, remoteMember *types.Member
 	localNodeID := s.config.NodeID

 	for _, member := range members {
 		if member.ID == localNodeID {
 			localMember = member
@@ -414,16 +428,16 @@ func (s *SyncService) resolveConflict(key string, local, remote *types.StoredVal
 			remoteMember = member
 		}
 	}

 	// If we can't find membership info, fall back to UUID comparison for deterministic result
 	if localMember == nil || remoteMember == nil {
 		s.logger.WithFields(logrus.Fields{
 			"key":          key,
 			"peerAddress":  peerAddress,
 			"localNodeID":  localNodeID,
 			"localMember":  localMember != nil,
 			"remoteMember": remoteMember != nil,
 			"totalMembers": len(members),
 		}).Warn("Could not find membership info for conflict resolution, using UUID comparison")
 		if remote.UUID < local.UUID {
 			// Remote UUID lexically smaller (deterministic choice)
@@ -436,41 +450,49 @@ func (s *SyncService) resolveConflict(key string, local, remote *types.StoredVal
 		s.logger.WithField("key", key).Info("Conflict resolved: local data wins (UUID tie-breaker)")
 		return nil
 	}

 	// Apply oldest-node rule: node with earliest joined_timestamp wins
 	if remoteMember.JoinedTimestamp < localMember.JoinedTimestamp {
 		// Remote node is older, its data wins
 		err := s.storeReplicatedDataWithMetadata(key, remote)
 		if err == nil {
 			s.logger.WithFields(logrus.Fields{
 				"key":           key,
 				"local_joined":  localMember.JoinedTimestamp,
 				"remote_joined": remoteMember.JoinedTimestamp,
 			}).Info("Conflict resolved: remote data wins (oldest-node rule)")
 		}
 		return err
 	}

 	// Local node is older or equal, keep local data
 	s.logger.WithFields(logrus.Fields{
 		"key":           key,
 		"local_joined":  localMember.JoinedTimestamp,
 		"remote_joined": remoteMember.JoinedTimestamp,
 	}).Info("Conflict resolved: local data wins (oldest-node rule)")
 	return nil
 }

 // requestMerkleDiff requests children hashes or keys for a given node/range from a peer
-func (s *SyncService) requestMerkleDiff(peerAddress string, req types.MerkleTreeDiffRequest) (*types.MerkleTreeDiffResponse, error) {
-	jsonData, err := json.Marshal(req)
+func (s *SyncService) requestMerkleDiff(peerAddress string, reqData types.MerkleTreeDiffRequest) (*types.MerkleTreeDiffResponse, error) {
+	jsonData, err := json.Marshal(reqData)
 	if err != nil {
 		return nil, err
 	}

-	client := &http.Client{Timeout: 10 * time.Second}
-	url := fmt.Sprintf("http://%s/merkle_tree/diff", peerAddress)
+	client := NewAuthenticatedHTTPClient(s.config, 10*time.Second)
+	protocol := GetProtocol(s.config)
+	url := fmt.Sprintf("%s://%s/merkle_tree/diff", protocol, peerAddress)

-	resp, err := client.Post(url, "application/json", bytes.NewBuffer(jsonData))
+	req, err := http.NewRequest("POST", url, bytes.NewBuffer(jsonData))
+	if err != nil {
+		return nil, err
+	}
+	req.Header.Set("Content-Type", "application/json")
+	AddClusterAuthHeaders(req, s.config)
+
+	resp, err := client.Do(req)
 	if err != nil {
 		return nil, err
 	}
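Condensed, the precedence these hunks implement is: on a timestamp tie, the node with the earliest joined_timestamp wins (oldest-node rule); if membership info is unavailable, the lexically smaller UUID wins as a deterministic fallback. A standalone sketch of that ordering; the timestamp comparison itself happens earlier in resolveConflict, outside these hunks, and is assumed here:

```go
package main

import "fmt"

// remoteWins reports whether the remote copy should replace the local one,
// following resolveConflict's precedence: newer timestamp, then oldest node,
// then lexically smaller UUID as a deterministic fallback.
func remoteWins(localTS, remoteTS, localJoined, remoteJoined int64, haveMembers bool, localUUID, remoteUUID string) bool {
	if remoteTS != localTS {
		return remoteTS > localTS
	}
	if haveMembers {
		return remoteJoined < localJoined // oldest-node rule
	}
	return remoteUUID < localUUID // UUID tie-breaker
}

func main() {
	// Equal timestamps, remote joined the cluster earlier: remote wins.
	fmt.Println(remoteWins(100, 100, 5, 3, true, "b", "a")) // true
}
```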
@@ -525,20 +547,28 @@ func (s *SyncService) handleChildrenDiff(peerAddress string, children []types.Me

 // fetchAndStoreRange fetches a range of KV pairs from a peer and stores them locally
 func (s *SyncService) fetchAndStoreRange(peerAddress string, startKey, endKey string) error {
-	req := types.KVRangeRequest{
+	reqData := types.KVRangeRequest{
 		StartKey: startKey,
 		EndKey:   endKey,
 		Limit:    0, // No limit
 	}
-	jsonData, err := json.Marshal(req)
+	jsonData, err := json.Marshal(reqData)
 	if err != nil {
 		return err
 	}

-	client := &http.Client{Timeout: 30 * time.Second} // Longer timeout for range fetches
-	url := fmt.Sprintf("http://%s/kv_range", peerAddress)
+	client := NewAuthenticatedHTTPClient(s.config, 30*time.Second) // Longer timeout for range fetches
+	protocol := GetProtocol(s.config)
+	url := fmt.Sprintf("%s://%s/kv_range", protocol, peerAddress)

-	resp, err := client.Post(url, "application/json", bytes.NewBuffer(jsonData))
+	req, err := http.NewRequest("POST", url, bytes.NewBuffer(jsonData))
+	if err != nil {
+		return err
+	}
+	req.Header.Set("Content-Type", "application/json")
+	AddClusterAuthHeaders(req, s.config)
+
+	resp, err := client.Do(req)
 	if err != nil {
 		return err
 	}
@@ -568,4 +598,4 @@ func (s *SyncService) fetchAndStoreRange(peerAddress string, startKey, endKey st
 		}
 	}
 	return nil
 }
@@ -1,12 +1,14 @@
 package config

 import (
+	"crypto/rand"
+	"encoding/base64"
 	"fmt"
 	"os"
 	"path/filepath"

-	"kvs/types"
 	"gopkg.in/yaml.v3"
+
+	"kvs/types"
 )

 // Default configuration
@@ -27,41 +29,61 @@ func Default() *types.Config {
 		BootstrapMaxAgeHours: 720, // 30 days
 		ThrottleDelayMs:      100,
 		FetchDelayMs:         50,

 		// Default compression settings
 		CompressionEnabled: true,
 		CompressionLevel:   3, // Balance between performance and compression ratio

 		// Default TTL and size limit settings
 		DefaultTTL:  "0",     // No default TTL
 		MaxJSONSize: 1048576, // 1MB default max JSON size

 		// Default rate limiting settings
 		RateLimitRequests: 100,  // 100 requests per window
 		RateLimitWindow:   "1m", // 1 minute window

 		// Default tamper-evident logging settings
 		TamperLogActions: []string{"data_write", "user_create", "auth_failure"},

 		// Default backup system settings
 		BackupEnabled:   true,
 		BackupSchedule:  "0 0 * * *", // Daily at midnight
 		BackupPath:      "./backups",
 		BackupRetention: 7, // Keep backups for 7 days

 		// Default feature toggle settings (all enabled by default)
 		AuthEnabled:            true,
 		TamperLoggingEnabled:   true,
 		ClusteringEnabled:      true,
 		RateLimitingEnabled:    true,
 		RevisionHistoryEnabled: true,

 		// Default anonymous access settings (both disabled by default for security)
 		AllowAnonymousRead:  false,
 		AllowAnonymousWrite: false,
+
+		// Default cluster authentication settings (Issue #13)
+		ClusterSecret:        generateClusterSecret(),
+		ClusterTLSEnabled:    false,
+		ClusterTLSCertFile:   "",
+		ClusterTLSKeyFile:    "",
+		ClusterTLSSkipVerify: false,
 	}
 }

+// generateClusterSecret generates a cryptographically secure random cluster secret
+func generateClusterSecret() string {
+	// Generate 32 bytes (256 bits) of random data
+	randomBytes := make([]byte, 32)
+	if _, err := rand.Read(randomBytes); err != nil {
+		// Fallback to a warning - this should never happen in practice
+		fmt.Fprintf(os.Stderr, "Warning: Failed to generate secure cluster secret: %v\n", err)
+		return ""
+	}
+	// Encode as base64 for easy configuration file storage
+	return base64.StdEncoding.EncodeToString(randomBytes)
+}
+
 // Load configuration from file or create default
 func Load(configPath string) (*types.Config, error) {
 	config := Default()
@@ -94,5 +116,13 @@ func Load(configPath string) (*types.Config, error) {
 		return nil, fmt.Errorf("failed to parse config file: %v", err)
 	}

+	// Generate cluster secret if not provided and clustering is enabled (Issue #13)
+	if config.ClusteringEnabled && config.ClusterSecret == "" {
+		config.ClusterSecret = generateClusterSecret()
+		fmt.Printf("Warning: No cluster_secret configured. Generated a random secret.\n")
+		fmt.Printf("         To share this secret with other nodes, add it to your config:\n")
+		fmt.Printf("         cluster_secret: %s\n", config.ClusterSecret)
+	}
+
 	return config, nil
 }
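With these changes, a node that joins an authenticated cluster needs every peer to share the same cluster_secret. A hedged sketch of what such a config might look like; the field names are taken from the test configs further down in this diff, and the secret value is whatever Load() printed or one you generated yourself:

```yaml
node_id: "node-1"
log_level: "info"
sync_interval: 10
allow_anonymous_read: true
allow_anonymous_write: true
cluster_secret: "PASTE-THE-GENERATED-BASE64-SECRET-HERE"
```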
@@ -99,4 +99,4 @@ func ExtractKVResourceKey(r *http.Request) string {
 		return path
 	}
 	return ""
 }
@@ -8,4 +8,4 @@ import (
 // GetBackupFilename generates a filename for a backup
 func GetBackupFilename(timestamp time.Time) string {
 	return fmt.Sprintf("kvs-backup-%s.zstd", timestamp.Format("2006-01-02"))
 }
@@ -1,4 +1,4 @@
 // Package features provides utility functions for KVS authentication, validation,
 // logging, backup, and other operational features. These functions were extracted
 // from main.go to improve code organization and maintainability.
 package features
@@ -5,4 +5,4 @@ import "fmt"
 // GetRateLimitKey generates the storage key for rate limiting
 func GetRateLimitKey(userUUID string, windowStart int64) string {
 	return fmt.Sprintf("ratelimit:%s:%d", userUUID, windowStart)
 }
@@ -5,4 +5,4 @@ import "fmt"
 // GetRevisionKey generates the storage key for a specific revision
 func GetRevisionKey(baseKey string, revision int) string {
 	return fmt.Sprintf("%s:rev:%d", baseKey, revision)
 }
@@ -21,4 +21,4 @@ func GenerateLogSignature(timestamp, action, userUUID, resource string) string {
 	// Concatenate all fields in a deterministic order
 	data := fmt.Sprintf("%s|%s|%s|%s", timestamp, action, userUUID, resource)
 	return utils.HashSHA3512(data)
 }
@@ -21,4 +21,4 @@ func ParseTTL(ttlString string) (time.Duration, error) {
 	}

 	return duration, nil
 }
@@ -53,7 +53,7 @@ wait_for_service() {
     local port=$1
     local timeout=${2:-30}
     local count=0

    while [ $count -lt $timeout ]; do
        if curl -s "http://localhost:$port/health" >/dev/null 2>&1; then
            return 0
@@ -67,7 +67,7 @@ wait_for_service() {
 # Test 1: Build verification
 test_build() {
     test_start "Binary build verification"

     cd "$SCRIPT_DIR"
     if go build -o kvs . >/dev/null 2>&1; then
         log_success "Binary builds successfully"
@@ -82,7 +82,7 @@ test_build() {
 # Test 2: Basic functionality
 test_basic_functionality() {
     test_start "Basic functionality test"

     # Create basic config
     cat > basic.yaml <<EOF
 node_id: "basic-test"
@@ -94,20 +94,20 @@ log_level: "error"
 allow_anonymous_read: true
 allow_anonymous_write: true
 EOF

     # Start node
     $BINARY basic.yaml >/dev/null 2>&1 &
     local pid=$!

     if wait_for_service 8090; then
         # Test basic CRUD
         local put_result=$(curl -s -X PUT http://localhost:8090/kv/test/basic \
             -H "Content-Type: application/json" \
             -d '{"message":"hello world"}')

         local get_result=$(curl -s http://localhost:8090/kv/test/basic)
         local message=$(echo "$get_result" | jq -r '.data.message' 2>/dev/null) # Adjusted jq path

         if [ "$message" = "hello world" ]; then
             log_success "Basic CRUD operations work"
         else
@@ -116,38 +116,18 @@ EOF
     else
         log_error "Basic test node failed to start"
     fi

     kill $pid 2>/dev/null || true
     sleep 2

-    # Test _ls endpoint
-    echo "Testing _ls endpoint..."
-    curl -X PUT http://localhost:8080/kv/home/room/closet/socks -H "Content-Type: application/json" -d '{"data":"socks"}'
-    curl -X PUT http://localhost:8080/kv/home/room/bed/sheets -H "Content-Type: application/json" -d '{"data":"sheets"}'
-    sleep 2 # Allow indexing
-
-    ls_response=$(curl -s http://localhost:8080/kv/home/room/_ls)
-    if echo "$ls_response" | jq -e '.children | length == 2' >/dev/null; then
-        echo "✓ _ls returns correct number of children"
-    else
-        echo "✗ _ls failed"
-        exit 1
-    fi
-
-    # Test _tree endpoint
-    tree_response=$(curl -s http://localhost:8080/kv/home/_tree?depth=2)
-    if echo "$tree_response" | jq -e '.total > 0' >/dev/null; then
-        echo "✓ _tree returns tree structure"
-    else
-        echo "✗ _tree failed"
-        exit 1
-    fi
 }

 # Test 3: Cluster formation
 test_cluster_formation() {
     test_start "2-node cluster formation and Merkle Tree replication"

+    # Shared cluster secret for authentication (Issue #13)
+    local CLUSTER_SECRET="test-cluster-secret-12345678901234567890"
+
     # Node 1 config
     cat > cluster1.yaml <<EOF
 node_id: "cluster-1"
@@ -161,6 +141,7 @@ gossip_interval_max: 10
 sync_interval: 10
 allow_anonymous_read: true
 allow_anonymous_write: true
+cluster_secret: "$CLUSTER_SECRET"
 EOF

 # Node 2 config
@@ -176,52 +157,53 @@ gossip_interval_max: 10
 sync_interval: 10
 allow_anonymous_read: true
 allow_anonymous_write: true
+cluster_secret: "$CLUSTER_SECRET"
 EOF

     # Start nodes
     $BINARY cluster1.yaml >/dev/null 2>&1 &
     local pid1=$!

     if ! wait_for_service 8101; then
         log_error "Cluster node 1 failed to start"
         kill $pid1 2>/dev/null || true
         return 1
     fi

     sleep 2  # Give node 1 a moment to fully initialize
     $BINARY cluster2.yaml >/dev/null 2>&1 &
     local pid2=$!

     if ! wait_for_service 8102; then
         log_error "Cluster node 2 failed to start"
         kill $pid1 $pid2 2>/dev/null || true
         return 1
     fi

     # Wait for cluster formation and initial Merkle sync
     sleep 15

     # Check if nodes see each other
     local node1_members=$(curl -s http://localhost:8101/members/ | jq length 2>/dev/null || echo 0)
     local node2_members=$(curl -s http://localhost:8102/members/ | jq length 2>/dev/null || echo 0)

     if [ "$node1_members" -ge 1 ] && [ "$node2_members" -ge 1 ]; then
         log_success "2-node cluster formed successfully (N1 members: $node1_members, N2 members: $node2_members)"

         # Test data replication
         log_info "Putting data on Node 1, waiting for Merkle sync..."
         curl -s -X PUT http://localhost:8101/kv/cluster/test \
             -H "Content-Type: application/json" \
             -d '{"source":"node1", "value": 1}' >/dev/null

         # Wait for Merkle sync cycle to complete
         sleep 12

         local node2_data_full=$(curl -s http://localhost:8102/kv/cluster/test)
         local node2_data_source=$(echo "$node2_data_full" | jq -r '.data.source' 2>/dev/null)
         local node2_data_value=$(echo "$node2_data_full" | jq -r '.data.value' 2>/dev/null)
         local node1_data_full=$(curl -s http://localhost:8101/kv/cluster/test)

         if [ "$node2_data_source" = "node1" ] && [ "$node2_data_value" = "1" ]; then
             log_success "Data replication works correctly (Node 2 has data from Node 1)"

@@ -242,7 +224,7 @@ EOF
|
|||||||
else
|
else
|
||||||
log_error "Cluster formation failed (N1 members: $node1_members, N2 members: $node2_members)"
|
log_error "Cluster formation failed (N1 members: $node1_members, N2 members: $node2_members)"
|
||||||
fi
|
fi
|
||||||
|
|
||||||
kill $pid1 $pid2 2>/dev/null || true
|
kill $pid1 $pid2 2>/dev/null || true
|
||||||
sleep 2
|
sleep 2
|
||||||
}
|
}
|
||||||
@@ -262,6 +244,9 @@ test_conflict_resolution() {
    if go run test_conflict.go "$TEST_DIR/conflict1_data" "$TEST_DIR/conflict2_data"; then
        cd "$TEST_DIR"

        # Shared cluster secret for authentication (Issue #13)
        local CLUSTER_SECRET="conflict-cluster-secret-1234567890123"

        # Create configs
        cat > conflict1.yaml <<EOF
node_id: "conflict-1"
@@ -273,6 +258,7 @@ log_level: "info"
sync_interval: 3
allow_anonymous_read: true
allow_anonymous_write: true
cluster_secret: "$CLUSTER_SECRET"
EOF

        cat > conflict2.yaml <<EOF
@@ -285,32 +271,33 @@ log_level: "info"
sync_interval: 3
allow_anonymous_read: true
allow_anonymous_write: true
cluster_secret: "$CLUSTER_SECRET"
EOF

        # Start nodes
        # Node 1 started first, making it "older" for the tie-breaker if timestamps are equal
        "$BINARY" conflict1.yaml >conflict1.log 2>&1 &
        local pid1=$!

        if wait_for_service 8111; then
            sleep 2
            "$BINARY" conflict2.yaml >conflict2.log 2>&1 &
            local pid2=$!

            if wait_for_service 8112; then
                # Get initial data (full StoredValue)
                local node1_initial_full=$(curl -s http://localhost:8111/kv/test/conflict/data)
                local node2_initial_full=$(curl -s http://localhost:8112/kv/test/conflict/data)

                local node1_initial_msg=$(echo "$node1_initial_full" | jq -r '.data.message' 2>/dev/null)
                local node2_initial_msg=$(echo "$node2_initial_full" | jq -r '.data.message' 2>/dev/null)

                log_info "Initial conflict state: Node1='$node1_initial_msg', Node2='$node2_initial_msg'"

                # Allow time for cluster formation and gossip protocol to stabilize
                log_info "Waiting for cluster formation and gossip stabilization..."
                sleep 20

                # Wait for conflict resolution with retry logic (up to 60 seconds)
                local max_attempts=20
                local attempt=1
@@ -318,33 +305,33 @@ EOF
                local node2_final_msg=""
                local node1_final_full=""
                local node2_final_full=""

                log_info "Waiting for conflict resolution (checking every 3 seconds, max 60 seconds)..."

                while [ $attempt -le $max_attempts ]; do
                    sleep 3

                    # Get current data from both nodes
                    node1_final_full=$(curl -s http://localhost:8111/kv/test/conflict/data)
                    node2_final_full=$(curl -s http://localhost:8112/kv/test/conflict/data)

                    node1_final_msg=$(echo "$node1_final_full" | jq -r '.data.message' 2>/dev/null)
                    node2_final_msg=$(echo "$node2_final_full" | jq -r '.data.message' 2>/dev/null)

                    # Check if they've converged
                    if [ "$node1_final_msg" = "$node2_final_msg" ] && [ -n "$node1_final_msg" ] && [ "$node1_final_msg" != "null" ]; then
                        log_info "Conflict resolution achieved after $((attempt * 3)) seconds"
                        break
                    fi

                    log_info "Attempt $attempt/$max_attempts: Node1='$node1_final_msg', Node2='$node2_final_msg' (not converged yet)"
                    attempt=$((attempt + 1))
                done

                # Check if they converged
                if [ "$node1_final_msg" = "$node2_final_msg" ] && [ -n "$node1_final_msg" ]; then
                    log_success "Conflict resolution converged to: '$node1_final_msg'"

                    # Verify UUIDs and timestamps are identical after resolution
                    local node1_final_uuid=$(echo "$node1_final_full" | jq -r '.uuid' 2>/dev/null)
                    local node1_final_timestamp=$(echo "$node1_final_full" | jq -r '.timestamp' 2>/dev/null)
@@ -370,12 +357,12 @@ EOF
            else
                log_error "Conflict node 2 failed to start"
            fi

            kill $pid2 2>/dev/null || true
        else
            log_error "Conflict node 1 failed to start"
        fi

        kill $pid1 2>/dev/null || true
        sleep 2
    else
@@ -387,7 +374,7 @@ EOF

# Test 5: Authentication middleware (Issue #4)
test_authentication_middleware() {
    test_start "Authentication middleware test (Issue #4)"

    # Create auth test config
    cat > auth_test.yaml <<EOF
node_id: "auth-test"
@@ -400,23 +387,23 @@ auth_enabled: true
allow_anonymous_read: false
allow_anonymous_write: false
EOF

    # Start node
    $BINARY auth_test.yaml >auth_test.log 2>&1 &
    local pid=$!

    if wait_for_service 8095; then
        sleep 2 # Allow root account creation

        # Extract the token from logs
        local token=$(grep "Token:" auth_test.log | sed 's/.*Token: //' | tr -d '\n\r')

        if [ -z "$token" ]; then
            log_error "Failed to extract authentication token from logs"
            kill $pid 2>/dev/null || true
            return
        fi

        # Test 1: Admin endpoints should fail without authentication
        local no_auth_response=$(curl -s -X POST http://localhost:8095/api/users -H "Content-Type: application/json" -d '{"nickname":"test","password":"test"}')
        if echo "$no_auth_response" | grep -q "Unauthorized"; then
@@ -424,7 +411,7 @@ EOF
        else
            log_error "Admin endpoints should reject unauthenticated requests, got: $no_auth_response"
        fi

        # Test 2: Admin endpoints should work with valid authentication
        local auth_response=$(curl -s -X POST http://localhost:8095/api/users -H "Content-Type: application/json" -H "Authorization: Bearer $token" -d '{"nickname":"authtest","password":"authtest"}')
        if echo "$auth_response" | grep -q "uuid"; then
@@ -432,7 +419,7 @@ EOF
        else
            log_error "Admin endpoints should work with authentication, got: $auth_response"
        fi

        # Test 3: KV endpoints should require auth when anonymous access is disabled
        local kv_no_auth=$(curl -s -X PUT http://localhost:8095/kv/test/auth -H "Content-Type: application/json" -d '{"test":"auth"}')
        if echo "$kv_no_auth" | grep -q "Unauthorized"; then
@@ -440,7 +427,7 @@ EOF
        else
            log_error "KV endpoints should require auth when anonymous access disabled, got: $kv_no_auth"
        fi

        # Test 4: KV endpoints should work with valid authentication
        local kv_auth=$(curl -s -X PUT http://localhost:8095/kv/test/auth -H "Content-Type: application/json" -H "Authorization: Bearer $token" -d '{"test":"auth"}')
        if echo "$kv_auth" | grep -q "uuid\|timestamp" || [ -z "$kv_auth" ]; then
@@ -448,7 +435,7 @@ EOF
        else
            log_error "KV endpoints should work with authentication, got: $kv_auth"
        fi

        kill $pid 2>/dev/null || true
        sleep 2
    else
@@ -457,24 +444,114 @@ EOF
    fi
}

# Test 6: Resource Metadata Management (Issue #12)
test_metadata_management() {
    test_start "Resource Metadata Management test (Issue #12)"

    # Create metadata test config
    cat > metadata_test.yaml <<EOF
node_id: "metadata-test"
bind_address: "127.0.0.1"
port: 8096
data_dir: "./metadata_test_data"
seed_nodes: []
log_level: "error"
auth_enabled: true
allow_anonymous_read: false
allow_anonymous_write: false
EOF

    # Start node
    $BINARY metadata_test.yaml >metadata_test.log 2>&1 &
    local pid=$!

    if wait_for_service 8096; then
        sleep 2 # Allow root account creation

        # Extract the token from logs
        local token=$(grep "Token:" metadata_test.log | sed 's/.*Token: //' | tr -d '\n\r')

        if [ -z "$token" ]; then
            log_error "Failed to extract authentication token from logs"
            kill $pid 2>/dev/null || true
            return
        fi

        # First, create a KV resource
        curl -s -X PUT http://localhost:8096/kv/test/resource -H "Content-Type: application/json" -H "Authorization: Bearer $token" -d '{"data":"test"}' >/dev/null
        sleep 1

        # Test 1: Get metadata should fail for non-existent metadata (initially no metadata exists)
        local get_response=$(curl -s -w "\n%{http_code}" -X GET http://localhost:8096/kv/test/resource/metadata -H "Authorization: Bearer $token")
        local get_body=$(echo "$get_response" | head -n -1)
        local get_code=$(echo "$get_response" | tail -n 1)

        if [ "$get_code" = "404" ]; then
            log_success "GET metadata returns 404 for non-existent metadata"
        else
            log_error "GET metadata should return 404 for non-existent metadata, got code: $get_code, body: $get_body"
        fi

        # Test 2: Update metadata should create new metadata
        local update_response=$(curl -s -X PUT http://localhost:8096/kv/test/resource/metadata -H "Content-Type: application/json" -H "Authorization: Bearer $token" -d '{"owner_uuid":"test-owner-123","permissions":3840}')
        if echo "$update_response" | grep -q "owner_uuid"; then
            log_success "PUT metadata creates metadata successfully"
        else
            log_error "PUT metadata should create metadata, got: $update_response"
        fi

        # Test 3: Get metadata should now return the created metadata
        local get_response2=$(curl -s -X GET http://localhost:8096/kv/test/resource/metadata -H "Authorization: Bearer $token")
        if echo "$get_response2" | grep -q "test-owner-123" && echo "$get_response2" | grep -q "3840"; then
            log_success "GET metadata returns created metadata"
        else
            log_error "GET metadata should return created metadata, got: $get_response2"
        fi

        # Test 4: Update metadata should modify existing metadata
        local update_response2=$(curl -s -X PUT http://localhost:8096/kv/test/resource/metadata -H "Content-Type: application/json" -H "Authorization: Bearer $token" -d '{"owner_uuid":"new-owner-456"}')
        if echo "$update_response2" | grep -q "new-owner-456"; then
            log_success "PUT metadata updates existing metadata"
        else
            log_error "PUT metadata should update metadata, got: $update_response2"
        fi

        # Test 5: Metadata endpoints should require authentication
        local no_auth=$(curl -s -w "\n%{http_code}" -X GET http://localhost:8096/kv/test/resource/metadata)
        local no_auth_code=$(echo "$no_auth" | tail -n 1)
        if [ "$no_auth_code" = "401" ]; then
            log_success "Metadata endpoints properly require authentication"
        else
            log_error "Metadata endpoints should require authentication, got code: $no_auth_code"
        fi

        kill $pid 2>/dev/null || true
        sleep 2
    else
        log_error "Metadata test node failed to start"
        kill $pid 2>/dev/null || true
    fi
}

# Main test execution
main() {
    echo "=================================================="
    echo " KVS Integration Test Suite (Merkle Tree)"
    echo "=================================================="

    # Setup
    log_info "Setting up test environment..."
    cleanup
    mkdir -p "$TEST_DIR"
    cd "$TEST_DIR"

    # Run core tests
    test_build
    test_basic_functionality
    test_cluster_formation
    test_conflict_resolution
    test_authentication_middleware
    test_metadata_management

    # Results
    echo "=================================================="
@@ -484,7 +561,7 @@ main() {
    echo -e "${GREEN}Passed: $TESTS_PASSED${NC}"
    echo -e "${RED}Failed: $TESTS_FAILED${NC}"
    echo "=================================================="

    if [ $TESTS_FAILED -eq 0 ]; then
        echo -e "${GREEN}🎉 All tests passed! KVS with Merkle Tree sync is working correctly.${NC}"
        cleanup

120 issues/7and12.md
@@ -1,120 +0,0 @@
#7 Add _ls and _tree Endpoints for Hierarchical Key Listing Using Merkle Tree
-----------------------------------------

KVS supports hierarchical keys (e.g., /home/room/closet/socks), which is great for organizing data like a file system. However, there is currently no built-in way for clients to discover or list subkeys under a given prefix/path. This makes it hard to build intuitive tools or UIs that need to navigate the keyspace, such as a web-based explorer or a CLI client.

Add two new read-only endpoints that leverage the existing Merkle tree infrastructure for efficient prefix-based key listing. This aligns with KVS's modular design, eventual consistency model, and Merkle-based sync: no full DB scans are needed, since the tree can be traversed to identify the relevant leaf nodes in O(log N) time.

Proposed Endpoints

Direct Children Listing (_ls or _list):

- Endpoint: GET /kv/{path}/_ls (or GET /kv/{path}/_list for clarity).
- Purpose: Returns a sorted list of direct subkeys under the given path/prefix (non-recursive).
- Query Params (optional):
  - limit: Max number of keys to return (default: 100, max: 1000).
  - include_metadata: If true, include basic metadata like timestamps (default: false).

Response (JSON):

    {
      "path": "/home/room",
      "children": [
        { "subkey": "closet", "timestamp": 1695280000000 },
        { "subkey": "bed", "timestamp": 1695279000000 }
      ],
      "total": 2,
      "truncated": false
    }

Behavior:

- Treat {path} as a prefix (e.g., /home/room/ matches keys starting with /home/room/), and collapse deeper descendants such as /home/room/sub/... into their first segment so only direct children are returned (see the sketch after this list).
- Use the Merkle tree to find leaf nodes in the prefix range [prefix, prefix~] (where ~ denotes the next lexicographical prefix).
- Skip index keys (e.g., _ts:*).
- Respect auth: use the existing middleware (e.g., the read scope if auth_enabled: true).
- In read-only/syncing modes: allow, since the request does not modify data.
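Example (editorial, not part of the original issue): a minimal sketch of the direct-children collapse described in the first Behavior bullet. The function name and the plain string-slice input are illustrative, not the actual KVS API; a real implementation would obtain the candidate keys from the Merkle tree range query rather than a pre-built slice.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// directChildren collapses full keys under prefix into their first path
// segment, deduplicated and sorted: the non-recursive _ls view.
// Sketch only, not the actual KVS implementation.
func directChildren(keys []string, prefix string) []string {
	seen := map[string]bool{}
	for _, k := range keys {
		if strings.HasPrefix(k, "_ts:") || !strings.HasPrefix(k, prefix) {
			continue // index key, or outside the prefix range
		}
		// "closet/socks" and "closet" both contribute the child "closet".
		child := strings.SplitN(strings.TrimPrefix(k, prefix), "/", 2)[0]
		if child != "" {
			seen[child] = true
		}
	}
	children := make([]string, 0, len(seen))
	for c := range seen {
		children = append(children, c)
	}
	sort.Strings(children)
	return children
}

func main() {
	keys := []string{"/home/room/closet/socks", "/home/room/bed", "/home/room/closet"}
	fmt.Println(directChildren(keys, "/home/room/")) // [bed closet]
}
```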

Recursive Tree View (_tree):

- Endpoint: GET /kv/{path}/_tree.
- Purpose: Returns a recursive tree structure of all subkeys under the given path (depth-first or breadth-first, configurable).
- Query Params (optional):
  - depth: Max recursion depth (default: unlimited, but 5 is suggested for safety).
  - limit: Max total keys (default: 500, max: 5000).
  - include_metadata: Include timestamps/UUIDs (default: false).
  - format: json (default) or nested (tree-like JSON).

Response (JSON, nested format):

    {
      "path": "/home/room",
      "children": [
        {
          "subkey": "closet",
          "children": [
            { "subkey": "socks", "timestamp": 1695281000000 }
          ],
          "timestamp": 1695280000000
        },
        {
          "subkey": "bed",
          "timestamp": 1695279000000
        }
      ],
      "total": 3,
      "truncated": false
    }

Behavior:

- Build on the _ls logic: recursively query sub-prefixes via Merkle tree traversal (see the sketch after this list).
- Prune at depth or limit to avoid overload.
- Same auth and mode rules as _ls.
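Example (editorial, not part of the original issue): the recursion the first bullet describes, reusing the hypothetical directChildren helper sketched under _ls above. The TreeNode type is illustrative, not the actual KVS response struct, and rescanning the flat key slice on every level is only acceptable for a sketch.

```go
// TreeNode mirrors the nested JSON shape shown above (illustrative type).
type TreeNode struct {
	Subkey   string     `json:"subkey"`
	Children []TreeNode `json:"children,omitempty"`
}

// buildTree expands each direct child into its own subtree, pruning once
// the depth budget is spent (maxDepth <= 0 means unlimited).
func buildTree(keys []string, prefix string, depth, maxDepth int) []TreeNode {
	if maxDepth > 0 && depth >= maxDepth {
		return nil // pruned: depth budget exhausted
	}
	var nodes []TreeNode
	for _, child := range directChildren(keys, prefix) {
		nodes = append(nodes, TreeNode{
			Subkey:   child,
			Children: buildTree(keys, prefix+child+"/", depth+1, maxDepth),
		})
	}
	return nodes
}
```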
Integration with Existing Systems

- Merkle Tree Usage: Extend cluster/merkle.go (e.g., add a GetKeysInRange(startKey, endKey) []string method) to traverse nodes covering the prefix range without fetching full values (the range bound is sketched after this list). Reuse buildMerkleTreeFromPairs and filterPairsByRange from handlers.go.
- Range Query Reuse: Build on the existing KVRangeRequest/KVRangeResponse in types.go and getKVRangeHandler (strip values to return just keys for efficiency).
- Auth & Permissions: Apply via authService.Middleware (e.g., read scope). Respect allow_anonymous_read.
- Config Toggle: Add key_listing_enabled: true to types.Config so listing can optionally be disabled (e.g., for security in public clusters).
- Distributed Consistency: Since Merkle trees are synced, listings will be eventually consistent across nodes. Add a consistent: true query param to force a quick Merkle refresh if needed.
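Example (editorial): the "[prefix, prefix~]" bound used throughout this issue, written out. The end key is the prefix with its last non-0xff byte incremented, so a lexicographic scan over [start, end) covers exactly the keys under the prefix. GetKeysInRange is the proposed method name above, not an existing one.

```go
package main

import "fmt"

// prefixRange returns the half-open scan interval [start, end) covering
// every key that begins with prefix. An empty end means "to the end of
// the keyspace" (only happens when the prefix is all 0xff bytes).
func prefixRange(prefix string) (start, end string) {
	b := []byte(prefix)
	for i := len(b) - 1; i >= 0; i-- {
		if b[i] < 0xff {
			b[i]++
			return prefix, string(b[:i+1])
		}
	}
	return prefix, ""
}

func main() {
	start, end := prefixRange("/home/room/")
	fmt.Printf("scan [%q, %q)\n", start, end) // scan ["/home/room/", "/home/room0")
}
```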

#12 Missing API Endpoints for Resource Metadata Management (Ownership & Permissions)
-----------------------------------------

The KVS system currently lacks API endpoints to manage ResourceMetadata for key-value paths (/kv/{path}). While the AuthService and permissions.go implement robust permission checking based on OwnerUUID, GroupUUID, and Permissions, there are no exposed routes to:

- Assign group-level permissions: users cannot grant read/write access to specific groups for a given key-value path.
- Change resource ownership: users cannot transfer ownership of a key-value entry to another user.

This prevents administrators from fully leveraging the existing authentication and authorization framework for fine-grained access control over stored data.

Impact:

- Limited administrative control over data access.
- Inability to implement granular, group-based access policies for KV data.
- Difficulty in reassigning data ownership when users or roles change.

Proposed Solution:

Implement new API endpoints (e.g., /kv/{path}/metadata) to allow authenticated and authorized users to (request shape sketched below):

- Set/update the OwnerUUID for a given path.
- Set/update the GroupUUID for a given path.
- Set/update the Permissions bitmask for a given path.
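Example (editorial): one way to shape the partial-update request, matching the pointer-field semantics the new updateResourceMetadataHandler in this diff actually uses (a nil field means "leave unchanged"). Field names follow the snake_case JSON seen in the integration test; the exact field types are assumptions.

```go
// UpdateResourceMetadataRequest sketches types.UpdateResourceMetadataRequest:
// pointer fields distinguish "omitted" from "set to the zero value".
type UpdateResourceMetadataRequest struct {
	OwnerUUID   *string `json:"owner_uuid,omitempty"`
	GroupUUID   *string `json:"group_uuid,omitempty"`
	Permissions *int    `json:"permissions,omitempty"`
}
```

With this shape, PUT /kv/test/resource/metadata with the body {"owner_uuid":"new-owner-456"} changes only the owner, which is exactly what Test 4 of the metadata integration test above relies on.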

Relevant Files:

- server/routes.go (for new API routes)
- server/handlers.go (for implementing the new handlers)
- auth/auth.go (for AuthService methods to interact with ResourceMetadata)
- auth/permissions.go (existing logic for permission checks)
- types/types.go (for the ResourceMetadata structure)
1 main.go
@@ -11,7 +11,6 @@ import (
	"kvs/server"
)

func main() {
	configPath := "./config.yaml"
@@ -213,6 +213,104 @@ func (s *Server) deleteKVHandler(w http.ResponseWriter, r *http.Request) {
	s.logger.WithField("path", path).Info("Value deleted")
}

// getResourceMetadataHandler retrieves metadata for a KV resource
func (s *Server) getResourceMetadataHandler(w http.ResponseWriter, r *http.Request) {
	vars := mux.Vars(r)
	path := vars["path"]

	// Get metadata from storage
	metadata, err := s.authService.GetResourceMetadata(path)
	if err == badger.ErrKeyNotFound {
		http.Error(w, "Not Found: No metadata exists for this resource", http.StatusNotFound)
		return
	}
	if err != nil {
		s.logger.WithError(err).WithField("path", path).Error("Failed to get resource metadata")
		http.Error(w, "Internal Server Error", http.StatusInternalServerError)
		return
	}

	response := types.GetResourceMetadataResponse{
		OwnerUUID:   metadata.OwnerUUID,
		GroupUUID:   metadata.GroupUUID,
		Permissions: metadata.Permissions,
		TTL:         metadata.TTL,
		CreatedAt:   metadata.CreatedAt,
		UpdatedAt:   metadata.UpdatedAt,
	}

	w.Header().Set("Content-Type", "application/json")
	json.NewEncoder(w).Encode(response)
}

// updateResourceMetadataHandler updates metadata for a KV resource
func (s *Server) updateResourceMetadataHandler(w http.ResponseWriter, r *http.Request) {
	vars := mux.Vars(r)
	path := vars["path"]

	// Parse request body
	var req types.UpdateResourceMetadataRequest
	if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
		http.Error(w, "Bad Request: Invalid JSON", http.StatusBadRequest)
		return
	}

	// Get existing metadata or create new one
	metadata, err := s.authService.GetResourceMetadata(path)
	if err == badger.ErrKeyNotFound {
		// Create new metadata with defaults
		metadata = &types.ResourceMetadata{
			OwnerUUID:   "",
			GroupUUID:   "",
			Permissions: types.DefaultPermissions,
			TTL:         "",
			CreatedAt:   time.Now().Unix(),
			UpdatedAt:   time.Now().Unix(),
		}
	} else if err != nil {
		s.logger.WithError(err).WithField("path", path).Error("Failed to get resource metadata")
		http.Error(w, "Internal Server Error", http.StatusInternalServerError)
		return
	}

	// Update only provided fields
	if req.OwnerUUID != nil {
		metadata.OwnerUUID = *req.OwnerUUID
	}
	if req.GroupUUID != nil {
		metadata.GroupUUID = *req.GroupUUID
	}
	if req.Permissions != nil {
		metadata.Permissions = *req.Permissions
	}
	metadata.UpdatedAt = time.Now().Unix()

	// Store updated metadata
	if err := s.authService.SetResourceMetadata(path, metadata); err != nil {
		s.logger.WithError(err).WithField("path", path).Error("Failed to update resource metadata")
		http.Error(w, "Internal Server Error", http.StatusInternalServerError)
		return
	}

	response := types.GetResourceMetadataResponse{
		OwnerUUID:   metadata.OwnerUUID,
		GroupUUID:   metadata.GroupUUID,
		Permissions: metadata.Permissions,
		TTL:         metadata.TTL,
		CreatedAt:   metadata.CreatedAt,
		UpdatedAt:   metadata.UpdatedAt,
	}

	w.Header().Set("Content-Type", "application/json")
	json.NewEncoder(w).Encode(response)

	s.logger.WithFields(logrus.Fields{
		"path":       path,
		"owner_uuid": metadata.OwnerUUID,
		"group_uuid": metadata.GroupUUID,
	}).Info("Resource metadata updated")
}

// isClusterMember checks if request is from a cluster member
func (s *Server) isClusterMember(remoteAddr string) bool {
	host, _, err := net.SplitHostPort(remoteAddr)
@@ -1097,102 +1195,6 @@ func (s *Server) getSpecificRevisionHandler(w http.ResponseWriter, r *http.Reque
	json.NewEncoder(w).Encode(storedValue)
}

-// getKeyListHandler handles _ls endpoint for direct children
-func (s *Server) getKeyListHandler(w http.ResponseWriter, r *http.Request) {
-	vars := mux.Vars(r)
-	path := "/" + vars["path"] // Ensure leading slash for consistency
-
-	// Parse query params
-	limitStr := r.URL.Query().Get("limit")
-	limit := 100 // Default
-	if limitStr != "" {
-		if l, err := strconv.Atoi(limitStr); err == nil && l > 0 && l <= 1000 {
-			limit = l
-		}
-	}
-	includeMetadata := r.URL.Query().Get("include_metadata") == "true"
-
-	mode := s.getMode()
-	if mode == "syncing" {
-		http.Error(w, "Service Unavailable", http.StatusServiceUnavailable)
-		return
-	}
-
-	keys, err := s.merkleService.GetKeysInPrefix(path, limit)
-	if err != nil {
-		s.logger.WithError(err).WithField("path", path).Error("Failed to get keys in prefix")
-		http.Error(w, "Internal Server Error", http.StatusInternalServerError)
-		return
-	}
-
-	response := KeyListResponse{
-		Path:     path,
-		Children: make([]struct{ Subkey string; Timestamp int64 }, len(keys)),
-		Total:    len(keys),
-	}
-
-	for i, subkey := range keys {
-		fullKey := path + subkey
-		if includeMetadata {
-			ts, err := s.merkleService.getTimestampForKey(fullKey)
-			if err == nil {
-				response.Children[i].Timestamp = ts
-			}
-		}
-		response.Children[i].Subkey = subkey
-	}
-
-	if len(keys) >= limit {
-		response.Truncated = true
-	}
-
-	w.Header().Set("Content-Type", "application/json")
-	json.NewEncoder(w).Encode(response)
-}
-
-// getKeyTreeHandler handles _tree endpoint for recursive tree
-func (s *Server) getKeyTreeHandler(w http.ResponseWriter, r *http.Request) {
-	vars := mux.Vars(r)
-	path := "/" + vars["path"]
-
-	// Parse query params
-	depthStr := r.URL.Query().Get("depth")
-	maxDepth := 0 // Unlimited
-	if depthStr != "" {
-		if d, err := strconv.Atoi(depthStr); err == nil && d > 0 {
-			maxDepth = d
-		}
-	}
-	limitStr := r.URL.Query().Get("limit")
-	limit := 500
-	if limitStr != "" {
-		if l, err := strconv.Atoi(limitStr); err == nil && l > 0 && l <= 5000 {
-			limit = l
-		}
-	}
-	includeMetadata := r.URL.Query().Get("include_metadata") == "true"
-
-	mode := s.getMode()
-	if mode == "syncing" {
-		http.Error(w, "Service Unavailable", http.StatusServiceUnavailable)
-		return
-	}
-
-	tree, err := s.merkleService.GetTreeForPrefix(path, maxDepth, limit)
-	if err != nil {
-		s.logger.WithError(err).WithField("path", path).Error("Failed to build tree")
-		http.Error(w, "Internal Server Error", http.StatusInternalServerError)
-		return
-	}
-
-	w.Header().Set("Content-Type", "application/json")
-	json.NewEncoder(w).Encode(tree)
-}

// calculateHash computes SHA256 hash of data
func calculateHash(data []byte) []byte {
	h := sha256.New()
@@ -1366,141 +1368,28 @@ func (s *Server) getSpecificRevision(key string, revision int) (*types.StoredVal
	return s.revisionService.GetSpecificRevision(key, revision)
}

-// getResourceMetadataHandler retrieves metadata for a resource path
-func (s *Server) getResourceMetadataHandler(w http.ResponseWriter, r *http.Request) {
-	vars := mux.Vars(r)
-	path := vars["path"]
-
-	authCtx := auth.GetAuthContext(r.Context())
-	if authCtx == nil {
-		http.Error(w, "Unauthorized", http.StatusUnauthorized)
-		return
-	}
-
-	// Check read permission on the resource
-	if !s.authService.CheckResourcePermission(authCtx, path, "read") {
-		http.Error(w, "Forbidden", http.StatusForbidden)
-		return
-	}
-
-	metadata, err := s.authService.GetResourceMetadata(path)
-	if err == badger.ErrKeyNotFound {
-		// Return default metadata if not found
-		defaultMetadata := types.ResourceMetadata{
-			OwnerUUID:   authCtx.UserUUID,
-			GroupUUID:   "",
-			Permissions: types.DefaultPermissions,
-			CreatedAt:   time.Now().Unix(),
-			UpdatedAt:   time.Now().Unix(),
-		}
-		metadata = &defaultMetadata
-	} else if err != nil {
-		s.logger.WithError(err).WithField("path", path).Error("Failed to get resource metadata")
-		http.Error(w, "Internal Server Error", http.StatusInternalServerError)
-		return
-	}
-
-	response := types.GetResourceMetadataResponse{
-		OwnerUUID:   metadata.OwnerUUID,
-		GroupUUID:   metadata.GroupUUID,
-		Permissions: metadata.Permissions,
-		TTL:         metadata.TTL,
-		CreatedAt:   metadata.CreatedAt,
-		UpdatedAt:   metadata.UpdatedAt,
-	}
-
-	w.Header().Set("Content-Type", "application/json")
-	json.NewEncoder(w).Encode(response)
-}
-
-// updateResourceMetadataHandler updates metadata for a resource path
-func (s *Server) updateResourceMetadataHandler(w http.ResponseWriter, r *http.Request) {
-	vars := mux.Vars(r)
-	path := vars["path"]
-
-	authCtx := auth.GetAuthContext(r.Context())
-	if authCtx == nil {
-		http.Error(w, "Unauthorized", http.StatusUnauthorized)
-		return
-	}
-
-	// Check write permission on the resource (owner write required for metadata changes)
-	if !s.authService.CheckResourcePermission(authCtx, path, "write") {
-		http.Error(w, "Forbidden", http.StatusForbidden)
-		return
-	}
-
-	var req types.UpdateResourceMetadataRequest
-	if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
-		http.Error(w, "Bad Request", http.StatusBadRequest)
-		return
-	}
-
-	// Get current metadata (or default if not exists)
-	currentMetadata, err := s.authService.GetResourceMetadata(path)
-	if err == badger.ErrKeyNotFound {
-		currentMetadata = &types.ResourceMetadata{
-			OwnerUUID:   authCtx.UserUUID,
-			GroupUUID:   "",
-			Permissions: types.DefaultPermissions,
-			CreatedAt:   time.Now().Unix(),
-			UpdatedAt:   time.Now().Unix(),
-		}
-	} else if err != nil {
-		s.logger.WithError(err).WithField("path", path).Error("Failed to get current resource metadata")
-		http.Error(w, "Internal Server Error", http.StatusInternalServerError)
-		return
-	}
-
-	// Apply updates only to provided fields
-	updated := false
-	if req.OwnerUUID != "" {
-		currentMetadata.OwnerUUID = req.OwnerUUID
-		updated = true
-	}
-	if req.GroupUUID != "" {
-		currentMetadata.GroupUUID = req.GroupUUID
-		updated = true
-	}
-	if req.Permissions != 0 {
-		currentMetadata.Permissions = req.Permissions
-		updated = true
-	}
-	if req.TTL != "" {
-		currentMetadata.TTL = req.TTL
-		updated = true
-	}
-
-	if !updated {
-		http.Error(w, "No fields provided for update", http.StatusBadRequest)
-		return
-	}
-
-	// Store updated metadata
-	if err := s.authService.StoreResourceMetadata(path, currentMetadata); err != nil {
-		s.logger.WithError(err).WithField("path", path).Error("Failed to store resource metadata")
-		http.Error(w, "Internal Server Error", http.StatusInternalServerError)
-		return
-	}
-
-	response := types.GetResourceMetadataResponse{
-		OwnerUUID:   currentMetadata.OwnerUUID,
-		GroupUUID:   currentMetadata.GroupUUID,
-		Permissions: currentMetadata.Permissions,
-		TTL:         currentMetadata.TTL,
-		CreatedAt:   currentMetadata.CreatedAt,
-		UpdatedAt:   currentMetadata.UpdatedAt,
-	}
-
-	w.Header().Set("Content-Type", "application/json")
-	w.WriteHeader(http.StatusOK)
-	json.NewEncoder(w).Encode(response)
-
-	s.logger.WithFields(logrus.Fields{
-		"path":        path,
-		"user_uuid":   authCtx.UserUUID,
-		"owner_uuid":  currentMetadata.OwnerUUID,
-		"group_uuid":  currentMetadata.GroupUUID,
-		"permissions": currentMetadata.Permissions,
-	}).Info("Resource metadata updated")
-}
+// clusterBootstrapHandler provides the cluster secret to authenticated administrators (Issue #13)
+func (s *Server) clusterBootstrapHandler(w http.ResponseWriter, r *http.Request) {
+	// Ensure clustering is enabled
+	if !s.config.ClusteringEnabled {
+		http.Error(w, "Clustering is disabled", http.StatusServiceUnavailable)
+		return
+	}
+
+	// Ensure cluster secret is configured
+	if s.config.ClusterSecret == "" {
+		s.logger.Error("Cluster secret is not configured")
+		http.Error(w, "Cluster secret is not configured", http.StatusInternalServerError)
+		return
+	}
+
+	// Return the cluster secret for secure bootstrap
+	response := map[string]string{
+		"cluster_secret": s.config.ClusterSecret,
+	}
+
+	s.logger.WithField("remote_addr", r.RemoteAddr).Info("Cluster secret retrieved for bootstrap")
+
+	w.Header().Set("Content-Type", "application/json")
+	json.NewEncoder(w).Encode(response)
+}
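Example (editorial, not part of the diff): the client side of the Issue #13 bootstrap flow that clusterBootstrapHandler serves. The endpoint path and the cluster_secret response field come from this diff; the base URL, token placeholder, and surrounding program are illustrative.

```go
package main

import (
	"encoding/json"
	"fmt"
	"net/http"
)

// fetchClusterSecret retrieves the cluster secret with an admin bearer
// token, as a new node operator would before writing its config.
func fetchClusterSecret(baseURL, adminToken string) (string, error) {
	req, err := http.NewRequest("GET", baseURL+"/auth/cluster-bootstrap", nil)
	if err != nil {
		return "", err
	}
	req.Header.Set("Authorization", "Bearer "+adminToken)
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return "", err
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		return "", fmt.Errorf("bootstrap failed: %s", resp.Status)
	}
	var body struct {
		ClusterSecret string `json:"cluster_secret"`
	}
	if err := json.NewDecoder(resp.Body).Decode(&body); err != nil {
		return "", err
	}
	return body.ClusterSecret, nil
}

func main() {
	secret, err := fetchClusterSecret("http://localhost:8080", "REPLACE_WITH_ADMIN_TOKEN")
	if err != nil {
		panic(err)
	}
	fmt.Println("cluster_secret:", secret)
}
```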
102 server/routes.go
@@ -1,6 +1,8 @@
package server

import (
	"net/http"

	"github.com/gorilla/mux"
)

@@ -11,6 +13,18 @@ func (s *Server) setupRoutes() *mux.Router {
	// Health endpoint (always available)
	router.HandleFunc("/health", s.healthHandler).Methods("GET")

	// Resource Metadata Management endpoints (Issue #12) - Must come BEFORE general KV routes
	// These need to be registered first to prevent /kv/{path:.+} from matching metadata paths
	if s.config.AuthEnabled {
		router.Handle("/kv/{path:.+}/metadata", s.authService.Middleware(
			[]string{"admin:users:read"}, nil, "",
		)(s.getResourceMetadataHandler)).Methods("GET")

		router.Handle("/kv/{path:.+}/metadata", s.authService.Middleware(
			[]string{"admin:users:update"}, nil, "",
		)(s.updateResourceMetadataHandler)).Methods("PUT")
	}

	// KV endpoints (with conditional authentication based on anonymous access settings)
	// GET endpoint - require auth if anonymous read is disabled
	if s.config.AuthEnabled && !s.config.AllowAnonymousRead {
@@ -20,7 +34,7 @@ func (s *Server) setupRoutes() *mux.Router {
	} else {
		router.HandleFunc("/kv/{path:.+}", s.getKVHandler).Methods("GET")
	}

	// PUT endpoint - require auth if anonymous write is disabled
	if s.config.AuthEnabled && !s.config.AllowAnonymousWrite {
		router.Handle("/kv/{path:.+}", s.authService.Middleware(
@@ -29,7 +43,7 @@ func (s *Server) setupRoutes() *mux.Router {
	} else {
		router.HandleFunc("/kv/{path:.+}", s.putKVHandler).Methods("PUT")
	}

	// DELETE endpoint - always require authentication (no anonymous delete)
	if s.config.AuthEnabled {
		router.Handle("/kv/{path:.+}", s.authService.Middleware(
@@ -39,52 +53,34 @@ func (s *Server) setupRoutes() *mux.Router {
		router.HandleFunc("/kv/{path:.+}", s.deleteKVHandler).Methods("DELETE")
	}

-	// Resource Metadata endpoints (available when auth is enabled)
-	if s.config.AuthEnabled {
-		// GET metadata - require read permission
-		router.Handle("/kv/{path:.+}/metadata", s.authService.Middleware(
-			[]string{"read"}, func(r *http.Request) string { return mux.Vars(r)["path"] }, "read",
-		)(s.getResourceMetadataHandler)).Methods("GET")
-
-		// PUT metadata - require write permission (owner write)
-		router.Handle("/kv/{path:.+}/metadata", s.authService.Middleware(
-			[]string{"write"}, func(r *http.Request) string { return mux.Vars(r)["path"] }, "write",
-		)(s.updateResourceMetadataHandler)).Methods("PUT")
-	}
-
-	// Key listing endpoints (read-only, leverage Merkle tree)
-	if s.config.ClusteringEnabled { // Require Merkle for efficiency
-		// _ls endpoint - require read if auth enabled and not anonymous
-		if s.config.AuthEnabled && !s.config.AllowAnonymousRead {
-			router.Handle("/kv/{path:.+}/_ls", s.authService.Middleware(
-				[]string{"read"}, nil, "",
-			)(s.getKeyListHandler)).Methods("GET")
-		} else {
-			router.HandleFunc("/kv/{path:.+}/_ls", s.getKeyListHandler).Methods("GET")
-		}
-
-		// _tree endpoint - same auth rules
-		if s.config.AuthEnabled && !s.config.AllowAnonymousRead {
-			router.Handle("/kv/{path:.+}/_tree", s.authService.Middleware(
-				[]string{"read"}, nil, "",
-			)(s.getKeyTreeHandler)).Methods("GET")
-		} else {
-			router.HandleFunc("/kv/{path:.+}/_tree", s.getKeyTreeHandler).Methods("GET")
-		}
-	}

	// Member endpoints (available when clustering is enabled)
	if s.config.ClusteringEnabled {
+		// GET /members/ is unprotected for monitoring/inspection
		router.HandleFunc("/members/", s.getMembersHandler).Methods("GET")
-		router.HandleFunc("/members/join", s.joinMemberHandler).Methods("POST")
-		router.HandleFunc("/members/leave", s.leaveMemberHandler).Methods("DELETE")
-		router.HandleFunc("/members/gossip", s.gossipHandler).Methods("POST")
-		router.HandleFunc("/members/pairs_by_time", s.pairsByTimeHandler).Methods("POST")
-
-		// Merkle Tree endpoints (clustering feature)
-		router.HandleFunc("/merkle_tree/root", s.getMerkleRootHandler).Methods("GET")
-		router.HandleFunc("/merkle_tree/diff", s.getMerkleDiffHandler).Methods("POST")
-		router.HandleFunc("/kv_range", s.getKVRangeHandler).Methods("POST")
+
+		// Apply cluster authentication middleware to all cluster communication endpoints
+		if s.clusterAuthService != nil {
+			router.Handle("/members/join", s.clusterAuthService.Middleware(http.HandlerFunc(s.joinMemberHandler))).Methods("POST")
+			router.Handle("/members/leave", s.clusterAuthService.Middleware(http.HandlerFunc(s.leaveMemberHandler))).Methods("DELETE")
+			router.Handle("/members/gossip", s.clusterAuthService.Middleware(http.HandlerFunc(s.gossipHandler))).Methods("POST")
+			router.Handle("/members/pairs_by_time", s.clusterAuthService.Middleware(http.HandlerFunc(s.pairsByTimeHandler))).Methods("POST")
+
+			// Merkle Tree endpoints (clustering feature)
+			router.Handle("/merkle_tree/root", s.clusterAuthService.Middleware(http.HandlerFunc(s.getMerkleRootHandler))).Methods("GET")
+			router.Handle("/merkle_tree/diff", s.clusterAuthService.Middleware(http.HandlerFunc(s.getMerkleDiffHandler))).Methods("POST")
+			router.Handle("/kv_range", s.clusterAuthService.Middleware(http.HandlerFunc(s.getKVRangeHandler))).Methods("POST")
+		} else {
+			// Fallback to unprotected endpoints (for backwards compatibility)
+			router.HandleFunc("/members/join", s.joinMemberHandler).Methods("POST")
+			router.HandleFunc("/members/leave", s.leaveMemberHandler).Methods("DELETE")
+			router.HandleFunc("/members/gossip", s.gossipHandler).Methods("POST")
+			router.HandleFunc("/members/pairs_by_time", s.pairsByTimeHandler).Methods("POST")
+
+			// Merkle Tree endpoints (clustering feature)
+			router.HandleFunc("/merkle_tree/root", s.getMerkleRootHandler).Methods("GET")
+			router.HandleFunc("/merkle_tree/diff", s.getMerkleDiffHandler).Methods("POST")
+			router.HandleFunc("/kv_range", s.getKVRangeHandler).Methods("POST")
+		}
	}

	// Authentication and user management endpoints (available when auth is enabled)
@@ -93,15 +89,15 @@ func (s *Server) setupRoutes() *mux.Router {
	router.Handle("/api/users", s.authService.Middleware(
		[]string{"admin:users:create"}, nil, "",
	)(s.createUserHandler)).Methods("POST")

	router.Handle("/api/users/{uuid}", s.authService.Middleware(
		[]string{"admin:users:read"}, nil, "",
	)(s.getUserHandler)).Methods("GET")

	router.Handle("/api/users/{uuid}", s.authService.Middleware(
		[]string{"admin:users:update"}, nil, "",
	)(s.updateUserHandler)).Methods("PUT")

	router.Handle("/api/users/{uuid}", s.authService.Middleware(
		[]string{"admin:users:delete"}, nil, "",
	)(s.deleteUserHandler)).Methods("DELETE")
@@ -110,15 +106,15 @@ func (s *Server) setupRoutes() *mux.Router {
	router.Handle("/api/groups", s.authService.Middleware(
		[]string{"admin:groups:create"}, nil, "",
	)(s.createGroupHandler)).Methods("POST")

	router.Handle("/api/groups/{uuid}", s.authService.Middleware(
		[]string{"admin:groups:read"}, nil, "",
	)(s.getGroupHandler)).Methods("GET")

	router.Handle("/api/groups/{uuid}", s.authService.Middleware(
		[]string{"admin:groups:update"}, nil, "",
	)(s.updateGroupHandler)).Methods("PUT")

	router.Handle("/api/groups/{uuid}", s.authService.Middleware(
		[]string{"admin:groups:delete"}, nil, "",
	)(s.deleteGroupHandler)).Methods("DELETE")
@@ -127,6 +123,12 @@ func (s *Server) setupRoutes() *mux.Router {
	router.Handle("/api/tokens", s.authService.Middleware(
		[]string{"admin:tokens:create"}, nil, "",
	)(s.createTokenHandler)).Methods("POST")

	// Cluster Bootstrap endpoint (Issue #13) - Protected by JWT authentication
	// Allows authenticated administrators to retrieve the cluster secret for new nodes
	router.Handle("/auth/cluster-bootstrap", s.authService.Middleware(
		[]string{"admin:tokens:create"}, nil, "",
	)(s.clusterBootstrapHandler)).Methods("GET")
}

	// Revision History endpoints (available when revision history is enabled)
@@ -50,7 +50,8 @@ type Server struct {
|
|||||||
backupMu sync.RWMutex // Protects backup status
|
backupMu sync.RWMutex // Protects backup status
|
||||||
|
|
||||||
// Authentication service
|
// Authentication service
|
||||||
authService *auth.AuthService
|
authService *auth.AuthService
|
||||||
|
clusterAuthService *auth.ClusterAuthService
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewServer initializes and returns a new Server instance
|
// NewServer initializes and returns a new Server instance
|
||||||
@@ -120,6 +121,11 @@ func NewServer(config *types.Config) (*Server, error) {
|
|||||||
// Initialize authentication service
|
// Initialize authentication service
|
||||||
server.authService = auth.NewAuthService(db, logger, config)
|
server.authService = auth.NewAuthService(db, logger, config)
|
||||||
|
|
||||||
|
// Initialize cluster authentication service (Issue #13)
|
||||||
|
if config.ClusteringEnabled {
|
||||||
|
server.clusterAuthService = auth.NewClusterAuthService(config.ClusterSecret, logger)
|
||||||
|
}
|
||||||
|
|
||||||
// Setup initial root account if needed (Issue #3)
|
// Setup initial root account if needed (Issue #3)
|
||||||
if config.AuthEnabled {
|
if config.AuthEnabled {
|
||||||
if err := server.setupRootAccount(); err != nil {
|
if err := server.setupRootAccount(); err != nil {
|
||||||
@@ -219,7 +225,7 @@ func (s *Server) setupRootAccount() error {
|
|||||||
func (s *Server) createRootUserAndToken() error {
|
func (s *Server) createRootUserAndToken() error {
|
||||||
rootNickname := "root"
|
rootNickname := "root"
|
||||||
adminGroupName := "admin"
|
adminGroupName := "admin"
|
||||||
|
|
||||||
// Generate UUIDs
|
// Generate UUIDs
|
||||||
rootUserUUID := "root-" + time.Now().Format("20060102-150405")
|
rootUserUUID := "root-" + time.Now().Format("20060102-150405")
|
||||||
adminGroupUUID := "admin-" + time.Now().Format("20060102-150405")
|
adminGroupUUID := "admin-" + time.Now().Format("20060102-150405")
|
||||||
@@ -234,7 +240,7 @@ func (s *Server) createRootUserAndToken() error {
        UpdatedAt: now,
    }

    // Create root user
    rootUser := types.User{
        UUID:         rootUserUUID,
        NicknameHash: hashUserNickname(rootNickname),
@@ -251,7 +257,7 @@ func (s *Server) createRootUserAndToken() error {
    // Create API token with full administrative scopes
    adminScopes := []string{
        "admin:users:create", "admin:users:read", "admin:users:update", "admin:users:delete",
        "admin:groups:create", "admin:groups:read", "admin:groups:update", "admin:groups:delete",
        "admin:tokens:create", "admin:tokens:revoke",
        "read", "write", "delete",
    }
@@ -269,13 +275,13 @@ func (s *Server) createRootUserAndToken() error {

    // Log the token securely (one-time display)
    s.logger.WithFields(logrus.Fields{
        "user_uuid":  rootUserUUID,
        "group_uuid": adminGroupUUID,
        "expires_at": time.Unix(expiresAt, 0).Format(time.RFC3339),
        "expires_in": "24 hours",
    }).Warn("Root account created - SAVE THIS TOKEN:")

    // Display token prominently
    fmt.Printf("\n" + strings.Repeat("=", 80) + "\n")
    fmt.Printf("🔐 ROOT ACCOUNT CREATED - INITIAL SETUP TOKEN\n")
    fmt.Printf("===========================================\n")
@@ -309,7 +315,7 @@ func (s *Server) storeUserAndGroup(user *types.User, group *types.Group) error {
        if err != nil {
            return fmt.Errorf("failed to marshal user data: %v", err)
        }

        if err := txn.Set([]byte(auth.UserStorageKey(user.UUID)), userData); err != nil {
            return fmt.Errorf("failed to store user: %v", err)
        }
@@ -319,7 +325,7 @@ func (s *Server) storeUserAndGroup(user *types.User, group *types.Group) error {
        if err != nil {
            return fmt.Errorf("failed to marshal group data: %v", err)
        }

        if err := txn.Set([]byte(auth.GroupStorageKey(group.UUID)), groupData); err != nil {
            return fmt.Errorf("failed to store group: %v", err)
        }
@@ -327,4 +333,3 @@ func (s *Server) storeUserAndGroup(user *types.User, group *types.Group) error {
        return nil
    })
}
-
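storeUserAndGroup writes both records inside a single db.Update transaction, so the root user and admin group are created atomically: either both Sets commit or neither does. A runnable sketch of the same idea, assuming Badger's v3 import path and illustrative "user:"/"group:" key prefixes (the repository derives its keys via UserStorageKey/GroupStorageKey):

package main

import (
    "log"

    badger "github.com/dgraph-io/badger/v3"
)

func main() {
    db, err := badger.Open(badger.DefaultOptions("/tmp/kvs-demo"))
    if err != nil {
        log.Fatal(err)
    }
    defer db.Close()

    // Both Sets commit together or not at all, mirroring storeUserAndGroup.
    err = db.Update(func(txn *badger.Txn) error {
        if err := txn.Set([]byte("user:root-uuid"), []byte(`{"uuid":"root-uuid"}`)); err != nil {
            return err
        }
        return txn.Set([]byte("group:admin-uuid"), []byte(`{"uuid":"admin-uuid"}`))
    })
    if err != nil {
        log.Fatal(err)
    }
}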
@@ -2,7 +2,7 @@ package storage

import (
    "fmt"

    "github.com/klauspost/compress/zstd"
)

@@ -57,4 +57,4 @@ func (c *CompressionService) DecompressData(compressedData []byte) ([]byte, error) {
        return nil, fmt.Errorf("decompressor not initialized")
    }
    return c.decompressor.DecodeAll(compressedData, nil)
}
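The compression service uses the stateless EncodeAll/DecodeAll API from klauspost/compress, the same DecodeAll call visible above. A self-contained round-trip sketch:

package main

import (
    "fmt"

    "github.com/klauspost/compress/zstd"
)

func main() {
    // A nil writer/reader selects the stateless EncodeAll/DecodeAll mode,
    // matching how CompressionService uses the library.
    enc, _ := zstd.NewWriter(nil)
    dec, _ := zstd.NewReader(nil)
    defer enc.Close()
    defer dec.Close()

    plain := []byte(`{"hello":"world"}`)
    compressed := enc.EncodeAll(plain, nil)
    restored, err := dec.DecodeAll(compressed, nil)
    if err != nil {
        panic(err)
    }
    fmt.Println(string(restored)) // {"hello":"world"}
}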
@@ -34,10 +34,10 @@ func GetRevisionKey(baseKey string, revision int) string {
func (r *RevisionService) StoreRevisionHistory(txn *badger.Txn, key string, storedValue types.StoredValue, ttl time.Duration) error {
    // Get existing metadata to check current revisions
    metadataKey := auth.ResourceMetadataKey(key)

    var metadata types.ResourceMetadata
    var currentRevisions []int

    // Try to get existing metadata
    metadataData, err := r.storage.RetrieveWithDecompression(txn, []byte(metadataKey))
    if err == badger.ErrKeyNotFound {
@@ -60,7 +60,7 @@ func (r *RevisionService) StoreRevisionHistory(txn *badger.Txn, key string, storedValue types.StoredValue, ttl time.Duration) error {
        if err != nil {
            return fmt.Errorf("failed to unmarshal metadata: %v", err)
        }

        // Extract current revisions (we store them as a custom field)
        if metadata.TTL == "" {
            currentRevisions = []int{}
@@ -69,13 +69,13 @@ func (r *RevisionService) StoreRevisionHistory(txn *badger.Txn, key string, storedValue types.StoredValue, ttl time.Duration) error {
            currentRevisions = []int{1, 2, 3} // Assume all revisions exist for existing keys
        }
    }

    // Revision rotation logic: shift existing revisions
    if len(currentRevisions) >= 3 {
        // Delete oldest revision (rev:3)
        oldestRevKey := GetRevisionKey(key, 3)
        txn.Delete([]byte(oldestRevKey))

        // Shift rev:2 → rev:3
        rev2Key := GetRevisionKey(key, 2)
        rev2Data, err := r.storage.RetrieveWithDecompression(txn, []byte(rev2Key))
@@ -83,8 +83,8 @@ func (r *RevisionService) StoreRevisionHistory(txn *badger.Txn, key string, storedValue types.StoredValue, ttl time.Duration) error {
            rev3Key := GetRevisionKey(key, 3)
            r.storage.StoreWithTTL(txn, []byte(rev3Key), rev2Data, ttl)
        }

        // Shift rev:1 → rev:2
        rev1Key := GetRevisionKey(key, 1)
        rev1Data, err := r.storage.RetrieveWithDecompression(txn, []byte(rev1Key))
        if err == nil {
@@ -92,80 +92,80 @@ func (r *RevisionService) StoreRevisionHistory(txn *badger.Txn, key string, storedValue types.StoredValue, ttl time.Duration) error {
            r.storage.StoreWithTTL(txn, []byte(rev2Key), rev1Data, ttl)
        }
    }

    // Store current value as rev:1
    currentValueBytes, err := json.Marshal(storedValue)
    if err != nil {
        return fmt.Errorf("failed to marshal current value for revision: %v", err)
    }

    rev1Key := GetRevisionKey(key, 1)
    err = r.storage.StoreWithTTL(txn, []byte(rev1Key), currentValueBytes, ttl)
    if err != nil {
        return fmt.Errorf("failed to store revision 1: %v", err)
    }

    // Update metadata with new revision count
    metadata.UpdatedAt = time.Now().Unix()
    metadataBytes, err := json.Marshal(metadata)
    if err != nil {
        return fmt.Errorf("failed to marshal metadata: %v", err)
    }

    return r.storage.StoreWithTTL(txn, []byte(metadataKey), metadataBytes, ttl)
}
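The rotation keeps a fixed window of three revisions: rev 3 is discarded, rev 2 becomes rev 3, rev 1 becomes rev 2, and the incoming value is stored as rev 1. The same scheme in a storage-free sketch, independent of Badger:

package main

import "fmt"

// rotate pushes a new value into a fixed window of three revisions,
// mirroring StoreRevisionHistory: rev 3 is discarded, 2 -> 3, 1 -> 2,
// and the new value becomes revision 1.
func rotate(revs map[int][]byte, current []byte) {
    delete(revs, 3)
    if v, ok := revs[2]; ok {
        revs[3] = v
    }
    if v, ok := revs[1]; ok {
        revs[2] = v
    }
    revs[1] = current
}

func main() {
    revs := map[int][]byte{}
    for _, v := range []string{"a", "b", "c", "d"} {
        rotate(revs, []byte(v))
    }
    fmt.Println(string(revs[1]), string(revs[2]), string(revs[3])) // d c b
}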

// GetRevisionHistory retrieves all available revisions for a given key
func (r *RevisionService) GetRevisionHistory(key string) ([]map[string]interface{}, error) {
    var revisions []map[string]interface{}

    err := r.storage.db.View(func(txn *badger.Txn) error {
        // Check revisions 1, 2, 3
        for rev := 1; rev <= 3; rev++ {
            revKey := GetRevisionKey(key, rev)

            revData, err := r.storage.RetrieveWithDecompression(txn, []byte(revKey))
            if err == badger.ErrKeyNotFound {
                continue // Skip missing revisions
            } else if err != nil {
                return fmt.Errorf("failed to retrieve revision %d: %v", rev, err)
            }

            var storedValue types.StoredValue
            err = json.Unmarshal(revData, &storedValue)
            if err != nil {
                return fmt.Errorf("failed to unmarshal revision %d: %v", rev, err)
            }

            var data interface{}
            err = json.Unmarshal(storedValue.Data, &data)
            if err != nil {
                return fmt.Errorf("failed to unmarshal revision %d data: %v", rev, err)
            }

            revision := map[string]interface{}{
                "revision":  rev,
                "uuid":      storedValue.UUID,
                "timestamp": storedValue.Timestamp,
                "data":      data,
            }

            revisions = append(revisions, revision)
        }

        return nil
    })

    if err != nil {
        return nil, err
    }

    // Sort revisions by revision number (newest first)
    // Note: they're already in order since we iterate 1->3, but reverse for newest first
    for i, j := 0, len(revisions)-1; i < j; i, j = i+1, j-1 {
        revisions[i], revisions[j] = revisions[j], revisions[i]
    }

    return revisions, nil
}

@@ -174,23 +174,23 @@ func (r *RevisionService) GetSpecificRevision(key string, revision int) (*types.StoredValue, error) {
    if revision < 1 || revision > 3 {
        return nil, fmt.Errorf("invalid revision number: %d (must be 1-3)", revision)
    }

    var storedValue types.StoredValue
    err := r.storage.db.View(func(txn *badger.Txn) error {
        revKey := GetRevisionKey(key, revision)

        revData, err := r.storage.RetrieveWithDecompression(txn, []byte(revKey))
        if err != nil {
            return err
        }

        return json.Unmarshal(revData, &storedValue)
    })

    if err != nil {
        return nil, err
    }

    return &storedValue, nil
}

@@ -200,15 +200,15 @@ func GetRevisionFromPath(path string) (string, int, error) {
    if len(parts) < 4 || parts[len(parts)-2] != "rev" {
        return "", 0, fmt.Errorf("invalid revision path format")
    }

    revisionStr := parts[len(parts)-1]
    revision, err := strconv.Atoi(revisionStr)
    if err != nil {
        return "", 0, fmt.Errorf("invalid revision number: %s", revisionStr)
    }

    // Reconstruct the base key without the "/rev/N" suffix
    baseKey := strings.Join(parts[:len(parts)-2], "/")

    return baseKey, revision, nil
}
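A self-contained restatement of that parsing rule with a worked input: "app/config/rev/2" splits into base key "app/config" and revision 2. The helper name is illustrative; the logic mirrors GetRevisionFromPath above:

package main

import (
    "fmt"
    "strconv"
    "strings"
)

// parseRevisionPath mirrors GetRevisionFromPath: a key of the form
// "<base>/rev/<n>" splits into the base key and the revision number.
func parseRevisionPath(path string) (string, int, error) {
    parts := strings.Split(path, "/")
    if len(parts) < 4 || parts[len(parts)-2] != "rev" {
        return "", 0, fmt.Errorf("invalid revision path format")
    }
    rev, err := strconv.Atoi(parts[len(parts)-1])
    if err != nil {
        return "", 0, fmt.Errorf("invalid revision number: %s", parts[len(parts)-1])
    }
    return strings.Join(parts[:len(parts)-2], "/"), rev, nil
}

func main() {
    base, rev, err := parseRevisionPath("app/config/rev/2")
    fmt.Println(base, rev, err) // app/config 2 <nil>
}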
@@ -12,17 +12,17 @@ import (

// StorageService handles all BadgerDB operations and data management
type StorageService struct {
    db             *badger.DB
    config         *types.Config
    compressionSvc *CompressionService
    logger         *logrus.Logger
}

// NewStorageService creates a new storage service
func NewStorageService(db *badger.DB, config *types.Config, logger *logrus.Logger) (*StorageService, error) {
    var compressionSvc *CompressionService
    var err error

    // Initialize compression if enabled
    if config.CompressionEnabled {
        compressionSvc, err = NewCompressionService()
@@ -50,7 +50,7 @@ func (s *StorageService) Close() {
func (s *StorageService) StoreWithTTL(txn *badger.Txn, key []byte, data []byte, ttl time.Duration) error {
    var finalData []byte
    var err error

    // Compress data if compression is enabled
    if s.config.CompressionEnabled && s.compressionSvc != nil {
        finalData, err = s.compressionSvc.CompressData(data)
@@ -60,14 +60,14 @@ func (s *StorageService) StoreWithTTL(txn *badger.Txn, key []byte, data []byte, ttl time.Duration) error {
    } else {
        finalData = data
    }

    entry := badger.NewEntry(key, finalData)

    // Apply TTL if specified
    if ttl > 0 {
        entry = entry.WithTTL(ttl)
    }

    return txn.SetEntry(entry)
}

@@ -77,7 +77,7 @@ func (s *StorageService) RetrieveWithDecompression(txn *badger.Txn, key []byte) ([]byte, error) {
    if err != nil {
        return nil, err
    }

    var compressedData []byte
    err = item.Value(func(val []byte) error {
        compressedData = append(compressedData, val...)
@@ -86,12 +86,12 @@ func (s *StorageService) RetrieveWithDecompression(txn *badger.Txn, key []byte) ([]byte, error) {
    if err != nil {
        return nil, err
    }

    // Decompress data if compression is enabled
    if s.config.CompressionEnabled && s.compressionSvc != nil {
        return s.compressionSvc.DecompressData(compressedData)
    }

    return compressedData, nil
}

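The `append(compressedData, val...)` copy inside item.Value matters: Badger only guarantees the value bytes while the callback runs, so the data must be copied out before the transaction ends. A sketch of the equivalent using ValueCopy, again assuming Badger's v3 import path:

package main

import (
    "fmt"
    "log"

    badger "github.com/dgraph-io/badger/v3"
)

func main() {
    db, err := badger.Open(badger.DefaultOptions("/tmp/kvs-demo2"))
    if err != nil {
        log.Fatal(err)
    }
    defer db.Close()

    _ = db.Update(func(txn *badger.Txn) error {
        return txn.Set([]byte("k"), []byte("v"))
    })

    _ = db.View(func(txn *badger.Txn) error {
        item, err := txn.Get([]byte("k"))
        if err != nil {
            return err
        }
        // ValueCopy returns a copy that stays valid after the transaction
        // ends - the same reason RetrieveWithDecompression appends into
        // its own buffer inside item.Value.
        val, err := item.ValueCopy(nil)
        if err != nil {
            return err
        }
        fmt.Println(string(val))
        return nil
    })
}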
@@ -109,4 +109,4 @@ func (s *StorageService) DecompressData(compressedData []byte) ([]byte, error) {
        return compressedData, nil
    }
    return s.compressionSvc.DecompressData(compressedData)
}

151 types/types.go
@@ -13,20 +13,20 @@ type StoredValue struct {

// User represents a system user
type User struct {
    UUID         string   `json:"uuid"`          // Server-generated UUID
    NicknameHash string   `json:"nickname_hash"` // SHA3-512 hash of nickname
    Groups       []string `json:"groups"`        // List of group UUIDs this user belongs to
    CreatedAt    int64    `json:"created_at"`    // Unix timestamp
    UpdatedAt    int64    `json:"updated_at"`    // Unix timestamp
}

// Group represents a user group
type Group struct {
    UUID      string   `json:"uuid"`       // Server-generated UUID
    NameHash  string   `json:"name_hash"`  // SHA3-512 hash of group name
    Members   []string `json:"members"`    // List of user UUIDs in this group
    CreatedAt int64    `json:"created_at"` // Unix timestamp
    UpdatedAt int64    `json:"updated_at"` // Unix timestamp
}

// APIToken represents a JWT authentication token
@@ -40,12 +40,12 @@ type APIToken struct {

// ResourceMetadata contains ownership and permission information for stored resources
type ResourceMetadata struct {
    OwnerUUID   string `json:"owner_uuid"`  // UUID of the resource owner
    GroupUUID   string `json:"group_uuid"`  // UUID of the resource group
    Permissions int    `json:"permissions"` // 12-bit permission mask (POSIX-inspired)
    TTL         string `json:"ttl"`         // Time-to-live duration (Go format)
    CreatedAt   int64  `json:"created_at"`  // Unix timestamp when resource was created
    UpdatedAt   int64  `json:"updated_at"`  // Unix timestamp when resource was last updated
}

// Permission constants for POSIX-inspired ACL
@@ -55,19 +55,19 @@ const (
    PermOwnerDelete = 1 << 10
    PermOwnerWrite  = 1 << 9
    PermOwnerRead   = 1 << 8

    // Group permissions (bits 7-4)
    PermGroupCreate = 1 << 7
    PermGroupDelete = 1 << 6
    PermGroupWrite  = 1 << 5
    PermGroupRead   = 1 << 4

    // Others permissions (bits 3-0)
    PermOthersCreate = 1 << 3
    PermOthersDelete = 1 << 2
    PermOthersWrite  = 1 << 1
    PermOthersRead   = 1 << 0

    // Default permissions: Owner(1111), Group(0110), Others(0010)
    DefaultPermissions = (PermOwnerCreate | PermOwnerDelete | PermOwnerWrite | PermOwnerRead) |
        (PermGroupWrite | PermGroupRead) |
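One way such a 12-bit mask can be consulted is to check owner, then group, then others, in the usual POSIX order. The constants below mirror the ones above; the canRead helper is an illustration, not necessarily the repository's CheckPermission:

package main

import "fmt"

const (
    PermOwnerWrite  = 1 << 9
    PermOwnerRead   = 1 << 8
    PermGroupWrite  = 1 << 5
    PermGroupRead   = 1 << 4
    PermOthersWrite = 1 << 1
    PermOthersRead  = 1 << 0
)

// canRead walks owner -> group -> others, POSIX-style: the most specific
// matching class decides.
func canRead(mask int, isOwner, isGroupMember bool) bool {
    switch {
    case isOwner:
        return mask&PermOwnerRead != 0
    case isGroupMember:
        return mask&PermGroupRead != 0
    default:
        return mask&PermOthersRead != 0
    }
}

func main() {
    mask := PermOwnerRead | PermOwnerWrite | PermGroupRead
    fmt.Println(canRead(mask, true, false))  // true  (owner)
    fmt.Println(canRead(mask, false, true))  // true  (group member)
    fmt.Println(canRead(mask, false, false)) // false (others)
}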
@@ -131,14 +131,7 @@ type CreateTokenResponse struct {
    ExpiresAt int64 `json:"expires_at"`
}

-// Resource Metadata Management API structures
-type UpdateResourceMetadataRequest struct {
-    OwnerUUID   string `json:"owner_uuid,omitempty"`
-    GroupUUID   string `json:"group_uuid,omitempty"`
-    Permissions int    `json:"permissions,omitempty"`
-    TTL         string `json:"ttl,omitempty"`
-}
-
+// Resource Metadata Management API structures (Issue #12)
type GetResourceMetadataResponse struct {
    OwnerUUID string `json:"owner_uuid"`
    GroupUUID string `json:"group_uuid"`
@@ -148,6 +141,12 @@ type GetResourceMetadataResponse struct {
    UpdatedAt int64 `json:"updated_at"`
}

+type UpdateResourceMetadataRequest struct {
+    OwnerUUID   *string `json:"owner_uuid,omitempty"`
+    GroupUUID   *string `json:"group_uuid,omitempty"`
+    Permissions *int    `json:"permissions,omitempty"`
+}
+
// Cluster and member management types
type Member struct {
    ID string `json:"id"`
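The relocated UpdateResourceMetadataRequest above now uses pointer fields, so a handler can tell an omitted field apart from one explicitly set to its zero value, which a plain int or string cannot express. A runnable demonstration:

package main

import (
    "encoding/json"
    "fmt"
)

type UpdateResourceMetadataRequest struct {
    OwnerUUID   *string `json:"owner_uuid,omitempty"`
    GroupUUID   *string `json:"group_uuid,omitempty"`
    Permissions *int    `json:"permissions,omitempty"`
}

func main() {
    var req UpdateResourceMetadataRequest
    // permissions: 0 is a real value here, not "unset" - with a plain
    // int the two cases would be indistinguishable after unmarshalling.
    _ = json.Unmarshal([]byte(`{"permissions": 0}`), &req)

    fmt.Println(req.OwnerUUID == nil)   // true: owner_uuid was omitted
    fmt.Println(req.Permissions != nil) // true: permissions was sent
    fmt.Println(*req.Permissions)       // 0
}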
@@ -232,38 +231,6 @@ type MerkleTreeDiffResponse struct {
    Keys []string `json:"keys,omitempty"` // Actual keys if this is a leaf-level diff
}

-// KeyListResponse is the response for _ls endpoint
-type KeyListResponse struct {
-    Path     string `json:"path"`
-    Children []struct {
-        Subkey    string `json:"subkey"`
-        Timestamp int64  `json:"timestamp,omitempty"`
-    } `json:"children"`
-    Total     int  `json:"total"`
-    Truncated bool `json:"truncated"`
-}
-
-// KeyTreeResponse is the response for _tree endpoint
-type KeyTreeResponse struct {
-    Path      string        `json:"path"`
-    Children  []interface{} `json:"children"` // Mixed: either KeyTreeNode or KeyListItem for leaves
-    Total     int           `json:"total"`
-    Truncated bool          `json:"truncated"`
-}
-
-// KeyTreeNode represents a node in the tree
-type KeyTreeNode struct {
-    Subkey    string        `json:"subkey"`
-    Timestamp int64         `json:"timestamp,omitempty"`
-    Children  []interface{} `json:"children,omitempty"`
-}
-
-// KeyListItem represents a leaf in the tree (without children)
-type KeyListItem struct {
-    Subkey    string `json:"subkey"`
-    Timestamp int64  `json:"timestamp,omitempty"`
-}
-
// For fetching a range of KV pairs
type KVRangeRequest struct {
    StartKey string `json:"start_key"`
@@ -280,53 +247,57 @@ type KVRangeResponse struct {

// Configuration
type Config struct {
    NodeID               string   `yaml:"node_id"`
    BindAddress          string   `yaml:"bind_address"`
    Port                 int      `yaml:"port"`
    DataDir              string   `yaml:"data_dir"`
    SeedNodes            []string `yaml:"seed_nodes"`
    ReadOnly             bool     `yaml:"read_only"`
    LogLevel             string   `yaml:"log_level"`
    GossipIntervalMin    int      `yaml:"gossip_interval_min"`
    GossipIntervalMax    int      `yaml:"gossip_interval_max"`
    SyncInterval         int      `yaml:"sync_interval"`
    CatchupInterval      int      `yaml:"catchup_interval"`
    BootstrapMaxAgeHours int      `yaml:"bootstrap_max_age_hours"`
    ThrottleDelayMs      int      `yaml:"throttle_delay_ms"`
    FetchDelayMs         int      `yaml:"fetch_delay_ms"`

    // Database compression configuration
    CompressionEnabled bool `yaml:"compression_enabled"`
    CompressionLevel   int  `yaml:"compression_level"`

    // TTL configuration
    DefaultTTL  string `yaml:"default_ttl"`   // Go duration format, "0" means no default TTL
    MaxJSONSize int    `yaml:"max_json_size"` // Maximum JSON size in bytes

    // Rate limiting configuration
    RateLimitRequests int    `yaml:"rate_limit_requests"` // Max requests per window
    RateLimitWindow   string `yaml:"rate_limit_window"`   // Window duration (Go format)

    // Tamper-evident logging configuration
    TamperLogActions []string `yaml:"tamper_log_actions"` // Actions to log

    // Backup system configuration
    BackupEnabled   bool   `yaml:"backup_enabled"`   // Enable/disable automated backups
    BackupSchedule  string `yaml:"backup_schedule"`  // Cron schedule format
    BackupPath      string `yaml:"backup_path"`      // Directory to store backups
    BackupRetention int    `yaml:"backup_retention"` // Days to keep backups

    // Feature toggles for optional functionalities
    AuthEnabled            bool `yaml:"auth_enabled"`             // Enable/disable authentication system
    TamperLoggingEnabled   bool `yaml:"tamper_logging_enabled"`   // Enable/disable tamper-evident logging
    ClusteringEnabled      bool `yaml:"clustering_enabled"`       // Enable/disable clustering/gossip
    RateLimitingEnabled    bool `yaml:"rate_limiting_enabled"`    // Enable/disable rate limiting
    RevisionHistoryEnabled bool `yaml:"revision_history_enabled"` // Enable/disable revision history

    // Anonymous access control (Issue #5)
    AllowAnonymousRead  bool `yaml:"allow_anonymous_read"`  // Allow unauthenticated read access to KV endpoints
    AllowAnonymousWrite bool `yaml:"allow_anonymous_write"` // Allow unauthenticated write access to KV endpoints

-    // Key listing configuration
-    KeyListingEnabled bool `yaml:"key_listing_enabled"` // Enable/disable hierarchical key listing
+    // Cluster authentication (Issue #13)
+    ClusterSecret        string `yaml:"cluster_secret"`          // Shared secret for cluster authentication (auto-generated if empty)
+    ClusterTLSEnabled    bool   `yaml:"cluster_tls_enabled"`     // Require TLS for inter-node communication
+    ClusterTLSCertFile   string `yaml:"cluster_tls_cert_file"`   // Path to TLS certificate file
+    ClusterTLSKeyFile    string `yaml:"cluster_tls_key_file"`    // Path to TLS private key file
+    ClusterTLSSkipVerify bool   `yaml:"cluster_tls_skip_verify"` // Skip TLS verification (insecure, for testing only)
}
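A sketch of loading the new cluster-auth settings, assuming the project reads its YAML config with something like gopkg.in/yaml.v3 (the actual loader is not part of this diff); only a few of the fields added here are shown:

package main

import (
    "fmt"
    "log"

    "gopkg.in/yaml.v3"
)

// A pared-down Config with just the cluster-auth fields from this diff.
type Config struct {
    ClusteringEnabled bool   `yaml:"clustering_enabled"`
    ClusterSecret     string `yaml:"cluster_secret"`
    ClusterTLSEnabled bool   `yaml:"cluster_tls_enabled"`
}

func main() {
    raw := []byte(`
clustering_enabled: true
cluster_secret: ""   # empty: auto-generated at startup per the field comment
cluster_tls_enabled: false
`)
    var cfg Config
    if err := yaml.Unmarshal(raw, &cfg); err != nil {
        log.Fatal(err)
    }
    fmt.Printf("%+v\n", cfg)
}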
@@ -22,4 +22,4 @@ func HashGroupName(groupname string) string {

func HashToken(token string) string {
    return HashSHA3512(token)
}
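HashSHA3512 itself is not shown in this diff; a plausible sketch of such a helper using golang.org/x/crypto/sha3, offered as a guess rather than the repository's implementation:

package main

import (
    "encoding/hex"
    "fmt"

    "golang.org/x/crypto/sha3"
)

// hashSHA3512 is an assumption about what the repository's HashSHA3512
// does: a SHA3-512 digest, hex-encoded.
func hashSHA3512(s string) string {
    sum := sha3.Sum512([]byte(s))
    return hex.EncodeToString(sum[:])
}

func main() {
    fmt.Println(hashSHA3512("root")) // 128 hex characters
}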