forked from ryyst/kalzu-value-store
Initial KVS implementation with core functionality
- Go module setup with BadgerDB, Gorilla Mux, Logrus, UUID, and YAML - Core data structures for distributed key-value store - HTTP REST API with /kv/ endpoints (GET, PUT, DELETE) - Member management endpoints (/members/) - Timestamp indexing for efficient time-based queries - YAML configuration with auto-generation - Structured JSON logging with configurable levels - Operational modes (normal, read-only, syncing) - Basic health check endpoint - Graceful shutdown handling Tested basic functionality - all core endpoints working correctly. 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
701
main.go
Normal file
701
main.go
Normal file
@ -0,0 +1,701 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net"
|
||||
"net/http"
|
||||
"os"
|
||||
"os/signal"
|
||||
"path/filepath"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
badger "github.com/dgraph-io/badger/v4"
|
||||
"github.com/google/uuid"
|
||||
"github.com/gorilla/mux"
|
||||
"github.com/sirupsen/logrus"
|
||||
"gopkg.in/yaml.v3"
|
||||
)
|
||||
|
||||
// Core data structures
|
||||
type StoredValue struct {
|
||||
UUID string `json:"uuid"`
|
||||
Timestamp int64 `json:"timestamp"`
|
||||
Data json.RawMessage `json:"data"`
|
||||
}
|
||||
|
||||
type Member struct {
|
||||
ID string `json:"id"`
|
||||
Address string `json:"address"`
|
||||
LastSeen int64 `json:"last_seen"`
|
||||
JoinedTimestamp int64 `json:"joined_timestamp"`
|
||||
}
|
||||
|
||||
type JoinRequest struct {
|
||||
ID string `json:"id"`
|
||||
Address string `json:"address"`
|
||||
JoinedTimestamp int64 `json:"joined_timestamp"`
|
||||
}
|
||||
|
||||
type LeaveRequest struct {
|
||||
ID string `json:"id"`
|
||||
}
|
||||
|
||||
type PairsByTimeRequest struct {
|
||||
StartTimestamp int64 `json:"start_timestamp"`
|
||||
EndTimestamp int64 `json:"end_timestamp"`
|
||||
Limit int `json:"limit"`
|
||||
Prefix string `json:"prefix,omitempty"`
|
||||
}
|
||||
|
||||
type PairsByTimeResponse struct {
|
||||
Path string `json:"path"`
|
||||
UUID string `json:"uuid"`
|
||||
Timestamp int64 `json:"timestamp"`
|
||||
}
|
||||
|
||||
type PutResponse struct {
|
||||
UUID string `json:"uuid"`
|
||||
Timestamp int64 `json:"timestamp"`
|
||||
}
|
||||
|
||||
// Configuration
|
||||
type Config struct {
|
||||
NodeID string `yaml:"node_id"`
|
||||
BindAddress string `yaml:"bind_address"`
|
||||
Port int `yaml:"port"`
|
||||
DataDir string `yaml:"data_dir"`
|
||||
SeedNodes []string `yaml:"seed_nodes"`
|
||||
ReadOnly bool `yaml:"read_only"`
|
||||
LogLevel string `yaml:"log_level"`
|
||||
GossipIntervalMin int `yaml:"gossip_interval_min"`
|
||||
GossipIntervalMax int `yaml:"gossip_interval_max"`
|
||||
SyncInterval int `yaml:"sync_interval"`
|
||||
CatchupInterval int `yaml:"catchup_interval"`
|
||||
BootstrapMaxAgeHours int `yaml:"bootstrap_max_age_hours"`
|
||||
ThrottleDelayMs int `yaml:"throttle_delay_ms"`
|
||||
FetchDelayMs int `yaml:"fetch_delay_ms"`
|
||||
}
|
||||
|
||||
// Server represents the KVS node
|
||||
type Server struct {
|
||||
config *Config
|
||||
db *badger.DB
|
||||
members map[string]*Member
|
||||
membersMu sync.RWMutex
|
||||
mode string // "normal", "read-only", "syncing"
|
||||
modeMu sync.RWMutex
|
||||
logger *logrus.Logger
|
||||
httpServer *http.Server
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
wg sync.WaitGroup
|
||||
}
|
||||
|
||||
// Default configuration
|
||||
func defaultConfig() *Config {
|
||||
hostname, _ := os.Hostname()
|
||||
return &Config{
|
||||
NodeID: hostname,
|
||||
BindAddress: "127.0.0.1",
|
||||
Port: 8080,
|
||||
DataDir: "./data",
|
||||
SeedNodes: []string{},
|
||||
ReadOnly: false,
|
||||
LogLevel: "info",
|
||||
GossipIntervalMin: 60, // 1 minute
|
||||
GossipIntervalMax: 120, // 2 minutes
|
||||
SyncInterval: 300, // 5 minutes
|
||||
CatchupInterval: 120, // 2 minutes
|
||||
BootstrapMaxAgeHours: 720, // 30 days
|
||||
ThrottleDelayMs: 100,
|
||||
FetchDelayMs: 50,
|
||||
}
|
||||
}
|
||||
|
||||
// Load configuration from file or create default
|
||||
func loadConfig(configPath string) (*Config, error) {
|
||||
config := defaultConfig()
|
||||
|
||||
if _, err := os.Stat(configPath); os.IsNotExist(err) {
|
||||
// Create default config file
|
||||
if err := os.MkdirAll(filepath.Dir(configPath), 0755); err != nil {
|
||||
return nil, fmt.Errorf("failed to create config directory: %v", err)
|
||||
}
|
||||
|
||||
data, err := yaml.Marshal(config)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to marshal default config: %v", err)
|
||||
}
|
||||
|
||||
if err := os.WriteFile(configPath, data, 0644); err != nil {
|
||||
return nil, fmt.Errorf("failed to write default config: %v", err)
|
||||
}
|
||||
|
||||
fmt.Printf("Created default configuration at %s\n", configPath)
|
||||
return config, nil
|
||||
}
|
||||
|
||||
data, err := os.ReadFile(configPath)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to read config file: %v", err)
|
||||
}
|
||||
|
||||
if err := yaml.Unmarshal(data, config); err != nil {
|
||||
return nil, fmt.Errorf("failed to parse config file: %v", err)
|
||||
}
|
||||
|
||||
return config, nil
|
||||
}
|
||||
|
||||
// Initialize server
|
||||
func NewServer(config *Config) (*Server, error) {
|
||||
logger := logrus.New()
|
||||
logger.SetFormatter(&logrus.JSONFormatter{})
|
||||
|
||||
level, err := logrus.ParseLevel(config.LogLevel)
|
||||
if err != nil {
|
||||
level = logrus.InfoLevel
|
||||
}
|
||||
logger.SetLevel(level)
|
||||
|
||||
// Create data directory
|
||||
if err := os.MkdirAll(config.DataDir, 0755); err != nil {
|
||||
return nil, fmt.Errorf("failed to create data directory: %v", err)
|
||||
}
|
||||
|
||||
// Open BadgerDB
|
||||
opts := badger.DefaultOptions(filepath.Join(config.DataDir, "badger"))
|
||||
opts.Logger = nil // Disable badger's internal logging
|
||||
db, err := badger.Open(opts)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to open BadgerDB: %v", err)
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
|
||||
server := &Server{
|
||||
config: config,
|
||||
db: db,
|
||||
members: make(map[string]*Member),
|
||||
mode: "normal",
|
||||
logger: logger,
|
||||
ctx: ctx,
|
||||
cancel: cancel,
|
||||
}
|
||||
|
||||
if config.ReadOnly {
|
||||
server.setMode("read-only")
|
||||
}
|
||||
|
||||
return server, nil
|
||||
}
|
||||
|
||||
// Mode management
|
||||
func (s *Server) getMode() string {
|
||||
s.modeMu.RLock()
|
||||
defer s.modeMu.RUnlock()
|
||||
return s.mode
|
||||
}
|
||||
|
||||
func (s *Server) setMode(mode string) {
|
||||
s.modeMu.Lock()
|
||||
defer s.modeMu.Unlock()
|
||||
oldMode := s.mode
|
||||
s.mode = mode
|
||||
s.logger.WithFields(logrus.Fields{
|
||||
"old_mode": oldMode,
|
||||
"new_mode": mode,
|
||||
}).Info("Mode changed")
|
||||
}
|
||||
|
||||
// Member management
|
||||
func (s *Server) addMember(member *Member) {
|
||||
s.membersMu.Lock()
|
||||
defer s.membersMu.Unlock()
|
||||
s.members[member.ID] = member
|
||||
s.logger.WithFields(logrus.Fields{
|
||||
"node_id": member.ID,
|
||||
"address": member.Address,
|
||||
}).Info("Member added")
|
||||
}
|
||||
|
||||
func (s *Server) removeMember(nodeID string) {
|
||||
s.membersMu.Lock()
|
||||
defer s.membersMu.Unlock()
|
||||
if member, exists := s.members[nodeID]; exists {
|
||||
delete(s.members, nodeID)
|
||||
s.logger.WithFields(logrus.Fields{
|
||||
"node_id": member.ID,
|
||||
"address": member.Address,
|
||||
}).Info("Member removed")
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Server) getMembers() []*Member {
|
||||
s.membersMu.RLock()
|
||||
defer s.membersMu.RUnlock()
|
||||
members := make([]*Member, 0, len(s.members))
|
||||
for _, member := range s.members {
|
||||
members = append(members, member)
|
||||
}
|
||||
return members
|
||||
}
|
||||
|
||||
// HTTP Handlers
|
||||
func (s *Server) healthHandler(w http.ResponseWriter, r *http.Request) {
|
||||
mode := s.getMode()
|
||||
memberCount := len(s.getMembers())
|
||||
|
||||
health := map[string]interface{}{
|
||||
"status": "ok",
|
||||
"mode": mode,
|
||||
"member_count": memberCount,
|
||||
"node_id": s.config.NodeID,
|
||||
}
|
||||
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
json.NewEncoder(w).Encode(health)
|
||||
}
|
||||
|
||||
func (s *Server) getKVHandler(w http.ResponseWriter, r *http.Request) {
|
||||
vars := mux.Vars(r)
|
||||
path := vars["path"]
|
||||
|
||||
var storedValue StoredValue
|
||||
err := s.db.View(func(txn *badger.Txn) error {
|
||||
item, err := txn.Get([]byte(path))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return item.Value(func(val []byte) error {
|
||||
return json.Unmarshal(val, &storedValue)
|
||||
})
|
||||
})
|
||||
|
||||
if err == badger.ErrKeyNotFound {
|
||||
http.Error(w, "Not Found", http.StatusNotFound)
|
||||
return
|
||||
}
|
||||
if err != nil {
|
||||
s.logger.WithError(err).WithField("path", path).Error("Failed to get value")
|
||||
http.Error(w, "Internal Server Error", http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
w.Write(storedValue.Data)
|
||||
}
|
||||
|
||||
func (s *Server) putKVHandler(w http.ResponseWriter, r *http.Request) {
|
||||
vars := mux.Vars(r)
|
||||
path := vars["path"]
|
||||
|
||||
mode := s.getMode()
|
||||
if mode == "syncing" {
|
||||
http.Error(w, "Service Unavailable", http.StatusServiceUnavailable)
|
||||
return
|
||||
}
|
||||
|
||||
if mode == "read-only" && !s.isClusterMember(r.RemoteAddr) {
|
||||
http.Error(w, "Forbidden", http.StatusForbidden)
|
||||
return
|
||||
}
|
||||
|
||||
var data json.RawMessage
|
||||
if err := json.NewDecoder(r.Body).Decode(&data); err != nil {
|
||||
http.Error(w, "Bad Request", http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
now := time.Now().UnixMilli()
|
||||
newUUID := uuid.New().String()
|
||||
|
||||
storedValue := StoredValue{
|
||||
UUID: newUUID,
|
||||
Timestamp: now,
|
||||
Data: data,
|
||||
}
|
||||
|
||||
valueBytes, err := json.Marshal(storedValue)
|
||||
if err != nil {
|
||||
s.logger.WithError(err).Error("Failed to marshal stored value")
|
||||
http.Error(w, "Internal Server Error", http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
var isUpdate bool
|
||||
err = s.db.Update(func(txn *badger.Txn) error {
|
||||
// Check if key exists
|
||||
_, err := txn.Get([]byte(path))
|
||||
isUpdate = (err == nil)
|
||||
|
||||
// Store main data
|
||||
if err := txn.Set([]byte(path), valueBytes); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Store timestamp index
|
||||
indexKey := fmt.Sprintf("_ts:%020d:%s", now, path)
|
||||
return txn.Set([]byte(indexKey), []byte(newUUID))
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
s.logger.WithError(err).WithField("path", path).Error("Failed to store value")
|
||||
http.Error(w, "Internal Server Error", http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
response := PutResponse{
|
||||
UUID: newUUID,
|
||||
Timestamp: now,
|
||||
}
|
||||
|
||||
status := http.StatusCreated
|
||||
if isUpdate {
|
||||
status = http.StatusOK
|
||||
}
|
||||
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
w.WriteHeader(status)
|
||||
json.NewEncoder(w).Encode(response)
|
||||
|
||||
s.logger.WithFields(logrus.Fields{
|
||||
"path": path,
|
||||
"uuid": newUUID,
|
||||
"timestamp": now,
|
||||
"is_update": isUpdate,
|
||||
}).Info("Value stored")
|
||||
}
|
||||
|
||||
func (s *Server) deleteKVHandler(w http.ResponseWriter, r *http.Request) {
|
||||
vars := mux.Vars(r)
|
||||
path := vars["path"]
|
||||
|
||||
mode := s.getMode()
|
||||
if mode == "syncing" {
|
||||
http.Error(w, "Service Unavailable", http.StatusServiceUnavailable)
|
||||
return
|
||||
}
|
||||
|
||||
if mode == "read-only" && !s.isClusterMember(r.RemoteAddr) {
|
||||
http.Error(w, "Forbidden", http.StatusForbidden)
|
||||
return
|
||||
}
|
||||
|
||||
var found bool
|
||||
err := s.db.Update(func(txn *badger.Txn) error {
|
||||
// Check if key exists and get timestamp for index cleanup
|
||||
item, err := txn.Get([]byte(path))
|
||||
if err == badger.ErrKeyNotFound {
|
||||
return nil
|
||||
}
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
found = true
|
||||
|
||||
var storedValue StoredValue
|
||||
err = item.Value(func(val []byte) error {
|
||||
return json.Unmarshal(val, &storedValue)
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Delete main data
|
||||
if err := txn.Delete([]byte(path)); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Delete timestamp index
|
||||
indexKey := fmt.Sprintf("_ts:%020d:%s", storedValue.Timestamp, path)
|
||||
return txn.Delete([]byte(indexKey))
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
s.logger.WithError(err).WithField("path", path).Error("Failed to delete value")
|
||||
http.Error(w, "Internal Server Error", http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
if !found {
|
||||
http.Error(w, "Not Found", http.StatusNotFound)
|
||||
return
|
||||
}
|
||||
|
||||
w.WriteHeader(http.StatusNoContent)
|
||||
|
||||
s.logger.WithField("path", path).Info("Value deleted")
|
||||
}
|
||||
|
||||
func (s *Server) getMembersHandler(w http.ResponseWriter, r *http.Request) {
|
||||
members := s.getMembers()
|
||||
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
json.NewEncoder(w).Encode(members)
|
||||
}
|
||||
|
||||
func (s *Server) joinMemberHandler(w http.ResponseWriter, r *http.Request) {
|
||||
var req JoinRequest
|
||||
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
|
||||
http.Error(w, "Bad Request", http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
now := time.Now().UnixMilli()
|
||||
member := &Member{
|
||||
ID: req.ID,
|
||||
Address: req.Address,
|
||||
LastSeen: now,
|
||||
JoinedTimestamp: req.JoinedTimestamp,
|
||||
}
|
||||
|
||||
s.addMember(member)
|
||||
|
||||
// Return current member list
|
||||
members := s.getMembers()
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
json.NewEncoder(w).Encode(members)
|
||||
}
|
||||
|
||||
func (s *Server) leaveMemberHandler(w http.ResponseWriter, r *http.Request) {
|
||||
var req LeaveRequest
|
||||
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
|
||||
http.Error(w, "Bad Request", http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
s.removeMember(req.ID)
|
||||
w.WriteHeader(http.StatusNoContent)
|
||||
}
|
||||
|
||||
func (s *Server) pairsByTimeHandler(w http.ResponseWriter, r *http.Request) {
|
||||
var req PairsByTimeRequest
|
||||
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
|
||||
http.Error(w, "Bad Request", http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
// Default limit to 15 as per spec
|
||||
if req.Limit <= 0 {
|
||||
req.Limit = 15
|
||||
}
|
||||
|
||||
var pairs []PairsByTimeResponse
|
||||
|
||||
err := s.db.View(func(txn *badger.Txn) error {
|
||||
opts := badger.DefaultIteratorOptions
|
||||
opts.PrefetchSize = req.Limit
|
||||
it := txn.NewIterator(opts)
|
||||
defer it.Close()
|
||||
|
||||
prefix := []byte("_ts:")
|
||||
if req.Prefix != "" {
|
||||
// We need to scan through timestamp entries and filter by path prefix
|
||||
}
|
||||
|
||||
for it.Seek(prefix); it.ValidForPrefix(prefix) && len(pairs) < req.Limit; it.Next() {
|
||||
item := it.Item()
|
||||
key := string(item.Key())
|
||||
|
||||
// Parse timestamp index key: "_ts:{timestamp}:{path}"
|
||||
parts := strings.SplitN(key, ":", 3)
|
||||
if len(parts) != 3 {
|
||||
continue
|
||||
}
|
||||
|
||||
timestamp, err := strconv.ParseInt(parts[1], 10, 64)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
|
||||
// Filter by timestamp range
|
||||
if req.StartTimestamp > 0 && timestamp < req.StartTimestamp {
|
||||
continue
|
||||
}
|
||||
if req.EndTimestamp > 0 && timestamp >= req.EndTimestamp {
|
||||
continue
|
||||
}
|
||||
|
||||
path := parts[2]
|
||||
|
||||
// Filter by prefix if specified
|
||||
if req.Prefix != "" && !strings.HasPrefix(path, req.Prefix) {
|
||||
continue
|
||||
}
|
||||
|
||||
var uuid string
|
||||
err = item.Value(func(val []byte) error {
|
||||
uuid = string(val)
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
|
||||
pairs = append(pairs, PairsByTimeResponse{
|
||||
Path: path,
|
||||
UUID: uuid,
|
||||
Timestamp: timestamp,
|
||||
})
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
s.logger.WithError(err).Error("Failed to query pairs by time")
|
||||
http.Error(w, "Internal Server Error", http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
if len(pairs) == 0 {
|
||||
w.WriteHeader(http.StatusNoContent)
|
||||
return
|
||||
}
|
||||
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
json.NewEncoder(w).Encode(pairs)
|
||||
}
|
||||
|
||||
// Utility function to check if request is from cluster member
|
||||
func (s *Server) isClusterMember(remoteAddr string) bool {
|
||||
host, _, err := net.SplitHostPort(remoteAddr)
|
||||
if err != nil {
|
||||
return false
|
||||
}
|
||||
|
||||
s.membersMu.RLock()
|
||||
defer s.membersMu.RUnlock()
|
||||
|
||||
for _, member := range s.members {
|
||||
memberHost, _, err := net.SplitHostPort(member.Address)
|
||||
if err == nil && memberHost == host {
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
||||
|
||||
// Setup HTTP routes
|
||||
func (s *Server) setupRoutes() *mux.Router {
|
||||
router := mux.NewRouter()
|
||||
|
||||
// Health endpoint
|
||||
router.HandleFunc("/health", s.healthHandler).Methods("GET")
|
||||
|
||||
// KV endpoints
|
||||
router.HandleFunc("/kv/{path:.+}", s.getKVHandler).Methods("GET")
|
||||
router.HandleFunc("/kv/{path:.+}", s.putKVHandler).Methods("PUT")
|
||||
router.HandleFunc("/kv/{path:.+}", s.deleteKVHandler).Methods("DELETE")
|
||||
|
||||
// Member endpoints
|
||||
router.HandleFunc("/members/", s.getMembersHandler).Methods("GET")
|
||||
router.HandleFunc("/members/join", s.joinMemberHandler).Methods("POST")
|
||||
router.HandleFunc("/members/leave", s.leaveMemberHandler).Methods("DELETE")
|
||||
router.HandleFunc("/members/pairs_by_time", s.pairsByTimeHandler).Methods("POST")
|
||||
|
||||
return router
|
||||
}
|
||||
|
||||
// Start the server
|
||||
func (s *Server) Start() error {
|
||||
router := s.setupRoutes()
|
||||
|
||||
addr := fmt.Sprintf("%s:%d", s.config.BindAddress, s.config.Port)
|
||||
s.httpServer = &http.Server{
|
||||
Addr: addr,
|
||||
Handler: router,
|
||||
}
|
||||
|
||||
s.logger.WithFields(logrus.Fields{
|
||||
"node_id": s.config.NodeID,
|
||||
"address": addr,
|
||||
}).Info("Starting KVS server")
|
||||
|
||||
// Start gossip and sync routines
|
||||
s.startBackgroundTasks()
|
||||
|
||||
// Try to join cluster if seed nodes are configured
|
||||
if len(s.config.SeedNodes) > 0 {
|
||||
go s.bootstrap()
|
||||
}
|
||||
|
||||
return s.httpServer.ListenAndServe()
|
||||
}
|
||||
|
||||
// Stop the server gracefully
|
||||
func (s *Server) Stop() error {
|
||||
s.logger.Info("Shutting down KVS server")
|
||||
|
||||
s.cancel()
|
||||
s.wg.Wait()
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||||
defer cancel()
|
||||
|
||||
if err := s.httpServer.Shutdown(ctx); err != nil {
|
||||
s.logger.WithError(err).Error("HTTP server shutdown error")
|
||||
}
|
||||
|
||||
if err := s.db.Close(); err != nil {
|
||||
s.logger.WithError(err).Error("BadgerDB close error")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Background tasks placeholder (gossip, sync, etc.)
|
||||
func (s *Server) startBackgroundTasks() {
|
||||
// TODO: Implement gossip protocol
|
||||
// TODO: Implement periodic sync
|
||||
// TODO: Implement catch-up sync
|
||||
}
|
||||
|
||||
// Bootstrap placeholder
|
||||
func (s *Server) bootstrap() {
|
||||
// TODO: Implement gradual bootstrapping
|
||||
}
|
||||
|
||||
func main() {
|
||||
configPath := "./config.yaml"
|
||||
|
||||
// Simple CLI argument parsing
|
||||
if len(os.Args) > 1 {
|
||||
configPath = os.Args[1]
|
||||
}
|
||||
|
||||
config, err := loadConfig(configPath)
|
||||
if err != nil {
|
||||
fmt.Fprintf(os.Stderr, "Failed to load configuration: %v\n", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
server, err := NewServer(config)
|
||||
if err != nil {
|
||||
fmt.Fprintf(os.Stderr, "Failed to create server: %v\n", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
// Handle graceful shutdown
|
||||
sigCh := make(chan os.Signal, 1)
|
||||
signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
|
||||
|
||||
go func() {
|
||||
<-sigCh
|
||||
server.Stop()
|
||||
}()
|
||||
|
||||
if err := server.Start(); err != nil && err != http.ErrServerClosed {
|
||||
fmt.Fprintf(os.Stderr, "Server error: %v\n", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
}
|
Reference in New Issue
Block a user