Files
kalzu-value-store/server/server.go
2025-09-20 21:47:49 +03:00

282 lines
8.0 KiB
Go

package server
import (
"context"
"fmt"
"net/http"
"os"
"path/filepath"
"sync"
"time"
"encoding/json"
"github.com/dgraph-io/badger/v4"
"github.com/robfig/cron/v3"
"github.com/sirupsen/logrus"
"github.com/google/uuid"
"kvs/auth"
"kvs/cluster"
"kvs/storage"
"kvs/types"
"kvs/utils"
)
// Server represents the KVS node
type Server struct {
config *types.Config
db *badger.DB
mode string // "normal", "read-only", "syncing"
modeMu sync.RWMutex
logger *logrus.Logger
httpServer *http.Server
ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
// Cluster services
gossipService *cluster.GossipService
syncService *cluster.SyncService
merkleService *cluster.MerkleService
bootstrapService *cluster.BootstrapService
// Storage services
storageService *storage.StorageService
revisionService *storage.RevisionService
// Backup system
cronScheduler *cron.Cron // Cron scheduler for backups
backupStatus types.BackupStatus // Current backup status
backupMu sync.RWMutex // Protects backup status
// Authentication service
authService *auth.AuthService
}
// NewServer initializes and returns a new Server instance
func NewServer(config *types.Config) (*Server, error) {
logger := logrus.New()
logger.SetFormatter(&logrus.JSONFormatter{})
level, err := logrus.ParseLevel(config.LogLevel)
if err != nil {
level = logrus.InfoLevel
}
logger.SetLevel(level)
// Create data directory
if err := os.MkdirAll(config.DataDir, 0755); err != nil {
return nil, fmt.Errorf("failed to create data directory: %v", err)
}
// Open BadgerDB
opts := badger.DefaultOptions(filepath.Join(config.DataDir, "badger"))
opts.Logger = nil // Disable badger's internal logging
db, err := badger.Open(opts)
if err != nil {
return nil, fmt.Errorf("failed to open BadgerDB: %v", err)
}
ctx, cancel := context.WithCancel(context.Background())
// Initialize cluster services
merkleService := cluster.NewMerkleService(db, logger)
gossipService := cluster.NewGossipService(config, logger)
syncService := cluster.NewSyncService(db, config, gossipService, merkleService, logger)
var server *Server // Forward declaration
bootstrapService := cluster.NewBootstrapService(config, gossipService, syncService, logger, func(mode string) {
if server != nil {
server.setMode(mode)
}
})
server = &Server{
config: config,
db: db,
mode: "normal",
logger: logger,
ctx: ctx,
cancel: cancel,
gossipService: gossipService,
syncService: syncService,
merkleService: merkleService,
bootstrapService: bootstrapService,
}
if config.ReadOnly {
server.setMode("read-only")
}
// Initialize storage services
storageService, err := storage.NewStorageService(db, config, logger)
if err != nil {
return nil, fmt.Errorf("failed to initialize storage service: %v", err)
}
server.storageService = storageService
// Initialize revision service
server.revisionService = storage.NewRevisionService(storageService)
// Initialize authentication service
server.authService = auth.NewAuthService(db, logger)
// New: Initial root account setup for empty DB with no seeds
if len(config.SeedNodes) == 0 {
hasUsers, err := server.authService.HasUsers()
if err != nil {
return nil, fmt.Errorf("failed to check for existing users: %v", err)
}
if !hasUsers {
server.logger.Info("Detected empty database with no seed nodes; creating initial root account")
now := time.Now().Unix()
// Create admin group
adminGroupUUID := uuid.NewString()
hashedGroupName := utils.HashGroupName("admin")
adminGroup := types.Group{
UUID: adminGroupUUID,
NameHash: hashedGroupName,
Members: []string{},
CreatedAt: now,
UpdatedAt: now,
}
groupData, err := json.Marshal(adminGroup)
if err != nil {
return nil, fmt.Errorf("failed to marshal admin group: %v", err)
}
err = db.Update(func(txn *badger.Txn) error {
return txn.Set([]byte(auth.GroupStorageKey(adminGroupUUID)), groupData)
})
if err != nil {
return nil, fmt.Errorf("failed to store admin group: %v", err)
}
// Create root user
rootUUID := uuid.NewString()
hashedNickname := utils.HashUserNickname("root")
rootUser := types.User{
UUID: rootUUID,
NicknameHash: hashedNickname,
Groups: []string{adminGroupUUID},
CreatedAt: now,
UpdatedAt: now,
}
userData, err := json.Marshal(rootUser)
if err != nil {
return nil, fmt.Errorf("failed to marshal root user: %v", err)
}
err = db.Update(func(txn *badger.Txn) error {
return txn.Set([]byte(auth.UserStorageKey(rootUUID)), userData)
})
if err != nil {
return nil, fmt.Errorf("failed to store root user: %v", err)
}
// Update admin group to include root user (bidirectional)
adminGroup.Members = append(adminGroup.Members, rootUUID)
groupData, err = json.Marshal(adminGroup)
if err != nil {
return nil, fmt.Errorf("failed to marshal updated admin group: %v", err)
}
err = db.Update(func(txn *badger.Txn) error {
return txn.Set([]byte(auth.GroupStorageKey(adminGroupUUID)), groupData)
})
if err != nil {
return nil, fmt.Errorf("failed to update admin group: %v", err)
}
// Generate and store API token
scopes := []string{"admin", "read", "write", "create", "delete"}
expirationHours := 8760 // 1 year
tokenString, expiresAt, err := auth.GenerateJWT(rootUUID, scopes, expirationHours)
if err != nil {
return nil, fmt.Errorf("failed to generate JWT: %v", err)
}
err = server.authService.StoreAPIToken(tokenString, rootUUID, scopes, expiresAt)
if err != nil {
return nil, fmt.Errorf("failed to store API token: %v", err)
}
// Log the details securely (only once, to stderr)
fmt.Fprintf(os.Stderr, `
***************************************************************************
WARNING: Initial root user created for new server instance.
Save this information securely—it will not be shown again.
Root User UUID: %s
API Token (Bearer): %s
Expires At: %s (UTC)
Use this token for authentication in API requests. Change or revoke it immediately via the API for security.
***************************************************************************
`, rootUUID, tokenString, time.Unix(expiresAt, 0).UTC())
}
}
// Initialize Merkle tree using cluster service
if err := server.syncService.InitializeMerkleTree(); err != nil {
return nil, fmt.Errorf("failed to initialize Merkle tree: %v", err)
}
return server, nil
}
// getMode returns the current server mode
func (s *Server) getMode() string {
s.modeMu.RLock()
defer s.modeMu.RUnlock()
return s.mode
}
// setMode sets the server mode
func (s *Server) setMode(mode string) {
s.modeMu.Lock()
defer s.modeMu.Unlock()
oldMode := s.mode
s.mode = mode
s.logger.WithFields(logrus.Fields{
"old_mode": oldMode,
"new_mode": mode,
}).Info("Mode changed")
}
// addMember adds a member using cluster service
func (s *Server) addMember(member *types.Member) {
s.gossipService.AddMember(member)
}
// removeMember removes a member using cluster service
func (s *Server) removeMember(nodeID string) {
s.gossipService.RemoveMember(nodeID)
}
// getMembers returns all cluster members
func (s *Server) getMembers() []*types.Member {
return s.gossipService.GetMembers()
}
// getJoinedTimestamp returns this node's joined timestamp (startup time)
func (s *Server) getJoinedTimestamp() int64 {
// For now, use a simple approach - this should be stored persistently
return time.Now().UnixMilli()
}
// getBackupStatus returns the current backup status
func (s *Server) getBackupStatus() types.BackupStatus {
s.backupMu.RLock()
defer s.backupMu.RUnlock()
status := s.backupStatus
// Calculate next backup time if scheduler is running
if s.cronScheduler != nil && len(s.cronScheduler.Entries()) > 0 {
nextRun := s.cronScheduler.Entries()[0].Next
if !nextRun.IsZero() {
status.NextBackupTime = nextRun.Unix()
}
}
return status
}