import http.server import socketserver import json import re import urllib.parse import tomllib import logging import subprocess import shutil from datetime import datetime from typing import Dict, List, Tuple from pathlib import Path # Default configuration DEFAULT_CONFIG = { "server": { "host": "0.0.0.0", "port": 8000, }, "wireguard": { "config_file": "/etc/wireguard/wg0.conf", }, "logging": { "log_file": None, "debug": False, } } def load_config(config_file: str) -> dict: try: with open(config_file, "rb") as f: config = tomllib.load(f) # Merge with default config return {**DEFAULT_CONFIG, **config} except FileNotFoundError: logging.warning(f"Config file {config_file} not found. Using default configuration.") return DEFAULT_CONFIG except tomllib.TOMLDecodeError as e: logging.error(f"Error parsing config file: {e}") exit(1) CONFIG = load_config("config.toml") # Set up logging logging.basicConfig( level=logging.DEBUG if CONFIG["logging"]["debug"] else logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", filename=CONFIG["logging"]["log_file"], ) def read_config() -> str: with open(CONFIG["wireguard"]["config_file"], 'r') as file: return file.read() def write_config(content: str) -> None: with open(CONFIG["wireguard"]["config_file"], 'w') as file: file.write(content) def create_backup(): config_file = Path(CONFIG["wireguard"]["config_file"]) backup_dir = config_file.parent / "backups" backup_dir.mkdir(exist_ok=True) timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") backup_file = backup_dir / f"{config_file.stem}_{timestamp}.conf" shutil.copy2(config_file, backup_file) logging.info(f"Backup created: {backup_file}") return backup_file def restore_from_backup(): config_file = Path(CONFIG["wireguard"]["config_file"]) backup_dir = config_file.parent / "backups" if not backup_dir.exists() or not list(backup_dir.glob("*.conf")): logging.error("No backups found") return False latest_backup = max(backup_dir.glob("*.conf"), key=lambda f: f.stat().st_mtime) shutil.copy2(latest_backup, config_file) logging.info(f"Restored from backup: {latest_backup}") return True def parse_peers(config: str) -> List[Dict[str, str]]: peers = [] current_peer = None for line in config.split('\n'): line = line.strip() if line == "[Peer]": if current_peer: peers.append(current_peer) current_peer = {} elif current_peer is not None and '=' in line: key, value = line.split('=', 1) current_peer[key.strip()] = value.strip() if current_peer: peers.append(current_peer) return peers def peer_to_string(peer: Dict[str, str]) -> str: return f"[Peer]\n" + "\n".join(f"{k} = {v}" for k, v in peer.items()) def reload_wireguard_service(): interface = Path(CONFIG["wireguard"]["config_file"]).stem try: # Check if wg-quick is available subprocess.run(["which", "wg-quick"], check=True, capture_output=True) except subprocess.CalledProcessError: logging.warning("wg-quick not found. WireGuard might not be installed.") return False, "WireGuard (wg-quick) not found. Please ensure WireGuard is installed." try: # Check if the interface is up result = subprocess.run(["wg", "show", interface], capture_output=True, text=True) if result.returncode == 0: # Interface is up, we'll restart it subprocess.run(["wg-quick", "down", interface], check=True) subprocess.run(["wg-quick", "up", interface], check=True) logging.info(f"WireGuard service for interface {interface} restarted successfully") else: # Interface is down, we'll just bring it up subprocess.run(["wg-quick", "up", interface], check=True) logging.info(f"WireGuard service for interface {interface} started successfully") return True, f"WireGuard service for interface {interface} reloaded successfully" except subprocess.CalledProcessError as e: error_message = f"Failed to reload WireGuard service: {e}" if e.stderr: error_message += f"\nError details: {e.stderr.decode('utf-8')}" logging.error(error_message) return False, error_message class WireGuardHandler(http.server.SimpleHTTPRequestHandler): def _send_response(self, status_code: int, data: dict) -> None: self.send_response(status_code) self.send_header('Content-type', 'application/json') self.end_headers() self.wfile.write(json.dumps(data).encode()) def do_GET(self) -> None: if self.path == '/peers': config = read_config() peers = parse_peers(config) self._send_response(200, peers) else: self._send_response(404, {"error": "Not found"}) def do_POST(self) -> None: if self.path == '/peers': create_backup() # Create a backup before making changes content_length = int(self.headers['Content-Length']) post_data = self.rfile.read(content_length) new_peer = json.loads(post_data.decode('utf-8')) config = read_config() config += "\n\n" + peer_to_string(new_peer) write_config(config) if reload_wireguard_service(): self._send_response(201, {"message": "Peer added successfully and service reloaded"}) else: self._send_response(500, {"error": "Peer added but failed to reload service"}) elif self.path == '/restore': if restore_from_backup(): if reload_wireguard_service(): self._send_response(200, {"message": "Configuration restored from backup and service reloaded"}) else: self._send_response(500, {"error": "Configuration restored but failed to reload service"}) else: self._send_response(500, {"error": "Failed to restore from backup"}) else: self._send_response(404, {"error": "Not found"}) def do_PUT(self) -> None: path_parts = self.path.split('/') if len(path_parts) == 3 and path_parts[1] == 'peers': create_backup() # Create a backup before making changes public_key = urllib.parse.unquote(path_parts[2]) content_length = int(self.headers['Content-Length']) put_data = self.rfile.read(content_length) updated_peer = json.loads(put_data.decode('utf-8')) config = read_config() peers = parse_peers(config) peer_found = False for i, peer in enumerate(peers): if peer.get('PublicKey') == public_key: peer_found = True # Update the peer peers[i] = updated_peer new_config = re.sub(r'(\[Interface\].*?\n\n)(.*)', r'\1' + '\n\n'.join(peer_to_string(p) for p in peers), config, flags=re.DOTALL) write_config(new_config) # Reload WireGuard service and send the appropriate response success, message = reload_wireguard_service() if success: self._send_response(200, {"message": "Peer updated successfully and service reloaded"}) else: self._send_response(500, {"error": f"Peer updated but failed to reload service: {message}"}) break if not peer_found: # If no peer with the given public key was found, return a 404 response self._send_response(404, {"error": "Peer not found"}) else: self._send_response(404, {"error": "Not found"}) def do_DELETE(self) -> None: path_parts = self.path.split('/') if len(path_parts) == 3 and path_parts[1] == 'peers': create_backup() # Create a backup before making changes public_key = urllib.parse.unquote(path_parts[2]) config = read_config() peers = parse_peers(config) peer_found = False for peer in peers: if peer.get('PublicKey') == public_key: peer_found = True # Remove the peer peers.remove(peer) new_config = re.sub(r'(\[Interface\].*?\n\n)(.*)', r'\1' + '\n\n'.join(peer_to_string(p) for p in peers), config, flags=re.DOTALL) write_config(new_config) # Reload WireGuard service and send the appropriate response success, message = reload_wireguard_service() if success: self._send_response(200, {"message": "Peer deleted successfully and service reloaded"}) else: self._send_response(500, {"error": f"Peer deleted but failed to reload service: {message}"}) break if not peer_found: # If no peer with the given public key was found, return a 404 response self._send_response(404, {"error": "Peer not found"}) else: self._send_response(404, {"error": "Not found"}) def run_server() -> None: host = CONFIG["server"]["host"] port = CONFIG["server"]["port"] with socketserver.TCPServer((host, port), WireGuardHandler) as httpd: logging.info(f"Serving on {host}:{port}") httpd.serve_forever() if __name__ == "__main__": run_server()