TEMP: Working data pipeline & rendering

This commit is contained in:
ryyst 2024-06-01 16:14:29 +03:00
parent f1c3cf8758
commit 964e9b3806
8 changed files with 460 additions and 207 deletions

1
.gitignore vendored Normal file
View File

@ -0,0 +1 @@
*.db

View File

@ -1,102 +1,63 @@
#!/usr/bin/env python3.11
import re
import json import json
import ipaddress
import uuid import uuid
import hashlib import hashlib
from datetime import datetime
from .db import Database from .db import Database
def parse_traceroute_output(output, timestamp):
lines = output.strip().split('\n')
trace = {}
hops = []
ip_regex = r"\((.*?)\)" # ipaddress are in () def parse_traceroute_output(data: str, origin: str):
lines = data.strip().split("\n")
target = lines[0].split()[2]
target = output.strip().split('\n')[0].split()[2] created = datetime.now().isoformat()
trace = {"target": target, "created": created, "origin": origin, "hops": []}
prev_latency = 0
for line in lines[1:]: for line in lines[1:]:
hop = {}
hop_info = line.split() hop_info = line.split()
hop_number = int(hop_info[0]) print("LINE:", hop_info)
hop_name = None try:
hop_ip = None number, name, ip, latency, _ = hop_info
hop_latency = None hop = {
"created": created,
"number": number,
"name": name,
"ip": ip.strip("()"),
"latency": float(latency),
}
except ValueError:
number, name = hop_info
hop = {
"created": created,
"number": number,
"name": name,
"ip": None,
"latency": None,
}
latencies = [] trace["hops"].append(hop)
#print("##### "+str(hop_info))
count = 0
for part in hop_info[1:]:
count += 1
# source node drops or blocks icmp packages
# We will give funny to name to hop for not answering and move on.
if part == '*':
hop_name = 'unresponsive'
hop_ip = 'unresponsive'
break
# If first colum is either name or ip-address
if count == 1:
match = re.search(ip_regex, part)
if match:
hop_ip = part.strip('()')
else:
hop_name = part
# Second colum is ip-address first latency reading
if count == 2:
if re.search(ip_regex, part):
try:
_ip = ipaddress.ip_address(part.strip('()'))
hop_ip = part.strip('()')
except ValueError:
pass # Ignore if it's not a valid IP address
# Rest of the input colums are either latency floats, 'ms' or
# reruns of the hop_name and hop_ip...
# We only need the latency floats anymore.
else:
try:
latency = float(part)
latencies.append(latency)
except ValueError:
pass
hop_latency = sum(latencies) / len(latencies) if latencies else None
hop['hop_number'] = hop_number
if not hop_name == None:
hop['hop_name'] = hop_name
hop['hop_ip'] = hop_ip
hop['hop_latency'] = hop_latency
hops.append(hop)
trace['target'] = target
trace['timestamp'] = timestamp
trace['hops'] = hops
return trace return trace
def store_traceroute(node, trace): def store_traceroute(trace):
db = Database() db = Database()
#hops_json = json.dumps(trace['hops']) # hops_json = json.dumps(trace['hops'])
path_ids = {} path_ids = {}
previous_hop_ip = None previous_hop_ip = None
previous_hop_latency = None previous_hop_latency = None
for hop in trace['hops']: for hop in trace["hops"]:
hop_number = hop['hop_number'] hop_number = hop["number"]
hop_name = hop.get('hop_name') hop_name = hop.get("name")
hop_ip = hop.get('hop_ip') hop_ip = hop.get("ip")
hop_latency = hop.get('hop_latency') hop_latency = hop.get("latency")
link_id = None link_id = None
# insert links and get their id's # insert links and get their id's
@ -112,20 +73,21 @@ def store_traceroute(node, trace):
# calculate link latency if possible and store it # calculate link latency if possible and store it
if link_id and previous_hop_latency: if link_id and previous_hop_latency:
link_latency = hop_latency - previous_hop_latency link_latency = hop_latency - previous_hop_latency
db.create_latency(link_id, trace['timestamp'], link_latency) db.create_latency(link_id, trace["created"], link_latency)
# make entry to "Paths" table # make entry to "Paths" table
if path_ids: if path_ids:
json_path_ids = json.dumps(path_ids) json_path_ids = json.dumps(path_ids)
db.create_path(node, trace['target'], json_path_ids) db.create_path(node, trace["target"], json_path_ids)
db.end() db.end()
def generate_node_id(): def generate_node_id():
mac = uuid.getnode() mac = uuid.getnode()
mac_str = ':'.join(['{:02x}'.format((mac >> ele) & 0xff) for ele in range(0,8*6,8)][::-1]) mac_str = ":".join(
["{:02x}".format((mac >> ele) & 0xFF) for ele in range(0, 8 * 6, 8)][::-1]
)
# Hash the MAC address using SHA-256 to generate a unique ID # Hash the MAC address using SHA-256 to generate a unique ID
unique_id = hashlib.sha256(mac_str.encode()).hexdigest() unique_id = hashlib.sha256(mac_str.encode()).hexdigest()

150
app/db.py
View File

@ -12,40 +12,35 @@ class Database:
def __init__(self): def __init__(self):
self.db_file = DB_FILE self.db_file = DB_FILE
self.conn = sqlite3.connect(self.db_file, check_same_thread=False) self.conn = sqlite3.connect(self.db_file, check_same_thread=False)
# Return fetch() data as Row objects, instead of tuples.
self.conn.row_factory = sqlite3.Row
self.cursor = self.conn.cursor() self.cursor = self.conn.cursor()
def create_tables(self): def create_tables(self):
self.cursor.executescript(""" self.cursor.executescript(
CREATE TABLE IF NOT EXISTS Links ( """
id INTEGER PRIMARY KEY, CREATE TABLE IF NOT EXISTS Traces (
source_ip TEXT NOT NULL, id INTEGER PRIMARY KEY AUTOINCREMENT,
destination_ip TEXT NOT NULL, created TEXT NOT NULL,
UNIQUE(source_ip, destination_ip) origin TEXT NOT NULL,
target TEXT NOT NULL
); );
CREATE TABLE IF NOT EXISTS Paths ( CREATE TABLE IF NOT EXISTS Hops (
id INTEGER PRIMARY KEY, id INTEGER PRIMARY KEY AUTOINCREMENT,
node TEXT NOT NULL, trace_id INTEGER,
target TEXT NOT NULL, created TEXT NOT NULL,
hops_json TEXT NOT NULL, number INTEGER NOT NULL,
UNIQUE(node, target, hops_json) name TEXT,
); ip TEXT,
latency TEXT,
CREATE TABLE IF NOT EXISTS Latency ( FOREIGN KEY(trace_id) REFERENCES Traces(id)
id INTEGER PRIMARY KEY,
link_id INTEGER NOT NULL,
timestamp DATETIME DEFAULT CURRENT_TIMESTAMP,
latency_ms REAL NOT NULL,
FOREIGN KEY (link_id) REFERENCES Links(id)
); );
"""
CREATE TABLE IF NOT EXISTS HopDetails ( )
id INTEGER PRIMARY KEY,
hop_name TEXT,
hop_ip TEXT,
hop_latency TEXT
);
""")
def end(self): def end(self):
"""Always call this after you're done with the connection / request.""" """Always call this after you're done with the connection / request."""
@ -53,71 +48,80 @@ class Database:
self.conn.commit() self.conn.commit()
self.conn.close() self.conn.close()
def get_traceroute(self): def list_traces(self):
retval = {} # TODO: time filter
result = []
self.cursor.execute(''' self.cursor.execute("SELECT * FROM Traces")
SELECT target, hops_json traces = self.cursor.fetchall()
FROM Paths
''')
retval['path'] = self.cursor.fetchall()
self.cursor.execute(''' for t in traces:
SELECT source_ip, destination_ip trace = dict(t)
FROM Links
''')
retval['links'] = self.cursor.fetchall()
return retval self.cursor.execute(
"SELECT number, name, ip, latency FROM Hops WHERE trace_id = ? ORDER BY number ASC",
(trace["id"],),
)
hops = self.cursor.fetchall()
trace["hops"] = hops
def create_link(self, previous_hop_ip, hop_ip): result.append(trace)
return result
def create_trace(self, trace):
self.cursor.execute( self.cursor.execute(
"INSERT OR IGNORE INTO Links (source_ip, destination_ip) VALUES (?, ?)", "INSERT OR IGNORE INTO Traces (created, origin, target) VALUES (?, ?, ?)",
(previous_hop_ip, hop_ip) (trace["created"], trace["origin"], trace["target"]),
)
self.cursor.execute(
"SELECT id FROM Links WHERE source_ip = ? AND destination_ip = ?",
(previous_hop_ip, hop_ip)
) )
trace_id = self.cursor.lastrowid
for hop in trace["hops"]:
self.cursor.execute(
"INSERT OR IGNORE INTO Hops (trace_id, created, number, name, ip, latency) VALUES (?, ?, ?, ?, ?, ?)",
(
trace_id,
hop["created"],
hop["number"],
hop["name"],
hop["ip"],
hop["latency"],
),
)
return self.cursor.fetchone() return self.cursor.fetchone()
def create_hop(self, name, ip, latency): def create_hop(self, name, ip, latency):
self.cursor.execute( self.cursor.execute(
"INSERT INTO HopDetails (hop_name, hop_ip, hop_latency) VALUES (?, ?, ?)", "INSERT INTO Hops (name, ip, latency) VALUES (?, ?, ?)",
(name, ip, latency) (name, ip, latency),
) )
def create_latency(self, link_id, timestamp, link_latency): def create_latency(self, link_id, timestamp, link_latency):
self.cursor.execute( self.cursor.execute(
"INSERT INTO Latency (link_id, timestamp, latency_ms) VALUES (?, NOW(), ?)", "INSERT INTO Latency (link_id, timestamp, latency_ms) VALUES (?, ?, ?)",
(link_id, timestamp, link_latency) (link_id, timestamp, link_latency),
) )
def create_path(self, node, target, json): def create_path(self, node, target, json):
self.cursor.execute( self.cursor.execute(
"INSERT OR IGNORE INTO Paths (node, target, hops_json) VALUES (?, ?, ?)", "INSERT OR IGNORE INTO Paths (node, target, hops_json) VALUES (?, ?, ?)",
(node, target, json) (node, target, json),
) )
def ensure_table_setup(): def ensure_table_setup():
db = Database() db = Database()
db.create_tables() db.create_tables()
db.end() db.end()
####################################################################
####################################################################
####################################################################
####################################################################
####################################################################
####################################################################
####################################################################
####################################################################
# Temp testing. Fancy decorator stuff.
def with_connection(func): def with_connection(func):
@wraps(func) @wraps(func)
def wrapped(*args, **kwargs): def wrapped(*args, **kwargs):
@ -130,12 +134,14 @@ def with_connection(func):
conn.close() conn.close()
return result return result
return wrapped return wrapped
@with_connection @with_connection
def init_db(cursor: Cursor): def init_db(cursor: Cursor):
cursor.executescript(""" cursor.executescript(
"""
CREATE TABLE IF NOT EXISTS Links ( CREATE TABLE IF NOT EXISTS Links (
id INTEGER PRIMARY KEY, id INTEGER PRIMARY KEY,
source_ip TEXT NOT NULL, source_ip TEXT NOT NULL,
@ -165,18 +171,22 @@ def init_db(cursor: Cursor):
hop_ip TEXT, hop_ip TEXT,
hop_latency TEXT hop_latency TEXT
); );
""") """
)
@with_connection @with_connection
def create_link(cursor: Cursor, previous_hop_ip: str, hop_ip: str): def insert_hop(cursor: Cursor, previous_hop_ip: str, hop_ip: str):
"""Insert a new hop and return related Link id""" """Insert a new hop and return related Link id"""
cursor.execute(""" cursor.execute(
INSERT OR IGNORE INTO Links (source_ip, destination_ip) VALUES (?, ?) "INSERT OR IGNORE INTO Links (source_ip, destination_ip) VALUES (?, ?)",
""", (previous_hop_ip, hop_ip)) (previous_hop_ip, hop_ip),
)
cursor.execute(""" cursor.execute(
SELECT id FROM Links WHERE source_ip = ? AND destination_ip = ? "SELECT id FROM Links WHERE source_ip = ? AND destination_ip = ?",
""", (previous_hop_ip, hop_ip)) (previous_hop_ip, hop_ip),
)
return cursor.fetchone() return cursor.fetchone()

View File

@ -1,10 +1,8 @@
from fastapi import Request, FastAPI from fastapi import Request, FastAPI
from fastapi.staticfiles import StaticFiles from fastapi.staticfiles import StaticFiles
from datetime import datetime
from .collector import parse_traceroute_output, store_traceroute from .collector import parse_traceroute_output, store_traceroute
from .db import ensure_table_setup from .db import Database, ensure_table_setup
from pprint import pprint as print from pprint import pprint as print
@ -14,7 +12,8 @@ ensure_table_setup()
# Setup web framework thingies # Setup web framework thingies
app = FastAPI() app = FastAPI()
app.mount("/static", StaticFiles(directory="static"), name="static") app.mount("/static", StaticFiles(directory="app/static"), name="static")
@app.get("/") @app.get("/")
def read_root(): def read_root():
@ -33,23 +32,36 @@ def read_root():
"", "",
"", "",
"END OF TRANSMISSION", "END OF TRANSMISSION",
] + [None]*800 ]
+ [None] * 800,
} }
@app.post("/trace/{hostname}") @app.get("/trace/")
async def create_trace(hostname: str, request: Request): def list_traces():
db = Database()
trace = db.list_traces()
db.end()
return trace
@app.post("/trace/{origin}")
async def create_trace(origin: str, request: Request):
raw_data = await request.body() raw_data = await request.body()
data = raw_data.decode("utf-8", "ignore") data = raw_data.decode("utf-8", "ignore")
print(f"Received data from {hostname}:") print(f"Received data from {origin}:")
print(data) print(data)
trace = parse_traceroute_output(data, datetime.now()) trace = parse_traceroute_output(data, origin)
print("Parsed data:") print("Parsed data:")
print(trace) print(trace)
store_traceroute(hostname, trace) db = Database()
db.create_trace(trace)
db.end()
return {"status": "ok" } return {"status": "ok"}

2
app/static/d3.v7.min.js vendored Normal file

File diff suppressed because one or more lines are too long

View File

@ -4,53 +4,11 @@
<title>Kalzu</title> <title>Kalzu</title>
<meta charset="UTF-8"> <meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1"> <meta name="viewport" content="width=device-width, initial-scale=1">
<script src="/static/d3.v7.min.js"></script>
<script src="https://cdnjs.cloudflare.com/ajax/libs/sigma.js/2.4.0/sigma.min.js" integrity="sha512-iiPEYww3QXZU5C795JnnINBRNgHqDnRHs9mA7aJoqx4pNE4u3CknCDGmePHFoHtKR/6C9aIcRFa+HJ6obtlteQ==" crossorigin="anonymous" referrerpolicy="no-referrer"></script>
<script src="https://cdnjs.cloudflare.com/ajax/libs/graphology/0.25.4/graphology.umd.min.js" integrity="sha512-tjMBhL9fLMcqoccPOwpRiIQIOAyUh18lWUlUvE10zvG1UNMfxUC4qSERmUq+VF30iavIyqs/q6fSP2o475FAUw==" crossorigin="anonymous" referrerpolicy="no-referrer"></script>
</head> </head>
<body style="background: lightgrey"> <body style="background: #EEE">
<div id="container" style="width: 97%; height: 95vh; background: white; margin: auto; margin-top: 32px"></div> <div id="container"></div>
<script> <script src="/static/index.js"></script>
const hops = [
["_gateway", "(192.168.0.1) 0.549 ms"],
["83-148-246-178.static.lounea.fi", "(83.148.246.178) 1.200 ms"],
["*"],
["*"],
["*"],
["86-60-254-198.static.lounea.fi", "(86.60.254.198) 3.719 ms"],
["72.14.194.142", " (72.14.194.142) 3.716 ms"],
["142.251.53.71", " (142.251.53.71) 3.704 ms"],
["142.250.229.87", " (142.250.229.87) 4.603 ms"],
["mad06s09-in-f142.1e100.net", " (216.58.210.142) 3.700 ms"],
];
console.log("Hops!", hops)
// Create a graphology graph
const graph = new graphology.Graph();
hops.forEach((hop, i) => {
const [ label, extra ] = hop;
const size = i === 0 ? 15 : 10;
const color = "green"
graph.addNode(i, { label, x: 0, y: i, size, color });
if (i > 0) {
graph.addEdge(i, i-1, { size: 2, color: "purple" });
}
});
// graph.addNode("1", { label: "Node 1", x: 0, y: 0, size: 10, color: "blue" });
// graph.addNode("2", { label: "Node 2", x: 1, y: 1, size: 20, color: "red" });
// graph.addEdge("1", "2", { size: 5, color: "purple" });
// Instantiate sigma.js and render the graph
const sigmaInstance = new Sigma(graph, document.getElementById("container"));
</script>
</body> </body>
</html> </html>
</html> </html>

308
app/static/index.js Normal file
View File

@ -0,0 +1,308 @@
const linkArc = (d) => {
const r = Math.hypot(d.target.x - d.source.x, d.target.y - d.source.y);
return `
M${d.source.x},${d.source.y}
A${r},${r} 0 0,1 ${d.target.x},${d.target.y}
`;
};
const drag = (simulation) => {
function dragstarted(event, d) {
if (!event.active) simulation.alphaTarget(0.3).restart();
d.fx = d.x;
d.fy = d.y;
}
function dragged(event, d) {
d.fx = event.x;
d.fy = event.y;
}
function dragended(event, d) {
if (!event.active) simulation.alphaTarget(0);
d.fx = null;
d.fy = null;
}
return d3
.drag()
.on("start", dragstarted)
.on("drag", dragged)
.on("end", dragended);
};
const drawChart = (data) => {
// Specify the dimensions of the chart.
const width = 1600;
const height = 1200;
// Specify the color scale.
const color = d3.scaleOrdinal(d3.schemeCategory10);
// The force simulation mutates links and nodes, so create a copy
// so that re-evaluating this cell produces the same result.
const links = data.links.map((d) => ({ ...d }));
const nodes = data.nodes.map((d) => ({ ...d }));
// Create a simulation with several forces.
const simulation = d3
.forceSimulation(nodes)
.force(
"link",
d3.forceLink(links).id((d) => d.id),
)
.force("charge", d3.forceManyBody())
.force("x", d3.forceX())
.force("y", d3.forceY());
// Create the SVG container.
const svg = d3
.create("svg")
.attr("width", width)
.attr("height", height)
.attr("viewBox", [-width / 2, -height / 2, width, height])
.attr("style", "max-width: 100%; height: auto;");
// Add a line for each link, and a circle for each node.
const link = svg
.append("g")
.attr("stroke", "#999")
.attr("stroke-opacity", 0.6)
.selectAll("line")
.data(links)
.join("line")
.attr("stroke-width", 1); // (d) => Math.sqrt(d.value));
const node = svg
.append("g")
.attr("stroke", "#fff")
.attr("stroke-width", 1.5)
.selectAll("circle")
.data(nodes)
.join("circle")
.attr("r", 5)
.attr("fill", (d) => color(d.group));
node.append("title").text((d) => d.id);
// Add a drag behavior.
node.call(
d3.drag().on("start", dragstarted).on("drag", dragged).on("end", dragended),
);
// Set the position attributes of links and nodes each time the simulation ticks.
simulation.on("tick", () => {
link
.attr("x1", (d) => d.source.x)
.attr("y1", (d) => d.source.y)
.attr("x2", (d) => d.target.x)
.attr("y2", (d) => d.target.y);
node.attr("cx", (d) => d.x).attr("cy", (d) => d.y);
});
// Reheat the simulation when drag starts, and fix the subject position.
function dragstarted(event) {
if (!event.active) simulation.alphaTarget(0.3).restart();
event.subject.fx = event.subject.x;
event.subject.fy = event.subject.y;
}
// Update the subject (dragged node) position during drag.
function dragged(event) {
event.subject.fx = event.x;
event.subject.fy = event.y;
}
// Restore the target alpha so the simulation cools after dragging ends.
// Unfix the subject position now that its no longer being dragged.
function dragended(event) {
if (!event.active) simulation.alphaTarget(0);
event.subject.fx = null;
event.subject.fy = null;
}
// When this cell is re-run, stop the previous simulation. (This doesnt
// really matter since the target alpha is zero and the simulation will
// stop naturally, but its a good practice.)
// invalidation.then(() => simulation.stop());
return svg.node();
};
const drawChart2 = (data) => {
const width = 1600;
const height = 1200;
const types = Array.from(new Set(data.map((d) => d.type)));
const nodes = Array.from(
new Set(data.flatMap((l) => [l.source, l.target])),
(id) => ({ id }),
);
const links = data.map((d) => Object.create(d));
const color = d3.scaleOrdinal(types, d3.schemeCategory10);
const simulation = d3
.forceSimulation(nodes)
.force(
"link",
d3.forceLink(links).id((d) => d.id),
)
.force("charge", d3.forceManyBody().strength(-400))
.force("x", d3.forceX())
.force("y", d3.forceY());
const svg = d3
.create("svg")
.attr("viewBox", [-width / 2, -height / 2, width, height])
.attr("width", width)
.attr("height", height)
.attr("style", "max-width: 100%; height: auto; font: 12px sans-serif;");
// Per-type markers, as they don't inherit styles.
svg
.append("defs")
.selectAll("marker")
.data(types)
.join("marker")
.attr("id", (d) => `arrow-${d}`)
.attr("viewBox", "0 -5 10 10")
.attr("refX", 15)
.attr("refY", -0.5)
.attr("markerWidth", 6)
.attr("markerHeight", 6)
.attr("orient", "auto")
.append("path")
.attr("fill", color)
.attr("d", "M0,-5L10,0L0,5");
const link = svg
.append("g")
.attr("fill", "none")
.attr("stroke-width", 1.5)
.selectAll("path")
.data(links)
.join("path")
.attr("stroke", (d) => color(d.type))
.attr("marker-end", (d) => `url(${new URL(`#arrow-${d.type}`, location)})`);
const node = svg
.append("g")
.attr("fill", "currentColor")
.attr("stroke-linecap", "round")
.attr("stroke-linejoin", "round")
.selectAll("g")
.data(nodes)
.join("g")
.call(drag(simulation));
node
.append("circle")
.attr("stroke", "white")
.attr("stroke-width", 1.5)
.attr("r", 4);
node
.append("text")
.attr("x", 8)
.attr("y", "0.31em")
.text((d) => (d.id.endsWith("*") ? "*" : d.id))
.clone(true)
.lower()
.attr("fill", "none")
.attr("stroke", "white")
.attr("stroke-width", 3);
simulation.on("tick", () => {
link.attr("d", linkArc);
node.attr("transform", (d) => `translate(${d.x},${d.y})`);
});
// invalidation.then(() => simulation.stop());
return Object.assign(svg.node(), { scales: { color } });
};
const parseNodesAndLinks = (traces) => {
const result = {
nodes: [],
links: [],
};
traces.forEach((trace) => {
let prevId = null;
const getId = (hop) => {
if (prevId === null) {
return trace.origin;
}
if (hop.name === "*") {
return `${trace.id}-${hop.number}-*`;
}
return hop.ip;
};
trace.hops.forEach((hop) => {
const id = getId(hop);
// New node
result.nodes.push({
id: id,
group: trace.origin,
radius: 8,
value: hop.name || "name?",
origin: trace.origin,
});
if (prevId) {
// New link
result.links.push({
source: prevId,
target: id,
type: trace.origin,
group: trace.origin,
});
}
prevId = id;
});
// Last "destination" node
result.nodes.push({
id: trace.id,
group: trace.origin,
radius: 8,
value: trace.target,
});
if (prevId) {
// New link
result.links.push({
source: prevId,
target: trace.target,
type: trace.origin,
group: trace.origin,
});
}
});
// { id: ip, group: origin, radius: 2 }
// { source: prev.ip, target: ip, value: latency }
return result;
};
async function main() {
const response = await fetch("/trace/");
const traces = await response.json();
console.log("Traces:", traces);
const data = parseNodesAndLinks(traces);
console.log("Data:", data);
const chart = drawChart2(data.links);
container.append(chart);
}
main();

Binary file not shown.