chatgpt/btc_tracker/kraken_fetch.py

412 lines
13 KiB
Python
Raw Normal View History

#!/usr/bin/python3
"""
2023-01-01 13:40:47 +02:00
Fetch BTCUSD OHLC data from few market places and serve it forward with simple json api.
2023-01-01 13:40:47 +02:00
Creates: ./btc_ohlc.db
serves: localhost:5000/[t] and /serverkey
Authentication via auth header with signatures
"""
2023-01-01 13:40:47 +02:00
import math
import json
import os
import time
import sqlite3
import binascii
import threading
2022-12-28 20:36:44 +02:00
from hashlib import sha256
2023-01-01 13:40:47 +02:00
import requests
import ecdsa
import krakenex
from flask import Flask, jsonify, request
# from Cryptodome.Cipher import AES
2023-01-01 13:40:47 +02:00
DATABASE = "btc_ohlc.db"
KEYSFILE = "userkeys.json"
app = Flask(__name__)
## Generate the ECDSA keys for this instance
print("Generating ECDSA keys for this instance... just wait a bit...")
server_private_key = ecdsa.SigningKey.generate(curve=ecdsa.SECP256k1)
server_public_key = server_private_key.get_verifying_key()
# We need the hexadecimal form for sharing over http/json
server_public_key_hex = binascii.hexlify(server_public_key.to_string()).decode("utf-8")
database_lock = threading.Lock()
# Empty response json
empty_dict = {
"exchange": "",
"timestamp": 0,
"open": 0,
"high": 0,
"low": 0,
"close": 0,
"volume_quote": 0,
"volume_base": 0,
"trades": 0,
}
empty_json = json.dumps(empty_dict)
def read_keys():
with open(KEYSFILE, "r") as cfile:
user_keys = json.load(cfile)
return user_keys["user_publickeys"]
2023-01-01 13:40:47 +02:00
def check_database():
"""
Check the database for the 'ohlc' table.
If the database file or the table does not exist, create them.
"""
if not os.path.exists(DATABASE):
new_db = sqlite3.connect(DATABASE)
new_db.execute(
"""\
CREATE TABLE ohlc (
id INTEGER PRIMARY KEY,
exchange TEXT NOT NULL,
timestamp INTEGER NOT NULL,
open REAL NOT NULL,
high REAL NOT NULL,
low REAL NOT NULL,
close REAL NOT NULL,
volume_quote REAL NOT NULL,
volume_base REAL NOT NULL,
trades INTEGER NOT NULL )"""
)
new_db.commit()
new_db.close()
2023-01-01 13:40:47 +02:00
new_db = sqlite3.connect(DATABASE)
# Check if the table exists
2023-01-01 13:40:47 +02:00
table_exists = False
cursor = new_db.execute("PRAGMA table_info(ohlc)")
for row in cursor:
table_exists = True
2023-01-01 13:40:47 +02:00
# Create the table if it doesn't exist
if not table_exists:
new_db.execute(
"""\
CREATE TABLE ohlc (
id INTEGER PRIMARY KEY,
exchange TEXT NOT NULL,
timestamp INTEGER NOT NULL,
open REAL NOT NULL,
high REAL NOT NULL,
low REAL NOT NULL,
close REAL NOT NULL,
volume_quote REAL NOT NULL,
volume_base REAL NOT NULL,
trades INTEGER NOT NULL )"""
)
new_db.commit()
def fetch_kraken():
"""
Fetch BTCUSD OHLC data from Kraken in json.
Returns:
str: 5min OHLC data in JSON format.
"""
kraken = krakenex.API()
2023-01-01 13:40:47 +02:00
response = kraken.query_public("OHLC", {"pair": "BTCUSD", "interval": 240})
ohlc_data = response["result"]["XXBTZUSD"]
2023-01-01 13:40:47 +02:00
candle_stick_data = {
"exchange": "kraken",
"timestamp": ohlc_data[1][0],
"open": ohlc_data[0][1],
"high": max(item[2] for item in ohlc_data),
"low": min(item[3] for item in ohlc_data),
"close": ohlc_data[-1][4],
"volume_quote": sum(float(item[5]) for item in ohlc_data),
"volume_base": sum(float(item[6]) for item in ohlc_data),
"trades": sum(item[7] for item in ohlc_data),
}
2023-01-01 13:40:47 +02:00
kraken_json = json.dumps(candle_stick_data, indent=2)
return kraken_json
def fetch_bitstamp():
"""
Fetch Bitstamp data ja serve it as json.
Returns:
str: 5min OHLC data in JSON format.
"""
response = requests.get(
"https://www.bitstamp.net/api/v2/ohlc/btcusd/?step=300&limit=1"
)
if response.status_code == 200: # check if the request was successful
bitstamp_data = response.json()
ohlc_data = bitstamp_data["data"]["ohlc"]
candle_stick_data = {
"exchange": "bitstamp",
"timestamp": int(ohlc_data[0]["timestamp"]),
"open": float(ohlc_data[0]["open"]),
"high": float(ohlc_data[0]["high"]),
"low": float(ohlc_data[0]["low"]),
"close": float(ohlc_data[0]["close"]),
"volume_quote": float(ohlc_data[0]["volume"]),
"volume_base": 0, # not provided by Bitstamp API
"trades": 0, # not provided by Bitstamp API
}
bitstamp_json = json.dumps(candle_stick_data, indent=2)
return bitstamp_json
# if we get any thing else than http/200
print(f"Error fetching data from Bitstamp API: {response.status_code}")
return empty_json
def fetch_bitfinex():
"""
Bitfinex
Returns:
str: 5min OHLC data in JSON format.
"""
response = requests.get(
"https://api-pub.bitfinex.com/v2/candles/trade:5m:tBTCUSD/last"
)
if response.status_code == 200: # check if the request was successful
ohlc_data = response.json()
candle_stick_data = {
"exchange": "bitfinex",
"timestamp": ohlc_data[0],
"open": ohlc_data[1],
"high": ohlc_data[2],
"low": ohlc_data[3],
"close": ohlc_data[4],
"volume_quote": ohlc_data[5],
"volume_base": 0, # not provided by Bitfinex API
"trades": 0, # not provided by Bitfinex API
}
2023-01-01 13:40:47 +02:00
bitfinex_json = json.dumps(candle_stick_data, indent=2)
return bitfinex_json
# if we get any thing else than http/200
print(f"Error fetching data from Bitfinex API: {response.status_code}")
return empty_json
def fetch_gemini():
"""
Fetch BTCUSD OHLC data from Gemini
Returns:
str: 5min OHLC data in JSON format.
"""
response = requests.get("https://api.gemini.com/v2/candles/btcusd/5m")
if response.status_code == 200: # check if the request was successful
gemini_ohlc = response.json()
candle_stick_data = {
"exchange": "gemini",
"timestamp": gemini_ohlc[0][0],
"open": gemini_ohlc[0][1],
"high": gemini_ohlc[0][2],
"low": gemini_ohlc[0][3],
"close": gemini_ohlc[0][4],
"volume_quote": 0, # not provided by Gemini API
"volume_base": gemini_ohlc[0][5],
"trades": 0, # not provided by Gemini API
}
gemini_json = json.dumps(candle_stick_data, indent=2)
return gemini_json
# if we get any thing else than http/200
print(f"Error fetching data from Gemini API: {response.status_code}")
return empty_json
2022-12-28 15:58:55 +02:00
def fetch_bybit():
"""
Fetch BTCUSD OHLC data from Bybit
Returns:
str: 5min OHLC data in JSON format.
"""
base_url = (
"https://api.bybit.com/v2/public/kline/list?symbol=BTCUSD&interval=5&from="
)
current_unixtime = int(time.time())
last_minute = math.floor(current_unixtime / 60)
last_minute_unixtime = str(last_minute * 60 - 300)
query_url = "".join([base_url, last_minute_unixtime])
response = requests.get(query_url)
if response.status_code == 200: # check if the request was successful
bybit_ohlc = response.json()
candle_stick_data = {
"exchange": "bybit",
"timestamp": bybit_ohlc["result"][0]["open_time"],
"open": bybit_ohlc["result"][0]["open"],
"high": bybit_ohlc["result"][0]["high"],
"low": bybit_ohlc["result"][0]["low"],
"close": bybit_ohlc["result"][0]["close"],
"volume_quote": bybit_ohlc["result"][0]["volume"],
"volume_base": bybit_ohlc["result"][0]["turnover"],
"trades": 0,
}
bybit_json = json.dumps(candle_stick_data, indent=2)
return bybit_json
# if we get any thing else than http/200
2022-12-28 15:58:55 +02:00
print(f"Error fetching data from Bybit API: {response.status_code}")
return empty_json
def write_dict_to_database(in_dict, connection):
"""
Writes given dict to given database.
Arguments: dict, db.connection()
Uses shared global database_lock.
"""
cursor = connection.cursor()
# use placeholders for the values in the insert statement
insert_query = "insert into ohlc (exchange, timestamp, open, high, low, close, volume_quote, volume_base, trades) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)"
values = (
in_dict["exchange"],
in_dict["timestamp"],
in_dict["open"],
in_dict["high"],
in_dict["low"],
in_dict["close"],
in_dict["volume_quote"],
in_dict["volume_base"],
in_dict["trades"],
)
## apply lock while writing to database
with database_lock:
cursor.execute(insert_query, values)
connection.commit()
def get_the_data():
"""
Creates infinite While True loop to fetch OHLC data and save it to database.
"""
while True:
ohlc_db = sqlite3.connect(DATABASE)
write_dict_to_database(json.loads(fetch_kraken()), ohlc_db)
write_dict_to_database(json.loads(fetch_bitfinex()), ohlc_db)
write_dict_to_database(json.loads(fetch_bitstamp()), ohlc_db)
write_dict_to_database(json.loads(fetch_gemini()), ohlc_db)
write_dict_to_database(json.loads(fetch_bybit()), ohlc_db)
ohlc_db.close()
print("fetches done at", time.time(), "sleeping now for 290")
time.sleep(290)
def check_auth(text, signature):
"""
Check signatures against known public keys
Arguments: text, signature
Reads: Global public user_publickeys dict.
Returns: True / False
"""
## Make bytes-object from given signature
sig_bytes = bytes.fromhex(signature)
## We will iterate over all user keys to determ who is we are talking to and should they have access
for key, value in user_publickeys.items():
## Create bytes-object from the public in 'value' variable
## and use it to create VerifyingKey (vk)
public_key_bytes = bytes.fromhex(value)
verifying_key = ecdsa.VerifyingKey.from_string(
public_key_bytes, curve=ecdsa.SECP256k1
)
try:
verifying_key.verify(sig_bytes, bytes(text, "utf-8"))
print("user is", key)
return True
except ecdsa.BadSignatureError:
return False
@app.route("/")
def get_data():
"""
Serve the data from the database. Limit the responses by given timestamp.
The pretty thing is under consideration...
"""
# Get the time (t) argument from the url"
query_timestamp = request.args.get("t")
# Should we make output pretty for curl users?
query_pretty = request.args.get("pretty")
# Authentication header, signatured the query with private key of a user
signature = request.headers.get("auth")
get_url = request.url
if not check_auth(get_url, signature):
return "Access denied! Check your keys, maybe.", 403
with database_lock:
btc_db = sqlite3.connect(DATABASE)
if query_timestamp:
rows = btc_db.execute(
"SELECT exchange, timestamp, open, high, low, close FROM ohlc WHERE timestamp > ? ORDER BY timestamp",
(query_timestamp,),
).fetchall()
else:
rows = btc_db.execute(
"SELECT exchange, timestamp, open, high, low, close FROM ohlc ORDER BY timestamp"
).fetchall()
query_timestamp = 0
data = {"timestamp": time.time(), "rows": rows}
# make sha256 checksum and append it to the data object
data_shasum = sha256(json.dumps(data).encode("utf-8")).hexdigest()
updated_data = {"shasum": data_shasum}
updated_data.update(data)
data = updated_data
# sign the response
signature = server_private_key.sign(json.dumps(data).encode("utf-8"))
signature_hex = binascii.hexlify(signature).decode("utf-8")
data["signature"] = signature_hex
if query_pretty:
response = json.dumps(data, indent=2, separators=(";\n", " :"))
2023-01-01 13:40:47 +02:00
else:
response = json.dumps(data)
return response, 200, {"Content-Type": "application/json"}
@app.route("/serverkey")
def give_serverkey():
"""
Serve the public keys of this instace to the world.
"""
## This endpoint also under Authentication?
signature = request.headers.get("auth")
get_url = request.url
if not check_auth(get_url, signature):
return "Access denied! Check your keys, maybe.", 403
return jsonify({"public_key": server_public_key_hex})
if __name__ == "__main__":
# Make sanity checks for the database
check_database()
# Get the users public keys
user_publickeys = read_keys()
# Start the data fetching backend process§
fetch_thread = threading.Thread(target=get_the_data)
fetch_thread.daemon = True
fetch_thread.start()
# Start the Flask app
app.run()