Initial commit

Kalzu Rekku
2026-01-13 21:03:27 +02:00
commit abb60348f6
8 changed files with 713 additions and 0 deletions

input/config.json Normal file

@@ -0,0 +1,9 @@
{
"output_dir": "./output",
"topic": "publicTrade.BTCUSDT",
"ws_url": "wss://stream.bybit.com/v5/public/linear",
"buffer_size": 10000,
"log_file": "system.log",
"log_to_stdout": false,
"status_interval": 30
}

input/go.mod Normal file

@@ -0,0 +1,5 @@
module input

go 1.25.0

require github.com/gorilla/websocket v1.5.3

input/input.go Normal file

@@ -0,0 +1,276 @@
package main
import (
"encoding/json"
"flag"
"fmt"
"io"
"log"
"net"
"os"
"os/signal"
"path/filepath"
"sync"
"sync/atomic"
"syscall"
"time"
"github.com/gorilla/websocket"
)
type Config struct {
OutputDir string `json:"output_dir"`
Topic string `json:"topic"`
WSUrl string `json:"ws_url"`
BufferSize int `json:"buffer_size"`
StatusInterval int `json:"status_interval"`
LogFile string `json:"log_file"`
LogToStdout bool `json:"log_to_stdout"`
StatusSocket string `json:"status_socket"`
}
type Streamer struct {
config *Config
msgChan chan []byte
currentFile *os.File
currentHour int
mu sync.Mutex
// Stats fields
totalMsgs atomic.Uint64
startTime time.Time
}
func main() {
// 1. Parse CLI Flags
debug := flag.Bool("debug", false, "Force logs to stdout (override config)")
statusMode := flag.Bool("status", false, "Query status of running instance and exit")
configPath := flag.String("config", "config.json", "Path to the configuration file")
flag.Parse()
// 2. Load Configuration
conf, err := loadConfig(*configPath)
if err != nil {
fmt.Printf("Critical: Failed to load config: %v\n", err)
os.Exit(1)
}
// 3. Handle Status Query Mode
if *statusMode {
queryStatus(conf.StatusSocket)
return
}
// 4. Setup Logging
setupLogger(conf, *debug)
// 5. Initialize Streamer
s := &Streamer{
config: conf,
msgChan: make(chan []byte, conf.BufferSize),
startTime: time.Now(),
}
log.Printf("Starting streamer for topic: %s", conf.Topic)
// 6. Start Goroutines
go s.writerLoop()
go s.managerLoop()
go s.statusLoop()
go s.statusServer()
// 7. Graceful Shutdown
stop := make(chan os.Signal, 1)
signal.Notify(stop, os.Interrupt, syscall.SIGTERM)
<-stop
log.Println("Shutting down gracefully...")
os.Remove(s.config.StatusSocket)
}
func queryStatus(socketPath string) {
conn, err := net.Dial("unix", socketPath)
if err != nil {
fmt.Printf("Error: Could not connect to running instance at %s\nIs the service running?\n", socketPath)
os.Exit(1)
}
defer conn.Close()
buf, err := io.ReadAll(conn)
if err != nil {
fmt.Printf("Error reading status: %v\n", err)
os.Exit(1)
}
fmt.Print(string(buf))
}
// statusLoop periodically logs uptime, total message count and message rate.
func (s *Streamer) statusLoop() {
interval := time.Duration(s.config.StatusInterval) * time.Second
ticker := time.NewTicker(interval)
for range ticker.C {
count := s.totalMsgs.Load()
uptime := time.Since(s.startTime).Round(time.Second)
mpm := 0.0
if uptime.Seconds() > 0 {
mpm = (float64(count) / uptime.Seconds()) * 60
}
log.Printf("[STATUS] Uptime: %s | Total Msgs: %d | Rate: %.2f msg/min",
uptime, count, mpm)
}
}
// statusServer serves the same statistics over a Unix socket for the -status query mode.
func (s *Streamer) statusServer() {
os.Remove(s.config.StatusSocket)
l, err := net.Listen("unix", s.config.StatusSocket)
if err != nil {
log.Fatalf("Failed to listen on status socket: %v", err)
}
defer l.Close()
for {
conn, err := l.Accept()
if err != nil {
continue
}
go func(c net.Conn) {
defer c.Close()
count := s.totalMsgs.Load()
uptime := time.Since(s.startTime).Round(time.Second)
mpm := 0.0
if uptime.Seconds() > 0 {
mpm = (float64(count) / uptime.Seconds()) * 60
}
status := fmt.Sprintf("Uptime: %s | Total Msgs: %d | Rate: %.2f msg/min\n",
uptime, count, mpm)
c.Write([]byte(status))
}(conn)
}
}
// managerLoop keeps the WebSocket session alive, redialing whenever the reader exits.
func (s *Streamer) managerLoop() {
for {
conn, _, err := websocket.DefaultDialer.Dial(s.config.WSUrl, nil)
if err != nil {
log.Printf("Dial error: %v. Retrying in 5s...", err)
time.Sleep(5 * time.Second)
continue
}
s.runHeartbeatAndReader(conn)
}
}
// runHeartbeatAndReader subscribes to the configured topic, pings every 20s and
// forwards incoming messages to the buffered channel until the connection drops.
func (s *Streamer) runHeartbeatAndReader(conn *websocket.Conn) {
defer conn.Close()
sub := fmt.Sprintf(`{"op": "subscribe", "args": ["%s"]}`, s.config.Topic)
conn.WriteMessage(websocket.TextMessage, []byte(sub))
done := make(chan struct{})
go func() {
ticker := time.NewTicker(20 * time.Second)
defer ticker.Stop()
for {
select {
case <-ticker.C:
if err := conn.WriteMessage(websocket.TextMessage, []byte(`{"op":"ping"}`)); err != nil {
return
}
case <-done:
return
}
}
}()
for {
_, msg, err := conn.ReadMessage()
if err != nil {
close(done)
return
}
s.totalMsgs.Add(1)
select {
case s.msgChan <- msg:
default:
log.Println("Buffer Overflow: Message dropped.")
}
}
}
// writerLoop drains the message buffer and appends each message to the current hourly file.
func (s *Streamer) writerLoop() {
for msg := range s.msgChan {
now := time.Now().UTC()
s.mu.Lock()
if s.currentFile == nil || now.Hour() != s.currentHour {
s.rotate(now)
}
if s.currentFile != nil {
s.currentFile.Write(append(msg, '\n'))
}
s.mu.Unlock()
}
}
// rotate closes the previous data file (if any) and opens the file for the hour
// containing t. It is called from writerLoop while s.mu is held.
func (s *Streamer) rotate(t time.Time) {
if s.currentFile != nil {
s.currentFile.Close()
}
if err := os.MkdirAll(s.config.OutputDir, 0755); err != nil {
log.Printf("Error creating output dir: %v", err)
return
}
s.currentHour = t.Hour()
name := fmt.Sprintf("%s_%d.jsonl", s.config.Topic, t.Truncate(time.Hour).Unix())
filePath := filepath.Join(s.config.OutputDir, name)
f, err := os.OpenFile(filePath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
if err != nil {
log.Printf("Error opening data file: %v", err)
s.currentFile = nil
return
}
s.currentFile = f
}
func setupLogger(c *Config, debugFlag bool) {
f, err := os.OpenFile(c.LogFile, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
if err != nil {
fmt.Printf("Failed to open log file: %v\n", err)
return
}
// If CLI -debug is on OR config log_to_stdout is true
if debugFlag || c.LogToStdout {
log.SetOutput(io.MultiWriter(f, os.Stdout))
} else {
log.SetOutput(f)
}
}
func loadConfig(path string) (*Config, error) {
f, err := os.Open(path)
if err != nil {
return nil, err
}
defer f.Close()
var conf Config
dec := json.NewDecoder(f)
if err := dec.Decode(&conf); err != nil {
return nil, err
}
// Defaults
if conf.OutputDir == "" { conf.OutputDir = "./output" }
if conf.Topic == "" { conf.Topic = "publicTrade.BTCUSDT" }
if conf.WSUrl == "" { conf.WSUrl = "wss://stream.bybit.com/v5/public/linear" }
if conf.BufferSize == 0 { conf.BufferSize = 10000 }
if conf.StatusInterval == 0 { conf.StatusInterval = 30 }
if conf.LogFile == "" { conf.LogFile = "system.log" }
if conf.StatusSocket == "" { conf.StatusSocket = "/tmp/streamer.sock" }
return &conf, nil
}

listner_design.md Normal file

@@ -0,0 +1,75 @@
# Technical Design: Bybit WebSocket Streamer
## 1. System Overview
The software acts as a dedicated bridge between the Bybit V5 WebSocket API and a local filesystem. Its primary goal is to provide a "hot" data stream for downstream consumers who read from disk every ~80ms.

## 2. Core Architecture
The system follows a Producer-Consumer pattern to decouple network ingestion from disk I/O. This prevents disk latency spikes from causing message loss on the WebSocket connection.

- Producer (WS Client): Manages the connection, sends heartbeats, and pushes raw messages into a high-speed queue.
- Consumer (File Writer): Pulls messages from the queue, determines the target file, and writes data to disk.
## 3. Functional Components

### A. Connection Manager (Self-Healing)
To ensure "solid" performance, the manager must implement:

- Heartbeat Mechanism: Send periodic ping messages to Bybit (usually every 20-30 seconds) to prevent idle timeouts.
- Auto-Reconnect: On network loss or socket error, the client must automatically attempt to reconnect.
- Exponential Backoff: To avoid spamming the API during an outage, use a delay formula (see the sketch after this list): `Delay = min(2^attempts, MaxDelay)`
- State Tracking: Maintain the subscription state so the client automatically re-subscribes to the topic upon reconnection.
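
A minimal Go sketch of this reconnect policy, assuming the same gorilla/websocket dependency as input.go. Note that the committed input.go retries on a fixed 5-second delay rather than this formula, and `dialWithBackoff`/`maxDelay` are illustrative names, not part of the implementation:

```go
package main

import (
	"log"
	"time"

	"github.com/gorilla/websocket"
)

// dialWithBackoff retries the dial with delay = min(2^attempts, maxDelay).
func dialWithBackoff(url string, maxDelay time.Duration) *websocket.Conn {
	attempts := 0
	for {
		conn, _, err := websocket.DefaultDialer.Dial(url, nil)
		if err == nil {
			return conn // the caller must re-subscribe to its topic after every successful dial
		}
		delay := time.Duration(1<<attempts) * time.Second
		if delay >= maxDelay {
			delay = maxDelay // cap reached: stop doubling
		} else {
			attempts++
		}
		log.Printf("Dial error: %v. Retrying in %s...", err, delay)
		time.Sleep(delay)
	}
}

func main() {
	conn := dialWithBackoff("wss://stream.bybit.com/v5/public/linear", 60*time.Second)
	defer conn.Close()
}
```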
### B. Stream Processor & Memory Management
- Streaming I/O: Messages must be handled as raw byte streams. Do not parse the JSON into deep objects unless necessary for validation, as this creates garbage-collection overhead.
- Bounded Buffer: The queue between the network and the disk should have a fixed capacity. If the disk fails, the queue should drop old data rather than grow without bound and crash the system (RAM hoarding). See the sketch after this list.
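
input.go realizes the bounded buffer as a buffered channel with a non-blocking send: when the buffer is full, it drops the incoming message (rather than the oldest one) and logs the overflow. A condensed sketch of that pattern:

```go
package main

import "log"

// enqueue performs a non-blocking send into a fixed-capacity channel. A full
// buffer drops the incoming message instead of blocking the WebSocket reader
// or growing memory without limit; this mirrors msgChan handling in input.go.
func enqueue(msgChan chan []byte, msg []byte) {
	select {
	case msgChan <- msg:
	default:
		log.Println("Buffer Overflow: Message dropped.")
	}
}

func main() {
	msgChan := make(chan []byte, 10000) // capacity taken from buffer_size in config.json
	enqueue(msgChan, []byte(`{"topic":"publicTrade.BTCUSDT"}`))
	log.Printf("buffered messages: %d", len(msgChan))
}
```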
### C. Atomic File Rotator
The rotator manages the lifecycle of the .jsonl files (see the sketch after this list).

- Naming Convention: `{topic}_{unix_timestamp}.jsonl`
- Rotation Logic: On each received message, compare the current system time against the active file's creation hour. If a new hour has begun:
  1. Flush and close the current file handle.
  2. Open or create the new file for the current hour.
- Immediate Visibility: Because downstream programs read every ~80ms, the writer must flush its stream buffer immediately after writing each JSON line so the data is visible to other processes without waiting for an application-level buffer to fill.
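
In input.go this reduces to deriving the filename from the hour-truncated Unix timestamp and comparing the current UTC hour with the hour of the open file; a condensed sketch of those two checks (file opening and closing omitted):

```go
package main

import (
	"fmt"
	"time"
)

// hourlyFileName builds the {topic}_{unix_timestamp}.jsonl name, where the
// timestamp is the start of the UTC hour the file covers.
func hourlyFileName(topic string, t time.Time) string {
	return fmt.Sprintf("%s_%d.jsonl", topic, t.Truncate(time.Hour).Unix())
}

// needsRotation reports whether the file opened during currentHour must be
// closed and replaced by one for the hour containing now.
func needsRotation(currentHour int, now time.Time) bool {
	return now.Hour() != currentHour
}

func main() {
	now := time.Now().UTC()
	fmt.Println(hourlyFileName("publicTrade.BTCUSDT", now), needsRotation(now.Hour(), now))
}
```

The committed writer calls os.File.Write directly with no bufio layer, so each line is visible to the ~80ms readers as soon as Write returns, without an explicit application-level flush.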
## 4. Configuration Requirements
The software should read from a configuration file (YAML/JSON) or environment variables:

| Option | Description | Example |
| --- | --- | --- |
| OUTPUT_DIR | Absolute path for data storage | /data/bybit/ |
| WS_URL | Bybit WebSocket endpoint | wss://stream.bybit.com/v5/public/linear |
| TOPIC | Topic to subscribe to | publicTrade.BTCUSDT |
| ROTATION_SECONDS | Interval for file creation | 3600 |
| LOG_LEVEL | Verbosity of internal logs | INFO, DEBUG, ERROR |
## 5. Logging & Diagnostics
Operational logs must be written to a separate rolling log file (e.g., system.log) to track:

- Connection Events: Timestamps of successful handshakes and disconnections.
- Subscription Status: Confirmation of topic subscription.
- Rotation Events: Filenames of newly created files.
- Errors: Socket timeouts, disk-full errors, or malformed JSON received from the exchange.
## 6. Implementation Considerations for the Programmer
- Concurrency: Use non-blocking I/O or green threads (Goroutines, Asyncio, etc.) to ensure the heartbeat doesn't get stuck behind a slow disk write.
- File Permissions: Ensure created files have read permissions for the downstream programs.
- Graceful Shutdown: On SIGTERM, the software must flush all buffers and close the current file properly to avoid data corruption (see the sketch after this list).
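
A minimal sketch of the SIGTERM handling this calls for, assuming the open data file is reachable from main; the committed main() currently only removes the status socket on shutdown, so closing the active data file as shown here would be an extension:

```go
package main

import (
	"log"
	"os"
	"os/signal"
	"syscall"
)

func main() {
	// Stand-in for the streamer's currently open hourly data file.
	f, err := os.OpenFile("example.jsonl", os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
	if err != nil {
		log.Fatalf("open: %v", err)
	}

	stop := make(chan os.Signal, 1)
	signal.Notify(stop, os.Interrupt, syscall.SIGTERM)
	<-stop // block until SIGINT/SIGTERM

	// os.File writes are unbuffered, so closing the handle is enough to
	// leave a complete final JSON line on disk.
	if err := f.Close(); err != nil {
		log.Printf("close: %v", err)
	}
	log.Println("Shutting down gracefully...")
}
```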

monitor/monitor.py Executable file

@@ -0,0 +1,101 @@
#!/usr/bin/env python3
import socket
import json
import time
import os
from datetime import datetime
# --- CONFIGURATION ---
# Adjust these paths/ports to match your config.json files
INPUT_SOCKET = "/tmp/streamer.sock"
ONRAMP_HOST = "127.0.0.1"
ONRAMP_PORT = 9999
REFRESH_RATE = 1.0 # Seconds
def query_input_go():
"""Queries the Go Input service via Unix Socket."""
if not os.path.exists(INPUT_SOCKET):
return "OFFLINE (Socket not found)"
try:
with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as s:
s.settimeout(0.5)
s.connect(INPUT_SOCKET)
data = s.recv(1024)
return data.decode('utf-8').strip()
except Exception as e:
return f"OFFLINE ({e})"
def query_onramp_json(command):
"""Queries the Python Onramp service via TCP."""
try:
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.settimeout(0.5)
s.connect((ONRAMP_HOST, ONRAMP_PORT))
s.sendall(command.encode('utf-8'))
data = s.recv(8192) # Larger buffer for 'live' data
return json.loads(data.decode('utf-8'))
    except Exception:
        return None
def format_candle(candle):
"""Formats a single candle dictionary into a readable line."""
return (f"O: {candle['open']:.2f} | H: {candle['high']:.2f} | "
f"L: {candle['low']:.2f} | C: {candle['close']:.2f} | V: {candle['volume']:.2f}")
def main():
try:
while True:
# 1. Collect Data
input_status = query_input_go()
onramp_status = query_onramp_json("status")
onramp_live = query_onramp_json("live")
# 2. Clear Screen
print("\033[2J\033[H", end="")
print("="*70)
print(f" UNIFIED MONITOR - {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
print("="*70)
# 3. Display Input (Go) Section
print(f"\n[1] INPUT SERVICE (Go)")
print(f" Status: {input_status}")
# 4. Display Onramp (Python) Section
print(f"\n[2] ONRAMP SERVICE (Python)")
if onramp_status:
stats = onramp_status
                # Calculate lag (last_ts is None until the first trade is processed)
                last_ts = (stats.get('last_ts') or 0) / 1000
                lag = time.time() - last_ts if last_ts > 0 else 0
print(f" Uptime Start : {stats.get('uptime_start')}")
print(f" Processed : {stats.get('total_trades')} trades")
print(f" Current File : {os.path.basename(str(stats.get('last_file')))}")
print(f" Data Lag : {lag:.2f}s")
else:
print(" Status: OFFLINE")
# 5. Display Live Market Snapshot
if onramp_live and "data" in onramp_live:
print(f"\n[3] LIVE MARKET SNAPSHOT")
candles = onramp_live["data"]
# Show 1m and 1h as examples
for tf in ["1m", "1h"]:
if tf in candles:
# Get the most recent timestamp in that timeframe
latest_ts = max(candles[tf].keys())
c = candles[tf][latest_ts]
ts_str = datetime.fromtimestamp(int(latest_ts)).strftime('%H:%M')
print(f" {tf} ({ts_str}) >> {format_candle(c)}")
print("\n" + "="*70)
print(" (Ctrl+C to exit)")
time.sleep(REFRESH_RATE)
except KeyboardInterrupt:
print("\nClosing monitor...")
if __name__ == "__main__":
main()

onramp/README.md Normal file

@@ -0,0 +1,39 @@
# Key Features of this Implementation
File Rotation Handling: The FileTailer continuously checks the directory for the newest file matching the pattern. If a new hourly file appears, it automatically switches to it.
## OHLCV Logic
- Open: Set when a candle is first created for that timestamp bucket.
- High/Low: Updated via max() and min().
- Close: Updated with every new trade.
- Buy Volume: Calculated by filtering trades where S == "Buy".
- SQLite Upsert: Uses ON CONFLICT(timeframe, timestamp) DO UPDATE. This is crucial because trades for the same minute arrive sequentially; we need to update the existing row rather than create duplicates.
## Robustness
- Wrapped in try-except blocks to prevent the service from crashing on a single malformed line.
- Comprehensive logging to both file and console.
- Configurable polling interval (default 200 ms).
# To get the live, unfinished candle data:
```bash
echo "live" | nc 127.0.0.1 9999
```
# To get the general health/status:
```bash
echo "status" | nc 127.0.0.1 9999
```

onramp/config.json Normal file

@@ -0,0 +1,9 @@
{
"input_directory": "../input/output",
"file_pattern": "publicTrade.BTCUSDT_*.jsonl",
"database_path": "market_data.db",
"status_host": "127.0.0.1",
"status_port": 9999,
"poll_interval_ms": 200,
"symbol": "BTCUSDT"
}

onramp/onramp.py Normal file

@@ -0,0 +1,199 @@
import os
import json
import time
import glob
import sqlite3
import logging
import socket
import threading
from datetime import datetime
from collections import defaultdict
# Setup Logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s [%(levelname)s] %(message)s',
handlers=[logging.FileHandler("processor.log"), logging.StreamHandler()]
)
logger = logging.getLogger(__name__)
class CandleAggregator:
def __init__(self, db_path):
self.db_path = db_path
self.lock = threading.Lock() # Ensure thread safety
self.timeframes = {
"1m": 60,
"5m": 300,
"15m": 900,
"1h": 3600
}
self.init_db()
# Cache structure: {timeframe: {timestamp: {data}}}
self.cache = defaultdict(dict)
def init_db(self):
with sqlite3.connect(self.db_path) as conn:
conn.execute("""
CREATE TABLE IF NOT EXISTS candles (
timeframe TEXT,
timestamp INTEGER,
open REAL, high REAL, low REAL, close REAL,
volume REAL, buy_volume REAL,
PRIMARY KEY (timeframe, timestamp)
)
""")
conn.commit()
def process_trade(self, ts_ms, price, volume, side):
ts_s = ts_ms // 1000
is_buy = 1 if side.lower() == "buy" else 0
with self.lock:
for tf_name, seconds in self.timeframes.items():
candle_ts = (ts_s // seconds) * seconds
current = self.cache[tf_name].get(candle_ts)
if not current:
current = {
"timestamp": candle_ts,
"open": price, "high": price, "low": price, "close": price,
"volume": volume, "buy_volume": volume if is_buy else 0.0
}
else:
current["high"] = max(current["high"], price)
current["low"] = min(current["low"], price)
current["close"] = price
current["volume"] += volume
if is_buy:
current["buy_volume"] += volume
self.cache[tf_name][candle_ts] = current
self.save_to_db(tf_name, candle_ts, current)
def save_to_db(self, timeframe, ts, data):
try:
with sqlite3.connect(self.db_path) as conn:
conn.execute("""
INSERT INTO candles (timeframe, timestamp, open, high, low, close, volume, buy_volume)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(timeframe, timestamp) DO UPDATE SET
high = excluded.high, low = excluded.low, close = excluded.close,
volume = excluded.volume, buy_volume = excluded.buy_volume
""", (timeframe, ts, data['open'], data['high'], data['low'],
data['close'], data['volume'], data['buy_volume']))
except Exception as e:
logger.error(f"Database error: {e}")
def get_live_snapshot(self):
"""Returns the current state of all active candles in the cache."""
with self.lock:
# We return a copy to avoid dictionary size mutation errors during JSON serialization
return json.loads(json.dumps(self.cache))
class StatusServer:
def __init__(self, host, port, stats_ref, aggregator):
self.host = host
self.port = port
self.stats = stats_ref
self.aggregator = aggregator
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
def start(self):
self.sock.bind((self.host, self.port))
self.sock.listen(5)
threading.Thread(target=self._serve, daemon=True).start()
logger.info(f"Network server started on {self.host}:{self.port}")
def _serve(self):
while True:
try:
client, addr = self.sock.accept()
# Set a timeout so a slow client doesn't hang the server
client.settimeout(2.0)
# Receive command (e.g., "status" or "live")
data = client.recv(1024).decode('utf-8').strip()
response = {}
if data == "live":
response = {
"type": "live_candles",
"data": self.aggregator.get_live_snapshot()
}
else:
# Default to status info
response = {
"type": "status",
"uptime_start": self.stats['start_time'],
"last_file": self.stats['last_file'],
"total_trades": self.stats['lines_count'],
"last_ts": self.stats['last_ts']
}
                client.sendall(json.dumps(response).encode('utf-8'))
                client.close()
except Exception as e:
logger.error(f"Server error: {e}")
class FileTailer:
def __init__(self, config):
self.config = config
self.aggregator = CandleAggregator(config['database_path'])
self.stats = {
"start_time": datetime.now().isoformat(),
"last_file": None, "lines_count": 0, "last_ts": None
}
self.status_server = StatusServer(
config['status_host'],
config['status_port'],
self.stats,
self.aggregator
)
def get_latest_file(self):
path_pattern = os.path.join(self.config['input_directory'], self.config['file_pattern'])
files = sorted(glob.glob(path_pattern))
return files[-1] if files else None
def run(self):
self.status_server.start()
current_file_path = self.get_latest_file()
last_position = 0
while True:
newest_file = self.get_latest_file()
if newest_file and newest_file != current_file_path:
logger.info(f"Rotating to: {newest_file}")
current_file_path = newest_file
last_position = 0
if current_file_path and os.path.exists(current_file_path):
self.stats['last_file'] = current_file_path
with open(current_file_path, 'r') as f:
f.seek(last_position)
while True:
line = f.readline()
if not line: break
self.process_line(line)
last_position = f.tell()
time.sleep(self.config['poll_interval_ms'] / 1000.0)
def process_line(self, line):
try:
payload = json.loads(line)
if "data" not in payload: return
for trade in payload["data"]:
self.aggregator.process_trade(
trade["T"], float(trade["p"]), float(trade["v"]), trade["S"]
)
self.stats['last_ts'] = trade["T"]
self.stats['lines_count'] += 1
except Exception as e:
logger.error(f"Line processing error: {e}")
if __name__ == "__main__":
with open("config.json", "r") as cf:
conf = json.load(cf)
FileTailer(conf).run()