diff --git a/signals/signals.py b/signals/signals.py old mode 100755 new mode 100644 index 337bb28..d5e5d61 --- a/signals/signals.py +++ b/signals/signals.py @@ -1,7 +1,8 @@ #!/usr/bin/env python3 """ -BTCUSDT Signal Generator +BTCUSDT Signal Generator - Multi-Personality Edition Generates trading signals from candles.db and analysis.db +Runs both scalping and swing personalities simultaneously Streams signals via Unix Domain Socket """ @@ -46,11 +47,10 @@ class Config: self.log_file = data.get("log_file", "logs/signal_generator.log") self.log_to_stdout = data.get("log_to_stdout", True) self.poll_interval = data.get("poll_interval", 0.5) - self.personality = data.get("personality", "scalping") self.timeframes = data.get("timeframes", ["1m", "5m"]) self.lookback = data.get("lookback", 200) - # Signal thresholds + # Signal thresholds (can be personality-specific) self.min_confidence = data.get("min_confidence", 0.6) self.cooldown_seconds = data.get("cooldown_seconds", 60) @@ -92,13 +92,14 @@ def setup_logging(config: Config): return logging.getLogger(__name__) -# Signal Generator Class -class SignalGenerator: - def __init__(self, config: Config, logger: logging.Logger): +# Signal Generator Class (per-personality) +class PersonalityEngine: + """Single personality signal generation engine""" + + def __init__(self, personality: str, config: Config, logger: logging.Logger): + self.personality = personality self.config = config self.logger = logger - self.running = False - self.debug_mode = False self.last_signal_time = {} self.signal_history = deque(maxlen=100) self.stats = { @@ -106,20 +107,9 @@ class SignalGenerator: "buy_signals": 0, "sell_signals": 0, "last_signal_time": None, - "uptime_start": datetime.now(timezone.utc), "errors": 0, - "config_reloads": 0, } - - # Unix socket - self.socket = None - self.connections = [] - - # Health check socket - self.health_socket = None - - # Control socket - self.control_socket = None + self.lock = threading.Lock() def fetch_and_enrich(self, timeframe: str) -> Optional[pd.DataFrame]: """Fetch data from databases and enrich with additional indicators""" @@ -181,7 +171,7 @@ class SignalGenerator: df = df[df["timestamp"] < (current_time - window)] if len(df) < 50: - self.logger.debug(f"Not enough data for {timeframe}: {len(df)} rows") + self.logger.debug(f"[{self.personality}] Not enough data for {timeframe}: {len(df)} rows") return None # Drop rows with NULL in critical columns @@ -189,7 +179,7 @@ class SignalGenerator: if len(df) < 50: self.logger.debug( - f"Not enough valid data after NULL filtering for {timeframe}" + f"[{self.personality}] Not enough valid data after NULL filtering for {timeframe}" ) return None @@ -210,8 +200,9 @@ class SignalGenerator: return df except Exception as e: - self.logger.error(f"Error fetching data for {timeframe}: {e}") - self.stats["errors"] += 1 + self.logger.error(f"[{self.personality}] Error fetching data for {timeframe}: {e}") + with self.lock: + self.stats["errors"] += 1 return None def generate_signal_scalping( @@ -219,13 +210,13 @@ class SignalGenerator: ) -> Optional[Dict]: """Generate signal using scalping personality""" if len(df) < 21: - self.logger.debug(f"[{timeframe}] Insufficient data: {len(df)} rows") + self.logger.debug(f"[{self.personality}/{timeframe}] Insufficient data: {len(df)} rows") return None latest = df.iloc[-1] prev = df.iloc[-2] - # Check for NULL indicators - skip if essential indicators are missing + # Check for NULL indicators required_cols = [ "ema_9", "ema_21", @@ -236,7 +227,7 @@ class SignalGenerator: "macd_signal", ] if any(pd.isna(latest[col]) for col in required_cols): - self.logger.debug(f"[{timeframe}] Skipping: missing required indicators") + self.logger.debug(f"[{self.personality}/{timeframe}] Skipping: missing required indicators") return None score = 0 @@ -262,12 +253,6 @@ class SignalGenerator: reasons.append("EMA9 crossed below EMA21") signal_type = "SELL" - # Log EMA status for debugging - self.logger.debug( - f"[{timeframe}] EMA9={latest['ema_9']:.2f} vs EMA21={latest['ema_21']:.2f}, " - f"Prev: EMA9={prev['ema_9']:.2f} vs EMA21={prev['ema_21']:.2f}" - ) - # Stochastic if signal_type == "BUY": if latest["stoch_k"] > latest["stoch_d"] and latest["stoch_k"] < 30: @@ -299,13 +284,6 @@ class SignalGenerator: score += weights["macd"] reasons.append("MACD bearish") - # Debug output - if signal_type: - self.logger.debug( - f"[{timeframe}] Potential {signal_type} signal - Score: {score:.3f} " - f"(threshold: {self.config.min_confidence}), Reasons: {len(reasons)}" - ) - if signal_type and score >= self.config.min_confidence: return { "signal": signal_type, @@ -327,7 +305,7 @@ class SignalGenerator: latest = df.iloc[-1] prev = df.iloc[-2] - # Check for NULL indicators - skip if essential indicators are missing + # Check for NULL indicators required_cols = [ "sma_50", "sma_200", @@ -339,7 +317,7 @@ class SignalGenerator: "buy_ratio", ] if any(pd.isna(latest[col]) for col in required_cols): - self.logger.debug(f"Skipping {timeframe}: missing required indicators") + self.logger.debug(f"[{self.personality}/{timeframe}] Skipping: missing required indicators") return None score = 0 @@ -418,7 +396,7 @@ class SignalGenerator: def generate_signal(self, timeframe: str) -> Optional[Dict]: """Main signal generation dispatcher""" # Check cooldown - cooldown_key = f"{self.config.personality}_{timeframe}" + cooldown_key = f"{self.personality}_{timeframe}" if cooldown_key in self.last_signal_time: elapsed = time.time() - self.last_signal_time[cooldown_key] if elapsed < self.config.cooldown_seconds: @@ -428,12 +406,12 @@ class SignalGenerator: if df is None: return None - if self.config.personality == "scalping": + if self.personality == "scalping": signal = self.generate_signal_scalping(df, timeframe) - elif self.config.personality == "swing": + elif self.personality == "swing": signal = self.generate_signal_swing(df, timeframe) else: - self.logger.error(f"Unknown personality: {self.config.personality}") + self.logger.error(f"Unknown personality: {self.personality}") return None if signal: @@ -441,40 +419,76 @@ class SignalGenerator: signal["generated_at"] = datetime.now(timezone.utc).isoformat() # Update stats - self.stats["total_signals"] += 1 - if signal["signal"] == "BUY": - self.stats["buy_signals"] += 1 - else: - self.stats["sell_signals"] += 1 - self.stats["last_signal_time"] = signal["generated_at"] - - self.signal_history.append(signal) + with self.lock: + self.stats["total_signals"] += 1 + if signal["signal"] == "BUY": + self.stats["buy_signals"] += 1 + else: + self.stats["sell_signals"] += 1 + self.stats["last_signal_time"] = signal["generated_at"] + self.signal_history.append(signal) return signal + +# Main Signal Generator Coordinator +class SignalGenerator: + def __init__(self, config: Config, logger: logging.Logger): + self.config = config + self.logger = logger + self.running = False + self.debug_mode = False + + # Create personality engines + self.engines = { + "scalping": PersonalityEngine("scalping", config, logger), + "swing": PersonalityEngine("swing", config, logger), + } + + self.global_stats = { + "uptime_start": datetime.now(timezone.utc), + "config_reloads": 0, + } + + # Unix socket + self.socket = None + self.connections = [] + self.connections_lock = threading.Lock() + + # Health check socket + self.health_socket = None + + # Control socket + self.control_socket = None + + # Thread pool + self.threads = [] + def broadcast_signal(self, signal: Dict): """Broadcast signal to all connected clients""" message = json.dumps(signal) + "\n" message_bytes = message.encode("utf-8") - disconnected = [] - for conn in self.connections: - try: - conn.sendall(message_bytes) - self.logger.info( - f"Sent {signal['signal']} signal: {signal['timeframe']} @ {signal['price']} (conf: {signal['confidence']})" - ) - except Exception as e: - self.logger.warning(f"Failed to send to client: {e}") - disconnected.append(conn) + with self.connections_lock: + disconnected = [] + for conn in self.connections: + try: + conn.sendall(message_bytes) + self.logger.info( + f"[{signal['personality']}] Sent {signal['signal']} signal: " + f"{signal['timeframe']} @ {signal['price']} (conf: {signal['confidence']})" + ) + except Exception as e: + self.logger.warning(f"Failed to send to client: {e}") + disconnected.append(conn) - # Remove disconnected clients - for conn in disconnected: - try: - conn.close() - except: - pass - self.connections.remove(conn) + # Remove disconnected clients + for conn in disconnected: + try: + conn.close() + except: + pass + self.connections.remove(conn) def setup_signal_socket(self): """Setup Unix domain socket for signal streaming""" @@ -485,7 +499,7 @@ class SignalGenerator: self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) self.socket.bind(self.config.socket_path) self.socket.listen(5) - self.socket.settimeout(0.1) # Non-blocking accept + self.socket.settimeout(0.1) self.logger.info(f"Signal socket listening on {self.config.socket_path}") @@ -545,12 +559,10 @@ class SignalGenerator: ) cursor = conn_c.cursor() - # Check total rows cursor.execute("SELECT COUNT(*) FROM candles") status["candles_db"]["row_count"] = cursor.fetchone()[0] status["candles_db"]["accessible"] = True - # Check per-timeframe data for tf in self.config.timeframes: cursor.execute( "SELECT COUNT(*), MAX(timestamp) FROM candles WHERE timeframe = ?", @@ -578,7 +590,6 @@ class SignalGenerator: status["analysis_db"]["row_count"] = cursor.fetchone()[0] status["analysis_db"]["accessible"] = True - # Check per-timeframe data for tf in self.config.timeframes: cursor.execute( "SELECT COUNT(*), MAX(timestamp) FROM analysis WHERE timeframe = ?", @@ -602,28 +613,56 @@ class SignalGenerator: try: conn, _ = self.health_socket.accept() - uptime = datetime.now(timezone.utc) - self.stats["uptime_start"] + uptime = datetime.now(timezone.utc) - self.global_stats["uptime_start"] db_status = self.check_database_status() + # Aggregate stats from both engines + total_stats = { + "total_signals": 0, + "buy_signals": 0, + "sell_signals": 0, + "errors": 0, + } + + personality_stats = {} + recent_signals = [] + + for name, engine in self.engines.items(): + with engine.lock: + personality_stats[name] = { + "total_signals": engine.stats["total_signals"], + "buy_signals": engine.stats["buy_signals"], + "sell_signals": engine.stats["sell_signals"], + "last_signal": engine.stats["last_signal_time"], + "errors": engine.stats["errors"], + "recent_signals": list(engine.signal_history)[-5:], + } + total_stats["total_signals"] += engine.stats["total_signals"] + total_stats["buy_signals"] += engine.stats["buy_signals"] + total_stats["sell_signals"] += engine.stats["sell_signals"] + total_stats["errors"] += engine.stats["errors"] + recent_signals.extend(list(engine.signal_history)[-5:]) + + # Sort recent signals by timestamp + recent_signals.sort(key=lambda x: x.get("timestamp", 0), reverse=True) + health = { "status": "running", - "personality": self.config.personality, + "mode": "multi-personality", + "personalities": ["scalping", "swing"], "timeframes": self.config.timeframes, "uptime_seconds": int(uptime.total_seconds()), - "total_signals": self.stats["total_signals"], - "buy_signals": self.stats["buy_signals"], - "sell_signals": self.stats["sell_signals"], - "last_signal": self.stats["last_signal_time"], - "errors": self.stats["errors"], + "total_stats": total_stats, + "personality_stats": personality_stats, "connected_clients": len(self.connections), - "recent_signals": list(self.signal_history)[-5:], + "recent_signals": recent_signals[:10], "databases": db_status, "config": { "min_confidence": self.config.min_confidence, "cooldown_seconds": self.config.cooldown_seconds, "lookback": self.config.lookback, - "weights": self.config.weights[self.config.personality], - "reloads": self.stats["config_reloads"], + "weights": self.config.weights, + "reloads": self.global_stats["config_reloads"], }, "debug_mode": self.debug_mode, } @@ -641,7 +680,6 @@ class SignalGenerator: try: conn, _ = self.control_socket.accept() - # Receive command data = conn.recv(4096).decode("utf-8").strip() if not data: @@ -668,25 +706,17 @@ class SignalGenerator: if action == "reload": try: - old_personality = self.config.personality old_confidence = self.config.min_confidence self.config.reload() - self.stats["config_reloads"] += 1 + self.global_stats["config_reloads"] += 1 - self.logger.info( - f"Config reloaded: personality={self.config.personality}, " - f"min_confidence={self.config.min_confidence}" - ) + self.logger.info(f"Config reloaded: min_confidence={self.config.min_confidence}") return { "status": "success", "message": "Configuration reloaded", "changes": { - "personality": { - "old": old_personality, - "new": self.config.personality, - }, "min_confidence": { "old": old_confidence, "new": self.config.min_confidence, @@ -696,21 +726,6 @@ class SignalGenerator: except Exception as e: return {"status": "error", "message": str(e)} - elif action == "set_personality": - personality = cmd.get("value") - if personality in ["scalping", "swing"]: - self.config.personality = personality - self.logger.info(f"Personality changed to: {personality}") - return { - "status": "success", - "message": f"Personality set to {personality}", - } - else: - return { - "status": "error", - "message": "Invalid personality (use 'scalping' or 'swing')", - } - elif action == "set_confidence": try: confidence = float(cmd.get("value")) @@ -755,21 +770,24 @@ class SignalGenerator: } elif action == "clear_cooldowns": - self.last_signal_time.clear() - self.logger.info("Signal cooldowns cleared") + for engine in self.engines.values(): + engine.last_signal_time.clear() + self.logger.info("All signal cooldowns cleared") return {"status": "success", "message": "All cooldowns cleared"} elif action == "reset_stats": - self.stats = { - "total_signals": 0, - "buy_signals": 0, - "sell_signals": 0, - "last_signal_time": None, - "uptime_start": datetime.now(timezone.utc), - "errors": 0, - "config_reloads": self.stats["config_reloads"], - } - self.signal_history.clear() + for engine in self.engines.values(): + with engine.lock: + engine.stats = { + "total_signals": 0, + "buy_signals": 0, + "sell_signals": 0, + "last_signal_time": None, + "errors": 0, + } + engine.signal_history.clear() + + self.global_stats["uptime_start"] = datetime.now(timezone.utc) self.logger.info("Statistics reset") return {"status": "success", "message": "Statistics reset"} @@ -780,15 +798,43 @@ class SignalGenerator: """Accept new client connections""" try: conn, _ = self.socket.accept() - self.connections.append(conn) - self.logger.info( - f"New client connected. Total clients: {len(self.connections)}" - ) + with self.connections_lock: + self.connections.append(conn) + self.logger.info( + f"New client connected. Total clients: {len(self.connections)}" + ) except socket.timeout: pass except Exception as e: self.logger.debug(f"Accept error: {e}") + def personality_worker(self, personality: str): + """Worker thread for a specific personality""" + engine = self.engines[personality] + self.logger.info(f"[{personality}] Worker thread started") + + while self.running: + try: + for timeframe in self.config.timeframes: + try: + signal = engine.generate_signal(timeframe) + if signal: + self.broadcast_signal(signal) + except Exception as e: + self.logger.error(f"[{personality}] Error processing {timeframe}: {e}") + with engine.lock: + engine.stats["errors"] += 1 + + time.sleep(self.config.poll_interval) + + except Exception as e: + self.logger.error(f"[{personality}] Worker error: {e}") + with engine.lock: + engine.stats["errors"] += 1 + time.sleep(1) # Brief pause on error + + self.logger.info(f"[{personality}] Worker thread stopped") + def run(self): """Main processing loop""" self.running = True @@ -796,34 +842,29 @@ class SignalGenerator: self.setup_health_socket() self.setup_control_socket() - self.logger.info( - f"Signal generator started - Personality: {self.config.personality}" - ) + self.logger.info("Multi-personality signal generator started") + self.logger.info(f"Running personalities: scalping, swing") self.logger.info(f"Monitoring timeframes: {', '.join(self.config.timeframes)}") self.logger.info(f"Poll interval: {self.config.poll_interval}s") + # Start personality worker threads + for personality in ["scalping", "swing"]: + thread = threading.Thread( + target=self.personality_worker, + args=(personality,), + name=f"{personality}-worker", + daemon=True + ) + thread.start() + self.threads.append(thread) + try: + # Main thread handles connections and management while self.running: - # Accept new connections self.accept_connections() - - # Handle health checks self.handle_health_checks() - - # Handle control commands self.handle_control_commands() - - # Generate signals for each timeframe - for timeframe in self.config.timeframes: - try: - signal = self.generate_signal(timeframe) - if signal: - self.broadcast_signal(signal) - except Exception as e: - self.logger.error(f"Error processing {timeframe}: {e}") - self.stats["errors"] += 1 - - time.sleep(self.config.poll_interval) + time.sleep(0.1) except KeyboardInterrupt: self.logger.info("Received interrupt signal") @@ -833,12 +874,20 @@ class SignalGenerator: def cleanup(self): """Cleanup resources""" self.logger.info("Shutting down...") + self.running = False - for conn in self.connections: - try: - conn.close() - except: - pass + # Wait for worker threads + self.logger.info("Waiting for worker threads to finish...") + for thread in self.threads: + thread.join(timeout=2.0) + + # Close connections + with self.connections_lock: + for conn in self.connections: + try: + conn.close() + except: + pass if self.socket: self.socket.close() @@ -864,17 +913,17 @@ def main(): generator = SignalGenerator(config, logger) - # Signal handlers for hot-reload and control + # Signal handlers def reload_config(sig, frame): """SIGUSR1: Reload configuration""" logger.info("Received SIGUSR1 - Reloading configuration...") try: - old_personality = config.personality + old_confidence = config.min_confidence config.reload() - generator.stats["config_reloads"] += 1 + generator.global_stats["config_reloads"] += 1 logger.info( f"Configuration reloaded successfully " - f"(personality: {old_personality} -> {config.personality})" + f"(min_confidence: {old_confidence} -> {config.min_confidence})" ) except Exception as e: logger.error(f"Failed to reload configuration: {e}") @@ -899,4 +948,4 @@ def main(): if __name__ == "__main__": - main() + main() \ No newline at end of file