214 lines
6.4 KiB
Python
214 lines
6.4 KiB
Python
import requests
|
|
import json
|
|
import uuid
|
|
import argparse
|
|
import sys
|
|
|
|
# --- Configuration ---
|
|
DEFAULT_BASE_URL = "http://localhost:8500" # Adjust if running elsewhere
|
|
|
|
# --- Helper Functions ---
|
|
|
|
|
|
def make_request(
    method: str,
    endpoint: str,
    api_token: str,
    base_url: str = DEFAULT_BASE_URL,
    params: dict = None,
    json_data: dict = None,
):
    """Send an authenticated HTTP request and return the decoded response.

    Args:
        method: HTTP verb, e.g. "GET", "POST" or "PUT".
        endpoint: API path that is joined onto ``base_url``.
        api_token: Value sent in the ``X-API-Token`` header.
        base_url: Scheme, host and port of the API.
        params: Optional query-string parameters.
        json_data: Optional payload sent as the JSON request body.

    Returns:
        The parsed JSON body when the server answers with a non-error status.
        If the body is empty, not valid JSON, or a bare JSON ``null``, a dict
        ``{"status_code": ..., "content": ...}`` is returned instead, so that
        ``None`` always and only means the request failed. On failure
        (connection error, timeout, 4xx/5xx status) the error is printed to
        stderr and ``None`` is returned.
    """
    headers = {
        "X-API-Token": api_token,
        "Accept": "application/json",  # Indicate we prefer JSON responses
    }
    if json_data is not None:
        headers["Content-Type"] = "application/json"

    # Join on exactly one slash so a base URL with a trailing "/" does not
    # produce "//v1/..." paths.
    url = f"{base_url.rstrip('/')}/{endpoint.lstrip('/')}"

    try:
        response = requests.request(
            method,
            url,
            headers=headers,
            params=params,
            json=json_data,
            timeout=10,  # Never hang forever on an unresponsive server
        )
        response.raise_for_status()  # Turn 4xx/5xx into an exception
    except requests.exceptions.RequestException as e:
        print(f"Error making request to {method} {url}: {e}", file=sys.stderr)
        return None
    except Exception as e:
        # Deliberate best-effort boundary: callers only ever see None on failure.
        print(f"An unexpected error occurred: {e}", file=sys.stderr)
        return None

    # Every JSON decoding error raised by requests (its own JSONDecodeError,
    # the stdlib one, or simplejson's) is a ValueError. Catching ValueError
    # keeps an empty 200 OK body (e.g. on deregister) from being mistaken for
    # a transport failure.
    try:
        payload = response.json()
    except ValueError:
        payload = None

    if payload is None:
        # No usable JSON body: report status/text instead.
        return {
            "status_code": response.status_code,
            "content": response.text,
        }
    return payload
|
|
|
|
|
|
def register_service(
    api_token: str,
    service_name: str,
    address: str,
    port: int,
    tags: list = None,
    metadata: dict = None,
    service_id: str = None,
    base_url: str = None,
):
    """Register a service instance.

    Args:
        api_token: API token forwarded to ``make_request``.
        service_name: Logical name of the service.
        address: Address of the instance.
        port: Port of the instance.
        tags: Optional list of tags; an empty list is sent when omitted.
        metadata: Optional metadata dict; an empty dict is sent when omitted.
        service_id: Explicit instance ID; when omitted,
            ``"<service_name>-<uuid4>"`` is generated.
        base_url: Base URL of the API; ``DEFAULT_BASE_URL`` when omitted.

    Returns:
        The service ID that was used, or ``None`` if the request failed.
    """
    if service_id is None:
        service_id = f"{service_name}-{uuid.uuid4()}"  # Generate a unique ID

    instance_data = {
        "id": service_id,
        "name": service_name,
        "address": address,
        "port": port,
        "tags": tags if tags is not None else [],
        "metadata": metadata if metadata is not None else {},
        # Health defaults to 'passing' on the server side if not provided
    }

    print(f"[*] Registering service: {json.dumps(instance_data)}")
    result = make_request(
        "POST",
        "/v1/agent/service/register",
        api_token=api_token,
        base_url=base_url or DEFAULT_BASE_URL,
        json_data=instance_data,
    )
    # make_request signals failure with None; a falsy JSON body such as {}
    # is still a successful response.
    if result is None:
        print("[-] Registration failed.")
        return None

    print(f"[+] Registration successful: {result}")
    return service_id  # Return the ID used


def deregister_service(api_token: str, service_id: str, base_url: str = None):
    """Deregister a service instance.

    Args:
        api_token: API token forwarded to ``make_request``.
        service_id: ID of the instance to remove.
        base_url: Base URL of the API; ``DEFAULT_BASE_URL`` when omitted.

    Returns:
        ``True`` if the request completed without error, ``False`` otherwise.
    """
    print(f"[*] Deregistering service ID: {service_id}")
    endpoint = f"/v1/agent/service/deregister/{service_id}"
    result = make_request(
        "PUT",  # Note: Consul uses PUT for deregister
        endpoint,
        api_token=api_token,
        base_url=base_url or DEFAULT_BASE_URL,
    )
    if result is None:
        print("[-] Deregistration failed.")
        return False

    # PUT might return 200 OK with no body, which make_request reports as a
    # status/content dict.
    if isinstance(result, dict) and result.get("status_code") == 200:
        print("[+] Deregistration successful (Status Code 200).")
    else:
        # Any other non-error response is still treated as success.
        print(f"[+] Deregistration response: {result}")
    return True


def get_service_instances(
    api_token: str,
    service_name: str,
    passing_only: bool = False,
    base_url: str = None,
):
    """Get the instances of a specific service.

    Args:
        api_token: API token forwarded to ``make_request``.
        service_name: Name of the service to look up.
        passing_only: When true, query ``/v1/health/service/<name>`` with
            ``passing=true``; otherwise query ``/v1/catalog/service/<name>``.
        base_url: Base URL of the API; ``DEFAULT_BASE_URL`` when omitted.

    Returns:
        The decoded response (possibly empty), or ``None`` if the request
        failed.
    """
    print(f"[*] Querying service '{service_name}' (Passing only: {passing_only})")
    if passing_only:
        endpoint = f"/v1/health/service/{service_name}"
        params = {"passing": "true"}
    else:
        endpoint = f"/v1/catalog/service/{service_name}"
        params = None

    result = make_request(
        "GET",
        endpoint,
        api_token=api_token,
        base_url=base_url or DEFAULT_BASE_URL,
        params=params,
    )

    if result is None:
        print(f"[-] Failed to query service '{service_name}'.")
        return None

    # Result could be an empty list [] which is valid
    print(f"[+] Found {len(result)} instance(s):")
    print(json.dumps(result, indent=2))
    return result


def list_all_services(api_token: str, base_url: str = None):
    """List all known service names.

    Args:
        api_token: API token forwarded to ``make_request``.
        base_url: Base URL of the API; ``DEFAULT_BASE_URL`` when omitted.

    Returns:
        The decoded response (possibly empty), or ``None`` if the request
        failed.
    """
    print("[*] Listing all known service names...")
    result = make_request(
        "GET",
        "/v1/catalog/services",
        api_token=api_token,
        base_url=base_url or DEFAULT_BASE_URL,
    )
    # An empty catalogue is a valid answer, so only None counts as failure.
    if result is None:
        print("[-] Failed to list services.")
        return None

    print("[+] Known services:")
    print(json.dumps(result, indent=2))
    return result


# --- Main Execution ---
def main():
    """Run the demo: register, list, query and deregister sample services."""
    parser = argparse.ArgumentParser(description="MiniDiscovery Client Example")
    parser.add_argument(
        "-t",
        "--token",
        required=True,
        help="API Token for MiniDiscovery authentication",
    )
    parser.add_argument(
        "--base-url",
        default=DEFAULT_BASE_URL,
        help=f"Base URL of the MiniDiscovery API (default: {DEFAULT_BASE_URL})",
    )
    args = parser.parse_args()
    token = args.token
    # Forward the CLI value to every call; otherwise --base-url would only
    # change the banner below and not the server actually contacted.
    base_url = args.base_url

    print(f"--- MiniDiscovery Client Demo (Using API at {base_url}) ---")

    # 1. Register a couple of services
    print("\n--- Step 1: Register Services ---")
    web_id_1 = register_service(
        token,
        "web",
        "192.168.1.10",
        8080,
        tags=["frontend", "prod"],
        metadata={"version": "1.2"},
        base_url=base_url,
    )
    register_service(
        token,
        "web",
        "192.168.1.11",
        8080,
        tags=["frontend", "prod"],
        metadata={"version": "1.2"},
        base_url=base_url,
    )
    register_service(
        token,
        "database",
        "10.0.0.5",
        5432,
        tags=["backend", "prod-db"],
        base_url=base_url,
    )

    # 2. List all service names
    print("\n--- Step 2: List All Service Names ---")
    list_all_services(token, base_url=base_url)

    # 3. Query specific services
    print("\n--- Step 3: Query Services ---")
    get_service_instances(token, "web", base_url=base_url)
    get_service_instances(
        token, "database", passing_only=True, base_url=base_url
    )  # Assume health check runs
    get_service_instances(token, "nonexistent-service", base_url=base_url)

    # 4. Deregister one instance
    print("\n--- Step 4: Deregister an Instance ---")
    if web_id_1:
        deregister_service(token, web_id_1, base_url=base_url)

    # 5. Query again to see the change
    print("\n--- Step 5: Query 'web' Service Again ---")
    get_service_instances(token, "web", base_url=base_url)

    print("\n--- Demo Complete ---")


if __name__ == "__main__":
    main()
|