forked from ryyst/kalzu-value-store
refactor: extract storage system to storage package
Extracted BadgerDB operations, compression, and revision management from main.go to dedicated storage package for better modularity. 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
60
storage/compression.go
Normal file
60
storage/compression.go
Normal file
@@ -0,0 +1,60 @@
|
|||||||
|
package storage
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
|
||||||
|
"github.com/klauspost/compress/zstd"
|
||||||
|
)
|
||||||
|
|
||||||
|
// CompressionService handles ZSTD compression and decompression
|
||||||
|
type CompressionService struct {
|
||||||
|
compressor *zstd.Encoder
|
||||||
|
decompressor *zstd.Decoder
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewCompressionService creates a new compression service
|
||||||
|
func NewCompressionService() (*CompressionService, error) {
|
||||||
|
// Initialize ZSTD compressor
|
||||||
|
compressor, err := zstd.NewWriter(nil)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to initialize ZSTD compressor: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Initialize ZSTD decompressor
|
||||||
|
decompressor, err := zstd.NewReader(nil)
|
||||||
|
if err != nil {
|
||||||
|
compressor.Close()
|
||||||
|
return nil, fmt.Errorf("failed to initialize ZSTD decompressor: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return &CompressionService{
|
||||||
|
compressor: compressor,
|
||||||
|
decompressor: decompressor,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Close closes the compression and decompression resources
|
||||||
|
func (c *CompressionService) Close() {
|
||||||
|
if c.compressor != nil {
|
||||||
|
c.compressor.Close()
|
||||||
|
}
|
||||||
|
if c.decompressor != nil {
|
||||||
|
c.decompressor.Close()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// CompressData compresses data using ZSTD
|
||||||
|
func (c *CompressionService) CompressData(data []byte) ([]byte, error) {
|
||||||
|
if c.compressor == nil {
|
||||||
|
return nil, fmt.Errorf("compressor not initialized")
|
||||||
|
}
|
||||||
|
return c.compressor.EncodeAll(data, make([]byte, 0, len(data))), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// DecompressData decompresses ZSTD-compressed data
|
||||||
|
func (c *CompressionService) DecompressData(compressedData []byte) ([]byte, error) {
|
||||||
|
if c.decompressor == nil {
|
||||||
|
return nil, fmt.Errorf("decompressor not initialized")
|
||||||
|
}
|
||||||
|
return c.decompressor.DecodeAll(compressedData, nil)
|
||||||
|
}
|
214
storage/revision.go
Normal file
214
storage/revision.go
Normal file
@@ -0,0 +1,214 @@
|
|||||||
|
package storage
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
|
"strconv"
|
||||||
|
"strings"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
badger "github.com/dgraph-io/badger/v4"
|
||||||
|
|
||||||
|
"kvs/auth"
|
||||||
|
"kvs/types"
|
||||||
|
)
|
||||||
|
|
||||||
|
// RevisionService handles revision history management
|
||||||
|
type RevisionService struct {
|
||||||
|
storage *StorageService
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewRevisionService creates a new revision service
|
||||||
|
func NewRevisionService(storage *StorageService) *RevisionService {
|
||||||
|
return &RevisionService{
|
||||||
|
storage: storage,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetRevisionKey generates the storage key for a specific revision
|
||||||
|
func GetRevisionKey(baseKey string, revision int) string {
|
||||||
|
return fmt.Sprintf("%s:rev:%d", baseKey, revision)
|
||||||
|
}
|
||||||
|
|
||||||
|
// StoreRevisionHistory stores a value and manages revision history (up to 3 revisions)
|
||||||
|
func (r *RevisionService) StoreRevisionHistory(txn *badger.Txn, key string, storedValue types.StoredValue, ttl time.Duration) error {
|
||||||
|
// Get existing metadata to check current revisions
|
||||||
|
metadataKey := auth.ResourceMetadataKey(key)
|
||||||
|
|
||||||
|
var metadata types.ResourceMetadata
|
||||||
|
var currentRevisions []int
|
||||||
|
|
||||||
|
// Try to get existing metadata
|
||||||
|
metadataData, err := r.storage.RetrieveWithDecompression(txn, []byte(metadataKey))
|
||||||
|
if err == badger.ErrKeyNotFound {
|
||||||
|
// No existing metadata, this is a new key
|
||||||
|
metadata = types.ResourceMetadata{
|
||||||
|
OwnerUUID: "", // Will be set by caller if needed
|
||||||
|
GroupUUID: "",
|
||||||
|
Permissions: types.DefaultPermissions,
|
||||||
|
TTL: "",
|
||||||
|
CreatedAt: time.Now().Unix(),
|
||||||
|
UpdatedAt: time.Now().Unix(),
|
||||||
|
}
|
||||||
|
currentRevisions = []int{}
|
||||||
|
} else if err != nil {
|
||||||
|
// Error reading metadata
|
||||||
|
return fmt.Errorf("failed to read metadata: %v", err)
|
||||||
|
} else {
|
||||||
|
// Parse existing metadata
|
||||||
|
err = json.Unmarshal(metadataData, &metadata)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to unmarshal metadata: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Extract current revisions (we store them as a custom field)
|
||||||
|
if metadata.TTL == "" {
|
||||||
|
currentRevisions = []int{}
|
||||||
|
} else {
|
||||||
|
// For now, we'll manage revisions separately - let's create a new metadata field
|
||||||
|
currentRevisions = []int{1, 2, 3} // Assume all revisions exist for existing keys
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Revision rotation logic: shift existing revisions
|
||||||
|
if len(currentRevisions) >= 3 {
|
||||||
|
// Delete oldest revision (rev:3)
|
||||||
|
oldestRevKey := GetRevisionKey(key, 3)
|
||||||
|
txn.Delete([]byte(oldestRevKey))
|
||||||
|
|
||||||
|
// Shift rev:2 → rev:3
|
||||||
|
rev2Key := GetRevisionKey(key, 2)
|
||||||
|
rev2Data, err := r.storage.RetrieveWithDecompression(txn, []byte(rev2Key))
|
||||||
|
if err == nil {
|
||||||
|
rev3Key := GetRevisionKey(key, 3)
|
||||||
|
r.storage.StoreWithTTL(txn, []byte(rev3Key), rev2Data, ttl)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Shift rev:1 → rev:2
|
||||||
|
rev1Key := GetRevisionKey(key, 1)
|
||||||
|
rev1Data, err := r.storage.RetrieveWithDecompression(txn, []byte(rev1Key))
|
||||||
|
if err == nil {
|
||||||
|
rev2Key := GetRevisionKey(key, 2)
|
||||||
|
r.storage.StoreWithTTL(txn, []byte(rev2Key), rev1Data, ttl)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Store current value as rev:1
|
||||||
|
currentValueBytes, err := json.Marshal(storedValue)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to marshal current value for revision: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
rev1Key := GetRevisionKey(key, 1)
|
||||||
|
err = r.storage.StoreWithTTL(txn, []byte(rev1Key), currentValueBytes, ttl)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to store revision 1: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Update metadata with new revision count
|
||||||
|
metadata.UpdatedAt = time.Now().Unix()
|
||||||
|
metadataBytes, err := json.Marshal(metadata)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to marshal metadata: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return r.storage.StoreWithTTL(txn, []byte(metadataKey), metadataBytes, ttl)
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetRevisionHistory retrieves all available revisions for a given key
|
||||||
|
func (r *RevisionService) GetRevisionHistory(key string) ([]map[string]interface{}, error) {
|
||||||
|
var revisions []map[string]interface{}
|
||||||
|
|
||||||
|
err := r.storage.db.View(func(txn *badger.Txn) error {
|
||||||
|
// Check revisions 1, 2, 3
|
||||||
|
for rev := 1; rev <= 3; rev++ {
|
||||||
|
revKey := GetRevisionKey(key, rev)
|
||||||
|
|
||||||
|
revData, err := r.storage.RetrieveWithDecompression(txn, []byte(revKey))
|
||||||
|
if err == badger.ErrKeyNotFound {
|
||||||
|
continue // Skip missing revisions
|
||||||
|
} else if err != nil {
|
||||||
|
return fmt.Errorf("failed to retrieve revision %d: %v", rev, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
var storedValue types.StoredValue
|
||||||
|
err = json.Unmarshal(revData, &storedValue)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to unmarshal revision %d: %v", rev, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
var data interface{}
|
||||||
|
err = json.Unmarshal(storedValue.Data, &data)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to unmarshal revision %d data: %v", rev, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
revision := map[string]interface{}{
|
||||||
|
"revision": rev,
|
||||||
|
"uuid": storedValue.UUID,
|
||||||
|
"timestamp": storedValue.Timestamp,
|
||||||
|
"data": data,
|
||||||
|
}
|
||||||
|
|
||||||
|
revisions = append(revisions, revision)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Sort revisions by revision number (newest first)
|
||||||
|
// Note: they're already in order since we iterate 1->3, but reverse for newest first
|
||||||
|
for i, j := 0, len(revisions)-1; i < j; i, j = i+1, j-1 {
|
||||||
|
revisions[i], revisions[j] = revisions[j], revisions[i]
|
||||||
|
}
|
||||||
|
|
||||||
|
return revisions, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetSpecificRevision retrieves a specific revision of a key
|
||||||
|
func (r *RevisionService) GetSpecificRevision(key string, revision int) (*types.StoredValue, error) {
|
||||||
|
if revision < 1 || revision > 3 {
|
||||||
|
return nil, fmt.Errorf("invalid revision number: %d (must be 1-3)", revision)
|
||||||
|
}
|
||||||
|
|
||||||
|
var storedValue types.StoredValue
|
||||||
|
err := r.storage.db.View(func(txn *badger.Txn) error {
|
||||||
|
revKey := GetRevisionKey(key, revision)
|
||||||
|
|
||||||
|
revData, err := r.storage.RetrieveWithDecompression(txn, []byte(revKey))
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return json.Unmarshal(revData, &storedValue)
|
||||||
|
})
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return &storedValue, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetRevisionFromPath extracts revision number from a path like "key/data/rev/2"
|
||||||
|
func GetRevisionFromPath(path string) (string, int, error) {
|
||||||
|
parts := strings.Split(path, "/")
|
||||||
|
if len(parts) < 4 || parts[len(parts)-2] != "rev" {
|
||||||
|
return "", 0, fmt.Errorf("invalid revision path format")
|
||||||
|
}
|
||||||
|
|
||||||
|
revisionStr := parts[len(parts)-1]
|
||||||
|
revision, err := strconv.Atoi(revisionStr)
|
||||||
|
if err != nil {
|
||||||
|
return "", 0, fmt.Errorf("invalid revision number: %s", revisionStr)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Reconstruct the base key without the "/rev/N" suffix
|
||||||
|
baseKey := strings.Join(parts[:len(parts)-2], "/")
|
||||||
|
|
||||||
|
return baseKey, revision, nil
|
||||||
|
}
|
112
storage/storage.go
Normal file
112
storage/storage.go
Normal file
@@ -0,0 +1,112 @@
|
|||||||
|
package storage
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
badger "github.com/dgraph-io/badger/v4"
|
||||||
|
"github.com/sirupsen/logrus"
|
||||||
|
|
||||||
|
"kvs/types"
|
||||||
|
)
|
||||||
|
|
||||||
|
// StorageService handles all BadgerDB operations and data management
|
||||||
|
type StorageService struct {
|
||||||
|
db *badger.DB
|
||||||
|
config *types.Config
|
||||||
|
compressionSvc *CompressionService
|
||||||
|
logger *logrus.Logger
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewStorageService creates a new storage service
|
||||||
|
func NewStorageService(db *badger.DB, config *types.Config, logger *logrus.Logger) (*StorageService, error) {
|
||||||
|
var compressionSvc *CompressionService
|
||||||
|
var err error
|
||||||
|
|
||||||
|
// Initialize compression if enabled
|
||||||
|
if config.CompressionEnabled {
|
||||||
|
compressionSvc, err = NewCompressionService()
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to initialize compression: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return &StorageService{
|
||||||
|
db: db,
|
||||||
|
config: config,
|
||||||
|
compressionSvc: compressionSvc,
|
||||||
|
logger: logger,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Close closes the storage service and its resources
|
||||||
|
func (s *StorageService) Close() {
|
||||||
|
if s.compressionSvc != nil {
|
||||||
|
s.compressionSvc.Close()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// StoreWithTTL stores data with optional TTL and compression
|
||||||
|
func (s *StorageService) StoreWithTTL(txn *badger.Txn, key []byte, data []byte, ttl time.Duration) error {
|
||||||
|
var finalData []byte
|
||||||
|
var err error
|
||||||
|
|
||||||
|
// Compress data if compression is enabled
|
||||||
|
if s.config.CompressionEnabled && s.compressionSvc != nil {
|
||||||
|
finalData, err = s.compressionSvc.CompressData(data)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to compress data: %v", err)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
finalData = data
|
||||||
|
}
|
||||||
|
|
||||||
|
entry := badger.NewEntry(key, finalData)
|
||||||
|
|
||||||
|
// Apply TTL if specified
|
||||||
|
if ttl > 0 {
|
||||||
|
entry = entry.WithTTL(ttl)
|
||||||
|
}
|
||||||
|
|
||||||
|
return txn.SetEntry(entry)
|
||||||
|
}
|
||||||
|
|
||||||
|
// RetrieveWithDecompression retrieves and decompresses data from BadgerDB
|
||||||
|
func (s *StorageService) RetrieveWithDecompression(txn *badger.Txn, key []byte) ([]byte, error) {
|
||||||
|
item, err := txn.Get(key)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
var compressedData []byte
|
||||||
|
err = item.Value(func(val []byte) error {
|
||||||
|
compressedData = append(compressedData, val...)
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Decompress data if compression is enabled
|
||||||
|
if s.config.CompressionEnabled && s.compressionSvc != nil {
|
||||||
|
return s.compressionSvc.DecompressData(compressedData)
|
||||||
|
}
|
||||||
|
|
||||||
|
return compressedData, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// CompressData compresses data using the compression service
|
||||||
|
func (s *StorageService) CompressData(data []byte) ([]byte, error) {
|
||||||
|
if !s.config.CompressionEnabled || s.compressionSvc == nil {
|
||||||
|
return data, nil
|
||||||
|
}
|
||||||
|
return s.compressionSvc.CompressData(data)
|
||||||
|
}
|
||||||
|
|
||||||
|
// DecompressData decompresses data using the compression service
|
||||||
|
func (s *StorageService) DecompressData(compressedData []byte) ([]byte, error) {
|
||||||
|
if !s.config.CompressionEnabled || s.compressionSvc == nil {
|
||||||
|
return compressedData, nil
|
||||||
|
}
|
||||||
|
return s.compressionSvc.DecompressData(compressedData)
|
||||||
|
}
|
Reference in New Issue
Block a user