From f63609962320d6c57cbcb4fa0e1d6256683fb1e9 Mon Sep 17 00:00:00 2001 From: Kalzu Rekku Date: Wed, 11 Jun 2025 00:30:32 +0300 Subject: [PATCH] Maybe functional clients. --- client.py | 222 ++++++++++++++++++++++++++++++++++++------------- test-client.py | 163 ++++++++++++++++++++++++++++++++++++ 2 files changed, 328 insertions(+), 57 deletions(-) create mode 100644 test-client.py diff --git a/client.py b/client.py index 6e40753..f7ef287 100644 --- a/client.py +++ b/client.py @@ -1,22 +1,32 @@ +# realistic_client.py + import os import uuid import time import requests -import random import json import logging from datetime import datetime, timezone +import platform +import socket # For getting local IP +import sys + +# --- Install necessary libraries if not already present --- +try: + import psutil # For system metrics + from pythonping import ping as python_ping # Renamed to avoid conflict with common 'ping' +except ImportError: + print("Required libraries 'psutil' and 'pythonping' not found.") + print("Please install them: pip install psutil pythonping") + sys.exit(1) # --- Client Configuration --- -# The UUID of THIS client node. Generated on startup. -# Can be overridden by an environment variable for persistent client identity. +# The UUID of THIS client node. Generated on startup, or from environment variable. NODE_UUID = os.environ.get("NODE_UUID", str(uuid.uuid4())) # The UUID of the target monitoring service (the main.py server). # IMPORTANT: This MUST match the SERVICE_UUID of your running FastAPI server. # You can get this from the server's initial console output or by accessing its root endpoint ('/'). -# Replace the placeholder string below with your actual server's SERVICE_UUID. -# For example: TARGET_SERVICE_UUID = "a1b2c3d4-e5f6-7890-1234-567890abcdef" TARGET_SERVICE_UUID = os.environ.get( "TARGET_SERVICE_UUID", "REPLACE_ME_WITH_YOUR_SERVER_SERVICE_UUID" ) @@ -27,6 +37,9 @@ SERVER_BASE_URL = os.environ.get("SERVER_URL", "http://localhost:8000") # How often to send status updates (in seconds) UPDATE_INTERVAL_SECONDS = int(os.environ.get("UPDATE_INTERVAL_SECONDS", 5)) +# File to store known peers' UUIDs and IPs for persistence +PEERS_FILE = os.environ.get("PEERS_FILE", f"known_peers_{NODE_UUID}.json") + # --- Logging Configuration --- logging.basicConfig( level=logging.INFO, @@ -34,77 +47,162 @@ logging.basicConfig( ) logger = logging.getLogger("NodeClient") -# --- Global state for simulation --- -uptime_seconds = 0 -# Dictionary to store UUIDs of other nodes received from the server -# Format: { "node_uuid_str": { "last_seen": "iso_timestamp", "ip": "..." } } -known_peers = {} +# --- Global state --- +uptime_seconds = 0 # Will be updated by psutil.boot_time() or incremented +# known_peers will store { "node_uuid_str": "ip_address_str" } +known_peers: dict[str, str] = {} -# --- Data Generation Functions --- +# Determine local IP for self-pinging and reporting to server +LOCAL_IP = "127.0.0.1" # Default fallback +try: + s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + s.connect(("8.8.8.8", 80)) # Connect to an external host (doesn't send data) + LOCAL_IP = s.getsockname()[0] + s.close() +except Exception: + logger.warning("Could not determine local IP, defaulting to 127.0.0.1 for pings.") -def generate_node_status_data(): - """Generates simulated node status metrics.""" +# --- File Operations for Peers --- +def load_peers(): + """Loads known peers (UUID: IP) from a local JSON file.""" + global known_peers + if os.path.exists(PEERS_FILE): + try: + with open(PEERS_FILE, 'r') as f: + loaded_data = json.load(f) + # Ensure loaded peers are in the correct {uuid: ip} format + # Handle cases where the file might contain server's full peer info + temp_peers = {} + for k, v in loaded_data.items(): + if isinstance(v, str): # Already in {uuid: ip} format + temp_peers[k] = v + elif isinstance(v, dict) and 'ip' in v: # Server's full peer info + temp_peers[k] = v['ip'] + known_peers = temp_peers + logger.info(f"Loaded {len(known_peers)} known peers from {PEERS_FILE}") + except json.JSONDecodeError as e: + logger.error(f"Error decoding JSON from {PEERS_FILE}: {e}. Starting with no known peers.") + known_peers = {} # Reset if file is corrupt + except Exception as e: + logger.error(f"Error loading peers from {PEERS_FILE}: {e}. Starting with no known peers.") + known_peers = {} + else: + logger.info(f"No existing peers file found at {PEERS_FILE}.") + +def save_peers(): + """Saves current known peers (UUID: IP) to a local JSON file.""" + try: + with open(PEERS_FILE, 'w') as f: + json.dump(known_peers, f, indent=2) + logger.debug(f"Saved {len(known_peers)} known peers to {PEERS_FILE}") + except Exception as e: + logger.error(f"Error saving peers to {PEERS_FILE}: {e}") + +# --- System Metrics Collection --- +def get_system_metrics(): + """Collects actual system load and memory usage using psutil.""" global uptime_seconds - uptime_seconds += UPDATE_INTERVAL_SECONDS + random.randint(0, 2) # Simulate slight variation + + # Uptime + # psutil.boot_time() returns a timestamp in seconds since epoch + uptime_seconds = int(time.time() - psutil.boot_time()) - # Simulate load average (3 values: 1-min, 5-min, 15-min) - # Load averages will fluctuate. - load_avg = [ - round(random.uniform(0.1, 2.0), 2), - round(random.uniform(0.1, 1.8), 2), - round(random.uniform(0.1, 1.5), 2) - ] + # Load Average + # os.getloadavg() is Unix-specific. psutil provides CPU usage. + # For cross-platform consistency, we'll use psutil.cpu_percent() + # and simulate 5/15 min averages if os.getloadavg is not available. + load_avg = [0.0, 0.0, 0.0] + if hasattr(os, 'getloadavg'): + load_avg = list(os.getloadavg()) + else: # Fallback for Windows or systems without getloadavg + # psutil.cpu_percent() gives current CPU utilization over an interval. + # It's not true load average, but a reasonable proxy for monitoring. + # We'll use a short interval to get a "current" load. + cpu_percent = psutil.cpu_percent(interval=0.5) / 100.0 # CPU usage as a fraction + load_avg = [cpu_percent, cpu_percent * 0.9, cpu_percent * 0.8] # Simulate decay + logger.debug(f"Using psutil.cpu_percent() for load_avg (non-Unix): {load_avg}") - # Simulate memory usage percentage - memory_usage_percent = round(random.uniform(30.0, 90.0), 2) + # Memory Usage + memory = psutil.virtual_memory() + memory_usage_percent = memory.percent return { "uptime_seconds": uptime_seconds, - "load_avg": load_avg, - "memory_usage_percent": memory_usage_percent + "load_avg": [round(l, 2) for l in load_avg], + "memory_usage_percent": round(memory_usage_percent, 2) } -def generate_ping_data(): - """Generates simulated ping latencies to known peers.""" - pings = {} - - # Simulate ping to self (loopback) - always very low latency - pings[str(NODE_UUID)] = round(random.uniform(0.1, 1.0), 2) +# --- Ping Logic --- +def perform_pings(targets: dict[str, str]) -> dict[str, float]: + """Performs actual pings to target IPs and returns latencies in ms.""" + pings_results = {} - # Simulate pings to other known peers - for peer_uuid in known_peers.keys(): - if peer_uuid != str(NODE_UUID): # Don't ping self twice - # Varying latency for external peers - pings[peer_uuid] = round(random.uniform(10.0, 200.0), 2) - return pings + # Ping self (loopback) + try: + # Use a very short timeout for local pings + response_list = python_ping(LOCAL_IP, count=1, timeout=0.5, verbose=False) + if response_list.success: + # pythonping returns response_time in seconds, convert to milliseconds + pings_results[str(NODE_UUID)] = round(response_list.rtt_avg_ms, 2) + else: + pings_results[str(NODE_UUID)] = -1.0 # Indicate failure + logger.debug(f"Ping to self ({LOCAL_IP}): {pings_results[str(NODE_UUID)]}ms") + except Exception as e: + logger.warning(f"Failed to ping self ({LOCAL_IP}): {e}") + pings_results[str(NODE_UUID)] = -1.0 + + # Ping other known peers + for peer_uuid, peer_ip in targets.items(): + if peer_uuid == str(NODE_UUID): + continue # Already pinged self + + try: + # Use a longer timeout for external pings + response_list = python_ping(peer_ip, count=1, timeout=2, verbose=False) + if response_list.success: + pings_results[peer_uuid] = round(response_list.rtt_avg_ms, 2) + else: + pings_results[peer_uuid] = -1.0 # Indicate failure + logger.debug(f"Ping to {peer_uuid} ({peer_ip}): {pings_results[peer_uuid]}ms") + except Exception as e: + logger.warning(f"Failed to ping {peer_uuid} ({peer_ip}): {e}") + pings_results[peer_uuid] = -1.0 + + return pings_results # --- Main Client Logic --- - def run_client(): global known_peers + logger.info(f"Starting Node Client {NODE_UUID}") + logger.info(f"Local IP for pings: {LOCAL_IP}") logger.info(f"Target Service UUID: {TARGET_SERVICE_UUID}") logger.info(f"Server URL: {SERVER_BASE_URL}") logger.info(f"Update Interval: {UPDATE_INTERVAL_SECONDS} seconds") + logger.info(f"Peers file: {PEERS_FILE}") if TARGET_SERVICE_UUID == "REPLACE_ME_WITH_YOUR_SERVER_SERVICE_UUID": logger.error("-" * 50) logger.error("ERROR: TARGET_SERVICE_UUID is not set correctly!") - logger.error("Please replace 'REPLACE_ME_WITH_YOUR_SERVER_SERVICE_UUID' in client.py") + logger.error("Please replace 'REPLACE_ME_WITH_YOUR_SERVER_SERVICE_UUID' in the script") logger.error("or set the environment variable TARGET_SERVICE_UUID.") logger.error("You can find the server's UUID by running main.py and checking its console output") logger.error("or by visiting 'http://localhost:8000/' in your browser.") logger.error("-" * 50) return + # Load known peers on startup + load_peers() + while True: try: - # 1. Generate status data - status_data = generate_node_status_data() - ping_data = generate_ping_data() + # 1. Get real system metrics + status_data = get_system_metrics() + + # 2. Perform pings to known peers (and self) + ping_data = perform_pings(known_peers) - # 2. Construct the payload matching the StatusUpdate model - # Use datetime.now(timezone.utc) for timezone-aware UTC timestamp + # 3. Construct the payload payload = { "node": str(NODE_UUID), "timestamp": datetime.now(timezone.utc).isoformat(), @@ -112,30 +210,40 @@ def run_client(): "pings": ping_data } - # 3. Define the endpoint URL + # 4. Define the endpoint URL endpoint_url = f"{SERVER_BASE_URL}/{TARGET_SERVICE_UUID}/{NODE_UUID}/" - # 4. Send the PUT request - logger.info(f"Sending update to {endpoint_url}. Uptime: {status_data['uptime_seconds']}s, Load: {status_data['load_avg']}, Pings: {len(ping_data)}") + # 5. Send the PUT request + logger.info( + f"Sending update. Uptime: {status_data['uptime_seconds']}s, " + f"Load: {status_data['load_avg']}, Mem: {status_data['memory_usage_percent']}%, " + f"Pings: {len(ping_data)}" + ) - response = requests.put(endpoint_url, json=payload, timeout=10) # 10-second timeout + response = requests.put(endpoint_url, json=payload, timeout=15) # Increased timeout - # 5. Process the response + # 6. Process the response if response.status_code == 200: response_data = response.json() logger.info(f"Successfully sent update. Server message: '{response_data.get('message')}'") if "peers" in response_data and isinstance(response_data["peers"], dict): - # Update known_peers, converting keys to strings from JSON - new_peers = {k: v for k, v in response_data["peers"].items()} + # Update known_peers from server response + updated_peers = {} + # The server returns {uuid: {"last_seen": "...", "ip": "..."}} + # We only need the UUID and IP for pinging. + for peer_uuid, peer_info in response_data["peers"].items(): + if 'ip' in peer_info: + updated_peers[peer_uuid] = peer_info['ip'] - # Log if new peers are discovered - newly_discovered = set(new_peers.keys()) - set(known_peers.keys()) + # Log newly discovered peers + newly_discovered = set(updated_peers.keys()) - set(known_peers.keys()) if newly_discovered: logger.info(f"Discovered new peer(s): {', '.join(newly_discovered)}") - known_peers = new_peers - logger.info(f"Total known peers (including self if returned by server): {len(known_peers)}") + known_peers = updated_peers + save_peers() # Save updated peers to file for persistence + logger.info(f"Total known peers for pinging: {len(known_peers)}") else: logger.warning("Server response did not contain a valid 'peers' field or it was empty.") else: @@ -146,7 +254,7 @@ def run_client(): logger.error(f"Server validation error (422 Unprocessable Entity): {response.json()}") except requests.exceptions.Timeout: - logger.error(f"Request timed out after {10} seconds. Is the server running and responsive?") + logger.error(f"Request timed out after {15} seconds. Is the server running and responsive?") except requests.exceptions.ConnectionError as e: logger.error(f"Connection error: {e}. Is the server running at {SERVER_BASE_URL}?") except requests.exceptions.RequestException as e: @@ -156,7 +264,7 @@ def run_client(): except Exception as e: logger.error(f"An unexpected error occurred in the client loop: {e}", exc_info=True) - # 6. Wait for the next update + # 7. Wait for the next update time.sleep(UPDATE_INTERVAL_SECONDS) if __name__ == "__main__": diff --git a/test-client.py b/test-client.py new file mode 100644 index 0000000..6e40753 --- /dev/null +++ b/test-client.py @@ -0,0 +1,163 @@ +import os +import uuid +import time +import requests +import random +import json +import logging +from datetime import datetime, timezone + +# --- Client Configuration --- +# The UUID of THIS client node. Generated on startup. +# Can be overridden by an environment variable for persistent client identity. +NODE_UUID = os.environ.get("NODE_UUID", str(uuid.uuid4())) + +# The UUID of the target monitoring service (the main.py server). +# IMPORTANT: This MUST match the SERVICE_UUID of your running FastAPI server. +# You can get this from the server's initial console output or by accessing its root endpoint ('/'). +# Replace the placeholder string below with your actual server's SERVICE_UUID. +# For example: TARGET_SERVICE_UUID = "a1b2c3d4-e5f6-7890-1234-567890abcdef" +TARGET_SERVICE_UUID = os.environ.get( + "TARGET_SERVICE_UUID", "REPLACE_ME_WITH_YOUR_SERVER_SERVICE_UUID" +) + +# The base URL of the FastAPI monitoring service +SERVER_BASE_URL = os.environ.get("SERVER_URL", "http://localhost:8000") + +# How often to send status updates (in seconds) +UPDATE_INTERVAL_SECONDS = int(os.environ.get("UPDATE_INTERVAL_SECONDS", 5)) + +# --- Logging Configuration --- +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' +) +logger = logging.getLogger("NodeClient") + +# --- Global state for simulation --- +uptime_seconds = 0 +# Dictionary to store UUIDs of other nodes received from the server +# Format: { "node_uuid_str": { "last_seen": "iso_timestamp", "ip": "..." } } +known_peers = {} + +# --- Data Generation Functions --- + +def generate_node_status_data(): + """Generates simulated node status metrics.""" + global uptime_seconds + uptime_seconds += UPDATE_INTERVAL_SECONDS + random.randint(0, 2) # Simulate slight variation + + # Simulate load average (3 values: 1-min, 5-min, 15-min) + # Load averages will fluctuate. + load_avg = [ + round(random.uniform(0.1, 2.0), 2), + round(random.uniform(0.1, 1.8), 2), + round(random.uniform(0.1, 1.5), 2) + ] + + # Simulate memory usage percentage + memory_usage_percent = round(random.uniform(30.0, 90.0), 2) + + return { + "uptime_seconds": uptime_seconds, + "load_avg": load_avg, + "memory_usage_percent": memory_usage_percent + } + +def generate_ping_data(): + """Generates simulated ping latencies to known peers.""" + pings = {} + + # Simulate ping to self (loopback) - always very low latency + pings[str(NODE_UUID)] = round(random.uniform(0.1, 1.0), 2) + + # Simulate pings to other known peers + for peer_uuid in known_peers.keys(): + if peer_uuid != str(NODE_UUID): # Don't ping self twice + # Varying latency for external peers + pings[peer_uuid] = round(random.uniform(10.0, 200.0), 2) + return pings + +# --- Main Client Logic --- + +def run_client(): + global known_peers + logger.info(f"Starting Node Client {NODE_UUID}") + logger.info(f"Target Service UUID: {TARGET_SERVICE_UUID}") + logger.info(f"Server URL: {SERVER_BASE_URL}") + logger.info(f"Update Interval: {UPDATE_INTERVAL_SECONDS} seconds") + + if TARGET_SERVICE_UUID == "REPLACE_ME_WITH_YOUR_SERVER_SERVICE_UUID": + logger.error("-" * 50) + logger.error("ERROR: TARGET_SERVICE_UUID is not set correctly!") + logger.error("Please replace 'REPLACE_ME_WITH_YOUR_SERVER_SERVICE_UUID' in client.py") + logger.error("or set the environment variable TARGET_SERVICE_UUID.") + logger.error("You can find the server's UUID by running main.py and checking its console output") + logger.error("or by visiting 'http://localhost:8000/' in your browser.") + logger.error("-" * 50) + return + + while True: + try: + # 1. Generate status data + status_data = generate_node_status_data() + ping_data = generate_ping_data() + + # 2. Construct the payload matching the StatusUpdate model + # Use datetime.now(timezone.utc) for timezone-aware UTC timestamp + payload = { + "node": str(NODE_UUID), + "timestamp": datetime.now(timezone.utc).isoformat(), + "status": status_data, + "pings": ping_data + } + + # 3. Define the endpoint URL + endpoint_url = f"{SERVER_BASE_URL}/{TARGET_SERVICE_UUID}/{NODE_UUID}/" + + # 4. Send the PUT request + logger.info(f"Sending update to {endpoint_url}. Uptime: {status_data['uptime_seconds']}s, Load: {status_data['load_avg']}, Pings: {len(ping_data)}") + + response = requests.put(endpoint_url, json=payload, timeout=10) # 10-second timeout + + # 5. Process the response + if response.status_code == 200: + response_data = response.json() + logger.info(f"Successfully sent update. Server message: '{response_data.get('message')}'") + + if "peers" in response_data and isinstance(response_data["peers"], dict): + # Update known_peers, converting keys to strings from JSON + new_peers = {k: v for k, v in response_data["peers"].items()} + + # Log if new peers are discovered + newly_discovered = set(new_peers.keys()) - set(known_peers.keys()) + if newly_discovered: + logger.info(f"Discovered new peer(s): {', '.join(newly_discovered)}") + + known_peers = new_peers + logger.info(f"Total known peers (including self if returned by server): {len(known_peers)}") + else: + logger.warning("Server response did not contain a valid 'peers' field or it was empty.") + else: + logger.error(f"Failed to send update. Status code: {response.status_code}, Response: {response.text}") + if response.status_code == 404: + logger.error("Hint: The TARGET_SERVICE_UUID might be incorrect, or the server isn't running at this endpoint.") + elif response.status_code == 422: # Pydantic validation error + logger.error(f"Server validation error (422 Unprocessable Entity): {response.json()}") + + except requests.exceptions.Timeout: + logger.error(f"Request timed out after {10} seconds. Is the server running and responsive?") + except requests.exceptions.ConnectionError as e: + logger.error(f"Connection error: {e}. Is the server running at {SERVER_BASE_URL}?") + except requests.exceptions.RequestException as e: + logger.error(f"An unexpected request error occurred: {e}", exc_info=True) + except json.JSONDecodeError: + logger.error(f"Failed to decode JSON response: {response.text}. Is the server returning valid JSON?") + except Exception as e: + logger.error(f"An unexpected error occurred in the client loop: {e}", exc_info=True) + + # 6. Wait for the next update + time.sleep(UPDATE_INTERVAL_SECONDS) + +if __name__ == "__main__": + run_client() \ No newline at end of file