# Source: eee_alarm_clock/alert_api/alarm_siren.py (219 lines, 8.3 KiB, Python)

import os
import time
import threading
import subprocess
import queue
import logging
from datetime import datetime, timedelta
from typing import Optional, Dict, Any
# Configure root logging once at import time: DEBUG and above goes both to the
# console stream and to 'alarm_siren.log' in the current working directory.
logging.basicConfig(
    handlers=[logging.StreamHandler(), logging.FileHandler('alarm_siren.log')],
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.DEBUG,
)

# Module-wide logger used by the alarm scheduling and playback code.
logger = logging.getLogger('AlarmSiren')
class AlarmSiren:
    """Schedule alarms and play their audio files from a background thread.

    Alarm configurations are plain dictionaries. The keys read here are
    ``time`` ("HH:MM:SS"), ``repeat_rule`` (``type`` of ``once``, ``daily`` or
    ``weekly`` plus ``at`` / ``days_of_week``), ``file_to_play`` and the
    optional ``id``, ``snooze`` and ``metadata`` sections.

    NOTE(review): ``active_alarms`` is shared between the worker thread and
    callers of ``snooze_alarm`` / ``dismiss_alarm`` without a lock. The code
    tolerates entries disappearing or being re-timed while an alarm sounds,
    but no stronger thread-safety guarantee is made.
    """

    # Weekday names indexed by ``datetime.weekday()`` (Monday == 0). Using the
    # index avoids the locale-dependent ``strftime("%A")`` lookup.
    _WEEKDAYS = ('monday', 'tuesday', 'wednesday', 'thursday',
                 'friday', 'saturday', 'sunday')

    # Defaults applied when an alarm has no (or only a partial) 'snooze' section.
    _DEFAULT_SNOOZE = {'enabled': True, 'duration': 10, 'max_count': 3}

    # How long the worker waits for a snooze/dismiss after an alarm starts, and
    # how often it re-checks while waiting.
    INTERACTION_TIMEOUT_SECONDS = 30
    INTERACTION_POLL_SECONDS = 0.5

    def __init__(self):
        """Create the queues and start the daemon playback thread."""
        # Communication queues
        self.alarm_queue = queue.Queue()
        self.control_queue = queue.Queue()
        # Tracking active alarms: id -> {'config', 'trigger_time',
        # 'snooze_count', and 'process' while a player was started}.
        self.active_alarms: Dict[int, Dict[str, Any]] = {}
        # Playback thread
        self.playback_thread = threading.Thread(target=self._playback_worker, daemon=True)
        self.playback_thread.start()

    def schedule_alarm(self, alarm_config: Dict[str, Any]):
        """Queue an alarm configuration for the worker thread to schedule."""
        logger.info("Scheduling alarm: %s", alarm_config)
        self.alarm_queue.put(alarm_config)

    def _calculate_next_alarm_time(self, alarm_config: Dict[str, Any],
                                   now: Optional[datetime] = None) -> Optional[datetime]:
        """Return the next trigger time for ``alarm_config``.

        Args:
            alarm_config: Alarm dictionary with ``time`` and ``repeat_rule``.
            now: Reference time; defaults to ``datetime.now()``. Exposed so the
                calculation is deterministic for callers that need it.

        Returns:
            * ``once``   - the configured date/time, even if it is in the past;
              ``None`` if ``at`` is missing or malformed.
            * ``daily``  - today at the alarm time if still ahead, else tomorrow.
            * ``weekly`` - the nearest configured weekday strictly after ``now``
              (today counts only if the time is still ahead); ``None`` when no
              valid day is configured.
            * any other type - ``None``.

        Raises:
            KeyError, ValueError: if ``time`` / ``repeat_rule`` / ``type`` is
                missing or ``time`` is not "HH:MM:SS".
        """
        if now is None:
            now = datetime.now()
        alarm_time = datetime.strptime(alarm_config['time'], "%H:%M:%S").time()
        rule = alarm_config['repeat_rule']
        rule_type = rule['type']

        if rule_type == 'once':
            try:
                specific_date = datetime.strptime(rule['at'], "%d.%m.%Y")
                return datetime.combine(specific_date.date(), alarm_time)
            except (KeyError, ValueError):
                logger.error("Invalid one-time alarm configuration")
                return None

        if rule_type == 'daily':
            candidate = datetime.combine(now.date(), alarm_time)
            return candidate if candidate > now else candidate + timedelta(days=1)

        if rule_type == 'weekly':
            configured_days = {str(day).lower() for day in rule.get('days_of_week', [])}
            # Offset 0 is today (valid only if the time is still ahead);
            # offset 7 is the same weekday next week.
            for offset in range(8):
                candidate_date = now.date() + timedelta(days=offset)
                if self._WEEKDAYS[candidate_date.weekday()] not in configured_days:
                    continue
                candidate = datetime.combine(candidate_date, alarm_time)
                if candidate > now:
                    return candidate
            return None

        return None

    def _play_audio(self, file_path: str, volume: int = 100):
        """Start ``mpg123`` on ``file_path`` without waiting for it to finish.

        Args:
            file_path: Path of the audio file to play.
            volume: Value passed to mpg123's ``-g`` option.

        Returns:
            The running ``subprocess.Popen`` object, or ``False`` if the file
            does not exist or the player could not be started.
        """
        if not os.path.exists(file_path):
            logger.error("Audio file not found: %s", file_path)
            return False
        # Option and value are separate argv entries so the player receives
        # exactly what a shell invocation ``mpg123 -g 80 file`` would pass.
        cmd = ["mpg123", "-g", str(volume), file_path]
        logger.info("Playing alarm: %s", file_path)
        try:
            return subprocess.Popen(cmd)
        except (OSError, ValueError, subprocess.SubprocessError) as e:
            logger.error("Error playing audio: %s", e)
            return False

    @staticmethod
    def _stop_playback(alarm_info: Dict[str, Any]) -> None:
        """Terminate the player process recorded on ``alarm_info``, if any."""
        process = alarm_info.pop('process', None)
        if process is None or process.poll() is not None:
            return
        process.terminate()
        try:
            # Reap the child; escalate if it ignores SIGTERM.
            process.wait(timeout=1)
        except subprocess.TimeoutExpired:
            process.kill()

    def _ingest_new_alarm(self) -> None:
        """Move at most one queued alarm into ``active_alarms`` (waits <= 1 s)."""
        try:
            new_alarm = self.alarm_queue.get(timeout=1)
        except queue.Empty:
            return
        alarm_time = self._calculate_next_alarm_time(new_alarm)
        if alarm_time is None:
            logger.warning("Alarm has no trigger time and was not scheduled: %s", new_alarm)
            return
        self.active_alarms[new_alarm.get('id', id(new_alarm))] = {
            'config': new_alarm,
            'trigger_time': alarm_time,
            'snooze_count': 0,
        }

    def _drain_control_message(self) -> None:
        """Consume at most one control message (waits <= 0.1 s)."""
        try:
            control_msg = self.control_queue.get(timeout=0.1)
        except queue.Empty:
            return
        # No control protocol is defined yet; snoozing and dismissing happen
        # through direct calls to snooze_alarm() / dismiss_alarm().
        logger.debug("Ignoring control message: %r", control_msg)

    def _wait_for_interaction(self, alarm_id: int, fired_at: datetime,
                              process: subprocess.Popen) -> None:
        """Block until the alarm is snoozed/dismissed, playback ends, or timeout."""
        deadline = time.monotonic() + self.INTERACTION_TIMEOUT_SECONDS
        while time.monotonic() < deadline:
            alarm_info = self.active_alarms.get(alarm_id)
            if alarm_info is None or alarm_info['trigger_time'] != fired_at:
                return  # dismissed or snoozed by another thread
            if process.poll() is not None:
                return  # player exited on its own
            time.sleep(self.INTERACTION_POLL_SECONDS)

    def _reschedule_after_trigger(self, alarm_id: int, fired_at: datetime) -> None:
        """Re-time or retire an alarm that has just fired.

        Does nothing if the alarm was dismissed or snoozed in the meantime
        (its entry is gone or its trigger time no longer equals ``fired_at``).
        Otherwise the alarm moves to its next occurrence, or is removed when
        there is none in the future - which is what retires one-time alarms.
        """
        alarm_info = self.active_alarms.get(alarm_id)
        if alarm_info is None or alarm_info['trigger_time'] != fired_at:
            return
        now = datetime.now()
        next_trigger = self._calculate_next_alarm_time(alarm_info['config'], now=now)
        if next_trigger is not None and next_trigger > now:
            alarm_info['trigger_time'] = next_trigger
            # This occurrence ended without a snooze, so the next occurrence
            # starts with a fresh snooze allowance.
            alarm_info['snooze_count'] = 0
        else:
            self.active_alarms.pop(alarm_id, None)

    def _trigger_due_alarms(self) -> None:
        """Fire every active alarm whose trigger time has been reached."""
        now = datetime.now()
        for alarm_id, alarm_info in list(self.active_alarms.items()):
            if self.active_alarms.get(alarm_id) is not alarm_info:
                continue  # dismissed or replaced since the snapshot was taken
            fired_at = alarm_info['trigger_time']
            if now < fired_at:
                continue
            # Silence a previous run that may still be sounding.
            self._stop_playback(alarm_info)
            config = alarm_info['config']
            process = self._play_audio(
                config.get('file_to_play') or '',
                config.get('metadata', {}).get('volume', 100),
            )
            if process:
                # Remember the player so snooze/dismiss can silence it.
                alarm_info['process'] = process
                self._wait_for_interaction(alarm_id, fired_at, process)
            # Reschedule even when playback failed; otherwise the alarm would
            # stay due and be retried on every loop iteration.
            self._reschedule_after_trigger(alarm_id, fired_at)

    def _playback_worker(self):
        """Background loop: ingest new alarms, then fire those that are due."""
        while True:
            try:
                self._ingest_new_alarm()
                self._drain_control_message()
                self._trigger_due_alarms()
            except Exception:
                # Top-level boundary of the daemon thread: log with traceback
                # and keep the loop alive.
                logger.exception("Error in playback worker")
            time.sleep(1)  # Prevent tight loop

    def snooze_alarm(self, alarm_id: int):
        """Postpone an active alarm by its configured snooze duration.

        Unknown ids are ignored. Missing keys in the alarm's ``snooze``
        section fall back to enabled / 10 minutes / 3 snoozes.
        """
        alarm_info = self.active_alarms.get(alarm_id)
        if alarm_info is None:
            return
        snooze_config = {**self._DEFAULT_SNOOZE,
                         **(alarm_info['config'].get('snooze') or {})}
        if not snooze_config['enabled']:
            logger.warning("Cannot snooze alarm %s - snoozing is disabled", alarm_id)
            return
        if alarm_info['snooze_count'] >= snooze_config['max_count']:
            logger.warning("Cannot snooze alarm %s - max snooze count reached", alarm_id)
            return
        alarm_info['snooze_count'] += 1
        snooze_duration = snooze_config['duration']
        alarm_info['trigger_time'] = datetime.now() + timedelta(minutes=snooze_duration)
        self._stop_playback(alarm_info)
        logger.info("Snoozed alarm %s for %s minutes", alarm_id, snooze_duration)

    def dismiss_alarm(self, alarm_id: int):
        """Remove an active alarm and stop its playback; unknown ids are ignored."""
        # pop() instead of check-then-delete: the worker may remove the entry
        # concurrently.
        alarm_info = self.active_alarms.pop(alarm_id, None)
        if alarm_info is None:
            return
        self._stop_playback(alarm_info)
        logger.info("Dismissed alarm %s", alarm_id)
def main():
    """Demo entry point: schedule one daily alarm and idle until Ctrl+C."""
    # Sample configuration; key order matches what gets logged on scheduling.
    demo_alarm = dict(
        id=1,
        name='Morning Alarm',
        time='07:00:00',
        file_to_play='/path/to/alarm.mp3',
        repeat_rule=dict(type='daily'),
        snooze=dict(enabled=True, duration=10, max_count=3),
        metadata=dict(volume=80),
    )

    alarm_siren = AlarmSiren()
    alarm_siren.schedule_alarm(demo_alarm)

    # The playback thread is a daemon, so the main thread has to stay alive
    # for alarms to fire; Ctrl+C ends the demo.
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        print("Alarm siren stopped")


if __name__ == "__main__":
    main()