package main import ( "encoding/json" "compress/gzip" "flag" "fmt" "io" "log" "net" "os" "sort" "strconv" "os/signal" "path/filepath" "strings" "sync" "sync/atomic" "syscall" "time" "github.com/gorilla/websocket" ) type Config struct { OutputDir string `json:"output_dir"` Topic string `json:"topic"` WSUrl string `json:"ws_url"` BufferSize int `json:"buffer_size"` StatusInterval int `json:"status_interval"` LogFile string `json:"log_file"` LogToStdout bool `json:"log_to_stdout"` StatusSocket string `json:"status_socket"` } type Streamer struct { config *Config msgChan chan []byte currentFile *os.File currentHour int mu sync.Mutex // Stats fields totalMsgs atomic.Uint64 startTime time.Time } func main() { // 1. Parse CLI Flags debug := flag.Bool("debug", false, "Force logs to stdout (override config)") statusMode := flag.Bool("status", false, "Query status of running instance and exit") configPath := flag.String("config", "config.json", "Path to the configuration file") flag.Parse() // 2. Load Configuration conf, err := loadConfig(*configPath) if err != nil { fmt.Printf("Critical: Failed to load config: %v\n", err) os.Exit(1) } // 3. Handle Status Query Mode if *statusMode { queryStatus(conf.StatusSocket) return } // 4. Setup Logging setupLogger(conf, *debug) // 5. Initialize Streamer s := &Streamer{ config: conf, msgChan: make(chan []byte, conf.BufferSize), startTime: time.Now(), } log.Printf("Starting streamer for topic: %s", conf.Topic) // 6. Start Goroutines go s.writerLoop() go s.managerLoop() go s.statusLoop() go s.statusServer() // 7. Graceful Shutdown stop := make(chan os.Signal, 1) signal.Notify(stop, os.Interrupt, syscall.SIGTERM) <-stop log.Println("Shutting down gracefully...") os.Remove(s.config.StatusSocket) } func queryStatus(socketPath string) { conn, err := net.Dial("unix", socketPath) if err != nil { fmt.Printf("Error: Could not connect to running instance at %s\nIs the service running?\n", socketPath) os.Exit(1) } defer conn.Close() buf, err := io.ReadAll(conn) if err != nil { fmt.Printf("Error reading status: %v\n", err) os.Exit(1) } fmt.Print(string(buf)) } func (s *Streamer) statusLoop() { interval := time.Duration(s.config.StatusInterval) * time.Second ticker := time.NewTicker(interval) for range ticker.C { count := s.totalMsgs.Load() uptime := time.Since(s.startTime).Round(time.Second) mpm := 0.0 if uptime.Seconds() > 0 { mpm = (float64(count) / uptime.Seconds()) * 60 } log.Printf("[STATUS] Uptime: %s | Total Msgs: %d | Rate: %.2f msg/min", uptime, count, mpm) } } func (s *Streamer) statusServer() { os.Remove(s.config.StatusSocket) l, err := net.Listen("unix", s.config.StatusSocket) if err != nil { log.Fatalf("Failed to listen on status socket: %v", err) } defer l.Close() for { conn, err := l.Accept() if err != nil { continue } go func(c net.Conn) { defer c.Close() count := s.totalMsgs.Load() uptime := time.Since(s.startTime).Round(time.Second) mpm := 0.0 if uptime.Seconds() > 0 { mpm = (float64(count) / uptime.Seconds()) * 60 } status := fmt.Sprintf("Uptime: %s | Total Msgs: %d | Rate: %.2f msg/min\n", uptime, count, mpm) c.Write([]byte(status)) }(conn) } } func (s *Streamer) managerLoop() { for { conn, _, err := websocket.DefaultDialer.Dial(s.config.WSUrl, nil) if err != nil { log.Printf("Dial error: %v. Retrying in 5s...", err) time.Sleep(5 * time.Second) continue } s.runHeartbeatAndReader(conn) } } func (s *Streamer) runHeartbeatAndReader(conn *websocket.Conn) { defer conn.Close() sub := fmt.Sprintf(`{"op": "subscribe", "args": ["%s"]}`, s.config.Topic) conn.WriteMessage(websocket.TextMessage, []byte(sub)) done := make(chan struct{}) go func() { ticker := time.NewTicker(20 * time.Second) defer ticker.Stop() for { select { case <-ticker.C: if err := conn.WriteMessage(websocket.TextMessage, []byte(`{"op":"ping"}`)); err != nil { return } case <-done: return } } }() for { _, msg, err := conn.ReadMessage() if err != nil { close(done) return } s.totalMsgs.Add(1) select { case s.msgChan <- msg: default: log.Println("Buffer Overflow: Message dropped.") } } } func (s *Streamer) writerLoop() { for msg := range s.msgChan { now := time.Now().UTC() s.mu.Lock() if s.currentFile == nil || now.Hour() != s.currentHour { s.rotate(now) } if s.currentFile != nil { s.currentFile.Write(append(msg, '\n')) } s.mu.Unlock() } } func (s *Streamer) rotate(t time.Time) { if s.currentFile != nil { s.currentFile.Close() } if err := os.MkdirAll(s.config.OutputDir, 0755); err != nil { log.Printf("Error creating output dir: %v", err) return } s.currentHour = t.Hour() name := fmt.Sprintf("%s_%d.jsonl", s.config.Topic, t.Truncate(time.Hour).Unix()) filePath := filepath.Join(s.config.OutputDir, name) f, err := os.OpenFile(filePath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) if err != nil { log.Printf("Error opening data file: %v", err) s.currentFile = nil return } s.currentFile = f // After rotation, compress old files (keeping current and N-1 as plaintext) go s.compressOldFiles() } func (s *Streamer) compressOldFiles() { entries, err := os.ReadDir(s.config.OutputDir) if err != nil { log.Printf("Gzip scan error: %v", err) return } // Collect all .jsonl files with their timestamps type fileInfo struct { path string timestamp int64 } var jsonlFiles []fileInfo for _, e := range entries { if e.IsDir() { continue } name := e.Name() if !strings.HasSuffix(name, ".jsonl") { continue } // Extract timestamp from filename: topic_TIMESTAMP.jsonl parts := strings.Split(name, "_") if len(parts) < 2 { continue } tsStr := strings.TrimSuffix(parts[len(parts)-1], ".jsonl") ts, err := strconv.ParseInt(tsStr, 10, 64) if err != nil { continue } fullPath := filepath.Join(s.config.OutputDir, name) jsonlFiles = append(jsonlFiles, fileInfo{path: fullPath, timestamp: ts}) } // Sort by timestamp (newest first) sort.Slice(jsonlFiles, func(i, j int) bool { return jsonlFiles[i].timestamp > jsonlFiles[j].timestamp }) // Keep the 2 newest files (current + N-1) as plaintext, gzip the rest for i, fi := range jsonlFiles { if i < 2 { // Skip the 2 newest files continue } if err := gzipFile(fi.path); err != nil { log.Printf("Gzip failed for %s: %v", filepath.Base(fi.path), err) } else { log.Printf("Compressed %s", filepath.Base(fi.path)) } } } func gzipFile(path string) error { in, err := os.Open(path) if err != nil { return err } defer in.Close() outPath := path + ".gz" out, err := os.Create(outPath) if err != nil { return err } gw := gzip.NewWriter(out) if _, err := io.Copy(gw, in); err != nil { gw.Close() out.Close() return err } if err := gw.Close(); err != nil { out.Close() return err } if err := out.Close(); err != nil { return err } return os.Remove(path) } func setupLogger(c *Config, debugFlag bool) { f, err := os.OpenFile(c.LogFile, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) if err != nil { fmt.Printf("Failed to open log file: %v\n", err) return } // If CLI -debug is on OR config log_to_stdout is true if debugFlag || c.LogToStdout { log.SetOutput(io.MultiWriter(f, os.Stdout)) } else { log.SetOutput(f) } } func loadConfig(path string) (*Config, error) { f, err := os.Open(path) if err != nil { return nil, err } defer f.Close() var conf Config dec := json.NewDecoder(f) if err := dec.Decode(&conf); err != nil { return nil, err } // Defaults if conf.OutputDir == "" { conf.OutputDir = "./output" } if conf.Topic == "" { conf.Topic = "publicTrade.BTCUSDT" } if conf.WSUrl == "" { conf.WSUrl = "wss://stream.bybit.com/v5/public/linear" } if conf.BufferSize == 0 { conf.BufferSize = 10000 } if conf.StatusInterval == 0 { conf.StatusInterval = 30 } if conf.LogFile == "" { conf.LogFile = "system.log" } if conf.StatusSocket == "" { conf.StatusSocket = "/tmp/streamer.sock" } return &conf, nil }