markdown-calendar/markdown_sqlite.py

586 lines
24 KiB
Python
Raw Normal View History

# markdown_sqlite.py
"""
This script reads a markdown file and updates an SQLite database with
the document's content and metadata.
The user can then select specific topics via command-line arguments.
The script tracks changes using MD5 hashes so that the database
reflects the current state of the markdown file.
"""
2024-10-04 11:57:30 +03:00
import os
import sqlite3
import hashlib
import argparse
2024-10-04 11:57:30 +03:00
import logging
from datetime import datetime
from typing import List, Tuple, Optional
from markdown_it import MarkdownIt
from thefuzz import fuzz, process
class DatabaseManager:
    """Owns the SQLite connection and makes sure the schema is present.

    Attributes:
        conn: The open ``sqlite3.Connection``.
        cursor: A cursor created from ``conn`` and shared by the other
            manager classes in this module.
    """

    # DDL for the three tables used by this script. Every statement uses
    # ``IF NOT EXISTS`` so the script can be executed against an existing
    # database without error.
    _SCHEMA_SQL = '''
        CREATE TABLE IF NOT EXISTS documents (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            name TEXT NOT NULL UNIQUE,
            file_path TEXT NOT NULL,
            md5sum TEXT,
            added_timestamp DATETIME DEFAULT CURRENT_TIMESTAMP,
            updated_timestamp DATETIME,
            deleted_timestamp DATETIME,
            isDeleted BOOLEAN DEFAULT 0
        );
        CREATE TABLE IF NOT EXISTS headings (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            level INTEGER NOT NULL,
            title TEXT NOT NULL,
            parent_id INTEGER,
            document_id INTEGER NOT NULL,
            added_timestamp DATETIME DEFAULT CURRENT_TIMESTAMP,
            updated_timestamp DATETIME,
            deleted_timestamp DATETIME,
            isDeleted BOOLEAN DEFAULT 0,
            FOREIGN KEY (parent_id) REFERENCES headings(id),
            FOREIGN KEY (document_id) REFERENCES documents(id)
        );
        CREATE TABLE IF NOT EXISTS body (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            content TEXT,
            heading_id INTEGER NOT NULL,
            document_id INTEGER NOT NULL,
            md5sum TEXT,
            added_timestamp DATETIME DEFAULT CURRENT_TIMESTAMP,
            updated_timestamp DATETIME,
            deleted_timestamp DATETIME,
            isDeleted BOOLEAN DEFAULT 0,
            FOREIGN KEY (heading_id) REFERENCES headings(id),
            FOREIGN KEY (document_id) REFERENCES documents(id)
        );
    '''

    def __init__(self, db_file: str):
        """Open ``db_file`` (with ``timeout=10``) and create missing tables.

        Args:
            db_file: Path handed to ``sqlite3.connect``.
        """
        connection = sqlite3.connect(db_file, timeout=10)
        self.conn: sqlite3.Connection = connection
        self.cursor: sqlite3.Cursor = connection.cursor()
        self.create_tables()

    def create_tables(self) -> None:
        """Run the schema script; existing tables are left untouched."""
        self.cursor.executescript(self._SCHEMA_SQL)

    def close(self) -> None:
        """Close the underlying database connection."""
        self.conn.close()
class DocumentManager:
    """Create, read, update and soft-delete rows of the ``documents`` table."""

    def __init__(self, db_manager: 'DatabaseManager'):
        """Remember the database manager whose connection and cursor are used."""
        self.db_manager: DatabaseManager = db_manager

    def document_exists(self, document_name: str) -> Optional[Tuple[int]]:
        """Look a document up by name.

        Args:
            document_name: Value of the ``name`` column to search for.

        Returns:
            The row ``(id,)`` of the matching document, or None when no
            document has that name.
        """
        cur = self.db_manager.cursor
        cur.execute('SELECT id FROM documents WHERE name = ?', (document_name,))
        return cur.fetchone()

    def create_document(self, name: str, file_path: str) -> Optional[int]:
        """Insert a document row, commit, and return the new row id."""
        created_at: str = datetime.now().isoformat()
        cur = self.db_manager.cursor
        cur.execute(
            'INSERT INTO documents (name, file_path, added_timestamp) VALUES (?, ?, ?)',
            (name, file_path, created_at),
        )
        self.db_manager.conn.commit()
        return cur.lastrowid

    def update_document(self, document_id: int, name: Optional[str] = None, file_path: Optional[str] = None) -> None:
        """Change the name and/or file path of a document.

        Only truthy arguments are written; each written column also
        refreshes ``updated_timestamp``. The commit happens once at the end.
        """
        changed_at: str = datetime.now().isoformat()
        pending = (
            ('UPDATE documents SET name = ?, updated_timestamp = ? WHERE id = ?', name),
            ('UPDATE documents SET file_path = ?, updated_timestamp = ? WHERE id = ?', file_path),
        )
        cur = self.db_manager.cursor
        for statement, new_value in pending:
            if not new_value:
                continue
            cur.execute(statement, (new_value, changed_at, document_id))
        self.db_manager.conn.commit()

    def soft_delete_document(self, document_id: int) -> None:
        """Flag a document as deleted and stamp the deletion time."""
        deleted_at: str = datetime.now().isoformat()
        self.db_manager.cursor.execute(
            'UPDATE documents SET isDeleted = 1, deleted_timestamp = ? WHERE id = ?',
            (deleted_at, document_id),
        )
        self.db_manager.conn.commit()

    def read_documents(self) -> List[Tuple]:
        """Return every document row whose ``isDeleted`` flag is 0."""
        cur = self.db_manager.cursor
        cur.execute('SELECT * FROM documents WHERE isDeleted = 0')
        return cur.fetchall()

    def update_document_hash(self, document_id: int, md5sum: str) -> None:
        """Store ``md5sum`` for the given document and commit."""
        self.db_manager.cursor.execute(
            'UPDATE documents SET md5sum = ? WHERE id = ?',
            (md5sum, document_id),
        )
        self.db_manager.conn.commit()
class MarkdownProcessor:
    """Parses markdown files and stores headings and body text in the database."""

    def __init__(self, db_manager: 'DatabaseManager') -> None:
        """Remember the database manager whose connection and cursor are used."""
        self.db_manager = db_manager

    def process_markdown(self, markdown_file: str, document_id: int) -> None:
        """Replace the stored content of a document with the file's content.

        The file is read and parsed before anything is deleted. If clearing
        or storing fails, the pending changes are rolled back so the
        previously stored content is not left half-replaced.

        Args:
            markdown_file: Path of the markdown file to import.
            document_id: ``documents.id`` the content belongs to.
        """
        markdown_text = self.read_markdown_file(markdown_file)
        tokens = MarkdownIt().parse(markdown_text)
        try:
            self.clear_document_content(document_id)
            self.store_markdown_content(tokens, document_id)
        except Exception:
            # Undo the uncommitted DELETE/INSERT statements before re-raising.
            self.db_manager.conn.rollback()
            raise

    def read_markdown_file(self, file_path: str) -> str:
        """Return the UTF-8 decoded text of ``file_path``."""
        with open(file_path, 'r', encoding='utf-8') as file:
            return file.read()

    def clear_document_content(self, document_id: int) -> None:
        """Delete all heading and body rows of a document (no commit)."""
        logging.debug(f"!! DELETING FROM DATABASE, document_id: {document_id}")
        cursor = self.db_manager.cursor
        # Body rows reference headings, so remove the children first.
        cursor.execute('DELETE FROM body WHERE document_id = ?', (document_id,))
        cursor.execute('DELETE FROM headings WHERE document_id = ?', (document_id,))

    def store_markdown_content(self, tokens: List, document_id: int) -> None:
        """Walk the parsed tokens, insert headings and body rows, then commit.

        A ``heading_open`` token takes its title from the token that follows
        it and is linked to the nearest preceding heading of a lower level.
        Every other non-empty ``inline`` token that appears after the first
        heading is stored as body text of the most recent heading.

        Args:
            tokens: Token objects exposing ``type``, ``tag`` and ``content``.
            document_id: ``documents.id`` the rows are attached to.
        """
        parent_stack: List[Tuple[int, int]] = []  # (level, heading_id)
        current_heading_id: Optional[int] = None
        previous_type: Optional[str] = None
        debug_enabled = logging.getLogger().isEnabledFor(logging.DEBUG)

        # Positions come from enumerate(): looking tokens up with
        # list.index() is quadratic and returns the first *equal* token,
        # which may not be the one currently being processed.
        for position, token in enumerate(tokens):
            if debug_enabled:
                words = token.content.split()
                content_preview = ' '.join(words[:10]) + '...' if len(words) > 10 else token.content
                logging.debug("Processing token: %s, content: %s", token.type, content_preview)

            if token.type == 'heading_open':
                level = int(token.tag.strip('h'))
                has_title = position + 1 < len(tokens)
                title = tokens[position + 1].content if has_title else ''
                # Drop stack entries that are not ancestors of this heading.
                while parent_stack and parent_stack[-1][0] >= level:
                    parent_stack.pop()
                parent_id = parent_stack[-1][1] if parent_stack else None
                current_heading_id = self.insert_heading(level, title, parent_id, document_id)
                parent_stack.append((level, current_heading_id))
            elif (
                token.type == 'inline'
                and current_heading_id is not None
                and previous_type != 'heading_open'  # skip the heading's own title
                and token.content.strip()
            ):
                self.insert_body(token.content, current_heading_id, document_id)

            previous_type = token.type

        self.db_manager.conn.commit()

    def insert_heading(self, level: int, title: str, parent_id: Optional[int], document_id: int) -> int:
        """Insert one heading row and return its id (no commit)."""
        self.db_manager.cursor.execute('''
            INSERT INTO headings (level, title, parent_id, document_id)
            VALUES (?, ?, ?, ?)
        ''', (level, title, parent_id, document_id))
        return self.db_manager.cursor.lastrowid

    def insert_body(self, content: str, heading_id: int, document_id: int) -> None:
        """Insert one body row together with the MD5 of its content (no commit)."""
        md5sum = hashlib.md5(content.encode()).hexdigest()
        self.db_manager.cursor.execute('''
            INSERT INTO body (content, heading_id, document_id, md5sum)
            VALUES (?, ?, ?, ?)
        ''', (content, heading_id, document_id, md5sum))
class TopicReader:
    """Reads headings and their body text back out of the database."""

    def __init__(self, db_manager: 'DatabaseManager'):
        """
        Initialize the TopicReader.
        Args:
            db_manager (DatabaseManager): An instance of DatabaseManager.
        """
        self.db_manager = db_manager

    def fetch_headings(self) -> List[Tuple[int, str, int]]:
        """
        Fetch all non-deleted headings as (id, title, level) tuples,
        ordered by level and then by id.
        """
        self.db_manager.cursor.execute(
            'SELECT id, title, level FROM headings WHERE isDeleted = 0 ORDER BY level, id'
        )
        return self.db_manager.cursor.fetchall()

    def fetch_topic_chain(self, heading_id: int) -> List[Tuple[int, str, int]]:
        """
        Fetch the topic chain (hierarchy of parent topics) for a given heading.
        Returns:
            List[Tuple[int, str, int]]: (id, title, level) tuples from the
            outermost ancestor down to the heading itself. Empty when the
            heading does not exist.
        """
        chain = []
        visited = set()  # guards against a parent_id cycle looping forever
        current_id = heading_id
        while current_id is not None and current_id not in visited:
            visited.add(current_id)
            self.db_manager.cursor.execute(
                'SELECT id, title, level, parent_id FROM headings WHERE id = ?', (current_id,)
            )
            row = self.db_manager.cursor.fetchone()
            if row is None:
                break
            chain.append((row[0], row[1], row[2]))
            current_id = row[3]
        chain.reverse()
        return chain

    def list_headings(self) -> str:
        """
        List all available headings in a hierarchical structure.

        Headings are listed in insertion (id) order rather than grouped by
        level, so that each heading appears directly below its parent.
        Returns:
            str: A formatted string containing all headings.
        """
        self.db_manager.cursor.execute(
            'SELECT title, level FROM headings WHERE isDeleted = 0 ORDER BY id'
        )
        lines = ["Available headings:"]
        lines.extend(
            f"{' ' * (level - 1)}- {title}"
            for title, level in self.db_manager.cursor.fetchall()
        )
        return "\n".join(lines).strip()

    def fetch_body_and_subtopics(self, heading_id: int, include_subtopics: bool = True, level_offset: int = 0) -> str:
        """
        Fetch body content and subtopics for a given heading as Markdown.
        Args:
            heading_id (int): ID of the heading to fetch.
            include_subtopics (bool): Whether to include subtopics in the result.
            level_offset (int): Subtracted from each stored heading level
                (the result is never lower than 1).
        Returns:
            str: The heading line, its body text and, optionally, all nested
            subtopics, terminated by a single newline.
        """
        cursor = self.db_manager.cursor
        cursor.execute('SELECT level, title FROM headings WHERE id = ?', (heading_id,))
        level, title = cursor.fetchone()
        adjusted_level = max(1, level - level_offset)

        # ORDER BY id keeps paragraphs in the order they were inserted.
        cursor.execute('SELECT content FROM body WHERE heading_id = ? ORDER BY id', (heading_id,))
        body_content = '\n'.join(row[0] for row in cursor.fetchall()).strip()

        parts = [f"\n{'#' * adjusted_level} {title}\n\n"]
        if body_content:
            parts.append(f"{body_content}\n\n")
        if include_subtopics:
            # Fetch the child rows first: the recursive calls reuse the cursor.
            for subtopic_id, _, _ in self._fetch_subtopics(heading_id, adjusted_level):
                parts.append(
                    self.fetch_body_and_subtopics(
                        subtopic_id, include_subtopics=True, level_offset=level_offset
                    )
                )
        return "".join(parts).strip() + "\n"  # exactly one trailing newline per section

    def get_topic_content(self, input_title: str) -> Optional[str]:
        """
        Get the content of a topic based on the input title, including its topic chain and subtopics.
        Returns:
            str or None: Formatted string containing the topic chain, content, and subtopics, or None if not found.
        """
        heading_id = self.find_closest_heading(input_title)
        if not heading_id:
            print(f"No topic found matching '{input_title}'.")
            return None

        parts = []
        for position, (chain_id, title, level) in enumerate(self.fetch_topic_chain(heading_id)):
            if chain_id == heading_id:
                # Full content for the selected topic and its subtopics.
                parts.append(
                    self.fetch_body_and_subtopics(chain_id, include_subtopics=True, level_offset=position)
                )
            else:
                # Ancestors contribute only their heading line.
                parts.append(f"\n{'#' * (level - position)} {title}\n\n")
        return "".join(parts).strip() + "\n"  # Ensure there's a final newline

    def _fetch_subtopics(self, heading_id: int, parent_level: int) -> List[Tuple[int, int, str]]:
        """
        Fetch all non-deleted headings whose parent is the given heading.
        ``parent_level`` is accepted for interface compatibility but unused.
        Returns:
            List of tuples containing the subtopic's ID, level, and title.
        """
        self.db_manager.cursor.execute('''
            SELECT id, level, title
            FROM headings
            WHERE parent_id = ? AND isDeleted = 0
            ORDER BY level, id
        ''', (heading_id,))
        return self.db_manager.cursor.fetchall()

    def find_closest_heading(self, input_title: str) -> Optional[int]:
        """
        Find the closest matching heading to the input title using fuzzy matching.
        Returns:
            int or None: ID of the closest matching heading, or None if no
            heading scores at least 50.
        """
        headings = self.fetch_headings()
        if not headings:
            print("No topics found in the database.")
            return None

        heading_titles = [title for _, title, _ in headings]
        match = process.extractOne(input_title, heading_titles, scorer=fuzz.token_sort_ratio)
        # extractOne may yield None instead of a (match, score) pair.
        closest_match, confidence = match if match is not None else (None, 0)
        if confidence < 50:
            print(f"No close matches found for '{input_title}' (Confidence: {confidence})")
            return None

        # First heading with that title wins (fetch_headings orders by level, id).
        return next(
            (candidate_id for candidate_id, title, _ in headings if title == closest_match),
            None,
        )
def compute_file_hash(file_path: str) -> str:
    """
    Return the hexadecimal MD5 digest of the file at ``file_path``.

    The file is opened in binary mode and consumed in 4096-byte blocks.
    """
    digest = hashlib.md5()
    with open(file_path, "rb") as stream:
        while True:
            block = stream.read(4096)
            if not block:  # empty bytes object signals end of file
                break
            digest.update(block)
    return digest.hexdigest()
def generate_calendar(year: int) -> str:
    """Build a markdown calendar for ``year``.

    The output starts with a ``# <year>`` heading followed by one
    ``## <MM> / <month name>`` section per month. Each section lists one
    ``<DD> / <weekday abbreviation>`` line per day; the line for today's
    date, if it falls in ``year``, is wrapped in ``**``.
    """
    today = datetime.now().date()
    pieces = [f"# {year}\n\n"]

    for month in range(1, 13):
        month_start = datetime(year, month, 1)
        pieces.append(f"## {month:02d} / {month_start.strftime('%B')}\n\n")

        # Month length = distance from the 1st to the 1st of the next month.
        if month == 12:
            next_month_start = datetime(year + 1, 1, 1)
        else:
            next_month_start = datetime(year, month + 1, 1)
        days_in_month = (next_month_start - month_start).days

        for day in range(1, days_in_month + 1):
            this_date = datetime(year, month, day).date()
            entry = f"{day:02d} / {this_date.strftime('%a')}"
            if this_date == today:
                entry = f"**{entry}**"  # highlight the current day
            pieces.append(entry + "\n")

        pieces.append("\n")  # blank line after each month

    return "".join(pieces)
def convert_to_html(markdown_content: str) -> str:
    """
    Render ``markdown_content`` with MarkdownIt and embed the result in a
    complete HTML page (doctype, head with inline styles, body).
    """
    rendered_body = MarkdownIt().render(markdown_content)
    # Doubled braces are literal CSS braces inside the f-string.
    return f"""
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Calendar</title>
<style>
body {{ font-family: Arial, sans-serif; line-height: 1.6; padding: 20px; }}
h1, h2, h3, h4, h5, h6 {{ margin-top: 24px; margin-bottom: 16px; }}
h1 {{ font-size: 2em; }}
h2 {{ font-size: 1.5em; }}
h3 {{ font-size: 1.25em; }}
</style>
</head>
<body>
{rendered_body}
</body>
</html>
"""
def bootstrap_calendar(year: int, db_manager: DatabaseManager, markdown_file: str):
    """Generate a full year's markdown calendar and load it into the database.

    The calendar is written to ``markdown_file`` (overwriting it) and then
    imported. If a document with the same base name is already registered,
    its row is reused instead of inserting a duplicate, which would violate
    the UNIQUE constraint on ``documents.name``. The file's MD5 is stored
    afterwards so a later run does not see the fresh file as changed.

    Args:
        year: Calendar year to generate.
        db_manager: Open database manager used for all writes.
        markdown_file: Path of the markdown file to create.
    """
    calendar_content = generate_calendar(year)
    with open(markdown_file, 'w', encoding='utf-8') as f:
        f.write(calendar_content)

    document_manager = DocumentManager(db_manager)
    document_name = os.path.basename(markdown_file)
    existing = document_manager.document_exists(document_name)
    if existing:
        # Re-bootstrapping: keep the row, refresh the stored path.
        document_id = existing[0]
        document_manager.update_document(document_id, file_path=markdown_file)
    else:
        document_id = document_manager.create_document(document_name, markdown_file)

    MarkdownProcessor(db_manager).process_markdown(markdown_file, document_id)
    # Record the hash only after the import succeeded.
    document_manager.update_document_hash(document_id, compute_file_hash(markdown_file))
    print(f"Calendar for year {year} has been generated and stored in the database.")
def _write_html(markdown_file: str, markdown_content: str) -> str:
    """Write the HTML rendering of ``markdown_content`` next to ``markdown_file`` and return its path."""
    html_file = os.path.splitext(markdown_file)[0] + '.html'
    with open(html_file, 'w', encoding='utf-8') as file:
        file.write(convert_to_html(markdown_content))
    return html_file


def main():
    """
    Command-line entry point: sync a markdown file into an SQLite database
    and optionally select a topic.

    The document is (re)imported when its MD5 hash differs from the stored
    one. If a topic is given, the closest matching heading with its parent
    chain and subtopics is written back to the markdown file.

    Command-line arguments:
        -m, --markdown: Path to markdown file (default: 'calendar.md').
        -d, --database: Path to SQLite database file (default: 'markdown.db').
        topic_title: Optional topic for content selection (fuzzy matching enabled).
        --bootstrap: Generate a markdown calendar for the current year and load it to the database.
        --ls: List all available headings.
        --html: Also produce a {filename}.html file next to the markdown file.
        --debug: Enable debug logging.
    """
    parser = argparse.ArgumentParser(description='Process markdown file and optionally select a topic.')
    parser.add_argument('-m', '--markdown', type=str, default='calendar.md', help='Input/output markdown file (default: calendar.md)')
    parser.add_argument('-d', '--database', type=str, default='markdown.db', help='SQLite database file (default: markdown.db)')
    parser.add_argument('topic_title', nargs='?', type=str, help='Topic title to select (fuzzy matching enabled)')
    parser.add_argument('--bootstrap', action='store_true', help='Generate markdown calendar for the current year and load it to the database.')
    parser.add_argument('--ls', action='store_true', help='List all available headings.')
    parser.add_argument('--html', action='store_true', help='Generate an HTML version of the output')
    parser.add_argument('--debug', action='store_true', help='Enable debug printing')
    args = parser.parse_args()

    logging.basicConfig(level=logging.DEBUG if args.debug else logging.INFO)

    markdown_file = args.markdown
    database_file = args.database

    db_manager = DatabaseManager(database_file)
    # try/finally guarantees the connection is closed on every exit path,
    # including early returns and unexpected exceptions.
    try:
        if args.bootstrap:
            bootstrap_calendar(datetime.now().year, db_manager, markdown_file)
            return

        if args.ls:
            topic_reader = TopicReader(db_manager)
            print(topic_reader.list_headings())
            return

        if not os.path.exists(markdown_file):
            print(f"Error: Markdown file '{markdown_file}' not found. Use --bootstrap to create a new calendar.")
            return

        document_manager = DocumentManager(db_manager)
        markdown_processor = MarkdownProcessor(db_manager)

        # Documents are keyed by the file's base name (without path).
        document_name = os.path.basename(markdown_file)
        document = db_manager.cursor.execute(
            'SELECT id, file_path, md5sum, updated_timestamp FROM documents WHERE name = ?',
            (document_name,),
        ).fetchone()
        current_file_hash = compute_file_hash(markdown_file)

        if document:
            document_id, stored_file_path, stored_md5sum, _ = document
            if stored_file_path != markdown_file:
                print(f"Updating file path for '{document_name}' in the database...")
                document_manager.update_document(document_id, file_path=markdown_file)
            if stored_md5sum != current_file_hash:
                print(f"File '{document_name}' has changed. Updating the database...")
                # Import first, record the hash second: a failed import must
                # not leave the document marked as up to date.
                markdown_processor.process_markdown(markdown_file, document_id)
                document_manager.update_document_hash(document_id, current_file_hash)
            else:
                print(f"File '{document_name}' has not changed. Skipping update.")
        else:
            print(f"Document '{document_name}' not found in the database. Adding new entry...")
            document_id = document_manager.create_document(document_name, markdown_file)
            markdown_processor.process_markdown(markdown_file, document_id)
            document_manager.update_document_hash(document_id, current_file_hash)

        if args.topic_title:
            topic_reader = TopicReader(db_manager)
            result = topic_reader.get_topic_content(args.topic_title)
            if result:
                # Replace the markdown file with the selected topic only.
                with open(markdown_file, 'w', encoding='utf-8') as file:
                    file.write(result)
                    file.write('\n')
                print(f"Selected topic and subtopics written to {markdown_file}")
                if args.html:
                    html_file = _write_html(markdown_file, result)
                    print(f"HTML version written to {html_file}")
                # Store the hash of the rewritten file so the next run does
                # not treat the reduced file as a change to import.
                new_file_hash = compute_file_hash(markdown_file)
                document_manager.update_document_hash(document_id, new_file_hash)
            else:
                print("No result to write. The original file remains unchanged.")
        else:
            print("No topic title provided. The database has been updated/added without modifying the file.")
            if args.html:
                with open(markdown_file, 'r', encoding='utf-8') as file:
                    markdown_content = file.read()
                html_file = _write_html(markdown_file, markdown_content)
                print(f"HTML version of the entire document written to {html_file}")
    finally:
        db_manager.close()
# Run the command-line interface only when executed as a script,
# not when the module is imported.
if __name__ == "__main__":
    main()