From 5223438ddfc2ea239789303e7c451ae9fc8f115f Mon Sep 17 00:00:00 2001 From: ryyst Date: Thu, 18 Sep 2025 20:40:42 +0300 Subject: [PATCH] refactor: extract storage system to storage package MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Extracted BadgerDB operations, compression, and revision management from main.go to dedicated storage package for better modularity. 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude --- storage/compression.go | 60 ++++++++++++ storage/revision.go | 214 +++++++++++++++++++++++++++++++++++++++++ storage/storage.go | 112 +++++++++++++++++++++ 3 files changed, 386 insertions(+) create mode 100644 storage/compression.go create mode 100644 storage/revision.go create mode 100644 storage/storage.go diff --git a/storage/compression.go b/storage/compression.go new file mode 100644 index 0000000..0a635de --- /dev/null +++ b/storage/compression.go @@ -0,0 +1,60 @@ +package storage + +import ( + "fmt" + + "github.com/klauspost/compress/zstd" +) + +// CompressionService handles ZSTD compression and decompression +type CompressionService struct { + compressor *zstd.Encoder + decompressor *zstd.Decoder +} + +// NewCompressionService creates a new compression service +func NewCompressionService() (*CompressionService, error) { + // Initialize ZSTD compressor + compressor, err := zstd.NewWriter(nil) + if err != nil { + return nil, fmt.Errorf("failed to initialize ZSTD compressor: %v", err) + } + + // Initialize ZSTD decompressor + decompressor, err := zstd.NewReader(nil) + if err != nil { + compressor.Close() + return nil, fmt.Errorf("failed to initialize ZSTD decompressor: %v", err) + } + + return &CompressionService{ + compressor: compressor, + decompressor: decompressor, + }, nil +} + +// Close closes the compression and decompression resources +func (c *CompressionService) Close() { + if c.compressor != nil { + c.compressor.Close() + } + if c.decompressor != nil { + c.decompressor.Close() + } +} + +// CompressData compresses data using ZSTD +func (c *CompressionService) CompressData(data []byte) ([]byte, error) { + if c.compressor == nil { + return nil, fmt.Errorf("compressor not initialized") + } + return c.compressor.EncodeAll(data, make([]byte, 0, len(data))), nil +} + +// DecompressData decompresses ZSTD-compressed data +func (c *CompressionService) DecompressData(compressedData []byte) ([]byte, error) { + if c.decompressor == nil { + return nil, fmt.Errorf("decompressor not initialized") + } + return c.decompressor.DecodeAll(compressedData, nil) +} \ No newline at end of file diff --git a/storage/revision.go b/storage/revision.go new file mode 100644 index 0000000..25f35c2 --- /dev/null +++ b/storage/revision.go @@ -0,0 +1,214 @@ +package storage + +import ( + "encoding/json" + "fmt" + "strconv" + "strings" + "time" + + badger "github.com/dgraph-io/badger/v4" + + "kvs/auth" + "kvs/types" +) + +// RevisionService handles revision history management +type RevisionService struct { + storage *StorageService +} + +// NewRevisionService creates a new revision service +func NewRevisionService(storage *StorageService) *RevisionService { + return &RevisionService{ + storage: storage, + } +} + +// GetRevisionKey generates the storage key for a specific revision +func GetRevisionKey(baseKey string, revision int) string { + return fmt.Sprintf("%s:rev:%d", baseKey, revision) +} + +// StoreRevisionHistory stores a value and manages revision history (up to 3 revisions) +func (r *RevisionService) StoreRevisionHistory(txn *badger.Txn, key string, storedValue types.StoredValue, ttl time.Duration) error { + // Get existing metadata to check current revisions + metadataKey := auth.ResourceMetadataKey(key) + + var metadata types.ResourceMetadata + var currentRevisions []int + + // Try to get existing metadata + metadataData, err := r.storage.RetrieveWithDecompression(txn, []byte(metadataKey)) + if err == badger.ErrKeyNotFound { + // No existing metadata, this is a new key + metadata = types.ResourceMetadata{ + OwnerUUID: "", // Will be set by caller if needed + GroupUUID: "", + Permissions: types.DefaultPermissions, + TTL: "", + CreatedAt: time.Now().Unix(), + UpdatedAt: time.Now().Unix(), + } + currentRevisions = []int{} + } else if err != nil { + // Error reading metadata + return fmt.Errorf("failed to read metadata: %v", err) + } else { + // Parse existing metadata + err = json.Unmarshal(metadataData, &metadata) + if err != nil { + return fmt.Errorf("failed to unmarshal metadata: %v", err) + } + + // Extract current revisions (we store them as a custom field) + if metadata.TTL == "" { + currentRevisions = []int{} + } else { + // For now, we'll manage revisions separately - let's create a new metadata field + currentRevisions = []int{1, 2, 3} // Assume all revisions exist for existing keys + } + } + + // Revision rotation logic: shift existing revisions + if len(currentRevisions) >= 3 { + // Delete oldest revision (rev:3) + oldestRevKey := GetRevisionKey(key, 3) + txn.Delete([]byte(oldestRevKey)) + + // Shift rev:2 → rev:3 + rev2Key := GetRevisionKey(key, 2) + rev2Data, err := r.storage.RetrieveWithDecompression(txn, []byte(rev2Key)) + if err == nil { + rev3Key := GetRevisionKey(key, 3) + r.storage.StoreWithTTL(txn, []byte(rev3Key), rev2Data, ttl) + } + + // Shift rev:1 → rev:2 + rev1Key := GetRevisionKey(key, 1) + rev1Data, err := r.storage.RetrieveWithDecompression(txn, []byte(rev1Key)) + if err == nil { + rev2Key := GetRevisionKey(key, 2) + r.storage.StoreWithTTL(txn, []byte(rev2Key), rev1Data, ttl) + } + } + + // Store current value as rev:1 + currentValueBytes, err := json.Marshal(storedValue) + if err != nil { + return fmt.Errorf("failed to marshal current value for revision: %v", err) + } + + rev1Key := GetRevisionKey(key, 1) + err = r.storage.StoreWithTTL(txn, []byte(rev1Key), currentValueBytes, ttl) + if err != nil { + return fmt.Errorf("failed to store revision 1: %v", err) + } + + // Update metadata with new revision count + metadata.UpdatedAt = time.Now().Unix() + metadataBytes, err := json.Marshal(metadata) + if err != nil { + return fmt.Errorf("failed to marshal metadata: %v", err) + } + + return r.storage.StoreWithTTL(txn, []byte(metadataKey), metadataBytes, ttl) +} + +// GetRevisionHistory retrieves all available revisions for a given key +func (r *RevisionService) GetRevisionHistory(key string) ([]map[string]interface{}, error) { + var revisions []map[string]interface{} + + err := r.storage.db.View(func(txn *badger.Txn) error { + // Check revisions 1, 2, 3 + for rev := 1; rev <= 3; rev++ { + revKey := GetRevisionKey(key, rev) + + revData, err := r.storage.RetrieveWithDecompression(txn, []byte(revKey)) + if err == badger.ErrKeyNotFound { + continue // Skip missing revisions + } else if err != nil { + return fmt.Errorf("failed to retrieve revision %d: %v", rev, err) + } + + var storedValue types.StoredValue + err = json.Unmarshal(revData, &storedValue) + if err != nil { + return fmt.Errorf("failed to unmarshal revision %d: %v", rev, err) + } + + var data interface{} + err = json.Unmarshal(storedValue.Data, &data) + if err != nil { + return fmt.Errorf("failed to unmarshal revision %d data: %v", rev, err) + } + + revision := map[string]interface{}{ + "revision": rev, + "uuid": storedValue.UUID, + "timestamp": storedValue.Timestamp, + "data": data, + } + + revisions = append(revisions, revision) + } + + return nil + }) + + if err != nil { + return nil, err + } + + // Sort revisions by revision number (newest first) + // Note: they're already in order since we iterate 1->3, but reverse for newest first + for i, j := 0, len(revisions)-1; i < j; i, j = i+1, j-1 { + revisions[i], revisions[j] = revisions[j], revisions[i] + } + + return revisions, nil +} + +// GetSpecificRevision retrieves a specific revision of a key +func (r *RevisionService) GetSpecificRevision(key string, revision int) (*types.StoredValue, error) { + if revision < 1 || revision > 3 { + return nil, fmt.Errorf("invalid revision number: %d (must be 1-3)", revision) + } + + var storedValue types.StoredValue + err := r.storage.db.View(func(txn *badger.Txn) error { + revKey := GetRevisionKey(key, revision) + + revData, err := r.storage.RetrieveWithDecompression(txn, []byte(revKey)) + if err != nil { + return err + } + + return json.Unmarshal(revData, &storedValue) + }) + + if err != nil { + return nil, err + } + + return &storedValue, nil +} + +// GetRevisionFromPath extracts revision number from a path like "key/data/rev/2" +func GetRevisionFromPath(path string) (string, int, error) { + parts := strings.Split(path, "/") + if len(parts) < 4 || parts[len(parts)-2] != "rev" { + return "", 0, fmt.Errorf("invalid revision path format") + } + + revisionStr := parts[len(parts)-1] + revision, err := strconv.Atoi(revisionStr) + if err != nil { + return "", 0, fmt.Errorf("invalid revision number: %s", revisionStr) + } + + // Reconstruct the base key without the "/rev/N" suffix + baseKey := strings.Join(parts[:len(parts)-2], "/") + + return baseKey, revision, nil +} \ No newline at end of file diff --git a/storage/storage.go b/storage/storage.go new file mode 100644 index 0000000..952725d --- /dev/null +++ b/storage/storage.go @@ -0,0 +1,112 @@ +package storage + +import ( + "fmt" + "time" + + badger "github.com/dgraph-io/badger/v4" + "github.com/sirupsen/logrus" + + "kvs/types" +) + +// StorageService handles all BadgerDB operations and data management +type StorageService struct { + db *badger.DB + config *types.Config + compressionSvc *CompressionService + logger *logrus.Logger +} + +// NewStorageService creates a new storage service +func NewStorageService(db *badger.DB, config *types.Config, logger *logrus.Logger) (*StorageService, error) { + var compressionSvc *CompressionService + var err error + + // Initialize compression if enabled + if config.CompressionEnabled { + compressionSvc, err = NewCompressionService() + if err != nil { + return nil, fmt.Errorf("failed to initialize compression: %v", err) + } + } + + return &StorageService{ + db: db, + config: config, + compressionSvc: compressionSvc, + logger: logger, + }, nil +} + +// Close closes the storage service and its resources +func (s *StorageService) Close() { + if s.compressionSvc != nil { + s.compressionSvc.Close() + } +} + +// StoreWithTTL stores data with optional TTL and compression +func (s *StorageService) StoreWithTTL(txn *badger.Txn, key []byte, data []byte, ttl time.Duration) error { + var finalData []byte + var err error + + // Compress data if compression is enabled + if s.config.CompressionEnabled && s.compressionSvc != nil { + finalData, err = s.compressionSvc.CompressData(data) + if err != nil { + return fmt.Errorf("failed to compress data: %v", err) + } + } else { + finalData = data + } + + entry := badger.NewEntry(key, finalData) + + // Apply TTL if specified + if ttl > 0 { + entry = entry.WithTTL(ttl) + } + + return txn.SetEntry(entry) +} + +// RetrieveWithDecompression retrieves and decompresses data from BadgerDB +func (s *StorageService) RetrieveWithDecompression(txn *badger.Txn, key []byte) ([]byte, error) { + item, err := txn.Get(key) + if err != nil { + return nil, err + } + + var compressedData []byte + err = item.Value(func(val []byte) error { + compressedData = append(compressedData, val...) + return nil + }) + if err != nil { + return nil, err + } + + // Decompress data if compression is enabled + if s.config.CompressionEnabled && s.compressionSvc != nil { + return s.compressionSvc.DecompressData(compressedData) + } + + return compressedData, nil +} + +// CompressData compresses data using the compression service +func (s *StorageService) CompressData(data []byte) ([]byte, error) { + if !s.config.CompressionEnabled || s.compressionSvc == nil { + return data, nil + } + return s.compressionSvc.CompressData(data) +} + +// DecompressData decompresses data using the compression service +func (s *StorageService) DecompressData(compressedData []byte) ([]byte, error) { + if !s.config.CompressionEnabled || s.compressionSvc == nil { + return compressedData, nil + } + return s.compressionSvc.DecompressData(compressedData) +} \ No newline at end of file