Files
BytbitBTC/trader/paper_trader.py

702 lines
24 KiB
Python
Executable File

#!/usr/bin/env python3
"""
Paper Trading Bot
Executes paper trades based on signals from the signal generator
Real-time TUI interface with trade tracking
"""
import json
import signal
import socket
import sqlite3
import sys
import threading
import time
from collections import deque
from contextlib import closing
from dataclasses import asdict, dataclass
from datetime import datetime, timezone
from enum import Enum
from pathlib import Path
from typing import Dict, List, Optional

from rich import box
from rich.console import Console
from rich.layout import Layout
from rich.live import Live
from rich.panel import Panel
from rich.table import Table
from rich.text import Text
class TradeStatus(Enum):
    """Lifecycle states stored in the ``status`` column of a trade."""

    # Position is live and still monitored for exits.
    OPEN = "OPEN"
    # Position has been exited; this is the value written when a trade closes.
    CLOSED = "CLOSED"
    # Declared for stop-outs; nothing in this file assigns it.
    STOPPED = "STOPPED"
class TradeType(Enum):
    """Direction of a position as stored in the ``trade_type`` column."""

    # Profits when the price rises above the entry.
    LONG = "LONG"
    # Profits when the price falls below the entry.
    SHORT = "SHORT"
@dataclass
class Trade:
    """One paper trade; the fields mirror the columns of the ``trades`` table.

    The exit-related fields default to ``None`` and are filled in when the
    trade is closed.
    """

    # --- set when the trade is opened -----------------------------------
    id: Optional[int]  # database row id; None until the row is inserted
    trade_type: str  # a TradeType value ("LONG" / "SHORT")
    entry_price: float
    entry_time: str  # timestamp string (ISO-8601 when generated locally)
    position_size: float  # quantity of the asset, not its cash value
    stop_loss: float  # price at which the position is stopped out
    take_profit: Optional[float]  # target price, or None for no target
    timeframe: str  # candle timeframe of the originating signal
    personality: str  # strategy personality of the originating signal
    signal_confidence: float
    status: str  # a TradeStatus value

    # --- set when the trade is closed -----------------------------------
    exit_price: Optional[float] = None
    exit_time: Optional[str] = None
    pnl: Optional[float] = None  # realised profit/loss in account currency
    pnl_percent: Optional[float] = None  # pnl relative to the entry notional
    exit_reason: Optional[str] = None
class Config:
    """Runtime settings for the paper trader, loaded from a JSON file.

    Every key is optional: a key that is absent from the file falls back to
    the default given in ``reload``.
    """

    def __init__(self, config_path: str = "config.json"):
        """Remember *config_path* and load the settings from it immediately.

        Raises:
            OSError: if the file cannot be opened.
            ValueError: if the file is not valid JSON, or its top-level
                value is not a JSON object.
        """
        self.config_path = config_path
        self.reload()

    def reload(self):
        """(Re)read the JSON file and refresh every setting attribute.

        The file is parsed and validated before any attribute is assigned,
        so a failed reload leaves the previously loaded settings in place.

        Raises:
            OSError: if the file cannot be opened.
            ValueError: if the file is not valid JSON, or its top-level
                value is not a JSON object.
        """
        # JSON is UTF-8 by specification; do not depend on the locale default.
        with open(self.config_path, "r", encoding="utf-8") as f:
            data = json.load(f)
        if not isinstance(data, dict):
            raise ValueError(
                f"{self.config_path}: expected a JSON object at the top level, "
                f"got {type(data).__name__}"
            )

        # Database paths
        self.trades_db = data.get("trades_db", "trades.db")
        self.candles_db = data.get("candles_db", "../onramp/market_data.db")

        # Signal socket
        self.signal_socket = data.get("signal_socket", "/tmp/signals.sock")

        # Trading parameters
        self.initial_balance = data.get("initial_balance", 10000.0)
        # Percentage of the balance committed to each new trade.
        self.position_size_percent = data.get("position_size_percent", 2.0)
        self.max_positions = data.get("max_positions", 3)
        self.stop_loss_percent = data.get("stop_loss_percent", 2.0)
        self.take_profit_percent = data.get("take_profit_percent", 4.0)

        # Strategy filters
        self.min_confidence = data.get("min_confidence", 0.5)
        self.enabled_personalities = data.get(
            "enabled_personalities", ["scalping", "swing"]
        )
        self.enabled_timeframes = data.get("enabled_timeframes", ["1m", "5m"])

        # Risk management
        self.max_daily_loss_percent = data.get("max_daily_loss_percent", 5.0)
        self.trailing_stop = data.get("trailing_stop", False)
        self.trailing_stop_percent = data.get("trailing_stop_percent", 1.5)
class TradeDatabase:
    """SQLite persistence for paper trades and balance snapshots.

    Each operation opens its own connection and closes it before returning,
    including when a statement raises, so no connection object is shared
    between calls.
    """

    def __init__(self, db_path: str):
        """Store *db_path* and create the tables if they do not exist yet."""
        self.db_path = db_path
        self.init_database()

    def _connect(self):
        """Return a context manager yielding a connection that is closed on exit."""
        return closing(sqlite3.connect(self.db_path))

    def init_database(self):
        """Create the ``trades`` and ``balance_history`` tables when missing."""
        # The inner ``conn`` context commits on success and rolls back on error;
        # the outer ``closing`` context then closes the connection either way.
        with self._connect() as conn, conn:
            conn.execute("""
                CREATE TABLE IF NOT EXISTS trades (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    trade_type TEXT NOT NULL,
                    entry_price REAL NOT NULL,
                    entry_time TEXT NOT NULL,
                    position_size REAL NOT NULL,
                    stop_loss REAL NOT NULL,
                    take_profit REAL,
                    timeframe TEXT NOT NULL,
                    personality TEXT NOT NULL,
                    signal_confidence REAL NOT NULL,
                    status TEXT NOT NULL,
                    exit_price REAL,
                    exit_time TEXT,
                    pnl REAL,
                    pnl_percent REAL,
                    exit_reason TEXT
                )
            """)
            conn.execute("""
                CREATE TABLE IF NOT EXISTS balance_history (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    timestamp TEXT NOT NULL,
                    balance REAL NOT NULL,
                    equity REAL NOT NULL,
                    open_positions INTEGER NOT NULL
                )
            """)

    def save_trade(self, trade: "Trade") -> int:
        """Insert a new trade or update the exit fields of an existing one.

        A trade whose ``id`` is ``None`` is inserted (entry fields only).
        Otherwise only the exit fields and ``status`` of the row with that id
        are updated.

        Returns:
            The row id of the inserted or updated trade.
        """
        with self._connect() as conn, conn:
            if trade.id is None:
                cursor = conn.execute(
                    """
                    INSERT INTO trades (
                        trade_type, entry_price, entry_time, position_size,
                        stop_loss, take_profit, timeframe, personality,
                        signal_confidence, status
                    ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                    """,
                    (
                        trade.trade_type, trade.entry_price, trade.entry_time,
                        trade.position_size, trade.stop_loss, trade.take_profit,
                        trade.timeframe, trade.personality,
                        trade.signal_confidence, trade.status,
                    ),
                )
                return cursor.lastrowid

            conn.execute(
                """
                UPDATE trades SET
                    exit_price = ?, exit_time = ?, pnl = ?,
                    pnl_percent = ?, status = ?, exit_reason = ?
                WHERE id = ?
                """,
                (
                    trade.exit_price, trade.exit_time, trade.pnl,
                    trade.pnl_percent, trade.status, trade.exit_reason,
                    trade.id,
                ),
            )
            return trade.id

    def _fetch_trades(self, query: str, params: tuple = ()) -> List["Trade"]:
        """Run a ``SELECT * FROM trades ...`` query and map each row to a Trade."""
        with self._connect() as conn:
            # Row objects expose column names, which match the Trade fields.
            conn.row_factory = sqlite3.Row
            rows = conn.execute(query, params).fetchall()
        return [Trade(**dict(row)) for row in rows]

    def get_open_trades(self) -> List["Trade"]:
        """Return all trades with status ``OPEN``, newest entry first."""
        return self._fetch_trades(
            "SELECT * FROM trades WHERE status = 'OPEN' ORDER BY entry_time DESC"
        )

    def get_recent_trades(self, limit: int = 10) -> List["Trade"]:
        """Return up to *limit* non-open trades, most recent exit first."""
        return self._fetch_trades(
            """
            SELECT * FROM trades
            WHERE status != 'OPEN'
            ORDER BY exit_time DESC
            LIMIT ?
            """,
            (limit,),
        )

    def save_balance_snapshot(self, balance: float, equity: float, open_positions: int):
        """Append a UTC-timestamped balance/equity row to ``balance_history``."""
        with self._connect() as conn, conn:
            conn.execute(
                """
                INSERT INTO balance_history (timestamp, balance, equity, open_positions)
                VALUES (?, ?, ?, ?)
                """,
                (datetime.now(timezone.utc).isoformat(), balance, equity, open_positions),
            )

    def get_statistics(self) -> Dict:
        """Aggregate win/loss statistics over every non-open trade.

        All figures come from a single query so they describe one consistent
        set of rows even if another thread closes a trade concurrently.

        Returns:
            A dict with keys ``total_trades``, ``wins``, ``losses``,
            ``win_rate`` (percent), ``total_pnl``, ``avg_win`` and
            ``avg_loss``; every value is zero when there are no closed trades.
        """
        with self._connect() as conn:
            row = conn.execute(
                """
                SELECT
                    COUNT(*),
                    COUNT(CASE WHEN pnl > 0 THEN 1 END),
                    COUNT(CASE WHEN pnl < 0 THEN 1 END),
                    SUM(pnl),
                    AVG(CASE WHEN pnl > 0 THEN pnl END),
                    AVG(CASE WHEN pnl < 0 THEN pnl END)
                FROM trades
                WHERE status != 'OPEN'
                """
            ).fetchone()

        total_trades, wins, losses, total_pnl, avg_win, avg_loss = row
        # SUM/AVG yield NULL when no row contributes; report those as 0.0.
        win_rate = (wins / total_trades * 100) if total_trades > 0 else 0.0
        return {
            "total_trades": total_trades,
            "wins": wins,
            "losses": losses,
            "win_rate": win_rate,
            "total_pnl": total_pnl or 0.0,
            "avg_win": avg_win or 0.0,
            "avg_loss": avg_loss or 0.0,
        }
class PriceMonitor:
    """Monitor current price from candles database"""

    def __init__(self, candles_db: str):
        """Remember the candles database path; no price is known yet."""
        self.candles_db = candles_db
        self.current_price = None  # last close price successfully read
        self.last_update = None  # timestamp column of that candle
        self.last_error = None  # exception from the most recent failed read

    def get_current_price(self) -> Optional[float]:
        """Get most recent close price from 1m timeframe

        The database is opened read-only and the connection is always
        closed, even when the query fails.

        Returns:
            The latest ``close`` as a float, or ``None`` when there is no 1m
            candle or the read failed. On failure the previously cached
            ``current_price`` is kept and the exception is stored in
            ``last_error`` instead of being raised.
        """
        try:
            with closing(
                sqlite3.connect(f"file:{self.candles_db}?mode=ro", uri=True, timeout=5)
            ) as conn:
                row = conn.execute("""
                    SELECT close, timestamp
                    FROM candles
                    WHERE timeframe = '1m'
                    ORDER BY timestamp DESC
                    LIMIT 1
                """).fetchone()
            if row is None:
                return None
            price = float(row[0])
        except (sqlite3.Error, TypeError, ValueError) as exc:
            # Best-effort read: the caller polls again, so remember the
            # failure for inspection rather than raising.
            self.last_error = exc
            return None

        self.current_price = price
        self.last_update = row[1]
        self.last_error = None
        return price
class PaperTrader:
    """Paper-trading engine with a live terminal UI.

    Two background threads do the work: one reads newline-delimited JSON
    signals from a Unix socket and opens trades, the other polls the latest
    price and closes trades that hit their stop loss or take profit. The main
    thread renders the TUI. ``self.lock`` serialises opening and closing of
    trades.
    """

    def __init__(self, config: "Config"):
        """Create the database and price monitor and restore persisted state."""
        self.config = config
        self.db = TradeDatabase(config.trades_db)
        self.price_monitor = PriceMonitor(config.candles_db)
        # Open trades are reloaded from the database below, so the realised
        # balance must be rebuilt from the persisted closed trades as well;
        # otherwise a restart would silently discard all previous PnL.
        self.balance = config.initial_balance + self.db.get_statistics()["total_pnl"]
        self.recent_signals = deque(maxlen=20)
        self.running = False
        self.lock = threading.Lock()
        # Stats
        self.stats = {
            "signals_received": 0,
            "signals_filtered": 0,
            "trades_opened": 0,
            "trades_closed": 0,
        }
        # Load open trades from DB
        self.open_trades: List["Trade"] = self.db.get_open_trades()

    def connect_to_signals(self) -> socket.socket:
        """Connect to signal generator socket

        Returns:
            A connected Unix stream socket; the caller owns and must close it.

        Raises:
            OSError: if the connection fails (the socket is closed first).
        """
        sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        try:
            sock.connect(self.config.signal_socket)
        except Exception:
            # Do not leak the descriptor when the generator is not reachable.
            sock.close()
            raise
        return sock

    def calculate_position_size(self, price: float) -> float:
        """Calculate position size based on balance and config

        Returns the asset quantity worth ``position_size_percent`` of the
        current balance at *price*. *price* must be non-zero.
        """
        position_value = self.balance * (self.config.position_size_percent / 100)
        return position_value / price

    def should_take_signal(self, signal: Dict) -> bool:
        """Filter signals based on config

        A signal is accepted only when it names a direction, carries a
        positive numeric price, meets the minimum confidence, comes from an
        enabled personality and timeframe, and a position slot is free.
        """
        # Structural checks first, so open_trade never fails on a missing
        # key or divides by a zero price.
        if "signal" not in signal:
            return False
        price = signal.get("price")
        if not isinstance(price, (int, float)) or not price > 0:
            return False
        # Check confidence
        confidence = signal.get("confidence", 0)
        if not isinstance(confidence, (int, float)):
            return False
        if confidence < self.config.min_confidence:
            return False
        # Check personality
        if signal.get("personality") not in self.config.enabled_personalities:
            return False
        # Check timeframe
        if signal.get("timeframe") not in self.config.enabled_timeframes:
            return False
        # Check max positions
        if len(self.open_trades) >= self.config.max_positions:
            return False
        return True

    def open_trade(self, signal: Dict):
        """Open a new paper trade

        Stop loss and take profit are placed ``stop_loss_percent`` and
        ``take_profit_percent`` away from the entry price, on the losing and
        winning side respectively. The trade is persisted before it is added
        to ``open_trades``.
        """
        with self.lock:
            # NOTE(review): every direction other than "BUY" opens a SHORT;
            # confirm the generator never emits neutral values such as "HOLD".
            is_long = signal["signal"] == "BUY"
            entry_price = signal["price"]
            stop_fraction = self.config.stop_loss_percent / 100
            target_fraction = self.config.take_profit_percent / 100
            # Calculate stop loss and take profit
            if is_long:
                stop_loss = entry_price * (1 - stop_fraction)
                take_profit = entry_price * (1 + target_fraction)
            else:
                stop_loss = entry_price * (1 + stop_fraction)
                take_profit = entry_price * (1 - target_fraction)

            trade = Trade(
                id=None,
                trade_type=(TradeType.LONG if is_long else TradeType.SHORT).value,
                entry_price=entry_price,
                entry_time=signal.get("generated_at", datetime.now(timezone.utc).isoformat()),
                position_size=self.calculate_position_size(entry_price),
                stop_loss=stop_loss,
                take_profit=take_profit,
                timeframe=signal["timeframe"],
                personality=signal["personality"],
                # Same default as the confidence filter uses.
                signal_confidence=signal.get("confidence", 0),
                status=TradeStatus.OPEN.value,
            )
            trade.id = self.db.save_trade(trade)
            self.open_trades.append(trade)
            self.stats["trades_opened"] += 1

    @staticmethod
    def _position_pnl(trade: "Trade", price: float) -> float:
        """Return the profit/loss of *trade* if it were valued at *price*."""
        if trade.trade_type == TradeType.LONG.value:
            return (price - trade.entry_price) * trade.position_size
        return (trade.entry_price - price) * trade.position_size

    @staticmethod
    def _exit_reason(trade: "Trade", price: float) -> Optional[str]:
        """Return why *trade* should exit at *price*, or None to keep it open.

        The stop loss is checked before the take profit. A falsy take profit
        (None or 0) means the trade has no target.
        """
        if trade.trade_type == TradeType.LONG.value:
            hit_stop = price <= trade.stop_loss
            hit_target = bool(trade.take_profit) and price >= trade.take_profit
        else:
            hit_stop = price >= trade.stop_loss
            hit_target = bool(trade.take_profit) and price <= trade.take_profit
        if hit_stop:
            return "Stop Loss"
        if hit_target:
            return "Take Profit"
        return None

    def check_exits(self, current_price: float):
        """Check if any open trades should be closed"""
        with self.lock:
            # Iterate over a copy: close_trade removes from open_trades.
            for trade in self.open_trades[:]:
                reason = self._exit_reason(trade, current_price)
                if reason is not None:
                    self.close_trade(trade, current_price, reason)

    def close_trade(self, trade: "Trade", exit_price: float, reason: str):
        """Close a trade and calculate PnL

        Does not take ``self.lock`` itself; ``check_exits`` calls it with the
        lock already held.
        """
        trade.exit_price = exit_price
        trade.exit_time = datetime.now(timezone.utc).isoformat()
        trade.exit_reason = reason
        # Calculate PnL
        pnl = self._position_pnl(trade, exit_price)
        notional = trade.entry_price * trade.position_size
        trade.pnl = pnl
        trade.pnl_percent = (pnl / notional) * 100 if notional else 0.0
        # NOTE(review): stop-loss exits are also stored as CLOSED;
        # TradeStatus.STOPPED exists but is not used here.
        trade.status = TradeStatus.CLOSED.value
        # Persist first: if the write fails, the balance and the open-trade
        # list still describe the position as open.
        self.db.save_trade(trade)
        self.balance += pnl
        self.open_trades.remove(trade)
        self.stats["trades_closed"] += 1

    def get_equity(self, current_price: Optional[float]) -> float:
        """Calculate current equity (balance + unrealized PnL)

        When *current_price* is ``None`` (no price known yet) open positions
        cannot be valued, so the realised balance alone is returned.
        """
        if current_price is None:
            return self.balance
        # Snapshot the list: the price thread may remove trades meanwhile.
        unrealized_pnl = sum(
            (self._position_pnl(trade, current_price) for trade in list(self.open_trades)),
            0.0,
        )
        return self.balance + unrealized_pnl

    def process_signal(self, signal: Dict):
        """Process incoming signal

        Counts and records the signal, then opens a trade if it passes the
        filters or counts it as filtered otherwise.
        """
        self.stats["signals_received"] += 1
        self.recent_signals.append(signal)
        if self.should_take_signal(signal):
            self.open_trade(signal)
        else:
            self.stats["signals_filtered"] += 1

    def _handle_signal_line(self, raw_line: bytes):
        """Decode one line from the socket and process it if it is a JSON object."""
        try:
            line = raw_line.decode("utf-8")
            if not line.strip():
                return
            payload = json.loads(line)
        except ValueError:
            # Covers both invalid UTF-8 and invalid JSON; drop only this line.
            return
        if isinstance(payload, dict):
            self.process_signal(payload)

    def _consume_signals(self, sock: socket.socket):
        """Read newline-delimited signals from *sock* until it closes or we stop."""
        # Buffer raw bytes and decode per complete line, so a multi-byte
        # character split across two recv() calls cannot break decoding.
        pending = b""
        while self.running:
            try:
                chunk = sock.recv(4096)
            except socket.timeout:
                # Timeout only exists so self.running is re-checked regularly.
                continue
            if not chunk:
                return  # peer closed the connection
            pending += chunk
            *complete_lines, pending = pending.split(b"\n")
            for raw_line in complete_lines:
                self._handle_signal_line(raw_line)

    def signal_listener_thread(self):
        """Background thread to listen for signals

        Reconnects for as long as ``self.running`` is set. The socket is
        always closed when a connection ends, whether normally or by error.
        """
        while self.running:
            try:
                with self.connect_to_signals() as sock:
                    sock.settimeout(1.0)
                    self._consume_signals(sock)
            except Exception:
                # Thread boundary: any failure (generator down, DB error while
                # opening a trade) leads to a delayed reconnect, not a dead thread.
                if self.running:
                    time.sleep(5)  # Wait before reconnecting

    def price_monitor_thread(self):
        """Background thread to monitor prices and check exits"""
        while self.running:
            current_price = self.price_monitor.get_current_price()
            if current_price and self.open_trades:
                self.check_exits(current_price)
            time.sleep(1)

    def build_tui(self) -> "Layout":
        """Build the TUI layout

        Header and footer span the full width; the body is split into a left
        column (stats, open trades) and a right column (closed trades, signals).
        """
        layout = Layout()
        layout.split_column(
            Layout(name="header", size=3),
            Layout(name="body"),
            Layout(name="footer", size=3),
        )
        layout["body"].split_row(
            Layout(name="left"),
            Layout(name="right"),
        )
        layout["left"].split_column(
            Layout(name="stats", size=10),
            Layout(name="open_trades"),
        )
        layout["right"].split_column(
            Layout(name="recent_closed", size=15),
            Layout(name="signals"),
        )
        return layout

    def render_header(self) -> "Panel":
        """Render header panel"""
        # Treat a missing (or zero) price as unknown rather than valuing open
        # positions at 0, which would show a huge bogus loss or gain.
        current_price = self.price_monitor.current_price or None
        equity = self.get_equity(current_price)
        initial_balance = self.config.initial_balance
        pnl = equity - initial_balance
        pnl_percent = (pnl / initial_balance) * 100 if initial_balance else 0.0
        pnl_color = "green" if pnl >= 0 else "red"
        price_text = f"${current_price:,.2f}" if current_price is not None else "N/A"

        text = Text()
        text.append("PAPER TRADING BOT", style="bold cyan")
        text.append(f" | BTC: {price_text}", style="yellow")
        text.append(f" | Balance: ${self.balance:,.2f}", style="white")
        text.append(f" | Equity: ${equity:,.2f}", style="white")
        text.append(f" | PnL: ${pnl:+,.2f} ({pnl_percent:+.2f}%)", style=pnl_color)
        return Panel(text, style="bold")

    def render_stats(self) -> "Panel":
        """Render statistics panel"""
        db_stats = self.db.get_statistics()
        table = Table(show_header=False, box=box.SIMPLE)
        table.add_column("Metric", style="cyan")
        table.add_column("Value", justify="right")
        table.add_row("Total Trades", str(db_stats["total_trades"]))
        table.add_row("Wins", f"[green]{db_stats['wins']}[/green]")
        table.add_row("Losses", f"[red]{db_stats['losses']}[/red]")
        table.add_row("Win Rate", f"{db_stats['win_rate']:.1f}%")
        table.add_row("Total PnL", f"${db_stats['total_pnl']:,.2f}")
        table.add_row("", "")
        table.add_row("Signals RX", str(self.stats["signals_received"]))
        table.add_row("Filtered", str(self.stats["signals_filtered"]))
        return Panel(table, title="Statistics", border_style="blue")

    def render_open_trades(self) -> "Panel":
        """Render open trades table"""
        table = Table(box=box.SIMPLE)
        table.add_column("ID", style="cyan")
        table.add_column("Type")
        table.add_column("Entry", justify="right")
        table.add_column("Current", justify="right")
        table.add_column("PnL", justify="right")
        table.add_column("TF")
        table.add_column("Pers")

        current_price = self.price_monitor.current_price or None
        # Snapshot so rows and the count in the title agree even if the
        # price thread closes a trade while we render.
        positions = list(self.open_trades)
        for trade in positions:
            type_color = "green" if trade.trade_type == TradeType.LONG.value else "red"
            if current_price is None:
                current_cell = pnl_cell = "N/A"
            else:
                pnl = self._position_pnl(trade, current_price)
                pnl_color = "green" if pnl >= 0 else "red"
                current_cell = f"${current_price:,.2f}"
                pnl_cell = f"[{pnl_color}]${pnl:+,.2f}[/{pnl_color}]"
            table.add_row(
                str(trade.id),
                f"[{type_color}]{trade.trade_type}[/{type_color}]",
                f"${trade.entry_price:,.2f}",
                current_cell,
                pnl_cell,
                trade.timeframe,
                trade.personality[:4].upper(),
            )
        return Panel(table, title=f"Open Positions ({len(positions)})", border_style="green")

    def render_recent_closed(self) -> "Panel":
        """Render recent closed trades"""
        table = Table(box=box.SIMPLE)
        table.add_column("ID", style="cyan")
        table.add_column("Type")
        table.add_column("PnL", justify="right")
        table.add_column("Exit")

        for trade in self.db.get_recent_trades(10):
            type_color = "green" if trade.trade_type == TradeType.LONG.value else "red"
            # Compare against None explicitly: a break-even PnL of 0.0 is a
            # real value and must not be shown as missing.
            if trade.pnl is None:
                pnl_cell = "N/A"
            else:
                pnl_color = "green" if trade.pnl >= 0 else "red"
                pnl_cell = f"[{pnl_color}]${trade.pnl:+,.2f}[/{pnl_color}]"
            table.add_row(
                str(trade.id),
                f"[{type_color}]{trade.trade_type}[/{type_color}]",
                pnl_cell,
                trade.exit_reason or "N/A",
            )
        return Panel(table, title="Recent Closed Trades", border_style="yellow")

    def render_signals(self) -> "Panel":
        """Render recent signals

        Signals are recorded before they are filtered, so fields may be
        missing; those are displayed as ``N/A`` instead of raising.
        """
        table = Table(box=box.SIMPLE)
        table.add_column("Signal")
        table.add_column("TF")
        table.add_column("Conf", justify="right")
        table.add_column("Pers")

        for sig in list(self.recent_signals)[-10:]:
            direction = str(sig.get("signal", "N/A"))
            signal_color = "green" if direction == "BUY" else "red"
            confidence = sig.get("confidence")
            if isinstance(confidence, (int, float)):
                confidence_cell = f"{confidence:.2f}"
            else:
                confidence_cell = "N/A"
            table.add_row(
                f"[{signal_color}]{direction}[/{signal_color}]",
                str(sig.get("timeframe", "N/A")),
                confidence_cell,
                str(sig.get("personality", "N/A"))[:4].upper(),
            )
        return Panel(table, title="Recent Signals", border_style="magenta")

    def render_footer(self) -> "Panel":
        """Render footer panel"""
        text = Text()
        text.append("Press ", style="dim")
        text.append("Ctrl+C", style="bold red")
        text.append(" to exit", style="dim")
        return Panel(text, style="dim")

    def update_display(self, layout: "Layout"):
        """Update all TUI panels"""
        layout["header"].update(self.render_header())
        layout["stats"].update(self.render_stats())
        layout["open_trades"].update(self.render_open_trades())
        layout["recent_closed"].update(self.render_recent_closed())
        layout["signals"].update(self.render_signals())
        layout["footer"].update(self.render_footer())

    def run(self):
        """Main run loop with TUI

        Starts both background threads, then redraws the display twice a
        second until ``self.running`` is cleared or Ctrl+C is pressed.
        """
        self.running = True
        # Start background threads
        signal_thread = threading.Thread(target=self.signal_listener_thread, daemon=True)
        price_thread = threading.Thread(target=self.price_monitor_thread, daemon=True)
        signal_thread.start()
        price_thread.start()
        # TUI
        console = Console()
        layout = self.build_tui()
        try:
            with Live(layout, console=console, screen=True, refresh_per_second=2):
                while self.running:
                    self.update_display(layout)
                    time.sleep(0.5)
        except KeyboardInterrupt:
            pass
        finally:
            self.running = False
            # Daemon threads: wait briefly, but never block shutdown on them.
            signal_thread.join(timeout=2)
            price_thread.join(timeout=2)
def main():
    """Load the configuration, install stop handlers and run the trader.

    SIGINT and SIGTERM both clear ``trader.running`` so the run loop exits
    on its own.
    """
    config = Config()
    trader = PaperTrader(config)

    def request_stop(signum, frame):
        # Only flip the flag; the run loop notices it on its next iteration.
        trader.running = False

    for stop_signal in (signal.SIGINT, signal.SIGTERM):
        signal.signal(stop_signal, request_stop)

    trader.run()


if __name__ == "__main__":
    main()