#!/usr/bin/env python3
"""
BTC/USDT Continuous Analysis Engine

This script continuously reads BTC/USDT candle data from `candles.db`,
computes technical indicators (EMA, SMA, RSI, MACD, Bollinger Bands with
squeeze detection, and a volume moving average), and stores the results in
`analysis.db`. It uses SQLite in WAL mode for safe concurrent reads, processes
only closed candles (the current, still-open candle is filtered out), and
maintains a sliding one-month window of analysis data with incremental
vacuuming. A TCP status server reports health information (new candles
processed, last timestamps, active timeframes).

Configuration (config.json):
{
    "candles_database": "/databases/candles.db",
    "analysis_database": "./analysis.db",
    "status_host": "127.0.0.1",
    "status_port": 9997
}

The legacy misspelling "candels_database" is accepted as an alias for
"candles_database". An optional "logging" section is also read; its defaults
are:
{
    "logging": {"stdout": true, "file": false, "file_path": "./analyst.log", "level": "INFO"}
}
"""

import sqlite3
import json
import time
import threading
import socketserver
import logging
import os

import pandas as pd
import talib  # module-level import so a missing TA-Lib dependency fails at startup

# ========== Configuration ==========
with open('config.json', 'r') as f:
    config = json.load(f)

# Accept both the historical misspelling "candels_database" and the corrected key
CANDLES_DB = config.get('candels_database') or config.get('candles_database')
ANALYSIS_DB = config.get('analysis_database')
STATUS_HOST = config.get('status_host', '127.0.0.1')
STATUS_PORT = config.get('status_port', 9997)

# ========== Logging Setup ==========
log_cfg = config.get("logging", {})

LOG_TO_STDOUT = log_cfg.get("stdout", True)
LOG_TO_FILE = log_cfg.get("file", False)
LOG_FILE_PATH = log_cfg.get("file_path", "./analyst.log")
LOG_LEVEL = log_cfg.get("level", "INFO").upper()

LOG_FORMAT = "%(asctime)s [%(name)s] %(levelname)s: %(message)s"
DATE_FORMAT = "%Y-%m-%d %H:%M:%S"

# Root logger
root_logger = logging.getLogger()
root_logger.setLevel(LOG_LEVEL)

# Prevent duplicate handlers if the module reloads
root_logger.handlers.clear()

formatter = logging.Formatter(LOG_FORMAT, datefmt=DATE_FORMAT)

# --- STDOUT handler ---
if LOG_TO_STDOUT:
    stdout_handler = logging.StreamHandler()
    stdout_handler.setFormatter(formatter)
    root_logger.addHandler(stdout_handler)

# --- FILE handler ---
if LOG_TO_FILE:
    # Guard against a bare filename, whose dirname is the empty string
    log_dir = os.path.dirname(LOG_FILE_PATH)
    if log_dir:
        os.makedirs(log_dir, exist_ok=True)
    file_handler = logging.FileHandler(LOG_FILE_PATH)
    file_handler.setFormatter(formatter)
    root_logger.addHandler(file_handler)

# Named loggers (inherit handlers from the root logger)
logger = logging.getLogger("AnalysisEngine")
status_logger = logging.getLogger("StatusServer")

logger.info("Logging initialized")
logger.info(
    "stdout=%s file=%s level=%s path=%s",
    LOG_TO_STDOUT,
    LOG_TO_FILE,
    LOG_LEVEL,
    LOG_FILE_PATH if LOG_TO_FILE else "-",
)

# ========== SQLite Connections ==========
def get_candles_connection(path):
    # Connect with a busy timeout and WAL journaling; WAL lets this reader run
    # concurrently with the process that writes new candles.
    conn = sqlite3.connect(path, timeout=10, check_same_thread=False)
    conn.execute("PRAGMA journal_mode=WAL;")
    return conn

candles_conn = get_candles_connection(CANDLES_DB)
analysis_conn = sqlite3.connect(ANALYSIS_DB, timeout=10, check_same_thread=False)
analysis_conn.execute("PRAGMA journal_mode=WAL;")

# Create the analysis table if it doesn't exist
analysis_conn.execute("""
    CREATE TABLE IF NOT EXISTS analysis (
        timeframe TEXT,
        timestamp INTEGER,
        ema_9 REAL,
        ema_21 REAL,
        sma_50 REAL,
        sma_200 REAL,
        rsi_14 REAL,
        macd REAL,
        macd_signal REAL,
        macd_hist REAL,
        bb_upper REAL,
        bb_middle REAL,
        bb_lower REAL,
        bb_squeeze INTEGER,
        volume_ma_20 REAL,
        PRIMARY KEY (timeframe, timestamp)
    )
""")
analysis_conn.commit()
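
# Downstream consumers can read the latest indicators per timeframe with a
# query along these lines:
#
#   SELECT * FROM analysis
#   WHERE timeframe = '1h'
#   ORDER BY timestamp DESC
#   LIMIT 1;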

# ========== Technical Indicator Functions ==========
def compute_indicators(df):
    """Compute indicators using TA-Lib."""
    # TA-Lib expects float64 arrays; volume in particular may be stored as an
    # integer column, so cast explicitly.
    close = df['close'].values.astype(float)
    volume = df['volume'].values.astype(float)

    # EMA and SMA
    df['ema_9'] = talib.EMA(close, timeperiod=9)
    df['ema_21'] = talib.EMA(close, timeperiod=21)
    df['sma_50'] = talib.SMA(close, timeperiod=50)
    df['sma_200'] = talib.SMA(close, timeperiod=200)

    # RSI (14)
    df['rsi_14'] = talib.RSI(close, timeperiod=14)

    # MACD (12, 26, 9)
    macd, macd_signal, macd_hist = talib.MACD(close, fastperiod=12, slowperiod=26, signalperiod=9)
    df['macd'] = macd
    df['macd_signal'] = macd_signal
    df['macd_hist'] = macd_hist

    # Bollinger Bands (20, 2)
    bb_upper, bb_middle, bb_lower = talib.BBANDS(close, timeperiod=20, nbdevup=2, nbdevdn=2, matype=0)
    df['bb_upper'] = bb_upper
    df['bb_middle'] = bb_middle
    df['bb_lower'] = bb_lower

    # Bollinger squeeze: flag candles whose band width is at its 20-candle
    # minimum. Comparisons involving NaN (the warm-up period) evaluate to
    # False, so those rows are flagged 0.
    bb_width = pd.Series(bb_upper - bb_lower, index=df.index)
    rolling_min_width = bb_width.rolling(window=20, min_periods=20).min()
    df['bb_squeeze'] = (bb_width <= rolling_min_width).astype(int)

    # Volume MA
    df['volume_ma_20'] = talib.SMA(volume, timeperiod=20)

    return df
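
# Note: TA-Lib returns NaN for each indicator's warm-up period (e.g. the first
# 199 values of SMA-200); the insert in the main loop maps those NaNs to NULL.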

# ========== Health Check Server ==========
status_lock = threading.Lock()
status_data = {tf: {'new': 0, 'last': None} for tf in ["1m", "5m", "15m", "1h"]}
active_timeframes = list(status_data.keys())


class StatusHandler(socketserver.BaseRequestHandler):
    def handle(self):
        # Snapshot the shared status under the lock, then send one JSON report
        with status_lock:
            report = {
                'timeframes': {
                    tf: {'new': status_data[tf]['new'], 'last': status_data[tf]['last']}
                    for tf in status_data
                },
                'active_timeframes': active_timeframes,
            }
        self.request.sendall(json.dumps(report).encode())


def start_status_server(host, port):
    # allow_reuse_address avoids "address already in use" on quick restarts
    socketserver.ThreadingTCPServer.allow_reuse_address = True
    server = socketserver.ThreadingTCPServer((host, port), StatusHandler)
    status_logger.info(f"Status server listening on {host}:{port}")
    threading.Thread(target=server.serve_forever, daemon=True).start()


start_status_server(STATUS_HOST, STATUS_PORT)
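
# Minimal health-check client (a sketch; the server sends one JSON report as
# soon as a client connects, so no request payload is needed and the report is
# small enough for a single recv in practice):
#
#   import json, socket
#   with socket.create_connection(("127.0.0.1", 9997)) as s:
#       print(json.loads(s.recv(65536).decode()))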

# ========== Main Loop ==========
timeframes = {"1m": 60, "5m": 300, "15m": 900, "1h": 3600}

# Indicator warm-up: SMA-200 is the longest lookback used above, so re-read a
# margin of history before the last processed candle; INSERT OR IGNORE makes
# the overlap harmless.
WARMUP_CANDLES = 250

logger.info("Starting analysis loop")

while True:
    now = int(time.time())
    one_month_ago = now - 30 * 24 * 3600
    for tf, tf_seconds in timeframes.items():
        try:
            cur = analysis_conn.cursor()
            cur.execute("SELECT MAX(timestamp) FROM analysis WHERE timeframe=?", (tf,))
            row = cur.fetchone()
            last_processed = row[0] if row and row[0] is not None else None
            if last_processed:
                begin_time = last_processed - WARMUP_CANDLES * tf_seconds
            else:
                begin_time = one_month_ago
            # Only closed candles: timestamp < start of the current window
            window_start = (now // tf_seconds) * tf_seconds
            query = """
                SELECT timestamp, open, high, low, close, volume
                FROM candles
                WHERE timeframe = ?
                  AND timestamp >= ?
                  AND timestamp < ?
                ORDER BY timestamp ASC
            """
            df = pd.read_sql_query(query, candles_conn, params=(tf, begin_time, window_start))
            if df.empty:
                new_count = 0
            else:
                df = compute_indicators(df)
                if last_processed:
                    new_df = df[df['timestamp'] > last_processed].copy()
                else:
                    new_df = df.copy()
                new_count = len(new_df)
                if new_count > 0:
                    for _, row in new_df.iterrows():
                        analysis_conn.execute("""
                            INSERT OR IGNORE INTO analysis (
                                timeframe, timestamp, ema_9, ema_21, sma_50, sma_200,
                                rsi_14, macd, macd_signal, macd_hist,
                                bb_upper, bb_middle, bb_lower, bb_squeeze,
                                volume_ma_20
                            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                        """, (
                            tf,
                            int(row['timestamp']),
                            float(row['ema_9']) if pd.notnull(row['ema_9']) else None,
                            float(row['ema_21']) if pd.notnull(row['ema_21']) else None,
                            float(row['sma_50']) if pd.notnull(row['sma_50']) else None,
                            float(row['sma_200']) if pd.notnull(row['sma_200']) else None,
                            float(row['rsi_14']) if pd.notnull(row['rsi_14']) else None,
                            float(row['macd']) if pd.notnull(row['macd']) else None,
                            float(row['macd_signal']) if pd.notnull(row['macd_signal']) else None,
                            float(row['macd_hist']) if pd.notnull(row['macd_hist']) else None,
                            float(row['bb_upper']) if pd.notnull(row['bb_upper']) else None,
                            float(row['bb_middle']) if pd.notnull(row['bb_middle']) else None,
                            float(row['bb_lower']) if pd.notnull(row['bb_lower']) else None,
                            int(row['bb_squeeze']),
                            float(row['volume_ma_20']) if pd.notnull(row['volume_ma_20']) else None,
                        ))
                    analysis_conn.commit()
            with status_lock:
                status_data[tf]['new'] = new_count
                if new_count > 0:
                    status_data[tf]['last'] = int(new_df['timestamp'].max())
            # Log processing result
            logger.info(f"[{tf}] New candles: {new_count}, Last timestamp: {status_data[tf]['last']}")
        except Exception as e:
            logger.error(f"Error processing timeframe {tf}: {e}")
    # Sliding-window cleanup: delete old rows first, then reclaim the freed
    # pages. PRAGMA incremental_vacuum only reclaims pages if the database was
    # created with auto_vacuum=INCREMENTAL; otherwise it is a harmless no-op.
    try:
        analysis_conn.execute("DELETE FROM analysis WHERE timestamp < ?", (one_month_ago,))
        analysis_conn.execute("PRAGMA incremental_vacuum;")
        analysis_conn.commit()
    except Exception as e:
        logger.error(f"Error during vacuum/cleanup: {e}")
    time.sleep(30)