#!/usr/bin/python3
"""Collect BTC/USD OHLC candles from several exchanges into a SQLite file.

A daemon thread polls Kraken, Bitfinex, Bitstamp and Gemini and inserts one
row per exchange into the ``ohlc`` table of ``btc_ohlc.db``.  The main thread
prints a short status report at regular intervals.
"""
import json
import os
import sqlite3
import threading
import time
from contextlib import closing

import krakenex
import requests

database = "btc_ohlc.db"

# Held around every INSERT + COMMIT issued by write_dict_to_database().
database_lock = threading.Lock()

# Seconds to wait for an exchange before giving up; without a timeout
# requests.get() can block forever and stall the whole collector thread.
REQUEST_TIMEOUT = 10

# Schema of the single table used by this script.
CREATE_OHLC_TABLE = """\
CREATE TABLE IF NOT EXISTS ohlc (
    id INTEGER PRIMARY KEY,
    exchange TEXT NOT NULL,
    timestamp INTEGER NOT NULL,
    open REAL NOT NULL,
    high REAL NOT NULL,
    low REAL NOT NULL,
    close REAL NOT NULL,
    volume_quote REAL NOT NULL,
    volume_base REAL NOT NULL,
    trades INTEGER NOT NULL
)"""


def Checkthedatabase():
    """Make sure the database file and the ``ohlc`` table exist.

    sqlite3.connect() creates the file when it is missing, and
    ``CREATE TABLE IF NOT EXISTS`` covers both the "new file" and the
    "file exists but table is missing" cases in one statement.  The
    connection is opened on the ``database`` path (not on a file literally
    named "database") and is always closed again.
    """
    with closing(sqlite3.connect(database)) as db:
        db.execute(CREATE_OHLC_TABLE)
        db.commit()


def fetch_kraken():
    """Query Kraken's public OHLC endpoint and return one candle as JSON text.

    All rows of the response are folded into a single candle: first open,
    last close, overall high/low and summed volume/trade columns.

    Returns:
        A JSON string with the keys expected by write_dict_to_database().

    Raises:
        KeyError / IndexError if the response lacks the expected structure;
        whatever krakenex raises on transport errors.
    """
    kraken = krakenex.API()
    response = kraken.query_public('OHLC', {'pair': 'BTCUSD', 'interval': 240})
    ohlc_data = response['result']['XXBTZUSD']
    candle_stick_data = {
        'exchange': 'kraken',
        # NOTE(review): the timestamp is read from the *second* row while the
        # open price comes from the first one -- kept as-is, confirm intent.
        'timestamp': ohlc_data[1][0],
        'open': float(ohlc_data[0][1]),
        # Compare as numbers: the volume columns are already wrapped in
        # float(), which suggests the API delivers strings, and max()/min()
        # on strings would compare lexicographically ("9999" > "10000").
        'high': max(float(item[2]) for item in ohlc_data),
        'low': min(float(item[3]) for item in ohlc_data),
        'close': float(ohlc_data[-1][4]),
        # NOTE(review): verify against the Kraken OHLC docs that columns 5
        # and 6 really are quote and base volume.
        'volume_quote': sum(float(item[5]) for item in ohlc_data),
        'volume_base': sum(float(item[6]) for item in ohlc_data),
        'trades': sum(item[7] for item in ohlc_data),
    }
    return json.dumps(candle_stick_data, indent=2)


def fetch_bitstamp():
    """Fetch the latest 5-minute candle from Bitstamp as JSON text.

    Returns:
        A JSON string on HTTP 200, otherwise the marker string
        "Bitstamp: ERROR" (after printing the status code).
    """
    response = requests.get(
        "https://www.bitstamp.net/api/v2/ohlc/btcusd/?step=300&limit=1",
        timeout=REQUEST_TIMEOUT,
    )
    if response.status_code != 200:
        print(f"Error fetching data from Bitstamp API: {response.status_code}")
        return "Bitstamp: ERROR"

    candle = response.json()["data"]["ohlc"][0]
    candle_stick_data = {
        'exchange': 'bitstamp',
        'timestamp': int(candle['timestamp']),
        'open': float(candle['open']),
        'high': float(candle['high']),
        'low': float(candle['low']),
        'close': float(candle['close']),
        'volume_quote': float(candle['volume']),
        'volume_base': 0,  # not provided by Bitstamp API
        'trades': 0,  # not provided by Bitstamp API
    }
    return json.dumps(candle_stick_data, indent=2)


def fetch_bitfinex():
    """Fetch the latest 5-minute candle from Bitfinex as JSON text.

    Returns:
        A JSON string on HTTP 200, otherwise the marker string
        "Bitfinex: ERROR" (after printing the status code).
    """
    response = requests.get(
        "https://api-pub.bitfinex.com/v2/candles/trade:5m:tBTCUSD/last",
        timeout=REQUEST_TIMEOUT,
    )
    if response.status_code != 200:
        print(f"Error fetching data from Bitfinex API: {response.status_code}")
        return "Bitfinex: ERROR"

    ohlc_data = response.json()
    candle_stick_data = {
        'exchange': 'bitfinex',
        'timestamp': ohlc_data[0],
        'open': ohlc_data[1],
        'high': ohlc_data[2],
        'low': ohlc_data[3],
        'close': ohlc_data[4],
        'volume_quote': ohlc_data[5],
        'volume_base': 0,  # not provided by Bitfinex API
        'trades': 0,  # not provided by Bitfinex API
    }
    return json.dumps(candle_stick_data, indent=2)


def fetch_gemini():
    """Fetch the first candle of Gemini's 5-minute series as JSON text.

    Returns:
        A JSON string on HTTP 200, otherwise the marker string
        "Gemini: ERROR" (after printing the status code).
    """
    response = requests.get(
        "https://api.gemini.com/v2/candles/btcusd/5m",
        timeout=REQUEST_TIMEOUT,
    )
    if response.status_code != 200:
        print(f"Error fetching data from Gemini API: {response.status_code}")
        return "Gemini: ERROR"

    candle = response.json()[0]
    candle_stick_data = {
        'exchange': 'gemini',
        'timestamp': candle[0],
        'open': candle[1],
        'high': candle[2],
        'low': candle[3],
        'close': candle[4],
        'volume_quote': 0,  # not provided by Gemini API
        'volume_base': candle[5],
        'trades': 0,  # not provided by Gemini API
    }
    return json.dumps(candle_stick_data, indent=2)


def write_dict_to_database(in_dict, connection):
    """Insert one candle into the ``ohlc`` table and commit.

    Args:
        in_dict: mapping with the keys exchange, timestamp, open, high, low,
            close, volume_quote, volume_base and trades.
        connection: an open sqlite3 connection to the OHLC database.

    Raises:
        KeyError if a required key is missing; sqlite3.Error on write failure.
    """
    cursor = connection.cursor()
    # Placeholders keep the values out of the SQL text.
    insert_query = "INSERT INTO ohlc (exchange, timestamp, open, high, low, close, volume_quote, volume_base, trades) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)"
    values = (
        in_dict['exchange'],
        in_dict['timestamp'],
        in_dict['open'],
        in_dict['high'],
        in_dict['low'],
        in_dict['close'],
        in_dict['volume_quote'],
        in_dict['volume_base'],
        in_dict['trades'],
    )
    # Apply the lock while writing to the database.
    with database_lock:
        cursor.execute(insert_query, values)
        connection.commit()


def get_the_data():
    """Poll every exchange forever, storing one row per exchange per cycle.

    A failure of one exchange (network error, non-200 marker string that is
    not valid JSON, unexpected payload shape, failed insert) is reported and
    skipped, so it neither kills this thread nor blocks the other exchanges.
    The per-cycle connection is closed even when an error occurs.
    """
    fetchers = (fetch_kraken, fetch_bitfinex, fetch_bitstamp, fetch_gemini)
    while True:
        with closing(sqlite3.connect(database)) as db:
            for fetch in fetchers:
                try:
                    write_dict_to_database(json.loads(fetch()), db)
                except (
                    requests.RequestException,
                    ValueError,  # includes json.JSONDecodeError
                    KeyError,
                    IndexError,
                    TypeError,
                    sqlite3.Error,
                ) as err:
                    print(f"Skipping {fetch.__name__}: {err!r}")
        time.sleep(290)


Checkthedatabase()

# Run the collector in a background thread so the main thread stays free to
# continue with the status output (and, later, HTTP API work).
fetch_thread = threading.Thread(target=get_the_data)
fetch_thread.daemon = True
fetch_thread.start()

db = sqlite3.connect(database)
count = 1
while True:
    if count % 10 == 0:
        print("------- status ", count, "-------")
        top2_rows = db.execute("SELECT * FROM ohlc LIMIT 2").fetchall()
        print(json.dumps(top2_rows, indent=4))
    else:
        output = str(count) + ".. "
        print(output, end = '\r')
    count = count + 1
    time.sleep(30)