From 5d2aa8f4997a9c639c354db26bc2e89254e4b9c3 Mon Sep 17 00:00:00 2001
From: Kalzu Rekku
Date: Tue, 13 Jan 2026 22:51:43 +0200
Subject: [PATCH] Rewrite the onramp in Go with a much better architecture.
 Add onramp/DATABASE.md to help with the next development cycle.

---
 monitor/Pipfile    |  12 ++
 monitor/monitor.py |  90 ++++++++--------
 onramp/DATABASE.md | 124 ++++++++++++++++++++
 onramp/config.json |   2 +-
 onramp/go.mod      |   5 +
 onramp/onramp.go   | 288 ++++++++++++++++++++++++++++++++++++++++++++
 6 files changed, 467 insertions(+), 54 deletions(-)
 create mode 100644 monitor/Pipfile
 create mode 100644 onramp/DATABASE.md
 create mode 100644 onramp/go.mod
 create mode 100644 onramp/onramp.go

diff --git a/monitor/Pipfile b/monitor/Pipfile
new file mode 100644
index 0000000..cf080f9
--- /dev/null
+++ b/monitor/Pipfile
@@ -0,0 +1,12 @@
+[[source]]
+url = "https://pypi.org/simple"
+verify_ssl = true
+name = "pypi"
+
+[packages]
+rich = "*"
+
+[dev-packages]
+
+[requires]
+python_version = "3.13"
diff --git a/monitor/monitor.py b/monitor/monitor.py
index 1f6678f..f7dd667 100755
--- a/monitor/monitor.py
+++ b/monitor/monitor.py
@@ -5,6 +5,7 @@
 import json
 import time
 import os
 from datetime import datetime
+from collections import deque # Added for history tracking
 from rich.live import Live
 from rich.table import Table
@@ -12,13 +13,15 @@
 from rich.layout import Layout
 from rich.panel import Panel
 from rich.console import Console
 from rich.text import Text
-from rich.columns import Columns
 
 # --- CONFIGURATION ---
 INPUT_SOCKET = "/tmp/streamer.sock"
 ONRAMP_HOST = "127.0.0.1"
 ONRAMP_PORT = 9999
-REFRESH_RATE = 1.0
+REFRESH_RATE = 1.0
+
+# Global state to track lag history (last 300 seconds)
+LAG_HISTORY = deque(maxlen=300)
 
 console = Console()
@@ -36,23 +39,20 @@ def query_input_go():
 def query_onramp(command):
     try:
         with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
-            # Increase timeout slightly for the larger 'live' payload
             s.settimeout(1.0)
             s.connect((ONRAMP_HOST, ONRAMP_PORT))
             s.sendall(command.encode('utf-8'))
 
             chunks = []
             while True:
-                chunk = s.recv(4096) # Read in 4KB chunks
+                chunk = s.recv(4096)
                 if not chunk:
-                    break # Server closed connection, we have everything
+                    break
                 chunks.append(chunk)
 
             full_data = b"".join(chunks).decode('utf-8')
             return json.loads(full_data)
-    except Exception as e:
-        # For debugging, you can uncomment the line below:
-        # print(f"Socket Error: {e}")
+    except Exception:
         return None
 
 def make_layout():
@@ -73,8 +73,6 @@ def get_input_panel():
     raw = query_input_go()
     if not raw:
         return Panel(Text("OFFLINE", style="bold red"), title="[1] Input Service (Go)", border_style="red")
-
-    # The Go app returns: "Uptime: 1m2s | Total Msgs: 500 | Rate: 10.00 msg/min"
     parts = raw.split("|")
     content = "\n".join([p.strip() for p in parts])
     return Panel(content, title="[1] Input Service (Go)", border_style="green")
@@ -82,25 +80,39 @@
 def get_onramp_panel():
     data = query_onramp("status")
     if not data:
+        LAG_HISTORY.clear() # Clear history if service goes down
         return Panel(Text("OFFLINE", style="bold red"), title="[2] Onramp Service (Python)", border_style="red")
-
+
+    # 1. Calculate Instant Lag
     last_ts = data.get('last_ts', 0) / 1000
-    lag = time.time() - last_ts if last_ts > 0 else 0
-    lag_style = "green" if lag < 2 else "yellow" if lag < 5 else "bold red"
+    current_lag = time.time() - last_ts if last_ts > 0 else 0
+
+    # 2. Update History
+    LAG_HISTORY.append(current_lag)
+
+    # 3. Calculate Averages (Load Average style)
+    avg_1m = sum(list(LAG_HISTORY)[-60:]) / min(len(LAG_HISTORY), 60)
+    avg_5m = sum(LAG_HISTORY) / len(LAG_HISTORY)
+
+    # 4. Determine Styling
+    lag_style = "green" if current_lag < 2 else "yellow" if current_lag < 5 else "bold red"
 
     content = Text()
     content.append(f"Uptime Start : {data.get('uptime_start')}\n")
     content.append(f"Total Trades : {data.get('total_trades')}\n")
     content.append(f"Current File : {os.path.basename(str(data.get('last_file')))}\n")
-    content.append("Lag : ", style="white")
-    content.append(f"{lag:.2f}s", style=lag_style)
-
+
+    # The "Load Average" line
+    content.append("Lag (Avg) : ", style="white")
+    content.append(f"{current_lag:.2f}s", style=lag_style)
+    content.append(f", {avg_1m:.2f}s/1m", style="dim" if avg_1m < 2 else "yellow")
+    content.append(f", {avg_5m:.2f}s/5m", style="dim" if avg_5m < 2 else "yellow")
+
     return Panel(content, title="[2] Onramp Service (Python)", border_style="blue")
 
 def get_market_table():
     res = query_onramp("live")
     table = Table(expand=True, border_style="cyan", header_style="bold cyan")
-
     table.add_column("TF", justify="center", style="bold yellow")
     table.add_column("Last Update", justify="center")
     table.add_column("Open", justify="right")
@@ -112,64 +124,36 @@
 
     if res and "data" in res:
         candles_data = res["data"]
-        # We want to show these specific rows
         for tf in ["1m", "5m", "15m", "1h"]:
             if tf in candles_data and candles_data[tf]:
-                # Get all timestamps, convert to int to find the latest one
-                all_timestamps = [int(ts) for ts in candles_data[tf].keys()]
-                latest_ts = str(max(all_timestamps))
-
+                all_ts = [int(ts) for ts in candles_data[tf].keys()]
+                latest_ts = str(max(all_ts))
                 c = candles_data[tf][latest_ts]
-
-                # Format time
+
                 ts_str = datetime.fromtimestamp(int(latest_ts)).strftime('%H:%M:%S')
-
-                # Price Color (Bullish vs Bearish)
                 color = "green" if c['close'] >= c['open'] else "red"
-
-                # Calculate Buy Volume Percentage
                 buy_pct = (c['buy_volume'] / c['volume'] * 100) if c['volume'] > 0 else 0
                 buy_color = "green" if buy_pct > 50 else "red"
 
                 table.add_row(
-                    tf,
-                    ts_str,
-                    f"{c['open']:.2f}",
-                    f"{c['high']:.2f}",
-                    f"{c['low']:.2f}",
+                    tf, ts_str, f"{c['open']:.2f}", f"{c['high']:.2f}", f"{c['low']:.2f}",
                     Text(f"{c['close']:.2f}", style=f"bold {color}"),
-                    f"{c['volume']:.2f}",
-                    Text(f"{buy_pct:.1f}%", style=buy_color)
+                    f"{c['volume']:.2f}", Text(f"{buy_pct:.1f}%", style=buy_color)
                 )
-                time.sleep(1)
     else:
-        # Placeholder if service is offline or data not ready
         table.add_row("waiting...", "-", "-", "-", "-", "-", "-", "-")
-
     return table
 
 def main():
     layout = make_layout()
-
     with Live(layout, refresh_per_second=2, screen=True):
         while True:
-            # Header
-            header_text = Text(f"BYBIT BTC UNIFIED MONITOR | {datetime.now().strftime('%H:%M:%S')}",
-                               justify="center", style="bold white on blue")
-            layout["header"].update(Panel(header_text))
-
-            # Body Panels
+            layout["header"].update(Panel(Text(f"BYBIT BTC UNIFIED MONITOR | {datetime.now().strftime('%H:%M:%S')}", justify="center", style="bold white on blue")))
             layout["input_svc"].update(get_input_panel())
            layout["onramp_svc"].update(get_onramp_panel())
-
-            # Market Table
             layout["market"].update(get_market_table())
-
-            # Footer
-            footer_text = Text("Press Ctrl+C to exit | Monitoring: publicTrade.BTCUSDT", justify="center", style="dim")
-            layout["footer"].update(footer_text)
-
+            layout["footer"].update(Text("Press Ctrl+C to exit | Monitoring: publicTrade.BTCUSDT", justify="center", style="dim"))
             time.sleep(REFRESH_RATE)
 
 if __name__ == "__main__":
-    main()
+    main()
\ No newline at end of file
diff --git a/onramp/DATABASE.md b/onramp/DATABASE.md
new file mode 100644
index 0000000..f92d252
--- /dev/null
+++ b/onramp/DATABASE.md
@@ -0,0 +1,124 @@
+# Data Source Documentation: `candles.db`
+
+## 1. Database Overview
+The database is an **Aggregated Trade Store**. Instead of storing millions of individual trades (which are kept in the raw `.jsonl` files), this database stores **OHLCV** (Open, High, Low, Close, Volume) data across multiple timeframes.
+
+* **Database Engine:** SQLite 3
+* **Concurrency Mode:** WAL (Write-Ahead Logging) enabled.
+* **Update Frequency:** Real-time (updated as trades arrive).
+
+---
+
+## 2. Schema Definition
+
+The database contains a single primary table: `candles`.
+
+### Table: `candles`
+| Column | Type | Description |
+| :--- | :--- | :--- |
+| `timeframe` | `TEXT` | The aggregation window: `1m`, `5m`, `15m`, or `1h`. |
+| `timestamp` | `INTEGER` | Unix Timestamp (seconds) representing the **start** of the candle. |
+| `open` | `REAL` | Price of the first trade in this window. |
+| `high` | `REAL` | Highest price reached during this window. |
+| `low` | `REAL` | Lowest price reached during this window. |
+| `close` | `REAL` | Price of the last trade received for this window. |
+| `volume` | `REAL` | Total base currency (BTC) volume traded. |
+| `buy_volume` | `REAL` | Total volume from trades marked as "Buy" (Taker Buy). |
+
+**Primary Key:** `(timeframe, timestamp)`
+*This ensures no duplicate candles exist for the same timeframe and time slot.*
+
+---
+
+## 3. Key Data Logic
+
+### Buy/Sell Pressure
+Unlike standard exchange OHLCV, this database includes `buy_volume`.
+* **Sell Volume** = `volume - buy_volume`.
+* **Net Flow** = `buy_volume - (volume - buy_volume)`.
+* **Buy Ratio** = `buy_volume / volume`.
+
+### Candle Completion
+Because the `onramp` service tails a live file, the **latest** candle for any timeframe is "unstable." It will continue to update until the next time window begins. Your analysis engine should account for this by either:
+1. Filtering for `timestamp < current_window_start` (to get only closed candles).
+2. Treating the latest row as "Live" data.
+
+---
+
+## 4. Accessing the Data
+
+### Recommended Connection Settings (Python/Analysis Engine)
+Since the `onramp` service is constantly writing to the database, you **must** use specific flags to avoid "Database is locked" errors.
+
+```python
+import sqlite3
+import pandas as pd
+
+def get_connection(db_path):
+    # Connect with a timeout to wait for the writer to finish
+    conn = sqlite3.connect(db_path, timeout=10)
+    # Enable WAL mode for high-concurrency reading
+    conn.execute("PRAGMA journal_mode=WAL;")
+    return conn
+```
+
+### Common Query Patterns
+
+**Get the last 100 closed 1-minute candles:**
+```sql
+SELECT * FROM candles
+WHERE timeframe = '1m'
+ORDER BY timestamp DESC
+LIMIT 100;
+```
+
+**Calculate 5-minute volatility (High-Low) over the last hour:**
+```sql
+SELECT timestamp, (high - low) as volatility
+FROM candles
+WHERE timeframe = '5m'
+AND timestamp > (strftime('%s', 'now') - 3600)
+ORDER BY timestamp ASC;
+```
+
+---
+
+## 5. Integration with Analysis Engine (Pandas Example)
+
+If you are building an analysis engine in Python, this is a simple, efficient way to load data for processing:
+
+```python
+import pandas as pd
+import sqlite3
+
+DB_PATH = "path/to/your/candles.db"
+
+def load_candles(timeframe="1m", limit=1000):
+    conn = sqlite3.connect(DB_PATH)
+    query = f"""
+        SELECT * FROM candles
+        WHERE timeframe = ?
+        ORDER BY timestamp DESC
+        LIMIT ?
+    """
+    df = pd.read_sql_query(query, conn, params=(timeframe, limit))
+    conn.close()
+
+    # Convert timestamp to readable datetime
+    df['datetime'] = pd.to_datetime(df['timestamp'], unit='s')
+
+    # Sort back to chronological order for analysis
+    return df.sort_values('timestamp').reset_index(drop=True)
+
+# Usage
+df = load_candles("1m")
+print(df.tail())
+```
+
+---
+
+## 6. Maintenance & Performance Notes
+
+1. **Index Optimization:** The Primary Key already creates an index on `(timeframe, timestamp)`. This makes queries filtered by timeframe and sorted by time extremely fast.
+2. **Storage:** SQLite handles millions of rows easily. However, if the database exceeds several gigabytes, you may want to run `VACUUM;` occasionally (though this requires stopping the `onramp` service briefly).
+3. **Backups:** You can safely copy the `candles.db` file while the system is running, provided you also copy the `candles.db-wal` and `candles.db-shm` files (or use the SQLite `.backup` command).
\ No newline at end of file
diff --git a/onramp/config.json b/onramp/config.json
index 8d0c8cc..6f6d1a1 100644
--- a/onramp/config.json
+++ b/onramp/config.json
@@ -4,6 +4,6 @@
   "database_path": "market_data.db",
   "status_host": "127.0.0.1",
   "status_port": 9999,
-  "poll_interval_ms": 200,
+  "poll_interval_ms": 500,
   "symbol": "BTCUSDT"
 }
diff --git a/onramp/go.mod b/onramp/go.mod
new file mode 100644
index 0000000..c4dfd76
--- /dev/null
+++ b/onramp/go.mod
@@ -0,0 +1,5 @@
+module onramp
+
+go 1.25.0
+
+require github.com/mattn/go-sqlite3 v1.14.33
diff --git a/onramp/onramp.go b/onramp/onramp.go
new file mode 100644
index 0000000..c0fb408
--- /dev/null
+++ b/onramp/onramp.go
@@ -0,0 +1,288 @@
+package main
+
+import (
+	"bufio"
+	"database/sql"
+	"encoding/json"
+	"flag"
+	"fmt"
+	"log"
+	"net"
+	"os"
+	"path/filepath"
+	"sort"
+	"strconv"
+	"strings"
+	"sync"
+	"time"
+
+	_ "github.com/mattn/go-sqlite3"
+)
+
+// --- Types ---
+
+type Config struct {
+	InputDirectory string `json:"input_directory"`
+	FilePattern    string `json:"file_pattern"`
+	DatabasePath   string `json:"database_path"`
+	StatusHost     string `json:"status_host"`
+	StatusPort     int    `json:"status_port"`
+	PollInterval   int    `json:"poll_interval_ms"`
+}
+
+type Trade struct {
+	Timestamp int64  `json:"T"`
+	Price     string `json:"p"`
+	Volume    string `json:"v"`
+	Side      string `json:"S"`
+}
+
+type TradePayload struct {
+	Data []Trade `json:"data"`
+}
+
+type Candle struct {
+	Timestamp int64   `json:"timestamp"`
+	Open      float64 `json:"open"`
+	High      float64 `json:"high"`
+	Low       float64 `json:"low"`
+	Close     float64 `json:"close"`
+	Volume    float64 `json:"volume"`
+	BuyVolume float64 `json:"buy_volume"`
+}
+
+// --- Aggregator ---
+
+type Aggregator struct {
+	db         *sql.DB
+	mu         sync.RWMutex
+	cache      map[string]map[int64]*Candle // timeframe -> timestamp -> candle
+	timeframes map[string]int64
+	stats      struct {
+		StartTime  time.Time
+		LastFile   string
+		TotalCount uint64
+		LastTS     int64
+	}
+}
+
+func NewAggregator(dbPath string) *Aggregator {
+	db, err := sql.Open("sqlite3", dbPath+"?_journal=WAL&_sync=1")
+	if err != nil {
+		log.Fatal(err)
+	}
+
+	// This allows us to shrink the file in the background without locking it
+	db.Exec("PRAGMA auto_vacuum = INCREMENTAL;")
+
+	_, err = db.Exec(`CREATE TABLE IF NOT EXISTS candles (
+		timeframe TEXT, timestamp INTEGER,
+		open REAL, high REAL, low REAL, close REAL,
+		volume REAL, buy_volume REAL,
+		PRIMARY KEY (timeframe, timestamp)
+	)`)
+	if err != nil {
+		log.Fatal(err)
+	}
+
+	a := &Aggregator{
+		db:    db,
+		cache: make(map[string]map[int64]*Candle),
+		timeframes: map[string]int64{
+			"1m": 60, "5m": 300, "15m": 900, "1h": 3600,
+		},
+	}
+	a.stats.StartTime = time.Now()
+	for tf := range a.timeframes {
+		a.cache[tf] = make(map[int64]*Candle)
+	}
+	return a
+}
+
+func (a *Aggregator) ProcessTrade(t Trade) {
+	tsS := t.Timestamp / 1000
+	price, _ := strconv.ParseFloat(t.Price, 64)
+	volume, _ := strconv.ParseFloat(t.Volume, 64)
+	isBuy := strings.ToLower(t.Side) == "buy"
+
+	a.mu.Lock()
+	defer a.mu.Unlock()
+
+	a.stats.TotalCount++
+	a.stats.LastTS = t.Timestamp
+
+	for tf, seconds := range a.timeframes {
+		candleTS := (tsS / seconds) * seconds
+		c, exists := a.cache[tf][candleTS]
+
+		if !exists {
+			c = &Candle{
+				Timestamp: candleTS,
+				Open: price, High: price, Low: price, Close: price,
+				Volume: volume,
+			}
+			if isBuy {
+				c.BuyVolume = volume
+			}
+			a.cache[tf][candleTS] = c
+		} else {
+			if price > c.High {
+				c.High = price
+			}
+			if price < c.Low {
+				c.Low = price
+			}
+			c.Close = price
+			c.Volume += volume
+			if isBuy {
+				c.BuyVolume += volume
+			}
+		}
+		a.saveToDB(tf, c)
+	}
+}
+
+func (a *Aggregator) saveToDB(tf string, c *Candle) {
+	_, err := a.db.Exec(`INSERT INTO candles
+		(timeframe, timestamp, open, high, low, close, volume, buy_volume)
+		VALUES (?, ?, ?, ?, ?, ?, ?, ?)
+		ON CONFLICT(timeframe, timestamp) DO UPDATE SET
+		high=excluded.high, low=excluded.low, close=excluded.close,
+		volume=excluded.volume, buy_volume=excluded.buy_volume`,
+		tf, c.Timestamp, c.Open, c.High, c.Low, c.Close, c.Volume, c.BuyVolume)
+	if err != nil {
+		log.Printf("DB Error: %v", err)
+	}
+}
+
+func (a *Aggregator) startJanitor() {
+	ticker := time.NewTicker(1 * time.Hour)
+	for range ticker.C {
+		// Calculate cutoff (30 days ago)
+		cutoff := time.Now().AddDate(0, 0, -30).Unix()
+
+		log.Printf("[JANITOR] Cleaning up data older than %d", cutoff)
+
+		// 1. Delete old rows
+		_, err := a.db.Exec("DELETE FROM candles WHERE timestamp < ?", cutoff)
+		if err != nil {
+			log.Printf("[JANITOR] Delete error: %v", err)
+			continue
+		}
+
+		// 2. Incremental Vacuum
+		// This moves empty pages back to the OS 1000 pages at a time.
+		// It prevents the DB from staying huge after a big delete.
+		_, err = a.db.Exec("PRAGMA incremental_vacuum(1000);")
+		if err != nil {
+			log.Printf("[JANITOR] Vacuum error: %v", err)
+		}
+	}
+}
+
+// --- Server ---
+
+func (a *Aggregator) serve(host string, port int) {
+	addr := fmt.Sprintf("%s:%d", host, port)
+	l, err := net.Listen("tcp", addr)
+	if err != nil {
+		log.Fatal(err)
+	}
+	log.Printf("Status server listening on %s", addr)
+
+	for {
+		conn, err := l.Accept()
+		if err != nil {
+			continue
+		}
+		go func(c net.Conn) {
+			defer c.Close()
+			buf := make([]byte, 1024)
+			n, _ := c.Read(buf)
+			cmd := strings.TrimSpace(string(buf[:n]))
+
+			var response interface{}
+			a.mu.RLock()
+			if cmd == "live" {
+				response = map[string]interface{}{
+					"type": "live_candles",
+					"data": a.cache,
+				}
+			} else {
+				response = map[string]interface{}{
+					"type":         "status",
+					"uptime_start": a.stats.StartTime.Format(time.RFC3339),
+					"last_file":    a.stats.LastFile,
+					"total_trades": a.stats.TotalCount,
+					"last_ts":      a.stats.LastTS,
+				}
+			}
+			json.NewEncoder(c).Encode(response)
+
+			a.mu.RUnlock()
+		}(conn)
+	}
+}
+
+// --- File Tailer ---
+
+func getLatestFile(dir, pattern string) string {
+	files, _ := filepath.Glob(filepath.Join(dir, pattern))
+	if len(files) == 0 {
+		return ""
+	}
+	sort.Strings(files)
+	return files[len(files)-1]
+}
+
+func main() {
+	configPath := flag.String("config", "config.json", "path to config")
+	flag.Parse()
+
+	file, _ := os.Open(*configPath)
+	var conf Config
+	json.NewDecoder(file).Decode(&conf)
+	file.Close()
+
+	agg := NewAggregator(conf.DatabasePath)
+	go agg.startJanitor()
+	go agg.serve(conf.StatusHost, conf.StatusPort)
+
+	currentFile := ""
+	var lastPos int64 = 0
+
+	for {
+		latest := getLatestFile(conf.InputDirectory, conf.FilePattern)
+		if latest == "" {
+			time.Sleep(time.Duration(conf.PollInterval) * time.Millisecond)
+			continue
+		}
+
+		if latest != currentFile {
+			log.Printf("Rotating to: %s", latest)
+			currentFile = latest
+			lastPos = 0
+			agg.mu.Lock()
+			agg.stats.LastFile = latest
+			agg.mu.Unlock()
+		}
+
+		f, err := os.Open(currentFile)
+		if err == nil {
+			f.Seek(lastPos, 0)
+			scanner := bufio.NewScanner(f)
+			for scanner.Scan() {
+				var payload TradePayload
+				if err := json.Unmarshal(scanner.Bytes(), &payload); err == nil {
+					for _, t := range payload.Data {
+						agg.ProcessTrade(t)
+					}
+				}
+			}
+			lastPos, _ = f.Seek(0, 1)
+			f.Close()
+		}
+
+		time.Sleep(time.Duration(conf.PollInterval) * time.Millisecond)
+	}
+}
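
Addendum (not part of the patch above): onramp.go exposes a simple TCP status socket — a client sends `status` or `live`, and `serve()` replies with one JSON object and closes the connection. The sketch below shows how a downstream tool could poll it from Go. It is illustrative only; the file name, the address (taken from the defaults in onramp/config.json, 127.0.0.1:9999), and the printed field names are assumptions based on the response maps built in `serve()`.

```go
// statusprobe.go — minimal sketch of a client for the onramp status socket.
// Hypothetical helper, not part of this commit; address and output fields
// are assumptions taken from config.json and serve() in onramp.go.
package main

import (
	"encoding/json"
	"fmt"
	"log"
	"net"
)

// query sends one command ("status" or "live") and decodes the single
// JSON object the onramp service writes back before closing the socket.
func query(addr, cmd string) (map[string]interface{}, error) {
	conn, err := net.Dial("tcp", addr)
	if err != nil {
		return nil, err
	}
	defer conn.Close()

	if _, err := conn.Write([]byte(cmd)); err != nil {
		return nil, err
	}

	var out map[string]interface{}
	if err := json.NewDecoder(conn).Decode(&out); err != nil {
		return nil, err
	}
	return out, nil
}

func main() {
	status, err := query("127.0.0.1:9999", "status")
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("total_trades=%v last_ts=%v last_file=%v\n",
		status["total_trades"], status["last_ts"], status["last_file"])
}
```

monitor.py already does the same thing from Python via `query_onramp()`, so a client like this is mainly useful for Go-side smoke tests while the analysis engine described in onramp/DATABASE.md is being built.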