Files
BytbitBTC/monitor/monitor.py

357 lines
12 KiB
Python
Executable File

#!/usr/bin/env python3
import socket
import json
import time
import os
from datetime import datetime
from collections import deque
from rich.live import Live
from rich.table import Table
from rich.layout import Layout
from rich.panel import Panel
from rich.console import Console
from rich.text import Text
# --- CONFIGURATION ---
# Unix-domain socket path read by query_input_go() (Input service status text).
INPUT_SOCKET = "/tmp/streamer.sock"
# Unix-domain socket path read by query_signals_health() (health JSON).
SIGNALS_HEALTH_SOCKET = "/tmp/signals_health.sock"
# TCP endpoint of the Onramp service, polled through query_tcp_json().
ONRAMP_HOST = "127.0.0.1"
ONRAMP_PORT = 9999
# TCP endpoint of the Analyst service, polled through query_tcp_json().
ANALYST_HOST = "127.0.0.1"
ANALYST_PORT = 9997
# Seconds main() sleeps between two dashboard refreshes.
REFRESH_RATE = 1.0
# Global state to track lag history: holds at most 300 samples, one appended
# per successful Onramp status poll (about 300 seconds at the default
# REFRESH_RATE). Cleared whenever the Onramp service is seen offline.
LAG_HISTORY = deque(maxlen=300)
# Console used by main() for the shutdown messages.
console = Console()
# ------------------- QUERY HELPERS -------------------
def query_input_go():
    """Fetch the raw status text published by the Input service.

    Connects to the Unix-domain socket at ``INPUT_SOCKET`` and performs a
    single ``recv`` of up to 1024 bytes; no request is sent first. Every
    socket operation is bounded by a 0.2 second timeout.

    Returns:
        The UTF-8 decoded, whitespace-stripped payload, or ``None`` when the
        socket file does not exist, the connection or read fails or times
        out, or the payload is not valid UTF-8.
    """
    if not os.path.exists(INPUT_SOCKET):
        return None
    try:
        with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as s:
            s.settimeout(0.2)
            s.connect(INPUT_SOCKET)
            return s.recv(1024).decode('utf-8').strip()
    except (OSError, UnicodeDecodeError):
        # Only I/O and decoding failures mean "offline". A bare ``except``
        # would also swallow KeyboardInterrupt and make Ctrl+C unreliable.
        return None
def query_signals_health():
    """Fetch the health report of the signal generator.

    Connects to the Unix-domain socket at ``SIGNALS_HEALTH_SOCKET``, reads
    until the peer closes the connection, and parses the accumulated bytes
    as JSON. Every socket operation is bounded by a 0.5 second timeout.

    Returns:
        The decoded JSON value, or ``None`` when the socket file does not
        exist, the connection or read fails or times out, or the payload is
        not valid UTF-8 encoded JSON.
    """
    if not os.path.exists(SIGNALS_HEALTH_SOCKET):
        return None
    try:
        with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as s:
            s.settimeout(0.5)
            s.connect(SIGNALS_HEALTH_SOCKET)
            chunks = []
            while True:
                chunk = s.recv(4096)
                if not chunk:
                    # Empty read: the peer closed the connection.
                    break
                chunks.append(chunk)
        return json.loads(b"".join(chunks).decode())
    except (OSError, ValueError):
        # OSError covers connect/read failures and timeouts; ValueError covers
        # both UnicodeDecodeError and json.JSONDecodeError. KeyboardInterrupt
        # is deliberately not caught so Ctrl+C keeps working mid-poll.
        return None
def query_tcp_json(host, port, payload=b"status"):
    """Send a command to a TCP service and parse its JSON reply.

    Opens a TCP connection to ``(host, port)``, sends ``payload``, reads
    until the peer closes the connection, and parses the accumulated bytes
    as JSON. Every socket operation is bounded by a 1 second timeout.

    Args:
        host: Host name or address of the service.
        port: TCP port of the service.
        payload: Bytes sent as the request; defaults to ``b"status"``.

    Returns:
        The decoded JSON value, or ``None`` when the connection, send or
        read fails or times out, or the reply is not valid UTF-8 encoded
        JSON.
    """
    try:
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            s.settimeout(1.0)
            s.connect((host, port))
            s.sendall(payload)
            chunks = []
            while True:
                chunk = s.recv(4096)
                if not chunk:
                    # Empty read: the peer closed the connection.
                    break
                chunks.append(chunk)
        return json.loads(b"".join(chunks).decode())
    except (OSError, ValueError):
        # OSError covers refused connections, resolution errors and timeouts;
        # ValueError covers UnicodeDecodeError and json.JSONDecodeError.
        # KeyboardInterrupt is deliberately not caught so Ctrl+C keeps working.
        return None
# ------------------- LAYOUT -------------------
def make_layout():
    """Build the dashboard's layout tree.

    The root is split into five stacked regions: ``header``, ``services``,
    ``signals_table``, ``market`` and ``footer``. ``services`` is divided
    into two rows, each holding two side-by-side panes, giving a 2x2 grid
    of ``input_svc``/``onramp_svc`` over ``analyst_svc``/``signals_svc``.

    Returns:
        The root ``Layout``; every region is addressable by name.
    """
    root = Layout()
    stacked_regions = [
        Layout(name="header", size=3),
        Layout(name="services", size=18),
        Layout(name="signals_table", size=8),
        Layout(name="market", size=12),
        Layout(name="footer", size=3),
    ]
    root.split(*stacked_regions)

    # 2x2 service grid: two rows, then two panes inside each row.
    root["services"].split_column(
        Layout(name="services_row1"),
        Layout(name="services_row2"),
    )
    grid = (
        ("services_row1", ("input_svc", "onramp_svc")),
        ("services_row2", ("analyst_svc", "signals_svc")),
    )
    for row_name, pane_names in grid:
        panes = [Layout(name=pane) for pane in pane_names]
        root[row_name].split_row(*panes)
    return root
# ------------------- PANELS -------------------
def get_input_panel():
    """Build the panel for the Input service.

    The raw status text from ``query_input_go()`` is split on ``|`` and each
    stripped field is shown on its own line.

    Returns:
        A red "OFFLINE" panel when no status text is available, otherwise a
        green panel containing the status fields.
    """
    title = "[1] Input Service (Go)"
    raw = query_input_go()
    if not raw:
        return Panel(Text("OFFLINE", style="bold red"),
                     title=title, border_style="red")
    fields = "\n".join(part.strip() for part in raw.split("|"))
    # A plain str handed to Panel is parsed as Rich console markup, so text
    # coming from the service could drop "[tag]"-like fragments or raise
    # MarkupError on an unmatched "[/tag]". Render it with markup disabled;
    # emoji codes and automatic highlighting still apply as before.
    content = console.render_str(fields, markup=False)
    return Panel(content, title=title, border_style="green")
def get_onramp_panel():
    """Build the panel for the Onramp service, including lag statistics.

    ``last_ts`` from the status reply is divided by 1000 and compared with
    ``time.time()`` to obtain the current lag; a non-positive timestamp
    counts as zero lag. Each sample is appended to ``LAG_HISTORY`` and two
    averages are shown: over the newest 60 samples and over the whole
    history. The history is cleared whenever the service is offline.

    Returns:
        A red "OFFLINE" panel when the service does not answer, otherwise a
        blue panel with uptime, trade count, current file and lag figures.
    """
    title = "[2] Onramp Service (Go)"
    status = query_tcp_json(ONRAMP_HOST, ONRAMP_PORT)
    if not status:
        LAG_HISTORY.clear()
        return Panel(Text("OFFLINE", style="bold red"),
                     title=title, border_style="red")

    last_seen = status.get('last_ts', 0) / 1000
    if last_seen > 0:
        lag_now = time.time() - last_seen
    else:
        lag_now = 0
    LAG_HISTORY.append(lag_now)

    # The history is never empty here: a sample was just appended.
    newest = list(LAG_HISTORY)[-60:]
    lag_1m = sum(newest) / len(newest)
    lag_5m = sum(LAG_HISTORY) / len(LAG_HISTORY)

    if lag_now < 2:
        now_style = "green"
    elif lag_now < 5:
        now_style = "yellow"
    else:
        now_style = "bold red"

    file_name = os.path.basename(str(status.get('last_file')))
    body = Text()
    body.append(f"Uptime Start : {status.get('uptime_start')}\n")
    body.append(f"Total Trades : {status.get('total_trades')}\n")
    body.append(f"Current File : {file_name}\n")
    body.append("Lag (Avg) : ")
    body.append(f"{lag_now:.2f}s", style=now_style)
    for average, suffix in ((lag_1m, "1m"), (lag_5m, "5m")):
        body.append(f", {average:.2f}s/{suffix}",
                    style="dim" if average < 2 else "yellow")
    return Panel(body, title=title, border_style="blue")
def get_analyst_panel():
    """Build the panel for the Analyst service.

    Lists the active timeframes, then one line per known timeframe
    (1m, 5m, 15m, 1h) present in the reply, showing its ``new`` counter and
    the ``last`` timestamp formatted as local ``HH:MM:SS``.

    Returns:
        A red "OFFLINE" panel when the service does not answer, otherwise a
        magenta panel with the per-timeframe summary.
    """
    title = "[3] Analyst Service (Python)"
    status = query_tcp_json(ANALYST_HOST, ANALYST_PORT)
    if not status:
        return Panel(Text("OFFLINE", style="bold red"),
                     title=title, border_style="red")

    per_timeframe = status.get("timeframes", {})
    active_list = ", ".join(status.get("active_timeframes", []))

    body = Text()
    body.append(f"Active TFs : {active_list}\n\n")
    for tf in ("1m", "5m", "15m", "1h"):
        entry = per_timeframe.get(tf)
        if entry:
            stamp = entry.get("last")
            if stamp:
                last_str = datetime.fromtimestamp(stamp).strftime('%H:%M:%S')
            else:
                last_str = "n/a"
            fresh = entry.get("new", 0)
            # Highlight timeframes that report a positive "new" count.
            fresh_style = "green" if fresh > 0 else "dim"
            body.append(f"{tf:>4} | ")
            body.append(f"+{fresh:<4}", style=fresh_style)
            body.append(f" last: {last_str}\n")
    return Panel(body, title=title, border_style="magenta")
def get_signals_panel():
    """Build the panel for the signal generator.

    Shows status, personality, timeframes, uptime, signal counters
    (total, with buy / sell in green / red), error count, connected clients
    and the age of the last signal, all read from ``query_signals_health()``.

    Returns:
        A red "OFFLINE" panel when no health data is available; otherwise a
        panel whose border is green when ``status`` equals ``"running"`` and
        red for any other status.
    """
    title = "[4] Signal Generator (Python)"
    data = query_signals_health()
    if not data:
        return Panel(Text("OFFLINE", style="bold red"),
                     title=title, border_style="red")

    # One colour drives both the status text and the panel border.
    status_color = "green" if data.get("status") == "running" else "red"
    uptime = data.get('uptime_seconds', 0)

    content = Text()
    content.append("Status : ")
    # str() keeps a non-string status (e.g. JSON null) from crashing .upper().
    content.append(str(data.get('status', 'unknown')).upper(),
                   style=f"bold {status_color}")
    content.append(f"\nPersonality : {data.get('personality', 'n/a')}\n")
    content.append(f"Timeframes : {', '.join(data.get('timeframes', []))}\n")
    content.append(f"Uptime : {uptime}s ({uptime//60}m)\n")

    total = data.get('total_signals', 0)
    buy = data.get('buy_signals', 0)
    sell = data.get('sell_signals', 0)
    content.append(f"Total Sigs : {total} (")
    content.append(f"{buy}", style="green")
    content.append(" / ")
    content.append(f"{sell}", style="red")
    content.append(")\n")
    content.append(f"Errors : {data.get('errors', 0)}\n")
    content.append(f"Clients : {data.get('connected_clients', 0)}\n")

    last_sig = data.get('last_signal')
    if last_sig:
        try:
            # fromisoformat() on older Pythons rejects a trailing "Z", so it
            # is rewritten as an explicit UTC offset first.
            last_time = datetime.fromisoformat(last_sig.replace('Z', '+00:00'))
            # now() uses the parsed tzinfo so both sides are comparable.
            # NOTE(review): a naive timestamp is compared with local time;
            # confirm the generator never emits naive UTC values.
            time_ago = (datetime.now(last_time.tzinfo) - last_time).total_seconds()
            content.append(f"Last Signal : {int(time_ago)}s ago")
        except (AttributeError, TypeError, ValueError):
            # Not a string or not ISO-8601: fall back to showing it verbatim.
            content.append(f"Last Signal : {last_sig}")
    else:
        content.append("Last Signal : None")

    return Panel(content, title=title, border_style=status_color)
# ------------------- SIGNALS TABLE -------------------
def get_signals_table():
    """Build the "Recent Signals" panel.

    Shows the last five entries of ``recent_signals`` from
    ``query_signals_health()``: time, timeframe, signal type (green for
    ``"BUY"``, red otherwise), price, confidence as a percentage and the
    first two reasons (followed by ``...`` when there are more).
    Timezone-aware timestamps are converted to local time so they line up
    with the header clock and the market table, which both use local time.

    Returns:
        A yellow panel wrapping the table; a single "No signals yet" row is
        shown when there is no health data or no recent signal.
    """
    data = query_signals_health()
    table = Table(expand=True, border_style="yellow", header_style="bold yellow")
    table.add_column("Time", justify="center", style="dim")
    table.add_column("TF", justify="center")
    table.add_column("Signal", justify="center", style="bold")
    table.add_column("Price", justify="right")
    table.add_column("Conf", justify="right")
    table.add_column("Reason", justify="left", no_wrap=False)

    recent = data.get("recent_signals") if data else None
    if recent:
        for sig in recent[-5:]:  # Last 5 signals
            try:
                sig_time = datetime.fromisoformat(
                    sig['generated_at'].replace('Z', '+00:00')
                )
                # astimezone() converts aware values to local time and
                # treats naive values as already local.
                time_str = sig_time.astimezone().strftime('%H:%M:%S')
            except (KeyError, AttributeError, TypeError, ValueError,
                    OverflowError, OSError):
                # Missing, non-string, malformed or out-of-range timestamp.
                time_str = "n/a"

            signal_type = sig.get('signal', '?')
            signal_color = "green" if signal_type == "BUY" else "red"

            # Take first 2 reasons
            reasons = sig.get('reasons', [])
            reason_str = ", ".join(reasons[:2])
            if len(reasons) > 2:
                reason_str += "..."

            table.add_row(
                time_str,
                sig.get('timeframe', '?'),
                Text(signal_type, style=f"bold {signal_color}"),
                f"${sig.get('price', 0):.2f}",
                f"{sig.get('confidence', 0)*100:.0f}%",
                reason_str,
            )
    else:
        table.add_row("", "", "", "", "", "No signals yet")
    return Panel(table, title="Recent Signals", border_style="yellow")
# ------------------- MARKET TABLE -------------------
def get_market_table():
    """Build the "Live Market Data" panel.

    Sends the ``live`` command to the Onramp service and, for each known
    timeframe (1m, 5m, 15m, 1h) that has candles, shows the candle with the
    numerically largest timestamp key: update time, OHLC, volume and the
    buy share of the volume. Close is green when it is at or above open;
    the buy share is green above 50 %.

    Returns:
        A cyan panel wrapping the table; a single "waiting..." row is shown
        when the service gives no reply or the reply has no ``data`` key.
    """
    reply = query_tcp_json(ONRAMP_HOST, ONRAMP_PORT, b"live")

    table = Table(expand=True, border_style="cyan", header_style="bold cyan")
    column_specs = (
        ("TF", {"justify": "center", "style": "bold yellow"}),
        ("Last Update", {"justify": "center"}),
        ("Open", {"justify": "right"}),
        ("High", {"justify": "right"}),
        ("Low", {"justify": "right"}),
        ("Close", {"justify": "right"}),
        ("Volume", {"justify": "right", "style": "magenta"}),
        ("Buy %", {"justify": "right"}),
    )
    for heading, options in column_specs:
        table.add_column(heading, **options)

    if not (reply and "data" in reply):
        table.add_row("waiting...", "-", "-", "-", "-", "-", "-", "-")
        return Panel(table, title="Live Market Data", border_style="cyan")

    by_timeframe = reply["data"]
    for tf in ("1m", "5m", "15m", "1h"):
        if tf not in by_timeframe or not by_timeframe[tf]:
            continue
        candles = by_timeframe[tf]
        # Keys are timestamp strings; pick the numerically largest one.
        newest_key = str(max(int(key) for key in candles.keys()))
        candle = candles[newest_key]
        updated = datetime.fromtimestamp(int(newest_key)).strftime('%H:%M:%S')

        if candle['close'] >= candle['open']:
            close_style = "green"
        else:
            close_style = "red"

        # Guard against division by zero on an empty candle.
        if candle['volume'] > 0:
            buy_share = candle['buy_volume'] / candle['volume'] * 100
        else:
            buy_share = 0
        share_style = "green" if buy_share > 50 else "red"

        table.add_row(
            tf,
            updated,
            f"{candle['open']:.2f}",
            f"{candle['high']:.2f}",
            f"{candle['low']:.2f}",
            Text(f"{candle['close']:.2f}", style=f"bold {close_style}"),
            f"{candle['volume']:.2f}",
            Text(f"{buy_share:.1f}%", style=share_style),
        )
    return Panel(table, title="Live Market Data", border_style="cyan")
# ------------------- MAIN -------------------
def main():
    """Run the full-screen dashboard until interrupted.

    Each cycle refreshes the header clock, rebuilds every panel, resets the
    footer and then sleeps for ``REFRESH_RATE`` seconds. Ctrl+C leaves the
    loop; a final "Monitor stopped." message is printed on every exit path.
    """
    layout = make_layout()
    # (layout region, panel builder) pairs, in refresh order.
    panel_builders = (
        ("input_svc", get_input_panel),
        ("onramp_svc", get_onramp_panel),
        ("analyst_svc", get_analyst_panel),
        ("signals_svc", get_signals_panel),
        ("signals_table", get_signals_table),
        ("market", get_market_table),
    )
    try:
        with Live(layout, refresh_per_second=2, screen=True):
            while True:
                clock = datetime.now().strftime('%H:%M:%S')
                banner = Text(
                    f"BYBIT BTCUSDT UNIFIED MONITOR | {clock}",
                    justify="center",
                    style="bold white on blue",
                )
                layout["header"].update(Panel(banner))

                for region, build_panel in panel_builders:
                    layout[region].update(build_panel())

                hint = Text(
                    "Ctrl+C to exit | Pipeline: Input → Onramp → Analyst → Signals",
                    justify="center",
                    style="dim",
                )
                layout["footer"].update(hint)
                time.sleep(REFRESH_RATE)
    except KeyboardInterrupt:
        console.print("\n[yellow]Shutting down monitor...[/yellow]")
    finally:
        console.print("[green]Monitor stopped.[/green]")


if __name__ == "__main__":
    main()