Compare commits

...

10 Commits

20 changed files with 1458 additions and 867 deletions

16
README.md Normal file
View File

@ -0,0 +1,16 @@
# EEE PC Alarm Clock!
Runs on Debian 12.
Requires Python 3 and mpg123.
To use, just run `main.py` from the `clock/` directory.
It includes:
- A local ncurses UI.
- A simple HTTP API
- for how to use it, check out
`alert_clients/alarm_api_client/alarm_api_client.py`.
Put your alarm sounds in `~/alarms/` as MP3 files.
License: MIT

View File

@ -1,183 +0,0 @@
import os
import time
import threading
import subprocess
import queue
import logging
from datetime import datetime, timedelta
from typing import Optional, Dict, Any
# Set up logging
logging.basicConfig(
level=logging.DEBUG,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.StreamHandler(),
logging.FileHandler('alarm_siren.log')
]
)
logger = logging.getLogger('AlarmSiren')
class AlarmSiren:
def __init__(self):
# Communication queues
self.alarm_queue = queue.Queue()
self.control_queue = queue.Queue()
# Tracking active alarms
self.active_alarms: Dict[int, Dict[str, Any]] = {}
# Playback thread
self.playback_thread = threading.Thread(target=self._playback_worker, daemon=True)
self.playback_thread.start()
def schedule_alarm(self, alarm_config: Dict[str, Any]):
"""Schedule an alarm based on its configuration"""
logger.info(f"Scheduling alarm: {alarm_config}")
self.alarm_queue.put(alarm_config)
def _calculate_next_alarm_time(self, alarm_config: Dict[str, Any]) -> Optional[datetime]:
"""Calculate the next alarm trigger time based on repeat rule"""
now = datetime.now()
current_time = now.time()
# Parse alarm time
alarm_time = datetime.strptime(alarm_config['time'], "%H:%M:%S").time()
# Determine the next trigger
if alarm_config['repeat_rule']['type'] == 'once':
# For one-time alarm, check the specific date
try:
specific_date = datetime.strptime(alarm_config['repeat_rule']['at'], "%d.%m.%Y")
return datetime.combine(specific_date.date(), alarm_time)
except (KeyError, ValueError):
logger.error("Invalid one-time alarm configuration")
return None
elif alarm_config['repeat_rule']['type'] == 'daily':
# Daily alarm - trigger today or tomorrow
next_trigger = datetime.combine(now.date(), alarm_time)
if current_time < alarm_time:
return next_trigger
return next_trigger + timedelta(days=1)
elif alarm_config['repeat_rule']['type'] == 'weekly':
# Weekly alarm - check configured days
today = now.strftime("%A").lower()
configured_days = [day.lower() for day in alarm_config['repeat_rule'].get('days_of_week', [])]
if today in configured_days and current_time < alarm_time:
return datetime.combine(now.date(), alarm_time)
# Find next configured day
days_order = ['monday', 'tuesday', 'wednesday', 'thursday', 'friday', 'saturday', 'sunday']
current_index = days_order.index(today)
for offset in range(1, 8):
next_day_index = (current_index + offset) % 7
next_day = days_order[next_day_index]
if next_day in configured_days:
next_date = now.date() + timedelta(days=offset)
return datetime.combine(next_date, alarm_time)
return None
def _play_audio(self, file_path: str, volume: int = 100):
"""Play audio file using mpg123"""
try:
# Ensure the file exists
if not os.path.exists(file_path):
logger.error(f"Audio file not found: {file_path}")
return False
# Construct mpg123 command with volume control
volume_adjust = f"-g {volume}"
cmd = ["mpg123", volume_adjust, file_path]
logger.info(f"Playing alarm: {file_path}")
# Track the process for potential interruption
process = subprocess.Popen(cmd)
return process
except Exception as e:
logger.error(f"Error playing audio: {e}")
return False
def _playback_worker(self):
"""Background thread for managing alarm playback"""
while True:
try:
# Check for new alarms to schedule
try:
new_alarm = self.alarm_queue.get(timeout=1)
alarm_time = self._calculate_next_alarm_time(new_alarm)
if alarm_time:
self.active_alarms[new_alarm.get('id', id(new_alarm))] = {
'config': new_alarm,
'trigger_time': alarm_time,
'snooze_count': 0
}
except queue.Empty:
pass
# Check for control signals (snooze/dismiss)
try:
control_msg = self.control_queue.get(timeout=0.1)
# Handle control message logic
except queue.Empty:
pass
# Check for alarms to trigger
now = datetime.now()
for alarm_id, alarm_info in list(self.active_alarms.items()):
if now >= alarm_info['trigger_time']:
# Trigger alarm
process = self._play_audio(
alarm_info['config']['file_to_play'],
alarm_info['config'].get('metadata', {}).get('volume', 100)
)
# Handle repeat and snooze logic
if process:
# Wait for user interaction or timeout
# In a real implementation, this would be more sophisticated
time.sleep(30) # Placeholder for user interaction
# Determine next trigger based on repeat rule
next_trigger = self._calculate_next_alarm_time(alarm_info['config'])
if next_trigger:
alarm_info['trigger_time'] = next_trigger
else:
del self.active_alarms[alarm_id]
time.sleep(1) # Prevent tight loop
except Exception as e:
logger.error(f"Error in playback worker: {e}")
time.sleep(1)
def snooze_alarm(self, alarm_id: int):
"""Snooze a specific alarm"""
if alarm_id in self.active_alarms:
alarm_config = self.active_alarms[alarm_id]['config']
snooze_config = alarm_config.get('snooze', {'enabled': True, 'duration': 10, 'max_count': 3})
if (snooze_config['enabled'] and
self.active_alarms[alarm_id]['snooze_count'] < snooze_config['max_count']):
# Increment snooze count
self.active_alarms[alarm_id]['snooze_count'] += 1
# Set next trigger time
snooze_duration = snooze_config.get('duration', 10)
self.active_alarms[alarm_id]['trigger_time'] = datetime.now() + timedelta(minutes=snooze_duration)
logger.info(f"Snoozed alarm {alarm_id} for {snooze_duration} minutes")
else:
logger.warning(f"Cannot snooze alarm {alarm_id} - max snooze count reached")
def dismiss_alarm(self, alarm_id: int):
"""Dismiss a specific alarm"""
if alarm_id in self.active_alarms:
logger.info(f"Dismissed alarm {alarm_id}")
del self.active_alarms[alarm_id]

View File

@ -1,290 +0,0 @@
import curses
import time
from datetime import datetime, date, timedelta
import os
from big_digits import BIG_DIGITS
from ncurses_ui_draw import _draw_big_digit, _draw_main_clock, _draw_add_alarm, _draw_list_alarms, _draw_error
from logging_config import setup_logging
# Set up logging
logger = setup_logging()
class UI:
def __init__(self, alarm_system_manager):
"""
Initialize the ncurses UI for the alarm system
Args:
alarm_system_manager (AlarmSystemManager): The main alarm system manager
"""
self.alarm_system = alarm_system_manager
self.stop_event = alarm_system_manager.stop_event
self.storage = alarm_system_manager.storage
# UI state variables
self.selected_menu = 0
self.new_alarm_name = "Alarm"
self.editing_name = " "
self.new_alarm_hour = datetime.now().hour
self.new_alarm_minute = datetime.now().minute
self.new_alarm_selected = 4
self.new_alarm_date = None
self.new_alarm_weekdays = []
self.new_alarm_enabled = True
self.weekday_names = ['Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat', 'Sun']
self.alarms = []
self.error_message = None
self.error_timestamp = None
def run(self):
"""
Start the ncurses UI
"""
def ui_thread():
try:
curses.wrapper(self._main_loop)
except Exception as e:
logger.error(f"Error in UI thread: {e}")
print(f"Error in UI thread: {e}")
finally:
self.stop_event.set()
import threading
ui_thread_obj = threading.Thread(target=ui_thread, daemon=True)
ui_thread_obj.start()
return ui_thread_obj
def _handle_add_alarm_input(self, key):
try:
if key == 27: # Escape
# If in name editing, exit name editing
if self.editing_name:
self.new_alarm_name = self.temp_alarm_name
self.editing_name = False
return
# Otherwise return to main clock
self.selected_menu = 0
return
if key == 10: # Enter
if self.editing_name:
# Exit name editing mode
self.editing_name = False
# Move focus to time selection
self.new_alarm_selected = 0
return
try:
alarm_data = {
"name": self.new_alarm_name,
"time": f"{self.new_alarm_hour:02d}:{self.new_alarm_minute:02d}:00",
"enabled": self.new_alarm_enabled,
"repeat_rule": {
"type": "weekly" if self.new_alarm_weekdays else "once",
"days": self.new_alarm_weekdays if self.new_alarm_weekdays else [],
"date": self.new_alarm_date.strftime("%Y-%m-%d") if self.new_alarm_date else date.today().strftime("%Y-%m-%d")
}
}
self.storage.save_new_alert(alarm_data)
self.selected_menu = 0
except Exception as e:
self._show_error(str(e))
return
# Numeric input for time when on time selection
if self.new_alarm_selected in [0, 1] and not self.editing_name:
if 48 <= key <= 57: # 0-9 keys
current_digit = int(chr(key))
if self.new_alarm_selected == 0: # Hour
self.new_alarm_hour = current_digit if self.new_alarm_hour < 10 else (self.new_alarm_hour % 10 * 10 + current_digit) % 24
else: # Minute
self.new_alarm_minute = current_digit if self.new_alarm_minute < 10 else (self.new_alarm_minute % 10 * 10 + current_digit) % 60
return
# Use hjkl for navigation and selection
if key in [ord('h'), curses.KEY_LEFT]:
self.new_alarm_selected = (self.new_alarm_selected - 1) % 6
elif key in [ord('l'), curses.KEY_RIGHT]:
self.new_alarm_selected = (self.new_alarm_selected + 1) % 6
elif key in [ord('k'), curses.KEY_UP]:
if self.new_alarm_selected == 0:
self.new_alarm_hour = (self.new_alarm_hour + 1) % 24
elif self.new_alarm_selected == 1:
self.new_alarm_minute = (self.new_alarm_minute + 1) % 60
elif self.new_alarm_selected == 2:
if not self.new_alarm_date:
self.new_alarm_date = date.today()
else:
self.new_alarm_date += timedelta(days=1)
elif self.new_alarm_selected == 3 and len(self.new_alarm_weekdays) < 7:
# Add whole groups of days
if key == ord('k'):
# Options: M-F (0-4), Weekends (5-6), All days
if not self.new_alarm_weekdays:
self.new_alarm_weekdays = list(range(5)) # M-F
elif self.new_alarm_weekdays == list(range(5)):
self.new_alarm_weekdays = [5, 6] # Weekends
elif self.new_alarm_weekdays == [5, 6]:
self.new_alarm_weekdays = list(range(7)) # All days
else:
self.new_alarm_weekdays = [] # Reset
self.new_alarm_weekdays.sort()
elif key in [ord('j'), curses.KEY_DOWN]:
if self.new_alarm_selected == 0:
self.new_alarm_hour = (self.new_alarm_hour - 1) % 24
elif self.new_alarm_selected == 1:
self.new_alarm_minute = (self.new_alarm_minute - 1) % 60
elif self.new_alarm_selected == 2 and self.new_alarm_date:
self.new_alarm_date -= timedelta(days=1)
elif self.new_alarm_selected == 3:
# Cycle through weekday groups
if key == ord('j'):
if not self.new_alarm_weekdays:
self.new_alarm_weekdays = [5, 6] # Weekends
elif self.new_alarm_weekdays == [5, 6]:
self.new_alarm_weekdays = list(range(7)) # All days
elif self.new_alarm_weekdays == list(range(7)):
self.new_alarm_weekdays = list(range(5)) # M-F
else:
self.new_alarm_weekdays = [] # Reset
self.new_alarm_weekdays.sort()
elif key == 32: # Space
if self.new_alarm_selected == 4: # Name editing
if not self.editing_name:
self.editing_name = True
self.temp_alarm_name = self.new_alarm_name
self.new_alarm_name = ""
elif self.new_alarm_selected == 2: # Date
self.new_alarm_date = None
elif self.new_alarm_selected == 3: # Weekdays
# Toggle specific day when on weekday selection
current_day = len(self.new_alarm_weekdays)
if current_day < 7:
if current_day in self.new_alarm_weekdays:
self.new_alarm_weekdays.remove(current_day)
else:
self.new_alarm_weekdays.append(current_day)
self.new_alarm_weekdays.sort()
elif self.new_alarm_selected == 5: # Enabled toggle
self.new_alarm_enabled = not self.new_alarm_enabled
# Name editing handling
if self.editing_name:
if key == curses.KEY_BACKSPACE or key == 127:
self.new_alarm_name = self.new_alarm_name[:-1]
elif 32 <= key <= 126: # Printable ASCII
self.new_alarm_name += chr(key)
except Exception as e:
logger.error(f"Error: {e}")
self._show_error(str(e))
def _handle_list_alarms_input(self, key):
"""
Handle input for the list alarms screen
"""
if key == 27: # Escape
self.selected_menu = 0
elif key == ord('d'):
# Delete last alarm if exists
if self.alarms:
last_alarm = self.alarms[-1]
try:
self.storage.remove_saved_alert(last_alarm['id'])
except Exception as e:
_show_error(f"Failed to delete alarm: {e}")
logger.error(f"Failed to delete alarm: {e}")
def _show_error(self, message, duration=3):
"""Display an error message for a specified duration"""
self.error_message = message
self.error_timestamp = time.time()
def _clear_error_if_expired(self):
"""Clear error message if it has been displayed long enough"""
if self.error_message and self.error_timestamp:
if time.time() - self.error_timestamp > 3: # 3 seconds
self.error_message = None
self.error_timestamp = None
def _main_loop(self, stdscr):
"""
Main ncurses event loop
"""
# Initialize color pairs
curses.start_color()
curses.init_pair(1, curses.COLOR_GREEN, curses.COLOR_BLACK)
curses.init_pair(2, curses.COLOR_YELLOW, curses.COLOR_BLACK)
curses.init_pair(3, curses.COLOR_RED, curses.COLOR_BLACK)
curses.curs_set(0)
# Configure screen
stdscr.keypad(1)
stdscr.timeout(100)
time.sleep(0.2)
stdscr.clear()
while not self.stop_event.is_set():
# Clear the screen
#stdscr.clear()
stdscr.erase()
# Draw appropriate screen based on selected menu
if self.selected_menu == 0:
_draw_main_clock(stdscr)
elif self.selected_menu == 1:
_draw_add_alarm(stdscr, {
'new_alarm_selected': self.new_alarm_selected,
'new_alarm_name': self.new_alarm_name,
'editing_name': getattr(self, 'editing_name', False),
'new_alarm_hour': self.new_alarm_hour,
'new_alarm_minute': self.new_alarm_minute,
'new_alarm_enabled': self.new_alarm_enabled,
'new_alarm_date': self.new_alarm_date,
'new_alarm_weekdays': self.new_alarm_weekdays,
'weekday_names': self.weekday_names,
'new_alarm_snooze_enabled': getattr(self, 'new_alarm_snooze_enabled', False),
'new_alarm_snooze_duration': getattr(self, 'new_alarm_snooze_duration', 5),
'new_alarm_snooze_max_count': getattr(self, 'new_alarm_snooze_max_count', 3)
})
elif self.selected_menu == 2:
_draw_list_alarms(stdscr, {
'alarms': self.alarms,
'weekday_names': self.weekday_names
})
# Draw error if exists
if self.error_message:
_draw_error(stdscr, self.error_message)
self._clear_error_if_expired()
# Refresh the screen
stdscr.refresh()
# Small sleep to reduce CPU usage
time.sleep(0.2)
# Handle input
key = stdscr.getch()
if key != -1:
# Menu navigation and input handling
if key == ord('q') or key == 27: # 'q' or Escape
if self.selected_menu != 0:
self.selected_menu = 0
else:
break
elif key == ord('c'): # Clock/Home screen
self.selected_menu = 0
elif key == ord('a'): # Add Alarm
self.selected_menu = 1
elif key == ord('l'): # List Alarms
self.selected_menu = 2
self.alarms = self.storage.get_saved_alerts()
# Context-specific input handling
if self.selected_menu == 1:
self._handle_add_alarm_input(key)
elif self.selected_menu == 2:
self._handle_list_alarms_input(key)

View File

@ -1,187 +0,0 @@
from datetime import datetime
import curses
from big_digits import BIG_DIGITS
def _draw_error(stdscr, error_message):
"""Draw error message if present"""
if error_message:
height, width = stdscr.getmaxyx()
# Truncate message if too long
error_message = error_message[:width-4]
error_x = max(0, width // 2 - len(error_message) // 2)
error_y = height - 4 # Show near bottom of screen
# Red color for errors
stdscr.attron(curses.color_pair(3) | curses.A_BOLD)
stdscr.addstr(error_y, error_x, error_message)
stdscr.attroff(curses.color_pair(3) | curses.A_BOLD)
def _draw_big_digit(stdscr, y, x, digit, big_digits):
"""
Draw a big digit using predefined patterns
"""
patterns = big_digits[digit]
for i, line in enumerate(patterns):
stdscr.addstr(y + i, x, line)
def _draw_big_time(stdscr, big_digits):
"""
Draw the time in big digits
"""
current_time = datetime.now()
time_str = current_time.strftime("%H:%M:%S")
# Get terminal dimensions
height, width = stdscr.getmaxyx()
# Calculate starting position to center the big clock
digit_width = 14 # Width of each digit pattern including spacing
total_width = digit_width * len(time_str)
start_x = (width - total_width) // 2
start_y = (height - 7) // 2 - 4 # Move up a bit to make room for date
# Color for the big time
stdscr.attron(curses.color_pair(1))
for i, digit in enumerate(time_str):
_draw_big_digit(stdscr, start_y, start_x + i * digit_width, digit, big_digits)
stdscr.attroff(curses.color_pair(1))
def _draw_main_clock(stdscr):
"""
Draw the main clock screen
"""
current_time = datetime.now()
time_str = current_time.strftime("%H:%M:%S")
date_str = current_time.strftime("%Y-%m-%d")
# Get terminal dimensions
height, width = stdscr.getmaxyx()
# Draw big time
# Note: You'll need to pass BIG_DIGITS from big_digits module when calling
_draw_big_time(stdscr, BIG_DIGITS)
# Draw date
date_x = width // 2 - len(date_str) // 2
date_y = height // 2 + 4 # Below the big clock
stdscr.attron(curses.color_pair(2))
stdscr.addstr(date_y, date_x, date_str)
stdscr.attroff(curses.color_pair(2))
# Draw menu options
menu_str = "A: Add Alarm L: List Alarms Q: Quit"
menu_x = width // 2 - len(menu_str) // 2
stdscr.addstr(height - 2, menu_x, menu_str)
def _draw_add_alarm(stdscr, context):
"""
Draw the add alarm screen
Args:
stdscr: The curses screen object
context: A dictionary containing UI state variables
"""
height, width = stdscr.getmaxyx()
form_y = height // 2 - 3
stdscr.addstr(form_y -1, width // 2 - 10, "Add New Alarm")
# Name input
if context['new_alarm_selected'] == 4:
stdscr.attron(curses.color_pair(2))
stdscr.addstr(form_y + 1, width // 2 - 10, f"Name: {context['new_alarm_name']}")
if context['new_alarm_selected'] == 4:
stdscr.attroff(curses.color_pair(2))
# Time selection
hour_str = f"{context['new_alarm_hour']:02d}"
minute_str = f"{context['new_alarm_minute']:02d}"
# Highlight selected field
if context['new_alarm_selected'] == 0:
stdscr.attron(curses.color_pair(2))
stdscr.addstr(form_y + 2, width // 2 - 2, hour_str)
if context['new_alarm_selected'] == 0:
stdscr.attroff(curses.color_pair(2))
stdscr.addstr(form_y + 2, width // 2, ":")
if context['new_alarm_selected'] == 1:
stdscr.attron(curses.color_pair(2))
stdscr.addstr(form_y + 2, width // 2 + 1, minute_str)
if context['new_alarm_selected'] == 1:
stdscr.attroff(curses.color_pair(2))
# Enabled/Disabled toggle
enabled_str = "Enabled" if context['new_alarm_enabled'] else "Disabled"
if context['new_alarm_selected'] == 5:
stdscr.attron(curses.color_pair(2))
stdscr.addstr(form_y + 4, width // 2 - len(enabled_str)//2, enabled_str)
if context['new_alarm_selected'] == 5:
stdscr.attroff(curses.color_pair(2))
# Date selection
date_str = "No specific date" if not context['new_alarm_date'] else context['new_alarm_date'].strftime("%Y-%m-%d")
if context['new_alarm_selected'] == 2:
stdscr.attron(curses.color_pair(2))
stdscr.addstr(form_y + 3, width // 2 - len(date_str) // 2, date_str)
if context['new_alarm_selected'] == 2:
stdscr.attroff(curses.color_pair(2))
# Weekday selection
weekday_str = "Repeat: " + " ".join(
context['weekday_names'][i] if i in context['new_alarm_weekdays'] else "___"
for i in range(7)
)
if context['new_alarm_selected'] == 3:
stdscr.attron(curses.color_pair(2))
stdscr.addstr(form_y + 5, width // 2 - len(weekday_str) // 2, weekday_str)
if context['new_alarm_selected'] == 3:
stdscr.attroff(curses.color_pair(2))
# Instructions
stdscr.addstr(height - 2, 2,
"↑↓: Change ←→: Switch Space: Toggle Enter: Save Esc: Cancel")
def _draw_list_alarms(stdscr, context):
"""
Draw the list of alarms screen
Args:
stdscr: The curses screen object
context: A dictionary containing UI state variables
"""
height, width = stdscr.getmaxyx()
# Header
stdscr.addstr(2, width // 2 - 5, "Alarms")
if not context['alarms']:
stdscr.addstr(4, width // 2 - 10, "No alarms set")
else:
for i, alarm in enumerate(context['alarms'][:height-6]):
# Format time and repeat information
time_str = alarm.get('time', 'Unknown')
# Format repeat info
repeat_info = ""
repeat_rule = alarm.get('repeat_rule', {})
if repeat_rule:
if repeat_rule.get('type') == 'weekly':
days = repeat_rule.get('days', [])
repeat_info = f" (Every {', '.join(context['weekday_names'][d] for d in days)})"
elif repeat_rule.get('type') == 'once' and repeat_rule.get('date'):
repeat_info = f" (On {repeat_rule['date']})"
# Status indicator
status = "" if alarm.get('enabled', True) else ""
display_str = f"{status} {time_str}{repeat_info}"
# Truncate if too long
display_str = display_str[:width-4]
stdscr.addstr(4 + i, 2, display_str)
stdscr.addstr(height - 2, 2, "D: Delete Enter: Edit Esc: Back")

View File

@ -1,173 +0,0 @@
#!/bin/bash
# Configuration
API_URL="http://localhost:8000"
TEST_ALARM_NAME="Test Alarm"
TEST_ALARM_TIME="08:30:00"
TEST_AUDIO_FILE="/tmp/test.mp3"
# Color codes for output
GREEN='\033[0;32m'
RED='\033[0;31m'
BLUE='\033[0;34m'
NC='\033[0m' # No Color
# Helper function to print status messages
print_status() {
echo -e "${BLUE}=== $1 ===${NC}"
}
# Helper function to check test results
check_result() {
if [ $? -eq 0 ]; then
echo -e "${GREEN}$1${NC}"
else
echo -e "${RED}$1${NC}"
exit 1
fi
}
# 1. Test server connectivity
print_status "Testing server connectivity"
curl -s "$API_URL" > /dev/null
check_result "Server connectivity check"
# 2. Get initial state
print_status "Getting initial state"
INITIAL_STATE=$(curl -s -X GET "$API_URL")
echo "$INITIAL_STATE" | jq .
check_result "Retrieved initial state"
# 3. Add a new alarm (POST)
print_status "Adding new alarm"
POST_DATA=$(jq -n \
--arg name "$TEST_ALARM_NAME" \
--arg time "$TEST_ALARM_TIME" \
'{
"name": $name,
"time": $time,
"repeat_rule": {
"type": "weekly",
"days_of_week": ["monday", "wednesday", "friday"]
},
"enabled": true,
"snooze": {
"enabled": true,
"duration": 5,
"max_count": 3
},
"metadata": {
"volume": 75,
"notes": "Test alarm with full configuration"
}
}'
)
ADD_RESPONSE=$(curl -s -X POST -H "Content-Type: application/json" -d "$POST_DATA" "$API_URL")
NEW_ALARM_ID=$(echo "$ADD_RESPONSE" | jq -r '.data.id')
if [[ "$NEW_ALARM_ID" == "null" || -z "$NEW_ALARM_ID" ]]; then
echo -e "${RED}Failed to add alarm${NC}"
echo "Response: $ADD_RESPONSE"
exit 1
fi
echo -e "${GREEN}Added alarm with ID: $NEW_ALARM_ID${NC}"
# 4. Test duplicate alarm detection
print_status "Testing duplicate alarm detection"
DUPLICATE_RESPONSE=$(curl -s -X POST -H "Content-Type: application/json" -d "$POST_DATA" "$API_URL")
if [[ $(echo "$DUPLICATE_RESPONSE" | jq -r '.error') == *"Duplicate alarm detected"* ]]; then
echo -e "${GREEN}✓ Duplicate detection working${NC}"
else
echo -e "${RED}✗ Duplicate detection failed${NC}"
echo "Response: $DUPLICATE_RESPONSE"
fi
# 5. Update the alarm (PUT)
print_status "Updating alarm"
UPDATE_DATA=$(jq -n \
--arg name "$TEST_ALARM_NAME Updated" \
--arg time "$TEST_ALARM_TIME" \
--argjson id "$NEW_ALARM_ID" \
'{
"id": $id,
"name": $name,
"time": $time,
"repeat_rule": {
"type": "daily"
},
"enabled": true,
"snooze": {
"enabled": false,
"duration": 10,
"max_count": 2
},
"metadata": {
"volume": 90,
"notes": "Updated test alarm"
}
}'
)
UPDATE_RESPONSE=$(curl -s -X PUT -H "Content-Type: application/json" -d "$UPDATE_DATA" "$API_URL")
if [[ $(echo "$UPDATE_RESPONSE" | jq -r '.data.message') == "Alarm updated successfully" ]]; then
echo -e "${GREEN}✓ Alarm update successful${NC}"
else
echo -e "${RED}✗ Alarm update failed${NC}"
echo "Response: $UPDATE_RESPONSE"
fi
# 6. Verify the update
print_status "Verifying update"
UPDATED_STATE=$(curl -s -X GET "$API_URL")
UPDATED_ALARM=$(echo "$UPDATED_STATE" | jq -r ".data[] | select(.id == $NEW_ALARM_ID)")
if [[ $(echo "$UPDATED_ALARM" | jq -r '.name') == "$TEST_ALARM_NAME Updated" ]]; then
echo -e "${GREEN}✓ Update verification successful${NC}"
else
echo -e "${RED}✗ Update verification failed${NC}"
echo "Current alarm state: $UPDATED_ALARM"
fi
# 7. Test invalid inputs
print_status "Testing invalid inputs"
# Test invalid time format
INVALID_TIME_DATA=$(echo "$POST_DATA" | jq '. + {"time": "25:00:00"}')
INVALID_TIME_RESPONSE=$(curl -s -X POST -H "Content-Type: application/json" -d "$INVALID_TIME_DATA" "$API_URL")
if [[ $(echo "$INVALID_TIME_RESPONSE" | jq -r '.error') == *"Invalid alarm configuration"* ]]; then
echo -e "${GREEN}✓ Invalid time format detection working${NC}"
else
echo -e "${RED}✗ Invalid time format detection failed${NC}"
fi
# Test invalid repeat rule
INVALID_REPEAT_DATA=$(echo "$POST_DATA" | jq '.repeat_rule.type = "monthly"')
INVALID_REPEAT_RESPONSE=$(curl -s -X POST -H "Content-Type: application/json" -d "$INVALID_REPEAT_DATA" "$API_URL")
if [[ $(echo "$INVALID_REPEAT_RESPONSE" | jq -r '.error') == *"Invalid alarm configuration"* ]]; then
echo -e "${GREEN}✓ Invalid repeat rule detection working${NC}"
else
echo -e "${RED}✗ Invalid repeat rule detection failed${NC}"
fi
# 8. Delete the test alarm
print_status "Deleting test alarm"
DELETE_RESPONSE=$(curl -s -X DELETE -H "Content-Type: application/json" -d "{\"id\":$NEW_ALARM_ID}" "$API_URL")
if [[ $(echo "$DELETE_RESPONSE" | jq -r '.data.message') == "Alarm removed successfully" ]]; then
echo -e "${GREEN}✓ Alarm deletion successful${NC}"
else
echo -e "${RED}✗ Alarm deletion failed${NC}"
echo "Response: $DELETE_RESPONSE"
fi
# 9. Verify deletion
print_status "Verifying deletion"
FINAL_STATE=$(curl -s -X GET "$API_URL")
if [[ $(echo "$FINAL_STATE" | jq ".data[] | select(.id == $NEW_ALARM_ID)") == "" ]]; then
echo -e "${GREEN}✓ Deletion verification successful${NC}"
else
echo -e "${RED}✗ Deletion verification failed${NC}"
echo "Current state: $FINAL_STATE"
fi
print_status "Test suite completed!"

View File

@ -134,17 +134,6 @@ class AlertApi(BaseHTTPRequestHandler):
def run(server_class=HTTPServer, handler_class=AlertApi, port=8000): def run(server_class=HTTPServer, handler_class=AlertApi, port=8000):
# Set up logging configuration
logging.basicConfig(
level=logging.DEBUG, # Set to DEBUG to show all log levels
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.StreamHandler(), # Console handler
logging.FileHandler('alert_api.log') # File handler
]
)
logger = logging.getLogger('AlertApi')
logger.info(f"Starting AlertApi on port {port}") logger.info(f"Starting AlertApi on port {port}")
server_address = ("", port) server_address = ("", port)

260
clock/alarm_siren.py Normal file
View File

@ -0,0 +1,260 @@
import os
import time
import threading
import subprocess
import queue
import logging
from datetime import datetime, timedelta
from typing import Optional, Dict, Any
from logging_config import setup_logging
# Set up logging
logger = setup_logging()
class AlarmSiren:
def __init__(self):
# Communication queues
self.alarm_queue = queue.Queue()
self.control_queue = queue.Queue()
# Tracking active alarms
self.active_alarms: Dict[int, Dict[str, Any]] = {}
# Playback thread
self.playback_thread = threading.Thread(target=self._playback_worker, daemon=True)
self.playback_thread.start()
def schedule_alarm(self, alarm_config: Dict[str, Any]):
"""Schedule an alarm based on its configuration"""
logger.info(f"Scheduling alarm: {alarm_config}")
self.alarm_queue.put(alarm_config)
def _calculate_next_alarm_time(self, alarm_config: Dict[str, Any]) -> Optional[datetime]:
"""Calculate the next alarm trigger time based on repeat rule"""
now = datetime.now()
current_time = now.time()
# Parse alarm time
alarm_time = datetime.strptime(alarm_config['time'], "%H:%M:%S").time()
# Determine the next trigger
repeat_rule = alarm_config['repeat_rule']
# Determine repeat rule type and details
if isinstance(repeat_rule, dict):
repeat_type = repeat_rule.get('type')
repeat_days = repeat_rule.get('days_of_week', [])
repeat_at = repeat_rule.get('at')
else:
# Assume it's an object-like structure with attributes
repeat_type = getattr(repeat_rule, 'type', None)
repeat_days = getattr(repeat_rule, 'days_of_week', [])
repeat_at = getattr(repeat_rule, 'at', None)
# Sanity check
if repeat_type is None:
logger.error(f"Invalid repeat rule configuration: {repeat_rule}")
return None
if repeat_type == 'once':
# For one-time alarm, check the specific date
try:
# If 'at' is None, use current date
if repeat_at is None:
specific_date = now.date()
else:
specific_date = datetime.strptime(repeat_at, "%d.%m.%Y").date()
return datetime.combine(specific_date, alarm_time)
except ValueError:
logger.error("Invalid one-time alarm configuration")
return None
elif repeat_type == 'daily':
# Daily alarm - trigger today or tomorrow
next_trigger = datetime.combine(now.date(), alarm_time)
if current_time < alarm_time:
return next_trigger
return next_trigger + timedelta(days=1)
elif repeat_type == 'weekly':
# Weekly alarm - check configured days
today = now.strftime("%A").lower()
configured_days = [day.lower() for day in repeat_days]
if today in configured_days and current_time < alarm_time:
return datetime.combine(now.date(), alarm_time)
# Find next configured day
days_order = ['monday', 'tuesday', 'wednesday', 'thursday', 'friday', 'saturday', 'sunday']
current_index = days_order.index(today)
for offset in range(1, 8):
next_day_index = (current_index + offset) % 7
next_day = days_order[next_day_index]
if next_day in configured_days:
next_date = now.date() + timedelta(days=offset)
return datetime.combine(next_date, alarm_time)
return None
def _play_audio(self, file_path: str, volume: int = 100):
"""Play audio file using mpg123 in the background."""
try:
if not os.path.exists(file_path):
logger.error(f"Audio file not found: {file_path}")
return None
# Construct mpg123 command with volume control
volume_adjust = f"-g {volume}"
cmd = ["mpg123", volume_adjust, file_path]
logger.info(f"Playing alarm: {file_path}")
# Run mpg123 in the background, suppressing stdout/stderr
process = subprocess.Popen(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
return process
except Exception as e:
logger.error(f"Error playing audio: {e}")
return None
def _playback_worker(self):
"""Background thread for managing alarm playback"""
while True:
try:
# Check for new alarms to schedule
try:
new_alarm = self.alarm_queue.get(timeout=1)
alarm_time = self._calculate_next_alarm_time(new_alarm)
if alarm_time:
alarm_id = new_alarm.get('id', id(new_alarm))
self.active_alarms[alarm_id] = {
'config': new_alarm,
'trigger_time': alarm_time,
'snooze_count': 0,
'process': None
}
except queue.Empty:
pass
# Check for control signals (snooze/dismiss)
try:
control_msg = self.control_queue.get(timeout=0.1)
alarm_id = control_msg.get('alarm_id')
if control_msg['type'] == 'snooze':
self.snooze_alarm(alarm_id)
elif control_msg['type'] == 'dismiss':
self.dismiss_alarm(alarm_id)
except queue.Empty:
pass
# Check for alarms to trigger
now = datetime.now()
for alarm_id, alarm_info in list(self.active_alarms.items()):
# Check if alarm is more than 1 hour late
if now > alarm_info['trigger_time'] + timedelta(hours=1):
logger.warning(f"Alarm {alarm_id} is over 1 hour late. Disabling.")
del self.active_alarms[alarm_id]
continue
if now >= alarm_info['trigger_time']:
# Trigger alarm if not already active
if alarm_info['process'] is None:
alarm_info['process'] = self._play_audio(
alarm_info['config']['file_to_play'],
alarm_info['config'].get('metadata', {}).get('volume', 100)
)
logger.info(f"Alarm {alarm_id} triggered at {now}.")
# Notify UI about the triggered alarm
self.control_queue.put({
'type': 'trigger',
'alarm_id': alarm_id,
'info': alarm_info
})
# Handle alarms that have been snoozed or dismissed
if alarm_info['process'] and alarm_info['process'].poll() is not None:
# Process has finished naturally
next_trigger = self._calculate_next_alarm_time(alarm_info['config'])
if next_trigger:
alarm_info['trigger_time'] = next_trigger
alarm_info['process'] = None
else:
logger.info(f"Removing non-repeating alarm {alarm_id}.")
del self.active_alarms[alarm_id]
# Actively clean up zombie processes
for alarm_id, alarm_info in list(self.active_alarms.items()):
process = alarm_info.get('process')
if process and process.poll() is not None:
# Remove terminated processes
alarm_info['process'] = None
time.sleep(0.5) # Prevent tight loop
except Exception as e:
logger.error(f"Error in playback worker: {e}")
time.sleep(1)
def snooze_alarm(self, alarm_id: int):
"""Snooze a specific alarm"""
if alarm_id not in self.active_alarms:
logger.warning(f"Cannot snooze alarm {alarm_id} - not found in active alarms")
return False
alarm_info = self.active_alarms[alarm_id]
alarm_config = alarm_info['config']
# Default snooze configuration if not provided
snooze_config = alarm_config.get('snooze', {
'enabled': True,
'duration': 5, # Default 5 minutes
'max_count': 3 # Default max 3 snoozes
})
if not snooze_config.get('enabled', True):
logger.warning(f"Snooze not enabled for alarm {alarm_id}")
return False
# Check snooze count
if alarm_info.get('snooze_count', 0) >= snooze_config['max_count']:
logger.warning(f"Maximum snooze count reached for alarm {alarm_id}")
return False
# Increment snooze count and set next trigger time
alarm_info['snooze_count'] = alarm_info.get('snooze_count', 0) + 1
snooze_duration = snooze_config.get('duration', 5)
alarm_info['trigger_time'] = datetime.now() + timedelta(minutes=snooze_duration)
# Stop any active playback
process = alarm_info.get('process')
if process:
process.terminate()
process.wait()
alarm_info['process'] = None
logger.info(f"Snoozed alarm {alarm_id} for {snooze_duration} minutes.")
return True
def dismiss_alarm(self, alarm_id: int):
"""Dismiss a specific alarm."""
if alarm_id not in self.active_alarms:
logger.warning(f"Cannot dismiss alarm {alarm_id} - not found in active alarms")
return False
# Stop playback and terminate any running process
alarm_info = self.active_alarms.pop(alarm_id, None)
if alarm_info and alarm_info.get('process'):
try:
alarm_info['process'].terminate()
alarm_info['process'].wait(timeout=2)
except Exception as e:
logger.error(f"Error terminating alarm process: {e}")
# Force kill if terminate fails
try:
alarm_info['process'].kill()
except Exception:
pass
logger.info(f"Dismissed alarm {alarm_id}.")
return True

View File

@ -4,20 +4,10 @@ from typing import List, Optional, Dict, Any
from datetime import datetime from datetime import datetime
import os import os
import re import re
import logging from logging_config import setup_logging
# Set up logging configuration
logging.basicConfig(
level=logging.DEBUG, # Set to DEBUG to show all log levels
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.StreamHandler(), # Console handler
logging.FileHandler('alert_api.log') # File handler
]
)
logger = logging.getLogger('AlertApi')
# Set up logging
logger = setup_logging()
@dataclass @dataclass
class RepeatRule: class RepeatRule:
@ -28,13 +18,22 @@ class RepeatRule:
def validate(self) -> bool: def validate(self) -> bool:
"""Validate repeat rule configuration""" """Validate repeat rule configuration"""
valid_types = {"daily", "weekly", "once"} valid_types = {"daily", "weekly", "once"}
valid_days = {"monday", "tuesday", "wednesday", "thursday", "friday", "saturday", "sunday"} valid_days = {"mon", "tue", "wed", "thu", "fri", "sat", "sun"}
if self.type not in valid_types: if self.type not in valid_types:
logger.error(f"Invalid RepeatRule type: '{self.type}'. Must be one of {valid_types}") logger.error(f"Invalid RepeatRule type: '{self.type}'. Must be one of {valid_types}")
return False return False
if self.type == "weekly": if self.type == "weekly":
if self.days_of_week is None:
logger.error("days_of_week is None")
return False
if not self.days_of_week:
logger.error("days_of_week is empty")
return False
invalid_days = [day for day in self.days_of_week if day.lower() not in valid_days] invalid_days = [day for day in self.days_of_week if day.lower() not in valid_days]
if invalid_days: if invalid_days:
logger.error(f"Invalid RepeatRule weekday names: {invalid_days}. Must be one of {valid_days}") logger.error(f"Invalid RepeatRule weekday names: {invalid_days}. Must be one of {valid_days}")

View File

@ -5,14 +5,16 @@ import sys
import signal import signal
import threading import threading
import logging import logging
import curses
from http.server import HTTPServer from http.server import HTTPServer
from multiprocessing import Queue from multiprocessing import Queue
import subprocess
# Import our custom modules # Import our custom modules
from alarm_api import AlertApi, run as run_api from alarm_api import AlertApi, run as run_api
from alarm_storage import AlarmStorage from alarm_storage import AlarmStorage
from alarm_siren import AlarmSiren from alarm_siren import AlarmSiren
from ncurses_ui import UI from ui.ncurses_ui import UI
from logging_config import setup_logging from logging_config import setup_logging
# Set up logging # Set up logging
@ -56,7 +58,7 @@ class AlarmSystemManager:
self._sync_alarms() self._sync_alarms()
# UI.. # UI..
self.ui = UI(self) self.ui = UI(self, control_queue=self.siren.control_queue)
def _setup_signal_handlers(self): def _setup_signal_handlers(self):
"""Set up signal handlers for graceful shutdown""" """Set up signal handlers for graceful shutdown"""
@ -153,6 +155,12 @@ class AlarmSystemManager:
if self.api_thread and self.api_thread.is_alive(): if self.api_thread and self.api_thread.is_alive():
self.api_thread.join(timeout=2) self.api_thread.join(timeout=2)
# Kill any remaining mpg123 processes
try:
subprocess.run(['pkill', 'mpg123'], check=False)
except Exception as e:
logger.error(f"Error killing mpg123 processes: {e}")
logger.info("Alarm System shutdown complete") logger.info("Alarm System shutdown complete")
def main(): def main():

173
clock/tests.sh Normal file
View File

@ -0,0 +1,173 @@
#!/bin/bash
# Configuration
API_URL="http://localhost:8000"
TEST_ALARM_NAME="Test Alarm"
TEST_ALARM_TIME="08:30:00"
TEST_AUDIO_FILE="/tmp/test.mp3"
# Color codes for output
GREEN='\033[0;32m'
RED='\033[0;31m'
BLUE='\033[0;34m'
NC='\033[0m' # No Color
# Helper function to print status messages
print_status() {
echo -e "${BLUE}=== $1 ===${NC}"
}
# Helper function to check test results
check_result() {
if [ $? -eq 0 ]; then
echo -e "${GREEN}$1${NC}"
else
echo -e "${RED}$1${NC}"
exit 1
fi
}
# 1. Test server connectivity
print_status "Testing server connectivity"
curl -s "$API_URL" > /dev/null
check_result "Server connectivity check"
# 2. Get initial state
print_status "Getting initial state"
INITIAL_STATE=$(curl -s -X GET "$API_URL")
echo "$INITIAL_STATE" | jq .
check_result "Retrieved initial state"
# 3. Add a new alarm (POST)
print_status "Adding new alarm"
POST_DATA=$(jq -n \
--arg name "$TEST_ALARM_NAME" \
--arg time "$TEST_ALARM_TIME" \
'{
"name": $name,
"time": $time,
"repeat_rule": {
"type": "weekly",
"days_of_week": ["monday", "wednesday", "friday"]
},
"enabled": true,
"snooze": {
"enabled": true,
"duration": 5,
"max_count": 3
},
"metadata": {
"volume": 75,
"notes": "Test alarm with full configuration"
}
}'
)
ADD_RESPONSE=$(curl -s -X POST -H "Content-Type: application/json" -d "$POST_DATA" "$API_URL")
NEW_ALARM_ID=$(echo "$ADD_RESPONSE" | jq -r '.data.id')
if [[ "$NEW_ALARM_ID" == "null" || -z "$NEW_ALARM_ID" ]]; then
echo -e "${RED}Failed to add alarm${NC}"
echo "Response: $ADD_RESPONSE"
exit 1
fi
echo -e "${GREEN}Added alarm with ID: $NEW_ALARM_ID${NC}"
#j# 4. Test duplicate alarm detection
#jprint_status "Testing duplicate alarm detection"
#jDUPLICATE_RESPONSE=$(curl -s -X POST -H "Content-Type: application/json" -d "$POST_DATA" "$API_URL")
#jif [[ $(echo "$DUPLICATE_RESPONSE" | jq -r '.error') == *"Duplicate alarm detected"* ]]; then
#j echo -e "${GREEN}✓ Duplicate detection working${NC}"
#jelse
#j echo -e "${RED}✗ Duplicate detection failed${NC}"
#j echo "Response: $DUPLICATE_RESPONSE"
#jfi
#j
#j# 5. Update the alarm (PUT)
#jprint_status "Updating alarm"
#jUPDATE_DATA=$(jq -n \
#j --arg name "$TEST_ALARM_NAME Updated" \
#j --arg time "$TEST_ALARM_TIME" \
#j --argjson id "$NEW_ALARM_ID" \
#j '{
#j "id": $id,
#j "name": $name,
#j "time": $time,
#j "repeat_rule": {
#j "type": "daily"
#j },
#j "enabled": true,
#j "snooze": {
#j "enabled": false,
#j "duration": 10,
#j "max_count": 2
#j },
#j "metadata": {
#j "volume": 90,
#j "notes": "Updated test alarm"
#j }
#j }'
#j)
#j
#jUPDATE_RESPONSE=$(curl -s -X PUT -H "Content-Type: application/json" -d "$UPDATE_DATA" "$API_URL")
#jif [[ $(echo "$UPDATE_RESPONSE" | jq -r '.data.message') == "Alarm updated successfully" ]]; then
#j echo -e "${GREEN}✓ Alarm update successful${NC}"
#jelse
#j echo -e "${RED}✗ Alarm update failed${NC}"
#j echo "Response: $UPDATE_RESPONSE"
#jfi
#j
#j# 6. Verify the update
#jprint_status "Verifying update"
#jUPDATED_STATE=$(curl -s -X GET "$API_URL")
#jUPDATED_ALARM=$(echo "$UPDATED_STATE" | jq -r ".data[] | select(.id == $NEW_ALARM_ID)")
#jif [[ $(echo "$UPDATED_ALARM" | jq -r '.name') == "$TEST_ALARM_NAME Updated" ]]; then
#j echo -e "${GREEN}✓ Update verification successful${NC}"
#jelse
#j echo -e "${RED}✗ Update verification failed${NC}"
#j echo "Current alarm state: $UPDATED_ALARM"
#jfi
#j
#j# 7. Test invalid inputs
#jprint_status "Testing invalid inputs"
#j
#j# Test invalid time format
#jINVALID_TIME_DATA=$(echo "$POST_DATA" | jq '. + {"time": "25:00:00"}')
#jINVALID_TIME_RESPONSE=$(curl -s -X POST -H "Content-Type: application/json" -d "$INVALID_TIME_DATA" "$API_URL")
#jif [[ $(echo "$INVALID_TIME_RESPONSE" | jq -r '.error') == *"Invalid alarm configuration"* ]]; then
#j echo -e "${GREEN}✓ Invalid time format detection working${NC}"
#jelse
#j echo -e "${RED}✗ Invalid time format detection failed${NC}"
#jfi
#j
#j# Test invalid repeat rule
#jINVALID_REPEAT_DATA=$(echo "$POST_DATA" | jq '.repeat_rule.type = "monthly"')
#jINVALID_REPEAT_RESPONSE=$(curl -s -X POST -H "Content-Type: application/json" -d "$INVALID_REPEAT_DATA" "$API_URL")
#jif [[ $(echo "$INVALID_REPEAT_RESPONSE" | jq -r '.error') == *"Invalid alarm configuration"* ]]; then
#j echo -e "${GREEN}✓ Invalid repeat rule detection working${NC}"
#jelse
#j echo -e "${RED}✗ Invalid repeat rule detection failed${NC}"
#jfi
#j
#j# 8. Delete the test alarm
#jprint_status "Deleting test alarm"
#jDELETE_RESPONSE=$(curl -s -X DELETE -H "Content-Type: application/json" -d "{\"id\":$NEW_ALARM_ID}" "$API_URL")
#jif [[ $(echo "$DELETE_RESPONSE" | jq -r '.data.message') == "Alarm removed successfully" ]]; then
#j echo -e "${GREEN}✓ Alarm deletion successful${NC}"
#jelse
#j echo -e "${RED}✗ Alarm deletion failed${NC}"
#j echo "Response: $DELETE_RESPONSE"
#jfi
#j
#j# 9. Verify deletion
#jprint_status "Verifying deletion"
#jFINAL_STATE=$(curl -s -X GET "$API_URL")
#jif [[ $(echo "$FINAL_STATE" | jq ".data[] | select(.id == $NEW_ALARM_ID)") == "" ]]; then
#j echo -e "${GREEN}✓ Deletion verification successful${NC}"
#jelse
#j echo -e "${RED}✗ Deletion verification failed${NC}"
#j echo "Current state: $FINAL_STATE"
#jfi
#j
#jprint_status "Test suite completed!"

0
clock/ui/__init__.py Normal file
View File

115
clock/ui/active_alarm.py Normal file
View File

@ -0,0 +1,115 @@
import curses
from datetime import datetime
from .utils import init_colors, draw_big_digit
class ActiveAlarmView:
def __init__(self, storage, control_queue):
self.storage = storage
self.control_queue = control_queue
self.active_alarms = {}
def reset_state(self):
"""Reset the view state"""
self.active_alarms.clear()
def draw(self, stdscr):
"""Draw the active alarm screen"""
init_colors()
height, width = stdscr.getmaxyx()
current_time = datetime.now()
self._draw_main_clock(stdscr, height, width, current_time)
self._draw_active_alarm_info(stdscr, height, width)
self._draw_instructions(stdscr, height, width)
def handle_input(self, key):
"""Handle user input and return the next view name or None to stay"""
if not self.active_alarms:
return 'CLOCK'
alarm_id = list(self.active_alarms.keys())[0]
if key == ord('s'):
self._handle_snooze(alarm_id)
return None
elif key == ord('d'):
self._handle_dismiss(alarm_id)
return 'CLOCK'
return None
def update_active_alarms(self, active_alarms):
"""Update the active alarms state"""
self.active_alarms = active_alarms
def _draw_main_clock(self, stdscr, height, width, current_time):
"""Draw the main clock display"""
time_str = current_time.strftime("%H:%M:%S")
digit_width = 14
total_width = digit_width * len(time_str)
start_x = (width - total_width) // 2
start_y = (height - 7) // 2 - 4
# Draw blinking dot
if int(current_time.timestamp()) % 2 == 0:
stdscr.attron(curses.color_pair(1))
stdscr.addstr(start_y - 1, start_x + total_width - 2, "")
stdscr.attroff(curses.color_pair(1))
# Draw big time digits
stdscr.attron(curses.color_pair(1))
for i, digit in enumerate(time_str):
draw_big_digit(stdscr, start_y, start_x + i * digit_width, digit)
stdscr.attroff(curses.color_pair(1))
# Draw date
date_str = current_time.strftime("%Y-%m-%d")
date_x = width // 2 - len(date_str) // 2
date_y = height // 2 + 4
stdscr.attron(curses.color_pair(2))
stdscr.addstr(date_y, date_x, date_str)
stdscr.attroff(curses.color_pair(2))
def _draw_active_alarm_info(self, stdscr, height, width):
"""Draw information about the active alarm"""
if not self.active_alarms:
return
alarm_id = list(self.active_alarms.keys())[0]
alarm_info = self.active_alarms[alarm_id]
alarm_config = alarm_info['config']
alarm_name = alarm_config.get('name', 'Unnamed Alarm')
alarm_time = alarm_config.get('time', 'Unknown Time')
alarm_str = f"[ {alarm_name} - {alarm_time} ]"
# Position alarm info above the date
date_y = height // 2 + 4 # Same as in _draw_main_clock
alarm_y = date_y - 2
alarm_x = max(0, width // 2 - len(alarm_str) // 2)
stdscr.attron(curses.color_pair(1))
stdscr.addstr(alarm_y, alarm_x, alarm_str)
stdscr.attroff(curses.color_pair(1))
def _draw_instructions(self, stdscr, height, width):
"""Draw the user instructions"""
if self.active_alarms:
instructions = "S: Snooze D: Dismiss"
stdscr.addstr(height - 2, width // 2 - len(instructions) // 2, instructions)
def _handle_snooze(self, alarm_id):
"""Handle snoozing the alarm"""
self.control_queue.put({
'type': 'snooze',
'alarm_id': alarm_id
})
del self.active_alarms[alarm_id]
def _handle_dismiss(self, alarm_id):
"""Handle dismissing the alarm"""
self.control_queue.put({
'type': 'dismiss',
'alarm_id': alarm_id
})
del self.active_alarms[alarm_id]

298
clock/ui/add_alarm.py Normal file
View File

@ -0,0 +1,298 @@
import curses
from datetime import datetime, date, timedelta
class AddAlarmView:
def __init__(self, storage, control_queue):
self.storage = storage
self.control_queue = control_queue
self.weekday_names = ['Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat', 'Sun']
self.reset_state()
def set_alarm_data(self, alarm):
"""Load existing alarm data for editing."""
self.alarm_data = alarm # Store for pre-filling the form
# Debug log to inspect the alarm structure
import logging
logging.getLogger(__name__).debug(f"set_alarm_data received: {alarm}")
# Ensure alarm is a dictionary
if not isinstance(alarm, dict):
logging.getLogger(__name__).error(f"Invalid alarm format: {type(alarm)}")
# Parse time string
hour, minute, _ = map(int, alarm['time'].split(':')) # Extract hour and minute return
# Pre-fill the form fields with alarm data
self.alarm_draft = {
'hour': hour,
'minute': minute,
'name': alarm.get('name', 'Unnamed Alarm'),
'enabled': alarm.get('enabled', True),
'date': None, # Your structure doesn't use date directly
'weekdays': alarm.get('repeat_rule', {}).get('days_of_week', []),
'current_weekday': 0,
'editing_name': False,
'temp_name': alarm.get('name', 'Unnamed Alarm'),
'selected_item': 0
}
def reset_state(self):
"""Reset all state variables to their initial values"""
self.alarm_draft = {
'hour': datetime.now().hour,
'minute': datetime.now().minute,
'name': 'New Alarm',
'enabled': True,
'date': None,
'weekdays': [],
'current_weekday': 0,
'editing_name': False,
'temp_name': '',
'selected_item': 0
}
self.date_edit_pos = 2 # Default to editing the day
def draw(self, stdscr):
"""Draw the add alarm screen"""
height, width = stdscr.getmaxyx()
form_y = height // 2 - 8
self._draw_title(stdscr, form_y, width)
self._draw_time_field(stdscr, form_y + 2, width)
self._draw_date_field(stdscr, form_y + 4, width)
self._draw_weekdays(stdscr, form_y + 6, width)
self._draw_name_field(stdscr, form_y + 8, width)
self._draw_status_field(stdscr, form_y + 10, width)
self._draw_instructions(stdscr, height - 2, width)
def handle_input(self, key):
"""Handle user input and return the next view name or None to stay"""
if key == 27: # ESC
return self._handle_escape()
elif key == 10: # ENTER
return self._handle_enter()
if not self.alarm_draft['editing_name']:
if key in [ord('h'), curses.KEY_LEFT]:
self.alarm_draft['selected_item'] = (self.alarm_draft['selected_item'] - 1) % 6
elif key in [ord('l'), curses.KEY_RIGHT]:
self.alarm_draft['selected_item'] = (self.alarm_draft['selected_item'] + 1) % 6
self._handle_field_input(key)
return None
def _handle_field_input(self, key):
"""Handle input for the currently selected field"""
selected_item = self.alarm_draft['selected_item']
if key in [ord('k'), curses.KEY_UP, ord('j'), curses.KEY_DOWN]:
is_up = key in [ord('k'), curses.KEY_UP]
if selected_item == 0:
self._adjust_hour(is_up)
elif selected_item == 1:
self._adjust_minute(is_up)
elif selected_item == 2:
self._adjust_date(is_up)
elif selected_item == 3:
self._handle_weekday_input(key)
elif selected_item == 4:
self._handle_name_input(key)
elif selected_item == 5 and key == 32:
self.alarm_draft['enabled'] = not self.alarm_draft['enabled']
def _draw_title(self, stdscr, y, width):
"""Draw the title of the form"""
title = "Add New Alarm"
stdscr.attron(curses.color_pair(1) | curses.A_BOLD)
stdscr.addstr(y, width // 2 - len(title) // 2, title)
stdscr.attroff(curses.color_pair(1) | curses.A_BOLD)
def _draw_field(self, stdscr, y, label, value, is_selected):
"""Draw a form field with label and value"""
label_str = f"{label}: "
x = self._center_text_x(label_str + str(value))
stdscr.attron(curses.color_pair(1))
stdscr.addstr(y, x, label_str)
stdscr.attroff(curses.color_pair(1))
if is_selected:
stdscr.attron(curses.color_pair(2))
stdscr.addstr(y, x + len(label_str), str(value))
if is_selected:
stdscr.attroff(curses.color_pair(2))
def _draw_time_field(self, stdscr, y, width):
"""Draw the time field"""
self._draw_field(stdscr, y, "Time",
f"{self.alarm_draft['hour']:02d}:{self.alarm_draft['minute']:02d}",
self.alarm_draft['selected_item'] in [0, 1])
def _draw_date_field(self, stdscr, y, width):
"""Draw the date field"""
date_str = "Repeating weekly" if self.alarm_draft['weekdays'] else (
self.alarm_draft['date'].strftime("%Y-%m-%d") if self.alarm_draft['date'] else "None"
)
self._draw_field(stdscr, y, "Date", date_str,
self.alarm_draft['selected_item'] == 2)
def _draw_weekdays(self, stdscr, y, width):
"""Draw the weekday selection field"""
label_x = width // 2 - 20
stdscr.attron(curses.color_pair(1))
stdscr.addstr(y, label_x, "Repeat: ")
stdscr.attroff(curses.color_pair(1))
weekday_x = label_x + len("Repeat: ")
for i, day in enumerate(self.weekday_names):
self._draw_weekday(stdscr, y, weekday_x + i * 4, day, i)
def _draw_weekday(self, stdscr, y, x, day, index):
"""Draw a single weekday"""
is_selected = (self.alarm_draft['selected_item'] == 3 and
index == self.alarm_draft['current_weekday'])
is_active = index in self.alarm_draft['weekdays']
if is_selected:
stdscr.attron(curses.color_pair(2))
elif is_active:
stdscr.attron(curses.color_pair(1) | curses.A_BOLD)
else:
stdscr.attron(curses.color_pair(1))
stdscr.addstr(y, x, day)
if is_selected:
stdscr.attroff(curses.color_pair(2))
elif is_active:
stdscr.attroff(curses.color_pair(1) | curses.A_BOLD)
else:
stdscr.attroff(curses.color_pair(1))
def _draw_name_field(self, stdscr, y, width):
"""Draw the name field"""
self._draw_field(stdscr, y, "Name", self.alarm_draft['name'],
self.alarm_draft['selected_item'] == 4)
def _draw_status_field(self, stdscr, y, width):
"""Draw the enabled/disabled status field"""
enabled_str = "● Enabled" if self.alarm_draft['enabled'] else "○ Disabled"
self._draw_field(stdscr, y, "Status", enabled_str,
self.alarm_draft['selected_item'] == 5)
def _draw_instructions(self, stdscr, y, width):
"""Draw the instructions at the bottom of the screen"""
instructions = "j/k: Change h/l: Move Space: Toggle Enter: Save Esc: Cancel"
stdscr.attron(curses.color_pair(1))
stdscr.addstr(y, width // 2 - len(instructions) // 2, instructions)
stdscr.attroff(curses.color_pair(1))
def _center_text_x(self, text):
"""Calculate x position to center text"""
return curses.COLS // 2 - len(text) // 2
def _adjust_hour(self, increase):
"""Adjust the hour value"""
delta = 1 if increase else -1
self.alarm_draft['hour'] = (self.alarm_draft['hour'] + delta) % 24
def _adjust_minute(self, increase):
"""Adjust the minute value"""
delta = 1 if increase else -1
self.alarm_draft['minute'] = (self.alarm_draft['minute'] + delta) % 60
def _adjust_date(self, increase):
"""Adjust the date value"""
if not self.alarm_draft['date']:
self.alarm_draft['date'] = datetime.now().date()
return
delta = 1 if increase else -1
current_date = self.alarm_draft['date']
try:
if self.date_edit_pos == 0: # Year
new_year = max(current_date.year + delta, datetime.now().year)
self.alarm_draft['date'] = current_date.replace(year=new_year)
elif self.date_edit_pos == 1: # Month
new_month = max(1, min(12, current_date.month + delta))
max_day = (datetime(current_date.year, new_month, 1) +
timedelta(days=31)).replace(day=1) - timedelta(days=1)
self.alarm_draft['date'] = current_date.replace(
month=new_month,
day=min(current_date.day, max_day.day)
)
elif self.date_edit_pos == 2: # Day
max_day = (datetime(current_date.year, current_date.month, 1) +
timedelta(days=31)).replace(day=1) - timedelta(days=1)
new_day = max(1, min(max_day.day, current_date.day + delta))
self.alarm_draft['date'] = current_date.replace(day=new_day)
except ValueError as e:
# Handle date validation errors
pass
def _handle_weekday_input(self, key):
"""Handle input for weekday selection"""
if key in [ord('h'), curses.KEY_LEFT]:
self.alarm_draft['current_weekday'] = (self.alarm_draft['current_weekday'] - 1) % 7
elif key in [ord('l'), curses.KEY_RIGHT]:
self.alarm_draft['current_weekday'] = (self.alarm_draft['current_weekday'] + 1) % 7
elif key == 32: # SPACE
current_day = self.alarm_draft['current_weekday']
if current_day in self.alarm_draft['weekdays']:
self.alarm_draft['weekdays'].remove(current_day)
else:
self.alarm_draft['weekdays'].append(current_day)
self.alarm_draft['weekdays'].sort()
if self.alarm_draft['weekdays']:
self.alarm_draft['date'] = None
def _handle_name_input(self, key):
"""Handle input for name editing"""
if key == 32: # SPACE
if not self.alarm_draft['editing_name']:
self.alarm_draft['editing_name'] = True
self.alarm_draft['temp_name'] = self.alarm_draft['name']
self.alarm_draft['name'] = ''
elif self.alarm_draft['editing_name']:
if key == curses.KEY_BACKSPACE or key == 127:
self.alarm_draft['name'] = self.alarm_draft['name'][:-1]
elif 32 <= key <= 126: # Printable ASCII
self.alarm_draft['name'] += chr(key)
def _handle_escape(self):
"""Handle escape key press"""
if self.alarm_draft['editing_name']:
self.alarm_draft['name'] = self.alarm_draft['temp_name']
self.alarm_draft['editing_name'] = False
return None
return 'CLOCK'
def _handle_enter(self):
"""Handle enter key press"""
if self.alarm_draft['editing_name']:
self.alarm_draft['editing_name'] = False
self.alarm_draft['selected_item'] = 0
return None
try:
alarm_data = {
"name": self.alarm_draft['name'],
"time": f"{self.alarm_draft['hour']:02d}:{self.alarm_draft['minute']:02d}:00",
"enabled": self.alarm_draft['enabled'],
"repeat_rule": {
"type": "weekly" if self.alarm_draft['weekdays'] else "once",
"days_of_week": [self.weekday_names[day].lower()
for day in self.alarm_draft['weekdays']],
"at": (self.alarm_draft['date'].strftime("%Y-%m-%d")
if self.alarm_draft['date'] and not self.alarm_draft['weekdays']
else None)
}
}
self.storage.save_new_alert(alarm_data)
return 'CLOCK'
except Exception as e:
# Handle save errors
return None

View File

@ -10,13 +10,13 @@ BIG_DIGITS = {
" █████████ " " █████████ "
], ],
'1': [ '1': [
" ███ ", " ███ ",
" █████ ", " █████ ",
" ███ ", " ███ ",
" ███ ", " ███ ",
" ███ ", " ███ ",
" ███ ", " ███ ",
" ███████ " " █████████ "
], ],
'2': [ '2': [
" ██████████ ", " ██████████ ",
@ -98,5 +98,13 @@ BIG_DIGITS = {
" ████ ", " ████ ",
" ████ ", " ████ ",
" " " "
],
'?': [
" ??? ",
" ????? ",
"?? ??",
" ??? ",
" ?? ?? ",
" ? "
] ]
} }

170
clock/ui/list_alarms.py Normal file
View File

@ -0,0 +1,170 @@
import curses
from datetime import datetime
class ListAlarmsView:
def __init__(self, storage):
"""Initialize the list alarms view"""
self.storage = storage
self.weekday_names = ['Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat', 'Sun']
self.selected_index = 0
self.alarms = []
def reset_state(self):
"""Reset the view state"""
self.selected_index = 0
self.alarms = self.storage.get_saved_alerts()
def update_alarms(self, alarms):
"""Update the list of alarms to display."""
self.alarms = alarms
self.selected_index = 0
def get_selected_alarm(self):
"""Get the currently selected alarm."""
if 0 <= self.selected_index < len(self.alarms):
return self.alarms[self.selected_index]
return None
def draw(self, stdscr):
"""Draw the list of alarms screen"""
height, width = stdscr.getmaxyx()
self._draw_header(stdscr, width)
visible_range = self._calculate_visible_range(height)
self._draw_alarm_list(stdscr, height, width, visible_range)
self._draw_instructions(stdscr, height, width)
def handle_input(self, key):
"""Handle user input and return the next view name or None to stay"""
total_items = len(self.alarms) + 1 # +1 for "Add new alarm" option
if key == 27: # ESC
return 'CLOCK'
elif key in [ord('j'), curses.KEY_DOWN]:
self.selected_index = (self.selected_index + 1) % total_items
elif key in [ord('k'), curses.KEY_UP]:
self.selected_index = (self.selected_index - 1) % total_items
elif key == ord('d'):
return self._handle_delete()
elif key in [ord('a'), 10]: # 'a' or Enter
return self._handle_add_edit()
return None
def _calculate_visible_range(self, height):
"""Calculate the visible range for scrolling"""
max_visible_items = height - 8 # Space for header and footer
total_items = len(self.alarms) + 1 # +1 for "Add new alarm" option
# Calculate scroll position
start_idx = max(0, min(self.selected_index - max_visible_items // 2,
total_items - max_visible_items))
if start_idx < 0:
start_idx = 0
end_idx = min(start_idx + max_visible_items, total_items)
return (start_idx, end_idx)
def _draw_header(self, stdscr, width):
"""Draw the header text"""
header_text = "Alarms"
stdscr.attron(curses.color_pair(1) | curses.A_BOLD)
stdscr.addstr(2, self._center_x(width, header_text), header_text)
stdscr.attroff(curses.color_pair(1) | curses.A_BOLD)
def _draw_alarm_list(self, stdscr, height, width, visible_range):
"""Draw the list of alarms"""
start_idx, end_idx = visible_range
for i in range(start_idx, end_idx):
y_pos = 4 + (i - start_idx)
display_str = self._format_alarm_display(i)
# Truncate if too long
max_length = width - 6
if len(display_str) > max_length:
display_str = display_str[:max_length-3] + "..."
x_pos = self._center_x(width, display_str)
self._draw_alarm_item(stdscr, y_pos, x_pos, display_str, i == self.selected_index)
def _format_alarm_display(self, index):
"""Format the display string for an alarm"""
if index == len(self.alarms):
return "Add new alarm..."
alarm = self.alarms[index]
time_str = alarm.get('time', 'Unknown')
repeat_info = self._format_repeat_info(alarm)
status = "" if alarm.get('enabled', True) else ""
return f"{status} {time_str} {alarm.get('name', 'Unnamed')}{repeat_info}"
def _format_repeat_info(self, alarm):
"""Format the repeat information for an alarm"""
repeat_rule = alarm.get('repeat_rule', {})
if not repeat_rule:
return ""
if repeat_rule.get('type') == 'weekly':
days = repeat_rule.get('days', [])
return f" (Every {', '.join(self.weekday_names[d] for d in days)})"
elif repeat_rule.get('type') == 'once' and repeat_rule.get('date'):
return f" (On {repeat_rule['date']})"
return ""
def _draw_alarm_item(self, stdscr, y_pos, x_pos, display_str, is_selected):
"""Draw a single alarm item"""
if is_selected:
# Draw selection brackets in green
stdscr.attron(curses.color_pair(1))
stdscr.addstr(y_pos, x_pos - 2, "[ ")
stdscr.addstr(y_pos, x_pos + len(display_str), " ]")
stdscr.attroff(curses.color_pair(1))
# Draw text in yellow (highlighted)
stdscr.attron(curses.color_pair(2))
stdscr.addstr(y_pos, x_pos, display_str)
stdscr.attroff(curses.color_pair(2))
else:
# Draw normal items in green
stdscr.attron(curses.color_pair(1))
stdscr.addstr(y_pos, x_pos, display_str)
stdscr.attroff(curses.color_pair(1))
def _draw_instructions(self, stdscr, height, width):
"""Draw the instructions at the bottom of the screen"""
instructions = "j/k: Move d: Delete a: Add/Edit Esc: Back"
stdscr.attron(curses.color_pair(1))
stdscr.addstr(height - 2, self._center_x(width, instructions), instructions)
stdscr.attroff(curses.color_pair(1))
def _center_x(self, width, text):
"""Calculate x coordinate to center text"""
return width // 2 - len(text) // 2
def _handle_delete(self):
"""Handle alarm deletion"""
if self.selected_index < len(self.alarms):
try:
alarm_to_delete = self.alarms[self.selected_index]
self.storage.remove_saved_alert(alarm_to_delete['id'])
self.alarms = self.storage.get_saved_alerts()
# Adjust selected item if needed
if self.selected_index >= len(self.alarms):
self.selected_index = len(self.alarms)
except Exception as e:
# You might want to add error handling here
pass
return None
def _handle_add_edit(self):
"""Handle add/edit action"""
if self.selected_index == len(self.alarms):
# "Add new alarm" option selected
return 'ADD_ALARM'
else:
# Edit existing alarm
selected_alarm = self.alarms[self.selected_index]
return ('ADD_ALARM', selected_alarm) # Pass alarm to AddAlarmView

71
clock/ui/main_clock.py Normal file
View File

@ -0,0 +1,71 @@
import curses
from datetime import datetime
from .utils import init_colors, draw_big_digit
from .big_digits import BIG_DIGITS
class MainClockView:
def __init__(self):
"""Initialize the main clock view"""
self.digit_width = 14 # Width of each digit pattern in the big clock display
def draw(self, stdscr):
"""Draw the main clock screen"""
init_colors()
height, width = stdscr.getmaxyx()
current_time = datetime.now()
self._draw_big_time(stdscr, current_time, height, width)
self._draw_date(stdscr, current_time, height, width)
self._draw_menu(stdscr, height, width)
def handle_input(self, key):
"""Handle user input and return the next view name or None to stay"""
if key == ord('a'):
return 'ADD_ALARM'
elif key == ord('s'):
return 'LIST_ALARMS'
elif key == ord('q'):
return 'QUIT'
return None
def _draw_big_time(self, stdscr, current_time, height, width):
"""Draw the big time display"""
time_str = current_time.strftime("%H:%M:%S")
total_width = self.digit_width * len(time_str)
start_x = self._center_x(width, total_width)
start_y = self._center_y(height, 7) - 4 # 7 is the height of digits
# Draw each digit in green
stdscr.attron(curses.color_pair(1))
for i, digit in enumerate(time_str):
self._draw_digit(stdscr, start_y, start_x + i * self.digit_width, digit)
stdscr.attroff(curses.color_pair(1))
def _draw_date(self, stdscr, current_time, height, width):
"""Draw the current date"""
date_str = current_time.strftime("%Y-%m-%d")
date_x = self._center_x(width, len(date_str))
date_y = height // 2 + 4
stdscr.attron(curses.color_pair(2))
stdscr.addstr(date_y, date_x, date_str)
stdscr.attroff(curses.color_pair(2))
def _draw_menu(self, stdscr, height, width):
"""Draw the menu options"""
menu_str = "A: Add Alarm S: List Alarms Q: Quit"
menu_x = self._center_x(width, len(menu_str))
stdscr.addstr(height - 2, menu_x, menu_str)
def _draw_digit(self, stdscr, y, x, digit):
"""Draw a single big digit"""
# Delegate to the existing draw_big_digit utility function
draw_big_digit(stdscr, y, x, digit)
def _center_x(self, width, text_width):
"""Calculate x coordinate to center text horizontally"""
return (width - text_width) // 2
def _center_y(self, height, text_height):
"""Calculate y coordinate to center text vertically"""
return (height - text_height) // 2

179
clock/ui/ncurses_ui.py Normal file
View File

@ -0,0 +1,179 @@
import curses
import time
import threading
import logging
import queue
from datetime import datetime
from .utils import init_colors, draw_error, ColorScheme
from .active_alarm import ActiveAlarmView
from .add_alarm import AddAlarmView
from .list_alarms import ListAlarmsView
from .main_clock import MainClockView
class UI:
def __init__(self, alarm_system_manager, control_queue):
"""Initialize the UI system"""
# System components
self.alarm_system = alarm_system_manager
self.stop_event = alarm_system_manager.stop_event
self.storage = alarm_system_manager.storage
self.control_queue = control_queue
# Logging
self.logger = logging.getLogger(__name__)
# Initialize views
self._init_views()
# UI State
self.current_view = 'CLOCK'
self.error_message = None
self.error_timestamp = None
def _init_views(self):
"""Initialize all view classes"""
self.views = {
'CLOCK': MainClockView(),
'ADD_ALARM': AddAlarmView(self.storage, self.control_queue),
'LIST_ALARMS': ListAlarmsView(self.storage),
'ACTIVE_ALARMS': ActiveAlarmView(self.storage, self.control_queue)
}
def run(self):
"""Start the ncurses UI in a separate thread"""
def ui_thread():
try:
# Start control queue monitor
monitor_thread = threading.Thread(
target=self._monitor_control_queue,
daemon=True
)
monitor_thread.start()
# Start UI
curses.wrapper(self._main_loop)
except Exception as e:
self.logger.error(f"UI Thread Error: {e}")
finally:
self.stop_event.set()
ui_thread_obj = threading.Thread(target=ui_thread, daemon=True)
ui_thread_obj.start()
return ui_thread_obj
def _monitor_control_queue(self):
"""Monitor the control queue for alarm events"""
while not self.stop_event.is_set():
try:
# Non-blocking check of control queue
try:
control_msg = self.control_queue.get(timeout=1)
if control_msg['type'] == 'trigger':
# Update active alarms view
active_view = self.views['ACTIVE_ALARMS']
alarm_id = control_msg['alarm_id']
active_view.update_active_alarms({
alarm_id: control_msg['info']
})
# Switch to active alarms view
self.current_view = 'ACTIVE_ALARMS'
except queue.Empty:
pass
time.sleep(0.1)
except Exception as e:
self.logger.error(f"Error monitoring control queue: {e}")
time.sleep(1)
def _show_error(self, message, duration=3):
"""Display an error message"""
self.error_message = message
self.error_timestamp = time.time()
def _clear_error_if_expired(self):
"""Clear error message if expired"""
if self.error_message and self.error_timestamp:
if time.time() - self.error_timestamp > 3:
self.error_message = None
self.error_timestamp = None
def _main_loop(self, stdscr):
"""Main ncurses event loop"""
# Setup curses
curses.curs_set(0) # Hide cursor
stdscr.keypad(1) # Enable keypad
stdscr.timeout(100) # Non-blocking input
init_colors() # Initialize color pairs
while not self.stop_event.is_set():
# Clear screen
stdscr.erase()
# Get current view
current_view = self.views.get(self.current_view)
if not current_view:
self.logger.error(f"Invalid view: {self.current_view}")
break
try:
# Draw current view
current_view.draw(stdscr)
# Handle any error messages
if self.error_message:
draw_error(stdscr, self.error_message)
self._clear_error_if_expired()
# Refresh screen
stdscr.refresh()
# Handle input
key = stdscr.getch()
if key != -1:
# Handle quit key globally
if key == ord('q'):
if self.current_view == 'CLOCK':
break # Exit application from clock view
else:
self.current_view = 'CLOCK' # Return to clock from other views
continue
# Let current view handle input
result = current_view.handle_input(key)
# Handle tuple result (view, data) or just a view change
if isinstance(result, tuple):
next_view, data = result
else:
next_view, data = result, None
# Handle quitting
if next_view == 'QUIT':
break
elif next_view:
# Update list alarms view data when switching to it
if next_view == 'LIST_ALARMS':
self.views['LIST_ALARMS'].update_alarms(
self.storage.get_saved_alerts()
)
# Handle editing an alarm by passing data to AddAlarmView
elif next_view == 'ADD_ALARM' and data:
self.views['ADD_ALARM'].set_alarm_data(data)
self.current_view = next_view
except Exception as e:
self.logger.error(f"Error in main loop: {e}")
self._show_error(str(e))
time.sleep(0.1) # Prevent CPU hogging
def stop(self):
"""Stop the UI system"""
self.stop_event.set()

138
clock/ui/utils.py Normal file
View File

@ -0,0 +1,138 @@
import curses
from dataclasses import dataclass
from typing import Optional
from .big_digits import BIG_DIGITS
@dataclass
class ColorScheme:
"""Color scheme configuration for the UI"""
PRIMARY = 1 # Green on black
HIGHLIGHT = 2 # Yellow on black
ERROR = 3 # Red on black
class ViewUtils:
"""Common utility functions for view classes"""
@staticmethod
def center_x(width: int, text_width: int) -> int:
"""Calculate x coordinate to center text horizontally"""
return max(0, (width - text_width) // 2)
@staticmethod
def center_y(height: int, text_height: int) -> int:
"""Calculate y coordinate to center text vertically"""
return max(0, (height - text_height) // 2)
@staticmethod
def draw_centered_text(stdscr, y: int, text: str, color_pair: Optional[int] = None, attrs: int = 0):
"""Draw text centered horizontally on the screen with optional color and attributes"""
height, width = stdscr.getmaxyx()
x = ViewUtils.center_x(width, len(text))
if color_pair is not None:
stdscr.attron(curses.color_pair(color_pair) | attrs)
try:
stdscr.addstr(y, x, text)
except curses.error:
pass # Ignore errors from writing at invalid positions
if color_pair is not None:
stdscr.attroff(curses.color_pair(color_pair) | attrs)
def init_colors():
"""Initialize color pairs for the application"""
try:
curses.start_color()
curses.use_default_colors()
# Primary color (green text on black background)
curses.init_pair(ColorScheme.PRIMARY, curses.COLOR_GREEN, curses.COLOR_BLACK)
# Highlight color (yellow text on black background)
curses.init_pair(ColorScheme.HIGHLIGHT, curses.COLOR_YELLOW, curses.COLOR_BLACK)
# Error color (red text on black background)
curses.init_pair(ColorScheme.ERROR, curses.COLOR_RED, curses.COLOR_BLACK)
except Exception as e:
# Log error or handle gracefully if color initialization fails
pass
def draw_error(stdscr, error_message: str, duration_sec: int = 3):
"""
Draw error message at the bottom of the screen
Args:
stdscr: Curses window object
error_message: Message to display
duration_sec: How long the error should be displayed (for reference by caller)
"""
height, width = stdscr.getmaxyx()
# Truncate message if too long
max_width = width - 4
if len(error_message) > max_width:
error_message = error_message[:max_width-3] + "..."
# Position near bottom of screen
error_y = height - 4
ViewUtils.draw_centered_text(
stdscr,
error_y,
error_message,
ColorScheme.ERROR,
curses.A_BOLD
)
def draw_big_digit(stdscr, y: int, x: int, digit: str):
"""
Draw a large digit using the predefined patterns
Args:
stdscr: Curses window object
y: Starting y coordinate
x: Starting x coordinate
digit: Character to draw ('0'-'9', ':', etc)
"""
try:
patterns = BIG_DIGITS.get(digit, BIG_DIGITS['?'])
for i, line in enumerate(patterns):
try:
stdscr.addstr(y + i, x, line)
except curses.error:
continue # Skip lines that would write outside the window
except (curses.error, IndexError):
pass # Ignore any drawing errors
def safe_addstr(stdscr, y: int, x: int, text: str, color_pair: Optional[int] = None, attrs: int = 0):
"""
Safely add a string to the screen, handling boundary conditions
Args:
stdscr: Curses window object
y: Y coordinate
x: X coordinate
text: Text to draw
color_pair: Optional color pair number
attrs: Additional curses attributes
"""
height, width = stdscr.getmaxyx()
# Check if the position is within bounds
if y < 0 or y >= height or x < 0 or x >= width:
return
# Truncate text if it would extend beyond screen width
if x + len(text) > width:
text = text[:width - x]
try:
if color_pair is not None:
stdscr.attron(curses.color_pair(color_pair) | attrs)
stdscr.addstr(y, x, text)
if color_pair is not None:
stdscr.attroff(curses.color_pair(color_pair) | attrs)
except curses.error:
pass # Ignore any drawing errors