eee_alarm_clock/clock/alarm_storage.py

264 lines
9.4 KiB
Python
Raw Permalink Normal View History

import logging
import os
import json
import hashlib
from typing import List, Optional, Dict, Any
from datetime import datetime
import re
from dataclasses import asdict
from data_classes import RepeatRule, Snooze, Metadata, Alarm
# Configure the root logger at import time so that every record is written
# both to the console and to 'alert_api.log' (relative to the current working
# directory), then create the named logger this module logs through.
logging.basicConfig(
    handlers=[
        logging.StreamHandler(),  # console output
        logging.FileHandler('alert_api.log'),  # persistent log file
    ],
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.DEBUG,
)
logger = logging.getLogger('AlertApi')
class AlarmStorage:
    """JSON-file-backed store for alarm definitions with optional Siren sync.

    The storage file holds one JSON object of the form
    ``{"last_id": <int>, "alarms": [<alarm dict>, ...]}``. Every mutating
    operation re-reads the file, applies the change, writes the file back and
    then (best effort) informs the configured Siren object, if any.
    """

    def __init__(self, file_path: str, siren=None):
        """Initialize AlarmStorage with optional Siren integration.

        Args:
            file_path: Path of the JSON storage file. Its parent directory is
                created if it does not exist yet.
            siren: Optional object exposing ``schedule_alarm(alarm)`` and
                ``dismiss_alarm(alarm_id)``; called after successful changes.
        """
        self.file_path = file_path
        self.siren = siren
        self._ensure_storage_directory()

    def _ensure_storage_directory(self) -> None:
        """Ensure the directory for the storage file exists."""
        directory = os.path.dirname(os.path.abspath(self.file_path))
        if directory:
            os.makedirs(directory, exist_ok=True)

    @staticmethod
    def _default_file_to_play() -> str:
        """Return the sound file used when an alarm does not name one."""
        return os.path.expanduser("~/.alarms/alarm-lofi.mp3")

    @staticmethod
    def _generate_alarm_hash(alarm: dict) -> str:
        """Generate an MD5 hash identifying the given alarm.

        Only ``name``, ``time``, ``file_to_play`` and ``repeat_rule`` take
        part in the hash; the input mapping is not modified.

        Args:
            alarm: Alarm mapping. ``repeat_rule`` may be a plain dict or a
                dataclass instance (converted with ``asdict``).

        Returns:
            Hex digest of the MD5 hash of the canonical JSON form.

        Raises:
            KeyError: If ``name`` or ``time`` is missing.
        """
        repeat_rule = alarm.get("repeat_rule")
        if hasattr(repeat_rule, '__dataclass_fields__'):
            repeat_rule = asdict(repeat_rule)
        hash_input = json.dumps({
            "name": alarm["name"],
            "time": alarm["time"],
            "file_to_play": alarm.get(
                "file_to_play", AlarmStorage._default_file_to_play()
            ),
            "repeat_rule": repeat_rule,
        }, sort_keys=True)
        # MD5 is used purely as a de-duplication fingerprint, not for security.
        return hashlib.md5(hash_input.encode('utf-8')).hexdigest()

    def load_data(self) -> dict:
        """Load the JSON data from the storage file.

        Returns:
            The stored dict, guaranteed to contain ``last_id`` and ``alarms``.
            A missing or unparsable file yields an empty store.

        Raises:
            ValueError: If the file parses but its top level is not an object.
        """
        try:
            with open(self.file_path, 'r', encoding='utf-8') as file:
                data = json.load(file)
        except (FileNotFoundError, json.JSONDecodeError):
            return {"last_id": 0, "alarms": []}
        if not isinstance(data, dict):
            raise ValueError("Invalid data format")
        alarms = data.setdefault("alarms", [])
        if "last_id" not in data:
            # Derive the counter from the stored alarms so that a file that
            # lost its counter never hands out an id that is already in use.
            known_ids = [
                entry["id"]
                for entry in (alarms if isinstance(alarms, list) else [])
                if isinstance(entry, dict) and isinstance(entry.get("id"), int)
            ]
            data["last_id"] = max(known_ids, default=0)
        return data

    def save_data(self, data: dict) -> None:
        """Save the JSON data back to the storage file.

        Dataclass instances anywhere inside ``data`` are converted to dicts.
        The document is serialized completely before any file is touched and
        is then written to a sibling ``.tmp`` file that replaces the real
        file, so a serialization error or an interrupted write cannot leave a
        truncated storage file behind.

        Raises:
            TypeError: If ``data`` contains a value JSON cannot represent.
            OSError: If the file cannot be written or replaced.
        """
        def convert_to_dict(obj):
            """Convert dataclass instances to dictionaries recursively."""
            if isinstance(obj, list):
                return [convert_to_dict(item) for item in obj]
            if isinstance(obj, dict):
                return {key: convert_to_dict(value) for key, value in obj.items()}
            if hasattr(obj, "__dataclass_fields__"):
                return asdict(obj)
            return obj

        payload = json.dumps(convert_to_dict(data), indent=4)
        tmp_path = f"{self.file_path}.tmp"
        with open(tmp_path, 'w', encoding='utf-8') as file:
            file.write(payload)
        os.replace(tmp_path, self.file_path)

    def _prepare_alarm(self, alarm_data: dict) -> dict:
        """Return a validated copy of ``alarm_data`` ready for storage.

        The caller's dict is left untouched. In the copy, a missing
        ``file_to_play`` receives the default sound file and the nested
        ``repeat_rule`` / ``snooze`` / ``metadata`` mappings are turned into
        their dataclass instances (already-built instances are kept as is).

        Raises:
            ValueError: If a nested field has the wrong shape or the alarm
                fails construction or ``Alarm.validate()``.
        """
        prepared = dict(alarm_data)
        prepared.setdefault('file_to_play', self._default_file_to_play())

        nested_types = (
            ('repeat_rule', RepeatRule),
            ('snooze', Snooze),
            ('metadata', Metadata),
        )
        try:
            for key, nested_cls in nested_types:
                if key in prepared and not isinstance(prepared[key], nested_cls):
                    prepared[key] = nested_cls(**prepared[key])
        except TypeError as e:
            raise ValueError(
                f"Invalid format in one of the nested fields: {str(e)}"
            ) from e

        try:
            alarm = Alarm(**prepared)
            if not alarm.validate():
                raise ValueError("Invalid alarm configuration")
        except (TypeError, ValueError) as e:
            raise ValueError(f"Invalid alarm data: {str(e)}") from e
        return prepared

    def save_new_alert(self, alarm_data: dict) -> int:
        """Add a new alarm to the storage and notify Siren if configured.

        Args:
            alarm_data: Alarm definition; it is not modified.

        Returns:
            The id assigned to the new alarm.

        Raises:
            ValueError: If the alarm is invalid or duplicates a stored alarm
                (same name/time/repeat rule, or same hash).
        """
        data = self.load_data()

        # Cheap duplicate check on the raw input before any conversion.
        for existing in data["alarms"]:
            if (existing.get('name') == alarm_data.get('name') and
                    existing.get('time') == alarm_data.get('time') and
                    existing.get('repeat_rule') == alarm_data.get('repeat_rule')):
                raise ValueError("Duplicate alarm already exists")

        prepared = self._prepare_alarm(alarm_data)

        # The incremented counter is only persisted if the save below happens.
        data['last_id'] += 1
        prepared['id'] = data['last_id']
        prepared['hash'] = self._generate_alarm_hash(prepared)

        if any(a.get("hash") == prepared['hash'] for a in data["alarms"]):
            raise ValueError("Duplicate alarm detected")

        data['alarms'].append(prepared)
        self.save_data(data)

        # Siren failures are logged but never undo the stored change.
        if self.siren and prepared.get('enabled', True):
            try:
                self.siren.schedule_alarm(prepared)
                logger.info(f"Siren notified of new alarm: {prepared['id']}")
            except Exception as e:
                logger.error(f"Failed to notify Siren about new alarm: {e}")
        return data['last_id']

    def get_saved_alerts(self) -> list:
        """Retrieve the list of saved alarms."""
        return self.load_data().get("alarms", [])

    def remove_saved_alert(self, alarm_id: int) -> bool:
        """Remove an alarm by its ID.

        Returns:
            True if an alarm was removed, False if no alarm had that id.
        """
        data = self.load_data()
        remaining = [a for a in data["alarms"] if a.get("id") != alarm_id]
        if len(remaining) == len(data["alarms"]):
            return False

        data["alarms"] = remaining
        self.save_data(data)

        if self.siren:
            try:
                self.siren.dismiss_alarm(alarm_id)
                logger.info(f"Siren dismissed alarm: {alarm_id}")
            except Exception as e:
                logger.error(f"Failed to notify Siren about removed alarm: {e}")
        return True

    def update_alert(self, alarm_id: int, alarm_data: dict) -> bool:
        """Update an existing alarm.

        Args:
            alarm_id: Id of the alarm to replace.
            alarm_data: Full new alarm definition; it is not modified.

        Returns:
            True if the alarm was found and replaced, False otherwise.

        Raises:
            ValueError: If the new definition is invalid.
        """
        prepared = self._prepare_alarm({**alarm_data, "id": alarm_id})

        data = self.load_data()
        for index, existing in enumerate(data["alarms"]):
            if existing.get("id") != alarm_id:
                continue

            prepared["hash"] = self._generate_alarm_hash(prepared)
            data["alarms"][index] = prepared
            self.save_data(data)

            if self.siren:
                try:
                    # Drop the old schedule, then re-arm only if still enabled.
                    self.siren.dismiss_alarm(alarm_id)
                    if prepared.get('enabled', True):
                        self.siren.schedule_alarm(prepared)
                        logger.info(f"Siren updated for alarm: {alarm_id}")
                    else:
                        logger.info(f"Siren dismissed disabled alarm: {alarm_id}")
                except Exception as e:
                    logger.error(f"Failed to notify Siren about updated alarm: {e}")
            return True

        # Alarm not found
        return False

    def _get_alarm_by_id(self, alarm_id: int) -> Optional[dict]:
        """Retrieve a specific alarm by its ID.

        Args:
            alarm_id (int): ID of the alarm to retrieve

        Returns:
            dict or None: Alarm configuration or None if not found
        """
        alarms = self.load_data().get('alarms', [])
        return next((a for a in alarms if a.get('id') == alarm_id), None)

    def set_siren(self, siren):
        """Set or update the Siren instance for alarm synchronization.

        Args:
            siren (AlarmSiren): Siren instance to use for alarm management
        """
        self.siren = siren