From 35d1e76ba066c8cb268b1b305316be1d510912ec Mon Sep 17 00:00:00 2001 From: Kalzu Rekku Date: Fri, 24 Jan 2025 19:28:14 +0200 Subject: [PATCH] Made alarm_api more modular and easier to extend. --- alert_api/alarm_api.py | 170 +++++++++++++ alert_api/alarm_siren.py | 218 ++++++++++++++++ alert_api/alarm_storage.py | 263 ++++++++++++++++++++ alert_api/alert_api.py | 491 ------------------------------------- alert_api/data_classes.py | 180 ++++++++++++++ alert_api/main.py | 183 ++++++++++++++ 6 files changed, 1014 insertions(+), 491 deletions(-) create mode 100644 alert_api/alarm_api.py create mode 100644 alert_api/alarm_siren.py create mode 100644 alert_api/alarm_storage.py delete mode 100755 alert_api/alert_api.py create mode 100644 alert_api/data_classes.py create mode 100755 alert_api/main.py diff --git a/alert_api/alarm_api.py b/alert_api/alarm_api.py new file mode 100644 index 0000000..a3b1a86 --- /dev/null +++ b/alert_api/alarm_api.py @@ -0,0 +1,170 @@ +from http.server import BaseHTTPRequestHandler, HTTPServer +import logging +import os +import json +import hashlib +from dataclasses import dataclass, field, asdict +from typing import List, Optional, Dict, Any +from datetime import datetime +from http import HTTPStatus +import re + +from alarm_storage import AlarmStorage +from data_classes import RepeatRule, Snooze, Metadata, Alarm + +# Set up logging configuration +logging.basicConfig( + level=logging.DEBUG, # Set to DEBUG to show all log levels + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', + handlers=[ + logging.StreamHandler(), # Console handler + logging.FileHandler('alert_api.log') # File handler + ] +) + +logger = logging.getLogger('AlertApi') + +class AlertApi(BaseHTTPRequestHandler): + def __init__(self, *args, **kwargs): + self.storage = AlarmStorage("data/alerts.json") + self.logger = logging.getLogger('AlertApi') + super().__init__(*args, **kwargs) + + def _send_response(self, status_code: int, data: Any = None, error: str = None) -> None: + """Send a JSON response with the given status code and data/error""" + self.send_response(status_code) + self.send_header("Content-Type", "application/json") + self.send_header("Access-Control-Allow-Origin", "*") + self.end_headers() + + response = {} + if data is not None: + response["data"] = data + if error is not None: + response["error"] = error + + response_json = json.dumps(response) + self.logger.debug(f"Sending response: {response_json}") + self.wfile.write(response_json.encode("utf-8")) + + def _handle_request(self, method: str) -> None: + """Handle incoming requests with proper error handling""" + self.logger.info(f"Received {method} request from {self.client_address[0]}") + + try: + if method in ["POST", "PUT", "DELETE"]: + content_length = int(self.headers.get("Content-Length", 0)) + if content_length == 0: + raise ValueError("Missing request body") + + post_data = self.rfile.read(content_length) + self.logger.debug(f"Received {method} payload: {post_data.decode('utf-8')}") + + post_data = json.loads(post_data) + else: + post_data = None + + # Route request to appropriate handler + handler = getattr(self, f"_handle_{method.lower()}", None) + if handler: + self.logger.debug(f"Routing to handler: _handle_{method.lower()}") + handler(post_data) + else: + self.logger.warning(f"Method not allowed: {method}") + self._send_response(HTTPStatus.METHOD_NOT_ALLOWED, error="Method not allowed") + + except json.JSONDecodeError as e: + self.logger.error(f"JSON decode error: {str(e)}") + self._send_response(HTTPStatus.BAD_REQUEST, error="Invalid JSON in request body") + except ValueError as e: + self.logger.error(f"Validation error: {str(e)}") + self._send_response(HTTPStatus.BAD_REQUEST, error=str(e)) + except Exception as e: + self.logger.error(f"Unexpected error: {str(e)}", exc_info=True) + self._send_response(HTTPStatus.INTERNAL_SERVER_ERROR, error="Internal server error") + + def _handle_get(self, _) -> None: + """Handle GET request""" + self.logger.debug("Processing GET request for all alarms") + alarms = self.storage.get_saved_alerts() + self.logger.debug(f"Retrieved {len(alarms)} alarms") + self._send_response(HTTPStatus.OK, data=alarms) + + def _handle_post(self, data: dict) -> None: + """Handle POST request""" + self.logger.debug(f"Processing POST request with data: {json.dumps(data, indent=2)}") + try: + alarm_id = self.storage.save_new_alert(data) + self.logger.info(f"Successfully created new alarm with ID: {alarm_id}") + self._send_response(HTTPStatus.CREATED, data={"id": alarm_id}) + except ValueError as e: + self.logger.error(f"Failed to create alarm: {str(e)}") + self._send_response(HTTPStatus.BAD_REQUEST, error=str(e)) + + def _handle_put(self, data: dict) -> None: + """Handle PUT request""" + alarm_id = data.pop("id", None) + self.logger.debug(f"Processing PUT request for alarm ID {alarm_id} with data: {json.dumps(data, indent=2)}") + + if alarm_id is None: + self.logger.error("PUT request missing alarm ID") + self._send_response(HTTPStatus.BAD_REQUEST, error="Missing alarm ID") + return + + if self.storage.update_alert(alarm_id, data): + self.logger.info(f"Successfully updated alarm ID: {alarm_id}") + self._send_response(HTTPStatus.OK, data={"message": "Alarm updated successfully"}) + else: + self.logger.warning(f"Alarm not found for update: {alarm_id}") + self._send_response(HTTPStatus.NOT_FOUND, error="Alarm not found") + + def _handle_delete(self, data: dict) -> None: + """Handle DELETE request""" + alarm_id = data.get("id") + self.logger.debug(f"Processing DELETE request for alarm ID: {alarm_id}") + + if not isinstance(alarm_id, int): + self.logger.error(f"Invalid alarm ID format: {alarm_id}") + self._send_response(HTTPStatus.BAD_REQUEST, error="Invalid alarm ID") + return + + if self.storage.remove_saved_alert(alarm_id): + self.logger.info(f"Successfully deleted alarm ID: {alarm_id}") + self._send_response(HTTPStatus.OK, data={"message": "Alarm removed successfully"}) + else: + self.logger.warning(f"Alarm not found for deletion: {alarm_id}") + self._send_response(HTTPStatus.NOT_FOUND, error="Alarm not found") + + def do_GET(self): self._handle_request("GET") + def do_POST(self): self._handle_request("POST") + def do_PUT(self): self._handle_request("PUT") + def do_DELETE(self): self._handle_request("DELETE") + + +def run(server_class=HTTPServer, handler_class=AlertApi, port=8000): + # Set up logging configuration + logging.basicConfig( + level=logging.DEBUG, # Set to DEBUG to show all log levels + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', + handlers=[ + logging.StreamHandler(), # Console handler + logging.FileHandler('alert_api.log') # File handler + ] + ) + + logger = logging.getLogger('AlertApi') + logger.info(f"Starting AlertApi on port {port}") + + server_address = ("", port) + httpd = server_class(server_address, handler_class) + + try: + logger.info("Server is ready to handle requests") + httpd.serve_forever() + except KeyboardInterrupt: + logger.info("Received shutdown signal") + except Exception as e: + logger.error(f"Server error: {str(e)}", exc_info=True) + finally: + httpd.server_close() + logger.info("Server stopped") diff --git a/alert_api/alarm_siren.py b/alert_api/alarm_siren.py new file mode 100644 index 0000000..52521de --- /dev/null +++ b/alert_api/alarm_siren.py @@ -0,0 +1,218 @@ +import os +import time +import threading +import subprocess +import queue +import logging +from datetime import datetime, timedelta +from typing import Optional, Dict, Any + +# Set up logging +logging.basicConfig( + level=logging.DEBUG, + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', + handlers=[ + logging.StreamHandler(), + logging.FileHandler('alarm_siren.log') + ] +) +logger = logging.getLogger('AlarmSiren') + +class AlarmSiren: + def __init__(self): + # Communication queues + self.alarm_queue = queue.Queue() + self.control_queue = queue.Queue() + + # Tracking active alarms + self.active_alarms: Dict[int, Dict[str, Any]] = {} + + # Playback thread + self.playback_thread = threading.Thread(target=self._playback_worker, daemon=True) + self.playback_thread.start() + + def schedule_alarm(self, alarm_config: Dict[str, Any]): + """Schedule an alarm based on its configuration""" + logger.info(f"Scheduling alarm: {alarm_config}") + self.alarm_queue.put(alarm_config) + + def _calculate_next_alarm_time(self, alarm_config: Dict[str, Any]) -> Optional[datetime]: + """Calculate the next alarm trigger time based on repeat rule""" + now = datetime.now() + current_time = now.time() + + # Parse alarm time + alarm_time = datetime.strptime(alarm_config['time'], "%H:%M:%S").time() + + # Determine the next trigger + if alarm_config['repeat_rule']['type'] == 'once': + # For one-time alarm, check the specific date + try: + specific_date = datetime.strptime(alarm_config['repeat_rule']['at'], "%d.%m.%Y") + return datetime.combine(specific_date.date(), alarm_time) + except (KeyError, ValueError): + logger.error("Invalid one-time alarm configuration") + return None + + elif alarm_config['repeat_rule']['type'] == 'daily': + # Daily alarm - trigger today or tomorrow + next_trigger = datetime.combine(now.date(), alarm_time) + if current_time < alarm_time: + return next_trigger + return next_trigger + timedelta(days=1) + + elif alarm_config['repeat_rule']['type'] == 'weekly': + # Weekly alarm - check configured days + today = now.strftime("%A").lower() + configured_days = [day.lower() for day in alarm_config['repeat_rule'].get('days_of_week', [])] + + if today in configured_days and current_time < alarm_time: + return datetime.combine(now.date(), alarm_time) + + # Find next configured day + days_order = ['monday', 'tuesday', 'wednesday', 'thursday', 'friday', 'saturday', 'sunday'] + current_index = days_order.index(today) + + for offset in range(1, 8): + next_day_index = (current_index + offset) % 7 + next_day = days_order[next_day_index] + + if next_day in configured_days: + next_date = now.date() + timedelta(days=offset) + return datetime.combine(next_date, alarm_time) + + return None + + def _play_audio(self, file_path: str, volume: int = 100): + """Play audio file using mpg123""" + try: + # Ensure the file exists + if not os.path.exists(file_path): + logger.error(f"Audio file not found: {file_path}") + return False + + # Construct mpg123 command with volume control + volume_adjust = f"-g {volume}" + cmd = ["mpg123", volume_adjust, file_path] + + logger.info(f"Playing alarm: {file_path}") + + # Track the process for potential interruption + process = subprocess.Popen(cmd) + return process + except Exception as e: + logger.error(f"Error playing audio: {e}") + return False + + def _playback_worker(self): + """Background thread for managing alarm playback""" + while True: + try: + # Check for new alarms to schedule + try: + new_alarm = self.alarm_queue.get(timeout=1) + alarm_time = self._calculate_next_alarm_time(new_alarm) + if alarm_time: + self.active_alarms[new_alarm.get('id', id(new_alarm))] = { + 'config': new_alarm, + 'trigger_time': alarm_time, + 'snooze_count': 0 + } + except queue.Empty: + pass + + # Check for control signals (snooze/dismiss) + try: + control_msg = self.control_queue.get(timeout=0.1) + # Handle control message logic + except queue.Empty: + pass + + # Check for alarms to trigger + now = datetime.now() + for alarm_id, alarm_info in list(self.active_alarms.items()): + if now >= alarm_info['trigger_time']: + # Trigger alarm + process = self._play_audio( + alarm_info['config']['file_to_play'], + alarm_info['config'].get('metadata', {}).get('volume', 100) + ) + + # Handle repeat and snooze logic + if process: + # Wait for user interaction or timeout + # In a real implementation, this would be more sophisticated + time.sleep(30) # Placeholder for user interaction + + # Determine next trigger based on repeat rule + next_trigger = self._calculate_next_alarm_time(alarm_info['config']) + if next_trigger: + alarm_info['trigger_time'] = next_trigger + else: + del self.active_alarms[alarm_id] + + time.sleep(1) # Prevent tight loop + except Exception as e: + logger.error(f"Error in playback worker: {e}") + time.sleep(1) + + def snooze_alarm(self, alarm_id: int): + """Snooze a specific alarm""" + if alarm_id in self.active_alarms: + alarm_config = self.active_alarms[alarm_id]['config'] + snooze_config = alarm_config.get('snooze', {'enabled': True, 'duration': 10, 'max_count': 3}) + + if (snooze_config['enabled'] and + self.active_alarms[alarm_id]['snooze_count'] < snooze_config['max_count']): + + # Increment snooze count + self.active_alarms[alarm_id]['snooze_count'] += 1 + + # Set next trigger time + snooze_duration = snooze_config.get('duration', 10) + self.active_alarms[alarm_id]['trigger_time'] = datetime.now() + timedelta(minutes=snooze_duration) + + logger.info(f"Snoozed alarm {alarm_id} for {snooze_duration} minutes") + else: + logger.warning(f"Cannot snooze alarm {alarm_id} - max snooze count reached") + + def dismiss_alarm(self, alarm_id: int): + """Dismiss a specific alarm""" + if alarm_id in self.active_alarms: + logger.info(f"Dismissed alarm {alarm_id}") + del self.active_alarms[alarm_id] + +def main(): + """Example usage of AlarmSiren""" + siren = AlarmSiren() + + # Example alarm configuration + sample_alarm = { + 'id': 1, + 'name': 'Morning Alarm', + 'time': '07:00:00', + 'file_to_play': '/path/to/alarm.mp3', + 'repeat_rule': { + 'type': 'daily' + }, + 'snooze': { + 'enabled': True, + 'duration': 10, + 'max_count': 3 + }, + 'metadata': { + 'volume': 80 + } + } + + siren.schedule_alarm(sample_alarm) + + # Keep main thread alive (in a real app, this would be handled differently) + try: + while True: + time.sleep(1) + except KeyboardInterrupt: + print("Alarm siren stopped") + +if __name__ == "__main__": + main() diff --git a/alert_api/alarm_storage.py b/alert_api/alarm_storage.py new file mode 100644 index 0000000..90dc2ba --- /dev/null +++ b/alert_api/alarm_storage.py @@ -0,0 +1,263 @@ +import logging +import os +import json +import hashlib +from typing import List, Optional, Dict, Any +from datetime import datetime +import re +from dataclasses import asdict + +from data_classes import RepeatRule, Snooze, Metadata, Alarm + +# Set up logging configuration +logging.basicConfig( + level=logging.DEBUG, + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', + handlers=[ + logging.StreamHandler(), # Console handler + logging.FileHandler('alert_api.log') # File handler + ] +) + +logger = logging.getLogger('AlertApi') + +class AlarmStorage: + def __init__(self, file_path: str, siren=None): + """Initialize AlarmStorage with optional Siren integration""" + self.file_path = file_path + self.siren = siren + self._ensure_storage_directory() + + def _ensure_storage_directory(self) -> None: + """Ensure the directory for the storage file exists""" + directory = os.path.dirname(os.path.abspath(self.file_path)) + if directory: + os.makedirs(directory, exist_ok=True) + + @staticmethod + def _generate_alarm_hash(alarm: dict) -> str: + """Generate an MD5 hash for the given alarm data""" + def to_dict(value): + return asdict(value) if hasattr(value, '__dataclass_fields__') else value + + alarm_data = alarm.copy() + alarm_data["repeat_rule"] = to_dict(alarm_data.get("repeat_rule")) + alarm_data["snooze"] = to_dict(alarm_data.get("snooze")) + alarm_data["metadata"] = to_dict(alarm_data.get("metadata")) + + hash_input = json.dumps({ + "name": alarm_data["name"], + "time": alarm_data["time"], + "file_to_play": alarm_data.get("file_to_play", os.path.expanduser("~/.alarms/alarm-lofi.mp3")), + "repeat_rule": alarm_data["repeat_rule"] + }, sort_keys=True) + return hashlib.md5(hash_input.encode('utf-8')).hexdigest() + + def load_data(self) -> dict: + """Load the JSON data from the file""" + try: + with open(self.file_path, 'r') as file: + data = json.load(file) + if not isinstance(data, dict): + raise ValueError("Invalid data format") + return data + except (FileNotFoundError, json.JSONDecodeError): + return {"last_id": 0, "alarms": []} + + def save_data(self, data: dict) -> None: + """Save the JSON data back to the file""" + def convert_to_dict(obj): + """Convert dataclass instances to dictionaries recursively""" + if isinstance(obj, list): + return [convert_to_dict(item) for item in obj] + elif isinstance(obj, dict): + return {key: convert_to_dict(value) for key, value in obj.items()} + elif hasattr(obj, "__dataclass_fields__"): + return asdict(obj) + else: + return obj + + # Convert data to JSON-serializable format + serializable_data = convert_to_dict(data) + + with open(self.file_path, 'w') as file: + json.dump(serializable_data, file, indent=4) + + def save_new_alert(self, alarm_data: dict) -> int: + """ + Add a new alarm to the storage and notify Siren if configured + """ + # Check for potential duplicates before saving + existing_alarms = self.get_saved_alerts() + for alarm in existing_alarms: + # Compare key characteristics + if (alarm['name'] == alarm_data['name'] and + alarm['time'] == alarm_data['time'] and + alarm['repeat_rule'] == alarm_data['repeat_rule']): + raise ValueError("Duplicate alarm already exists") + + + # Set default file_to_play if not provided + alarm_data['file_to_play'] = alarm_data.get('file_to_play', os.path.expanduser("~/.alarms/alarm-lofi.mp3")) + + # Convert nested dictionaries to dataclass instances + try: + if 'repeat_rule' in alarm_data: + alarm_data['repeat_rule'] = RepeatRule(**alarm_data['repeat_rule']) + if 'snooze' in alarm_data: + alarm_data['snooze'] = Snooze(**alarm_data['snooze']) + if 'metadata' in alarm_data: + alarm_data['metadata'] = Metadata(**alarm_data['metadata']) + except TypeError as e: + raise ValueError(f"Invalid format in one of the nested fields: {str(e)}") + + # Validate input + try: + alarm = Alarm(**alarm_data) + if not alarm.validate(): + raise ValueError("Invalid alarm configuration") + except (TypeError, ValueError) as e: + raise ValueError(f"Invalid alarm data: {str(e)}") + + # Load existing data + data = self.load_data() + + # Increment last_id and assign to new alarm + data['last_id'] += 1 + alarm_data['id'] = data['last_id'] + + # Generate hash for the alarm + alarm_data['hash'] = self._generate_alarm_hash(alarm_data) + + # Check for duplicates + if any(a.get("hash") == alarm_data['hash'] for a in data["alarms"]): + raise ValueError("Duplicate alarm detected") + + # Add new alarm to the list + data['alarms'].append(alarm_data) + + # Save updated data + self.save_data(data) + + # Notify Siren if available + if self.siren and alarm_data.get('enabled', True): + try: + self.siren.schedule_alarm(alarm_data) + logger.info(f"Siren notified of new alarm: {alarm_data['id']}") + except Exception as e: + logger.error(f"Failed to notify Siren about new alarm: {e}") + + return data['last_id'] + + def get_saved_alerts(self) -> list: + """Retrieve the list of saved alarms""" + return self.load_data().get("alarms", []) + + def remove_saved_alert(self, alarm_id: int) -> bool: + """Remove an alarm by its ID""" + data = self.load_data() + original_length = len(data["alarms"]) + + # Filter out the alarm with the given ID + data["alarms"] = [a for a in data["alarms"] if a["id"] != alarm_id] + + if len(data["alarms"]) == original_length: + return False + + # Save updated data + self.save_data(data) + + # Notify Siren if available + if self.siren: + try: + self.siren.dismiss_alarm(alarm_id) + logger.info(f"Siren dismissed alarm: {alarm_id}") + except Exception as e: + logger.error(f"Failed to notify Siren about removed alarm: {e}") + + return True + + def update_alert(self, alarm_id: int, alarm_data: dict) -> bool: + """Update an existing alarm""" + alarm_data["id"] = alarm_id + + # Set default file_to_play if not provided + alarm_data['file_to_play'] = alarm_data.get('file_to_play', os.path.expanduser("~/.alarms/alarm-lofi.mp3")) + + # Convert nested dictionaries to dataclass instances + try: + if 'repeat_rule' in alarm_data: + alarm_data['repeat_rule'] = RepeatRule(**alarm_data['repeat_rule']) + if 'snooze' in alarm_data: + alarm_data['snooze'] = Snooze(**alarm_data['snooze']) + if 'metadata' in alarm_data: + alarm_data['metadata'] = Metadata(**alarm_data['metadata']) + except TypeError as e: + raise ValueError(f"Invalid format in one of the nested fields: {str(e)}") + + # Validate input + try: + alarm = Alarm(**alarm_data) + if not alarm.validate(): + raise ValueError("Invalid alarm configuration") + except (TypeError, ValueError) as e: + raise ValueError(f"Invalid alarm data: {str(e)}") + + # Load existing data + data = self.load_data() + + # Find and update the alarm + for i, existing_alarm in enumerate(data["alarms"]): + if existing_alarm["id"] == alarm_id: + # Generate new hash + alarm_data["hash"] = self._generate_alarm_hash(alarm_data) + + # Replace the existing alarm + data["alarms"][i] = alarm_data + + # Save updated data + self.save_data(data) + + # Notify Siren if available + if self.siren: + try: + # Remove existing alarm and reschedule + self.siren.dismiss_alarm(alarm_id) + + if alarm_data.get('enabled', True): + self.siren.schedule_alarm(alarm_data) + logger.info(f"Siren updated for alarm: {alarm_id}") + else: + logger.info(f"Siren dismissed disabled alarm: {alarm_id}") + except Exception as e: + logger.error(f"Failed to notify Siren about updated alarm: {e}") + + return True + + # Alarm not found + return False + + def _get_alarm_by_id(self, alarm_id: int) -> Optional[dict]: + """ + Retrieve a specific alarm by its ID + + Args: + alarm_id (int): ID of the alarm to retrieve + + Returns: + dict or None: Alarm configuration or None if not found + """ + data = self.load_data() + for alarm in data.get('alarms', []): + if alarm.get('id') == alarm_id: + return alarm + return None + + def set_siren(self, siren): + """ + Set or update the Siren instance for alarm synchronization + + Args: + siren (AlarmSiren): Siren instance to use for alarm management + """ + self.siren = siren diff --git a/alert_api/alert_api.py b/alert_api/alert_api.py deleted file mode 100755 index 2440c97..0000000 --- a/alert_api/alert_api.py +++ /dev/null @@ -1,491 +0,0 @@ -#!/usr/bin/python3 - -from http.server import BaseHTTPRequestHandler, HTTPServer -import logging -import os -import json -import hashlib -from dataclasses import dataclass, field, asdict -from typing import List, Optional, Dict, Any -from datetime import datetime -from http import HTTPStatus -import re - -# Set up logging configuration -logging.basicConfig( - level=logging.DEBUG, # Set to DEBUG to show all log levels - format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', - handlers=[ - logging.StreamHandler(), # Console handler - logging.FileHandler('alert_api.log') # File handler - ] -) - -logger = logging.getLogger('AlertApi') - - -@dataclass -class RepeatRule: - type: str # "daily" or "weekly" or "once" - days_of_week: Optional[List[str]] = field(default_factory=list) - at: Optional[str] = None - - def validate(self) -> bool: - """Validate repeat rule configuration""" - valid_types = {"daily", "weekly", "once"} - valid_days = {"monday", "tuesday", "wednesday", "thursday", "friday", "saturday", "sunday"} - - if self.type not in valid_types: - logger.error(f"Invalid RepeatRule type: '{self.type}'. Must be one of {valid_types}") - return False - - if self.type == "weekly": - invalid_days = [day for day in self.days_of_week if day.lower() not in valid_days] - if invalid_days: - logger.error(f"Invalid RepeatRule weekday names: {invalid_days}. Must be one of {valid_days}") - return False - if not self.days_of_week: - logger.error("Weekly repeat rule must specify at least one day") - return False - - if self.type == "once" and self.days_of_week: - if self.days_of_week: - logger.error("One-time alert does not support days_of_week") - return False - if not self.at: - logger.error("One-time repeat rule must specify a valid 'at' date") - return False - try: - datetime.strptime(self.at, "%d.%m.%Y") - except ValueError: - logger.error(f"Invalid 'at' date format: '{self.at}'. Expected format: 'dd.mm.yyyy'") - return False - - logger.debug(f"RepeatRule validation passed: {self.__dict__}") - return True - - -@dataclass -class Snooze: - enabled: bool = True - duration: int = 10 # minutes - max_count: int = 3 - - def validate(self) -> bool: - """Validate snooze configuration""" - if not isinstance(self.enabled, bool): - logger.error(f"Snooze enabled must be boolean, got {type(self.enabled)}") - return False - - if not isinstance(self.duration, int): - logger.error(f"Snooze duration must be integer, got {type(self.duration)}") - return False - - if self.duration <= 0: - logger.error(f"Snooze duration must be positive, got {self.duration}") - return False - - if not isinstance(self.max_count, int): - logger.error(f"Snooze max_count must be integer, got {type(self.max_count)}") - return False - - if self.max_count <= 0: - logger.error(f"Snooze max_count must be positive, got {self.max_count}") - return False - - logger.debug(f"Snooze validation passed: {self.__dict__}") - return True - - -@dataclass -class Metadata: - volume: int = 100 - notes: str = "" - md5sum: str = "" - - def validate(self) -> bool: - """Validate metadata configuration""" - if not isinstance(self.volume, int): - logger.error(f"Volume must be integer, got {type(self.volume)}") - return False - - if not 0 <= self.volume <= 100: - logger.error(f"Volume must be between 0 and 100, got {self.volume}") - return False - - logger.debug(f"Metadata validation passed: {self.__dict__}") - return True - - -@dataclass -class Alarm: - name: str - time: str # Format: "HH:MM:SS" - repeat_rule: RepeatRule - file_to_play: str = field(default=os.path.expanduser("~/.alarms/alarm-lofi.mp3")) - enabled: bool = field(default=True) - snooze: Snooze = field(default_factory=Snooze) - metadata: Metadata = field(default_factory=Metadata) - id: Optional[int] = field(default=None) - - def validate(self) -> bool: - """Validate complete alarm configuration""" - logger.debug(f"Starting validation for alarm: {self.name}") - - # Validate name - if not isinstance(self.name, str) or len(self.name.strip()) == 0: - logger.error(f"Invalid alarm name: '{self.name}'. Must be non-empty string") - return False - - # Validate time format - time_pattern = re.compile(r'^([0-1]?[0-9]|2[0-3]):[0-5][0-9]:[0-5][0-9]$') - if not time_pattern.match(self.time): - logger.error(f"Invalid time format: '{self.time}'. Must match HH:MM:SS") - return False - - # Validate enabled flag - if not isinstance(self.enabled, bool): - logger.error(f"Enabled must be boolean, got {type(self.enabled)}") - return False - - # Create default alarms directory if it doesn't exist - default_alarms_dir = os.path.expanduser("~/.alarms") - os.makedirs(default_alarms_dir, exist_ok=True) - - # Validate audio file - if self.file_to_play != os.path.expanduser("~/.alarms/alarm-lofi.mp3") and not os.path.exists(self.file_to_play): - logger.error(f"Audio file not found: '{self.file_to_play}'") - return False - - # Validate repeat rule - if not isinstance(self.repeat_rule, RepeatRule): - logger.error(f"Invalid repeat_rule type: {type(self.repeat_rule)}") - return False - if not self.repeat_rule.validate(): - logger.error("Repeat rule validation failed") - return False - - # Validate snooze - if not isinstance(self.snooze, Snooze): - logger.error(f"Invalid snooze type: {type(self.snooze)}") - return False - if not self.snooze.validate(): - logger.error("Snooze validation failed") - return False - - # Validate metadata - if not isinstance(self.metadata, Metadata): - logger.error(f"Invalid metadata type: {type(self.metadata)}") - return False - if not self.metadata.validate(): - logger.error("Metadata validation failed") - return False - - logger.debug(f"Alarm validation passed: {self.name}") - return True - -class StorageHandler: - def __init__(self, file_path: str): - self.file_path = file_path - self._ensure_storage_directory() - - def _ensure_storage_directory(self) -> None: - """Ensure the directory for the storage file exists""" - directory = os.path.dirname(os.path.abspath(self.file_path)) - if directory: - os.makedirs(directory, exist_ok=True) - - @staticmethod - def _generate_alarm_hash(alarm: dict) -> str: - """Generate an MD5 hash for the given alarm data""" - # Convert dataclass instances to dictionaries if needed - def to_dict(value): - return asdict(value) if hasattr(value, '__dataclass_fields__') else value - - alarm_data = alarm.copy() - alarm_data["repeat_rule"] = to_dict(alarm_data.get("repeat_rule")) - alarm_data["snooze"] = to_dict(alarm_data.get("snooze")) - alarm_data["metadata"] = to_dict(alarm_data.get("metadata")) - - hash_input = json.dumps({ - "name": alarm_data["name"], - "time": alarm_data["time"], - "file_to_play": alarm_data.get("file_to_play", os.path.expanduser("~/.alarms/alarm-lofi.mp3")), - "repeat_rule": alarm_data["repeat_rule"] - }, sort_keys=True) - return hashlib.md5(hash_input.encode('utf-8')).hexdigest() - - def load_data(self) -> dict: - """Load the JSON data from the file""" - try: - with open(self.file_path, 'r') as file: - data = json.load(file) - if not isinstance(data, dict): - raise ValueError("Invalid data format") - return data - except (FileNotFoundError, json.JSONDecodeError): - return {"last_id": 0, "alarms": []} - - def save_data(self, data: dict) -> None: - """Save the JSON data back to the file""" - def convert_to_dict(obj): - """Convert dataclass instances to dictionaries recursively""" - if isinstance(obj, list): - return [convert_to_dict(item) for item in obj] - elif isinstance(obj, dict): - return {key: convert_to_dict(value) for key, value in obj.items()} - elif hasattr(obj, "__dataclass_fields__"): - return asdict(obj) - else: - return obj - - # Convert data to JSON-serializable format - serializable_data = convert_to_dict(data) - - with open(self.file_path, 'w') as file: - json.dump(serializable_data, file, indent=4) - - def save_new_alert(self, alarm_data: dict) -> int: - """Add a new alarm to the storage""" - # Remove id if present in input data - alarm_data.pop('id', None) - - # Set default file_to_play if not provided - alarm_data['file_to_play'] = alarm_data.get('file_to_play', os.path.expanduser("~/.alarms/alarm-lofi.mp3")) - - # Convert nested dictionaries to dataclass instances - try: - if 'repeat_rule' in alarm_data: - alarm_data['repeat_rule'] = RepeatRule(**alarm_data['repeat_rule']) - if 'snooze' in alarm_data: - alarm_data['snooze'] = Snooze(**alarm_data['snooze']) - if 'metadata' in alarm_data: - alarm_data['metadata'] = Metadata(**alarm_data['metadata']) - except TypeError as e: - raise ValueError(f"Invalid format in one of the nested fields: {str(e)}") - - # Validate input - try: - alarm = Alarm(**alarm_data) - if not alarm.validate(): - raise ValueError("Invalid alarm configuration") - except (TypeError, ValueError) as e: - raise ValueError(f"Invalid alarm data: {str(e)}") - - # Generate hash and check for duplicates - alarm_hash = self._generate_alarm_hash(alarm_data) - data = self.load_data() - - if any(a.get("hash") == alarm_hash for a in data["alarms"]): - raise ValueError("Duplicate alarm detected") - - # Add new alarm - alarm_data["id"] = data.get("last_id", 0) + 1 - alarm_data["hash"] = alarm_hash - data["last_id"] = alarm_data["id"] - data["alarms"].append(alarm_data) - - self.save_data(data) - return alarm_data["id"] - - def get_saved_alerts(self) -> list: - """Retrieve the list of saved alarms""" - return self.load_data().get("alarms", []) - - def remove_saved_alert(self, alarm_id: int) -> bool: - """Remove an alarm by its ID""" - data = self.load_data() - original_length = len(data["alarms"]) - data["alarms"] = [a for a in data["alarms"] if a["id"] != alarm_id] - - if len(data["alarms"]) == original_length: - return False - - self.save_data(data) - return True - - def update_alert(self, alarm_id: int, alarm_data: dict) -> bool: - """Update an existing alarm""" - alarm_data["id"] = alarm_id - - # Set default file_to_play if not provided - alarm_data['file_to_play'] = alarm_data.get('file_to_play', os.path.expanduser("~/.alarms/alarm-lofi.mp3")) - - # Convert nested dictionaries to dataclass instances - try: - if 'repeat_rule' in alarm_data: - alarm_data['repeat_rule'] = RepeatRule(**alarm_data['repeat_rule']) - if 'snooze' in alarm_data: - alarm_data['snooze'] = Snooze(**alarm_data['snooze']) - if 'metadata' in alarm_data: - alarm_data['metadata'] = Metadata(**alarm_data['metadata']) - except TypeError as e: - raise ValueError(f"Invalid format in one of the nested fields: {str(e)}") - - # Validate input - try: - alarm = Alarm(**alarm_data) - if not alarm.validate(): - raise ValueError("Invalid alarm configuration") - except (TypeError, ValueError) as e: - raise ValueError(f"Invalid alarm data: {str(e)}") - - data = self.load_data() - for i, existing_alarm in enumerate(data["alarms"]): - if existing_alarm["id"] == alarm_id: - alarm_data["hash"] = self._generate_alarm_hash(alarm_data) - data["alarms"][i] = alarm_data - self.save_data(data) - return True - return False - - -class AlertApi(BaseHTTPRequestHandler): - def __init__(self, *args, **kwargs): - self.storage = StorageHandler("data/alerts.json") - self.logger = logging.getLogger('AlertApi') - super().__init__(*args, **kwargs) - - def _send_response(self, status_code: int, data: Any = None, error: str = None) -> None: - """Send a JSON response with the given status code and data/error""" - self.send_response(status_code) - self.send_header("Content-Type", "application/json") - self.send_header("Access-Control-Allow-Origin", "*") - self.end_headers() - - response = {} - if data is not None: - response["data"] = data - if error is not None: - response["error"] = error - - response_json = json.dumps(response) - self.logger.debug(f"Sending response: {response_json}") - self.wfile.write(response_json.encode("utf-8")) - - def _handle_request(self, method: str) -> None: - """Handle incoming requests with proper error handling""" - self.logger.info(f"Received {method} request from {self.client_address[0]}") - - try: - if method in ["POST", "PUT", "DELETE"]: - content_length = int(self.headers.get("Content-Length", 0)) - if content_length == 0: - raise ValueError("Missing request body") - - post_data = self.rfile.read(content_length) - self.logger.debug(f"Received {method} payload: {post_data.decode('utf-8')}") - - post_data = json.loads(post_data) - else: - post_data = None - - # Route request to appropriate handler - handler = getattr(self, f"_handle_{method.lower()}", None) - if handler: - self.logger.debug(f"Routing to handler: _handle_{method.lower()}") - handler(post_data) - else: - self.logger.warning(f"Method not allowed: {method}") - self._send_response(HTTPStatus.METHOD_NOT_ALLOWED, error="Method not allowed") - - except json.JSONDecodeError as e: - self.logger.error(f"JSON decode error: {str(e)}") - self._send_response(HTTPStatus.BAD_REQUEST, error="Invalid JSON in request body") - except ValueError as e: - self.logger.error(f"Validation error: {str(e)}") - self._send_response(HTTPStatus.BAD_REQUEST, error=str(e)) - except Exception as e: - self.logger.error(f"Unexpected error: {str(e)}", exc_info=True) - self._send_response(HTTPStatus.INTERNAL_SERVER_ERROR, error="Internal server error") - - def _handle_get(self, _) -> None: - """Handle GET request""" - self.logger.debug("Processing GET request for all alarms") - alarms = self.storage.get_saved_alerts() - self.logger.debug(f"Retrieved {len(alarms)} alarms") - self._send_response(HTTPStatus.OK, data=alarms) - - def _handle_post(self, data: dict) -> None: - """Handle POST request""" - self.logger.debug(f"Processing POST request with data: {json.dumps(data, indent=2)}") - try: - alarm_id = self.storage.save_new_alert(data) - self.logger.info(f"Successfully created new alarm with ID: {alarm_id}") - self._send_response(HTTPStatus.CREATED, data={"id": alarm_id}) - except ValueError as e: - self.logger.error(f"Failed to create alarm: {str(e)}") - self._send_response(HTTPStatus.BAD_REQUEST, error=str(e)) - - def _handle_put(self, data: dict) -> None: - """Handle PUT request""" - alarm_id = data.pop("id", None) - self.logger.debug(f"Processing PUT request for alarm ID {alarm_id} with data: {json.dumps(data, indent=2)}") - - if alarm_id is None: - self.logger.error("PUT request missing alarm ID") - self._send_response(HTTPStatus.BAD_REQUEST, error="Missing alarm ID") - return - - if self.storage.update_alert(alarm_id, data): - self.logger.info(f"Successfully updated alarm ID: {alarm_id}") - self._send_response(HTTPStatus.OK, data={"message": "Alarm updated successfully"}) - else: - self.logger.warning(f"Alarm not found for update: {alarm_id}") - self._send_response(HTTPStatus.NOT_FOUND, error="Alarm not found") - - def _handle_delete(self, data: dict) -> None: - """Handle DELETE request""" - alarm_id = data.get("id") - self.logger.debug(f"Processing DELETE request for alarm ID: {alarm_id}") - - if not isinstance(alarm_id, int): - self.logger.error(f"Invalid alarm ID format: {alarm_id}") - self._send_response(HTTPStatus.BAD_REQUEST, error="Invalid alarm ID") - return - - if self.storage.remove_saved_alert(alarm_id): - self.logger.info(f"Successfully deleted alarm ID: {alarm_id}") - self._send_response(HTTPStatus.OK, data={"message": "Alarm removed successfully"}) - else: - self.logger.warning(f"Alarm not found for deletion: {alarm_id}") - self._send_response(HTTPStatus.NOT_FOUND, error="Alarm not found") - - def do_GET(self): self._handle_request("GET") - def do_POST(self): self._handle_request("POST") - def do_PUT(self): self._handle_request("PUT") - def do_DELETE(self): self._handle_request("DELETE") - - -def run(server_class=HTTPServer, handler_class=AlertApi, port=8000): - # Set up logging configuration - logging.basicConfig( - level=logging.DEBUG, # Set to DEBUG to show all log levels - format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', - handlers=[ - logging.StreamHandler(), # Console handler - logging.FileHandler('alert_api.log') # File handler - ] - ) - - logger = logging.getLogger('AlertApi') - logger.info(f"Starting AlertApi on port {port}") - - server_address = ("", port) - httpd = server_class(server_address, handler_class) - - try: - logger.info("Server is ready to handle requests") - httpd.serve_forever() - except KeyboardInterrupt: - logger.info("Received shutdown signal") - except Exception as e: - logger.error(f"Server error: {str(e)}", exc_info=True) - finally: - httpd.server_close() - logger.info("Server stopped") - - -if __name__ == "__main__": - from sys import argv - run(port=int(argv[1]) if len(argv) == 2 else 8000) diff --git a/alert_api/data_classes.py b/alert_api/data_classes.py new file mode 100644 index 0000000..6273d73 --- /dev/null +++ b/alert_api/data_classes.py @@ -0,0 +1,180 @@ +import hashlib +from dataclasses import dataclass, field, asdict +from typing import List, Optional, Dict, Any +from datetime import datetime +import os +import re +import logging + +# Set up logging configuration +logging.basicConfig( + level=logging.DEBUG, # Set to DEBUG to show all log levels + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', + handlers=[ + logging.StreamHandler(), # Console handler + logging.FileHandler('alert_api.log') # File handler + ] +) + +logger = logging.getLogger('AlertApi') + + +@dataclass +class RepeatRule: + type: str # "daily" or "weekly" or "once" + days_of_week: Optional[List[str]] = field(default_factory=list) + at: Optional[str] = None + + def validate(self) -> bool: + """Validate repeat rule configuration""" + valid_types = {"daily", "weekly", "once"} + valid_days = {"monday", "tuesday", "wednesday", "thursday", "friday", "saturday", "sunday"} + + if self.type not in valid_types: + logger.error(f"Invalid RepeatRule type: '{self.type}'. Must be one of {valid_types}") + return False + + if self.type == "weekly": + invalid_days = [day for day in self.days_of_week if day.lower() not in valid_days] + if invalid_days: + logger.error(f"Invalid RepeatRule weekday names: {invalid_days}. Must be one of {valid_days}") + return False + if not self.days_of_week: + logger.error("Weekly repeat rule must specify at least one day") + return False + + if self.type == "once" and self.days_of_week: + if self.days_of_week: + logger.error("One-time alert does not support days_of_week") + return False + if not self.at: + logger.error("One-time repeat rule must specify a valid 'at' date") + return False + try: + datetime.strptime(self.at, "%d.%m.%Y") + except ValueError: + logger.error(f"Invalid 'at' date format: '{self.at}'. Expected format: 'dd.mm.yyyy'") + return False + + logger.debug(f"RepeatRule validation passed: {self.__dict__}") + return True + + +@dataclass +class Snooze: + enabled: bool = True + duration: int = 10 # minutes + max_count: int = 3 + + def validate(self) -> bool: + """Validate snooze configuration""" + if not isinstance(self.enabled, bool): + logger.error(f"Snooze enabled must be boolean, got {type(self.enabled)}") + return False + + if not isinstance(self.duration, int): + logger.error(f"Snooze duration must be integer, got {type(self.duration)}") + return False + + if self.duration <= 0: + logger.error(f"Snooze duration must be positive, got {self.duration}") + return False + + if not isinstance(self.max_count, int): + logger.error(f"Snooze max_count must be integer, got {type(self.max_count)}") + return False + + if self.max_count <= 0: + logger.error(f"Snooze max_count must be positive, got {self.max_count}") + return False + + logger.debug(f"Snooze validation passed: {self.__dict__}") + return True + + +@dataclass +class Metadata: + volume: int = 100 + notes: str = "" + md5sum: str = "" + + def validate(self) -> bool: + """Validate metadata configuration""" + if not isinstance(self.volume, int): + logger.error(f"Volume must be integer, got {type(self.volume)}") + return False + + if not 0 <= self.volume <= 100: + logger.error(f"Volume must be between 0 and 100, got {self.volume}") + return False + + logger.debug(f"Metadata validation passed: {self.__dict__}") + return True + + +@dataclass +class Alarm: + name: str + time: str # Format: "HH:MM:SS" + repeat_rule: RepeatRule + file_to_play: str = field(default=os.path.expanduser("~/.alarms/alarm-lofi.mp3")) + enabled: bool = field(default=True) + snooze: Snooze = field(default_factory=Snooze) + metadata: Metadata = field(default_factory=Metadata) + id: Optional[int] = field(default=None) + + def validate(self) -> bool: + """Validate complete alarm configuration""" + logger.debug(f"Starting validation for alarm: {self.name}") + + # Validate name + if not isinstance(self.name, str) or len(self.name.strip()) == 0: + logger.error(f"Invalid alarm name: '{self.name}'. Must be non-empty string") + return False + + # Validate time format + time_pattern = re.compile(r'^([0-1]?[0-9]|2[0-3]):[0-5][0-9]:[0-5][0-9]$') + if not time_pattern.match(self.time): + logger.error(f"Invalid time format: '{self.time}'. Must match HH:MM:SS") + return False + + # Validate enabled flag + if not isinstance(self.enabled, bool): + logger.error(f"Enabled must be boolean, got {type(self.enabled)}") + return False + + # Create default alarms directory if it doesn't exist + default_alarms_dir = os.path.expanduser("~/.alarms") + os.makedirs(default_alarms_dir, exist_ok=True) + + # Validate audio file + if self.file_to_play != os.path.expanduser("~/.alarms/alarm-lofi.mp3") and not os.path.exists(self.file_to_play): + logger.error(f"Audio file not found: '{self.file_to_play}'") + return False + + # Validate repeat rule + if not isinstance(self.repeat_rule, RepeatRule): + logger.error(f"Invalid repeat_rule type: {type(self.repeat_rule)}") + return False + if not self.repeat_rule.validate(): + logger.error("Repeat rule validation failed") + return False + + # Validate snooze + if not isinstance(self.snooze, Snooze): + logger.error(f"Invalid snooze type: {type(self.snooze)}") + return False + if not self.snooze.validate(): + logger.error("Snooze validation failed") + return False + + # Validate metadata + if not isinstance(self.metadata, Metadata): + logger.error(f"Invalid metadata type: {type(self.metadata)}") + return False + if not self.metadata.validate(): + logger.error("Metadata validation failed") + return False + + logger.debug(f"Alarm validation passed: {self.name}") + return True diff --git a/alert_api/main.py b/alert_api/main.py new file mode 100755 index 0000000..6904ad0 --- /dev/null +++ b/alert_api/main.py @@ -0,0 +1,183 @@ +#!/usr/bin/python3 + +import os +import sys +import signal +import threading +import logging +from http.server import HTTPServer +from multiprocessing import Queue + +# Import our custom modules +from alarm_api import AlertApi, run as run_api +from alarm_storage import AlarmStorage +from alarm_siren import AlarmSiren + +# Set up logging +logging.basicConfig( + level=logging.DEBUG, + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', + handlers=[ + logging.StreamHandler(), + logging.FileHandler('alarm_system.log') + ] +) +logger = logging.getLogger('AlarmSystem') + +class AlarmSystemManager: + def __init__(self, + api_port: int = 8000, + storage_path: str = "data/alerts.json", + log_level: int = logging.DEBUG): + """ + Initialize and manage the entire alarm system + + Args: + api_port (int): Port for the HTTP API server + storage_path (str): Path to the JSON storage file + log_level (int): Logging level + """ + # Configure logging + logging.getLogger().setLevel(log_level) + + # Ensure storage directory exists + os.makedirs(os.path.dirname(os.path.abspath(storage_path)), exist_ok=True) + + # Initialize components + self.siren = AlarmSiren() + self.storage = AlarmStorage(storage_path, siren=self.siren) + + + # API server setup + self.api_port = api_port + self.api_server = None + self.api_thread = None + + # Synchronization and lifecycle management + self.stop_event = threading.Event() + + # Signal handling setup + self._setup_signal_handlers() + + # Alarm synchronization + self._sync_alarms() + + def _setup_signal_handlers(self): + """Set up signal handlers for graceful shutdown""" + signal.signal(signal.SIGINT, self._handle_shutdown) + signal.signal(signal.SIGTERM, self._handle_shutdown) + + def _handle_shutdown(self, signum, frame): + """ + Handle system shutdown signals + + Args: + signum (int): Signal number + frame (frame): Current stack frame + """ + logger.info(f"Received shutdown signal {signum}. Initiating graceful shutdown...") + self.stop_event.set() + + def _sync_alarms(self): + """ + Synchronize stored alarms with the siren + This ensures any saved alarms are loaded and scheduled + """ + try: + saved_alarms = self.storage.get_saved_alerts() + logger.info(f"Synchronizing {len(saved_alarms)} saved alarms") + + for alarm in saved_alarms: + if alarm.get('enabled', True): + self.siren.schedule_alarm(alarm) + except Exception as e: + logger.error(f"Error synchronizing alarms: {e}") + + def _start_api_server(self): + """ + Start the HTTP API server in a separate thread + """ + try: + def run_server(): + server_address = ("", self.api_port) + self.api_server = HTTPServer(server_address, AlertApi) + logger.info(f"API Server started on port {self.api_port}") + + # Run until stopped + while not self.stop_event.is_set(): + self.api_server.handle_request() + + self.api_server.server_close() + logger.info("API Server stopped") + + # Start API server in a thread + self.api_thread = threading.Thread(target=run_server, daemon=True) + self.api_thread.start() + except Exception as e: + logger.error(f"Failed to start API server: {e}") + self.stop_event.set() + + def run(self): + """ + Main run method to start all system components + """ + try: + # Start API server + self._start_api_server() + + # Log system startup + logger.info("Alarm System started successfully") + + # Wait for shutdown signal + while not self.stop_event.is_set(): + self.stop_event.wait(timeout=1) + + # Graceful shutdown + logger.info("Initiating system shutdown...") + except Exception as e: + logger.error(f"System startup error: {e}") + finally: + self._cleanup() + + def _cleanup(self): + """ + Perform cleanup operations during shutdown + """ + # Close API server if running + if self.api_server: + try: + self.api_server.server_close() + except Exception as e: + logger.error(f"Error closing API server: {e}") + + # Stop threads + if self.api_thread and self.api_thread.is_alive(): + self.api_thread.join(timeout=2) + + logger.info("Alarm System shutdown complete") + +def main(): + """ + Entry point for the Alarm System + Parse command-line arguments and start the system + """ + # Default configurations + default_port = 8000 + default_storage_path = "data/alerts.json" + + # Parse port from command line if provided + port = int(sys.argv[1]) if len(sys.argv) > 1 else default_port + + # Create and run the Alarm System + try: + alarm_system = AlarmSystemManager( + api_port=port, + storage_path=default_storage_path + ) + alarm_system.run() + except Exception as e: + logger.error(f"Fatal error starting Alarm System: {e}", exc_info=True) + sys.exit(1) + +if __name__ == "__main__": + main()