eee_alarm_clock/alert_api/alarm_siren.py

216 lines
8.3 KiB
Python
Raw Normal View History

import os
import time
import threading
import subprocess
import queue
import logging
from datetime import datetime, timedelta
from typing import Optional, Dict, Any
from logging_config import setup_logging
# Module-level logger used by AlarmSiren for info/warning/error messages.
# Configuration is delegated to the project's logging_config.setup_logging();
# presumably it returns a configured logging.Logger -- verify against that module.
logger = setup_logging()
class AlarmSiren:
    """Schedule alarms and play their audio from a background worker thread.

    Alarm configurations are handed over through ``alarm_queue``; the worker
    computes each alarm's next trigger time, stores it in ``active_alarms``
    and starts ``mpg123`` when the trigger time is reached.

    An alarm configuration is a dict read with these keys:
        ``time``          -- ``"HH:MM:SS"`` string (required)
        ``repeat_rule``   -- dict or object with ``type`` (``'once'``,
                             ``'daily'`` or ``'weekly'``), optional
                             ``days_of_week`` and optional ``at``
                             (``"DD.MM.YYYY"``, used by ``'once'``)
        ``file_to_play``  -- path of the audio file (required when firing)
        ``metadata``      -- optional dict; ``volume`` defaults to 100
        ``snooze``        -- optional dict with ``enabled``, ``duration``
                             (minutes) and ``max_count``
        ``id``            -- optional key used in ``active_alarms``
    """

    # Seconds the worker blocks after playback has started. This stands in
    # for real user-interaction handling (snooze/dismiss), which is not
    # implemented yet.
    PLAYBACK_WAIT_SECONDS = 30

    # Index matches datetime.weekday(): Monday == 0 ... Sunday == 6.
    _DAYS_ORDER = (
        'monday', 'tuesday', 'wednesday', 'thursday',
        'friday', 'saturday', 'sunday',
    )

    # Used for every snooze setting the alarm configuration does not provide.
    _SNOOZE_DEFAULTS = {
        'enabled': True,
        'duration': 5,   # minutes
        'max_count': 3,  # snoozes allowed per ring
    }

    def __init__(self):
        """Create the queues and start the daemon playback thread."""
        # Communication queues
        self.alarm_queue = queue.Queue()
        self.control_queue = queue.Queue()
        # Tracking active alarms: id -> {'config', 'trigger_time', 'snooze_count'}
        self.active_alarms: Dict[int, Dict[str, Any]] = {}
        # Playback thread
        self.playback_thread = threading.Thread(target=self._playback_worker, daemon=True)
        self.playback_thread.start()

    def schedule_alarm(self, alarm_config: Dict[str, Any]):
        """Queue an alarm configuration for the playback worker to schedule."""
        logger.info(f"Scheduling alarm: {alarm_config}")
        self.alarm_queue.put(alarm_config)

    @staticmethod
    def _parse_repeat_rule(repeat_rule: Any) -> tuple:
        """Return ``(type, days_of_week, at)`` from a dict or attribute-style rule."""
        if isinstance(repeat_rule, dict):
            return (
                repeat_rule.get('type'),
                repeat_rule.get('days_of_week', []),
                repeat_rule.get('at'),
            )
        # Assume it's an object-like structure with attributes
        return (
            getattr(repeat_rule, 'type', None),
            getattr(repeat_rule, 'days_of_week', []),
            getattr(repeat_rule, 'at', None),
        )

    def _calculate_next_alarm_time(self, alarm_config: Dict[str, Any]) -> Optional[datetime]:
        """Calculate the next trigger time of an alarm from its repeat rule.

        Args:
            alarm_config: Alarm configuration; ``time`` and ``repeat_rule``
                are required.

        Returns:
            The next trigger ``datetime`` (naive, local time), or ``None``
            when the rule is invalid, of an unknown type, or a weekly rule
            names no valid day. A ``'once'`` rule returns the configured
            date/time even when it already lies in the past.

        Raises:
            KeyError: ``time`` or ``repeat_rule`` is missing.
            ValueError: ``time`` is not in ``HH:MM:SS`` format.
        """
        now = datetime.now()
        current_time = now.time()
        alarm_time = datetime.strptime(alarm_config['time'], "%H:%M:%S").time()

        repeat_rule = alarm_config['repeat_rule']
        repeat_type, repeat_days, repeat_at = self._parse_repeat_rule(repeat_rule)

        # Sanity check
        if repeat_type is None:
            logger.error(f"Invalid repeat rule configuration: {repeat_rule}")
            return None

        if repeat_type == 'once':
            # Without an explicit date the alarm is meant for today.
            if repeat_at is None:
                return datetime.combine(now.date(), alarm_time)
            try:
                specific_date = datetime.strptime(repeat_at, "%d.%m.%Y").date()
            except ValueError:
                logger.error("Invalid one-time alarm configuration")
                return None
            return datetime.combine(specific_date, alarm_time)

        if repeat_type == 'daily':
            # Today if the time is still ahead, otherwise tomorrow.
            next_trigger = datetime.combine(now.date(), alarm_time)
            if current_time < alarm_time:
                return next_trigger
            return next_trigger + timedelta(days=1)

        if repeat_type == 'weekly':
            # A missing/None day list is treated as "no days configured".
            configured_days = {day.lower() for day in (repeat_days or [])}
            # weekday() is locale-independent, unlike strftime("%A"), whose
            # output would not match the English names on other locales.
            today_index = now.weekday()
            if self._DAYS_ORDER[today_index] in configured_days and current_time < alarm_time:
                return datetime.combine(now.date(), alarm_time)
            # Offset 7 covers "same weekday next week" when today's slot has passed.
            for offset in range(1, 8):
                candidate = self._DAYS_ORDER[(today_index + offset) % 7]
                if candidate in configured_days:
                    return datetime.combine(now.date() + timedelta(days=offset), alarm_time)
            return None

        # Unknown repeat type
        return None

    def _play_audio(self, file_path: str, volume: int = 100):
        """Start playing an audio file with ``mpg123``.

        Args:
            file_path: Path of the audio file.
            volume: Value passed to mpg123's ``-g`` (gain) option.

        Returns:
            The running ``subprocess.Popen`` object, or ``False`` when the
            file does not exist or the player could not be started.
        """
        try:
            # Ensure the file exists
            if not os.path.exists(file_path):
                logger.error(f"Audio file not found: {file_path}")
                return False
            # Option and value are separate argv entries so the player does
            # not receive a single "-g <n>" token with an embedded space.
            cmd = ["mpg123", "-g", str(volume), file_path]
            logger.info(f"Playing alarm: {file_path}")
            # The process is returned so the caller can track/interrupt it.
            return subprocess.Popen(cmd)
        except Exception as e:
            # Best effort: a playback failure must not kill the worker thread.
            logger.error(f"Error playing audio: {e}")
            return False

    def _accept_new_alarm(self):
        """Take at most one queued alarm (waiting up to 1 s) and register it."""
        try:
            new_alarm = self.alarm_queue.get(timeout=1)
        except queue.Empty:
            return
        alarm_time = self._calculate_next_alarm_time(new_alarm)
        if alarm_time:
            # Fall back to the object's identity when the config has no id.
            self.active_alarms[new_alarm.get('id', id(new_alarm))] = {
                'config': new_alarm,
                'trigger_time': alarm_time,
                'snooze_count': 0,
            }

    def _poll_control_queue(self):
        """Consume at most one control message (snooze/dismiss).

        Handling is not implemented yet; a message is removed from the
        queue and dropped.
        """
        try:
            self.control_queue.get(timeout=0.1)
        except queue.Empty:
            pass

    def _reschedule_after_trigger(self, alarm_id, fired_at: datetime):
        """Move a fired alarm to its next occurrence or remove it.

        Args:
            alarm_id: Key of the alarm in ``active_alarms``.
            fired_at: The ``trigger_time`` the alarm had when it fired.
        """
        alarm_info = self.active_alarms.get(alarm_id)
        if alarm_info is None:
            return
        # snooze_alarm() replaced the trigger time while the alarm was
        # ringing; keep the snoozed time instead of overwriting it.
        if alarm_info['trigger_time'] != fired_at:
            return

        config = alarm_info['config']
        repeat_type, _, _ = self._parse_repeat_rule(config['repeat_rule'])
        # A one-time alarm would recompute to the same (now past) moment and
        # fire again on every pass, so it is finished once it has fired.
        if repeat_type == 'once':
            next_trigger = None
        else:
            next_trigger = self._calculate_next_alarm_time(config)

        if next_trigger is None:
            del self.active_alarms[alarm_id]
        else:
            alarm_info['trigger_time'] = next_trigger
            # The snooze budget applies per ring, not for the alarm's lifetime.
            alarm_info['snooze_count'] = 0

    def _trigger_due_alarms(self):
        """Play every alarm whose trigger time has been reached, then reschedule it."""
        now = datetime.now()
        # Iterate over a snapshot: rescheduling may delete entries.
        for alarm_id, alarm_info in list(self.active_alarms.items()):
            fired_at = alarm_info['trigger_time']
            if now < fired_at:
                continue
            config = alarm_info['config']
            try:
                process = self._play_audio(
                    config['file_to_play'],
                    (config.get('metadata') or {}).get('volume', 100),
                )
                if process:
                    # Wait for user interaction or timeout
                    # In a real implementation, this would be more sophisticated
                    time.sleep(self.PLAYBACK_WAIT_SECONDS)
            finally:
                # Reschedule even when playback failed; otherwise the alarm
                # stays due and is retried on every loop iteration.
                self._reschedule_after_trigger(alarm_id, fired_at)

    def _playback_worker(self):
        """Background loop: register new alarms, poll controls, fire due alarms.

        Runs forever on the daemon thread started in ``__init__``. Any
        exception of one iteration is logged and the loop continues.
        """
        while True:
            try:
                self._accept_new_alarm()
                self._poll_control_queue()
                self._trigger_due_alarms()
                time.sleep(1)  # Prevent tight loop
            except Exception as e:
                logger.error(f"Error in playback worker: {e}")
                time.sleep(1)

    def snooze_alarm(self, alarm_id: int):
        """Postpone an active alarm by its configured snooze duration.

        Args:
            alarm_id: Key of the alarm in ``active_alarms``.

        Returns:
            ``True`` when the alarm was snoozed; ``False`` when the alarm is
            unknown, snoozing is disabled, or the maximum snooze count has
            been reached.
        """
        alarm_info = self.active_alarms.get(alarm_id)
        if alarm_info is None:
            logger.warning(f"Cannot snooze alarm {alarm_id} - not found in active alarms")
            return False

        # Merge into a fresh dict: the caller's configuration is left
        # untouched and a missing or None 'snooze' entry means "defaults".
        snooze_config = {
            **self._SNOOZE_DEFAULTS,
            **(alarm_info['config'].get('snooze') or {}),
        }

        # Check if snoozing is allowed
        if not snooze_config['enabled']:
            logger.warning(f"Snooze not enabled for alarm {alarm_id}")
            return False

        # Check snooze count
        current_snooze_count = alarm_info.get('snooze_count', 0)
        if current_snooze_count >= snooze_config['max_count']:
            logger.warning(f"Maximum snooze count reached for alarm {alarm_id}")
            return False

        # Increment snooze count
        alarm_info['snooze_count'] = current_snooze_count + 1
        # Set next trigger time
        snooze_duration = snooze_config['duration']
        alarm_info['trigger_time'] = datetime.now() + timedelta(minutes=snooze_duration)
        logger.info(f"Snoozed alarm {alarm_id} for {snooze_duration} minutes. Snooze count: {alarm_info['snooze_count']}")
        return True