#!/usr/bin/env python3
"""
BTCUSDT Signal Generator - Multi-Personality Edition
Generates trading signals from candles.db and analysis.db
Runs both scalping and swing personalities simultaneously
Streams signals via Unix Domain Socket
"""

import sqlite3
import pandas as pd
import numpy as np
import talib
import json
import time
import logging
import signal
import sys
import os
import socket
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional, Dict, List
import threading
from collections import deque

# Candle duration in seconds per supported timeframe label; unknown labels
# fall back to _DEFAULT_TIMEFRAME_SECONDS.
_TIMEFRAME_SECONDS = {"1m": 60, "5m": 300, "15m": 900, "1h": 3600}
_DEFAULT_TIMEFRAME_SECONDS = 60

# Minimum number of closed, non-NULL candles required before any signal
# logic is attempted.
_MIN_ROWS = 50

# Upper bound (seconds) on any single read/write on an accepted health or
# control connection, so one stalled client cannot freeze the management loop.
_MANAGEMENT_IO_TIMEOUT = 1.0


# Configuration
class Config:
    """Runtime configuration backed by a JSON file.

    Every key is optional inside the file, but the file itself must exist and
    contain a JSON object; otherwise ``reload`` raises.
    """

    def __init__(self, config_path: str = "config.json"):
        self.config_path = config_path
        self.reload()

    def reload(self):
        """Reload configuration from file.

        Raises:
            OSError: the config file cannot be opened.
            json.JSONDecodeError: the config file is not valid JSON.
        """
        with open(self.config_path, "r", encoding="utf-8") as f:
            data = json.load(f)

        self.candles_db = data.get("candles_db", "../onramp/candles.db")
        self.analysis_db = data.get("analysis_db", "../analysis/analysis.db")
        self.socket_path = data.get("socket_path", "/tmp/signals.sock")
        self.health_socket_path = data.get(
            "health_socket_path", "/tmp/signals_health.sock"
        )
        self.control_socket_path = data.get(
            "control_socket_path", "/tmp/signals_control.sock"
        )
        self.log_file = data.get("log_file", "logs/signal_generator.log")
        self.log_to_stdout = data.get("log_to_stdout", True)
        self.poll_interval = data.get("poll_interval", 0.5)
        self.timeframes = data.get("timeframes", ["1m", "5m"])
        self.lookback = data.get("lookback", 200)

        # Signal thresholds (can be personality-specific)
        self.min_confidence = data.get("min_confidence", 0.6)
        self.cooldown_seconds = data.get("cooldown_seconds", 60)

        # Personality-specific weights
        self.weights = data.get(
            "weights",
            {
                "scalping": {
                    "ema_cross": 0.3,
                    "stoch": 0.25,
                    "rsi": 0.2,
                    "volume": 0.15,
                    "macd": 0.1,
                },
                "swing": {
                    "regime": 0.35,
                    "bb_squeeze": 0.25,
                    "macd": 0.2,
                    "flow": 0.15,
                    "rsi": 0.05,
                },
            },
        )


# Logging setup
def setup_logging(config: Config):
    """Configure root logging (file, optionally stdout) and return this module's logger."""
    Path(config.log_file).parent.mkdir(parents=True, exist_ok=True)

    handlers = [logging.FileHandler(config.log_file)]
    if config.log_to_stdout:
        handlers.append(logging.StreamHandler(sys.stdout))

    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s [%(levelname)s] %(message)s",
        handlers=handlers,
    )
    return logging.getLogger(__name__)


# Signal Generator Class (per-personality)
class PersonalityEngine:
    """Single personality signal generation engine"""

    def __init__(self, personality: str, config: Config, logger: logging.Logger):
        self.personality = personality
        self.config = config
        self.logger = logger
        # Maps "<personality>_<timeframe>" -> time.time() of the last emitted signal.
        self.last_signal_time = {}
        self.signal_history = deque(maxlen=100)
        self.stats = {
            "total_signals": 0,
            "buy_signals": 0,
            "sell_signals": 0,
            "last_signal_time": None,
            "errors": 0,
        }
        # Guards stats and signal_history, which the health handler reads
        # from another thread.
        self.lock = threading.Lock()

    def fetch_and_enrich(self, timeframe: str) -> Optional[pd.DataFrame]:
        """Fetch data from databases and enrich with additional indicators.

        Joins candles with their precomputed analysis rows, keeps only closed
        candles, and adds ``stoch_k``/``stoch_d``, ``buy_ratio`` and
        ``net_flow`` columns.

        Returns:
            A chronologically ordered DataFrame of at most ``config.lookback``
            closed candles, or None when there is too little data or any
            error occurs (errors are logged and counted in ``stats``).
        """
        query = """
            SELECT
                c.timeframe, c.timestamp, c.open, c.high, c.low, c.close,
                c.volume, c.buy_volume,
                a.ema_9, a.ema_21, a.sma_50, a.sma_200,
                a.rsi_14, a.macd, a.macd_signal, a.macd_hist,
                a.bb_upper, a.bb_middle, a.bb_lower, a.bb_squeeze,
                a.volume_ma_20
            FROM candles c
            JOIN analysis_db.analysis a
                ON c.timeframe = a.timeframe AND c.timestamp = a.timestamp
            WHERE c.timeframe = ?
            ORDER BY c.timestamp DESC
            LIMIT ?
        """
        try:
            conn_c = sqlite3.connect(
                f"file:{self.config.candles_db}?mode=ro",
                uri=True,
                timeout=10,
            )
            try:
                # ATTACH the analysis database so we can JOIN across them.
                # The filename is bound as a parameter so that paths
                # containing quotes cannot break the statement.
                conn_c.execute(
                    "ATTACH DATABASE ? AS analysis_db",
                    (f"file:{self.config.analysis_db}?mode=ro",),
                )
                # Fetch one extra row: the newest row is normally the
                # still-forming candle, which is discarded below. Without the
                # extra row the frame could never reach `lookback` rows.
                df = pd.read_sql_query(
                    query, conn_c, params=(timeframe, self.config.lookback + 1)
                )
            finally:
                # Always release the connection, even if ATTACH or the query fails.
                conn_c.close()

            if df.empty:
                return None

            # Sort chronologically
            df = df.sort_values("timestamp").reset_index(drop=True)
            df["datetime"] = pd.to_datetime(df["timestamp"], unit="s")

            # Filter only closed candles (exclude current forming candle)
            window = _TIMEFRAME_SECONDS.get(timeframe, _DEFAULT_TIMEFRAME_SECONDS)
            cutoff = int(time.time()) - window
            df = df[df["timestamp"] < cutoff].tail(self.config.lookback)

            if len(df) < _MIN_ROWS:
                self.logger.debug(
                    f"[{self.personality}] Not enough data for {timeframe}: {len(df)} rows"
                )
                return None

            # Drop rows with NULL in critical columns; copy so the column
            # assignments below act on an independent frame.
            df = df.dropna(subset=["open", "high", "low", "close", "volume"]).copy()

            if len(df) < _MIN_ROWS:
                self.logger.debug(
                    f"[{self.personality}] Not enough valid data after NULL filtering for {timeframe}"
                )
                return None

            # Add Stochastic Oscillator (TA-Lib requires float64 input arrays)
            df["stoch_k"], df["stoch_d"] = talib.STOCH(
                df["high"].to_numpy(dtype="float64"),
                df["low"].to_numpy(dtype="float64"),
                df["close"].to_numpy(dtype="float64"),
                fastk_period=14,
                slowk_period=3,
                slowd_period=3,
            )

            # Calculate buy/sell ratio (zero volume -> NaN instead of division by zero)
            df["buy_ratio"] = df["buy_volume"] / df["volume"].replace(0, np.nan)
            df["net_flow"] = df["buy_volume"] - (df["volume"] - df["buy_volume"])

            return df

        except Exception as e:
            self.logger.error(
                f"[{self.personality}] Error fetching data for {timeframe}: {e}"
            )
            with self.lock:
                self.stats["errors"] += 1
            return None

    def generate_signal_scalping(
        self, df: pd.DataFrame, timeframe: str
    ) -> Optional[Dict]:
        """Generate signal using scalping personality.

        An EMA9/EMA21 crossover on the latest closed candle sets the
        direction; stochastic, RSI, volume and MACD confirmations add their
        configured weights. A signal dict is returned only when a direction
        exists and the summed score reaches ``config.min_confidence``.
        """
        if len(df) < 21:
            self.logger.debug(
                f"[{self.personality}/{timeframe}] Insufficient data: {len(df)} rows"
            )
            return None

        latest = df.iloc[-1]
        prev = df.iloc[-2]

        # Check for NULL indicators
        required_cols = [
            "ema_9",
            "ema_21",
            "rsi_14",
            "stoch_k",
            "stoch_d",
            "macd",
            "macd_signal",
        ]
        if any(pd.isna(latest[col]) for col in required_cols):
            self.logger.debug(
                f"[{self.personality}/{timeframe}] Skipping: missing required indicators"
            )
            return None

        score = 0
        reasons = []
        signal_type = None
        weights = self.config.weights["scalping"]

        # EMA Crossover (9/21)
        ema_cross_up = (
            latest["ema_9"] > latest["ema_21"] and prev["ema_9"] <= prev["ema_21"]
        )
        ema_cross_down = (
            latest["ema_9"] < latest["ema_21"] and prev["ema_9"] >= prev["ema_21"]
        )

        if ema_cross_up:
            score += weights["ema_cross"]
            reasons.append("EMA9 crossed above EMA21")
            signal_type = "BUY"
        elif ema_cross_down:
            score += weights["ema_cross"]
            reasons.append("EMA9 crossed below EMA21")
            signal_type = "SELL"

        # Stochastic
        if signal_type == "BUY":
            if latest["stoch_k"] > latest["stoch_d"] and latest["stoch_k"] < 30:
                score += weights["stoch"]
                reasons.append("Stochastic oversold crossover")
        elif signal_type == "SELL":
            if latest["stoch_k"] < latest["stoch_d"] and latest["stoch_k"] > 70:
                score += weights["stoch"]
                reasons.append("Stochastic overbought crossover")

        # RSI (low RSI supports a BUY, high RSI supports a SELL)
        if signal_type == "BUY" and latest["rsi_14"] < 40:
            score += weights["rsi"]
            reasons.append(f"RSI oversold ({latest['rsi_14']:.1f})")
        elif signal_type == "SELL" and latest["rsi_14"] > 60:
            score += weights["rsi"]
            reasons.append(f"RSI overbought ({latest['rsi_14']:.1f})")

        # Volume surge (a NaN volume_ma_20 compares False, so it adds nothing)
        if latest["volume"] > 1.5 * latest["volume_ma_20"]:
            score += weights["volume"]
            reasons.append("Volume surge detected")

        # MACD confirmation
        if signal_type == "BUY" and latest["macd"] > latest["macd_signal"]:
            score += weights["macd"]
            reasons.append("MACD bullish")
        elif signal_type == "SELL" and latest["macd"] < latest["macd_signal"]:
            score += weights["macd"]
            reasons.append("MACD bearish")

        if signal_type and score >= self.config.min_confidence:
            return {
                "signal": signal_type,
                "timeframe": timeframe,
                "confidence": round(score, 3),
                "price": float(latest["close"]),
                "timestamp": int(latest["timestamp"]),
                "reasons": reasons,
                "personality": "scalping",
            }

        return None

    def generate_signal_swing(self, df: pd.DataFrame, timeframe: str) -> Optional[Dict]:
        """Generate signal using swing personality.

        The SMA50/SMA200 regime sets the direction; Bollinger squeeze
        breakout, MACD crossover, buy/sell flow and RSI add their configured
        weights. A signal dict is returned only when a regime exists and the
        summed score reaches ``config.min_confidence``.
        """
        if len(df) < 200:
            return None

        latest = df.iloc[-1]
        prev = df.iloc[-2]

        # Check for NULL indicators
        required_cols = [
            "sma_50",
            "sma_200",
            "bb_upper",
            "bb_lower",
            "bb_squeeze",
            "macd",
            "macd_signal",
            "buy_ratio",
        ]
        if any(pd.isna(latest[col]) for col in required_cols):
            self.logger.debug(
                f"[{self.personality}/{timeframe}] Skipping: missing required indicators"
            )
            return None

        score = 0
        reasons = []
        signal_type = None
        weights = self.config.weights["swing"]

        # Regime filter (SMA50/200)
        bull_regime = latest["close"] > latest["sma_50"] > latest["sma_200"]
        bear_regime = latest["close"] < latest["sma_50"] < latest["sma_200"]

        if bull_regime:
            signal_type = "BUY"
            score += weights["regime"]
            reasons.append("Bull regime (price > SMA50 > SMA200)")
        elif bear_regime:
            signal_type = "SELL"
            score += weights["regime"]
            reasons.append("Bear regime (price < SMA50 < SMA200)")

        # Bollinger Squeeze breakout
        if latest["bb_squeeze"] == 1 or prev["bb_squeeze"] == 1:
            if signal_type == "BUY" and latest["close"] > latest["bb_upper"]:
                score += weights["bb_squeeze"]
                reasons.append("BB squeeze breakout to upside")
            elif signal_type == "SELL" and latest["close"] < latest["bb_lower"]:
                score += weights["bb_squeeze"]
                reasons.append("BB squeeze breakout to downside")

        # MACD crossover
        if (
            signal_type == "BUY"
            and latest["macd"] > latest["macd_signal"]
            and prev["macd"] <= prev["macd_signal"]
        ):
            score += weights["macd"]
            reasons.append("MACD bullish crossover")
        elif (
            signal_type == "SELL"
            and latest["macd"] < latest["macd_signal"]
            and prev["macd"] >= prev["macd_signal"]
        ):
            score += weights["macd"]
            reasons.append("MACD bearish crossover")

        # Net flow
        if signal_type == "BUY" and latest["buy_ratio"] > 0.55:
            score += weights["flow"]
            reasons.append(f"Buy pressure ({latest['buy_ratio']:.2%})")
        elif signal_type == "SELL" and latest["buy_ratio"] < 0.45:
            score += weights["flow"]
            reasons.append(f"Sell pressure ({latest['buy_ratio']:.2%})")

        # RSI (light filter for swing; a NaN RSI compares False and adds nothing)
        if signal_type == "BUY" and latest["rsi_14"] < 50:
            score += weights["rsi"]
            reasons.append("RSI not overbought")
        elif signal_type == "SELL" and latest["rsi_14"] > 50:
            score += weights["rsi"]
            reasons.append("RSI not oversold")

        if signal_type and score >= self.config.min_confidence:
            return {
                "signal": signal_type,
                "timeframe": timeframe,
                "confidence": round(score, 3),
                "price": float(latest["close"]),
                "timestamp": int(latest["timestamp"]),
                "reasons": reasons,
                "personality": "swing",
            }

        return None

    def generate_signal(self, timeframe: str) -> Optional[Dict]:
        """Main signal generation dispatcher.

        Applies the per-(personality, timeframe) cooldown, fetches data, runs
        the personality-specific generator and, when a signal results,
        records it in stats and history.
        """
        # Check cooldown
        # NOTE(review): the cooldown is wall-clock based, so when
        # cooldown_seconds is shorter than the candle length the same closed
        # candle can yield the same signal again -- confirm this is intended.
        cooldown_key = f"{self.personality}_{timeframe}"
        last_emitted = self.last_signal_time.get(cooldown_key)
        if (
            last_emitted is not None
            and time.time() - last_emitted < self.config.cooldown_seconds
        ):
            return None

        df = self.fetch_and_enrich(timeframe)
        if df is None:
            return None

        if self.personality == "scalping":
            result = self.generate_signal_scalping(df, timeframe)
        elif self.personality == "swing":
            result = self.generate_signal_swing(df, timeframe)
        else:
            self.logger.error(f"Unknown personality: {self.personality}")
            return None

        if result:
            self.last_signal_time[cooldown_key] = time.time()
            result["generated_at"] = datetime.now(timezone.utc).isoformat()

            # Update stats
            with self.lock:
                self.stats["total_signals"] += 1
                if result["signal"] == "BUY":
                    self.stats["buy_signals"] += 1
                else:
                    self.stats["sell_signals"] += 1
                self.stats["last_signal_time"] = result["generated_at"]
                self.signal_history.append(result)

        return result


# Main Signal Generator Coordinator
class SignalGenerator:
    """Runs one worker thread per personality and serves three Unix sockets.

    The signal socket streams newline-delimited JSON signals to every
    connected client; the health socket answers each connection with a JSON
    status report; the control socket accepts one JSON command per connection.
    """

    def __init__(self, config: Config, logger: logging.Logger):
        self.config = config
        self.logger = logger
        self.running = False
        self.debug_mode = False

        # Create personality engines
        self.engines = {
            "scalping": PersonalityEngine("scalping", config, logger),
            "swing": PersonalityEngine("swing", config, logger),
        }

        self.global_stats = {
            "uptime_start": datetime.now(timezone.utc),
            "config_reloads": 0,
        }

        # Unix socket
        self.socket = None
        self.connections = []
        self.connections_lock = threading.Lock()

        # Health check socket
        self.health_socket = None

        # Control socket
        self.control_socket = None

        # Thread pool
        self.threads = []

    def broadcast_signal(self, signal: Dict):
        """Broadcast signal to all connected clients.

        Clients whose send fails are closed and dropped from the list.
        """
        message_bytes = (json.dumps(signal) + "\n").encode("utf-8")

        with self.connections_lock:
            disconnected = []
            for conn in self.connections:
                try:
                    conn.sendall(message_bytes)
                    self.logger.info(
                        f"[{signal['personality']}] Sent {signal['signal']} signal: "
                        f"{signal['timeframe']} @ {signal['price']} (conf: {signal['confidence']})"
                    )
                except OSError as e:
                    self.logger.warning(f"Failed to send to client: {e}")
                    disconnected.append(conn)

            # Remove disconnected clients
            for conn in disconnected:
                try:
                    conn.close()
                except OSError:
                    pass
                self.connections.remove(conn)

    @staticmethod
    def _open_listening_socket(path: str) -> socket.socket:
        """Bind a fresh AF_UNIX stream socket at *path* and start listening.

        Any stale socket file at *path* is removed first. The socket is
        closed again if bind/listen fails, so no descriptor leaks.
        """
        try:
            os.remove(path)
        except FileNotFoundError:
            pass

        sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        try:
            sock.bind(path)
            sock.listen(5)
            # Short accept timeout: the main loop polls all three sockets in turn.
            sock.settimeout(0.1)
        except Exception:
            sock.close()
            raise
        return sock

    def setup_signal_socket(self):
        """Setup Unix domain socket for signal streaming"""
        try:
            self.socket = self._open_listening_socket(self.config.socket_path)
            self.logger.info(f"Signal socket listening on {self.config.socket_path}")
        except Exception as e:
            self.logger.error(f"Failed to setup signal socket: {e}")
            raise

    def setup_health_socket(self):
        """Setup Unix domain socket for health checks"""
        try:
            self.health_socket = self._open_listening_socket(
                self.config.health_socket_path
            )
            self.logger.info(
                f"Health socket listening on {self.config.health_socket_path}"
            )
        except Exception as e:
            self.logger.error(f"Failed to setup health socket: {e}")
            raise

    def setup_control_socket(self):
        """Setup Unix domain socket for control commands"""
        try:
            self.control_socket = self._open_listening_socket(
                self.config.control_socket_path
            )
            self.logger.info(
                f"Control socket listening on {self.config.control_socket_path}"
            )
        except Exception as e:
            self.logger.error(f"Failed to setup control socket: {e}")
            raise

    def _probe_database(self, db_path: str, table: str) -> Dict:
        """Return accessibility, row count and per-timeframe freshness of one table.

        *table* is always an internal constant, never user input, so it is
        safe to interpolate into the SQL text.
        """
        status = {"accessible": False, "row_count": 0, "timeframes": {}}
        try:
            conn = sqlite3.connect(f"file:{db_path}?mode=ro", uri=True, timeout=5)
            try:
                cursor = conn.cursor()
                cursor.execute(f"SELECT COUNT(*) FROM {table}")
                status["row_count"] = cursor.fetchone()[0]
                status["accessible"] = True

                for tf in self.config.timeframes:
                    cursor.execute(
                        f"SELECT COUNT(*), MAX(timestamp) FROM {table} WHERE timeframe = ?",
                        (tf,),
                    )
                    count, max_ts = cursor.fetchone()
                    status["timeframes"][tf] = {
                        "count": count,
                        "latest_timestamp": max_ts,
                        "age_seconds": int(time.time() - max_ts) if max_ts else None,
                    }
            finally:
                conn.close()
        except Exception as e:
            # Keep whatever was gathered before the failure and report the error.
            status["error"] = str(e)
        return status

    def check_database_status(self) -> Dict:
        """Check database connectivity and data availability"""
        return {
            "candles_db": self._probe_database(self.config.candles_db, "candles"),
            "analysis_db": self._probe_database(self.config.analysis_db, "analysis"),
        }

    def _build_health_report(self) -> Dict:
        """Assemble the JSON-serialisable health report sent to health clients."""
        uptime = datetime.now(timezone.utc) - self.global_stats["uptime_start"]
        db_status = self.check_database_status()

        # Aggregate stats from both engines
        total_stats = {
            "total_signals": 0,
            "buy_signals": 0,
            "sell_signals": 0,
            "errors": 0,
        }
        personality_stats = {}
        recent_signals = []

        for name, engine in self.engines.items():
            with engine.lock:
                latest_five = list(engine.signal_history)[-5:]
                personality_stats[name] = {
                    "total_signals": engine.stats["total_signals"],
                    "buy_signals": engine.stats["buy_signals"],
                    "sell_signals": engine.stats["sell_signals"],
                    "last_signal": engine.stats["last_signal_time"],
                    "errors": engine.stats["errors"],
                    "recent_signals": latest_five,
                }
                for key in total_stats:
                    total_stats[key] += engine.stats[key]
                recent_signals.extend(latest_five)

        # Sort recent signals by timestamp
        recent_signals.sort(key=lambda x: x.get("timestamp", 0), reverse=True)

        return {
            "status": "running",
            "mode": "multi-personality",
            "personalities": list(self.engines),
            "timeframes": self.config.timeframes,
            "uptime_seconds": int(uptime.total_seconds()),
            "total_stats": total_stats,
            "personality_stats": personality_stats,
            "connected_clients": len(self.connections),
            "recent_signals": recent_signals[:10],
            "databases": db_status,
            "config": {
                "min_confidence": self.config.min_confidence,
                "cooldown_seconds": self.config.cooldown_seconds,
                "lookback": self.config.lookback,
                "weights": self.config.weights,
                "reloads": self.global_stats["config_reloads"],
            },
            "debug_mode": self.debug_mode,
        }

    def handle_health_checks(self):
        """Handle incoming health check requests.

        Accepts at most one pending connection, writes the health report as
        JSON and closes the connection.
        """
        try:
            conn, _ = self.health_socket.accept()
        except socket.timeout:
            return
        except Exception as e:
            self.logger.debug(f"Health check error: {e}")
            return

        try:
            # Accepted sockets are blocking by default; bound the send so a
            # client that never reads cannot stall the management loop.
            conn.settimeout(_MANAGEMENT_IO_TIMEOUT)
            health = self._build_health_report()
            conn.sendall(json.dumps(health, indent=2).encode("utf-8") + b"\n")
        except Exception as e:
            self.logger.debug(f"Health check error: {e}")
        finally:
            conn.close()

    def handle_control_commands(self):
        """Handle incoming control commands.

        Accepts at most one pending connection, reads a single JSON command
        (up to 4096 bytes), replies with a JSON response and closes the
        connection.
        """
        try:
            conn, _ = self.control_socket.accept()
        except socket.timeout:
            return
        except Exception as e:
            self.logger.debug(f"Control command error: {e}")
            return

        try:
            # Accepted sockets are blocking by default; without a timeout a
            # client that connects and sends nothing would hang recv() and
            # with it the whole management loop.
            conn.settimeout(_MANAGEMENT_IO_TIMEOUT)
            raw = conn.recv(4096).strip()
            if not raw:
                return

            try:
                cmd = json.loads(raw.decode("utf-8"))
            except (UnicodeDecodeError, json.JSONDecodeError):
                response = {"status": "error", "message": "Invalid JSON"}
            else:
                response = self.process_command(cmd)

            conn.sendall(json.dumps(response, indent=2).encode("utf-8") + b"\n")
        except Exception as e:
            self.logger.debug(f"Control command error: {e}")
        finally:
            conn.close()

    def process_command(self, cmd: Dict) -> Dict:
        """Process control commands.

        Supported actions: ``reload``, ``set_confidence``, ``set_cooldown``,
        ``toggle_debug``, ``clear_cooldowns`` and ``reset_stats``.

        Returns:
            A dict with ``status`` ("success" or "error") and ``message``.
        """
        # Valid JSON that is not an object (e.g. a list) has no "action".
        if not isinstance(cmd, dict):
            return {"status": "error", "message": "Command must be a JSON object"}

        action = cmd.get("action")

        if action == "reload":
            try:
                old_confidence = self.config.min_confidence
                self.config.reload()
                self.global_stats["config_reloads"] += 1
                self.logger.info(
                    f"Config reloaded: min_confidence={self.config.min_confidence}"
                )
                return {
                    "status": "success",
                    "message": "Configuration reloaded",
                    "changes": {
                        "min_confidence": {
                            "old": old_confidence,
                            "new": self.config.min_confidence,
                        },
                    },
                }
            except Exception as e:
                return {"status": "error", "message": str(e)}

        elif action == "set_confidence":
            try:
                confidence = float(cmd.get("value"))
            except (TypeError, ValueError):
                return {"status": "error", "message": "Invalid confidence value"}
            # NaN fails this range check and is therefore rejected too.
            if not 0 <= confidence <= 1:
                return {
                    "status": "error",
                    "message": "Confidence must be between 0 and 1",
                }
            self.config.min_confidence = confidence
            self.logger.info(f"Min confidence changed to: {confidence}")
            return {
                "status": "success",
                "message": f"Min confidence set to {confidence}",
            }

        elif action == "set_cooldown":
            try:
                cooldown = int(cmd.get("value"))
            except (TypeError, ValueError):
                return {"status": "error", "message": "Invalid cooldown value"}
            if cooldown < 0:
                return {"status": "error", "message": "Cooldown must be >= 0"}
            self.config.cooldown_seconds = cooldown
            self.logger.info(f"Cooldown changed to: {cooldown}s")
            return {
                "status": "success",
                "message": f"Cooldown set to {cooldown}s",
            }

        elif action == "toggle_debug":
            self.debug_mode = not self.debug_mode
            level = logging.DEBUG if self.debug_mode else logging.INFO
            self.logger.setLevel(level)
            self.logger.info(f"Debug mode: {'ON' if self.debug_mode else 'OFF'}")
            return {
                "status": "success",
                "message": f"Debug mode: {'ON' if self.debug_mode else 'OFF'}",
            }

        elif action == "clear_cooldowns":
            for engine in self.engines.values():
                engine.last_signal_time.clear()
            self.logger.info("All signal cooldowns cleared")
            return {"status": "success", "message": "All cooldowns cleared"}

        elif action == "reset_stats":
            for engine in self.engines.values():
                with engine.lock:
                    engine.stats = {
                        "total_signals": 0,
                        "buy_signals": 0,
                        "sell_signals": 0,
                        "last_signal_time": None,
                        "errors": 0,
                    }
                    engine.signal_history.clear()
            self.global_stats["uptime_start"] = datetime.now(timezone.utc)
            self.logger.info("Statistics reset")
            return {"status": "success", "message": "Statistics reset"}

        else:
            return {"status": "error", "message": f"Unknown action: {action}"}

    def accept_connections(self):
        """Accept new client connections"""
        try:
            conn, _ = self.socket.accept()
            with self.connections_lock:
                self.connections.append(conn)
                self.logger.info(
                    f"New client connected. Total clients: {len(self.connections)}"
                )
        except socket.timeout:
            pass
        except Exception as e:
            self.logger.debug(f"Accept error: {e}")

    def personality_worker(self, personality: str):
        """Worker thread for a specific personality.

        Polls every configured timeframe, broadcasts any resulting signal and
        sleeps ``config.poll_interval`` between rounds until ``running`` is
        cleared.
        """
        engine = self.engines[personality]
        self.logger.info(f"[{personality}] Worker thread started")

        while self.running:
            try:
                for timeframe in self.config.timeframes:
                    try:
                        emitted = engine.generate_signal(timeframe)
                        if emitted:
                            self.broadcast_signal(emitted)
                    except Exception as e:
                        self.logger.error(
                            f"[{personality}] Error processing {timeframe}: {e}"
                        )
                        with engine.lock:
                            engine.stats["errors"] += 1

                time.sleep(self.config.poll_interval)

            except Exception as e:
                self.logger.error(f"[{personality}] Worker error: {e}")
                with engine.lock:
                    engine.stats["errors"] += 1
                time.sleep(1)  # Brief pause on error

        self.logger.info(f"[{personality}] Worker thread stopped")

    def run(self):
        """Main processing loop.

        Opens the three sockets, starts one worker thread per personality and
        services connections until ``running`` is cleared. ``cleanup`` always
        runs, including when socket setup fails part-way (the setup error is
        still raised).
        """
        self.running = True

        try:
            self.setup_signal_socket()
            self.setup_health_socket()
            self.setup_control_socket()

            self.logger.info("Multi-personality signal generator started")
            self.logger.info("Running personalities: scalping, swing")
            self.logger.info(
                f"Monitoring timeframes: {', '.join(self.config.timeframes)}"
            )
            self.logger.info(f"Poll interval: {self.config.poll_interval}s")

            # Start personality worker threads
            for personality in self.engines:
                thread = threading.Thread(
                    target=self.personality_worker,
                    args=(personality,),
                    name=f"{personality}-worker",
                    daemon=True,
                )
                thread.start()
                self.threads.append(thread)

            # Main thread handles connections and management
            while self.running:
                self.accept_connections()
                self.handle_health_checks()
                self.handle_control_commands()
                time.sleep(0.1)

        except KeyboardInterrupt:
            self.logger.info("Received interrupt signal")
        finally:
            self.cleanup()

    def _close_listening_socket(self, sock: Optional[socket.socket]):
        """Close a listening socket and unlink the path it was bound to.

        The path is taken from the socket itself rather than from the config,
        because a config reload may have changed the configured path since
        the socket was bound.
        """
        if sock is None:
            return
        try:
            bound_path = sock.getsockname()
        except OSError:
            bound_path = None
        sock.close()
        if bound_path:
            try:
                os.remove(bound_path)
            except FileNotFoundError:
                pass
            except OSError as e:
                self.logger.warning(f"Could not remove socket file {bound_path}: {e}")

    def cleanup(self):
        """Cleanup resources: stop workers, close clients, remove socket files."""
        self.logger.info("Shutting down...")
        self.running = False

        # Wait for worker threads
        self.logger.info("Waiting for worker threads to finish...")
        for thread in self.threads:
            thread.join(timeout=2.0)

        # Close connections
        with self.connections_lock:
            for conn in self.connections:
                try:
                    conn.close()
                except OSError:
                    pass
            self.connections.clear()

        self._close_listening_socket(self.socket)
        self.socket = None
        self._close_listening_socket(self.health_socket)
        self.health_socket = None
        self._close_listening_socket(self.control_socket)
        self.control_socket = None

        self.logger.info("Shutdown complete")


def main():
    """Entry point: load config, install signal handlers and run the generator."""
    config = Config()
    logger = setup_logging(config)

    generator = SignalGenerator(config, logger)

    # Signal handlers
    def reload_config(sig, frame):
        """SIGUSR1: Reload configuration"""
        logger.info("Received SIGUSR1 - Reloading configuration...")
        try:
            old_confidence = config.min_confidence
            config.reload()
            generator.global_stats["config_reloads"] += 1
            logger.info(
                f"Configuration reloaded successfully "
                f"(min_confidence: {old_confidence} -> {config.min_confidence})"
            )
        except Exception as e:
            logger.error(f"Failed to reload configuration: {e}")

    def toggle_debug(sig, frame):
        """SIGUSR2: Toggle debug logging"""
        generator.debug_mode = not generator.debug_mode
        level = logging.DEBUG if generator.debug_mode else logging.INFO
        logger.setLevel(level)
        logger.info(f"Debug mode {'enabled' if generator.debug_mode else 'disabled'}")

    def shutdown(sig, frame):
        """SIGINT/SIGTERM: Graceful shutdown"""
        # Only clear the flag; run() notices it and performs the cleanup.
        generator.running = False

    signal.signal(signal.SIGUSR1, reload_config)
    signal.signal(signal.SIGUSR2, toggle_debug)
    signal.signal(signal.SIGINT, shutdown)
    signal.signal(signal.SIGTERM, shutdown)

    generator.run()


if __name__ == "__main__":
    main()