Bug fixes in input and onramp. Hot config reload on signals. Added example utility scripts for signals.
This commit is contained in:
109
input/input.go
109
input/input.go
@@ -2,19 +2,22 @@ package main
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"compress/gzip"
|
||||
"flag"
|
||||
"fmt"
|
||||
"io"
|
||||
"log"
|
||||
"net"
|
||||
"os"
|
||||
"sort"
|
||||
"strconv"
|
||||
"os/signal"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
"github.com/gorilla/websocket"
|
||||
)
|
||||
|
||||
@@ -83,7 +86,7 @@ func main() {
|
||||
stop := make(chan os.Signal, 1)
|
||||
signal.Notify(stop, os.Interrupt, syscall.SIGTERM)
|
||||
<-stop
|
||||
|
||||
|
||||
log.Println("Shutting down gracefully...")
|
||||
os.Remove(s.config.StatusSocket)
|
||||
}
|
||||
@@ -122,7 +125,7 @@ func (s *Streamer) statusLoop() {
|
||||
}
|
||||
|
||||
func (s *Streamer) statusServer() {
|
||||
os.Remove(s.config.StatusSocket)
|
||||
os.Remove(s.config.StatusSocket)
|
||||
l, err := net.Listen("unix", s.config.StatusSocket)
|
||||
if err != nil {
|
||||
log.Fatalf("Failed to listen on status socket: %v", err)
|
||||
@@ -216,7 +219,7 @@ func (s *Streamer) rotate(t time.Time) {
|
||||
if s.currentFile != nil {
|
||||
s.currentFile.Close()
|
||||
}
|
||||
|
||||
|
||||
if err := os.MkdirAll(s.config.OutputDir, 0755); err != nil {
|
||||
log.Printf("Error creating output dir: %v", err)
|
||||
return
|
||||
@@ -225,7 +228,7 @@ func (s *Streamer) rotate(t time.Time) {
|
||||
s.currentHour = t.Hour()
|
||||
name := fmt.Sprintf("%s_%d.jsonl", s.config.Topic, t.Truncate(time.Hour).Unix())
|
||||
filePath := filepath.Join(s.config.OutputDir, name)
|
||||
|
||||
|
||||
f, err := os.OpenFile(filePath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
|
||||
if err != nil {
|
||||
log.Printf("Error opening data file: %v", err)
|
||||
@@ -233,6 +236,100 @@ func (s *Streamer) rotate(t time.Time) {
|
||||
return
|
||||
}
|
||||
s.currentFile = f
|
||||
|
||||
// After rotation, compress old files (keeping current and N-1 as plaintext)
|
||||
go s.compressOldFiles()
|
||||
}
|
||||
|
||||
func (s *Streamer) compressOldFiles() {
|
||||
entries, err := os.ReadDir(s.config.OutputDir)
|
||||
if err != nil {
|
||||
log.Printf("Gzip scan error: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
// Collect all .jsonl files with their timestamps
|
||||
type fileInfo struct {
|
||||
path string
|
||||
timestamp int64
|
||||
}
|
||||
var jsonlFiles []fileInfo
|
||||
|
||||
for _, e := range entries {
|
||||
if e.IsDir() {
|
||||
continue
|
||||
}
|
||||
|
||||
name := e.Name()
|
||||
if !strings.HasSuffix(name, ".jsonl") {
|
||||
continue
|
||||
}
|
||||
|
||||
// Extract timestamp from filename: topic_TIMESTAMP.jsonl
|
||||
parts := strings.Split(name, "_")
|
||||
if len(parts) < 2 {
|
||||
continue
|
||||
}
|
||||
|
||||
tsStr := strings.TrimSuffix(parts[len(parts)-1], ".jsonl")
|
||||
ts, err := strconv.ParseInt(tsStr, 10, 64)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
|
||||
fullPath := filepath.Join(s.config.OutputDir, name)
|
||||
jsonlFiles = append(jsonlFiles, fileInfo{path: fullPath, timestamp: ts})
|
||||
}
|
||||
|
||||
// Sort by timestamp (newest first)
|
||||
sort.Slice(jsonlFiles, func(i, j int) bool {
|
||||
return jsonlFiles[i].timestamp > jsonlFiles[j].timestamp
|
||||
})
|
||||
|
||||
// Keep the 2 newest files (current + N-1) as plaintext, gzip the rest
|
||||
for i, fi := range jsonlFiles {
|
||||
if i < 2 {
|
||||
// Skip the 2 newest files
|
||||
continue
|
||||
}
|
||||
|
||||
if err := gzipFile(fi.path); err != nil {
|
||||
log.Printf("Gzip failed for %s: %v", filepath.Base(fi.path), err)
|
||||
} else {
|
||||
log.Printf("Compressed %s", filepath.Base(fi.path))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func gzipFile(path string) error {
|
||||
in, err := os.Open(path)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer in.Close()
|
||||
|
||||
outPath := path + ".gz"
|
||||
out, err := os.Create(outPath)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
gw := gzip.NewWriter(out)
|
||||
if _, err := io.Copy(gw, in); err != nil {
|
||||
gw.Close()
|
||||
out.Close()
|
||||
return err
|
||||
}
|
||||
|
||||
if err := gw.Close(); err != nil {
|
||||
out.Close()
|
||||
return err
|
||||
}
|
||||
if err := out.Close(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return os.Remove(path)
|
||||
}
|
||||
|
||||
func setupLogger(c *Config, debugFlag bool) {
|
||||
@@ -273,4 +370,4 @@ func loadConfig(path string) (*Config, error) {
|
||||
if conf.StatusSocket == "" { conf.StatusSocket = "/tmp/streamer.sock" }
|
||||
|
||||
return &conf, nil
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user