import hashlib
import logging
import sqlite3
import uuid
from datetime import datetime, timezone
from typing import Dict, List, Optional

try:
    from markdown_it import MarkdownIt
except ImportError:  # pragma: no cover - depends on the installed packages
    # Only MarkdownProcessor.process_markdown needs the parser; the database
    # layer and the merge logic stay usable (and testable) without it.
    MarkdownIt = None

logger = logging.getLogger(__name__)


class DatabaseManager:
    """Manages database connections and table creation.

    The manager owns a single connection and a single shared cursor. It can
    be used as a context manager, in which case the connection is closed on
    exit.
    """

    def __init__(self, db_file: str):
        """Open *db_file* and make sure all tables exist.

        Args:
            db_file: Path of the SQLite database file (or ``":memory:"``).

        Raises:
            sqlite3.Error: If the database cannot be opened or initialised.
        """
        self.conn: sqlite3.Connection = sqlite3.connect(db_file, timeout=10)
        self.cursor: sqlite3.Cursor = self.conn.cursor()
        try:
            self.create_tables()
        except Exception:
            # Do not leak the connection when schema creation fails.
            self.conn.close()
            raise

    def __enter__(self) -> 'DatabaseManager':
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        self.close()

    def create_tables(self) -> None:
        """Create necessary tables in the database if they don't exist."""
        self.cursor.executescript('''
            CREATE TABLE IF NOT EXISTS documents (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                name TEXT NOT NULL UNIQUE,
                file_path TEXT NOT NULL,
                md5sum TEXT,
                added_timestamp DATETIME DEFAULT CURRENT_TIMESTAMP,
                updated_timestamp DATETIME,
                deleted_timestamp DATETIME,
                isDeleted BOOLEAN DEFAULT 0
            );
            CREATE TABLE IF NOT EXISTS headings (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                uuid TEXT NOT NULL UNIQUE,
                level INTEGER NOT NULL,
                title TEXT NOT NULL,
                parent_uuid TEXT,
                document_id INTEGER NOT NULL,
                path TEXT NOT NULL,
                headings_order INTEGER,
                added_timestamp DATETIME DEFAULT CURRENT_TIMESTAMP,
                updated_timestamp DATETIME,
                deleted_timestamp DATETIME,
                isDeleted BOOLEAN DEFAULT 0,
                FOREIGN KEY (parent_uuid) REFERENCES headings(uuid),
                FOREIGN KEY (document_id) REFERENCES documents(id)
            );
            CREATE TABLE IF NOT EXISTS body (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                uuid TEXT NOT NULL UNIQUE,
                content TEXT,
                heading_uuid TEXT NOT NULL,
                document_id INTEGER NOT NULL,
                md5sum TEXT,
                added_timestamp DATETIME DEFAULT CURRENT_TIMESTAMP,
                updated_timestamp DATETIME,
                deleted_timestamp DATETIME,
                isDeleted BOOLEAN DEFAULT 0,
                FOREIGN KEY (heading_uuid) REFERENCES headings(uuid),
                FOREIGN KEY (document_id) REFERENCES documents(id)
            );
        ''')

    def close(self) -> None:
        """Close the database connection."""
        self.conn.close()


class MarkdownProcessor:
    """Processes markdown files and stores content in the database.

    A document is modelled as a tree of headings, each with an optional body.
    Both the structure read from the database and the structure parsed from
    markdown are dicts keyed by heading UUID whose values are node dicts with
    the keys ``uuid``, ``level``, ``title``, ``parent_uuid``, ``path``,
    ``content`` and ``children`` (a list of child heading UUIDs). Nodes read
    from the database additionally carry ``body_uuid``.
    """

    def __init__(self, db_manager: 'DatabaseManager') -> None:
        self.db_manager = db_manager

    def process_markdown(self, markdown_file: str, document_id: int) -> None:
        """Parse *markdown_file* and synchronise it into document *document_id*.

        Raises:
            ImportError: If the ``markdown_it`` package is not installed.
            OSError: If the file cannot be read.
        """
        if MarkdownIt is None:
            raise ImportError("markdown-it-py is required to parse markdown files")
        markdown_text = self.read_markdown_file(markdown_file)
        tokens = MarkdownIt().parse(markdown_text)
        logger.debug("Calling update_document_content")
        self.update_document_content(tokens, document_id)

    def read_markdown_file(self, file_path: str) -> str:
        """Return the UTF-8 decoded text of *file_path*."""
        with open(file_path, 'r', encoding='utf-8') as file:
            return file.read()

    def update_document_content(self, tokens: List, document_id: int) -> None:
        """Bring the stored structure of *document_id* in line with *tokens*.

        Args:
            tokens: Token sequence as produced by ``MarkdownIt().parse``; each
                token must expose ``type``, ``tag`` and ``content``.
            document_id: ``documents.id`` of the document being updated.
        """
        existing_structure = self.get_existing_document_structure(document_id)
        new_structure = self.parse_new_structure(tokens, document_id, existing_structure)
        logger.debug("Calling merge_structures...")
        self.merge_structures(existing_structure, new_structure, document_id)

    def get_existing_document_structure(self, document_id: int) -> Dict:
        """Load the live (not soft-deleted) headings and bodies of a document.

        Returns:
            Dict keyed by heading UUID; see the class docstring for the node
            layout. ``content`` and ``body_uuid`` are ``None`` for headings
            without a live body row.
        """
        cursor = self.db_manager.cursor
        # Soft-deleted bodies are excluded in the join condition (not in WHERE)
        # so that headings without a live body are still returned.
        cursor.execute('''
            SELECT h.uuid, h.level, h.title, h.parent_uuid, h.path, b.content, b.uuid
            FROM headings h
            LEFT JOIN body b ON h.uuid = b.heading_uuid AND b.isDeleted = 0
            WHERE h.document_id = ? AND h.isDeleted = 0
            ORDER BY h.level, h.id, b.id
        ''', (document_id,))

        structure: Dict[str, Dict] = {}
        for heading_uuid, level, title, parent_uuid, path, content, body_uuid in cursor.fetchall():
            structure[heading_uuid] = {
                'uuid': heading_uuid,
                'level': level,
                'title': title,
                'parent_uuid': parent_uuid,
                'path': path,
                'content': content,
                'body_uuid': body_uuid,
                'children': [],
            }

        # Build the tree structure
        for heading_uuid, node in structure.items():
            parent = structure.get(node['parent_uuid'])
            if parent is not None:
                parent['children'].append(heading_uuid)
        return structure

    def parse_new_structure(self, tokens: List, document_id: int, existing_structure: Dict) -> Dict:
        """Build the heading tree described by *tokens*.

        A heading reuses the UUID of an existing heading when title, level and
        parent UUID all match; otherwise it gets a fresh UUID. Because the
        reused UUID is also what is pushed on the parent stack, nested
        headings can be matched against their existing counterparts too.

        Args:
            tokens: Markdown tokens exposing ``type``, ``tag`` and ``content``.
            document_id: Unused; kept for interface compatibility.
            existing_structure: Result of ``get_existing_document_structure``.

        Returns:
            Dict keyed by heading UUID in document order (parents always
            precede their children). Text before the first heading is ignored.
        """
        # Existing UUIDs grouped by identity; each one can be claimed only
        # once so that repeated sibling titles map to distinct rows.
        reusable: Dict[tuple, List[str]] = {}
        for existing_uuid, node in existing_structure.items():
            key = (node['title'], node['level'], node['parent_uuid'])
            reusable.setdefault(key, []).append(existing_uuid)

        structure: Dict[str, Dict] = {}
        parent_stack = [{"uuid": None, "level": 0, "path": ""}]
        current_heading: Optional[str] = None
        current_content: List[str] = []
        pending_level: Optional[int] = None  # set while a heading's title is awaited

        for token in tokens:
            if token.type == 'heading_open':
                if current_heading is not None:
                    structure[current_heading]['content'] = self._join_content(current_content)
                current_heading = None
                current_content = []
                pending_level = int(token.tag.lstrip('h'))
            elif pending_level is not None:
                # Inside a heading: the inline token carries the title. A
                # heading_close without a preceding inline means "no title".
                if token.type in ('inline', 'heading_close'):
                    title = token.content if token.type == 'inline' else ''
                    current_heading = self._start_heading(
                        structure, parent_stack, reusable, pending_level, title)
                    pending_level = None
            elif token.type == 'heading_close':
                continue
            elif current_heading is not None:
                fragment = token.content.strip('\n')
                if fragment:
                    current_content.append(fragment)

        if current_heading is not None:
            structure[current_heading]['content'] = self._join_content(current_content)
        return structure

    @staticmethod
    def _join_content(fragments: List[str]) -> str:
        """Join body fragments, keeping block boundaries as blank lines."""
        return '\n\n'.join(fragments).strip()

    @staticmethod
    def _start_heading(structure: Dict, parent_stack: List[Dict], reusable: Dict,
                       level: int, title: str) -> str:
        """Register a heading node under its parent and return its UUID."""
        # The sentinel root has level 0 and is therefore never popped.
        while parent_stack[-1]['level'] >= level:
            parent_stack.pop()
        parent = parent_stack[-1]

        candidates = reusable.get((title, level, parent['uuid']))
        heading_uuid = candidates.pop(0) if candidates else str(uuid.uuid4())
        path = f"{parent['path']}/{heading_uuid}" if parent['path'] else heading_uuid

        structure[heading_uuid] = {
            'uuid': heading_uuid,
            'level': level,
            'title': title,
            'parent_uuid': parent['uuid'],
            'path': path,
            'content': '',
            'children': [],
        }
        if parent['uuid'] is not None:
            structure[parent['uuid']]['children'].append(heading_uuid)
        parent_stack.append({"uuid": heading_uuid, "level": level, "path": path})
        return heading_uuid

    def merge_structures(self, existing: Dict, new: Dict, document_id: int) -> None:
        """Apply the difference between *existing* and *new* to the database.

        Nodes are matched by UUID (dict key): keys present in both are
        updated, keys only in *new* are inserted under that UUID, and keys
        only in *existing* are soft-deleted together with their bodies. The
        whole merge runs as one transaction that is committed on success and
        rolled back on any error.

        Args:
            existing: Result of ``get_existing_document_structure``.
            new: Result of ``parse_new_structure`` built against *existing*;
                parents must precede their children.
            document_id: ``documents.id`` the headings belong to.
        """
        logger.info("Starting merge_structures for document_id: %s", document_id)
        try:
            for heading_uuid, node in new.items():
                previous = existing.get(heading_uuid)
                if previous is None:
                    self._insert_node(heading_uuid, node, document_id)
                else:
                    self._update_node(heading_uuid, previous, node, document_id)

            for heading_uuid, node in existing.items():
                if heading_uuid not in new:
                    logger.info("Soft deleting heading: %s", node['title'])
                    self.soft_delete_heading(heading_uuid)

            self.db_manager.conn.commit()
        except Exception:
            # Transaction boundary: undo the partial merge, then propagate.
            self.db_manager.conn.rollback()
            raise
        logger.info("Merge structures completed")

    def _insert_node(self, heading_uuid: str, node: Dict, document_id: int) -> None:
        """Insert a heading that has no stored counterpart, plus its body."""
        logger.info("Inserting new heading: %s", node['title'])
        # Store the parsed UUID: the node's path and its children's
        # parent_uuid were derived from it.
        self.insert_heading(node['level'], node['title'], node['parent_uuid'],
                            document_id, node['path'], heading_uuid=heading_uuid)
        if node['content']:
            self.insert_body(node['content'], heading_uuid, document_id, commit=False)

    def _update_node(self, heading_uuid: str, previous: Dict, node: Dict, document_id: int) -> None:
        """Refresh a stored heading and bring its body in line with *node*."""
        logger.debug("Updating existing node: %s", previous['title'])
        self.update_heading(heading_uuid, node['title'], node['level'],
                            node['parent_uuid'], node['path'])

        body_uuid = previous.get('body_uuid')
        content = node['content']
        if body_uuid:
            # Also covers a section whose text was removed (content == '').
            if content != previous.get('content'):
                logger.debug("Updating body content for heading: %s", previous['title'])
                self.update_body(body_uuid, content, document_id)
        elif content:
            logger.debug("Inserting new body content for existing heading: %s", previous['title'])
            self.insert_body(content, heading_uuid, document_id, commit=False)

    def insert_heading(self, level: int, title: str, parent_uuid: Optional[str],
                       document_id: int, path: str,
                       heading_uuid: Optional[str] = None) -> str:
        """Insert a heading row and return its UUID. Does not commit.

        Args:
            heading_uuid: UUID to store; a random one is generated when
                omitted. Pass it when *path* already embeds the UUID.
        """
        if heading_uuid is None:
            heading_uuid = str(uuid.uuid4())
        self.db_manager.cursor.execute('''
            INSERT INTO headings (uuid, level, title, parent_uuid, document_id, path)
            VALUES (?, ?, ?, ?, ?, ?)
        ''', (heading_uuid, level, title, parent_uuid, document_id, path))
        return heading_uuid

    def update_heading(self, heading_uuid: str, title: str, level: int,
                       parent_uuid: Optional[str], path: str) -> None:
        """Overwrite the mutable columns of a heading. Does not commit."""
        self.db_manager.cursor.execute('''
            UPDATE headings
            SET title = ?, level = ?, parent_uuid = ?, path = ?,
                updated_timestamp = CURRENT_TIMESTAMP
            WHERE uuid = ?
        ''', (title, level, parent_uuid, path, heading_uuid))

    def insert_body(self, content: str, heading_uuid: str, document_id: int,
                    *, commit: bool = True) -> str:
        """Insert a body row for *heading_uuid* and return the body UUID.

        Args:
            content: Non-empty body text.
            heading_uuid: UUID of an existing heading.
            document_id: ``documents.id`` of an existing document.
            commit: Commit after the insert (and roll back on a database
                error). Pass ``False`` when the caller owns the transaction.

        Raises:
            ValueError: If an argument is empty or references a missing row.
            sqlite3.Error: If the database operation fails.
        """
        # Validate before hashing so that content=None yields the documented
        # ValueError rather than an AttributeError.
        if not all([content, heading_uuid, document_id]):
            raise ValueError("Missing required parameters for insert_body")

        body_uuid = str(uuid.uuid4())
        md5sum = hashlib.md5(content.encode()).hexdigest()
        logger.debug("Inserting body %s (md5sum %s) for heading %s",
                     body_uuid, md5sum, heading_uuid)

        cursor = self.db_manager.cursor
        try:
            cursor.execute("SELECT 1 FROM headings WHERE uuid = ?", (heading_uuid,))
            if cursor.fetchone() is None:
                raise ValueError(f"heading_uuid {heading_uuid} does not exist in headings table")

            cursor.execute("SELECT 1 FROM documents WHERE id = ?", (document_id,))
            if cursor.fetchone() is None:
                raise ValueError(f"document_id {document_id} does not exist in documents table")

            cursor.execute('''
                INSERT INTO body (uuid, content, heading_uuid, document_id, md5sum)
                VALUES (?, ?, ?, ?, ?)
            ''', (body_uuid, content, heading_uuid, document_id, md5sum))
            if commit:
                self.db_manager.conn.commit()
        except sqlite3.Error:
            logger.exception("An error occurred while inserting body")
            if commit:
                self.db_manager.conn.rollback()
            raise
        return body_uuid

    def update_body(self, body_uuid: str, content: str, document_id: int) -> None:
        """Replace the content and checksum of a body row. Does not commit."""
        md5sum = hashlib.md5(content.encode()).hexdigest()
        self.db_manager.cursor.execute('''
            UPDATE body
            SET content = ?, md5sum = ?, updated_timestamp = CURRENT_TIMESTAMP
            WHERE uuid = ? AND document_id = ?
        ''', (content, md5sum, body_uuid, document_id))

    def soft_delete_heading(self, heading_uuid: str) -> None:
        """Flag a heading and its bodies as deleted. Does not commit."""
        # Same representation as SQLite's CURRENT_TIMESTAMP (UTC,
        # "YYYY-MM-DD HH:MM:SS") so all timestamp columns compare consistently;
        # computed once so heading and body share the exact same value.
        now = datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S')
        cursor = self.db_manager.cursor
        cursor.execute('''
            UPDATE headings
            SET isDeleted = 1, deleted_timestamp = ?
            WHERE uuid = ?
        ''', (now, heading_uuid))
        # Also soft delete associated body content
        cursor.execute('''
            UPDATE body
            SET isDeleted = 1, deleted_timestamp = ?
            WHERE heading_uuid = ?
        ''', (now, heading_uuid))