The library is now more robust but the test script needs two Ctrl+C's to stop.
This commit is contained in:
parent
dc5d652503
commit
822ea3a453
176
SlidingSqlite.py
176
SlidingSqlite.py
@ -111,7 +111,7 @@ class SlidingSQLite:
|
||||
self.retention_period = retention_period # In seconds
|
||||
self.rotation_interval = rotation_interval # In seconds
|
||||
self.cleanup_interval = cleanup_interval # In seconds
|
||||
self.auto_delete_old_dbs = auto_delete_old_dbs # New field
|
||||
self.auto_delete_old_dbs = auto_delete_old_dbs
|
||||
|
||||
# Queues for operations
|
||||
self.write_queue: queue.Queue[Tuple[str, Tuple[Any, ...], uuid.UUID]] = (
|
||||
@ -125,30 +125,42 @@ class SlidingSQLite:
|
||||
# Thread synchronization
|
||||
self.shutdown_flag = threading.Event()
|
||||
self.worker_thread = None
|
||||
self.cleanup_thread = None
|
||||
self._init_complete = threading.Event() # Added to delay cleanup worker
|
||||
|
||||
# Cache for database connections
|
||||
self.connections: Dict[str, sqlite3.Connection] = {}
|
||||
self.conn_lock = threading.Lock()
|
||||
|
||||
# Thread-local storage for metadata connections
|
||||
self._thread_local = threading.local()
|
||||
|
||||
# Track active query IDs for cleanup
|
||||
self.active_queries: Set[uuid.UUID] = set()
|
||||
self.query_lock = threading.Lock()
|
||||
|
||||
# Cache for current database
|
||||
self._current_db_cache = None
|
||||
self._current_db_expiry = 0
|
||||
|
||||
# Initialize system
|
||||
self._setup()
|
||||
self._init_complete.set() # Signal that initialization is complete
|
||||
|
||||
def _setup(self) -> None:
|
||||
"""Setup the database directory and initialize workers"""
|
||||
try:
|
||||
print("Creating database directory...")
|
||||
os.makedirs(self.db_dir, exist_ok=True)
|
||||
print("Initializing metadata database...")
|
||||
self._init_metadata()
|
||||
|
||||
# Start worker threads
|
||||
print("Registering current database...")
|
||||
self._register_current_db()
|
||||
print("Starting write worker thread...")
|
||||
self._start_worker()
|
||||
print("Starting cleanup worker thread...")
|
||||
self._start_cleanup_worker()
|
||||
|
||||
# Register current database
|
||||
self._register_current_db()
|
||||
except Exception as e:
|
||||
logging.error(f"Failed to initialize SlidingSQLite: {e}")
|
||||
raise DatabaseError(f"Failed to initialize SlidingSQLite: {e}")
|
||||
@ -183,6 +195,17 @@ class SlidingSQLite:
|
||||
Returns:
|
||||
SQLite connection object
|
||||
"""
|
||||
if db_file == self._get_metadata_db():
|
||||
# Use thread-local storage for metadata DB to avoid threading issues
|
||||
if not hasattr(self._thread_local, "metadata_conn"):
|
||||
conn = sqlite3.connect(
|
||||
db_file, isolation_level=None, check_same_thread=False
|
||||
)
|
||||
conn.execute("PRAGMA journal_mode=WAL;")
|
||||
conn.execute("PRAGMA busy_timeout=5000;")
|
||||
self._thread_local.metadata_conn = conn
|
||||
return self._thread_local.metadata_conn
|
||||
|
||||
with self.conn_lock:
|
||||
if db_file not in self.connections or self.connections[db_file] is None:
|
||||
try:
|
||||
@ -190,11 +213,7 @@ class SlidingSQLite:
|
||||
db_file, isolation_level=None, check_same_thread=False
|
||||
)
|
||||
conn.execute("PRAGMA journal_mode=WAL;")
|
||||
conn.execute(
|
||||
"PRAGMA busy_timeout=5000;"
|
||||
) # Wait up to 5 seconds when database is locked
|
||||
|
||||
# Initialize schema if this is a data database (not metadata)
|
||||
conn.execute("PRAGMA busy_timeout=5000;")
|
||||
if db_file != self._get_metadata_db():
|
||||
conn.executescript(self.schema)
|
||||
|
||||
@ -210,45 +229,56 @@ class SlidingSQLite:
|
||||
return os.path.join(self.db_dir, "metadata.db")
|
||||
|
||||
def _get_current_db(self) -> str:
|
||||
"""Get the path to the current time-based database"""
|
||||
# Generate timestamped DB name based on rotation interval
|
||||
"""Get the path to the current time-based database, using cache"""
|
||||
now = time.time()
|
||||
interval_timestamp = int(now // self.rotation_interval) * self.rotation_interval
|
||||
timestamp_str = datetime.datetime.fromtimestamp(interval_timestamp).strftime(
|
||||
"%Y%m%d_%H%M%S"
|
||||
)
|
||||
return os.path.join(self.db_dir, f"data_{timestamp_str}.db")
|
||||
if now >= self._current_db_expiry or not self._current_db_cache:
|
||||
# Generate timestamped DB name based on rotation interval
|
||||
interval_timestamp = int(now // self.rotation_interval) * self.rotation_interval
|
||||
timestamp_str = datetime.datetime.fromtimestamp(interval_timestamp).strftime(
|
||||
"%Y%m%d_%H%M%S"
|
||||
)
|
||||
self._current_db_cache = os.path.join(self.db_dir, f"data_{timestamp_str}.db")
|
||||
self._current_db_expiry = interval_timestamp + self.rotation_interval
|
||||
return self._current_db_cache
|
||||
|
||||
def _register_current_db(self) -> None:
|
||||
"""Register the current database in the metadata table"""
|
||||
current_db = self._get_current_db()
|
||||
if not isinstance(current_db, str) or not current_db:
|
||||
logging.error(f"Invalid current_db path: {current_db}")
|
||||
return
|
||||
|
||||
now = time.time()
|
||||
|
||||
# Calculate time boundaries for the current database
|
||||
interval_start = int(now // self.rotation_interval) * self.rotation_interval
|
||||
interval_end = interval_start + self.rotation_interval
|
||||
|
||||
try:
|
||||
with self._get_connection(self._get_metadata_db()) as conn:
|
||||
# Check if this database is already registered
|
||||
existing = conn.execute(
|
||||
"SELECT id FROM metadata WHERE db_file = ?", (current_db,)
|
||||
).fetchone()
|
||||
|
||||
if not existing:
|
||||
conn.execute(
|
||||
"INSERT INTO metadata (db_file, start_time, end_time) VALUES (?, ?, ?)",
|
||||
(current_db, interval_start, interval_end),
|
||||
with self.conn_lock: # Synchronize access to prevent race conditions
|
||||
try:
|
||||
with self._get_connection(self._get_metadata_db()) as conn:
|
||||
# Check if this database is already registered
|
||||
cursor = conn.execute(
|
||||
"SELECT id FROM metadata WHERE db_file = ?", (current_db,)
|
||||
)
|
||||
conn.commit()
|
||||
except sqlite3.Error as e:
|
||||
logging.error(f"Failed to register current database: {e}")
|
||||
# Continue execution as this is not critical
|
||||
existing = cursor.fetchone()
|
||||
if existing:
|
||||
logging.debug(f"Database {current_db} already registered")
|
||||
else:
|
||||
conn.execute(
|
||||
"INSERT OR IGNORE INTO metadata (db_file, start_time, end_time) VALUES (?, ?, ?)",
|
||||
(current_db, interval_start, interval_end),
|
||||
)
|
||||
conn.commit()
|
||||
logging.debug(f"Registered new database: {current_db}")
|
||||
except sqlite3.Error as e:
|
||||
logging.error(f"Failed to register current database: {e}")
|
||||
# Continue execution as this is not critical
|
||||
|
||||
def _rotate_databases(self) -> None:
|
||||
"""Delete databases that are older than the retention period"""
|
||||
if not self.auto_delete_old_dbs:
|
||||
return # Skip deletion if auto-delete is disabled
|
||||
return
|
||||
|
||||
cutoff_time = time.time() - self.retention_period
|
||||
|
||||
@ -293,12 +323,6 @@ class SlidingSQLite:
|
||||
def _delete_database_file(self, db_file: str) -> bool:
|
||||
"""
|
||||
Delete a database file and clean up resources.
|
||||
|
||||
Args:
|
||||
db_file: Path to the database file to delete
|
||||
|
||||
Returns:
|
||||
True if deleted successfully, False otherwise
|
||||
"""
|
||||
# Close and remove connection if it exists
|
||||
with self.conn_lock:
|
||||
@ -306,10 +330,16 @@ class SlidingSQLite:
|
||||
try:
|
||||
self.connections[db_file].close()
|
||||
except sqlite3.Error:
|
||||
pass # Ignore errors on close
|
||||
pass
|
||||
del self.connections[db_file]
|
||||
|
||||
# Delete the file
|
||||
if db_file == self._get_metadata_db() and hasattr(self._thread_local, "metadata_conn"):
|
||||
try:
|
||||
self._thread_local.metadata_conn.close()
|
||||
except sqlite3.Error:
|
||||
pass
|
||||
delattr(self._thread_local, "metadata_conn")
|
||||
|
||||
if os.path.exists(db_file):
|
||||
try:
|
||||
os.remove(db_file)
|
||||
@ -318,7 +348,7 @@ class SlidingSQLite:
|
||||
except OSError as e:
|
||||
logging.error(f"Failed to delete database {db_file}: {e}")
|
||||
return False
|
||||
return False # File didn't exist
|
||||
return False
|
||||
|
||||
def set_retention_period(self, seconds: int) -> None:
|
||||
"""
|
||||
@ -327,7 +357,7 @@ class SlidingSQLite:
|
||||
Args:
|
||||
seconds: Number of seconds to keep databases before deletion
|
||||
"""
|
||||
self.retention_period = max(0, seconds) # Ensure positive value
|
||||
self.retention_period = max(0, seconds)
|
||||
|
||||
def set_auto_delete(self, enabled: bool) -> None:
|
||||
"""
|
||||
@ -453,8 +483,8 @@ class SlidingSQLite:
|
||||
Returns:
|
||||
UUID that can be used to retrieve the result
|
||||
"""
|
||||
# look for new database files
|
||||
self._register_current_db()
|
||||
if not self._current_db_cache or time.time() >= self._current_db_expiry:
|
||||
self._register_current_db()
|
||||
|
||||
query_upper = query.strip().upper()
|
||||
|
||||
@ -479,8 +509,8 @@ class SlidingSQLite:
|
||||
Returns:
|
||||
UUID that can be used to retrieve the result
|
||||
"""
|
||||
# look for new database files
|
||||
self._register_current_db()
|
||||
if not self._current_db_cache or time.time() >= self._current_db_expiry:
|
||||
self._register_current_db()
|
||||
|
||||
query_id = uuid.uuid4()
|
||||
|
||||
@ -520,8 +550,8 @@ class SlidingSQLite:
|
||||
Returns:
|
||||
UUID that can be used to retrieve the result
|
||||
"""
|
||||
# look for new database files
|
||||
self._register_current_db()
|
||||
if not self._current_db_cache or time.time() >= self._current_db_expiry:
|
||||
self._register_current_db()
|
||||
|
||||
query_id = uuid.uuid4()
|
||||
|
||||
@ -582,8 +612,18 @@ class SlidingSQLite:
|
||||
"SELECT db_file FROM metadata ORDER BY end_time DESC"
|
||||
).fetchall()
|
||||
|
||||
if not db_files:
|
||||
logging.warning("No database files found in metadata for read operation")
|
||||
with QueryLockManager(self, query_id) as is_active:
|
||||
if is_active:
|
||||
self.read_queues[query_id].put(QueryResult(data=[]))
|
||||
return
|
||||
|
||||
all_results: List[Tuple[Any, ...]] = []
|
||||
for (db_file,) in db_files:
|
||||
if not isinstance(db_file, str):
|
||||
logging.error(f"Invalid db_file in metadata: {db_file}")
|
||||
continue
|
||||
if os.path.exists(db_file):
|
||||
try:
|
||||
with self._get_connection(db_file) as conn:
|
||||
@ -656,7 +696,7 @@ class SlidingSQLite:
|
||||
if query_id not in self.read_queues:
|
||||
return QueryResult(error=QueryError("Invalid query ID"))
|
||||
if query_id not in self.active_queries:
|
||||
self.active_queries.add(query_id) # Re-add if it was removed
|
||||
self.active_queries.add(query_id)
|
||||
|
||||
try:
|
||||
result = self.read_queues[query_id].get(timeout=timeout)
|
||||
@ -677,7 +717,7 @@ class SlidingSQLite:
|
||||
def worker() -> None:
|
||||
while not self.shutdown_flag.is_set():
|
||||
try:
|
||||
task = self.write_queue.get(timeout=1) # Adjust timeout as needed
|
||||
task = self.write_queue.get(timeout=1)
|
||||
if task:
|
||||
self._process_write_task(task)
|
||||
except queue.Empty:
|
||||
@ -690,7 +730,19 @@ class SlidingSQLite:
|
||||
|
||||
def _start_cleanup_worker(self) -> None:
|
||||
"""Start the cleanup worker thread for database rotation."""
|
||||
threading.Thread(target=self._cleanup_worker, daemon=True).start()
|
||||
if self.cleanup_thread and self.cleanup_thread.is_alive():
|
||||
return
|
||||
|
||||
def cleanup_worker() -> None:
|
||||
# Wait for initialization to complete before starting cleanup
|
||||
self._init_complete.wait()
|
||||
while not self.shutdown_flag.is_set():
|
||||
self._rotate_databases()
|
||||
self._cleanup_stale_queries()
|
||||
time.sleep(self.cleanup_interval)
|
||||
|
||||
self.cleanup_thread = threading.Thread(target=cleanup_worker, daemon=True)
|
||||
self.cleanup_thread.start()
|
||||
|
||||
def _process_write_task(self, task: Tuple[str, Tuple[Any, ...], uuid.UUID]) -> None:
|
||||
"""Process a single write task from the queue."""
|
||||
@ -707,14 +759,30 @@ class SlidingSQLite:
|
||||
|
||||
def _cleanup_worker(self) -> None:
|
||||
"""Worker thread for handling database rotation and cleanup."""
|
||||
self._init_complete.wait()
|
||||
while not self.shutdown_flag.is_set():
|
||||
self._rotate_databases()
|
||||
self._cleanup_stale_queries() # Also clean up stale queries
|
||||
time.sleep(self.cleanup_interval) # Use the configurable interval
|
||||
self._cleanup_stale_queries()
|
||||
time.sleep(self.cleanup_interval)
|
||||
|
||||
def shutdown(self) -> None:
|
||||
"""Gracefully shut down the workers and close connections."""
|
||||
self.shutdown_flag.set()
|
||||
if self.worker_thread:
|
||||
self.worker_thread.join()
|
||||
if self.cleanup_thread:
|
||||
self.cleanup_thread.join()
|
||||
with self.conn_lock:
|
||||
for db_file, conn in list(self.connections.items()):
|
||||
try:
|
||||
conn.close()
|
||||
except sqlite3.Error:
|
||||
pass
|
||||
del self.connections[db_file]
|
||||
if hasattr(self._thread_local, "metadata_conn"):
|
||||
try:
|
||||
self._thread_local.metadata_conn.close()
|
||||
except sqlite3.Error:
|
||||
pass
|
||||
delattr(self._thread_local, "metadata_conn")
|
||||
logging.info("SlidingSQLite shutdown completed.")
|
||||
|
@ -11,7 +11,10 @@ import logging
|
||||
logging.basicConfig(
|
||||
level=logging.DEBUG,
|
||||
format="%(asctime)s - %(levelname)s - %(message)s",
|
||||
handlers=[logging.StreamHandler()],
|
||||
handlers=[
|
||||
logging.StreamHandler(),
|
||||
logging.FileHandler("test-slidingsql.log", mode="a") # "a" appends to the file
|
||||
],
|
||||
)
|
||||
|
||||
# Configuration
|
||||
@ -76,7 +79,8 @@ def reader_thread():
|
||||
query_id = db.execute_read("SELECT e.event_type, u.username, e.event_timestamp FROM events e JOIN users u ON e.user_id = u.id ORDER BY e.event_timestamp DESC LIMIT 5", ())
|
||||
result = db.get_read_result(query_id)
|
||||
if result.success:
|
||||
logging.info(f"Recent events: {result.data}")
|
||||
log_message = f"Recent events: {result.data}"
|
||||
logging.info(log_message[:100] + "..." if len(log_message) > 100 else log_message)
|
||||
time.sleep(random.uniform(0.5, 1.5)) # Randomized sleep for more natural load
|
||||
|
||||
# Start multiple writer and reader threads
|
||||
|
Loading…
x
Reference in New Issue
Block a user