From d4c71e7d2c0eb6a0fec20c40e9ad7b16e92fb2af Mon Sep 17 00:00:00 2001 From: Kalzu Rekku Date: Wed, 11 Jun 2025 13:57:36 +0300 Subject: [PATCH] Functional multi node testing script. --- test-client-multi-node.py | 440 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 440 insertions(+) create mode 100644 test-client-multi-node.py diff --git a/test-client-multi-node.py b/test-client-multi-node.py new file mode 100644 index 0000000..caf2d0a --- /dev/null +++ b/test-client-multi-node.py @@ -0,0 +1,440 @@ +import os +import uuid +import time +import requests +import random +import json +import logging +import threading +import socket +from datetime import datetime, timezone +from concurrent.futures import ThreadPoolExecutor +import argparse +from requests.adapters import HTTPAdapter +from urllib3.util.connection import create_connection + +# --- Multi-Node Client Configuration --- +TARGET_SERVICE_UUID = os.environ.get( + "TARGET_SERVICE_UUID", "c7c883fd-46f3-4b14-a727-d805ae0a6ec0" +) + +SERVER_BASE_URL = os.environ.get("SERVER_URL", "http://localhost:8000") +UPDATE_INTERVAL_SECONDS = int(os.environ.get("UPDATE_INTERVAL_SECONDS", 5)) +NUM_NODES = int(os.environ.get("NUM_NODES", 3)) + +# Base IP for loopback binding (127.0.0.x where x starts from this base) +LOOPBACK_IP_BASE = int(os.environ.get("LOOPBACK_IP_BASE", 2)) # Start from 127.0.0.2 + +# --- Logging Configuration --- +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(name)s - %(levelname)s - [%(thread)d] - %(message)s' +) +logger = logging.getLogger("MultiNodeClient") + +# --- Custom HTTP Adapter for Source IP Binding --- +class SourceIPHTTPAdapter(HTTPAdapter): + def __init__(self, source_ip, *args, **kwargs): + self.source_ip = source_ip + super().__init__(*args, **kwargs) + + def init_poolmanager(self, *args, **kwargs): + # Override the socket creation to bind to specific source IP + def custom_create_connection(address, timeout=socket._GLOBAL_DEFAULT_TIMEOUT, + source_address=None, socket_options=None): + # Force our custom source address + return create_connection( + address, + timeout, + source_address=(self.source_ip, 0), # 0 = any available port + socket_options=socket_options + ) + + # Monkey patch the connection creation + original_create_connection = socket.create_connection + socket.create_connection = custom_create_connection + + try: + result = super().init_poolmanager(*args, **kwargs) + finally: + # Restore original function + socket.create_connection = original_create_connection + + return result + +# --- Enhanced Node Class with IP Binding --- +class SimulatedNode: + def __init__(self, node_id: int, total_nodes: int, server_url: str, service_uuid: str, update_interval: int, ip_base: int): + self.node_id = node_id + self.node_uuid = str(uuid.uuid4()) + self.server_url = server_url # Store server URL + self.service_uuid = service_uuid # Store service UUID + self.update_interval = update_interval # Store update interval + self.uptime_seconds = 0 + self.known_peers = {} + self.total_nodes = total_nodes + self.running = False + + # Assign unique loopback IP to this node using the passed ip_base + self.source_ip = f"127.0.0.{ip_base + node_id - 1}" + + # Create requests session with custom adapter for IP binding + self.session = requests.Session() + adapter = SourceIPHTTPAdapter(self.source_ip) + self.session.mount('http://', adapter) + self.session.mount('https://', adapter) + + # Each node gets slightly different characteristics + self.base_load = random.uniform(0.2, 1.0) + self.base_memory = random.uniform(40.0, 70.0) + self.load_variance = random.uniform(0.1, 0.5) + self.memory_variance = random.uniform(5.0, 15.0) + + # Some nodes might be "problematic" (higher load/memory) + if random.random() < 0.2: # 20% chance of being a "problematic" node + self.base_load *= 2.0 + self.base_memory += 20.0 + logger.info(f"Node {self.node_id} ({self.node_uuid[:8]}) will simulate high resource usage (IP: {self.source_ip})") + + logger.info(f"Node {self.node_id} will bind to source IP: {self.source_ip}") + + def test_ip_binding(self): + """Test if we can bind to the assigned IP address.""" + try: + # Try to create a socket and bind to the IP + test_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + test_socket.bind((self.source_ip, 0)) # Bind to any available port + test_socket.close() + logger.debug(f"Node {self.node_id} successfully tested binding to {self.source_ip}") + return True + except OSError as e: + logger.error(f"Node {self.node_id} cannot bind to {self.source_ip}: {e}") + logger.error("Make sure the IP address is available on your loopback interface.") + logger.error("You might need to add it with: sudo ifconfig lo0 alias {self.source_ip} (macOS)") + logger.error("Or: sudo ip addr add {self.source_ip}/8 dev lo (Linux)") + return False + + def generate_node_status_data(self): + """Generates simulated node status metrics with per-node characteristics.""" + + self.uptime_seconds += self.update_interval + random.randint(-1, 2) + + # Generate load with some randomness but consistent per-node baseline + load_1min = max(0.1, self.base_load + random.uniform(-self.load_variance, self.load_variance)) + load_5min = max(0.1, load_1min * random.uniform(0.8, 1.0)) + load_15min = max(0.1, load_5min * random.uniform(0.8, 1.0)) + + load_avg = [ + round(load_1min, 2), + round(load_5min, 2), + round(load_15min, 2) + ] + + # Generate memory usage with baseline + variance + memory_usage = max(10.0, min(95.0, + self.base_memory + random.uniform(-self.memory_variance, self.memory_variance))) + + return { + "uptime_seconds": self.uptime_seconds, + "load_avg": load_avg, + "memory_usage_percent": round(memory_usage, 2) + } + + def generate_ping_data(self): + """Generates simulated ping latencies to known peers.""" + pings = {} + + # Ping to self (loopback) + pings[self.node_uuid] = round(random.uniform(0.1, 1.5), 2) + + # Ping to known peers + for peer_uuid in self.known_peers.keys(): + if peer_uuid != self.node_uuid: + # Simulate network latency with some consistency per peer + base_latency = random.uniform(5.0, 100.0) + variation = random.uniform(-10.0, 10.0) + latency = max(0.1, base_latency + variation) + pings[peer_uuid] = round(latency, 2) + + return pings + + def send_update(self): + """Sends a single status update to the server using bound IP.""" + try: + status_data = self.generate_node_status_data() + ping_data = self.generate_ping_data() + + payload = { + "node": self.node_uuid, + "timestamp": datetime.now(timezone.utc).isoformat(), + "status": status_data, + "pings": ping_data + } + endpoint_url = f"{self.server_url}/{self.service_uuid}/{self.node_uuid}/" + + logger.debug(f"Node {self.node_id} ({self.source_ip}) sending update. " + f"Uptime: {status_data['uptime_seconds']}s, " + f"Load: {status_data['load_avg'][0]}, Memory: {status_data['memory_usage_percent']}%, " + f"Pings: {len(ping_data)}") + + # Use the custom session with IP binding + response = self.session.put(endpoint_url, json=payload, timeout=10) + + if response.status_code == 200: + response_data = response.json() + + if "peers" in response_data and isinstance(response_data["peers"], dict): + new_peers = {k: v for k, v in response_data["peers"].items()} + + # Log new peer discoveries + newly_discovered = set(new_peers.keys()) - set(self.known_peers.keys()) + if newly_discovered: + logger.info(f"Node {self.node_id} ({self.source_ip}) discovered {len(newly_discovered)} new peer(s)") + + self.known_peers = new_peers + + if len(newly_discovered) > 0 or len(self.known_peers) != self.total_nodes - 1: + logger.debug(f"Node {self.node_id} ({self.source_ip}) knows {len(self.known_peers)} peers " + f"(expected {self.total_nodes - 1})") + + return True + else: + logger.error(f"Node {self.node_id} ({self.source_ip}) failed to send update. " + f"Status: {response.status_code}, Response: {response.text}") + return False + + except requests.exceptions.Timeout: + logger.error(f"Node {self.node_id} ({self.source_ip}) request timed out") + return False + except requests.exceptions.ConnectionError as e: + logger.error(f"Node {self.node_id} ({self.source_ip}) connection error: {e}") + return False + except Exception as e: + logger.error(f"Node {self.node_id} ({self.source_ip}) unexpected error: {e}") + return False + + def run(self): + """Main loop for this simulated node.""" + # Test IP binding before starting + if not self.test_ip_binding(): + logger.error(f"Node {self.node_id} cannot start due to IP binding failure") + return + + self.running = True + logger.info(f"Starting Node {self.node_id} with UUID: {self.node_uuid} (IP: {self.source_ip})") + + # Add some initial delay to stagger node starts + initial_delay = self.node_id * 0.5 + time.sleep(initial_delay) + + consecutive_failures = 0 + + while self.running: + try: + success = self.send_update() + + if success: + consecutive_failures = 0 + else: + consecutive_failures += 1 + if consecutive_failures >= 3: + logger.warning(f"Node {self.node_id} ({self.source_ip}) has failed {consecutive_failures} consecutive updates") + + # Add some jitter to prevent thundering herd + jitter = random.uniform(-1.0, 1.0) + sleep_time = max(1.0, self.update_interval + jitter) + time.sleep(sleep_time) + + except KeyboardInterrupt: + logger.info(f"Node {self.node_id} ({self.source_ip}) received interrupt signal") + break + except Exception as e: + logger.error(f"Node {self.node_id} ({self.source_ip}) unexpected error in main loop: {e}") + time.sleep(self.update_interval) + + logger.info(f"Node {self.node_id} ({self.source_ip}) stopped") + + def stop(self): + """Stop the node.""" + self.running = False + +# --- Multi-Node Manager --- +class MultiNodeManager: + def __init__(self, num_nodes: int, server_url: str, service_uuid: str, update_interval: int, ip_base: int): + self.num_nodes = num_nodes + self.server_url = server_url + self.service_uuid = service_uuid + self.update_interval = update_interval + self.ip_base = ip_base + self.nodes = [] + self.threads = [] + self.running = False + + # Create simulated nodes + for i in range(num_nodes): + node = SimulatedNode(i + 1, num_nodes, server_url, service_uuid, update_interval, ip_base) + self.nodes.append(node) + + def check_ip_availability(self): + """Check if all required IP addresses are available.""" + logger.info("Checking IP address availability...") + all_available = True + + for node in self.nodes: + if not node.test_ip_binding(): + all_available = False + + if not all_available: + logger.error("Some IP addresses are not available. See individual node errors above.") + logger.info("To add loopback IP addresses:") + logger.info(" Linux: sudo ip addr add 127.0.0.X/8 dev lo") + logger.info(" macOS: sudo ifconfig lo0 alias 127.0.0.X") + # Use self.ip_base for the range + logger.info(f" Where X ranges from {self.ip_base} to {self.ip_base + self.num_nodes - 1}") + return False + + logger.info("All IP addresses are available!") + return True + + def start_all_nodes(self): + """Start all simulated nodes in separate threads.""" + if not self.check_ip_availability(): + return False + + logger.info(f"Starting {self.num_nodes} simulated nodes with unique IP addresses...") + self.running = True + + for node in self.nodes: + thread = threading.Thread(target=node.run, name=f"Node-{node.node_id}") + thread.daemon = True + self.threads.append(thread) + thread.start() + + logger.info(f"All {self.num_nodes} nodes started") + return True + + def stop_all_nodes(self): + """Stop all simulated nodes.""" + logger.info("Stopping all nodes...") + self.running = False + + for node in self.nodes: + node.stop() + + # Wait for threads to finish + for thread in self.threads: + thread.join(timeout=5.0) + + logger.info("All nodes stopped") + + def print_status(self): + """Print current status of all nodes.""" + logger.info(f"=== Multi-Node Status ({self.num_nodes} nodes) ===") + for node in self.nodes: + logger.info(f"Node {node.node_id}: IP={node.source_ip}, UUID={node.node_uuid[:8]}..., " + f"Uptime={node.uptime_seconds}s, Peers={len(node.known_peers)}") + +def setup_loopback_ips(num_nodes, base_ip): + """Helper function to show commands for setting up loopback IPs.""" + logger.info("=== Loopback IP Setup Commands ===") + logger.info("Run these commands to add the required loopback IP addresses:") + logger.info("") + + # Detect OS and show appropriate commands + import platform + system = platform.system().lower() + + for i in range(num_nodes): + ip = f"127.0.0.{base_ip + i}" + if system == "linux": + logger.info(f"sudo ip addr add {ip}/8 dev lo") + elif system == "darwin": # macOS + logger.info(f"sudo ifconfig lo0 alias {ip}") + else: + logger.info(f"Add {ip} to loopback interface (OS: {system})") + + logger.info("") + logger.info("To remove them later:") + for i in range(num_nodes): + ip = f"127.0.0.{base_ip + i}" + if system == "linux": + logger.info(f"sudo ip addr del {ip}/8 dev lo") + elif system == "darwin": # macOS + logger.info(f"sudo ifconfig lo0 -alias {ip}") + + logger.info("=" * 40) + +def main(): + parser = argparse.ArgumentParser(description='Multi-node test client with unique IP binding') + parser.add_argument('--nodes', type=int, default=NUM_NODES, + help=f'Number of simulated nodes (default: {NUM_NODES})') + parser.add_argument('--interval', type=int, default=UPDATE_INTERVAL_SECONDS, + help=f'Update interval in seconds (default: {UPDATE_INTERVAL_SECONDS})') + parser.add_argument('--server', type=str, default=SERVER_BASE_URL, + help=f'Server URL (default: {SERVER_BASE_URL})') + parser.add_argument('--service-uuid', type=str, default=TARGET_SERVICE_UUID, + help='Target service UUID') + parser.add_argument('--ip-base', type=int, default=LOOPBACK_IP_BASE, + help=f'Starting IP for 127.0.0.X (default: {LOOPBACK_IP_BASE})') + parser.add_argument('--setup-help', action='store_true', + help='Show commands to set up loopback IP addresses') + parser.add_argument('--verbose', '-v', action='store_true', + help='Enable verbose logging') + + args = parser.parse_args() + + num_nodes = args.nodes + update_interval = args.interval + server_url = args.server + service_uuid = args.service_uuid + ip_base = args.ip_base + + if args.verbose: + logging.getLogger().setLevel(logging.DEBUG) + + if args.setup_help: + setup_loopback_ips(num_nodes, ip_base) + return + + # Validate configuration + if service_uuid == "REPLACE_ME_WITH_YOUR_SERVER_SERVICE_UUID": + logger.error("=" * 60) + logger.error("ERROR: TARGET_SERVICE_UUID is not set correctly!") + logger.error("Please set it via --service-uuid argument or TARGET_SERVICE_UUID environment variable.") + logger.error("You can find the server's UUID by running main.py and checking its console output") + logger.error("or by visiting the server's root endpoint in your browser.") + logger.error("=" * 60) + return + + logger.info("=" * 60) + logger.info("Multi-Node Test Client Configuration:") + logger.info(f" Number of nodes: {num_nodes}") + logger.info(f" Update interval: {update_interval} seconds") + logger.info(f" Server URL: {server_url}") + logger.info(f" Target Service UUID: {service_uuid}") + logger.info(f" IP range: 127.0.0.{ip_base} - 127.0.0.{ip_base + num_nodes - 1}") + logger.info("=" * 60) + + # Create and start the multi-node manager + manager = MultiNodeManager(num_nodes, server_url, service_uuid, update_interval, ip_base) + + try: + if not manager.start_all_nodes(): + logger.error("Failed to start nodes. Check IP availability.") + setup_loopback_ips(num_nodes, ip_base) + return + + # Main monitoring loop + while True: + time.sleep(30) # Print status every 30 seconds + manager.print_status() + + except KeyboardInterrupt: + logger.info("Received interrupt signal, shutting down...") + except Exception as e: + logger.error(f"Unexpected error in main: {e}", exc_info=True) + finally: + manager.stop_all_nodes() + +if __name__ == "__main__": + main()