From 7d7038d6bde2afce06db1c486f4365ef199be4aa Mon Sep 17 00:00:00 2001 From: Kalzu Rekku Date: Wed, 14 Jan 2026 00:28:13 +0200 Subject: [PATCH] Monitor that shows analyst also. Analyst that controlls its logging. --- analysis/DATABASE.md | 248 ++++++++++++++++++++++++++++++++++++++++ analysis/Pipfile | 12 ++ analysis/analyst.py | 261 +++++++++++++++++++++++++++++++++++++++++++ analysis/config.json | 13 +++ monitor/monitor.py | 120 ++++++++++++++++---- 5 files changed, 631 insertions(+), 23 deletions(-) create mode 100644 analysis/DATABASE.md create mode 100644 analysis/Pipfile create mode 100755 analysis/analyst.py create mode 100644 analysis/config.json diff --git a/analysis/DATABASE.md b/analysis/DATABASE.md new file mode 100644 index 0000000..303de91 --- /dev/null +++ b/analysis/DATABASE.md @@ -0,0 +1,248 @@ +# Data Output Documentation: `analysis.db` + +## 1. Database Overview + +The `analysis.db` database is a **Derived Market Analysis Store**. + +It contains **technical indicator outputs** calculated from the raw OHLCV candle data stored in `candles.db`. +This database is **append-only**, preserves full historical analysis, and is designed to act as a **clean input source** for downstream services such as: + +* Signal generators +* Strategy evaluators +* Paper/live trading engines +* Dashboards and visualization layers +* Alerting systems + +- **Database Engine:** SQLite 3 +- **Concurrency Mode:** WAL (Write-Ahead Logging) enabled +- **Update Frequency:** Near real-time (continuous processing loop) +- **Retention Model:** Rolling window (currently ~1 month via cleanup + incremental vacuum) + +--- + +## 2. Relationship to `candles.db` + +This database is **fully derived** from `candles.db`. + +* Each row in `analysis.db` corresponds **1:1** with a *closed* candle from `candles.db` +* No raw trade or OHLCV data is duplicated beyond what is needed for indicator outputs +* Only **completed candles** are analyzed + (the currently-forming candle per timeframe is intentionally excluded) + +This separation ensures: + +* Clean responsibility boundaries +* Safe concurrent reads +* Zero risk of contaminating raw market data + +--- + +## 3. Schema Definition + +The database contains a single primary table: `analysis`. + +### Table: `analysis` + +| Column | Type | Description | +| -------------- | --------- | -------------------------------------------------------------- | +| `timeframe` | `TEXT` | Candle timeframe: `1m`, `5m`, `15m`, `1h` | +| `timestamp` | `INTEGER` | Unix timestamp (seconds) – **start of the candle window** | +| `ema_9` | `REAL` | Exponential Moving Average (9) | +| `ema_21` | `REAL` | Exponential Moving Average (21) | +| `sma_50` | `REAL` | Simple Moving Average (50) | +| `sma_200` | `REAL` | Simple Moving Average (200) | +| `rsi_14` | `REAL` | Relative Strength Index (14) | +| `macd` | `REAL` | MACD line (EMA12 − EMA26) | +| `macd_signal` | `REAL` | MACD signal line (9-period EMA of MACD) | +| `macd_hist` | `REAL` | MACD histogram (`macd − macd_signal`) | +| `bb_upper` | `REAL` | Bollinger Band upper (20, 2σ) | +| `bb_middle` | `REAL` | Bollinger Band middle (20 SMA) | +| `bb_lower` | `REAL` | Bollinger Band lower (20, 2σ) | +| `bb_squeeze` | `INTEGER` | Volatility squeeze flag (`1` = squeeze detected, `0` = normal) | +| `volume_ma_20` | `REAL` | 20-period moving average of volume | + +### Primary Key + +``` +(timeframe, timestamp) +``` + +This guarantees: + +* No duplicate indicator rows +* Perfect alignment with candle boundaries +* Deterministic joins with `candles.db` + +--- + +## 4. Indicator Semantics + +### Moving Averages + +* **EMA 9 / 21**: Short-term momentum and trend confirmation +* **SMA 50 / 200**: Medium- and long-term trend structure + +### RSI (14) + +* Range: `0–100` +* Typical interpretations: + + * `>70` → Overbought + * `<30` → Oversold +* Calculated using classic Wilder-style average gains/losses + +### MACD (12, 26, 9) + +* `macd`: Momentum direction +* `macd_signal`: Smoothed momentum +* `macd_hist`: Acceleration / deceleration of momentum + +### Bollinger Bands (20, 2) + +* Measures volatility expansion and contraction +* Bands are calculated using 20-period SMA ± 2 standard deviations + +#### Bollinger Squeeze (`bb_squeeze`) + +* `1` when Bollinger Band width is at a **local minimum** +* Indicates **volatility compression** +* Often precedes large directional moves +* Designed for breakout-style strategies + +### Volume MA (20) + +* Used for: + + * Breakout confirmation + * Divergence detection + * Trend strength validation + +--- + +## 5. Time Handling & Candle Validity + +* All timestamps represent the **start** of the candle window +* Example: + + * A `1m` candle at `12:00:00` covers `12:00:00 → 12:00:59` +* Only **closed candles** are analyzed +* No partial or live candle values exist in this database + +This makes `analysis.db` safe for: + +* Backtesting +* Deterministic replay +* Strategy evaluation without repainting risk + +--- + +## 6. Accessing the Data + +### Recommended Connection Settings (Python) + +```python +import sqlite3 +import pandas as pd + +def get_connection(db_path): + conn = sqlite3.connect(db_path, timeout=10) + conn.execute("PRAGMA journal_mode=WAL;") + return conn +``` + +--- + +### Common Query Patterns + +#### Latest analysis for a timeframe + +```sql +SELECT * +FROM analysis +WHERE timeframe = '5m' +ORDER BY timestamp DESC +LIMIT 1; +``` + +--- + +#### Last 200 RSI values (1-minute) + +```sql +SELECT timestamp, rsi_14 +FROM analysis +WHERE timeframe = '1m' +ORDER BY timestamp DESC +LIMIT 200; +``` + +--- + +#### Detect active Bollinger squeezes + +```sql +SELECT timeframe, timestamp +FROM analysis +WHERE bb_squeeze = 1 +ORDER BY timestamp DESC; +``` + +--- + +#### Join with raw candles for strategy logic + +```sql +SELECT + c.timestamp, + c.close, + a.ema_21, + a.rsi_14, + a.macd_hist +FROM candles c +JOIN analysis a + ON c.timeframe = a.timeframe + AND c.timestamp = a.timestamp +WHERE c.timeframe = '15m' +ORDER BY c.timestamp DESC +LIMIT 100; +``` + +--- + +## 7. Retention, Cleanup & Vacuuming + +* The analysis engine enforces a **rolling ~1-month window** +* Rows older than the cutoff are deleted +* `PRAGMA incremental_vacuum` is used to reclaim disk space gradually + +This avoids: + +* Long blocking `VACUUM` operations +* Database bloat over long-running operation + +--- + +## 8. Intended Usage Patterns + +`analysis.db` is designed to be: + +* **Read-heavy** +* **Safe for multiple consumers** +* **Stable and deterministic** + +Typical consumers include: + +* Signal engines (rule-based or ML-driven) +* Strategy backtesters +* Alert pipelines +* Visualization / dashboards +* Risk and regime detection modules + +--- + +## 9. Design Philosophy + +* **Separation of concerns**: raw data vs. derived signals +* **Determinism first**: no repainting, no live-candle noise +* **SQLite-friendly**: WAL mode, append-only, incremental vacuum +* **Composable**: easy joins, easy extensions, predictable schema diff --git a/analysis/Pipfile b/analysis/Pipfile new file mode 100644 index 0000000..40fbb7a --- /dev/null +++ b/analysis/Pipfile @@ -0,0 +1,12 @@ +[[source]] +url = "https://pypi.org/simple" +verify_ssl = true +name = "pypi" + +[packages] +pandas = "*" + +[dev-packages] + +[requires] +python_version = "3.13" diff --git a/analysis/analyst.py b/analysis/analyst.py new file mode 100755 index 0000000..d298d9a --- /dev/null +++ b/analysis/analyst.py @@ -0,0 +1,261 @@ +#!/usr/bin/env python3 +""" +BTC/USDT Continuous Analysis Engine + +This script continuously reads BTC/USDT candle data from `candles.db`, +computes technical indicators (EMA, SMA, RSI, MACD, Bollinger Bands with squeeze, Volume MA), +and stores the results in `analysis.db`. It uses SQLite with WAL mode for safe concurrent reads :contentReference[oaicite:0]{index=0}, +ensures only closed candles are processed (filtering out the current open candle):contentReference[oaicite:1]{index=1}, +and maintains a sliding one-month window of analysis data with incremental vacuuming. +A TCP status server provides health information (new candles processed, last timestamps, active timeframes). + +Configuration (config.json): +{ + "candels_database": "/databases/candles.db", + "analysis_database": "./analysis.db", + "status_host": "127.0.0.1", + "status_port": 9997 +} +""" + +import sqlite3 +import pandas as pd +import json +import time +import threading +import socketserver +import logging +import os +from datetime import datetime, timedelta + +# ========== Configuration ========== +with open('config.json', 'r') as f: + config = json.load(f) +CANDLES_DB = config.get('candels_database') or config.get('candles_database') +ANALYSIS_DB = config.get('analysis_database') +STATUS_HOST = config.get('status_host', '127.0.0.1') +STATUS_PORT = config.get('status_port', 9997) + +# ========== Logging Setup ========== + +log_cfg = config.get("logging", {}) + +LOG_TO_STDOUT = log_cfg.get("stdout", True) +LOG_TO_FILE = log_cfg.get("file", False) +LOG_FILE_PATH = log_cfg.get("file_path", "./analyst.log") +LOG_LEVEL = log_cfg.get("level", "INFO").upper() + +LOG_FORMAT = "%(asctime)s [%(name)s] %(levelname)s: %(message)s" +DATE_FORMAT = "%Y-%m-%d %H:%M:%S" + +# Root logger +root_logger = logging.getLogger() +root_logger.setLevel(LOG_LEVEL) + +# Prevent duplicate handlers if module reloads +root_logger.handlers.clear() + +formatter = logging.Formatter(LOG_FORMAT, datefmt=DATE_FORMAT) + +# --- STDOUT handler --- +if LOG_TO_STDOUT: + stdout_handler = logging.StreamHandler() + stdout_handler.setFormatter(formatter) + root_logger.addHandler(stdout_handler) + +# --- FILE handler --- +if LOG_TO_FILE: + os.makedirs(os.path.dirname(LOG_FILE_PATH), exist_ok=True) + + file_handler = logging.FileHandler(LOG_FILE_PATH) + file_handler.setFormatter(formatter) + root_logger.addHandler(file_handler) + +# Named loggers (inherit handlers) +logger = logging.getLogger("AnalysisEngine") +status_logger = logging.getLogger("StatusServer") + +logger.info("Logging initialized") +logger.info( + "stdout=%s file=%s level=%s path=%s", + LOG_TO_STDOUT, + LOG_TO_FILE, + LOG_LEVEL, + LOG_FILE_PATH if LOG_TO_FILE else "-" +) + + +# ========== SQLite Connections ========== +def get_candles_connection(path): + # Connect with timeout and WAL mode to avoid locks:contentReference[oaicite:2]{index=2} + conn = sqlite3.connect(path, timeout=10, check_same_thread=False) + conn.execute("PRAGMA journal_mode=WAL;") + return conn + +candles_conn = get_candles_connection(CANDLES_DB) +analysis_conn = sqlite3.connect(ANALYSIS_DB, timeout=10, check_same_thread=False) +analysis_conn.execute("PRAGMA journal_mode=WAL;") + +# Create analysis table if it doesn't exist +analysis_conn.execute(""" +CREATE TABLE IF NOT EXISTS analysis ( + timeframe TEXT, + timestamp INTEGER, + ema_9 REAL, + ema_21 REAL, + sma_50 REAL, + sma_200 REAL, + rsi_14 REAL, + macd REAL, + macd_signal REAL, + macd_hist REAL, + bb_upper REAL, + bb_middle REAL, + bb_lower REAL, + bb_squeeze INTEGER, + volume_ma_20 REAL, + PRIMARY KEY (timeframe, timestamp) +) +""") +analysis_conn.commit() + +# ========== Technical Indicator Functions ========== +def compute_indicators(df): + close = df['close'] + # EMA and SMA + df['ema_9'] = close.ewm(span=9, adjust=False).mean() + df['ema_21'] = close.ewm(span=21, adjust=False).mean() + df['sma_50'] = close.rolling(window=50, min_periods=1).mean() + df['sma_200'] = close.rolling(window=200, min_periods=1).mean() + # RSI (14): using 14-period gains/losses and RSI formula (100 - 100/(1+RS)):contentReference[oaicite:3]{index=3} + delta = close.diff() + gain = delta.clip(lower=0) + loss = -delta.clip(upper=0) + avg_gain = gain.rolling(window=14, min_periods=14).mean() + avg_loss = loss.rolling(window=14, min_periods=14).mean() + rs = avg_gain / avg_loss.replace(0, pd.NA) + df['rsi_14'] = 100 - (100 / (1 + rs)) + # MACD (12,26,9) + ema12 = close.ewm(span=12, adjust=False).mean() + ema26 = close.ewm(span=26, adjust=False).mean() + macd_line = ema12 - ema26 + df['macd'] = macd_line + df['macd_signal'] = macd_line.ewm(span=9, adjust=False).mean() + df['macd_hist'] = df['macd'] - df['macd_signal'] + # Bollinger Bands (20,2) + df['bb_middle'] = close.rolling(window=20, min_periods=20).mean() + bb_std = close.rolling(window=20, min_periods=20).std() + df['bb_upper'] = df['bb_middle'] + 2 * bb_std + df['bb_lower'] = df['bb_middle'] - 2 * bb_std + # Bollinger Squeeze: detect when BB width is lowest over 20 periods:contentReference[oaicite:4]{index=4} + bb_width = df['bb_upper'] - df['bb_lower'] + rolling_min_width = bb_width.rolling(window=20, min_periods=20).min() + df['bb_squeeze'] = (bb_width <= rolling_min_width).astype(int) + # Volume moving average (20) + df['volume_ma_20'] = df['volume'].rolling(window=20, min_periods=1).mean() + return df + +# ========== Health Check Server ========== +status_lock = threading.Lock() +status_data = {tf: {'new': 0, 'last': None} for tf in ["1m", "5m", "15m", "1h"]} +active_timeframes = list(status_data.keys()) + +class StatusHandler(socketserver.BaseRequestHandler): + def handle(self): + with status_lock: + report = { + 'timeframes': { + tf: {'new': status_data[tf]['new'], 'last': status_data[tf]['last']} + for tf in status_data + }, + 'active_timeframes': active_timeframes + } + self.request.sendall(json.dumps(report).encode()) + +def start_status_server(host, port): + server = socketserver.ThreadingTCPServer((host, port), StatusHandler) + status_logger.info(f"Status server listening on {host}:{port}") + threading.Thread(target=server.serve_forever, daemon=True).start() + +start_status_server(STATUS_HOST, STATUS_PORT) + +# ========== Main Loop ========== +timeframes = {"1m":60, "5m":300, "15m":900, "1h":3600} +logger.info("Starting analysis loop") + +while True: + now = int(time.time()) + one_month_ago = now - 30*24*3600 + for tf, tf_seconds in timeframes.items(): + try: + cur = analysis_conn.cursor() + cur.execute("SELECT MAX(timestamp) FROM analysis WHERE timeframe=?", (tf,)) + row = cur.fetchone() + last_processed = row[0] if row and row[0] is not None else None + if last_processed: + begin_time = last_processed + else: + begin_time = one_month_ago + # Only closed candles: timestamp < current_window_start:contentReference[oaicite:5]{index=5} + window_start = (now // tf_seconds) * tf_seconds + query = """ + SELECT timestamp, open, high, low, close, volume + FROM candles + WHERE timeframe = ? + AND timestamp >= ? + AND timestamp < ? + ORDER BY timestamp ASC + """ + df = pd.read_sql_query(query, candles_conn, params=(tf, begin_time, window_start)) + if df.empty: + new_count = 0 + else: + df = compute_indicators(df) + if last_processed: + new_df = df[df['timestamp'] > last_processed].copy() + else: + new_df = df.copy() + new_count = len(new_df) + if new_count > 0: + for _, row in new_df.iterrows(): + analysis_conn.execute(""" + INSERT OR IGNORE INTO analysis ( + timeframe, timestamp, ema_9, ema_21, sma_50, sma_200, + rsi_14, macd, macd_signal, macd_hist, + bb_upper, bb_middle, bb_lower, bb_squeeze, + volume_ma_20 + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + """, ( + tf, + int(row['timestamp']), + float(row['ema_9']) if pd.notnull(row['ema_9']) else None, + float(row['ema_21']) if pd.notnull(row['ema_21']) else None, + float(row['sma_50']) if pd.notnull(row['sma_50']) else None, + float(row['sma_200']) if pd.notnull(row['sma_200']) else None, + float(row['rsi_14']) if pd.notnull(row['rsi_14']) else None, + float(row['macd']) if pd.notnull(row['macd']) else None, + float(row['macd_signal']) if pd.notnull(row['macd_signal']) else None, + float(row['macd_hist']) if pd.notnull(row['macd_hist']) else None, + float(row['bb_upper']) if pd.notnull(row['bb_upper']) else None, + float(row['bb_middle']) if pd.notnull(row['bb_middle']) else None, + float(row['bb_lower']) if pd.notnull(row['bb_lower']) else None, + int(row['bb_squeeze']), + float(row['volume_ma_20']) if pd.notnull(row['volume_ma_20']) else None + )) + analysis_conn.commit() + with status_lock: + status_data[tf]['new'] = new_count + if new_count > 0: + status_data[tf]['last'] = int(new_df['timestamp'].max()) + # Log processing result + logger.info(f"[{tf}] New candles: {new_count}, Last timestamp: {status_data[tf]['last']}") + except Exception as e: + logger.error(f"Error processing timeframe {tf}: {e}") + # Vacuum/cleanup for sliding window effect + try: + analysis_conn.execute("PRAGMA incremental_vacuum;") + analysis_conn.execute("DELETE FROM analysis WHERE timestamp < ?", (one_month_ago,)) + analysis_conn.commit() + except Exception as e: + logger.error(f"Error during vacuum/cleanup: {e}") + time.sleep(30) diff --git a/analysis/config.json b/analysis/config.json new file mode 100644 index 0000000..6ee6d72 --- /dev/null +++ b/analysis/config.json @@ -0,0 +1,13 @@ +{ + "candels_database": "../onramp/market_data.db", + "analysis_database": "./analysis.db", + "status_host": "127.0.0.1", + "status_port": 9997, + + "logging": { + "stdout": false, + "file": true, + "file_path": "./logs/analyst.log", + "level": "INFO" + } +} \ No newline at end of file diff --git a/monitor/monitor.py b/monitor/monitor.py index 750aa90..f73efa1 100755 --- a/monitor/monitor.py +++ b/monitor/monitor.py @@ -5,7 +5,7 @@ import json import time import os from datetime import datetime -from collections import deque # Added for history tracking +from collections import deque from rich.live import Live from rich.table import Table @@ -16,8 +16,13 @@ from rich.text import Text # --- CONFIGURATION --- INPUT_SOCKET = "/tmp/streamer.sock" + ONRAMP_HOST = "127.0.0.1" ONRAMP_PORT = 9999 + +ANALYST_HOST = "127.0.0.1" +ANALYST_PORT = 9997 + REFRESH_RATE = 1.0 # Global state to track lag history (last 300 seconds) @@ -25,6 +30,8 @@ LAG_HISTORY = deque(maxlen=300) console = Console() +# ------------------- QUERY HELPERS ------------------- + def query_input_go(): if not os.path.exists(INPUT_SOCKET): return None @@ -36,12 +43,13 @@ def query_input_go(): except: return None -def query_onramp(command): + +def query_tcp_json(host, port, payload=b"status"): try: with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: s.settimeout(1.0) - s.connect((ONRAMP_HOST, ONRAMP_PORT)) - s.sendall(command.encode('utf-8')) + s.connect((host, port)) + s.sendall(payload) chunks = [] while True: @@ -50,11 +58,13 @@ def query_onramp(command): break chunks.append(chunk) - full_data = b"".join(chunks).decode('utf-8') - return json.loads(full_data) + return json.loads(b"".join(chunks).decode()) except: return None + +# ------------------- LAYOUT ------------------- + def make_layout(): layout = Layout() layout.split( @@ -66,35 +76,38 @@ def make_layout(): layout["main"].split_row( Layout(name="input_svc"), Layout(name="onramp_svc"), + Layout(name="analyst_svc"), ) return layout + +# ------------------- PANELS ------------------- + def get_input_panel(): raw = query_input_go() if not raw: - return Panel(Text("OFFLINE", style="bold red"), title="[1] Input Service (Go)", border_style="red") + return Panel(Text("OFFLINE", style="bold red"), + title="[1] Input Service (Go)", border_style="red") parts = raw.split("|") content = "\n".join([p.strip() for p in parts]) return Panel(content, title="[1] Input Service (Go)", border_style="green") -def get_onramp_panel(): - data = query_onramp("status") - if not data: - LAG_HISTORY.clear() # Clear history if service goes down - return Panel(Text("OFFLINE", style="bold red"), title="[2] Onramp Service (Go)", border_style="red") - # 1. Calculate Instant Lag +def get_onramp_panel(): + data = query_tcp_json(ONRAMP_HOST, ONRAMP_PORT) + if not data: + LAG_HISTORY.clear() + return Panel(Text("OFFLINE", style="bold red"), + title="[2] Onramp Service (Go)", border_style="red") + last_ts = data.get('last_ts', 0) / 1000 current_lag = time.time() - last_ts if last_ts > 0 else 0 - # 2. Update History LAG_HISTORY.append(current_lag) - # 3. Calculate Averages (Load Average style) avg_1m = sum(list(LAG_HISTORY)[-60:]) / min(len(LAG_HISTORY), 60) avg_5m = sum(LAG_HISTORY) / len(LAG_HISTORY) - # 4. Determine Styling lag_style = "green" if current_lag < 2 else "yellow" if current_lag < 5 else "bold red" content = Text() @@ -102,17 +115,53 @@ def get_onramp_panel(): content.append(f"Total Trades : {data.get('total_trades')}\n") content.append(f"Current File : {os.path.basename(str(data.get('last_file')))}\n") - # The "Load Average" line - content.append("Lag (Avg) : ", style="white") + content.append("Lag (Avg) : ") content.append(f"{current_lag:.2f}s", style=lag_style) content.append(f", {avg_1m:.2f}s/1m", style="dim" if avg_1m < 2 else "yellow") content.append(f", {avg_5m:.2f}s/5m", style="dim" if avg_5m < 2 else "yellow") return Panel(content, title="[2] Onramp Service (Go)", border_style="blue") + +def get_analyst_panel(): + data = query_tcp_json(ANALYST_HOST, ANALYST_PORT) + if not data: + return Panel(Text("OFFLINE", style="bold red"), + title="[3] Analyst Service (Python)", border_style="red") + + tf_data = data.get("timeframes", {}) + active = ", ".join(data.get("active_timeframes", [])) + + content = Text() + content.append(f"Active TFs : {active}\n\n") + + for tf in ["1m", "5m", "15m", "1h"]: + tf_info = tf_data.get(tf) + if not tf_info: + continue + + last_ts = tf_info.get("last") + last_str = ( + datetime.fromtimestamp(last_ts).strftime('%H:%M:%S') + if last_ts else "n/a" + ) + + new = tf_info.get("new", 0) + color = "green" if new > 0 else "dim" + + content.append(f"{tf:>4} | ") + content.append(f"+{new:<4}", style=color) + content.append(f" last: {last_str}\n") + + return Panel(content, title="[3] Analyst Service (Python)", border_style="magenta") + + +# ------------------- MARKET TABLE ------------------- + def get_market_table(): - res = query_onramp("live") + res = query_tcp_json(ONRAMP_HOST, ONRAMP_PORT, b"live") table = Table(expand=True, border_style="cyan", header_style="bold cyan") + table.add_column("TF", justify="center", style="bold yellow") table.add_column("Last Update", justify="center") table.add_column("Open", justify="right") @@ -136,24 +185,49 @@ def get_market_table(): buy_color = "green" if buy_pct > 50 else "red" table.add_row( - tf, ts_str, f"{c['open']:.2f}", f"{c['high']:.2f}", f"{c['low']:.2f}", + tf, ts_str, + f"{c['open']:.2f}", f"{c['high']:.2f}", f"{c['low']:.2f}", Text(f"{c['close']:.2f}", style=f"bold {color}"), - f"{c['volume']:.2f}", Text(f"{buy_pct:.1f}%", style=buy_color) + f"{c['volume']:.2f}", + Text(f"{buy_pct:.1f}%", style=buy_color) ) else: table.add_row("waiting...", "-", "-", "-", "-", "-", "-", "-") + return table + +# ------------------- MAIN ------------------- + def main(): layout = make_layout() with Live(layout, refresh_per_second=2, screen=True): while True: - layout["header"].update(Panel(Text(f"BYBIT BTCUSDT UNIFIED MONITOR | {datetime.now().strftime('%H:%M:%S')}", justify="center", style="bold white on blue"))) + layout["header"].update( + Panel( + Text( + f"BYBIT BTCUSDT UNIFIED MONITOR | {datetime.now().strftime('%H:%M:%S')}", + justify="center", + style="bold white on blue" + ) + ) + ) + layout["input_svc"].update(get_input_panel()) layout["onramp_svc"].update(get_onramp_panel()) + layout["analyst_svc"].update(get_analyst_panel()) layout["market"].update(get_market_table()) - layout["footer"].update(Text("Press Ctrl+C to exit | Monitoring: publicTrade.BTCUSDT", justify="center", style="dim")) + + layout["footer"].update( + Text( + "Ctrl+C to exit | Pipeline: Input → Onramp → Analyst", + justify="center", + style="dim" + ) + ) + time.sleep(REFRESH_RATE) + if __name__ == "__main__": main()