Reimplemented the onramp in Go with a much better architecture. Created onramp/DATABASE.md to support the next development cycle.

Kalzu Rekku
2026-01-13 22:51:43 +02:00
parent ac4f2cfcc8
commit 5d2aa8f499
6 changed files with 474 additions and 54 deletions

monitor/Pipfile (new file, 12 lines)

@@ -0,0 +1,12 @@
[[source]]
url = "https://pypi.org/simple"
verify_ssl = true
name = "pypi"
[packages]
rich = "*"
[dev-packages]
[requires]
python_version = "3.13"

(modified file)

@@ -5,6 +5,7 @@ import json
 import time
 import os
 from datetime import datetime
+from collections import deque  # Added for history tracking
 from rich.live import Live
 from rich.table import Table
@@ -12,13 +13,15 @@ from rich.layout import Layout
 from rich.panel import Panel
 from rich.console import Console
 from rich.text import Text
+from rich.columns import Columns

 # --- CONFIGURATION ---
 INPUT_SOCKET = "/tmp/streamer.sock"
 ONRAMP_HOST = "127.0.0.1"
 ONRAMP_PORT = 9999
 REFRESH_RATE = 1.0

+# Global state to track lag history (last 300 seconds)
+LAG_HISTORY = deque(maxlen=300)

 console = Console()
@@ -36,23 +39,20 @@ def query_input_go():
 def query_onramp(command):
     try:
         with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
-            # Increase timeout slightly for the larger 'live' payload
             s.settimeout(1.0)
             s.connect((ONRAMP_HOST, ONRAMP_PORT))
             s.sendall(command.encode('utf-8'))

             chunks = []
             while True:
-                chunk = s.recv(4096) # Read in 4KB chunks
+                chunk = s.recv(4096)
                 if not chunk:
-                    break # Server closed connection, we have everything
+                    break
                 chunks.append(chunk)

             full_data = b"".join(chunks).decode('utf-8')
             return json.loads(full_data)
-    except Exception as e:
-        # For debugging, you can uncomment the line below:
-        # print(f"Socket Error: {e}")
+    except:
         return None

 def make_layout():
@@ -73,8 +73,6 @@ def get_input_panel():
     raw = query_input_go()
     if not raw:
         return Panel(Text("OFFLINE", style="bold red"), title="[1] Input Service (Go)", border_style="red")
-    # The Go app returns: "Uptime: 1m2s | Total Msgs: 500 | Rate: 10.00 msg/min"
     parts = raw.split("|")
     content = "\n".join([p.strip() for p in parts])
     return Panel(content, title="[1] Input Service (Go)", border_style="green")
@@ -82,25 +80,39 @@ def get_input_panel():
 def get_onramp_panel():
     data = query_onramp("status")
     if not data:
+        LAG_HISTORY.clear()  # Clear history if service goes down
         return Panel(Text("OFFLINE", style="bold red"), title="[2] Onramp Service (Python)", border_style="red")

+    # 1. Calculate Instant Lag
     last_ts = data.get('last_ts', 0) / 1000
-    lag = time.time() - last_ts if last_ts > 0 else 0
-    lag_style = "green" if lag < 2 else "yellow" if lag < 5 else "bold red"
+    current_lag = time.time() - last_ts if last_ts > 0 else 0
+
+    # 2. Update History
+    LAG_HISTORY.append(current_lag)
+
+    # 3. Calculate Averages (Load Average style)
+    avg_1m = sum(list(LAG_HISTORY)[-60:]) / min(len(LAG_HISTORY), 60)
+    avg_5m = sum(LAG_HISTORY) / len(LAG_HISTORY)
+
+    # 4. Determine Styling
+    lag_style = "green" if current_lag < 2 else "yellow" if current_lag < 5 else "bold red"

     content = Text()
     content.append(f"Uptime Start : {data.get('uptime_start')}\n")
     content.append(f"Total Trades : {data.get('total_trades')}\n")
     content.append(f"Current File : {os.path.basename(str(data.get('last_file')))}\n")
-    content.append("Lag          : ", style="white")
-    content.append(f"{lag:.2f}s", style=lag_style)
+
+    # The "Load Average" line
+    content.append("Lag (Avg)    : ", style="white")
+    content.append(f"{current_lag:.2f}s", style=lag_style)
+    content.append(f", {avg_1m:.2f}s/1m", style="dim" if avg_1m < 2 else "yellow")
+    content.append(f", {avg_5m:.2f}s/5m", style="dim" if avg_5m < 2 else "yellow")

     return Panel(content, title="[2] Onramp Service (Python)", border_style="blue")

 def get_market_table():
     res = query_onramp("live")

     table = Table(expand=True, border_style="cyan", header_style="bold cyan")
     table.add_column("TF", justify="center", style="bold yellow")
     table.add_column("Last Update", justify="center")
     table.add_column("Open", justify="right")
@@ -112,64 +124,36 @@ def get_market_table():
     if res and "data" in res:
         candles_data = res["data"]
-        # We want to show these specific rows
         for tf in ["1m", "5m", "15m", "1h"]:
             if tf in candles_data and candles_data[tf]:
-                # Get all timestamps, convert to int to find the latest one
-                all_timestamps = [int(ts) for ts in candles_data[tf].keys()]
-                latest_ts = str(max(all_timestamps))
+                all_ts = [int(ts) for ts in candles_data[tf].keys()]
+                latest_ts = str(max(all_ts))
                 c = candles_data[tf][latest_ts]

-                # Format time
                 ts_str = datetime.fromtimestamp(int(latest_ts)).strftime('%H:%M:%S')

-                # Price Color (Bullish vs Bearish)
                 color = "green" if c['close'] >= c['open'] else "red"

-                # Calculate Buy Volume Percentage
                 buy_pct = (c['buy_volume'] / c['volume'] * 100) if c['volume'] > 0 else 0
                 buy_color = "green" if buy_pct > 50 else "red"

                 table.add_row(
-                    tf,
-                    ts_str,
-                    f"{c['open']:.2f}",
-                    f"{c['high']:.2f}",
-                    f"{c['low']:.2f}",
+                    tf, ts_str, f"{c['open']:.2f}", f"{c['high']:.2f}", f"{c['low']:.2f}",
                     Text(f"{c['close']:.2f}", style=f"bold {color}"),
-                    f"{c['volume']:.2f}",
-                    Text(f"{buy_pct:.1f}%", style=buy_color)
+                    f"{c['volume']:.2f}", Text(f"{buy_pct:.1f}%", style=buy_color)
                 )
-        time.sleep(1)
     else:
-        # Placeholder if service is offline or data not ready
         table.add_row("waiting...", "-", "-", "-", "-", "-", "-", "-")

     return table

 def main():
     layout = make_layout()
     with Live(layout, refresh_per_second=2, screen=True):
         while True:
-            # Header
-            header_text = Text(f"BYBIT BTC UNIFIED MONITOR | {datetime.now().strftime('%H:%M:%S')}",
-                               justify="center", style="bold white on blue")
-            layout["header"].update(Panel(header_text))
+            layout["header"].update(Panel(Text(f"BYBIT BTC UNIFIED MONITOR | {datetime.now().strftime('%H:%M:%S')}", justify="center", style="bold white on blue")))

-            # Body Panels
             layout["input_svc"].update(get_input_panel())
             layout["onramp_svc"].update(get_onramp_panel())

-            # Market Table
             layout["market"].update(get_market_table())
+            layout["footer"].update(Text("Press Ctrl+C to exit | Monitoring: publicTrade.BTCUSDT", justify="center", style="dim"))

-            # Footer
-            footer_text = Text("Press Ctrl+C to exit | Monitoring: publicTrade.BTCUSDT", justify="center", style="dim")
-            layout["footer"].update(footer_text)

             time.sleep(REFRESH_RATE)

 if __name__ == "__main__":
     main()

onramp/DATABASE.md (new file, 124 lines)

@@ -0,0 +1,124 @@
# Data Source Documentation: `candles.db`
## 1. Database Overview
The database is an **Aggregated Trade Store**. Instead of storing millions of individual trades (which are kept in the raw `.jsonl` files), this database stores **OHLCV** (Open, High, Low, Close, Volume) data across multiple timeframes.
* **Database Engine:** SQLite 3
* **Concurrency Mode:** WAL (Write-Ahead Logging) enabled.
* **Update Frequency:** Real-time (updated as trades arrive).
---
## 2. Schema Definition
The database contains a single primary table: `candles`.
### Table: `candles`
| Column | Type | Description |
| :--- | :--- | :--- |
| `timeframe` | `TEXT` | The aggregation window: `1m`, `5m`, `15m`, or `1h`. |
| `timestamp` | `INTEGER` | Unix Timestamp (seconds) representing the **start** of the candle. |
| `open` | `REAL` | Price of the first trade in this window. |
| `high` | `REAL` | Highest price reached during this window. |
| `low` | `REAL` | Lowest price reached during this window. |
| `close` | `REAL` | Price of the last trade received for this window. |
| `volume` | `REAL` | Total base currency (BTC) volume traded. |
| `buy_volume` | `REAL` | Total volume from trades marked as "Buy" (Taker Buy). |
**Primary Key:** `(timeframe, timestamp)`
*This ensures no duplicate candles exist for the same timeframe and time slot.*
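The writer relies on this key for its "insert or update" logic: each incoming trade upserts the candle for its window. A minimal Python sketch of the same pattern (the real writer is the Go `onramp` service; `upsert_candle` is a hypothetical helper):
```python
import sqlite3

def upsert_candle(conn: sqlite3.Connection, timeframe: str, c: dict) -> None:
    # Insert a new candle, or update the existing one for (timeframe, timestamp).
    conn.execute(
        """
        INSERT INTO candles
            (timeframe, timestamp, open, high, low, close, volume, buy_volume)
        VALUES (?, ?, ?, ?, ?, ?, ?, ?)
        ON CONFLICT(timeframe, timestamp) DO UPDATE SET
            high = excluded.high, low = excluded.low, close = excluded.close,
            volume = excluded.volume, buy_volume = excluded.buy_volume
        """,
        (timeframe, c["timestamp"], c["open"], c["high"],
         c["low"], c["close"], c["volume"], c["buy_volume"]),
    )
    conn.commit()
```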
---
## 3. Key Data Logic
### Buy/Sell Pressure
Unlike standard exchange OHLCV, this database includes `buy_volume`.
* **Sell Volume** = `volume - buy_volume`.
* **Net Flow** = `buy_volume - (volume - buy_volume)`.
* **Buy Ratio** = `buy_volume / volume`.
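For example, a small sketch computing these derived metrics from a single row (the helper name `pressure_metrics` is illustrative only):
```python
def pressure_metrics(volume: float, buy_volume: float) -> dict:
    # Derived order-flow metrics, following the definitions above.
    sell_volume = volume - buy_volume
    net_flow = buy_volume - sell_volume
    buy_ratio = buy_volume / volume if volume > 0 else 0.0
    return {"sell_volume": sell_volume, "net_flow": net_flow, "buy_ratio": buy_ratio}

# A candle with 10 BTC traded, of which 6 BTC was taker-buy:
print(pressure_metrics(10.0, 6.0))  # {'sell_volume': 4.0, 'net_flow': 2.0, 'buy_ratio': 0.6}
```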
### Candle Completion
Because the `onramp` service tails a live file, the **latest** candle for any timeframe is "unstable." It will continue to update until the next time window begins. Your analysis engine should account for this by either:
1. Filtering for `timestamp < current_window_start` to get only closed candles (see the sketch below this list).
2. Treating the latest row as "Live" data.
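A minimal sketch of option 1, assuming the same timeframe buckets the onramp uses (only the helper name and the window arithmetic here are illustrative):
```python
import sqlite3
import time

TIMEFRAME_SECONDS = {"1m": 60, "5m": 300, "15m": 900, "1h": 3600}

def load_closed_candles(conn: sqlite3.Connection, timeframe: str = "1m", limit: int = 100):
    # Start of the still-updating window; everything before it is closed.
    window = TIMEFRAME_SECONDS[timeframe]
    current_window_start = int(time.time()) // window * window
    cur = conn.execute(
        """
        SELECT timestamp, open, high, low, close, volume, buy_volume
        FROM candles
        WHERE timeframe = ? AND timestamp < ?
        ORDER BY timestamp DESC
        LIMIT ?
        """,
        (timeframe, current_window_start, limit),
    )
    return cur.fetchall()
```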
---
## 4. Accessing the Data
### Recommended Connection Settings (Python/Analysis Engine)
Since the `onramp` service is constantly writing to the database, you **must** use specific flags to avoid "Database is locked" errors.
```python
import sqlite3
import pandas as pd
def get_connection(db_path):
    # Connect with a timeout to wait for the writer to finish
    conn = sqlite3.connect(db_path, timeout=10)
    # Enable WAL mode for high-concurrency reading
    conn.execute("PRAGMA journal_mode=WAL;")
    return conn
```
### Common Query Patterns
**Get the last 100 closed 1-minute candles:**
```sql
SELECT * FROM candles
WHERE timeframe = '1m'
ORDER BY timestamp DESC
LIMIT 100;
```
**Calculate 5-minute volatility (High-Low) over the last hour:**
```sql
SELECT timestamp, (high - low) as volatility
FROM candles
WHERE timeframe = '5m'
AND timestamp > (strftime('%s', 'now') - 3600)
ORDER BY timestamp ASC;
```
---
## 5. Integration with Analysis Engine (Pandas Example)
If you are building an analysis engine in Python, this is the most efficient way to load data for processing:
```python
import pandas as pd
import sqlite3
DB_PATH = "path/to/your/candles.db"
def load_candles(timeframe="1m", limit=1000):
    conn = sqlite3.connect(DB_PATH)
    query = """
        SELECT * FROM candles
        WHERE timeframe = ?
        ORDER BY timestamp DESC
        LIMIT ?
    """
    df = pd.read_sql_query(query, conn, params=(timeframe, limit))
    conn.close()

    # Convert timestamp to readable datetime
    df['datetime'] = pd.to_datetime(df['timestamp'], unit='s')

    # Sort back to chronological order for analysis
    return df.sort_values('timestamp').reset_index(drop=True)
# Usage
df = load_candles("1m")
print(df.tail())
```
---
## 6. Maintenance & Performance Notes
1. **Index Optimization:** The Primary Key already creates an index on `(timeframe, timestamp)`. This makes queries filtered by timeframe and sorted by time extremely fast.
2. **Storage:** SQLite handles millions of rows easily. However, if the database exceeds several gigabytes, you may want to run `VACUUM;` occasionally (though this requires stopping the `onramp` service briefly).
3. **Backups:** You can safely copy the `candles.db` file while the system is running, provided you also copy the `candles.db-wal` and `candles.db-shm` files (or use the SQLite `.backup` command).
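If you want a consistent snapshot without juggling the `-wal`/`-shm` sidecar files, Python's built-in `sqlite3` backup API also works against a live database. A minimal sketch (file paths are placeholders):
```python
import sqlite3

def backup_candles(src_path: str = "candles.db", dest_path: str = "candles_backup.db") -> None:
    # Takes a consistent online snapshot even while the onramp keeps writing.
    src = sqlite3.connect(src_path)
    dest = sqlite3.connect(dest_path)
    try:
        src.backup(dest)
    finally:
        src.close()
        dest.close()

backup_candles()
```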

(modified file)

@@ -4,6 +4,6 @@
   "database_path": "market_data.db",
   "status_host": "127.0.0.1",
   "status_port": 9999,
-  "poll_interval_ms": 200,
+  "poll_interval_ms": 500,
   "symbol": "BTCUSDT"
 }

onramp/go.mod (new file, 5 lines)

@@ -0,0 +1,5 @@
module onramp
go 1.25.0
require github.com/mattn/go-sqlite3 v1.14.33

onramp/onramp.go (new file, 295 lines)

@@ -0,0 +1,295 @@
package main
import (
"bufio"
"database/sql"
"encoding/json"
"flag"
"fmt"
"log"
"net"
"os"
"path/filepath"
"sort"
"strconv"
"strings"
"sync"
"time"
_ "github.com/mattn/go-sqlite3"
)
// --- Types ---
type Config struct {
InputDirectory string `json:"input_directory"`
FilePattern string `json:"file_pattern"`
DatabasePath string `json:"database_path"`
StatusHost string `json:"status_host"`
StatusPort int `json:"status_port"`
PollInterval int `json:"poll_interval_ms"`
}
type Trade struct {
Timestamp int64 `json:"T"`
Price string `json:"p"`
Volume string `json:"v"`
Side string `json:"S"`
}
type TradePayload struct {
Data []Trade `json:"data"`
}
type Candle struct {
Timestamp int64 `json:"timestamp"`
Open float64 `json:"open"`
High float64 `json:"high"`
Low float64 `json:"low"`
Close float64 `json:"close"`
Volume float64 `json:"volume"`
BuyVolume float64 `json:"buy_volume"`
}
// --- Aggregator ---
type Aggregator struct {
db *sql.DB
mu sync.RWMutex
cache map[string]map[int64]*Candle // timeframe -> timestamp -> candle
timeframes map[string]int64
stats struct {
StartTime time.Time
LastFile string
TotalCount uint64
LastTS int64
}
}
func NewAggregator(dbPath string) *Aggregator {
db, err := sql.Open("sqlite3", dbPath+"?_journal=WAL&_sync=1")
if err != nil {
log.Fatal(err)
}
	// Incremental auto-vacuum lets the janitor reclaim space in the background
	// without locking the file (the pragma only takes effect on a fresh database).
	db.Exec("PRAGMA auto_vacuum = INCREMENTAL;")

	_, err = db.Exec(`CREATE TABLE IF NOT EXISTS candles (
		timeframe TEXT, timestamp INTEGER,
		open REAL, high REAL, low REAL, close REAL,
		volume REAL, buy_volume REAL,
		PRIMARY KEY (timeframe, timestamp)
	)`)
	if err != nil {
		log.Fatal(err)
	}
a := &Aggregator{
db: db,
cache: make(map[string]map[int64]*Candle),
timeframes: map[string]int64{
"1m": 60, "5m": 300, "15m": 900, "1h": 3600,
},
}
a.stats.StartTime = time.Now()
for tf := range a.timeframes {
a.cache[tf] = make(map[int64]*Candle)
}
return a
}
func (a *Aggregator) ProcessTrade(t Trade) {
tsS := t.Timestamp / 1000
price, _ := strconv.ParseFloat(t.Price, 64)
volume, _ := strconv.ParseFloat(t.Volume, 64)
isBuy := strings.ToLower(t.Side) == "buy"
a.mu.Lock()
defer a.mu.Unlock()
a.stats.TotalCount++
a.stats.LastTS = t.Timestamp
for tf, seconds := range a.timeframes {
candleTS := (tsS / seconds) * seconds
c, exists := a.cache[tf][candleTS]
if !exists {
c = &Candle{
Timestamp: candleTS,
Open: price, High: price, Low: price, Close: price,
Volume: volume,
}
if isBuy {
c.BuyVolume = volume
}
a.cache[tf][candleTS] = c
} else {
if price > c.High {
c.High = price
}
if price < c.Low {
c.Low = price
}
c.Close = price
c.Volume += volume
if isBuy {
c.BuyVolume += volume
}
}
a.saveToDB(tf, c)
}
}
func (a *Aggregator) saveToDB(tf string, c *Candle) {
_, err := a.db.Exec(`INSERT INTO candles
(timeframe, timestamp, open, high, low, close, volume, buy_volume)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(timeframe, timestamp) DO UPDATE SET
high=excluded.high, low=excluded.low, close=excluded.close,
volume=excluded.volume, buy_volume=excluded.buy_volume`,
tf, c.Timestamp, c.Open, c.High, c.Low, c.Close, c.Volume, c.BuyVolume)
if err != nil {
log.Printf("DB Error: %v", err)
}
}
func (a *Aggregator) startJanitor() {
ticker := time.NewTicker(1 * time.Hour)
for range ticker.C {
// Calculate cutoff (30 days ago)
cutoff := time.Now().AddDate(0, 0, -30).Unix()
log.Printf("[JANITOR] Cleaning up data older than %d", cutoff)
// 1. Delete old rows
_, err := a.db.Exec("DELETE FROM candles WHERE timestamp < ?", cutoff)
if err != nil {
log.Printf("[JANITOR] Delete error: %v", err)
continue
}
// 2. Incremental Vacuum
// This moves empty pages back to the OS 1000 pages at a time.
// It prevents the DB from staying huge after a big delete.
_, err = a.db.Exec("PRAGMA incremental_vacuum(1000);")
if err != nil {
log.Printf("[JANITOR] Vacuum error: %v", err)
}
}
}
// --- Server ---
func (a *Aggregator) serve(host string, port int) {
addr := fmt.Sprintf("%s:%d", host, port)
l, err := net.Listen("tcp", addr)
if err != nil {
log.Fatal(err)
}
log.Printf("Status server listening on %s", addr)
for {
conn, err := l.Accept()
if err != nil {
continue
}
go func(c net.Conn) {
defer c.Close()
buf := make([]byte, 1024)
n, _ := c.Read(buf)
cmd := strings.TrimSpace(string(buf[:n]))
			var response interface{}

			a.mu.RLock()
			if cmd == "live" {
				response = map[string]interface{}{
					"type": "live_candles",
					"data": a.cache,
				}
			} else {
				response = map[string]interface{}{
					"type":         "status",
					"uptime_start": a.stats.StartTime.Format(time.RFC3339),
					"last_file":    a.stats.LastFile,
					"total_trades": a.stats.TotalCount,
					"last_ts":      a.stats.LastTS,
				}
			}
			// Marshal while still holding the read lock so concurrent writes to the
			// candle cache cannot race with encoding, then reply outside the lock.
			payload, err := json.Marshal(response)
			a.mu.RUnlock()

			if err != nil {
				return
			}
			c.Write(append(payload, '\n'))
}(conn)
}
}
// --- File Tailer ---
func getLatestFile(dir, pattern string) string {
files, _ := filepath.Glob(filepath.Join(dir, pattern))
if len(files) == 0 {
return ""
}
sort.Strings(files)
return files[len(files)-1]
}
func main() {
configPath := flag.String("config", "config.json", "path to config")
flag.Parse()
	file, err := os.Open(*configPath)
	if err != nil {
		log.Fatalf("cannot open config %s: %v", *configPath, err)
	}
	var conf Config
	if err := json.NewDecoder(file).Decode(&conf); err != nil {
		log.Fatalf("cannot parse config %s: %v", *configPath, err)
	}
	file.Close()
agg := NewAggregator(conf.DatabasePath)
go agg.startJanitor()
go agg.serve(conf.StatusHost, conf.StatusPort)
currentFile := ""
var lastPos int64 = 0
for {
latest := getLatestFile(conf.InputDirectory, conf.FilePattern)
if latest == "" {
time.Sleep(time.Duration(conf.PollInterval) * time.Millisecond)
continue
}
if latest != currentFile {
log.Printf("Rotating to: %s", latest)
currentFile = latest
lastPos = 0
agg.mu.Lock()
agg.stats.LastFile = latest
agg.mu.Unlock()
}
f, err := os.Open(currentFile)
if err == nil {
f.Seek(lastPos, 0)
scanner := bufio.NewScanner(f)
for scanner.Scan() {
var payload TradePayload
if err := json.Unmarshal(scanner.Bytes(), &payload); err == nil {
for _, t := range payload.Data {
agg.ProcessTrade(t)
}
}
}
lastPos, _ = f.Seek(0, 1)
f.Close()
}
time.Sleep(time.Duration(conf.PollInterval) * time.Millisecond)
}
}