Added alert_api.

This commit is contained in:
Kalzu Rekku 2025-01-23 20:16:58 +02:00
commit 47745e7fd5
2 changed files with 664 additions and 0 deletions

491
alert_api/alert_api.py Executable file
View File

@ -0,0 +1,491 @@
#!/usr/bin/python3
from http.server import BaseHTTPRequestHandler, HTTPServer
import logging
import os
import json
import hashlib
from dataclasses import dataclass, field, asdict
from typing import List, Optional, Dict, Any
from datetime import datetime
from http import HTTPStatus
import re
# Set up logging configuration
logging.basicConfig(
level=logging.DEBUG, # Set to DEBUG to show all log levels
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.StreamHandler(), # Console handler
logging.FileHandler('alert_api.log') # File handler
]
)
logger = logging.getLogger('AlertApi')
@dataclass
class RepeatRule:
type: str # "daily" or "weekly" or "once"
days_of_week: Optional[List[str]] = field(default_factory=list)
at: Optional[str] = None
def validate(self) -> bool:
"""Validate repeat rule configuration"""
valid_types = {"daily", "weekly", "once"}
valid_days = {"monday", "tuesday", "wednesday", "thursday", "friday", "saturday", "sunday"}
if self.type not in valid_types:
logger.error(f"Invalid RepeatRule type: '{self.type}'. Must be one of {valid_types}")
return False
if self.type == "weekly":
invalid_days = [day for day in self.days_of_week if day.lower() not in valid_days]
if invalid_days:
logger.error(f"Invalid RepeatRule weekday names: {invalid_days}. Must be one of {valid_days}")
return False
if not self.days_of_week:
logger.error("Weekly repeat rule must specify at least one day")
return False
if self.type == "once" and self.days_of_week:
if self.days_of_week:
logger.error("One-time alert does not support days_of_week")
return False
if not self.at:
logger.error("One-time repeat rule must specify a valid 'at' date")
return False
try:
datetime.strptime(self.at, "%d.%m.%Y")
except ValueError:
logger.error(f"Invalid 'at' date format: '{self.at}'. Expected format: 'dd.mm.yyyy'")
return False
logger.debug(f"RepeatRule validation passed: {self.__dict__}")
return True
@dataclass
class Snooze:
enabled: bool = True
duration: int = 10 # minutes
max_count: int = 3
def validate(self) -> bool:
"""Validate snooze configuration"""
if not isinstance(self.enabled, bool):
logger.error(f"Snooze enabled must be boolean, got {type(self.enabled)}")
return False
if not isinstance(self.duration, int):
logger.error(f"Snooze duration must be integer, got {type(self.duration)}")
return False
if self.duration <= 0:
logger.error(f"Snooze duration must be positive, got {self.duration}")
return False
if not isinstance(self.max_count, int):
logger.error(f"Snooze max_count must be integer, got {type(self.max_count)}")
return False
if self.max_count <= 0:
logger.error(f"Snooze max_count must be positive, got {self.max_count}")
return False
logger.debug(f"Snooze validation passed: {self.__dict__}")
return True
@dataclass
class Metadata:
volume: int = 100
notes: str = ""
md5sum: str = ""
def validate(self) -> bool:
"""Validate metadata configuration"""
if not isinstance(self.volume, int):
logger.error(f"Volume must be integer, got {type(self.volume)}")
return False
if not 0 <= self.volume <= 100:
logger.error(f"Volume must be between 0 and 100, got {self.volume}")
return False
logger.debug(f"Metadata validation passed: {self.__dict__}")
return True
@dataclass
class Alarm:
name: str
time: str # Format: "HH:MM:SS"
repeat_rule: RepeatRule
file_to_play: str = field(default=os.path.expanduser("~/.alarms/alarm-lofi.mp3"))
enabled: bool = field(default=True)
snooze: Snooze = field(default_factory=Snooze)
metadata: Metadata = field(default_factory=Metadata)
id: Optional[int] = field(default=None)
def validate(self) -> bool:
"""Validate complete alarm configuration"""
logger.debug(f"Starting validation for alarm: {self.name}")
# Validate name
if not isinstance(self.name, str) or len(self.name.strip()) == 0:
logger.error(f"Invalid alarm name: '{self.name}'. Must be non-empty string")
return False
# Validate time format
time_pattern = re.compile(r'^([0-1]?[0-9]|2[0-3]):[0-5][0-9]:[0-5][0-9]$')
if not time_pattern.match(self.time):
logger.error(f"Invalid time format: '{self.time}'. Must match HH:MM:SS")
return False
# Validate enabled flag
if not isinstance(self.enabled, bool):
logger.error(f"Enabled must be boolean, got {type(self.enabled)}")
return False
# Create default alarms directory if it doesn't exist
default_alarms_dir = os.path.expanduser("~/.alarms")
os.makedirs(default_alarms_dir, exist_ok=True)
# Validate audio file
if self.file_to_play != os.path.expanduser("~/.alarms/alarm-lofi.mp3") and not os.path.exists(self.file_to_play):
logger.error(f"Audio file not found: '{self.file_to_play}'")
return False
# Validate repeat rule
if not isinstance(self.repeat_rule, RepeatRule):
logger.error(f"Invalid repeat_rule type: {type(self.repeat_rule)}")
return False
if not self.repeat_rule.validate():
logger.error("Repeat rule validation failed")
return False
# Validate snooze
if not isinstance(self.snooze, Snooze):
logger.error(f"Invalid snooze type: {type(self.snooze)}")
return False
if not self.snooze.validate():
logger.error("Snooze validation failed")
return False
# Validate metadata
if not isinstance(self.metadata, Metadata):
logger.error(f"Invalid metadata type: {type(self.metadata)}")
return False
if not self.metadata.validate():
logger.error("Metadata validation failed")
return False
logger.debug(f"Alarm validation passed: {self.name}")
return True
class StorageHandler:
def __init__(self, file_path: str):
self.file_path = file_path
self._ensure_storage_directory()
def _ensure_storage_directory(self) -> None:
"""Ensure the directory for the storage file exists"""
directory = os.path.dirname(os.path.abspath(self.file_path))
if directory:
os.makedirs(directory, exist_ok=True)
@staticmethod
def _generate_alarm_hash(alarm: dict) -> str:
"""Generate an MD5 hash for the given alarm data"""
# Convert dataclass instances to dictionaries if needed
def to_dict(value):
return asdict(value) if hasattr(value, '__dataclass_fields__') else value
alarm_data = alarm.copy()
alarm_data["repeat_rule"] = to_dict(alarm_data.get("repeat_rule"))
alarm_data["snooze"] = to_dict(alarm_data.get("snooze"))
alarm_data["metadata"] = to_dict(alarm_data.get("metadata"))
hash_input = json.dumps({
"name": alarm_data["name"],
"time": alarm_data["time"],
"file_to_play": alarm_data.get("file_to_play", os.path.expanduser("~/.alarms/alarm-lofi.mp3")),
"repeat_rule": alarm_data["repeat_rule"]
}, sort_keys=True)
return hashlib.md5(hash_input.encode('utf-8')).hexdigest()
def load_data(self) -> dict:
"""Load the JSON data from the file"""
try:
with open(self.file_path, 'r') as file:
data = json.load(file)
if not isinstance(data, dict):
raise ValueError("Invalid data format")
return data
except (FileNotFoundError, json.JSONDecodeError):
return {"last_id": 0, "alarms": []}
def save_data(self, data: dict) -> None:
"""Save the JSON data back to the file"""
def convert_to_dict(obj):
"""Convert dataclass instances to dictionaries recursively"""
if isinstance(obj, list):
return [convert_to_dict(item) for item in obj]
elif isinstance(obj, dict):
return {key: convert_to_dict(value) for key, value in obj.items()}
elif hasattr(obj, "__dataclass_fields__"):
return asdict(obj)
else:
return obj
# Convert data to JSON-serializable format
serializable_data = convert_to_dict(data)
with open(self.file_path, 'w') as file:
json.dump(serializable_data, file, indent=4)
def save_new_alert(self, alarm_data: dict) -> int:
"""Add a new alarm to the storage"""
# Remove id if present in input data
alarm_data.pop('id', None)
# Set default file_to_play if not provided
alarm_data['file_to_play'] = alarm_data.get('file_to_play', os.path.expanduser("~/.alarms/alarm-lofi.mp3"))
# Convert nested dictionaries to dataclass instances
try:
if 'repeat_rule' in alarm_data:
alarm_data['repeat_rule'] = RepeatRule(**alarm_data['repeat_rule'])
if 'snooze' in alarm_data:
alarm_data['snooze'] = Snooze(**alarm_data['snooze'])
if 'metadata' in alarm_data:
alarm_data['metadata'] = Metadata(**alarm_data['metadata'])
except TypeError as e:
raise ValueError(f"Invalid format in one of the nested fields: {str(e)}")
# Validate input
try:
alarm = Alarm(**alarm_data)
if not alarm.validate():
raise ValueError("Invalid alarm configuration")
except (TypeError, ValueError) as e:
raise ValueError(f"Invalid alarm data: {str(e)}")
# Generate hash and check for duplicates
alarm_hash = self._generate_alarm_hash(alarm_data)
data = self.load_data()
if any(a.get("hash") == alarm_hash for a in data["alarms"]):
raise ValueError("Duplicate alarm detected")
# Add new alarm
alarm_data["id"] = data.get("last_id", 0) + 1
alarm_data["hash"] = alarm_hash
data["last_id"] = alarm_data["id"]
data["alarms"].append(alarm_data)
self.save_data(data)
return alarm_data["id"]
def get_saved_alerts(self) -> list:
"""Retrieve the list of saved alarms"""
return self.load_data().get("alarms", [])
def remove_saved_alert(self, alarm_id: int) -> bool:
"""Remove an alarm by its ID"""
data = self.load_data()
original_length = len(data["alarms"])
data["alarms"] = [a for a in data["alarms"] if a["id"] != alarm_id]
if len(data["alarms"]) == original_length:
return False
self.save_data(data)
return True
def update_alert(self, alarm_id: int, alarm_data: dict) -> bool:
"""Update an existing alarm"""
alarm_data["id"] = alarm_id
# Set default file_to_play if not provided
alarm_data['file_to_play'] = alarm_data.get('file_to_play', os.path.expanduser("~/.alarms/alarm-lofi.mp3"))
# Convert nested dictionaries to dataclass instances
try:
if 'repeat_rule' in alarm_data:
alarm_data['repeat_rule'] = RepeatRule(**alarm_data['repeat_rule'])
if 'snooze' in alarm_data:
alarm_data['snooze'] = Snooze(**alarm_data['snooze'])
if 'metadata' in alarm_data:
alarm_data['metadata'] = Metadata(**alarm_data['metadata'])
except TypeError as e:
raise ValueError(f"Invalid format in one of the nested fields: {str(e)}")
# Validate input
try:
alarm = Alarm(**alarm_data)
if not alarm.validate():
raise ValueError("Invalid alarm configuration")
except (TypeError, ValueError) as e:
raise ValueError(f"Invalid alarm data: {str(e)}")
data = self.load_data()
for i, existing_alarm in enumerate(data["alarms"]):
if existing_alarm["id"] == alarm_id:
alarm_data["hash"] = self._generate_alarm_hash(alarm_data)
data["alarms"][i] = alarm_data
self.save_data(data)
return True
return False
class AlertApi(BaseHTTPRequestHandler):
def __init__(self, *args, **kwargs):
self.storage = StorageHandler("data/alerts.json")
self.logger = logging.getLogger('AlertApi')
super().__init__(*args, **kwargs)
def _send_response(self, status_code: int, data: Any = None, error: str = None) -> None:
"""Send a JSON response with the given status code and data/error"""
self.send_response(status_code)
self.send_header("Content-Type", "application/json")
self.send_header("Access-Control-Allow-Origin", "*")
self.end_headers()
response = {}
if data is not None:
response["data"] = data
if error is not None:
response["error"] = error
response_json = json.dumps(response)
self.logger.debug(f"Sending response: {response_json}")
self.wfile.write(response_json.encode("utf-8"))
def _handle_request(self, method: str) -> None:
"""Handle incoming requests with proper error handling"""
self.logger.info(f"Received {method} request from {self.client_address[0]}")
try:
if method in ["POST", "PUT", "DELETE"]:
content_length = int(self.headers.get("Content-Length", 0))
if content_length == 0:
raise ValueError("Missing request body")
post_data = self.rfile.read(content_length)
self.logger.debug(f"Received {method} payload: {post_data.decode('utf-8')}")
post_data = json.loads(post_data)
else:
post_data = None
# Route request to appropriate handler
handler = getattr(self, f"_handle_{method.lower()}", None)
if handler:
self.logger.debug(f"Routing to handler: _handle_{method.lower()}")
handler(post_data)
else:
self.logger.warning(f"Method not allowed: {method}")
self._send_response(HTTPStatus.METHOD_NOT_ALLOWED, error="Method not allowed")
except json.JSONDecodeError as e:
self.logger.error(f"JSON decode error: {str(e)}")
self._send_response(HTTPStatus.BAD_REQUEST, error="Invalid JSON in request body")
except ValueError as e:
self.logger.error(f"Validation error: {str(e)}")
self._send_response(HTTPStatus.BAD_REQUEST, error=str(e))
except Exception as e:
self.logger.error(f"Unexpected error: {str(e)}", exc_info=True)
self._send_response(HTTPStatus.INTERNAL_SERVER_ERROR, error="Internal server error")
def _handle_get(self, _) -> None:
"""Handle GET request"""
self.logger.debug("Processing GET request for all alarms")
alarms = self.storage.get_saved_alerts()
self.logger.debug(f"Retrieved {len(alarms)} alarms")
self._send_response(HTTPStatus.OK, data=alarms)
def _handle_post(self, data: dict) -> None:
"""Handle POST request"""
self.logger.debug(f"Processing POST request with data: {json.dumps(data, indent=2)}")
try:
alarm_id = self.storage.save_new_alert(data)
self.logger.info(f"Successfully created new alarm with ID: {alarm_id}")
self._send_response(HTTPStatus.CREATED, data={"id": alarm_id})
except ValueError as e:
self.logger.error(f"Failed to create alarm: {str(e)}")
self._send_response(HTTPStatus.BAD_REQUEST, error=str(e))
def _handle_put(self, data: dict) -> None:
"""Handle PUT request"""
alarm_id = data.pop("id", None)
self.logger.debug(f"Processing PUT request for alarm ID {alarm_id} with data: {json.dumps(data, indent=2)}")
if alarm_id is None:
self.logger.error("PUT request missing alarm ID")
self._send_response(HTTPStatus.BAD_REQUEST, error="Missing alarm ID")
return
if self.storage.update_alert(alarm_id, data):
self.logger.info(f"Successfully updated alarm ID: {alarm_id}")
self._send_response(HTTPStatus.OK, data={"message": "Alarm updated successfully"})
else:
self.logger.warning(f"Alarm not found for update: {alarm_id}")
self._send_response(HTTPStatus.NOT_FOUND, error="Alarm not found")
def _handle_delete(self, data: dict) -> None:
"""Handle DELETE request"""
alarm_id = data.get("id")
self.logger.debug(f"Processing DELETE request for alarm ID: {alarm_id}")
if not isinstance(alarm_id, int):
self.logger.error(f"Invalid alarm ID format: {alarm_id}")
self._send_response(HTTPStatus.BAD_REQUEST, error="Invalid alarm ID")
return
if self.storage.remove_saved_alert(alarm_id):
self.logger.info(f"Successfully deleted alarm ID: {alarm_id}")
self._send_response(HTTPStatus.OK, data={"message": "Alarm removed successfully"})
else:
self.logger.warning(f"Alarm not found for deletion: {alarm_id}")
self._send_response(HTTPStatus.NOT_FOUND, error="Alarm not found")
def do_GET(self): self._handle_request("GET")
def do_POST(self): self._handle_request("POST")
def do_PUT(self): self._handle_request("PUT")
def do_DELETE(self): self._handle_request("DELETE")
def run(server_class=HTTPServer, handler_class=AlertApi, port=8000):
# Set up logging configuration
logging.basicConfig(
level=logging.DEBUG, # Set to DEBUG to show all log levels
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.StreamHandler(), # Console handler
logging.FileHandler('alert_api.log') # File handler
]
)
logger = logging.getLogger('AlertApi')
logger.info(f"Starting AlertApi on port {port}")
server_address = ("", port)
httpd = server_class(server_address, handler_class)
try:
logger.info("Server is ready to handle requests")
httpd.serve_forever()
except KeyboardInterrupt:
logger.info("Received shutdown signal")
except Exception as e:
logger.error(f"Server error: {str(e)}", exc_info=True)
finally:
httpd.server_close()
logger.info("Server stopped")
if __name__ == "__main__":
from sys import argv
run(port=int(argv[1]) if len(argv) == 2 else 8000)

173
alert_api/tests.sh Normal file
View File

@ -0,0 +1,173 @@
#!/bin/bash
# Configuration
API_URL="http://localhost:8000"
TEST_ALARM_NAME="Test Alarm"
TEST_ALARM_TIME="08:30:00"
TEST_AUDIO_FILE="/tmp/test.mp3"
# Color codes for output
GREEN='\033[0;32m'
RED='\033[0;31m'
BLUE='\033[0;34m'
NC='\033[0m' # No Color
# Helper function to print status messages
print_status() {
echo -e "${BLUE}=== $1 ===${NC}"
}
# Helper function to check test results
check_result() {
if [ $? -eq 0 ]; then
echo -e "${GREEN}$1${NC}"
else
echo -e "${RED}$1${NC}"
exit 1
fi
}
# 1. Test server connectivity
print_status "Testing server connectivity"
curl -s "$API_URL" > /dev/null
check_result "Server connectivity check"
# 2. Get initial state
print_status "Getting initial state"
INITIAL_STATE=$(curl -s -X GET "$API_URL")
echo "$INITIAL_STATE" | jq .
check_result "Retrieved initial state"
# 3. Add a new alarm (POST)
print_status "Adding new alarm"
POST_DATA=$(jq -n \
--arg name "$TEST_ALARM_NAME" \
--arg time "$TEST_ALARM_TIME" \
'{
"name": $name,
"time": $time,
"repeat_rule": {
"type": "weekly",
"days_of_week": ["monday", "wednesday", "friday"]
},
"enabled": true,
"snooze": {
"enabled": true,
"duration": 5,
"max_count": 3
},
"metadata": {
"volume": 75,
"notes": "Test alarm with full configuration"
}
}'
)
ADD_RESPONSE=$(curl -s -X POST -H "Content-Type: application/json" -d "$POST_DATA" "$API_URL")
NEW_ALARM_ID=$(echo "$ADD_RESPONSE" | jq -r '.data.id')
if [[ "$NEW_ALARM_ID" == "null" || -z "$NEW_ALARM_ID" ]]; then
echo -e "${RED}Failed to add alarm${NC}"
echo "Response: $ADD_RESPONSE"
exit 1
fi
echo -e "${GREEN}Added alarm with ID: $NEW_ALARM_ID${NC}"
# 4. Test duplicate alarm detection
print_status "Testing duplicate alarm detection"
DUPLICATE_RESPONSE=$(curl -s -X POST -H "Content-Type: application/json" -d "$POST_DATA" "$API_URL")
if [[ $(echo "$DUPLICATE_RESPONSE" | jq -r '.error') == *"Duplicate alarm detected"* ]]; then
echo -e "${GREEN}✓ Duplicate detection working${NC}"
else
echo -e "${RED}✗ Duplicate detection failed${NC}"
echo "Response: $DUPLICATE_RESPONSE"
fi
# 5. Update the alarm (PUT)
print_status "Updating alarm"
UPDATE_DATA=$(jq -n \
--arg name "$TEST_ALARM_NAME Updated" \
--arg time "$TEST_ALARM_TIME" \
--argjson id "$NEW_ALARM_ID" \
'{
"id": $id,
"name": $name,
"time": $time,
"repeat_rule": {
"type": "daily"
},
"enabled": true,
"snooze": {
"enabled": false,
"duration": 10,
"max_count": 2
},
"metadata": {
"volume": 90,
"notes": "Updated test alarm"
}
}'
)
UPDATE_RESPONSE=$(curl -s -X PUT -H "Content-Type: application/json" -d "$UPDATE_DATA" "$API_URL")
if [[ $(echo "$UPDATE_RESPONSE" | jq -r '.data.message') == "Alarm updated successfully" ]]; then
echo -e "${GREEN}✓ Alarm update successful${NC}"
else
echo -e "${RED}✗ Alarm update failed${NC}"
echo "Response: $UPDATE_RESPONSE"
fi
# 6. Verify the update
print_status "Verifying update"
UPDATED_STATE=$(curl -s -X GET "$API_URL")
UPDATED_ALARM=$(echo "$UPDATED_STATE" | jq -r ".data[] | select(.id == $NEW_ALARM_ID)")
if [[ $(echo "$UPDATED_ALARM" | jq -r '.name') == "$TEST_ALARM_NAME Updated" ]]; then
echo -e "${GREEN}✓ Update verification successful${NC}"
else
echo -e "${RED}✗ Update verification failed${NC}"
echo "Current alarm state: $UPDATED_ALARM"
fi
# 7. Test invalid inputs
print_status "Testing invalid inputs"
# Test invalid time format
INVALID_TIME_DATA=$(echo "$POST_DATA" | jq '. + {"time": "25:00:00"}')
INVALID_TIME_RESPONSE=$(curl -s -X POST -H "Content-Type: application/json" -d "$INVALID_TIME_DATA" "$API_URL")
if [[ $(echo "$INVALID_TIME_RESPONSE" | jq -r '.error') == *"Invalid alarm configuration"* ]]; then
echo -e "${GREEN}✓ Invalid time format detection working${NC}"
else
echo -e "${RED}✗ Invalid time format detection failed${NC}"
fi
# Test invalid repeat rule
INVALID_REPEAT_DATA=$(echo "$POST_DATA" | jq '.repeat_rule.type = "monthly"')
INVALID_REPEAT_RESPONSE=$(curl -s -X POST -H "Content-Type: application/json" -d "$INVALID_REPEAT_DATA" "$API_URL")
if [[ $(echo "$INVALID_REPEAT_RESPONSE" | jq -r '.error') == *"Invalid alarm configuration"* ]]; then
echo -e "${GREEN}✓ Invalid repeat rule detection working${NC}"
else
echo -e "${RED}✗ Invalid repeat rule detection failed${NC}"
fi
# 8. Delete the test alarm
print_status "Deleting test alarm"
DELETE_RESPONSE=$(curl -s -X DELETE -H "Content-Type: application/json" -d "{\"id\":$NEW_ALARM_ID}" "$API_URL")
if [[ $(echo "$DELETE_RESPONSE" | jq -r '.data.message') == "Alarm removed successfully" ]]; then
echo -e "${GREEN}✓ Alarm deletion successful${NC}"
else
echo -e "${RED}✗ Alarm deletion failed${NC}"
echo "Response: $DELETE_RESPONSE"
fi
# 9. Verify deletion
print_status "Verifying deletion"
FINAL_STATE=$(curl -s -X GET "$API_URL")
if [[ $(echo "$FINAL_STATE" | jq ".data[] | select(.id == $NEW_ALARM_ID)") == "" ]]; then
echo -e "${GREEN}✓ Deletion verification successful${NC}"
else
echo -e "${RED}✗ Deletion verification failed${NC}"
echo "Current state: $FINAL_STATE"
fi
print_status "Test suite completed!"