# wireguard_peer_manager/kiss/toml-manager/client.py
#
# (Repository web-view header: 218 lines, 8.1 KiB, Python.)

import argparse
import requests
import json
import sys
import os
import base64
from cryptography.hazmat.primitives.asymmetric import ec
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
# Root URL that every endpoint path in this file ("/public_key", "/peers",
# "/restore") is appended to.
# NOTE(review): the scheme is plain http, so transport confidentiality relies
# entirely on the application-level encryption below; confirm this is intended
# before pointing it at a non-local host.
BASE_URL = "http://localhost:8080" # Update this if your server is running on a different host or port
class SecureApiClient:
    """Client that wraps each API call in an encrypted, signed envelope.

    For every request a fresh 256-bit AES key is generated, the JSON payload
    is encrypted with it (AES in CFB mode), the ciphertext is signed with the
    client's private key (ECDSA over SHA-256), and the wrapped key, ciphertext
    and signature are packed into a bearer token sent in the ``Authorization``
    header. The same symmetric key is used to decrypt the server's reply.
    """

    # Seconds to wait for the server. Without an explicit timeout `requests`
    # can block indefinitely on an unresponsive host.
    REQUEST_TIMEOUT = 30

    def __init__(self, server_public_key, client_private_key):
        """Store the key objects used to wrap session keys and sign requests."""
        self.server_public_key = server_public_key
        self.client_private_key = client_private_key

    def generate_symmetric_key(self):
        """Return 32 fresh random bytes to use as an AES-256 key."""
        return os.urandom(32)  # AES 256-bit key

    def encrypt_symmetric_key(self, symmetric_key):
        """Encrypt ``symmetric_key`` for the server and return the ciphertext.

        NOTE(review): the `cryptography` package does not appear to provide
        ``ec.ECIES`` or an ``encrypt`` method on elliptic-curve public keys,
        so this call is expected to raise ``AttributeError`` at runtime.
        A working replacement (e.g. ECDH + KDF) changes the wire format and
        must be agreed with the server, so the call is left untouched here.
        """
        encrypted_symmetric_key = self.server_public_key.encrypt(
            symmetric_key,
            ec.ECIES(hashes.SHA256())
        )
        return encrypted_symmetric_key

    def encrypt_data(self, data, symmetric_key):
        """Serialize ``data`` to JSON and encrypt it with AES-CFB.

        Returns:
            bytes: a random 16-byte IV followed by the ciphertext.
        """
        iv = os.urandom(16)
        cipher = Cipher(algorithms.AES(symmetric_key), modes.CFB(iv), backend=default_backend())
        encryptor = cipher.encryptor()
        plaintext = json.dumps(data).encode('utf-8')
        return iv + encryptor.update(plaintext) + encryptor.finalize()

    def decrypt_data(self, encrypted_data, symmetric_key):
        """Decrypt a base64-encoded ``IV || ciphertext`` blob.

        Returns:
            str: the decrypted bytes decoded as UTF-8.

        NOTE(review): nothing here authenticates the ciphertext (CFB has no
        integrity check and no signature is verified), so a tampered reply is
        not detected at this layer.
        """
        raw = base64.b64decode(encrypted_data)
        iv, ciphertext = raw[:16], raw[16:]
        cipher = Cipher(algorithms.AES(symmetric_key), modes.CFB(iv), backend=default_backend())
        decryptor = cipher.decryptor()
        decrypted_data = decryptor.update(ciphertext) + decryptor.finalize()
        return decrypted_data.decode('utf-8')

    def sign_data(self, data):
        """Sign ``data`` (bytes) with ECDSA/SHA-256; return the signature as base64 text."""
        signature = self.client_private_key.sign(
            data,
            ec.ECDSA(hashes.SHA256())
        )
        return base64.b64encode(signature).decode('utf-8')

    def create_jwe(self, encrypted_symmetric_key, encrypted_data, signature):
        """Pack the wrapped key, ciphertext and signature into a token string.

        The result has three dot-separated parts: a fixed header
        (base64 of ``{"alg":"none"}``), the base64-encoded JSON payload, and
        an empty trailing segment.
        """
        payload = {
            'enc_sym_key': base64.b64encode(encrypted_symmetric_key).decode('utf-8'),
            'data': base64.b64encode(encrypted_data).decode('utf-8'),
            'signature': signature
        }
        token = base64.b64encode(json.dumps(payload).encode()).decode()
        return f"eyJhbGciOiJub25lIn0.{token}."  # Add header and empty signature

    def make_request(self, method, endpoint, data):
        """Send ``data`` to ``endpoint`` in an encrypted envelope.

        Args:
            method: HTTP method name passed to ``requests.request``.
            endpoint: Path appended to ``BASE_URL``.
            data: JSON-serializable request payload.

        Returns:
            The decrypted, JSON-decoded response body on success, or ``None``
            when the request fails, the status is not 200/201, or the reply
            carries no encrypted ``data`` field. An error line is printed in
            every ``None`` case.
        """
        symmetric_key = self.generate_symmetric_key()
        encrypted_symmetric_key = self.encrypt_symmetric_key(symmetric_key)
        encrypted_data = self.encrypt_data(data, symmetric_key)
        signature = self.sign_data(encrypted_data)
        jwe_token = self.create_jwe(encrypted_symmetric_key, encrypted_data, signature)
        headers = {
            'Authorization': f'Bearer {jwe_token}',
            'Content-Type': 'application/octet-stream'
        }
        try:
            response = requests.request(
                method,
                f"{BASE_URL}{endpoint}",
                headers=headers,
                data=encrypted_data,
                timeout=self.REQUEST_TIMEOUT,
            )
        except requests.RequestException as exc:
            # Connection refused, DNS failure, timeout, ... -- report it the
            # same way as every other failure of this method.
            print(f"Error: request to {endpoint} failed - {exc}")
            return None

        if response.status_code not in (200, 201):
            print(f"Error: {response.status_code} - {response.text}")
            return None

        try:
            body = response.json()
        except ValueError:
            # requests raises a ValueError subclass for a non-JSON body.
            print("Error: Response body is not valid JSON.")
            return None

        # Decrypt the response body using the same symmetric key
        encrypted_response_data = body.get('data') if isinstance(body, dict) else None
        if not encrypted_response_data:
            print("Error: Encrypted response data not found.")
            return None
        decrypted_response = self.decrypt_data(encrypted_response_data, symmetric_key)
        return json.loads(decrypted_response)
def get_server_public_key():
    """Fetch the server's PEM public key from ``/public_key`` and parse it.

    Returns:
        The key object produced by ``serialization.load_pem_public_key``.

    Prints an error and exits the process with status 1 when the server
    cannot be reached, answers with a non-200 status, or sends a body that
    does not contain a usable ``public_key`` PEM string.
    """
    # NOTE(review): the key is fetched without any authentication of the
    # server; if BASE_URL is reached over an untrusted network, consider
    # pinning the key locally instead of trusting this response.
    try:
        # A timeout keeps the CLI from hanging forever on a dead server.
        response = requests.get(f"{BASE_URL}/public_key", timeout=10)
    except requests.RequestException as exc:
        print(f"Error getting server public key: {exc}")
        sys.exit(1)

    if response.status_code != 200:
        print(f"Error getting server public key: {response.status_code} - {response.text}")
        sys.exit(1)

    try:
        public_key_pem = response.json()['public_key']
        return serialization.load_pem_public_key(public_key_pem.encode(), backend=default_backend())
    except (ValueError, KeyError, TypeError, AttributeError) as exc:
        # Non-JSON body, missing/ill-typed field, or PEM that fails to parse.
        print(f"Error getting server public key: malformed response - {exc!r}")
        sys.exit(1)
def load_client_private_key(key_path):
    """Read the PEM file at ``key_path`` and return the parsed private key.

    The key is loaded with ``password=None``, i.e. it is expected to be
    stored unencrypted.
    """
    with open(key_path, "rb") as pem_file:
        pem_bytes = pem_file.read()
    return serialization.load_pem_private_key(
        pem_bytes,
        password=None,
        backend=default_backend(),
    )
def create_or_update_peer(client, public_key, allowed_ips):
    """Add a peer, or update an existing one, via the ``/peers`` endpoint.

    Args:
        client: Object whose ``make_request(method, endpoint, data)`` returns
            the decoded response payload, or ``None`` after printing an error.
        public_key: Peer public key. When truthy an ``update_peer`` request is
            sent with PUT; otherwise an ``add_peer`` request is sent with POST.
        allowed_ips: Value placed in the peer's ``AllowedIPs`` field.

    Prints the server's ``message`` (or the whole payload when there is no
    such field). Returns ``None``.
    """
    # NOTE(review): the CLI requires --public-key for both "create" and
    # "update", so from main() this always takes the update/PUT branch.
    # Confirm how the server expects a brand-new peer to be submitted.
    is_update = bool(public_key)
    data = {
        "action": "update_peer" if is_update else "add_peer",
        "peer": {
            "PublicKey": public_key,
            "AllowedIPs": allowed_ips
        }
    }
    if is_update:
        data["public_key"] = public_key

    # make_request returns the decrypted payload (not a Response object) and
    # has already printed the reason when it returns None.
    result = client.make_request("PUT" if is_update else "POST", "/peers", data)
    if result is None:
        return

    message = result.get('message') if isinstance(result, dict) else None
    print(message if message is not None else json.dumps(result, indent=2))
def delete_peer(client, public_key):
    """Ask the server to remove the peer identified by ``public_key``.

    Args:
        client: Object whose ``make_request(method, endpoint, data)`` returns
            the decoded response payload, or ``None`` after printing an error.
        public_key: Public key of the peer to delete.

    Prints the server's ``message`` (or the whole payload when there is no
    such field). Returns ``None``.
    """
    data = {
        "action": "delete_peer",
        "public_key": public_key
    }
    # make_request returns the decrypted payload (not a Response object) and
    # has already printed the reason when it returns None.
    result = client.make_request("DELETE", "/peers", data)
    if result is None:
        return

    message = result.get('message') if isinstance(result, dict) else None
    print(message if message is not None else json.dumps(result, indent=2))
def list_peers(client):
    """Fetch the peer list from ``/peers`` and pretty-print it as JSON.

    Args:
        client: Unused. The listing is requested with a plain GET rather than
            through the client's encrypted envelope.

    Prints an error line instead of raising when the server is unreachable,
    answers with a non-200 status, or returns a body that is not JSON.
    """
    try:
        # A timeout keeps the CLI from hanging forever on a dead server.
        response = requests.get(f"{BASE_URL}/peers", timeout=10)
    except requests.RequestException as exc:
        print(f"Error: {exc}")
        return

    if response.status_code != 200:
        print(f"Error: {response.status_code} - {response.text}")
        return

    try:
        peers = response.json()
    except ValueError:
        # requests raises a ValueError subclass for a non-JSON body.
        print(f"Error: server returned a non-JSON peer list - {response.text}")
        return
    print(json.dumps(peers, indent=2))
def restore_config(client):
    """Send a ``restore`` action to the ``/restore`` endpoint.

    Args:
        client: Object whose ``make_request(method, endpoint, data)`` returns
            the decoded response payload, or ``None`` after printing an error.

    Prints the server's ``message`` (or the whole payload when there is no
    such field). Returns ``None``.
    """
    data = {
        "action": "restore"
    }
    # make_request returns the decrypted payload (not a Response object) and
    # has already printed the reason when it returns None.
    result = client.make_request("POST", "/restore", data)
    if result is None:
        return

    message = result.get('message') if isinstance(result, dict) else None
    print(message if message is not None else json.dumps(result, indent=2))
def generate_client_keys():
    """Generate a P-256 key pair and write both halves as PEM files.

    Writes ``client_private_key.pem`` (unencrypted PKCS#8) and
    ``client_public_key.pem`` (SubjectPublicKeyInfo) into the current working
    directory, then prints the snippet to paste into the server's config.
    Existing files with those names are overwritten.
    """
    private_key = ec.generate_private_key(ec.SECP256R1(), default_backend())
    public_key = private_key.public_key()
    private_pem = private_key.private_bytes(
        encoding=serialization.Encoding.PEM,
        format=serialization.PrivateFormat.PKCS8,
        encryption_algorithm=serialization.NoEncryption()
    )
    public_pem = public_key.public_bytes(
        encoding=serialization.Encoding.PEM,
        format=serialization.PublicFormat.SubjectPublicKeyInfo
    )

    # The private key is stored unencrypted, so create the file readable and
    # writable by the owner only instead of relying on the process umask.
    private_fd = os.open(
        "client_private_key.pem",
        os.O_WRONLY | os.O_CREAT | os.O_TRUNC,
        0o600,
    )
    with os.fdopen(private_fd, "wb") as f:
        # The mode passed to os.open only applies to newly created files;
        # tighten a pre-existing file before the key material is written.
        os.chmod("client_private_key.pem", 0o600)
        f.write(private_pem)
    with open("client_public_key.pem", "wb") as f:
        f.write(public_pem)

    print("Client keys generated successfully.")
    print("Private key saved to: client_private_key.pem")
    print("Public key saved to: client_public_key.pem")
    print("\nIMPORTANT: Add the following to the server's config.toml file:")
    print("\n[client_keys]")
    print(f'public_key = """\n{public_pem.decode()}"""')
    print("\nAfter adding the key, restart the server for the changes to take effect.")
def main():
    """Parse the command line and run the requested action.

    ``generate_keys`` runs locally. Every other action first fetches the
    server public key and loads the client private key. Required options are
    checked before any network or file I/O, so a usage mistake is reported
    directly instead of being masked by a connection or key-file error.
    Exits with status 1 when a required option is missing.
    """
    parser = argparse.ArgumentParser(description="WireGuard Config Manager Client")
    parser.add_argument("action", choices=["create", "update", "delete", "list", "restore", "generate_keys"], help="Action to perform")
    parser.add_argument("--public-key", help="Public key of the peer")
    parser.add_argument("--allowed-ips", help="Allowed IPs for the peer")
    parser.add_argument("--private-key", default="client_private_key.pem", help="Path to client's private key file")
    args = parser.parse_args()

    if args.action == "generate_keys":
        generate_client_keys()
        return

    # Validate arguments up front, before touching the network or key file.
    if args.action in ("create", "update"):
        if not args.public_key or not args.allowed_ips:
            print("Error: Both --public-key and --allowed-ips are required for create/update actions.")
            sys.exit(1)
    elif args.action == "delete":
        if not args.public_key:
            print("Error: --public-key is required for delete action.")
            sys.exit(1)

    server_public_key = get_server_public_key()
    client_private_key = load_client_private_key(args.private_key)
    client = SecureApiClient(server_public_key, client_private_key)

    if args.action in ("create", "update"):
        create_or_update_peer(client, args.public_key, args.allowed_ips)
    elif args.action == "delete":
        delete_peer(client, args.public_key)
    elif args.action == "list":
        list_peers(client)
    elif args.action == "restore":
        restore_config(client)
# Run the CLI only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()