import cmd
import json
import os
import socket


class ModuleShell(cmd.Cmd):
    """Interactive command-line client for the core daemon.

    Each shell command is turned into a JSON request, sent to the daemon
    over a fresh TCP connection, and the daemon's JSON reply is printed.
    Input that does not match a built-in command is forwarded to the
    daemon as an ``execute`` request (see ``default``).
    """

    intro = "Welcome to the Module Shell. Type 'help' for commands."
    prompt = "(module_shell) "

    def __init__(self, host='localhost', port=9999):
        """Create a shell that talks to the daemon at ``host``:``port``."""
        super().__init__()
        self.host = host
        self.port = port
        # Attached to every request when set; nothing in this class
        # assigns it, so it stays None unless set from outside.
        self.session_id = None

    @staticmethod
    def _read_response(conn, chunk_size=4096):
        """Read one JSON document from ``conn`` and return it decoded.

        A single ``recv`` call may return only part of a reply, so chunks
        are accumulated until the buffer parses as JSON or the peer closes
        the connection. Parsing after every chunk means the call also
        returns when the peer keeps the connection open after replying.

        Returns None when the peer closes without sending anything.
        Raises ValueError when the peer closes after sending bytes that
        never formed valid JSON.
        """
        buffer = bytearray()
        while True:
            chunk = conn.recv(chunk_size)
            if not chunk:
                break
            buffer += chunk
            try:
                return json.loads(buffer.decode())
            except ValueError:
                # Incomplete JSON or a multi-byte character split across
                # chunks (both are ValueError subclasses): keep reading.
                continue
        return json.loads(buffer.decode()) if buffer else None

    def send_command(self, action, **kwargs):
        """Send one request to the daemon and return its decoded reply.

        The request is a JSON object holding ``action``, the extra keyword
        arguments, and ``session_id`` when one is set. Any failure
        (connection, serialization, malformed reply) is printed as
        ``Error: ...`` and reported as None so the shell keeps running.
        """
        command = {'action': action, **kwargs}
        if self.session_id:
            command['session_id'] = self.session_id
        try:
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as conn:
                conn.connect((self.host, self.port))
                conn.sendall(json.dumps(command).encode())
                return self._read_response(conn)
        except Exception as exc:
            # Top-level boundary of the REPL: report and carry on rather
            # than crash the interactive session.
            print(f"Error: {exc}")
            return None

    def do_load(self, arg):
        """Load a module: load <module>"""
        module = arg.strip()
        if not module:
            print("Error: No module specified.")
            return
        response = self.send_command('load', module=module)
        if response:
            print(response.get('message', 'Unknown response'))

    def do_unload(self, arg):
        """Unload a module: unload <module>"""
        module = arg.strip()
        if not module:
            print("Error: No module specified.")
            return
        response = self.send_command('unload', module=module)
        if response:
            print(response.get('message', 'Unknown response'))

    def do_list(self, arg):
        """List all loaded modules"""
        response = self.send_command('list')
        if not (response and response.get('success')):
            print("Failed to retrieve module list.")
            return
        modules = response.get('message', [])
        if not modules:
            print("No modules are currently loaded.")
            return
        print("Loaded modules:")
        for module in modules:
            print(f"- {module}")

    def do_execute(self, arg):
        """Execute a command: execute <command> [args]"""
        parts = arg.split()
        if not parts:
            print("Error: No command specified.")
            return
        command, *rest = parts
        # Arguments are re-joined with single spaces, as a single string.
        response = self.send_command(
            'execute', command=command, args=' '.join(rest)
        )
        if response:
            print(response.get('message', 'Unknown response'))

    def do_upgrade(self, arg):
        """Upgrade the core daemon and modules"""
        response = self.send_command('upgrade')
        if response:
            print(response.get('message', 'Unknown response'))

    def do_exit(self, arg):
        """Exit the shell"""
        print("Exiting...")
        return True

    def do_EOF(self, arg):
        """Exit the shell on end-of-input (Ctrl-D)"""
        # Without this handler cmd.Cmd hands the synthetic "EOF" line to
        # default(), which would send it to the daemon over and over once
        # stdin is exhausted.
        print()
        return True

    def do_shutdown(self, arg):
        """Shutdown the core daemon"""
        response = self.send_command('shutdown')
        if not response:
            # The request failed or got no reply (send_command already
            # printed any error); stay in the shell instead of claiming
            # that the daemon is going down.
            return None
        print(response.get('message', 'Unknown response'))
        print("Core daemon is shutting down. Exiting CLI.")
        return True

    def default(self, line):
        """Handle unknown commands by trying to execute them"""
        self.do_execute(line)


def main():
    """Run the interactive shell against the default daemon address."""
    ModuleShell().cmdloop()


if __name__ == "__main__":
    main()