#!/usr/bin/python3 """ Fetch BTCUSD OHLC data from few market places and serve it forward with simple json api. Creates: ./btc_ohlc.db serves: localhost:5000/[t] and /serverkey Authentication via auth header with signatures """ import math import json import os import time import sys import sqlite3 import binascii import threading from hashlib import sha256 import requests import ecdsa import krakenex from flask import Flask, jsonify, request # from Cryptodome.Cipher import AES DATABASE = "btc_ohlc.db" KEYSFILE = "userkeys.json" app = Flask(__name__) ## Generate the ECDSA keys for this instance print("Generating ECDSA keys for this instance... just wait a bit...") server_private_key = ecdsa.SigningKey.generate(curve=ecdsa.SECP256k1) server_public_key = server_private_key.get_verifying_key() # We need the hexadecimal form for sharing over http/json server_public_key_hex = binascii.hexlify(server_public_key.to_string()).decode("utf-8") # Empty response json empty_dict = { "exchange": "", "timestamp": 0, "open": 0, "high": 0, "low": 0, "close": 0, "volume_quote": 0, "volume_base": 0, "trades": 0, } empty_json = json.dumps(empty_dict) def read_keys(): """ Reads the declared KEYSFILE and returns "user_publickeys" from it. Returns: json object """ empty_userkeys = { "user_publickeys": { "user_name": "user_ecdsa_public key" } } # test if the file is there, we could make one if there is non try: # ascii is subset of UTF-8, this should be quite safe... with open(KEYSFILE, "r", encoding='utf-8') as cfile: user_keys = json.load(cfile) if 'user_publickeys' not in user_keys: print('Missing required property "user_publickeys" in config file') sys.exit(1) if user_keys == empty_userkeys: print('Your userkeys.json file seems to not filled. Please insert your key there.') sys.exit(1) return user_keys["user_publickeys"] except FileNotFoundError: with open(KEYSFILE, "w", encoding='utf-8') as nfile: print('You seem be lacking the users key file, created example for you. Exiting') json.dump(empty_userkeys, nfile, indent=2) sys.exit(1) def check_database(): """ Check the database for the 'ohlc' table. If the database file or the table does not exist, create them. """ if not os.path.exists(DATABASE): new_db = sqlite3.connect(DATABASE) new_db.execute( """\ CREATE TABLE ohlc ( id INTEGER PRIMARY KEY, exchange TEXT NOT NULL, timestamp INTEGER NOT NULL, open REAL NOT NULL, high REAL NOT NULL, low REAL NOT NULL, close REAL NOT NULL, volume_quote REAL NOT NULL, volume_base REAL NOT NULL, trades INTEGER NOT NULL )""" ) new_db.commit() new_db.close() new_db = sqlite3.connect(DATABASE) # Check if the table exists table_exists = False cursor = new_db.execute("PRAGMA table_info(ohlc)") for row in cursor: table_exists = True # Create the table if it doesn't exist if not table_exists: new_db.execute( """\ CREATE TABLE ohlc ( id INTEGER PRIMARY KEY, exchange TEXT NOT NULL, timestamp INTEGER NOT NULL, open REAL NOT NULL, high REAL NOT NULL, low REAL NOT NULL, close REAL NOT NULL, volume_quote REAL NOT NULL, volume_base REAL NOT NULL, trades INTEGER NOT NULL )""" ) new_db.commit() def fetch_kraken(): """ Fetch BTCUSD OHLC data from Kraken in json. Returns: str: 5min OHLC data in JSON format. """ kraken = krakenex.API() response = kraken.query_public("OHLC", {"pair": "BTCUSD", "interval": 240}) ohlc_data = response["result"]["XXBTZUSD"] candle_stick_data = { "exchange": "kraken", "timestamp": ohlc_data[1][0], "open": ohlc_data[0][1], "high": max(item[2] for item in ohlc_data), "low": min(item[3] for item in ohlc_data), "close": ohlc_data[-1][4], "volume_quote": sum(float(item[5]) for item in ohlc_data), "volume_base": sum(float(item[6]) for item in ohlc_data), "trades": sum(item[7] for item in ohlc_data), } kraken_json = json.dumps(candle_stick_data, indent=2) return kraken_json def fetch_bitstamp(): """ Fetch Bitstamp data ja serve it as json. Returns: str: 5min OHLC data in JSON format. """ response = requests.get( "https://www.bitstamp.net/api/v2/ohlc/btcusd/?step=300&limit=1" ) if response.status_code == 200: # check if the request was successful bitstamp_data = response.json() ohlc_data = bitstamp_data["data"]["ohlc"] candle_stick_data = { "exchange": "bitstamp", "timestamp": int(ohlc_data[0]["timestamp"]), "open": float(ohlc_data[0]["open"]), "high": float(ohlc_data[0]["high"]), "low": float(ohlc_data[0]["low"]), "close": float(ohlc_data[0]["close"]), "volume_quote": float(ohlc_data[0]["volume"]), "volume_base": 0, # not provided by Bitstamp API "trades": 0, # not provided by Bitstamp API } bitstamp_json = json.dumps(candle_stick_data, indent=2) return bitstamp_json # if we get any thing else than http/200 print(f"Error fetching data from Bitstamp API: {response.status_code}") return empty_json def fetch_bitfinex(): """ Bitfinex Returns: str: 5min OHLC data in JSON format. """ response = requests.get( "https://api-pub.bitfinex.com/v2/candles/trade:5m:tBTCUSD/last" ) if response.status_code == 200: # check if the request was successful ohlc_data = response.json() candle_stick_data = { "exchange": "bitfinex", "timestamp": ohlc_data[0], "open": ohlc_data[1], "high": ohlc_data[2], "low": ohlc_data[3], "close": ohlc_data[4], "volume_quote": ohlc_data[5], "volume_base": 0, # not provided by Bitfinex API "trades": 0, # not provided by Bitfinex API } bitfinex_json = json.dumps(candle_stick_data, indent=2) return bitfinex_json # if we get any thing else than http/200 print(f"Error fetching data from Bitfinex API: {response.status_code}") return empty_json def fetch_gemini(): """ Fetch BTCUSD OHLC data from Gemini Returns: str: 5min OHLC data in JSON format. """ response = requests.get("https://api.gemini.com/v2/candles/btcusd/5m") if response.status_code == 200: # check if the request was successful gemini_ohlc = response.json() candle_stick_data = { "exchange": "gemini", "timestamp": gemini_ohlc[0][0], "open": gemini_ohlc[0][1], "high": gemini_ohlc[0][2], "low": gemini_ohlc[0][3], "close": gemini_ohlc[0][4], "volume_quote": 0, # not provided by Gemini API "volume_base": gemini_ohlc[0][5], "trades": 0, # not provided by Gemini API } gemini_json = json.dumps(candle_stick_data, indent=2) return gemini_json # if we get any thing else than http/200 print(f"Error fetching data from Gemini API: {response.status_code}") return empty_json def fetch_bybit(): """ Fetch BTCUSD OHLC data from Bybit Returns: str: 5min OHLC data in JSON format. """ base_url = ( "https://api.bybit.com/v2/public/kline/list?symbol=BTCUSD&interval=5&from=" ) current_unixtime = int(time.time()) last_minute = math.floor(current_unixtime / 60) last_minute_unixtime = str(last_minute * 60 - 300) query_url = "".join([base_url, last_minute_unixtime]) response = requests.get(query_url) if response.status_code == 200: # check if the request was successful bybit_ohlc = response.json() candle_stick_data = { "exchange": "bybit", "timestamp": bybit_ohlc["result"][0]["open_time"], "open": bybit_ohlc["result"][0]["open"], "high": bybit_ohlc["result"][0]["high"], "low": bybit_ohlc["result"][0]["low"], "close": bybit_ohlc["result"][0]["close"], "volume_quote": bybit_ohlc["result"][0]["volume"], "volume_base": bybit_ohlc["result"][0]["turnover"], "trades": 0, } bybit_json = json.dumps(candle_stick_data, indent=2) return bybit_json # if we get any thing else than http/200 print(f"Error fetching data from Bybit API: {response.status_code}") return empty_json def write_dict_to_database(in_dict, connection): """ Writes given dict to given database. Arguments: dict, db.connection() Uses shared global database_lock. """ cursor = connection.cursor() # use placeholders for the values in the insert statement insert_query = "insert into ohlc (exchange, timestamp, open, high, low, close, volume_quote, volume_base, trades) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)" values = ( in_dict["exchange"], in_dict["timestamp"], in_dict["open"], in_dict["high"], in_dict["low"], in_dict["close"], in_dict["volume_quote"], in_dict["volume_base"], in_dict["trades"], ) ## apply lock while writing to database with database_lock: cursor.execute(insert_query, values) connection.commit() def get_the_data(): """ Creates infinite While True loop to fetch OHLC data and save it to database. """ while True: ohlc_db = sqlite3.connect(DATABASE) write_dict_to_database(json.loads(fetch_kraken()), ohlc_db) write_dict_to_database(json.loads(fetch_bitfinex()), ohlc_db) write_dict_to_database(json.loads(fetch_bitstamp()), ohlc_db) write_dict_to_database(json.loads(fetch_gemini()), ohlc_db) write_dict_to_database(json.loads(fetch_bybit()), ohlc_db) ohlc_db.close() print("fetches done at", time.time(), "sleeping now for 290") time.sleep(290) def check_auth(text, signature): """ Check signatures against known public keys Arguments: text, signature Reads: Global public user_publickeys dict. Returns: True / False """ ## Make bytes-object from given signature sig_bytes = bytes.fromhex(signature) ## We will iterate over all user keys to determ who is we are talking to and should they have access for key, value in user_publickeys.items(): ## Create bytes-object from the public in 'value' variable ## and use it to create VerifyingKey (vk) public_key_bytes = bytes.fromhex(value) verifying_key = ecdsa.VerifyingKey.from_string( public_key_bytes, curve=ecdsa.SECP256k1 ) try: verifying_key.verify(sig_bytes, bytes(text, "utf-8")) print("user is", key) return True except ecdsa.BadSignatureError: return False @app.route("/") def get_data(): """ Serve the data from the database. Limit the responses by given timestamp. The pretty thing is under consideration... """ # Get the time (t) argument from the url" query_timestamp = request.args.get("t") # Should we make output pretty for curl users? query_pretty = request.args.get("pretty") # Authentication header, signatured the query with private key of a user signature = request.headers.get("auth") get_url = request.url if not check_auth(get_url, signature): return "Access denied! Check your keys, maybe.", 403 with database_lock: btc_db = sqlite3.connect(DATABASE) if query_timestamp: rows = btc_db.execute( "SELECT exchange, timestamp, open, high, low, close FROM ohlc WHERE timestamp > ? ORDER BY timestamp", (query_timestamp,), ).fetchall() else: rows = btc_db.execute( "SELECT exchange, timestamp, open, high, low, close FROM ohlc ORDER BY timestamp" ).fetchall() query_timestamp = 0 data = {"timestamp": time.time(), "rows": rows} # make sha256 checksum and append it to the data object data_shasum = sha256(json.dumps(data).encode("utf-8")).hexdigest() updated_data = {"shasum": data_shasum} updated_data.update(data) data = updated_data # sign the response signature = server_private_key.sign(json.dumps(data).encode("utf-8")) signature_hex = binascii.hexlify(signature).decode("utf-8") data["signature"] = signature_hex if query_pretty: response = json.dumps(data, indent=2, separators=(";\n", " :")) else: response = json.dumps(data) return response, 200, {"Content-Type": "application/json"} @app.route("/serverkey") def give_serverkey(): """ Serve the public keys of this instace to the world. """ ## This endpoint also under Authentication? signature = request.headers.get("auth") get_url = request.url if not check_auth(get_url, signature): return "Access denied! Check your keys, maybe.", 403 return jsonify({"public_key": server_public_key_hex}) if __name__ == "__main__": # Make sanity checks for the database check_database() database_lock = threading.Lock() # Get the users public keys user_publickeys = read_keys() # Start the data fetching backend processĀ§ fetch_thread = threading.Thread(target=get_the_data) fetch_thread.daemon = True fetch_thread.start() # Start the Flask app app.run()