From 822ea3a453e5feb8ade73aa89c3e0fa5e8fddf53 Mon Sep 17 00:00:00 2001
From: Kalzu Rekku
Date: Sun, 16 Mar 2025 19:49:00 +0200
Subject: [PATCH] The library is now more robust but the test script needs two
 Ctrl+C's to stop.

---
 SlidingSqlite.py      | 176 +++++++++++++++++++++++++++++-------------
 test_slidingsqlite.py |   8 +-
 2 files changed, 128 insertions(+), 56 deletions(-)

diff --git a/SlidingSqlite.py b/SlidingSqlite.py
index 736d266..58b6efd 100644
--- a/SlidingSqlite.py
+++ b/SlidingSqlite.py
@@ -111,7 +111,7 @@ class SlidingSQLite:
         self.retention_period = retention_period  # In seconds
         self.rotation_interval = rotation_interval  # In seconds
         self.cleanup_interval = cleanup_interval  # In seconds
-        self.auto_delete_old_dbs = auto_delete_old_dbs  # New field
+        self.auto_delete_old_dbs = auto_delete_old_dbs
 
         # Queues for operations
         self.write_queue: queue.Queue[Tuple[str, Tuple[Any, ...], uuid.UUID]] = (
@@ -125,30 +125,42 @@ class SlidingSQLite:
         # Thread synchronization
         self.shutdown_flag = threading.Event()
         self.worker_thread = None
+        self.cleanup_thread = None
+        self._init_complete = threading.Event()  # Added to delay cleanup worker
 
         # Cache for database connections
         self.connections: Dict[str, sqlite3.Connection] = {}
         self.conn_lock = threading.Lock()
 
+        # Thread-local storage for metadata connections
+        self._thread_local = threading.local()
+
         # Track active query IDs for cleanup
         self.active_queries: Set[uuid.UUID] = set()
         self.query_lock = threading.Lock()
 
+        # Cache for current database
+        self._current_db_cache = None
+        self._current_db_expiry = 0
+
         # Initialize system
         self._setup()
+        self._init_complete.set()  # Signal that initialization is complete
 
     def _setup(self) -> None:
         """Setup the database directory and initialize workers"""
         try:
+            print("Creating database directory...")
             os.makedirs(self.db_dir, exist_ok=True)
+            print("Initializing metadata database...")
             self._init_metadata()
-
-            # Start worker threads
+            print("Registering current database...")
+            self._register_current_db()
+            print("Starting write worker thread...")
             self._start_worker()
+            print("Starting cleanup worker thread...")
             self._start_cleanup_worker()
-
-            # Register current database
-            self._register_current_db()
         except Exception as e:
             logging.error(f"Failed to initialize SlidingSQLite: {e}")
             raise DatabaseError(f"Failed to initialize SlidingSQLite: {e}")
@@ -183,6 +195,17 @@ class SlidingSQLite:
         Returns:
             SQLite connection object
         """
+        if db_file == self._get_metadata_db():
+            # Use thread-local storage for metadata DB to avoid threading issues
+            if not hasattr(self._thread_local, "metadata_conn"):
+                conn = sqlite3.connect(
+                    db_file, isolation_level=None, check_same_thread=False
+                )
+                conn.execute("PRAGMA journal_mode=WAL;")
+                conn.execute("PRAGMA busy_timeout=5000;")
+                self._thread_local.metadata_conn = conn
+            return self._thread_local.metadata_conn
+
         with self.conn_lock:
             if db_file not in self.connections or self.connections[db_file] is None:
                 try:
@@ -190,11 +213,7 @@ class SlidingSQLite:
                         db_file, isolation_level=None, check_same_thread=False
                     )
                     conn.execute("PRAGMA journal_mode=WAL;")
-                    conn.execute(
-                        "PRAGMA busy_timeout=5000;"
-                    )  # Wait up to 5 seconds when database is locked
-
-                    # Initialize schema if this is a data database (not metadata)
+                    conn.execute("PRAGMA busy_timeout=5000;")
                     if db_file != self._get_metadata_db():
                         conn.executescript(self.schema)
 
@@ -210,45 +229,56 @@ class SlidingSQLite:
         return os.path.join(self.db_dir, "metadata.db")
 
     def _get_current_db(self) -> str:
-        """Get the path to the current time-based database"""
-        # Generate timestamped DB name based on rotation interval
+        """Get the path to the current time-based database, using cache"""
         now = time.time()
-        interval_timestamp = int(now // self.rotation_interval) * self.rotation_interval
-        timestamp_str = datetime.datetime.fromtimestamp(interval_timestamp).strftime(
-            "%Y%m%d_%H%M%S"
-        )
-        return os.path.join(self.db_dir, f"data_{timestamp_str}.db")
+        if now >= self._current_db_expiry or not self._current_db_cache:
+            # Generate timestamped DB name based on rotation interval
+            interval_timestamp = int(now // self.rotation_interval) * self.rotation_interval
+            timestamp_str = datetime.datetime.fromtimestamp(interval_timestamp).strftime(
+                "%Y%m%d_%H%M%S"
+            )
+            self._current_db_cache = os.path.join(self.db_dir, f"data_{timestamp_str}.db")
+            self._current_db_expiry = interval_timestamp + self.rotation_interval
+        return self._current_db_cache
 
     def _register_current_db(self) -> None:
         """Register the current database in the metadata table"""
         current_db = self._get_current_db()
+        if not isinstance(current_db, str) or not current_db:
+            logging.error(f"Invalid current_db path: {current_db}")
+            return
+
         now = time.time()
 
         # Calculate time boundaries for the current database
         interval_start = int(now // self.rotation_interval) * self.rotation_interval
         interval_end = interval_start + self.rotation_interval
 
-        try:
-            with self._get_connection(self._get_metadata_db()) as conn:
-                # Check if this database is already registered
-                existing = conn.execute(
-                    "SELECT id FROM metadata WHERE db_file = ?", (current_db,)
-                ).fetchone()
-
-                if not existing:
-                    conn.execute(
-                        "INSERT INTO metadata (db_file, start_time, end_time) VALUES (?, ?, ?)",
-                        (current_db, interval_start, interval_end),
+        with self.conn_lock:  # Synchronize access to prevent race conditions
+            try:
+                with self._get_connection(self._get_metadata_db()) as conn:
+                    # Check if this database is already registered
+                    cursor = conn.execute(
+                        "SELECT id FROM metadata WHERE db_file = ?", (current_db,)
                     )
-                    conn.commit()
-        except sqlite3.Error as e:
-            logging.error(f"Failed to register current database: {e}")
-            # Continue execution as this is not critical
+                    existing = cursor.fetchone()
+                    if existing:
+                        logging.debug(f"Database {current_db} already registered")
+                    else:
+                        conn.execute(
+                            "INSERT OR IGNORE INTO metadata (db_file, start_time, end_time) VALUES (?, ?, ?)",
+                            (current_db, interval_start, interval_end),
+                        )
+                        conn.commit()
+                        logging.debug(f"Registered new database: {current_db}")
+            except sqlite3.Error as e:
+                logging.error(f"Failed to register current database: {e}")
+                # Continue execution as this is not critical
 
     def _rotate_databases(self) -> None:
         """Delete databases that are older than the retention period"""
         if not self.auto_delete_old_dbs:
-            return  # Skip deletion if auto-delete is disabled
+            return
 
         cutoff_time = time.time() - self.retention_period
 
@@ -293,12 +323,6 @@ class SlidingSQLite:
     def _delete_database_file(self, db_file: str) -> bool:
         """
         Delete a database file and clean up resources.
-
-        Args:
-            db_file: Path to the database file to delete
-
-        Returns:
-            True if deleted successfully, False otherwise
         """
         # Close and remove connection if it exists
         with self.conn_lock:
@@ -306,10 +330,16 @@ class SlidingSQLite:
                 try:
                     self.connections[db_file].close()
                 except sqlite3.Error:
-                    pass  # Ignore errors on close
+                    pass
                 del self.connections[db_file]
 
-        # Delete the file
+        if db_file == self._get_metadata_db() and hasattr(self._thread_local, "metadata_conn"):
+            try:
+                self._thread_local.metadata_conn.close()
+            except sqlite3.Error:
+                pass
+            delattr(self._thread_local, "metadata_conn")
+
         if os.path.exists(db_file):
             try:
                 os.remove(db_file)
@@ -318,7 +348,7 @@ class SlidingSQLite:
             except OSError as e:
                 logging.error(f"Failed to delete database {db_file}: {e}")
                 return False
-        return False  # File didn't exist
+        return False
 
     def set_retention_period(self, seconds: int) -> None:
         """
@@ -327,7 +357,7 @@ class SlidingSQLite:
         Args:
             seconds: Number of seconds to keep databases before deletion
         """
-        self.retention_period = max(0, seconds)  # Ensure positive value
+        self.retention_period = max(0, seconds)
 
     def set_auto_delete(self, enabled: bool) -> None:
         """
@@ -453,8 +483,8 @@ class SlidingSQLite:
         Returns:
             UUID that can be used to retrieve the result
         """
-        # look for new database files
-        self._register_current_db()
+        if not self._current_db_cache or time.time() >= self._current_db_expiry:
+            self._register_current_db()
 
         query_upper = query.strip().upper()
 
@@ -479,8 +509,8 @@ class SlidingSQLite:
         Returns:
             UUID that can be used to retrieve the result
         """
-        # look for new database files
-        self._register_current_db()
+        if not self._current_db_cache or time.time() >= self._current_db_expiry:
+            self._register_current_db()
 
         query_id = uuid.uuid4()
 
@@ -520,8 +550,8 @@ class SlidingSQLite:
         Returns:
             UUID that can be used to retrieve the result
         """
-        # look for new database files
-        self._register_current_db()
+        if not self._current_db_cache or time.time() >= self._current_db_expiry:
+            self._register_current_db()
 
         query_id = uuid.uuid4()
 
@@ -582,8 +612,18 @@ class SlidingSQLite:
                 "SELECT db_file FROM metadata ORDER BY end_time DESC"
             ).fetchall()
 
+            if not db_files:
+                logging.warning("No database files found in metadata for read operation")
+                with QueryLockManager(self, query_id) as is_active:
+                    if is_active:
+                        self.read_queues[query_id].put(QueryResult(data=[]))
+                return
+
             all_results: List[Tuple[Any, ...]] = []
             for (db_file,) in db_files:
+                if not isinstance(db_file, str):
+                    logging.error(f"Invalid db_file in metadata: {db_file}")
+                    continue
                 if os.path.exists(db_file):
                     try:
                         with self._get_connection(db_file) as conn:
@@ -656,7 +696,7 @@ class SlidingSQLite:
             if query_id not in self.read_queues:
                 return QueryResult(error=QueryError("Invalid query ID"))
             if query_id not in self.active_queries:
-                self.active_queries.add(query_id)  # Re-add if it was removed
+                self.active_queries.add(query_id)
 
         try:
            result = self.read_queues[query_id].get(timeout=timeout)
@@ -677,7 +717,7 @@ class SlidingSQLite:
         def worker() -> None:
             while not self.shutdown_flag.is_set():
                 try:
-                    task = self.write_queue.get(timeout=1)  # Adjust timeout as needed
+                    task = self.write_queue.get(timeout=1)
                     if task:
                         self._process_write_task(task)
                 except queue.Empty:
@@ -690,7 +730,19 @@ class SlidingSQLite:
 
     def _start_cleanup_worker(self) -> None:
         """Start the cleanup worker thread for database rotation."""
-        threading.Thread(target=self._cleanup_worker, daemon=True).start()
+        if self.cleanup_thread and self.cleanup_thread.is_alive():
+            return
+
+        def cleanup_worker() -> None:
+            # Wait for initialization to complete before starting cleanup
+            self._init_complete.wait()
+            while not self.shutdown_flag.is_set():
+                self._rotate_databases()
+                self._cleanup_stale_queries()
+                time.sleep(self.cleanup_interval)
+
+        self.cleanup_thread = threading.Thread(target=cleanup_worker, daemon=True)
+        self.cleanup_thread.start()
 
     def _process_write_task(self, task: Tuple[str, Tuple[Any, ...], uuid.UUID]) -> None:
         """Process a single write task from the queue."""
@@ -707,14 +759,30 @@ class SlidingSQLite:
     def _cleanup_worker(self) -> None:
         """Worker thread for handling database rotation and cleanup."""
+        self._init_complete.wait()
         while not self.shutdown_flag.is_set():
             self._rotate_databases()
-            self._cleanup_stale_queries()  # Also clean up stale queries
-            time.sleep(self.cleanup_interval)  # Use the configurable interval
+            self._cleanup_stale_queries()
+            time.sleep(self.cleanup_interval)
 
     def shutdown(self) -> None:
         """Gracefully shut down the workers and close connections."""
         self.shutdown_flag.set()
         if self.worker_thread:
             self.worker_thread.join()
+        if self.cleanup_thread:
+            self.cleanup_thread.join()
+        with self.conn_lock:
+            for db_file, conn in list(self.connections.items()):
+                try:
+                    conn.close()
+                except sqlite3.Error:
+                    pass
+                del self.connections[db_file]
+        if hasattr(self._thread_local, "metadata_conn"):
+            try:
+                self._thread_local.metadata_conn.close()
+            except sqlite3.Error:
+                pass
+            delattr(self._thread_local, "metadata_conn")
 
         logging.info("SlidingSQLite shutdown completed.")
diff --git a/test_slidingsqlite.py b/test_slidingsqlite.py
index 2615a2f..c8e049c 100644
--- a/test_slidingsqlite.py
+++ b/test_slidingsqlite.py
@@ -11,7 +11,10 @@ import logging
 logging.basicConfig(
     level=logging.DEBUG,
     format="%(asctime)s - %(levelname)s - %(message)s",
-    handlers=[logging.StreamHandler()],
+    handlers=[
+        logging.StreamHandler(),
+        logging.FileHandler("test-slidingsql.log", mode="a")  # "a" appends to the file
+    ],
 )
 
 # Configuration
@@ -76,7 +79,8 @@ def reader_thread():
         query_id = db.execute_read("SELECT e.event_type, u.username, e.event_timestamp FROM events e JOIN users u ON e.user_id = u.id ORDER BY e.event_timestamp DESC LIMIT 5", ())
         result = db.get_read_result(query_id)
         if result.success:
-            logging.info(f"Recent events: {result.data}")
+            log_message = f"Recent events: {result.data}"
+            logging.info(log_message[:100] + "..." if len(log_message) > 100 else log_message)
         time.sleep(random.uniform(0.5, 1.5))  # Randomized sleep for more natural load
 
 # Start multiple writer and reader threads
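
Note on the known issue in the subject line: the second Ctrl+C is most likely needed
because both cleanup loops block in time.sleep(self.cleanup_interval), and shutdown()
then join()s those threads without a timeout, so the process can hang for up to a full
cleanup interval after the shutdown flag is set. A possible follow-up, sketched here but
untested, is to wait on the existing shutdown event instead of sleeping, since
threading.Event.wait() returns as soon as the event is set:

    def cleanup_worker() -> None:
        # Wait for initialization to complete before starting cleanup.
        self._init_complete.wait()
        while not self.shutdown_flag.is_set():
            self._rotate_databases()
            self._cleanup_stale_queries()
            # Unlike time.sleep(), Event.wait() wakes up as soon as
            # shutdown_flag is set, so shutdown() is not left waiting
            # out the remainder of the interval.
            self.shutdown_flag.wait(self.cleanup_interval)

The same substitution would apply to the standalone _cleanup_worker() method, and
shutdown() could pass a timeout to each join() as a further safety net. With that
change a single Ctrl+C should be enough to stop the test script.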