eee_alarm_clock/clock/alarm_siren.py

261 lines
10 KiB
Python
Raw Normal View History

import os
import time
import threading
import subprocess
import queue
import logging
from datetime import datetime, timedelta
from typing import Optional, Dict, Any
from logging_config import setup_logging
# Set up logging: module-level logger returned by the project's
# logging_config.setup_logging(); the AlarmSiren class below logs through it.
logger = setup_logging()
class AlarmSiren:
    """Schedule alarms and play their audio files through ``mpg123``.

    A daemon thread (started in ``__init__``) repeatedly:

    1. takes at most one new alarm configuration from ``alarm_queue``,
    2. takes at most one control message (``snooze`` / ``dismiss``) from
       ``control_queue``,
    3. starts, reschedules or removes the alarms held in ``active_alarms``.

    ``active_alarms`` maps an alarm id to a dict with the keys ``config``,
    ``trigger_time``, ``snooze_count`` and ``process``.

    NOTE(review): trigger notifications are put on ``control_queue``, which
    is the same queue the worker reads control messages from, so the worker
    will likely consume (and ignore) its own notification before any other
    reader sees it. Confirm how the UI is expected to receive them.
    """

    # Monday-first so the index matches datetime.weekday() (Monday == 0).
    _DAYS_ORDER = ('monday', 'tuesday', 'wednesday', 'thursday',
                   'friday', 'saturday', 'sunday')

    # Used for every snooze setting the alarm configuration does not provide.
    _DEFAULT_SNOOZE = {
        'enabled': True,
        'duration': 5,   # minutes
        'max_count': 3,
    }

    # Seconds to wait after terminate() before falling back to kill().
    _STOP_TIMEOUT = 2

    def __init__(self):
        """Create the queues and start the background playback thread."""
        # Communication queues
        self.alarm_queue = queue.Queue()
        self.control_queue = queue.Queue()
        # Tracking active alarms
        self.active_alarms: Dict[int, Dict[str, Any]] = {}
        # Playback thread
        self.playback_thread = threading.Thread(target=self._playback_worker, daemon=True)
        self.playback_thread.start()

    def schedule_alarm(self, alarm_config: Dict[str, Any]):
        """Queue an alarm configuration for the playback thread to schedule."""
        logger.info(f"Scheduling alarm: {alarm_config}")
        self.alarm_queue.put(alarm_config)

    @staticmethod
    def _read_repeat_rule(repeat_rule: Any):
        """Return ``(type, days_of_week, at)`` from a dict or attribute-style rule."""
        if isinstance(repeat_rule, dict):
            return (
                repeat_rule.get('type'),
                repeat_rule.get('days_of_week') or [],
                repeat_rule.get('at'),
            )
        return (
            getattr(repeat_rule, 'type', None),
            getattr(repeat_rule, 'days_of_week', None) or [],
            getattr(repeat_rule, 'at', None),
        )

    def _calculate_next_alarm_time(
        self,
        alarm_config: Dict[str, Any],
        now: Optional[datetime] = None,
    ) -> Optional[datetime]:
        """Calculate the next trigger time for an alarm configuration.

        Args:
            alarm_config: Must contain ``'time'`` (``"%H:%M:%S"``) and
                ``'repeat_rule'`` (a dict or an object exposing ``type``,
                ``days_of_week`` and ``at``).
            now: Reference time; defaults to ``datetime.now()``.

        Returns:
            * ``once``: the configured date (``at`` as ``"%d.%m.%Y"``, or the
              date of ``now`` when ``at`` is None) combined with the alarm
              time. This may lie in the past.
            * ``daily``: today at the alarm time if that is still ahead,
              otherwise tomorrow.
            * ``weekly``: the next configured weekday at the alarm time.
            * ``None`` when the configuration is invalid, the repeat type is
              unknown, or a weekly rule lists no recognised day.
        """
        if now is None:
            now = datetime.now()

        try:
            alarm_time = datetime.strptime(alarm_config['time'], "%H:%M:%S").time()
            repeat_rule = alarm_config['repeat_rule']
        except (KeyError, TypeError, ValueError) as e:
            logger.error(f"Invalid alarm configuration {alarm_config}: {e}")
            return None

        repeat_type, repeat_days, repeat_at = self._read_repeat_rule(repeat_rule)
        if repeat_type is None:
            logger.error(f"Invalid repeat rule configuration: {repeat_rule}")
            return None

        today = now.date()
        still_ahead_today = now.time() < alarm_time

        if repeat_type == 'once':
            if repeat_at is None:
                specific_date = today
            else:
                try:
                    specific_date = datetime.strptime(repeat_at, "%d.%m.%Y").date()
                except (TypeError, ValueError):
                    logger.error("Invalid one-time alarm configuration")
                    return None
            return datetime.combine(specific_date, alarm_time)

        if repeat_type == 'daily':
            day_offset = 0 if still_ahead_today else 1
            return datetime.combine(today + timedelta(days=day_offset), alarm_time)

        if repeat_type == 'weekly':
            configured_days = {str(day).lower() for day in repeat_days}
            # weekday() is locale independent, unlike strftime("%A").
            today_index = now.weekday()
            if still_ahead_today and self._DAYS_ORDER[today_index] in configured_days:
                return datetime.combine(today, alarm_time)
            # Offset 7 covers "same weekday next week".
            for offset in range(1, 8):
                if self._DAYS_ORDER[(today_index + offset) % 7] in configured_days:
                    return datetime.combine(today + timedelta(days=offset), alarm_time)
            return None

        return None

    @staticmethod
    def _build_play_command(file_path: str, volume: int = 100) -> list:
        """Return the mpg123 argv; the ``-g`` flag and its value are separate items."""
        return ["mpg123", "-g", str(volume), file_path]

    def _play_audio(self, file_path: str, volume: int = 100):
        """Start mpg123 on ``file_path`` in the background.

        Returns:
            The ``subprocess.Popen`` handle, or ``None`` when the file does
            not exist or the player could not be started.
        """
        try:
            if not os.path.exists(file_path):
                logger.error(f"Audio file not found: {file_path}")
                return None
            cmd = self._build_play_command(file_path, volume)
            logger.info(f"Playing alarm: {file_path}")
            # Run mpg123 in the background, suppressing stdout/stderr
            return subprocess.Popen(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
        except Exception as e:
            logger.error(f"Error playing audio: {e}")
            return None

    def _stop_process(self, process) -> None:
        """Terminate a player process, killing it if it does not exit in time."""
        if process is None or process.poll() is not None:
            return
        try:
            process.terminate()
            process.wait(timeout=self._STOP_TIMEOUT)
        except subprocess.TimeoutExpired:
            logger.error("Error terminating alarm process: timed out, killing it")
            try:
                process.kill()
                # Reap the killed child so it does not linger as a zombie.
                process.wait(timeout=self._STOP_TIMEOUT)
            except (OSError, subprocess.TimeoutExpired) as e:
                logger.error(f"Error terminating alarm process: {e}")
        except OSError as e:
            logger.error(f"Error terminating alarm process: {e}")

    def _accept_new_alarm(self) -> None:
        """Take one configuration from ``alarm_queue`` (waiting up to 1 s) and register it."""
        try:
            new_alarm = self.alarm_queue.get(timeout=1)
        except queue.Empty:
            return
        alarm_time = self._calculate_next_alarm_time(new_alarm)
        if not alarm_time:
            logger.warning(f"No trigger time for alarm, not scheduled: {new_alarm}")
            return
        alarm_id = new_alarm.get('id', id(new_alarm))
        replaced = self.active_alarms.get(alarm_id)
        if replaced:
            # Do not orphan a player that belongs to the entry being replaced.
            self._stop_process(replaced.get('process'))
        self.active_alarms[alarm_id] = {
            'config': new_alarm,
            'trigger_time': alarm_time,
            'snooze_count': 0,
            'process': None,
        }

    def _handle_control_message(self) -> None:
        """Take one message from ``control_queue`` (waiting up to 0.1 s) and apply it."""
        try:
            control_msg = self.control_queue.get(timeout=0.1)
        except queue.Empty:
            return
        alarm_id = control_msg.get('alarm_id')
        msg_type = control_msg.get('type')
        if msg_type == 'snooze':
            self.snooze_alarm(alarm_id)
        elif msg_type == 'dismiss':
            self.dismiss_alarm(alarm_id)
        # Any other type (including 'trigger') is dropped.

    def _advance_alarm(self, alarm_id: int, alarm_info: Dict[str, Any], now: datetime) -> None:
        """Reschedule an alarm whose playback is over, or remove it if it has no future trigger."""
        next_trigger = self._calculate_next_alarm_time(alarm_info['config'], now)
        # A 'once' rule yields its original (now past) time again, so only a
        # strictly future trigger counts as a reschedule.
        if next_trigger and next_trigger > now:
            alarm_info['trigger_time'] = next_trigger
            alarm_info['process'] = None
            alarm_info['snooze_count'] = 0  # fresh snooze budget for the next occurrence
        else:
            logger.info(f"Removing non-repeating alarm {alarm_id}.")
            self.active_alarms.pop(alarm_id, None)

    def _service_alarms(self, now: datetime) -> None:
        """Start due alarms, drop alarms over an hour late, advance finished ones."""
        for alarm_id, alarm_info in list(self.active_alarms.items()):
            trigger_time = alarm_info['trigger_time']

            if now > trigger_time + timedelta(hours=1):
                logger.warning(f"Alarm {alarm_id} is over 1 hour late. Disabling.")
                self._stop_process(alarm_info.get('process'))
                self.active_alarms.pop(alarm_id, None)
                continue

            if now < trigger_time:
                continue

            if alarm_info['process'] is None:
                config = alarm_info['config']
                alarm_info['process'] = self._play_audio(
                    config['file_to_play'],
                    config.get('metadata', {}).get('volume', 100),
                )
                logger.info(f"Alarm {alarm_id} triggered at {now}.")
                # Notify UI about the triggered alarm
                self.control_queue.put({
                    'type': 'trigger',
                    'alarm_id': alarm_id,
                    'info': alarm_info,
                })

            process = alarm_info['process']
            # process is None here only when playback could not be started;
            # advancing avoids re-triggering on every loop iteration.
            if process is None or process.poll() is not None:
                self._advance_alarm(alarm_id, alarm_info, now)

    def _playback_worker(self):
        """Background loop: accept alarms, apply control messages, service alarms."""
        while True:
            try:
                self._accept_new_alarm()
                self._handle_control_message()
                self._service_alarms(datetime.now())
                time.sleep(0.5)  # Prevent tight loop
            except Exception as e:
                # Top-level boundary of the daemon thread: log and keep running.
                logger.error(f"Error in playback worker: {e}")
                time.sleep(1)

    def snooze_alarm(self, alarm_id: int):
        """Postpone an active alarm by its snooze duration and stop its playback.

        Snooze settings come from ``config['snooze']``; missing settings fall
        back to enabled / 5 minutes / at most 3 snoozes.

        Returns:
            True if the alarm was snoozed; False if it is unknown, snoozing is
            disabled, or the maximum snooze count has been reached.
        """
        alarm_info = self.active_alarms.get(alarm_id)
        if alarm_info is None:
            logger.warning(f"Cannot snooze alarm {alarm_id} - not found in active alarms")
            return False

        snooze_config = {**self._DEFAULT_SNOOZE, **(alarm_info['config'].get('snooze') or {})}
        if not snooze_config['enabled']:
            logger.warning(f"Snooze not enabled for alarm {alarm_id}")
            return False

        snooze_count = alarm_info.get('snooze_count', 0)
        if snooze_count >= snooze_config['max_count']:
            logger.warning(f"Maximum snooze count reached for alarm {alarm_id}")
            return False

        # Stop any active playback
        self._stop_process(alarm_info.get('process'))
        alarm_info['process'] = None

        snooze_duration = snooze_config['duration']
        alarm_info['snooze_count'] = snooze_count + 1
        alarm_info['trigger_time'] = datetime.now() + timedelta(minutes=snooze_duration)
        logger.info(f"Snoozed alarm {alarm_id} for {snooze_duration} minutes.")
        return True

    def dismiss_alarm(self, alarm_id: int):
        """Stop an alarm's playback and remove it from ``active_alarms``.

        The entry is removed whatever its repeat rule is; it stays gone until
        the alarm is scheduled again.

        Returns:
            True if the alarm was found and dismissed, False otherwise.
        """
        alarm_info = self.active_alarms.pop(alarm_id, None)
        if alarm_info is None:
            logger.warning(f"Cannot dismiss alarm {alarm_id} - not found in active alarms")
            return False
        self._stop_process(alarm_info.get('process'))
        logger.info(f"Dismissed alarm {alarm_id}.")
        return True