# markdown_sqlite.py
"""
Store a markdown document in SQLite and select topics from it.

The script reads a markdown file, stores its headings and body text in an
SQLite database together with some metadata, and can then write a single
topic (selected by fuzzy title match) back to the markdown file.
Changes to the file are detected with MD5 hashes so that the database is
only re-populated when the file content actually changed.
"""

import argparse
import calendar
import hashlib
import os
import sqlite3
from datetime import date, datetime
from typing import List, Optional, Tuple


class DatabaseManager:
    """Own the SQLite connection/cursor and create the schema.

    The instance can be used as a context manager; the connection is closed
    when the ``with`` block exits, whether or not an exception was raised.
    """

    def __init__(self, db_file: str):
        """Open (or create) ``db_file`` and make sure all tables exist."""
        self.conn: sqlite3.Connection = sqlite3.connect(db_file, timeout=10)
        self.cursor: sqlite3.Cursor = self.conn.cursor()
        self.create_tables()

    def __enter__(self) -> 'DatabaseManager':
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        self.close()

    def create_tables(self) -> None:
        """Create the documents, headings and body tables if missing."""
        self.cursor.executescript('''
            CREATE TABLE IF NOT EXISTS documents (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                name TEXT NOT NULL UNIQUE,
                file_path TEXT NOT NULL,
                md5sum TEXT,
                added_timestamp DATETIME DEFAULT CURRENT_TIMESTAMP,
                updated_timestamp DATETIME,
                deleted_timestamp DATETIME,
                isDeleted BOOLEAN DEFAULT 0
            );

            CREATE TABLE IF NOT EXISTS headings (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                level INTEGER NOT NULL,
                title TEXT NOT NULL,
                parent_id INTEGER,
                document_id INTEGER NOT NULL,
                added_timestamp DATETIME DEFAULT CURRENT_TIMESTAMP,
                updated_timestamp DATETIME,
                deleted_timestamp DATETIME,
                isDeleted BOOLEAN DEFAULT 0,
                FOREIGN KEY (parent_id) REFERENCES headings(id),
                FOREIGN KEY (document_id) REFERENCES documents(id)
            );

            CREATE TABLE IF NOT EXISTS body (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                content TEXT,
                heading_id INTEGER NOT NULL,
                document_id INTEGER NOT NULL,
                md5sum TEXT,
                added_timestamp DATETIME DEFAULT CURRENT_TIMESTAMP,
                updated_timestamp DATETIME,
                deleted_timestamp DATETIME,
                isDeleted BOOLEAN DEFAULT 0,
                FOREIGN KEY (heading_id) REFERENCES headings(id),
                FOREIGN KEY (document_id) REFERENCES documents(id)
            );
        ''')

    def close(self) -> None:
        """Close the database connection."""
        self.conn.close()


class DocumentManager:
    """CRUD helpers for rows of the ``documents`` table."""

    def __init__(self, db_manager: 'DatabaseManager'):
        """Remember the shared database manager."""
        self.db_manager: DatabaseManager = db_manager

    def document_exists(self, document_name: str) -> Optional[Tuple[int]]:
        """
        Look up a document by name.

        Args:
            document_name: Name of the document to check.

        Returns:
            The matching row as a one-element tuple ``(id,)``, or None when
            no document has that name. Soft-deleted documents are included.
        """
        self.db_manager.cursor.execute(
            'SELECT id FROM documents WHERE name = ?', (document_name,)
        )
        return self.db_manager.cursor.fetchone()

    def create_document(self, name: str, file_path: str) -> Optional[int]:
        """Insert a new document row, commit, and return its row id.

        Raises:
            sqlite3.IntegrityError: If a document named ``name`` already
                exists (the column is UNIQUE).
        """
        now: str = datetime.now().isoformat()
        self.db_manager.cursor.execute('''
            INSERT INTO documents (name, file_path, added_timestamp) VALUES (?, ?, ?)
        ''', (name, file_path, now))
        self.db_manager.conn.commit()
        return self.db_manager.cursor.lastrowid

    def update_document(self, document_id: int, name: Optional[str] = None,
                        file_path: Optional[str] = None) -> None:
        """Update name and/or file path of a document and commit.

        Arguments that are None (or empty) are left untouched.
        """
        now: str = datetime.now().isoformat()
        if name:
            self.db_manager.cursor.execute('''
                UPDATE documents SET name = ?, updated_timestamp = ? WHERE id = ?
            ''', (name, now, document_id))
        if file_path:
            self.db_manager.cursor.execute('''
                UPDATE documents SET file_path = ?, updated_timestamp = ? WHERE id = ?
            ''', (file_path, now, document_id))
        self.db_manager.conn.commit()

    def soft_delete_document(self, document_id: int) -> None:
        """Mark a document as deleted without removing its row."""
        now: str = datetime.now().isoformat()
        self.db_manager.cursor.execute('''
            UPDATE documents SET isDeleted = 1, deleted_timestamp = ? WHERE id = ?
        ''', (now, document_id))
        self.db_manager.conn.commit()

    def read_documents(self) -> List[Tuple]:
        """Return all rows of documents that are not soft-deleted."""
        self.db_manager.cursor.execute('SELECT * FROM documents WHERE isDeleted = 0')
        return self.db_manager.cursor.fetchall()

    def update_document_hash(self, document_id: int, md5sum: str) -> None:
        """Store the MD5 hash of a document's file and commit."""
        self.db_manager.cursor.execute(
            'UPDATE documents SET md5sum = ? WHERE id = ?', (md5sum, document_id)
        )
        self.db_manager.conn.commit()


class MarkdownProcessor:
    """Parse markdown files and store headings/body text in the database."""

    def __init__(self, db_manager: 'DatabaseManager') -> None:
        """Remember the shared database manager."""
        self.db_manager = db_manager

    def process_markdown(self, markdown_file: str, document_id: int) -> None:
        """Replace the stored content of a document with the file's content.

        The old rows are deleted and the new ones inserted in one
        transaction; if anything fails the transaction is rolled back so the
        previously stored content survives.

        Args:
            markdown_file: Path of the markdown file to parse.
            document_id: ID of the ``documents`` row the content belongs to.
        """
        # Imported lazily so that modes which never parse markdown
        # (e.g. --ls) do not need markdown-it-py to be installed.
        from markdown_it import MarkdownIt

        markdown_text = self.read_markdown_file(markdown_file)
        tokens = MarkdownIt().parse(markdown_text)

        try:
            self.clear_document_content(document_id)
            self.store_markdown_content(tokens, document_id)
        except Exception:
            self.db_manager.conn.rollback()
            raise

    def read_markdown_file(self, file_path: str) -> str:
        """Return the UTF-8 decoded content of ``file_path``."""
        with open(file_path, 'r', encoding='utf-8') as file:
            return file.read()

    def clear_document_content(self, document_id: int) -> None:
        """Delete all heading and body rows of a document (no commit)."""
        self.db_manager.cursor.execute('DELETE FROM headings WHERE document_id = ?', (document_id,))
        self.db_manager.cursor.execute('DELETE FROM body WHERE document_id = ?', (document_id,))

    def store_markdown_content(self, tokens: List, document_id: int) -> None:
        """Store a parsed token stream as heading and body rows, then commit.

        Every ``heading_open`` token becomes a heading whose parent is the
        nearest preceding heading with a strictly smaller level. Every other
        ``inline`` token becomes a body row attached to the most recent
        heading. Text that appears before the first heading is not stored.

        Args:
            tokens: Token sequence; each token needs ``type``, ``tag`` and
                ``content`` attributes (as produced by ``MarkdownIt.parse``).
            document_id: ID of the document the content belongs to.
        """
        # Stack of (level, heading_id) for the currently open ancestors.
        open_headings: List[Tuple[int, int]] = []
        # Index of the inline token holding the title of the latest heading;
        # it must not be stored a second time as body text.
        title_index = -1

        for index, token in enumerate(tokens):
            if token.type == 'heading_open':
                level = int(token.tag[1:])  # tag is 'h1' .. 'h6'

                title = ''
                if index + 1 < len(tokens) and tokens[index + 1].type == 'inline':
                    title_index = index + 1
                    title = tokens[title_index].content

                # Close every heading that is not an ancestor of this one
                # BEFORE looking up the parent, so siblings share a parent
                # and skipped levels (h1 -> h3 -> h2) resolve correctly.
                while open_headings and open_headings[-1][0] >= level:
                    open_headings.pop()
                parent_id = open_headings[-1][1] if open_headings else None

                heading_id = self.insert_heading(level, title, parent_id, document_id)
                open_headings.append((level, heading_id))

            elif token.type == 'inline' and index != title_index and open_headings:
                self.insert_body(token.content, open_headings[-1][1], document_id)

        self.db_manager.conn.commit()

    def insert_heading(self, level: int, title: str, parent_id: Optional[int],
                       document_id: int) -> int:
        """Insert a heading row (no commit) and return its row id."""
        self.db_manager.cursor.execute('''
            INSERT INTO headings (level, title, parent_id, document_id)
            VALUES (?, ?, ?, ?)
        ''', (level, title, parent_id, document_id))
        return self.db_manager.cursor.lastrowid

    def insert_body(self, content: str, heading_id: int, document_id: int) -> None:
        """Insert a body row with the MD5 checksum of its content (no commit)."""
        md5sum = hashlib.md5(content.encode()).hexdigest()
        self.db_manager.cursor.execute('''
            INSERT INTO body (content, heading_id, document_id, md5sum)
            VALUES (?, ?, ?, ?)
        ''', (content, heading_id, document_id, md5sum))


class TopicReader:
    """Read headings and their content back from the database."""

    def __init__(self, db_manager: 'DatabaseManager'):
        """
        Initialize the TopicReader.

        Args:
            db_manager (DatabaseManager): An instance of DatabaseManager.
        """
        self.db_manager = db_manager

    def fetch_headings(self) -> List[Tuple[int, str, int]]:
        """
        Fetch all non-deleted headings as (id, title, level) tuples.

        Rows are ordered by level first, then by id.
        """
        self.db_manager.cursor.execute(
            'SELECT id, title, level FROM headings WHERE isDeleted = 0 ORDER BY level, id'
        )
        return self.db_manager.cursor.fetchall()

    def fetch_topic_chain(self, heading_id: int) -> List[Tuple[int, str, int]]:
        """
        Fetch the chain of ancestors of a heading, root first.

        Returns:
            List[Tuple[int, str, int]]: (id, title, level) tuples from the
            top-most ancestor down to the heading itself; empty when the
            heading does not exist.
        """
        chain = []
        visited = set()
        current_id: Optional[int] = heading_id

        # The visited set stops the walk if parent links ever form a cycle.
        while current_id is not None and current_id not in visited:
            visited.add(current_id)
            self.db_manager.cursor.execute(
                'SELECT id, title, level, parent_id FROM headings WHERE id = ?', (current_id,)
            )
            row = self.db_manager.cursor.fetchone()
            if row is None:
                break
            chain.append((row[0], row[1], row[2]))
            current_id = row[3]

        chain.reverse()
        return chain

    def list_headings(self) -> str:
        """
        List all available headings, indented by level, in document order.

        Returns:
            str: A formatted string containing all headings.
        """
        # Heading ids grow in insertion (= document) order; sorting by id
        # puts every heading below its parent instead of grouping by level.
        headings = sorted(self.fetch_headings(), key=lambda heading: heading[0])

        lines = ["Available headings:"]
        for _, title, level in headings:
            indent = " " * (level - 1)
            lines.append(f"{indent}- {title}")

        return "\n".join(lines).strip()

    def fetch_body_and_subtopics(self, heading_id: int, include_subtopics: bool = True) -> str:
        """
        Render a heading, its body text and (optionally) all its subtopics.

        The heading line, each body paragraph and each subtopic are
        separated by one blank line so the result is valid markdown that
        parses back into the same structure.

        Args:
            heading_id (int): ID of the heading to fetch.
            include_subtopics (bool): Whether to include subtopics in the result.

        Returns:
            str: The rendered markdown, or an empty string when the heading
            does not exist.
        """
        cursor = self.db_manager.cursor
        cursor.execute('SELECT level, title FROM headings WHERE id = ?', (heading_id,))
        heading = cursor.fetchone()
        if heading is None:
            return ""
        level, title = heading

        cursor.execute(
            'SELECT content FROM body WHERE heading_id = ? AND isDeleted = 0 ORDER BY id',
            (heading_id,),
        )
        sections = [f"{'#' * level} {title}"]
        sections.extend(
            content.strip() for (content,) in cursor.fetchall() if content and content.strip()
        )

        if include_subtopics:
            # _fetch_subtopics materialises its rows, so the shared cursor
            # can safely be reused by the recursive calls below.
            for subtopic_id, _, _ in self._fetch_subtopics(heading_id, level):
                subtopic = self.fetch_body_and_subtopics(subtopic_id, include_subtopics=True)
                if subtopic:
                    sections.append(subtopic)

        return "\n\n".join(sections)

    def _fetch_subtopics(self, heading_id: int, parent_level: int) -> List[Tuple[int, int, str]]:
        """
        Fetch the direct, non-deleted children of a heading.

        Args:
            heading_id (int): The parent heading ID.
            parent_level (int): The level of the parent heading (currently
                unused; kept for interface compatibility).

        Returns:
            List of (id, level, title) tuples ordered by level, then id.
        """
        self.db_manager.cursor.execute('''
            SELECT id, level, title
            FROM headings
            WHERE parent_id = ? AND isDeleted = 0
            ORDER BY level, id
        ''', (heading_id,))
        return self.db_manager.cursor.fetchall()

    def get_topic_content(self, input_title: str) -> Optional[str]:
        """
        Render the topic that best matches ``input_title``.

        The result starts with the heading lines of all ancestors, followed
        by the matched heading with its body and subtopics.

        Returns:
            str or None: The rendered markdown, or None if nothing matched.
        """
        heading_id = self.find_closest_heading(input_title)
        if heading_id is None:
            print(f"No topic found matching '{input_title}'.")
            return None

        result = ""
        for chain_id, title, level in self.fetch_topic_chain(heading_id):
            if chain_id == heading_id:
                # Full content only for the selected topic itself.
                result += self.fetch_body_and_subtopics(chain_id, include_subtopics=True)
            else:
                # Ancestors contribute their heading line only.
                result += f"{'#' * level} {title}\n\n"
        return result.strip()

    def find_closest_heading(self, input_title: str) -> Optional[int]:
        """
        Find the heading whose title is the closest fuzzy match.

        Returns:
            int or None: ID of the best matching heading, or None when there
            are no headings or the best score is below 50.
        """
        # Imported lazily so that the rest of the module works without
        # thefuzz being installed.
        from thefuzz import fuzz, process

        headings = self.fetch_headings()
        if not headings:
            print("No topics found in the database.")
            return None

        heading_titles = [title for _, title, _ in headings]
        closest_match, confidence = process.extractOne(
            input_title, heading_titles, scorer=fuzz.token_sort_ratio
        )

        if confidence < 50:
            print(f"No close matches found for '{input_title}' (Confidence: {confidence})")
            return None

        # Several headings may share a title; the first one in
        # fetch_headings() order wins.
        for heading_id, title, _ in headings:
            if title == closest_match:
                return heading_id

        return None


def compute_file_hash(file_path: str) -> str:
    """
    Compute the MD5 hash of a file, reading it in 4 KiB chunks.

    Returns:
        The hex digest as a string.
    """
    hash_md5 = hashlib.md5()
    with open(file_path, "rb") as f:
        for chunk in iter(lambda: f.read(4096), b""):
            hash_md5.update(chunk)
    return hash_md5.hexdigest()


def generate_calendar(year: int, today: Optional[date] = None) -> str:
    """Generate a markdown calendar for the specified year.

    The calendar has one ``#`` heading for the year, one ``##`` heading per
    month and one ``DD / Dow`` line per day. The line for ``today`` is
    written in bold.

    Args:
        year: Year to generate the calendar for.
        today: Date to highlight; defaults to the current local date.

    Returns:
        The calendar as markdown text.
    """
    if today is None:
        today = datetime.now().date()

    parts = [f"# {year}\n\n"]
    for month in range(1, 13):
        month_name = date(year, month, 1).strftime('%B')
        parts.append(f"## {month:02d} / {month_name}\n\n")

        num_days = calendar.monthrange(year, month)[1]
        for day in range(1, num_days + 1):
            day_date = date(year, month, day)
            entry = f"{day:02d} / {day_date.strftime('%a')}"
            if day_date == today:
                entry = f"**{entry}**"
            parts.append(f"{entry}\n")

        parts.append("\n")  # Blank line after each month

    return "".join(parts)


def bootstrap_calendar(year: int, db_manager: DatabaseManager, markdown_file: str):
    """Generate a full year's markdown calendar and load it into the database.

    The calendar is written to ``markdown_file`` (overwriting it). If a
    document with the same base name is already registered it is reused, so
    bootstrapping twice does not fail on the UNIQUE name constraint. The
    file hash is stored so that the next regular run sees it as unchanged.
    """
    calendar_content = generate_calendar(year)

    with open(markdown_file, 'w', encoding='utf-8') as f:
        f.write(calendar_content)

    document_manager = DocumentManager(db_manager)
    document_name = os.path.basename(markdown_file)
    existing = document_manager.document_exists(document_name)
    if existing:
        document_id = existing[0]
        document_manager.update_document(document_id, file_path=markdown_file)
    else:
        document_id = document_manager.create_document(document_name, markdown_file)

    MarkdownProcessor(db_manager).process_markdown(markdown_file, document_id)
    document_manager.update_document_hash(document_id, compute_file_hash(markdown_file))
    print(f"Calendar for year {year} has been generated and stored in the database.")


def _sync_document(db_manager: DatabaseManager, document_manager: DocumentManager,
                   markdown_file: str) -> int:
    """Bring the database in line with ``markdown_file`` and return its document id."""
    markdown_processor = MarkdownProcessor(db_manager)
    document_name = os.path.basename(markdown_file)

    document = db_manager.cursor.execute(
        'SELECT id, file_path, md5sum FROM documents WHERE name = ?', (document_name,)
    ).fetchone()
    current_file_hash = compute_file_hash(markdown_file)

    if not document:
        print(f"Document '{document_name}' not found in the database. Adding new entry...")
        document_id = document_manager.create_document(document_name, markdown_file)
        markdown_processor.process_markdown(markdown_file, document_id)
        document_manager.update_document_hash(document_id, current_file_hash)
        return document_id

    document_id, stored_file_path, stored_md5sum = document

    if stored_file_path != markdown_file:
        print(f"Updating file path for '{document_name}' in the database...")
        document_manager.update_document(document_id, file_path=markdown_file)

    if stored_md5sum != current_file_hash:
        print(f"File '{document_name}' has changed. Updating the database...")
        # Store the hash only after the import succeeded; otherwise a failed
        # import would be mistaken for "unchanged" on the next run.
        markdown_processor.process_markdown(markdown_file, document_id)
        document_manager.update_document_hash(document_id, current_file_hash)
    else:
        print(f"File '{document_name}' has not changed. Skipping update.")

    return document_id


def main():
    """
    Process a markdown file, update the SQLite database, and optionally
    select a topic based on user input.

    Documents are re-imported when their MD5 hash changed. If a topic is
    given, its content (with ancestor headings and subtopics) is written
    back to the markdown file, replacing the file's previous content.

    Args:
        -m, --markdown: Path to markdown file (default: 'calendar.md').
        -d, --database: Path to SQLite database file (default: 'markdown.db').
        topic_title: Optional topic for content selection (fuzzy matching enabled).
        --bootstrap: If provided, generates markdown calendar for the current year and loads it to the database.
        --ls: If provided, lists all available headings.
    """
    parser = argparse.ArgumentParser(description='Process markdown file and optionally select a topic.')
    parser.add_argument('-m', '--markdown', type=str, default='calendar.md', help='Input/output markdown file (default: calendar.md)')
    parser.add_argument('-d', '--database', type=str, default='markdown.db', help='SQLite database file (default: markdown.db)')
    parser.add_argument('topic_title', nargs='?', type=str, help='Topic title to select (fuzzy matching enabled)')
    parser.add_argument('--bootstrap', action='store_true', help='Generate markdown calendar for the current year and load it to the database.')
    parser.add_argument('--ls', action='store_true', help='List all available headings.')
    args = parser.parse_args()

    markdown_file = args.markdown

    # The context manager closes the connection on every exit path,
    # including early returns and exceptions.
    with DatabaseManager(args.database) as db_manager:
        if args.bootstrap:
            bootstrap_calendar(datetime.now().year, db_manager, markdown_file)
            return

        if args.ls:
            print(TopicReader(db_manager).list_headings())
            return

        if not os.path.exists(markdown_file):
            print(f"Error: Markdown file '{markdown_file}' not found. Use --bootstrap to create a new calendar.")
            return

        document_manager = DocumentManager(db_manager)
        document_id = _sync_document(db_manager, document_manager, markdown_file)

        if not args.topic_title:
            print("No topic title provided. The database has been updated/added without modifying the file.")
            return

        result = TopicReader(db_manager).get_topic_content(args.topic_title)
        if not result:
            print("No result to write. The original file remains unchanged.")
            return

        with open(markdown_file, 'w', encoding='utf-8') as file:
            file.write(result)
        print(f"Selected topic and subtopics written to {markdown_file}")

        # Keep the stored hash in step with the file that was just written.
        document_manager.update_document_hash(document_id, compute_file_hash(markdown_file))


if __name__ == '__main__':
    main()