Somewhat working demo. The query results are still a little bit wonky.
This commit is contained in:
		
							
								
								
									
										525
									
								
								markdown_sqlite.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										525
									
								
								markdown_sqlite.py
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,525 @@
 | 
				
			|||||||
 | 
					# markdown_sqlite.py
 | 
				
			||||||
 | 
					"""
 | 
				
			||||||
 | 
					This script processes a markdown file by reading its content and
 | 
				
			||||||
 | 
					updates an SQLite database with document content and metadata.
 | 
				
			||||||
 | 
					The user can then select specific topics based on command-line arguments.
 | 
				
			||||||
 | 
					The script tracks changes using MD5 hashes and ensures the database 
 | 
				
			||||||
 | 
					reflects the current state of the markdown file.
 | 
				
			||||||
 | 
					"""
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					import sqlite3
 | 
				
			||||||
 | 
					import hashlib
 | 
				
			||||||
 | 
					import argparse
 | 
				
			||||||
 | 
					import os
 | 
				
			||||||
 | 
					from datetime import datetime
 | 
				
			||||||
 | 
					from typing import List, Tuple, Optional
 | 
				
			||||||
 | 
					from markdown_it import MarkdownIt
 | 
				
			||||||
 | 
					from thefuzz import fuzz, process
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					class DatabaseManager:
					    """Own the SQLite connection and make sure the schema is present.

					    ``conn`` and ``cursor`` are public attributes; the other manager
					    classes in this module use them directly.
					    """

					    def __init__(self, db_file: str):
					        """Open ``db_file`` with a 10 second lock timeout and create missing tables."""
					        connection = sqlite3.connect(db_file, timeout=10)
					        self.conn: sqlite3.Connection = connection
					        self.cursor: sqlite3.Cursor = connection.cursor()
					        self.create_tables()

					    def create_tables(self) -> None:
					        """Create the documents, headings and body tables when they are absent."""
					        # All three tables share the same bookkeeping columns
					        # (added/updated/deleted timestamps plus the isDeleted flag).
					        schema = '''
					            CREATE TABLE IF NOT EXISTS documents (
					                id INTEGER PRIMARY KEY AUTOINCREMENT,
					                name TEXT NOT NULL UNIQUE,
					                file_path TEXT NOT NULL,
					                md5sum TEXT,
					                added_timestamp DATETIME DEFAULT CURRENT_TIMESTAMP,
					                updated_timestamp DATETIME,
					                deleted_timestamp DATETIME,
					                isDeleted BOOLEAN DEFAULT 0
					            );

					            CREATE TABLE IF NOT EXISTS headings (
					                id INTEGER PRIMARY KEY AUTOINCREMENT,
					                level INTEGER NOT NULL,
					                title TEXT NOT NULL,
					                parent_id INTEGER,
					                document_id INTEGER NOT NULL,
					                added_timestamp DATETIME DEFAULT CURRENT_TIMESTAMP,
					                updated_timestamp DATETIME,
					                deleted_timestamp DATETIME,
					                isDeleted BOOLEAN DEFAULT 0,
					                FOREIGN KEY (parent_id) REFERENCES headings(id),
					                FOREIGN KEY (document_id) REFERENCES documents(id)
					            );

					            CREATE TABLE IF NOT EXISTS body (
					                id INTEGER PRIMARY KEY AUTOINCREMENT,
					                content TEXT,
					                heading_id INTEGER NOT NULL,
					                document_id INTEGER NOT NULL,
					                md5sum TEXT,
					                added_timestamp DATETIME DEFAULT CURRENT_TIMESTAMP,
					                updated_timestamp DATETIME,
					                deleted_timestamp DATETIME,
					                isDeleted BOOLEAN DEFAULT 0,
					                FOREIGN KEY (heading_id) REFERENCES headings(id),
					                FOREIGN KEY (document_id) REFERENCES documents(id)
					            );
					        '''
					        self.cursor.executescript(schema)

					    def close(self) -> None:
					        """Release the underlying SQLite connection."""
					        self.conn.close()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					class DocumentManager:
					    """Create, read, update and soft-delete rows of the ``documents`` table."""

					    def __init__(self, db_manager: 'DatabaseManager'):
					        """Keep the manager whose ``conn`` and ``cursor`` every method uses."""
					        self.db_manager: 'DatabaseManager' = db_manager

					    def document_exists(self, document_name: str) -> Optional[Tuple[int]]:
					        """
					        Look up a document by its name.

					        Args:
					            document_name: Name of the document to look for.

					        Returns:
					            The matching row as a one-element tuple ``(id,)``, or None when
					            no document has that name.
					        """
					        cursor = self.db_manager.cursor
					        cursor.execute('SELECT id FROM documents WHERE name = ?', (document_name,))
					        return cursor.fetchone()

					    def create_document(self, name: str, file_path: str) -> Optional[int]:
					        """
					        Insert a document row stamped with the current local time and commit.

					        Returns:
					            The row id the cursor reports for the insert.
					        """
					        stamp: str = datetime.now().isoformat()
					        cursor = self.db_manager.cursor
					        cursor.execute('''
					            INSERT INTO documents (name, file_path, added_timestamp) VALUES (?, ?, ?)
					        ''', (name, file_path, stamp))
					        self.db_manager.conn.commit()
					        return cursor.lastrowid

					    def update_document(self, document_id: int, name: Optional[str] = None, file_path: Optional[str] = None) -> None:
					        """
					        Change the name and/or file path of a document and commit.

					        Only truthy arguments are written; each write also refreshes
					        ``updated_timestamp`` with the current local time.
					        """
					        stamp: str = datetime.now().isoformat()
					        cursor = self.db_manager.cursor
					        if name:
					            cursor.execute('''
					                UPDATE documents SET name = ?, updated_timestamp = ? WHERE id = ?
					            ''', (name, stamp, document_id))
					        if file_path:
					            cursor.execute('''
					                UPDATE documents SET file_path = ?, updated_timestamp = ? WHERE id = ?
					            ''', (file_path, stamp, document_id))
					        self.db_manager.conn.commit()

					    def soft_delete_document(self, document_id: int) -> None:
					        """Flag a document as deleted, record when, and commit; the row is kept."""
					        stamp: str = datetime.now().isoformat()
					        self.db_manager.cursor.execute('''
					            UPDATE documents SET isDeleted = 1, deleted_timestamp = ? WHERE id = ?
					        ''', (stamp, document_id))
					        self.db_manager.conn.commit()

					    def read_documents(self) -> List[Tuple]:
					        """Return every document row whose ``isDeleted`` flag is 0."""
					        cursor = self.db_manager.cursor
					        cursor.execute('SELECT * FROM documents WHERE isDeleted = 0')
					        return cursor.fetchall()

					    def update_document_hash(self, document_id: int, md5sum: str) -> None:
					        """Store ``md5sum`` on the given document row and commit."""
					        self.db_manager.cursor.execute('UPDATE documents SET md5sum = ? WHERE id = ?', (md5sum, document_id))
					        self.db_manager.conn.commit()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					class MarkdownProcessor:
					    """Parse markdown files and store their headings and body text in the database."""

					    def __init__(self, db_manager: 'DatabaseManager') -> None:
					        """Keep the manager whose ``conn`` and ``cursor`` every method uses."""
					        self.db_manager = db_manager

					    def process_markdown(self, markdown_file: str, document_id: int) -> None:
					        """
					        Replace a document's stored content with a fresh parse of its file.

					        Args:
					            markdown_file: Path of the markdown file to read.
					            document_id: ID of the ``documents`` row the content belongs to.
					        """
					        markdown_text = self.read_markdown_file(markdown_file)
					        tokens = MarkdownIt().parse(markdown_text)

					        # The delete is not committed on its own; store_markdown_content()
					        # commits once after the new rows are written.
					        self.clear_document_content(document_id)
					        self.store_markdown_content(tokens, document_id)

					    def read_markdown_file(self, file_path: str) -> str:
					        """Return the full text of ``file_path``, decoded as UTF-8."""
					        with open(file_path, 'r', encoding='utf-8') as file:
					            return file.read()

					    def clear_document_content(self, document_id: int) -> None:
					        """Delete every body and heading row of a document (without committing)."""
					        # Body rows reference headings, so they are removed first.
					        self.db_manager.cursor.execute('DELETE FROM body WHERE document_id = ?', (document_id,))
					        self.db_manager.cursor.execute('DELETE FROM headings WHERE document_id = ?', (document_id,))

					    def store_markdown_content(self, tokens: List, document_id: int) -> None:
					        """
					        Store a parsed token stream as heading and body rows, then commit.

					        A heading's parent is the nearest preceding heading with a strictly
					        smaller level, so siblings share a parent and skipped levels
					        (e.g. ``#`` followed by ``###``) still nest correctly. ``inline``
					        tokens are stored as body text of the most recent heading; the
					        ``inline`` token that carries a heading's own title is not stored
					        as body. Text that appears before the first heading is ignored.

					        Args:
					            tokens: Token sequence; each token needs ``type``, ``tag`` and
					                ``content`` attributes.
					            document_id: ID of the ``documents`` row the content belongs to.
					        """
					        # Open headings as (level, heading_id), outermost first.
					        open_headings: List[Tuple[int, int]] = []
					        # Position of the inline token holding the current heading's title.
					        title_index = -1

					        for position, token in enumerate(tokens):
					            if token.type == 'heading_open':
					                level = int(token.tag.lstrip('h'))

					                # The title is the inline token right after heading_open.
					                title = ''
					                next_index = position + 1
					                if next_index < len(tokens) and tokens[next_index].type == 'inline':
					                    title = tokens[next_index].content
					                    title_index = next_index

					                # Close headings at the same or a deeper level BEFORE choosing
					                # the parent; otherwise a sibling becomes a child of the
					                # previous heading.
					                while open_headings and open_headings[-1][0] >= level:
					                    open_headings.pop()
					                parent_id = open_headings[-1][1] if open_headings else None

					                heading_id = self.insert_heading(level, title, parent_id, document_id)
					                open_headings.append((level, heading_id))

					            elif token.type == 'inline' and open_headings and position != title_index:
					                self.insert_body(token.content, open_headings[-1][1], document_id)

					        self.db_manager.conn.commit()

					    def insert_heading(self, level: int, title: str, parent_id: Optional[int], document_id: int) -> int:
					        """Insert one heading row (without committing) and return its row id."""
					        self.db_manager.cursor.execute('''
					            INSERT INTO headings (level, title, parent_id, document_id)
					            VALUES (?, ?, ?, ?)
					        ''', (level, title, parent_id, document_id))
					        return self.db_manager.cursor.lastrowid

					    def insert_body(self, content: str, heading_id: int, document_id: int) -> None:
					        """Insert one body row together with the MD5 of its content (without committing)."""
					        md5sum = hashlib.md5(content.encode()).hexdigest()
					        self.db_manager.cursor.execute('''
					            INSERT INTO body (content, heading_id, document_id, md5sum)
					            VALUES (?, ?, ?, ?)
					        ''', (content, heading_id, document_id, md5sum))
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					class TopicReader:
					    """Reads headings and their body text back out of the database."""

					    def __init__(self, db_manager: 'DatabaseManager'):
					        """
					        Initialize the TopicReader.

					        Args:
					            db_manager (DatabaseManager): Manager whose cursor is used for all queries.
					        """
					        self.db_manager = db_manager

					    def fetch_headings(self) -> List[Tuple[int, str, int]]:
					        """
					        Fetch all non-deleted headings as (id, title, level) tuples.

					        Rows are ordered by level and then id, so higher-level headings
					        come first.
					        """
					        self.db_manager.cursor.execute('SELECT id, title, level FROM headings WHERE isDeleted = 0 ORDER BY level, id')
					        return self.db_manager.cursor.fetchall()

					    def fetch_topic_chain(self, heading_id: int) -> List[Tuple[int, str, int]]:
					        """
					        Fetch the chain of headings from the outermost ancestor down to ``heading_id``.

					        Returns:
					            List[Tuple[int, str, int]]: (id, title, level) tuples, outermost
					            ancestor first and the requested heading last. Empty when the
					            heading does not exist.
					        """
					        chain = []
					        current_id = heading_id

					        # Walk upwards through parent_id until a root (NULL parent) or a
					        # missing row is reached.
					        while current_id is not None:
					            self.db_manager.cursor.execute('SELECT id, title, level, parent_id FROM headings WHERE id = ?', (current_id,))
					            row = self.db_manager.cursor.fetchone()
					            if not row:
					                break
					            chain.append((row[0], row[1], row[2]))
					            current_id = row[3]

					        chain.reverse()
					        return chain

					    def list_headings(self) -> str:
					        """
					        List all available headings as an indented outline.

					        Headings are printed in id order, which is the order they were
					        inserted, so every heading appears directly below its parent.
					        fetch_headings() groups rows by level, which would print all
					        top-level headings before any sub-heading.

					        Returns:
					            str: A formatted string containing all headings.
					        """
					        headings = sorted(self.fetch_headings(), key=lambda row: row[0])
					        lines = ["Available headings:"]
					        for _, title, level in headings:
					            indent = "  " * (level - 1)
					            lines.append(f"{indent}- {title}")
					        return "\n".join(lines).strip()

					    def fetch_body_and_subtopics(self, heading_id: int, include_subtopics: bool = True) -> str:
					        """
					        Fetch a heading, its body content and (optionally) all nested subtopics.

					        Args:
					            heading_id (int): ID of the heading to fetch.
					            include_subtopics (bool): Whether to include subtopics in the result.

					        Returns:
					            str: Markdown text: the heading line, its body, then one section
					            per subtopic separated by blank lines. Empty string when the
					            heading does not exist.
					        """
					        cursor = self.db_manager.cursor
					        cursor.execute('SELECT level, title FROM headings WHERE id = ?', (heading_id,))
					        heading_row = cursor.fetchone()
					        if heading_row is None:
					            # Unknown id: nothing to render (previously a TypeError on unpacking).
					            return ""
					        level, title = heading_row

					        cursor.execute('SELECT content FROM body WHERE heading_id = ?', (heading_id,))
					        body_content = '\n'.join(row[0] for row in cursor.fetchall())

					        # Write the heading once and then its body content.
					        sections = [f"{'#' * level} {title}\n{body_content.strip()}".strip()]

					        if include_subtopics:
					            # _fetch_subtopics() returns a list, so the shared cursor can be
					            # reused safely by the recursive calls below.
					            for subtopic_id, _, _ in self._fetch_subtopics(heading_id, level):
					                sections.append(self.fetch_body_and_subtopics(subtopic_id, include_subtopics=True))

					        # Each section is already stripped; join with a blank line so the
					        # next heading never ends up glued to the previous section's text.
					        return '\n\n'.join(sections)

					    def _fetch_subtopics(self, heading_id: int, parent_level: int) -> List[Tuple[int, int, str]]:
					        """
					        Fetch all non-deleted headings whose parent is the given heading.

					        Args:
					            heading_id (int): The parent heading ID.
					            parent_level (int): The level of the parent heading (not used by the query).

					        Returns:
					            List of (id, level, title) tuples ordered by level, then id.
					        """
					        self.db_manager.cursor.execute('''
					            SELECT id, level, title 
					            FROM headings 
					            WHERE parent_id = ? AND isDeleted = 0
					            ORDER BY level, id
					        ''', (heading_id,))
					        return self.db_manager.cursor.fetchall()

					    def get_topic_content(self, input_title: str) -> Optional[str]:
					        """
					        Get the content of the topic that best matches ``input_title``.

					        The result starts with the heading lines of all ancestors, followed
					        by the matched heading with its body and subtopics.

					        Returns:
					            str or None: The formatted markdown, or None if no heading matched.
					        """
					        heading_id = self.find_closest_heading(input_title)
					        if heading_id is None:
					            print(f"No topic found matching '{input_title}'.")
					            return None

					        parts = []
					        for chain_id, title, level in self.fetch_topic_chain(heading_id):
					            if chain_id == heading_id:
					                # Full content for the selected topic and its subtopics.
					                parts.append(self.fetch_body_and_subtopics(chain_id, include_subtopics=True))
					            else:
					                # Ancestors contribute only their heading line.
					                parts.append(f"{'#' * level} {title}\n\n")
					        return ''.join(parts).strip()

					    def find_closest_heading(self, input_title: str) -> Optional[int]:
					        """
					        Find the closest matching heading to the input title using fuzzy matching.

					        Returns:
					            int or None: ID of the best-matching heading, or None when there
					            are no headings or the best score is below 50.
					        """
					        headings = self.fetch_headings()
					        if not headings:
					            print("No topics found in the database.")
					            return None

					        heading_titles = [title for _, title, _ in headings]
					        best = process.extractOne(input_title, heading_titles, scorer=fuzz.token_sort_ratio)
					        # NOTE(review): extractOne is expected to return (match, score) here;
					        # the None guard is defensive in case the library yields no result.
					        if best is None:
					            return None
					        closest_match, confidence = best[0], best[1]

					        if confidence < 50:
					            print(f"No close matches found for '{input_title}' (Confidence: {confidence})")
					            return None

					        # Titles are not unique; the first row in fetch_headings() order wins.
					        for heading_id, title, _ in headings:
					            if title == closest_match:
					                return heading_id

					        return None
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					def compute_file_hash(file_path: str) -> str:
					    """
					    Return the hexadecimal MD5 digest of a file's bytes.

					    The file is opened in binary mode and read in 4096-byte blocks
					    rather than all at once.
					    """
					    digest = hashlib.md5()
					    with open(file_path, "rb") as stream:
					        while True:
					            block = stream.read(4096)
					            if not block:
					                break
					            digest.update(block)
					    return digest.hexdigest()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					def generate_calendar(year: int) -> str:
					    """
					    Build a markdown calendar for ``year``.

					    The text starts with a ``# <year>`` heading, followed by one
					    ``## MM / MonthName`` section per month that lists every day as
					    ``DD / Weekday``. The line for today's date, if it falls in this
					    year, is wrapped in ``**`` to make it bold.
					    """
					    today = datetime.now().date()
					    pieces = [f"# {year}\n\n"]

					    for month in range(1, 13):
					        month_start = datetime(year, month, 1)
					        pieces.append(f"## {month:02d} / {month_start.strftime('%B')}\n\n")

					        # Month length = distance to the first day of the following month;
					        # December rolls over into January of the next year.
					        if month < 12:
					            next_month_start = datetime(year, month + 1, 1)
					        else:
					            next_month_start = datetime(year + 1, 1, 1)
					        days_in_month = (next_month_start - month_start).days

					        for day in range(1, days_in_month + 1):
					            this_day = datetime(year, month, day).date()
					            label = f"{day:02d} / {this_day.strftime('%a')}"
					            if this_day == today:
					                label = f"**{label}**"
					            pieces.append(f"{label}\n")

					        pieces.append("\n")  # blank line after each month

					    return "".join(pieces)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					def bootstrap_calendar(year: int, db_manager: 'DatabaseManager', markdown_file: str):
					    """
					    Generate a full year's markdown calendar, write it to disk and load it into the database.

					    Args:
					        year: Calendar year to generate.
					        db_manager: Open database manager used for all reads and writes.
					        markdown_file: Path of the markdown file; it is overwritten with the calendar.
					    """
					    calendar_content = generate_calendar(year)

					    # Write the calendar to the specified markdown file
					    with open(markdown_file, 'w', encoding='utf-8') as f:
					        f.write(calendar_content)

					    document_manager = DocumentManager(db_manager)
					    document_name = os.path.basename(markdown_file)

					    # documents.name is UNIQUE, so inserting the same name on a repeated
					    # bootstrap would raise sqlite3.IntegrityError after the file has
					    # already been rewritten. Reuse the existing row instead.
					    existing = document_manager.document_exists(document_name)
					    if existing:
					        document_id = existing[0]
					        document_manager.update_document(document_id, file_path=markdown_file)
					    else:
					        document_id = document_manager.create_document(document_name, markdown_file)

					    # process_markdown() clears the document's old headings/body first,
					    # so re-bootstrapping replaces the stored calendar.
					    markdown_processor = MarkdownProcessor(db_manager)
					    markdown_processor.process_markdown(markdown_file, document_id)
					    print(f"Calendar for year {year} has been generated and stored in the database.")
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					def main():
 | 
				
			||||||
 | 
					    """
 | 
				
			||||||
 | 
					    This script processes a markdown file, updates an SQLite database,
 | 
				
			||||||
 | 
					    and optionally selects a topic based on user input.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    Initializes managers for database and markdown handling. Updates documents based on 
 | 
				
			||||||
 | 
					    MD5 hash changes, and if a topic is provided, retrieves and writes its content 
 | 
				
			||||||
 | 
					    to the markdown file.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    Args:
 | 
				
			||||||
 | 
					        -m, --markdown: Path to markdown file (default: 'calendar.md').
 | 
				
			||||||
 | 
					        -d, --database: Path to SQLite database file (default: 'markdown.db').
 | 
				
			||||||
 | 
					        topic_title: Optional topic for content selection (fuzzy matching enabled).
 | 
				
			||||||
 | 
					        --bootstrap: If provided, generates markdown calendar for the current year and loads it to the database.
 | 
				
			||||||
 | 
					        --ls: If provided, lists all available headings.
 | 
				
			||||||
 | 
					    """
 | 
				
			||||||
 | 
					    # Set up command-line argument parsing
 | 
				
			||||||
 | 
					    parser = argparse.ArgumentParser(description='Process markdown file and optionally select a topic.')
 | 
				
			||||||
 | 
					    parser.add_argument('-m', '--markdown', type=str, default='calendar.md', help='Input/output markdown file (default: calendar.md)')
 | 
				
			||||||
 | 
					    parser.add_argument('-d', '--database', type=str, default='markdown.db', help='SQLite database file (default: markdown.db)')
 | 
				
			||||||
 | 
					    parser.add_argument('topic_title', nargs='?', type=str, help='Topic title to select (fuzzy matching enabled)')
 | 
				
			||||||
 | 
					    parser.add_argument('--bootstrap', action='store_true', help='Generate markdown calendar for the current year and load it to the database.')
 | 
				
			||||||
 | 
					    parser.add_argument('--ls', action='store_true', help='List all available headings.')
 | 
				
			||||||
 | 
					    args = parser.parse_args()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    # Use the provided or default file paths
 | 
				
			||||||
 | 
					    markdown_file = args.markdown
 | 
				
			||||||
 | 
					    database_file = args.database
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    # Initialize manager objects for database operations
 | 
				
			||||||
 | 
					    db_manager = DatabaseManager(database_file)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    if args.bootstrap:
 | 
				
			||||||
 | 
					        bootstrap_calendar(datetime.now().year, db_manager, markdown_file)
 | 
				
			||||||
 | 
					        db_manager.close()
 | 
				
			||||||
 | 
					        return
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    if args.ls:
 | 
				
			||||||
 | 
					        topic_reader = TopicReader(db_manager)
 | 
				
			||||||
 | 
					        print(topic_reader.list_headings())
 | 
				
			||||||
 | 
					        db_manager.close()
 | 
				
			||||||
 | 
					        return
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    # Check if the markdown file exists
 | 
				
			||||||
 | 
					    if not os.path.exists(markdown_file):
 | 
				
			||||||
 | 
					        print(f"Error: Markdown file '{markdown_file}' not found. Use --bootstrap to create a new calendar.")
 | 
				
			||||||
 | 
					        db_manager.close()
 | 
				
			||||||
 | 
					        return
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    document_manager = DocumentManager(db_manager)
 | 
				
			||||||
 | 
					    markdown_processor = MarkdownProcessor(db_manager)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    # Get the base name of the markdown file (without path)
 | 
				
			||||||
 | 
					    document_name = os.path.basename(markdown_file)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    # Check if the document already exists in the database
 | 
				
			||||||
 | 
					    document = db_manager.cursor.execute('SELECT id, file_path, md5sum, updated_timestamp FROM documents WHERE name = ?', (document_name,)).fetchone()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    # Compute the current MD5 hash of the markdown file
 | 
				
			||||||
 | 
					    current_file_hash = compute_file_hash(markdown_file)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    if document:
 | 
				
			||||||
 | 
					        # If the document exists in the database
 | 
				
			||||||
 | 
					        document_id, stored_file_path, stored_md5sum, last_updated = document
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        if stored_file_path != markdown_file:
 | 
				
			||||||
 | 
					            print(f"Updating file path for '{document_name}' in the database...")
 | 
				
			||||||
 | 
					            document_manager.update_document(document_id, file_path=markdown_file)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        if stored_md5sum != current_file_hash:
 | 
				
			||||||
 | 
					            # If the file has changed since last update
 | 
				
			||||||
 | 
					            print(f"File '{document_name}' has changed. Updating the database...")
 | 
				
			||||||
 | 
					            document_manager.update_document_hash(document_id, current_file_hash)
 | 
				
			||||||
 | 
					            markdown_processor.process_markdown(markdown_file, document_id)
 | 
				
			||||||
 | 
					        else:
 | 
				
			||||||
 | 
					            # If the file hasn't changed
 | 
				
			||||||
 | 
					            print(f"File '{document_name}' has not changed. Skipping update.")
 | 
				
			||||||
 | 
					    else:
 | 
				
			||||||
 | 
					        # If the document doesn't exist in the database
 | 
				
			||||||
 | 
					        print(f"Document '{document_name}' not found in the database. Adding new entry...")
 | 
				
			||||||
 | 
					        document_id = document_manager.create_document(document_name, markdown_file)
 | 
				
			||||||
 | 
					        document_manager.update_document_hash(document_id, current_file_hash)
 | 
				
			||||||
 | 
					        markdown_processor.process_markdown(markdown_file, document_id)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    # Check if a topic title argument is provided
 | 
				
			||||||
 | 
					    if args.topic_title:
 | 
				
			||||||
 | 
					        # Initialize TopicReader
 | 
				
			||||||
 | 
					        topic_reader = TopicReader(db_manager)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        # Retrieve the content for the specified topic
 | 
				
			||||||
 | 
					        result = topic_reader.get_topic_content(args.topic_title)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        if result:
 | 
				
			||||||
 | 
					            # If content is found, write it back to the original markdown file
 | 
				
			||||||
 | 
					            with open(markdown_file, 'w', encoding='utf-8') as file:
 | 
				
			||||||
 | 
					                file.write(result)
 | 
				
			||||||
 | 
					            print(f"Selected topic and subtopics written to {markdown_file}")
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					            # Update the document hash in the database
 | 
				
			||||||
 | 
					            new_file_hash = compute_file_hash(markdown_file)
 | 
				
			||||||
 | 
					            document_manager.update_document_hash(document_id, new_file_hash)
 | 
				
			||||||
 | 
					        else:
 | 
				
			||||||
 | 
					            # If no content is found
 | 
				
			||||||
 | 
					            print("No result to write. The original file remains unchanged.")
 | 
				
			||||||
 | 
					    else:
 | 
				
			||||||
 | 
					        print("No topic title provided. The database has been updated/added without modifying the file.")
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    # Close the database connection
 | 
				
			||||||
 | 
					    db_manager.close()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					# Script entry point: call main() only when this file is executed directly, not when it is imported as a module.
					if __name__ == '__main__':
 | 
				
			||||||
 | 
					    main()
 | 
				
			||||||
		Reference in New Issue
	
	Block a user