commit 47745e7fd5b0cf4bc1fa5b8b8fd06288743ee422 Author: Kalzu Rekku Date: Thu Jan 23 20:16:58 2025 +0200 Added alert_api. diff --git a/alert_api/alert_api.py b/alert_api/alert_api.py new file mode 100755 index 0000000..2440c97 --- /dev/null +++ b/alert_api/alert_api.py @@ -0,0 +1,491 @@ +#!/usr/bin/python3 + +from http.server import BaseHTTPRequestHandler, HTTPServer +import logging +import os +import json +import hashlib +from dataclasses import dataclass, field, asdict +from typing import List, Optional, Dict, Any +from datetime import datetime +from http import HTTPStatus +import re + +# Set up logging configuration +logging.basicConfig( + level=logging.DEBUG, # Set to DEBUG to show all log levels + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', + handlers=[ + logging.StreamHandler(), # Console handler + logging.FileHandler('alert_api.log') # File handler + ] +) + +logger = logging.getLogger('AlertApi') + + +@dataclass +class RepeatRule: + type: str # "daily" or "weekly" or "once" + days_of_week: Optional[List[str]] = field(default_factory=list) + at: Optional[str] = None + + def validate(self) -> bool: + """Validate repeat rule configuration""" + valid_types = {"daily", "weekly", "once"} + valid_days = {"monday", "tuesday", "wednesday", "thursday", "friday", "saturday", "sunday"} + + if self.type not in valid_types: + logger.error(f"Invalid RepeatRule type: '{self.type}'. Must be one of {valid_types}") + return False + + if self.type == "weekly": + invalid_days = [day for day in self.days_of_week if day.lower() not in valid_days] + if invalid_days: + logger.error(f"Invalid RepeatRule weekday names: {invalid_days}. Must be one of {valid_days}") + return False + if not self.days_of_week: + logger.error("Weekly repeat rule must specify at least one day") + return False + + if self.type == "once" and self.days_of_week: + if self.days_of_week: + logger.error("One-time alert does not support days_of_week") + return False + if not self.at: + logger.error("One-time repeat rule must specify a valid 'at' date") + return False + try: + datetime.strptime(self.at, "%d.%m.%Y") + except ValueError: + logger.error(f"Invalid 'at' date format: '{self.at}'. Expected format: 'dd.mm.yyyy'") + return False + + logger.debug(f"RepeatRule validation passed: {self.__dict__}") + return True + + +@dataclass +class Snooze: + enabled: bool = True + duration: int = 10 # minutes + max_count: int = 3 + + def validate(self) -> bool: + """Validate snooze configuration""" + if not isinstance(self.enabled, bool): + logger.error(f"Snooze enabled must be boolean, got {type(self.enabled)}") + return False + + if not isinstance(self.duration, int): + logger.error(f"Snooze duration must be integer, got {type(self.duration)}") + return False + + if self.duration <= 0: + logger.error(f"Snooze duration must be positive, got {self.duration}") + return False + + if not isinstance(self.max_count, int): + logger.error(f"Snooze max_count must be integer, got {type(self.max_count)}") + return False + + if self.max_count <= 0: + logger.error(f"Snooze max_count must be positive, got {self.max_count}") + return False + + logger.debug(f"Snooze validation passed: {self.__dict__}") + return True + + +@dataclass +class Metadata: + volume: int = 100 + notes: str = "" + md5sum: str = "" + + def validate(self) -> bool: + """Validate metadata configuration""" + if not isinstance(self.volume, int): + logger.error(f"Volume must be integer, got {type(self.volume)}") + return False + + if not 0 <= self.volume <= 100: + logger.error(f"Volume must be between 0 and 100, got {self.volume}") + return False + + logger.debug(f"Metadata validation passed: {self.__dict__}") + return True + + +@dataclass +class Alarm: + name: str + time: str # Format: "HH:MM:SS" + repeat_rule: RepeatRule + file_to_play: str = field(default=os.path.expanduser("~/.alarms/alarm-lofi.mp3")) + enabled: bool = field(default=True) + snooze: Snooze = field(default_factory=Snooze) + metadata: Metadata = field(default_factory=Metadata) + id: Optional[int] = field(default=None) + + def validate(self) -> bool: + """Validate complete alarm configuration""" + logger.debug(f"Starting validation for alarm: {self.name}") + + # Validate name + if not isinstance(self.name, str) or len(self.name.strip()) == 0: + logger.error(f"Invalid alarm name: '{self.name}'. Must be non-empty string") + return False + + # Validate time format + time_pattern = re.compile(r'^([0-1]?[0-9]|2[0-3]):[0-5][0-9]:[0-5][0-9]$') + if not time_pattern.match(self.time): + logger.error(f"Invalid time format: '{self.time}'. Must match HH:MM:SS") + return False + + # Validate enabled flag + if not isinstance(self.enabled, bool): + logger.error(f"Enabled must be boolean, got {type(self.enabled)}") + return False + + # Create default alarms directory if it doesn't exist + default_alarms_dir = os.path.expanduser("~/.alarms") + os.makedirs(default_alarms_dir, exist_ok=True) + + # Validate audio file + if self.file_to_play != os.path.expanduser("~/.alarms/alarm-lofi.mp3") and not os.path.exists(self.file_to_play): + logger.error(f"Audio file not found: '{self.file_to_play}'") + return False + + # Validate repeat rule + if not isinstance(self.repeat_rule, RepeatRule): + logger.error(f"Invalid repeat_rule type: {type(self.repeat_rule)}") + return False + if not self.repeat_rule.validate(): + logger.error("Repeat rule validation failed") + return False + + # Validate snooze + if not isinstance(self.snooze, Snooze): + logger.error(f"Invalid snooze type: {type(self.snooze)}") + return False + if not self.snooze.validate(): + logger.error("Snooze validation failed") + return False + + # Validate metadata + if not isinstance(self.metadata, Metadata): + logger.error(f"Invalid metadata type: {type(self.metadata)}") + return False + if not self.metadata.validate(): + logger.error("Metadata validation failed") + return False + + logger.debug(f"Alarm validation passed: {self.name}") + return True + +class StorageHandler: + def __init__(self, file_path: str): + self.file_path = file_path + self._ensure_storage_directory() + + def _ensure_storage_directory(self) -> None: + """Ensure the directory for the storage file exists""" + directory = os.path.dirname(os.path.abspath(self.file_path)) + if directory: + os.makedirs(directory, exist_ok=True) + + @staticmethod + def _generate_alarm_hash(alarm: dict) -> str: + """Generate an MD5 hash for the given alarm data""" + # Convert dataclass instances to dictionaries if needed + def to_dict(value): + return asdict(value) if hasattr(value, '__dataclass_fields__') else value + + alarm_data = alarm.copy() + alarm_data["repeat_rule"] = to_dict(alarm_data.get("repeat_rule")) + alarm_data["snooze"] = to_dict(alarm_data.get("snooze")) + alarm_data["metadata"] = to_dict(alarm_data.get("metadata")) + + hash_input = json.dumps({ + "name": alarm_data["name"], + "time": alarm_data["time"], + "file_to_play": alarm_data.get("file_to_play", os.path.expanduser("~/.alarms/alarm-lofi.mp3")), + "repeat_rule": alarm_data["repeat_rule"] + }, sort_keys=True) + return hashlib.md5(hash_input.encode('utf-8')).hexdigest() + + def load_data(self) -> dict: + """Load the JSON data from the file""" + try: + with open(self.file_path, 'r') as file: + data = json.load(file) + if not isinstance(data, dict): + raise ValueError("Invalid data format") + return data + except (FileNotFoundError, json.JSONDecodeError): + return {"last_id": 0, "alarms": []} + + def save_data(self, data: dict) -> None: + """Save the JSON data back to the file""" + def convert_to_dict(obj): + """Convert dataclass instances to dictionaries recursively""" + if isinstance(obj, list): + return [convert_to_dict(item) for item in obj] + elif isinstance(obj, dict): + return {key: convert_to_dict(value) for key, value in obj.items()} + elif hasattr(obj, "__dataclass_fields__"): + return asdict(obj) + else: + return obj + + # Convert data to JSON-serializable format + serializable_data = convert_to_dict(data) + + with open(self.file_path, 'w') as file: + json.dump(serializable_data, file, indent=4) + + def save_new_alert(self, alarm_data: dict) -> int: + """Add a new alarm to the storage""" + # Remove id if present in input data + alarm_data.pop('id', None) + + # Set default file_to_play if not provided + alarm_data['file_to_play'] = alarm_data.get('file_to_play', os.path.expanduser("~/.alarms/alarm-lofi.mp3")) + + # Convert nested dictionaries to dataclass instances + try: + if 'repeat_rule' in alarm_data: + alarm_data['repeat_rule'] = RepeatRule(**alarm_data['repeat_rule']) + if 'snooze' in alarm_data: + alarm_data['snooze'] = Snooze(**alarm_data['snooze']) + if 'metadata' in alarm_data: + alarm_data['metadata'] = Metadata(**alarm_data['metadata']) + except TypeError as e: + raise ValueError(f"Invalid format in one of the nested fields: {str(e)}") + + # Validate input + try: + alarm = Alarm(**alarm_data) + if not alarm.validate(): + raise ValueError("Invalid alarm configuration") + except (TypeError, ValueError) as e: + raise ValueError(f"Invalid alarm data: {str(e)}") + + # Generate hash and check for duplicates + alarm_hash = self._generate_alarm_hash(alarm_data) + data = self.load_data() + + if any(a.get("hash") == alarm_hash for a in data["alarms"]): + raise ValueError("Duplicate alarm detected") + + # Add new alarm + alarm_data["id"] = data.get("last_id", 0) + 1 + alarm_data["hash"] = alarm_hash + data["last_id"] = alarm_data["id"] + data["alarms"].append(alarm_data) + + self.save_data(data) + return alarm_data["id"] + + def get_saved_alerts(self) -> list: + """Retrieve the list of saved alarms""" + return self.load_data().get("alarms", []) + + def remove_saved_alert(self, alarm_id: int) -> bool: + """Remove an alarm by its ID""" + data = self.load_data() + original_length = len(data["alarms"]) + data["alarms"] = [a for a in data["alarms"] if a["id"] != alarm_id] + + if len(data["alarms"]) == original_length: + return False + + self.save_data(data) + return True + + def update_alert(self, alarm_id: int, alarm_data: dict) -> bool: + """Update an existing alarm""" + alarm_data["id"] = alarm_id + + # Set default file_to_play if not provided + alarm_data['file_to_play'] = alarm_data.get('file_to_play', os.path.expanduser("~/.alarms/alarm-lofi.mp3")) + + # Convert nested dictionaries to dataclass instances + try: + if 'repeat_rule' in alarm_data: + alarm_data['repeat_rule'] = RepeatRule(**alarm_data['repeat_rule']) + if 'snooze' in alarm_data: + alarm_data['snooze'] = Snooze(**alarm_data['snooze']) + if 'metadata' in alarm_data: + alarm_data['metadata'] = Metadata(**alarm_data['metadata']) + except TypeError as e: + raise ValueError(f"Invalid format in one of the nested fields: {str(e)}") + + # Validate input + try: + alarm = Alarm(**alarm_data) + if not alarm.validate(): + raise ValueError("Invalid alarm configuration") + except (TypeError, ValueError) as e: + raise ValueError(f"Invalid alarm data: {str(e)}") + + data = self.load_data() + for i, existing_alarm in enumerate(data["alarms"]): + if existing_alarm["id"] == alarm_id: + alarm_data["hash"] = self._generate_alarm_hash(alarm_data) + data["alarms"][i] = alarm_data + self.save_data(data) + return True + return False + + +class AlertApi(BaseHTTPRequestHandler): + def __init__(self, *args, **kwargs): + self.storage = StorageHandler("data/alerts.json") + self.logger = logging.getLogger('AlertApi') + super().__init__(*args, **kwargs) + + def _send_response(self, status_code: int, data: Any = None, error: str = None) -> None: + """Send a JSON response with the given status code and data/error""" + self.send_response(status_code) + self.send_header("Content-Type", "application/json") + self.send_header("Access-Control-Allow-Origin", "*") + self.end_headers() + + response = {} + if data is not None: + response["data"] = data + if error is not None: + response["error"] = error + + response_json = json.dumps(response) + self.logger.debug(f"Sending response: {response_json}") + self.wfile.write(response_json.encode("utf-8")) + + def _handle_request(self, method: str) -> None: + """Handle incoming requests with proper error handling""" + self.logger.info(f"Received {method} request from {self.client_address[0]}") + + try: + if method in ["POST", "PUT", "DELETE"]: + content_length = int(self.headers.get("Content-Length", 0)) + if content_length == 0: + raise ValueError("Missing request body") + + post_data = self.rfile.read(content_length) + self.logger.debug(f"Received {method} payload: {post_data.decode('utf-8')}") + + post_data = json.loads(post_data) + else: + post_data = None + + # Route request to appropriate handler + handler = getattr(self, f"_handle_{method.lower()}", None) + if handler: + self.logger.debug(f"Routing to handler: _handle_{method.lower()}") + handler(post_data) + else: + self.logger.warning(f"Method not allowed: {method}") + self._send_response(HTTPStatus.METHOD_NOT_ALLOWED, error="Method not allowed") + + except json.JSONDecodeError as e: + self.logger.error(f"JSON decode error: {str(e)}") + self._send_response(HTTPStatus.BAD_REQUEST, error="Invalid JSON in request body") + except ValueError as e: + self.logger.error(f"Validation error: {str(e)}") + self._send_response(HTTPStatus.BAD_REQUEST, error=str(e)) + except Exception as e: + self.logger.error(f"Unexpected error: {str(e)}", exc_info=True) + self._send_response(HTTPStatus.INTERNAL_SERVER_ERROR, error="Internal server error") + + def _handle_get(self, _) -> None: + """Handle GET request""" + self.logger.debug("Processing GET request for all alarms") + alarms = self.storage.get_saved_alerts() + self.logger.debug(f"Retrieved {len(alarms)} alarms") + self._send_response(HTTPStatus.OK, data=alarms) + + def _handle_post(self, data: dict) -> None: + """Handle POST request""" + self.logger.debug(f"Processing POST request with data: {json.dumps(data, indent=2)}") + try: + alarm_id = self.storage.save_new_alert(data) + self.logger.info(f"Successfully created new alarm with ID: {alarm_id}") + self._send_response(HTTPStatus.CREATED, data={"id": alarm_id}) + except ValueError as e: + self.logger.error(f"Failed to create alarm: {str(e)}") + self._send_response(HTTPStatus.BAD_REQUEST, error=str(e)) + + def _handle_put(self, data: dict) -> None: + """Handle PUT request""" + alarm_id = data.pop("id", None) + self.logger.debug(f"Processing PUT request for alarm ID {alarm_id} with data: {json.dumps(data, indent=2)}") + + if alarm_id is None: + self.logger.error("PUT request missing alarm ID") + self._send_response(HTTPStatus.BAD_REQUEST, error="Missing alarm ID") + return + + if self.storage.update_alert(alarm_id, data): + self.logger.info(f"Successfully updated alarm ID: {alarm_id}") + self._send_response(HTTPStatus.OK, data={"message": "Alarm updated successfully"}) + else: + self.logger.warning(f"Alarm not found for update: {alarm_id}") + self._send_response(HTTPStatus.NOT_FOUND, error="Alarm not found") + + def _handle_delete(self, data: dict) -> None: + """Handle DELETE request""" + alarm_id = data.get("id") + self.logger.debug(f"Processing DELETE request for alarm ID: {alarm_id}") + + if not isinstance(alarm_id, int): + self.logger.error(f"Invalid alarm ID format: {alarm_id}") + self._send_response(HTTPStatus.BAD_REQUEST, error="Invalid alarm ID") + return + + if self.storage.remove_saved_alert(alarm_id): + self.logger.info(f"Successfully deleted alarm ID: {alarm_id}") + self._send_response(HTTPStatus.OK, data={"message": "Alarm removed successfully"}) + else: + self.logger.warning(f"Alarm not found for deletion: {alarm_id}") + self._send_response(HTTPStatus.NOT_FOUND, error="Alarm not found") + + def do_GET(self): self._handle_request("GET") + def do_POST(self): self._handle_request("POST") + def do_PUT(self): self._handle_request("PUT") + def do_DELETE(self): self._handle_request("DELETE") + + +def run(server_class=HTTPServer, handler_class=AlertApi, port=8000): + # Set up logging configuration + logging.basicConfig( + level=logging.DEBUG, # Set to DEBUG to show all log levels + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', + handlers=[ + logging.StreamHandler(), # Console handler + logging.FileHandler('alert_api.log') # File handler + ] + ) + + logger = logging.getLogger('AlertApi') + logger.info(f"Starting AlertApi on port {port}") + + server_address = ("", port) + httpd = server_class(server_address, handler_class) + + try: + logger.info("Server is ready to handle requests") + httpd.serve_forever() + except KeyboardInterrupt: + logger.info("Received shutdown signal") + except Exception as e: + logger.error(f"Server error: {str(e)}", exc_info=True) + finally: + httpd.server_close() + logger.info("Server stopped") + + +if __name__ == "__main__": + from sys import argv + run(port=int(argv[1]) if len(argv) == 2 else 8000) diff --git a/alert_api/tests.sh b/alert_api/tests.sh new file mode 100644 index 0000000..9bd4572 --- /dev/null +++ b/alert_api/tests.sh @@ -0,0 +1,173 @@ +#!/bin/bash + +# Configuration +API_URL="http://localhost:8000" +TEST_ALARM_NAME="Test Alarm" +TEST_ALARM_TIME="08:30:00" +TEST_AUDIO_FILE="/tmp/test.mp3" + +# Color codes for output +GREEN='\033[0;32m' +RED='\033[0;31m' +BLUE='\033[0;34m' +NC='\033[0m' # No Color + +# Helper function to print status messages +print_status() { + echo -e "${BLUE}=== $1 ===${NC}" +} + +# Helper function to check test results +check_result() { + if [ $? -eq 0 ]; then + echo -e "${GREEN}✓ $1${NC}" + else + echo -e "${RED}✗ $1${NC}" + exit 1 + fi +} + +# 1. Test server connectivity +print_status "Testing server connectivity" +curl -s "$API_URL" > /dev/null +check_result "Server connectivity check" + +# 2. Get initial state +print_status "Getting initial state" +INITIAL_STATE=$(curl -s -X GET "$API_URL") +echo "$INITIAL_STATE" | jq . +check_result "Retrieved initial state" + +# 3. Add a new alarm (POST) +print_status "Adding new alarm" +POST_DATA=$(jq -n \ + --arg name "$TEST_ALARM_NAME" \ + --arg time "$TEST_ALARM_TIME" \ + '{ + "name": $name, + "time": $time, + "repeat_rule": { + "type": "weekly", + "days_of_week": ["monday", "wednesday", "friday"] + }, + "enabled": true, + "snooze": { + "enabled": true, + "duration": 5, + "max_count": 3 + }, + "metadata": { + "volume": 75, + "notes": "Test alarm with full configuration" + } + }' +) + +ADD_RESPONSE=$(curl -s -X POST -H "Content-Type: application/json" -d "$POST_DATA" "$API_URL") +NEW_ALARM_ID=$(echo "$ADD_RESPONSE" | jq -r '.data.id') + +if [[ "$NEW_ALARM_ID" == "null" || -z "$NEW_ALARM_ID" ]]; then + echo -e "${RED}Failed to add alarm${NC}" + echo "Response: $ADD_RESPONSE" + exit 1 +fi + +echo -e "${GREEN}Added alarm with ID: $NEW_ALARM_ID${NC}" + +# 4. Test duplicate alarm detection +print_status "Testing duplicate alarm detection" +DUPLICATE_RESPONSE=$(curl -s -X POST -H "Content-Type: application/json" -d "$POST_DATA" "$API_URL") +if [[ $(echo "$DUPLICATE_RESPONSE" | jq -r '.error') == *"Duplicate alarm detected"* ]]; then + echo -e "${GREEN}✓ Duplicate detection working${NC}" +else + echo -e "${RED}✗ Duplicate detection failed${NC}" + echo "Response: $DUPLICATE_RESPONSE" +fi + +# 5. Update the alarm (PUT) +print_status "Updating alarm" +UPDATE_DATA=$(jq -n \ + --arg name "$TEST_ALARM_NAME Updated" \ + --arg time "$TEST_ALARM_TIME" \ + --argjson id "$NEW_ALARM_ID" \ + '{ + "id": $id, + "name": $name, + "time": $time, + "repeat_rule": { + "type": "daily" + }, + "enabled": true, + "snooze": { + "enabled": false, + "duration": 10, + "max_count": 2 + }, + "metadata": { + "volume": 90, + "notes": "Updated test alarm" + } + }' +) + +UPDATE_RESPONSE=$(curl -s -X PUT -H "Content-Type: application/json" -d "$UPDATE_DATA" "$API_URL") +if [[ $(echo "$UPDATE_RESPONSE" | jq -r '.data.message') == "Alarm updated successfully" ]]; then + echo -e "${GREEN}✓ Alarm update successful${NC}" +else + echo -e "${RED}✗ Alarm update failed${NC}" + echo "Response: $UPDATE_RESPONSE" +fi + +# 6. Verify the update +print_status "Verifying update" +UPDATED_STATE=$(curl -s -X GET "$API_URL") +UPDATED_ALARM=$(echo "$UPDATED_STATE" | jq -r ".data[] | select(.id == $NEW_ALARM_ID)") +if [[ $(echo "$UPDATED_ALARM" | jq -r '.name') == "$TEST_ALARM_NAME Updated" ]]; then + echo -e "${GREEN}✓ Update verification successful${NC}" +else + echo -e "${RED}✗ Update verification failed${NC}" + echo "Current alarm state: $UPDATED_ALARM" +fi + +# 7. Test invalid inputs +print_status "Testing invalid inputs" + +# Test invalid time format +INVALID_TIME_DATA=$(echo "$POST_DATA" | jq '. + {"time": "25:00:00"}') +INVALID_TIME_RESPONSE=$(curl -s -X POST -H "Content-Type: application/json" -d "$INVALID_TIME_DATA" "$API_URL") +if [[ $(echo "$INVALID_TIME_RESPONSE" | jq -r '.error') == *"Invalid alarm configuration"* ]]; then + echo -e "${GREEN}✓ Invalid time format detection working${NC}" +else + echo -e "${RED}✗ Invalid time format detection failed${NC}" +fi + +# Test invalid repeat rule +INVALID_REPEAT_DATA=$(echo "$POST_DATA" | jq '.repeat_rule.type = "monthly"') +INVALID_REPEAT_RESPONSE=$(curl -s -X POST -H "Content-Type: application/json" -d "$INVALID_REPEAT_DATA" "$API_URL") +if [[ $(echo "$INVALID_REPEAT_RESPONSE" | jq -r '.error') == *"Invalid alarm configuration"* ]]; then + echo -e "${GREEN}✓ Invalid repeat rule detection working${NC}" +else + echo -e "${RED}✗ Invalid repeat rule detection failed${NC}" +fi + +# 8. Delete the test alarm +print_status "Deleting test alarm" +DELETE_RESPONSE=$(curl -s -X DELETE -H "Content-Type: application/json" -d "{\"id\":$NEW_ALARM_ID}" "$API_URL") +if [[ $(echo "$DELETE_RESPONSE" | jq -r '.data.message') == "Alarm removed successfully" ]]; then + echo -e "${GREEN}✓ Alarm deletion successful${NC}" +else + echo -e "${RED}✗ Alarm deletion failed${NC}" + echo "Response: $DELETE_RESPONSE" +fi + +# 9. Verify deletion +print_status "Verifying deletion" +FINAL_STATE=$(curl -s -X GET "$API_URL") +if [[ $(echo "$FINAL_STATE" | jq ".data[] | select(.id == $NEW_ALARM_ID)") == "" ]]; then + echo -e "${GREEN}✓ Deletion verification successful${NC}" +else + echo -e "${RED}✗ Deletion verification failed${NC}" + echo "Current state: $FINAL_STATE" +fi + +print_status "Test suite completed!"