markdown-calendar/markdown_sqlite.py

622 lines
26 KiB
Python
Raw Permalink Normal View History

# markdown_sqlite.py
"""
This script processes a markdown file by reading its content and
updates an SQLite database with document content and metadata.
The user can then select specific topics based on command-line arguments.
The script tracks changes using MD5 hashes and ensures the database
reflects the current state of the markdown file.
"""
2024-10-04 11:57:30 +03:00
import os
import sqlite3
2024-10-04 21:16:00 +03:00
import uuid
import hashlib
import argparse
2024-10-04 11:57:30 +03:00
import logging
from datetime import datetime
from typing import List, Tuple, Dict, Set, Optional
from markdown_it import MarkdownIt
from thefuzz import fuzz, process
class DatabaseManager:
"""Manages database connections and table creation."""
def __init__(self, db_file: str):
"""Initialize the DatabaseManager."""
self.conn: sqlite3.Connection = sqlite3.connect(db_file, timeout=10)
self.cursor: sqlite3.Cursor = self.conn.cursor()
self.create_tables()
def create_tables(self) -> None:
"""Create necessary tables in the database if they don't exist."""
self.cursor.executescript('''
CREATE TABLE IF NOT EXISTS documents (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL UNIQUE,
file_path TEXT NOT NULL,
md5sum TEXT,
added_timestamp DATETIME DEFAULT CURRENT_TIMESTAMP,
updated_timestamp DATETIME,
deleted_timestamp DATETIME,
isDeleted BOOLEAN DEFAULT 0
);
CREATE TABLE IF NOT EXISTS headings (
id INTEGER PRIMARY KEY AUTOINCREMENT,
uuid TEXT NOT NULL UNIQUE,
level INTEGER NOT NULL,
title TEXT NOT NULL,
parent_uuid TEXT,
document_id INTEGER NOT NULL,
path TEXT NOT NULL,
headings_order INTEGER,
added_timestamp DATETIME DEFAULT CURRENT_TIMESTAMP,
updated_timestamp DATETIME,
deleted_timestamp DATETIME,
isDeleted BOOLEAN DEFAULT 0,
FOREIGN KEY (parent_uuid) REFERENCES headings(uuid),
FOREIGN KEY (document_id) REFERENCES documents(id)
);
CREATE TABLE IF NOT EXISTS body (
id INTEGER PRIMARY KEY AUTOINCREMENT,
uuid TEXT NOT NULL UNIQUE,
content TEXT,
heading_uuid TEXT NOT NULL,
document_id INTEGER NOT NULL,
md5sum TEXT,
added_timestamp DATETIME DEFAULT CURRENT_TIMESTAMP,
updated_timestamp DATETIME,
deleted_timestamp DATETIME,
isDeleted BOOLEAN DEFAULT 0,
FOREIGN KEY (heading_uuid) REFERENCES headings(uuid),
FOREIGN KEY (document_id) REFERENCES documents(id)
);
''')
def close(self) -> None:
"""Close the database connection."""
self.conn.close()
class DocumentManager:
"""Manages document-related operations in the database."""
def __init__(self, db_manager: 'DatabaseManager'):
self.db_manager: DatabaseManager = db_manager
def document_exists(self, document_name: str) -> Optional[Tuple[int]]:
"""
Check if a document exists in the database.
"""
self.db_manager.cursor.execute('SELECT id FROM documents WHERE name = ?', (document_name,))
return self.db_manager.cursor.fetchone()
def create_document(self, name: str, file_path: str) -> Optional[int]:
"""Create a new document entry in the database."""
2024-10-04 12:19:17 +03:00
logging.debug(f"** Creating new document entry to database")
now: str = datetime.now().isoformat()
self.db_manager.cursor.execute('''
INSERT INTO documents (name, file_path, added_timestamp) VALUES (?, ?, ?)
''', (name, file_path, now))
self.db_manager.conn.commit()
return self.db_manager.cursor.lastrowid
def update_document(self, document_id: int, name: Optional[str] = None, file_path: Optional[str] = None) -> None:
"""Update an existing document in the database."""
2024-10-04 12:19:17 +03:00
logging.debug(f"** Updating document, document_id: {document_id}")
now: str = datetime.now().isoformat()
if name:
self.db_manager.cursor.execute('''
UPDATE documents SET name = ?, updated_timestamp = ? WHERE id = ?
''', (name, now, document_id))
if file_path:
self.db_manager.cursor.execute('''
UPDATE documents SET file_path = ?, updated_timestamp = ? WHERE id = ?
''', (file_path, now, document_id))
self.db_manager.conn.commit()
def soft_delete_document(self, document_id: int) -> None:
"""Soft delete a document by marking it as deleted in the database."""
logging.debug(f"** This document is now soft deleted, document_id: {document_id}")
now: str = datetime.now().isoformat()
self.db_manager.cursor.execute('''
UPDATE documents SET isDeleted = 1, deleted_timestamp = ? WHERE id = ?
''', (now, document_id))
self.db_manager.conn.commit()
def read_documents(self) -> List[Tuple]:
"""Retrieve all non-deleted documents from the database."""
self.db_manager.cursor.execute('SELECT * FROM documents WHERE isDeleted = 0')
return self.db_manager.cursor.fetchall()
def update_document_hash(self, document_id: int, md5sum: str) -> None:
"""Update the MD5 hash of a document in the database."""
self.db_manager.cursor.execute('UPDATE documents SET md5sum = ? WHERE id = ?', (md5sum, document_id))
self.db_manager.conn.commit()
class MarkdownProcessor:
"""Processes markdown files and stores content in the database."""
def __init__(self, db_manager: 'DatabaseManager') -> None:
self.db_manager = db_manager
def process_markdown(self, markdown_file: str, document_id: int) -> None:
markdown_text = self.read_markdown_file(markdown_file)
md = MarkdownIt()
tokens = md.parse(markdown_text)
print('### Calling update_document_content')
self.update_document_content(tokens, document_id)
def read_markdown_file(self, file_path: str) -> str:
with open(file_path, 'r', encoding='utf-8') as file:
return file.read()
def update_document_content(self, tokens: List, document_id: int) -> None:
existing_structure = self.get_existing_document_structure(document_id)
new_structure = self.parse_new_structure(tokens, document_id)
print('### Calling merg_structures...')
self.merge_structures(existing_structure, new_structure, document_id)
def get_existing_document_structure(self, document_id: int) -> Dict:
structure = {}
self.db_manager.cursor.execute('''
SELECT h.uuid, h.level, h.title, h.parent_uuid, h.path, b.content, b.uuid
FROM headings h
LEFT JOIN body b ON h.uuid = b.heading_uuid
WHERE h.document_id = ? AND h.isDeleted = 0
ORDER BY h.level, h.id
''', (document_id,))
for heading_uuid, level, title, parent_uuid, path, content, body_uuid in self.db_manager.cursor.fetchall():
structure[heading_uuid] = {
'uuid': heading_uuid,
'level': level,
'title': title,
'parent_uuid': parent_uuid,
'path': path,
'content': content,
'body_uuid': body_uuid,
'children': []
}
# Build the tree structure
for uuid, node in structure.items():
if node['parent_uuid'] in structure:
structure[node['parent_uuid']]['children'].append(uuid)
2024-10-04 21:16:00 +03:00
return structure
def parse_new_structure(self, tokens: List, document_id: int) -> Dict:
structure = {}
current_heading = None
current_content = []
parent_stack = [{"uuid": None, "level": 0, "path": ""}]
for token in tokens:
if token.type == 'heading_open':
if current_heading:
structure[current_heading]['content'] = ''.join(current_content).strip()
level = int(token.tag.strip('h'))
while parent_stack[-1]['level'] >= level:
parent_stack.pop()
current_heading = str(uuid.uuid4())
parent_path = parent_stack[-1]['path']
structure[current_heading] = {
'uuid': current_heading,
'level': level,
'title': '',
'parent_uuid': parent_stack[-1]['uuid'],
'path': f"{parent_path}/{current_heading}" if parent_path else current_heading,
'content': '',
'children': []
}
parent_stack.append({"uuid": current_heading, "level": level, "path": structure[current_heading]['path']})
current_content = []
elif token.type == 'heading_close':
structure[current_heading]['content'] = ''.join(current_content).strip()
elif token.type == 'inline' and current_heading:
if structure[current_heading]['title'] == '':
structure[current_heading]['title'] = token.content
else:
current_content.append(token.content)
elif current_heading:
current_content.append(token.content)
if current_heading:
structure[current_heading]['content'] = ''.join(current_content).strip()
return structure
def merge_structures(self, existing: Dict, new: Dict, document_id: int) -> None:
def merge_recursive(existing_node, new_node, parent_uuid):
if not existing_node:
print('#### This seems to be a new stuff...')
# This is a new node, insert it
print(f"##### inserting new heading: {new_node['title']}")
heading_uuid = self.insert_heading(new_node['level'], new_node['title'], parent_uuid, document_id, new_node['path'])
##print(f"##### inserting new body with id: {heading_uuid}, :\n {new_node['content']}\n")
if new_node['content']:
body_uuid = self.insert_body(new_node['content'], heading_uuid, document_id)
for child in new_node['children']:
merge_recursive(None, new[child], heading_uuid)
else:
print('#### This seems to known ')
# Update existing node
self.update_heading(existing_node['uuid'], new_node['title'], new_node['level'], parent_uuid, new_node['path'])
self.update_body(existing_node['body_uuid'], new_node['content'], document_id)
print('#### Does these update_body and update_heading methods work?')
# Process children
existing_children = {child['title']: child for child in existing_node['children']}
new_children = {child['title']: child for child in new_node['children']}
for title, child in new_children.items():
if title in existing_children:
merge_recursive(existing_children[title], child, existing_node['uuid'])
else:
merge_recursive(None, child, existing_node['uuid'])
for title, child in existing_children.items():
if title not in new_children:
self.soft_delete_heading(child['uuid'])
for new_root in new.values():
print('#### Found a chapter!')
existing_root = next((node for node in existing.values() if node['path'] == new_root['path']), None)
merge_recursive(existing_root, new_root, None)
def insert_heading(self, level: int, title: str, parent_uuid: Optional[str], document_id: int, path: str) -> str:
heading_uuid = str(uuid.uuid4())
self.db_manager.cursor.execute('''
INSERT INTO headings (uuid, level, title, parent_uuid, document_id, path)
VALUES (?, ?, ?, ?, ?, ?)
''', (heading_uuid, level, title, parent_uuid, document_id, path))
return heading_uuid
def update_heading(self, heading_uuid: str, title: str, level: int, parent_uuid: Optional[str], path: str) -> None:
self.db_manager.cursor.execute('''
UPDATE headings
SET title = ?, level = ?, parent_uuid = ?, path = ?, updated_timestamp = CURRENT_TIMESTAMP
WHERE uuid = ?
''', (title, level, parent_uuid, path, heading_uuid))
def insert_body(self, content: str, heading_uuid: str, document_id: int) -> str:
body_uuid = str(uuid.uuid4())
md5sum = hashlib.md5(content.encode()).hexdigest()
print(f"###### Trying to insert body text with md5sum of: {md5sum} to uuid: {body_uuid}, with content: \n{content}\n")
# Verify input parameters
if not all([content, heading_uuid, document_id]):
raise ValueError("Missing required parameters for insert_body")
try:
# Check if heading_uuid exists
self.db_manager.cursor.execute("SELECT 1 FROM headings WHERE uuid = ?", (heading_uuid,))
if not self.db_manager.cursor.fetchone():
raise ValueError(f"heading_uuid {heading_uuid} does not exist in headings table")
# Check if document_id exists
self.db_manager.cursor.execute("SELECT 1 FROM documents WHERE id = ?", (document_id,))
if not self.db_manager.cursor.fetchone():
raise ValueError(f"document_id {document_id} does not exist in documents table")
# Insert the body
self.db_manager.cursor.execute('''
INSERT INTO body (uuid, content, heading_uuid, document_id, md5sum)
VALUES (?, ?, ?, ?, ?)
''', (body_uuid, content, heading_uuid, document_id, md5sum))
self.db_manager.conn.commit()
print(f"###### Successfully inserted body with uuid: {body_uuid}")
except sqlite3.Error as e:
print(f"An error occurred while inserting body: {e}")
self.db_manager.conn.rollback()
raise
except ValueError as e:
print(f"Validation error: {e}")
raise
return body_uuid
def update_body(self, body_uuid: str, content: str, document_id: int) -> None:
md5sum = hashlib.md5(content.encode()).hexdigest()
self.db_manager.cursor.execute('''
UPDATE body
SET content = ?, md5sum = ?, updated_timestamp = CURRENT_TIMESTAMP
WHERE uuid = ? AND document_id = ?
''', (content, md5sum, body_uuid, document_id))
def soft_delete_heading(self, heading_uuid: str) -> None:
now = datetime.now().isoformat()
self.db_manager.cursor.execute('''
UPDATE headings
SET isDeleted = 1, deleted_timestamp = ?
WHERE uuid = ?
''', (now, heading_uuid))
# Also soft delete associated body content
self.db_manager.cursor.execute('''
UPDATE body
SET isDeleted = 1, deleted_timestamp = ?
WHERE heading_uuid = ?
''', (now, heading_uuid))
class TopicReader:
"""Reads and retrieves topics from the database."""
def __init__(self, db_manager: 'DatabaseManager'):
self.db_manager = db_manager
def fetch_headings(self) -> List[Tuple[str, str, int, Optional[str]]]:
self.db_manager.cursor.execute('''
SELECT uuid, title, level, parent_uuid
FROM headings
WHERE isDeleted = 0
ORDER BY level, headings_order
''')
return self.db_manager.cursor.fetchall()
def fetch_topic_chain(self, heading_uuid: str) -> List[Tuple[str, str, int]]:
chain = []
current_uuid = heading_uuid
while current_uuid is not None:
self.db_manager.cursor.execute('''
SELECT uuid, title, level, parent_uuid
FROM headings
WHERE uuid = ?
''', (current_uuid,))
result = self.db_manager.cursor.fetchone()
if result:
chain.append((result[0], result[1], result[2]))
current_uuid = result[3]
else:
break
return list(reversed(chain))
def list_headings(self) -> str:
headings = self.fetch_headings()
result = "Available headings:\n"
def build_tree(parent_uuid, level):
tree = ""
for uuid, title, hlevel, parent in headings:
if parent == parent_uuid:
indent = " " * (hlevel - 1)
tree += f"{indent}- {title}\n"
tree += build_tree(uuid, hlevel + 1)
return tree
result += build_tree(None, 1)
return result.strip()
def get_topic_content(self, input_title: str) -> Optional[str]:
heading_uuid = self.find_closest_heading(input_title)
if heading_uuid:
topic_chain = self.fetch_topic_chain(heading_uuid)
result = self.build_full_content(topic_chain[-1][0])
return result
return None
def build_full_content(self, heading_uuid: str, level_offset: int = 0) -> str:
self.db_manager.cursor.execute('''
SELECT h.level, h.title, b.content
FROM headings h
LEFT JOIN body b ON h.uuid = b.heading_uuid
WHERE h.uuid = ? AND h.isDeleted = 0
''', (heading_uuid,))
heading = self.db_manager.cursor.fetchone()
if not heading:
return ""
2024-10-04 11:57:30 +03:00
level, title, content = heading
adjusted_level = max(1, level - level_offset)
result = f"{'#' * adjusted_level} {title}\n\n"
if content:
result += f"{content.strip()}\n\n"
# Fetch and process all child headings
self.db_manager.cursor.execute('''
SELECT uuid FROM headings
WHERE parent_uuid = ? AND isDeleted = 0
ORDER BY level, headings_order
''', (heading_uuid,))
children = self.db_manager.cursor.fetchall()
for child in children:
result += self.build_full_content(child[0], level_offset)
return result
def find_closest_heading(self, input_title: str) -> Optional[str]:
headings = self.fetch_headings()
if not headings:
print("No topics found in the database.")
return None
heading_titles = [title for _, title, _, _ in headings]
closest_match, confidence = process.extractOne(input_title, heading_titles, scorer=fuzz.token_sort_ratio)
if confidence < 50:
print(f"No close matches found for '{input_title}' (Confidence: {confidence})")
return None
for heading_uuid, title, _, _ in headings:
if title == closest_match:
return heading_uuid
return None
def compute_file_hash(file_path: str) -> str:
"""
Compute the MD5 hash of a file.
"""
hash_md5 = hashlib.md5()
with open(file_path, "rb") as f:
for chunk in iter(lambda: f.read(4096), b""):
hash_md5.update(chunk)
return hash_md5.hexdigest()
def generate_calendar(year: int, db_manager: 'DatabaseManager', document_id: int) -> str:
"""
Generate a markdown calendar for the specified year.
"""
calendar_markdown = f"# {year}\n\n"
current_date = datetime.now().date()
# Loop through the months
for month in range(1, 13):
month_name = datetime(year, month, 1).strftime('%B')
calendar_markdown += f"## {month:02d} / {month_name}\n\n"
# Determine the number of days in the month
if month == 12:
num_days = (datetime(year + 1, 1, 1) - datetime(year, month, 1)).days
else:
num_days = (datetime(year, month + 1, 1) - datetime(year, month, 1)).days
# Create calendar entries for each day in order
for day in range(1, num_days + 1):
day_date = datetime(year, month, day).date()
day_name = day_date.strftime('%a')
# Add bold styling for the current date
if str(day_date) == str(current_date):
calendar_markdown += f"**{day:02d} / {day_name}**\n"
else:
calendar_markdown += f"{day:02d} / {day_name}\n"
calendar_markdown += '\n'
# Now parse the markdown and insert into the database
#parse_and_insert_markdown(calendar_markdown, db_manager, document_id, year)
return calendar_markdown
def convert_to_html(markdown_content: str, heading_uuid: Optional[str] = None) -> str:
"""
Convert Markdown content (or specific section) to HTML.
"""
md = MarkdownIt()
if heading_uuid:
# Fetch content for a specific heading and its sub-headings from the database
# Example SQL to get heading content based on UUID:
# SELECT title, content FROM headings WHERE uuid = ?
pass
html_content = md.render(markdown_content)
# Wrap the content in a basic HTML structure
html_document = f"""
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Document</title>
<style>
body {{ font-family: Arial, sans-serif; line-height: 1.6; padding: 20px; }}
h1, h2, h3, h4, h5, h6 {{ margin-top: 24px; margin-bottom: 16px; }}
h1 {{ font-size: 2em; }}
h2 {{ font-size: 1.5em; }}
h3 {{ font-size: 1.25em; }}
</style>
</head>
<body>
{html_content}
</body>
</html>
"""
return html_document
def bootstrap_calendar(year: int, db_manager: DatabaseManager, markdown_file: str):
"""Generate and store a full year's markdown calendar in the database using UUIDs."""
document_manager = DocumentManager(db_manager)
markdown_processor = MarkdownProcessor(db_manager)
# Generate calendar markdown and insert into the database
print('## Generating calendar')
calendar_content = generate_calendar(year, db_manager, document_id=document_manager.create_document(f"{year} Calendar", markdown_file))
# Write the calendar to the markdown file
print('## Reading the newly created calendar')
with open(markdown_file, 'w', encoding='utf-8') as f:
f.write(calendar_content)
# Process the markdown to update or store in the database
markdown_processor.process_markdown(markdown_file, document_manager.document_exists(f"{year} Calendar")[0])
print(f"Calendar for year {year} has been generated and stored in the database.")
def main():
"""
This script processes a markdown file, updates an SQLite database,
and optionally selects a topic based on user input.
"""
parser = argparse.ArgumentParser(description='Process markdown file and optionally select a topic.')
parser.add_argument('-m', '--markdown', type=str, default='calendar.md', help='Input/output markdown file (default: calendar.md)')
parser.add_argument('-d', '--database', type=str, default='markdown.db', help='SQLite database file (default: markdown.db)')
parser.add_argument('topic_title', nargs='?', type=str, help='Topic title to select (fuzzy matching enabled)')
parser.add_argument('--bootstrap', action='store_true', help='Generate markdown calendar for the current year and load it to the database.')
parser.add_argument('--ls', action='store_true', help='List all available headings.')
parser.add_argument('--html', action='store_true', help='Generate an HTML version of the output.')
parser.add_argument('--uuid', type=str, help='Specify a UUID to retrieve content.')
2024-10-04 11:57:30 +03:00
parser.add_argument('--debug', action='store_true', help='Enable debug printing')
args = parser.parse_args()
2024-10-04 11:57:30 +03:00
# Setup basic logging
logging.basicConfig(level=logging.DEBUG if args.debug else logging.INFO)
# Check for markdown file presence
if not os.path.exists(args.markdown) and not args.bootstrap:
print(f"Error: Markdown file '{args.markdown}' not found. Use --bootstrap to create a new calendar.")
db_manager.close()
return
# Check for databse file presence
if not os.path.exists(args.database) and not args.bootstrap:
print(f"Error: Database file '{args.database}' not found. Use --bootstrap to create a new calendar.")
db_manager.close()
return
# Initialize manager objects
db_manager = DatabaseManager(args.database)
if args.bootstrap:
print('## Running calendar bootstrap')
bootstrap_calendar(datetime.now().year, db_manager, args.markdown)
db_manager.close()
return
document_manager = DocumentManager(db_manager)
if args.ls:
topic_reader = TopicReader(db_manager)
print(topic_reader.list_headings())
db_manager.close()
return
# Topic or UUID-based content retrieval
if args.topic_title or args.uuid:
topic_reader = TopicReader(db_manager)
if args.uuid:
content = topic_reader.build_full_content(args.uuid)
else:
content = topic_reader.get_topic_content(args.topic_title)
if content:
# Write the selected content to the markdown file
with open(args.markdown, 'w', encoding='utf-8') as file:
file.write(content)
file.write('\n')
print(f"Selected content written to {args.markdown}")
# Optionally convert to HTML
if args.html:
html_file = f"{args.markdown}.html"
with open(html_file, 'w', encoding='utf-8') as file:
file.write(convert_to_html(content))
print(f"HTML version written to {html_file}")
db_manager.close()
if __name__ == '__main__':
main()