made some work for pastedb / pastecache..
This commit is contained in:
36
letters/fifo-sample.py
Executable file
36
letters/fifo-sample.py
Executable file
@ -0,0 +1,36 @@
|
||||
#!/usr/bin/python3
|
||||
import os
|
||||
import sys
|
||||
import queue
|
||||
import threading
|
||||
|
||||
def listen_to_fifo(q):
|
||||
fifo = "/tmp/my_fifo"
|
||||
if not os.path.exists(fifo):
|
||||
os.mkfifo(fifo)
|
||||
with open(fifo, 'r') as f:
|
||||
while True:
|
||||
data = f.readline().strip()
|
||||
if not data:
|
||||
break
|
||||
q.put(data)
|
||||
|
||||
def read_queue(q):
|
||||
while True:
|
||||
data = q.get()
|
||||
if data == "reboot":
|
||||
# Restart the script
|
||||
print('## RESTARTING SCRIPT')
|
||||
os.execv(sys.executable, [sys.executable] + sys.argv)
|
||||
else:
|
||||
print(data)
|
||||
sys.stdout.flush()
|
||||
q.task_done()
|
||||
|
||||
if __name__ == '__main__':
|
||||
q = queue.Queue()
|
||||
t1 = threading.Thread(target=listen_to_fifo, args=(q,))
|
||||
t2 = threading.Thread(target=read_queue, args=(q,))
|
||||
t1.start()
|
||||
t2.start()
|
||||
q.join()
|
35
letters/fifo-sample_with-commands.py
Executable file
35
letters/fifo-sample_with-commands.py
Executable file
@ -0,0 +1,35 @@
|
||||
#!/usr/bin/python3
|
||||
import os
|
||||
import sys
|
||||
import queue
|
||||
import threading
|
||||
|
||||
command_queue = queue.Queue()
|
||||
fifo_file = "/tmp/my_fifo"
|
||||
|
||||
def listen_to_fifo(queue):
|
||||
if not os.path.exists(fifo_file):
|
||||
os.mkfifo(fifo_file)
|
||||
with open(fifo_file, 'r') as f:
|
||||
while True:
|
||||
data = f.readline().strip()
|
||||
if not data:
|
||||
break
|
||||
queue.put(data)
|
||||
|
||||
def read_queue(queue):
|
||||
while True:
|
||||
data = queue.get()
|
||||
if data == "reboot":
|
||||
fifo_file.close()
|
||||
os.execv(sys.executable, [sys.executable] + sys.argv)
|
||||
print(data)
|
||||
sys.stdout.flush()
|
||||
queue.task_done()
|
||||
|
||||
if __name__ == '__main__':
|
||||
t1 = threading.Thread(target=listen_to_fifo, args=(command_queue,))
|
||||
t2 = threading.Thread(target=read_queue, args=(command_queue,))
|
||||
t1.start()
|
||||
t2.start()
|
||||
command_queue.join()
|
27
letters/sample_function.py
Normal file
27
letters/sample_function.py
Normal file
@ -0,0 +1,27 @@
|
||||
import requests
|
||||
import json
|
||||
import ecdsa
|
||||
import binascii
|
||||
|
||||
def get_btc_ohlc_data(server_url, user_private_key):
|
||||
# Load user's ECDSA private key
|
||||
user_private_key = ecdsa.SigningKey.from_string(binascii.unhexlify(user_private_key), curve=ecdsa.SECP256k1)
|
||||
# Get server public key from endpoint
|
||||
server_public_key_hex = requests.get(server_url + "/serverkey").text
|
||||
server_public_key = ecdsa.VerifyingKey.from_string(binascii.unhexlify(server_public_key_hex), curve=ecdsa.SECP256k1)
|
||||
# Get timestamp
|
||||
timestamp = str(int(time.time()))
|
||||
# Create signature using user's private key
|
||||
signature = binascii.hexlify(user_private_key.sign(bytes(timestamp, 'utf-8'))).decode("utf-8")
|
||||
# Create authentication header
|
||||
auth_header = {"auth": timestamp + ":" + signature}
|
||||
# Make request to server with auth header
|
||||
response = requests.get(server_url + "/t", headers=auth_header)
|
||||
# Verify server's signature
|
||||
server_signature = response.headers["signature"]
|
||||
if server_public_key.verify(bytes(server_signature, 'utf-8'), bytes(timestamp, 'utf-8')):
|
||||
# If signature is valid, return json data
|
||||
return json.loads(response.text)
|
||||
else:
|
||||
# If signature is invalid, return error message
|
||||
return {"error": "Invalid signature from server"}
|
Reference in New Issue
Block a user