diff --git a/README.md b/README.md index 97aec2c..347026f 100644 --- a/README.md +++ b/README.md @@ -1,14 +1,16 @@ -# EEE PC Alarm Clock! +# EEE PC Alarm Clock! Runs on Debian 12. Requires Python 3 and mpg123. -To use, just run `main.py` from the `alert_api/` directory. +To use, just run `main.py` from the `clock/` directory. It includes: - A local ncurses UI. -- A simple HTTP API (for how to use it, check out `alert_api/tests.sh`). +- A simple HTTP API + - for how to use it, check out + `alert_clients/alarm_api_client/alarm_api_client.py`. Put your alarm sounds in `~/alarms/` as MP3 files. -License: MIT \ No newline at end of file +License: MIT diff --git a/alert_api/alarm_api.py b/alert_api/alarm_api.py deleted file mode 100644 index 8754e47..0000000 --- a/alert_api/alarm_api.py +++ /dev/null @@ -1,151 +0,0 @@ -from http.server import BaseHTTPRequestHandler, HTTPServer -import logging -import os -import json -import hashlib -from dataclasses import dataclass, field, asdict -from typing import List, Optional, Dict, Any -from datetime import datetime -from http import HTTPStatus -import re - -from alarm_storage import AlarmStorage -from data_classes import RepeatRule, Snooze, Metadata, Alarm -from logging_config import setup_logging - -# Set up logging configuration -logger = setup_logging() - -class AlertApi(BaseHTTPRequestHandler): - def __init__(self, *args, **kwargs): - self.storage = AlarmStorage("data/alerts.json") - self.logger = logger - super().__init__(*args, **kwargs) - - def _send_response(self, status_code: int, data: Any = None, error: str = None) -> None: - """Send a JSON response with the given status code and data/error""" - self.send_response(status_code) - self.send_header("Content-Type", "application/json") - self.send_header("Access-Control-Allow-Origin", "*") - self.end_headers() - - response = {} - if data is not None: - response["data"] = data - if error is not None: - response["error"] = error - - response_json = json.dumps(response) - self.logger.debug(f"Sending response: {response_json}") - self.wfile.write(response_json.encode("utf-8")) - - def _handle_request(self, method: str) -> None: - """Handle incoming requests with proper error handling""" - self.logger.info(f"Received {method} request from {self.client_address[0]}") - - try: - if method in ["POST", "PUT", "DELETE"]: - content_length = int(self.headers.get("Content-Length", 0)) - if content_length == 0: - raise ValueError("Missing request body") - - post_data = self.rfile.read(content_length) - self.logger.debug(f"Received {method} payload: {post_data.decode('utf-8')}") - - post_data = json.loads(post_data) - else: - post_data = None - - # Route request to appropriate handler - handler = getattr(self, f"_handle_{method.lower()}", None) - if handler: - self.logger.debug(f"Routing to handler: _handle_{method.lower()}") - handler(post_data) - else: - self.logger.warning(f"Method not allowed: {method}") - self._send_response(HTTPStatus.METHOD_NOT_ALLOWED, error="Method not allowed") - - except json.JSONDecodeError as e: - self.logger.error(f"JSON decode error: {str(e)}") - self._send_response(HTTPStatus.BAD_REQUEST, error="Invalid JSON in request body") - except ValueError as e: - self.logger.error(f"Validation error: {str(e)}") - self._send_response(HTTPStatus.BAD_REQUEST, error=str(e)) - except Exception as e: - self.logger.error(f"Unexpected error: {str(e)}", exc_info=True) - self._send_response(HTTPStatus.INTERNAL_SERVER_ERROR, error="Internal server error") - - def _handle_get(self, _) -> None: - """Handle GET request""" - self.logger.debug("Processing GET request for all alarms") - alarms = self.storage.get_saved_alerts() - self.logger.debug(f"Retrieved {len(alarms)} alarms") - self._send_response(HTTPStatus.OK, data=alarms) - - def _handle_post(self, data: dict) -> None: - """Handle POST request""" - self.logger.debug(f"Processing POST request with data: {json.dumps(data, indent=2)}") - try: - alarm_id = self.storage.save_new_alert(data) - self.logger.info(f"Successfully created new alarm with ID: {alarm_id}") - self._send_response(HTTPStatus.CREATED, data={"id": alarm_id}) - except ValueError as e: - self.logger.error(f"Failed to create alarm: {str(e)}") - self._send_response(HTTPStatus.BAD_REQUEST, error=str(e)) - - def _handle_put(self, data: dict) -> None: - """Handle PUT request""" - alarm_id = data.pop("id", None) - self.logger.debug(f"Processing PUT request for alarm ID {alarm_id} with data: {json.dumps(data, indent=2)}") - - if alarm_id is None: - self.logger.error("PUT request missing alarm ID") - self._send_response(HTTPStatus.BAD_REQUEST, error="Missing alarm ID") - return - - if self.storage.update_alert(alarm_id, data): - self.logger.info(f"Successfully updated alarm ID: {alarm_id}") - self._send_response(HTTPStatus.OK, data={"message": "Alarm updated successfully"}) - else: - self.logger.warning(f"Alarm not found for update: {alarm_id}") - self._send_response(HTTPStatus.NOT_FOUND, error="Alarm not found") - - def _handle_delete(self, data: dict) -> None: - """Handle DELETE request""" - alarm_id = data.get("id") - self.logger.debug(f"Processing DELETE request for alarm ID: {alarm_id}") - - if not isinstance(alarm_id, int): - self.logger.error(f"Invalid alarm ID format: {alarm_id}") - self._send_response(HTTPStatus.BAD_REQUEST, error="Invalid alarm ID") - return - - if self.storage.remove_saved_alert(alarm_id): - self.logger.info(f"Successfully deleted alarm ID: {alarm_id}") - self._send_response(HTTPStatus.OK, data={"message": "Alarm removed successfully"}) - else: - self.logger.warning(f"Alarm not found for deletion: {alarm_id}") - self._send_response(HTTPStatus.NOT_FOUND, error="Alarm not found") - - def do_GET(self): self._handle_request("GET") - def do_POST(self): self._handle_request("POST") - def do_PUT(self): self._handle_request("PUT") - def do_DELETE(self): self._handle_request("DELETE") - - -def run(server_class=HTTPServer, handler_class=AlertApi, port=8000): - logger.info(f"Starting AlertApi on port {port}") - - server_address = ("", port) - httpd = server_class(server_address, handler_class) - - try: - logger.info("Server is ready to handle requests") - httpd.serve_forever() - except KeyboardInterrupt: - logger.info("Received shutdown signal") - except Exception as e: - logger.error(f"Server error: {str(e)}", exc_info=True) - finally: - httpd.server_close() - logger.info("Server stopped") diff --git a/alert_api/alarm_siren.py b/alert_api/alarm_siren.py deleted file mode 100644 index 5dd6dcd..0000000 --- a/alert_api/alarm_siren.py +++ /dev/null @@ -1,260 +0,0 @@ -import os -import time -import threading -import subprocess -import queue -import logging -from datetime import datetime, timedelta -from typing import Optional, Dict, Any - -from logging_config import setup_logging - -# Set up logging -logger = setup_logging() - -class AlarmSiren: - def __init__(self): - # Communication queues - self.alarm_queue = queue.Queue() - self.control_queue = queue.Queue() - - # Tracking active alarms - self.active_alarms: Dict[int, Dict[str, Any]] = {} - - # Playback thread - self.playback_thread = threading.Thread(target=self._playback_worker, daemon=True) - self.playback_thread.start() - - def schedule_alarm(self, alarm_config: Dict[str, Any]): - """Schedule an alarm based on its configuration""" - logger.info(f"Scheduling alarm: {alarm_config}") - self.alarm_queue.put(alarm_config) - - def _calculate_next_alarm_time(self, alarm_config: Dict[str, Any]) -> Optional[datetime]: - """Calculate the next alarm trigger time based on repeat rule""" - now = datetime.now() - current_time = now.time() - - # Parse alarm time - alarm_time = datetime.strptime(alarm_config['time'], "%H:%M:%S").time() - - # Determine the next trigger - repeat_rule = alarm_config['repeat_rule'] - - # Determine repeat rule type and details - if isinstance(repeat_rule, dict): - repeat_type = repeat_rule.get('type') - repeat_days = repeat_rule.get('days_of_week', []) - repeat_at = repeat_rule.get('at') - else: - # Assume it's an object-like structure with attributes - repeat_type = getattr(repeat_rule, 'type', None) - repeat_days = getattr(repeat_rule, 'days_of_week', []) - repeat_at = getattr(repeat_rule, 'at', None) - - # Sanity check - if repeat_type is None: - logger.error(f"Invalid repeat rule configuration: {repeat_rule}") - return None - - if repeat_type == 'once': - # For one-time alarm, check the specific date - try: - # If 'at' is None, use current date - if repeat_at is None: - specific_date = now.date() - else: - specific_date = datetime.strptime(repeat_at, "%d.%m.%Y").date() - return datetime.combine(specific_date, alarm_time) - except ValueError: - logger.error("Invalid one-time alarm configuration") - return None - - elif repeat_type == 'daily': - # Daily alarm - trigger today or tomorrow - next_trigger = datetime.combine(now.date(), alarm_time) - if current_time < alarm_time: - return next_trigger - return next_trigger + timedelta(days=1) - - elif repeat_type == 'weekly': - # Weekly alarm - check configured days - today = now.strftime("%A").lower() - configured_days = [day.lower() for day in repeat_days] - - if today in configured_days and current_time < alarm_time: - return datetime.combine(now.date(), alarm_time) - - # Find next configured day - days_order = ['monday', 'tuesday', 'wednesday', 'thursday', 'friday', 'saturday', 'sunday'] - current_index = days_order.index(today) - - for offset in range(1, 8): - next_day_index = (current_index + offset) % 7 - next_day = days_order[next_day_index] - - if next_day in configured_days: - next_date = now.date() + timedelta(days=offset) - return datetime.combine(next_date, alarm_time) - - return None - - def _play_audio(self, file_path: str, volume: int = 100): - """Play audio file using mpg123 in the background.""" - try: - if not os.path.exists(file_path): - logger.error(f"Audio file not found: {file_path}") - return None - - # Construct mpg123 command with volume control - volume_adjust = f"-g {volume}" - cmd = ["mpg123", volume_adjust, file_path] - - logger.info(f"Playing alarm: {file_path}") - - # Run mpg123 in the background, suppressing stdout/stderr - process = subprocess.Popen(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) - return process - except Exception as e: - logger.error(f"Error playing audio: {e}") - return None - - def _playback_worker(self): - """Background thread for managing alarm playback""" - while True: - try: - # Check for new alarms to schedule - try: - new_alarm = self.alarm_queue.get(timeout=1) - alarm_time = self._calculate_next_alarm_time(new_alarm) - if alarm_time: - alarm_id = new_alarm.get('id', id(new_alarm)) - self.active_alarms[alarm_id] = { - 'config': new_alarm, - 'trigger_time': alarm_time, - 'snooze_count': 0, - 'process': None - } - except queue.Empty: - pass - - # Check for control signals (snooze/dismiss) - try: - control_msg = self.control_queue.get(timeout=0.1) - alarm_id = control_msg.get('alarm_id') - if control_msg['type'] == 'snooze': - self.snooze_alarm(alarm_id) - elif control_msg['type'] == 'dismiss': - self.dismiss_alarm(alarm_id) - except queue.Empty: - pass - - # Check for alarms to trigger - now = datetime.now() - for alarm_id, alarm_info in list(self.active_alarms.items()): - # Check if alarm is more than 1 hour late - if now > alarm_info['trigger_time'] + timedelta(hours=1): - logger.warning(f"Alarm {alarm_id} is over 1 hour late. Disabling.") - del self.active_alarms[alarm_id] - continue - if now >= alarm_info['trigger_time']: - # Trigger alarm if not already active - if alarm_info['process'] is None: - alarm_info['process'] = self._play_audio( - alarm_info['config']['file_to_play'], - alarm_info['config'].get('metadata', {}).get('volume', 100) - ) - logger.info(f"Alarm {alarm_id} triggered at {now}.") - - # Notify UI about the triggered alarm - self.control_queue.put({ - 'type': 'trigger', - 'alarm_id': alarm_id, - 'info': alarm_info - }) - - # Handle alarms that have been snoozed or dismissed - if alarm_info['process'] and alarm_info['process'].poll() is not None: - # Process has finished naturally - next_trigger = self._calculate_next_alarm_time(alarm_info['config']) - if next_trigger: - alarm_info['trigger_time'] = next_trigger - alarm_info['process'] = None - else: - logger.info(f"Removing non-repeating alarm {alarm_id}.") - del self.active_alarms[alarm_id] - - # Actively clean up zombie processes - for alarm_id, alarm_info in list(self.active_alarms.items()): - process = alarm_info.get('process') - if process and process.poll() is not None: - # Remove terminated processes - alarm_info['process'] = None - - time.sleep(0.5) # Prevent tight loop - except Exception as e: - logger.error(f"Error in playback worker: {e}") - time.sleep(1) - - def snooze_alarm(self, alarm_id: int): - """Snooze a specific alarm""" - if alarm_id not in self.active_alarms: - logger.warning(f"Cannot snooze alarm {alarm_id} - not found in active alarms") - return False - - alarm_info = self.active_alarms[alarm_id] - alarm_config = alarm_info['config'] - - # Default snooze configuration if not provided - snooze_config = alarm_config.get('snooze', { - 'enabled': True, - 'duration': 5, # Default 5 minutes - 'max_count': 3 # Default max 3 snoozes - }) - - if not snooze_config.get('enabled', True): - logger.warning(f"Snooze not enabled for alarm {alarm_id}") - return False - - # Check snooze count - if alarm_info.get('snooze_count', 0) >= snooze_config['max_count']: - logger.warning(f"Maximum snooze count reached for alarm {alarm_id}") - return False - - # Increment snooze count and set next trigger time - alarm_info['snooze_count'] = alarm_info.get('snooze_count', 0) + 1 - snooze_duration = snooze_config.get('duration', 5) - alarm_info['trigger_time'] = datetime.now() + timedelta(minutes=snooze_duration) - - # Stop any active playback - process = alarm_info.get('process') - if process: - process.terminate() - process.wait() - alarm_info['process'] = None - - logger.info(f"Snoozed alarm {alarm_id} for {snooze_duration} minutes.") - return True - -def dismiss_alarm(self, alarm_id: int): - """Dismiss a specific alarm.""" - if alarm_id not in self.active_alarms: - logger.warning(f"Cannot dismiss alarm {alarm_id} - not found in active alarms") - return False - - # Stop playback and terminate any running process - alarm_info = self.active_alarms.pop(alarm_id, None) - if alarm_info and alarm_info.get('process'): - try: - alarm_info['process'].terminate() - alarm_info['process'].wait(timeout=2) - except Exception as e: - logger.error(f"Error terminating alarm process: {e}") - # Force kill if terminate fails - try: - alarm_info['process'].kill() - except Exception: - pass - - logger.info(f"Dismissed alarm {alarm_id}.") - return True diff --git a/alert_api/alarm_storage.py b/alert_api/alarm_storage.py deleted file mode 100644 index 90dc2ba..0000000 --- a/alert_api/alarm_storage.py +++ /dev/null @@ -1,263 +0,0 @@ -import logging -import os -import json -import hashlib -from typing import List, Optional, Dict, Any -from datetime import datetime -import re -from dataclasses import asdict - -from data_classes import RepeatRule, Snooze, Metadata, Alarm - -# Set up logging configuration -logging.basicConfig( - level=logging.DEBUG, - format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', - handlers=[ - logging.StreamHandler(), # Console handler - logging.FileHandler('alert_api.log') # File handler - ] -) - -logger = logging.getLogger('AlertApi') - -class AlarmStorage: - def __init__(self, file_path: str, siren=None): - """Initialize AlarmStorage with optional Siren integration""" - self.file_path = file_path - self.siren = siren - self._ensure_storage_directory() - - def _ensure_storage_directory(self) -> None: - """Ensure the directory for the storage file exists""" - directory = os.path.dirname(os.path.abspath(self.file_path)) - if directory: - os.makedirs(directory, exist_ok=True) - - @staticmethod - def _generate_alarm_hash(alarm: dict) -> str: - """Generate an MD5 hash for the given alarm data""" - def to_dict(value): - return asdict(value) if hasattr(value, '__dataclass_fields__') else value - - alarm_data = alarm.copy() - alarm_data["repeat_rule"] = to_dict(alarm_data.get("repeat_rule")) - alarm_data["snooze"] = to_dict(alarm_data.get("snooze")) - alarm_data["metadata"] = to_dict(alarm_data.get("metadata")) - - hash_input = json.dumps({ - "name": alarm_data["name"], - "time": alarm_data["time"], - "file_to_play": alarm_data.get("file_to_play", os.path.expanduser("~/.alarms/alarm-lofi.mp3")), - "repeat_rule": alarm_data["repeat_rule"] - }, sort_keys=True) - return hashlib.md5(hash_input.encode('utf-8')).hexdigest() - - def load_data(self) -> dict: - """Load the JSON data from the file""" - try: - with open(self.file_path, 'r') as file: - data = json.load(file) - if not isinstance(data, dict): - raise ValueError("Invalid data format") - return data - except (FileNotFoundError, json.JSONDecodeError): - return {"last_id": 0, "alarms": []} - - def save_data(self, data: dict) -> None: - """Save the JSON data back to the file""" - def convert_to_dict(obj): - """Convert dataclass instances to dictionaries recursively""" - if isinstance(obj, list): - return [convert_to_dict(item) for item in obj] - elif isinstance(obj, dict): - return {key: convert_to_dict(value) for key, value in obj.items()} - elif hasattr(obj, "__dataclass_fields__"): - return asdict(obj) - else: - return obj - - # Convert data to JSON-serializable format - serializable_data = convert_to_dict(data) - - with open(self.file_path, 'w') as file: - json.dump(serializable_data, file, indent=4) - - def save_new_alert(self, alarm_data: dict) -> int: - """ - Add a new alarm to the storage and notify Siren if configured - """ - # Check for potential duplicates before saving - existing_alarms = self.get_saved_alerts() - for alarm in existing_alarms: - # Compare key characteristics - if (alarm['name'] == alarm_data['name'] and - alarm['time'] == alarm_data['time'] and - alarm['repeat_rule'] == alarm_data['repeat_rule']): - raise ValueError("Duplicate alarm already exists") - - - # Set default file_to_play if not provided - alarm_data['file_to_play'] = alarm_data.get('file_to_play', os.path.expanduser("~/.alarms/alarm-lofi.mp3")) - - # Convert nested dictionaries to dataclass instances - try: - if 'repeat_rule' in alarm_data: - alarm_data['repeat_rule'] = RepeatRule(**alarm_data['repeat_rule']) - if 'snooze' in alarm_data: - alarm_data['snooze'] = Snooze(**alarm_data['snooze']) - if 'metadata' in alarm_data: - alarm_data['metadata'] = Metadata(**alarm_data['metadata']) - except TypeError as e: - raise ValueError(f"Invalid format in one of the nested fields: {str(e)}") - - # Validate input - try: - alarm = Alarm(**alarm_data) - if not alarm.validate(): - raise ValueError("Invalid alarm configuration") - except (TypeError, ValueError) as e: - raise ValueError(f"Invalid alarm data: {str(e)}") - - # Load existing data - data = self.load_data() - - # Increment last_id and assign to new alarm - data['last_id'] += 1 - alarm_data['id'] = data['last_id'] - - # Generate hash for the alarm - alarm_data['hash'] = self._generate_alarm_hash(alarm_data) - - # Check for duplicates - if any(a.get("hash") == alarm_data['hash'] for a in data["alarms"]): - raise ValueError("Duplicate alarm detected") - - # Add new alarm to the list - data['alarms'].append(alarm_data) - - # Save updated data - self.save_data(data) - - # Notify Siren if available - if self.siren and alarm_data.get('enabled', True): - try: - self.siren.schedule_alarm(alarm_data) - logger.info(f"Siren notified of new alarm: {alarm_data['id']}") - except Exception as e: - logger.error(f"Failed to notify Siren about new alarm: {e}") - - return data['last_id'] - - def get_saved_alerts(self) -> list: - """Retrieve the list of saved alarms""" - return self.load_data().get("alarms", []) - - def remove_saved_alert(self, alarm_id: int) -> bool: - """Remove an alarm by its ID""" - data = self.load_data() - original_length = len(data["alarms"]) - - # Filter out the alarm with the given ID - data["alarms"] = [a for a in data["alarms"] if a["id"] != alarm_id] - - if len(data["alarms"]) == original_length: - return False - - # Save updated data - self.save_data(data) - - # Notify Siren if available - if self.siren: - try: - self.siren.dismiss_alarm(alarm_id) - logger.info(f"Siren dismissed alarm: {alarm_id}") - except Exception as e: - logger.error(f"Failed to notify Siren about removed alarm: {e}") - - return True - - def update_alert(self, alarm_id: int, alarm_data: dict) -> bool: - """Update an existing alarm""" - alarm_data["id"] = alarm_id - - # Set default file_to_play if not provided - alarm_data['file_to_play'] = alarm_data.get('file_to_play', os.path.expanduser("~/.alarms/alarm-lofi.mp3")) - - # Convert nested dictionaries to dataclass instances - try: - if 'repeat_rule' in alarm_data: - alarm_data['repeat_rule'] = RepeatRule(**alarm_data['repeat_rule']) - if 'snooze' in alarm_data: - alarm_data['snooze'] = Snooze(**alarm_data['snooze']) - if 'metadata' in alarm_data: - alarm_data['metadata'] = Metadata(**alarm_data['metadata']) - except TypeError as e: - raise ValueError(f"Invalid format in one of the nested fields: {str(e)}") - - # Validate input - try: - alarm = Alarm(**alarm_data) - if not alarm.validate(): - raise ValueError("Invalid alarm configuration") - except (TypeError, ValueError) as e: - raise ValueError(f"Invalid alarm data: {str(e)}") - - # Load existing data - data = self.load_data() - - # Find and update the alarm - for i, existing_alarm in enumerate(data["alarms"]): - if existing_alarm["id"] == alarm_id: - # Generate new hash - alarm_data["hash"] = self._generate_alarm_hash(alarm_data) - - # Replace the existing alarm - data["alarms"][i] = alarm_data - - # Save updated data - self.save_data(data) - - # Notify Siren if available - if self.siren: - try: - # Remove existing alarm and reschedule - self.siren.dismiss_alarm(alarm_id) - - if alarm_data.get('enabled', True): - self.siren.schedule_alarm(alarm_data) - logger.info(f"Siren updated for alarm: {alarm_id}") - else: - logger.info(f"Siren dismissed disabled alarm: {alarm_id}") - except Exception as e: - logger.error(f"Failed to notify Siren about updated alarm: {e}") - - return True - - # Alarm not found - return False - - def _get_alarm_by_id(self, alarm_id: int) -> Optional[dict]: - """ - Retrieve a specific alarm by its ID - - Args: - alarm_id (int): ID of the alarm to retrieve - - Returns: - dict or None: Alarm configuration or None if not found - """ - data = self.load_data() - for alarm in data.get('alarms', []): - if alarm.get('id') == alarm_id: - return alarm - return None - - def set_siren(self, siren): - """ - Set or update the Siren instance for alarm synchronization - - Args: - siren (AlarmSiren): Siren instance to use for alarm management - """ - self.siren = siren diff --git a/alert_api/big_digits.py b/alert_api/big_digits.py deleted file mode 100644 index df5d452..0000000 --- a/alert_api/big_digits.py +++ /dev/null @@ -1,102 +0,0 @@ -# Big digit patterns (15x7 size) -BIG_DIGITS = { - '0': [ - " █████████ ", - " ███████████ ", - "███ ████", - "███ ████", - "███ ████", - " ███████████ ", - " █████████ " - ], - '1': [ - " ████ ", - " ██████ ", - " ████ ", - " ████ ", - " ████ ", - " ████ ", - " █████████ " - ], - '2': [ - " ██████████ ", - "████████████", - " ████", - " ██████████ ", - "████ ", - "████████████", - " ██████████ " - ], - '3': [ - " ██████████ ", - "████████████", - " ████", - " ██████ ", - " ████", - "████████████", - " ██████████ " - ], - '4': [ - "███ ████", - "███ ████", - "███ ████", - "████████████", - " ████", - " ████", - " ████" - ], - '5': [ - "████████████", - "████████████", - "████ ", - "████████████", - " ████", - "████████████", - "████████████" - ], - '6': [ - " ██████████ ", - "████████████", - "████ ", - "████████████", - "████ ████", - "████████████", - " ██████████ " - ], - '7': [ - "████████████", - "████████████", - " ████ ", - " ████ ", - " ████ ", - "████ ", - "████ " - ], - '8': [ - " ██████████ ", - "████████████", - "████ ████", - " ██████████ ", - "████ ████", - "████████████", - " ██████████ " - ], - '9': [ - " ██████████ ", - "████████████", - "████ ████", - "████████████", - " ████", - "████████████", - " ██████████ " - ], - ':': [ - " ", - " ████ ", - " ████ ", - " ", - " ████ ", - " ████ ", - " " - ] -} diff --git a/alert_api/data_classes.py b/alert_api/data_classes.py deleted file mode 100644 index cb4ebea..0000000 --- a/alert_api/data_classes.py +++ /dev/null @@ -1,170 +0,0 @@ -import hashlib -from dataclasses import dataclass, field, asdict -from typing import List, Optional, Dict, Any -from datetime import datetime -import os -import re -from logging_config import setup_logging - -# Set up logging -logger = setup_logging() - -@dataclass -class RepeatRule: - type: str # "daily" or "weekly" or "once" - days_of_week: Optional[List[str]] = field(default_factory=list) - at: Optional[str] = None - - def validate(self) -> bool: - """Validate repeat rule configuration""" - valid_types = {"daily", "weekly", "once"} - valid_days = {"monday", "tuesday", "wednesday", "thursday", "friday", "saturday", "sunday"} - - if self.type not in valid_types: - logger.error(f"Invalid RepeatRule type: '{self.type}'. Must be one of {valid_types}") - return False - - if self.type == "weekly": - invalid_days = [day for day in self.days_of_week if day.lower() not in valid_days] - if invalid_days: - logger.error(f"Invalid RepeatRule weekday names: {invalid_days}. Must be one of {valid_days}") - return False - if not self.days_of_week: - logger.error("Weekly repeat rule must specify at least one day") - return False - - if self.type == "once" and self.days_of_week: - if self.days_of_week: - logger.error("One-time alert does not support days_of_week") - return False - if not self.at: - logger.error("One-time repeat rule must specify a valid 'at' date") - return False - try: - datetime.strptime(self.at, "%d.%m.%Y") - except ValueError: - logger.error(f"Invalid 'at' date format: '{self.at}'. Expected format: 'dd.mm.yyyy'") - return False - - logger.debug(f"RepeatRule validation passed: {self.__dict__}") - return True - - -@dataclass -class Snooze: - enabled: bool = True - duration: int = 10 # minutes - max_count: int = 3 - - def validate(self) -> bool: - """Validate snooze configuration""" - if not isinstance(self.enabled, bool): - logger.error(f"Snooze enabled must be boolean, got {type(self.enabled)}") - return False - - if not isinstance(self.duration, int): - logger.error(f"Snooze duration must be integer, got {type(self.duration)}") - return False - - if self.duration <= 0: - logger.error(f"Snooze duration must be positive, got {self.duration}") - return False - - if not isinstance(self.max_count, int): - logger.error(f"Snooze max_count must be integer, got {type(self.max_count)}") - return False - - if self.max_count <= 0: - logger.error(f"Snooze max_count must be positive, got {self.max_count}") - return False - - logger.debug(f"Snooze validation passed: {self.__dict__}") - return True - - -@dataclass -class Metadata: - volume: int = 100 - notes: str = "" - md5sum: str = "" - - def validate(self) -> bool: - """Validate metadata configuration""" - if not isinstance(self.volume, int): - logger.error(f"Volume must be integer, got {type(self.volume)}") - return False - - if not 0 <= self.volume <= 100: - logger.error(f"Volume must be between 0 and 100, got {self.volume}") - return False - - logger.debug(f"Metadata validation passed: {self.__dict__}") - return True - - -@dataclass -class Alarm: - name: str - time: str # Format: "HH:MM:SS" - repeat_rule: RepeatRule - file_to_play: str = field(default=os.path.expanduser("~/.alarms/alarm-lofi.mp3")) - enabled: bool = field(default=True) - snooze: Snooze = field(default_factory=Snooze) - metadata: Metadata = field(default_factory=Metadata) - id: Optional[int] = field(default=None) - - def validate(self) -> bool: - """Validate complete alarm configuration""" - logger.debug(f"Starting validation for alarm: {self.name}") - - # Validate name - if not isinstance(self.name, str) or len(self.name.strip()) == 0: - logger.error(f"Invalid alarm name: '{self.name}'. Must be non-empty string") - return False - - # Validate time format - time_pattern = re.compile(r'^([0-1]?[0-9]|2[0-3]):[0-5][0-9]:[0-5][0-9]$') - if not time_pattern.match(self.time): - logger.error(f"Invalid time format: '{self.time}'. Must match HH:MM:SS") - return False - - # Validate enabled flag - if not isinstance(self.enabled, bool): - logger.error(f"Enabled must be boolean, got {type(self.enabled)}") - return False - - # Create default alarms directory if it doesn't exist - default_alarms_dir = os.path.expanduser("~/.alarms") - os.makedirs(default_alarms_dir, exist_ok=True) - - # Validate audio file - if self.file_to_play != os.path.expanduser("~/.alarms/alarm-lofi.mp3") and not os.path.exists(self.file_to_play): - logger.error(f"Audio file not found: '{self.file_to_play}'") - return False - - # Validate repeat rule - if not isinstance(self.repeat_rule, RepeatRule): - logger.error(f"Invalid repeat_rule type: {type(self.repeat_rule)}") - return False - if not self.repeat_rule.validate(): - logger.error("Repeat rule validation failed") - return False - - # Validate snooze - if not isinstance(self.snooze, Snooze): - logger.error(f"Invalid snooze type: {type(self.snooze)}") - return False - if not self.snooze.validate(): - logger.error("Snooze validation failed") - return False - - # Validate metadata - if not isinstance(self.metadata, Metadata): - logger.error(f"Invalid metadata type: {type(self.metadata)}") - return False - if not self.metadata.validate(): - logger.error("Metadata validation failed") - return False - - logger.debug(f"Alarm validation passed: {self.name}") - return True diff --git a/alert_api/logging_config.py b/alert_api/logging_config.py deleted file mode 100644 index 044ba8a..0000000 --- a/alert_api/logging_config.py +++ /dev/null @@ -1,16 +0,0 @@ -import logging -import os - -def setup_logging(log_dir="logs", log_file="alarm_system.log", level=logging.DEBUG): - os.makedirs(log_dir, exist_ok=True) - log_path = os.path.join(log_dir, log_file) - - logging.basicConfig( - level=level, - format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', - handlers=[ - logging.FileHandler(log_path, mode='a', encoding='utf-8') - ] - ) - logger = logging.getLogger("AlarmSystem") - return logger diff --git a/alert_api/main.py b/alert_api/main.py deleted file mode 100755 index 5c193f5..0000000 --- a/alert_api/main.py +++ /dev/null @@ -1,189 +0,0 @@ -#!/usr/bin/python3 - -import os -import sys -import signal -import threading -import logging -from http.server import HTTPServer -from multiprocessing import Queue -import subprocess - -# Import our custom modules -from alarm_api import AlertApi, run as run_api -from alarm_storage import AlarmStorage -from alarm_siren import AlarmSiren -from ncurses_ui import UI -from logging_config import setup_logging - -# Set up logging -logger = setup_logging() - -class AlarmSystemManager: - def __init__(self, - api_port: int = 8000, - storage_path: str = "data/alerts.json", - log_level: int = logging.DEBUG): - """ - Initialize and manage the entire alarm system - - Args: - api_port (int): Port for the HTTP API server - storage_path (str): Path to the JSON storage file - log_level (int): Logging level - """ - # Configure logging - logging.getLogger().setLevel(log_level) - - # Ensure storage directory exists - os.makedirs(os.path.dirname(os.path.abspath(storage_path)), exist_ok=True) - - # Initialize components - self.siren = AlarmSiren() - self.storage = AlarmStorage(storage_path, siren=self.siren) - - # API server setup - self.api_port = api_port - self.api_server = None - self.api_thread = None - - # Synchronization and lifecycle management - self.stop_event = threading.Event() - - # Signal handling setup - self._setup_signal_handlers() - - # Alarm synchronization - self._sync_alarms() - - # UI.. - self.ui = UI(self, control_queue=self.siren.control_queue) - - def _setup_signal_handlers(self): - """Set up signal handlers for graceful shutdown""" - signal.signal(signal.SIGINT, self._handle_shutdown) - signal.signal(signal.SIGTERM, self._handle_shutdown) - - def _handle_shutdown(self, signum, frame): - """ - Handle system shutdown signals - - Args: - signum (int): Signal number - frame (frame): Current stack frame - """ - logger.info(f"Received shutdown signal {signum}. Initiating graceful shutdown...") - self.stop_event.set() - - def _sync_alarms(self): - """ - Synchronize stored alarms with the siren - This ensures any saved alarms are loaded and scheduled - """ - try: - saved_alarms = self.storage.get_saved_alerts() - logger.info(f"Synchronizing {len(saved_alarms)} saved alarms") - - for alarm in saved_alarms: - if alarm.get('enabled', True): - self.siren.schedule_alarm(alarm) - except Exception as e: - logger.error(f"Error synchronizing alarms: {e}") - - def _start_api_server(self): - """ - Start the HTTP API server in a separate thread - """ - try: - def run_server(): - server_address = ("", self.api_port) - self.api_server = HTTPServer(server_address, AlertApi) - logger.info(f"API Server started on port {self.api_port}") - - # Run until stopped - while not self.stop_event.is_set(): - self.api_server.handle_request() - - self.api_server.server_close() - logger.info("API Server stopped") - - # Start API server in a thread - self.api_thread = threading.Thread(target=run_server, daemon=True) - self.api_thread.start() - except Exception as e: - logger.error(f"Failed to start API server: {e}") - self.stop_event.set() - - def run(self): - """ - Main run method to start all system components - """ - try: - # Start API server - self._start_api_server() - - # Start UI - ui_thread = self.ui.run() - - # Log system startup - logger.info("Alarm System started successfully") - - # Wait for shutdown signal - while not self.stop_event.is_set(): - self.stop_event.wait(timeout=1) - - # Graceful shutdown - logger.info("Initiating system shutdown...") - except Exception as e: - logger.error(f"System startup error: {e}") - finally: - self._cleanup() - - def _cleanup(self): - """ - Perform cleanup operations during shutdown - """ - # Close API server if running - if self.api_server: - try: - self.api_server.server_close() - except Exception as e: - logger.error(f"Error closing API server: {e}") - - # Stop threads - if self.api_thread and self.api_thread.is_alive(): - self.api_thread.join(timeout=2) - - # Kill any remaining mpg123 processes - try: - subprocess.run(['pkill', 'mpg123'], check=False) - except Exception as e: - logger.error(f"Error killing mpg123 processes: {e}") - - logger.info("Alarm System shutdown complete") - -def main(): - """ - Entry point for the Alarm System - Parse command-line arguments and start the system - """ - # Default configurations - default_port = 8000 - default_storage_path = "data/alerts.json" - - # Parse port from command line if provided - port = int(sys.argv[1]) if len(sys.argv) > 1 else default_port - - # Create and run the Alarm System - try: - alarm_system = AlarmSystemManager( - api_port=port, - storage_path=default_storage_path - ) - alarm_system.run() - except Exception as e: - logger.error(f"Fatal error starting Alarm System: {e}", exc_info=True) - sys.exit(1) - -if __name__ == "__main__": - main() diff --git a/alert_api/ncurses_ui.py b/alert_api/ncurses_ui.py deleted file mode 100644 index 9252c4b..0000000 --- a/alert_api/ncurses_ui.py +++ /dev/null @@ -1,332 +0,0 @@ -import curses -import time -from datetime import datetime, date, timedelta -import threading -import logging -import queue - -# Import drawing methods from the new module -from ncurses_ui_draw import ( - _draw_main_clock, - _draw_add_alarm, - _draw_list_alarms, - _draw_active_alarms, - _draw_error -) - -class UI: - def __init__(self, alarm_system_manager, control_queue): - # UI State Management - self.alarm_system = alarm_system_manager - self.stop_event = alarm_system_manager.stop_event - self.storage = alarm_system_manager.storage - - # Control queue for interacting with AlarmSiren - self.control_queue = control_queue - - # Logging - self.logger = logging.getLogger(__name__) - - # Active alarm tracking - self.active_alarms = {} - - # UI State - self.reset_ui_state() - - def reset_ui_state(self): - """Reset all UI state variables to their initial values""" - # Menu states - self.current_view = 'CLOCK' - self.selected_item = 0 - - # Alarm Creation State - self.alarm_draft = { - 'name': 'New Alarm', - 'hour': datetime.now().hour, - 'minute': datetime.now().minute, - 'enabled': True, - 'date': None, - 'weekdays': [], - 'editing_name': False, - 'temp_name': '' - } - - # Error handling - self.error_message = None - self.error_timestamp = None - - # Alarm list - self.alarm_list = [] - - # Weekday names (to match specification) - self.weekday_names = ['Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat', 'Sun'] - - # Clear active alarms - self.active_alarms.clear() - - def run(self): - """Start the ncurses UI in a separate thread""" - def ui_thread(): - try: - # Start a thread to monitor control queue - monitor_thread = threading.Thread(target=self._monitor_control_queue, daemon=True) - monitor_thread.start() - - curses.wrapper(self._main_loop) - except Exception as e: - self.logger.error(f"UI Thread Error: {e}") - finally: - self.stop_event.set() - - ui_thread_obj = threading.Thread(target=ui_thread, daemon=True) - ui_thread_obj.start() - return ui_thread_obj - - def _monitor_control_queue(self): - """Monitor the control queue for alarm events""" - while not self.stop_event.is_set(): - try: - # Non-blocking check of control queue - try: - control_msg = self.control_queue.get(timeout=1) - - # Handle different types of control messages - if control_msg['type'] == 'trigger': - # Store triggered alarm - alarm_id = control_msg['alarm_id'] - self.active_alarms[alarm_id] = control_msg['info'] - - # If not already in alarm view, switch to it - if self.current_view != 'ACTIVE_ALARMS': - self.current_view = 'ACTIVE_ALARMS' - - except queue.Empty: - pass - - time.sleep(0.1) - except Exception as e: - self.logger.error(f"Error monitoring control queue: {e}") - time.sleep(1) - - def _handle_active_alarms_input(self, key): - """Handle input for active alarms view""" - if not self.active_alarms: - # No active alarms, return to clock view - self.current_view = 'CLOCK' - return - - # Get the first (or only) active alarm - alarm_id = list(self.active_alarms.keys())[0] - - if key == ord('s'): # Snooze - # Send snooze command to control queue - self.control_queue.put({ - 'type': 'snooze', - 'alarm_id': alarm_id - }) - # Remove from active alarms - del self.active_alarms[alarm_id] - - # Optional: show a snooze confirmation - self._show_error("Alarm Snoozed") - - elif key == ord('d'): # Dismiss - # Send dismiss command to control queue - self.control_queue.put({ - 'type': 'dismiss', - 'alarm_id': alarm_id - }) - # Remove from active alarms - del self.active_alarms[alarm_id] - - # Return to clock view - self.current_view = 'CLOCK' - - def _handle_clock_input(self, key): - """Handle input on the clock view""" - if key == ord('a'): - self.current_view = 'ADD_ALARM' - elif key == ord('s'): - self.current_view = 'LIST_ALARMS' - self.alarm_list = self.storage.get_saved_alerts() - - def _handle_add_alarm_input(self, key): - """Comprehensive input handling for alarm creation""" - alarm = self.alarm_draft - - # Escape key handling - if key == 27: # ESC - if alarm['editing_name']: - # Cancel name editing - alarm['name'] = alarm['temp_name'] - alarm['editing_name'] = False - else: - # Return to clock view - self.current_view = 'CLOCK' - return - - # Enter key handling - if key == 10: # ENTER - if alarm['editing_name']: - # Finish name editing - alarm['editing_name'] = False - self.selected_item = 0 - else: - # Save alarm - try: - alarm_data = { - "name": alarm['name'], - "time": f"{alarm['hour']:02d}:{alarm['minute']:02d}:00", - "enabled": alarm['enabled'], - "repeat_rule": { - "type": "weekly" if alarm['weekdays'] else "once", - "days_of_week": [self.weekday_names[day].lower() for day in alarm['weekdays']], - "at": alarm['date'].strftime("%d.%m.%Y") if alarm['date'] else None - } - } - self.storage.save_new_alert(alarm_data) - self.current_view = 'CLOCK' - except Exception as e: - self._show_error(str(e)) - return - - # Navigation and editing - if not alarm['editing_name']: - if key in [ord('h'), curses.KEY_LEFT]: - self.selected_item = (self.selected_item - 1) % 6 - elif key in [ord('l'), curses.KEY_RIGHT]: - self.selected_item = (self.selected_item + 1) % 6 - - # Up/Down for editing values - if key in [ord('k'), curses.KEY_UP, ord('j'), curses.KEY_DOWN]: - is_up = key in [ord('k'), curses.KEY_UP] - - if self.selected_item == 0: # Hour - alarm['hour'] = (alarm['hour'] + (1 if is_up else -1)) % 24 - elif self.selected_item == 1: # Minute - alarm['minute'] = (alarm['minute'] + (1 if is_up else -1)) % 60 - - # Space key for toggling/editing - if key == 32: # SPACE - if self.selected_item == 4: # Name - if not alarm['editing_name']: - alarm['editing_name'] = True - alarm['temp_name'] = alarm['name'] - alarm['name'] = '' - elif self.selected_item == 5: # Enabled - alarm['enabled'] = not alarm['enabled'] - - # Name editing - if alarm['editing_name']: - if key == curses.KEY_BACKSPACE or key == 127: - alarm['name'] = alarm['name'][:-1] - elif 32 <= key <= 126: # Printable ASCII - alarm['name'] += chr(key) - - def _handle_list_alarms_input(self, key): - """Handle input for alarm list view""" - total_items = len(self.alarm_list) + 1 # +1 for "Add new alarm" option - - if key == 27: # ESC - self.current_view = 'CLOCK' - elif key in [ord('j'), curses.KEY_DOWN]: - self.selected_item = (self.selected_item + 1) % total_items - elif key in [ord('k'), curses.KEY_UP]: - self.selected_item = (self.selected_item - 1) % total_items - elif key == ord('d'): - # Only delete if a real alarm is selected (not the "Add new" option) - if self.selected_item < len(self.alarm_list) and self.alarm_list: - try: - alarm_to_delete = self.alarm_list[self.selected_item] - self.storage.remove_saved_alert(alarm_to_delete['id']) - self.alarm_list = self.storage.get_saved_alerts() - # Adjust selected item if needed - if self.selected_item >= len(self.alarm_list): - self.selected_item = len(self.alarm_list) - except Exception as e: - self._show_error(f"Failed to delete alarm: {e}") - elif key in [ord('a'), 10]: # 'a' or Enter - if self.selected_item == len(self.alarm_list): - # "Add new alarm" option selected - self.current_view = 'ADD_ALARM' - else: - # TODO: Implement alarm editing - self._show_error("Alarm editing not implemented yet") - - def _show_error(self, message, duration=30): - """Display an error message""" - self.error_message = message - self.error_timestamp = time.time() - - def _main_loop(self, stdscr): - """Main ncurses event loop""" - curses.curs_set(0) - stdscr.keypad(1) - stdscr.timeout(100) - - time.sleep(0.2) - stdscr.clear() - - while not self.stop_event.is_set(): - stdscr.erase() - - # Draw view based on current state - if self.current_view == 'CLOCK': - _draw_main_clock(stdscr) - elif self.current_view == 'ADD_ALARM': - _draw_add_alarm(stdscr, { - 'new_alarm_selected': self.selected_item, - 'new_alarm_name': self.alarm_draft['name'], - 'new_alarm_hour': self.alarm_draft['hour'], - 'new_alarm_minute': self.alarm_draft['minute'], - 'new_alarm_enabled': self.alarm_draft['enabled'], - 'new_alarm_date': self.alarm_draft['date'], - 'new_alarm_weekdays': self.alarm_draft['weekdays'], - 'weekday_names': self.weekday_names - }) - elif self.current_view == 'LIST_ALARMS': - _draw_list_alarms(stdscr, { - 'alarms': self.alarm_list or [], - 'weekday_names': self.weekday_names, - 'selected_index': self.selected_item - }) - elif self.current_view == 'ACTIVE_ALARMS': - # Draw active alarm view - _draw_active_alarms(stdscr, {'active_alarms': self.active_alarms }) - - # Render error if exists - if self.error_message: - _draw_error(stdscr, self.error_message) - self._clear_error_if_expired() - - stdscr.refresh() - - # Handle input - key = stdscr.getch() - if key != -1: - if key == ord('q'): - # Context-sensitive 'q' key handling - if self.current_view == 'CLOCK': - break # Exit the application only from clock view - else: - self.current_view = 'CLOCK' # Return to clock view from other views - continue - - # Context-specific input handling - if self.current_view == 'CLOCK': - self._handle_clock_input(key) - elif self.current_view == 'ADD_ALARM': - self._handle_add_alarm_input(key) - elif self.current_view == 'LIST_ALARMS': - self._handle_list_alarms_input(key) - elif self.current_view == 'ACTIVE_ALARMS': - self._handle_active_alarms_input(key) - - time.sleep(0.2) - - def _clear_error_if_expired(self): - """Clear error message if expired""" - if self.error_message and self.error_timestamp: - if time.time() - self.error_timestamp > 3: - self.error_message = None - self.error_timestamp = None diff --git a/alert_api/ncurses_ui_draw.py b/alert_api/ncurses_ui_draw.py deleted file mode 100644 index ad742c6..0000000 --- a/alert_api/ncurses_ui_draw.py +++ /dev/null @@ -1,359 +0,0 @@ -import curses -from datetime import datetime -from big_digits import BIG_DIGITS - -def _init_colors(): - """Initialize color pairs matching specification""" - curses.start_color() - curses.use_default_colors() - # Green text on black background (primary color) - curses.init_pair(1, curses.COLOR_GREEN, curses.COLOR_BLACK) - # Highlight color (yellow) - curses.init_pair(2, curses.COLOR_YELLOW, curses.COLOR_BLACK) - # Error color (red) - curses.init_pair(3, curses.COLOR_RED, curses.COLOR_BLACK) - -def _draw_error(stdscr, error_message): - """Draw error message following specification""" - height, width = stdscr.getmaxyx() - - # Truncate message if too long - error_message = error_message[:width-4] - - error_x = max(0, width // 2 - len(error_message) // 2) - error_y = height - 4 # Show near bottom of screen - - stdscr.attron(curses.color_pair(3) | curses.A_BOLD) - stdscr.addstr(error_y, error_x, error_message) - stdscr.attroff(curses.color_pair(3) | curses.A_BOLD) - -def _draw_active_alarms(stdscr, context): - """Draw the active alarms""" - _init_colors() - height, width = stdscr.getmaxyx() - current_time = datetime.now() - - # Draw the main clock (original position) - time_str = current_time.strftime("%H:%M:%S") - digit_width = 14 - total_width = digit_width * len(time_str) - start_x = (width - total_width) // 2 - start_y = (height - 7) // 2 - 4 # Original position from _draw_main_clock - - # Draw blinking dot - if int(current_time.timestamp()) % 2 == 0: - stdscr.attron(curses.color_pair(1)) - stdscr.addstr(start_y - 1, start_x + total_width - 2, "•") - stdscr.attroff(curses.color_pair(1)) - - # Green color for big time - stdscr.attron(curses.color_pair(1)) - for i, digit in enumerate(time_str): - _draw_big_digit(stdscr, start_y, start_x + i * digit_width, digit) - stdscr.attroff(curses.color_pair(1)) - - # Draw date (as in main clock) - date_str = current_time.strftime("%Y-%m-%d") - date_x = width // 2 - len(date_str) // 2 - date_y = height // 2 + 4 - stdscr.attron(curses.color_pair(2)) - stdscr.addstr(date_y, date_x, date_str) - stdscr.attroff(curses.color_pair(2)) - - # Get active alarm info - active_alarms = context.get('active_alarms', {}) - if not active_alarms: - return - - # Get the first (or only) active alarm - alarm_id = list(active_alarms.keys())[0] - alarm_info = active_alarms[alarm_id] - alarm_config = alarm_info['config'] - - # Format alarm info - alarm_name = alarm_config.get('name', 'Unnamed Alarm') - alarm_time = alarm_config.get('time', 'Unknown Time') - #snooze_count = alarm_info.get('snooze_count', 0) - - # Draw alarm info under the clock - info_y = start_y + 8 # Position below the clock - #alarm_str = f"[ {alarm_name} - {alarm_time} - Snoozed: {snooze_count}x ]" - alarm_str = f"[ {alarm_name} - {alarm_time} ]" - alarm_x = max(0, width // 2 - len(alarm_str) // 2) - alarm_y = date_y - 2 # Just above the date - - # Center the alarm info - info_x = max(0, width // 2 - len(alarm_str) // 2) - - # Draw with green color - stdscr.attron(curses.color_pair(1)) - stdscr.addstr(alarm_y, alarm_x, alarm_str) - stdscr.attroff(curses.color_pair(1)) - - # Instructions - stdscr.addstr(height - 2, width // 2 - 15, "S: Snooze D: Dismiss") - -def _draw_big_digit(stdscr, y, x, digit): - """Draw a big digit using predefined patterns""" - patterns = BIG_DIGITS[digit] - for i, line in enumerate(patterns): - stdscr.addstr(y + i, x, line) - -def _draw_main_clock(stdscr, context=None): - """Draw the main clock screen""" - _init_colors() - height, width = stdscr.getmaxyx() - current_time = datetime.now() - - # Big time display - time_str = current_time.strftime("%H:%M:%S") - digit_width = 14 # Width of each digit pattern - total_width = digit_width * len(time_str) - start_x = (width - total_width) // 2 - start_y = (height - 7) // 2 - 4 - - # Green color for big time - stdscr.attron(curses.color_pair(1)) - for i, digit in enumerate(time_str): - _draw_big_digit(stdscr, start_y, start_x + i * digit_width, digit) - stdscr.attroff(curses.color_pair(1)) - - # Date display - date_str = current_time.strftime("%Y-%m-%d") - date_x = width // 2 - len(date_str) // 2 - date_y = height // 2 + 4 - - stdscr.attron(curses.color_pair(2)) - stdscr.addstr(date_y, date_x, date_str) - stdscr.attroff(curses.color_pair(2)) - - # Menu options - menu_str = "A: Add Alarm S: List Alarms Q: Quit" - menu_x = width // 2 - len(menu_str) // 2 - stdscr.addstr(height - 2, menu_x, menu_str) - -def _draw_add_alarm(stdscr, context): - """Draw the add alarm screen""" - # Ensure context is a dictionary with default values - if context is None: - context = {} - - # Provide default values with more explicit checks - context = { - 'new_alarm_selected': context.get('new_alarm_selected', 0), - 'new_alarm_name': context.get('new_alarm_name', 'New Alarm'), - 'new_alarm_hour': context.get('new_alarm_hour', datetime.now().hour), - 'new_alarm_minute': context.get('new_alarm_minute', datetime.now().minute), - 'new_alarm_enabled': context.get('new_alarm_enabled', True), - 'new_alarm_date': context.get('new_alarm_date') or None, - 'new_alarm_weekdays': context.get('new_alarm_weekdays', []) or [], - 'weekday_names': context.get('weekday_names', ['Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat', 'Sun']) - } - - _init_colors() - height, width = stdscr.getmaxyx() - - # Center the form vertically with good spacing - form_y = height // 2 - 6 - - # Title with green color and bold - title = "Add New Alarm" - stdscr.attron(curses.color_pair(1) | curses.A_BOLD) - stdscr.addstr(form_y, width // 2 - len(title) // 2, title) - stdscr.attroff(curses.color_pair(1) | curses.A_BOLD) - - # Helper function to draw a labeled field - def draw_field(y, label, value, is_selected, center_offset=0): - label_str = f"{label}: " - total_width = len(label_str) + len(str(value)) - x = width // 2 - total_width // 2 + center_offset - - # Draw label in green - stdscr.attron(curses.color_pair(1)) - stdscr.addstr(y, x, label_str) - stdscr.attroff(curses.color_pair(1)) - - # Draw value (highlighted if selected) - if is_selected: - stdscr.attron(curses.color_pair(2)) # Yellow for selected - stdscr.addstr(y, x + len(label_str), str(value)) - stdscr.attroff(curses.color_pair(2)) - else: - stdscr.attron(curses.color_pair(1)) # Green for normal - stdscr.addstr(y, x + len(label_str), str(value)) - stdscr.attroff(curses.color_pair(1)) - - # Name field with proper spacing - name_str = str(context['new_alarm_name']) - draw_field(form_y + 2, "Name", name_str, context['new_alarm_selected'] == 4) - - # Time selection with centered layout - time_y = form_y + 4 - time_label = "Time: " - hour_str = f"{int(context['new_alarm_hour']):02d}" - minute_str = f"{int(context['new_alarm_minute']):02d}" - - # Calculate center position for time - time_total_width = len(time_label) + 5 # 5 = HH:MM - time_x = width // 2 - time_total_width // 2 - - # Draw time label - stdscr.attron(curses.color_pair(1)) - stdscr.addstr(time_y, time_x, time_label) - stdscr.attroff(curses.color_pair(1)) - - # Draw hour - if context['new_alarm_selected'] == 0: - stdscr.attron(curses.color_pair(2)) - else: - stdscr.attron(curses.color_pair(1)) - stdscr.addstr(time_y, time_x + len(time_label), hour_str) - if context['new_alarm_selected'] == 0: - stdscr.attroff(curses.color_pair(2)) - else: - stdscr.attroff(curses.color_pair(1)) - - # Draw colon - stdscr.attron(curses.color_pair(1)) - stdscr.addstr(time_y, time_x + len(time_label) + 2, ":") - stdscr.attroff(curses.color_pair(1)) - - # Draw minute - if context['new_alarm_selected'] == 1: - stdscr.attron(curses.color_pair(2)) - else: - stdscr.attron(curses.color_pair(1)) - stdscr.addstr(time_y, time_x + len(time_label) + 3, minute_str) - if context['new_alarm_selected'] == 1: - stdscr.attroff(curses.color_pair(2)) - else: - stdscr.attroff(curses.color_pair(1)) - - # Date selection - if context['new_alarm_date'] and hasattr(context['new_alarm_date'], 'strftime'): - date_str = context['new_alarm_date'].strftime("%Y-%m-%d") - else: - date_str = 'No specific date' - draw_field(form_y + 6, "Date", date_str, context['new_alarm_selected'] == 2) - - # Weekday selection with improved visual style - weekday_y = form_y + 8 - weekday_label = "Repeat: " - weekdays_str = " ".join( - f"[{context['weekday_names'][i]}]" if i in context['new_alarm_weekdays'] - else f" {context['weekday_names'][i]} " - for i in range(len(context['weekday_names'])) - ) - - # Center weekdays - total_width = len(weekday_label) + len(weekdays_str) - weekday_x = width // 2 - total_width // 2 - - # Draw weekday selection - stdscr.attron(curses.color_pair(1)) - stdscr.addstr(weekday_y, weekday_x, weekday_label) - stdscr.attroff(curses.color_pair(1)) - - if context['new_alarm_selected'] == 3: - stdscr.attron(curses.color_pair(2)) - else: - stdscr.attron(curses.color_pair(1)) - stdscr.addstr(weekday_y, weekday_x + len(weekday_label), weekdays_str) - if context['new_alarm_selected'] == 3: - stdscr.attroff(curses.color_pair(2)) - else: - stdscr.attroff(curses.color_pair(1)) - - # Enabled/Disabled toggle with visual indicator - status_y = form_y + 10 - enabled_str = "● Enabled" if context['new_alarm_enabled'] else "○ Disabled" - draw_field(status_y, "Status", enabled_str, context['new_alarm_selected'] == 5, -2) - - # Instructions in green at the bottom - instructions = "j/k: Change h/l: Switch Space: Toggle Enter: Save Esc: Cancel" - stdscr.attron(curses.color_pair(1)) - stdscr.addstr(height - 2, width // 2 - len(instructions) // 2, instructions) - stdscr.attroff(curses.color_pair(1)) - -def _draw_list_alarms(stdscr, context): - """Draw the list of alarms screen""" - _init_colors() - height, width = stdscr.getmaxyx() - - # Get required data from context - alarms = context.get('alarms', []) - selected_index = context.get('selected_index', 0) - weekday_names = context.get('weekday_names', ['Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat', 'Sun']) - - # Calculate visible range for scrolling - max_visible_items = height - 8 # Leave space for header and footer - total_items = len(alarms) + 1 # +1 for "Add new alarm" option - - # Calculate scroll position - start_idx = max(0, min(selected_index - max_visible_items // 2, - total_items - max_visible_items)) - if start_idx < 0: - start_idx = 0 - end_idx = min(start_idx + max_visible_items, total_items) - - # Header - header_text = "Alarms" - stdscr.attron(curses.color_pair(1) | curses.A_BOLD) - stdscr.addstr(2, width // 2 - len(header_text) // 2, header_text) - stdscr.attroff(curses.color_pair(1) | curses.A_BOLD) - - # Draw alarms - for i in range(start_idx, end_idx): - y_pos = 4 + (i - start_idx) - - if i == len(alarms): # "Add new alarm" option - display_str = "Add new alarm..." - else: - alarm = alarms[i] - # Format time - time_str = alarm.get('time', 'Unknown') - - # Format repeat info - repeat_info = "" - repeat_rule = alarm.get('repeat_rule', {}) - if repeat_rule: - if repeat_rule.get('type') == 'weekly': - days = repeat_rule.get('days', []) - repeat_info = f" (Every {', '.join(weekday_names[d] for d in days)})" - elif repeat_rule.get('type') == 'once' and repeat_rule.get('date'): - repeat_info = f" (On {repeat_rule['date']})" - - # Status indicator (in green) - status = "✓" if alarm.get('enabled', True) else "✗" - display_str = f"{status} {time_str} {alarm.get('name', 'Unnamed')}{repeat_info}" - - # Truncate if too long (leaving space for selection brackets) - max_length = width - 6 - if len(display_str) > max_length: - display_str = display_str[:max_length-3] + "..." - - # Center the item - x_pos = width // 2 - len(display_str) // 2 - - # Highlight selected item - if i == selected_index: - # Draw selection brackets in green - stdscr.attron(curses.color_pair(1)) - stdscr.addstr(y_pos, x_pos - 2, "[ ") - stdscr.addstr(y_pos, x_pos + len(display_str), " ]") - stdscr.attroff(curses.color_pair(1)) - # Draw text in yellow (highlighted) - stdscr.attron(curses.color_pair(2)) - stdscr.addstr(y_pos, x_pos, display_str) - stdscr.attroff(curses.color_pair(2)) - else: - # Draw normal items in green - stdscr.attron(curses.color_pair(1)) - stdscr.addstr(y_pos, x_pos, display_str) - stdscr.attroff(curses.color_pair(1)) - - # Instructions - instructions = "j/k: Move d: Delete a: Add/Edit Esc: Back" - stdscr.attron(curses.color_pair(1)) - stdscr.addstr(height - 2, width // 2 - len(instructions) // 2, instructions) - stdscr.attroff(curses.color_pair(1)) diff --git a/alert_api/tests.sh b/alert_api/tests.sh deleted file mode 100644 index 36088c1..0000000 --- a/alert_api/tests.sh +++ /dev/null @@ -1,173 +0,0 @@ -#!/bin/bash - -# Configuration -API_URL="http://localhost:8000" -TEST_ALARM_NAME="Test Alarm" -TEST_ALARM_TIME="08:30:00" -TEST_AUDIO_FILE="/tmp/test.mp3" - -# Color codes for output -GREEN='\033[0;32m' -RED='\033[0;31m' -BLUE='\033[0;34m' -NC='\033[0m' # No Color - -# Helper function to print status messages -print_status() { - echo -e "${BLUE}=== $1 ===${NC}" -} - -# Helper function to check test results -check_result() { - if [ $? -eq 0 ]; then - echo -e "${GREEN}✓ $1${NC}" - else - echo -e "${RED}✗ $1${NC}" - exit 1 - fi -} - -# 1. Test server connectivity -print_status "Testing server connectivity" -curl -s "$API_URL" > /dev/null -check_result "Server connectivity check" - -# 2. Get initial state -print_status "Getting initial state" -INITIAL_STATE=$(curl -s -X GET "$API_URL") -echo "$INITIAL_STATE" | jq . -check_result "Retrieved initial state" - -# 3. Add a new alarm (POST) -print_status "Adding new alarm" -POST_DATA=$(jq -n \ - --arg name "$TEST_ALARM_NAME" \ - --arg time "$TEST_ALARM_TIME" \ - '{ - "name": $name, - "time": $time, - "repeat_rule": { - "type": "weekly", - "days_of_week": ["monday", "wednesday", "friday"] - }, - "enabled": true, - "snooze": { - "enabled": true, - "duration": 5, - "max_count": 3 - }, - "metadata": { - "volume": 75, - "notes": "Test alarm with full configuration" - } - }' -) - -ADD_RESPONSE=$(curl -s -X POST -H "Content-Type: application/json" -d "$POST_DATA" "$API_URL") -NEW_ALARM_ID=$(echo "$ADD_RESPONSE" | jq -r '.data.id') - -if [[ "$NEW_ALARM_ID" == "null" || -z "$NEW_ALARM_ID" ]]; then - echo -e "${RED}Failed to add alarm${NC}" - echo "Response: $ADD_RESPONSE" - exit 1 -fi - -echo -e "${GREEN}Added alarm with ID: $NEW_ALARM_ID${NC}" - -#j# 4. Test duplicate alarm detection -#jprint_status "Testing duplicate alarm detection" -#jDUPLICATE_RESPONSE=$(curl -s -X POST -H "Content-Type: application/json" -d "$POST_DATA" "$API_URL") -#jif [[ $(echo "$DUPLICATE_RESPONSE" | jq -r '.error') == *"Duplicate alarm detected"* ]]; then -#j echo -e "${GREEN}✓ Duplicate detection working${NC}" -#jelse -#j echo -e "${RED}✗ Duplicate detection failed${NC}" -#j echo "Response: $DUPLICATE_RESPONSE" -#jfi -#j -#j# 5. Update the alarm (PUT) -#jprint_status "Updating alarm" -#jUPDATE_DATA=$(jq -n \ -#j --arg name "$TEST_ALARM_NAME Updated" \ -#j --arg time "$TEST_ALARM_TIME" \ -#j --argjson id "$NEW_ALARM_ID" \ -#j '{ -#j "id": $id, -#j "name": $name, -#j "time": $time, -#j "repeat_rule": { -#j "type": "daily" -#j }, -#j "enabled": true, -#j "snooze": { -#j "enabled": false, -#j "duration": 10, -#j "max_count": 2 -#j }, -#j "metadata": { -#j "volume": 90, -#j "notes": "Updated test alarm" -#j } -#j }' -#j) -#j -#jUPDATE_RESPONSE=$(curl -s -X PUT -H "Content-Type: application/json" -d "$UPDATE_DATA" "$API_URL") -#jif [[ $(echo "$UPDATE_RESPONSE" | jq -r '.data.message') == "Alarm updated successfully" ]]; then -#j echo -e "${GREEN}✓ Alarm update successful${NC}" -#jelse -#j echo -e "${RED}✗ Alarm update failed${NC}" -#j echo "Response: $UPDATE_RESPONSE" -#jfi -#j -#j# 6. Verify the update -#jprint_status "Verifying update" -#jUPDATED_STATE=$(curl -s -X GET "$API_URL") -#jUPDATED_ALARM=$(echo "$UPDATED_STATE" | jq -r ".data[] | select(.id == $NEW_ALARM_ID)") -#jif [[ $(echo "$UPDATED_ALARM" | jq -r '.name') == "$TEST_ALARM_NAME Updated" ]]; then -#j echo -e "${GREEN}✓ Update verification successful${NC}" -#jelse -#j echo -e "${RED}✗ Update verification failed${NC}" -#j echo "Current alarm state: $UPDATED_ALARM" -#jfi -#j -#j# 7. Test invalid inputs -#jprint_status "Testing invalid inputs" -#j -#j# Test invalid time format -#jINVALID_TIME_DATA=$(echo "$POST_DATA" | jq '. + {"time": "25:00:00"}') -#jINVALID_TIME_RESPONSE=$(curl -s -X POST -H "Content-Type: application/json" -d "$INVALID_TIME_DATA" "$API_URL") -#jif [[ $(echo "$INVALID_TIME_RESPONSE" | jq -r '.error') == *"Invalid alarm configuration"* ]]; then -#j echo -e "${GREEN}✓ Invalid time format detection working${NC}" -#jelse -#j echo -e "${RED}✗ Invalid time format detection failed${NC}" -#jfi -#j -#j# Test invalid repeat rule -#jINVALID_REPEAT_DATA=$(echo "$POST_DATA" | jq '.repeat_rule.type = "monthly"') -#jINVALID_REPEAT_RESPONSE=$(curl -s -X POST -H "Content-Type: application/json" -d "$INVALID_REPEAT_DATA" "$API_URL") -#jif [[ $(echo "$INVALID_REPEAT_RESPONSE" | jq -r '.error') == *"Invalid alarm configuration"* ]]; then -#j echo -e "${GREEN}✓ Invalid repeat rule detection working${NC}" -#jelse -#j echo -e "${RED}✗ Invalid repeat rule detection failed${NC}" -#jfi -#j -#j# 8. Delete the test alarm -#jprint_status "Deleting test alarm" -#jDELETE_RESPONSE=$(curl -s -X DELETE -H "Content-Type: application/json" -d "{\"id\":$NEW_ALARM_ID}" "$API_URL") -#jif [[ $(echo "$DELETE_RESPONSE" | jq -r '.data.message') == "Alarm removed successfully" ]]; then -#j echo -e "${GREEN}✓ Alarm deletion successful${NC}" -#jelse -#j echo -e "${RED}✗ Alarm deletion failed${NC}" -#j echo "Response: $DELETE_RESPONSE" -#jfi -#j -#j# 9. Verify deletion -#jprint_status "Verifying deletion" -#jFINAL_STATE=$(curl -s -X GET "$API_URL") -#jif [[ $(echo "$FINAL_STATE" | jq ".data[] | select(.id == $NEW_ALARM_ID)") == "" ]]; then -#j echo -e "${GREEN}✓ Deletion verification successful${NC}" -#jelse -#j echo -e "${RED}✗ Deletion verification failed${NC}" -#j echo "Current state: $FINAL_STATE" -#jfi -#j -#jprint_status "Test suite completed!"