traceroute_map/app/parser.py

120 lines
3.0 KiB
Python
Raw Permalink Normal View History

2024-06-05 22:59:20 +03:00
import json
import uuid
import hashlib
from datetime import datetime
from .db import Database
def parse_node(hop_info: list[str], target, origin):
    """Build a node dict from one whitespace-split traceroute hop line.

    Two line shapes are recognised:

    * A regular hop with exactly five fields:
      ``[number, name, "(ip)", latency, unit]``. The parentheses are
      stripped from the address and the latency is converted to ``float``.
    * A hop with no data: either exactly two fields (``[number, name]``) or
      a hop number followed only by ``*`` markers (e.g. ``["3", "*", "*"]``).

    Args:
        hop_info: Fields of a single hop line, already split on whitespace.
        target: Trace destination; only used to build the ``render_id`` of a
            hop that has no IP address.
        origin: Trace origin; only used to build the ``render_id`` of a hop
            that has no IP address.

    Returns:
        A dict with the keys ``render_id``, ``hop_number``, ``name``, ``ip``
        and ``latency_ms``. ``hop_number`` is kept as the string found in the
        line. ``ip`` and ``latency_ms`` are ``None`` for a hop with no data.

    Raises:
        ValueError: If the line matches neither recognised shape.
    """
    if len(hop_info) == 5:
        number, name, raw_ip, raw_latency, _unit = hop_info
        try:
            latency_ms = float(raw_latency)
        except ValueError:
            # Not a measurement; fall through to the no-data handling below.
            latency_ms = None
        if latency_ms is not None:
            # Keep the address a plain string: it doubles as the render id
            # and must compare equal to the same address seen on other hops.
            address = raw_ip.strip("()")
            return {
                "render_id": address,
                "hop_number": number,
                "name": name,
                "ip": address,
                "latency_ms": latency_ms,
            }

    # Asterisks, no data found for hop.
    only_timeouts = len(hop_info) > 2 and all(
        field == "*" for field in hop_info[1:]
    )
    if len(hop_info) != 2 and not only_timeouts:
        raise ValueError(f"unrecognised traceroute hop line: {hop_info!r}")

    number, name = hop_info[0], hop_info[1]
    return {
        # No address to identify the hop by, so synthesise an id that is
        # unique per (origin, target, hop position).
        "render_id": f"{origin}-{target}-{number}",
        "hop_number": number,
        "name": name,
        "ip": None,
        "latency_ms": None,
    }
def _latency_of(node):
    """Return a node's latency in milliseconds, or None if it has none."""
    # "latency_ms" is the key parse_node emits; "latency" is the spelling
    # this module previously read, kept as a fallback for older node dicts.
    latency = node.get("latency_ms")
    if latency is None:
        latency = node.get("latency")
    return latency


def parse_link(node, prev_node):
    """Build the link dict that connects ``prev_node`` to ``node``.

    Args:
        node: Node dict for the current hop (needs ``render_id`` and a
            latency entry).
        prev_node: Node dict for the preceding hop, or ``None`` when ``node``
            is the first hop of the trace.

    Returns:
        A dict with the keys ``source``, ``target`` and ``latency_ms``.
        ``source`` is ``None`` for the first hop. ``latency_ms`` is the
        current hop's latency minus the previous hop's; for the first hop it
        is the current hop's own latency. It is ``None`` whenever one of the
        latencies involved is missing (e.g. a hop that returned no data).
    """
    current = _latency_of(node)

    if prev_node is None:
        # First hop: nothing to link from, so report the hop's own latency.
        source = None
        latency = current
    else:
        source = prev_node.get("render_id")
        previous = _latency_of(prev_node)
        if current is None or previous is None:
            # A hop without a measurement makes the difference undefined.
            latency = None
        else:
            latency = current - previous

    return {
        "source": source,
        "target": node["render_id"],
        "latency_ms": latency,
    }
def parse_traceroute_output(data: str, origin: str):
    """Parse raw traceroute text into a trace dict of nodes and links.

    The first line is treated as the header and its third whitespace-
    separated field is taken as the target. Every following non-blank line
    is parsed as one hop via ``parse_node``, and linked to the hop before it
    via ``parse_link``.

    Args:
        data: Full text output of a traceroute run.
        origin: Identifier of the machine the trace was run from.

    Returns:
        A dict with the keys ``target``, ``created`` (ISO-8601 timestamp of
        when parsing happened), ``origin``, ``nodes`` (one dict per hop),
        ``links`` (one dict per hop, connecting it to the previous hop) and
        ``hops`` (always an empty list; retained so existing readers of that
        key keep working).

    Raises:
        ValueError: If the header line has fewer than three fields, or a hop
            line is rejected by ``parse_node``.
    """
    # TODO: data validation
    lines = data.strip().split("\n")

    header_fields = lines[0].split()
    if len(header_fields) < 3:
        raise ValueError(
            f"cannot find the target in traceroute header: {lines[0]!r}"
        )
    target = header_fields[2]

    created = datetime.now().isoformat()
    trace = {
        "target": target,
        "created": created,
        "origin": origin,
        "hops": [],
        # The loop below fills these two; they must exist up front.
        "nodes": [],
        "links": [],
    }

    prev_node = None
    for line in lines[1:]:
        hop_info = line.split()
        if not hop_info:
            # Blank line inside the output: nothing to parse.
            continue
        print("LINE:", hop_info)

        node = parse_node(hop_info, target, origin)
        link = parse_link(node, prev_node)

        trace["nodes"].append(node)
        trace["links"].append(link)

        prev_node = node

    return trace
# def store_traceroute(trace):
# # hops_json = json.dumps(trace['hops'])
#
# path_ids = {}
#
# previous_hop_ip = None
# previous_hop_latency = None
# for hop in trace["hops"]:
# hop_number = hop["number"]
# hop_name = hop.get("name")
# hop_ip = hop.get("ip")
# hop_latency = hop.get("latency")
# link_id = None
#
# # insert links and get their id's
# if previous_hop_ip:
# link_id = db.create_link(previous_hop_ip, hop_ip)
# path_ids[hop_number] = link_id
#
# previous_hop_ip = hop_ip
#
# # Save hop details
# db.create_hop(hop_name, hop_ip, hop_latency)
#
# # calculate link latency if possible and store it
# if link_id and previous_hop_latency:
# link_latency = hop_latency - previous_hop_latency
# db.create_latency(link_id, trace["created"], link_latency)
#
# # make entry to "Paths" table
# if path_ids:
# json_path_ids = json.dumps(path_ids)
# db.create_path(node, trace["target"], json_path_ids)
# def generate_node_id():
# mac = uuid.getnode()
# mac_str = ":".join(
# ["{:02x}".format((mac >> ele) & 0xFF) for ele in range(0, 8 * 6, 8)][::-1]
# )
#
# # Hash the MAC address using SHA-256 to generate a unique ID
# unique_id = hashlib.sha256(mac_str.encode()).hexdigest()
#
# return unique_id