From dc5d652503cc1fb645c5b11139738e893bfb21ac Mon Sep 17 00:00:00 2001
From: Kalzu Rekku
Date: Sun, 16 Mar 2025 10:30:47 +0200
Subject: [PATCH] First push to rauhala gitea.

---
 README.md                    |  87 ++++-
 SlidingSqlite.py             | 720 +++++++++++++++++++++++++++++++++++
 test_slidingsqlite.py        |  99 +++++
 test_slidingsqlite_simple.py | 101 +++++
 usage_by_gpt.md              |  91 +++++
 usage_by_grok.md             | 500 ++++++++++++++++++++++++
 6 files changed, 1597 insertions(+), 1 deletion(-)
 create mode 100644 SlidingSqlite.py
 create mode 100644 test_slidingsqlite.py
 create mode 100644 test_slidingsqlite_simple.py
 create mode 100644 usage_by_gpt.md
 create mode 100644 usage_by_grok.md

diff --git a/README.md b/README.md
index d62b895..5a08404 100644
--- a/README.md
+++ b/README.md
@@ -1,2 +1,87 @@
-# sliding_sqlite
+# SlidingSQLite
+
+A thread-safe SQLite implementation with automatic time-based database rotation, designed for high-throughput, multi-threaded applications. The library provides a robust solution for managing time-windowed data, with features like database rotation, retention policies, and asynchronous query execution.
+
+## Features
+
+- **Thread-Safe Operations**: Safely execute read and write operations in a multi-threaded environment using a queue-based worker system.
+- **Time-Based Database Rotation**: Automatically rotates database files based on a configurable time interval (e.g., hourly, daily).
+- **Retention Policy**: Automatically deletes old database files after a configurable retention period to prevent disk space exhaustion.
+- **Asynchronous Query Execution**: Supports asynchronous read and write operations with UUID-based result retrieval.
+- **Transparent Reads Across Databases**: Read queries are automatically executed across all relevant database files, providing a seamless view of time-windowed data.
+- **Error Handling**: Robust error handling with custom exceptions and query result objects.
+- **Configurable Cleanup**: Periodic cleanup of stale queries and old databases to prevent memory leaks and manage disk usage.
+- **Customizable Schema**: Initialize databases with a user-defined schema.
+
+## Installation
+
+`SlidingSQLite` requires Python 3.7 or higher. It has no dependencies outside the standard library; SQLite support is provided by the built-in `sqlite3` module.
+
+1. Clone or download the repository:
+   ```bash
+   git clone
+   cd SlidingSQLite
+   ```
+
+2. Place the `SlidingSqlite.py` file in your project directory or install it as a module.
+
+## Quick Start
+
+Here is a basic example to get you started:
+
+```python
+import logging
+import time
+
+from SlidingSqlite import SlidingSQLite
+
+# Set up logging
+logging.basicConfig(level=logging.INFO)
+
+# Define a simple schema
+schema = """
+CREATE TABLE IF NOT EXISTS logs (
+    id INTEGER PRIMARY KEY AUTOINCREMENT,
+    timestamp REAL,
+    message TEXT
+);
+"""
+
+# Initialize SlidingSQLite
+db = SlidingSQLite(
+    db_dir="./databases",
+    schema=schema,
+    rotation_interval=3600,   # Rotate every hour
+    retention_period=604800,  # Keep databases for 7 days
+    cleanup_interval=3600,    # Run cleanup every hour
+    auto_delete_old_dbs=True
+)
+
+# Insert some data
+query_id = db.execute_write(
+    "INSERT INTO logs (timestamp, message) VALUES (?, ?)",
+    (time.time(), "Hello, SlidingSQLite!")
+)
+result = db.get_result(query_id)
+if result.success:
+    logging.info("Write operation successful")
+
+# Read data across all databases
ORDER BY timestamp DESC", (time.time() - 86400,)) +result = db.get_read_result(query_id) +if result.success: + logging.info(f"Found {len(result.data)} log entries: {result.data}") + +# Shut down the database +db.shutdown() +``` + +For a more comprehensive example, see the `example.py` file in the repository, which demonstrates multi-threaded usage. + +## Documentation + +For detailed usage instructions, API reference, and examples, please refer to the [Usage Documentation](USAGE.md). + +## Requirements + +- Python 3.7+ +- SQLite (included with Python) diff --git a/SlidingSqlite.py b/SlidingSqlite.py new file mode 100644 index 0000000..736d266 --- /dev/null +++ b/SlidingSqlite.py @@ -0,0 +1,720 @@ +import sqlite3 +import uuid +import threading +import datetime +import queue +import os +import logging +import time +from typing import ( + Any, + Dict, + Optional, + Tuple, + List, + Union, + Set, + NamedTuple, + TypeVar, + Generic, + Callable, +) + +T = TypeVar("T") + + +class DatabaseError(Exception): + """Base exception for all database errors""" + + pass + + +class QueryError(DatabaseError): + """Exception raised when a query fails""" + + pass + + +class QueryResult(Generic[T]): + """Class to handle query results with proper error handling""" + + def __init__(self, data: Optional[T] = None, error: Optional[Exception] = None): + self.data = data + self.error = error + self.success = error is None and data is not None + + def __bool__(self) -> bool: + return self.success + + +class DatabaseTimeframe(NamedTuple): + """Represents a database file and its time range""" + + db_file: str + start_time: float + end_time: float + + +class QueryLockManager: + def __init__(self, sliding_sqlite, query_id): + self.sliding_sqlite = sliding_sqlite + self.query_id = query_id + self.lock = sliding_sqlite.query_lock + self.is_active = False + + def __enter__(self): + # Acquire the lock and check query status + with self.lock: + self.is_active = ( + self.query_id in self.sliding_sqlite.read_queues + and self.query_id in self.sliding_sqlite.active_queries + ) + return self.is_active + + def __exit__(self, exc_type, exc_val, exc_tb): + # If there was an exception, we don't need to do anything + pass + + +class SlidingSQLite: + """ + Thread-safe SQLite implementation with automatic time-based database rotation. + + This class provides a way to safely use SQLite in a multi-threaded environment + by queuing database operations and processing them in dedicated worker threads. + Databases are created based on the specified rotation interval and old databases + are automatically cleaned up based on the specified retention period. + """ + + def __init__( + self, + db_dir: str, + schema: str, + retention_period: int = 604800, + rotation_interval: int = 3600, + cleanup_interval: int = 3600, + auto_delete_old_dbs: bool = True, + ) -> None: + """ + Initialize the SlidingSQLite instance. 
+ + Args: + db_dir: Directory to store database files + schema: SQL schema to initialize new databases + retention_period: Number of seconds to keep databases before deletion (default: 7 days) + rotation_interval: How often to rotate to a new database file in seconds (default: 1 hour) + cleanup_interval: How often to run the cleanup process in seconds (default: 1 hour) + auto_delete_old_dbs: Whether to automatically delete old databases (default: True) + """ + self.db_dir = db_dir + self.schema = schema + self.retention_period = retention_period # In seconds + self.rotation_interval = rotation_interval # In seconds + self.cleanup_interval = cleanup_interval # In seconds + self.auto_delete_old_dbs = auto_delete_old_dbs # New field + + # Queues for operations + self.write_queue: queue.Queue[Tuple[str, Tuple[Any, ...], uuid.UUID]] = ( + queue.Queue() + ) + self.result_queues: Dict[uuid.UUID, queue.Queue[QueryResult[bool]]] = {} + self.read_queues: Dict[ + uuid.UUID, queue.Queue[QueryResult[List[Tuple[Any, ...]]]] + ] = {} + + # Thread synchronization + self.shutdown_flag = threading.Event() + self.worker_thread = None + + # Cache for database connections + self.connections: Dict[str, sqlite3.Connection] = {} + self.conn_lock = threading.Lock() + + # Track active query IDs for cleanup + self.active_queries: Set[uuid.UUID] = set() + self.query_lock = threading.Lock() + + # Initialize system + self._setup() + + def _setup(self) -> None: + """Setup the database directory and initialize workers""" + try: + os.makedirs(self.db_dir, exist_ok=True) + self._init_metadata() + + # Start worker threads + self._start_worker() + self._start_cleanup_worker() + + # Register current database + self._register_current_db() + except Exception as e: + logging.error(f"Failed to initialize SlidingSQLite: {e}") + raise DatabaseError(f"Failed to initialize SlidingSQLite: {e}") + + def _init_metadata(self) -> None: + """Initialize the metadata database""" + try: + with self._get_connection(self._get_metadata_db()) as conn: + conn.execute( + """ + CREATE TABLE IF NOT EXISTS metadata ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + db_file TEXT NOT NULL UNIQUE, + start_time REAL NOT NULL, + end_time REAL NOT NULL + ) + """ + ) + conn.commit() + except sqlite3.Error as e: + logging.error(f"Failed to initialize metadata database: {e}") + raise DatabaseError(f"Failed to initialize metadata database: {e}") + + def _get_connection(self, db_file: str) -> sqlite3.Connection: + """ + Get a connection to the specified database file. + Reuses existing connections when possible. 
+ + Args: + db_file: Path to the database file + + Returns: + SQLite connection object + """ + with self.conn_lock: + if db_file not in self.connections or self.connections[db_file] is None: + try: + conn = sqlite3.connect( + db_file, isolation_level=None, check_same_thread=False + ) + conn.execute("PRAGMA journal_mode=WAL;") + conn.execute( + "PRAGMA busy_timeout=5000;" + ) # Wait up to 5 seconds when database is locked + + # Initialize schema if this is a data database (not metadata) + if db_file != self._get_metadata_db(): + conn.executescript(self.schema) + + self.connections[db_file] = conn + except sqlite3.Error as e: + logging.error(f"Failed to connect to database {db_file}: {e}") + raise DatabaseError(f"Failed to connect to database {db_file}: {e}") + + return self.connections[db_file] + + def _get_metadata_db(self) -> str: + """Get the path to the metadata database""" + return os.path.join(self.db_dir, "metadata.db") + + def _get_current_db(self) -> str: + """Get the path to the current time-based database""" + # Generate timestamped DB name based on rotation interval + now = time.time() + interval_timestamp = int(now // self.rotation_interval) * self.rotation_interval + timestamp_str = datetime.datetime.fromtimestamp(interval_timestamp).strftime( + "%Y%m%d_%H%M%S" + ) + return os.path.join(self.db_dir, f"data_{timestamp_str}.db") + + def _register_current_db(self) -> None: + """Register the current database in the metadata table""" + current_db = self._get_current_db() + now = time.time() + + # Calculate time boundaries for the current database + interval_start = int(now // self.rotation_interval) * self.rotation_interval + interval_end = interval_start + self.rotation_interval + + try: + with self._get_connection(self._get_metadata_db()) as conn: + # Check if this database is already registered + existing = conn.execute( + "SELECT id FROM metadata WHERE db_file = ?", (current_db,) + ).fetchone() + + if not existing: + conn.execute( + "INSERT INTO metadata (db_file, start_time, end_time) VALUES (?, ?, ?)", + (current_db, interval_start, interval_end), + ) + conn.commit() + except sqlite3.Error as e: + logging.error(f"Failed to register current database: {e}") + # Continue execution as this is not critical + + def _rotate_databases(self) -> None: + """Delete databases that are older than the retention period""" + if not self.auto_delete_old_dbs: + return # Skip deletion if auto-delete is disabled + + cutoff_time = time.time() - self.retention_period + + try: + with self._get_connection(self._get_metadata_db()) as conn: + # Find databases older than the cutoff time + old_dbs = conn.execute( + "SELECT db_file FROM metadata WHERE end_time < ?", (cutoff_time,) + ).fetchall() + + # Delete old database files + for (db_file,) in old_dbs: + self._delete_database_file(db_file) + + # Clean up metadata entries + conn.execute("DELETE FROM metadata WHERE end_time < ?", (cutoff_time,)) + conn.commit() + except sqlite3.Error as e: + logging.error(f"Database rotation error: {e}") + + def _cleanup_stale_queries(self) -> None: + """Clean up stale query results to prevent memory leaks""" + with self.query_lock: + # Find completed queries to clean up + completed_queries = set() + + for query_id in list(self.result_queues.keys()): + if query_id not in self.active_queries: + completed_queries.add(query_id) + + for query_id in list(self.read_queues.keys()): + if query_id not in self.active_queries: + completed_queries.add(query_id) + + # Remove completed queries from dictionaries + for query_id in 
completed_queries: + if query_id in self.result_queues: + del self.result_queues[query_id] + if query_id in self.read_queues: + del self.read_queues[query_id] + + def _delete_database_file(self, db_file: str) -> bool: + """ + Delete a database file and clean up resources. + + Args: + db_file: Path to the database file to delete + + Returns: + True if deleted successfully, False otherwise + """ + # Close and remove connection if it exists + with self.conn_lock: + if db_file in self.connections: + try: + self.connections[db_file].close() + except sqlite3.Error: + pass # Ignore errors on close + del self.connections[db_file] + + # Delete the file + if os.path.exists(db_file): + try: + os.remove(db_file) + logging.info(f"Deleted database: {db_file}") + return True + except OSError as e: + logging.error(f"Failed to delete database {db_file}: {e}") + return False + return False # File didn't exist + + def set_retention_period(self, seconds: int) -> None: + """ + Set the retention period for databases. + + Args: + seconds: Number of seconds to keep databases before deletion + """ + self.retention_period = max(0, seconds) # Ensure positive value + + def set_auto_delete(self, enabled: bool) -> None: + """ + Enable or disable automatic deletion of old databases. + + Args: + enabled: Whether to automatically delete old databases + """ + self.auto_delete_old_dbs = enabled + + def delete_databases_before(self, timestamp: float) -> int: + """ + Delete all databases with end_time before the specified timestamp. + + Args: + timestamp: Unix timestamp (seconds since epoch) + + Returns: + Number of databases deleted + """ + count = 0 + try: + with self._get_connection(self._get_metadata_db()) as conn: + # Find databases older than the specified time + old_dbs = conn.execute( + "SELECT db_file FROM metadata WHERE end_time < ?", (timestamp,) + ).fetchall() + + # Delete old database files + for (db_file,) in old_dbs: + if self._delete_database_file(db_file): + count += 1 + + # Clean up metadata entries + conn.execute("DELETE FROM metadata WHERE end_time < ?", (timestamp,)) + conn.commit() + except sqlite3.Error as e: + logging.error(f"Database deletion error: {e}") + raise DatabaseError(f"Failed to delete databases: {e}") + + return count + + def delete_databases_in_range(self, start_time: float, end_time: float) -> int: + """ + Delete all databases with time ranges falling within the specified period. + + Args: + start_time: Start of time range (unix timestamp) + end_time: End of time range (unix timestamp) + + Returns: + Number of databases deleted + """ + count = 0 + try: + with self._get_connection(self._get_metadata_db()) as conn: + # Find databases in the specified time range + # A database is in range if its time range overlaps with the specified range + dbs = conn.execute( + """ + SELECT db_file FROM metadata + WHERE (start_time <= ? AND end_time >= ?) OR + (start_time <= ? AND end_time >= ?) OR + (start_time >= ? AND end_time <= ?) + """, + (end_time, start_time, end_time, start_time, start_time, end_time), + ).fetchall() + + # Delete database files + for (db_file,) in dbs: + if self._delete_database_file(db_file): + count += 1 + + # Clean up metadata entries + conn.execute( + """ + DELETE FROM metadata + WHERE (start_time <= ? AND end_time >= ?) OR + (start_time <= ? AND end_time >= ?) OR + (start_time >= ? AND end_time <= ?) 
+ """, + (end_time, start_time, end_time, start_time, start_time, end_time), + ) + conn.commit() + except sqlite3.Error as e: + logging.error(f"Database deletion error: {e}") + raise DatabaseError(f"Failed to delete databases: {e}") + + return count + + def get_databases_info(self) -> List[DatabaseTimeframe]: + """ + Get information about all available databases. + + Returns: + List of DatabaseTimeframe objects containing database file paths and time ranges + """ + databases = [] + try: + with self._get_connection(self._get_metadata_db()) as conn: + rows = conn.execute( + "SELECT db_file, start_time, end_time FROM metadata ORDER BY start_time" + ).fetchall() + + for db_file, start_time, end_time in rows: + databases.append(DatabaseTimeframe(db_file, start_time, end_time)) + + except sqlite3.Error as e: + logging.error(f"Error retrieving database info: {e}") + raise DatabaseError(f"Failed to retrieve database info: {e}") + + return databases + + def execute(self, query: str, params: Tuple[Any, ...] = ()) -> uuid.UUID: + """ + Smart query executor that automatically determines if the query + is a read or write operation and routes accordingly. + + Args: + query: SQL query to execute + params: Parameters for the query + + Returns: + UUID that can be used to retrieve the result + """ + # look for new database files + self._register_current_db() + + query_upper = query.strip().upper() + + # Check if the query is a read operation + if ( + query_upper.startswith("SELECT") + or query_upper.startswith("PRAGMA") + or query_upper.startswith("EXPLAIN") + ): + return self.execute_read(query, params) + else: + return self.execute_write(query, params) + + def execute_write(self, query: str, params: Tuple[Any, ...] = ()) -> uuid.UUID: + """ + Execute a write query asynchronously. + + Args: + query: SQL query to execute + params: Parameters for the query + + Returns: + UUID that can be used to retrieve the result + """ + # look for new database files + self._register_current_db() + + query_id = uuid.uuid4() + + with self.query_lock: + self.result_queues[query_id] = queue.Queue() + self.active_queries.add(query_id) + + self.write_queue.put((query, params, query_id)) + return query_id + + def execute_write_sync( + self, query: str, params: Tuple[Any, ...] = (), timeout: float = 5.0 + ) -> QueryResult[bool]: + """ + Execute a write query synchronously. + + Args: + query: SQL query to execute + params: Parameters for the query + timeout: Maximum time to wait for the result + + Returns: + QueryResult containing success status and any error + """ + query_id = self.execute_write(query, params) + return self.get_result(query_id, timeout) + + def execute_read(self, query: str, params: Tuple[Any, ...] = ()) -> uuid.UUID: + """ + Execute a read query asynchronously across all relevant databases. + This provides transparent access to all time-windowed data. 
+ + Args: + query: SQL query to execute + params: Parameters for the query + + Returns: + UUID that can be used to retrieve the result + """ + # look for new database files + self._register_current_db() + + query_id = uuid.uuid4() + + with self.query_lock: + self.read_queues[query_id] = queue.Queue() + self.active_queries.add(query_id) + + # Start the worker thread that will query across all databases + threading.Thread( + target=self._read_across_all_worker, + args=(query, params, query_id), + daemon=True, + ).start() + + return query_id + + def _read_worker( + self, query: str, params: Tuple[Any, ...], query_id: uuid.UUID + ) -> None: + """Worker thread for processing read queries""" + db_file = self._get_current_db() + try: + with self._get_connection(db_file) as conn: + results = conn.execute(query, params).fetchall() + if query_id in self.read_queues: + self.read_queues[query_id].put(QueryResult(data=results)) + except Exception as e: + error_msg = f"Read error: {e}" + logging.error(error_msg) + if query_id in self.read_queues: + self.read_queues[query_id].put(QueryResult(error=QueryError(error_msg))) + + def execute_read_sync( + self, query: str, params: Tuple[Any, ...] = (), timeout: float = 5.0 + ) -> QueryResult[List[Tuple[Any, ...]]]: + """ + Execute a read query synchronously across all relevant databases. + + Args: + query: SQL query to execute + params: Parameters for the query + timeout: Maximum time to wait for the result + + Returns: + QueryResult containing combined query results and any error + """ + query_id = self.execute_read(query, params) + return self.get_read_result(query_id, timeout) + + def _read_across_all_worker( + self, query: str, params: Tuple[Any, ...], query_id: uuid.UUID + ) -> None: + """Worker thread for processing read queries across all databases""" + try: + # Get all available databases from metadata + with self._get_connection(self._get_metadata_db()) as conn: + db_files = conn.execute( + "SELECT db_file FROM metadata ORDER BY end_time DESC" + ).fetchall() + + all_results: List[Tuple[Any, ...]] = [] + for (db_file,) in db_files: + if os.path.exists(db_file): + try: + with self._get_connection(db_file) as conn: + results = conn.execute(query, params).fetchall() + all_results.extend(results) + except sqlite3.Error as e: + logging.warning(f"Error reading from {db_file}: {e}") + # Continue with other databases + + # Use the context manager to safely check query status + with QueryLockManager(self, query_id) as is_active: + if is_active: + self.read_queues[query_id].put(QueryResult(data=all_results)) + else: + logging.warning( + f"Query ID {query_id} no longer active when trying to return results" + ) + + except Exception as e: + error_msg = f"Failed to execute query across all databases: {e}" + logging.error(error_msg) + with QueryLockManager(self, query_id) as is_active: + if is_active: + self.read_queues[query_id].put( + QueryResult(error=QueryError(error_msg)) + ) + + def get_result( + self, query_id: uuid.UUID, timeout: float = 5.0 + ) -> QueryResult[bool]: + """ + Get the result of a write query. 
+
+        Args:
+            query_id: UUID returned by execute_write
+            timeout: Maximum time to wait for the result
+
+        Returns:
+            QueryResult containing success status and any error
+        """
+        if query_id not in self.result_queues:
+            return QueryResult(error=QueryError("Invalid query ID"))
+
+        try:
+            result = self.result_queues[query_id].get(timeout=timeout)
+
+            with self.query_lock:
+                if query_id in self.active_queries:
+                    self.active_queries.remove(query_id)
+
+            return result
+        except queue.Empty:
+            return QueryResult(error=QueryError("Query timed out"))
+
+    def get_read_result(
+        self, query_id: uuid.UUID, timeout: float = 5.0
+    ) -> QueryResult[List[Tuple[Any, ...]]]:
+        """
+        Get the result of a read query.
+
+        Args:
+            query_id: UUID returned by execute_read
+            timeout: Maximum time to wait for the result
+
+        Returns:
+            QueryResult containing query results and any error
+        """
+        # Check if the query ID exists in read_queues
+        with self.query_lock:
+            if query_id not in self.read_queues:
+                return QueryResult(error=QueryError("Invalid query ID"))
+            if query_id not in self.active_queries:
+                self.active_queries.add(query_id)  # Re-add if it was removed
+
+        try:
+            result = self.read_queues[query_id].get(timeout=timeout)
+
+            with self.query_lock:
+                if query_id in self.active_queries:
+                    self.active_queries.remove(query_id)
+
+            return result
+        except queue.Empty:
+            return QueryResult(error=QueryError("Query timed out"))
+
+    def _start_worker(self) -> None:
+        """Start the background worker thread for processing write operations."""
+        if self.worker_thread and self.worker_thread.is_alive():
+            return
+
+        def worker() -> None:
+            while not self.shutdown_flag.is_set():
+                try:
+                    task = self.write_queue.get(timeout=1)  # Adjust timeout as needed
+                    if task:
+                        self._process_write_task(task)
+                except queue.Empty:
+                    continue
+                except Exception as e:
+                    logging.error(f"Worker thread encountered an error: {e}")
+
+        self.worker_thread = threading.Thread(target=worker, daemon=True)
+        self.worker_thread.start()
+
+    def _start_cleanup_worker(self) -> None:
+        """Start the cleanup worker thread for database rotation."""
+        threading.Thread(target=self._cleanup_worker, daemon=True).start()
+
+    def _process_write_task(self, task: Tuple[str, Tuple[Any, ...], uuid.UUID]) -> None:
+        """Process a single write task from the queue."""
+        query, params, query_id = task
+        db_file = self._get_current_db()
+        # Look up the result queue once; it may have been removed by cleanup
+        result_queue = self.result_queues.get(query_id)
+        try:
+            with self._get_connection(db_file) as conn:
+                conn.execute(query, params)
+                conn.commit()
+            if result_queue:
+                result_queue.put(QueryResult(data=True))
+        except Exception as e:
+            logging.error(f"Write error: {e}")
+            if result_queue:
+                result_queue.put(QueryResult(error=e))
+
+    def _cleanup_worker(self) -> None:
+        """Worker thread for handling database rotation and cleanup."""
+        while not self.shutdown_flag.is_set():
+            self._rotate_databases()
+            self._cleanup_stale_queries()  # Also clean up stale queries
+            time.sleep(self.cleanup_interval)  # Use the configurable interval
+
+    def shutdown(self) -> None:
+        """Gracefully shut down the workers and close connections."""
+        self.shutdown_flag.set()
+        if self.worker_thread:
+            self.worker_thread.join()
+        # Close any cached connections so the WAL files are checkpointed
+        with self.conn_lock:
+            for conn in self.connections.values():
+                try:
+                    conn.close()
+                except sqlite3.Error:
+                    pass  # Ignore errors on close
+            self.connections.clear()
+        logging.info("SlidingSQLite shutdown completed.")

diff --git a/test_slidingsqlite.py b/test_slidingsqlite.py
new file mode 100644
index 0000000..2615a2f
--- /dev/null
+++ b/test_slidingsqlite.py
@@ -0,0 +1,99 @@
+import time
+import uuid
+import hashlib
+import threading
+import random
+from datetime import datetime, timezone
+from SlidingSqlite import SlidingSQLite
+import logging
+
+# Set up logging
+logging.basicConfig( + level=logging.DEBUG, + format="%(asctime)s - %(levelname)s - %(message)s", + handlers=[logging.StreamHandler()], +) + +# Configuration +NUM_WRITER_THREADS = 4 # Number of writer threads +NUM_READER_THREADS = 2 # Number of reader threads +TARGET_OPS_PER_SECOND = 10 # Target database operations per second + +# Define a more complex schema +db_schema = """ +CREATE TABLE IF NOT EXISTS users ( + id INTEGER PRIMARY KEY, + username TEXT UNIQUE, + created_at TEXT +); +CREATE TABLE IF NOT EXISTS events ( + id INTEGER PRIMARY KEY, + user_id INTEGER, + event_type TEXT, + event_timestamp TEXT, + hash TEXT, + FOREIGN KEY(user_id) REFERENCES users(id) +); +""" + +# Initialize SlidingSQLite +db = SlidingSQLite( + db_dir="./databases", + schema=db_schema, + rotation_interval=10, + retention_period=60, + cleanup_interval=30, + auto_delete_old_dbs=True, +) + +def generate_md5(): + return hashlib.md5(str(uuid.uuid4()).encode()).hexdigest() + +def insert_user(): + username = f"user_{uuid.uuid4().hex[:8]}" + created_at = datetime.now(timezone.utc).isoformat() + db.execute_write("INSERT INTO users (username, created_at) VALUES (?, ?)", (username, created_at)) + return username + +def insert_event(): + query_id = db.execute_read("SELECT id FROM users ORDER BY RANDOM() LIMIT 1", ()) + result = db.get_read_result(query_id) + if result.success and result.data: + user_id = result.data[0][0] + event_type = "login" if uuid.uuid4().int % 2 == 0 else "logout" + event_timestamp = datetime.now(timezone.utc).isoformat() + event_hash = generate_md5() + db.execute_write("INSERT INTO events (user_id, event_type, event_timestamp, hash) VALUES (?, ?, ?, ?)", (user_id, event_type, event_timestamp, event_hash)) + +def writer_thread(): + while True: + insert_user() + insert_event() + time.sleep(random.uniform(0.05, 0.15)) # Randomized sleep to target ~10 ops/sec + +def reader_thread(): + while True: + query_id = db.execute_read("SELECT e.event_type, u.username, e.event_timestamp FROM events e JOIN users u ON e.user_id = u.id ORDER BY e.event_timestamp DESC LIMIT 5", ()) + result = db.get_read_result(query_id) + if result.success: + logging.info(f"Recent events: {result.data}") + time.sleep(random.uniform(0.5, 1.5)) # Randomized sleep for more natural load + +# Start multiple writer and reader threads +threads = [] +for _ in range(NUM_WRITER_THREADS): + t = threading.Thread(target=writer_thread, daemon=True) + t.start() + threads.append(t) +for _ in range(NUM_READER_THREADS): + t = threading.Thread(target=reader_thread, daemon=True) + t.start() + threads.append(t) + +try: + print("Running multi-threaded SlidingSQLite test. 
Press Ctrl+C to stop.") + while True: + time.sleep(1) +except KeyboardInterrupt: + print("\nShutting down...") + db.shutdown() diff --git a/test_slidingsqlite_simple.py b/test_slidingsqlite_simple.py new file mode 100644 index 0000000..9455ff6 --- /dev/null +++ b/test_slidingsqlite_simple.py @@ -0,0 +1,101 @@ +import time +import uuid +import hashlib +from datetime import datetime, timezone +from SlidingSqlite import SlidingSQLite +import logging + +# Set up logging +logging.basicConfig( + level=logging.DEBUG, + format="%(asctime)s - %(levelname)s - %(message)s", + handlers=[logging.StreamHandler()], +) + +# Initialize SlidingSQLite with 1-minute rotation and 5-minute retention +db = SlidingSQLite( + db_dir="./databases", + schema="CREATE TABLE IF NOT EXISTS data (id INTEGER PRIMARY KEY, timestamp TEXT, hash TEXT)", + rotation_interval=60, # Rotate every 1 minute (60 seconds) + retention_period=300, # Keep data for 5 minutes (300 seconds) + cleanup_interval=30, # Run cleanup every 30 seconds + auto_delete_old_dbs=True, # Enable automatic deletion of old databases +) + + +# Function to generate a random MD5 hash +def generate_md5(): + return hashlib.md5(str(uuid.uuid4()).encode()).hexdigest() + + +try: + print("Starting test. Press Ctrl+C to stop.") + start_time = time.time() + + while True: + # Generate timestamp and random MD5 hash + timestamp = datetime.now(timezone.utc).isoformat() + md5_hash = generate_md5() + + # Insert data into the database + query_id = db.execute_write( + "INSERT INTO data (timestamp, hash) VALUES (?, ?)", (timestamp, md5_hash) + ) + result = db.get_result(query_id) + + if result.success: + print(f"Inserted: {timestamp} | {md5_hash} | Success: True") + else: + print(f"Failed to insert: {timestamp} | {md5_hash} | Error: {result.error}") + + # Every 10 seconds, query the database + if int(time.time() - start_time) % 10 == 0: + try: + # Ensure at least one record exists before querying + if time.time() - start_time > 2: # Wait at least 2 seconds after start + read_id = db.execute_read("SELECT * FROM data", ()) + + # Add a longer delay to allow the worker thread to process + time.sleep(0.5) + + read_result = db.get_read_result( + read_id, timeout=10.0 + ) # Increased timeout + + if read_result.success: + print( + f"\nStored Entries ({len(read_result.data)}):", + read_result.data[:5], + ) + if len(read_result.data) > 5: + print(f"...and {len(read_result.data) - 5} more entries\n") + else: + print(f"\nError retrieving entries: {read_result.error}\n") + except Exception as e: + print(f"\nException during database query: {e}\n") + + # Test manual database management + if int(time.time() - start_time) % 60 == 0: # Every minute + try: + # Get information about all databases + print("\nDatabase Information:") + for db_info in db.get_databases_info(): + start_time_str = datetime.fromtimestamp(db_info.start_time).strftime('%Y-%m-%d %H:%M:%S') + end_time_str = datetime.fromtimestamp(db_info.end_time).strftime('%Y-%m-%d %H:%M:%S') + print(f" - {db_info.db_file}: {start_time_str} to {end_time_str}") + + # If auto_delete_old_dbs is False, demonstrate manual deletion + if not db.auto_delete_old_dbs: + # Delete databases older than 3 minutes + cutoff_time = time.time() - 180 # 3 minutes ago + deleted_count = db.delete_databases_before(cutoff_time) + print(f"\nManually deleted {deleted_count} databases older than 3 minutes\n") + except Exception as e: + print(f"\nException during database management: {e}\n") + + # Wait a bit before continuing with inserts + time.sleep(0.5) + +except 
KeyboardInterrupt:
+    print("\nShutting down...")
+    db.shutdown()

diff --git a/usage_by_gpt.md b/usage_by_gpt.md
new file mode 100644
index 0000000..74e78eb
--- /dev/null
+++ b/usage_by_gpt.md
@@ -0,0 +1,91 @@
+# SlidingSQLite Library Documentation
+
+## Overview
+SlidingSQLite is a thread-safe SQLite implementation with automatic time-based database rotation. It allows concurrent read and write operations, supports database rotation based on time intervals, and ensures old databases are automatically cleaned up.
+
+## Features
+- Automatic database file rotation at configurable intervals.
+- Retention-based cleanup of old database files.
+- Thread-safe, queue-based execution for write operations.
+- Transparent access to all historical databases for read queries.
+- Synchronous and asynchronous query execution.
+
+## Installation
+Ensure you have Python 3.7+ installed. No third-party packages are required; the `sqlite3` module used by the library ships with the Python standard library.
+
+## Initialization
+Create a `SlidingSQLite` instance by specifying a directory to store databases and providing a schema for table initialization.
+```python
+import time
+
+from SlidingSqlite import SlidingSQLite
+
+db = SlidingSQLite(
+    db_dir="./databases",
+    schema="""
+    CREATE TABLE IF NOT EXISTS data (
+        id INTEGER PRIMARY KEY AUTOINCREMENT,
+        timestamp REAL NOT NULL,
+        value TEXT NOT NULL
+    );
+    """,
+    retention_period=604800,  # 7 days
+    rotation_interval=3600    # 1 hour
+)
+```
+
+## Writing Data
+Use `execute_write_sync` for synchronous writes:
+```python
+result = db.execute_write_sync("INSERT INTO data (timestamp, value) VALUES (?, ?)", (time.time(), "Hello"))
+if result.success:
+    print("Write successful")
+else:
+    print("Write failed:", result.error)
+```
+For asynchronous writes, use `execute_write`:
+```python
+query_id = db.execute_write("INSERT INTO data (timestamp, value) VALUES (?, ?)", (time.time(), "Async Entry"))
+```
+
+## Reading Data
+Perform synchronous reads:
+```python
+result = db.execute_read_sync("SELECT * FROM data")
+if result.success:
+    print("Data:", result.data)
+else:
+    print("Read failed:", result.error)
+```
+For asynchronous reads, use `execute_read`:
+```python
+query_id = db.execute_read("SELECT * FROM data")
+response = db.get_read_result(query_id)
+if response.success:
+    print("Results:", response.data)
+```
+
+## Managing Databases
+List all databases:
+```python
+print(db.get_databases_info())
+```
+Delete old databases:
+```python
+deleted_count = db.delete_databases_before(time.time() - 7 * 86400)
+print(f"Deleted {deleted_count} old databases")
+```
+
+## Shutdown
+To gracefully close connections:
+```python
+db.shutdown()
+```
+
+## Notes
+- Ensure the schema is consistent across all rotated databases.
+- All queries execute in separate threads, making the library suitable for high-concurrency environments.
+- The metadata database (`metadata.db`) tracks all database files and their time ranges.

diff --git a/usage_by_grok.md b/usage_by_grok.md
new file mode 100644
index 0000000..2efd5b5
--- /dev/null
+++ b/usage_by_grok.md
@@ -0,0 +1,500 @@
+
+# SlidingSQLite Usage Documentation
+
+This document provides detailed instructions on how to use the `SlidingSQLite` library, including its API, configuration options, and best practices.
+
+## Table of Contents
+
+1. [Overview](#overview)
+2. [Installation](#installation)
+3. [Configuration](#configuration)
+4. 
[Basic Usage](#basic-usage) + - [Initializing the Database](#initializing-the-database) + - [Executing Write Queries](#executing-write-queries) + - [Executing Read Queries](#executing-read-queries) + - [Retrieving Results](#retrieving-results) + - [Shutting Down](#shutting-down) +5. [Advanced Usage](#advanced-usage) + - [Multi-Threaded Applications](#multi-threaded-applications) + - [Managing Database Retention](#managing-database-retention) + - [Customizing Cleanup](#customizing-cleanup) + - [Querying Across Time Windows](#querying-across-time-windows) +6. [API Reference](#api-reference) +7. [Error Handling](#error-handling) +8. [Best Practices](#best-practices) +9. [Example](#example) + +## Overview + +`SlidingSQLite` is a thread-safe SQLite wrapper that supports time-based database rotation, making it ideal for applications that need to manage time-series data or logs with automatic cleanup. It provides asynchronous query execution, automatic database rotation, and retention policies, all while ensuring thread safety through a queue-based worker system. + +## Installation + +To use `SlidingSQLite`, ensure you have Python 3.7 or higher installed. The library uses only the standard library and SQLite, which is included with Python. + +1. Copy the `SlidingSqlite.py` file into your project directory. +2. Import the `SlidingSQLite` class in your Python code: + ```python + from SlidingSqlite import SlidingSQLite + ``` + +## Configuration + +The `SlidingSQLite` class is initialized with several configuration parameters: + +- **`db_dir`**: Directory where database files will be stored. +- **`schema`**: SQL schema to initialize new database files (e.g., table definitions). +- **`rotation_interval`**: Time interval (in seconds) after which a new database file is created (default: 3600 seconds, or 1 hour). +- **`retention_period`**: Time period (in seconds) to retain database files before deletion (default: 604800 seconds, or 7 days). +- **`cleanup_interval`**: Frequency (in seconds) of the cleanup process for old databases and stale queries (default: 3600 seconds, or 1 hour). +- **`auto_delete_old_dbs`**: Boolean flag to enable or disable automatic deletion of old databases (default: `True`). + +Example configuration: + +```python +schema = """ +CREATE TABLE IF NOT EXISTS logs ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + timestamp REAL, + message TEXT +); +""" + +db = SlidingSQLite( + db_dir="./databases", + schema=schema, + rotation_interval=3600, # Rotate every hour + retention_period=604800, # Keep databases for 7 days + cleanup_interval=3600, # Run cleanup every hour + auto_delete_old_dbs=True +) +``` + +## Basic Usage + +### Initializing the Database + +Create an instance of `SlidingSQLite` with your desired configuration. This will set up the database directory, initialize the metadata database, and start the background workers for write operations and cleanup. + +```python +from SlidingSqlite import SlidingSQLite +import logging + +logging.basicConfig(level=logging.INFO) + +schema = """ +CREATE TABLE IF NOT EXISTS logs ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + timestamp REAL, + message TEXT +); +""" + +db = SlidingSQLite( + db_dir="./databases", + schema=schema +) +``` + +### Executing Write Queries + +Use the `execute_write` method to perform write operations (e.g., `INSERT`, `UPDATE`, `DELETE`). This method is asynchronous and returns a UUID that can be used to retrieve the result. 
+ +```python +import time + +query_id = db.execute_write( + "INSERT INTO logs (timestamp, message) VALUES (?, ?)", + (time.time(), "Hello, SlidingSQLite!") +) +``` + +For synchronous execution, use `execute_write_sync`, which blocks until the operation completes or times out: + +```python +result = db.execute_write_sync( + "INSERT INTO logs (timestamp, message) VALUES (?, ?)", + (time.time(), "Synchronous write"), + timeout=5.0 +) +if result.success: + logging.info("Write operation successful") +else: + logging.error(f"Write operation failed: {result.error}") +``` + +### Executing Read Queries + +Use the `execute_read` method to perform read operations (e.g., `SELECT`). This method executes the query across all relevant database files, providing a seamless view of time-windowed data. It is asynchronous and returns a UUID. + +```python +query_id = db.execute_read( + "SELECT * FROM logs WHERE timestamp > ? ORDER BY timestamp DESC", + (time.time() - 86400,) # Last 24 hours +) +``` + +For synchronous execution, use `execute_read_sync`: + +```python +result = db.execute_read_sync( + "SELECT * FROM logs WHERE timestamp > ? ORDER BY timestamp DESC", + (time.time() - 86400,), + timeout=5.0 +) +if result.success: + logging.info(f"Found {len(result.data)} log entries: {result.data}") +else: + logging.error(f"Read operation failed: {result.error}") +``` + +### Retrieving Results + +For asynchronous operations, use `get_result` (for write queries) or `get_read_result` (for read queries) to retrieve the results using the UUID returned by `execute_write` or `execute_read`. + +```python +# Write result +result = db.get_result(query_id, timeout=5.0) +if result.success: + logging.info("Write operation successful") +else: + logging.error(f"Write operation failed: {result.error}") + +# Read result +result = db.get_read_result(query_id, timeout=5.0) +if result.success: + logging.info(f"Found {len(result.data)} log entries: {result.data}") +else: + logging.error(f"Read operation failed: {result.error}") +``` + +### Shutting Down + +Always call the `shutdown` method when you are done with the database to ensure graceful cleanup of resources: + +```python +db.shutdown() +``` + +## Advanced Usage + +### Multi-Threaded Applications + +`SlidingSQLite` is designed for multi-threaded environments. It uses queues and locks to ensure thread safety. 
Here is an example of using multiple writer and reader threads: + +```python +import threading +import time +import random +from SlidingSqlite import SlidingSQLite +import logging + +logging.basicConfig(level=logging.INFO) + +schema = """ +CREATE TABLE IF NOT EXISTS logs ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + timestamp REAL, + message TEXT +); +""" + +db = SlidingSQLite( + db_dir="./databases", + schema=schema, + rotation_interval=10, # Rotate every 10 seconds for testing + retention_period=60, # Keep databases for 60 seconds + cleanup_interval=30 # Run cleanup every 30 seconds +) + +def writer_thread(): + while True: + db.execute_write( + "INSERT INTO logs (timestamp, message) VALUES (?, ?)", + (time.time(), f"Message from thread {threading.current_thread().name}") + ) + time.sleep(random.uniform(0.05, 0.15)) + +def reader_thread(): + while True: + result = db.execute_read_sync( + "SELECT * FROM logs ORDER BY timestamp DESC LIMIT 5", + timeout=5.0 + ) + if result.success: + logging.info(f"Recent logs: {result.data}") + time.sleep(random.uniform(0.5, 1.5)) + +threads = [] +for _ in range(4): # Start 4 writer threads + t = threading.Thread(target=writer_thread, daemon=True) + t.start() + threads.append(t) +for _ in range(2): # Start 2 reader threads + t = threading.Thread(target=reader_thread, daemon=True) + t.start() + threads.append(t) + +try: + while True: + time.sleep(1) +except KeyboardInterrupt: + print("\nShutting down...") + db.shutdown() +``` + +### Managing Database Retention + +You can configure the retention period and control database deletion: + +- **Set Retention Period**: Use `set_retention_period` to change how long databases are kept: + + ```python + db.set_retention_period(86400) # Keep databases for 1 day + ``` + +- **Enable/Disable Auto-Delete**: Use `set_auto_delete` to control automatic deletion of old databases: + + ```python + db.set_auto_delete(False) # Disable automatic deletion + ``` + +- **Manual Deletion**: Use `delete_databases_before` or `delete_databases_in_range` to manually delete databases: + + ```python + import time + + # Delete all databases before a specific timestamp + count = db.delete_databases_before(time.time() - 86400) + logging.info(f"Deleted {count} databases") + + # Delete databases in a specific time range + count = db.delete_databases_in_range(time.time() - 172800, time.time() - 86400) + logging.info(f"Deleted {count} databases in range") + ``` + +### Customizing Cleanup + +You can adjust the cleanup interval to control how often the system checks for old databases and stale queries: + +```python +db = SlidingSQLite( + db_dir="./databases", + schema=schema, + cleanup_interval=1800 # Run cleanup every 30 minutes +) +``` + +### Querying Across Time Windows + +Read queries are automatically executed across all relevant database files, providing a unified view of data across time windows. This is particularly useful for time-series data or logs. For example: + +```python +result = db.execute_read_sync( + "SELECT timestamp, message FROM logs WHERE timestamp > ? 
ORDER BY timestamp DESC", + (time.time() - 604800,) # Last 7 days +) +if result.success: + logging.info(f"Found {len(result.data)} log entries: {result.data}") +``` + +## API Reference + +### `SlidingSQLite` Class + +#### Initialization + +```python +SlidingSQLite( + db_dir: str, + schema: str, + retention_period: int = 604800, + rotation_interval: int = 3600, + cleanup_interval: int = 3600, + auto_delete_old_dbs: bool = True +) +``` + +- **Parameters**: + - `db_dir`: Directory to store database files. + - `schema`: SQL schema to initialize new databases. + - `retention_period`: Seconds to keep databases before deletion. + - `rotation_interval`: Seconds between database rotations. + - `cleanup_interval`: Seconds between cleanup operations. + - `auto_delete_old_dbs`: Whether to automatically delete old databases. + +#### Methods + +- **`execute(query: str, params: Tuple[Any, ...] = ()) -> uuid.UUID`**: + Smart query executor that routes read or write operations appropriately. + +- **`execute_write(query: str, params: Tuple[Any, ...] = ()) -> uuid.UUID`**: + Execute a write query asynchronously. Returns a UUID for result retrieval. + +- **`execute_write_sync(query: str, params: Tuple[Any, ...] = (), timeout: float = 5.0) -> QueryResult[bool]`**: + Execute a write query synchronously. Returns a `QueryResult` object. + +- **`execute_read(query: str, params: Tuple[Any, ...] = ()) -> uuid.UUID`**: + Execute a read query asynchronously across all databases. Returns a UUID. + +- **`execute_read_sync(query: str, params: Tuple[Any, ...] = (), timeout: float = 5.0) -> QueryResult[List[Tuple[Any, ...]]]`**: + Execute a read query synchronously across all databases. Returns a `QueryResult`. + +- **`get_result(query_id: uuid.UUID, timeout: float = 5.0) -> QueryResult[bool]`**: + Retrieve the result of a write query using its UUID. + +- **`get_read_result(query_id: uuid.UUID, timeout: float = 5.0) -> QueryResult[List[Tuple[Any, ...]]]`**: + Retrieve the result of a read query using its UUID. + +- **`set_retention_period(seconds: int) -> None`**: + Set the retention period for databases. + +- **`set_auto_delete(enabled: bool) -> None`**: + Enable or disable automatic deletion of old databases. + +- **`delete_databases_before(timestamp: float) -> int`**: + Delete all databases with `end_time` before the specified timestamp. Returns the number of databases deleted. + +- **`delete_databases_in_range(start_time: float, end_time: float) -> int`**: + Delete all databases overlapping with the specified time range. Returns the number of databases deleted. + +- **`get_databases_info() -> List[DatabaseTimeframe]`**: + Get information about all available databases, including file paths and time ranges. + +- **`shutdown() -> None`**: + Gracefully shut down the database, stopping workers and closing connections. + +### `QueryResult` Class + +A generic class to handle query results with error handling. + +- **Attributes**: + - `data`: The result data (if successful). + - `error`: The exception (if failed). + - `success`: Boolean indicating if the query was successful. + +- **Usage**: + ```python + result = db.execute_write_sync("INSERT INTO logs (timestamp, message) VALUES (?, ?)", (time.time(), "Test")) + if result.success: + print("Success:", result.data) + else: + print("Error:", result.error) + ``` + +### Exceptions + +- **`DatabaseError`**: Base exception for all database errors. +- **`QueryError`**: Exception raised when a query fails. 
+ +## Error Handling + +`SlidingSQLite` provides robust error handling through the `QueryResult` class and custom exceptions. Always check the `success` attribute of a `QueryResult` object and handle potential errors: + +```python +result = db.execute_read_sync("SELECT * FROM logs", timeout=5.0) +if result.success: + print("Data:", result.data) +else: + print("Error:", result.error) +``` + +Common errors include: + +- **Query Timeout**: If a query takes longer than the specified timeout, a `QueryError` with "Query timed out" is returned. +- **Invalid Query ID**: Attempting to retrieve results with an invalid UUID results in a `QueryError`. +- **Database Errors**: SQLite errors are wrapped in `DatabaseError` or `QueryError`. + +## Best Practices + +1. **Always Shut Down**: Call `db.shutdown()` when your application exits to ensure resources are cleaned up properly. +2. **Use Timeouts**: Specify appropriate timeouts for synchronous operations to avoid blocking indefinitely. +3. **Handle Errors**: Always check the `success` attribute of `QueryResult` objects and handle errors appropriately. +4. **Configure Retention**: Choose a retention period that balances disk usage and data availability needs. +5. **Monitor Disk Space**: Even with automatic cleanup, monitor disk space usage in production environments. +6. **Thread Safety**: Use `SlidingSQLite` in multi-threaded applications without additional synchronization, as it is thread-safe by design. +7. **Optimize Queries**: For read operations across many databases, optimize your queries to reduce execution time, especially if the number of database files is large. + +## Example + +Here is a complete example demonstrating multi-threaded usage, including configuration, query execution, and cleanup: + +```python +import time +import uuid +import threading +import random +from datetime import datetime, timezone +from SlidingSqlite import SlidingSQLite +import logging + +# Set up logging +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s - %(levelname)s - %(message)s", + handlers=[logging.StreamHandler()], +) + +# Configuration +NUM_WRITER_THREADS = 4 +NUM_READER_THREADS = 2 +TARGET_OPS_PER_SECOND = 10 + +# Define a schema +db_schema = """ +CREATE TABLE IF NOT EXISTS logs ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + timestamp REAL, + message TEXT +); +""" + +# Initialize SlidingSQLite +db = SlidingSQLite( + db_dir="./databases", + schema=db_schema, + rotation_interval=10, # Rotate every 10 seconds for testing + retention_period=60, # Keep databases for 60 seconds + cleanup_interval=30, # Run cleanup every 30 seconds + auto_delete_old_dbs=True, +) + +def writer_thread(): + while True: + db.execute_write( + "INSERT INTO logs (timestamp, message) VALUES (?, ?)", + (time.time(), f"Message from thread {threading.current_thread().name}") + ) + time.sleep(random.uniform(0.05, 0.15)) # Target ~10 ops/sec + +def reader_thread(): + while True: + result = db.execute_read_sync( + "SELECT * FROM logs ORDER BY timestamp DESC LIMIT 5", + timeout=5.0 + ) + if result.success: + logging.info(f"Recent logs: {result.data}") + time.sleep(random.uniform(0.5, 1.5)) # Randomized sleep for natural load + +# Start threads +threads = [] +for _ in range(NUM_WRITER_THREADS): + t = threading.Thread(target=writer_thread, daemon=True) + t.start() + threads.append(t) +for _ in range(NUM_READER_THREADS): + t = threading.Thread(target=reader_thread, daemon=True) + t.start() + threads.append(t) + +try: + print("Running multi-threaded SlidingSQLite test. 
Press Ctrl+C to stop.") + while True: + time.sleep(1) +except KeyboardInterrupt: + print("\nShutting down...") + db.shutdown() +``` + +This example demonstrates how to set up a multi-threaded application with `SlidingSQLite`, including logging, configuration, and proper shutdown handling.