Made alarm_api more modular and easier to extend.

This commit is contained in:
Kalzu Rekku
2025-01-24 19:28:14 +02:00
parent 211f581921
commit 35d1e76ba0
6 changed files with 1014 additions and 491 deletions

263
alert_api/alarm_storage.py Normal file
View File

@@ -0,0 +1,263 @@
import logging
import os
import json
import hashlib
from typing import List, Optional, Dict, Any
from datetime import datetime
import re
from dataclasses import asdict
from data_classes import RepeatRule, Snooze, Metadata, Alarm
# Set up logging configuration
logging.basicConfig(
level=logging.DEBUG,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.StreamHandler(), # Console handler
logging.FileHandler('alert_api.log') # File handler
]
)
logger = logging.getLogger('AlertApi')
class AlarmStorage:
def __init__(self, file_path: str, siren=None):
"""Initialize AlarmStorage with optional Siren integration"""
self.file_path = file_path
self.siren = siren
self._ensure_storage_directory()
def _ensure_storage_directory(self) -> None:
"""Ensure the directory for the storage file exists"""
directory = os.path.dirname(os.path.abspath(self.file_path))
if directory:
os.makedirs(directory, exist_ok=True)
@staticmethod
def _generate_alarm_hash(alarm: dict) -> str:
"""Generate an MD5 hash for the given alarm data"""
def to_dict(value):
return asdict(value) if hasattr(value, '__dataclass_fields__') else value
alarm_data = alarm.copy()
alarm_data["repeat_rule"] = to_dict(alarm_data.get("repeat_rule"))
alarm_data["snooze"] = to_dict(alarm_data.get("snooze"))
alarm_data["metadata"] = to_dict(alarm_data.get("metadata"))
hash_input = json.dumps({
"name": alarm_data["name"],
"time": alarm_data["time"],
"file_to_play": alarm_data.get("file_to_play", os.path.expanduser("~/.alarms/alarm-lofi.mp3")),
"repeat_rule": alarm_data["repeat_rule"]
}, sort_keys=True)
return hashlib.md5(hash_input.encode('utf-8')).hexdigest()
def load_data(self) -> dict:
"""Load the JSON data from the file"""
try:
with open(self.file_path, 'r') as file:
data = json.load(file)
if not isinstance(data, dict):
raise ValueError("Invalid data format")
return data
except (FileNotFoundError, json.JSONDecodeError):
return {"last_id": 0, "alarms": []}
def save_data(self, data: dict) -> None:
"""Save the JSON data back to the file"""
def convert_to_dict(obj):
"""Convert dataclass instances to dictionaries recursively"""
if isinstance(obj, list):
return [convert_to_dict(item) for item in obj]
elif isinstance(obj, dict):
return {key: convert_to_dict(value) for key, value in obj.items()}
elif hasattr(obj, "__dataclass_fields__"):
return asdict(obj)
else:
return obj
# Convert data to JSON-serializable format
serializable_data = convert_to_dict(data)
with open(self.file_path, 'w') as file:
json.dump(serializable_data, file, indent=4)
def save_new_alert(self, alarm_data: dict) -> int:
"""
Add a new alarm to the storage and notify Siren if configured
"""
# Check for potential duplicates before saving
existing_alarms = self.get_saved_alerts()
for alarm in existing_alarms:
# Compare key characteristics
if (alarm['name'] == alarm_data['name'] and
alarm['time'] == alarm_data['time'] and
alarm['repeat_rule'] == alarm_data['repeat_rule']):
raise ValueError("Duplicate alarm already exists")
# Set default file_to_play if not provided
alarm_data['file_to_play'] = alarm_data.get('file_to_play', os.path.expanduser("~/.alarms/alarm-lofi.mp3"))
# Convert nested dictionaries to dataclass instances
try:
if 'repeat_rule' in alarm_data:
alarm_data['repeat_rule'] = RepeatRule(**alarm_data['repeat_rule'])
if 'snooze' in alarm_data:
alarm_data['snooze'] = Snooze(**alarm_data['snooze'])
if 'metadata' in alarm_data:
alarm_data['metadata'] = Metadata(**alarm_data['metadata'])
except TypeError as e:
raise ValueError(f"Invalid format in one of the nested fields: {str(e)}")
# Validate input
try:
alarm = Alarm(**alarm_data)
if not alarm.validate():
raise ValueError("Invalid alarm configuration")
except (TypeError, ValueError) as e:
raise ValueError(f"Invalid alarm data: {str(e)}")
# Load existing data
data = self.load_data()
# Increment last_id and assign to new alarm
data['last_id'] += 1
alarm_data['id'] = data['last_id']
# Generate hash for the alarm
alarm_data['hash'] = self._generate_alarm_hash(alarm_data)
# Check for duplicates
if any(a.get("hash") == alarm_data['hash'] for a in data["alarms"]):
raise ValueError("Duplicate alarm detected")
# Add new alarm to the list
data['alarms'].append(alarm_data)
# Save updated data
self.save_data(data)
# Notify Siren if available
if self.siren and alarm_data.get('enabled', True):
try:
self.siren.schedule_alarm(alarm_data)
logger.info(f"Siren notified of new alarm: {alarm_data['id']}")
except Exception as e:
logger.error(f"Failed to notify Siren about new alarm: {e}")
return data['last_id']
def get_saved_alerts(self) -> list:
"""Retrieve the list of saved alarms"""
return self.load_data().get("alarms", [])
def remove_saved_alert(self, alarm_id: int) -> bool:
"""Remove an alarm by its ID"""
data = self.load_data()
original_length = len(data["alarms"])
# Filter out the alarm with the given ID
data["alarms"] = [a for a in data["alarms"] if a["id"] != alarm_id]
if len(data["alarms"]) == original_length:
return False
# Save updated data
self.save_data(data)
# Notify Siren if available
if self.siren:
try:
self.siren.dismiss_alarm(alarm_id)
logger.info(f"Siren dismissed alarm: {alarm_id}")
except Exception as e:
logger.error(f"Failed to notify Siren about removed alarm: {e}")
return True
def update_alert(self, alarm_id: int, alarm_data: dict) -> bool:
"""Update an existing alarm"""
alarm_data["id"] = alarm_id
# Set default file_to_play if not provided
alarm_data['file_to_play'] = alarm_data.get('file_to_play', os.path.expanduser("~/.alarms/alarm-lofi.mp3"))
# Convert nested dictionaries to dataclass instances
try:
if 'repeat_rule' in alarm_data:
alarm_data['repeat_rule'] = RepeatRule(**alarm_data['repeat_rule'])
if 'snooze' in alarm_data:
alarm_data['snooze'] = Snooze(**alarm_data['snooze'])
if 'metadata' in alarm_data:
alarm_data['metadata'] = Metadata(**alarm_data['metadata'])
except TypeError as e:
raise ValueError(f"Invalid format in one of the nested fields: {str(e)}")
# Validate input
try:
alarm = Alarm(**alarm_data)
if not alarm.validate():
raise ValueError("Invalid alarm configuration")
except (TypeError, ValueError) as e:
raise ValueError(f"Invalid alarm data: {str(e)}")
# Load existing data
data = self.load_data()
# Find and update the alarm
for i, existing_alarm in enumerate(data["alarms"]):
if existing_alarm["id"] == alarm_id:
# Generate new hash
alarm_data["hash"] = self._generate_alarm_hash(alarm_data)
# Replace the existing alarm
data["alarms"][i] = alarm_data
# Save updated data
self.save_data(data)
# Notify Siren if available
if self.siren:
try:
# Remove existing alarm and reschedule
self.siren.dismiss_alarm(alarm_id)
if alarm_data.get('enabled', True):
self.siren.schedule_alarm(alarm_data)
logger.info(f"Siren updated for alarm: {alarm_id}")
else:
logger.info(f"Siren dismissed disabled alarm: {alarm_id}")
except Exception as e:
logger.error(f"Failed to notify Siren about updated alarm: {e}")
return True
# Alarm not found
return False
def _get_alarm_by_id(self, alarm_id: int) -> Optional[dict]:
"""
Retrieve a specific alarm by its ID
Args:
alarm_id (int): ID of the alarm to retrieve
Returns:
dict or None: Alarm configuration or None if not found
"""
data = self.load_data()
for alarm in data.get('alarms', []):
if alarm.get('id') == alarm_id:
return alarm
return None
def set_siren(self, siren):
"""
Set or update the Siren instance for alarm synchronization
Args:
siren (AlarmSiren): Siren instance to use for alarm management
"""
self.siren = siren