import logging import os import json import hashlib from typing import List, Optional, Dict, Any from datetime import datetime import re from dataclasses import asdict from data_classes import RepeatRule, Snooze, Metadata, Alarm # Set up logging configuration logging.basicConfig( level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', handlers=[ logging.StreamHandler(), # Console handler logging.FileHandler('alert_api.log') # File handler ] ) logger = logging.getLogger('AlertApi') class AlarmStorage: def __init__(self, file_path: str, siren=None): """Initialize AlarmStorage with optional Siren integration""" self.file_path = file_path self.siren = siren self._ensure_storage_directory() def _ensure_storage_directory(self) -> None: """Ensure the directory for the storage file exists""" directory = os.path.dirname(os.path.abspath(self.file_path)) if directory: os.makedirs(directory, exist_ok=True) @staticmethod def _generate_alarm_hash(alarm: dict) -> str: """Generate an MD5 hash for the given alarm data""" def to_dict(value): return asdict(value) if hasattr(value, '__dataclass_fields__') else value alarm_data = alarm.copy() alarm_data["repeat_rule"] = to_dict(alarm_data.get("repeat_rule")) alarm_data["snooze"] = to_dict(alarm_data.get("snooze")) alarm_data["metadata"] = to_dict(alarm_data.get("metadata")) hash_input = json.dumps({ "name": alarm_data["name"], "time": alarm_data["time"], "file_to_play": alarm_data.get("file_to_play", os.path.expanduser("~/.alarms/alarm-lofi.mp3")), "repeat_rule": alarm_data["repeat_rule"] }, sort_keys=True) return hashlib.md5(hash_input.encode('utf-8')).hexdigest() def load_data(self) -> dict: """Load the JSON data from the file""" try: with open(self.file_path, 'r') as file: data = json.load(file) if not isinstance(data, dict): raise ValueError("Invalid data format") return data except (FileNotFoundError, json.JSONDecodeError): return {"last_id": 0, "alarms": []} def save_data(self, data: dict) -> None: """Save the JSON data back to the file""" def convert_to_dict(obj): """Convert dataclass instances to dictionaries recursively""" if isinstance(obj, list): return [convert_to_dict(item) for item in obj] elif isinstance(obj, dict): return {key: convert_to_dict(value) for key, value in obj.items()} elif hasattr(obj, "__dataclass_fields__"): return asdict(obj) else: return obj # Convert data to JSON-serializable format serializable_data = convert_to_dict(data) with open(self.file_path, 'w') as file: json.dump(serializable_data, file, indent=4) def save_new_alert(self, alarm_data: dict) -> int: """ Add a new alarm to the storage and notify Siren if configured """ # Check for potential duplicates before saving existing_alarms = self.get_saved_alerts() for alarm in existing_alarms: # Compare key characteristics if (alarm['name'] == alarm_data['name'] and alarm['time'] == alarm_data['time'] and alarm['repeat_rule'] == alarm_data['repeat_rule']): raise ValueError("Duplicate alarm already exists") # Set default file_to_play if not provided alarm_data['file_to_play'] = alarm_data.get('file_to_play', os.path.expanduser("~/.alarms/alarm-lofi.mp3")) # Convert nested dictionaries to dataclass instances try: if 'repeat_rule' in alarm_data: alarm_data['repeat_rule'] = RepeatRule(**alarm_data['repeat_rule']) if 'snooze' in alarm_data: alarm_data['snooze'] = Snooze(**alarm_data['snooze']) if 'metadata' in alarm_data: alarm_data['metadata'] = Metadata(**alarm_data['metadata']) except TypeError as e: raise ValueError(f"Invalid format in one of the nested fields: {str(e)}") # Validate input try: alarm = Alarm(**alarm_data) if not alarm.validate(): raise ValueError("Invalid alarm configuration") except (TypeError, ValueError) as e: raise ValueError(f"Invalid alarm data: {str(e)}") # Load existing data data = self.load_data() # Increment last_id and assign to new alarm data['last_id'] += 1 alarm_data['id'] = data['last_id'] # Generate hash for the alarm alarm_data['hash'] = self._generate_alarm_hash(alarm_data) # Check for duplicates if any(a.get("hash") == alarm_data['hash'] for a in data["alarms"]): raise ValueError("Duplicate alarm detected") # Add new alarm to the list data['alarms'].append(alarm_data) # Save updated data self.save_data(data) # Notify Siren if available if self.siren and alarm_data.get('enabled', True): try: self.siren.schedule_alarm(alarm_data) logger.info(f"Siren notified of new alarm: {alarm_data['id']}") except Exception as e: logger.error(f"Failed to notify Siren about new alarm: {e}") return data['last_id'] def get_saved_alerts(self) -> list: """Retrieve the list of saved alarms""" return self.load_data().get("alarms", []) def remove_saved_alert(self, alarm_id: int) -> bool: """Remove an alarm by its ID""" data = self.load_data() original_length = len(data["alarms"]) # Filter out the alarm with the given ID data["alarms"] = [a for a in data["alarms"] if a["id"] != alarm_id] if len(data["alarms"]) == original_length: return False # Save updated data self.save_data(data) # Notify Siren if available if self.siren: try: self.siren.dismiss_alarm(alarm_id) logger.info(f"Siren dismissed alarm: {alarm_id}") except Exception as e: logger.error(f"Failed to notify Siren about removed alarm: {e}") return True def update_alert(self, alarm_id: int, alarm_data: dict) -> bool: """Update an existing alarm""" alarm_data["id"] = alarm_id # Set default file_to_play if not provided alarm_data['file_to_play'] = alarm_data.get('file_to_play', os.path.expanduser("~/.alarms/alarm-lofi.mp3")) # Convert nested dictionaries to dataclass instances try: if 'repeat_rule' in alarm_data: alarm_data['repeat_rule'] = RepeatRule(**alarm_data['repeat_rule']) if 'snooze' in alarm_data: alarm_data['snooze'] = Snooze(**alarm_data['snooze']) if 'metadata' in alarm_data: alarm_data['metadata'] = Metadata(**alarm_data['metadata']) except TypeError as e: raise ValueError(f"Invalid format in one of the nested fields: {str(e)}") # Validate input try: alarm = Alarm(**alarm_data) if not alarm.validate(): raise ValueError("Invalid alarm configuration") except (TypeError, ValueError) as e: raise ValueError(f"Invalid alarm data: {str(e)}") # Load existing data data = self.load_data() # Find and update the alarm for i, existing_alarm in enumerate(data["alarms"]): if existing_alarm["id"] == alarm_id: # Generate new hash alarm_data["hash"] = self._generate_alarm_hash(alarm_data) # Replace the existing alarm data["alarms"][i] = alarm_data # Save updated data self.save_data(data) # Notify Siren if available if self.siren: try: # Remove existing alarm and reschedule self.siren.dismiss_alarm(alarm_id) if alarm_data.get('enabled', True): self.siren.schedule_alarm(alarm_data) logger.info(f"Siren updated for alarm: {alarm_id}") else: logger.info(f"Siren dismissed disabled alarm: {alarm_id}") except Exception as e: logger.error(f"Failed to notify Siren about updated alarm: {e}") return True # Alarm not found return False def _get_alarm_by_id(self, alarm_id: int) -> Optional[dict]: """ Retrieve a specific alarm by its ID Args: alarm_id (int): ID of the alarm to retrieve Returns: dict or None: Alarm configuration or None if not found """ data = self.load_data() for alarm in data.get('alarms', []): if alarm.get('id') == alarm_id: return alarm return None def set_siren(self, siren): """ Set or update the Siren instance for alarm synchronization Args: siren (AlarmSiren): Siren instance to use for alarm management """ self.siren = siren