diff --git a/btc_tracker.py b/btc_tracker.py new file mode 100644 index 0000000..6c0db93 --- /dev/null +++ b/btc_tracker.py @@ -0,0 +1,5 @@ +import fetcher + +fetcher.start() + +exit(0) diff --git a/btc_tracker/database b/btc_tracker/database new file mode 100644 index 0000000..51cac49 Binary files /dev/null and b/btc_tracker/database differ diff --git a/btc_tracker/fetcher.py b/btc_tracker/fetcher.py new file mode 100755 index 0000000..e0e92a9 --- /dev/null +++ b/btc_tracker/fetcher.py @@ -0,0 +1,228 @@ +#!/usr/bin/python3 + +import krakenex +import json, sqlite3 +import requests, os, time +import threading + +database = "btc_ohlc.db" + +def Checkthedatabase(): + ## Some sanity for the database + # check if btc_timeseries.db database file exists + if not os.path.exists(database): + db = sqlite3.connect(database) + + db.execute("""\ + CREATE TABLE ohlc ( + id INTEGER PRIMARY KEY, + exchange TEXT NOT NULL, + timestamp INTEGER NOT NULL, + open REAL NOT NULL, + high REAL NOT NULL, + low REAL NOT NULL, + close REAL NOT NULL, + volume_quote REAL NOT NULL, + volume_base REAL NOT NULL, + trades INTEGER NOT NULL )""") + + db.commit() + db.close() + + db = sqlite3.connect(database) + + # Check if the table exists + table_exists = False + cursor = db.execute("PRAGMA table_info(ohlc)") + for row in cursor: + table_exists = True + + # Create the table if it doesn't exist + if not table_exists: + db.execute("""\ + CREATE TABLE ohlc ( + id INTEGER PRIMARY KEY, + exchange TEXT NOT NULL, + timestamp INTEGER NOT NULL, + open REAL NOT NULL, + high REAL NOT NULL, + low REAL NOT NULL, + close REAL NOT NULL, + volume_quote REAL NOT NULL, + volume_base REAL NOT NULL, + trades INTEGER NOT NULL )""") + db.commit() + +def fetch_kraken(): +### Kraken + kraken = krakenex.API() + + response = kraken.query_public('OHLC', {'pair': 'BTCUSD', 'interval': 240 }) + ohlc_data = response['result']['XXBTZUSD'] + + candle_stick_data = { + 'exchange': 'kraken', + 'timestamp': ohlc_data[1][0], + 'open': ohlc_data[0][1], + 'high': max(item[2] for item in ohlc_data), + 'low': min(item[3] for item in ohlc_data), + 'close': ohlc_data[-1][4], + 'volume_quote': sum(float(item[5]) for item in ohlc_data), + 'volume_base': sum(float(item[6]) for item in ohlc_data), + 'trades': sum(item[7] for item in ohlc_data), + } + + kraken_json = json.dumps(candle_stick_data, indent=2) + #print("Kraken: OK") + #print(kraken_json) + #q.put("Kraken: OK") + return kraken_json + +def fetch_bitstamp(q): +## Bitstamp + response = requests.get("https://www.bitstamp.net/api/v2/ohlc/btcusd/?step=300&limit=1") + + if response.status_code == 200: # check if the request was successful + bitstamp_data = response.json() + ohlc_data = bitstamp_data["data"]["ohlc"] + + candle_stick_data = { + 'exchange': 'bitstamp', + 'timestamp': int(ohlc_data[0]['timestamp']), + 'open': float(ohlc_data[0]['open']), + 'high': float(ohlc_data[0]['high']), + 'low': float(ohlc_data[0]['low']), + 'close': float(ohlc_data[0]['close']), + 'volume_quote': float(ohlc_data[0]['volume']), + 'volume_base': 0, # not provided by Bitstamp API + 'trades': 0, # not provided by Bitstamp API + } + + bitstamp_json = json.dumps(candle_stick_data, indent=2) + #print("Bitstamp: OK") + #print(bitstamp_json) + #q.put("Bitstamp: OK") + return bitstamp_json + else: + print(f"Error fetching data from Bitstamp API: {response.status_code}") + q.put("Bitstamp: ERROR") + return empty_json + +def fetch_bitfinex(q): +## Bitfinex + response = requests.get("https://api-pub.bitfinex.com/v2/candles/trade:5m:tBTCUSD/last") + + if response.status_code == 200: # check if the request was successful + ohlc_data = response.json() + candle_stick_data = { + 'exchange': 'bitfinex', + 'timestamp': ohlc_data[0], + 'open': ohlc_data[1], + 'high': ohlc_data[2], + 'low': ohlc_data[3], + 'close': ohlc_data[4], + 'volume_quote': ohlc_data[5], + 'volume_base': 0, # not provided by Bitfinex API + 'trades': 0, # not provided by Bitfinex API + } + + bitfinex_json = json.dumps(candle_stick_data, indent=2) + #print("Bitfinex: OK") + #print(bitfinex_json) + #q.put("Bitfinex: OK") + return bitfinex_json + else: + print(f"Error fetching data from Bitfinex API: {response.status_code}") + q.put("Bitfinex: ERROR") + return empty_json + +def fetch_gemini(q): +## Gemini + response = requests.get("https://api.gemini.com/v2/candles/btcusd/5m") + + if response.status_code == 200: # check if the request was successful + gemini_ohlc = response.json() + candle_stick_data = { + 'exchange': 'gemini', + 'timestamp': gemini_ohlc[0][0], + 'open': gemini_ohlc[0][1], + 'high': gemini_ohlc[0][2], + 'low': gemini_ohlc[0][3], + 'close': gemini_ohlc[0][4], + 'volume_quote': 0, # not provided by Gemini API + 'volume_base': gemini_ohlc[0][5], + 'trades': 0, # not provided by Gemini API + } + gemini_json = json.dumps(candle_stick_data, indent=2) + #print("Gemini: OK") + #print(gemini_json) + #q.put("Gemini: OK") + return gemini_json + else: + print(f"Error fetching data from Gemini API: {response.status_code}") + q.put("Gemini: ERROR") + return empty_json + +def write_dict_to_database(in_dict, connection): + cursor = connection.cursor() + # Use placeholders for the values in the INSERT statement + insert_query = "INSERT INTO ohlc (exchange, timestamp, open, high, low, close, volume_quote, volume_base, trades) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)" + + values = (in_dict['exchange'], + in_dict['timestamp'], + in_dict['open'], + in_dict['high'], + in_dict['low'], + in_dict['close'], + in_dict['volume_quote'], + in_dict['volume_base'], + in_dict['trades']) + ## apply lock while writing to database + with database_lock: + cursor.execute(insert_query, values) + connection.commit() + +def get_the_data(q): + #cursor = db.cursor() + while True: + #logline = "Fetching at: " + str(time.time()) + #q.put(logline) + db = sqlite3.connect(database) + write_dict_to_database(json.loads(fetch_kraken()), db) + write_dict_to_database(json.loads(fetch_bitfinex(q)), db) + write_dict_to_database(json.loads(fetch_bitstamp(q)), db) + write_dict_to_database(json.loads(fetch_gemini(q)), db) + db.close() + time.sleep(290) + + +Checkthedatabase() + +# Empty response json +empty_dict = {"exchange": "", "timestamp": 0, "open": 0, "high": 0, "low": 0, "close": 0, "volume_quote": 0, "volume_base": 0, "trades": 0} +empty_json = json.dumps(empty_dict) +database_lock = threading.Lock() + +fetch_thread = threading.Thread() +def get_health(): + if fetch_thread.is_alive(): + return "Alive" + else: + return "Dead" + +def start(q): + logline = "Started at " + str(time.time()) + q.put(logline) + fetch_thread.target = get_the_data + fetch_thread.args = (q, ) + fetch_thread.daemon = True + fetch_thread.start() + logline = "Fetcher ID: " + str(fetch_thread.ident) + " or " + str(fetch_thread.native_id) + q.put(logline) + + db = sqlite3.connect(database) + lastID_andTime = db.execute("SELECT id, timestamp FROM ohlc LIMIT 1 OFFSET (SELECT COUNT(*) FROM ohlc) - 1").fetchall() + #q.put(json.dumps(lastID_andTime, indent=2, separators=(',', ':'))) + q.put(json.dumps(lastID_andTime, separators=(',', ':'))) + + diff --git a/btc_tracker/kraken_fetch.py b/btc_tracker/kraken_fetch.py index 72cbac4..d2ddddd 100755 --- a/btc_tracker/kraken_fetch.py +++ b/btc_tracker/kraken_fetch.py @@ -99,11 +99,11 @@ def fetch_bitstamp(): bitstamp_json = json.dumps(candle_stick_data, indent=2) #print("Bitstamp: OK") - #print(bitstamp_json) + # print(bitstamp_json) return bitstamp_json else: print(f"Error fetching data from Bitstamp API: {response.status_code}") - return "Bitstamp: ERROR" + return empty_json def fetch_bitfinex(): ## Bitfinex @@ -129,7 +129,7 @@ def fetch_bitfinex(): return bitfinex_json else: print(f"Error fetching data from Bitfinex API: {response.status_code}") - return "Bitfinex: ERROR" + return empty_json def fetch_gemini(): ## Gemini @@ -154,7 +154,7 @@ def fetch_gemini(): return gemini_json else: print(f"Error fetching data from Gemini API: {response.status_code}") - return "Gemini: ERROR" + return empty_json def write_dict_to_database(in_dict, connection): cursor = connection.cursor() @@ -189,6 +189,10 @@ def get_the_data(): Checkthedatabase() +# Empty response json +empty_dict = {"exchange": "", "timestamp": 0, "open": 0, "high": 0, "low": 0, "close": 0, "volume_quote": 0, "volume_base": 0, "trades": 0} +empty_json = json.dumps(empty_dict) + database_lock = threading.Lock() # make this run in thread and allow contium to http api stuff fetch_thread = threading.Thread(target=get_the_data) @@ -200,7 +204,7 @@ count = 1 while True: if count % 10 == 0: print("------- status ", count, "-------") - top2_rows = db.execute("SELECT * FROM ohlc LIMIT 2").fetchall() + top2_rows = db.execute("SELECT * FROM ohlc LIMIT 2 OFFSET (SELECT COUNT(*) FROM ohlc) - 2").fetchall() print(json.dumps(top2_rows, indent=4)) else: output = str(count) + ".. " diff --git a/btc_tracker/the_server.py b/btc_tracker/the_server.py new file mode 100755 index 0000000..4f5fc2a --- /dev/null +++ b/btc_tracker/the_server.py @@ -0,0 +1,31 @@ +#!/usr/bin/python3 +import fetcher +import time +from queue import Queue +from flask import Flask + + +# Create a queue to get some info how the fetcher is doing +q = Queue() +# Start the data collecting +fetcher.start(q) + +# Initialize the Flask app +app = Flask(__name__) + +@app.route("/") +def root(): + if not q.empty(): + data = q.get() + return(str(data)) + else: + return("Fetcher message queue is empty") + +@app.route("/fetcher_health") +def fetcher_health(): + health = fetcher.get_health() + return(str(health)) + +# Run the app +if __name__ == "__main__": + app.run() diff --git a/letters/Flask_and_backend_process.py b/letters/Flask_and_backend_process.py new file mode 100644 index 0000000..2e21c5e --- /dev/null +++ b/letters/Flask_and_backend_process.py @@ -0,0 +1,62 @@ +from flask import Flask +import threading +import sqlite3 + +app = Flask(__name__) + +# Create a lock for synchronizing access to the database +lock = threading.Lock() + +def my_background_process(): + # Acquire the lock + lock.acquire() + + # Connect to the in-memory database + conn = sqlite3.connect(':memory:') + c = conn.cursor() + + # Create a table in the database + c.execute("CREATE TABLE mytable (col1 INTEGER, col2 TEXT)") + + # Write to the database + c.execute("INSERT INTO mytable (col1, col2) VALUES (?, ?)", (value1, value2)) + conn.commit() + + # Close the connection + conn.close() + + # Release the lock + lock.release() + +@app.route('/') +def read_from_database(): + # Acquire the lock + lock.acquire() + + # Connect to the in-memory database + conn = sqlite3.connect(':memory:') + c = conn.cursor() + + # Read from the database + c.execute("SELECT * FROM mytable") + rows = c.fetchall() + + # Close the connection + conn.close() + + # Release the lock + lock.release() + + # Return the rows to the client + return rows + +if __name__ == '__main__': + # Create a new thread for the background process + thread = threading.Thread(target=my_background_process) + + # Start the thread + thread.start() + + # Start the Flask app + app.run() +