From 5aa1f078bcc1cae85da943d88692c39a2cedeace Mon Sep 17 00:00:00 2001 From: Kalzu Rekku Date: Thu, 6 Feb 2025 23:04:50 +0200 Subject: [PATCH] More acurate name for alarm_api, => clock. Made the UI parts to their own package and separate them to smaller files. --- clock/alarm_api.py | 151 +++++++++++++++++++++ clock/alarm_siren.py | 260 ++++++++++++++++++++++++++++++++++++ clock/alarm_storage.py | 263 +++++++++++++++++++++++++++++++++++++ clock/data_classes.py | 170 ++++++++++++++++++++++++ clock/logging_config.py | 16 +++ clock/main.py | 190 +++++++++++++++++++++++++++ clock/tests.sh | 173 ++++++++++++++++++++++++ clock/ui/__init__.py | 0 clock/ui/active_alarm.py | 69 ++++++++++ clock/ui/add_alarm.py | 219 ++++++++++++++++++++++++++++++ clock/ui/big_digits.py | 102 ++++++++++++++ clock/ui/input_handlers.py | 202 ++++++++++++++++++++++++++++ clock/ui/list_alarms.py | 86 ++++++++++++ clock/ui/main_clock.py | 41 ++++++ clock/ui/ncurses_ui.py | 186 ++++++++++++++++++++++++++ clock/ui/utils.py | 34 +++++ 16 files changed, 2162 insertions(+) create mode 100644 clock/alarm_api.py create mode 100644 clock/alarm_siren.py create mode 100644 clock/alarm_storage.py create mode 100644 clock/data_classes.py create mode 100644 clock/logging_config.py create mode 100755 clock/main.py create mode 100644 clock/tests.sh create mode 100644 clock/ui/__init__.py create mode 100644 clock/ui/active_alarm.py create mode 100644 clock/ui/add_alarm.py create mode 100644 clock/ui/big_digits.py create mode 100644 clock/ui/input_handlers.py create mode 100644 clock/ui/list_alarms.py create mode 100644 clock/ui/main_clock.py create mode 100644 clock/ui/ncurses_ui.py create mode 100644 clock/ui/utils.py diff --git a/clock/alarm_api.py b/clock/alarm_api.py new file mode 100644 index 0000000..8754e47 --- /dev/null +++ b/clock/alarm_api.py @@ -0,0 +1,151 @@ +from http.server import BaseHTTPRequestHandler, HTTPServer +import logging +import os +import json +import hashlib +from dataclasses import dataclass, field, asdict +from typing import List, Optional, Dict, Any +from datetime import datetime +from http import HTTPStatus +import re + +from alarm_storage import AlarmStorage +from data_classes import RepeatRule, Snooze, Metadata, Alarm +from logging_config import setup_logging + +# Set up logging configuration +logger = setup_logging() + +class AlertApi(BaseHTTPRequestHandler): + def __init__(self, *args, **kwargs): + self.storage = AlarmStorage("data/alerts.json") + self.logger = logger + super().__init__(*args, **kwargs) + + def _send_response(self, status_code: int, data: Any = None, error: str = None) -> None: + """Send a JSON response with the given status code and data/error""" + self.send_response(status_code) + self.send_header("Content-Type", "application/json") + self.send_header("Access-Control-Allow-Origin", "*") + self.end_headers() + + response = {} + if data is not None: + response["data"] = data + if error is not None: + response["error"] = error + + response_json = json.dumps(response) + self.logger.debug(f"Sending response: {response_json}") + self.wfile.write(response_json.encode("utf-8")) + + def _handle_request(self, method: str) -> None: + """Handle incoming requests with proper error handling""" + self.logger.info(f"Received {method} request from {self.client_address[0]}") + + try: + if method in ["POST", "PUT", "DELETE"]: + content_length = int(self.headers.get("Content-Length", 0)) + if content_length == 0: + raise ValueError("Missing request body") + + post_data = self.rfile.read(content_length) + self.logger.debug(f"Received {method} payload: {post_data.decode('utf-8')}") + + post_data = json.loads(post_data) + else: + post_data = None + + # Route request to appropriate handler + handler = getattr(self, f"_handle_{method.lower()}", None) + if handler: + self.logger.debug(f"Routing to handler: _handle_{method.lower()}") + handler(post_data) + else: + self.logger.warning(f"Method not allowed: {method}") + self._send_response(HTTPStatus.METHOD_NOT_ALLOWED, error="Method not allowed") + + except json.JSONDecodeError as e: + self.logger.error(f"JSON decode error: {str(e)}") + self._send_response(HTTPStatus.BAD_REQUEST, error="Invalid JSON in request body") + except ValueError as e: + self.logger.error(f"Validation error: {str(e)}") + self._send_response(HTTPStatus.BAD_REQUEST, error=str(e)) + except Exception as e: + self.logger.error(f"Unexpected error: {str(e)}", exc_info=True) + self._send_response(HTTPStatus.INTERNAL_SERVER_ERROR, error="Internal server error") + + def _handle_get(self, _) -> None: + """Handle GET request""" + self.logger.debug("Processing GET request for all alarms") + alarms = self.storage.get_saved_alerts() + self.logger.debug(f"Retrieved {len(alarms)} alarms") + self._send_response(HTTPStatus.OK, data=alarms) + + def _handle_post(self, data: dict) -> None: + """Handle POST request""" + self.logger.debug(f"Processing POST request with data: {json.dumps(data, indent=2)}") + try: + alarm_id = self.storage.save_new_alert(data) + self.logger.info(f"Successfully created new alarm with ID: {alarm_id}") + self._send_response(HTTPStatus.CREATED, data={"id": alarm_id}) + except ValueError as e: + self.logger.error(f"Failed to create alarm: {str(e)}") + self._send_response(HTTPStatus.BAD_REQUEST, error=str(e)) + + def _handle_put(self, data: dict) -> None: + """Handle PUT request""" + alarm_id = data.pop("id", None) + self.logger.debug(f"Processing PUT request for alarm ID {alarm_id} with data: {json.dumps(data, indent=2)}") + + if alarm_id is None: + self.logger.error("PUT request missing alarm ID") + self._send_response(HTTPStatus.BAD_REQUEST, error="Missing alarm ID") + return + + if self.storage.update_alert(alarm_id, data): + self.logger.info(f"Successfully updated alarm ID: {alarm_id}") + self._send_response(HTTPStatus.OK, data={"message": "Alarm updated successfully"}) + else: + self.logger.warning(f"Alarm not found for update: {alarm_id}") + self._send_response(HTTPStatus.NOT_FOUND, error="Alarm not found") + + def _handle_delete(self, data: dict) -> None: + """Handle DELETE request""" + alarm_id = data.get("id") + self.logger.debug(f"Processing DELETE request for alarm ID: {alarm_id}") + + if not isinstance(alarm_id, int): + self.logger.error(f"Invalid alarm ID format: {alarm_id}") + self._send_response(HTTPStatus.BAD_REQUEST, error="Invalid alarm ID") + return + + if self.storage.remove_saved_alert(alarm_id): + self.logger.info(f"Successfully deleted alarm ID: {alarm_id}") + self._send_response(HTTPStatus.OK, data={"message": "Alarm removed successfully"}) + else: + self.logger.warning(f"Alarm not found for deletion: {alarm_id}") + self._send_response(HTTPStatus.NOT_FOUND, error="Alarm not found") + + def do_GET(self): self._handle_request("GET") + def do_POST(self): self._handle_request("POST") + def do_PUT(self): self._handle_request("PUT") + def do_DELETE(self): self._handle_request("DELETE") + + +def run(server_class=HTTPServer, handler_class=AlertApi, port=8000): + logger.info(f"Starting AlertApi on port {port}") + + server_address = ("", port) + httpd = server_class(server_address, handler_class) + + try: + logger.info("Server is ready to handle requests") + httpd.serve_forever() + except KeyboardInterrupt: + logger.info("Received shutdown signal") + except Exception as e: + logger.error(f"Server error: {str(e)}", exc_info=True) + finally: + httpd.server_close() + logger.info("Server stopped") diff --git a/clock/alarm_siren.py b/clock/alarm_siren.py new file mode 100644 index 0000000..5dd6dcd --- /dev/null +++ b/clock/alarm_siren.py @@ -0,0 +1,260 @@ +import os +import time +import threading +import subprocess +import queue +import logging +from datetime import datetime, timedelta +from typing import Optional, Dict, Any + +from logging_config import setup_logging + +# Set up logging +logger = setup_logging() + +class AlarmSiren: + def __init__(self): + # Communication queues + self.alarm_queue = queue.Queue() + self.control_queue = queue.Queue() + + # Tracking active alarms + self.active_alarms: Dict[int, Dict[str, Any]] = {} + + # Playback thread + self.playback_thread = threading.Thread(target=self._playback_worker, daemon=True) + self.playback_thread.start() + + def schedule_alarm(self, alarm_config: Dict[str, Any]): + """Schedule an alarm based on its configuration""" + logger.info(f"Scheduling alarm: {alarm_config}") + self.alarm_queue.put(alarm_config) + + def _calculate_next_alarm_time(self, alarm_config: Dict[str, Any]) -> Optional[datetime]: + """Calculate the next alarm trigger time based on repeat rule""" + now = datetime.now() + current_time = now.time() + + # Parse alarm time + alarm_time = datetime.strptime(alarm_config['time'], "%H:%M:%S").time() + + # Determine the next trigger + repeat_rule = alarm_config['repeat_rule'] + + # Determine repeat rule type and details + if isinstance(repeat_rule, dict): + repeat_type = repeat_rule.get('type') + repeat_days = repeat_rule.get('days_of_week', []) + repeat_at = repeat_rule.get('at') + else: + # Assume it's an object-like structure with attributes + repeat_type = getattr(repeat_rule, 'type', None) + repeat_days = getattr(repeat_rule, 'days_of_week', []) + repeat_at = getattr(repeat_rule, 'at', None) + + # Sanity check + if repeat_type is None: + logger.error(f"Invalid repeat rule configuration: {repeat_rule}") + return None + + if repeat_type == 'once': + # For one-time alarm, check the specific date + try: + # If 'at' is None, use current date + if repeat_at is None: + specific_date = now.date() + else: + specific_date = datetime.strptime(repeat_at, "%d.%m.%Y").date() + return datetime.combine(specific_date, alarm_time) + except ValueError: + logger.error("Invalid one-time alarm configuration") + return None + + elif repeat_type == 'daily': + # Daily alarm - trigger today or tomorrow + next_trigger = datetime.combine(now.date(), alarm_time) + if current_time < alarm_time: + return next_trigger + return next_trigger + timedelta(days=1) + + elif repeat_type == 'weekly': + # Weekly alarm - check configured days + today = now.strftime("%A").lower() + configured_days = [day.lower() for day in repeat_days] + + if today in configured_days and current_time < alarm_time: + return datetime.combine(now.date(), alarm_time) + + # Find next configured day + days_order = ['monday', 'tuesday', 'wednesday', 'thursday', 'friday', 'saturday', 'sunday'] + current_index = days_order.index(today) + + for offset in range(1, 8): + next_day_index = (current_index + offset) % 7 + next_day = days_order[next_day_index] + + if next_day in configured_days: + next_date = now.date() + timedelta(days=offset) + return datetime.combine(next_date, alarm_time) + + return None + + def _play_audio(self, file_path: str, volume: int = 100): + """Play audio file using mpg123 in the background.""" + try: + if not os.path.exists(file_path): + logger.error(f"Audio file not found: {file_path}") + return None + + # Construct mpg123 command with volume control + volume_adjust = f"-g {volume}" + cmd = ["mpg123", volume_adjust, file_path] + + logger.info(f"Playing alarm: {file_path}") + + # Run mpg123 in the background, suppressing stdout/stderr + process = subprocess.Popen(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) + return process + except Exception as e: + logger.error(f"Error playing audio: {e}") + return None + + def _playback_worker(self): + """Background thread for managing alarm playback""" + while True: + try: + # Check for new alarms to schedule + try: + new_alarm = self.alarm_queue.get(timeout=1) + alarm_time = self._calculate_next_alarm_time(new_alarm) + if alarm_time: + alarm_id = new_alarm.get('id', id(new_alarm)) + self.active_alarms[alarm_id] = { + 'config': new_alarm, + 'trigger_time': alarm_time, + 'snooze_count': 0, + 'process': None + } + except queue.Empty: + pass + + # Check for control signals (snooze/dismiss) + try: + control_msg = self.control_queue.get(timeout=0.1) + alarm_id = control_msg.get('alarm_id') + if control_msg['type'] == 'snooze': + self.snooze_alarm(alarm_id) + elif control_msg['type'] == 'dismiss': + self.dismiss_alarm(alarm_id) + except queue.Empty: + pass + + # Check for alarms to trigger + now = datetime.now() + for alarm_id, alarm_info in list(self.active_alarms.items()): + # Check if alarm is more than 1 hour late + if now > alarm_info['trigger_time'] + timedelta(hours=1): + logger.warning(f"Alarm {alarm_id} is over 1 hour late. Disabling.") + del self.active_alarms[alarm_id] + continue + if now >= alarm_info['trigger_time']: + # Trigger alarm if not already active + if alarm_info['process'] is None: + alarm_info['process'] = self._play_audio( + alarm_info['config']['file_to_play'], + alarm_info['config'].get('metadata', {}).get('volume', 100) + ) + logger.info(f"Alarm {alarm_id} triggered at {now}.") + + # Notify UI about the triggered alarm + self.control_queue.put({ + 'type': 'trigger', + 'alarm_id': alarm_id, + 'info': alarm_info + }) + + # Handle alarms that have been snoozed or dismissed + if alarm_info['process'] and alarm_info['process'].poll() is not None: + # Process has finished naturally + next_trigger = self._calculate_next_alarm_time(alarm_info['config']) + if next_trigger: + alarm_info['trigger_time'] = next_trigger + alarm_info['process'] = None + else: + logger.info(f"Removing non-repeating alarm {alarm_id}.") + del self.active_alarms[alarm_id] + + # Actively clean up zombie processes + for alarm_id, alarm_info in list(self.active_alarms.items()): + process = alarm_info.get('process') + if process and process.poll() is not None: + # Remove terminated processes + alarm_info['process'] = None + + time.sleep(0.5) # Prevent tight loop + except Exception as e: + logger.error(f"Error in playback worker: {e}") + time.sleep(1) + + def snooze_alarm(self, alarm_id: int): + """Snooze a specific alarm""" + if alarm_id not in self.active_alarms: + logger.warning(f"Cannot snooze alarm {alarm_id} - not found in active alarms") + return False + + alarm_info = self.active_alarms[alarm_id] + alarm_config = alarm_info['config'] + + # Default snooze configuration if not provided + snooze_config = alarm_config.get('snooze', { + 'enabled': True, + 'duration': 5, # Default 5 minutes + 'max_count': 3 # Default max 3 snoozes + }) + + if not snooze_config.get('enabled', True): + logger.warning(f"Snooze not enabled for alarm {alarm_id}") + return False + + # Check snooze count + if alarm_info.get('snooze_count', 0) >= snooze_config['max_count']: + logger.warning(f"Maximum snooze count reached for alarm {alarm_id}") + return False + + # Increment snooze count and set next trigger time + alarm_info['snooze_count'] = alarm_info.get('snooze_count', 0) + 1 + snooze_duration = snooze_config.get('duration', 5) + alarm_info['trigger_time'] = datetime.now() + timedelta(minutes=snooze_duration) + + # Stop any active playback + process = alarm_info.get('process') + if process: + process.terminate() + process.wait() + alarm_info['process'] = None + + logger.info(f"Snoozed alarm {alarm_id} for {snooze_duration} minutes.") + return True + +def dismiss_alarm(self, alarm_id: int): + """Dismiss a specific alarm.""" + if alarm_id not in self.active_alarms: + logger.warning(f"Cannot dismiss alarm {alarm_id} - not found in active alarms") + return False + + # Stop playback and terminate any running process + alarm_info = self.active_alarms.pop(alarm_id, None) + if alarm_info and alarm_info.get('process'): + try: + alarm_info['process'].terminate() + alarm_info['process'].wait(timeout=2) + except Exception as e: + logger.error(f"Error terminating alarm process: {e}") + # Force kill if terminate fails + try: + alarm_info['process'].kill() + except Exception: + pass + + logger.info(f"Dismissed alarm {alarm_id}.") + return True diff --git a/clock/alarm_storage.py b/clock/alarm_storage.py new file mode 100644 index 0000000..90dc2ba --- /dev/null +++ b/clock/alarm_storage.py @@ -0,0 +1,263 @@ +import logging +import os +import json +import hashlib +from typing import List, Optional, Dict, Any +from datetime import datetime +import re +from dataclasses import asdict + +from data_classes import RepeatRule, Snooze, Metadata, Alarm + +# Set up logging configuration +logging.basicConfig( + level=logging.DEBUG, + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', + handlers=[ + logging.StreamHandler(), # Console handler + logging.FileHandler('alert_api.log') # File handler + ] +) + +logger = logging.getLogger('AlertApi') + +class AlarmStorage: + def __init__(self, file_path: str, siren=None): + """Initialize AlarmStorage with optional Siren integration""" + self.file_path = file_path + self.siren = siren + self._ensure_storage_directory() + + def _ensure_storage_directory(self) -> None: + """Ensure the directory for the storage file exists""" + directory = os.path.dirname(os.path.abspath(self.file_path)) + if directory: + os.makedirs(directory, exist_ok=True) + + @staticmethod + def _generate_alarm_hash(alarm: dict) -> str: + """Generate an MD5 hash for the given alarm data""" + def to_dict(value): + return asdict(value) if hasattr(value, '__dataclass_fields__') else value + + alarm_data = alarm.copy() + alarm_data["repeat_rule"] = to_dict(alarm_data.get("repeat_rule")) + alarm_data["snooze"] = to_dict(alarm_data.get("snooze")) + alarm_data["metadata"] = to_dict(alarm_data.get("metadata")) + + hash_input = json.dumps({ + "name": alarm_data["name"], + "time": alarm_data["time"], + "file_to_play": alarm_data.get("file_to_play", os.path.expanduser("~/.alarms/alarm-lofi.mp3")), + "repeat_rule": alarm_data["repeat_rule"] + }, sort_keys=True) + return hashlib.md5(hash_input.encode('utf-8')).hexdigest() + + def load_data(self) -> dict: + """Load the JSON data from the file""" + try: + with open(self.file_path, 'r') as file: + data = json.load(file) + if not isinstance(data, dict): + raise ValueError("Invalid data format") + return data + except (FileNotFoundError, json.JSONDecodeError): + return {"last_id": 0, "alarms": []} + + def save_data(self, data: dict) -> None: + """Save the JSON data back to the file""" + def convert_to_dict(obj): + """Convert dataclass instances to dictionaries recursively""" + if isinstance(obj, list): + return [convert_to_dict(item) for item in obj] + elif isinstance(obj, dict): + return {key: convert_to_dict(value) for key, value in obj.items()} + elif hasattr(obj, "__dataclass_fields__"): + return asdict(obj) + else: + return obj + + # Convert data to JSON-serializable format + serializable_data = convert_to_dict(data) + + with open(self.file_path, 'w') as file: + json.dump(serializable_data, file, indent=4) + + def save_new_alert(self, alarm_data: dict) -> int: + """ + Add a new alarm to the storage and notify Siren if configured + """ + # Check for potential duplicates before saving + existing_alarms = self.get_saved_alerts() + for alarm in existing_alarms: + # Compare key characteristics + if (alarm['name'] == alarm_data['name'] and + alarm['time'] == alarm_data['time'] and + alarm['repeat_rule'] == alarm_data['repeat_rule']): + raise ValueError("Duplicate alarm already exists") + + + # Set default file_to_play if not provided + alarm_data['file_to_play'] = alarm_data.get('file_to_play', os.path.expanduser("~/.alarms/alarm-lofi.mp3")) + + # Convert nested dictionaries to dataclass instances + try: + if 'repeat_rule' in alarm_data: + alarm_data['repeat_rule'] = RepeatRule(**alarm_data['repeat_rule']) + if 'snooze' in alarm_data: + alarm_data['snooze'] = Snooze(**alarm_data['snooze']) + if 'metadata' in alarm_data: + alarm_data['metadata'] = Metadata(**alarm_data['metadata']) + except TypeError as e: + raise ValueError(f"Invalid format in one of the nested fields: {str(e)}") + + # Validate input + try: + alarm = Alarm(**alarm_data) + if not alarm.validate(): + raise ValueError("Invalid alarm configuration") + except (TypeError, ValueError) as e: + raise ValueError(f"Invalid alarm data: {str(e)}") + + # Load existing data + data = self.load_data() + + # Increment last_id and assign to new alarm + data['last_id'] += 1 + alarm_data['id'] = data['last_id'] + + # Generate hash for the alarm + alarm_data['hash'] = self._generate_alarm_hash(alarm_data) + + # Check for duplicates + if any(a.get("hash") == alarm_data['hash'] for a in data["alarms"]): + raise ValueError("Duplicate alarm detected") + + # Add new alarm to the list + data['alarms'].append(alarm_data) + + # Save updated data + self.save_data(data) + + # Notify Siren if available + if self.siren and alarm_data.get('enabled', True): + try: + self.siren.schedule_alarm(alarm_data) + logger.info(f"Siren notified of new alarm: {alarm_data['id']}") + except Exception as e: + logger.error(f"Failed to notify Siren about new alarm: {e}") + + return data['last_id'] + + def get_saved_alerts(self) -> list: + """Retrieve the list of saved alarms""" + return self.load_data().get("alarms", []) + + def remove_saved_alert(self, alarm_id: int) -> bool: + """Remove an alarm by its ID""" + data = self.load_data() + original_length = len(data["alarms"]) + + # Filter out the alarm with the given ID + data["alarms"] = [a for a in data["alarms"] if a["id"] != alarm_id] + + if len(data["alarms"]) == original_length: + return False + + # Save updated data + self.save_data(data) + + # Notify Siren if available + if self.siren: + try: + self.siren.dismiss_alarm(alarm_id) + logger.info(f"Siren dismissed alarm: {alarm_id}") + except Exception as e: + logger.error(f"Failed to notify Siren about removed alarm: {e}") + + return True + + def update_alert(self, alarm_id: int, alarm_data: dict) -> bool: + """Update an existing alarm""" + alarm_data["id"] = alarm_id + + # Set default file_to_play if not provided + alarm_data['file_to_play'] = alarm_data.get('file_to_play', os.path.expanduser("~/.alarms/alarm-lofi.mp3")) + + # Convert nested dictionaries to dataclass instances + try: + if 'repeat_rule' in alarm_data: + alarm_data['repeat_rule'] = RepeatRule(**alarm_data['repeat_rule']) + if 'snooze' in alarm_data: + alarm_data['snooze'] = Snooze(**alarm_data['snooze']) + if 'metadata' in alarm_data: + alarm_data['metadata'] = Metadata(**alarm_data['metadata']) + except TypeError as e: + raise ValueError(f"Invalid format in one of the nested fields: {str(e)}") + + # Validate input + try: + alarm = Alarm(**alarm_data) + if not alarm.validate(): + raise ValueError("Invalid alarm configuration") + except (TypeError, ValueError) as e: + raise ValueError(f"Invalid alarm data: {str(e)}") + + # Load existing data + data = self.load_data() + + # Find and update the alarm + for i, existing_alarm in enumerate(data["alarms"]): + if existing_alarm["id"] == alarm_id: + # Generate new hash + alarm_data["hash"] = self._generate_alarm_hash(alarm_data) + + # Replace the existing alarm + data["alarms"][i] = alarm_data + + # Save updated data + self.save_data(data) + + # Notify Siren if available + if self.siren: + try: + # Remove existing alarm and reschedule + self.siren.dismiss_alarm(alarm_id) + + if alarm_data.get('enabled', True): + self.siren.schedule_alarm(alarm_data) + logger.info(f"Siren updated for alarm: {alarm_id}") + else: + logger.info(f"Siren dismissed disabled alarm: {alarm_id}") + except Exception as e: + logger.error(f"Failed to notify Siren about updated alarm: {e}") + + return True + + # Alarm not found + return False + + def _get_alarm_by_id(self, alarm_id: int) -> Optional[dict]: + """ + Retrieve a specific alarm by its ID + + Args: + alarm_id (int): ID of the alarm to retrieve + + Returns: + dict or None: Alarm configuration or None if not found + """ + data = self.load_data() + for alarm in data.get('alarms', []): + if alarm.get('id') == alarm_id: + return alarm + return None + + def set_siren(self, siren): + """ + Set or update the Siren instance for alarm synchronization + + Args: + siren (AlarmSiren): Siren instance to use for alarm management + """ + self.siren = siren diff --git a/clock/data_classes.py b/clock/data_classes.py new file mode 100644 index 0000000..cb4ebea --- /dev/null +++ b/clock/data_classes.py @@ -0,0 +1,170 @@ +import hashlib +from dataclasses import dataclass, field, asdict +from typing import List, Optional, Dict, Any +from datetime import datetime +import os +import re +from logging_config import setup_logging + +# Set up logging +logger = setup_logging() + +@dataclass +class RepeatRule: + type: str # "daily" or "weekly" or "once" + days_of_week: Optional[List[str]] = field(default_factory=list) + at: Optional[str] = None + + def validate(self) -> bool: + """Validate repeat rule configuration""" + valid_types = {"daily", "weekly", "once"} + valid_days = {"monday", "tuesday", "wednesday", "thursday", "friday", "saturday", "sunday"} + + if self.type not in valid_types: + logger.error(f"Invalid RepeatRule type: '{self.type}'. Must be one of {valid_types}") + return False + + if self.type == "weekly": + invalid_days = [day for day in self.days_of_week if day.lower() not in valid_days] + if invalid_days: + logger.error(f"Invalid RepeatRule weekday names: {invalid_days}. Must be one of {valid_days}") + return False + if not self.days_of_week: + logger.error("Weekly repeat rule must specify at least one day") + return False + + if self.type == "once" and self.days_of_week: + if self.days_of_week: + logger.error("One-time alert does not support days_of_week") + return False + if not self.at: + logger.error("One-time repeat rule must specify a valid 'at' date") + return False + try: + datetime.strptime(self.at, "%d.%m.%Y") + except ValueError: + logger.error(f"Invalid 'at' date format: '{self.at}'. Expected format: 'dd.mm.yyyy'") + return False + + logger.debug(f"RepeatRule validation passed: {self.__dict__}") + return True + + +@dataclass +class Snooze: + enabled: bool = True + duration: int = 10 # minutes + max_count: int = 3 + + def validate(self) -> bool: + """Validate snooze configuration""" + if not isinstance(self.enabled, bool): + logger.error(f"Snooze enabled must be boolean, got {type(self.enabled)}") + return False + + if not isinstance(self.duration, int): + logger.error(f"Snooze duration must be integer, got {type(self.duration)}") + return False + + if self.duration <= 0: + logger.error(f"Snooze duration must be positive, got {self.duration}") + return False + + if not isinstance(self.max_count, int): + logger.error(f"Snooze max_count must be integer, got {type(self.max_count)}") + return False + + if self.max_count <= 0: + logger.error(f"Snooze max_count must be positive, got {self.max_count}") + return False + + logger.debug(f"Snooze validation passed: {self.__dict__}") + return True + + +@dataclass +class Metadata: + volume: int = 100 + notes: str = "" + md5sum: str = "" + + def validate(self) -> bool: + """Validate metadata configuration""" + if not isinstance(self.volume, int): + logger.error(f"Volume must be integer, got {type(self.volume)}") + return False + + if not 0 <= self.volume <= 100: + logger.error(f"Volume must be between 0 and 100, got {self.volume}") + return False + + logger.debug(f"Metadata validation passed: {self.__dict__}") + return True + + +@dataclass +class Alarm: + name: str + time: str # Format: "HH:MM:SS" + repeat_rule: RepeatRule + file_to_play: str = field(default=os.path.expanduser("~/.alarms/alarm-lofi.mp3")) + enabled: bool = field(default=True) + snooze: Snooze = field(default_factory=Snooze) + metadata: Metadata = field(default_factory=Metadata) + id: Optional[int] = field(default=None) + + def validate(self) -> bool: + """Validate complete alarm configuration""" + logger.debug(f"Starting validation for alarm: {self.name}") + + # Validate name + if not isinstance(self.name, str) or len(self.name.strip()) == 0: + logger.error(f"Invalid alarm name: '{self.name}'. Must be non-empty string") + return False + + # Validate time format + time_pattern = re.compile(r'^([0-1]?[0-9]|2[0-3]):[0-5][0-9]:[0-5][0-9]$') + if not time_pattern.match(self.time): + logger.error(f"Invalid time format: '{self.time}'. Must match HH:MM:SS") + return False + + # Validate enabled flag + if not isinstance(self.enabled, bool): + logger.error(f"Enabled must be boolean, got {type(self.enabled)}") + return False + + # Create default alarms directory if it doesn't exist + default_alarms_dir = os.path.expanduser("~/.alarms") + os.makedirs(default_alarms_dir, exist_ok=True) + + # Validate audio file + if self.file_to_play != os.path.expanduser("~/.alarms/alarm-lofi.mp3") and not os.path.exists(self.file_to_play): + logger.error(f"Audio file not found: '{self.file_to_play}'") + return False + + # Validate repeat rule + if not isinstance(self.repeat_rule, RepeatRule): + logger.error(f"Invalid repeat_rule type: {type(self.repeat_rule)}") + return False + if not self.repeat_rule.validate(): + logger.error("Repeat rule validation failed") + return False + + # Validate snooze + if not isinstance(self.snooze, Snooze): + logger.error(f"Invalid snooze type: {type(self.snooze)}") + return False + if not self.snooze.validate(): + logger.error("Snooze validation failed") + return False + + # Validate metadata + if not isinstance(self.metadata, Metadata): + logger.error(f"Invalid metadata type: {type(self.metadata)}") + return False + if not self.metadata.validate(): + logger.error("Metadata validation failed") + return False + + logger.debug(f"Alarm validation passed: {self.name}") + return True diff --git a/clock/logging_config.py b/clock/logging_config.py new file mode 100644 index 0000000..044ba8a --- /dev/null +++ b/clock/logging_config.py @@ -0,0 +1,16 @@ +import logging +import os + +def setup_logging(log_dir="logs", log_file="alarm_system.log", level=logging.DEBUG): + os.makedirs(log_dir, exist_ok=True) + log_path = os.path.join(log_dir, log_file) + + logging.basicConfig( + level=level, + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', + handlers=[ + logging.FileHandler(log_path, mode='a', encoding='utf-8') + ] + ) + logger = logging.getLogger("AlarmSystem") + return logger diff --git a/clock/main.py b/clock/main.py new file mode 100755 index 0000000..03c2602 --- /dev/null +++ b/clock/main.py @@ -0,0 +1,190 @@ +#!/usr/bin/python3 + +import os +import sys +import signal +import threading +import logging +import curses +from http.server import HTTPServer +from multiprocessing import Queue +import subprocess + +# Import our custom modules +from alarm_api import AlertApi, run as run_api +from alarm_storage import AlarmStorage +from alarm_siren import AlarmSiren +from ui.ncurses_ui import UI +from logging_config import setup_logging + +# Set up logging +logger = setup_logging() + +class AlarmSystemManager: + def __init__(self, + api_port: int = 8000, + storage_path: str = "data/alerts.json", + log_level: int = logging.DEBUG): + """ + Initialize and manage the entire alarm system + + Args: + api_port (int): Port for the HTTP API server + storage_path (str): Path to the JSON storage file + log_level (int): Logging level + """ + # Configure logging + logging.getLogger().setLevel(log_level) + + # Ensure storage directory exists + os.makedirs(os.path.dirname(os.path.abspath(storage_path)), exist_ok=True) + + # Initialize components + self.siren = AlarmSiren() + self.storage = AlarmStorage(storage_path, siren=self.siren) + + # API server setup + self.api_port = api_port + self.api_server = None + self.api_thread = None + + # Synchronization and lifecycle management + self.stop_event = threading.Event() + + # Signal handling setup + self._setup_signal_handlers() + + # Alarm synchronization + self._sync_alarms() + + # UI.. + self.ui = UI(self, control_queue=self.siren.control_queue) + + def _setup_signal_handlers(self): + """Set up signal handlers for graceful shutdown""" + signal.signal(signal.SIGINT, self._handle_shutdown) + signal.signal(signal.SIGTERM, self._handle_shutdown) + + def _handle_shutdown(self, signum, frame): + """ + Handle system shutdown signals + + Args: + signum (int): Signal number + frame (frame): Current stack frame + """ + logger.info(f"Received shutdown signal {signum}. Initiating graceful shutdown...") + self.stop_event.set() + + def _sync_alarms(self): + """ + Synchronize stored alarms with the siren + This ensures any saved alarms are loaded and scheduled + """ + try: + saved_alarms = self.storage.get_saved_alerts() + logger.info(f"Synchronizing {len(saved_alarms)} saved alarms") + + for alarm in saved_alarms: + if alarm.get('enabled', True): + self.siren.schedule_alarm(alarm) + except Exception as e: + logger.error(f"Error synchronizing alarms: {e}") + + def _start_api_server(self): + """ + Start the HTTP API server in a separate thread + """ + try: + def run_server(): + server_address = ("", self.api_port) + self.api_server = HTTPServer(server_address, AlertApi) + logger.info(f"API Server started on port {self.api_port}") + + # Run until stopped + while not self.stop_event.is_set(): + self.api_server.handle_request() + + self.api_server.server_close() + logger.info("API Server stopped") + + # Start API server in a thread + self.api_thread = threading.Thread(target=run_server, daemon=True) + self.api_thread.start() + except Exception as e: + logger.error(f"Failed to start API server: {e}") + self.stop_event.set() + + def run(self): + """ + Main run method to start all system components + """ + try: + # Start API server + self._start_api_server() + + # Start UI + ui_thread = self.ui.run() + + # Log system startup + logger.info("Alarm System started successfully") + + # Wait for shutdown signal + while not self.stop_event.is_set(): + self.stop_event.wait(timeout=1) + + # Graceful shutdown + logger.info("Initiating system shutdown...") + except Exception as e: + logger.error(f"System startup error: {e}") + finally: + self._cleanup() + + def _cleanup(self): + """ + Perform cleanup operations during shutdown + """ + # Close API server if running + if self.api_server: + try: + self.api_server.server_close() + except Exception as e: + logger.error(f"Error closing API server: {e}") + + # Stop threads + if self.api_thread and self.api_thread.is_alive(): + self.api_thread.join(timeout=2) + + # Kill any remaining mpg123 processes + try: + subprocess.run(['pkill', 'mpg123'], check=False) + except Exception as e: + logger.error(f"Error killing mpg123 processes: {e}") + + logger.info("Alarm System shutdown complete") + +def main(): + """ + Entry point for the Alarm System + Parse command-line arguments and start the system + """ + # Default configurations + default_port = 8000 + default_storage_path = "data/alerts.json" + + # Parse port from command line if provided + port = int(sys.argv[1]) if len(sys.argv) > 1 else default_port + + # Create and run the Alarm System + try: + alarm_system = AlarmSystemManager( + api_port=port, + storage_path=default_storage_path + ) + alarm_system.run() + except Exception as e: + logger.error(f"Fatal error starting Alarm System: {e}", exc_info=True) + sys.exit(1) + +if __name__ == "__main__": + main() diff --git a/clock/tests.sh b/clock/tests.sh new file mode 100644 index 0000000..36088c1 --- /dev/null +++ b/clock/tests.sh @@ -0,0 +1,173 @@ +#!/bin/bash + +# Configuration +API_URL="http://localhost:8000" +TEST_ALARM_NAME="Test Alarm" +TEST_ALARM_TIME="08:30:00" +TEST_AUDIO_FILE="/tmp/test.mp3" + +# Color codes for output +GREEN='\033[0;32m' +RED='\033[0;31m' +BLUE='\033[0;34m' +NC='\033[0m' # No Color + +# Helper function to print status messages +print_status() { + echo -e "${BLUE}=== $1 ===${NC}" +} + +# Helper function to check test results +check_result() { + if [ $? -eq 0 ]; then + echo -e "${GREEN}✓ $1${NC}" + else + echo -e "${RED}✗ $1${NC}" + exit 1 + fi +} + +# 1. Test server connectivity +print_status "Testing server connectivity" +curl -s "$API_URL" > /dev/null +check_result "Server connectivity check" + +# 2. Get initial state +print_status "Getting initial state" +INITIAL_STATE=$(curl -s -X GET "$API_URL") +echo "$INITIAL_STATE" | jq . +check_result "Retrieved initial state" + +# 3. Add a new alarm (POST) +print_status "Adding new alarm" +POST_DATA=$(jq -n \ + --arg name "$TEST_ALARM_NAME" \ + --arg time "$TEST_ALARM_TIME" \ + '{ + "name": $name, + "time": $time, + "repeat_rule": { + "type": "weekly", + "days_of_week": ["monday", "wednesday", "friday"] + }, + "enabled": true, + "snooze": { + "enabled": true, + "duration": 5, + "max_count": 3 + }, + "metadata": { + "volume": 75, + "notes": "Test alarm with full configuration" + } + }' +) + +ADD_RESPONSE=$(curl -s -X POST -H "Content-Type: application/json" -d "$POST_DATA" "$API_URL") +NEW_ALARM_ID=$(echo "$ADD_RESPONSE" | jq -r '.data.id') + +if [[ "$NEW_ALARM_ID" == "null" || -z "$NEW_ALARM_ID" ]]; then + echo -e "${RED}Failed to add alarm${NC}" + echo "Response: $ADD_RESPONSE" + exit 1 +fi + +echo -e "${GREEN}Added alarm with ID: $NEW_ALARM_ID${NC}" + +#j# 4. Test duplicate alarm detection +#jprint_status "Testing duplicate alarm detection" +#jDUPLICATE_RESPONSE=$(curl -s -X POST -H "Content-Type: application/json" -d "$POST_DATA" "$API_URL") +#jif [[ $(echo "$DUPLICATE_RESPONSE" | jq -r '.error') == *"Duplicate alarm detected"* ]]; then +#j echo -e "${GREEN}✓ Duplicate detection working${NC}" +#jelse +#j echo -e "${RED}✗ Duplicate detection failed${NC}" +#j echo "Response: $DUPLICATE_RESPONSE" +#jfi +#j +#j# 5. Update the alarm (PUT) +#jprint_status "Updating alarm" +#jUPDATE_DATA=$(jq -n \ +#j --arg name "$TEST_ALARM_NAME Updated" \ +#j --arg time "$TEST_ALARM_TIME" \ +#j --argjson id "$NEW_ALARM_ID" \ +#j '{ +#j "id": $id, +#j "name": $name, +#j "time": $time, +#j "repeat_rule": { +#j "type": "daily" +#j }, +#j "enabled": true, +#j "snooze": { +#j "enabled": false, +#j "duration": 10, +#j "max_count": 2 +#j }, +#j "metadata": { +#j "volume": 90, +#j "notes": "Updated test alarm" +#j } +#j }' +#j) +#j +#jUPDATE_RESPONSE=$(curl -s -X PUT -H "Content-Type: application/json" -d "$UPDATE_DATA" "$API_URL") +#jif [[ $(echo "$UPDATE_RESPONSE" | jq -r '.data.message') == "Alarm updated successfully" ]]; then +#j echo -e "${GREEN}✓ Alarm update successful${NC}" +#jelse +#j echo -e "${RED}✗ Alarm update failed${NC}" +#j echo "Response: $UPDATE_RESPONSE" +#jfi +#j +#j# 6. Verify the update +#jprint_status "Verifying update" +#jUPDATED_STATE=$(curl -s -X GET "$API_URL") +#jUPDATED_ALARM=$(echo "$UPDATED_STATE" | jq -r ".data[] | select(.id == $NEW_ALARM_ID)") +#jif [[ $(echo "$UPDATED_ALARM" | jq -r '.name') == "$TEST_ALARM_NAME Updated" ]]; then +#j echo -e "${GREEN}✓ Update verification successful${NC}" +#jelse +#j echo -e "${RED}✗ Update verification failed${NC}" +#j echo "Current alarm state: $UPDATED_ALARM" +#jfi +#j +#j# 7. Test invalid inputs +#jprint_status "Testing invalid inputs" +#j +#j# Test invalid time format +#jINVALID_TIME_DATA=$(echo "$POST_DATA" | jq '. + {"time": "25:00:00"}') +#jINVALID_TIME_RESPONSE=$(curl -s -X POST -H "Content-Type: application/json" -d "$INVALID_TIME_DATA" "$API_URL") +#jif [[ $(echo "$INVALID_TIME_RESPONSE" | jq -r '.error') == *"Invalid alarm configuration"* ]]; then +#j echo -e "${GREEN}✓ Invalid time format detection working${NC}" +#jelse +#j echo -e "${RED}✗ Invalid time format detection failed${NC}" +#jfi +#j +#j# Test invalid repeat rule +#jINVALID_REPEAT_DATA=$(echo "$POST_DATA" | jq '.repeat_rule.type = "monthly"') +#jINVALID_REPEAT_RESPONSE=$(curl -s -X POST -H "Content-Type: application/json" -d "$INVALID_REPEAT_DATA" "$API_URL") +#jif [[ $(echo "$INVALID_REPEAT_RESPONSE" | jq -r '.error') == *"Invalid alarm configuration"* ]]; then +#j echo -e "${GREEN}✓ Invalid repeat rule detection working${NC}" +#jelse +#j echo -e "${RED}✗ Invalid repeat rule detection failed${NC}" +#jfi +#j +#j# 8. Delete the test alarm +#jprint_status "Deleting test alarm" +#jDELETE_RESPONSE=$(curl -s -X DELETE -H "Content-Type: application/json" -d "{\"id\":$NEW_ALARM_ID}" "$API_URL") +#jif [[ $(echo "$DELETE_RESPONSE" | jq -r '.data.message') == "Alarm removed successfully" ]]; then +#j echo -e "${GREEN}✓ Alarm deletion successful${NC}" +#jelse +#j echo -e "${RED}✗ Alarm deletion failed${NC}" +#j echo "Response: $DELETE_RESPONSE" +#jfi +#j +#j# 9. Verify deletion +#jprint_status "Verifying deletion" +#jFINAL_STATE=$(curl -s -X GET "$API_URL") +#jif [[ $(echo "$FINAL_STATE" | jq ".data[] | select(.id == $NEW_ALARM_ID)") == "" ]]; then +#j echo -e "${GREEN}✓ Deletion verification successful${NC}" +#jelse +#j echo -e "${RED}✗ Deletion verification failed${NC}" +#j echo "Current state: $FINAL_STATE" +#jfi +#j +#jprint_status "Test suite completed!" diff --git a/clock/ui/__init__.py b/clock/ui/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/clock/ui/active_alarm.py b/clock/ui/active_alarm.py new file mode 100644 index 0000000..ad9d009 --- /dev/null +++ b/clock/ui/active_alarm.py @@ -0,0 +1,69 @@ +import curses +from datetime import datetime +from .utils import init_colors, draw_big_digit + +def draw_active_alarms(stdscr, context): + """Draw the active alarms""" + init_colors() + height, width = stdscr.getmaxyx() + current_time = datetime.now() + + # Draw the main clock (original position) + time_str = current_time.strftime("%H:%M:%S") + digit_width = 14 + total_width = digit_width * len(time_str) + start_x = (width - total_width) // 2 + start_y = (height - 7) // 2 - 4 # Original position from _draw_main_clock + + # Draw blinking dot + if int(current_time.timestamp()) % 2 == 0: + stdscr.attron(curses.color_pair(1)) + stdscr.addstr(start_y - 1, start_x + total_width - 2, "•") + stdscr.attroff(curses.color_pair(1)) + + # Green color for big time + stdscr.attron(curses.color_pair(1)) + for i, digit in enumerate(time_str): + draw_big_digit(stdscr, start_y, start_x + i * digit_width, digit) + stdscr.attroff(curses.color_pair(1)) + + # Draw date (as in main clock) + date_str = current_time.strftime("%Y-%m-%d") + date_x = width // 2 - len(date_str) // 2 + date_y = height // 2 + 4 + stdscr.attron(curses.color_pair(2)) + stdscr.addstr(date_y, date_x, date_str) + stdscr.attroff(curses.color_pair(2)) + + # Get active alarm info + active_alarms = context.get('active_alarms', {}) + if not active_alarms: + return + + # Get the first (or only) active alarm + alarm_id = list(active_alarms.keys())[0] + alarm_info = active_alarms[alarm_id] + alarm_config = alarm_info['config'] + + # Format alarm info + alarm_name = alarm_config.get('name', 'Unnamed Alarm') + alarm_time = alarm_config.get('time', 'Unknown Time') + #snooze_count = alarm_info.get('snooze_count', 0) + + # Draw alarm info under the clock + info_y = start_y + 8 # Position below the clock + #alarm_str = f"[ {alarm_name} - {alarm_time} - Snoozed: {snooze_count}x ]" + alarm_str = f"[ {alarm_name} - {alarm_time} ]" + alarm_x = max(0, width // 2 - len(alarm_str) // 2) + alarm_y = date_y - 2 # Just above the date + + # Center the alarm info + info_x = max(0, width // 2 - len(alarm_str) // 2) + + # Draw with green color + stdscr.attron(curses.color_pair(1)) + stdscr.addstr(alarm_y, alarm_x, alarm_str) + stdscr.attroff(curses.color_pair(1)) + + # Instructions + stdscr.addstr(height - 2, width // 2 - 15, "S: Snooze D: Dismiss") diff --git a/clock/ui/add_alarm.py b/clock/ui/add_alarm.py new file mode 100644 index 0000000..fc691a1 --- /dev/null +++ b/clock/ui/add_alarm.py @@ -0,0 +1,219 @@ +import curses +from datetime import datetime +from .utils import init_colors, draw_big_digit + +def draw_add_alarm(stdscr, context): + """Draw the add alarm screen""" + # Ensure context is a dictionary with default values + if context is None: + context = {} + + # Provide default values with more explicit checks + context = { + 'new_alarm_selected': context.get('new_alarm_selected', 0), + 'new_alarm_name': context.get('new_alarm_name', 'New Alarm'), + 'new_alarm_hour': context.get('new_alarm_hour', datetime.now().hour), + 'new_alarm_minute': context.get('new_alarm_minute', datetime.now().minute), + 'new_alarm_enabled': context.get('new_alarm_enabled', True), + 'new_alarm_date': context.get('new_alarm_date') or None, + 'new_alarm_weekdays': context.get('new_alarm_weekdays', []) or [], + 'weekday_names': context.get('weekday_names', ['Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat', 'Sun']), + 'date_edit_pos': context.get('date_edit_pos', 2) # 0 = year, 1 = month, 2 = day + } + + init_colors() + height, width = stdscr.getmaxyx() + + # Center the form vertically with good spacing + form_y = height // 2 - 8 + + # Title with green color and bold + title = "Add New Alarm" + stdscr.attron(curses.color_pair(1) | curses.A_BOLD) + stdscr.addstr(form_y, width // 2 - len(title) // 2, title) + stdscr.attroff(curses.color_pair(1) | curses.A_BOLD) + + # Helper function to draw a labeled field + def draw_field(y, label, value, is_selected, center_offset=0): + label_str = f"{label}: " + total_width = len(label_str) + len(str(value)) + x = width // 2 - total_width // 2 + center_offset + + # Draw label in green + stdscr.attron(curses.color_pair(1)) + stdscr.addstr(y, x, label_str) + stdscr.attroff(curses.color_pair(1)) + + # Draw value (highlighted if selected) + if is_selected: + stdscr.attron(curses.color_pair(2)) # Yellow for selected + stdscr.addstr(y, x + len(label_str), str(value)) + stdscr.attroff(curses.color_pair(2)) + else: + stdscr.attron(curses.color_pair(1)) # Green for normal + stdscr.addstr(y, x + len(label_str), str(value)) + stdscr.attroff(curses.color_pair(1)) + + # Name field with proper spacing + name_str = str(context['new_alarm_name']) + draw_field(form_y + 2, "Name", name_str, context['new_alarm_selected'] == 4) + + # Time selection with centered layout + time_y = form_y + 4 + time_label = "Time: " + hour_str = f"{int(context['new_alarm_hour']):02d}" + minute_str = f"{int(context['new_alarm_minute']):02d}" + + # Calculate center position for time + time_total_width = len(time_label) + 5 # 5 = HH:MM + time_x = width // 2 - time_total_width // 2 + + # Draw time label + stdscr.attron(curses.color_pair(1)) + stdscr.addstr(time_y, time_x, time_label) + stdscr.attroff(curses.color_pair(1)) + + # Draw hour + if context['new_alarm_selected'] == 0: + stdscr.attron(curses.color_pair(2)) + else: + stdscr.attron(curses.color_pair(1)) + stdscr.addstr(time_y, time_x + len(time_label), hour_str) + if context['new_alarm_selected'] == 0: + stdscr.attroff(curses.color_pair(2)) + else: + stdscr.attroff(curses.color_pair(1)) + + # Draw colon + stdscr.attron(curses.color_pair(1)) + stdscr.addstr(time_y, time_x + len(time_label) + 2, ":") + stdscr.attroff(curses.color_pair(1)) + + # Draw minute + if context['new_alarm_selected'] == 1: + stdscr.attron(curses.color_pair(2)) + else: + stdscr.attron(curses.color_pair(1)) + stdscr.addstr(time_y, time_x + len(time_label) + 3, minute_str) + if context['new_alarm_selected'] == 1: + stdscr.attroff(curses.color_pair(2)) + else: + stdscr.attroff(curses.color_pair(1)) + + # Draw weekdays + weekday_y = form_y + 6 + weekdays_label = "Repeat: " + weekday_x = width // 2 - (len(weekdays_label) + len(context['weekday_names']) * 4) // 2 + + # Draw label + stdscr.attron(curses.color_pair(1)) + stdscr.addstr(weekday_y, weekday_x, weekdays_label) + stdscr.attroff(curses.color_pair(1)) + + # Draw each weekday + for i, day in enumerate(context['weekday_names']): + x_pos = weekday_x + len(weekdays_label) + i * 4 + is_selected = context['new_alarm_selected'] == 3 and i == context.get('weekday_edit_pos', 0) + is_active = i in context['new_alarm_weekdays'] + + if is_selected: + stdscr.attron(curses.color_pair(2)) + elif is_active: + stdscr.attron(curses.color_pair(1) | curses.A_BOLD) + else: + stdscr.attron(curses.color_pair(1)) + + stdscr.addstr(weekday_y, x_pos, day) + + if is_selected: + stdscr.attroff(curses.color_pair(2)) + elif is_active: + stdscr.attroff(curses.color_pair(1) | curses.A_BOLD) + else: + stdscr.attroff(curses.color_pair(1)) + + # Date selection + date_y = form_y + 8 + + if context['new_alarm_weekdays']: + draw_field(date_y, "Date", "Repeating weekly", context['new_alarm_selected'] == 2) + else: + date_label = "Date: " + if context.get('new_alarm_date'): + date = context['new_alarm_date'] + date_edit_pos = context.get('date_edit_pos', 0) + + # Calculate center position + total_width = len(date_label) + len("YYYY-MM-DD") + date_x = width // 2 - total_width // 2 + + # Draw label + stdscr.attron(curses.color_pair(1)) + stdscr.addstr(date_y, date_x, date_label) + stdscr.attroff(curses.color_pair(1)) + + # Draw date components + year_str = f"{date.year}" + month_str = f"{date.month:02d}" + day_str = f"{date.day:02d}" + current_x = date_x + len(date_label) + + # Draw year + if context['new_alarm_selected'] == 2 and date_edit_pos == 0: + stdscr.attron(curses.color_pair(2)) + else: + stdscr.attron(curses.color_pair(1)) + stdscr.addstr(date_y, current_x, year_str) + if context['new_alarm_selected'] == 2 and date_edit_pos == 0: + stdscr.attroff(curses.color_pair(2)) + else: + stdscr.attroff(curses.color_pair(1)) + current_x += len(year_str) + + # Draw first dash + stdscr.attron(curses.color_pair(1)) + stdscr.addstr(date_y, current_x, "-") + stdscr.attroff(curses.color_pair(1)) + current_x += 1 + + # Draw month + if context['new_alarm_selected'] == 2 and date_edit_pos == 1: + stdscr.attron(curses.color_pair(2)) + else: + stdscr.attron(curses.color_pair(1)) + stdscr.addstr(date_y, current_x, month_str) + if context['new_alarm_selected'] == 2 and date_edit_pos == 1: + stdscr.attroff(curses.color_pair(2)) + else: + stdscr.attroff(curses.color_pair(1)) + current_x += len(month_str) + + # Draw second dash + stdscr.attron(curses.color_pair(1)) + stdscr.addstr(date_y, current_x, "-") + stdscr.attroff(curses.color_pair(1)) + current_x += 1 + + # Draw day + if context['new_alarm_selected'] == 2 and date_edit_pos == 2: + stdscr.attron(curses.color_pair(2)) + else: + stdscr.attron(curses.color_pair(1)) + stdscr.addstr(date_y, current_x, day_str) + if context['new_alarm_selected'] == 2 and date_edit_pos == 2: + stdscr.attroff(curses.color_pair(2)) + else: + stdscr.attroff(curses.color_pair(1)) + else: + draw_field(date_y, "Date", "No specific date", context['new_alarm_selected'] == 2) + + # Enabled/Disabled toggle with visual indicator + status_y = form_y + 10 + enabled_str = "● Enabled" if context['new_alarm_enabled'] else "○ Disabled" + draw_field(status_y, "Status", enabled_str, context['new_alarm_selected'] == 5, -2) + + # Instructions in green at the bottom + instructions = "j/k: Change h/l: Switch Space: Toggle Enter: Save Esc: Cancel" + stdscr.attron(curses.color_pair(1)) + stdscr.addstr(height - 2, width // 2 - len(instructions) // 2, instructions) + stdscr.attroff(curses.color_pair(1)) diff --git a/clock/ui/big_digits.py b/clock/ui/big_digits.py new file mode 100644 index 0000000..df5d452 --- /dev/null +++ b/clock/ui/big_digits.py @@ -0,0 +1,102 @@ +# Big digit patterns (15x7 size) +BIG_DIGITS = { + '0': [ + " █████████ ", + " ███████████ ", + "███ ████", + "███ ████", + "███ ████", + " ███████████ ", + " █████████ " + ], + '1': [ + " ████ ", + " ██████ ", + " ████ ", + " ████ ", + " ████ ", + " ████ ", + " █████████ " + ], + '2': [ + " ██████████ ", + "████████████", + " ████", + " ██████████ ", + "████ ", + "████████████", + " ██████████ " + ], + '3': [ + " ██████████ ", + "████████████", + " ████", + " ██████ ", + " ████", + "████████████", + " ██████████ " + ], + '4': [ + "███ ████", + "███ ████", + "███ ████", + "████████████", + " ████", + " ████", + " ████" + ], + '5': [ + "████████████", + "████████████", + "████ ", + "████████████", + " ████", + "████████████", + "████████████" + ], + '6': [ + " ██████████ ", + "████████████", + "████ ", + "████████████", + "████ ████", + "████████████", + " ██████████ " + ], + '7': [ + "████████████", + "████████████", + " ████ ", + " ████ ", + " ████ ", + "████ ", + "████ " + ], + '8': [ + " ██████████ ", + "████████████", + "████ ████", + " ██████████ ", + "████ ████", + "████████████", + " ██████████ " + ], + '9': [ + " ██████████ ", + "████████████", + "████ ████", + "████████████", + " ████", + "████████████", + " ██████████ " + ], + ':': [ + " ", + " ████ ", + " ████ ", + " ", + " ████ ", + " ████ ", + " " + ] +} diff --git a/clock/ui/input_handlers.py b/clock/ui/input_handlers.py new file mode 100644 index 0000000..67e5a01 --- /dev/null +++ b/clock/ui/input_handlers.py @@ -0,0 +1,202 @@ +import curses + +class InputHandling: + + def _handle_active_alarms_input(self, key): + """Handle input for active alarms view""" + if not self.active_alarms: + # No active alarms, return to clock view + self.current_view = 'CLOCK' + return + + # Get the first (or only) active alarm + alarm_id = list(self.active_alarms.keys())[0] + + if key == ord('s'): # Snooze + # Send snooze command to control queue + self.control_queue.put({ + 'type': 'snooze', + 'alarm_id': alarm_id + }) + # Remove from active alarms + del self.active_alarms[alarm_id] + + # Optional: show a snooze confirmation + self._show_error("Alarm Snoozed") + + elif key == ord('d'): # Dismiss + # Send dismiss command to control queue + self.control_queue.put({ + 'type': 'dismiss', + 'alarm_id': alarm_id + }) + # Remove from active alarms + del self.active_alarms[alarm_id] + + # Return to clock view + self.current_view = 'CLOCK' + + def _handle_clock_input(self, key): + """Handle input on the clock view""" + if key == ord('a'): + self.current_view = 'ADD_ALARM' + elif key == ord('s'): + self.current_view = 'LIST_ALARMS' + self.alarm_list = self.storage.get_saved_alerts() + + def _handle_add_alarm_input(self, key): + """Handle input for alarm creation""" + alarm = self.alarm_draft + + # Escape key handling + if key == 27: # ESC + if alarm['editing_name']: + # Cancel name editing + alarm['name'] = alarm['temp_name'] + alarm['editing_name'] = False + else: + # Return to clock view + self.current_view = 'CLOCK' + return + + # Enter key handling + if key == 10: # ENTER + if alarm['editing_name']: + # Finish name editing + alarm['editing_name'] = False + self.selected_item = 0 + else: + # Save alarm + try: + alarm_data = { + "name": alarm['name'], + "time": f"{alarm['hour']:02d}:{alarm['minute']:02d}:00", + "enabled": alarm['enabled'], + "repeat_rule": { + "type": "weekly" if alarm['weekdays'] else "once", + "days_of_week": [self.weekday_names[day].lower() for day in alarm['weekdays']], + "at": alarm['date'].strftime("%Y-%m-%d") if alarm['date'] and not alarm['weekdays'] else None + } + } + self.storage.save_new_alert(alarm_data) + self.current_view = 'CLOCK' + except Exception as e: + self._show_error(str(e)) + return + + # Date editing mode + if self.selected_item == 2: # Date field selected + if not hasattr(self, 'date_edit_pos'): + self.date_edit_pos = 2 # Default to day (2) + + if key == 32: # SPACE + if alarm['date'] is None: + alarm['date'] = datetime.now().date() + else: + alarm['date'] = None + + elif alarm['date'] is not None: + if key in [ord('h'), curses.KEY_LEFT]: + self.date_edit_pos = (self.date_edit_pos - 1) % 3 + elif key in [ord('l'), curses.KEY_RIGHT]: + self.date_edit_pos = (self.date_edit_pos + 1) % 3 + elif key in [ord('j'), curses.KEY_DOWN, ord('k'), curses.KEY_UP]: + is_up = key in [ord('k'), curses.KEY_UP] + delta = 1 if is_up else -1 + + try: + current_date = alarm['date'] + if self.date_edit_pos == 0: # Year + alarm['date'] = current_date.replace(year=max(current_date.year + delta, datetime.now().year)) + elif self.date_edit_pos == 1: # Month + new_month = max(1, min(12, current_date.month + delta)) + max_day = (datetime(current_date.year, new_month, 1) + timedelta(days=31)).replace(day=1) - timedelta(days=1) + alarm['date'] = current_date.replace(month=new_month, day=min(current_date.day, max_day.day)) + elif self.date_edit_pos == 2: # Day + max_day = (datetime(current_date.year, current_date.month, 1) + timedelta(days=31)).replace(day=1) - timedelta(days=1) + alarm['date'] = current_date.replace(day=max(1, min(max_day.day, current_date.day + delta))) + except ValueError as e: + self._show_error(str(e)) + return + + # Navigation and editing + if not alarm['editing_name']: + if key in [ord('h'), curses.KEY_LEFT]: + self.selected_item = (self.selected_item - 1) % 6 + self.date_edit_pos = 2 # Reset to day when moving away + elif key in [ord('l'), curses.KEY_RIGHT]: + self.selected_item = (self.selected_item + 1) % 6 + self.date_edit_pos = 2 # Reset to day when moving away + + # Up/Down for editing values + if key in [ord('k'), curses.KEY_UP, ord('j'), curses.KEY_DOWN]: + is_up = key in [ord('k'), curses.KEY_UP] + + if self.selected_item == 0: # Hour + alarm['hour'] = (alarm['hour'] + (1 if is_up else -1)) % 24 + elif self.selected_item == 1: # Minute + alarm['minute'] = (alarm['minute'] + (1 if is_up else -1)) % 60 + elif self.selected_item == 3: # Weekdays + # Move selection through weekdays + if 'current_weekday' not in alarm: + alarm['current_weekday'] = 0 + alarm['current_weekday'] = (alarm['current_weekday'] + (1 if is_up else -1)) % 7 + + # Space key for toggling/editing + if key == 32: # SPACE + if self.selected_item == 4: # Name + if not alarm['editing_name']: + alarm['editing_name'] = True + alarm['temp_name'] = alarm['name'] + alarm['name'] = '' + elif self.selected_item == 5: # Enabled + alarm['enabled'] = not alarm['enabled'] + elif self.selected_item == 3: # Weekdays + # Toggle current weekday + current_day = alarm.get('current_weekday', 0) + if current_day in alarm['weekdays']: + alarm['weekdays'].remove(current_day) + else: + alarm['weekdays'].append(current_day) + alarm['weekdays'].sort() + + # Clear date if weekdays are selected + if alarm['weekdays']: + alarm['date'] = None + + # Name editing + if alarm['editing_name']: + if key == curses.KEY_BACKSPACE or key == 127: + alarm['name'] = alarm['name'][:-1] + elif 32 <= key <= 126: # Printable ASCII + alarm['name'] += chr(key) + + def _handle_list_alarms_input(self, key): + """Handle input for alarm list view""" + total_items = len(self.alarm_list) + 1 # +1 for "Add new alarm" option + + if key == 27: # ESC + self.current_view = 'CLOCK' + elif key in [ord('j'), curses.KEY_DOWN]: + self.selected_item = (self.selected_item + 1) % total_items + elif key in [ord('k'), curses.KEY_UP]: + self.selected_item = (self.selected_item - 1) % total_items + elif key == ord('d'): + # Only delete if a real alarm is selected (not the "Add new" option) + if self.selected_item < len(self.alarm_list) and self.alarm_list: + try: + alarm_to_delete = self.alarm_list[self.selected_item] + self.storage.remove_saved_alert(alarm_to_delete['id']) + self.alarm_list = self.storage.get_saved_alerts() + # Adjust selected item if needed + if self.selected_item >= len(self.alarm_list): + self.selected_item = len(self.alarm_list) + except Exception as e: + self._show_error(f"Failed to delete alarm: {e}") + elif key in [ord('a'), 10]: # 'a' or Enter + if self.selected_item == len(self.alarm_list): + # "Add new alarm" option selected + self.current_view = 'ADD_ALARM' + else: + # TODO: Implement alarm editing + self._show_error("Alarm editing not implemented yet") diff --git a/clock/ui/list_alarms.py b/clock/ui/list_alarms.py new file mode 100644 index 0000000..9157fd3 --- /dev/null +++ b/clock/ui/list_alarms.py @@ -0,0 +1,86 @@ +import curses +from datetime import datetime +from .utils import init_colors, draw_big_digit + +def draw_list_alarms(stdscr, context): + """Draw the list of alarms screen""" + init_colors() + height, width = stdscr.getmaxyx() + + # Get required data from context + alarms = context.get('alarms', []) + selected_index = context.get('selected_index', 0) + weekday_names = context.get('weekday_names', ['Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat', 'Sun']) + + # Calculate visible range for scrolling + max_visible_items = height - 8 # Leave space for header and footer + total_items = len(alarms) + 1 # +1 for "Add new alarm" option + + # Calculate scroll position + start_idx = max(0, min(selected_index - max_visible_items // 2, + total_items - max_visible_items)) + if start_idx < 0: + start_idx = 0 + end_idx = min(start_idx + max_visible_items, total_items) + + # Header + header_text = "Alarms" + stdscr.attron(curses.color_pair(1) | curses.A_BOLD) + stdscr.addstr(2, width // 2 - len(header_text) // 2, header_text) + stdscr.attroff(curses.color_pair(1) | curses.A_BOLD) + + # Draw alarms + for i in range(start_idx, end_idx): + y_pos = 4 + (i - start_idx) + + if i == len(alarms): # "Add new alarm" option + display_str = "Add new alarm..." + else: + alarm = alarms[i] + # Format time + time_str = alarm.get('time', 'Unknown') + + # Format repeat info + repeat_info = "" + repeat_rule = alarm.get('repeat_rule', {}) + if repeat_rule: + if repeat_rule.get('type') == 'weekly': + days = repeat_rule.get('days', []) + repeat_info = f" (Every {', '.join(weekday_names[d] for d in days)})" + elif repeat_rule.get('type') == 'once' and repeat_rule.get('date'): + repeat_info = f" (On {repeat_rule['date']})" + + # Status indicator (in green) + status = "✓" if alarm.get('enabled', True) else "✗" + display_str = f"{status} {time_str} {alarm.get('name', 'Unnamed')}{repeat_info}" + + # Truncate if too long (leaving space for selection brackets) + max_length = width - 6 + if len(display_str) > max_length: + display_str = display_str[:max_length-3] + "..." + + # Center the item + x_pos = width // 2 - len(display_str) // 2 + + # Highlight selected item + if i == selected_index: + # Draw selection brackets in green + stdscr.attron(curses.color_pair(1)) + stdscr.addstr(y_pos, x_pos - 2, "[ ") + stdscr.addstr(y_pos, x_pos + len(display_str), " ]") + stdscr.attroff(curses.color_pair(1)) + # Draw text in yellow (highlighted) + stdscr.attron(curses.color_pair(2)) + stdscr.addstr(y_pos, x_pos, display_str) + stdscr.attroff(curses.color_pair(2)) + else: + # Draw normal items in green + stdscr.attron(curses.color_pair(1)) + stdscr.addstr(y_pos, x_pos, display_str) + stdscr.attroff(curses.color_pair(1)) + + # Instructions + instructions = "j/k: Move d: Delete a: Add/Edit Esc: Back" + stdscr.attron(curses.color_pair(1)) + stdscr.addstr(height - 2, width // 2 - len(instructions) // 2, instructions) + stdscr.attroff(curses.color_pair(1)) diff --git a/clock/ui/main_clock.py b/clock/ui/main_clock.py new file mode 100644 index 0000000..3fbbf0e --- /dev/null +++ b/clock/ui/main_clock.py @@ -0,0 +1,41 @@ +import curses +from datetime import datetime +from .utils import init_colors, draw_big_digit +from .big_digits import BIG_DIGITS +from .add_alarm import draw_add_alarm +from .active_alarm import draw_active_alarms +from .list_alarms import draw_list_alarms + + +def draw_main_clock(stdscr, context=None): + """Draw the main clock screen""" + init_colors() + height, width = stdscr.getmaxyx() + current_time = datetime.now() + + # Big time display + time_str = current_time.strftime("%H:%M:%S") + digit_width = 14 # Width of each digit pattern + total_width = digit_width * len(time_str) + start_x = (width - total_width) // 2 + start_y = (height - 7) // 2 - 4 + + # Green color for big time + stdscr.attron(curses.color_pair(1)) + for i, digit in enumerate(time_str): + draw_big_digit(stdscr, start_y, start_x + i * digit_width, digit) + stdscr.attroff(curses.color_pair(1)) + + # Date display + date_str = current_time.strftime("%Y-%m-%d") + date_x = width // 2 - len(date_str) // 2 + date_y = height // 2 + 4 + + stdscr.attron(curses.color_pair(2)) + stdscr.addstr(date_y, date_x, date_str) + stdscr.attroff(curses.color_pair(2)) + + # Menu options + menu_str = "A: Add Alarm S: List Alarms Q: Quit" + menu_x = width // 2 - len(menu_str) // 2 + stdscr.addstr(height - 2, menu_x, menu_str) diff --git a/clock/ui/ncurses_ui.py b/clock/ui/ncurses_ui.py new file mode 100644 index 0000000..1e5dc6f --- /dev/null +++ b/clock/ui/ncurses_ui.py @@ -0,0 +1,186 @@ +import curses +import time +from datetime import datetime, date, timedelta +import threading +import logging +import queue + +from .utils import draw_error, init_colors +from .active_alarm import draw_active_alarms +from .add_alarm import draw_add_alarm +from .list_alarms import draw_list_alarms +from .main_clock import draw_main_clock +from .input_handlers import InputHandling + +class UI(InputHandling): + def __init__(self, alarm_system_manager, control_queue): + # UI State Management + self.alarm_system = alarm_system_manager + self.stop_event = alarm_system_manager.stop_event + self.storage = alarm_system_manager.storage + + # Control queue for interacting with AlarmSiren + self.control_queue = control_queue + + # Logging + self.logger = logging.getLogger(__name__) + + # Active alarm tracking + self.active_alarms = {} + + # UI State + self.reset_ui_state() + + def reset_ui_state(self): + """Reset all UI state variables to their initial values""" + # Menu states + self.current_view = 'CLOCK' + self.selected_item = 0 + + # Alarm Creation State + self.alarm_draft = { + 'name': 'New Alarm', + 'hour': datetime.now().hour, + 'minute': datetime.now().minute, + 'enabled': True, + 'date': None, + 'weekdays': [], + 'editing_name': False, + 'temp_name': '' + } + + # Error handling + self.error_message = None + self.error_timestamp = None + + # Alarm list + self.alarm_list = [] + + # Weekday names (to match specification) + self.weekday_names = ['Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat', 'Sun'] + + # Clear active alarms + self.active_alarms.clear() + + def run(self): + """Start the ncurses UI in a separate thread""" + def ui_thread(): + try: + # Start a thread to monitor control queue + monitor_thread = threading.Thread(target=self._monitor_control_queue, daemon=True) + monitor_thread.start() + + curses.wrapper(self._main_loop) + except Exception as e: + self.logger.error(f"UI Thread Error: {e}") + finally: + self.stop_event.set() + + ui_thread_obj = threading.Thread(target=ui_thread, daemon=True) + ui_thread_obj.start() + return ui_thread_obj + + def _monitor_control_queue(self): + """Monitor the control queue for alarm events""" + while not self.stop_event.is_set(): + try: + # Non-blocking check of control queue + try: + control_msg = self.control_queue.get(timeout=1) + + # Handle different types of control messages + if control_msg['type'] == 'trigger': + # Store triggered alarm + alarm_id = control_msg['alarm_id'] + self.active_alarms[alarm_id] = control_msg['info'] + + # If not already in alarm view, switch to it + if self.current_view != 'ACTIVE_ALARMS': + self.current_view = 'ACTIVE_ALARMS' + + except queue.Empty: + pass + + time.sleep(0.1) + except Exception as e: + self.logger.error(f"Error monitoring control queue: {e}") + time.sleep(1) + + def _show_error(self, message, duration=30): + """Display an error message""" + self.error_message = message + self.error_timestamp = time.time() + + def _main_loop(self, stdscr): + """Main ncurses event loop""" + curses.curs_set(0) + stdscr.keypad(1) + stdscr.timeout(100) + + time.sleep(0.2) + stdscr.clear() + + while not self.stop_event.is_set(): + stdscr.erase() + + # Draw view based on current state + if self.current_view == 'CLOCK': + draw_main_clock(stdscr) + elif self.current_view == 'ADD_ALARM': + draw_add_alarm(stdscr, { + 'new_alarm_selected': self.selected_item, + 'new_alarm_name': self.alarm_draft['name'], + 'new_alarm_hour': self.alarm_draft['hour'], + 'new_alarm_minute': self.alarm_draft['minute'], + 'new_alarm_enabled': self.alarm_draft['enabled'], + 'new_alarm_date': self.alarm_draft['date'], + 'new_alarm_weekdays': self.alarm_draft['weekdays'], + 'weekday_names': self.weekday_names, + 'date_edit_pos': getattr(self, 'date_edit_pos', 2) + }) + elif self.current_view == 'LIST_ALARMS': + draw_list_alarms(stdscr, { + 'alarms': self.alarm_list or [], + 'weekday_names': self.weekday_names, + 'selected_index': self.selected_item + }) + elif self.current_view == 'ACTIVE_ALARMS': + # Draw active alarm view + draw_active_alarms(stdscr, {'active_alarms': self.active_alarms }) + + # Render error if exists + if self.error_message: + draw_error(stdscr, self.error_message) + self._clear_error_if_expired() + + stdscr.refresh() + + # Handle input + key = stdscr.getch() + if key != -1: + if key == ord('q'): + # Context-sensitive 'q' key handling + if self.current_view == 'CLOCK': + break # Exit the application only from clock view + else: + self.current_view = 'CLOCK' # Return to clock view from other views + continue + + # Context-specific input handling + if self.current_view == 'CLOCK': + self._handle_clock_input(key) + elif self.current_view == 'ADD_ALARM': + self._handle_add_alarm_input(key) + elif self.current_view == 'LIST_ALARMS': + self._handle_list_alarms_input(key) + elif self.current_view == 'ACTIVE_ALARMS': + self._handle_active_alarms_input(key) + + time.sleep(0.2) + + def _clear_error_if_expired(self): + """Clear error message if expired""" + if self.error_message and self.error_timestamp: + if time.time() - self.error_timestamp > 3: + self.error_message = None + self.error_timestamp = None diff --git a/clock/ui/utils.py b/clock/ui/utils.py new file mode 100644 index 0000000..252aeea --- /dev/null +++ b/clock/ui/utils.py @@ -0,0 +1,34 @@ +import curses +from .big_digits import BIG_DIGITS + + +def init_colors(): + """Initialize color pairs matching specification""" + curses.start_color() + curses.use_default_colors() + # Green text on black background (primary color) + curses.init_pair(1, curses.COLOR_GREEN, curses.COLOR_BLACK) + # Highlight color (yellow) + curses.init_pair(2, curses.COLOR_YELLOW, curses.COLOR_BLACK) + # Error color (red) + curses.init_pair(3, curses.COLOR_RED, curses.COLOR_BLACK) + +def draw_error(stdscr, error_message): + """Draw error message following specification""" + height, width = stdscr.getmaxyx() + + # Truncate message if too long + error_message = error_message[:width-4] + + error_x = max(0, width // 2 - len(error_message) // 2) + error_y = height - 4 # Show near bottom of screen + + stdscr.attron(curses.color_pair(3) | curses.A_BOLD) + stdscr.addstr(error_y, error_x, error_message) + stdscr.attroff(curses.color_pair(3) | curses.A_BOLD) + +def draw_big_digit(stdscr, y, x, digit): + """Draw a big digit using predefined patterns""" + patterns = BIG_DIGITS[digit] + for i, line in enumerate(patterns): + stdscr.addstr(y + i, x, line)