"""Command-line client for the WireGuard Config Manager API.

Requests that modify server state are encrypted with a per-request AES key,
signed with the client's EC private key, and sent to the server at
``BASE_URL``.
"""

import argparse
import base64
import json
import os
import sys

import requests
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import ec
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes

# Update this if your server is running on a different host or port.
BASE_URL = "http://localhost:8080"


class SecureApiClient:
    """Build, send and decode encrypted/signed requests to the server."""

    def __init__(self, server_public_key, client_private_key):
        """Store the key material used for every request.

        Args:
            server_public_key: Public key object used to wrap the
                per-request symmetric key.
            client_private_key: EC private key object used to sign the
                encrypted payload.
        """
        self.server_public_key = server_public_key
        self.client_private_key = client_private_key

    def generate_symmetric_key(self):
        """Return 32 fresh random bytes for use as an AES-256 key."""
        return os.urandom(32)  # AES 256-bit key

    def encrypt_symmetric_key(self, symmetric_key):
        """Encrypt *symmetric_key* for the server and return the result.

        NOTE(review): the ``cryptography`` package does not appear to provide
        ``ec.ECIES`` nor an ``encrypt`` method on EC public keys, so this call
        is expected to fail at runtime. It is left unchanged because the key
        wrapping format the server expects is not visible from this file;
        confirm the scheme with the server implementation and replace it.
        """
        encrypted_symmetric_key = self.server_public_key.encrypt(
            symmetric_key,
            ec.ECIES(hashes.SHA256())
        )
        return encrypted_symmetric_key

    def encrypt_data(self, data, symmetric_key):
        """Serialize *data* to JSON and encrypt it with AES in CFB mode.

        Returns:
            bytes: the random 16-byte IV followed by the ciphertext.
        """
        iv = os.urandom(16)
        cipher = Cipher(algorithms.AES(symmetric_key), modes.CFB(iv), backend=default_backend())
        encryptor = cipher.encryptor()
        encrypted_data = encryptor.update(json.dumps(data).encode('utf-8')) + encryptor.finalize()
        return iv + encrypted_data

    def decrypt_data(self, encrypted_data, symmetric_key):
        """Decrypt a base64-encoded ``IV + ciphertext`` blob.

        Args:
            encrypted_data: Base64 text whose decoded form starts with the
                16-byte IV, followed by the AES-CFB ciphertext.
            symmetric_key: The AES key that was used for encryption.

        Returns:
            str: the decrypted plaintext decoded as UTF-8.
        """
        encrypted_data = base64.b64decode(encrypted_data)
        iv = encrypted_data[:16]
        cipher = Cipher(algorithms.AES(symmetric_key), modes.CFB(iv), backend=default_backend())
        decryptor = cipher.decryptor()
        decrypted_data = decryptor.update(encrypted_data[16:]) + decryptor.finalize()
        return decrypted_data.decode('utf-8')

    def sign_data(self, data):
        """Sign *data* (bytes) with ECDSA/SHA-256; return base64 text."""
        signature = self.client_private_key.sign(
            data,
            ec.ECDSA(hashes.SHA256())
        )
        return base64.b64encode(signature).decode('utf-8')

    def create_jwe(self, encrypted_symmetric_key, encrypted_data, signature):
        """Pack key, payload and signature into a three-part bearer token.

        The middle segment is base64-encoded JSON with the keys
        ``enc_sym_key``, ``data`` and ``signature``. The first segment is a
        fixed header and the third (signature) segment is left empty.
        """
        token = base64.b64encode(json.dumps({
            'enc_sym_key': base64.b64encode(encrypted_symmetric_key).decode('utf-8'),
            'data': base64.b64encode(encrypted_data).decode('utf-8'),
            'signature': signature
        }).encode()).decode()
        return f"eyJhbGciOiJub25lIn0.{token}."  # Add header and empty signature

    def make_request(self, method, endpoint, data):
        """Send an encrypted, signed request and return the decrypted reply.

        Args:
            method: HTTP method name passed to ``requests.request``.
            endpoint: Path appended to ``BASE_URL``.
            data: JSON-serializable payload to encrypt and send.

        Returns:
            The JSON-decoded, decrypted response payload on HTTP 200/201, or
            ``None`` when the request failed or the response carried no
            encrypted ``data`` field (an error line is printed in that case).
        """
        symmetric_key = self.generate_symmetric_key()
        encrypted_symmetric_key = self.encrypt_symmetric_key(symmetric_key)
        encrypted_data = self.encrypt_data(data, symmetric_key)
        signature = self.sign_data(encrypted_data)
        jwe_token = self.create_jwe(encrypted_symmetric_key, encrypted_data, signature)

        headers = {
            'Authorization': f'Bearer {jwe_token}',
            'Content-Type': 'application/octet-stream'
        }

        response = requests.request(method, f"{BASE_URL}{endpoint}", headers=headers, data=encrypted_data)

        if response.status_code not in (200, 201):
            print(f"Error: {response.status_code} - {response.text}")
            return None

        try:
            body = response.json()
        except ValueError:
            # A success status with a non-JSON body cannot be decrypted.
            print(f"Error: {response.status_code} - {response.text}")
            return None

        encrypted_response_data = body.get('data') if isinstance(body, dict) else None
        if not encrypted_response_data:
            print("Error: Encrypted response data not found.")
            return None

        # Decrypt the response body using the same symmetric key
        decrypted_response = self.decrypt_data(encrypted_response_data, symmetric_key)
        return json.loads(decrypted_response)


def _print_result(result):
    """Print the decrypted payload returned by ``make_request``.

    ``None`` means ``make_request`` already reported the error, so nothing
    is printed. A dict with a ``message`` key prints that message; any other
    payload is dumped as indented JSON.
    """
    if result is None:
        return
    if isinstance(result, dict) and 'message' in result:
        print(result['message'])
    else:
        print(json.dumps(result, indent=2))


def get_server_public_key():
    """Fetch and parse the server's PEM public key from ``/public_key``.

    Exits the process with status 1 if the server does not answer 200.
    """
    response = requests.get(f"{BASE_URL}/public_key")
    if response.status_code != 200:
        print(f"Error getting server public key: {response.status_code} - {response.text}")
        sys.exit(1)

    public_key_pem = response.json()['public_key']
    return serialization.load_pem_public_key(public_key_pem.encode(), backend=default_backend())


def load_client_private_key(key_path):
    """Load an unencrypted PEM private key from *key_path*."""
    with open(key_path, "rb") as key_file:
        return serialization.load_pem_private_key(
            key_file.read(),
            password=None,
            backend=default_backend()
        )


def create_or_update_peer(client, public_key, allowed_ips, update=None):
    """Add a new peer or update an existing one.

    Args:
        client: ``SecureApiClient`` used to send the request.
        public_key: Public key of the peer.
        allowed_ips: Allowed IPs for the peer.
        update: ``True`` sends ``update_peer`` via PUT, ``False`` sends
            ``add_peer`` via POST. When omitted (``None``) the previous rule
            applies: update if *public_key* is truthy, otherwise add.
    """
    if update is None:
        update = bool(public_key)

    data = {
        "action": "update_peer" if update else "add_peer",
        "peer": {
            "PublicKey": public_key,
            "AllowedIPs": allowed_ips
        }
    }
    if update:
        data["public_key"] = public_key

    result = client.make_request("PUT" if update else "POST", "/peers", data)
    _print_result(result)


def delete_peer(client, public_key):
    """Ask the server to delete the peer identified by *public_key*."""
    data = {
        "action": "delete_peer",
        "public_key": public_key
    }
    result = client.make_request("DELETE", "/peers", data)
    _print_result(result)


def list_peers(client):
    """Print the peer list returned by a plain GET on ``/peers``.

    *client* is accepted for a uniform call signature but is not used: this
    request is sent without encryption or a bearer token.
    """
    response = requests.get(f"{BASE_URL}/peers")
    if response.status_code == 200:
        peers = response.json()
        print(json.dumps(peers, indent=2))
    else:
        print(f"Error: {response.status_code} - {response.text}")


def restore_config(client):
    """Ask the server to restore its configuration via ``/restore``."""
    data = {
        "action": "restore"
    }
    result = client.make_request("POST", "/restore", data)
    _print_result(result)


def generate_client_keys():
    """Generate a SECP256R1 key pair and write both halves as PEM files.

    Writes ``client_private_key.pem`` (PKCS8, unencrypted) and
    ``client_public_key.pem`` in the current directory, then prints the
    snippet to add to the server's ``config.toml``.
    """
    private_key = ec.generate_private_key(ec.SECP256R1(), default_backend())
    public_key = private_key.public_key()

    private_pem = private_key.private_bytes(
        encoding=serialization.Encoding.PEM,
        format=serialization.PrivateFormat.PKCS8,
        encryption_algorithm=serialization.NoEncryption()
    )
    public_pem = public_key.public_bytes(
        encoding=serialization.Encoding.PEM,
        format=serialization.PublicFormat.SubjectPublicKeyInfo
    )

    with open("client_private_key.pem", "wb") as f:
        f.write(private_pem)
    with open("client_public_key.pem", "wb") as f:
        f.write(public_pem)

    print("Client keys generated successfully.")
    print("Private key saved to: client_private_key.pem")
    print("Public key saved to: client_public_key.pem")
    print("\nIMPORTANT: Add the following to the server's config.toml file:")
    print("\n[client_keys]")
    print(f'public_key = """\n{public_pem.decode()}"""')
    print("\nAfter adding the key, restart the server for the changes to take effect.")


def main():
    """Parse command-line arguments and run the requested action."""
    parser = argparse.ArgumentParser(description="WireGuard Config Manager Client")
    parser.add_argument(
        "action",
        choices=["create", "update", "delete", "list", "restore", "generate_keys"],
        help="Action to perform",
    )
    parser.add_argument("--public-key", help="Public key of the peer")
    parser.add_argument("--allowed-ips", help="Allowed IPs for the peer")
    parser.add_argument("--private-key", default="client_private_key.pem", help="Path to client's private key file")
    args = parser.parse_args()

    # Key generation needs neither the server nor an existing private key.
    if args.action == "generate_keys":
        generate_client_keys()
        return

    server_public_key = get_server_public_key()
    client_private_key = load_client_private_key(args.private_key)
    client = SecureApiClient(server_public_key, client_private_key)

    if args.action in ["create", "update"]:
        if not args.public_key or not args.allowed_ips:
            print("Error: Both --public-key and --allowed-ips are required for create/update actions.")
            sys.exit(1)
        # Pass the action explicitly; the public key is required for both,
        # so it cannot be used to tell "create" from "update".
        create_or_update_peer(
            client,
            args.public_key,
            args.allowed_ips,
            update=(args.action == "update"),
        )
    elif args.action == "delete":
        if not args.public_key:
            print("Error: --public-key is required for delete action.")
            sys.exit(1)
        delete_peer(client, args.public_key)
    elif args.action == "list":
        list_peers(client)
    elif args.action == "restore":
        restore_config(client)


if __name__ == "__main__":
    main()