python import problems.
This commit is contained in:
0
alert_clients/alarm_api_client/__init__.py
Normal file
0
alert_clients/alarm_api_client/__init__.py
Normal file
102
alert_clients/alarm_api_client/alarm_api_client.py
Normal file
102
alert_clients/alarm_api_client/alarm_api_client.py
Normal file
@ -0,0 +1,102 @@
|
||||
import requests
|
||||
import json
|
||||
|
||||
|
||||
class AlarmApiClient:
|
||||
def __init__(self, base_url: str):
|
||||
"""
|
||||
Initialize the Alarm API client.
|
||||
:param base_url: Base URL of the Alarm API server (e.g., 'http://localhost:8000')
|
||||
"""
|
||||
self.base_url = base_url.rstrip("/")
|
||||
|
||||
def _handle_response(self, response: requests.Response) -> dict:
|
||||
"""
|
||||
Handle API responses, raising exceptions for errors.
|
||||
:param response: The HTTP response object.
|
||||
:return: Parsed JSON data if the response is successful.
|
||||
"""
|
||||
try:
|
||||
response.raise_for_status()
|
||||
return response.json()
|
||||
except requests.HTTPError as e:
|
||||
raise Exception(f"HTTP error {response.status_code}: {response.text}") from e
|
||||
except json.JSONDecodeError:
|
||||
raise Exception("Invalid JSON response")
|
||||
|
||||
def get_alarms(self) -> list:
|
||||
"""
|
||||
Retrieve all alarms from the API.
|
||||
:return: List of alarm configurations.
|
||||
"""
|
||||
response = requests.get(f"{self.base_url}/")
|
||||
return self._handle_response(response)["data"]
|
||||
|
||||
def create_alarm(self, alarm_data: dict) -> int:
|
||||
"""
|
||||
Create a new alarm.
|
||||
:param alarm_data: Dictionary containing alarm configuration.
|
||||
:return: ID of the newly created alarm.
|
||||
"""
|
||||
response = requests.post(f"{self.base_url}/", json=alarm_data)
|
||||
return self._handle_response(response)["data"]["id"]
|
||||
|
||||
def update_alarm(self, alarm_id: int, alarm_data: dict) -> bool:
|
||||
"""
|
||||
Update an existing alarm.
|
||||
:param alarm_id: ID of the alarm to update.
|
||||
:param alarm_data: Updated alarm configuration.
|
||||
:return: True if the update was successful.
|
||||
"""
|
||||
alarm_data["id"] = alarm_id
|
||||
response = requests.put(f"{self.base_url}/", json=alarm_data)
|
||||
return "message" in self._handle_response(response)
|
||||
|
||||
def delete_alarm(self, alarm_id: int) -> bool:
|
||||
"""
|
||||
Delete an alarm by its ID.
|
||||
:param alarm_id: ID of the alarm to delete.
|
||||
:return: True if the deletion was successful.
|
||||
"""
|
||||
response = requests.delete(f"{self.base_url}/", json={"id": alarm_id})
|
||||
return "message" in self._handle_response(response)
|
||||
|
||||
|
||||
# Example Usage
|
||||
#if __name__ == "__main__":
|
||||
# client = AlarmApiClient("http://localhost:8000")
|
||||
#
|
||||
# # Create a new alarm
|
||||
# new_alarm = {
|
||||
# "name": "Morning Alarm",
|
||||
# "time": "07:00:00",
|
||||
# "repeat_rule": {
|
||||
# "type": "daily"
|
||||
# },
|
||||
# "enabled": True,
|
||||
# "snooze": {
|
||||
# "enabled": True,
|
||||
# "duration": 10,
|
||||
# "max_count": 3
|
||||
# },
|
||||
# "metadata": {
|
||||
# "volume": 80,
|
||||
# "notes": "Wake up alarm"
|
||||
# },
|
||||
# }
|
||||
# alarm_id = client.create_alarm(new_alarm)
|
||||
# print(f"Created alarm with ID: {alarm_id}")
|
||||
#
|
||||
# # Get all alarms
|
||||
# alarms = client.get_alarms()
|
||||
# print(f"Current alarms: {alarms}")
|
||||
#
|
||||
# # Update the alarm
|
||||
# updated_alarm = new_alarm.copy()
|
||||
# updated_alarm["time"] = "07:30:00"
|
||||
# client.update_alarm(alarm_id, updated_alarm)
|
||||
# print(f"Updated alarm ID {alarm_id}")
|
||||
#
|
||||
# # Delete the alarm
|
||||
# client.delete_alarm(alarm_id)
|
||||
# print(f"Deleted alarm ID {alarm_id}")
|
Reference in New Issue
Block a user