package main import ( "bufio" "database/sql" "encoding/json" "flag" "fmt" "log" "net" "os" "path/filepath" "sort" "strconv" "strings" "sync" "time" _ "github.com/mattn/go-sqlite3" ) // --- Types --- type Config struct { InputDirectory string `json:"input_directory"` FilePattern string `json:"file_pattern"` DatabasePath string `json:"database_path"` StatusHost string `json:"status_host"` StatusPort int `json:"status_port"` PollInterval int `json:"poll_interval_ms"` } type Trade struct { Timestamp int64 `json:"T"` Price string `json:"p"` Volume string `json:"v"` Side string `json:"S"` } type TradePayload struct { Data []Trade `json:"data"` } type Candle struct { Timestamp int64 `json:"timestamp"` Open float64 `json:"open"` High float64 `json:"high"` Low float64 `json:"low"` Close float64 `json:"close"` Volume float64 `json:"volume"` BuyVolume float64 `json:"buy_volume"` } // --- Aggregator --- type Aggregator struct { db *sql.DB mu sync.RWMutex cache map[string]map[int64]*Candle // timeframe -> timestamp -> candle timeframes map[string]int64 stats struct { StartTime time.Time LastFile string TotalCount uint64 LastTS int64 } } func NewAggregator(dbPath string) *Aggregator { db, err := sql.Open("sqlite3", dbPath+"?_journal=WAL&_sync=1") if err != nil { log.Fatal(err) } // This allows us to shrink the file in the background without locking it db.Exec("PRAGMA auto_vacuum = INCREMENTAL;") db.Exec(`CREATE TABLE IF NOT EXISTS candles ( timeframe TEXT, timestamp INTEGER, open REAL, high REAL, low REAL, close REAL, volume REAL, buy_volume REAL, PRIMARY KEY (timeframe, timestamp) )`) _, err = db.Exec(`CREATE TABLE IF NOT EXISTS candles ( timeframe TEXT, timestamp INTEGER, open REAL, high REAL, low REAL, close REAL, volume REAL, buy_volume REAL, PRIMARY KEY (timeframe, timestamp) )`) if err != nil { log.Fatal(err) } a := &Aggregator{ db: db, cache: make(map[string]map[int64]*Candle), timeframes: map[string]int64{ "1m": 60, "5m": 300, "15m": 900, "1h": 3600, }, } a.stats.StartTime = time.Now() for tf := range a.timeframes { a.cache[tf] = make(map[int64]*Candle) } return a } func (a *Aggregator) ProcessTrade(t Trade) { tsS := t.Timestamp / 1000 price, _ := strconv.ParseFloat(t.Price, 64) volume, _ := strconv.ParseFloat(t.Volume, 64) isBuy := strings.ToLower(t.Side) == "buy" a.mu.Lock() defer a.mu.Unlock() a.stats.TotalCount++ a.stats.LastTS = t.Timestamp for tf, seconds := range a.timeframes { candleTS := (tsS / seconds) * seconds c, exists := a.cache[tf][candleTS] if !exists { c = &Candle{ Timestamp: candleTS, Open: price, High: price, Low: price, Close: price, Volume: volume, } if isBuy { c.BuyVolume = volume } a.cache[tf][candleTS] = c } else { if price > c.High { c.High = price } if price < c.Low { c.Low = price } c.Close = price c.Volume += volume if isBuy { c.BuyVolume += volume } } a.saveToDB(tf, c) } } func (a *Aggregator) saveToDB(tf string, c *Candle) { _, err := a.db.Exec(`INSERT INTO candles (timeframe, timestamp, open, high, low, close, volume, buy_volume) VALUES (?, ?, ?, ?, ?, ?, ?, ?) ON CONFLICT(timeframe, timestamp) DO UPDATE SET high=excluded.high, low=excluded.low, close=excluded.close, volume=excluded.volume, buy_volume=excluded.buy_volume`, tf, c.Timestamp, c.Open, c.High, c.Low, c.Close, c.Volume, c.BuyVolume) if err != nil { log.Printf("DB Error: %v", err) } } func (a *Aggregator) startJanitor() { ticker := time.NewTicker(1 * time.Hour) for range ticker.C { // Calculate cutoff (30 days ago) cutoff := time.Now().AddDate(0, 0, -30).Unix() log.Printf("[JANITOR] Cleaning up data older than %d", cutoff) // 1. Delete old rows _, err := a.db.Exec("DELETE FROM candles WHERE timestamp < ?", cutoff) if err != nil { log.Printf("[JANITOR] Delete error: %v", err) continue } // 2. Incremental Vacuum // This moves empty pages back to the OS 1000 pages at a time. // It prevents the DB from staying huge after a big delete. _, err = a.db.Exec("PRAGMA incremental_vacuum(1000);") if err != nil { log.Printf("[JANITOR] Vacuum error: %v", err) } } } // --- Server --- func (a *Aggregator) serve(host string, port int) { addr := fmt.Sprintf("%s:%d", host, port) l, err := net.Listen("tcp", addr) if err != nil { log.Fatal(err) } log.Printf("Status server listening on %s", addr) for { conn, err := l.Accept() if err != nil { continue } go func(c net.Conn) { defer c.Close() buf := make([]byte, 1024) n, _ := c.Read(buf) cmd := strings.TrimSpace(string(buf[:n])) var response interface{} a.mu.RLock() if cmd == "live" { // Deep copy the cache cacheCopy := make(map[string]map[int64]*Candle) for tf, candles := range a.cache { cacheCopy[tf] = make(map[int64]*Candle) for ts, candle := range candles { // Copy the candle struct candleCopy := *candle cacheCopy[tf][ts] = &candleCopy } } response = map[string]interface{}{ "type": "live_candles", "data": cacheCopy, } } else { response = map[string]interface{}{ "type": "status", "uptime_start": a.stats.StartTime.Format(time.RFC3339), "last_file": a.stats.LastFile, "total_trades": a.stats.TotalCount, "last_ts": a.stats.LastTS, } } a.mu.RUnlock() json.NewEncoder(c).Encode(response) }(conn) } } // --- File Tailer --- func getLatestFile(dir, pattern string) string { files, _ := filepath.Glob(filepath.Join(dir, pattern)) if len(files) == 0 { return "" } sort.Strings(files) return files[len(files)-1] } func main() { configPath := flag.String("config", "config.json", "path to config") flag.Parse() file, _ := os.Open(*configPath) var conf Config json.NewDecoder(file).Decode(&conf) file.Close() agg := NewAggregator(conf.DatabasePath) go agg.startJanitor() go agg.serve(conf.StatusHost, conf.StatusPort) currentFile := "" var lastPos int64 = 0 for { latest := getLatestFile(conf.InputDirectory, conf.FilePattern) if latest == "" { time.Sleep(time.Duration(conf.PollInterval) * time.Millisecond) continue } if latest != currentFile { log.Printf("Rotating to: %s", latest) currentFile = latest lastPos = 0 agg.mu.Lock() agg.stats.LastFile = latest agg.mu.Unlock() } f, err := os.Open(currentFile) if err == nil { f.Seek(lastPos, 0) scanner := bufio.NewScanner(f) for scanner.Scan() { var payload TradePayload if err := json.Unmarshal(scanner.Bytes(), &payload); err == nil { for _, t := range payload.Data { agg.ProcessTrade(t) } } } lastPos, _ = f.Seek(0, 1) f.Close() } time.Sleep(time.Duration(conf.PollInterval) * time.Millisecond) } }