From ac4f2cfcc828920e2d787f5b867817b4bed7a1d9 Mon Sep 17 00:00:00 2001 From: Kalzu Rekku Date: Tue, 13 Jan 2026 21:41:27 +0200 Subject: [PATCH] Better monitor, uses rich library for TUI. --- monitor/monitor.py | 214 ++++++++++++++++++++++++++++++--------------- 1 file changed, 144 insertions(+), 70 deletions(-) diff --git a/monitor/monitor.py b/monitor/monitor.py index 51fc7fd..1f6678f 100755 --- a/monitor/monitor.py +++ b/monitor/monitor.py @@ -6,96 +6,170 @@ import time import os from datetime import datetime +from rich.live import Live +from rich.table import Table +from rich.layout import Layout +from rich.panel import Panel +from rich.console import Console +from rich.text import Text +from rich.columns import Columns + # --- CONFIGURATION --- -# Adjust these paths/ports to match your config.json files INPUT_SOCKET = "/tmp/streamer.sock" ONRAMP_HOST = "127.0.0.1" ONRAMP_PORT = 9999 -REFRESH_RATE = 1.0 # Seconds +REFRESH_RATE = 1.0 + +console = Console() def query_input_go(): - """Queries the Go Input service via Unix Socket.""" if not os.path.exists(INPUT_SOCKET): - return "OFFLINE (Socket not found)" + return None try: with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as s: - s.settimeout(0.5) + s.settimeout(0.2) s.connect(INPUT_SOCKET) - data = s.recv(1024) - return data.decode('utf-8').strip() - except Exception as e: - return f"OFFLINE ({e})" - -def query_onramp_json(command): - """Queries the Python Onramp service via TCP.""" - try: - with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: - s.settimeout(0.5) - s.connect((ONRAMP_HOST, ONRAMP_PORT)) - s.sendall(command.encode('utf-8')) - data = s.recv(8192) # Larger buffer for 'live' data - return json.loads(data.decode('utf-8')) + return s.recv(1024).decode('utf-8').strip() except: return None -def format_candle(candle): - """Formats a single candle dictionary into a readable line.""" - return (f"O: {candle['open']:.2f} | H: {candle['high']:.2f} | " - f"L: {candle['low']:.2f} | C: {candle['close']:.2f} | V: {candle['volume']:.2f}") +def query_onramp(command): + try: + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: + # Increase timeout slightly for the larger 'live' payload + s.settimeout(1.0) + s.connect((ONRAMP_HOST, ONRAMP_PORT)) + s.sendall(command.encode('utf-8')) + + chunks = [] + while True: + chunk = s.recv(4096) # Read in 4KB chunks + if not chunk: + break # Server closed connection, we have everything + chunks.append(chunk) + + full_data = b"".join(chunks).decode('utf-8') + return json.loads(full_data) + except Exception as e: + # For debugging, you can uncomment the line below: + # print(f"Socket Error: {e}") + return None + +def make_layout(): + layout = Layout() + layout.split( + Layout(name="header", size=3), + Layout(name="main", size=10), + Layout(name="market", size=12), + Layout(name="footer", size=3), + ) + layout["main"].split_row( + Layout(name="input_svc"), + Layout(name="onramp_svc"), + ) + return layout + +def get_input_panel(): + raw = query_input_go() + if not raw: + return Panel(Text("OFFLINE", style="bold red"), title="[1] Input Service (Go)", border_style="red") + + # The Go app returns: "Uptime: 1m2s | Total Msgs: 500 | Rate: 10.00 msg/min" + parts = raw.split("|") + content = "\n".join([p.strip() for p in parts]) + return Panel(content, title="[1] Input Service (Go)", border_style="green") + +def get_onramp_panel(): + data = query_onramp("status") + if not data: + return Panel(Text("OFFLINE", style="bold red"), title="[2] Onramp Service (Python)", border_style="red") + + last_ts = data.get('last_ts', 0) / 1000 + lag = time.time() - last_ts if last_ts > 0 else 0 + lag_style = "green" if lag < 2 else "yellow" if lag < 5 else "bold red" + + content = Text() + content.append(f"Uptime Start : {data.get('uptime_start')}\n") + content.append(f"Total Trades : {data.get('total_trades')}\n") + content.append(f"Current File : {os.path.basename(str(data.get('last_file')))}\n") + content.append("Lag : ", style="white") + content.append(f"{lag:.2f}s", style=lag_style) + + return Panel(content, title="[2] Onramp Service (Python)", border_style="blue") + +def get_market_table(): + res = query_onramp("live") + table = Table(expand=True, border_style="cyan", header_style="bold cyan") + + table.add_column("TF", justify="center", style="bold yellow") + table.add_column("Last Update", justify="center") + table.add_column("Open", justify="right") + table.add_column("High", justify="right") + table.add_column("Low", justify="right") + table.add_column("Close", justify="right") + table.add_column("Volume", justify="right", style="magenta") + table.add_column("Buy %", justify="right") + + if res and "data" in res: + candles_data = res["data"] + # We want to show these specific rows + for tf in ["1m", "5m", "15m", "1h"]: + if tf in candles_data and candles_data[tf]: + # Get all timestamps, convert to int to find the latest one + all_timestamps = [int(ts) for ts in candles_data[tf].keys()] + latest_ts = str(max(all_timestamps)) + + c = candles_data[tf][latest_ts] + + # Format time + ts_str = datetime.fromtimestamp(int(latest_ts)).strftime('%H:%M:%S') + + # Price Color (Bullish vs Bearish) + color = "green" if c['close'] >= c['open'] else "red" + + # Calculate Buy Volume Percentage + buy_pct = (c['buy_volume'] / c['volume'] * 100) if c['volume'] > 0 else 0 + buy_color = "green" if buy_pct > 50 else "red" + + table.add_row( + tf, + ts_str, + f"{c['open']:.2f}", + f"{c['high']:.2f}", + f"{c['low']:.2f}", + Text(f"{c['close']:.2f}", style=f"bold {color}"), + f"{c['volume']:.2f}", + Text(f"{buy_pct:.1f}%", style=buy_color) + ) + time.sleep(1) + else: + # Placeholder if service is offline or data not ready + table.add_row("waiting...", "-", "-", "-", "-", "-", "-", "-") + + return table def main(): - try: + layout = make_layout() + + with Live(layout, refresh_per_second=2, screen=True): while True: - # 1. Collect Data - input_status = query_input_go() - onramp_status = query_onramp_json("status") - onramp_live = query_onramp_json("live") + # Header + header_text = Text(f"BYBIT BTC UNIFIED MONITOR | {datetime.now().strftime('%H:%M:%S')}", + justify="center", style="bold white on blue") + layout["header"].update(Panel(header_text)) - # 2. Clear Screen - print("\033[2J\033[H", end="") - - print("="*70) - print(f" UNIFIED MONITOR - {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") - print("="*70) + # Body Panels + layout["input_svc"].update(get_input_panel()) + layout["onramp_svc"].update(get_onramp_panel()) - # 3. Display Input (Go) Section - print(f"\n[1] INPUT SERVICE (Go)") - print(f" Status: {input_status}") + # Market Table + layout["market"].update(get_market_table()) - # 4. Display Onramp (Python) Section - print(f"\n[2] ONRAMP SERVICE (Python)") - if onramp_status: - stats = onramp_status - # Calculate lag - last_ts = stats.get('last_ts', 0) / 1000 - lag = time.time() - last_ts if last_ts > 0 else 0 - - print(f" Uptime Start : {stats.get('uptime_start')}") - print(f" Processed : {stats.get('total_trades')} trades") - print(f" Current File : {os.path.basename(str(stats.get('last_file')))}") - print(f" Data Lag : {lag:.2f}s") - else: - print(" Status: OFFLINE") + # Footer + footer_text = Text("Press Ctrl+C to exit | Monitoring: publicTrade.BTCUSDT", justify="center", style="dim") + layout["footer"].update(footer_text) - # 5. Display Live Market Snapshot - if onramp_live and "data" in onramp_live: - print(f"\n[3] LIVE MARKET SNAPSHOT") - candles = onramp_live["data"] - # Show 1m and 1h as examples - for tf in ["1m", "1h"]: - if tf in candles: - # Get the most recent timestamp in that timeframe - latest_ts = max(candles[tf].keys()) - c = candles[tf][latest_ts] - ts_str = datetime.fromtimestamp(int(latest_ts)).strftime('%H:%M') - print(f" {tf} ({ts_str}) >> {format_candle(c)}") - - print("\n" + "="*70) - print(" (Ctrl+C to exit)") - time.sleep(REFRESH_RATE) - except KeyboardInterrupt: - print("\nClosing monitor...") if __name__ == "__main__": - main() \ No newline at end of file + main()