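# File: node-monitor/test-client-multi-node.py (549 lines, 19 KiB, Python)
# Multi-node test client with unique IP binding: simulates several nodes, each
# sending status updates to the monitoring server from its own 127.0.0.x address.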

import os
import uuid
import time
import requests
import random
import json
import logging
import threading
import socket
from datetime import datetime, timezone
from concurrent.futures import ThreadPoolExecutor
import argparse
from requests.adapters import HTTPAdapter
from urllib3.util.connection import create_connection
# --- Multi-Node Client Configuration ---
# Each setting may be overridden through an environment variable (SERVER_URL for
# the server base URL, the constant's own name otherwise); main() also exposes
# them as command-line flags whose defaults are these values.


def _env_int(name, default):
    """Return the environment variable *name* parsed as ``int``, or *default*."""
    return int(os.environ.get(name, default))


TARGET_SERVICE_UUID = os.environ.get(
    "TARGET_SERVICE_UUID", "ab73d00a-8169-46bb-997d-f13e5f760973"
)
SERVER_BASE_URL = os.environ.get("SERVER_URL", "http://localhost:8000")
UPDATE_INTERVAL_SECONDS = _env_int("UPDATE_INTERVAL_SECONDS", 5)
NUM_NODES = _env_int("NUM_NODES", 3)
# Last octet of the first node's loopback address: nodes use 127.0.0.<base>,
# 127.0.0.<base + 1>, ... (default 2, i.e. starting at 127.0.0.2).
LOOPBACK_IP_BASE = _env_int("LOOPBACK_IP_BASE", 2)

# --- Logging Configuration ---
# The thread id is part of every record so output from concurrently running
# node threads can be told apart.
_LOG_FORMAT = "%(asctime)s - %(name)s - %(levelname)s - [%(thread)d] - %(message)s"
logging.basicConfig(level=logging.INFO, format=_LOG_FORMAT)
logger = logging.getLogger("MultiNodeClient")
# --- Custom HTTP Adapter for Source IP Binding ---
class SourceIPHTTPAdapter(HTTPAdapter):
    """Transport adapter that binds every outgoing connection to ``source_ip``.

    The address is handed to urllib3 as ``source_address=(source_ip, 0)``
    (port 0 = let the OS pick an ephemeral port). urllib3 stores it in the pool
    manager's connection-pool kwargs and applies it to each socket when the
    connection is actually opened, i.e. on the first request through a pool.

    Args:
        source_ip: local IPv4 address to bind to, e.g. ``"127.0.0.2"``.
        *args, **kwargs: forwarded unchanged to ``HTTPAdapter``.
    """

    # HTTPAdapter.__setstate__ rebuilds the pool manager via init_poolmanager(),
    # which reads self.source_ip, so it must be part of the pickled state.
    __attrs__ = HTTPAdapter.__attrs__ + ["source_ip"]

    def __init__(self, source_ip, *args, **kwargs):
        # Must be set before super().__init__(), which calls init_poolmanager().
        self.source_ip = source_ip
        super().__init__(*args, **kwargs)

    def init_poolmanager(self, *args, **kwargs):
        """Create the pool manager with ``source_address`` pinned to ``source_ip``.

        Patching ``socket.create_connection`` here has no effect: sockets are
        only opened later, lazily, and urllib3 uses its own
        ``urllib3.util.connection.create_connection``. Passing the address as a
        pool keyword is the supported way to bind the source IP.
        """
        kwargs["source_address"] = (self.source_ip, 0)
        return super().init_poolmanager(*args, **kwargs)

    def proxy_manager_for(self, proxy, **proxy_kwargs):
        """Bind connections made through a proxy to ``source_ip`` as well."""
        proxy_kwargs["source_address"] = (self.source_ip, 0)
        return super().proxy_manager_for(proxy, **proxy_kwargs)
# --- Enhanced Node Class with IP Binding ---
class SimulatedNode:
    """A simulated monitoring node that reports to the server from its own loopback IP.

    Each instance gets a random UUID and a randomized load/memory profile.
    About 20% of nodes are made "problematic", with doubled base load and
    +20% base memory. Each node also gets a ``requests`` session whose adapter
    binds connections to ``127.0.0.<ip_base + node_id - 1>``.

    ``run()`` repeatedly PUTs a status/ping payload to
    ``{server_url}/{service_uuid}/{node_uuid}/`` roughly every
    ``update_interval`` seconds, until ``stop()`` is called.
    """

    def __init__(
        self,
        node_id: int,
        total_nodes: int,
        server_url: str,
        service_uuid: str,
        update_interval: int,
        ip_base: int,
    ):
        """
        Args:
            node_id: index of this node (node 1 uses 127.0.0.<ip_base>); also
                scales the start-up stagger delay in run().
            total_nodes: number of simulated nodes, used to report how many
                peers this node is expected to know (total_nodes - 1).
            server_url: base URL of the monitoring server.
            service_uuid: target service UUID, used in the endpoint path.
            update_interval: nominal number of seconds between updates.
            ip_base: last octet of node 1's loopback address.
        """
        self.node_id = node_id
        self.node_uuid = str(uuid.uuid4())
        self.server_url = server_url
        self.service_uuid = service_uuid
        self.update_interval = update_interval
        self.uptime_seconds = 0
        self.known_peers = {}
        self.total_nodes = total_nodes
        self.running = False
        # Set by stop() and never cleared. run() waits on it instead of calling
        # time.sleep(), so a stop request ends the current wait immediately. A
        # stop() issued before run() starts is still honoured.
        self._stop_event = threading.Event()
        # Baseline latency per peer UUID, drawn once so that successive pings
        # to the same peer vary around a stable value.
        self._peer_base_latency = {}
        # Assign a unique loopback IP to this node.
        self.source_ip = f"127.0.0.{ip_base + node_id - 1}"
        # All requests from this node go through a session bound to source_ip.
        self.session = requests.Session()
        adapter = SourceIPHTTPAdapter(self.source_ip)
        self.session.mount("http://", adapter)
        self.session.mount("https://", adapter)
        # Each node gets slightly different characteristics.
        self.base_load = random.uniform(0.2, 1.0)
        self.base_memory = random.uniform(40.0, 70.0)
        self.load_variance = random.uniform(0.1, 0.5)
        self.memory_variance = random.uniform(5.0, 15.0)
        # Some nodes are "problematic" (higher load/memory).
        if random.random() < 0.2:  # 20% chance
            self.base_load *= 2.0
            self.base_memory += 20.0
            logger.info(
                f"Node {self.node_id} ({self.node_uuid[:8]}) will simulate high resource usage (IP: {self.source_ip})"
            )
        logger.info(f"Node {self.node_id} will bind to source IP: {self.source_ip}")

    def test_ip_binding(self):
        """Check that a TCP socket can be bound to ``self.source_ip``.

        Returns:
            True if the bind succeeded. False otherwise, after logging the
            error together with commands to add the address.
        """
        try:
            # The context manager closes the probe socket even if bind() fails.
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as probe:
                probe.bind((self.source_ip, 0))  # port 0 = any available port
        except OSError as e:
            logger.error(f"Node {self.node_id} cannot bind to {self.source_ip}: {e}")
            logger.error(
                "Make sure the IP address is available on your loopback interface."
            )
            logger.error(
                f"You might need to add it with: sudo ifconfig lo0 alias {self.source_ip} (macOS)"
            )
            logger.error(f"Or: sudo ip addr add {self.source_ip}/8 dev lo (Linux)")
            return False
        logger.debug(
            f"Node {self.node_id} successfully tested binding to {self.source_ip}"
        )
        return True

    def generate_node_status_data(self):
        """Advance simulated uptime and return a status dict.

        Returns:
            dict with ``uptime_seconds`` (int), ``load_avg`` (list of three
            floats: 1/5/15-minute load, each >= 0.1) and
            ``memory_usage_percent`` (float clamped to 10.0..95.0).
        """
        # Clamp the increment so uptime never goes backwards (possible with a
        # 0 s interval and -1 jitter).
        self.uptime_seconds += max(0, self.update_interval + random.randint(-1, 2))
        # Load: random around a per-node baseline; longer windows are damped.
        load_1min = max(
            0.1,
            self.base_load + random.uniform(-self.load_variance, self.load_variance),
        )
        load_5min = max(0.1, load_1min * random.uniform(0.8, 1.0))
        load_15min = max(0.1, load_5min * random.uniform(0.8, 1.0))
        load_avg = [round(load_1min, 2), round(load_5min, 2), round(load_15min, 2)]
        # Memory: per-node baseline plus variance, clamped to a sane range.
        memory_usage = max(
            10.0,
            min(
                95.0,
                self.base_memory
                + random.uniform(-self.memory_variance, self.memory_variance),
            ),
        )
        return {
            "uptime_seconds": self.uptime_seconds,
            "load_avg": load_avg,
            "memory_usage_percent": round(memory_usage, 2),
        }

    def generate_ping_data(self):
        """Return simulated ping latencies keyed by node UUID.

        Includes a 0.1-1.5 entry for this node itself (loopback) and one entry
        per known peer. Each peer's value is its fixed baseline (drawn once,
        5-100) plus +/-10 of per-call noise, floored at 0.1.
        """
        pings = {self.node_uuid: round(random.uniform(0.1, 1.5), 2)}
        for peer_uuid in self.known_peers:
            if peer_uuid == self.node_uuid:
                continue
            base_latency = self._peer_base_latency.get(peer_uuid)
            if base_latency is None:
                base_latency = random.uniform(5.0, 100.0)
                self._peer_base_latency[peer_uuid] = base_latency
            latency = max(0.1, base_latency + random.uniform(-10.0, 10.0))
            pings[peer_uuid] = round(latency, 2)
        return pings

    def send_update(self):
        """PUT one status/ping payload to the server through the bound session.

        On HTTP 200, the response's ``peers`` dict (if present) replaces
        ``known_peers``.

        Returns:
            True on HTTP 200. False on any other status, a timeout, a
            connection error, or any other exception (all logged).
        """
        try:
            status_data = self.generate_node_status_data()
            ping_data = self.generate_ping_data()
            payload = {
                "node": self.node_uuid,
                "timestamp": datetime.now(timezone.utc).isoformat(),
                "status": status_data,
                "pings": ping_data,
            }
            endpoint_url = f"{self.server_url}/{self.service_uuid}/{self.node_uuid}/"
            logger.debug(
                f"Node {self.node_id} ({self.source_ip}) sending update. "
                f"Uptime: {status_data['uptime_seconds']}s, "
                f"Load: {status_data['load_avg'][0]}, Memory: {status_data['memory_usage_percent']}%, "
                f"Pings: {len(ping_data)}"
            )
            response = self.session.put(endpoint_url, json=payload, timeout=10)
            if response.status_code != 200:
                logger.error(
                    f"Node {self.node_id} ({self.source_ip}) failed to send update. "
                    f"Status: {response.status_code}, Response: {response.text}"
                )
                return False
            response_data = response.json()
            peers = response_data.get("peers") if isinstance(response_data, dict) else None
            if isinstance(peers, dict):
                new_peers = dict(peers)
                newly_discovered = set(new_peers) - set(self.known_peers)
                if newly_discovered:
                    logger.info(
                        f"Node {self.node_id} ({self.source_ip}) discovered {len(newly_discovered)} new peer(s)"
                    )
                self.known_peers = new_peers
                if newly_discovered or len(self.known_peers) != self.total_nodes - 1:
                    logger.debug(
                        f"Node {self.node_id} ({self.source_ip}) knows {len(self.known_peers)} peers "
                        f"(expected {self.total_nodes - 1})"
                    )
            return True
        except requests.exceptions.Timeout:
            logger.error(f"Node {self.node_id} ({self.source_ip}) request timed out")
            return False
        except requests.exceptions.ConnectionError as e:
            logger.error(
                f"Node {self.node_id} ({self.source_ip}) connection error: {e}"
            )
            return False
        except Exception as e:
            logger.error(
                f"Node {self.node_id} ({self.source_ip}) unexpected error: {e}"
            )
            return False

    def run(self):
        """Main loop: send updates until stop() is called.

        Returns immediately if the source IP cannot be bound. Start-up is
        staggered by ``node_id * 0.5`` seconds. Waits between updates are
        ``update_interval`` +/-1 s of jitter (minimum 1 s). All waits end
        early when stop() is called.
        """
        if not self.test_ip_binding():
            logger.error(f"Node {self.node_id} cannot start due to IP binding failure")
            return
        self.running = True
        logger.info(
            f"Starting Node {self.node_id} with UUID: {self.node_uuid} (IP: {self.source_ip})"
        )
        # Stagger node starts; returns early if stop() is called meanwhile.
        self._stop_event.wait(self.node_id * 0.5)
        consecutive_failures = 0
        # Checking both flags keeps compatibility with callers that only reset
        # `running`, while the event covers a stop() that raced with start-up.
        while self.running and not self._stop_event.is_set():
            try:
                if self.send_update():
                    consecutive_failures = 0
                else:
                    consecutive_failures += 1
                    if consecutive_failures >= 3:
                        logger.warning(
                            f"Node {self.node_id} ({self.source_ip}) has failed {consecutive_failures} consecutive updates"
                        )
                # Jitter spreads the nodes' requests out (avoids thundering herd).
                jitter = random.uniform(-1.0, 1.0)
                self._stop_event.wait(max(1.0, self.update_interval + jitter))
            except KeyboardInterrupt:
                # Only reachable when run() executes in the main thread.
                logger.info(
                    f"Node {self.node_id} ({self.source_ip}) received interrupt signal"
                )
                break
            except Exception as e:
                logger.error(
                    f"Node {self.node_id} ({self.source_ip}) unexpected error in main loop: {e}"
                )
                # Floor at 1 s: a 0 or negative interval would otherwise spin
                # (or, with time.sleep, raise) on repeated errors.
                self._stop_event.wait(max(1.0, self.update_interval))
        self.running = False
        logger.info(f"Node {self.node_id} ({self.source_ip}) stopped")

    def stop(self):
        """Ask run() to exit. Any wait in progress ends immediately.

        An HTTP request already in flight still runs to completion or to its
        10 s timeout.
        """
        self.running = False
        self._stop_event.set()
# --- Multi-Node Manager ---
class MultiNodeManager:
    """Create ``num_nodes`` SimulatedNode instances and run each in its own daemon thread."""

    def __init__(
        self,
        num_nodes: int,
        server_url: str,
        service_uuid: str,
        update_interval: int,
        ip_base: int,
    ):
        """
        Args:
            num_nodes: number of nodes to create (ids 1..num_nodes).
            server_url: base URL of the monitoring server.
            service_uuid: target service UUID passed to every node.
            update_interval: nominal seconds between each node's updates.
            ip_base: last octet of node 1's loopback address; node i uses
                127.0.0.<ip_base + i - 1>.
        """
        self.num_nodes = num_nodes
        self.server_url = server_url
        self.service_uuid = service_uuid
        self.update_interval = update_interval
        self.ip_base = ip_base
        self.nodes = []
        self.threads = []
        self.running = False
        for i in range(num_nodes):
            node = SimulatedNode(
                i + 1, num_nodes, server_url, service_uuid, update_interval, ip_base
            )
            self.nodes.append(node)

    def check_ip_availability(self):
        """Return True if every node can bind its source IP.

        Every node is probed (no short-circuit), so each failure gets logged.
        On failure, setup instructions covering the whole address range are
        also logged.
        """
        logger.info("Checking IP address availability...")
        all_available = True
        for node in self.nodes:
            if not node.test_ip_binding():
                all_available = False
        if not all_available:
            logger.error(
                "Some IP addresses are not available. See individual node errors above."
            )
            logger.info("To add loopback IP addresses:")
            logger.info("  Linux: sudo ip addr add 127.0.0.X/8 dev lo")
            logger.info("  macOS: sudo ifconfig lo0 alias 127.0.0.X")
            logger.info(
                f"  Where X ranges from {self.ip_base} to {self.ip_base + self.num_nodes - 1}"
            )
            return False
        logger.info("All IP addresses are available!")
        return True

    def start_all_nodes(self):
        """Start one daemon thread per node after verifying IP availability.

        Returns:
            False if any address is unavailable (no threads are started).
            True otherwise.
        """
        if not self.check_ip_availability():
            return False
        logger.info(
            f"Starting {self.num_nodes} simulated nodes with unique IP addresses..."
        )
        self.running = True
        for node in self.nodes:
            thread = threading.Thread(
                target=node.run, name=f"Node-{node.node_id}", daemon=True
            )
            self.threads.append(thread)
            thread.start()
        logger.info(f"All {self.num_nodes} nodes started")
        return True

    def stop_all_nodes(self, join_timeout: float = 5.0):
        """Signal every node to stop and wait up to ``join_timeout`` seconds in total.

        One deadline is shared by all joins, so shutdown is bounded by
        ``join_timeout`` rather than ``join_timeout`` per node. Threads still
        alive at the deadline are reported. They are daemon threads, so they
        do not keep the process alive.
        """
        logger.info("Stopping all nodes...")
        self.running = False
        for node in self.nodes:
            node.stop()
        deadline = time.monotonic() + join_timeout
        for thread in self.threads:
            thread.join(timeout=max(0.0, deadline - time.monotonic()))
        still_running = [thread.name for thread in self.threads if thread.is_alive()]
        if still_running:
            logger.warning(
                f"{len(still_running)} node thread(s) did not stop within "
                f"{join_timeout}s: {', '.join(still_running)}"
            )
        else:
            logger.info("All nodes stopped")

    def print_status(self):
        """Log IP, short UUID, simulated uptime and peer count for every node."""
        logger.info(f"=== Multi-Node Status ({self.num_nodes} nodes) ===")
        for node in self.nodes:
            logger.info(
                f"Node {node.node_id}: IP={node.source_ip}, UUID={node.node_uuid[:8]}..., "
                f"Uptime={node.uptime_seconds}s, Peers={len(node.known_peers)}"
            )
def setup_loopback_ips(num_nodes, base_ip):
    """Log shell commands that add (and later remove) the needed loopback aliases.

    Nothing is executed; the commands are only written to the module logger.

    Args:
        num_nodes: number of consecutive addresses required.
        base_ip: last octet of the first address. The addresses are
            127.0.0.<base_ip> .. 127.0.0.<base_ip + num_nodes - 1>.
    """
    import platform

    system = platform.system().lower()
    ips = [f"127.0.0.{base_ip + i}" for i in range(num_nodes)]

    logger.info("=== Loopback IP Setup Commands ===")
    logger.info("Run these commands to add the required loopback IP addresses:")
    logger.info("")
    for ip in ips:
        if system == "linux":
            logger.info(f"sudo ip addr add {ip}/8 dev lo")
        elif system == "darwin":  # macOS
            logger.info(f"sudo ifconfig lo0 alias {ip}")
        else:
            logger.info(f"Add {ip} to loopback interface (OS: {system})")
    logger.info("")
    logger.info("To remove them later:")
    for ip in ips:
        if system == "linux":
            logger.info(f"sudo ip addr del {ip}/8 dev lo")
        elif system == "darwin":  # macOS
            logger.info(f"sudo ifconfig lo0 -alias {ip}")
        else:
            # Mirror the add section so the header is never left empty.
            logger.info(f"Remove {ip} from loopback interface (OS: {system})")
    logger.info("=" * 40)
def main():
    """Command-line entry point: parse and validate options, then run the nodes.

    Runs until interrupted (Ctrl+C). Every 30 seconds it logs a status summary.
    Nodes are always stopped on exit. Invalid option values make argparse
    print an error and exit with status 2.
    """
    parser = argparse.ArgumentParser(
        description="Multi-node test client with unique IP binding"
    )
    parser.add_argument(
        "--nodes",
        type=int,
        default=NUM_NODES,
        help=f"Number of simulated nodes (default: {NUM_NODES})",
    )
    parser.add_argument(
        "--interval",
        type=int,
        default=UPDATE_INTERVAL_SECONDS,
        help=f"Update interval in seconds (default: {UPDATE_INTERVAL_SECONDS})",
    )
    parser.add_argument(
        "--server",
        type=str,
        default=SERVER_BASE_URL,
        help=f"Server URL (default: {SERVER_BASE_URL})",
    )
    parser.add_argument(
        "--service-uuid",
        type=str,
        default=TARGET_SERVICE_UUID,
        help="Target service UUID",
    )
    parser.add_argument(
        "--ip-base",
        type=int,
        default=LOOPBACK_IP_BASE,
        help=f"Starting IP for 127.0.0.X (default: {LOOPBACK_IP_BASE})",
    )
    parser.add_argument(
        "--setup-help",
        action="store_true",
        help="Show commands to set up loopback IP addresses",
    )
    parser.add_argument(
        "--verbose", "-v", action="store_true", help="Enable verbose logging"
    )
    args = parser.parse_args()
    num_nodes = args.nodes
    update_interval = args.interval
    server_url = args.server
    service_uuid = args.service_uuid
    ip_base = args.ip_base

    # Reject values (from flags or env-derived defaults) that would otherwise
    # fail later in confusing ways:
    #   - zero nodes starts nothing and the status loop runs forever;
    #   - a negative interval breaks the node threads' error-path sleep;
    #   - each node's address 127.0.0.X needs X within 1..255.
    if num_nodes < 1:
        parser.error("--nodes must be at least 1")
    if update_interval < 1:
        parser.error("--interval must be at least 1 second")
    last_octet = ip_base + num_nodes - 1
    if ip_base < 1 or last_octet > 255:
        parser.error(
            f"--ip-base/--nodes give 127.0.0.{ip_base}-127.0.0.{last_octet}; "
            "the last octet must stay within 1..255"
        )

    if args.verbose:
        logging.getLogger().setLevel(logging.DEBUG)
    if args.setup_help:
        setup_loopback_ips(num_nodes, ip_base)
        return
    # Guard against the unedited placeholder value.
    if service_uuid == "REPLACE_ME_WITH_YOUR_SERVER_SERVICE_UUID":
        logger.error("=" * 60)
        logger.error("ERROR: TARGET_SERVICE_UUID is not set correctly!")
        logger.error(
            "Please set it via --service-uuid argument or TARGET_SERVICE_UUID environment variable."
        )
        logger.error(
            "You can find the server's UUID by running main.py and checking its console output"
        )
        logger.error("or by visiting the server's root endpoint in your browser.")
        logger.error("=" * 60)
        return
    logger.info("=" * 60)
    logger.info("Multi-Node Test Client Configuration:")
    logger.info(f"  Number of nodes: {num_nodes}")
    logger.info(f"  Update interval: {update_interval} seconds")
    logger.info(f"  Server URL: {server_url}")
    logger.info(f"  Target Service UUID: {service_uuid}")
    logger.info(f"  IP range: 127.0.0.{ip_base} - 127.0.0.{ip_base + num_nodes - 1}")
    logger.info("=" * 60)
    manager = MultiNodeManager(
        num_nodes, server_url, service_uuid, update_interval, ip_base
    )
    try:
        if not manager.start_all_nodes():
            logger.error("Failed to start nodes. Check IP availability.")
            setup_loopback_ips(num_nodes, ip_base)
            return
        # Main monitoring loop: node threads do the work; this just reports.
        while True:
            time.sleep(30)  # Print status every 30 seconds
            manager.print_status()
    except KeyboardInterrupt:
        logger.info("Received interrupt signal, shutting down...")
    except Exception as e:
        logger.error(f"Unexpected error in main: {e}", exc_info=True)
    finally:
        manager.stop_all_nodes()


if __name__ == "__main__":
    main()