package main import ( "encoding/json" "flag" "fmt" "io" "log" "net" "os" "os/signal" "path/filepath" "sync" "sync/atomic" "syscall" "time" "github.com/gorilla/websocket" ) type Config struct { OutputDir string `json:"output_dir"` Topic string `json:"topic"` WSUrl string `json:"ws_url"` BufferSize int `json:"buffer_size"` StatusInterval int `json:"status_interval"` LogFile string `json:"log_file"` LogToStdout bool `json:"log_to_stdout"` StatusSocket string `json:"status_socket"` } type Streamer struct { config *Config msgChan chan []byte currentFile *os.File currentHour int mu sync.Mutex // Stats fields totalMsgs atomic.Uint64 startTime time.Time } func main() { // 1. Parse CLI Flags debug := flag.Bool("debug", false, "Force logs to stdout (override config)") statusMode := flag.Bool("status", false, "Query status of running instance and exit") configPath := flag.String("config", "config.json", "Path to the configuration file") flag.Parse() // 2. Load Configuration conf, err := loadConfig(*configPath) if err != nil { fmt.Printf("Critical: Failed to load config: %v\n", err) os.Exit(1) } // 3. Handle Status Query Mode if *statusMode { queryStatus(conf.StatusSocket) return } // 4. Setup Logging setupLogger(conf, *debug) // 5. Initialize Streamer s := &Streamer{ config: conf, msgChan: make(chan []byte, conf.BufferSize), startTime: time.Now(), } log.Printf("Starting streamer for topic: %s", conf.Topic) // 6. Start Goroutines go s.writerLoop() go s.managerLoop() go s.statusLoop() go s.statusServer() // 7. Graceful Shutdown stop := make(chan os.Signal, 1) signal.Notify(stop, os.Interrupt, syscall.SIGTERM) <-stop log.Println("Shutting down gracefully...") os.Remove(s.config.StatusSocket) } func queryStatus(socketPath string) { conn, err := net.Dial("unix", socketPath) if err != nil { fmt.Printf("Error: Could not connect to running instance at %s\nIs the service running?\n", socketPath) os.Exit(1) } defer conn.Close() buf, err := io.ReadAll(conn) if err != nil { fmt.Printf("Error reading status: %v\n", err) os.Exit(1) } fmt.Print(string(buf)) } func (s *Streamer) statusLoop() { interval := time.Duration(s.config.StatusInterval) * time.Second ticker := time.NewTicker(interval) for range ticker.C { count := s.totalMsgs.Load() uptime := time.Since(s.startTime).Round(time.Second) mpm := 0.0 if uptime.Seconds() > 0 { mpm = (float64(count) / uptime.Seconds()) * 60 } log.Printf("[STATUS] Uptime: %s | Total Msgs: %d | Rate: %.2f msg/min", uptime, count, mpm) } } func (s *Streamer) statusServer() { os.Remove(s.config.StatusSocket) l, err := net.Listen("unix", s.config.StatusSocket) if err != nil { log.Fatalf("Failed to listen on status socket: %v", err) } defer l.Close() for { conn, err := l.Accept() if err != nil { continue } go func(c net.Conn) { defer c.Close() count := s.totalMsgs.Load() uptime := time.Since(s.startTime).Round(time.Second) mpm := 0.0 if uptime.Seconds() > 0 { mpm = (float64(count) / uptime.Seconds()) * 60 } status := fmt.Sprintf("Uptime: %s | Total Msgs: %d | Rate: %.2f msg/min\n", uptime, count, mpm) c.Write([]byte(status)) }(conn) } } func (s *Streamer) managerLoop() { for { conn, _, err := websocket.DefaultDialer.Dial(s.config.WSUrl, nil) if err != nil { log.Printf("Dial error: %v. Retrying in 5s...", err) time.Sleep(5 * time.Second) continue } s.runHeartbeatAndReader(conn) } } func (s *Streamer) runHeartbeatAndReader(conn *websocket.Conn) { defer conn.Close() sub := fmt.Sprintf(`{"op": "subscribe", "args": ["%s"]}`, s.config.Topic) conn.WriteMessage(websocket.TextMessage, []byte(sub)) done := make(chan struct{}) go func() { ticker := time.NewTicker(20 * time.Second) defer ticker.Stop() for { select { case <-ticker.C: if err := conn.WriteMessage(websocket.TextMessage, []byte(`{"op":"ping"}`)); err != nil { return } case <-done: return } } }() for { _, msg, err := conn.ReadMessage() if err != nil { close(done) return } s.totalMsgs.Add(1) select { case s.msgChan <- msg: default: log.Println("Buffer Overflow: Message dropped.") } } } func (s *Streamer) writerLoop() { for msg := range s.msgChan { now := time.Now().UTC() s.mu.Lock() if s.currentFile == nil || now.Hour() != s.currentHour { s.rotate(now) } if s.currentFile != nil { s.currentFile.Write(append(msg, '\n')) } s.mu.Unlock() } } func (s *Streamer) rotate(t time.Time) { if s.currentFile != nil { s.currentFile.Close() } if err := os.MkdirAll(s.config.OutputDir, 0755); err != nil { log.Printf("Error creating output dir: %v", err) return } s.currentHour = t.Hour() name := fmt.Sprintf("%s_%d.jsonl", s.config.Topic, t.Truncate(time.Hour).Unix()) filePath := filepath.Join(s.config.OutputDir, name) f, err := os.OpenFile(filePath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) if err != nil { log.Printf("Error opening data file: %v", err) s.currentFile = nil return } s.currentFile = f } func setupLogger(c *Config, debugFlag bool) { f, err := os.OpenFile(c.LogFile, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) if err != nil { fmt.Printf("Failed to open log file: %v\n", err) return } // If CLI -debug is on OR config log_to_stdout is true if debugFlag || c.LogToStdout { log.SetOutput(io.MultiWriter(f, os.Stdout)) } else { log.SetOutput(f) } } func loadConfig(path string) (*Config, error) { f, err := os.Open(path) if err != nil { return nil, err } defer f.Close() var conf Config dec := json.NewDecoder(f) if err := dec.Decode(&conf); err != nil { return nil, err } // Defaults if conf.OutputDir == "" { conf.OutputDir = "./output" } if conf.Topic == "" { conf.Topic = "publicTrade.BTCUSDT" } if conf.WSUrl == "" { conf.WSUrl = "wss://stream.bybit.com/v5/public/linear" } if conf.BufferSize == 0 { conf.BufferSize = 10000 } if conf.StatusInterval == 0 { conf.StatusInterval = 30 } if conf.LogFile == "" { conf.LogFile = "system.log" } if conf.StatusSocket == "" { conf.StatusSocket = "/tmp/streamer.sock" } return &conf, nil }