# markdown-calendar/markdown_sqlite.py
# 2024-10-05 15:46:58 +03:00
#
# 622 lines
# 26 KiB
# Python

# markdown_sqlite.py
"""
This script processes a markdown file by reading its content and
updates an SQLite database with document content and metadata.
The user can then select specific topics based on command-line arguments.
The script tracks changes using MD5 hashes and ensures the database
reflects the current state of the markdown file.
"""
import argparse
import calendar
import hashlib
import logging
import os
import sqlite3
import uuid
from datetime import datetime
from typing import List, Tuple, Dict, Set, Optional

from markdown_it import MarkdownIt
from thefuzz import fuzz, process
class DatabaseManager:
    """Own the SQLite connection and make sure the schema exists.

    Creating a manager connects to ``db_file`` and creates the ``documents``,
    ``headings`` and ``body`` tables when they are missing.  The manager can
    also be used as a context manager; the connection is closed on exit.
    """

    def __init__(self, db_file: str):
        """Connect to ``db_file`` and create any missing tables.

        Args:
            db_file: Path handed to ``sqlite3.connect``.

        Raises:
            sqlite3.Error: If connecting or creating the tables fails.  The
                connection is closed before the error propagates so a failed
                construction does not leak an open handle.
        """
        # timeout=10: wait up to ten seconds on a locked database.
        self.conn: sqlite3.Connection = sqlite3.connect(db_file, timeout=10)
        try:
            self.cursor: sqlite3.Cursor = self.conn.cursor()
            self.create_tables()
        except Exception:
            self.conn.close()
            raise

    def __enter__(self) -> 'DatabaseManager':
        """Return the manager itself so it can be used in a ``with`` block."""
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        """Close the connection when the ``with`` block ends."""
        self.close()

    def create_tables(self) -> None:
        """Create the documents, headings and body tables if they don't exist."""
        self.cursor.executescript('''
            CREATE TABLE IF NOT EXISTS documents (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                name TEXT NOT NULL UNIQUE,
                file_path TEXT NOT NULL,
                md5sum TEXT,
                added_timestamp DATETIME DEFAULT CURRENT_TIMESTAMP,
                updated_timestamp DATETIME,
                deleted_timestamp DATETIME,
                isDeleted BOOLEAN DEFAULT 0
            );
            CREATE TABLE IF NOT EXISTS headings (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                uuid TEXT NOT NULL UNIQUE,
                level INTEGER NOT NULL,
                title TEXT NOT NULL,
                parent_uuid TEXT,
                document_id INTEGER NOT NULL,
                path TEXT NOT NULL,
                headings_order INTEGER,
                added_timestamp DATETIME DEFAULT CURRENT_TIMESTAMP,
                updated_timestamp DATETIME,
                deleted_timestamp DATETIME,
                isDeleted BOOLEAN DEFAULT 0,
                FOREIGN KEY (parent_uuid) REFERENCES headings(uuid),
                FOREIGN KEY (document_id) REFERENCES documents(id)
            );
            CREATE TABLE IF NOT EXISTS body (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                uuid TEXT NOT NULL UNIQUE,
                content TEXT,
                heading_uuid TEXT NOT NULL,
                document_id INTEGER NOT NULL,
                md5sum TEXT,
                added_timestamp DATETIME DEFAULT CURRENT_TIMESTAMP,
                updated_timestamp DATETIME,
                deleted_timestamp DATETIME,
                isDeleted BOOLEAN DEFAULT 0,
                FOREIGN KEY (heading_uuid) REFERENCES headings(uuid),
                FOREIGN KEY (document_id) REFERENCES documents(id)
            );
        ''')

    def close(self) -> None:
        """Close the database connection."""
        self.conn.close()
class DocumentManager:
    """Manages rows of the ``documents`` table.

    Every mutating method commits on the shared connection before returning.
    """

    def __init__(self, db_manager: 'DatabaseManager'):
        """Keep a reference to the manager whose connection and cursor are used."""
        self.db_manager: DatabaseManager = db_manager

    def document_exists(self, document_name: str) -> Optional[Tuple[int]]:
        """Look a document up by name.

        Returns:
            A one-element tuple holding the document id, or ``None`` when no
            row has that name.  Soft-deleted documents are not filtered out.
        """
        self.db_manager.cursor.execute(
            'SELECT id FROM documents WHERE name = ?', (document_name,)
        )
        return self.db_manager.cursor.fetchone()

    def create_document(self, name: str, file_path: str) -> Optional[int]:
        """Insert a new document row and return its id.

        Raises:
            sqlite3.IntegrityError: If ``name`` is already used (the column is
                declared UNIQUE in the schema).
        """
        logging.debug("** Creating new document entry to database")
        now: str = datetime.now().isoformat()
        self.db_manager.cursor.execute('''
            INSERT INTO documents (name, file_path, added_timestamp) VALUES (?, ?, ?)
        ''', (name, file_path, now))
        self.db_manager.conn.commit()
        return self.db_manager.cursor.lastrowid

    def update_document(self, document_id: int, name: Optional[str] = None,
                        file_path: Optional[str] = None) -> None:
        """Change the name and/or file path of a document.

        Arguments that are ``None`` or empty leave the stored value untouched.
        ``updated_timestamp`` is only bumped when something is written.
        """
        logging.debug("** Updating document, document_id: %s", document_id)
        now: str = datetime.now().isoformat()
        if name or file_path:
            # COALESCE keeps the stored value for whichever field was omitted,
            # so one statement covers all combinations.
            self.db_manager.cursor.execute('''
                UPDATE documents
                SET name = COALESCE(?, name),
                    file_path = COALESCE(?, file_path),
                    updated_timestamp = ?
                WHERE id = ?
            ''', (name or None, file_path or None, now, document_id))
        self.db_manager.conn.commit()

    def soft_delete_document(self, document_id: int) -> None:
        """Mark a document as deleted without removing its row."""
        logging.debug("** This document is now soft deleted, document_id: %s", document_id)
        now: str = datetime.now().isoformat()
        self.db_manager.cursor.execute('''
            UPDATE documents SET isDeleted = 1, deleted_timestamp = ? WHERE id = ?
        ''', (now, document_id))
        self.db_manager.conn.commit()

    def read_documents(self) -> List[Tuple]:
        """Return every column of all documents that are not soft-deleted."""
        self.db_manager.cursor.execute('SELECT * FROM documents WHERE isDeleted = 0')
        return self.db_manager.cursor.fetchall()

    def update_document_hash(self, document_id: int, md5sum: str) -> None:
        """Store ``md5sum`` as the hash of the given document."""
        self.db_manager.cursor.execute(
            'UPDATE documents SET md5sum = ? WHERE id = ?', (md5sum, document_id)
        )
        self.db_manager.conn.commit()
class MarkdownProcessor:
    """Parse markdown into a heading tree and synchronise it with the database.

    A document is modelled as a dict mapping heading uuid to a node dict with
    the keys ``uuid``, ``level``, ``title``, ``parent_uuid``, ``path``,
    ``content`` and ``children`` (a list of child uuids).  Nodes read back from
    the database additionally carry ``body_uuid`` and ``headings_order``.

    Freshly parsed nodes get random uuids, so stored and parsed headings are
    matched by title among the children of an already matched parent.
    """

    def __init__(self, db_manager: 'DatabaseManager') -> None:
        """Keep a reference to the manager whose connection and cursor are used."""
        self.db_manager = db_manager

    def process_markdown(self, markdown_file: str, document_id: int) -> None:
        """Read ``markdown_file``, tokenise it and sync the result into the database."""
        markdown_text = self.read_markdown_file(markdown_file)
        tokens = MarkdownIt().parse(markdown_text)
        logging.debug("### Calling update_document_content")
        self.update_document_content(tokens, document_id)

    def read_markdown_file(self, file_path: str) -> str:
        """Return the UTF-8 decoded text of ``file_path``."""
        with open(file_path, 'r', encoding='utf-8') as file:
            return file.read()

    def update_document_content(self, tokens: List, document_id: int) -> None:
        """Merge the heading tree described by ``tokens`` into the stored document."""
        existing_structure = self.get_existing_document_structure(document_id)
        new_structure = self.parse_new_structure(tokens, document_id)
        logging.debug("### Calling merge_structures...")
        self.merge_structures(existing_structure, new_structure, document_id)

    def get_existing_document_structure(self, document_id: int) -> Dict:
        """Load the live (not soft-deleted) headings of a document with their bodies.

        Returns:
            The structure dict described in the class docstring, in stored
            document order (``headings_order``, then insertion id).
        """
        structure: Dict = {}
        self.db_manager.cursor.execute('''
            SELECT h.uuid, h.level, h.title, h.parent_uuid, h.path,
                   h.headings_order, b.content, b.uuid
            FROM headings h
            LEFT JOIN body b ON h.uuid = b.heading_uuid AND b.isDeleted = 0
            WHERE h.document_id = ? AND h.isDeleted = 0
            ORDER BY h.headings_order, h.id
        ''', (document_id,))
        for (heading_uuid, level, title, parent_uuid, path,
             headings_order, content, body_uuid) in self.db_manager.cursor.fetchall():
            structure[heading_uuid] = {
                'uuid': heading_uuid,
                'level': level,
                'title': title,
                'parent_uuid': parent_uuid,
                'path': path,
                'headings_order': headings_order,
                'content': content,
                'body_uuid': body_uuid,
                'children': []
            }
        # Build the tree structure
        for heading_uuid, node in structure.items():
            if node['parent_uuid'] in structure:
                structure[node['parent_uuid']]['children'].append(heading_uuid)
        return structure

    def parse_new_structure(self, tokens: List, document_id: int) -> Dict:
        """Turn a flat token stream into a heading structure.

        Each token is expected to expose ``type``, ``tag`` and ``content``.
        Text that appears before the first heading is ignored.  The content
        of a heading is every non-empty token content up to the next heading,
        joined by blank lines so paragraph boundaries survive.

        Args:
            tokens: Tokens as produced by ``MarkdownIt().parse``.
            document_id: Unused; kept for interface compatibility.
        """
        structure: Dict = {}
        current_heading = None
        in_heading = False
        current_content: List[str] = []
        # Sentinel root with level 0 so that any real heading nests below it.
        parent_stack = [{"uuid": None, "level": 0, "path": ""}]

        for token in tokens:
            if token.type == 'heading_open':
                if current_heading is not None:
                    structure[current_heading]['content'] = '\n\n'.join(current_content).strip()
                level = int(token.tag.strip('h'))
                # Drop siblings and deeper headings; what remains on top is the parent.
                while parent_stack[-1]['level'] >= level:
                    parent_stack.pop()
                parent = parent_stack[-1]
                current_heading = str(uuid.uuid4())
                path = f"{parent['path']}/{current_heading}" if parent['path'] else current_heading
                structure[current_heading] = {
                    'uuid': current_heading,
                    'level': level,
                    'title': '',
                    'parent_uuid': parent['uuid'],
                    'path': path,
                    'content': '',
                    'children': []
                }
                if parent['uuid'] is not None:
                    structure[parent['uuid']]['children'].append(current_heading)
                parent_stack.append({"uuid": current_heading, "level": level, "path": path})
                current_content = []
                in_heading = True
            elif token.type == 'heading_close':
                in_heading = False
            elif current_heading is None:
                continue
            elif in_heading:
                # Only text between heading_open and heading_close is the title;
                # a heading with no text must not steal the next paragraph.
                if token.type == 'inline':
                    structure[current_heading]['title'] = token.content
            else:
                text = token.content.strip('\n')
                if text:
                    current_content.append(text)

        if current_heading is not None:
            structure[current_heading]['content'] = '\n\n'.join(current_content).strip()
        return structure

    def merge_structures(self, existing: Dict, new: Dict, document_id: int) -> None:
        """Make the stored document match ``new`` in a single transaction.

        Headings are matched level by level: among the children of a matched
        parent, the n-th stored heading with a given title is paired with the
        n-th parsed heading of that title.  Matched headings keep their stored
        uuid and are updated only when level, parent, path or position
        changed; their body is inserted or updated when the text changed.
        Parsed headings without a match are inserted with their subtree, and
        stored headings without a match are soft-deleted with their subtree.

        Everything is committed once at the end; on any error the transaction
        is rolled back and the error re-raised.
        """
        existing_children = self._group_by_parent(existing)
        new_children = self._group_by_parent(new)
        # Document-wide position of every parsed heading, stored as headings_order.
        position = {node['uuid']: index for index, node in enumerate(new.values())}

        def delete_subtree(node: Dict) -> None:
            self.soft_delete_heading(node['uuid'])
            for child in existing_children.get(node['uuid'], []):
                delete_subtree(child)

        def merge_level(old_nodes: List, new_nodes: List,
                        parent_uuid: Optional[str], parent_path: str) -> None:
            unmatched: Dict[str, List[Dict]] = {}
            for node in old_nodes:
                unmatched.setdefault(node['title'], []).append(node)

            for new_node in new_nodes:
                candidates = unmatched.get(new_node['title'])
                match = candidates.pop(0) if candidates else None
                order = position[new_node['uuid']]
                if match is None:
                    heading_uuid = new_node['uuid']
                    path = f"{parent_path}/{heading_uuid}" if parent_path else heading_uuid
                    logging.debug("##### inserting new heading: %s", new_node['title'])
                    self.insert_heading(new_node['level'], new_node['title'], parent_uuid,
                                        document_id, path, heading_uuid=heading_uuid,
                                        headings_order=order)
                    if new_node['content']:
                        self.insert_body(new_node['content'], heading_uuid, document_id,
                                         commit=False)
                    old_children: List = []
                else:
                    heading_uuid = match['uuid']
                    # Rebuild the path from stored uuids; the parsed path uses throwaway ones.
                    path = f"{parent_path}/{heading_uuid}" if parent_path else heading_uuid
                    stored = (match['level'], match['parent_uuid'], match['path'],
                              match.get('headings_order'))
                    if stored != (new_node['level'], parent_uuid, path, order):
                        self.update_heading(heading_uuid, new_node['title'], new_node['level'],
                                            parent_uuid, path, headings_order=order)
                    self._sync_body(match, new_node['content'], document_id)
                    old_children = existing_children.get(heading_uuid, [])
                merge_level(old_children, new_children.get(new_node['uuid'], []),
                            heading_uuid, path)

            for leftovers in unmatched.values():
                for node in leftovers:
                    delete_subtree(node)

        try:
            merge_level(existing_children.get(None, []), new_children.get(None, []), None, '')
            self.db_manager.conn.commit()
        except Exception:
            self.db_manager.conn.rollback()
            raise

    @staticmethod
    def _group_by_parent(structure: Dict) -> Dict:
        """Group nodes by parent uuid; nodes whose parent is absent are filed under ``None``."""
        grouped: Dict = {}
        for node in structure.values():
            parent = node['parent_uuid'] if node['parent_uuid'] in structure else None
            grouped.setdefault(parent, []).append(node)
        return grouped

    def _sync_body(self, existing_node: Dict, content: str, document_id: int) -> None:
        """Bring the stored body of a matched heading in line with ``content`` (no commit)."""
        body_uuid = existing_node.get('body_uuid')
        if body_uuid is None:
            if content:
                self.insert_body(content, existing_node['uuid'], document_id, commit=False)
        elif content != (existing_node.get('content') or ''):
            self.update_body(body_uuid, content, document_id)

    def insert_heading(self, level: int, title: str, parent_uuid: Optional[str],
                       document_id: int, path: str, heading_uuid: Optional[str] = None,
                       headings_order: Optional[int] = None) -> str:
        """Insert a heading row and return its uuid.  Does not commit.

        Args:
            heading_uuid: Uuid to store; a random one is generated when omitted.
            headings_order: Position of the heading in the document, if known.
        """
        heading_uuid = heading_uuid or str(uuid.uuid4())
        self.db_manager.cursor.execute('''
            INSERT INTO headings (uuid, level, title, parent_uuid, document_id, path, headings_order)
            VALUES (?, ?, ?, ?, ?, ?, ?)
        ''', (heading_uuid, level, title, parent_uuid, document_id, path, headings_order))
        return heading_uuid

    def update_heading(self, heading_uuid: str, title: str, level: int,
                       parent_uuid: Optional[str], path: str,
                       headings_order: Optional[int] = None) -> None:
        """Overwrite a heading row.  Does not commit.

        ``headings_order`` is left as stored when ``None`` is passed.
        """
        self.db_manager.cursor.execute('''
            UPDATE headings
            SET title = ?, level = ?, parent_uuid = ?, path = ?,
                headings_order = COALESCE(?, headings_order),
                updated_timestamp = CURRENT_TIMESTAMP
            WHERE uuid = ?
        ''', (title, level, parent_uuid, path, headings_order, heading_uuid))

    def insert_body(self, content: str, heading_uuid: str, document_id: int,
                    commit: bool = True) -> str:
        """Insert the body text of a heading and return the new body uuid.

        Args:
            commit: Commit after the insert (default).  Pass ``False`` when the
                caller manages the surrounding transaction.

        Raises:
            ValueError: If an argument is missing, or the heading or document
                does not exist.
            sqlite3.Error: On database failure; the transaction is rolled back
                before re-raising.
        """
        # Validate before touching the content so bad input raises ValueError.
        if not content or not heading_uuid or document_id is None:
            raise ValueError("Missing required parameters for insert_body")
        body_uuid = str(uuid.uuid4())
        md5sum = hashlib.md5(content.encode()).hexdigest()
        cursor = self.db_manager.cursor
        try:
            cursor.execute("SELECT 1 FROM headings WHERE uuid = ?", (heading_uuid,))
            if cursor.fetchone() is None:
                raise ValueError(f"heading_uuid {heading_uuid} does not exist in headings table")
            cursor.execute("SELECT 1 FROM documents WHERE id = ?", (document_id,))
            if cursor.fetchone() is None:
                raise ValueError(f"document_id {document_id} does not exist in documents table")
            cursor.execute('''
                INSERT INTO body (uuid, content, heading_uuid, document_id, md5sum)
                VALUES (?, ?, ?, ?, ?)
            ''', (body_uuid, content, heading_uuid, document_id, md5sum))
            if commit:
                self.db_manager.conn.commit()
            logging.debug("###### Successfully inserted body with uuid: %s", body_uuid)
        except sqlite3.Error:
            logging.exception("An error occurred while inserting body")
            self.db_manager.conn.rollback()
            raise
        return body_uuid

    def update_body(self, body_uuid: str, content: str, document_id: int) -> None:
        """Replace the text and hash of an existing body row.  Does not commit."""
        md5sum = hashlib.md5(content.encode()).hexdigest()
        self.db_manager.cursor.execute('''
            UPDATE body
            SET content = ?, md5sum = ?, updated_timestamp = CURRENT_TIMESTAMP
            WHERE uuid = ? AND document_id = ?
        ''', (content, md5sum, body_uuid, document_id))

    def soft_delete_heading(self, heading_uuid: str) -> None:
        """Mark a heading and its body rows as deleted.  Does not commit.

        Child headings are not touched here; ``merge_structures`` walks the
        subtree itself.
        """
        now = datetime.now().isoformat()
        self.db_manager.cursor.execute('''
            UPDATE headings
            SET isDeleted = 1, deleted_timestamp = ?
            WHERE uuid = ?
        ''', (now, heading_uuid))
        # Also soft delete associated body content
        self.db_manager.cursor.execute('''
            UPDATE body
            SET isDeleted = 1, deleted_timestamp = ?
            WHERE heading_uuid = ?
        ''', (now, heading_uuid))
class TopicReader:
    """Reads headings and their body text back out of the database."""

    def __init__(self, db_manager: 'DatabaseManager'):
        """Keep a reference to the manager whose cursor is used for queries."""
        self.db_manager = db_manager

    def fetch_headings(self) -> List[Tuple[str, str, int, Optional[str]]]:
        """Return ``(uuid, title, level, parent_uuid)`` for every live heading.

        Rows are ordered by level, then ``headings_order``; the row id is the
        final tiebreak so the order is deterministic even when
        ``headings_order`` is NULL.
        """
        self.db_manager.cursor.execute('''
            SELECT uuid, title, level, parent_uuid
            FROM headings
            WHERE isDeleted = 0
            ORDER BY level, headings_order, id
        ''')
        return self.db_manager.cursor.fetchall()

    def fetch_topic_chain(self, heading_uuid: str) -> List[Tuple[str, str, int]]:
        """Return the ancestors of a heading, outermost first, ending with the heading itself.

        Each entry is ``(uuid, title, level)``.  The walk stops at a heading
        without parent, at a parent that is not in the table, or as soon as a
        uuid repeats (so corrupt, cyclic parent links cannot loop forever).
        """
        chain = []
        seen = set()
        current_uuid = heading_uuid
        while current_uuid is not None and current_uuid not in seen:
            seen.add(current_uuid)
            self.db_manager.cursor.execute('''
                SELECT uuid, title, level, parent_uuid
                FROM headings
                WHERE uuid = ?
            ''', (current_uuid,))
            result = self.db_manager.cursor.fetchone()
            if not result:
                break
            chain.append((result[0], result[1], result[2]))
            current_uuid = result[3]
        return list(reversed(chain))

    def list_headings(self) -> str:
        """Return an indented bullet list of all live headings reachable from a root.

        Headings are indented by one space per level below the first.
        """
        children: Dict[Optional[str], List[Tuple[str, str, int]]] = {}
        for heading_uuid, title, level, parent_uuid in self.fetch_headings():
            children.setdefault(parent_uuid, []).append((heading_uuid, title, level))

        def render(parent_uuid: Optional[str]) -> List[str]:
            lines = []
            for heading_uuid, title, level in children.get(parent_uuid, ()):
                indent = " " * (level - 1)
                lines.append(f"{indent}- {title}")
                lines.extend(render(heading_uuid))
            return lines

        return "\n".join(["Available headings:", *render(None)]).strip()

    def get_topic_content(self, input_title: str) -> Optional[str]:
        """Return the markdown of the heading that best matches ``input_title``.

        Returns ``None`` when no heading matches closely enough.
        """
        heading_uuid = self.find_closest_heading(input_title)
        if heading_uuid:
            return self.build_full_content(heading_uuid)
        return None

    def build_full_content(self, heading_uuid: str, level_offset: int = 0) -> str:
        """Render a heading, its body and all live sub-headings as markdown.

        Args:
            heading_uuid: Heading to start from.
            level_offset: Subtracted from every heading level (never below 1).

        Returns:
            The markdown text, or ``""`` if the heading is missing or deleted.
        """
        cursor = self.db_manager.cursor
        cursor.execute('''
            SELECT h.level, h.title, b.content
            FROM headings h
            LEFT JOIN body b ON h.uuid = b.heading_uuid AND b.isDeleted = 0
            WHERE h.uuid = ? AND h.isDeleted = 0
        ''', (heading_uuid,))
        heading = cursor.fetchone()
        if not heading:
            return ""
        level, title, content = heading
        adjusted_level = max(1, level - level_offset)
        sections = [f"{'#' * adjusted_level} {title}\n\n"]
        if content:
            sections.append(f"{content.strip()}\n\n")
        # Fetch all children first: the recursion below reuses the shared cursor.
        cursor.execute('''
            SELECT uuid FROM headings
            WHERE parent_uuid = ? AND isDeleted = 0
            ORDER BY level, headings_order, id
        ''', (heading_uuid,))
        child_uuids = [row[0] for row in cursor.fetchall()]
        sections.extend(self.build_full_content(child, level_offset) for child in child_uuids)
        return "".join(sections)

    def find_closest_heading(self, input_title: str) -> Optional[str]:
        """Fuzzy-match ``input_title`` against all heading titles.

        Returns:
            The uuid of the first heading carrying the best-matching title, or
            ``None`` (after printing a notice) when there are no headings or
            the best score is below 50.
        """
        headings = self.fetch_headings()
        if not headings:
            print("No topics found in the database.")
            return None
        heading_titles = [title for _, title, _, _ in headings]
        best = process.extractOne(input_title, heading_titles, scorer=fuzz.token_sort_ratio)
        # extractOne may return None when nothing scores; treat that as a zero-confidence miss.
        closest_match, confidence = best if best else (None, 0)
        if confidence < 50:
            print(f"No close matches found for '{input_title}' (Confidence: {confidence})")
            return None
        return next(
            (heading_uuid for heading_uuid, title, _, _ in headings if title == closest_match),
            None,
        )
def compute_file_hash(file_path: str, chunk_size: int = 4096) -> str:
    """Compute the MD5 hash of a file.

    The file is read in binary mode, ``chunk_size`` bytes at a time, so large
    files are not loaded into memory at once.

    Args:
        file_path: Path of the file to hash.
        chunk_size: Number of bytes read per iteration; must be at least 1.

    Returns:
        The hexadecimal MD5 digest of the file's bytes.

    Raises:
        ValueError: If ``chunk_size`` is smaller than 1.  A size of 0 would
            end the read loop immediately and return the hash of nothing.
        OSError: If the file cannot be opened or read.
    """
    if chunk_size < 1:
        raise ValueError("chunk_size must be at least 1")
    hash_md5 = hashlib.md5()
    with open(file_path, "rb") as f:
        # iter() with a sentinel stops at the first empty read, i.e. end of file.
        for chunk in iter(lambda: f.read(chunk_size), b""):
            hash_md5.update(chunk)
    return hash_md5.hexdigest()
def generate_calendar(year: int, db_manager: 'DatabaseManager', document_id: int) -> str:
    """Generate a markdown calendar for the specified year.

    The result has a level-1 heading for the year and a level-2 heading
    ``MM / MonthName`` per month, followed by one ``DD / Day`` line per day.
    Today's date, if it falls in ``year``, is wrapped in ``**`` to make it
    bold.  Month and weekday names come from ``strftime`` and therefore
    follow the active locale.

    Args:
        year: Calendar year to generate.
        db_manager: Unused; kept for interface compatibility.
        document_id: Unused; kept for interface compatibility.

    Returns:
        The calendar as markdown text.
    """
    today = datetime.now().date()
    parts = [f"# {year}\n\n"]
    for month in range(1, 13):
        month_name = datetime(year, month, 1).strftime('%B')
        parts.append(f"## {month:02d} / {month_name}\n\n")
        # monthrange returns (weekday of the first day, number of days).
        _, num_days = calendar.monthrange(year, month)
        for day in range(1, num_days + 1):
            day_date = datetime(year, month, day).date()
            entry = f"{day:02d} / {day_date.strftime('%a')}"
            if day_date == today:
                entry = f"**{entry}**"
            parts.append(f"{entry}\n")
        parts.append('\n')
    return ''.join(parts)
def convert_to_html(markdown_content: str, heading_uuid: Optional[str] = None) -> str:
    """Render markdown text as a complete, minimally styled HTML page.

    Args:
        markdown_content: Markdown source to render.
        heading_uuid: Accepted but currently without effect; selecting a
            single section by heading is not implemented here.

    Returns:
        An HTML document whose body is the rendered markdown.
    """
    rendered_body = MarkdownIt().render(markdown_content)
    # Doubled braces are literal CSS braces inside the f-string.
    return f"""
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Document</title>
<style>
body {{ font-family: Arial, sans-serif; line-height: 1.6; padding: 20px; }}
h1, h2, h3, h4, h5, h6 {{ margin-top: 24px; margin-bottom: 16px; }}
h1 {{ font-size: 2em; }}
h2 {{ font-size: 1.5em; }}
h3 {{ font-size: 1.25em; }}
</style>
</head>
<body>
{rendered_body}
</body>
</html>
"""
def bootstrap_calendar(year: int, db_manager: 'DatabaseManager', markdown_file: str) -> None:
    """Generate a full year's markdown calendar, write it to disk and load it into the database.

    The document is registered under the name ``"<year> Calendar"``.  If a
    document with that name already exists nothing is written: neither the
    markdown file nor the database is touched, so an existing calendar is
    never overwritten by a re-run.

    Args:
        year: Calendar year to generate.
        db_manager: Open database manager to store the calendar in.
        markdown_file: Path the generated markdown is written to.
    """
    document_manager = DocumentManager(db_manager)
    document_name = f"{year} Calendar"

    # The name column is UNIQUE; check first instead of failing on the insert.
    if document_manager.document_exists(document_name):
        print(f"Calendar for year {year} already exists in the database; nothing was changed.")
        return

    document_id = document_manager.create_document(document_name, markdown_file)

    logging.debug("## Generating calendar")
    calendar_content = generate_calendar(year, db_manager, document_id=document_id)

    logging.debug("## Writing the newly created calendar")
    with open(markdown_file, 'w', encoding='utf-8') as f:
        f.write(calendar_content)

    # Record the hash of what was just written alongside the document row.
    document_manager.update_document_hash(document_id, compute_file_hash(markdown_file))

    # Parse the file that was written and store its headings and bodies.
    MarkdownProcessor(db_manager).process_markdown(markdown_file, document_id)
    print(f"Calendar for year {year} has been generated and stored in the database.")
def main(argv: Optional[List[str]] = None) -> None:
    """Command-line entry point.

    Depending on the arguments this bootstraps a calendar for the current
    year (``--bootstrap``), lists the stored headings (``--ls``), or writes
    the content of one topic (by fuzzy title or ``--uuid``) to the markdown
    file, optionally with an HTML rendering next to it (``--html``).
    When none of these is requested, nothing is done.

    Args:
        argv: Argument list to parse; ``None`` means ``sys.argv[1:]``.
    """
    parser = argparse.ArgumentParser(description='Process markdown file and optionally select a topic.')
    parser.add_argument('-m', '--markdown', type=str, default='calendar.md', help='Input/output markdown file (default: calendar.md)')
    parser.add_argument('-d', '--database', type=str, default='markdown.db', help='SQLite database file (default: markdown.db)')
    parser.add_argument('topic_title', nargs='?', type=str, help='Topic title to select (fuzzy matching enabled)')
    parser.add_argument('--bootstrap', action='store_true', help='Generate markdown calendar for the current year and load it to the database.')
    parser.add_argument('--ls', action='store_true', help='List all available headings.')
    parser.add_argument('--html', action='store_true', help='Generate an HTML version of the output.')
    parser.add_argument('--uuid', type=str, help='Specify a UUID to retrieve content.')
    parser.add_argument('--debug', action='store_true', help='Enable debug printing')
    args = parser.parse_args(argv)

    # Setup basic logging
    logging.basicConfig(level=logging.DEBUG if args.debug else logging.INFO)

    # Both files must already exist unless we are about to create them.
    # No database connection is open yet, so there is nothing to close here.
    if not args.bootstrap:
        if not os.path.exists(args.markdown):
            print(f"Error: Markdown file '{args.markdown}' not found. Use --bootstrap to create a new calendar.")
            return
        if not os.path.exists(args.database):
            print(f"Error: Database file '{args.database}' not found. Use --bootstrap to create a new calendar.")
            return

    db_manager = DatabaseManager(args.database)
    try:
        if args.bootstrap:
            logging.debug("## Running calendar bootstrap")
            bootstrap_calendar(datetime.now().year, db_manager, args.markdown)
            return

        if args.ls:
            print(TopicReader(db_manager).list_headings())
            return

        # Topic or UUID-based content retrieval
        if args.topic_title or args.uuid:
            topic_reader = TopicReader(db_manager)
            if args.uuid:
                content = topic_reader.build_full_content(args.uuid)
                if not content:
                    print(f"No content found for UUID '{args.uuid}'.")
            else:
                content = topic_reader.get_topic_content(args.topic_title)

            if content:
                # Write the selected content to the markdown file
                with open(args.markdown, 'w', encoding='utf-8') as file:
                    file.write(content)
                    file.write('\n')
                print(f"Selected content written to {args.markdown}")

                # Optionally convert to HTML
                if args.html:
                    html_file = f"{args.markdown}.html"
                    with open(html_file, 'w', encoding='utf-8') as file:
                        file.write(convert_to_html(content))
                    print(f"HTML version written to {html_file}")
    finally:
        # Runs on every path, including early returns and errors.
        db_manager.close()


if __name__ == '__main__':
    main()