import sqlite3 from functools import wraps # Type alias Cursor = sqlite3.Cursor # Configs DB_FILE = "./traceroute.db" class Database: def __init__(self): self.db_file = DB_FILE self.conn = sqlite3.connect(self.db_file, check_same_thread=False) # Return fetch() data as Row objects, instead of tuples. self.conn.row_factory = sqlite3.Row self.cursor = self.conn.cursor() def create_tables(self): self.cursor.executescript( """ CREATE TABLE IF NOT EXISTS Traces ( id INTEGER PRIMARY KEY AUTOINCREMENT, created TEXT NOT NULL, origin TEXT NOT NULL, target TEXT NOT NULL ); CREATE TABLE IF NOT EXISTS Hops ( id INTEGER PRIMARY KEY AUTOINCREMENT, trace_id INTEGER, created TEXT NOT NULL, number INTEGER NOT NULL, name TEXT, ip TEXT, latency TEXT, FOREIGN KEY(trace_id) REFERENCES Traces(id) ); """ ) def end(self): """Always call this after you're done with the connection / request.""" self.conn.commit() self.conn.close() def list_traces(self): # TODO: time filter result = [] self.cursor.execute("SELECT * FROM Traces") traces = self.cursor.fetchall() for t in traces: trace = dict(t) self.cursor.execute( "SELECT number, name, ip, latency FROM Hops WHERE trace_id = ? ORDER BY number ASC", (trace["id"],), ) hops = self.cursor.fetchall() trace["hops"] = hops result.append(trace) return result def create_trace(self, trace): self.cursor.execute( "INSERT OR IGNORE INTO Traces (created, origin, target) VALUES (?, ?, ?)", (trace["created"], trace["origin"], trace["target"]), ) trace_id = self.cursor.lastrowid for hop in trace["hops"]: self.cursor.execute( "INSERT OR IGNORE INTO Hops (trace_id, created, number, name, ip, latency) VALUES (?, ?, ?, ?, ?, ?)", ( trace_id, hop["created"], hop["number"], hop["name"], hop["ip"], hop["latency"], ), ) return self.cursor.fetchone() def create_hop(self, name, ip, latency): self.cursor.execute( "INSERT INTO Hops (name, ip, latency) VALUES (?, ?, ?)", (name, ip, latency), ) def create_latency(self, link_id, timestamp, link_latency): self.cursor.execute( "INSERT INTO Latency (link_id, timestamp, latency_ms) VALUES (?, ?, ?)", (link_id, timestamp, link_latency), ) def create_path(self, node, target, json): self.cursor.execute( "INSERT OR IGNORE INTO Paths (node, target, hops_json) VALUES (?, ?, ?)", (node, target, json), ) def ensure_table_setup(): db = Database() db.create_tables() db.end() #################################################################### #################################################################### #################################################################### #################################################################### def with_connection(func): @wraps(func) def wrapped(*args, **kwargs): conn = sqlite3.connect(DB_FILE) cursor = conn.cursor() result = func(cursor, *args, **kwargs) conn.commit() conn.close() return result return wrapped @with_connection def init_db(cursor: Cursor): cursor.executescript( """ CREATE TABLE IF NOT EXISTS Links ( id INTEGER PRIMARY KEY, source_ip TEXT NOT NULL, destination_ip TEXT NOT NULL, UNIQUE(source_ip, destination_ip) ); CREATE TABLE IF NOT EXISTS Paths ( id INTEGER PRIMARY KEY, node TEXT NOT NULL, target TEXT NOT NULL, hops_json TEXT NOT NULL, UNIQUE(node, target, hops_json) ); CREATE TABLE IF NOT EXISTS Latency ( id INTEGER PRIMARY KEY, link_id INTEGER NOT NULL, timestamp DATETIME DEFAULT CURRENT_TIMESTAMP, latency_ms REAL NOT NULL, FOREIGN KEY (link_id) REFERENCES Links(id) ); CREATE TABLE IF NOT EXISTS HopDetails ( id INTEGER PRIMARY KEY, hop_name TEXT, hop_ip TEXT, hop_latency TEXT ); """ ) @with_connection def insert_hop(cursor: Cursor, previous_hop_ip: str, hop_ip: str): """Insert a new hop and return related Link id""" cursor.execute( "INSERT OR IGNORE INTO Links (source_ip, destination_ip) VALUES (?, ?)", (previous_hop_ip, hop_ip), ) cursor.execute( "SELECT id FROM Links WHERE source_ip = ? AND destination_ip = ?", (previous_hop_ip, hop_ip), ) return cursor.fetchone()