"""Parse traceroute text output into node and link records."""

import hashlib
import json
import logging
import uuid
from datetime import datetime
from typing import Any, Optional

from .db import Database

logger = logging.getLogger(__name__)


def parse_node(hop_info: list[str], target, origin) -> dict[str, Any]:
    """Build a node record from the whitespace-split tokens of one hop line.

    Two token layouts are accepted:

    * five tokens -- hop number, name, parenthesised IP, latency and one
      trailing token that is discarded (presumably the unit);
    * two tokens -- hop number and a placeholder name, used when the hop
      returned no data.

    Args:
        hop_info: Tokens of a single hop line.
        target: Trace target; only used to build the ``render_id`` of hops
            that have no IP.
        origin: Trace origin; only used to build the ``render_id`` of hops
            that have no IP.

    Returns:
        A dict with the keys ``render_id``, ``hop_number``, ``name``, ``ip``
        and ``latency_ms``. ``ip`` and ``latency_ms`` are ``None`` for hops
        without data.

    Raises:
        ValueError: If ``hop_info`` matches neither layout.
    """
    try:
        # Regular lines.
        number, name, ip, latency, _ = hop_info
        latency = float(latency)
    except ValueError:
        # Asterisks, no data found for hop.
        number, name = hop_info
        return {
            # No IP to identify the hop, so derive an id from its position
            # within this particular origin/target trace.
            "render_id": f"{origin}-{target}-{number}",
            "hop_number": number,
            "name": name,
            "ip": None,
            "latency_ms": None,
        }

    # Keep the IP a plain string: the previous trailing comma wrapped it in a
    # one-element tuple, which then leaked into both "ip" and "render_id".
    ip = ip.strip("()")
    return {
        "render_id": ip,
        "hop_number": number,
        "name": name,
        "ip": ip,
        "latency_ms": latency,
    }


def parse_link(node, prev_node: Optional[dict] = None) -> dict[str, Any]:
    """Build the link record that connects ``prev_node`` to ``node``.

    Args:
        node: Node record as produced by ``parse_node``.
        prev_node: Node record of the preceding hop, or ``None`` when
            ``node`` is the first hop of the trace.

    Returns:
        A dict with the keys ``source``, ``target`` and ``latency_ms``.
        For the first hop ``source`` is ``None`` and ``latency_ms`` is the
        node's own latency. Otherwise ``latency_ms`` is the latency
        difference between the two nodes, or ``None`` when either node has
        no latency measurement.
    """
    latency = node["latency_ms"]

    if prev_node is None:
        return {
            "source": None,
            "target": node["render_id"],
            "latency_ms": latency,
        }

    prev_latency = prev_node["latency_ms"]
    if latency is None or prev_latency is None:
        # A hop without data on either end makes the difference undefined.
        link_latency = None
    else:
        link_latency = latency - prev_latency

    return {
        "source": prev_node.get("render_id"),
        "target": node["render_id"],
        "latency_ms": link_latency,
    }


def parse_traceroute_output(data: str, origin: str) -> dict[str, Any]:
    """Parse raw traceroute text into a trace with nodes and links.

    The third whitespace-separated token of the first line is taken as the
    trace target; every following non-blank line is parsed as one hop.

    Args:
        data: Raw traceroute output.
        origin: Identifier of the machine the trace was run from.

    Returns:
        A dict with the keys ``target``, ``created`` (ISO timestamp taken at
        parse time), ``origin``, ``hops``, ``nodes`` and ``links``.
        ``nodes`` and ``links`` hold one entry per hop line, in order.

    Raises:
        IndexError: If ``data`` is empty or its first line has fewer than
            three tokens.
        ValueError: If a hop line has an unsupported layout (see
            ``parse_node``).
    """
    # TODO: data validation
    lines = data.strip().splitlines()
    target = lines[0].split()[2]
    created = datetime.now().isoformat()

    trace = {
        "target": target,
        "created": created,
        "origin": origin,
        # Never populated by this parser; kept so the shape of the returned
        # dict stays a superset of what it was before.
        "hops": [],
        "nodes": [],
        "links": [],
    }

    prev_node = None
    for line in lines[1:]:
        hop_info = line.split()
        if not hop_info:
            # Blank line inside the output; nothing to parse.
            continue
        logger.debug("LINE: %s", hop_info)

        node = parse_node(hop_info, target, origin)
        link = parse_link(node, prev_node)
        trace["nodes"].append(node)
        trace["links"].append(link)
        prev_node = node

    return trace


# NOTE: legacy draft kept for reference. It reads trace["hops"] and
# hop["latency"], which parse_traceroute_output does not produce.
#
# def store_traceroute(trace):
#     # hops_json = json.dumps(trace['hops'])
#
#     path_ids = {}
#
#     previous_hop_ip = None
#     previous_hop_latency = None
#     for hop in trace["hops"]:
#         hop_number = hop["number"]
#         hop_name = hop.get("name")
#         hop_ip = hop.get("ip")
#         hop_latency = hop.get("latency")
#         link_id = None
#
#         # insert links and get their id's
#         if previous_hop_ip:
#             link_id = db.create_link(previous_hop_ip, hop_ip)
#             path_ids[hop_number] = link_id
#
#         previous_hop_ip = hop_ip
#
#         # Save hop details
#         db.create_hop(hop_name, hop_ip, hop_latency)
#
#         # calculate link latency if possible and store it
#         if link_id and previous_hop_latency:
#             link_latency = hop_latency - previous_hop_latency
#             db.create_latency(link_id, trace["created"], link_latency)
#
#     # make entry to "Paths" table
#     if path_ids:
#         json_path_ids = json.dumps(path_ids)
#         db.create_path(node, trace["target"], json_path_ids)


# def generate_node_id():
#     mac = uuid.getnode()
#     mac_str = ":".join(
#         ["{:02x}".format((mac >> ele) & 0xFF) for ele in range(0, 8 * 6, 8)][::-1]
#     )
#
#     # Hash the MAC address using SHA-256 to generate a unique ID
#     unique_id = hashlib.sha256(mac_str.encode()).hexdigest()
#
#     return unique_id