From 937e3f272a44880cbb4e90edf310fac8280138cd Mon Sep 17 00:00:00 2001 From: kalzu rekku Date: Sat, 19 Oct 2024 23:49:07 +0300 Subject: [PATCH] Simple POC for creating easy way to manage wireguard peer configs on the server. --- README.md | 87 +++++++++++++++++ config.toml | 11 +++ wpm.py | 257 ++++++++++++++++++++++++++++++++++++++++++++++++++ wpm_client.py | 84 +++++++++++++++++ 4 files changed, 439 insertions(+) create mode 100644 README.md create mode 100644 config.toml create mode 100644 wpm.py create mode 100644 wpm_client.py diff --git a/README.md b/README.md new file mode 100644 index 0000000..969b39a --- /dev/null +++ b/README.md @@ -0,0 +1,87 @@ +# Wireguard Peer Manager + +This is simple CURD for managing wireguard peer notations on a wireguard server config. + +## Requirements + +- Python 3.x +- `requests` library (for the client) +- WireGuard (`wg-quick` and `wg` commands must be available on the server) + +## Server: wpm.py + +### How to Run the Server + +`python wpm.py` + +### Endpoints + + GET /peers: List all peers. + POST /peers: Add a new peer. + PUT /peers/: Update an existing peer. + DELETE /peers/: Delete an existing peer. + POST /restore: Restore the WireGuard configuration from a backup. + + +## Client: wpm_client.py + +The client script allows interaction with the WireGuard Peer Management API. + +### Usage + + +python wpm_client.py [options] + +### Available Actions + + create: Create a new peer. + Required options: --public-key, --allowed-ips + update: Update an existing peer. + Required options: --public-key, --allowed-ips + delete: Delete a peer by its public key. + Required options: --public-key + list: List all peers. + restore: Restore the WireGuard configuration from the most recent backup. + +### Example Usage + + List Peers: + +``` +python wpm_client.py list + +``` + + Create a New Peer: +``` +python wpm_client.py create --public-key "" --allowed-ips "10.0.0.2/32" + +``` + + Update an Existing Peer: +``` +python wpm_client.py update --public-key "" --allowed-ips "10.0.0.3/32" + +``` + + Delete a Peer: +``` +python wpm_client.py delete --public-key "" + +``` + + Restore Configuration: + +``` +python wpm_client.py restore + +``` + +### Backup and Restore + +The server automatically creates a backup before making any changes to the WireGuard configuration. The backups are stored in the same directory as the configuration file, inside a backups/ folder. + +You can restore the latest backup by sending a POST /restore request, which can be done using the client or via curl: + +curl -X POST http://localhost:8000/restore + diff --git a/config.toml b/config.toml new file mode 100644 index 0000000..5270910 --- /dev/null +++ b/config.toml @@ -0,0 +1,11 @@ +[server] +host = "127.0.0.1" # IP address to bind to +port = 8080 # Port to bind to + +[wireguard] +config_file = "../wireguard_example_server_config.conf" + +[logging] +log_file = "../wpm.log" # Optional: Log file location +debug = true # Enable debug logging + diff --git a/wpm.py b/wpm.py new file mode 100644 index 0000000..e4c7383 --- /dev/null +++ b/wpm.py @@ -0,0 +1,257 @@ +import http.server +import socketserver +import json +import re +import urllib.parse +import tomllib +import logging +import subprocess +import shutil +from datetime import datetime +from typing import Dict, List, Tuple +from pathlib import Path + +# Default configuration +DEFAULT_CONFIG = { + "server": { + "host": "0.0.0.0", + "port": 8000, + }, + "wireguard": { + "config_file": "/etc/wireguard/wg0.conf", + }, + "logging": { + "log_file": None, + "debug": False, + } +} + +def load_config(config_file: str) -> dict: + try: + with open(config_file, "rb") as f: + config = tomllib.load(f) + # Merge with default config + return {**DEFAULT_CONFIG, **config} + except FileNotFoundError: + logging.warning(f"Config file {config_file} not found. Using default configuration.") + return DEFAULT_CONFIG + except tomllib.TOMLDecodeError as e: + logging.error(f"Error parsing config file: {e}") + exit(1) + +CONFIG = load_config("config.toml") + +# Set up logging +logging.basicConfig( + level=logging.DEBUG if CONFIG["logging"]["debug"] else logging.INFO, + format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", + filename=CONFIG["logging"]["log_file"], +) + +def read_config() -> str: + with open(CONFIG["wireguard"]["config_file"], 'r') as file: + return file.read() + +def write_config(content: str) -> None: + with open(CONFIG["wireguard"]["config_file"], 'w') as file: + file.write(content) + +def create_backup(): + config_file = Path(CONFIG["wireguard"]["config_file"]) + backup_dir = config_file.parent / "backups" + backup_dir.mkdir(exist_ok=True) + + timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") + backup_file = backup_dir / f"{config_file.stem}_{timestamp}.conf" + + shutil.copy2(config_file, backup_file) + logging.info(f"Backup created: {backup_file}") + return backup_file + +def restore_from_backup(): + config_file = Path(CONFIG["wireguard"]["config_file"]) + backup_dir = config_file.parent / "backups" + + if not backup_dir.exists() or not list(backup_dir.glob("*.conf")): + logging.error("No backups found") + return False + + latest_backup = max(backup_dir.glob("*.conf"), key=lambda f: f.stat().st_mtime) + shutil.copy2(latest_backup, config_file) + logging.info(f"Restored from backup: {latest_backup}") + return True + +def parse_peers(config: str) -> List[Dict[str, str]]: + peers = [] + current_peer = None + for line in config.split('\n'): + line = line.strip() + if line == "[Peer]": + if current_peer: + peers.append(current_peer) + current_peer = {} + elif current_peer is not None and '=' in line: + key, value = line.split('=', 1) + current_peer[key.strip()] = value.strip() + if current_peer: + peers.append(current_peer) + return peers + +def peer_to_string(peer: Dict[str, str]) -> str: + return f"[Peer]\n" + "\n".join(f"{k} = {v}" for k, v in peer.items()) + +def reload_wireguard_service(): + interface = Path(CONFIG["wireguard"]["config_file"]).stem + + try: + # Check if wg-quick is available + subprocess.run(["which", "wg-quick"], check=True, capture_output=True) + except subprocess.CalledProcessError: + logging.warning("wg-quick not found. WireGuard might not be installed.") + return False, "WireGuard (wg-quick) not found. Please ensure WireGuard is installed." + + try: + # Check if the interface is up + result = subprocess.run(["wg", "show", interface], capture_output=True, text=True) + if result.returncode == 0: + # Interface is up, we'll restart it + subprocess.run(["wg-quick", "down", interface], check=True) + subprocess.run(["wg-quick", "up", interface], check=True) + logging.info(f"WireGuard service for interface {interface} restarted successfully") + else: + # Interface is down, we'll just bring it up + subprocess.run(["wg-quick", "up", interface], check=True) + logging.info(f"WireGuard service for interface {interface} started successfully") + + return True, f"WireGuard service for interface {interface} reloaded successfully" + except subprocess.CalledProcessError as e: + error_message = f"Failed to reload WireGuard service: {e}" + if e.stderr: + error_message += f"\nError details: {e.stderr.decode('utf-8')}" + logging.error(error_message) + return False, error_message + +class WireGuardHandler(http.server.SimpleHTTPRequestHandler): + def _send_response(self, status_code: int, data: dict) -> None: + self.send_response(status_code) + self.send_header('Content-type', 'application/json') + self.end_headers() + self.wfile.write(json.dumps(data).encode()) + + def do_GET(self) -> None: + if self.path == '/peers': + config = read_config() + peers = parse_peers(config) + self._send_response(200, peers) + else: + self._send_response(404, {"error": "Not found"}) + + def do_POST(self) -> None: + if self.path == '/peers': + create_backup() # Create a backup before making changes + content_length = int(self.headers['Content-Length']) + post_data = self.rfile.read(content_length) + new_peer = json.loads(post_data.decode('utf-8')) + + config = read_config() + config += "\n\n" + peer_to_string(new_peer) + write_config(config) + + if reload_wireguard_service(): + self._send_response(201, {"message": "Peer added successfully and service reloaded"}) + else: + self._send_response(500, {"error": "Peer added but failed to reload service"}) + elif self.path == '/restore': + if restore_from_backup(): + if reload_wireguard_service(): + self._send_response(200, {"message": "Configuration restored from backup and service reloaded"}) + else: + self._send_response(500, {"error": "Configuration restored but failed to reload service"}) + else: + self._send_response(500, {"error": "Failed to restore from backup"}) + else: + self._send_response(404, {"error": "Not found"}) + + def do_PUT(self) -> None: + path_parts = self.path.split('/') + if len(path_parts) == 3 and path_parts[1] == 'peers': + create_backup() # Create a backup before making changes + public_key = urllib.parse.unquote(path_parts[2]) + content_length = int(self.headers['Content-Length']) + put_data = self.rfile.read(content_length) + updated_peer = json.loads(put_data.decode('utf-8')) + + config = read_config() + peers = parse_peers(config) + + peer_found = False + for i, peer in enumerate(peers): + if peer.get('PublicKey') == public_key: + peer_found = True + # Update the peer + peers[i] = updated_peer + new_config = re.sub(r'(\[Interface\].*?\n\n)(.*)', + r'\1' + '\n\n'.join(peer_to_string(p) for p in peers), + config, + flags=re.DOTALL) + write_config(new_config) + + # Reload WireGuard service and send the appropriate response + success, message = reload_wireguard_service() + if success: + self._send_response(200, {"message": "Peer updated successfully and service reloaded"}) + else: + self._send_response(500, {"error": f"Peer updated but failed to reload service: {message}"}) + break + + if not peer_found: + # If no peer with the given public key was found, return a 404 response + self._send_response(404, {"error": "Peer not found"}) + else: + self._send_response(404, {"error": "Not found"}) + + def do_DELETE(self) -> None: + path_parts = self.path.split('/') + if len(path_parts) == 3 and path_parts[1] == 'peers': + create_backup() # Create a backup before making changes + public_key = urllib.parse.unquote(path_parts[2]) + + config = read_config() + peers = parse_peers(config) + + peer_found = False + for peer in peers: + if peer.get('PublicKey') == public_key: + peer_found = True + # Remove the peer + peers.remove(peer) + new_config = re.sub(r'(\[Interface\].*?\n\n)(.*)', + r'\1' + '\n\n'.join(peer_to_string(p) for p in peers), + config, + flags=re.DOTALL) + write_config(new_config) + + # Reload WireGuard service and send the appropriate response + success, message = reload_wireguard_service() + if success: + self._send_response(200, {"message": "Peer deleted successfully and service reloaded"}) + else: + self._send_response(500, {"error": f"Peer deleted but failed to reload service: {message}"}) + break + + if not peer_found: + # If no peer with the given public key was found, return a 404 response + self._send_response(404, {"error": "Peer not found"}) + else: + self._send_response(404, {"error": "Not found"}) + +def run_server() -> None: + host = CONFIG["server"]["host"] + port = CONFIG["server"]["port"] + with socketserver.TCPServer((host, port), WireGuardHandler) as httpd: + logging.info(f"Serving on {host}:{port}") + httpd.serve_forever() + +if __name__ == "__main__": + run_server() + diff --git a/wpm_client.py b/wpm_client.py new file mode 100644 index 0000000..d26367b --- /dev/null +++ b/wpm_client.py @@ -0,0 +1,84 @@ +import argparse +import requests +import json +import sys + +BASE_URL = "http://localhost:8080" # Update this if your server is running on a different host or port + +def create_or_update_peer(public_key, allowed_ips): + url = f"{BASE_URL}/peers" + data = { + "PublicKey": public_key, + "AllowedIPs": allowed_ips + } + + # First, try to update an existing peer + response = requests.put(f"{url}/{public_key}", json=data) + + if response.status_code == 404: + # If the peer doesn't exist, create a new one + response = requests.post(url, json=data) + + if response.status_code in (200, 201): + result = response.json() + if "warning" in result: + print(f"Warning: {result['warning']}") + else: + print(result['message']) + else: + print(f"Error: {response.status_code} - {response.text}") + +def delete_peer(public_key): + url = f"{BASE_URL}/peers/{public_key}" + response = requests.delete(url) + + if response.status_code == 200: + print("Peer deleted successfully.") + else: + print(f"Error: {response.status_code} - {response.text}") + +def list_peers(): + url = f"{BASE_URL}/peers" + response = requests.get(url) + + if response.status_code == 200: + peers = response.json() + print(json.dumps(peers, indent=2)) + else: + print(f"Error: {response.status_code} - {response.text}") + +def restore_config(): + url = f"{BASE_URL}/restore" + response = requests.post(url) + + if response.status_code == 200: + print("Configuration restored successfully.") + else: + print(f"Error: {response.status_code} - {response.text}") + +def main(): + parser = argparse.ArgumentParser(description="WireGuard Config Manager Client") + parser.add_argument("action", choices=["create", "update", "delete", "list", "restore"], help="Action to perform") + parser.add_argument("--public-key", help="Public key of the peer") + parser.add_argument("--allowed-ips", help="Allowed IPs for the peer") + + args = parser.parse_args() + + if args.action in ["create", "update"]: + if not args.public_key or not args.allowed_ips: + print("Error: Both --public-key and --allowed-ips are required for create/update actions.") + sys.exit(1) + create_or_update_peer(args.public_key, args.allowed_ips) + elif args.action == "delete": + if not args.public_key: + print("Error: --public-key is required for delete action.") + sys.exit(1) + delete_peer(args.public_key) + elif args.action == "list": + list_peers() + elif args.action == "restore": + restore_config() + +if __name__ == "__main__": + main() +