Compare commits
12 Commits
0eb5204833
...
main
Author | SHA1 | Date | |
---|---|---|---|
f59048c2d0 | |||
7eafbf5ebd | |||
1072973f3f | |||
5aa1f078bc | |||
3a3813e389 | |||
c8bb1517c5 | |||
8690532c19 | |||
6fd172ce2e | |||
f3dabc1116 | |||
a48976a762 | |||
4e1c838eaa | |||
56827825b3 |
6
.gitignore
vendored
Normal file
6
.gitignore
vendored
Normal file
@ -0,0 +1,6 @@
|
|||||||
|
__pycache*
|
||||||
|
*log
|
||||||
|
/logs
|
||||||
|
/data
|
||||||
|
*json
|
||||||
|
*~
|
16
README.md
Normal file
16
README.md
Normal file
@ -0,0 +1,16 @@
|
|||||||
|
# EEE PC Alarm Clock!
|
||||||
|
|
||||||
|
Runs on Debian 12.
|
||||||
|
Requires Python 3 and mpg123.
|
||||||
|
|
||||||
|
To use, just run `main.py` from the `clock/` directory.
|
||||||
|
|
||||||
|
It includes:
|
||||||
|
- A local ncurses UI.
|
||||||
|
- A simple HTTP API
|
||||||
|
- for how to use it, check out
|
||||||
|
`alert_clients/alarm_api_client/alarm_api_client.py`.
|
||||||
|
|
||||||
|
Put your alarm sounds in `~/alarms/` as MP3 files.
|
||||||
|
|
||||||
|
License: MIT
|
@ -1,218 +0,0 @@
|
|||||||
import os
|
|
||||||
import time
|
|
||||||
import threading
|
|
||||||
import subprocess
|
|
||||||
import queue
|
|
||||||
import logging
|
|
||||||
from datetime import datetime, timedelta
|
|
||||||
from typing import Optional, Dict, Any
|
|
||||||
|
|
||||||
# Set up logging
|
|
||||||
logging.basicConfig(
|
|
||||||
level=logging.DEBUG,
|
|
||||||
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
|
|
||||||
handlers=[
|
|
||||||
logging.StreamHandler(),
|
|
||||||
logging.FileHandler('alarm_siren.log')
|
|
||||||
]
|
|
||||||
)
|
|
||||||
logger = logging.getLogger('AlarmSiren')
|
|
||||||
|
|
||||||
class AlarmSiren:
|
|
||||||
def __init__(self):
|
|
||||||
# Communication queues
|
|
||||||
self.alarm_queue = queue.Queue()
|
|
||||||
self.control_queue = queue.Queue()
|
|
||||||
|
|
||||||
# Tracking active alarms
|
|
||||||
self.active_alarms: Dict[int, Dict[str, Any]] = {}
|
|
||||||
|
|
||||||
# Playback thread
|
|
||||||
self.playback_thread = threading.Thread(target=self._playback_worker, daemon=True)
|
|
||||||
self.playback_thread.start()
|
|
||||||
|
|
||||||
def schedule_alarm(self, alarm_config: Dict[str, Any]):
|
|
||||||
"""Schedule an alarm based on its configuration"""
|
|
||||||
logger.info(f"Scheduling alarm: {alarm_config}")
|
|
||||||
self.alarm_queue.put(alarm_config)
|
|
||||||
|
|
||||||
def _calculate_next_alarm_time(self, alarm_config: Dict[str, Any]) -> Optional[datetime]:
|
|
||||||
"""Calculate the next alarm trigger time based on repeat rule"""
|
|
||||||
now = datetime.now()
|
|
||||||
current_time = now.time()
|
|
||||||
|
|
||||||
# Parse alarm time
|
|
||||||
alarm_time = datetime.strptime(alarm_config['time'], "%H:%M:%S").time()
|
|
||||||
|
|
||||||
# Determine the next trigger
|
|
||||||
if alarm_config['repeat_rule']['type'] == 'once':
|
|
||||||
# For one-time alarm, check the specific date
|
|
||||||
try:
|
|
||||||
specific_date = datetime.strptime(alarm_config['repeat_rule']['at'], "%d.%m.%Y")
|
|
||||||
return datetime.combine(specific_date.date(), alarm_time)
|
|
||||||
except (KeyError, ValueError):
|
|
||||||
logger.error("Invalid one-time alarm configuration")
|
|
||||||
return None
|
|
||||||
|
|
||||||
elif alarm_config['repeat_rule']['type'] == 'daily':
|
|
||||||
# Daily alarm - trigger today or tomorrow
|
|
||||||
next_trigger = datetime.combine(now.date(), alarm_time)
|
|
||||||
if current_time < alarm_time:
|
|
||||||
return next_trigger
|
|
||||||
return next_trigger + timedelta(days=1)
|
|
||||||
|
|
||||||
elif alarm_config['repeat_rule']['type'] == 'weekly':
|
|
||||||
# Weekly alarm - check configured days
|
|
||||||
today = now.strftime("%A").lower()
|
|
||||||
configured_days = [day.lower() for day in alarm_config['repeat_rule'].get('days_of_week', [])]
|
|
||||||
|
|
||||||
if today in configured_days and current_time < alarm_time:
|
|
||||||
return datetime.combine(now.date(), alarm_time)
|
|
||||||
|
|
||||||
# Find next configured day
|
|
||||||
days_order = ['monday', 'tuesday', 'wednesday', 'thursday', 'friday', 'saturday', 'sunday']
|
|
||||||
current_index = days_order.index(today)
|
|
||||||
|
|
||||||
for offset in range(1, 8):
|
|
||||||
next_day_index = (current_index + offset) % 7
|
|
||||||
next_day = days_order[next_day_index]
|
|
||||||
|
|
||||||
if next_day in configured_days:
|
|
||||||
next_date = now.date() + timedelta(days=offset)
|
|
||||||
return datetime.combine(next_date, alarm_time)
|
|
||||||
|
|
||||||
return None
|
|
||||||
|
|
||||||
def _play_audio(self, file_path: str, volume: int = 100):
|
|
||||||
"""Play audio file using mpg123"""
|
|
||||||
try:
|
|
||||||
# Ensure the file exists
|
|
||||||
if not os.path.exists(file_path):
|
|
||||||
logger.error(f"Audio file not found: {file_path}")
|
|
||||||
return False
|
|
||||||
|
|
||||||
# Construct mpg123 command with volume control
|
|
||||||
volume_adjust = f"-g {volume}"
|
|
||||||
cmd = ["mpg123", volume_adjust, file_path]
|
|
||||||
|
|
||||||
logger.info(f"Playing alarm: {file_path}")
|
|
||||||
|
|
||||||
# Track the process for potential interruption
|
|
||||||
process = subprocess.Popen(cmd)
|
|
||||||
return process
|
|
||||||
except Exception as e:
|
|
||||||
logger.error(f"Error playing audio: {e}")
|
|
||||||
return False
|
|
||||||
|
|
||||||
def _playback_worker(self):
|
|
||||||
"""Background thread for managing alarm playback"""
|
|
||||||
while True:
|
|
||||||
try:
|
|
||||||
# Check for new alarms to schedule
|
|
||||||
try:
|
|
||||||
new_alarm = self.alarm_queue.get(timeout=1)
|
|
||||||
alarm_time = self._calculate_next_alarm_time(new_alarm)
|
|
||||||
if alarm_time:
|
|
||||||
self.active_alarms[new_alarm.get('id', id(new_alarm))] = {
|
|
||||||
'config': new_alarm,
|
|
||||||
'trigger_time': alarm_time,
|
|
||||||
'snooze_count': 0
|
|
||||||
}
|
|
||||||
except queue.Empty:
|
|
||||||
pass
|
|
||||||
|
|
||||||
# Check for control signals (snooze/dismiss)
|
|
||||||
try:
|
|
||||||
control_msg = self.control_queue.get(timeout=0.1)
|
|
||||||
# Handle control message logic
|
|
||||||
except queue.Empty:
|
|
||||||
pass
|
|
||||||
|
|
||||||
# Check for alarms to trigger
|
|
||||||
now = datetime.now()
|
|
||||||
for alarm_id, alarm_info in list(self.active_alarms.items()):
|
|
||||||
if now >= alarm_info['trigger_time']:
|
|
||||||
# Trigger alarm
|
|
||||||
process = self._play_audio(
|
|
||||||
alarm_info['config']['file_to_play'],
|
|
||||||
alarm_info['config'].get('metadata', {}).get('volume', 100)
|
|
||||||
)
|
|
||||||
|
|
||||||
# Handle repeat and snooze logic
|
|
||||||
if process:
|
|
||||||
# Wait for user interaction or timeout
|
|
||||||
# In a real implementation, this would be more sophisticated
|
|
||||||
time.sleep(30) # Placeholder for user interaction
|
|
||||||
|
|
||||||
# Determine next trigger based on repeat rule
|
|
||||||
next_trigger = self._calculate_next_alarm_time(alarm_info['config'])
|
|
||||||
if next_trigger:
|
|
||||||
alarm_info['trigger_time'] = next_trigger
|
|
||||||
else:
|
|
||||||
del self.active_alarms[alarm_id]
|
|
||||||
|
|
||||||
time.sleep(1) # Prevent tight loop
|
|
||||||
except Exception as e:
|
|
||||||
logger.error(f"Error in playback worker: {e}")
|
|
||||||
time.sleep(1)
|
|
||||||
|
|
||||||
def snooze_alarm(self, alarm_id: int):
|
|
||||||
"""Snooze a specific alarm"""
|
|
||||||
if alarm_id in self.active_alarms:
|
|
||||||
alarm_config = self.active_alarms[alarm_id]['config']
|
|
||||||
snooze_config = alarm_config.get('snooze', {'enabled': True, 'duration': 10, 'max_count': 3})
|
|
||||||
|
|
||||||
if (snooze_config['enabled'] and
|
|
||||||
self.active_alarms[alarm_id]['snooze_count'] < snooze_config['max_count']):
|
|
||||||
|
|
||||||
# Increment snooze count
|
|
||||||
self.active_alarms[alarm_id]['snooze_count'] += 1
|
|
||||||
|
|
||||||
# Set next trigger time
|
|
||||||
snooze_duration = snooze_config.get('duration', 10)
|
|
||||||
self.active_alarms[alarm_id]['trigger_time'] = datetime.now() + timedelta(minutes=snooze_duration)
|
|
||||||
|
|
||||||
logger.info(f"Snoozed alarm {alarm_id} for {snooze_duration} minutes")
|
|
||||||
else:
|
|
||||||
logger.warning(f"Cannot snooze alarm {alarm_id} - max snooze count reached")
|
|
||||||
|
|
||||||
def dismiss_alarm(self, alarm_id: int):
|
|
||||||
"""Dismiss a specific alarm"""
|
|
||||||
if alarm_id in self.active_alarms:
|
|
||||||
logger.info(f"Dismissed alarm {alarm_id}")
|
|
||||||
del self.active_alarms[alarm_id]
|
|
||||||
|
|
||||||
def main():
|
|
||||||
"""Example usage of AlarmSiren"""
|
|
||||||
siren = AlarmSiren()
|
|
||||||
|
|
||||||
# Example alarm configuration
|
|
||||||
sample_alarm = {
|
|
||||||
'id': 1,
|
|
||||||
'name': 'Morning Alarm',
|
|
||||||
'time': '07:00:00',
|
|
||||||
'file_to_play': '/path/to/alarm.mp3',
|
|
||||||
'repeat_rule': {
|
|
||||||
'type': 'daily'
|
|
||||||
},
|
|
||||||
'snooze': {
|
|
||||||
'enabled': True,
|
|
||||||
'duration': 10,
|
|
||||||
'max_count': 3
|
|
||||||
},
|
|
||||||
'metadata': {
|
|
||||||
'volume': 80
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
siren.schedule_alarm(sample_alarm)
|
|
||||||
|
|
||||||
# Keep main thread alive (in a real app, this would be handled differently)
|
|
||||||
try:
|
|
||||||
while True:
|
|
||||||
time.sleep(1)
|
|
||||||
except KeyboardInterrupt:
|
|
||||||
print("Alarm siren stopped")
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
|
||||||
main()
|
|
@ -1,230 +0,0 @@
|
|||||||
import curses
|
|
||||||
import time
|
|
||||||
from datetime import datetime, date, timedelta
|
|
||||||
import os
|
|
||||||
from big_digits import BIG_DIGITS
|
|
||||||
from ncurses_ui_draw import _draw_big_digit, _draw_main_clock, _draw_add_alarm, _draw_list_alarms, _draw_error
|
|
||||||
|
|
||||||
class UI:
|
|
||||||
def __init__(self, alarm_system_manager):
|
|
||||||
"""
|
|
||||||
Initialize the ncurses UI for the alarm system
|
|
||||||
|
|
||||||
Args:
|
|
||||||
alarm_system_manager (AlarmSystemManager): The main alarm system manager
|
|
||||||
"""
|
|
||||||
self.alarm_system = alarm_system_manager
|
|
||||||
self.stop_event = alarm_system_manager.stop_event
|
|
||||||
self.storage = alarm_system_manager.storage
|
|
||||||
|
|
||||||
# UI state variables
|
|
||||||
self.selected_menu = 0
|
|
||||||
self.new_alarm_name = "Alarm"
|
|
||||||
self.new_alarm_hour = datetime.now().hour
|
|
||||||
self.new_alarm_minute = datetime.now().minute
|
|
||||||
self.new_alarm_selected = 0
|
|
||||||
self.new_alarm_date = None
|
|
||||||
self.new_alarm_weekdays = []
|
|
||||||
self.new_alarm_enabled = True
|
|
||||||
self.weekday_names = ['Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat', 'Sun']
|
|
||||||
self.alarms = []
|
|
||||||
self.error_message = None
|
|
||||||
self.error_timestamp = None
|
|
||||||
|
|
||||||
def run(self):
|
|
||||||
"""
|
|
||||||
Start the ncurses UI
|
|
||||||
"""
|
|
||||||
def ui_thread():
|
|
||||||
try:
|
|
||||||
curses.wrapper(self._main_loop)
|
|
||||||
except Exception as e:
|
|
||||||
print(f"Error in UI thread: {e}")
|
|
||||||
finally:
|
|
||||||
self.stop_event.set()
|
|
||||||
|
|
||||||
import threading
|
|
||||||
ui_thread_obj = threading.Thread(target=ui_thread, daemon=True)
|
|
||||||
ui_thread_obj.start()
|
|
||||||
return ui_thread_obj
|
|
||||||
|
|
||||||
def _handle_add_alarm_input(self, key):
|
|
||||||
"""
|
|
||||||
Handle input for adding a new alarm
|
|
||||||
"""
|
|
||||||
if key == 27: # Escape
|
|
||||||
self.selected_menu = 0
|
|
||||||
return
|
|
||||||
|
|
||||||
if key == 10: # Enter
|
|
||||||
try:
|
|
||||||
# Prepare alarm data
|
|
||||||
alarm_data = {
|
|
||||||
"name": self.new_alarm_name,
|
|
||||||
"time": f"{self.new_alarm_hour:02d}:{self.new_alarm_minute:02d}:00",
|
|
||||||
"enabled": self.new_alarm_enabled
|
|
||||||
}
|
|
||||||
|
|
||||||
# Add repeat rule if applicable
|
|
||||||
if self.new_alarm_weekdays:
|
|
||||||
alarm_data["repeat_rule"] = {
|
|
||||||
"type": "weekly",
|
|
||||||
"days": self.new_alarm_weekdays
|
|
||||||
}
|
|
||||||
elif self.new_alarm_date:
|
|
||||||
alarm_data["repeat_rule"] = {
|
|
||||||
"type": "once",
|
|
||||||
"date": self.new_alarm_date.strftime("%Y-%m-%d")
|
|
||||||
}
|
|
||||||
|
|
||||||
# Save new alarm
|
|
||||||
self.storage.save_new_alert(alarm_data)
|
|
||||||
|
|
||||||
# Reset form
|
|
||||||
self.selected_menu = 0
|
|
||||||
self.new_alarm_hour = datetime.now().hour
|
|
||||||
self.new_alarm_minute = datetime.now().minute
|
|
||||||
self.new_alarm_date = None
|
|
||||||
self.new_alarm_weekdays = []
|
|
||||||
self.new_alarm_name = "Alarm"
|
|
||||||
self.new_alarm_enabled = True
|
|
||||||
except Exception as e:
|
|
||||||
# TODO: Show error on screen
|
|
||||||
print(f"Failed to save alarm: {e}")
|
|
||||||
return
|
|
||||||
|
|
||||||
if key == curses.KEY_LEFT:
|
|
||||||
self.new_alarm_selected = (self.new_alarm_selected - 1) % 6
|
|
||||||
elif key == curses.KEY_RIGHT:
|
|
||||||
self.new_alarm_selected = (self.new_alarm_selected + 1) % 6
|
|
||||||
elif key == 32: # Space
|
|
||||||
if self.new_alarm_selected == 2: # Date
|
|
||||||
self.new_alarm_date = None
|
|
||||||
elif self.new_alarm_selected == 3: # Weekdays
|
|
||||||
current_day = len(self.new_alarm_weekdays)
|
|
||||||
if current_day < 7:
|
|
||||||
if current_day in self.new_alarm_weekdays:
|
|
||||||
self.new_alarm_weekdays.remove(current_day)
|
|
||||||
else:
|
|
||||||
self.new_alarm_weekdays.append(current_day)
|
|
||||||
self.new_alarm_weekdays.sort()
|
|
||||||
elif self.new_alarm_selected == 5: # Enabled toggle
|
|
||||||
self.new_alarm_enabled = not self.new_alarm_enabled
|
|
||||||
|
|
||||||
elif key == curses.KEY_UP:
|
|
||||||
if self.new_alarm_selected == 0:
|
|
||||||
self.new_alarm_hour = (self.new_alarm_hour + 1) % 24
|
|
||||||
elif self.new_alarm_selected == 1:
|
|
||||||
self.new_alarm_minute = (self.new_alarm_minute + 1) % 60
|
|
||||||
elif self.new_alarm_selected == 2:
|
|
||||||
if not self.new_alarm_date:
|
|
||||||
self.new_alarm_date = date.today()
|
|
||||||
else:
|
|
||||||
self.new_alarm_date += timedelta(days=1)
|
|
||||||
elif self.new_alarm_selected == 4: # Name
|
|
||||||
self.new_alarm_name += " "
|
|
||||||
|
|
||||||
elif key == curses.KEY_DOWN:
|
|
||||||
if self.new_alarm_selected == 0:
|
|
||||||
self.new_alarm_hour = (self.new_alarm_hour - 1) % 24
|
|
||||||
elif self.new_alarm_selected == 1:
|
|
||||||
self.new_alarm_minute = (self.new_alarm_minute - 1) % 60
|
|
||||||
elif self.new_alarm_selected == 2 and self.new_alarm_date:
|
|
||||||
self.new_alarm_date -= timedelta(days=1)
|
|
||||||
elif self.new_alarm_selected == 4 and len(self.new_alarm_name) > 1:
|
|
||||||
self.new_alarm_name = self.new_alarm_name.rstrip()[:-1]
|
|
||||||
|
|
||||||
def _handle_list_alarms_input(self, key):
|
|
||||||
"""
|
|
||||||
Handle input for the list alarms screen
|
|
||||||
"""
|
|
||||||
if key == 27: # Escape
|
|
||||||
self.selected_menu = 0
|
|
||||||
elif key == ord('d'):
|
|
||||||
# Delete last alarm if exists
|
|
||||||
if self.alarms:
|
|
||||||
last_alarm = self.alarms[-1]
|
|
||||||
try:
|
|
||||||
self.storage.remove_saved_alert(last_alarm['id'])
|
|
||||||
except Exception as e:
|
|
||||||
print(f"Failed to delete alarm: {e}")
|
|
||||||
|
|
||||||
def _show_error(self, message, duration=3):
|
|
||||||
"""Display an error message for a specified duration"""
|
|
||||||
self.error_message = message
|
|
||||||
self.error_timestamp = time.time()
|
|
||||||
|
|
||||||
def _clear_error_if_expired(self):
|
|
||||||
"""Clear error message if it has been displayed long enough"""
|
|
||||||
if self.error_message and self.error_timestamp:
|
|
||||||
if time.time() - self.error_timestamp > 3: # 3 seconds
|
|
||||||
self.error_message = None
|
|
||||||
self.error_timestamp = None
|
|
||||||
|
|
||||||
def _main_loop(self, stdscr):
|
|
||||||
"""
|
|
||||||
Main ncurses event loop
|
|
||||||
"""
|
|
||||||
# Initialize color pairs
|
|
||||||
curses.start_color()
|
|
||||||
curses.init_pair(1, curses.COLOR_GREEN, curses.COLOR_BLACK)
|
|
||||||
curses.init_pair(2, curses.COLOR_YELLOW, curses.COLOR_BLACK)
|
|
||||||
curses.init_pair(3, curses.COLOR_RED, curses.COLOR_BLACK)
|
|
||||||
curses.curs_set(0)
|
|
||||||
|
|
||||||
# Configure screen
|
|
||||||
stdscr.keypad(1)
|
|
||||||
stdscr.timeout(100)
|
|
||||||
|
|
||||||
while not self.stop_event.is_set():
|
|
||||||
# Clear the screen
|
|
||||||
stdscr.clear()
|
|
||||||
|
|
||||||
# Draw appropriate screen based on selected menu
|
|
||||||
if self.selected_menu == 0:
|
|
||||||
_draw_main_clock(stdscr)
|
|
||||||
elif self.selected_menu == 1:
|
|
||||||
_draw_add_alarm(stdscr, {
|
|
||||||
'new_alarm_selected': self.new_alarm_selected,
|
|
||||||
'new_alarm_name': self.new_alarm_name,
|
|
||||||
'new_alarm_hour': self.new_alarm_hour,
|
|
||||||
'new_alarm_minute': self.new_alarm_minute,
|
|
||||||
'new_alarm_enabled': self.new_alarm_enabled,
|
|
||||||
'new_alarm_date': self.new_alarm_date,
|
|
||||||
'new_alarm_weekdays': self.new_alarm_weekdays,
|
|
||||||
'weekday_names': self.weekday_names,
|
|
||||||
'new_alarm_snooze_enabled': getattr(self, 'new_alarm_snooze_enabled', False),
|
|
||||||
'new_alarm_snooze_duration': getattr(self, 'new_alarm_snooze_duration', 5),
|
|
||||||
'new_alarm_snooze_max_count': getattr(self, 'new_alarm_snooze_max_count', 3)
|
|
||||||
})
|
|
||||||
elif self.selected_menu == 2:
|
|
||||||
_draw_list_alarms(stdscr, {
|
|
||||||
'alarms': self.alarms,
|
|
||||||
'weekday_names': self.weekday_names
|
|
||||||
})
|
|
||||||
|
|
||||||
# Refresh the screen
|
|
||||||
stdscr.refresh()
|
|
||||||
|
|
||||||
# Small sleep to reduce CPU usage
|
|
||||||
time.sleep(0.1)
|
|
||||||
|
|
||||||
# Handle input
|
|
||||||
key = stdscr.getch()
|
|
||||||
if key != -1:
|
|
||||||
# Menu navigation and input handling
|
|
||||||
if key == ord('q') or key == 27: # 'q' or Escape
|
|
||||||
break
|
|
||||||
elif key == ord('c'): # Clock/Home screen
|
|
||||||
self.selected_menu = 0
|
|
||||||
elif key == ord('a'): # Add Alarm
|
|
||||||
self.selected_menu = 1
|
|
||||||
elif key == ord('l'): # List Alarms
|
|
||||||
self.selected_menu = 2
|
|
||||||
self.alarms = self.storage.get_saved_alerts()
|
|
||||||
|
|
||||||
# Context-specific input handling
|
|
||||||
if self.selected_menu == 1:
|
|
||||||
self._handle_add_alarm_input(key)
|
|
||||||
elif self.selected_menu == 2:
|
|
||||||
self._handle_list_alarms_input(key)
|
|
@ -1,184 +0,0 @@
|
|||||||
from datetime import datetime
|
|
||||||
import curses
|
|
||||||
from big_digits import BIG_DIGITS
|
|
||||||
|
|
||||||
def _draw_error(stdscr, error_message):
|
|
||||||
"""Draw error message if present"""
|
|
||||||
if error_message:
|
|
||||||
height, width = stdscr.getmaxyx()
|
|
||||||
error_x = width // 2 - len(error_message) // 2
|
|
||||||
error_y = height - 4 # Show near bottom of screen
|
|
||||||
|
|
||||||
# Red color for errors
|
|
||||||
stdscr.attron(curses.color_pair(3))
|
|
||||||
stdscr.addstr(error_y, error_x, error_message)
|
|
||||||
stdscr.attroff(curses.color_pair(3))
|
|
||||||
|
|
||||||
def _draw_big_digit(stdscr, y, x, digit, big_digits):
|
|
||||||
"""
|
|
||||||
Draw a big digit using predefined patterns
|
|
||||||
"""
|
|
||||||
patterns = big_digits[digit]
|
|
||||||
for i, line in enumerate(patterns):
|
|
||||||
stdscr.addstr(y + i, x, line)
|
|
||||||
|
|
||||||
def _draw_big_time(stdscr, big_digits):
|
|
||||||
"""
|
|
||||||
Draw the time in big digits
|
|
||||||
"""
|
|
||||||
current_time = datetime.now()
|
|
||||||
time_str = current_time.strftime("%H:%M:%S")
|
|
||||||
|
|
||||||
# Get terminal dimensions
|
|
||||||
height, width = stdscr.getmaxyx()
|
|
||||||
|
|
||||||
# Calculate starting position to center the big clock
|
|
||||||
digit_width = 14 # Width of each digit pattern including spacing
|
|
||||||
total_width = digit_width * len(time_str)
|
|
||||||
start_x = (width - total_width) // 2
|
|
||||||
start_y = (height - 7) // 2 - 4 # Move up a bit to make room for date
|
|
||||||
|
|
||||||
# Color for the big time
|
|
||||||
stdscr.attron(curses.color_pair(1))
|
|
||||||
for i, digit in enumerate(time_str):
|
|
||||||
_draw_big_digit(stdscr, start_y, start_x + i * digit_width, digit, big_digits)
|
|
||||||
stdscr.attroff(curses.color_pair(1))
|
|
||||||
|
|
||||||
def _draw_main_clock(stdscr):
|
|
||||||
"""
|
|
||||||
Draw the main clock screen
|
|
||||||
"""
|
|
||||||
current_time = datetime.now()
|
|
||||||
time_str = current_time.strftime("%H:%M:%S")
|
|
||||||
date_str = current_time.strftime("%Y-%m-%d")
|
|
||||||
|
|
||||||
# Get terminal dimensions
|
|
||||||
height, width = stdscr.getmaxyx()
|
|
||||||
|
|
||||||
# Draw big time
|
|
||||||
# Note: You'll need to pass BIG_DIGITS from big_digits module when calling
|
|
||||||
_draw_big_time(stdscr, BIG_DIGITS)
|
|
||||||
|
|
||||||
# Draw date
|
|
||||||
date_x = width // 2 - len(date_str) // 2
|
|
||||||
date_y = height // 2 + 4 # Below the big clock
|
|
||||||
|
|
||||||
stdscr.attron(curses.color_pair(2))
|
|
||||||
stdscr.addstr(date_y, date_x, date_str)
|
|
||||||
stdscr.attroff(curses.color_pair(2))
|
|
||||||
|
|
||||||
# Draw menu options
|
|
||||||
menu_str = "A: Add Alarm L: List Alarms Q: Quit"
|
|
||||||
menu_x = width // 2 - len(menu_str) // 2
|
|
||||||
stdscr.addstr(height - 2, menu_x, menu_str)
|
|
||||||
|
|
||||||
def _draw_add_alarm(stdscr, context):
|
|
||||||
"""
|
|
||||||
Draw the add alarm screen
|
|
||||||
|
|
||||||
Args:
|
|
||||||
stdscr: The curses screen object
|
|
||||||
context: A dictionary containing UI state variables
|
|
||||||
"""
|
|
||||||
height, width = stdscr.getmaxyx()
|
|
||||||
|
|
||||||
form_y = height // 2 - 3
|
|
||||||
stdscr.addstr(form_y, width // 2 - 10, "Add New Alarm")
|
|
||||||
|
|
||||||
# Name input
|
|
||||||
if context['new_alarm_selected'] == 4:
|
|
||||||
stdscr.attron(curses.color_pair(2))
|
|
||||||
stdscr.addstr(form_y + 1, width // 2 - 10, f"Name: {context['new_alarm_name']}")
|
|
||||||
if context['new_alarm_selected'] == 4:
|
|
||||||
stdscr.attroff(curses.color_pair(2))
|
|
||||||
|
|
||||||
# Time selection
|
|
||||||
hour_str = f"{context['new_alarm_hour']:02d}"
|
|
||||||
minute_str = f"{context['new_alarm_minute']:02d}"
|
|
||||||
|
|
||||||
# Highlight selected field
|
|
||||||
if context['new_alarm_selected'] == 0:
|
|
||||||
stdscr.attron(curses.color_pair(2))
|
|
||||||
stdscr.addstr(form_y + 2, width // 2 - 2, hour_str)
|
|
||||||
if context['new_alarm_selected'] == 0:
|
|
||||||
stdscr.attroff(curses.color_pair(2))
|
|
||||||
|
|
||||||
stdscr.addstr(form_y + 2, width // 2, ":")
|
|
||||||
|
|
||||||
if context['new_alarm_selected'] == 1:
|
|
||||||
stdscr.attron(curses.color_pair(2))
|
|
||||||
stdscr.addstr(form_y + 2, width // 2 + 1, minute_str)
|
|
||||||
if context['new_alarm_selected'] == 1:
|
|
||||||
stdscr.attroff(curses.color_pair(2))
|
|
||||||
|
|
||||||
# Enabled/Disabled toggle
|
|
||||||
enabled_str = "Enabled" if context['new_alarm_enabled'] else "Disabled"
|
|
||||||
if context['new_alarm_selected'] == 5:
|
|
||||||
stdscr.attron(curses.color_pair(2))
|
|
||||||
stdscr.addstr(form_y + 4, width // 2 - len(enabled_str)//2, enabled_str)
|
|
||||||
if context['new_alarm_selected'] == 5:
|
|
||||||
stdscr.attroff(curses.color_pair(2))
|
|
||||||
|
|
||||||
# Date selection
|
|
||||||
date_str = "No specific date" if not context['new_alarm_date'] else context['new_alarm_date'].strftime("%Y-%m-%d")
|
|
||||||
if context['new_alarm_selected'] == 2:
|
|
||||||
stdscr.attron(curses.color_pair(2))
|
|
||||||
stdscr.addstr(form_y + 3, width // 2 - len(date_str) // 2, date_str)
|
|
||||||
if context['new_alarm_selected'] == 2:
|
|
||||||
stdscr.attroff(curses.color_pair(2))
|
|
||||||
|
|
||||||
# Weekday selection
|
|
||||||
weekday_str = "Repeat: " + " ".join(
|
|
||||||
context['weekday_names'][i] if i in context['new_alarm_weekdays'] else "___"
|
|
||||||
for i in range(7)
|
|
||||||
)
|
|
||||||
if context['new_alarm_selected'] == 3:
|
|
||||||
stdscr.attron(curses.color_pair(2))
|
|
||||||
stdscr.addstr(form_y + 5, width // 2 - len(weekday_str) // 2, weekday_str)
|
|
||||||
if context['new_alarm_selected'] == 3:
|
|
||||||
stdscr.attroff(curses.color_pair(2))
|
|
||||||
|
|
||||||
# Instructions
|
|
||||||
stdscr.addstr(height - 2, 2,
|
|
||||||
"↑↓: Change ←→: Switch Space: Toggle Enter: Save Esc: Cancel")
|
|
||||||
|
|
||||||
def _draw_list_alarms(stdscr, context):
|
|
||||||
"""
|
|
||||||
Draw the list of alarms screen
|
|
||||||
|
|
||||||
Args:
|
|
||||||
stdscr: The curses screen object
|
|
||||||
context: A dictionary containing UI state variables
|
|
||||||
"""
|
|
||||||
height, width = stdscr.getmaxyx()
|
|
||||||
|
|
||||||
# Header
|
|
||||||
stdscr.addstr(2, width // 2 - 5, "Alarms")
|
|
||||||
|
|
||||||
if not context['alarms']:
|
|
||||||
stdscr.addstr(4, width // 2 - 10, "No alarms set")
|
|
||||||
else:
|
|
||||||
for i, alarm in enumerate(context['alarms'][:height-6]):
|
|
||||||
# Format time and repeat information
|
|
||||||
time_str = alarm.get('time', 'Unknown')
|
|
||||||
|
|
||||||
# Format repeat info
|
|
||||||
repeat_info = ""
|
|
||||||
repeat_rule = alarm.get('repeat_rule', {})
|
|
||||||
if repeat_rule:
|
|
||||||
if repeat_rule.get('type') == 'weekly':
|
|
||||||
days = repeat_rule.get('days', [])
|
|
||||||
repeat_info = f" (Every {', '.join(context['weekday_names'][d] for d in days)})"
|
|
||||||
elif repeat_rule.get('type') == 'once' and repeat_rule.get('date'):
|
|
||||||
repeat_info = f" (On {repeat_rule['date']})"
|
|
||||||
|
|
||||||
# Status indicator
|
|
||||||
status = "✓" if alarm.get('enabled', True) else "✗"
|
|
||||||
display_str = f"{status} {time_str}{repeat_info}"
|
|
||||||
|
|
||||||
# Truncate if too long
|
|
||||||
display_str = display_str[:width-4]
|
|
||||||
|
|
||||||
stdscr.addstr(4 + i, 2, display_str)
|
|
||||||
|
|
||||||
stdscr.addstr(height - 2, 2, "D: Delete Enter: Edit Esc: Back")
|
|
@ -1,173 +0,0 @@
|
|||||||
#!/bin/bash
|
|
||||||
|
|
||||||
# Configuration
|
|
||||||
API_URL="http://localhost:8000"
|
|
||||||
TEST_ALARM_NAME="Test Alarm"
|
|
||||||
TEST_ALARM_TIME="08:30:00"
|
|
||||||
TEST_AUDIO_FILE="/tmp/test.mp3"
|
|
||||||
|
|
||||||
# Color codes for output
|
|
||||||
GREEN='\033[0;32m'
|
|
||||||
RED='\033[0;31m'
|
|
||||||
BLUE='\033[0;34m'
|
|
||||||
NC='\033[0m' # No Color
|
|
||||||
|
|
||||||
# Helper function to print status messages
|
|
||||||
print_status() {
|
|
||||||
echo -e "${BLUE}=== $1 ===${NC}"
|
|
||||||
}
|
|
||||||
|
|
||||||
# Helper function to check test results
|
|
||||||
check_result() {
|
|
||||||
if [ $? -eq 0 ]; then
|
|
||||||
echo -e "${GREEN}✓ $1${NC}"
|
|
||||||
else
|
|
||||||
echo -e "${RED}✗ $1${NC}"
|
|
||||||
exit 1
|
|
||||||
fi
|
|
||||||
}
|
|
||||||
|
|
||||||
# 1. Test server connectivity
|
|
||||||
print_status "Testing server connectivity"
|
|
||||||
curl -s "$API_URL" > /dev/null
|
|
||||||
check_result "Server connectivity check"
|
|
||||||
|
|
||||||
# 2. Get initial state
|
|
||||||
print_status "Getting initial state"
|
|
||||||
INITIAL_STATE=$(curl -s -X GET "$API_URL")
|
|
||||||
echo "$INITIAL_STATE" | jq .
|
|
||||||
check_result "Retrieved initial state"
|
|
||||||
|
|
||||||
# 3. Add a new alarm (POST)
|
|
||||||
print_status "Adding new alarm"
|
|
||||||
POST_DATA=$(jq -n \
|
|
||||||
--arg name "$TEST_ALARM_NAME" \
|
|
||||||
--arg time "$TEST_ALARM_TIME" \
|
|
||||||
'{
|
|
||||||
"name": $name,
|
|
||||||
"time": $time,
|
|
||||||
"repeat_rule": {
|
|
||||||
"type": "weekly",
|
|
||||||
"days_of_week": ["monday", "wednesday", "friday"]
|
|
||||||
},
|
|
||||||
"enabled": true,
|
|
||||||
"snooze": {
|
|
||||||
"enabled": true,
|
|
||||||
"duration": 5,
|
|
||||||
"max_count": 3
|
|
||||||
},
|
|
||||||
"metadata": {
|
|
||||||
"volume": 75,
|
|
||||||
"notes": "Test alarm with full configuration"
|
|
||||||
}
|
|
||||||
}'
|
|
||||||
)
|
|
||||||
|
|
||||||
ADD_RESPONSE=$(curl -s -X POST -H "Content-Type: application/json" -d "$POST_DATA" "$API_URL")
|
|
||||||
NEW_ALARM_ID=$(echo "$ADD_RESPONSE" | jq -r '.data.id')
|
|
||||||
|
|
||||||
if [[ "$NEW_ALARM_ID" == "null" || -z "$NEW_ALARM_ID" ]]; then
|
|
||||||
echo -e "${RED}Failed to add alarm${NC}"
|
|
||||||
echo "Response: $ADD_RESPONSE"
|
|
||||||
exit 1
|
|
||||||
fi
|
|
||||||
|
|
||||||
echo -e "${GREEN}Added alarm with ID: $NEW_ALARM_ID${NC}"
|
|
||||||
|
|
||||||
# 4. Test duplicate alarm detection
|
|
||||||
print_status "Testing duplicate alarm detection"
|
|
||||||
DUPLICATE_RESPONSE=$(curl -s -X POST -H "Content-Type: application/json" -d "$POST_DATA" "$API_URL")
|
|
||||||
if [[ $(echo "$DUPLICATE_RESPONSE" | jq -r '.error') == *"Duplicate alarm detected"* ]]; then
|
|
||||||
echo -e "${GREEN}✓ Duplicate detection working${NC}"
|
|
||||||
else
|
|
||||||
echo -e "${RED}✗ Duplicate detection failed${NC}"
|
|
||||||
echo "Response: $DUPLICATE_RESPONSE"
|
|
||||||
fi
|
|
||||||
|
|
||||||
# 5. Update the alarm (PUT)
|
|
||||||
print_status "Updating alarm"
|
|
||||||
UPDATE_DATA=$(jq -n \
|
|
||||||
--arg name "$TEST_ALARM_NAME Updated" \
|
|
||||||
--arg time "$TEST_ALARM_TIME" \
|
|
||||||
--argjson id "$NEW_ALARM_ID" \
|
|
||||||
'{
|
|
||||||
"id": $id,
|
|
||||||
"name": $name,
|
|
||||||
"time": $time,
|
|
||||||
"repeat_rule": {
|
|
||||||
"type": "daily"
|
|
||||||
},
|
|
||||||
"enabled": true,
|
|
||||||
"snooze": {
|
|
||||||
"enabled": false,
|
|
||||||
"duration": 10,
|
|
||||||
"max_count": 2
|
|
||||||
},
|
|
||||||
"metadata": {
|
|
||||||
"volume": 90,
|
|
||||||
"notes": "Updated test alarm"
|
|
||||||
}
|
|
||||||
}'
|
|
||||||
)
|
|
||||||
|
|
||||||
UPDATE_RESPONSE=$(curl -s -X PUT -H "Content-Type: application/json" -d "$UPDATE_DATA" "$API_URL")
|
|
||||||
if [[ $(echo "$UPDATE_RESPONSE" | jq -r '.data.message') == "Alarm updated successfully" ]]; then
|
|
||||||
echo -e "${GREEN}✓ Alarm update successful${NC}"
|
|
||||||
else
|
|
||||||
echo -e "${RED}✗ Alarm update failed${NC}"
|
|
||||||
echo "Response: $UPDATE_RESPONSE"
|
|
||||||
fi
|
|
||||||
|
|
||||||
# 6. Verify the update
|
|
||||||
print_status "Verifying update"
|
|
||||||
UPDATED_STATE=$(curl -s -X GET "$API_URL")
|
|
||||||
UPDATED_ALARM=$(echo "$UPDATED_STATE" | jq -r ".data[] | select(.id == $NEW_ALARM_ID)")
|
|
||||||
if [[ $(echo "$UPDATED_ALARM" | jq -r '.name') == "$TEST_ALARM_NAME Updated" ]]; then
|
|
||||||
echo -e "${GREEN}✓ Update verification successful${NC}"
|
|
||||||
else
|
|
||||||
echo -e "${RED}✗ Update verification failed${NC}"
|
|
||||||
echo "Current alarm state: $UPDATED_ALARM"
|
|
||||||
fi
|
|
||||||
|
|
||||||
# 7. Test invalid inputs
|
|
||||||
print_status "Testing invalid inputs"
|
|
||||||
|
|
||||||
# Test invalid time format
|
|
||||||
INVALID_TIME_DATA=$(echo "$POST_DATA" | jq '. + {"time": "25:00:00"}')
|
|
||||||
INVALID_TIME_RESPONSE=$(curl -s -X POST -H "Content-Type: application/json" -d "$INVALID_TIME_DATA" "$API_URL")
|
|
||||||
if [[ $(echo "$INVALID_TIME_RESPONSE" | jq -r '.error') == *"Invalid alarm configuration"* ]]; then
|
|
||||||
echo -e "${GREEN}✓ Invalid time format detection working${NC}"
|
|
||||||
else
|
|
||||||
echo -e "${RED}✗ Invalid time format detection failed${NC}"
|
|
||||||
fi
|
|
||||||
|
|
||||||
# Test invalid repeat rule
|
|
||||||
INVALID_REPEAT_DATA=$(echo "$POST_DATA" | jq '.repeat_rule.type = "monthly"')
|
|
||||||
INVALID_REPEAT_RESPONSE=$(curl -s -X POST -H "Content-Type: application/json" -d "$INVALID_REPEAT_DATA" "$API_URL")
|
|
||||||
if [[ $(echo "$INVALID_REPEAT_RESPONSE" | jq -r '.error') == *"Invalid alarm configuration"* ]]; then
|
|
||||||
echo -e "${GREEN}✓ Invalid repeat rule detection working${NC}"
|
|
||||||
else
|
|
||||||
echo -e "${RED}✗ Invalid repeat rule detection failed${NC}"
|
|
||||||
fi
|
|
||||||
|
|
||||||
# 8. Delete the test alarm
|
|
||||||
print_status "Deleting test alarm"
|
|
||||||
DELETE_RESPONSE=$(curl -s -X DELETE -H "Content-Type: application/json" -d "{\"id\":$NEW_ALARM_ID}" "$API_URL")
|
|
||||||
if [[ $(echo "$DELETE_RESPONSE" | jq -r '.data.message') == "Alarm removed successfully" ]]; then
|
|
||||||
echo -e "${GREEN}✓ Alarm deletion successful${NC}"
|
|
||||||
else
|
|
||||||
echo -e "${RED}✗ Alarm deletion failed${NC}"
|
|
||||||
echo "Response: $DELETE_RESPONSE"
|
|
||||||
fi
|
|
||||||
|
|
||||||
# 9. Verify deletion
|
|
||||||
print_status "Verifying deletion"
|
|
||||||
FINAL_STATE=$(curl -s -X GET "$API_URL")
|
|
||||||
if [[ $(echo "$FINAL_STATE" | jq ".data[] | select(.id == $NEW_ALARM_ID)") == "" ]]; then
|
|
||||||
echo -e "${GREEN}✓ Deletion verification successful${NC}"
|
|
||||||
else
|
|
||||||
echo -e "${RED}✗ Deletion verification failed${NC}"
|
|
||||||
echo "Current state: $FINAL_STATE"
|
|
||||||
fi
|
|
||||||
|
|
||||||
print_status "Test suite completed!"
|
|
@ -134,17 +134,6 @@ class AlertApi(BaseHTTPRequestHandler):
|
|||||||
|
|
||||||
|
|
||||||
def run(server_class=HTTPServer, handler_class=AlertApi, port=8000):
|
def run(server_class=HTTPServer, handler_class=AlertApi, port=8000):
|
||||||
# Set up logging configuration
|
|
||||||
logging.basicConfig(
|
|
||||||
level=logging.DEBUG, # Set to DEBUG to show all log levels
|
|
||||||
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
|
|
||||||
handlers=[
|
|
||||||
logging.StreamHandler(), # Console handler
|
|
||||||
logging.FileHandler('alert_api.log') # File handler
|
|
||||||
]
|
|
||||||
)
|
|
||||||
|
|
||||||
logger = logging.getLogger('AlertApi')
|
|
||||||
logger.info(f"Starting AlertApi on port {port}")
|
logger.info(f"Starting AlertApi on port {port}")
|
||||||
|
|
||||||
server_address = ("", port)
|
server_address = ("", port)
|
260
clock/alarm_siren.py
Normal file
260
clock/alarm_siren.py
Normal file
@ -0,0 +1,260 @@
|
|||||||
|
import os
|
||||||
|
import time
|
||||||
|
import threading
|
||||||
|
import subprocess
|
||||||
|
import queue
|
||||||
|
import logging
|
||||||
|
from datetime import datetime, timedelta
|
||||||
|
from typing import Optional, Dict, Any
|
||||||
|
|
||||||
|
from logging_config import setup_logging
|
||||||
|
|
||||||
|
# Set up logging
|
||||||
|
logger = setup_logging()
|
||||||
|
|
||||||
|
class AlarmSiren:
|
||||||
|
def __init__(self):
|
||||||
|
# Communication queues
|
||||||
|
self.alarm_queue = queue.Queue()
|
||||||
|
self.control_queue = queue.Queue()
|
||||||
|
|
||||||
|
# Tracking active alarms
|
||||||
|
self.active_alarms: Dict[int, Dict[str, Any]] = {}
|
||||||
|
|
||||||
|
# Playback thread
|
||||||
|
self.playback_thread = threading.Thread(target=self._playback_worker, daemon=True)
|
||||||
|
self.playback_thread.start()
|
||||||
|
|
||||||
|
def schedule_alarm(self, alarm_config: Dict[str, Any]):
|
||||||
|
"""Schedule an alarm based on its configuration"""
|
||||||
|
logger.info(f"Scheduling alarm: {alarm_config}")
|
||||||
|
self.alarm_queue.put(alarm_config)
|
||||||
|
|
||||||
|
def _calculate_next_alarm_time(self, alarm_config: Dict[str, Any]) -> Optional[datetime]:
|
||||||
|
"""Calculate the next alarm trigger time based on repeat rule"""
|
||||||
|
now = datetime.now()
|
||||||
|
current_time = now.time()
|
||||||
|
|
||||||
|
# Parse alarm time
|
||||||
|
alarm_time = datetime.strptime(alarm_config['time'], "%H:%M:%S").time()
|
||||||
|
|
||||||
|
# Determine the next trigger
|
||||||
|
repeat_rule = alarm_config['repeat_rule']
|
||||||
|
|
||||||
|
# Determine repeat rule type and details
|
||||||
|
if isinstance(repeat_rule, dict):
|
||||||
|
repeat_type = repeat_rule.get('type')
|
||||||
|
repeat_days = repeat_rule.get('days_of_week', [])
|
||||||
|
repeat_at = repeat_rule.get('at')
|
||||||
|
else:
|
||||||
|
# Assume it's an object-like structure with attributes
|
||||||
|
repeat_type = getattr(repeat_rule, 'type', None)
|
||||||
|
repeat_days = getattr(repeat_rule, 'days_of_week', [])
|
||||||
|
repeat_at = getattr(repeat_rule, 'at', None)
|
||||||
|
|
||||||
|
# Sanity check
|
||||||
|
if repeat_type is None:
|
||||||
|
logger.error(f"Invalid repeat rule configuration: {repeat_rule}")
|
||||||
|
return None
|
||||||
|
|
||||||
|
if repeat_type == 'once':
|
||||||
|
# For one-time alarm, check the specific date
|
||||||
|
try:
|
||||||
|
# If 'at' is None, use current date
|
||||||
|
if repeat_at is None:
|
||||||
|
specific_date = now.date()
|
||||||
|
else:
|
||||||
|
specific_date = datetime.strptime(repeat_at, "%d.%m.%Y").date()
|
||||||
|
return datetime.combine(specific_date, alarm_time)
|
||||||
|
except ValueError:
|
||||||
|
logger.error("Invalid one-time alarm configuration")
|
||||||
|
return None
|
||||||
|
|
||||||
|
elif repeat_type == 'daily':
|
||||||
|
# Daily alarm - trigger today or tomorrow
|
||||||
|
next_trigger = datetime.combine(now.date(), alarm_time)
|
||||||
|
if current_time < alarm_time:
|
||||||
|
return next_trigger
|
||||||
|
return next_trigger + timedelta(days=1)
|
||||||
|
|
||||||
|
elif repeat_type == 'weekly':
|
||||||
|
# Weekly alarm - check configured days
|
||||||
|
today = now.strftime("%A").lower()
|
||||||
|
configured_days = [day.lower() for day in repeat_days]
|
||||||
|
|
||||||
|
if today in configured_days and current_time < alarm_time:
|
||||||
|
return datetime.combine(now.date(), alarm_time)
|
||||||
|
|
||||||
|
# Find next configured day
|
||||||
|
days_order = ['monday', 'tuesday', 'wednesday', 'thursday', 'friday', 'saturday', 'sunday']
|
||||||
|
current_index = days_order.index(today)
|
||||||
|
|
||||||
|
for offset in range(1, 8):
|
||||||
|
next_day_index = (current_index + offset) % 7
|
||||||
|
next_day = days_order[next_day_index]
|
||||||
|
|
||||||
|
if next_day in configured_days:
|
||||||
|
next_date = now.date() + timedelta(days=offset)
|
||||||
|
return datetime.combine(next_date, alarm_time)
|
||||||
|
|
||||||
|
return None
|
||||||
|
|
||||||
|
def _play_audio(self, file_path: str, volume: int = 100):
|
||||||
|
"""Play audio file using mpg123 in the background."""
|
||||||
|
try:
|
||||||
|
if not os.path.exists(file_path):
|
||||||
|
logger.error(f"Audio file not found: {file_path}")
|
||||||
|
return None
|
||||||
|
|
||||||
|
# Construct mpg123 command with volume control
|
||||||
|
volume_adjust = f"-g {volume}"
|
||||||
|
cmd = ["mpg123", volume_adjust, file_path]
|
||||||
|
|
||||||
|
logger.info(f"Playing alarm: {file_path}")
|
||||||
|
|
||||||
|
# Run mpg123 in the background, suppressing stdout/stderr
|
||||||
|
process = subprocess.Popen(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
|
||||||
|
return process
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Error playing audio: {e}")
|
||||||
|
return None
|
||||||
|
|
||||||
|
def _playback_worker(self):
|
||||||
|
"""Background thread for managing alarm playback"""
|
||||||
|
while True:
|
||||||
|
try:
|
||||||
|
# Check for new alarms to schedule
|
||||||
|
try:
|
||||||
|
new_alarm = self.alarm_queue.get(timeout=1)
|
||||||
|
alarm_time = self._calculate_next_alarm_time(new_alarm)
|
||||||
|
if alarm_time:
|
||||||
|
alarm_id = new_alarm.get('id', id(new_alarm))
|
||||||
|
self.active_alarms[alarm_id] = {
|
||||||
|
'config': new_alarm,
|
||||||
|
'trigger_time': alarm_time,
|
||||||
|
'snooze_count': 0,
|
||||||
|
'process': None
|
||||||
|
}
|
||||||
|
except queue.Empty:
|
||||||
|
pass
|
||||||
|
|
||||||
|
# Check for control signals (snooze/dismiss)
|
||||||
|
try:
|
||||||
|
control_msg = self.control_queue.get(timeout=0.1)
|
||||||
|
alarm_id = control_msg.get('alarm_id')
|
||||||
|
if control_msg['type'] == 'snooze':
|
||||||
|
self.snooze_alarm(alarm_id)
|
||||||
|
elif control_msg['type'] == 'dismiss':
|
||||||
|
self.dismiss_alarm(alarm_id)
|
||||||
|
except queue.Empty:
|
||||||
|
pass
|
||||||
|
|
||||||
|
# Check for alarms to trigger
|
||||||
|
now = datetime.now()
|
||||||
|
for alarm_id, alarm_info in list(self.active_alarms.items()):
|
||||||
|
# Check if alarm is more than 1 hour late
|
||||||
|
if now > alarm_info['trigger_time'] + timedelta(hours=1):
|
||||||
|
logger.warning(f"Alarm {alarm_id} is over 1 hour late. Disabling.")
|
||||||
|
del self.active_alarms[alarm_id]
|
||||||
|
continue
|
||||||
|
if now >= alarm_info['trigger_time']:
|
||||||
|
# Trigger alarm if not already active
|
||||||
|
if alarm_info['process'] is None:
|
||||||
|
alarm_info['process'] = self._play_audio(
|
||||||
|
alarm_info['config']['file_to_play'],
|
||||||
|
alarm_info['config'].get('metadata', {}).get('volume', 100)
|
||||||
|
)
|
||||||
|
logger.info(f"Alarm {alarm_id} triggered at {now}.")
|
||||||
|
|
||||||
|
# Notify UI about the triggered alarm
|
||||||
|
self.control_queue.put({
|
||||||
|
'type': 'trigger',
|
||||||
|
'alarm_id': alarm_id,
|
||||||
|
'info': alarm_info
|
||||||
|
})
|
||||||
|
|
||||||
|
# Handle alarms that have been snoozed or dismissed
|
||||||
|
if alarm_info['process'] and alarm_info['process'].poll() is not None:
|
||||||
|
# Process has finished naturally
|
||||||
|
next_trigger = self._calculate_next_alarm_time(alarm_info['config'])
|
||||||
|
if next_trigger:
|
||||||
|
alarm_info['trigger_time'] = next_trigger
|
||||||
|
alarm_info['process'] = None
|
||||||
|
else:
|
||||||
|
logger.info(f"Removing non-repeating alarm {alarm_id}.")
|
||||||
|
del self.active_alarms[alarm_id]
|
||||||
|
|
||||||
|
# Actively clean up zombie processes
|
||||||
|
for alarm_id, alarm_info in list(self.active_alarms.items()):
|
||||||
|
process = alarm_info.get('process')
|
||||||
|
if process and process.poll() is not None:
|
||||||
|
# Remove terminated processes
|
||||||
|
alarm_info['process'] = None
|
||||||
|
|
||||||
|
time.sleep(0.5) # Prevent tight loop
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Error in playback worker: {e}")
|
||||||
|
time.sleep(1)
|
||||||
|
|
||||||
|
def snooze_alarm(self, alarm_id: int):
|
||||||
|
"""Snooze a specific alarm"""
|
||||||
|
if alarm_id not in self.active_alarms:
|
||||||
|
logger.warning(f"Cannot snooze alarm {alarm_id} - not found in active alarms")
|
||||||
|
return False
|
||||||
|
|
||||||
|
alarm_info = self.active_alarms[alarm_id]
|
||||||
|
alarm_config = alarm_info['config']
|
||||||
|
|
||||||
|
# Default snooze configuration if not provided
|
||||||
|
snooze_config = alarm_config.get('snooze', {
|
||||||
|
'enabled': True,
|
||||||
|
'duration': 5, # Default 5 minutes
|
||||||
|
'max_count': 3 # Default max 3 snoozes
|
||||||
|
})
|
||||||
|
|
||||||
|
if not snooze_config.get('enabled', True):
|
||||||
|
logger.warning(f"Snooze not enabled for alarm {alarm_id}")
|
||||||
|
return False
|
||||||
|
|
||||||
|
# Check snooze count
|
||||||
|
if alarm_info.get('snooze_count', 0) >= snooze_config['max_count']:
|
||||||
|
logger.warning(f"Maximum snooze count reached for alarm {alarm_id}")
|
||||||
|
return False
|
||||||
|
|
||||||
|
# Increment snooze count and set next trigger time
|
||||||
|
alarm_info['snooze_count'] = alarm_info.get('snooze_count', 0) + 1
|
||||||
|
snooze_duration = snooze_config.get('duration', 5)
|
||||||
|
alarm_info['trigger_time'] = datetime.now() + timedelta(minutes=snooze_duration)
|
||||||
|
|
||||||
|
# Stop any active playback
|
||||||
|
process = alarm_info.get('process')
|
||||||
|
if process:
|
||||||
|
process.terminate()
|
||||||
|
process.wait()
|
||||||
|
alarm_info['process'] = None
|
||||||
|
|
||||||
|
logger.info(f"Snoozed alarm {alarm_id} for {snooze_duration} minutes.")
|
||||||
|
return True
|
||||||
|
|
||||||
|
def dismiss_alarm(self, alarm_id: int):
|
||||||
|
"""Dismiss a specific alarm."""
|
||||||
|
if alarm_id not in self.active_alarms:
|
||||||
|
logger.warning(f"Cannot dismiss alarm {alarm_id} - not found in active alarms")
|
||||||
|
return False
|
||||||
|
|
||||||
|
# Stop playback and terminate any running process
|
||||||
|
alarm_info = self.active_alarms.pop(alarm_id, None)
|
||||||
|
if alarm_info and alarm_info.get('process'):
|
||||||
|
try:
|
||||||
|
alarm_info['process'].terminate()
|
||||||
|
alarm_info['process'].wait(timeout=2)
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Error terminating alarm process: {e}")
|
||||||
|
# Force kill if terminate fails
|
||||||
|
try:
|
||||||
|
alarm_info['process'].kill()
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
|
||||||
|
logger.info(f"Dismissed alarm {alarm_id}.")
|
||||||
|
return True
|
@ -4,20 +4,10 @@ from typing import List, Optional, Dict, Any
|
|||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
import os
|
import os
|
||||||
import re
|
import re
|
||||||
import logging
|
from logging_config import setup_logging
|
||||||
|
|
||||||
# Set up logging configuration
|
|
||||||
logging.basicConfig(
|
|
||||||
level=logging.DEBUG, # Set to DEBUG to show all log levels
|
|
||||||
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
|
|
||||||
handlers=[
|
|
||||||
logging.StreamHandler(), # Console handler
|
|
||||||
logging.FileHandler('alert_api.log') # File handler
|
|
||||||
]
|
|
||||||
)
|
|
||||||
|
|
||||||
logger = logging.getLogger('AlertApi')
|
|
||||||
|
|
||||||
|
# Set up logging
|
||||||
|
logger = setup_logging()
|
||||||
|
|
||||||
@dataclass
|
@dataclass
|
||||||
class RepeatRule:
|
class RepeatRule:
|
||||||
@ -28,13 +18,22 @@ class RepeatRule:
|
|||||||
def validate(self) -> bool:
|
def validate(self) -> bool:
|
||||||
"""Validate repeat rule configuration"""
|
"""Validate repeat rule configuration"""
|
||||||
valid_types = {"daily", "weekly", "once"}
|
valid_types = {"daily", "weekly", "once"}
|
||||||
valid_days = {"monday", "tuesday", "wednesday", "thursday", "friday", "saturday", "sunday"}
|
valid_days = {"mon", "tue", "wed", "thu", "fri", "sat", "sun"}
|
||||||
|
|
||||||
|
|
||||||
if self.type not in valid_types:
|
if self.type not in valid_types:
|
||||||
logger.error(f"Invalid RepeatRule type: '{self.type}'. Must be one of {valid_types}")
|
logger.error(f"Invalid RepeatRule type: '{self.type}'. Must be one of {valid_types}")
|
||||||
return False
|
return False
|
||||||
|
|
||||||
if self.type == "weekly":
|
if self.type == "weekly":
|
||||||
|
if self.days_of_week is None:
|
||||||
|
logger.error("days_of_week is None")
|
||||||
|
return False
|
||||||
|
|
||||||
|
if not self.days_of_week:
|
||||||
|
logger.error("days_of_week is empty")
|
||||||
|
return False
|
||||||
|
|
||||||
invalid_days = [day for day in self.days_of_week if day.lower() not in valid_days]
|
invalid_days = [day for day in self.days_of_week if day.lower() not in valid_days]
|
||||||
if invalid_days:
|
if invalid_days:
|
||||||
logger.error(f"Invalid RepeatRule weekday names: {invalid_days}. Must be one of {valid_days}")
|
logger.error(f"Invalid RepeatRule weekday names: {invalid_days}. Must be one of {valid_days}")
|
@ -9,7 +9,6 @@ def setup_logging(log_dir="logs", log_file="alarm_system.log", level=logging.DEB
|
|||||||
level=level,
|
level=level,
|
||||||
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
|
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
|
||||||
handlers=[
|
handlers=[
|
||||||
logging.StreamHandler(),
|
|
||||||
logging.FileHandler(log_path, mode='a', encoding='utf-8')
|
logging.FileHandler(log_path, mode='a', encoding='utf-8')
|
||||||
]
|
]
|
||||||
)
|
)
|
@ -5,14 +5,16 @@ import sys
|
|||||||
import signal
|
import signal
|
||||||
import threading
|
import threading
|
||||||
import logging
|
import logging
|
||||||
|
import curses
|
||||||
from http.server import HTTPServer
|
from http.server import HTTPServer
|
||||||
from multiprocessing import Queue
|
from multiprocessing import Queue
|
||||||
|
import subprocess
|
||||||
|
|
||||||
# Import our custom modules
|
# Import our custom modules
|
||||||
from alarm_api import AlertApi, run as run_api
|
from alarm_api import AlertApi, run as run_api
|
||||||
from alarm_storage import AlarmStorage
|
from alarm_storage import AlarmStorage
|
||||||
from alarm_siren import AlarmSiren
|
from alarm_siren import AlarmSiren
|
||||||
from ncurses_ui import UI
|
from ui.ncurses_ui import UI
|
||||||
from logging_config import setup_logging
|
from logging_config import setup_logging
|
||||||
|
|
||||||
# Set up logging
|
# Set up logging
|
||||||
@ -56,7 +58,7 @@ class AlarmSystemManager:
|
|||||||
self._sync_alarms()
|
self._sync_alarms()
|
||||||
|
|
||||||
# UI..
|
# UI..
|
||||||
self.ui = UI(self)
|
self.ui = UI(self, control_queue=self.siren.control_queue)
|
||||||
|
|
||||||
def _setup_signal_handlers(self):
|
def _setup_signal_handlers(self):
|
||||||
"""Set up signal handlers for graceful shutdown"""
|
"""Set up signal handlers for graceful shutdown"""
|
||||||
@ -153,6 +155,12 @@ class AlarmSystemManager:
|
|||||||
if self.api_thread and self.api_thread.is_alive():
|
if self.api_thread and self.api_thread.is_alive():
|
||||||
self.api_thread.join(timeout=2)
|
self.api_thread.join(timeout=2)
|
||||||
|
|
||||||
|
# Kill any remaining mpg123 processes
|
||||||
|
try:
|
||||||
|
subprocess.run(['pkill', 'mpg123'], check=False)
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Error killing mpg123 processes: {e}")
|
||||||
|
|
||||||
logger.info("Alarm System shutdown complete")
|
logger.info("Alarm System shutdown complete")
|
||||||
|
|
||||||
def main():
|
def main():
|
173
clock/tests.sh
Normal file
173
clock/tests.sh
Normal file
@ -0,0 +1,173 @@
|
|||||||
|
#!/bin/bash
|
||||||
|
|
||||||
|
# Configuration
|
||||||
|
API_URL="http://localhost:8000"
|
||||||
|
TEST_ALARM_NAME="Test Alarm"
|
||||||
|
TEST_ALARM_TIME="08:30:00"
|
||||||
|
TEST_AUDIO_FILE="/tmp/test.mp3"
|
||||||
|
|
||||||
|
# Color codes for output
|
||||||
|
GREEN='\033[0;32m'
|
||||||
|
RED='\033[0;31m'
|
||||||
|
BLUE='\033[0;34m'
|
||||||
|
NC='\033[0m' # No Color
|
||||||
|
|
||||||
|
# Helper function to print status messages
|
||||||
|
print_status() {
|
||||||
|
echo -e "${BLUE}=== $1 ===${NC}"
|
||||||
|
}
|
||||||
|
|
||||||
|
# Helper function to check test results
|
||||||
|
check_result() {
|
||||||
|
if [ $? -eq 0 ]; then
|
||||||
|
echo -e "${GREEN}✓ $1${NC}"
|
||||||
|
else
|
||||||
|
echo -e "${RED}✗ $1${NC}"
|
||||||
|
exit 1
|
||||||
|
fi
|
||||||
|
}
|
||||||
|
|
||||||
|
# 1. Test server connectivity
|
||||||
|
print_status "Testing server connectivity"
|
||||||
|
curl -s "$API_URL" > /dev/null
|
||||||
|
check_result "Server connectivity check"
|
||||||
|
|
||||||
|
# 2. Get initial state
|
||||||
|
print_status "Getting initial state"
|
||||||
|
INITIAL_STATE=$(curl -s -X GET "$API_URL")
|
||||||
|
echo "$INITIAL_STATE" | jq .
|
||||||
|
check_result "Retrieved initial state"
|
||||||
|
|
||||||
|
# 3. Add a new alarm (POST)
|
||||||
|
print_status "Adding new alarm"
|
||||||
|
POST_DATA=$(jq -n \
|
||||||
|
--arg name "$TEST_ALARM_NAME" \
|
||||||
|
--arg time "$TEST_ALARM_TIME" \
|
||||||
|
'{
|
||||||
|
"name": $name,
|
||||||
|
"time": $time,
|
||||||
|
"repeat_rule": {
|
||||||
|
"type": "weekly",
|
||||||
|
"days_of_week": ["monday", "wednesday", "friday"]
|
||||||
|
},
|
||||||
|
"enabled": true,
|
||||||
|
"snooze": {
|
||||||
|
"enabled": true,
|
||||||
|
"duration": 5,
|
||||||
|
"max_count": 3
|
||||||
|
},
|
||||||
|
"metadata": {
|
||||||
|
"volume": 75,
|
||||||
|
"notes": "Test alarm with full configuration"
|
||||||
|
}
|
||||||
|
}'
|
||||||
|
)
|
||||||
|
|
||||||
|
ADD_RESPONSE=$(curl -s -X POST -H "Content-Type: application/json" -d "$POST_DATA" "$API_URL")
|
||||||
|
NEW_ALARM_ID=$(echo "$ADD_RESPONSE" | jq -r '.data.id')
|
||||||
|
|
||||||
|
if [[ "$NEW_ALARM_ID" == "null" || -z "$NEW_ALARM_ID" ]]; then
|
||||||
|
echo -e "${RED}Failed to add alarm${NC}"
|
||||||
|
echo "Response: $ADD_RESPONSE"
|
||||||
|
exit 1
|
||||||
|
fi
|
||||||
|
|
||||||
|
echo -e "${GREEN}Added alarm with ID: $NEW_ALARM_ID${NC}"
|
||||||
|
|
||||||
|
#j# 4. Test duplicate alarm detection
|
||||||
|
#jprint_status "Testing duplicate alarm detection"
|
||||||
|
#jDUPLICATE_RESPONSE=$(curl -s -X POST -H "Content-Type: application/json" -d "$POST_DATA" "$API_URL")
|
||||||
|
#jif [[ $(echo "$DUPLICATE_RESPONSE" | jq -r '.error') == *"Duplicate alarm detected"* ]]; then
|
||||||
|
#j echo -e "${GREEN}✓ Duplicate detection working${NC}"
|
||||||
|
#jelse
|
||||||
|
#j echo -e "${RED}✗ Duplicate detection failed${NC}"
|
||||||
|
#j echo "Response: $DUPLICATE_RESPONSE"
|
||||||
|
#jfi
|
||||||
|
#j
|
||||||
|
#j# 5. Update the alarm (PUT)
|
||||||
|
#jprint_status "Updating alarm"
|
||||||
|
#jUPDATE_DATA=$(jq -n \
|
||||||
|
#j --arg name "$TEST_ALARM_NAME Updated" \
|
||||||
|
#j --arg time "$TEST_ALARM_TIME" \
|
||||||
|
#j --argjson id "$NEW_ALARM_ID" \
|
||||||
|
#j '{
|
||||||
|
#j "id": $id,
|
||||||
|
#j "name": $name,
|
||||||
|
#j "time": $time,
|
||||||
|
#j "repeat_rule": {
|
||||||
|
#j "type": "daily"
|
||||||
|
#j },
|
||||||
|
#j "enabled": true,
|
||||||
|
#j "snooze": {
|
||||||
|
#j "enabled": false,
|
||||||
|
#j "duration": 10,
|
||||||
|
#j "max_count": 2
|
||||||
|
#j },
|
||||||
|
#j "metadata": {
|
||||||
|
#j "volume": 90,
|
||||||
|
#j "notes": "Updated test alarm"
|
||||||
|
#j }
|
||||||
|
#j }'
|
||||||
|
#j)
|
||||||
|
#j
|
||||||
|
#jUPDATE_RESPONSE=$(curl -s -X PUT -H "Content-Type: application/json" -d "$UPDATE_DATA" "$API_URL")
|
||||||
|
#jif [[ $(echo "$UPDATE_RESPONSE" | jq -r '.data.message') == "Alarm updated successfully" ]]; then
|
||||||
|
#j echo -e "${GREEN}✓ Alarm update successful${NC}"
|
||||||
|
#jelse
|
||||||
|
#j echo -e "${RED}✗ Alarm update failed${NC}"
|
||||||
|
#j echo "Response: $UPDATE_RESPONSE"
|
||||||
|
#jfi
|
||||||
|
#j
|
||||||
|
#j# 6. Verify the update
|
||||||
|
#jprint_status "Verifying update"
|
||||||
|
#jUPDATED_STATE=$(curl -s -X GET "$API_URL")
|
||||||
|
#jUPDATED_ALARM=$(echo "$UPDATED_STATE" | jq -r ".data[] | select(.id == $NEW_ALARM_ID)")
|
||||||
|
#jif [[ $(echo "$UPDATED_ALARM" | jq -r '.name') == "$TEST_ALARM_NAME Updated" ]]; then
|
||||||
|
#j echo -e "${GREEN}✓ Update verification successful${NC}"
|
||||||
|
#jelse
|
||||||
|
#j echo -e "${RED}✗ Update verification failed${NC}"
|
||||||
|
#j echo "Current alarm state: $UPDATED_ALARM"
|
||||||
|
#jfi
|
||||||
|
#j
|
||||||
|
#j# 7. Test invalid inputs
|
||||||
|
#jprint_status "Testing invalid inputs"
|
||||||
|
#j
|
||||||
|
#j# Test invalid time format
|
||||||
|
#jINVALID_TIME_DATA=$(echo "$POST_DATA" | jq '. + {"time": "25:00:00"}')
|
||||||
|
#jINVALID_TIME_RESPONSE=$(curl -s -X POST -H "Content-Type: application/json" -d "$INVALID_TIME_DATA" "$API_URL")
|
||||||
|
#jif [[ $(echo "$INVALID_TIME_RESPONSE" | jq -r '.error') == *"Invalid alarm configuration"* ]]; then
|
||||||
|
#j echo -e "${GREEN}✓ Invalid time format detection working${NC}"
|
||||||
|
#jelse
|
||||||
|
#j echo -e "${RED}✗ Invalid time format detection failed${NC}"
|
||||||
|
#jfi
|
||||||
|
#j
|
||||||
|
#j# Test invalid repeat rule
|
||||||
|
#jINVALID_REPEAT_DATA=$(echo "$POST_DATA" | jq '.repeat_rule.type = "monthly"')
|
||||||
|
#jINVALID_REPEAT_RESPONSE=$(curl -s -X POST -H "Content-Type: application/json" -d "$INVALID_REPEAT_DATA" "$API_URL")
|
||||||
|
#jif [[ $(echo "$INVALID_REPEAT_RESPONSE" | jq -r '.error') == *"Invalid alarm configuration"* ]]; then
|
||||||
|
#j echo -e "${GREEN}✓ Invalid repeat rule detection working${NC}"
|
||||||
|
#jelse
|
||||||
|
#j echo -e "${RED}✗ Invalid repeat rule detection failed${NC}"
|
||||||
|
#jfi
|
||||||
|
#j
|
||||||
|
#j# 8. Delete the test alarm
|
||||||
|
#jprint_status "Deleting test alarm"
|
||||||
|
#jDELETE_RESPONSE=$(curl -s -X DELETE -H "Content-Type: application/json" -d "{\"id\":$NEW_ALARM_ID}" "$API_URL")
|
||||||
|
#jif [[ $(echo "$DELETE_RESPONSE" | jq -r '.data.message') == "Alarm removed successfully" ]]; then
|
||||||
|
#j echo -e "${GREEN}✓ Alarm deletion successful${NC}"
|
||||||
|
#jelse
|
||||||
|
#j echo -e "${RED}✗ Alarm deletion failed${NC}"
|
||||||
|
#j echo "Response: $DELETE_RESPONSE"
|
||||||
|
#jfi
|
||||||
|
#j
|
||||||
|
#j# 9. Verify deletion
|
||||||
|
#jprint_status "Verifying deletion"
|
||||||
|
#jFINAL_STATE=$(curl -s -X GET "$API_URL")
|
||||||
|
#jif [[ $(echo "$FINAL_STATE" | jq ".data[] | select(.id == $NEW_ALARM_ID)") == "" ]]; then
|
||||||
|
#j echo -e "${GREEN}✓ Deletion verification successful${NC}"
|
||||||
|
#jelse
|
||||||
|
#j echo -e "${RED}✗ Deletion verification failed${NC}"
|
||||||
|
#j echo "Current state: $FINAL_STATE"
|
||||||
|
#jfi
|
||||||
|
#j
|
||||||
|
#jprint_status "Test suite completed!"
|
0
clock/ui/__init__.py
Normal file
0
clock/ui/__init__.py
Normal file
115
clock/ui/active_alarm.py
Normal file
115
clock/ui/active_alarm.py
Normal file
@ -0,0 +1,115 @@
|
|||||||
|
import curses
|
||||||
|
from datetime import datetime
|
||||||
|
from .utils import init_colors, draw_big_digit
|
||||||
|
|
||||||
|
class ActiveAlarmView:
|
||||||
|
def __init__(self, storage, control_queue):
|
||||||
|
self.storage = storage
|
||||||
|
self.control_queue = control_queue
|
||||||
|
self.active_alarms = {}
|
||||||
|
|
||||||
|
def reset_state(self):
|
||||||
|
"""Reset the view state"""
|
||||||
|
self.active_alarms.clear()
|
||||||
|
|
||||||
|
def draw(self, stdscr):
|
||||||
|
"""Draw the active alarm screen"""
|
||||||
|
init_colors()
|
||||||
|
height, width = stdscr.getmaxyx()
|
||||||
|
current_time = datetime.now()
|
||||||
|
|
||||||
|
self._draw_main_clock(stdscr, height, width, current_time)
|
||||||
|
self._draw_active_alarm_info(stdscr, height, width)
|
||||||
|
self._draw_instructions(stdscr, height, width)
|
||||||
|
|
||||||
|
def handle_input(self, key):
|
||||||
|
"""Handle user input and return the next view name or None to stay"""
|
||||||
|
if not self.active_alarms:
|
||||||
|
return 'CLOCK'
|
||||||
|
|
||||||
|
alarm_id = list(self.active_alarms.keys())[0]
|
||||||
|
|
||||||
|
if key == ord('s'):
|
||||||
|
self._handle_snooze(alarm_id)
|
||||||
|
return None
|
||||||
|
elif key == ord('d'):
|
||||||
|
self._handle_dismiss(alarm_id)
|
||||||
|
return 'CLOCK'
|
||||||
|
|
||||||
|
return None
|
||||||
|
|
||||||
|
def update_active_alarms(self, active_alarms):
|
||||||
|
"""Update the active alarms state"""
|
||||||
|
self.active_alarms = active_alarms
|
||||||
|
|
||||||
|
def _draw_main_clock(self, stdscr, height, width, current_time):
|
||||||
|
"""Draw the main clock display"""
|
||||||
|
time_str = current_time.strftime("%H:%M:%S")
|
||||||
|
digit_width = 14
|
||||||
|
total_width = digit_width * len(time_str)
|
||||||
|
start_x = (width - total_width) // 2
|
||||||
|
start_y = (height - 7) // 2 - 4
|
||||||
|
|
||||||
|
# Draw blinking dot
|
||||||
|
if int(current_time.timestamp()) % 2 == 0:
|
||||||
|
stdscr.attron(curses.color_pair(1))
|
||||||
|
stdscr.addstr(start_y - 1, start_x + total_width - 2, "•")
|
||||||
|
stdscr.attroff(curses.color_pair(1))
|
||||||
|
|
||||||
|
# Draw big time digits
|
||||||
|
stdscr.attron(curses.color_pair(1))
|
||||||
|
for i, digit in enumerate(time_str):
|
||||||
|
draw_big_digit(stdscr, start_y, start_x + i * digit_width, digit)
|
||||||
|
stdscr.attroff(curses.color_pair(1))
|
||||||
|
|
||||||
|
# Draw date
|
||||||
|
date_str = current_time.strftime("%Y-%m-%d")
|
||||||
|
date_x = width // 2 - len(date_str) // 2
|
||||||
|
date_y = height // 2 + 4
|
||||||
|
stdscr.attron(curses.color_pair(2))
|
||||||
|
stdscr.addstr(date_y, date_x, date_str)
|
||||||
|
stdscr.attroff(curses.color_pair(2))
|
||||||
|
|
||||||
|
def _draw_active_alarm_info(self, stdscr, height, width):
|
||||||
|
"""Draw information about the active alarm"""
|
||||||
|
if not self.active_alarms:
|
||||||
|
return
|
||||||
|
|
||||||
|
alarm_id = list(self.active_alarms.keys())[0]
|
||||||
|
alarm_info = self.active_alarms[alarm_id]
|
||||||
|
alarm_config = alarm_info['config']
|
||||||
|
|
||||||
|
alarm_name = alarm_config.get('name', 'Unnamed Alarm')
|
||||||
|
alarm_time = alarm_config.get('time', 'Unknown Time')
|
||||||
|
alarm_str = f"[ {alarm_name} - {alarm_time} ]"
|
||||||
|
|
||||||
|
# Position alarm info above the date
|
||||||
|
date_y = height // 2 + 4 # Same as in _draw_main_clock
|
||||||
|
alarm_y = date_y - 2
|
||||||
|
alarm_x = max(0, width // 2 - len(alarm_str) // 2)
|
||||||
|
|
||||||
|
stdscr.attron(curses.color_pair(1))
|
||||||
|
stdscr.addstr(alarm_y, alarm_x, alarm_str)
|
||||||
|
stdscr.attroff(curses.color_pair(1))
|
||||||
|
|
||||||
|
def _draw_instructions(self, stdscr, height, width):
|
||||||
|
"""Draw the user instructions"""
|
||||||
|
if self.active_alarms:
|
||||||
|
instructions = "S: Snooze D: Dismiss"
|
||||||
|
stdscr.addstr(height - 2, width // 2 - len(instructions) // 2, instructions)
|
||||||
|
|
||||||
|
def _handle_snooze(self, alarm_id):
|
||||||
|
"""Handle snoozing the alarm"""
|
||||||
|
self.control_queue.put({
|
||||||
|
'type': 'snooze',
|
||||||
|
'alarm_id': alarm_id
|
||||||
|
})
|
||||||
|
del self.active_alarms[alarm_id]
|
||||||
|
|
||||||
|
def _handle_dismiss(self, alarm_id):
|
||||||
|
"""Handle dismissing the alarm"""
|
||||||
|
self.control_queue.put({
|
||||||
|
'type': 'dismiss',
|
||||||
|
'alarm_id': alarm_id
|
||||||
|
})
|
||||||
|
del self.active_alarms[alarm_id]
|
298
clock/ui/add_alarm.py
Normal file
298
clock/ui/add_alarm.py
Normal file
@ -0,0 +1,298 @@
|
|||||||
|
import curses
|
||||||
|
from datetime import datetime, date, timedelta
|
||||||
|
|
||||||
|
class AddAlarmView:
|
||||||
|
def __init__(self, storage, control_queue):
|
||||||
|
self.storage = storage
|
||||||
|
self.control_queue = control_queue
|
||||||
|
self.weekday_names = ['Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat', 'Sun']
|
||||||
|
self.reset_state()
|
||||||
|
|
||||||
|
def set_alarm_data(self, alarm):
|
||||||
|
"""Load existing alarm data for editing."""
|
||||||
|
self.alarm_data = alarm # Store for pre-filling the form
|
||||||
|
|
||||||
|
# Debug log to inspect the alarm structure
|
||||||
|
import logging
|
||||||
|
logging.getLogger(__name__).debug(f"set_alarm_data received: {alarm}")
|
||||||
|
|
||||||
|
# Ensure alarm is a dictionary
|
||||||
|
if not isinstance(alarm, dict):
|
||||||
|
logging.getLogger(__name__).error(f"Invalid alarm format: {type(alarm)}")
|
||||||
|
|
||||||
|
# Parse time string
|
||||||
|
hour, minute, _ = map(int, alarm['time'].split(':')) # Extract hour and minute return
|
||||||
|
|
||||||
|
# Pre-fill the form fields with alarm data
|
||||||
|
self.alarm_draft = {
|
||||||
|
'hour': hour,
|
||||||
|
'minute': minute,
|
||||||
|
'name': alarm.get('name', 'Unnamed Alarm'),
|
||||||
|
'enabled': alarm.get('enabled', True),
|
||||||
|
'date': None, # Your structure doesn't use date directly
|
||||||
|
'weekdays': alarm.get('repeat_rule', {}).get('days_of_week', []),
|
||||||
|
'current_weekday': 0,
|
||||||
|
'editing_name': False,
|
||||||
|
'temp_name': alarm.get('name', 'Unnamed Alarm'),
|
||||||
|
'selected_item': 0
|
||||||
|
}
|
||||||
|
|
||||||
|
def reset_state(self):
|
||||||
|
"""Reset all state variables to their initial values"""
|
||||||
|
self.alarm_draft = {
|
||||||
|
'hour': datetime.now().hour,
|
||||||
|
'minute': datetime.now().minute,
|
||||||
|
'name': 'New Alarm',
|
||||||
|
'enabled': True,
|
||||||
|
'date': None,
|
||||||
|
'weekdays': [],
|
||||||
|
'current_weekday': 0,
|
||||||
|
'editing_name': False,
|
||||||
|
'temp_name': '',
|
||||||
|
'selected_item': 0
|
||||||
|
}
|
||||||
|
self.date_edit_pos = 2 # Default to editing the day
|
||||||
|
|
||||||
|
def draw(self, stdscr):
|
||||||
|
"""Draw the add alarm screen"""
|
||||||
|
height, width = stdscr.getmaxyx()
|
||||||
|
form_y = height // 2 - 8
|
||||||
|
|
||||||
|
self._draw_title(stdscr, form_y, width)
|
||||||
|
self._draw_time_field(stdscr, form_y + 2, width)
|
||||||
|
self._draw_date_field(stdscr, form_y + 4, width)
|
||||||
|
self._draw_weekdays(stdscr, form_y + 6, width)
|
||||||
|
self._draw_name_field(stdscr, form_y + 8, width)
|
||||||
|
self._draw_status_field(stdscr, form_y + 10, width)
|
||||||
|
self._draw_instructions(stdscr, height - 2, width)
|
||||||
|
|
||||||
|
def handle_input(self, key):
|
||||||
|
"""Handle user input and return the next view name or None to stay"""
|
||||||
|
if key == 27: # ESC
|
||||||
|
return self._handle_escape()
|
||||||
|
elif key == 10: # ENTER
|
||||||
|
return self._handle_enter()
|
||||||
|
|
||||||
|
if not self.alarm_draft['editing_name']:
|
||||||
|
if key in [ord('h'), curses.KEY_LEFT]:
|
||||||
|
self.alarm_draft['selected_item'] = (self.alarm_draft['selected_item'] - 1) % 6
|
||||||
|
elif key in [ord('l'), curses.KEY_RIGHT]:
|
||||||
|
self.alarm_draft['selected_item'] = (self.alarm_draft['selected_item'] + 1) % 6
|
||||||
|
|
||||||
|
self._handle_field_input(key)
|
||||||
|
return None
|
||||||
|
|
||||||
|
def _handle_field_input(self, key):
|
||||||
|
"""Handle input for the currently selected field"""
|
||||||
|
selected_item = self.alarm_draft['selected_item']
|
||||||
|
|
||||||
|
if key in [ord('k'), curses.KEY_UP, ord('j'), curses.KEY_DOWN]:
|
||||||
|
is_up = key in [ord('k'), curses.KEY_UP]
|
||||||
|
if selected_item == 0:
|
||||||
|
self._adjust_hour(is_up)
|
||||||
|
elif selected_item == 1:
|
||||||
|
self._adjust_minute(is_up)
|
||||||
|
elif selected_item == 2:
|
||||||
|
self._adjust_date(is_up)
|
||||||
|
elif selected_item == 3:
|
||||||
|
self._handle_weekday_input(key)
|
||||||
|
elif selected_item == 4:
|
||||||
|
self._handle_name_input(key)
|
||||||
|
elif selected_item == 5 and key == 32:
|
||||||
|
self.alarm_draft['enabled'] = not self.alarm_draft['enabled']
|
||||||
|
|
||||||
|
def _draw_title(self, stdscr, y, width):
|
||||||
|
"""Draw the title of the form"""
|
||||||
|
title = "Add New Alarm"
|
||||||
|
stdscr.attron(curses.color_pair(1) | curses.A_BOLD)
|
||||||
|
stdscr.addstr(y, width // 2 - len(title) // 2, title)
|
||||||
|
stdscr.attroff(curses.color_pair(1) | curses.A_BOLD)
|
||||||
|
|
||||||
|
def _draw_field(self, stdscr, y, label, value, is_selected):
|
||||||
|
"""Draw a form field with label and value"""
|
||||||
|
label_str = f"{label}: "
|
||||||
|
x = self._center_text_x(label_str + str(value))
|
||||||
|
|
||||||
|
stdscr.attron(curses.color_pair(1))
|
||||||
|
stdscr.addstr(y, x, label_str)
|
||||||
|
stdscr.attroff(curses.color_pair(1))
|
||||||
|
|
||||||
|
if is_selected:
|
||||||
|
stdscr.attron(curses.color_pair(2))
|
||||||
|
stdscr.addstr(y, x + len(label_str), str(value))
|
||||||
|
if is_selected:
|
||||||
|
stdscr.attroff(curses.color_pair(2))
|
||||||
|
|
||||||
|
def _draw_time_field(self, stdscr, y, width):
|
||||||
|
"""Draw the time field"""
|
||||||
|
self._draw_field(stdscr, y, "Time",
|
||||||
|
f"{self.alarm_draft['hour']:02d}:{self.alarm_draft['minute']:02d}",
|
||||||
|
self.alarm_draft['selected_item'] in [0, 1])
|
||||||
|
|
||||||
|
def _draw_date_field(self, stdscr, y, width):
|
||||||
|
"""Draw the date field"""
|
||||||
|
date_str = "Repeating weekly" if self.alarm_draft['weekdays'] else (
|
||||||
|
self.alarm_draft['date'].strftime("%Y-%m-%d") if self.alarm_draft['date'] else "None"
|
||||||
|
)
|
||||||
|
self._draw_field(stdscr, y, "Date", date_str,
|
||||||
|
self.alarm_draft['selected_item'] == 2)
|
||||||
|
|
||||||
|
def _draw_weekdays(self, stdscr, y, width):
|
||||||
|
"""Draw the weekday selection field"""
|
||||||
|
label_x = width // 2 - 20
|
||||||
|
stdscr.attron(curses.color_pair(1))
|
||||||
|
stdscr.addstr(y, label_x, "Repeat: ")
|
||||||
|
stdscr.attroff(curses.color_pair(1))
|
||||||
|
|
||||||
|
weekday_x = label_x + len("Repeat: ")
|
||||||
|
for i, day in enumerate(self.weekday_names):
|
||||||
|
self._draw_weekday(stdscr, y, weekday_x + i * 4, day, i)
|
||||||
|
|
||||||
|
def _draw_weekday(self, stdscr, y, x, day, index):
|
||||||
|
"""Draw a single weekday"""
|
||||||
|
is_selected = (self.alarm_draft['selected_item'] == 3 and
|
||||||
|
index == self.alarm_draft['current_weekday'])
|
||||||
|
is_active = index in self.alarm_draft['weekdays']
|
||||||
|
|
||||||
|
if is_selected:
|
||||||
|
stdscr.attron(curses.color_pair(2))
|
||||||
|
elif is_active:
|
||||||
|
stdscr.attron(curses.color_pair(1) | curses.A_BOLD)
|
||||||
|
else:
|
||||||
|
stdscr.attron(curses.color_pair(1))
|
||||||
|
|
||||||
|
stdscr.addstr(y, x, day)
|
||||||
|
|
||||||
|
if is_selected:
|
||||||
|
stdscr.attroff(curses.color_pair(2))
|
||||||
|
elif is_active:
|
||||||
|
stdscr.attroff(curses.color_pair(1) | curses.A_BOLD)
|
||||||
|
else:
|
||||||
|
stdscr.attroff(curses.color_pair(1))
|
||||||
|
|
||||||
|
def _draw_name_field(self, stdscr, y, width):
|
||||||
|
"""Draw the name field"""
|
||||||
|
self._draw_field(stdscr, y, "Name", self.alarm_draft['name'],
|
||||||
|
self.alarm_draft['selected_item'] == 4)
|
||||||
|
|
||||||
|
def _draw_status_field(self, stdscr, y, width):
|
||||||
|
"""Draw the enabled/disabled status field"""
|
||||||
|
enabled_str = "● Enabled" if self.alarm_draft['enabled'] else "○ Disabled"
|
||||||
|
self._draw_field(stdscr, y, "Status", enabled_str,
|
||||||
|
self.alarm_draft['selected_item'] == 5)
|
||||||
|
|
||||||
|
def _draw_instructions(self, stdscr, y, width):
|
||||||
|
"""Draw the instructions at the bottom of the screen"""
|
||||||
|
instructions = "j/k: Change h/l: Move Space: Toggle Enter: Save Esc: Cancel"
|
||||||
|
stdscr.attron(curses.color_pair(1))
|
||||||
|
stdscr.addstr(y, width // 2 - len(instructions) // 2, instructions)
|
||||||
|
stdscr.attroff(curses.color_pair(1))
|
||||||
|
|
||||||
|
def _center_text_x(self, text):
|
||||||
|
"""Calculate x position to center text"""
|
||||||
|
return curses.COLS // 2 - len(text) // 2
|
||||||
|
|
||||||
|
def _adjust_hour(self, increase):
|
||||||
|
"""Adjust the hour value"""
|
||||||
|
delta = 1 if increase else -1
|
||||||
|
self.alarm_draft['hour'] = (self.alarm_draft['hour'] + delta) % 24
|
||||||
|
|
||||||
|
def _adjust_minute(self, increase):
|
||||||
|
"""Adjust the minute value"""
|
||||||
|
delta = 1 if increase else -1
|
||||||
|
self.alarm_draft['minute'] = (self.alarm_draft['minute'] + delta) % 60
|
||||||
|
|
||||||
|
def _adjust_date(self, increase):
|
||||||
|
"""Adjust the date value"""
|
||||||
|
if not self.alarm_draft['date']:
|
||||||
|
self.alarm_draft['date'] = datetime.now().date()
|
||||||
|
return
|
||||||
|
|
||||||
|
delta = 1 if increase else -1
|
||||||
|
current_date = self.alarm_draft['date']
|
||||||
|
|
||||||
|
try:
|
||||||
|
if self.date_edit_pos == 0: # Year
|
||||||
|
new_year = max(current_date.year + delta, datetime.now().year)
|
||||||
|
self.alarm_draft['date'] = current_date.replace(year=new_year)
|
||||||
|
elif self.date_edit_pos == 1: # Month
|
||||||
|
new_month = max(1, min(12, current_date.month + delta))
|
||||||
|
max_day = (datetime(current_date.year, new_month, 1) +
|
||||||
|
timedelta(days=31)).replace(day=1) - timedelta(days=1)
|
||||||
|
self.alarm_draft['date'] = current_date.replace(
|
||||||
|
month=new_month,
|
||||||
|
day=min(current_date.day, max_day.day)
|
||||||
|
)
|
||||||
|
elif self.date_edit_pos == 2: # Day
|
||||||
|
max_day = (datetime(current_date.year, current_date.month, 1) +
|
||||||
|
timedelta(days=31)).replace(day=1) - timedelta(days=1)
|
||||||
|
new_day = max(1, min(max_day.day, current_date.day + delta))
|
||||||
|
self.alarm_draft['date'] = current_date.replace(day=new_day)
|
||||||
|
except ValueError as e:
|
||||||
|
# Handle date validation errors
|
||||||
|
pass
|
||||||
|
|
||||||
|
def _handle_weekday_input(self, key):
|
||||||
|
"""Handle input for weekday selection"""
|
||||||
|
if key in [ord('h'), curses.KEY_LEFT]:
|
||||||
|
self.alarm_draft['current_weekday'] = (self.alarm_draft['current_weekday'] - 1) % 7
|
||||||
|
elif key in [ord('l'), curses.KEY_RIGHT]:
|
||||||
|
self.alarm_draft['current_weekday'] = (self.alarm_draft['current_weekday'] + 1) % 7
|
||||||
|
elif key == 32: # SPACE
|
||||||
|
current_day = self.alarm_draft['current_weekday']
|
||||||
|
if current_day in self.alarm_draft['weekdays']:
|
||||||
|
self.alarm_draft['weekdays'].remove(current_day)
|
||||||
|
else:
|
||||||
|
self.alarm_draft['weekdays'].append(current_day)
|
||||||
|
self.alarm_draft['weekdays'].sort()
|
||||||
|
|
||||||
|
if self.alarm_draft['weekdays']:
|
||||||
|
self.alarm_draft['date'] = None
|
||||||
|
|
||||||
|
def _handle_name_input(self, key):
|
||||||
|
"""Handle input for name editing"""
|
||||||
|
if key == 32: # SPACE
|
||||||
|
if not self.alarm_draft['editing_name']:
|
||||||
|
self.alarm_draft['editing_name'] = True
|
||||||
|
self.alarm_draft['temp_name'] = self.alarm_draft['name']
|
||||||
|
self.alarm_draft['name'] = ''
|
||||||
|
elif self.alarm_draft['editing_name']:
|
||||||
|
if key == curses.KEY_BACKSPACE or key == 127:
|
||||||
|
self.alarm_draft['name'] = self.alarm_draft['name'][:-1]
|
||||||
|
elif 32 <= key <= 126: # Printable ASCII
|
||||||
|
self.alarm_draft['name'] += chr(key)
|
||||||
|
|
||||||
|
def _handle_escape(self):
|
||||||
|
"""Handle escape key press"""
|
||||||
|
if self.alarm_draft['editing_name']:
|
||||||
|
self.alarm_draft['name'] = self.alarm_draft['temp_name']
|
||||||
|
self.alarm_draft['editing_name'] = False
|
||||||
|
return None
|
||||||
|
return 'CLOCK'
|
||||||
|
|
||||||
|
def _handle_enter(self):
|
||||||
|
"""Handle enter key press"""
|
||||||
|
if self.alarm_draft['editing_name']:
|
||||||
|
self.alarm_draft['editing_name'] = False
|
||||||
|
self.alarm_draft['selected_item'] = 0
|
||||||
|
return None
|
||||||
|
|
||||||
|
try:
|
||||||
|
alarm_data = {
|
||||||
|
"name": self.alarm_draft['name'],
|
||||||
|
"time": f"{self.alarm_draft['hour']:02d}:{self.alarm_draft['minute']:02d}:00",
|
||||||
|
"enabled": self.alarm_draft['enabled'],
|
||||||
|
"repeat_rule": {
|
||||||
|
"type": "weekly" if self.alarm_draft['weekdays'] else "once",
|
||||||
|
"days_of_week": [self.weekday_names[day].lower()
|
||||||
|
for day in self.alarm_draft['weekdays']],
|
||||||
|
"at": (self.alarm_draft['date'].strftime("%Y-%m-%d")
|
||||||
|
if self.alarm_draft['date'] and not self.alarm_draft['weekdays']
|
||||||
|
else None)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
self.storage.save_new_alert(alarm_data)
|
||||||
|
return 'CLOCK'
|
||||||
|
except Exception as e:
|
||||||
|
# Handle save errors
|
||||||
|
return None
|
@ -10,13 +10,13 @@ BIG_DIGITS = {
|
|||||||
" █████████ "
|
" █████████ "
|
||||||
],
|
],
|
||||||
'1': [
|
'1': [
|
||||||
" ███ ",
|
" ████ ",
|
||||||
" █████ ",
|
" ██████ ",
|
||||||
" ███ ",
|
" ████ ",
|
||||||
" ███ ",
|
" ████ ",
|
||||||
" ███ ",
|
" ████ ",
|
||||||
" ███ ",
|
" ████ ",
|
||||||
" ███████ "
|
" █████████ "
|
||||||
],
|
],
|
||||||
'2': [
|
'2': [
|
||||||
" ██████████ ",
|
" ██████████ ",
|
||||||
@ -98,5 +98,13 @@ BIG_DIGITS = {
|
|||||||
" ████ ",
|
" ████ ",
|
||||||
" ████ ",
|
" ████ ",
|
||||||
" "
|
" "
|
||||||
|
],
|
||||||
|
'?': [
|
||||||
|
" ??? ",
|
||||||
|
" ????? ",
|
||||||
|
"?? ??",
|
||||||
|
" ??? ",
|
||||||
|
" ?? ?? ",
|
||||||
|
" ? "
|
||||||
]
|
]
|
||||||
}
|
}
|
170
clock/ui/list_alarms.py
Normal file
170
clock/ui/list_alarms.py
Normal file
@ -0,0 +1,170 @@
|
|||||||
|
import curses
|
||||||
|
from datetime import datetime
|
||||||
|
|
||||||
|
class ListAlarmsView:
|
||||||
|
def __init__(self, storage):
|
||||||
|
"""Initialize the list alarms view"""
|
||||||
|
self.storage = storage
|
||||||
|
self.weekday_names = ['Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat', 'Sun']
|
||||||
|
self.selected_index = 0
|
||||||
|
self.alarms = []
|
||||||
|
|
||||||
|
def reset_state(self):
|
||||||
|
"""Reset the view state"""
|
||||||
|
self.selected_index = 0
|
||||||
|
self.alarms = self.storage.get_saved_alerts()
|
||||||
|
|
||||||
|
def update_alarms(self, alarms):
|
||||||
|
"""Update the list of alarms to display."""
|
||||||
|
self.alarms = alarms
|
||||||
|
self.selected_index = 0
|
||||||
|
|
||||||
|
def get_selected_alarm(self):
|
||||||
|
"""Get the currently selected alarm."""
|
||||||
|
if 0 <= self.selected_index < len(self.alarms):
|
||||||
|
return self.alarms[self.selected_index]
|
||||||
|
return None
|
||||||
|
|
||||||
|
def draw(self, stdscr):
|
||||||
|
"""Draw the list of alarms screen"""
|
||||||
|
height, width = stdscr.getmaxyx()
|
||||||
|
|
||||||
|
self._draw_header(stdscr, width)
|
||||||
|
visible_range = self._calculate_visible_range(height)
|
||||||
|
self._draw_alarm_list(stdscr, height, width, visible_range)
|
||||||
|
self._draw_instructions(stdscr, height, width)
|
||||||
|
|
||||||
|
def handle_input(self, key):
|
||||||
|
"""Handle user input and return the next view name or None to stay"""
|
||||||
|
total_items = len(self.alarms) + 1 # +1 for "Add new alarm" option
|
||||||
|
|
||||||
|
if key == 27: # ESC
|
||||||
|
return 'CLOCK'
|
||||||
|
elif key in [ord('j'), curses.KEY_DOWN]:
|
||||||
|
self.selected_index = (self.selected_index + 1) % total_items
|
||||||
|
elif key in [ord('k'), curses.KEY_UP]:
|
||||||
|
self.selected_index = (self.selected_index - 1) % total_items
|
||||||
|
elif key == ord('d'):
|
||||||
|
return self._handle_delete()
|
||||||
|
elif key in [ord('a'), 10]: # 'a' or Enter
|
||||||
|
return self._handle_add_edit()
|
||||||
|
|
||||||
|
return None
|
||||||
|
|
||||||
|
def _calculate_visible_range(self, height):
|
||||||
|
"""Calculate the visible range for scrolling"""
|
||||||
|
max_visible_items = height - 8 # Space for header and footer
|
||||||
|
total_items = len(self.alarms) + 1 # +1 for "Add new alarm" option
|
||||||
|
|
||||||
|
# Calculate scroll position
|
||||||
|
start_idx = max(0, min(self.selected_index - max_visible_items // 2,
|
||||||
|
total_items - max_visible_items))
|
||||||
|
if start_idx < 0:
|
||||||
|
start_idx = 0
|
||||||
|
end_idx = min(start_idx + max_visible_items, total_items)
|
||||||
|
|
||||||
|
return (start_idx, end_idx)
|
||||||
|
|
||||||
|
def _draw_header(self, stdscr, width):
|
||||||
|
"""Draw the header text"""
|
||||||
|
header_text = "Alarms"
|
||||||
|
stdscr.attron(curses.color_pair(1) | curses.A_BOLD)
|
||||||
|
stdscr.addstr(2, self._center_x(width, header_text), header_text)
|
||||||
|
stdscr.attroff(curses.color_pair(1) | curses.A_BOLD)
|
||||||
|
|
||||||
|
def _draw_alarm_list(self, stdscr, height, width, visible_range):
|
||||||
|
"""Draw the list of alarms"""
|
||||||
|
start_idx, end_idx = visible_range
|
||||||
|
|
||||||
|
for i in range(start_idx, end_idx):
|
||||||
|
y_pos = 4 + (i - start_idx)
|
||||||
|
display_str = self._format_alarm_display(i)
|
||||||
|
|
||||||
|
# Truncate if too long
|
||||||
|
max_length = width - 6
|
||||||
|
if len(display_str) > max_length:
|
||||||
|
display_str = display_str[:max_length-3] + "..."
|
||||||
|
|
||||||
|
x_pos = self._center_x(width, display_str)
|
||||||
|
self._draw_alarm_item(stdscr, y_pos, x_pos, display_str, i == self.selected_index)
|
||||||
|
|
||||||
|
def _format_alarm_display(self, index):
|
||||||
|
"""Format the display string for an alarm"""
|
||||||
|
if index == len(self.alarms):
|
||||||
|
return "Add new alarm..."
|
||||||
|
|
||||||
|
alarm = self.alarms[index]
|
||||||
|
time_str = alarm.get('time', 'Unknown')
|
||||||
|
repeat_info = self._format_repeat_info(alarm)
|
||||||
|
status = "✓" if alarm.get('enabled', True) else "✗"
|
||||||
|
|
||||||
|
return f"{status} {time_str} {alarm.get('name', 'Unnamed')}{repeat_info}"
|
||||||
|
|
||||||
|
def _format_repeat_info(self, alarm):
|
||||||
|
"""Format the repeat information for an alarm"""
|
||||||
|
repeat_rule = alarm.get('repeat_rule', {})
|
||||||
|
if not repeat_rule:
|
||||||
|
return ""
|
||||||
|
|
||||||
|
if repeat_rule.get('type') == 'weekly':
|
||||||
|
days = repeat_rule.get('days', [])
|
||||||
|
return f" (Every {', '.join(self.weekday_names[d] for d in days)})"
|
||||||
|
elif repeat_rule.get('type') == 'once' and repeat_rule.get('date'):
|
||||||
|
return f" (On {repeat_rule['date']})"
|
||||||
|
|
||||||
|
return ""
|
||||||
|
|
||||||
|
def _draw_alarm_item(self, stdscr, y_pos, x_pos, display_str, is_selected):
|
||||||
|
"""Draw a single alarm item"""
|
||||||
|
if is_selected:
|
||||||
|
# Draw selection brackets in green
|
||||||
|
stdscr.attron(curses.color_pair(1))
|
||||||
|
stdscr.addstr(y_pos, x_pos - 2, "[ ")
|
||||||
|
stdscr.addstr(y_pos, x_pos + len(display_str), " ]")
|
||||||
|
stdscr.attroff(curses.color_pair(1))
|
||||||
|
|
||||||
|
# Draw text in yellow (highlighted)
|
||||||
|
stdscr.attron(curses.color_pair(2))
|
||||||
|
stdscr.addstr(y_pos, x_pos, display_str)
|
||||||
|
stdscr.attroff(curses.color_pair(2))
|
||||||
|
else:
|
||||||
|
# Draw normal items in green
|
||||||
|
stdscr.attron(curses.color_pair(1))
|
||||||
|
stdscr.addstr(y_pos, x_pos, display_str)
|
||||||
|
stdscr.attroff(curses.color_pair(1))
|
||||||
|
|
||||||
|
def _draw_instructions(self, stdscr, height, width):
|
||||||
|
"""Draw the instructions at the bottom of the screen"""
|
||||||
|
instructions = "j/k: Move d: Delete a: Add/Edit Esc: Back"
|
||||||
|
stdscr.attron(curses.color_pair(1))
|
||||||
|
stdscr.addstr(height - 2, self._center_x(width, instructions), instructions)
|
||||||
|
stdscr.attroff(curses.color_pair(1))
|
||||||
|
|
||||||
|
def _center_x(self, width, text):
|
||||||
|
"""Calculate x coordinate to center text"""
|
||||||
|
return width // 2 - len(text) // 2
|
||||||
|
|
||||||
|
def _handle_delete(self):
|
||||||
|
"""Handle alarm deletion"""
|
||||||
|
if self.selected_index < len(self.alarms):
|
||||||
|
try:
|
||||||
|
alarm_to_delete = self.alarms[self.selected_index]
|
||||||
|
self.storage.remove_saved_alert(alarm_to_delete['id'])
|
||||||
|
self.alarms = self.storage.get_saved_alerts()
|
||||||
|
# Adjust selected item if needed
|
||||||
|
if self.selected_index >= len(self.alarms):
|
||||||
|
self.selected_index = len(self.alarms)
|
||||||
|
except Exception as e:
|
||||||
|
# You might want to add error handling here
|
||||||
|
pass
|
||||||
|
return None
|
||||||
|
|
||||||
|
def _handle_add_edit(self):
|
||||||
|
"""Handle add/edit action"""
|
||||||
|
if self.selected_index == len(self.alarms):
|
||||||
|
# "Add new alarm" option selected
|
||||||
|
return 'ADD_ALARM'
|
||||||
|
else:
|
||||||
|
# Edit existing alarm
|
||||||
|
selected_alarm = self.alarms[self.selected_index]
|
||||||
|
return ('ADD_ALARM', selected_alarm) # Pass alarm to AddAlarmView
|
71
clock/ui/main_clock.py
Normal file
71
clock/ui/main_clock.py
Normal file
@ -0,0 +1,71 @@
|
|||||||
|
import curses
|
||||||
|
from datetime import datetime
|
||||||
|
from .utils import init_colors, draw_big_digit
|
||||||
|
from .big_digits import BIG_DIGITS
|
||||||
|
|
||||||
|
class MainClockView:
|
||||||
|
def __init__(self):
|
||||||
|
"""Initialize the main clock view"""
|
||||||
|
self.digit_width = 14 # Width of each digit pattern in the big clock display
|
||||||
|
|
||||||
|
def draw(self, stdscr):
|
||||||
|
"""Draw the main clock screen"""
|
||||||
|
init_colors()
|
||||||
|
height, width = stdscr.getmaxyx()
|
||||||
|
current_time = datetime.now()
|
||||||
|
|
||||||
|
self._draw_big_time(stdscr, current_time, height, width)
|
||||||
|
self._draw_date(stdscr, current_time, height, width)
|
||||||
|
self._draw_menu(stdscr, height, width)
|
||||||
|
|
||||||
|
def handle_input(self, key):
|
||||||
|
"""Handle user input and return the next view name or None to stay"""
|
||||||
|
if key == ord('a'):
|
||||||
|
return 'ADD_ALARM'
|
||||||
|
elif key == ord('s'):
|
||||||
|
return 'LIST_ALARMS'
|
||||||
|
elif key == ord('q'):
|
||||||
|
return 'QUIT'
|
||||||
|
return None
|
||||||
|
|
||||||
|
def _draw_big_time(self, stdscr, current_time, height, width):
|
||||||
|
"""Draw the big time display"""
|
||||||
|
time_str = current_time.strftime("%H:%M:%S")
|
||||||
|
total_width = self.digit_width * len(time_str)
|
||||||
|
start_x = self._center_x(width, total_width)
|
||||||
|
start_y = self._center_y(height, 7) - 4 # 7 is the height of digits
|
||||||
|
|
||||||
|
# Draw each digit in green
|
||||||
|
stdscr.attron(curses.color_pair(1))
|
||||||
|
for i, digit in enumerate(time_str):
|
||||||
|
self._draw_digit(stdscr, start_y, start_x + i * self.digit_width, digit)
|
||||||
|
stdscr.attroff(curses.color_pair(1))
|
||||||
|
|
||||||
|
def _draw_date(self, stdscr, current_time, height, width):
|
||||||
|
"""Draw the current date"""
|
||||||
|
date_str = current_time.strftime("%Y-%m-%d")
|
||||||
|
date_x = self._center_x(width, len(date_str))
|
||||||
|
date_y = height // 2 + 4
|
||||||
|
|
||||||
|
stdscr.attron(curses.color_pair(2))
|
||||||
|
stdscr.addstr(date_y, date_x, date_str)
|
||||||
|
stdscr.attroff(curses.color_pair(2))
|
||||||
|
|
||||||
|
def _draw_menu(self, stdscr, height, width):
|
||||||
|
"""Draw the menu options"""
|
||||||
|
menu_str = "A: Add Alarm S: List Alarms Q: Quit"
|
||||||
|
menu_x = self._center_x(width, len(menu_str))
|
||||||
|
stdscr.addstr(height - 2, menu_x, menu_str)
|
||||||
|
|
||||||
|
def _draw_digit(self, stdscr, y, x, digit):
|
||||||
|
"""Draw a single big digit"""
|
||||||
|
# Delegate to the existing draw_big_digit utility function
|
||||||
|
draw_big_digit(stdscr, y, x, digit)
|
||||||
|
|
||||||
|
def _center_x(self, width, text_width):
|
||||||
|
"""Calculate x coordinate to center text horizontally"""
|
||||||
|
return (width - text_width) // 2
|
||||||
|
|
||||||
|
def _center_y(self, height, text_height):
|
||||||
|
"""Calculate y coordinate to center text vertically"""
|
||||||
|
return (height - text_height) // 2
|
179
clock/ui/ncurses_ui.py
Normal file
179
clock/ui/ncurses_ui.py
Normal file
@ -0,0 +1,179 @@
|
|||||||
|
import curses
|
||||||
|
import time
|
||||||
|
import threading
|
||||||
|
import logging
|
||||||
|
import queue
|
||||||
|
from datetime import datetime
|
||||||
|
|
||||||
|
from .utils import init_colors, draw_error, ColorScheme
|
||||||
|
from .active_alarm import ActiveAlarmView
|
||||||
|
from .add_alarm import AddAlarmView
|
||||||
|
from .list_alarms import ListAlarmsView
|
||||||
|
from .main_clock import MainClockView
|
||||||
|
|
||||||
|
class UI:
|
||||||
|
def __init__(self, alarm_system_manager, control_queue):
|
||||||
|
"""Initialize the UI system"""
|
||||||
|
# System components
|
||||||
|
self.alarm_system = alarm_system_manager
|
||||||
|
self.stop_event = alarm_system_manager.stop_event
|
||||||
|
self.storage = alarm_system_manager.storage
|
||||||
|
self.control_queue = control_queue
|
||||||
|
|
||||||
|
# Logging
|
||||||
|
self.logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
# Initialize views
|
||||||
|
self._init_views()
|
||||||
|
|
||||||
|
# UI State
|
||||||
|
self.current_view = 'CLOCK'
|
||||||
|
self.error_message = None
|
||||||
|
self.error_timestamp = None
|
||||||
|
|
||||||
|
def _init_views(self):
|
||||||
|
"""Initialize all view classes"""
|
||||||
|
self.views = {
|
||||||
|
'CLOCK': MainClockView(),
|
||||||
|
'ADD_ALARM': AddAlarmView(self.storage, self.control_queue),
|
||||||
|
'LIST_ALARMS': ListAlarmsView(self.storage),
|
||||||
|
'ACTIVE_ALARMS': ActiveAlarmView(self.storage, self.control_queue)
|
||||||
|
}
|
||||||
|
|
||||||
|
def run(self):
|
||||||
|
"""Start the ncurses UI in a separate thread"""
|
||||||
|
def ui_thread():
|
||||||
|
try:
|
||||||
|
# Start control queue monitor
|
||||||
|
monitor_thread = threading.Thread(
|
||||||
|
target=self._monitor_control_queue,
|
||||||
|
daemon=True
|
||||||
|
)
|
||||||
|
monitor_thread.start()
|
||||||
|
|
||||||
|
# Start UI
|
||||||
|
curses.wrapper(self._main_loop)
|
||||||
|
except Exception as e:
|
||||||
|
self.logger.error(f"UI Thread Error: {e}")
|
||||||
|
finally:
|
||||||
|
self.stop_event.set()
|
||||||
|
|
||||||
|
ui_thread_obj = threading.Thread(target=ui_thread, daemon=True)
|
||||||
|
ui_thread_obj.start()
|
||||||
|
return ui_thread_obj
|
||||||
|
|
||||||
|
def _monitor_control_queue(self):
|
||||||
|
"""Monitor the control queue for alarm events"""
|
||||||
|
while not self.stop_event.is_set():
|
||||||
|
try:
|
||||||
|
# Non-blocking check of control queue
|
||||||
|
try:
|
||||||
|
control_msg = self.control_queue.get(timeout=1)
|
||||||
|
|
||||||
|
if control_msg['type'] == 'trigger':
|
||||||
|
# Update active alarms view
|
||||||
|
active_view = self.views['ACTIVE_ALARMS']
|
||||||
|
alarm_id = control_msg['alarm_id']
|
||||||
|
active_view.update_active_alarms({
|
||||||
|
alarm_id: control_msg['info']
|
||||||
|
})
|
||||||
|
|
||||||
|
# Switch to active alarms view
|
||||||
|
self.current_view = 'ACTIVE_ALARMS'
|
||||||
|
|
||||||
|
except queue.Empty:
|
||||||
|
pass
|
||||||
|
|
||||||
|
time.sleep(0.1)
|
||||||
|
except Exception as e:
|
||||||
|
self.logger.error(f"Error monitoring control queue: {e}")
|
||||||
|
time.sleep(1)
|
||||||
|
|
||||||
|
def _show_error(self, message, duration=3):
|
||||||
|
"""Display an error message"""
|
||||||
|
self.error_message = message
|
||||||
|
self.error_timestamp = time.time()
|
||||||
|
|
||||||
|
def _clear_error_if_expired(self):
|
||||||
|
"""Clear error message if expired"""
|
||||||
|
if self.error_message and self.error_timestamp:
|
||||||
|
if time.time() - self.error_timestamp > 3:
|
||||||
|
self.error_message = None
|
||||||
|
self.error_timestamp = None
|
||||||
|
|
||||||
|
def _main_loop(self, stdscr):
|
||||||
|
"""Main ncurses event loop"""
|
||||||
|
# Setup curses
|
||||||
|
curses.curs_set(0) # Hide cursor
|
||||||
|
stdscr.keypad(1) # Enable keypad
|
||||||
|
stdscr.timeout(100) # Non-blocking input
|
||||||
|
init_colors() # Initialize color pairs
|
||||||
|
|
||||||
|
while not self.stop_event.is_set():
|
||||||
|
# Clear screen
|
||||||
|
stdscr.erase()
|
||||||
|
|
||||||
|
# Get current view
|
||||||
|
current_view = self.views.get(self.current_view)
|
||||||
|
if not current_view:
|
||||||
|
self.logger.error(f"Invalid view: {self.current_view}")
|
||||||
|
break
|
||||||
|
|
||||||
|
try:
|
||||||
|
# Draw current view
|
||||||
|
current_view.draw(stdscr)
|
||||||
|
|
||||||
|
# Handle any error messages
|
||||||
|
if self.error_message:
|
||||||
|
draw_error(stdscr, self.error_message)
|
||||||
|
self._clear_error_if_expired()
|
||||||
|
|
||||||
|
# Refresh screen
|
||||||
|
stdscr.refresh()
|
||||||
|
|
||||||
|
# Handle input
|
||||||
|
key = stdscr.getch()
|
||||||
|
if key != -1:
|
||||||
|
# Handle quit key globally
|
||||||
|
if key == ord('q'):
|
||||||
|
if self.current_view == 'CLOCK':
|
||||||
|
break # Exit application from clock view
|
||||||
|
else:
|
||||||
|
self.current_view = 'CLOCK' # Return to clock from other views
|
||||||
|
continue
|
||||||
|
|
||||||
|
# Let current view handle input
|
||||||
|
result = current_view.handle_input(key)
|
||||||
|
|
||||||
|
# Handle tuple result (view, data) or just a view change
|
||||||
|
if isinstance(result, tuple):
|
||||||
|
next_view, data = result
|
||||||
|
else:
|
||||||
|
next_view, data = result, None
|
||||||
|
|
||||||
|
# Handle quitting
|
||||||
|
if next_view == 'QUIT':
|
||||||
|
break
|
||||||
|
|
||||||
|
elif next_view:
|
||||||
|
# Update list alarms view data when switching to it
|
||||||
|
if next_view == 'LIST_ALARMS':
|
||||||
|
self.views['LIST_ALARMS'].update_alarms(
|
||||||
|
self.storage.get_saved_alerts()
|
||||||
|
)
|
||||||
|
|
||||||
|
# Handle editing an alarm by passing data to AddAlarmView
|
||||||
|
elif next_view == 'ADD_ALARM' and data:
|
||||||
|
self.views['ADD_ALARM'].set_alarm_data(data)
|
||||||
|
|
||||||
|
self.current_view = next_view
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
self.logger.error(f"Error in main loop: {e}")
|
||||||
|
self._show_error(str(e))
|
||||||
|
|
||||||
|
time.sleep(0.1) # Prevent CPU hogging
|
||||||
|
|
||||||
|
def stop(self):
|
||||||
|
"""Stop the UI system"""
|
||||||
|
self.stop_event.set()
|
138
clock/ui/utils.py
Normal file
138
clock/ui/utils.py
Normal file
@ -0,0 +1,138 @@
|
|||||||
|
import curses
|
||||||
|
from dataclasses import dataclass
|
||||||
|
from typing import Optional
|
||||||
|
from .big_digits import BIG_DIGITS
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class ColorScheme:
|
||||||
|
"""Color scheme configuration for the UI"""
|
||||||
|
PRIMARY = 1 # Green on black
|
||||||
|
HIGHLIGHT = 2 # Yellow on black
|
||||||
|
ERROR = 3 # Red on black
|
||||||
|
|
||||||
|
class ViewUtils:
|
||||||
|
"""Common utility functions for view classes"""
|
||||||
|
@staticmethod
|
||||||
|
def center_x(width: int, text_width: int) -> int:
|
||||||
|
"""Calculate x coordinate to center text horizontally"""
|
||||||
|
return max(0, (width - text_width) // 2)
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def center_y(height: int, text_height: int) -> int:
|
||||||
|
"""Calculate y coordinate to center text vertically"""
|
||||||
|
return max(0, (height - text_height) // 2)
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def draw_centered_text(stdscr, y: int, text: str, color_pair: Optional[int] = None, attrs: int = 0):
|
||||||
|
"""Draw text centered horizontally on the screen with optional color and attributes"""
|
||||||
|
height, width = stdscr.getmaxyx()
|
||||||
|
x = ViewUtils.center_x(width, len(text))
|
||||||
|
|
||||||
|
if color_pair is not None:
|
||||||
|
stdscr.attron(curses.color_pair(color_pair) | attrs)
|
||||||
|
|
||||||
|
try:
|
||||||
|
stdscr.addstr(y, x, text)
|
||||||
|
except curses.error:
|
||||||
|
pass # Ignore errors from writing at invalid positions
|
||||||
|
|
||||||
|
if color_pair is not None:
|
||||||
|
stdscr.attroff(curses.color_pair(color_pair) | attrs)
|
||||||
|
|
||||||
|
def init_colors():
|
||||||
|
"""Initialize color pairs for the application"""
|
||||||
|
try:
|
||||||
|
curses.start_color()
|
||||||
|
curses.use_default_colors()
|
||||||
|
|
||||||
|
# Primary color (green text on black background)
|
||||||
|
curses.init_pair(ColorScheme.PRIMARY, curses.COLOR_GREEN, curses.COLOR_BLACK)
|
||||||
|
|
||||||
|
# Highlight color (yellow text on black background)
|
||||||
|
curses.init_pair(ColorScheme.HIGHLIGHT, curses.COLOR_YELLOW, curses.COLOR_BLACK)
|
||||||
|
|
||||||
|
# Error color (red text on black background)
|
||||||
|
curses.init_pair(ColorScheme.ERROR, curses.COLOR_RED, curses.COLOR_BLACK)
|
||||||
|
except Exception as e:
|
||||||
|
# Log error or handle gracefully if color initialization fails
|
||||||
|
pass
|
||||||
|
|
||||||
|
def draw_error(stdscr, error_message: str, duration_sec: int = 3):
|
||||||
|
"""
|
||||||
|
Draw error message at the bottom of the screen
|
||||||
|
|
||||||
|
Args:
|
||||||
|
stdscr: Curses window object
|
||||||
|
error_message: Message to display
|
||||||
|
duration_sec: How long the error should be displayed (for reference by caller)
|
||||||
|
"""
|
||||||
|
height, width = stdscr.getmaxyx()
|
||||||
|
|
||||||
|
# Truncate message if too long
|
||||||
|
max_width = width - 4
|
||||||
|
if len(error_message) > max_width:
|
||||||
|
error_message = error_message[:max_width-3] + "..."
|
||||||
|
|
||||||
|
# Position near bottom of screen
|
||||||
|
error_y = height - 4
|
||||||
|
|
||||||
|
ViewUtils.draw_centered_text(
|
||||||
|
stdscr,
|
||||||
|
error_y,
|
||||||
|
error_message,
|
||||||
|
ColorScheme.ERROR,
|
||||||
|
curses.A_BOLD
|
||||||
|
)
|
||||||
|
|
||||||
|
def draw_big_digit(stdscr, y: int, x: int, digit: str):
|
||||||
|
"""
|
||||||
|
Draw a large digit using the predefined patterns
|
||||||
|
|
||||||
|
Args:
|
||||||
|
stdscr: Curses window object
|
||||||
|
y: Starting y coordinate
|
||||||
|
x: Starting x coordinate
|
||||||
|
digit: Character to draw ('0'-'9', ':', etc)
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
patterns = BIG_DIGITS.get(digit, BIG_DIGITS['?'])
|
||||||
|
for i, line in enumerate(patterns):
|
||||||
|
try:
|
||||||
|
stdscr.addstr(y + i, x, line)
|
||||||
|
except curses.error:
|
||||||
|
continue # Skip lines that would write outside the window
|
||||||
|
except (curses.error, IndexError):
|
||||||
|
pass # Ignore any drawing errors
|
||||||
|
|
||||||
|
def safe_addstr(stdscr, y: int, x: int, text: str, color_pair: Optional[int] = None, attrs: int = 0):
|
||||||
|
"""
|
||||||
|
Safely add a string to the screen, handling boundary conditions
|
||||||
|
|
||||||
|
Args:
|
||||||
|
stdscr: Curses window object
|
||||||
|
y: Y coordinate
|
||||||
|
x: X coordinate
|
||||||
|
text: Text to draw
|
||||||
|
color_pair: Optional color pair number
|
||||||
|
attrs: Additional curses attributes
|
||||||
|
"""
|
||||||
|
height, width = stdscr.getmaxyx()
|
||||||
|
|
||||||
|
# Check if the position is within bounds
|
||||||
|
if y < 0 or y >= height or x < 0 or x >= width:
|
||||||
|
return
|
||||||
|
|
||||||
|
# Truncate text if it would extend beyond screen width
|
||||||
|
if x + len(text) > width:
|
||||||
|
text = text[:width - x]
|
||||||
|
|
||||||
|
try:
|
||||||
|
if color_pair is not None:
|
||||||
|
stdscr.attron(curses.color_pair(color_pair) | attrs)
|
||||||
|
|
||||||
|
stdscr.addstr(y, x, text)
|
||||||
|
|
||||||
|
if color_pair is not None:
|
||||||
|
stdscr.attroff(curses.color_pair(color_pair) | attrs)
|
||||||
|
except curses.error:
|
||||||
|
pass # Ignore any drawing errors
|
Reference in New Issue
Block a user