264 lines
9.4 KiB
Python
264 lines
9.4 KiB
Python
![]() |
import logging
|
||
|
import os
|
||
|
import json
|
||
|
import hashlib
|
||
|
from typing import List, Optional, Dict, Any
|
||
|
from datetime import datetime
|
||
|
import re
|
||
|
from dataclasses import asdict
|
||
|
|
||
|
from data_classes import RepeatRule, Snooze, Metadata, Alarm
|
||
|
|
||
|
# Set up logging configuration
|
||
|
# Root logging setup: DEBUG and above goes to both the console and the
# 'alert_api.log' file (path is relative to the current working directory).
_log_handlers = [
    logging.StreamHandler(),               # console
    logging.FileHandler('alert_api.log'),  # file
]
logging.basicConfig(
    handlers=_log_handlers,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.DEBUG,
)

# Logger used by the rest of this module.
logger = logging.getLogger('AlertApi')
|
||
|
|
||
|
class AlarmStorage:
    """Persist alarms in a JSON file and optionally mirror changes to a siren.

    The file holds a dict with two keys: ``"last_id"`` (the highest ID handed
    out so far) and ``"alarms"`` (a list of alarm dicts).  When a ``siren``
    object is attached, ``schedule_alarm`` / ``dismiss_alarm`` are called on
    it after every successful write; failures there are logged, never raised.
    """

    def __init__(self, file_path: str, siren=None):
        """Initialize AlarmStorage with optional Siren integration.

        Args:
            file_path: Location of the JSON storage file.
            siren: Optional object exposing ``schedule_alarm(alarm)`` and
                ``dismiss_alarm(alarm_id)``.
        """
        self.file_path = file_path
        self.siren = siren
        self._ensure_storage_directory()

    def _ensure_storage_directory(self) -> None:
        """Ensure the directory for the storage file exists."""
        directory = os.path.dirname(os.path.abspath(self.file_path))
        if directory:
            os.makedirs(directory, exist_ok=True)

    @staticmethod
    def _default_sound_path() -> str:
        """Return the sound file used when an alarm has no ``file_to_play``."""
        # Expanded on every call so "~" follows the current environment.
        return os.path.expanduser("~/.alarms/alarm-lofi.mp3")

    @staticmethod
    def _generate_alarm_hash(alarm: dict) -> str:
        """Generate an MD5 hash identifying the given alarm.

        Only ``name``, ``time``, ``file_to_play`` and ``repeat_rule`` take
        part in the hash.  The input dict is not modified.

        Args:
            alarm: Alarm dict; ``repeat_rule`` may be a dataclass instance,
                a plain value, or absent.

        Returns:
            Hex digest of the MD5 hash.

        Raises:
            KeyError: If ``name`` or ``time`` is missing.
        """
        repeat_rule = alarm.get("repeat_rule")
        if hasattr(repeat_rule, '__dataclass_fields__'):
            repeat_rule = asdict(repeat_rule)

        hash_input = json.dumps({
            "name": alarm["name"],
            "time": alarm["time"],
            "file_to_play": alarm.get(
                "file_to_play", AlarmStorage._default_sound_path()
            ),
            "repeat_rule": repeat_rule,
        }, sort_keys=True)
        return hashlib.md5(hash_input.encode('utf-8')).hexdigest()

    def load_data(self) -> dict:
        """Load the JSON data from the file.

        A missing or unparsable file yields an empty store.  A parsed dict
        that lacks ``"alarms"`` or ``"last_id"`` gets those keys filled in so
        callers can index them safely.

        Returns:
            Dict with at least the keys ``"last_id"`` and ``"alarms"``.

        Raises:
            ValueError: If the file parses but is not a dict, or its
                ``"alarms"`` entry is not a list.
        """
        try:
            with open(self.file_path, 'r', encoding='utf-8') as file:
                data = json.load(file)
        except (FileNotFoundError, json.JSONDecodeError):
            return {"last_id": 0, "alarms": []}

        if not isinstance(data, dict):
            raise ValueError("Invalid data format")

        alarms = data.setdefault("alarms", [])
        if not isinstance(alarms, list):
            raise ValueError("Invalid data format")

        if "last_id" not in data:
            # Derive the counter from stored IDs so new alarms never reuse one.
            known_ids = [
                entry["id"] for entry in alarms
                if isinstance(entry, dict) and isinstance(entry.get("id"), int)
            ]
            data["last_id"] = max(known_ids, default=0)
        return data

    def save_data(self, data: dict) -> None:
        """Save the JSON data back to the file.

        Dataclass instances anywhere inside ``data`` are converted to dicts.

        Raises:
            TypeError: If ``data`` contains a value JSON cannot encode; the
                existing file is left untouched in that case.
        """
        def convert_to_dict(obj):
            """Convert dataclass instances to dictionaries recursively."""
            if isinstance(obj, list):
                return [convert_to_dict(item) for item in obj]
            if isinstance(obj, dict):
                return {key: convert_to_dict(value) for key, value in obj.items()}
            if hasattr(obj, "__dataclass_fields__"):
                return asdict(obj)
            return obj

        # Serialise before opening the file: opening with 'w' truncates, so
        # an encoding error after that point would destroy the stored alarms.
        payload = json.dumps(convert_to_dict(data), indent=4)

        with open(self.file_path, 'w', encoding='utf-8') as file:
            file.write(payload)

    def _normalise_and_validate(self, alarm_data: dict) -> None:
        """Fill defaults, build nested dataclasses and validate, in place.

        Raises:
            ValueError: If a nested field cannot be built, or the ``Alarm``
                built from ``alarm_data`` is rejected.
        """
        # Set default file_to_play if not provided
        alarm_data['file_to_play'] = alarm_data.get(
            'file_to_play', self._default_sound_path()
        )

        # Convert nested dictionaries to dataclass instances
        nested_fields = (
            ('repeat_rule', RepeatRule),
            ('snooze', Snooze),
            ('metadata', Metadata),
        )
        try:
            for key, factory in nested_fields:
                if key in alarm_data:
                    alarm_data[key] = factory(**alarm_data[key])
        except TypeError as e:
            raise ValueError(
                f"Invalid format in one of the nested fields: {str(e)}"
            ) from e

        # Validate input
        try:
            alarm = Alarm(**alarm_data)
            if not alarm.validate():
                raise ValueError("Invalid alarm configuration")
        except (TypeError, ValueError) as e:
            raise ValueError(f"Invalid alarm data: {str(e)}") from e

    def save_new_alert(self, alarm_data: dict) -> int:
        """Add a new alarm to the storage and notify Siren if configured.

        ``alarm_data`` is modified in place: defaults are filled, nested
        dicts become dataclass instances, and ``id`` and ``hash`` are set.

        Args:
            alarm_data: Alarm fields; ``name`` and ``time`` are required,
                ``repeat_rule``, ``snooze`` and ``metadata`` are optional.

        Returns:
            The ID assigned to the new alarm.

        Raises:
            ValueError: If an equivalent alarm already exists or the data
                fails validation.
        """
        data = self.load_data()

        # Check for potential duplicates before saving.  'repeat_rule' is
        # optional, so it is read with .get() on both sides.
        new_rule = alarm_data.get('repeat_rule')
        for stored in data["alarms"]:
            if (stored.get('name') == alarm_data['name']
                    and stored.get('time') == alarm_data['time']
                    and stored.get('repeat_rule') == new_rule):
                raise ValueError("Duplicate alarm already exists")

        self._normalise_and_validate(alarm_data)

        # Increment last_id and assign to new alarm
        data['last_id'] += 1
        alarm_data['id'] = data['last_id']

        # Generate hash for the alarm and reject hash-level duplicates
        alarm_data['hash'] = self._generate_alarm_hash(alarm_data)
        if any(a.get("hash") == alarm_data['hash'] for a in data["alarms"]):
            raise ValueError("Duplicate alarm detected")

        data['alarms'].append(alarm_data)
        self.save_data(data)

        # Notify Siren if available; storage already succeeded, so a siren
        # failure is only logged.
        if self.siren and alarm_data.get('enabled', True):
            try:
                self.siren.schedule_alarm(alarm_data)
                logger.info("Siren notified of new alarm: %s", alarm_data['id'])
            except Exception as e:
                logger.error("Failed to notify Siren about new alarm: %s", e)

        return data['last_id']

    def get_saved_alerts(self) -> list:
        """Retrieve the list of saved alarms."""
        return self.load_data().get("alarms", [])

    def remove_saved_alert(self, alarm_id: int) -> bool:
        """Remove an alarm by its ID.

        Returns:
            True if an alarm was removed, False if no alarm had that ID.
        """
        data = self.load_data()
        remaining = [a for a in data["alarms"] if a.get("id") != alarm_id]

        if len(remaining) == len(data["alarms"]):
            return False

        data["alarms"] = remaining
        self.save_data(data)

        # Notify Siren if available
        if self.siren:
            try:
                self.siren.dismiss_alarm(alarm_id)
                logger.info("Siren dismissed alarm: %s", alarm_id)
            except Exception as e:
                logger.error("Failed to notify Siren about removed alarm: %s", e)

        return True

    def update_alert(self, alarm_id: int, alarm_data: dict) -> bool:
        """Update an existing alarm.

        ``alarm_data`` is modified in place (``id``, defaults, nested
        dataclasses, ``hash``) and replaces the stored alarm wholesale.

        Returns:
            True if the alarm was found and replaced, False otherwise.

        Raises:
            ValueError: If the data fails validation.
        """
        alarm_data["id"] = alarm_id
        self._normalise_and_validate(alarm_data)

        data = self.load_data()

        for index, existing_alarm in enumerate(data["alarms"]):
            if existing_alarm.get("id") != alarm_id:
                continue

            alarm_data["hash"] = self._generate_alarm_hash(alarm_data)
            data["alarms"][index] = alarm_data
            self.save_data(data)

            # Notify Siren if available
            if self.siren:
                try:
                    # Remove existing alarm and reschedule
                    self.siren.dismiss_alarm(alarm_id)

                    if alarm_data.get('enabled', True):
                        self.siren.schedule_alarm(alarm_data)
                        logger.info("Siren updated for alarm: %s", alarm_id)
                    else:
                        logger.info(
                            "Siren dismissed disabled alarm: %s", alarm_id
                        )
                except Exception as e:
                    logger.error(
                        "Failed to notify Siren about updated alarm: %s", e
                    )

            return True

        # Alarm not found
        return False

    def _get_alarm_by_id(self, alarm_id: int) -> Optional[dict]:
        """Retrieve a specific alarm by its ID.

        Args:
            alarm_id (int): ID of the alarm to retrieve

        Returns:
            dict or None: Alarm configuration or None if not found
        """
        data = self.load_data()
        for alarm in data.get('alarms', []):
            if alarm.get('id') == alarm_id:
                return alarm
        return None

    def set_siren(self, siren):
        """Set or update the Siren instance for alarm synchronization.

        Args:
            siren (AlarmSiren): Siren instance to use for alarm management
        """
        self.siren = siren
|