// Forked from ryyst/kalzu-value-store.
package server
|
|
|
|
import (
	"context"
	"encoding/json"
	"fmt"
	"net/http"
	"os"
	"path/filepath"
	"sync"
	"time"

	"github.com/dgraph-io/badger/v4"
	"github.com/google/uuid"
	"github.com/robfig/cron/v3"
	"github.com/sirupsen/logrus"

	"kvs/auth"
	"kvs/cluster"
	"kvs/storage"
	"kvs/types"
	"kvs/utils"
)
|
|
|
|
// Server represents the KVS node
|
|
type Server struct {
|
|
config *types.Config
|
|
db *badger.DB
|
|
mode string // "normal", "read-only", "syncing"
|
|
modeMu sync.RWMutex
|
|
logger *logrus.Logger
|
|
httpServer *http.Server
|
|
ctx context.Context
|
|
cancel context.CancelFunc
|
|
wg sync.WaitGroup
|
|
|
|
// Cluster services
|
|
gossipService *cluster.GossipService
|
|
syncService *cluster.SyncService
|
|
merkleService *cluster.MerkleService
|
|
bootstrapService *cluster.BootstrapService
|
|
|
|
// Storage services
|
|
storageService *storage.StorageService
|
|
revisionService *storage.RevisionService
|
|
|
|
// Backup system
|
|
cronScheduler *cron.Cron // Cron scheduler for backups
|
|
backupStatus types.BackupStatus // Current backup status
|
|
backupMu sync.RWMutex // Protects backup status
|
|
|
|
// Authentication service
|
|
authService *auth.AuthService
|
|
}
|
|
|
|
// NewServer initializes and returns a new Server instance
|
|
func NewServer(config *types.Config) (*Server, error) {
|
|
logger := logrus.New()
|
|
logger.SetFormatter(&logrus.JSONFormatter{})
|
|
|
|
level, err := logrus.ParseLevel(config.LogLevel)
|
|
if err != nil {
|
|
level = logrus.InfoLevel
|
|
}
|
|
logger.SetLevel(level)
|
|
|
|
// Create data directory
|
|
if err := os.MkdirAll(config.DataDir, 0755); err != nil {
|
|
return nil, fmt.Errorf("failed to create data directory: %v", err)
|
|
}
|
|
|
|
// Open BadgerDB
|
|
opts := badger.DefaultOptions(filepath.Join(config.DataDir, "badger"))
|
|
opts.Logger = nil // Disable badger's internal logging
|
|
db, err := badger.Open(opts)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to open BadgerDB: %v", err)
|
|
}
|
|
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
|
|
// Initialize cluster services
|
|
merkleService := cluster.NewMerkleService(db, logger)
|
|
gossipService := cluster.NewGossipService(config, logger)
|
|
syncService := cluster.NewSyncService(db, config, gossipService, merkleService, logger)
|
|
var server *Server // Forward declaration
|
|
bootstrapService := cluster.NewBootstrapService(config, gossipService, syncService, logger, func(mode string) {
|
|
if server != nil {
|
|
server.setMode(mode)
|
|
}
|
|
})
|
|
|
|
server = &Server{
|
|
config: config,
|
|
db: db,
|
|
mode: "normal",
|
|
logger: logger,
|
|
ctx: ctx,
|
|
cancel: cancel,
|
|
gossipService: gossipService,
|
|
syncService: syncService,
|
|
merkleService: merkleService,
|
|
bootstrapService: bootstrapService,
|
|
}
|
|
|
|
if config.ReadOnly {
|
|
server.setMode("read-only")
|
|
}
|
|
|
|
// Initialize storage services
|
|
storageService, err := storage.NewStorageService(db, config, logger)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to initialize storage service: %v", err)
|
|
}
|
|
server.storageService = storageService
|
|
|
|
// Initialize revision service
|
|
server.revisionService = storage.NewRevisionService(storageService)
|
|
|
|
// Initialize authentication service
|
|
server.authService = auth.NewAuthService(db, logger)
|
|
|
|
// New: Initial root account setup for empty DB with no seeds
|
|
if len(config.SeedNodes) == 0 {
|
|
hasUsers, err := server.authService.HasUsers()
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to check for existing users: %v", err)
|
|
}
|
|
if !hasUsers {
|
|
server.logger.Info("Detected empty database with no seed nodes; creating initial root account")
|
|
|
|
now := time.Now().Unix()
|
|
|
|
// Create admin group
|
|
adminGroupUUID := uuid.NewString()
|
|
hashedGroupName := utils.HashGroupName("admin") // Adjust if function name differs
|
|
adminGroup := types.Group{
|
|
UUID: adminGroupUUID,
|
|
Name: hashedGroupName,
|
|
CreatedAt: now, // Add if field exists; remove otherwise
|
|
// Members: []string{}, // Add if needed; e.g., add root later
|
|
}
|
|
groupData, err := json.Marshal(adminGroup)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to marshal admin group: %v", err)
|
|
}
|
|
err = db.Update(func(txn *badger.Txn) error {
|
|
return txn.Set([]byte(GroupStorageKey(adminGroupUUID)), groupData)
|
|
})
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to store admin group: %v", err)
|
|
}
|
|
|
|
// Create root user
|
|
rootUUID := uuid.NewString()
|
|
hashedNickname := utils.HashUserNickname("root") // Adjust if function name differs
|
|
rootUser := types.User{
|
|
UUID: rootUUID,
|
|
Nickname: hashedNickname,
|
|
Groups: []string{adminGroupUUID},
|
|
CreatedAt: now, // Add if field exists; remove otherwise
|
|
}
|
|
userData, err := json.Marshal(rootUser)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to marshal root user: %v", err)
|
|
}
|
|
err = db.Update(func(txn *badger.Txn) error {
|
|
return txn.Set([]byte(UserStorageKey(rootUUID)), userData)
|
|
})
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to store root user: %v", err)
|
|
}
|
|
|
|
// Optionally update group members if bidirectional
|
|
// adminGroup.Members = append(adminGroup.Members, rootUUID)
|
|
// Update group in DB if needed...
|
|
|
|
// Generate and store API token
|
|
scopes := []string{"admin", "read", "write", "create", "delete"}
|
|
expirationHours := 8760 // 1 year
|
|
tokenString, expiresAt, err := auth.GenerateJWT(rootUUID, scopes, expirationHours)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to generate JWT: %v", err)
|
|
}
|
|
err = server.authService.StoreAPIToken(tokenString, rootUUID, scopes, expiresAt)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to store API token: %v", err)
|
|
}
|
|
|
|
// Log the details securely (only once, to stderr)
|
|
fmt.Fprintf(os.Stderr, `
|
|
***************************************************************************
|
|
WARNING: Initial root user created for new server instance.
|
|
Save this information securely—it will not be shown again.
|
|
|
|
Root User UUID: %s
|
|
API Token (Bearer): %s
|
|
Expires At: %s (UTC)
|
|
|
|
Use this token for authentication in API requests. Change or revoke it immediately via the API for security.
|
|
***************************************************************************
|
|
`, rootUUID, tokenString, time.Unix(expiresAt, 0).UTC())
|
|
}
|
|
}
|
|
|
|
// Initialize Merkle tree using cluster service
|
|
if err := server.syncService.InitializeMerkleTree(); err != nil {
|
|
return nil, fmt.Errorf("failed to initialize Merkle tree: %v", err)
|
|
}
|
|
|
|
return server, nil
|
|
}
|
|
|
|
// getMode returns the current server mode
|
|
func (s *Server) getMode() string {
|
|
s.modeMu.RLock()
|
|
defer s.modeMu.RUnlock()
|
|
return s.mode
|
|
}
|
|
|
|
// setMode sets the server mode
|
|
func (s *Server) setMode(mode string) {
|
|
s.modeMu.Lock()
|
|
defer s.modeMu.Unlock()
|
|
oldMode := s.mode
|
|
s.mode = mode
|
|
s.logger.WithFields(logrus.Fields{
|
|
"old_mode": oldMode,
|
|
"new_mode": mode,
|
|
}).Info("Mode changed")
|
|
}
|
|
|
|
// addMember adds a member using cluster service
|
|
func (s *Server) addMember(member *types.Member) {
|
|
s.gossipService.AddMember(member)
|
|
}
|
|
|
|
// removeMember removes a member using cluster service
|
|
func (s *Server) removeMember(nodeID string) {
|
|
s.gossipService.RemoveMember(nodeID)
|
|
}
|
|
|
|
// getMembers returns all cluster members
|
|
func (s *Server) getMembers() []*types.Member {
|
|
return s.gossipService.GetMembers()
|
|
}
|
|
|
|
// getJoinedTimestamp returns this node's joined timestamp (startup time)
|
|
func (s *Server) getJoinedTimestamp() int64 {
|
|
// For now, use a simple approach - this should be stored persistently
|
|
return time.Now().UnixMilli()
|
|
}
|
|
|
|
// getBackupStatus returns the current backup status
|
|
func (s *Server) getBackupStatus() types.BackupStatus {
|
|
s.backupMu.RLock()
|
|
defer s.backupMu.RUnlock()
|
|
|
|
status := s.backupStatus
|
|
|
|
// Calculate next backup time if scheduler is running
|
|
if s.cronScheduler != nil && len(s.cronScheduler.Entries()) > 0 {
|
|
nextRun := s.cronScheduler.Entries()[0].Next
|
|
if !nextRun.IsZero() {
|
|
status.NextBackupTime = nextRun.Unix()
|
|
}
|
|
}
|
|
|
|
return status
|
|
}
|