#!/usr/bin/env python3 import socket import json import time import os from datetime import datetime from collections import deque from rich.live import Live from rich.table import Table from rich.layout import Layout from rich.panel import Panel from rich.console import Console from rich.text import Text # --- CONFIGURATION --- INPUT_SOCKET = "/tmp/streamer.sock" ONRAMP_HOST = "127.0.0.1" ONRAMP_PORT = 9999 ANALYST_HOST = "127.0.0.1" ANALYST_PORT = 9997 REFRESH_RATE = 1.0 # Global state to track lag history (last 300 seconds) LAG_HISTORY = deque(maxlen=300) console = Console() # ------------------- QUERY HELPERS ------------------- def query_input_go(): if not os.path.exists(INPUT_SOCKET): return None try: with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as s: s.settimeout(0.2) s.connect(INPUT_SOCKET) return s.recv(1024).decode('utf-8').strip() except: return None def query_tcp_json(host, port, payload=b"status"): try: with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: s.settimeout(1.0) s.connect((host, port)) s.sendall(payload) chunks = [] while True: chunk = s.recv(4096) if not chunk: break chunks.append(chunk) return json.loads(b"".join(chunks).decode()) except: return None # ------------------- LAYOUT ------------------- def make_layout(): layout = Layout() layout.split( Layout(name="header", size=3), Layout(name="main", size=10), Layout(name="market", size=12), Layout(name="footer", size=3), ) layout["main"].split_row( Layout(name="input_svc"), Layout(name="onramp_svc"), Layout(name="analyst_svc"), ) return layout # ------------------- PANELS ------------------- def get_input_panel(): raw = query_input_go() if not raw: return Panel(Text("OFFLINE", style="bold red"), title="[1] Input Service (Go)", border_style="red") parts = raw.split("|") content = "\n".join([p.strip() for p in parts]) return Panel(content, title="[1] Input Service (Go)", border_style="green") def get_onramp_panel(): data = query_tcp_json(ONRAMP_HOST, ONRAMP_PORT) if not data: LAG_HISTORY.clear() return Panel(Text("OFFLINE", style="bold red"), title="[2] Onramp Service (Go)", border_style="red") last_ts = data.get('last_ts', 0) / 1000 current_lag = time.time() - last_ts if last_ts > 0 else 0 LAG_HISTORY.append(current_lag) avg_1m = sum(list(LAG_HISTORY)[-60:]) / min(len(LAG_HISTORY), 60) avg_5m = sum(LAG_HISTORY) / len(LAG_HISTORY) lag_style = "green" if current_lag < 2 else "yellow" if current_lag < 5 else "bold red" content = Text() content.append(f"Uptime Start : {data.get('uptime_start')}\n") content.append(f"Total Trades : {data.get('total_trades')}\n") content.append(f"Current File : {os.path.basename(str(data.get('last_file')))}\n") content.append("Lag (Avg) : ") content.append(f"{current_lag:.2f}s", style=lag_style) content.append(f", {avg_1m:.2f}s/1m", style="dim" if avg_1m < 2 else "yellow") content.append(f", {avg_5m:.2f}s/5m", style="dim" if avg_5m < 2 else "yellow") return Panel(content, title="[2] Onramp Service (Go)", border_style="blue") def get_analyst_panel(): data = query_tcp_json(ANALYST_HOST, ANALYST_PORT) if not data: return Panel(Text("OFFLINE", style="bold red"), title="[3] Analyst Service (Python)", border_style="red") tf_data = data.get("timeframes", {}) active = ", ".join(data.get("active_timeframes", [])) content = Text() content.append(f"Active TFs : {active}\n\n") for tf in ["1m", "5m", "15m", "1h"]: tf_info = tf_data.get(tf) if not tf_info: continue last_ts = tf_info.get("last") last_str = ( datetime.fromtimestamp(last_ts).strftime('%H:%M:%S') if last_ts else "n/a" ) new = tf_info.get("new", 0) color = "green" if new > 0 else "dim" content.append(f"{tf:>4} | ") content.append(f"+{new:<4}", style=color) content.append(f" last: {last_str}\n") return Panel(content, title="[3] Analyst Service (Python)", border_style="magenta") # ------------------- MARKET TABLE ------------------- def get_market_table(): res = query_tcp_json(ONRAMP_HOST, ONRAMP_PORT, b"live") table = Table(expand=True, border_style="cyan", header_style="bold cyan") table.add_column("TF", justify="center", style="bold yellow") table.add_column("Last Update", justify="center") table.add_column("Open", justify="right") table.add_column("High", justify="right") table.add_column("Low", justify="right") table.add_column("Close", justify="right") table.add_column("Volume", justify="right", style="magenta") table.add_column("Buy %", justify="right") if res and "data" in res: candles_data = res["data"] for tf in ["1m", "5m", "15m", "1h"]: if tf in candles_data and candles_data[tf]: all_ts = [int(ts) for ts in candles_data[tf].keys()] latest_ts = str(max(all_ts)) c = candles_data[tf][latest_ts] ts_str = datetime.fromtimestamp(int(latest_ts)).strftime('%H:%M:%S') color = "green" if c['close'] >= c['open'] else "red" buy_pct = (c['buy_volume'] / c['volume'] * 100) if c['volume'] > 0 else 0 buy_color = "green" if buy_pct > 50 else "red" table.add_row( tf, ts_str, f"{c['open']:.2f}", f"{c['high']:.2f}", f"{c['low']:.2f}", Text(f"{c['close']:.2f}", style=f"bold {color}"), f"{c['volume']:.2f}", Text(f"{buy_pct:.1f}%", style=buy_color) ) else: table.add_row("waiting...", "-", "-", "-", "-", "-", "-", "-") return table # ------------------- MAIN ------------------- def main(): layout = make_layout() with Live(layout, refresh_per_second=2, screen=True): while True: layout["header"].update( Panel( Text( f"BYBIT BTCUSDT UNIFIED MONITOR | {datetime.now().strftime('%H:%M:%S')}", justify="center", style="bold white on blue" ) ) ) layout["input_svc"].update(get_input_panel()) layout["onramp_svc"].update(get_onramp_panel()) layout["analyst_svc"].update(get_analyst_panel()) layout["market"].update(get_market_table()) layout["footer"].update( Text( "Ctrl+C to exit | Pipeline: Input → Onramp → Analyst", justify="center", style="dim" ) ) time.sleep(REFRESH_RATE) if __name__ == "__main__": main()