# markdown_sqlite.py """ This script processes a markdown file by reading its content and updates an SQLite database with document content and metadata. The user can then select specific topics based on command-line arguments. The script tracks changes using MD5 hashes and ensures the database reflects the current state of the markdown file. """ import os import sqlite3 import hashlib import argparse import logging from datetime import datetime from typing import List, Tuple, Dict, Set, Optional from markdown_it import MarkdownIt from thefuzz import fuzz, process class DatabaseManager: """Manages database connections and table creation.""" def __init__(self, db_file: str): """Initialize the DatabaseManager.""" self.conn: sqlite3.Connection = sqlite3.connect(db_file, timeout=10) self.cursor: sqlite3.Cursor = self.conn.cursor() self.create_tables() def create_tables(self) -> None: """Create necessary tables in the database if they don't exist.""" self.cursor.executescript(''' CREATE TABLE IF NOT EXISTS documents ( id INTEGER PRIMARY KEY AUTOINCREMENT, name TEXT NOT NULL UNIQUE, file_path TEXT NOT NULL, md5sum TEXT, added_timestamp DATETIME DEFAULT CURRENT_TIMESTAMP, updated_timestamp DATETIME, deleted_timestamp DATETIME, isDeleted BOOLEAN DEFAULT 0 ); CREATE TABLE IF NOT EXISTS headings ( id INTEGER PRIMARY KEY AUTOINCREMENT, level INTEGER NOT NULL, title TEXT NOT NULL, parent_id INTEGER, document_id INTEGER NOT NULL, added_timestamp DATETIME DEFAULT CURRENT_TIMESTAMP, updated_timestamp DATETIME, deleted_timestamp DATETIME, isDeleted BOOLEAN DEFAULT 0, FOREIGN KEY (parent_id) REFERENCES headings(id), FOREIGN KEY (document_id) REFERENCES documents(id) ); CREATE TABLE IF NOT EXISTS body ( id INTEGER PRIMARY KEY AUTOINCREMENT, content TEXT, heading_id INTEGER NOT NULL, document_id INTEGER NOT NULL, md5sum TEXT, added_timestamp DATETIME DEFAULT CURRENT_TIMESTAMP, updated_timestamp DATETIME, deleted_timestamp DATETIME, isDeleted BOOLEAN DEFAULT 0, FOREIGN KEY (heading_id) REFERENCES headings(id), FOREIGN KEY (document_id) REFERENCES documents(id) ); ''') def close(self) -> None: """Close the database connection.""" self.conn.close() class DocumentManager: """Manages document-related operations in the database.""" def __init__(self, db_manager: 'DatabaseManager'): self.db_manager: DatabaseManager = db_manager def document_exists(self, document_name: str) -> Optional[Tuple[int]]: """ Check if a document exists in the database. Args: document_name: Name of the document to check. Returns: Document ID if it exists, None otherwise. """ self.db_manager.cursor.execute('SELECT id FROM documents WHERE name = ?', (document_name,)) return self.db_manager.cursor.fetchone() def create_document(self, name: str, file_path: str) -> Optional[int]: """Create a new document entry in the database.""" logging.debug(f"** Creating new document entry to database") now: str = datetime.now().isoformat() self.db_manager.cursor.execute(''' INSERT INTO documents (name, file_path, added_timestamp) VALUES (?, ?, ?) ''', (name, file_path, now)) self.db_manager.conn.commit() return self.db_manager.cursor.lastrowid def update_document(self, document_id: int, name: Optional[str] = None, file_path: Optional[str] = None) -> None: """Update an existing document in the database.""" logging.debug(f"** Updating document, document_id: {document_id}") now: str = datetime.now().isoformat() if name: self.db_manager.cursor.execute(''' UPDATE documents SET name = ?, updated_timestamp = ? WHERE id = ? ''', (name, now, document_id)) if file_path: self.db_manager.cursor.execute(''' UPDATE documents SET file_path = ?, updated_timestamp = ? WHERE id = ? ''', (file_path, now, document_id)) self.db_manager.conn.commit() def soft_delete_document(self, document_id: int) -> None: """Soft delete a document by marking it as deleted in the database.""" logging.debug(f"** This now soft deleted, document_id: {document_id}") now: str = datetime.now().isoformat() self.db_manager.cursor.execute(''' UPDATE documents SET isDeleted = 1, deleted_timestamp = ? WHERE id = ? ''', (now, document_id)) self.db_manager.conn.commit() def read_documents(self) -> List[Tuple]: """Retrieve all non-deleted documents from the database.""" self.db_manager.cursor.execute('SELECT * FROM documents WHERE isDeleted = 0') return self.db_manager.cursor.fetchall() def update_document_hash(self, document_id: int, md5sum: str) -> None: """Update the MD5 hash of a document in the database.""" self.db_manager.cursor.execute('UPDATE documents SET md5sum = ? WHERE id = ?', (md5sum, document_id)) self.db_manager.conn.commit() class MarkdownProcessor: """Processes markdown files and stores content in the database.""" def __init__(self, db_manager: 'DatabaseManager') -> None: self.db_manager = db_manager def process_markdown(self, markdown_file: str, document_id: int) -> None: markdown_text = self.read_markdown_file(markdown_file) md = MarkdownIt() tokens = md.parse(markdown_text) self.update_document_content(tokens, document_id) def read_markdown_file(self, file_path: str) -> str: with open(file_path, 'r', encoding='utf-8') as file: return file.read() def update_document_content(self, tokens: List, document_id: int) -> None: existing_structure = self.get_existing_document_structure(document_id) new_structure = self.parse_new_structure(tokens) self.merge_structures(existing_structure, new_structure, document_id) def get_existing_document_structure(self, document_id: int) -> Dict: structure = {} self.db_manager.cursor.execute(''' SELECT h.id, h.level, h.title, h.parent_id, b.content FROM headings h LEFT JOIN body b ON h.id = b.heading_id WHERE h.document_id = ? AND h.isDeleted = 0 ORDER BY h.level, h.id ''', (document_id,)) for heading_id, level, title, parent_id, content in self.db_manager.cursor.fetchall(): structure[heading_id] = { 'level': level, 'title': title, 'parent_id': parent_id, 'content': content, 'children': [] } # Build the tree structure root = {} for id, node in structure.items(): if node['parent_id'] in structure: structure[node['parent_id']]['children'].append(id) else: root[id] = node return root def parse_new_structure(self, tokens: List) -> Dict: structure = {} current_heading = None current_content = [] parent_stack = [{"id": None, "level": 0}] for token in tokens: if token.type == 'heading_open': if current_heading: structure[current_heading]['content'] = ''.join(current_content).strip() level = int(token.tag.strip('h')) while parent_stack[-1]['level'] >= level: parent_stack.pop() current_heading = str(uuid.uuid4()) # Generate a temporary ID structure[current_heading] = { 'level': level, 'title': '', 'parent_id': parent_stack[-1]['id'], 'content': '', 'children': [] } parent_stack.append({"id": current_heading, "level": level}) current_content = [] elif token.type == 'heading_close': structure[current_heading]['content'] = ''.join(current_content).strip() elif token.type == 'inline' and current_heading: if structure[current_heading]['title'] == '': structure[current_heading]['title'] = token.content else: current_content.append(token.content) elif current_heading: current_content.append(token.content) if current_heading: structure[current_heading]['content'] = ''.join(current_content).strip() return structure def merge_structures(self, existing: Dict, new: Dict, document_id: int) -> None: def merge_recursive(existing_node, new_node, parent_id): if not existing_node: # This is a new node, insert it heading_id = self.insert_heading(new_node['level'], new_node['title'], parent_id, document_id) self.insert_body(new_node['content'], heading_id, document_id) for child in new_node['children']: merge_recursive(None, new[child], heading_id) else: # Update existing node self.update_heading(existing_node['id'], new_node['title'], new_node['level'], parent_id) self.update_body(existing_node['id'], new_node['content'], document_id) # Process children existing_children = {child['title']: child for child in existing_node['children']} new_children = {child['title']: child for child in new_node['children']} for title, child in new_children.items(): if title in existing_children: merge_recursive(existing_children[title], child, existing_node['id']) else: merge_recursive(None, child, existing_node['id']) for title, child in existing_children.items(): if title not in new_children: self.soft_delete_heading(child['id']) for new_root in new.values(): existing_root = next((node for node in existing.values() if node['title'] == new_root['title']), None) merge_recursive(existing_root, new_root, None) def insert_heading(self, level: int, title: str, parent_id: Optional[int], document_id: int) -> int: self.db_manager.cursor.execute(''' INSERT INTO headings (level, title, parent_id, document_id) VALUES (?, ?, ?, ?) ''', (level, title, parent_id, document_id)) return self.db_manager.cursor.lastrowid def update_heading(self, heading_id: int, title: str, level: int, parent_id: Optional[int]) -> None: self.db_manager.cursor.execute(''' UPDATE headings SET title = ?, level = ?, parent_id = ?, updated_timestamp = CURRENT_TIMESTAMP WHERE id = ? ''', (title, level, parent_id, heading_id)) def insert_body(self, content: str, heading_id: int, document_id: int) -> None: md5sum = hashlib.md5(content.encode()).hexdigest() self.db_manager.cursor.execute(''' INSERT INTO body (content, heading_id, document_id, md5sum) VALUES (?, ?, ?, ?) ''', (content, heading_id, document_id, md5sum)) def update_body(self, heading_id: int, content: str, document_id: int) -> None: md5sum = hashlib.md5(content.encode()).hexdigest() self.db_manager.cursor.execute(''' UPDATE body SET content = ?, md5sum = ?, updated_timestamp = CURRENT_TIMESTAMP WHERE heading_id = ? AND document_id = ? ''', (content, md5sum, heading_id, document_id)) def soft_delete_heading(self, heading_id: int) -> None: now = datetime.now().isoformat() self.db_manager.cursor.execute(''' UPDATE headings SET isDeleted = 1, deleted_timestamp = ? WHERE id = ? ''', (now, heading_id)) # Also soft delete associated body content self.db_manager.cursor.execute(''' UPDATE body SET isDeleted = 1, deleted_timestamp = ? WHERE heading_id = ? ''', (now, heading_id)) class TopicReader: """Reads and retrieves topics from the database.""" def __init__(self, db_manager: 'DatabaseManager'): self.db_manager = db_manager def fetch_headings(self) -> List[Tuple[int, str, int, Optional[int]]]: self.db_manager.cursor.execute(''' SELECT id, title, level, parent_id FROM headings WHERE isDeleted = 0 ORDER BY level, id ''') return self.db_manager.cursor.fetchall() def fetch_topic_chain(self, heading_id: int) -> List[Tuple[int, str, int]]: chain = [] current_id = heading_id while current_id is not None: self.db_manager.cursor.execute(''' SELECT id, title, level, parent_id FROM headings WHERE id = ? ''', (current_id,)) result = self.db_manager.cursor.fetchone() if result: chain.append((result[0], result[1], result[2])) current_id = result[3] else: break return list(reversed(chain)) def list_headings(self) -> str: headings = self.fetch_headings() result = "Available headings:\n" def build_tree(parent_id, level): tree = "" for id, title, hlevel, parent in headings: if parent == parent_id: indent = " " * (hlevel - 1) tree += f"{indent}- {title}\n" tree += build_tree(id, hlevel + 1) return tree result += build_tree(None, 1) return result.strip() def get_topic_content(self, input_title: str) -> Optional[str]: heading_id = self.find_closest_heading(input_title) if heading_id: topic_chain = self.fetch_topic_chain(heading_id) result = self.build_full_content(topic_chain[-1][0]) return result return None def build_full_content(self, heading_id: int, level_offset: int = 0) -> str: self.db_manager.cursor.execute(''' SELECT h.level, h.title, b.content FROM headings h LEFT JOIN body b ON h.id = b.heading_id WHERE h.id = ? AND h.isDeleted = 0 ''', (heading_id,)) heading = self.db_manager.cursor.fetchone() if not heading: return "" level, title, content = heading adjusted_level = max(1, level - level_offset) result = f"{'#' * adjusted_level} {title}\n\n" if content: result += f"{content.strip()}\n\n" # Fetch and process all child headings self.db_manager.cursor.execute(''' SELECT id FROM headings WHERE parent_id = ? AND isDeleted = 0 ORDER BY level, id ''', (heading_id,)) children = self.db_manager.cursor.fetchall() for child in children: result += self.build_full_content(child[0], level_offset) return result def find_closest_heading(self, input_title: str) -> Optional[int]: headings = self.fetch_headings() if not headings: print("No topics found in the database.") return None heading_titles = [title for _, title, _, _ in headings] closest_match, confidence = process.extractOne(input_title, heading_titles, scorer=fuzz.token_sort_ratio) if confidence < 50: print(f"No close matches found for '{input_title}' (Confidence: {confidence})") return None for heading_id, title, _, _ in headings: if title == closest_match: return heading_id return None def compute_file_hash(file_path: str) -> str: """ Compute the MD5 hash of a file. """ hash_md5 = hashlib.md5() with open(file_path, "rb") as f: for chunk in iter(lambda: f.read(4096), b""): hash_md5.update(chunk) return hash_md5.hexdigest() def generate_calendar(year: int) -> str: """Generate a markdown calendar for the specified year.""" calendar_markdown = f"# {year}\n\n" current_date = datetime.now().date() for month in range(1, 13): month_name = datetime(year, month, 1).strftime('%B') calendar_markdown += f"## {month:02d} / {month_name}\n\n" # Calculate the number of days in the month num_days = (datetime(year, month + 1, 1) - datetime(year, month, 1)).days if month < 12 else (datetime(year + 1, 1, 1) - datetime(year, month, 1)).days # Generate calendar entries for each day for day in range(1, num_days + 1): day_date = datetime(year, month, day).date() day_name = day_date.strftime('%a') # Check if this is the current day and make it bold if so if str(day_date) == str(current_date): calendar_markdown += f"**{day:02d} / {day_name}**\n" else: calendar_markdown += f"{day:02d} / {day_name}\n" calendar_markdown += "\n" # Add a newline after each month return calendar_markdown def convert_to_html(markdown_content: str) -> str: """ Convert Markdown content to HTML. """ md = MarkdownIt() html_content = md.render(markdown_content) # Wrap the content in a basic HTML structure html_document = f"""