296 lines
6.5 KiB
Go
296 lines
6.5 KiB
Go
package main
|
|
|
|
import (
|
|
"bufio"
|
|
"database/sql"
|
|
"encoding/json"
|
|
"flag"
|
|
"fmt"
|
|
"log"
|
|
"net"
|
|
"os"
|
|
"path/filepath"
|
|
"sort"
|
|
"strconv"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
_ "github.com/mattn/go-sqlite3"
|
|
)
|
|
|
|
// --- Types ---
|
|
|
|
// Config holds the runtime settings decoded from the JSON configuration file.
type Config struct {
	// InputDirectory is the directory that is searched for trade log files.
	InputDirectory string `json:"input_directory"`

	// FilePattern is the glob pattern, joined onto InputDirectory, that selects
	// the trade log files.
	FilePattern string `json:"file_pattern"`

	// DatabasePath is the SQLite database path handed to NewAggregator.
	DatabasePath string `json:"database_path"`

	// StatusHost is the host part of the status server's TCP listen address.
	StatusHost string `json:"status_host"`

	// StatusPort is the port part of the status server's TCP listen address.
	StatusPort int `json:"status_port"`

	// PollInterval is the pause between two polls of the input directory,
	// expressed in milliseconds.
	PollInterval int `json:"poll_interval_ms"`
}
|
|
|
|
// Trade is one executed trade as it appears inside an input payload line.
type Trade struct {
	// Timestamp is the trade time. ProcessTrade divides it by 1000 to obtain
	// seconds, i.e. it is treated as milliseconds.
	Timestamp int64 `json:"T"`

	// Price is the trade price as a decimal string; ProcessTrade parses it
	// with strconv.ParseFloat.
	Price string `json:"p"`

	// Volume is the traded quantity as a decimal string; ProcessTrade parses
	// it with strconv.ParseFloat.
	Volume string `json:"v"`

	// Side is the trade side. ProcessTrade counts the volume as buy volume
	// when this equals "buy" ignoring case; any other value is not counted.
	Side string `json:"S"`
}
|
|
|
|
type TradePayload struct {
|
|
Data []Trade `json:"data"`
|
|
}
|
|
|
|
// Candle is one OHLCV bar for a single timeframe bucket.
type Candle struct {
	// Timestamp is the bucket start in seconds (the trade time in seconds
	// rounded down to a multiple of the timeframe length).
	Timestamp int64 `json:"timestamp"`

	// Open is the price of the first trade that created the candle.
	Open float64 `json:"open"`

	// High is the highest trade price seen in the bucket.
	High float64 `json:"high"`

	// Low is the lowest trade price seen in the bucket.
	Low float64 `json:"low"`

	// Close is the price of the most recently processed trade in the bucket.
	Close float64 `json:"close"`

	// Volume is the summed volume of all trades in the bucket.
	Volume float64 `json:"volume"`

	// BuyVolume is the part of Volume that came from buy-side trades.
	BuyVolume float64 `json:"buy_volume"`
}
|
|
|
|
// --- Aggregator ---
|
|
|
|
type Aggregator struct {
|
|
db *sql.DB
|
|
mu sync.RWMutex
|
|
cache map[string]map[int64]*Candle // timeframe -> timestamp -> candle
|
|
timeframes map[string]int64
|
|
stats struct {
|
|
StartTime time.Time
|
|
LastFile string
|
|
TotalCount uint64
|
|
LastTS int64
|
|
}
|
|
}
|
|
|
|
func NewAggregator(dbPath string) *Aggregator {
|
|
db, err := sql.Open("sqlite3", dbPath+"?_journal=WAL&_sync=1")
|
|
if err != nil {
|
|
log.Fatal(err)
|
|
}
|
|
|
|
// This allows us to shrink the file in the background without locking it
|
|
db.Exec("PRAGMA auto_vacuum = INCREMENTAL;")
|
|
|
|
db.Exec(`CREATE TABLE IF NOT EXISTS candles (
|
|
timeframe TEXT, timestamp INTEGER,
|
|
open REAL, high REAL, low REAL, close REAL,
|
|
volume REAL, buy_volume REAL,
|
|
PRIMARY KEY (timeframe, timestamp)
|
|
)`)
|
|
|
|
_, err = db.Exec(`CREATE TABLE IF NOT EXISTS candles (
|
|
timeframe TEXT, timestamp INTEGER,
|
|
open REAL, high REAL, low REAL, close REAL,
|
|
volume REAL, buy_volume REAL,
|
|
PRIMARY KEY (timeframe, timestamp)
|
|
)`)
|
|
if err != nil {
|
|
log.Fatal(err)
|
|
}
|
|
|
|
a := &Aggregator{
|
|
db: db,
|
|
cache: make(map[string]map[int64]*Candle),
|
|
timeframes: map[string]int64{
|
|
"1m": 60, "5m": 300, "15m": 900, "1h": 3600,
|
|
},
|
|
}
|
|
a.stats.StartTime = time.Now()
|
|
for tf := range a.timeframes {
|
|
a.cache[tf] = make(map[int64]*Candle)
|
|
}
|
|
return a
|
|
}
|
|
|
|
func (a *Aggregator) ProcessTrade(t Trade) {
|
|
tsS := t.Timestamp / 1000
|
|
price, _ := strconv.ParseFloat(t.Price, 64)
|
|
volume, _ := strconv.ParseFloat(t.Volume, 64)
|
|
isBuy := strings.ToLower(t.Side) == "buy"
|
|
|
|
a.mu.Lock()
|
|
defer a.mu.Unlock()
|
|
|
|
a.stats.TotalCount++
|
|
a.stats.LastTS = t.Timestamp
|
|
|
|
for tf, seconds := range a.timeframes {
|
|
candleTS := (tsS / seconds) * seconds
|
|
c, exists := a.cache[tf][candleTS]
|
|
|
|
if !exists {
|
|
c = &Candle{
|
|
Timestamp: candleTS,
|
|
Open: price, High: price, Low: price, Close: price,
|
|
Volume: volume,
|
|
}
|
|
if isBuy {
|
|
c.BuyVolume = volume
|
|
}
|
|
a.cache[tf][candleTS] = c
|
|
} else {
|
|
if price > c.High {
|
|
c.High = price
|
|
}
|
|
if price < c.Low {
|
|
c.Low = price
|
|
}
|
|
c.Close = price
|
|
c.Volume += volume
|
|
if isBuy {
|
|
c.BuyVolume += volume
|
|
}
|
|
}
|
|
a.saveToDB(tf, c)
|
|
}
|
|
}
|
|
|
|
func (a *Aggregator) saveToDB(tf string, c *Candle) {
|
|
_, err := a.db.Exec(`INSERT INTO candles
|
|
(timeframe, timestamp, open, high, low, close, volume, buy_volume)
|
|
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
|
|
ON CONFLICT(timeframe, timestamp) DO UPDATE SET
|
|
high=excluded.high, low=excluded.low, close=excluded.close,
|
|
volume=excluded.volume, buy_volume=excluded.buy_volume`,
|
|
tf, c.Timestamp, c.Open, c.High, c.Low, c.Close, c.Volume, c.BuyVolume)
|
|
if err != nil {
|
|
log.Printf("DB Error: %v", err)
|
|
}
|
|
}
|
|
|
|
func (a *Aggregator) startJanitor() {
|
|
ticker := time.NewTicker(1 * time.Hour)
|
|
for range ticker.C {
|
|
// Calculate cutoff (30 days ago)
|
|
cutoff := time.Now().AddDate(0, 0, -30).Unix()
|
|
|
|
log.Printf("[JANITOR] Cleaning up data older than %d", cutoff)
|
|
|
|
// 1. Delete old rows
|
|
_, err := a.db.Exec("DELETE FROM candles WHERE timestamp < ?", cutoff)
|
|
if err != nil {
|
|
log.Printf("[JANITOR] Delete error: %v", err)
|
|
continue
|
|
}
|
|
|
|
// 2. Incremental Vacuum
|
|
// This moves empty pages back to the OS 1000 pages at a time.
|
|
// It prevents the DB from staying huge after a big delete.
|
|
_, err = a.db.Exec("PRAGMA incremental_vacuum(1000);")
|
|
if err != nil {
|
|
log.Printf("[JANITOR] Vacuum error: %v", err)
|
|
}
|
|
}
|
|
}
|
|
|
|
// --- Server ---
|
|
|
|
func (a *Aggregator) serve(host string, port int) {
|
|
addr := fmt.Sprintf("%s:%d", host, port)
|
|
l, err := net.Listen("tcp", addr)
|
|
if err != nil {
|
|
log.Fatal(err)
|
|
}
|
|
log.Printf("Status server listening on %s", addr)
|
|
|
|
for {
|
|
conn, err := l.Accept()
|
|
if err != nil {
|
|
continue
|
|
}
|
|
go func(c net.Conn) {
|
|
defer c.Close()
|
|
buf := make([]byte, 1024)
|
|
n, _ := c.Read(buf)
|
|
cmd := strings.TrimSpace(string(buf[:n]))
|
|
|
|
var response interface{}
|
|
a.mu.RLock()
|
|
if cmd == "live" {
|
|
response = map[string]interface{}{
|
|
"type": "live_candles",
|
|
"data": a.cache,
|
|
}
|
|
} else {
|
|
response = map[string]interface{}{
|
|
"type": "status",
|
|
"uptime_start": a.stats.StartTime.Format(time.RFC3339),
|
|
"last_file": a.stats.LastFile,
|
|
"total_trades": a.stats.TotalCount,
|
|
"last_ts": a.stats.LastTS,
|
|
}
|
|
}
|
|
a.mu.RUnlock()
|
|
|
|
json.NewEncoder(c).Encode(response)
|
|
}(conn)
|
|
}
|
|
}
|
|
|
|
// --- File Tailer ---
|
|
|
|
// getLatestFile returns the path matching pattern inside dir that sorts last
// lexicographically, or "" when nothing matches or the pattern is malformed.
//
// "Latest" is purely a string comparison of the full paths, so file names
// must be chosen such that newer files sort after older ones.
func getLatestFile(dir, pattern string) string {
	files, err := filepath.Glob(filepath.Join(dir, pattern))
	if err != nil {
		// Glob only fails for a malformed pattern; without this message a
		// bad pattern looks exactly like an empty directory.
		log.Printf("Invalid file pattern %q: %v", pattern, err)
		return ""
	}
	if len(files) == 0 {
		return ""
	}

	sort.Strings(files)
	return files[len(files)-1]
}
|
|
|
|
func main() {
|
|
configPath := flag.String("config", "config.json", "path to config")
|
|
flag.Parse()
|
|
|
|
file, _ := os.Open(*configPath)
|
|
var conf Config
|
|
json.NewDecoder(file).Decode(&conf)
|
|
file.Close()
|
|
|
|
agg := NewAggregator(conf.DatabasePath)
|
|
go agg.startJanitor()
|
|
go agg.serve(conf.StatusHost, conf.StatusPort)
|
|
|
|
currentFile := ""
|
|
var lastPos int64 = 0
|
|
|
|
for {
|
|
latest := getLatestFile(conf.InputDirectory, conf.FilePattern)
|
|
if latest == "" {
|
|
time.Sleep(time.Duration(conf.PollInterval) * time.Millisecond)
|
|
continue
|
|
}
|
|
|
|
if latest != currentFile {
|
|
log.Printf("Rotating to: %s", latest)
|
|
currentFile = latest
|
|
lastPos = 0
|
|
agg.mu.Lock()
|
|
agg.stats.LastFile = latest
|
|
agg.mu.Unlock()
|
|
}
|
|
|
|
f, err := os.Open(currentFile)
|
|
if err == nil {
|
|
f.Seek(lastPos, 0)
|
|
scanner := bufio.NewScanner(f)
|
|
for scanner.Scan() {
|
|
var payload TradePayload
|
|
if err := json.Unmarshal(scanner.Bytes(), &payload); err == nil {
|
|
for _, t := range payload.Data {
|
|
agg.ProcessTrade(t)
|
|
}
|
|
}
|
|
}
|
|
lastPos, _ = f.Seek(0, 1)
|
|
f.Close()
|
|
}
|
|
|
|
time.Sleep(time.Duration(conf.PollInterval) * time.Millisecond)
|
|
}
|
|
}
|