From 8cbe334f1b12f58a4ee0814bd84668e40b69c885 Mon Sep 17 00:00:00 2001 From: kalzu rekku Date: Tue, 22 Oct 2024 22:21:17 +0300 Subject: [PATCH] quick, maybe working, client.py (updatet from the wpm_client.py) --- kiss/toml-manager/client.py | 217 ++++++++++++++++++++++++++++++++++++ 1 file changed, 217 insertions(+) create mode 100644 kiss/toml-manager/client.py diff --git a/kiss/toml-manager/client.py b/kiss/toml-manager/client.py new file mode 100644 index 0000000..f048350 --- /dev/null +++ b/kiss/toml-manager/client.py @@ -0,0 +1,217 @@ +import argparse +import requests +import json +import sys +import os +import base64 +from cryptography.hazmat.primitives.asymmetric import ec +from cryptography.hazmat.primitives import hashes +from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes +from cryptography.hazmat.backends import default_backend +from cryptography.hazmat.primitives import serialization + +BASE_URL = "http://localhost:8080" # Update this if your server is running on a different host or port + +class SecureApiClient: + def __init__(self, server_public_key, client_private_key): + self.server_public_key = server_public_key + self.client_private_key = client_private_key + + def generate_symmetric_key(self): + return os.urandom(32) # AES 256-bit key + + def encrypt_symmetric_key(self, symmetric_key): + encrypted_symmetric_key = self.server_public_key.encrypt( + symmetric_key, + ec.ECIES(hashes.SHA256()) + ) + return encrypted_symmetric_key + + def encrypt_data(self, data, symmetric_key): + iv = os.urandom(16) + cipher = Cipher(algorithms.AES(symmetric_key), modes.CFB(iv), backend=default_backend()) + encryptor = cipher.encryptor() + encrypted_data = encryptor.update(json.dumps(data).encode('utf-8')) + encryptor.finalize() + return iv + encrypted_data + + def decrypt_data(self, encrypted_data, symmetric_key): + encrypted_data = base64.b64decode(encrypted_data) + iv = encrypted_data[:16] + cipher = Cipher(algorithms.AES(symmetric_key), modes.CFB(iv), backend=default_backend()) + decryptor = cipher.decryptor() + decrypted_data = decryptor.update(encrypted_data[16:]) + decryptor.finalize() + return decrypted_data.decode('utf-8') + + def sign_data(self, data): + signature = self.client_private_key.sign( + data, + ec.ECDSA(hashes.SHA256()) + ) + return base64.b64encode(signature).decode('utf-8') + + def create_jwe(self, encrypted_symmetric_key, encrypted_data, signature): + token = base64.b64encode(json.dumps({ + 'enc_sym_key': base64.b64encode(encrypted_symmetric_key).decode('utf-8'), + 'data': base64.b64encode(encrypted_data).decode('utf-8'), + 'signature': signature + }).encode()).decode() + return f"eyJhbGciOiJub25lIn0.{token}." # Add header and empty signature + + def make_request(self, method, endpoint, data): + symmetric_key = self.generate_symmetric_key() + encrypted_symmetric_key = self.encrypt_symmetric_key(symmetric_key) + encrypted_data = self.encrypt_data(data, symmetric_key) + signature = self.sign_data(encrypted_data) + jwe_token = self.create_jwe(encrypted_symmetric_key, encrypted_data, signature) + + headers = { + 'Authorization': f'Bearer {jwe_token}', + 'Content-Type': 'application/octet-stream' + } + + response = requests.request(method, f"{BASE_URL}{endpoint}", headers=headers, data=encrypted_data) + + if response.status_code in (200, 201): + # Decrypt the response body using the same symmetric key + encrypted_response_data = response.json().get('data') + if encrypted_response_data: + decrypted_response = self.decrypt_data(encrypted_response_data, symmetric_key) + return json.loads(decrypted_response) + else: + print("Error: Encrypted response data not found.") + return None + else: + print(f"Error: {response.status_code} - {response.text}") + return None + return response + +def get_server_public_key(): + response = requests.get(f"{BASE_URL}/public_key") + if response.status_code == 200: + public_key_pem = response.json()['public_key'] + return serialization.load_pem_public_key(public_key_pem.encode(), backend=default_backend()) + else: + print(f"Error getting server public key: {response.status_code} - {response.text}") + sys.exit(1) + +def load_client_private_key(key_path): + with open(key_path, "rb") as key_file: + return serialization.load_pem_private_key( + key_file.read(), + password=None, + backend=default_backend() + ) + +def create_or_update_peer(client, public_key, allowed_ips): + data = { + "action": "add_peer" if not public_key else "update_peer", + "peer": { + "PublicKey": public_key, + "AllowedIPs": allowed_ips + } + } + if public_key: + data["public_key"] = public_key + + response = client.make_request("POST" if not public_key else "PUT", "/peers", data) + + if response.status_code in (200, 201): + print(response.json()['message']) + else: + print(f"Error: {response.status_code} - {response.text}") + +def delete_peer(client, public_key): + data = { + "action": "delete_peer", + "public_key": public_key + } + response = client.make_request("DELETE", "/peers", data) + + if response.status_code == 200: + print(response.json()['message']) + else: + print(f"Error: {response.status_code} - {response.text}") + +def list_peers(client): + response = requests.get(f"{BASE_URL}/peers") + + if response.status_code == 200: + peers = response.json() + print(json.dumps(peers, indent=2)) + else: + print(f"Error: {response.status_code} - {response.text}") + +def restore_config(client): + data = { + "action": "restore" + } + response = client.make_request("POST", "/restore", data) + + if response.status_code == 200: + print(response.json()['message']) + else: + print(f"Error: {response.status_code} - {response.text}") + +def generate_client_keys(): + private_key = ec.generate_private_key(ec.SECP256R1(), default_backend()) + public_key = private_key.public_key() + + private_pem = private_key.private_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PrivateFormat.PKCS8, + encryption_algorithm=serialization.NoEncryption() + ) + + public_pem = public_key.public_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PublicFormat.SubjectPublicKeyInfo + ) + + with open("client_private_key.pem", "wb") as f: + f.write(private_pem) + + with open("client_public_key.pem", "wb") as f: + f.write(public_pem) + + print("Client keys generated successfully.") + print("Private key saved to: client_private_key.pem") + print("Public key saved to: client_public_key.pem") + print("\nIMPORTANT: Add the following to the server's config.toml file:") + print("\n[client_keys]") + print(f'public_key = """\n{public_pem.decode()}"""') + print("\nAfter adding the key, restart the server for the changes to take effect.") + +def main(): + parser = argparse.ArgumentParser(description="WireGuard Config Manager Client") + parser.add_argument("action", choices=["create", "update", "delete", "list", "restore", "generate_keys"], help="Action to perform") + parser.add_argument("--public-key", help="Public key of the peer") + parser.add_argument("--allowed-ips", help="Allowed IPs for the peer") + parser.add_argument("--private-key", default="client_private_key.pem", help="Path to client's private key file") + + args = parser.parse_args() + + if args.action == "generate_keys": + generate_client_keys() + return + + server_public_key = get_server_public_key() + client_private_key = load_client_private_key(args.private_key) + client = SecureApiClient(server_public_key, client_private_key) + + if args.action in ["create", "update"]: + if not args.public_key or not args.allowed_ips: + print("Error: Both --public-key and --allowed-ips are required for create/update actions.") + sys.exit(1) + create_or_update_peer(client, args.public_key, args.allowed_ips) + elif args.action == "delete": + if not args.public_key: + print("Error: --public-key is required for delete action.") + sys.exit(1) + delete_peer(client, args.public_key) + elif args.action == "list": + list_peers(client) + elif args.action == "restore": + restore_config(client) + +if __name__ == "__main__": + main()