Compare commits
2 Commits
211f581921
...
0eb5204833
Author | SHA1 | Date | |
---|---|---|---|
0eb5204833 | |||
35d1e76ba0 |
162
alert_api/alarm_api.py
Normal file
162
alert_api/alarm_api.py
Normal file
@ -0,0 +1,162 @@
|
||||
from http.server import BaseHTTPRequestHandler, HTTPServer
|
||||
import logging
|
||||
import os
|
||||
import json
|
||||
import hashlib
|
||||
from dataclasses import dataclass, field, asdict
|
||||
from typing import List, Optional, Dict, Any
|
||||
from datetime import datetime
|
||||
from http import HTTPStatus
|
||||
import re
|
||||
|
||||
from alarm_storage import AlarmStorage
|
||||
from data_classes import RepeatRule, Snooze, Metadata, Alarm
|
||||
from logging_config import setup_logging
|
||||
|
||||
# Set up logging configuration
|
||||
logger = setup_logging()
|
||||
|
||||
class AlertApi(BaseHTTPRequestHandler):
|
||||
def __init__(self, *args, **kwargs):
|
||||
self.storage = AlarmStorage("data/alerts.json")
|
||||
self.logger = logger
|
||||
super().__init__(*args, **kwargs)
|
||||
|
||||
def _send_response(self, status_code: int, data: Any = None, error: str = None) -> None:
|
||||
"""Send a JSON response with the given status code and data/error"""
|
||||
self.send_response(status_code)
|
||||
self.send_header("Content-Type", "application/json")
|
||||
self.send_header("Access-Control-Allow-Origin", "*")
|
||||
self.end_headers()
|
||||
|
||||
response = {}
|
||||
if data is not None:
|
||||
response["data"] = data
|
||||
if error is not None:
|
||||
response["error"] = error
|
||||
|
||||
response_json = json.dumps(response)
|
||||
self.logger.debug(f"Sending response: {response_json}")
|
||||
self.wfile.write(response_json.encode("utf-8"))
|
||||
|
||||
def _handle_request(self, method: str) -> None:
|
||||
"""Handle incoming requests with proper error handling"""
|
||||
self.logger.info(f"Received {method} request from {self.client_address[0]}")
|
||||
|
||||
try:
|
||||
if method in ["POST", "PUT", "DELETE"]:
|
||||
content_length = int(self.headers.get("Content-Length", 0))
|
||||
if content_length == 0:
|
||||
raise ValueError("Missing request body")
|
||||
|
||||
post_data = self.rfile.read(content_length)
|
||||
self.logger.debug(f"Received {method} payload: {post_data.decode('utf-8')}")
|
||||
|
||||
post_data = json.loads(post_data)
|
||||
else:
|
||||
post_data = None
|
||||
|
||||
# Route request to appropriate handler
|
||||
handler = getattr(self, f"_handle_{method.lower()}", None)
|
||||
if handler:
|
||||
self.logger.debug(f"Routing to handler: _handle_{method.lower()}")
|
||||
handler(post_data)
|
||||
else:
|
||||
self.logger.warning(f"Method not allowed: {method}")
|
||||
self._send_response(HTTPStatus.METHOD_NOT_ALLOWED, error="Method not allowed")
|
||||
|
||||
except json.JSONDecodeError as e:
|
||||
self.logger.error(f"JSON decode error: {str(e)}")
|
||||
self._send_response(HTTPStatus.BAD_REQUEST, error="Invalid JSON in request body")
|
||||
except ValueError as e:
|
||||
self.logger.error(f"Validation error: {str(e)}")
|
||||
self._send_response(HTTPStatus.BAD_REQUEST, error=str(e))
|
||||
except Exception as e:
|
||||
self.logger.error(f"Unexpected error: {str(e)}", exc_info=True)
|
||||
self._send_response(HTTPStatus.INTERNAL_SERVER_ERROR, error="Internal server error")
|
||||
|
||||
def _handle_get(self, _) -> None:
|
||||
"""Handle GET request"""
|
||||
self.logger.debug("Processing GET request for all alarms")
|
||||
alarms = self.storage.get_saved_alerts()
|
||||
self.logger.debug(f"Retrieved {len(alarms)} alarms")
|
||||
self._send_response(HTTPStatus.OK, data=alarms)
|
||||
|
||||
def _handle_post(self, data: dict) -> None:
|
||||
"""Handle POST request"""
|
||||
self.logger.debug(f"Processing POST request with data: {json.dumps(data, indent=2)}")
|
||||
try:
|
||||
alarm_id = self.storage.save_new_alert(data)
|
||||
self.logger.info(f"Successfully created new alarm with ID: {alarm_id}")
|
||||
self._send_response(HTTPStatus.CREATED, data={"id": alarm_id})
|
||||
except ValueError as e:
|
||||
self.logger.error(f"Failed to create alarm: {str(e)}")
|
||||
self._send_response(HTTPStatus.BAD_REQUEST, error=str(e))
|
||||
|
||||
def _handle_put(self, data: dict) -> None:
|
||||
"""Handle PUT request"""
|
||||
alarm_id = data.pop("id", None)
|
||||
self.logger.debug(f"Processing PUT request for alarm ID {alarm_id} with data: {json.dumps(data, indent=2)}")
|
||||
|
||||
if alarm_id is None:
|
||||
self.logger.error("PUT request missing alarm ID")
|
||||
self._send_response(HTTPStatus.BAD_REQUEST, error="Missing alarm ID")
|
||||
return
|
||||
|
||||
if self.storage.update_alert(alarm_id, data):
|
||||
self.logger.info(f"Successfully updated alarm ID: {alarm_id}")
|
||||
self._send_response(HTTPStatus.OK, data={"message": "Alarm updated successfully"})
|
||||
else:
|
||||
self.logger.warning(f"Alarm not found for update: {alarm_id}")
|
||||
self._send_response(HTTPStatus.NOT_FOUND, error="Alarm not found")
|
||||
|
||||
def _handle_delete(self, data: dict) -> None:
|
||||
"""Handle DELETE request"""
|
||||
alarm_id = data.get("id")
|
||||
self.logger.debug(f"Processing DELETE request for alarm ID: {alarm_id}")
|
||||
|
||||
if not isinstance(alarm_id, int):
|
||||
self.logger.error(f"Invalid alarm ID format: {alarm_id}")
|
||||
self._send_response(HTTPStatus.BAD_REQUEST, error="Invalid alarm ID")
|
||||
return
|
||||
|
||||
if self.storage.remove_saved_alert(alarm_id):
|
||||
self.logger.info(f"Successfully deleted alarm ID: {alarm_id}")
|
||||
self._send_response(HTTPStatus.OK, data={"message": "Alarm removed successfully"})
|
||||
else:
|
||||
self.logger.warning(f"Alarm not found for deletion: {alarm_id}")
|
||||
self._send_response(HTTPStatus.NOT_FOUND, error="Alarm not found")
|
||||
|
||||
def do_GET(self): self._handle_request("GET")
|
||||
def do_POST(self): self._handle_request("POST")
|
||||
def do_PUT(self): self._handle_request("PUT")
|
||||
def do_DELETE(self): self._handle_request("DELETE")
|
||||
|
||||
|
||||
def run(server_class=HTTPServer, handler_class=AlertApi, port=8000):
|
||||
# Set up logging configuration
|
||||
logging.basicConfig(
|
||||
level=logging.DEBUG, # Set to DEBUG to show all log levels
|
||||
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
|
||||
handlers=[
|
||||
logging.StreamHandler(), # Console handler
|
||||
logging.FileHandler('alert_api.log') # File handler
|
||||
]
|
||||
)
|
||||
|
||||
logger = logging.getLogger('AlertApi')
|
||||
logger.info(f"Starting AlertApi on port {port}")
|
||||
|
||||
server_address = ("", port)
|
||||
httpd = server_class(server_address, handler_class)
|
||||
|
||||
try:
|
||||
logger.info("Server is ready to handle requests")
|
||||
httpd.serve_forever()
|
||||
except KeyboardInterrupt:
|
||||
logger.info("Received shutdown signal")
|
||||
except Exception as e:
|
||||
logger.error(f"Server error: {str(e)}", exc_info=True)
|
||||
finally:
|
||||
httpd.server_close()
|
||||
logger.info("Server stopped")
|
218
alert_api/alarm_siren.py
Normal file
218
alert_api/alarm_siren.py
Normal file
@ -0,0 +1,218 @@
|
||||
import os
|
||||
import time
|
||||
import threading
|
||||
import subprocess
|
||||
import queue
|
||||
import logging
|
||||
from datetime import datetime, timedelta
|
||||
from typing import Optional, Dict, Any
|
||||
|
||||
# Set up logging
|
||||
logging.basicConfig(
|
||||
level=logging.DEBUG,
|
||||
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
|
||||
handlers=[
|
||||
logging.StreamHandler(),
|
||||
logging.FileHandler('alarm_siren.log')
|
||||
]
|
||||
)
|
||||
logger = logging.getLogger('AlarmSiren')
|
||||
|
||||
class AlarmSiren:
|
||||
def __init__(self):
|
||||
# Communication queues
|
||||
self.alarm_queue = queue.Queue()
|
||||
self.control_queue = queue.Queue()
|
||||
|
||||
# Tracking active alarms
|
||||
self.active_alarms: Dict[int, Dict[str, Any]] = {}
|
||||
|
||||
# Playback thread
|
||||
self.playback_thread = threading.Thread(target=self._playback_worker, daemon=True)
|
||||
self.playback_thread.start()
|
||||
|
||||
def schedule_alarm(self, alarm_config: Dict[str, Any]):
|
||||
"""Schedule an alarm based on its configuration"""
|
||||
logger.info(f"Scheduling alarm: {alarm_config}")
|
||||
self.alarm_queue.put(alarm_config)
|
||||
|
||||
def _calculate_next_alarm_time(self, alarm_config: Dict[str, Any]) -> Optional[datetime]:
|
||||
"""Calculate the next alarm trigger time based on repeat rule"""
|
||||
now = datetime.now()
|
||||
current_time = now.time()
|
||||
|
||||
# Parse alarm time
|
||||
alarm_time = datetime.strptime(alarm_config['time'], "%H:%M:%S").time()
|
||||
|
||||
# Determine the next trigger
|
||||
if alarm_config['repeat_rule']['type'] == 'once':
|
||||
# For one-time alarm, check the specific date
|
||||
try:
|
||||
specific_date = datetime.strptime(alarm_config['repeat_rule']['at'], "%d.%m.%Y")
|
||||
return datetime.combine(specific_date.date(), alarm_time)
|
||||
except (KeyError, ValueError):
|
||||
logger.error("Invalid one-time alarm configuration")
|
||||
return None
|
||||
|
||||
elif alarm_config['repeat_rule']['type'] == 'daily':
|
||||
# Daily alarm - trigger today or tomorrow
|
||||
next_trigger = datetime.combine(now.date(), alarm_time)
|
||||
if current_time < alarm_time:
|
||||
return next_trigger
|
||||
return next_trigger + timedelta(days=1)
|
||||
|
||||
elif alarm_config['repeat_rule']['type'] == 'weekly':
|
||||
# Weekly alarm - check configured days
|
||||
today = now.strftime("%A").lower()
|
||||
configured_days = [day.lower() for day in alarm_config['repeat_rule'].get('days_of_week', [])]
|
||||
|
||||
if today in configured_days and current_time < alarm_time:
|
||||
return datetime.combine(now.date(), alarm_time)
|
||||
|
||||
# Find next configured day
|
||||
days_order = ['monday', 'tuesday', 'wednesday', 'thursday', 'friday', 'saturday', 'sunday']
|
||||
current_index = days_order.index(today)
|
||||
|
||||
for offset in range(1, 8):
|
||||
next_day_index = (current_index + offset) % 7
|
||||
next_day = days_order[next_day_index]
|
||||
|
||||
if next_day in configured_days:
|
||||
next_date = now.date() + timedelta(days=offset)
|
||||
return datetime.combine(next_date, alarm_time)
|
||||
|
||||
return None
|
||||
|
||||
def _play_audio(self, file_path: str, volume: int = 100):
|
||||
"""Play audio file using mpg123"""
|
||||
try:
|
||||
# Ensure the file exists
|
||||
if not os.path.exists(file_path):
|
||||
logger.error(f"Audio file not found: {file_path}")
|
||||
return False
|
||||
|
||||
# Construct mpg123 command with volume control
|
||||
volume_adjust = f"-g {volume}"
|
||||
cmd = ["mpg123", volume_adjust, file_path]
|
||||
|
||||
logger.info(f"Playing alarm: {file_path}")
|
||||
|
||||
# Track the process for potential interruption
|
||||
process = subprocess.Popen(cmd)
|
||||
return process
|
||||
except Exception as e:
|
||||
logger.error(f"Error playing audio: {e}")
|
||||
return False
|
||||
|
||||
def _playback_worker(self):
|
||||
"""Background thread for managing alarm playback"""
|
||||
while True:
|
||||
try:
|
||||
# Check for new alarms to schedule
|
||||
try:
|
||||
new_alarm = self.alarm_queue.get(timeout=1)
|
||||
alarm_time = self._calculate_next_alarm_time(new_alarm)
|
||||
if alarm_time:
|
||||
self.active_alarms[new_alarm.get('id', id(new_alarm))] = {
|
||||
'config': new_alarm,
|
||||
'trigger_time': alarm_time,
|
||||
'snooze_count': 0
|
||||
}
|
||||
except queue.Empty:
|
||||
pass
|
||||
|
||||
# Check for control signals (snooze/dismiss)
|
||||
try:
|
||||
control_msg = self.control_queue.get(timeout=0.1)
|
||||
# Handle control message logic
|
||||
except queue.Empty:
|
||||
pass
|
||||
|
||||
# Check for alarms to trigger
|
||||
now = datetime.now()
|
||||
for alarm_id, alarm_info in list(self.active_alarms.items()):
|
||||
if now >= alarm_info['trigger_time']:
|
||||
# Trigger alarm
|
||||
process = self._play_audio(
|
||||
alarm_info['config']['file_to_play'],
|
||||
alarm_info['config'].get('metadata', {}).get('volume', 100)
|
||||
)
|
||||
|
||||
# Handle repeat and snooze logic
|
||||
if process:
|
||||
# Wait for user interaction or timeout
|
||||
# In a real implementation, this would be more sophisticated
|
||||
time.sleep(30) # Placeholder for user interaction
|
||||
|
||||
# Determine next trigger based on repeat rule
|
||||
next_trigger = self._calculate_next_alarm_time(alarm_info['config'])
|
||||
if next_trigger:
|
||||
alarm_info['trigger_time'] = next_trigger
|
||||
else:
|
||||
del self.active_alarms[alarm_id]
|
||||
|
||||
time.sleep(1) # Prevent tight loop
|
||||
except Exception as e:
|
||||
logger.error(f"Error in playback worker: {e}")
|
||||
time.sleep(1)
|
||||
|
||||
def snooze_alarm(self, alarm_id: int):
|
||||
"""Snooze a specific alarm"""
|
||||
if alarm_id in self.active_alarms:
|
||||
alarm_config = self.active_alarms[alarm_id]['config']
|
||||
snooze_config = alarm_config.get('snooze', {'enabled': True, 'duration': 10, 'max_count': 3})
|
||||
|
||||
if (snooze_config['enabled'] and
|
||||
self.active_alarms[alarm_id]['snooze_count'] < snooze_config['max_count']):
|
||||
|
||||
# Increment snooze count
|
||||
self.active_alarms[alarm_id]['snooze_count'] += 1
|
||||
|
||||
# Set next trigger time
|
||||
snooze_duration = snooze_config.get('duration', 10)
|
||||
self.active_alarms[alarm_id]['trigger_time'] = datetime.now() + timedelta(minutes=snooze_duration)
|
||||
|
||||
logger.info(f"Snoozed alarm {alarm_id} for {snooze_duration} minutes")
|
||||
else:
|
||||
logger.warning(f"Cannot snooze alarm {alarm_id} - max snooze count reached")
|
||||
|
||||
def dismiss_alarm(self, alarm_id: int):
|
||||
"""Dismiss a specific alarm"""
|
||||
if alarm_id in self.active_alarms:
|
||||
logger.info(f"Dismissed alarm {alarm_id}")
|
||||
del self.active_alarms[alarm_id]
|
||||
|
||||
def main():
|
||||
"""Example usage of AlarmSiren"""
|
||||
siren = AlarmSiren()
|
||||
|
||||
# Example alarm configuration
|
||||
sample_alarm = {
|
||||
'id': 1,
|
||||
'name': 'Morning Alarm',
|
||||
'time': '07:00:00',
|
||||
'file_to_play': '/path/to/alarm.mp3',
|
||||
'repeat_rule': {
|
||||
'type': 'daily'
|
||||
},
|
||||
'snooze': {
|
||||
'enabled': True,
|
||||
'duration': 10,
|
||||
'max_count': 3
|
||||
},
|
||||
'metadata': {
|
||||
'volume': 80
|
||||
}
|
||||
}
|
||||
|
||||
siren.schedule_alarm(sample_alarm)
|
||||
|
||||
# Keep main thread alive (in a real app, this would be handled differently)
|
||||
try:
|
||||
while True:
|
||||
time.sleep(1)
|
||||
except KeyboardInterrupt:
|
||||
print("Alarm siren stopped")
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
263
alert_api/alarm_storage.py
Normal file
263
alert_api/alarm_storage.py
Normal file
@ -0,0 +1,263 @@
|
||||
import logging
|
||||
import os
|
||||
import json
|
||||
import hashlib
|
||||
from typing import List, Optional, Dict, Any
|
||||
from datetime import datetime
|
||||
import re
|
||||
from dataclasses import asdict
|
||||
|
||||
from data_classes import RepeatRule, Snooze, Metadata, Alarm
|
||||
|
||||
# Set up logging configuration
|
||||
logging.basicConfig(
|
||||
level=logging.DEBUG,
|
||||
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
|
||||
handlers=[
|
||||
logging.StreamHandler(), # Console handler
|
||||
logging.FileHandler('alert_api.log') # File handler
|
||||
]
|
||||
)
|
||||
|
||||
logger = logging.getLogger('AlertApi')
|
||||
|
||||
class AlarmStorage:
|
||||
def __init__(self, file_path: str, siren=None):
|
||||
"""Initialize AlarmStorage with optional Siren integration"""
|
||||
self.file_path = file_path
|
||||
self.siren = siren
|
||||
self._ensure_storage_directory()
|
||||
|
||||
def _ensure_storage_directory(self) -> None:
|
||||
"""Ensure the directory for the storage file exists"""
|
||||
directory = os.path.dirname(os.path.abspath(self.file_path))
|
||||
if directory:
|
||||
os.makedirs(directory, exist_ok=True)
|
||||
|
||||
@staticmethod
|
||||
def _generate_alarm_hash(alarm: dict) -> str:
|
||||
"""Generate an MD5 hash for the given alarm data"""
|
||||
def to_dict(value):
|
||||
return asdict(value) if hasattr(value, '__dataclass_fields__') else value
|
||||
|
||||
alarm_data = alarm.copy()
|
||||
alarm_data["repeat_rule"] = to_dict(alarm_data.get("repeat_rule"))
|
||||
alarm_data["snooze"] = to_dict(alarm_data.get("snooze"))
|
||||
alarm_data["metadata"] = to_dict(alarm_data.get("metadata"))
|
||||
|
||||
hash_input = json.dumps({
|
||||
"name": alarm_data["name"],
|
||||
"time": alarm_data["time"],
|
||||
"file_to_play": alarm_data.get("file_to_play", os.path.expanduser("~/.alarms/alarm-lofi.mp3")),
|
||||
"repeat_rule": alarm_data["repeat_rule"]
|
||||
}, sort_keys=True)
|
||||
return hashlib.md5(hash_input.encode('utf-8')).hexdigest()
|
||||
|
||||
def load_data(self) -> dict:
|
||||
"""Load the JSON data from the file"""
|
||||
try:
|
||||
with open(self.file_path, 'r') as file:
|
||||
data = json.load(file)
|
||||
if not isinstance(data, dict):
|
||||
raise ValueError("Invalid data format")
|
||||
return data
|
||||
except (FileNotFoundError, json.JSONDecodeError):
|
||||
return {"last_id": 0, "alarms": []}
|
||||
|
||||
def save_data(self, data: dict) -> None:
|
||||
"""Save the JSON data back to the file"""
|
||||
def convert_to_dict(obj):
|
||||
"""Convert dataclass instances to dictionaries recursively"""
|
||||
if isinstance(obj, list):
|
||||
return [convert_to_dict(item) for item in obj]
|
||||
elif isinstance(obj, dict):
|
||||
return {key: convert_to_dict(value) for key, value in obj.items()}
|
||||
elif hasattr(obj, "__dataclass_fields__"):
|
||||
return asdict(obj)
|
||||
else:
|
||||
return obj
|
||||
|
||||
# Convert data to JSON-serializable format
|
||||
serializable_data = convert_to_dict(data)
|
||||
|
||||
with open(self.file_path, 'w') as file:
|
||||
json.dump(serializable_data, file, indent=4)
|
||||
|
||||
def save_new_alert(self, alarm_data: dict) -> int:
|
||||
"""
|
||||
Add a new alarm to the storage and notify Siren if configured
|
||||
"""
|
||||
# Check for potential duplicates before saving
|
||||
existing_alarms = self.get_saved_alerts()
|
||||
for alarm in existing_alarms:
|
||||
# Compare key characteristics
|
||||
if (alarm['name'] == alarm_data['name'] and
|
||||
alarm['time'] == alarm_data['time'] and
|
||||
alarm['repeat_rule'] == alarm_data['repeat_rule']):
|
||||
raise ValueError("Duplicate alarm already exists")
|
||||
|
||||
|
||||
# Set default file_to_play if not provided
|
||||
alarm_data['file_to_play'] = alarm_data.get('file_to_play', os.path.expanduser("~/.alarms/alarm-lofi.mp3"))
|
||||
|
||||
# Convert nested dictionaries to dataclass instances
|
||||
try:
|
||||
if 'repeat_rule' in alarm_data:
|
||||
alarm_data['repeat_rule'] = RepeatRule(**alarm_data['repeat_rule'])
|
||||
if 'snooze' in alarm_data:
|
||||
alarm_data['snooze'] = Snooze(**alarm_data['snooze'])
|
||||
if 'metadata' in alarm_data:
|
||||
alarm_data['metadata'] = Metadata(**alarm_data['metadata'])
|
||||
except TypeError as e:
|
||||
raise ValueError(f"Invalid format in one of the nested fields: {str(e)}")
|
||||
|
||||
# Validate input
|
||||
try:
|
||||
alarm = Alarm(**alarm_data)
|
||||
if not alarm.validate():
|
||||
raise ValueError("Invalid alarm configuration")
|
||||
except (TypeError, ValueError) as e:
|
||||
raise ValueError(f"Invalid alarm data: {str(e)}")
|
||||
|
||||
# Load existing data
|
||||
data = self.load_data()
|
||||
|
||||
# Increment last_id and assign to new alarm
|
||||
data['last_id'] += 1
|
||||
alarm_data['id'] = data['last_id']
|
||||
|
||||
# Generate hash for the alarm
|
||||
alarm_data['hash'] = self._generate_alarm_hash(alarm_data)
|
||||
|
||||
# Check for duplicates
|
||||
if any(a.get("hash") == alarm_data['hash'] for a in data["alarms"]):
|
||||
raise ValueError("Duplicate alarm detected")
|
||||
|
||||
# Add new alarm to the list
|
||||
data['alarms'].append(alarm_data)
|
||||
|
||||
# Save updated data
|
||||
self.save_data(data)
|
||||
|
||||
# Notify Siren if available
|
||||
if self.siren and alarm_data.get('enabled', True):
|
||||
try:
|
||||
self.siren.schedule_alarm(alarm_data)
|
||||
logger.info(f"Siren notified of new alarm: {alarm_data['id']}")
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to notify Siren about new alarm: {e}")
|
||||
|
||||
return data['last_id']
|
||||
|
||||
def get_saved_alerts(self) -> list:
|
||||
"""Retrieve the list of saved alarms"""
|
||||
return self.load_data().get("alarms", [])
|
||||
|
||||
def remove_saved_alert(self, alarm_id: int) -> bool:
|
||||
"""Remove an alarm by its ID"""
|
||||
data = self.load_data()
|
||||
original_length = len(data["alarms"])
|
||||
|
||||
# Filter out the alarm with the given ID
|
||||
data["alarms"] = [a for a in data["alarms"] if a["id"] != alarm_id]
|
||||
|
||||
if len(data["alarms"]) == original_length:
|
||||
return False
|
||||
|
||||
# Save updated data
|
||||
self.save_data(data)
|
||||
|
||||
# Notify Siren if available
|
||||
if self.siren:
|
||||
try:
|
||||
self.siren.dismiss_alarm(alarm_id)
|
||||
logger.info(f"Siren dismissed alarm: {alarm_id}")
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to notify Siren about removed alarm: {e}")
|
||||
|
||||
return True
|
||||
|
||||
def update_alert(self, alarm_id: int, alarm_data: dict) -> bool:
|
||||
"""Update an existing alarm"""
|
||||
alarm_data["id"] = alarm_id
|
||||
|
||||
# Set default file_to_play if not provided
|
||||
alarm_data['file_to_play'] = alarm_data.get('file_to_play', os.path.expanduser("~/.alarms/alarm-lofi.mp3"))
|
||||
|
||||
# Convert nested dictionaries to dataclass instances
|
||||
try:
|
||||
if 'repeat_rule' in alarm_data:
|
||||
alarm_data['repeat_rule'] = RepeatRule(**alarm_data['repeat_rule'])
|
||||
if 'snooze' in alarm_data:
|
||||
alarm_data['snooze'] = Snooze(**alarm_data['snooze'])
|
||||
if 'metadata' in alarm_data:
|
||||
alarm_data['metadata'] = Metadata(**alarm_data['metadata'])
|
||||
except TypeError as e:
|
||||
raise ValueError(f"Invalid format in one of the nested fields: {str(e)}")
|
||||
|
||||
# Validate input
|
||||
try:
|
||||
alarm = Alarm(**alarm_data)
|
||||
if not alarm.validate():
|
||||
raise ValueError("Invalid alarm configuration")
|
||||
except (TypeError, ValueError) as e:
|
||||
raise ValueError(f"Invalid alarm data: {str(e)}")
|
||||
|
||||
# Load existing data
|
||||
data = self.load_data()
|
||||
|
||||
# Find and update the alarm
|
||||
for i, existing_alarm in enumerate(data["alarms"]):
|
||||
if existing_alarm["id"] == alarm_id:
|
||||
# Generate new hash
|
||||
alarm_data["hash"] = self._generate_alarm_hash(alarm_data)
|
||||
|
||||
# Replace the existing alarm
|
||||
data["alarms"][i] = alarm_data
|
||||
|
||||
# Save updated data
|
||||
self.save_data(data)
|
||||
|
||||
# Notify Siren if available
|
||||
if self.siren:
|
||||
try:
|
||||
# Remove existing alarm and reschedule
|
||||
self.siren.dismiss_alarm(alarm_id)
|
||||
|
||||
if alarm_data.get('enabled', True):
|
||||
self.siren.schedule_alarm(alarm_data)
|
||||
logger.info(f"Siren updated for alarm: {alarm_id}")
|
||||
else:
|
||||
logger.info(f"Siren dismissed disabled alarm: {alarm_id}")
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to notify Siren about updated alarm: {e}")
|
||||
|
||||
return True
|
||||
|
||||
# Alarm not found
|
||||
return False
|
||||
|
||||
def _get_alarm_by_id(self, alarm_id: int) -> Optional[dict]:
|
||||
"""
|
||||
Retrieve a specific alarm by its ID
|
||||
|
||||
Args:
|
||||
alarm_id (int): ID of the alarm to retrieve
|
||||
|
||||
Returns:
|
||||
dict or None: Alarm configuration or None if not found
|
||||
"""
|
||||
data = self.load_data()
|
||||
for alarm in data.get('alarms', []):
|
||||
if alarm.get('id') == alarm_id:
|
||||
return alarm
|
||||
return None
|
||||
|
||||
def set_siren(self, siren):
|
||||
"""
|
||||
Set or update the Siren instance for alarm synchronization
|
||||
|
||||
Args:
|
||||
siren (AlarmSiren): Siren instance to use for alarm management
|
||||
"""
|
||||
self.siren = siren
|
@ -1,491 +0,0 @@
|
||||
#!/usr/bin/python3
|
||||
|
||||
from http.server import BaseHTTPRequestHandler, HTTPServer
|
||||
import logging
|
||||
import os
|
||||
import json
|
||||
import hashlib
|
||||
from dataclasses import dataclass, field, asdict
|
||||
from typing import List, Optional, Dict, Any
|
||||
from datetime import datetime
|
||||
from http import HTTPStatus
|
||||
import re
|
||||
|
||||
# Set up logging configuration
|
||||
logging.basicConfig(
|
||||
level=logging.DEBUG, # Set to DEBUG to show all log levels
|
||||
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
|
||||
handlers=[
|
||||
logging.StreamHandler(), # Console handler
|
||||
logging.FileHandler('alert_api.log') # File handler
|
||||
]
|
||||
)
|
||||
|
||||
logger = logging.getLogger('AlertApi')
|
||||
|
||||
|
||||
@dataclass
|
||||
class RepeatRule:
|
||||
type: str # "daily" or "weekly" or "once"
|
||||
days_of_week: Optional[List[str]] = field(default_factory=list)
|
||||
at: Optional[str] = None
|
||||
|
||||
def validate(self) -> bool:
|
||||
"""Validate repeat rule configuration"""
|
||||
valid_types = {"daily", "weekly", "once"}
|
||||
valid_days = {"monday", "tuesday", "wednesday", "thursday", "friday", "saturday", "sunday"}
|
||||
|
||||
if self.type not in valid_types:
|
||||
logger.error(f"Invalid RepeatRule type: '{self.type}'. Must be one of {valid_types}")
|
||||
return False
|
||||
|
||||
if self.type == "weekly":
|
||||
invalid_days = [day for day in self.days_of_week if day.lower() not in valid_days]
|
||||
if invalid_days:
|
||||
logger.error(f"Invalid RepeatRule weekday names: {invalid_days}. Must be one of {valid_days}")
|
||||
return False
|
||||
if not self.days_of_week:
|
||||
logger.error("Weekly repeat rule must specify at least one day")
|
||||
return False
|
||||
|
||||
if self.type == "once" and self.days_of_week:
|
||||
if self.days_of_week:
|
||||
logger.error("One-time alert does not support days_of_week")
|
||||
return False
|
||||
if not self.at:
|
||||
logger.error("One-time repeat rule must specify a valid 'at' date")
|
||||
return False
|
||||
try:
|
||||
datetime.strptime(self.at, "%d.%m.%Y")
|
||||
except ValueError:
|
||||
logger.error(f"Invalid 'at' date format: '{self.at}'. Expected format: 'dd.mm.yyyy'")
|
||||
return False
|
||||
|
||||
logger.debug(f"RepeatRule validation passed: {self.__dict__}")
|
||||
return True
|
||||
|
||||
|
||||
@dataclass
|
||||
class Snooze:
|
||||
enabled: bool = True
|
||||
duration: int = 10 # minutes
|
||||
max_count: int = 3
|
||||
|
||||
def validate(self) -> bool:
|
||||
"""Validate snooze configuration"""
|
||||
if not isinstance(self.enabled, bool):
|
||||
logger.error(f"Snooze enabled must be boolean, got {type(self.enabled)}")
|
||||
return False
|
||||
|
||||
if not isinstance(self.duration, int):
|
||||
logger.error(f"Snooze duration must be integer, got {type(self.duration)}")
|
||||
return False
|
||||
|
||||
if self.duration <= 0:
|
||||
logger.error(f"Snooze duration must be positive, got {self.duration}")
|
||||
return False
|
||||
|
||||
if not isinstance(self.max_count, int):
|
||||
logger.error(f"Snooze max_count must be integer, got {type(self.max_count)}")
|
||||
return False
|
||||
|
||||
if self.max_count <= 0:
|
||||
logger.error(f"Snooze max_count must be positive, got {self.max_count}")
|
||||
return False
|
||||
|
||||
logger.debug(f"Snooze validation passed: {self.__dict__}")
|
||||
return True
|
||||
|
||||
|
||||
@dataclass
|
||||
class Metadata:
|
||||
volume: int = 100
|
||||
notes: str = ""
|
||||
md5sum: str = ""
|
||||
|
||||
def validate(self) -> bool:
|
||||
"""Validate metadata configuration"""
|
||||
if not isinstance(self.volume, int):
|
||||
logger.error(f"Volume must be integer, got {type(self.volume)}")
|
||||
return False
|
||||
|
||||
if not 0 <= self.volume <= 100:
|
||||
logger.error(f"Volume must be between 0 and 100, got {self.volume}")
|
||||
return False
|
||||
|
||||
logger.debug(f"Metadata validation passed: {self.__dict__}")
|
||||
return True
|
||||
|
||||
|
||||
@dataclass
|
||||
class Alarm:
|
||||
name: str
|
||||
time: str # Format: "HH:MM:SS"
|
||||
repeat_rule: RepeatRule
|
||||
file_to_play: str = field(default=os.path.expanduser("~/.alarms/alarm-lofi.mp3"))
|
||||
enabled: bool = field(default=True)
|
||||
snooze: Snooze = field(default_factory=Snooze)
|
||||
metadata: Metadata = field(default_factory=Metadata)
|
||||
id: Optional[int] = field(default=None)
|
||||
|
||||
def validate(self) -> bool:
|
||||
"""Validate complete alarm configuration"""
|
||||
logger.debug(f"Starting validation for alarm: {self.name}")
|
||||
|
||||
# Validate name
|
||||
if not isinstance(self.name, str) or len(self.name.strip()) == 0:
|
||||
logger.error(f"Invalid alarm name: '{self.name}'. Must be non-empty string")
|
||||
return False
|
||||
|
||||
# Validate time format
|
||||
time_pattern = re.compile(r'^([0-1]?[0-9]|2[0-3]):[0-5][0-9]:[0-5][0-9]$')
|
||||
if not time_pattern.match(self.time):
|
||||
logger.error(f"Invalid time format: '{self.time}'. Must match HH:MM:SS")
|
||||
return False
|
||||
|
||||
# Validate enabled flag
|
||||
if not isinstance(self.enabled, bool):
|
||||
logger.error(f"Enabled must be boolean, got {type(self.enabled)}")
|
||||
return False
|
||||
|
||||
# Create default alarms directory if it doesn't exist
|
||||
default_alarms_dir = os.path.expanduser("~/.alarms")
|
||||
os.makedirs(default_alarms_dir, exist_ok=True)
|
||||
|
||||
# Validate audio file
|
||||
if self.file_to_play != os.path.expanduser("~/.alarms/alarm-lofi.mp3") and not os.path.exists(self.file_to_play):
|
||||
logger.error(f"Audio file not found: '{self.file_to_play}'")
|
||||
return False
|
||||
|
||||
# Validate repeat rule
|
||||
if not isinstance(self.repeat_rule, RepeatRule):
|
||||
logger.error(f"Invalid repeat_rule type: {type(self.repeat_rule)}")
|
||||
return False
|
||||
if not self.repeat_rule.validate():
|
||||
logger.error("Repeat rule validation failed")
|
||||
return False
|
||||
|
||||
# Validate snooze
|
||||
if not isinstance(self.snooze, Snooze):
|
||||
logger.error(f"Invalid snooze type: {type(self.snooze)}")
|
||||
return False
|
||||
if not self.snooze.validate():
|
||||
logger.error("Snooze validation failed")
|
||||
return False
|
||||
|
||||
# Validate metadata
|
||||
if not isinstance(self.metadata, Metadata):
|
||||
logger.error(f"Invalid metadata type: {type(self.metadata)}")
|
||||
return False
|
||||
if not self.metadata.validate():
|
||||
logger.error("Metadata validation failed")
|
||||
return False
|
||||
|
||||
logger.debug(f"Alarm validation passed: {self.name}")
|
||||
return True
|
||||
|
||||
class StorageHandler:
|
||||
def __init__(self, file_path: str):
|
||||
self.file_path = file_path
|
||||
self._ensure_storage_directory()
|
||||
|
||||
def _ensure_storage_directory(self) -> None:
|
||||
"""Ensure the directory for the storage file exists"""
|
||||
directory = os.path.dirname(os.path.abspath(self.file_path))
|
||||
if directory:
|
||||
os.makedirs(directory, exist_ok=True)
|
||||
|
||||
@staticmethod
|
||||
def _generate_alarm_hash(alarm: dict) -> str:
|
||||
"""Generate an MD5 hash for the given alarm data"""
|
||||
# Convert dataclass instances to dictionaries if needed
|
||||
def to_dict(value):
|
||||
return asdict(value) if hasattr(value, '__dataclass_fields__') else value
|
||||
|
||||
alarm_data = alarm.copy()
|
||||
alarm_data["repeat_rule"] = to_dict(alarm_data.get("repeat_rule"))
|
||||
alarm_data["snooze"] = to_dict(alarm_data.get("snooze"))
|
||||
alarm_data["metadata"] = to_dict(alarm_data.get("metadata"))
|
||||
|
||||
hash_input = json.dumps({
|
||||
"name": alarm_data["name"],
|
||||
"time": alarm_data["time"],
|
||||
"file_to_play": alarm_data.get("file_to_play", os.path.expanduser("~/.alarms/alarm-lofi.mp3")),
|
||||
"repeat_rule": alarm_data["repeat_rule"]
|
||||
}, sort_keys=True)
|
||||
return hashlib.md5(hash_input.encode('utf-8')).hexdigest()
|
||||
|
||||
def load_data(self) -> dict:
|
||||
"""Load the JSON data from the file"""
|
||||
try:
|
||||
with open(self.file_path, 'r') as file:
|
||||
data = json.load(file)
|
||||
if not isinstance(data, dict):
|
||||
raise ValueError("Invalid data format")
|
||||
return data
|
||||
except (FileNotFoundError, json.JSONDecodeError):
|
||||
return {"last_id": 0, "alarms": []}
|
||||
|
||||
def save_data(self, data: dict) -> None:
|
||||
"""Save the JSON data back to the file"""
|
||||
def convert_to_dict(obj):
|
||||
"""Convert dataclass instances to dictionaries recursively"""
|
||||
if isinstance(obj, list):
|
||||
return [convert_to_dict(item) for item in obj]
|
||||
elif isinstance(obj, dict):
|
||||
return {key: convert_to_dict(value) for key, value in obj.items()}
|
||||
elif hasattr(obj, "__dataclass_fields__"):
|
||||
return asdict(obj)
|
||||
else:
|
||||
return obj
|
||||
|
||||
# Convert data to JSON-serializable format
|
||||
serializable_data = convert_to_dict(data)
|
||||
|
||||
with open(self.file_path, 'w') as file:
|
||||
json.dump(serializable_data, file, indent=4)
|
||||
|
||||
def save_new_alert(self, alarm_data: dict) -> int:
|
||||
"""Add a new alarm to the storage"""
|
||||
# Remove id if present in input data
|
||||
alarm_data.pop('id', None)
|
||||
|
||||
# Set default file_to_play if not provided
|
||||
alarm_data['file_to_play'] = alarm_data.get('file_to_play', os.path.expanduser("~/.alarms/alarm-lofi.mp3"))
|
||||
|
||||
# Convert nested dictionaries to dataclass instances
|
||||
try:
|
||||
if 'repeat_rule' in alarm_data:
|
||||
alarm_data['repeat_rule'] = RepeatRule(**alarm_data['repeat_rule'])
|
||||
if 'snooze' in alarm_data:
|
||||
alarm_data['snooze'] = Snooze(**alarm_data['snooze'])
|
||||
if 'metadata' in alarm_data:
|
||||
alarm_data['metadata'] = Metadata(**alarm_data['metadata'])
|
||||
except TypeError as e:
|
||||
raise ValueError(f"Invalid format in one of the nested fields: {str(e)}")
|
||||
|
||||
# Validate input
|
||||
try:
|
||||
alarm = Alarm(**alarm_data)
|
||||
if not alarm.validate():
|
||||
raise ValueError("Invalid alarm configuration")
|
||||
except (TypeError, ValueError) as e:
|
||||
raise ValueError(f"Invalid alarm data: {str(e)}")
|
||||
|
||||
# Generate hash and check for duplicates
|
||||
alarm_hash = self._generate_alarm_hash(alarm_data)
|
||||
data = self.load_data()
|
||||
|
||||
if any(a.get("hash") == alarm_hash for a in data["alarms"]):
|
||||
raise ValueError("Duplicate alarm detected")
|
||||
|
||||
# Add new alarm
|
||||
alarm_data["id"] = data.get("last_id", 0) + 1
|
||||
alarm_data["hash"] = alarm_hash
|
||||
data["last_id"] = alarm_data["id"]
|
||||
data["alarms"].append(alarm_data)
|
||||
|
||||
self.save_data(data)
|
||||
return alarm_data["id"]
|
||||
|
||||
def get_saved_alerts(self) -> list:
|
||||
"""Retrieve the list of saved alarms"""
|
||||
return self.load_data().get("alarms", [])
|
||||
|
||||
def remove_saved_alert(self, alarm_id: int) -> bool:
|
||||
"""Remove an alarm by its ID"""
|
||||
data = self.load_data()
|
||||
original_length = len(data["alarms"])
|
||||
data["alarms"] = [a for a in data["alarms"] if a["id"] != alarm_id]
|
||||
|
||||
if len(data["alarms"]) == original_length:
|
||||
return False
|
||||
|
||||
self.save_data(data)
|
||||
return True
|
||||
|
||||
def update_alert(self, alarm_id: int, alarm_data: dict) -> bool:
|
||||
"""Update an existing alarm"""
|
||||
alarm_data["id"] = alarm_id
|
||||
|
||||
# Set default file_to_play if not provided
|
||||
alarm_data['file_to_play'] = alarm_data.get('file_to_play', os.path.expanduser("~/.alarms/alarm-lofi.mp3"))
|
||||
|
||||
# Convert nested dictionaries to dataclass instances
|
||||
try:
|
||||
if 'repeat_rule' in alarm_data:
|
||||
alarm_data['repeat_rule'] = RepeatRule(**alarm_data['repeat_rule'])
|
||||
if 'snooze' in alarm_data:
|
||||
alarm_data['snooze'] = Snooze(**alarm_data['snooze'])
|
||||
if 'metadata' in alarm_data:
|
||||
alarm_data['metadata'] = Metadata(**alarm_data['metadata'])
|
||||
except TypeError as e:
|
||||
raise ValueError(f"Invalid format in one of the nested fields: {str(e)}")
|
||||
|
||||
# Validate input
|
||||
try:
|
||||
alarm = Alarm(**alarm_data)
|
||||
if not alarm.validate():
|
||||
raise ValueError("Invalid alarm configuration")
|
||||
except (TypeError, ValueError) as e:
|
||||
raise ValueError(f"Invalid alarm data: {str(e)}")
|
||||
|
||||
data = self.load_data()
|
||||
for i, existing_alarm in enumerate(data["alarms"]):
|
||||
if existing_alarm["id"] == alarm_id:
|
||||
alarm_data["hash"] = self._generate_alarm_hash(alarm_data)
|
||||
data["alarms"][i] = alarm_data
|
||||
self.save_data(data)
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
class AlertApi(BaseHTTPRequestHandler):
|
||||
def __init__(self, *args, **kwargs):
|
||||
self.storage = StorageHandler("data/alerts.json")
|
||||
self.logger = logging.getLogger('AlertApi')
|
||||
super().__init__(*args, **kwargs)
|
||||
|
||||
def _send_response(self, status_code: int, data: Any = None, error: str = None) -> None:
|
||||
"""Send a JSON response with the given status code and data/error"""
|
||||
self.send_response(status_code)
|
||||
self.send_header("Content-Type", "application/json")
|
||||
self.send_header("Access-Control-Allow-Origin", "*")
|
||||
self.end_headers()
|
||||
|
||||
response = {}
|
||||
if data is not None:
|
||||
response["data"] = data
|
||||
if error is not None:
|
||||
response["error"] = error
|
||||
|
||||
response_json = json.dumps(response)
|
||||
self.logger.debug(f"Sending response: {response_json}")
|
||||
self.wfile.write(response_json.encode("utf-8"))
|
||||
|
||||
def _handle_request(self, method: str) -> None:
|
||||
"""Handle incoming requests with proper error handling"""
|
||||
self.logger.info(f"Received {method} request from {self.client_address[0]}")
|
||||
|
||||
try:
|
||||
if method in ["POST", "PUT", "DELETE"]:
|
||||
content_length = int(self.headers.get("Content-Length", 0))
|
||||
if content_length == 0:
|
||||
raise ValueError("Missing request body")
|
||||
|
||||
post_data = self.rfile.read(content_length)
|
||||
self.logger.debug(f"Received {method} payload: {post_data.decode('utf-8')}")
|
||||
|
||||
post_data = json.loads(post_data)
|
||||
else:
|
||||
post_data = None
|
||||
|
||||
# Route request to appropriate handler
|
||||
handler = getattr(self, f"_handle_{method.lower()}", None)
|
||||
if handler:
|
||||
self.logger.debug(f"Routing to handler: _handle_{method.lower()}")
|
||||
handler(post_data)
|
||||
else:
|
||||
self.logger.warning(f"Method not allowed: {method}")
|
||||
self._send_response(HTTPStatus.METHOD_NOT_ALLOWED, error="Method not allowed")
|
||||
|
||||
except json.JSONDecodeError as e:
|
||||
self.logger.error(f"JSON decode error: {str(e)}")
|
||||
self._send_response(HTTPStatus.BAD_REQUEST, error="Invalid JSON in request body")
|
||||
except ValueError as e:
|
||||
self.logger.error(f"Validation error: {str(e)}")
|
||||
self._send_response(HTTPStatus.BAD_REQUEST, error=str(e))
|
||||
except Exception as e:
|
||||
self.logger.error(f"Unexpected error: {str(e)}", exc_info=True)
|
||||
self._send_response(HTTPStatus.INTERNAL_SERVER_ERROR, error="Internal server error")
|
||||
|
||||
def _handle_get(self, _) -> None:
|
||||
"""Handle GET request"""
|
||||
self.logger.debug("Processing GET request for all alarms")
|
||||
alarms = self.storage.get_saved_alerts()
|
||||
self.logger.debug(f"Retrieved {len(alarms)} alarms")
|
||||
self._send_response(HTTPStatus.OK, data=alarms)
|
||||
|
||||
def _handle_post(self, data: dict) -> None:
|
||||
"""Handle POST request"""
|
||||
self.logger.debug(f"Processing POST request with data: {json.dumps(data, indent=2)}")
|
||||
try:
|
||||
alarm_id = self.storage.save_new_alert(data)
|
||||
self.logger.info(f"Successfully created new alarm with ID: {alarm_id}")
|
||||
self._send_response(HTTPStatus.CREATED, data={"id": alarm_id})
|
||||
except ValueError as e:
|
||||
self.logger.error(f"Failed to create alarm: {str(e)}")
|
||||
self._send_response(HTTPStatus.BAD_REQUEST, error=str(e))
|
||||
|
||||
def _handle_put(self, data: dict) -> None:
|
||||
"""Handle PUT request"""
|
||||
alarm_id = data.pop("id", None)
|
||||
self.logger.debug(f"Processing PUT request for alarm ID {alarm_id} with data: {json.dumps(data, indent=2)}")
|
||||
|
||||
if alarm_id is None:
|
||||
self.logger.error("PUT request missing alarm ID")
|
||||
self._send_response(HTTPStatus.BAD_REQUEST, error="Missing alarm ID")
|
||||
return
|
||||
|
||||
if self.storage.update_alert(alarm_id, data):
|
||||
self.logger.info(f"Successfully updated alarm ID: {alarm_id}")
|
||||
self._send_response(HTTPStatus.OK, data={"message": "Alarm updated successfully"})
|
||||
else:
|
||||
self.logger.warning(f"Alarm not found for update: {alarm_id}")
|
||||
self._send_response(HTTPStatus.NOT_FOUND, error="Alarm not found")
|
||||
|
||||
def _handle_delete(self, data: dict) -> None:
|
||||
"""Handle DELETE request"""
|
||||
alarm_id = data.get("id")
|
||||
self.logger.debug(f"Processing DELETE request for alarm ID: {alarm_id}")
|
||||
|
||||
if not isinstance(alarm_id, int):
|
||||
self.logger.error(f"Invalid alarm ID format: {alarm_id}")
|
||||
self._send_response(HTTPStatus.BAD_REQUEST, error="Invalid alarm ID")
|
||||
return
|
||||
|
||||
if self.storage.remove_saved_alert(alarm_id):
|
||||
self.logger.info(f"Successfully deleted alarm ID: {alarm_id}")
|
||||
self._send_response(HTTPStatus.OK, data={"message": "Alarm removed successfully"})
|
||||
else:
|
||||
self.logger.warning(f"Alarm not found for deletion: {alarm_id}")
|
||||
self._send_response(HTTPStatus.NOT_FOUND, error="Alarm not found")
|
||||
|
||||
def do_GET(self): self._handle_request("GET")
|
||||
def do_POST(self): self._handle_request("POST")
|
||||
def do_PUT(self): self._handle_request("PUT")
|
||||
def do_DELETE(self): self._handle_request("DELETE")
|
||||
|
||||
|
||||
def run(server_class=HTTPServer, handler_class=AlertApi, port=8000):
|
||||
# Set up logging configuration
|
||||
logging.basicConfig(
|
||||
level=logging.DEBUG, # Set to DEBUG to show all log levels
|
||||
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
|
||||
handlers=[
|
||||
logging.StreamHandler(), # Console handler
|
||||
logging.FileHandler('alert_api.log') # File handler
|
||||
]
|
||||
)
|
||||
|
||||
logger = logging.getLogger('AlertApi')
|
||||
logger.info(f"Starting AlertApi on port {port}")
|
||||
|
||||
server_address = ("", port)
|
||||
httpd = server_class(server_address, handler_class)
|
||||
|
||||
try:
|
||||
logger.info("Server is ready to handle requests")
|
||||
httpd.serve_forever()
|
||||
except KeyboardInterrupt:
|
||||
logger.info("Received shutdown signal")
|
||||
except Exception as e:
|
||||
logger.error(f"Server error: {str(e)}", exc_info=True)
|
||||
finally:
|
||||
httpd.server_close()
|
||||
logger.info("Server stopped")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
from sys import argv
|
||||
run(port=int(argv[1]) if len(argv) == 2 else 8000)
|
102
alert_api/big_digits.py
Normal file
102
alert_api/big_digits.py
Normal file
@ -0,0 +1,102 @@
|
||||
# Big digit patterns (15x7 size)
|
||||
BIG_DIGITS = {
|
||||
'0': [
|
||||
" █████████ ",
|
||||
" ███████████ ",
|
||||
"███ ████",
|
||||
"███ ████",
|
||||
"███ ████",
|
||||
" ███████████ ",
|
||||
" █████████ "
|
||||
],
|
||||
'1': [
|
||||
" ███ ",
|
||||
" █████ ",
|
||||
" ███ ",
|
||||
" ███ ",
|
||||
" ███ ",
|
||||
" ███ ",
|
||||
" ███████ "
|
||||
],
|
||||
'2': [
|
||||
" ██████████ ",
|
||||
"████████████",
|
||||
" ████",
|
||||
" ██████████ ",
|
||||
"████ ",
|
||||
"████████████",
|
||||
" ██████████ "
|
||||
],
|
||||
'3': [
|
||||
" ██████████ ",
|
||||
"████████████",
|
||||
" ████",
|
||||
" ██████ ",
|
||||
" ████",
|
||||
"████████████",
|
||||
" ██████████ "
|
||||
],
|
||||
'4': [
|
||||
"███ ████",
|
||||
"███ ████",
|
||||
"███ ████",
|
||||
"████████████",
|
||||
" ████",
|
||||
" ████",
|
||||
" ████"
|
||||
],
|
||||
'5': [
|
||||
"████████████",
|
||||
"████████████",
|
||||
"████ ",
|
||||
"████████████",
|
||||
" ████",
|
||||
"████████████",
|
||||
"████████████"
|
||||
],
|
||||
'6': [
|
||||
" ██████████ ",
|
||||
"████████████",
|
||||
"████ ",
|
||||
"████████████",
|
||||
"████ ████",
|
||||
"████████████",
|
||||
" ██████████ "
|
||||
],
|
||||
'7': [
|
||||
"████████████",
|
||||
"████████████",
|
||||
" ████ ",
|
||||
" ████ ",
|
||||
" ████ ",
|
||||
"████ ",
|
||||
"████ "
|
||||
],
|
||||
'8': [
|
||||
" ██████████ ",
|
||||
"████████████",
|
||||
"████ ████",
|
||||
" ██████████ ",
|
||||
"████ ████",
|
||||
"████████████",
|
||||
" ██████████ "
|
||||
],
|
||||
'9': [
|
||||
" ██████████ ",
|
||||
"████████████",
|
||||
"████ ████",
|
||||
"████████████",
|
||||
" ████",
|
||||
"████████████",
|
||||
" ██████████ "
|
||||
],
|
||||
':': [
|
||||
" ",
|
||||
" ████ ",
|
||||
" ████ ",
|
||||
" ",
|
||||
" ████ ",
|
||||
" ████ ",
|
||||
" "
|
||||
]
|
||||
}
|
180
alert_api/data_classes.py
Normal file
180
alert_api/data_classes.py
Normal file
@ -0,0 +1,180 @@
|
||||
import hashlib
|
||||
from dataclasses import dataclass, field, asdict
|
||||
from typing import List, Optional, Dict, Any
|
||||
from datetime import datetime
|
||||
import os
|
||||
import re
|
||||
import logging
|
||||
|
||||
# Set up logging configuration
|
||||
logging.basicConfig(
|
||||
level=logging.DEBUG, # Set to DEBUG to show all log levels
|
||||
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
|
||||
handlers=[
|
||||
logging.StreamHandler(), # Console handler
|
||||
logging.FileHandler('alert_api.log') # File handler
|
||||
]
|
||||
)
|
||||
|
||||
logger = logging.getLogger('AlertApi')
|
||||
|
||||
|
||||
@dataclass
|
||||
class RepeatRule:
|
||||
type: str # "daily" or "weekly" or "once"
|
||||
days_of_week: Optional[List[str]] = field(default_factory=list)
|
||||
at: Optional[str] = None
|
||||
|
||||
def validate(self) -> bool:
|
||||
"""Validate repeat rule configuration"""
|
||||
valid_types = {"daily", "weekly", "once"}
|
||||
valid_days = {"monday", "tuesday", "wednesday", "thursday", "friday", "saturday", "sunday"}
|
||||
|
||||
if self.type not in valid_types:
|
||||
logger.error(f"Invalid RepeatRule type: '{self.type}'. Must be one of {valid_types}")
|
||||
return False
|
||||
|
||||
if self.type == "weekly":
|
||||
invalid_days = [day for day in self.days_of_week if day.lower() not in valid_days]
|
||||
if invalid_days:
|
||||
logger.error(f"Invalid RepeatRule weekday names: {invalid_days}. Must be one of {valid_days}")
|
||||
return False
|
||||
if not self.days_of_week:
|
||||
logger.error("Weekly repeat rule must specify at least one day")
|
||||
return False
|
||||
|
||||
if self.type == "once" and self.days_of_week:
|
||||
if self.days_of_week:
|
||||
logger.error("One-time alert does not support days_of_week")
|
||||
return False
|
||||
if not self.at:
|
||||
logger.error("One-time repeat rule must specify a valid 'at' date")
|
||||
return False
|
||||
try:
|
||||
datetime.strptime(self.at, "%d.%m.%Y")
|
||||
except ValueError:
|
||||
logger.error(f"Invalid 'at' date format: '{self.at}'. Expected format: 'dd.mm.yyyy'")
|
||||
return False
|
||||
|
||||
logger.debug(f"RepeatRule validation passed: {self.__dict__}")
|
||||
return True
|
||||
|
||||
|
||||
@dataclass
|
||||
class Snooze:
|
||||
enabled: bool = True
|
||||
duration: int = 10 # minutes
|
||||
max_count: int = 3
|
||||
|
||||
def validate(self) -> bool:
|
||||
"""Validate snooze configuration"""
|
||||
if not isinstance(self.enabled, bool):
|
||||
logger.error(f"Snooze enabled must be boolean, got {type(self.enabled)}")
|
||||
return False
|
||||
|
||||
if not isinstance(self.duration, int):
|
||||
logger.error(f"Snooze duration must be integer, got {type(self.duration)}")
|
||||
return False
|
||||
|
||||
if self.duration <= 0:
|
||||
logger.error(f"Snooze duration must be positive, got {self.duration}")
|
||||
return False
|
||||
|
||||
if not isinstance(self.max_count, int):
|
||||
logger.error(f"Snooze max_count must be integer, got {type(self.max_count)}")
|
||||
return False
|
||||
|
||||
if self.max_count <= 0:
|
||||
logger.error(f"Snooze max_count must be positive, got {self.max_count}")
|
||||
return False
|
||||
|
||||
logger.debug(f"Snooze validation passed: {self.__dict__}")
|
||||
return True
|
||||
|
||||
|
||||
@dataclass
|
||||
class Metadata:
|
||||
volume: int = 100
|
||||
notes: str = ""
|
||||
md5sum: str = ""
|
||||
|
||||
def validate(self) -> bool:
|
||||
"""Validate metadata configuration"""
|
||||
if not isinstance(self.volume, int):
|
||||
logger.error(f"Volume must be integer, got {type(self.volume)}")
|
||||
return False
|
||||
|
||||
if not 0 <= self.volume <= 100:
|
||||
logger.error(f"Volume must be between 0 and 100, got {self.volume}")
|
||||
return False
|
||||
|
||||
logger.debug(f"Metadata validation passed: {self.__dict__}")
|
||||
return True
|
||||
|
||||
|
||||
@dataclass
|
||||
class Alarm:
|
||||
name: str
|
||||
time: str # Format: "HH:MM:SS"
|
||||
repeat_rule: RepeatRule
|
||||
file_to_play: str = field(default=os.path.expanduser("~/.alarms/alarm-lofi.mp3"))
|
||||
enabled: bool = field(default=True)
|
||||
snooze: Snooze = field(default_factory=Snooze)
|
||||
metadata: Metadata = field(default_factory=Metadata)
|
||||
id: Optional[int] = field(default=None)
|
||||
|
||||
def validate(self) -> bool:
|
||||
"""Validate complete alarm configuration"""
|
||||
logger.debug(f"Starting validation for alarm: {self.name}")
|
||||
|
||||
# Validate name
|
||||
if not isinstance(self.name, str) or len(self.name.strip()) == 0:
|
||||
logger.error(f"Invalid alarm name: '{self.name}'. Must be non-empty string")
|
||||
return False
|
||||
|
||||
# Validate time format
|
||||
time_pattern = re.compile(r'^([0-1]?[0-9]|2[0-3]):[0-5][0-9]:[0-5][0-9]$')
|
||||
if not time_pattern.match(self.time):
|
||||
logger.error(f"Invalid time format: '{self.time}'. Must match HH:MM:SS")
|
||||
return False
|
||||
|
||||
# Validate enabled flag
|
||||
if not isinstance(self.enabled, bool):
|
||||
logger.error(f"Enabled must be boolean, got {type(self.enabled)}")
|
||||
return False
|
||||
|
||||
# Create default alarms directory if it doesn't exist
|
||||
default_alarms_dir = os.path.expanduser("~/.alarms")
|
||||
os.makedirs(default_alarms_dir, exist_ok=True)
|
||||
|
||||
# Validate audio file
|
||||
if self.file_to_play != os.path.expanduser("~/.alarms/alarm-lofi.mp3") and not os.path.exists(self.file_to_play):
|
||||
logger.error(f"Audio file not found: '{self.file_to_play}'")
|
||||
return False
|
||||
|
||||
# Validate repeat rule
|
||||
if not isinstance(self.repeat_rule, RepeatRule):
|
||||
logger.error(f"Invalid repeat_rule type: {type(self.repeat_rule)}")
|
||||
return False
|
||||
if not self.repeat_rule.validate():
|
||||
logger.error("Repeat rule validation failed")
|
||||
return False
|
||||
|
||||
# Validate snooze
|
||||
if not isinstance(self.snooze, Snooze):
|
||||
logger.error(f"Invalid snooze type: {type(self.snooze)}")
|
||||
return False
|
||||
if not self.snooze.validate():
|
||||
logger.error("Snooze validation failed")
|
||||
return False
|
||||
|
||||
# Validate metadata
|
||||
if not isinstance(self.metadata, Metadata):
|
||||
logger.error(f"Invalid metadata type: {type(self.metadata)}")
|
||||
return False
|
||||
if not self.metadata.validate():
|
||||
logger.error("Metadata validation failed")
|
||||
return False
|
||||
|
||||
logger.debug(f"Alarm validation passed: {self.name}")
|
||||
return True
|
17
alert_api/logging_config.py
Normal file
17
alert_api/logging_config.py
Normal file
@ -0,0 +1,17 @@
|
||||
import logging
|
||||
import os
|
||||
|
||||
def setup_logging(log_dir="logs", log_file="alarm_system.log", level=logging.DEBUG):
|
||||
os.makedirs(log_dir, exist_ok=True)
|
||||
log_path = os.path.join(log_dir, log_file)
|
||||
|
||||
logging.basicConfig(
|
||||
level=level,
|
||||
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
|
||||
handlers=[
|
||||
logging.StreamHandler(),
|
||||
logging.FileHandler(log_path, mode='a', encoding='utf-8')
|
||||
]
|
||||
)
|
||||
logger = logging.getLogger("AlarmSystem")
|
||||
return logger
|
182
alert_api/main.py
Executable file
182
alert_api/main.py
Executable file
@ -0,0 +1,182 @@
|
||||
#!/usr/bin/python3
|
||||
|
||||
import os
|
||||
import sys
|
||||
import signal
|
||||
import threading
|
||||
import logging
|
||||
from http.server import HTTPServer
|
||||
from multiprocessing import Queue
|
||||
|
||||
# Import our custom modules
|
||||
from alarm_api import AlertApi, run as run_api
|
||||
from alarm_storage import AlarmStorage
|
||||
from alarm_siren import AlarmSiren
|
||||
from ncurses_ui import UI
|
||||
from logging_config import setup_logging
|
||||
|
||||
# Set up logging
|
||||
logger = setup_logging()
|
||||
|
||||
class AlarmSystemManager:
|
||||
def __init__(self,
|
||||
api_port: int = 8000,
|
||||
storage_path: str = "data/alerts.json",
|
||||
log_level: int = logging.DEBUG):
|
||||
"""
|
||||
Initialize and manage the entire alarm system
|
||||
|
||||
Args:
|
||||
api_port (int): Port for the HTTP API server
|
||||
storage_path (str): Path to the JSON storage file
|
||||
log_level (int): Logging level
|
||||
"""
|
||||
# Configure logging
|
||||
logging.getLogger().setLevel(log_level)
|
||||
|
||||
# Ensure storage directory exists
|
||||
os.makedirs(os.path.dirname(os.path.abspath(storage_path)), exist_ok=True)
|
||||
|
||||
# Initialize components
|
||||
self.siren = AlarmSiren()
|
||||
self.storage = AlarmStorage(storage_path, siren=self.siren)
|
||||
|
||||
# API server setup
|
||||
self.api_port = api_port
|
||||
self.api_server = None
|
||||
self.api_thread = None
|
||||
|
||||
# Synchronization and lifecycle management
|
||||
self.stop_event = threading.Event()
|
||||
|
||||
# Signal handling setup
|
||||
self._setup_signal_handlers()
|
||||
|
||||
# Alarm synchronization
|
||||
self._sync_alarms()
|
||||
|
||||
# UI..
|
||||
self.ui = UI(self)
|
||||
|
||||
def _setup_signal_handlers(self):
|
||||
"""Set up signal handlers for graceful shutdown"""
|
||||
signal.signal(signal.SIGINT, self._handle_shutdown)
|
||||
signal.signal(signal.SIGTERM, self._handle_shutdown)
|
||||
|
||||
def _handle_shutdown(self, signum, frame):
|
||||
"""
|
||||
Handle system shutdown signals
|
||||
|
||||
Args:
|
||||
signum (int): Signal number
|
||||
frame (frame): Current stack frame
|
||||
"""
|
||||
logger.info(f"Received shutdown signal {signum}. Initiating graceful shutdown...")
|
||||
self.stop_event.set()
|
||||
|
||||
def _sync_alarms(self):
|
||||
"""
|
||||
Synchronize stored alarms with the siren
|
||||
This ensures any saved alarms are loaded and scheduled
|
||||
"""
|
||||
try:
|
||||
saved_alarms = self.storage.get_saved_alerts()
|
||||
logger.info(f"Synchronizing {len(saved_alarms)} saved alarms")
|
||||
|
||||
for alarm in saved_alarms:
|
||||
if alarm.get('enabled', True):
|
||||
self.siren.schedule_alarm(alarm)
|
||||
except Exception as e:
|
||||
logger.error(f"Error synchronizing alarms: {e}")
|
||||
|
||||
def _start_api_server(self):
|
||||
"""
|
||||
Start the HTTP API server in a separate thread
|
||||
"""
|
||||
try:
|
||||
def run_server():
|
||||
server_address = ("", self.api_port)
|
||||
self.api_server = HTTPServer(server_address, AlertApi)
|
||||
logger.info(f"API Server started on port {self.api_port}")
|
||||
|
||||
# Run until stopped
|
||||
while not self.stop_event.is_set():
|
||||
self.api_server.handle_request()
|
||||
|
||||
self.api_server.server_close()
|
||||
logger.info("API Server stopped")
|
||||
|
||||
# Start API server in a thread
|
||||
self.api_thread = threading.Thread(target=run_server, daemon=True)
|
||||
self.api_thread.start()
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to start API server: {e}")
|
||||
self.stop_event.set()
|
||||
|
||||
def run(self):
|
||||
"""
|
||||
Main run method to start all system components
|
||||
"""
|
||||
try:
|
||||
# Start API server
|
||||
self._start_api_server()
|
||||
|
||||
# Start UI
|
||||
ui_thread = self.ui.run()
|
||||
|
||||
# Log system startup
|
||||
logger.info("Alarm System started successfully")
|
||||
|
||||
# Wait for shutdown signal
|
||||
while not self.stop_event.is_set():
|
||||
self.stop_event.wait(timeout=1)
|
||||
|
||||
# Graceful shutdown
|
||||
logger.info("Initiating system shutdown...")
|
||||
except Exception as e:
|
||||
logger.error(f"System startup error: {e}")
|
||||
finally:
|
||||
self._cleanup()
|
||||
|
||||
def _cleanup(self):
|
||||
"""
|
||||
Perform cleanup operations during shutdown
|
||||
"""
|
||||
# Close API server if running
|
||||
if self.api_server:
|
||||
try:
|
||||
self.api_server.server_close()
|
||||
except Exception as e:
|
||||
logger.error(f"Error closing API server: {e}")
|
||||
|
||||
# Stop threads
|
||||
if self.api_thread and self.api_thread.is_alive():
|
||||
self.api_thread.join(timeout=2)
|
||||
|
||||
logger.info("Alarm System shutdown complete")
|
||||
|
||||
def main():
|
||||
"""
|
||||
Entry point for the Alarm System
|
||||
Parse command-line arguments and start the system
|
||||
"""
|
||||
# Default configurations
|
||||
default_port = 8000
|
||||
default_storage_path = "data/alerts.json"
|
||||
|
||||
# Parse port from command line if provided
|
||||
port = int(sys.argv[1]) if len(sys.argv) > 1 else default_port
|
||||
|
||||
# Create and run the Alarm System
|
||||
try:
|
||||
alarm_system = AlarmSystemManager(
|
||||
api_port=port,
|
||||
storage_path=default_storage_path
|
||||
)
|
||||
alarm_system.run()
|
||||
except Exception as e:
|
||||
logger.error(f"Fatal error starting Alarm System: {e}", exc_info=True)
|
||||
sys.exit(1)
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
230
alert_api/ncurses_ui.py
Normal file
230
alert_api/ncurses_ui.py
Normal file
@ -0,0 +1,230 @@
|
||||
import curses
|
||||
import time
|
||||
from datetime import datetime, date, timedelta
|
||||
import os
|
||||
from big_digits import BIG_DIGITS
|
||||
from ncurses_ui_draw import _draw_big_digit, _draw_main_clock, _draw_add_alarm, _draw_list_alarms, _draw_error
|
||||
|
||||
class UI:
|
||||
def __init__(self, alarm_system_manager):
|
||||
"""
|
||||
Initialize the ncurses UI for the alarm system
|
||||
|
||||
Args:
|
||||
alarm_system_manager (AlarmSystemManager): The main alarm system manager
|
||||
"""
|
||||
self.alarm_system = alarm_system_manager
|
||||
self.stop_event = alarm_system_manager.stop_event
|
||||
self.storage = alarm_system_manager.storage
|
||||
|
||||
# UI state variables
|
||||
self.selected_menu = 0
|
||||
self.new_alarm_name = "Alarm"
|
||||
self.new_alarm_hour = datetime.now().hour
|
||||
self.new_alarm_minute = datetime.now().minute
|
||||
self.new_alarm_selected = 0
|
||||
self.new_alarm_date = None
|
||||
self.new_alarm_weekdays = []
|
||||
self.new_alarm_enabled = True
|
||||
self.weekday_names = ['Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat', 'Sun']
|
||||
self.alarms = []
|
||||
self.error_message = None
|
||||
self.error_timestamp = None
|
||||
|
||||
def run(self):
|
||||
"""
|
||||
Start the ncurses UI
|
||||
"""
|
||||
def ui_thread():
|
||||
try:
|
||||
curses.wrapper(self._main_loop)
|
||||
except Exception as e:
|
||||
print(f"Error in UI thread: {e}")
|
||||
finally:
|
||||
self.stop_event.set()
|
||||
|
||||
import threading
|
||||
ui_thread_obj = threading.Thread(target=ui_thread, daemon=True)
|
||||
ui_thread_obj.start()
|
||||
return ui_thread_obj
|
||||
|
||||
def _handle_add_alarm_input(self, key):
|
||||
"""
|
||||
Handle input for adding a new alarm
|
||||
"""
|
||||
if key == 27: # Escape
|
||||
self.selected_menu = 0
|
||||
return
|
||||
|
||||
if key == 10: # Enter
|
||||
try:
|
||||
# Prepare alarm data
|
||||
alarm_data = {
|
||||
"name": self.new_alarm_name,
|
||||
"time": f"{self.new_alarm_hour:02d}:{self.new_alarm_minute:02d}:00",
|
||||
"enabled": self.new_alarm_enabled
|
||||
}
|
||||
|
||||
# Add repeat rule if applicable
|
||||
if self.new_alarm_weekdays:
|
||||
alarm_data["repeat_rule"] = {
|
||||
"type": "weekly",
|
||||
"days": self.new_alarm_weekdays
|
||||
}
|
||||
elif self.new_alarm_date:
|
||||
alarm_data["repeat_rule"] = {
|
||||
"type": "once",
|
||||
"date": self.new_alarm_date.strftime("%Y-%m-%d")
|
||||
}
|
||||
|
||||
# Save new alarm
|
||||
self.storage.save_new_alert(alarm_data)
|
||||
|
||||
# Reset form
|
||||
self.selected_menu = 0
|
||||
self.new_alarm_hour = datetime.now().hour
|
||||
self.new_alarm_minute = datetime.now().minute
|
||||
self.new_alarm_date = None
|
||||
self.new_alarm_weekdays = []
|
||||
self.new_alarm_name = "Alarm"
|
||||
self.new_alarm_enabled = True
|
||||
except Exception as e:
|
||||
# TODO: Show error on screen
|
||||
print(f"Failed to save alarm: {e}")
|
||||
return
|
||||
|
||||
if key == curses.KEY_LEFT:
|
||||
self.new_alarm_selected = (self.new_alarm_selected - 1) % 6
|
||||
elif key == curses.KEY_RIGHT:
|
||||
self.new_alarm_selected = (self.new_alarm_selected + 1) % 6
|
||||
elif key == 32: # Space
|
||||
if self.new_alarm_selected == 2: # Date
|
||||
self.new_alarm_date = None
|
||||
elif self.new_alarm_selected == 3: # Weekdays
|
||||
current_day = len(self.new_alarm_weekdays)
|
||||
if current_day < 7:
|
||||
if current_day in self.new_alarm_weekdays:
|
||||
self.new_alarm_weekdays.remove(current_day)
|
||||
else:
|
||||
self.new_alarm_weekdays.append(current_day)
|
||||
self.new_alarm_weekdays.sort()
|
||||
elif self.new_alarm_selected == 5: # Enabled toggle
|
||||
self.new_alarm_enabled = not self.new_alarm_enabled
|
||||
|
||||
elif key == curses.KEY_UP:
|
||||
if self.new_alarm_selected == 0:
|
||||
self.new_alarm_hour = (self.new_alarm_hour + 1) % 24
|
||||
elif self.new_alarm_selected == 1:
|
||||
self.new_alarm_minute = (self.new_alarm_minute + 1) % 60
|
||||
elif self.new_alarm_selected == 2:
|
||||
if not self.new_alarm_date:
|
||||
self.new_alarm_date = date.today()
|
||||
else:
|
||||
self.new_alarm_date += timedelta(days=1)
|
||||
elif self.new_alarm_selected == 4: # Name
|
||||
self.new_alarm_name += " "
|
||||
|
||||
elif key == curses.KEY_DOWN:
|
||||
if self.new_alarm_selected == 0:
|
||||
self.new_alarm_hour = (self.new_alarm_hour - 1) % 24
|
||||
elif self.new_alarm_selected == 1:
|
||||
self.new_alarm_minute = (self.new_alarm_minute - 1) % 60
|
||||
elif self.new_alarm_selected == 2 and self.new_alarm_date:
|
||||
self.new_alarm_date -= timedelta(days=1)
|
||||
elif self.new_alarm_selected == 4 and len(self.new_alarm_name) > 1:
|
||||
self.new_alarm_name = self.new_alarm_name.rstrip()[:-1]
|
||||
|
||||
def _handle_list_alarms_input(self, key):
|
||||
"""
|
||||
Handle input for the list alarms screen
|
||||
"""
|
||||
if key == 27: # Escape
|
||||
self.selected_menu = 0
|
||||
elif key == ord('d'):
|
||||
# Delete last alarm if exists
|
||||
if self.alarms:
|
||||
last_alarm = self.alarms[-1]
|
||||
try:
|
||||
self.storage.remove_saved_alert(last_alarm['id'])
|
||||
except Exception as e:
|
||||
print(f"Failed to delete alarm: {e}")
|
||||
|
||||
def _show_error(self, message, duration=3):
|
||||
"""Display an error message for a specified duration"""
|
||||
self.error_message = message
|
||||
self.error_timestamp = time.time()
|
||||
|
||||
def _clear_error_if_expired(self):
|
||||
"""Clear error message if it has been displayed long enough"""
|
||||
if self.error_message and self.error_timestamp:
|
||||
if time.time() - self.error_timestamp > 3: # 3 seconds
|
||||
self.error_message = None
|
||||
self.error_timestamp = None
|
||||
|
||||
def _main_loop(self, stdscr):
|
||||
"""
|
||||
Main ncurses event loop
|
||||
"""
|
||||
# Initialize color pairs
|
||||
curses.start_color()
|
||||
curses.init_pair(1, curses.COLOR_GREEN, curses.COLOR_BLACK)
|
||||
curses.init_pair(2, curses.COLOR_YELLOW, curses.COLOR_BLACK)
|
||||
curses.init_pair(3, curses.COLOR_RED, curses.COLOR_BLACK)
|
||||
curses.curs_set(0)
|
||||
|
||||
# Configure screen
|
||||
stdscr.keypad(1)
|
||||
stdscr.timeout(100)
|
||||
|
||||
while not self.stop_event.is_set():
|
||||
# Clear the screen
|
||||
stdscr.clear()
|
||||
|
||||
# Draw appropriate screen based on selected menu
|
||||
if self.selected_menu == 0:
|
||||
_draw_main_clock(stdscr)
|
||||
elif self.selected_menu == 1:
|
||||
_draw_add_alarm(stdscr, {
|
||||
'new_alarm_selected': self.new_alarm_selected,
|
||||
'new_alarm_name': self.new_alarm_name,
|
||||
'new_alarm_hour': self.new_alarm_hour,
|
||||
'new_alarm_minute': self.new_alarm_minute,
|
||||
'new_alarm_enabled': self.new_alarm_enabled,
|
||||
'new_alarm_date': self.new_alarm_date,
|
||||
'new_alarm_weekdays': self.new_alarm_weekdays,
|
||||
'weekday_names': self.weekday_names,
|
||||
'new_alarm_snooze_enabled': getattr(self, 'new_alarm_snooze_enabled', False),
|
||||
'new_alarm_snooze_duration': getattr(self, 'new_alarm_snooze_duration', 5),
|
||||
'new_alarm_snooze_max_count': getattr(self, 'new_alarm_snooze_max_count', 3)
|
||||
})
|
||||
elif self.selected_menu == 2:
|
||||
_draw_list_alarms(stdscr, {
|
||||
'alarms': self.alarms,
|
||||
'weekday_names': self.weekday_names
|
||||
})
|
||||
|
||||
# Refresh the screen
|
||||
stdscr.refresh()
|
||||
|
||||
# Small sleep to reduce CPU usage
|
||||
time.sleep(0.1)
|
||||
|
||||
# Handle input
|
||||
key = stdscr.getch()
|
||||
if key != -1:
|
||||
# Menu navigation and input handling
|
||||
if key == ord('q') or key == 27: # 'q' or Escape
|
||||
break
|
||||
elif key == ord('c'): # Clock/Home screen
|
||||
self.selected_menu = 0
|
||||
elif key == ord('a'): # Add Alarm
|
||||
self.selected_menu = 1
|
||||
elif key == ord('l'): # List Alarms
|
||||
self.selected_menu = 2
|
||||
self.alarms = self.storage.get_saved_alerts()
|
||||
|
||||
# Context-specific input handling
|
||||
if self.selected_menu == 1:
|
||||
self._handle_add_alarm_input(key)
|
||||
elif self.selected_menu == 2:
|
||||
self._handle_list_alarms_input(key)
|
184
alert_api/ncurses_ui_draw.py
Normal file
184
alert_api/ncurses_ui_draw.py
Normal file
@ -0,0 +1,184 @@
|
||||
from datetime import datetime
|
||||
import curses
|
||||
from big_digits import BIG_DIGITS
|
||||
|
||||
def _draw_error(stdscr, error_message):
|
||||
"""Draw error message if present"""
|
||||
if error_message:
|
||||
height, width = stdscr.getmaxyx()
|
||||
error_x = width // 2 - len(error_message) // 2
|
||||
error_y = height - 4 # Show near bottom of screen
|
||||
|
||||
# Red color for errors
|
||||
stdscr.attron(curses.color_pair(3))
|
||||
stdscr.addstr(error_y, error_x, error_message)
|
||||
stdscr.attroff(curses.color_pair(3))
|
||||
|
||||
def _draw_big_digit(stdscr, y, x, digit, big_digits):
|
||||
"""
|
||||
Draw a big digit using predefined patterns
|
||||
"""
|
||||
patterns = big_digits[digit]
|
||||
for i, line in enumerate(patterns):
|
||||
stdscr.addstr(y + i, x, line)
|
||||
|
||||
def _draw_big_time(stdscr, big_digits):
|
||||
"""
|
||||
Draw the time in big digits
|
||||
"""
|
||||
current_time = datetime.now()
|
||||
time_str = current_time.strftime("%H:%M:%S")
|
||||
|
||||
# Get terminal dimensions
|
||||
height, width = stdscr.getmaxyx()
|
||||
|
||||
# Calculate starting position to center the big clock
|
||||
digit_width = 14 # Width of each digit pattern including spacing
|
||||
total_width = digit_width * len(time_str)
|
||||
start_x = (width - total_width) // 2
|
||||
start_y = (height - 7) // 2 - 4 # Move up a bit to make room for date
|
||||
|
||||
# Color for the big time
|
||||
stdscr.attron(curses.color_pair(1))
|
||||
for i, digit in enumerate(time_str):
|
||||
_draw_big_digit(stdscr, start_y, start_x + i * digit_width, digit, big_digits)
|
||||
stdscr.attroff(curses.color_pair(1))
|
||||
|
||||
def _draw_main_clock(stdscr):
|
||||
"""
|
||||
Draw the main clock screen
|
||||
"""
|
||||
current_time = datetime.now()
|
||||
time_str = current_time.strftime("%H:%M:%S")
|
||||
date_str = current_time.strftime("%Y-%m-%d")
|
||||
|
||||
# Get terminal dimensions
|
||||
height, width = stdscr.getmaxyx()
|
||||
|
||||
# Draw big time
|
||||
# Note: You'll need to pass BIG_DIGITS from big_digits module when calling
|
||||
_draw_big_time(stdscr, BIG_DIGITS)
|
||||
|
||||
# Draw date
|
||||
date_x = width // 2 - len(date_str) // 2
|
||||
date_y = height // 2 + 4 # Below the big clock
|
||||
|
||||
stdscr.attron(curses.color_pair(2))
|
||||
stdscr.addstr(date_y, date_x, date_str)
|
||||
stdscr.attroff(curses.color_pair(2))
|
||||
|
||||
# Draw menu options
|
||||
menu_str = "A: Add Alarm L: List Alarms Q: Quit"
|
||||
menu_x = width // 2 - len(menu_str) // 2
|
||||
stdscr.addstr(height - 2, menu_x, menu_str)
|
||||
|
||||
def _draw_add_alarm(stdscr, context):
|
||||
"""
|
||||
Draw the add alarm screen
|
||||
|
||||
Args:
|
||||
stdscr: The curses screen object
|
||||
context: A dictionary containing UI state variables
|
||||
"""
|
||||
height, width = stdscr.getmaxyx()
|
||||
|
||||
form_y = height // 2 - 3
|
||||
stdscr.addstr(form_y, width // 2 - 10, "Add New Alarm")
|
||||
|
||||
# Name input
|
||||
if context['new_alarm_selected'] == 4:
|
||||
stdscr.attron(curses.color_pair(2))
|
||||
stdscr.addstr(form_y + 1, width // 2 - 10, f"Name: {context['new_alarm_name']}")
|
||||
if context['new_alarm_selected'] == 4:
|
||||
stdscr.attroff(curses.color_pair(2))
|
||||
|
||||
# Time selection
|
||||
hour_str = f"{context['new_alarm_hour']:02d}"
|
||||
minute_str = f"{context['new_alarm_minute']:02d}"
|
||||
|
||||
# Highlight selected field
|
||||
if context['new_alarm_selected'] == 0:
|
||||
stdscr.attron(curses.color_pair(2))
|
||||
stdscr.addstr(form_y + 2, width // 2 - 2, hour_str)
|
||||
if context['new_alarm_selected'] == 0:
|
||||
stdscr.attroff(curses.color_pair(2))
|
||||
|
||||
stdscr.addstr(form_y + 2, width // 2, ":")
|
||||
|
||||
if context['new_alarm_selected'] == 1:
|
||||
stdscr.attron(curses.color_pair(2))
|
||||
stdscr.addstr(form_y + 2, width // 2 + 1, minute_str)
|
||||
if context['new_alarm_selected'] == 1:
|
||||
stdscr.attroff(curses.color_pair(2))
|
||||
|
||||
# Enabled/Disabled toggle
|
||||
enabled_str = "Enabled" if context['new_alarm_enabled'] else "Disabled"
|
||||
if context['new_alarm_selected'] == 5:
|
||||
stdscr.attron(curses.color_pair(2))
|
||||
stdscr.addstr(form_y + 4, width // 2 - len(enabled_str)//2, enabled_str)
|
||||
if context['new_alarm_selected'] == 5:
|
||||
stdscr.attroff(curses.color_pair(2))
|
||||
|
||||
# Date selection
|
||||
date_str = "No specific date" if not context['new_alarm_date'] else context['new_alarm_date'].strftime("%Y-%m-%d")
|
||||
if context['new_alarm_selected'] == 2:
|
||||
stdscr.attron(curses.color_pair(2))
|
||||
stdscr.addstr(form_y + 3, width // 2 - len(date_str) // 2, date_str)
|
||||
if context['new_alarm_selected'] == 2:
|
||||
stdscr.attroff(curses.color_pair(2))
|
||||
|
||||
# Weekday selection
|
||||
weekday_str = "Repeat: " + " ".join(
|
||||
context['weekday_names'][i] if i in context['new_alarm_weekdays'] else "___"
|
||||
for i in range(7)
|
||||
)
|
||||
if context['new_alarm_selected'] == 3:
|
||||
stdscr.attron(curses.color_pair(2))
|
||||
stdscr.addstr(form_y + 5, width // 2 - len(weekday_str) // 2, weekday_str)
|
||||
if context['new_alarm_selected'] == 3:
|
||||
stdscr.attroff(curses.color_pair(2))
|
||||
|
||||
# Instructions
|
||||
stdscr.addstr(height - 2, 2,
|
||||
"↑↓: Change ←→: Switch Space: Toggle Enter: Save Esc: Cancel")
|
||||
|
||||
def _draw_list_alarms(stdscr, context):
|
||||
"""
|
||||
Draw the list of alarms screen
|
||||
|
||||
Args:
|
||||
stdscr: The curses screen object
|
||||
context: A dictionary containing UI state variables
|
||||
"""
|
||||
height, width = stdscr.getmaxyx()
|
||||
|
||||
# Header
|
||||
stdscr.addstr(2, width // 2 - 5, "Alarms")
|
||||
|
||||
if not context['alarms']:
|
||||
stdscr.addstr(4, width // 2 - 10, "No alarms set")
|
||||
else:
|
||||
for i, alarm in enumerate(context['alarms'][:height-6]):
|
||||
# Format time and repeat information
|
||||
time_str = alarm.get('time', 'Unknown')
|
||||
|
||||
# Format repeat info
|
||||
repeat_info = ""
|
||||
repeat_rule = alarm.get('repeat_rule', {})
|
||||
if repeat_rule:
|
||||
if repeat_rule.get('type') == 'weekly':
|
||||
days = repeat_rule.get('days', [])
|
||||
repeat_info = f" (Every {', '.join(context['weekday_names'][d] for d in days)})"
|
||||
elif repeat_rule.get('type') == 'once' and repeat_rule.get('date'):
|
||||
repeat_info = f" (On {repeat_rule['date']})"
|
||||
|
||||
# Status indicator
|
||||
status = "✓" if alarm.get('enabled', True) else "✗"
|
||||
display_str = f"{status} {time_str}{repeat_info}"
|
||||
|
||||
# Truncate if too long
|
||||
display_str = display_str[:width-4]
|
||||
|
||||
stdscr.addstr(4 + i, 2, display_str)
|
||||
|
||||
stdscr.addstr(height - 2, 2, "D: Delete Enter: Edit Esc: Back")
|
Reference in New Issue
Block a user