#!/usr/bin/env python3 """ BTC/USDT Continuous Analysis Engine This script continuously reads BTC/USDT candle data from `candles.db`, computes technical indicators (EMA, SMA, RSI, MACD, Bollinger Bands with squeeze, Volume MA), and stores the results in `analysis.db`. It uses SQLite with WAL mode for safe concurrent reads :contentReference[oaicite:0]{index=0}, ensures only closed candles are processed (filtering out the current open candle):contentReference[oaicite:1]{index=1}, and maintains a sliding one-month window of analysis data with incremental vacuuming. A TCP status server provides health information (new candles processed, last timestamps, active timeframes). Configuration (config.json): { "candels_database": "/databases/candles.db", "analysis_database": "./analysis.db", "status_host": "127.0.0.1", "status_port": 9997 } """ import sqlite3 import pandas as pd import json import time import threading import socketserver import logging import os from datetime import datetime, timedelta # ========== Configuration ========== with open('config.json', 'r') as f: config = json.load(f) CANDLES_DB = config.get('candels_database') or config.get('candles_database') ANALYSIS_DB = config.get('analysis_database') STATUS_HOST = config.get('status_host', '127.0.0.1') STATUS_PORT = config.get('status_port', 9997) # ========== Logging Setup ========== log_cfg = config.get("logging", {}) LOG_TO_STDOUT = log_cfg.get("stdout", True) LOG_TO_FILE = log_cfg.get("file", False) LOG_FILE_PATH = log_cfg.get("file_path", "./analyst.log") LOG_LEVEL = log_cfg.get("level", "INFO").upper() LOG_FORMAT = "%(asctime)s [%(name)s] %(levelname)s: %(message)s" DATE_FORMAT = "%Y-%m-%d %H:%M:%S" # Root logger root_logger = logging.getLogger() root_logger.setLevel(LOG_LEVEL) # Prevent duplicate handlers if module reloads root_logger.handlers.clear() formatter = logging.Formatter(LOG_FORMAT, datefmt=DATE_FORMAT) # --- STDOUT handler --- if LOG_TO_STDOUT: stdout_handler = logging.StreamHandler() stdout_handler.setFormatter(formatter) root_logger.addHandler(stdout_handler) # --- FILE handler --- if LOG_TO_FILE: os.makedirs(os.path.dirname(LOG_FILE_PATH), exist_ok=True) file_handler = logging.FileHandler(LOG_FILE_PATH) file_handler.setFormatter(formatter) root_logger.addHandler(file_handler) # Named loggers (inherit handlers) logger = logging.getLogger("AnalysisEngine") status_logger = logging.getLogger("StatusServer") logger.info("Logging initialized") logger.info( "stdout=%s file=%s level=%s path=%s", LOG_TO_STDOUT, LOG_TO_FILE, LOG_LEVEL, LOG_FILE_PATH if LOG_TO_FILE else "-" ) # ========== SQLite Connections ========== def get_candles_connection(path): # Connect with timeout and WAL mode to avoid locks:contentReference[oaicite:2]{index=2} conn = sqlite3.connect(path, timeout=10, check_same_thread=False) conn.execute("PRAGMA journal_mode=WAL;") return conn candles_conn = get_candles_connection(CANDLES_DB) analysis_conn = sqlite3.connect(ANALYSIS_DB, timeout=10, check_same_thread=False) analysis_conn.execute("PRAGMA journal_mode=WAL;") # Create analysis table if it doesn't exist analysis_conn.execute(""" CREATE TABLE IF NOT EXISTS analysis ( timeframe TEXT, timestamp INTEGER, ema_9 REAL, ema_21 REAL, sma_50 REAL, sma_200 REAL, rsi_14 REAL, macd REAL, macd_signal REAL, macd_hist REAL, bb_upper REAL, bb_middle REAL, bb_lower REAL, bb_squeeze INTEGER, volume_ma_20 REAL, PRIMARY KEY (timeframe, timestamp) ) """) analysis_conn.commit() # ========== Technical Indicator Functions ========== def compute_indicators(df): """Compute indicators using TA-Lib for accuracy""" import talib close = df['close'].values high = df['high'].values low = df['low'].values volume = df['volume'].values # EMA and SMA df['ema_9'] = talib.EMA(close, timeperiod=9) df['ema_21'] = talib.EMA(close, timeperiod=21) df['sma_50'] = talib.SMA(close, timeperiod=50) df['sma_200'] = talib.SMA(close, timeperiod=200) # RSI (14) - Proper calculation df['rsi_14'] = talib.RSI(close, timeperiod=14) # MACD (12,26,9) macd, macd_signal, macd_hist = talib.MACD(close, fastperiod=12, slowperiod=26, signalperiod=9) df['macd'] = macd df['macd_signal'] = macd_signal df['macd_hist'] = macd_hist # Bollinger Bands (20,2) bb_upper, bb_middle, bb_lower = talib.BBANDS(close, timeperiod=20, nbdevup=2, nbdevdn=2, matype=0) df['bb_upper'] = bb_upper df['bb_middle'] = bb_middle df['bb_lower'] = bb_lower # Bollinger Squeeze bb_width = bb_upper - bb_lower bb_width_series = pd.Series(bb_width) rolling_min_width = bb_width_series.rolling(window=20, min_periods=20).min() df['bb_squeeze'] = (bb_width_series <= rolling_min_width).fillna(0).astype(int) # Volume MA df['volume_ma_20'] = talib.SMA(volume, timeperiod=20) return df # ========== Health Check Server ========== status_lock = threading.Lock() status_data = {tf: {'new': 0, 'last': None} for tf in ["1m", "5m", "15m", "1h"]} active_timeframes = list(status_data.keys()) class StatusHandler(socketserver.BaseRequestHandler): def handle(self): with status_lock: report = { 'timeframes': { tf: {'new': status_data[tf]['new'], 'last': status_data[tf]['last']} for tf in status_data }, 'active_timeframes': active_timeframes } self.request.sendall(json.dumps(report).encode()) def start_status_server(host, port): server = socketserver.ThreadingTCPServer((host, port), StatusHandler) status_logger.info(f"Status server listening on {host}:{port}") threading.Thread(target=server.serve_forever, daemon=True).start() start_status_server(STATUS_HOST, STATUS_PORT) # ========== Main Loop ========== timeframes = {"1m":60, "5m":300, "15m":900, "1h":3600} logger.info("Starting analysis loop") while True: now = int(time.time()) one_month_ago = now - 30*24*3600 for tf, tf_seconds in timeframes.items(): try: cur = analysis_conn.cursor() cur.execute("SELECT MAX(timestamp) FROM analysis WHERE timeframe=?", (tf,)) row = cur.fetchone() last_processed = row[0] if row and row[0] is not None else None if last_processed: begin_time = last_processed else: begin_time = one_month_ago # Only closed candles: timestamp < current_window_start:contentReference[oaicite:5]{index=5} window_start = (now // tf_seconds) * tf_seconds query = """ SELECT timestamp, open, high, low, close, volume FROM candles WHERE timeframe = ? AND timestamp >= ? AND timestamp < ? ORDER BY timestamp ASC """ df = pd.read_sql_query(query, candles_conn, params=(tf, begin_time, window_start)) if df.empty: new_count = 0 else: df = compute_indicators(df) if last_processed: new_df = df[df['timestamp'] > last_processed].copy() else: new_df = df.copy() new_count = len(new_df) if new_count > 0: for _, row in new_df.iterrows(): analysis_conn.execute(""" INSERT OR IGNORE INTO analysis ( timeframe, timestamp, ema_9, ema_21, sma_50, sma_200, rsi_14, macd, macd_signal, macd_hist, bb_upper, bb_middle, bb_lower, bb_squeeze, volume_ma_20 ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) """, ( tf, int(row['timestamp']), float(row['ema_9']) if pd.notnull(row['ema_9']) else None, float(row['ema_21']) if pd.notnull(row['ema_21']) else None, float(row['sma_50']) if pd.notnull(row['sma_50']) else None, float(row['sma_200']) if pd.notnull(row['sma_200']) else None, float(row['rsi_14']) if pd.notnull(row['rsi_14']) else None, float(row['macd']) if pd.notnull(row['macd']) else None, float(row['macd_signal']) if pd.notnull(row['macd_signal']) else None, float(row['macd_hist']) if pd.notnull(row['macd_hist']) else None, float(row['bb_upper']) if pd.notnull(row['bb_upper']) else None, float(row['bb_middle']) if pd.notnull(row['bb_middle']) else None, float(row['bb_lower']) if pd.notnull(row['bb_lower']) else None, int(row['bb_squeeze']), float(row['volume_ma_20']) if pd.notnull(row['volume_ma_20']) else None )) analysis_conn.commit() with status_lock: status_data[tf]['new'] = new_count if new_count > 0: status_data[tf]['last'] = int(new_df['timestamp'].max()) # Log processing result logger.info(f"[{tf}] New candles: {new_count}, Last timestamp: {status_data[tf]['last']}") except Exception as e: logger.error(f"Error processing timeframe {tf}: {e}") # Vacuum/cleanup for sliding window effect try: analysis_conn.execute("PRAGMA incremental_vacuum;") analysis_conn.execute("DELETE FROM analysis WHERE timestamp < ?", (one_month_ago,)) analysis_conn.commit() except Exception as e: logger.error(f"Error during vacuum/cleanup: {e}") time.sleep(30)