Added btc_tracker.py and letter about Flask and threading.
This commit is contained in:
		
							
								
								
									
										5
									
								
								btc_tracker.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										5
									
								
								btc_tracker.py
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,5 @@
 | 
			
		||||
import fetcher
 | 
			
		||||
 | 
			
		||||
fetcher.start()
 | 
			
		||||
 | 
			
		||||
exit(0)
 | 
			
		||||
							
								
								
									
										
											BIN
										
									
								
								btc_tracker/database
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										
											BIN
										
									
								
								btc_tracker/database
									
									
									
									
									
										Normal file
									
								
							
										
											Binary file not shown.
										
									
								
							
							
								
								
									
										228
									
								
								btc_tracker/fetcher.py
									
									
									
									
									
										Executable file
									
								
							
							
						
						
									
										228
									
								
								btc_tracker/fetcher.py
									
									
									
									
									
										Executable file
									
								
							@@ -0,0 +1,228 @@
 | 
			
		||||
#!/usr/bin/python3
 | 
			
		||||
 | 
			
		||||
import krakenex
 | 
			
		||||
import json, sqlite3
 | 
			
		||||
import requests, os, time
 | 
			
		||||
import threading
 | 
			
		||||
 | 
			
		||||
database = "btc_ohlc.db"
 | 
			
		||||
 | 
			
		||||
def Checkthedatabase():
 | 
			
		||||
  ## Some sanity for the database
 | 
			
		||||
  # check if btc_timeseries.db database file exists
 | 
			
		||||
  if not os.path.exists(database):
 | 
			
		||||
    db = sqlite3.connect(database)
 | 
			
		||||
    
 | 
			
		||||
    db.execute("""\
 | 
			
		||||
    CREATE TABLE ohlc (
 | 
			
		||||
    id INTEGER PRIMARY KEY,
 | 
			
		||||
    exchange TEXT NOT NULL,
 | 
			
		||||
    timestamp INTEGER NOT NULL,
 | 
			
		||||
    open REAL NOT NULL,
 | 
			
		||||
    high REAL NOT NULL,
 | 
			
		||||
    low REAL NOT NULL,
 | 
			
		||||
    close REAL NOT NULL,
 | 
			
		||||
    volume_quote REAL NOT NULL,
 | 
			
		||||
    volume_base REAL NOT NULL,
 | 
			
		||||
    trades INTEGER NOT NULL )""")
 | 
			
		||||
 | 
			
		||||
    db.commit()
 | 
			
		||||
    db.close()
 | 
			
		||||
  
 | 
			
		||||
  db = sqlite3.connect(database)
 | 
			
		||||
  
 | 
			
		||||
  # Check if the table exists
 | 
			
		||||
  table_exists = False
 | 
			
		||||
  cursor = db.execute("PRAGMA table_info(ohlc)")
 | 
			
		||||
  for row in cursor:
 | 
			
		||||
    table_exists = True
 | 
			
		||||
  
 | 
			
		||||
  # Create the table if it doesn't exist
 | 
			
		||||
  if not table_exists:
 | 
			
		||||
    db.execute("""\
 | 
			
		||||
    CREATE TABLE ohlc (
 | 
			
		||||
    id INTEGER PRIMARY KEY,
 | 
			
		||||
    exchange TEXT NOT NULL,
 | 
			
		||||
    timestamp INTEGER NOT NULL,
 | 
			
		||||
    open REAL NOT NULL,
 | 
			
		||||
    high REAL NOT NULL,
 | 
			
		||||
    low REAL NOT NULL,
 | 
			
		||||
    close REAL NOT NULL,
 | 
			
		||||
    volume_quote REAL NOT NULL,
 | 
			
		||||
    volume_base REAL NOT NULL,
 | 
			
		||||
    trades INTEGER NOT NULL )""")
 | 
			
		||||
    db.commit()
 | 
			
		||||
 | 
			
		||||
def fetch_kraken():
 | 
			
		||||
### Kraken
 | 
			
		||||
  kraken = krakenex.API()
 | 
			
		||||
  
 | 
			
		||||
  response = kraken.query_public('OHLC', {'pair': 'BTCUSD', 'interval': 240 })
 | 
			
		||||
  ohlc_data = response['result']['XXBTZUSD']
 | 
			
		||||
  
 | 
			
		||||
  candle_stick_data = {
 | 
			
		||||
      'exchange': 'kraken',
 | 
			
		||||
      'timestamp': ohlc_data[1][0],
 | 
			
		||||
      'open': ohlc_data[0][1],
 | 
			
		||||
      'high': max(item[2] for item in ohlc_data),
 | 
			
		||||
      'low': min(item[3] for item in ohlc_data),
 | 
			
		||||
      'close': ohlc_data[-1][4],
 | 
			
		||||
      'volume_quote': sum(float(item[5]) for item in ohlc_data),
 | 
			
		||||
      'volume_base': sum(float(item[6]) for item in ohlc_data),
 | 
			
		||||
      'trades': sum(item[7] for item in ohlc_data),
 | 
			
		||||
  }
 | 
			
		||||
  
 | 
			
		||||
  kraken_json = json.dumps(candle_stick_data, indent=2)
 | 
			
		||||
  #print("Kraken: OK")
 | 
			
		||||
  #print(kraken_json)
 | 
			
		||||
  #q.put("Kraken: OK")
 | 
			
		||||
  return kraken_json
 | 
			
		||||
 | 
			
		||||
def fetch_bitstamp(q):
 | 
			
		||||
## Bitstamp
 | 
			
		||||
  response = requests.get("https://www.bitstamp.net/api/v2/ohlc/btcusd/?step=300&limit=1")
 | 
			
		||||
  
 | 
			
		||||
  if response.status_code == 200:  # check if the request was successful
 | 
			
		||||
    bitstamp_data = response.json()
 | 
			
		||||
    ohlc_data = bitstamp_data["data"]["ohlc"]
 | 
			
		||||
    
 | 
			
		||||
    candle_stick_data = {
 | 
			
		||||
        'exchange': 'bitstamp',
 | 
			
		||||
        'timestamp': int(ohlc_data[0]['timestamp']),
 | 
			
		||||
        'open': float(ohlc_data[0]['open']),
 | 
			
		||||
        'high': float(ohlc_data[0]['high']),
 | 
			
		||||
        'low': float(ohlc_data[0]['low']),
 | 
			
		||||
        'close': float(ohlc_data[0]['close']),
 | 
			
		||||
        'volume_quote': float(ohlc_data[0]['volume']),
 | 
			
		||||
        'volume_base': 0,  # not provided by Bitstamp API
 | 
			
		||||
        'trades': 0,  # not provided by Bitstamp API
 | 
			
		||||
    }
 | 
			
		||||
    
 | 
			
		||||
    bitstamp_json = json.dumps(candle_stick_data, indent=2)
 | 
			
		||||
    #print("Bitstamp: OK")
 | 
			
		||||
    #print(bitstamp_json)
 | 
			
		||||
    #q.put("Bitstamp: OK")
 | 
			
		||||
    return bitstamp_json
 | 
			
		||||
  else:
 | 
			
		||||
    print(f"Error fetching data from Bitstamp API: {response.status_code}")
 | 
			
		||||
    q.put("Bitstamp: ERROR")
 | 
			
		||||
    return empty_json
 | 
			
		||||
 | 
			
		||||
def fetch_bitfinex(q):
 | 
			
		||||
## Bitfinex
 | 
			
		||||
  response = requests.get("https://api-pub.bitfinex.com/v2/candles/trade:5m:tBTCUSD/last")
 | 
			
		||||
  
 | 
			
		||||
  if response.status_code == 200:  # check if the request was successful
 | 
			
		||||
    ohlc_data = response.json()
 | 
			
		||||
    candle_stick_data = {
 | 
			
		||||
        'exchange': 'bitfinex',
 | 
			
		||||
        'timestamp': ohlc_data[0],
 | 
			
		||||
        'open': ohlc_data[1],
 | 
			
		||||
        'high': ohlc_data[2],
 | 
			
		||||
        'low': ohlc_data[3],
 | 
			
		||||
        'close': ohlc_data[4],
 | 
			
		||||
        'volume_quote': ohlc_data[5],
 | 
			
		||||
        'volume_base': 0,  # not provided by Bitfinex API
 | 
			
		||||
        'trades': 0,  # not provided by Bitfinex API
 | 
			
		||||
    }
 | 
			
		||||
    
 | 
			
		||||
    bitfinex_json = json.dumps(candle_stick_data, indent=2)
 | 
			
		||||
    #print("Bitfinex: OK")
 | 
			
		||||
    #print(bitfinex_json)
 | 
			
		||||
    #q.put("Bitfinex: OK")
 | 
			
		||||
    return bitfinex_json
 | 
			
		||||
  else:
 | 
			
		||||
    print(f"Error fetching data from Bitfinex API: {response.status_code}")
 | 
			
		||||
    q.put("Bitfinex: ERROR")
 | 
			
		||||
    return empty_json
 | 
			
		||||
 | 
			
		||||
def fetch_gemini(q):
 | 
			
		||||
## Gemini
 | 
			
		||||
  response = requests.get("https://api.gemini.com/v2/candles/btcusd/5m")
 | 
			
		||||
  
 | 
			
		||||
  if response.status_code == 200:  # check if the request was successful
 | 
			
		||||
    gemini_ohlc = response.json()
 | 
			
		||||
    candle_stick_data = {
 | 
			
		||||
        'exchange': 'gemini',
 | 
			
		||||
        'timestamp': gemini_ohlc[0][0],
 | 
			
		||||
        'open': gemini_ohlc[0][1],
 | 
			
		||||
        'high': gemini_ohlc[0][2],
 | 
			
		||||
        'low': gemini_ohlc[0][3],
 | 
			
		||||
        'close': gemini_ohlc[0][4],
 | 
			
		||||
        'volume_quote': 0,  # not provided by Gemini API
 | 
			
		||||
        'volume_base': gemini_ohlc[0][5],
 | 
			
		||||
        'trades': 0,  # not provided by Gemini API
 | 
			
		||||
    }
 | 
			
		||||
    gemini_json = json.dumps(candle_stick_data, indent=2)
 | 
			
		||||
    #print("Gemini: OK")
 | 
			
		||||
    #print(gemini_json)
 | 
			
		||||
    #q.put("Gemini: OK")
 | 
			
		||||
    return gemini_json
 | 
			
		||||
  else:
 | 
			
		||||
    print(f"Error fetching data from Gemini API: {response.status_code}")
 | 
			
		||||
    q.put("Gemini: ERROR")
 | 
			
		||||
    return empty_json
 | 
			
		||||
 | 
			
		||||
def write_dict_to_database(in_dict, connection):
 | 
			
		||||
  cursor = connection.cursor()
 | 
			
		||||
  # Use placeholders for the values in the INSERT statement
 | 
			
		||||
  insert_query = "INSERT INTO ohlc (exchange, timestamp, open, high, low, close, volume_quote, volume_base, trades) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)"
 | 
			
		||||
 | 
			
		||||
  values = (in_dict['exchange'], 
 | 
			
		||||
            in_dict['timestamp'], 
 | 
			
		||||
            in_dict['open'], 
 | 
			
		||||
            in_dict['high'], 
 | 
			
		||||
            in_dict['low'], 
 | 
			
		||||
            in_dict['close'], 
 | 
			
		||||
            in_dict['volume_quote'], 
 | 
			
		||||
            in_dict['volume_base'], 
 | 
			
		||||
            in_dict['trades'])
 | 
			
		||||
  ## apply lock while writing to database
 | 
			
		||||
  with database_lock:
 | 
			
		||||
    cursor.execute(insert_query, values)
 | 
			
		||||
    connection.commit()
 | 
			
		||||
 | 
			
		||||
def get_the_data(q):
 | 
			
		||||
   #cursor = db.cursor()
 | 
			
		||||
   while True:  
 | 
			
		||||
    #logline = "Fetching at: " + str(time.time())
 | 
			
		||||
    #q.put(logline)
 | 
			
		||||
    db = sqlite3.connect(database)
 | 
			
		||||
    write_dict_to_database(json.loads(fetch_kraken()), db)
 | 
			
		||||
    write_dict_to_database(json.loads(fetch_bitfinex(q)), db)
 | 
			
		||||
    write_dict_to_database(json.loads(fetch_bitstamp(q)), db)
 | 
			
		||||
    write_dict_to_database(json.loads(fetch_gemini(q)), db)
 | 
			
		||||
    db.close()
 | 
			
		||||
    time.sleep(290)
 | 
			
		||||
    
 | 
			
		||||
 | 
			
		||||
Checkthedatabase()
 | 
			
		||||
 | 
			
		||||
# Empty response json
 | 
			
		||||
empty_dict = {"exchange": "", "timestamp": 0, "open": 0, "high": 0, "low": 0, "close": 0, "volume_quote": 0, "volume_base": 0, "trades": 0}
 | 
			
		||||
empty_json = json.dumps(empty_dict)
 | 
			
		||||
database_lock = threading.Lock()
 | 
			
		||||
 | 
			
		||||
fetch_thread = threading.Thread()
 | 
			
		||||
def get_health():
 | 
			
		||||
  if fetch_thread.is_alive():
 | 
			
		||||
    return "Alive"
 | 
			
		||||
  else:
 | 
			
		||||
    return "Dead"
 | 
			
		||||
 | 
			
		||||
def start(q):
 | 
			
		||||
  logline = "Started at " + str(time.time())
 | 
			
		||||
  q.put(logline)
 | 
			
		||||
  fetch_thread.target = get_the_data
 | 
			
		||||
  fetch_thread.args = (q, )
 | 
			
		||||
  fetch_thread.daemon = True
 | 
			
		||||
  fetch_thread.start()
 | 
			
		||||
  logline = "Fetcher ID: " + str(fetch_thread.ident) + " or " + str(fetch_thread.native_id)
 | 
			
		||||
  q.put(logline)
 | 
			
		||||
 | 
			
		||||
  db = sqlite3.connect(database)
 | 
			
		||||
  lastID_andTime = db.execute("SELECT id, timestamp FROM ohlc LIMIT 1 OFFSET (SELECT COUNT(*) FROM ohlc) - 1").fetchall()
 | 
			
		||||
  #q.put(json.dumps(lastID_andTime, indent=2, separators=(',', ':')))
 | 
			
		||||
  q.put(json.dumps(lastID_andTime, separators=(',', ':')))
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@@ -99,11 +99,11 @@ def fetch_bitstamp():
 | 
			
		||||
    
 | 
			
		||||
    bitstamp_json = json.dumps(candle_stick_data, indent=2)
 | 
			
		||||
    #print("Bitstamp: OK")
 | 
			
		||||
    #print(bitstamp_json)
 | 
			
		||||
 #   print(bitstamp_json)
 | 
			
		||||
    return bitstamp_json
 | 
			
		||||
  else:
 | 
			
		||||
    print(f"Error fetching data from Bitstamp API: {response.status_code}")
 | 
			
		||||
    return "Bitstamp: ERROR"
 | 
			
		||||
    return empty_json
 | 
			
		||||
 | 
			
		||||
def fetch_bitfinex():
 | 
			
		||||
## Bitfinex
 | 
			
		||||
@@ -129,7 +129,7 @@ def fetch_bitfinex():
 | 
			
		||||
    return bitfinex_json
 | 
			
		||||
  else:
 | 
			
		||||
    print(f"Error fetching data from Bitfinex API: {response.status_code}")
 | 
			
		||||
    return "Bitfinex: ERROR"
 | 
			
		||||
    return empty_json
 | 
			
		||||
 | 
			
		||||
def fetch_gemini():
 | 
			
		||||
## Gemini
 | 
			
		||||
@@ -154,7 +154,7 @@ def fetch_gemini():
 | 
			
		||||
    return gemini_json
 | 
			
		||||
  else:
 | 
			
		||||
    print(f"Error fetching data from Gemini API: {response.status_code}")
 | 
			
		||||
    return "Gemini: ERROR"
 | 
			
		||||
    return empty_json
 | 
			
		||||
 | 
			
		||||
def write_dict_to_database(in_dict, connection):
 | 
			
		||||
  cursor = connection.cursor()
 | 
			
		||||
@@ -189,6 +189,10 @@ def get_the_data():
 | 
			
		||||
 | 
			
		||||
Checkthedatabase()
 | 
			
		||||
 | 
			
		||||
# Empty response json
 | 
			
		||||
empty_dict = {"exchange": "", "timestamp": 0, "open": 0, "high": 0, "low": 0, "close": 0, "volume_quote": 0, "volume_base": 0, "trades": 0}
 | 
			
		||||
empty_json = json.dumps(empty_dict)
 | 
			
		||||
 | 
			
		||||
database_lock = threading.Lock()
 | 
			
		||||
# make this run in thread and allow contium to http api stuff
 | 
			
		||||
fetch_thread = threading.Thread(target=get_the_data)
 | 
			
		||||
@@ -200,7 +204,7 @@ count = 1
 | 
			
		||||
while True:
 | 
			
		||||
  if count % 10 == 0:
 | 
			
		||||
    print("------- status ", count, "-------")
 | 
			
		||||
    top2_rows = db.execute("SELECT * FROM ohlc LIMIT 2").fetchall()
 | 
			
		||||
    top2_rows = db.execute("SELECT * FROM ohlc LIMIT 2 OFFSET (SELECT COUNT(*) FROM ohlc) - 2").fetchall()
 | 
			
		||||
    print(json.dumps(top2_rows, indent=4))
 | 
			
		||||
  else:
 | 
			
		||||
    output = str(count) + ".. "
 | 
			
		||||
 
 | 
			
		||||
							
								
								
									
										31
									
								
								btc_tracker/the_server.py
									
									
									
									
									
										Executable file
									
								
							
							
						
						
									
										31
									
								
								btc_tracker/the_server.py
									
									
									
									
									
										Executable file
									
								
							@@ -0,0 +1,31 @@
 | 
			
		||||
#!/usr/bin/python3
 | 
			
		||||
import fetcher
 | 
			
		||||
import time
 | 
			
		||||
from queue import Queue
 | 
			
		||||
from flask import Flask
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
# Create a queue to get some info how the fetcher is doing
 | 
			
		||||
q = Queue()
 | 
			
		||||
# Start the data collecting
 | 
			
		||||
fetcher.start(q)
 | 
			
		||||
 | 
			
		||||
# Initialize the Flask app
 | 
			
		||||
app = Flask(__name__)
 | 
			
		||||
 | 
			
		||||
@app.route("/")
 | 
			
		||||
def root():
 | 
			
		||||
  if not q.empty():
 | 
			
		||||
    data = q.get()
 | 
			
		||||
    return(str(data))
 | 
			
		||||
  else:
 | 
			
		||||
    return("Fetcher message queue is empty")
 | 
			
		||||
 | 
			
		||||
@app.route("/fetcher_health")
 | 
			
		||||
def fetcher_health():
 | 
			
		||||
  health = fetcher.get_health()
 | 
			
		||||
  return(str(health))
 | 
			
		||||
 | 
			
		||||
# Run the app
 | 
			
		||||
if __name__ == "__main__":
 | 
			
		||||
  app.run()
 | 
			
		||||
							
								
								
									
										62
									
								
								letters/Flask_and_backend_process.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										62
									
								
								letters/Flask_and_backend_process.py
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,62 @@
 | 
			
		||||
from flask import Flask
 | 
			
		||||
import threading
 | 
			
		||||
import sqlite3
 | 
			
		||||
 | 
			
		||||
app = Flask(__name__)
 | 
			
		||||
 | 
			
		||||
# Create a lock for synchronizing access to the database
 | 
			
		||||
lock = threading.Lock()
 | 
			
		||||
 | 
			
		||||
def my_background_process():
 | 
			
		||||
    # Acquire the lock
 | 
			
		||||
    lock.acquire()
 | 
			
		||||
 | 
			
		||||
    # Connect to the in-memory database
 | 
			
		||||
    conn = sqlite3.connect(':memory:')
 | 
			
		||||
    c = conn.cursor()
 | 
			
		||||
 | 
			
		||||
    # Create a table in the database
 | 
			
		||||
    c.execute("CREATE TABLE mytable (col1 INTEGER, col2 TEXT)")
 | 
			
		||||
 | 
			
		||||
    # Write to the database
 | 
			
		||||
    c.execute("INSERT INTO mytable (col1, col2) VALUES (?, ?)", (value1, value2))
 | 
			
		||||
    conn.commit()
 | 
			
		||||
 | 
			
		||||
    # Close the connection
 | 
			
		||||
    conn.close()
 | 
			
		||||
 | 
			
		||||
    # Release the lock
 | 
			
		||||
    lock.release()
 | 
			
		||||
 | 
			
		||||
@app.route('/')
 | 
			
		||||
def read_from_database():
 | 
			
		||||
    # Acquire the lock
 | 
			
		||||
    lock.acquire()
 | 
			
		||||
 | 
			
		||||
    # Connect to the in-memory database
 | 
			
		||||
    conn = sqlite3.connect(':memory:')
 | 
			
		||||
    c = conn.cursor()
 | 
			
		||||
 | 
			
		||||
    # Read from the database
 | 
			
		||||
    c.execute("SELECT * FROM mytable")
 | 
			
		||||
    rows = c.fetchall()
 | 
			
		||||
 | 
			
		||||
    # Close the connection
 | 
			
		||||
    conn.close()
 | 
			
		||||
 | 
			
		||||
    # Release the lock
 | 
			
		||||
    lock.release()
 | 
			
		||||
 | 
			
		||||
    # Return the rows to the client
 | 
			
		||||
    return rows
 | 
			
		||||
 | 
			
		||||
if __name__ == '__main__':
 | 
			
		||||
    # Create a new thread for the background process
 | 
			
		||||
    thread = threading.Thread(target=my_background_process)
 | 
			
		||||
 | 
			
		||||
    # Start the thread
 | 
			
		||||
    thread.start()
 | 
			
		||||
 | 
			
		||||
    # Start the Flask app
 | 
			
		||||
    app.run()
 | 
			
		||||
 | 
			
		||||
		Reference in New Issue
	
	Block a user