core_daemon/daemon_cli.py

97 lines
3.0 KiB
Python
Raw Permalink Normal View History

import cmd
import socket
import json
import os
class ModuleShell(cmd.Cmd):
intro = "Welcome to the Module Shell. Type 'help' for commands."
prompt = "(module_shell) "
def __init__(self, host='localhost', port=9999):
super().__init__()
self.host = host
self.port = port
self.session_id = None
def send_command(self, action, **kwargs):
command = {'action': action, **kwargs}
if self.session_id:
command['session_id'] = self.session_id
try:
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.connect((self.host, self.port))
s.sendall(json.dumps(command).encode())
data = s.recv(1024).decode()
return json.loads(data) if data else None
except Exception as e:
print(f"Error: {str(e)}")
return None
def do_load(self, arg):
"""Load a module: load <module_name>"""
response = self.send_command('load', module=arg)
if response:
print(response.get('message', 'Unknown response'))
def do_unload(self, arg):
"""Unload a module: unload <module_name>"""
response = self.send_command('unload', module=arg)
if response:
print(response.get('message', 'Unknown response'))
def do_list(self, arg):
"""List all loaded modules"""
response = self.send_command('list')
if response and response.get('success'):
modules = response.get('message', [])
if modules:
print("Loaded modules:")
for module in modules:
print(f"- {module}")
else:
print("No modules are currently loaded.")
else:
print("Failed to retrieve module list.")
def do_execute(self, arg):
"""Execute a command: execute <command> [args]"""
parts = arg.split()
if not parts:
print("Error: No command specified.")
return
command = parts[0]
args = ' '.join(parts[1:])
response = self.send_command('execute', command=command, args=args)
if response:
print(response.get('message', 'Unknown response'))
def do_upgrade(self, arg):
"""Upgrade the core daemon and modules"""
response = self.send_command('upgrade')
if response:
print(response.get('message', 'Unknown response'))
def do_exit(self, arg):
"""Exit the shell"""
print("Exiting...")
return True
def do_shutdown(self, arg):
"""Shutdown the core daemon"""
response = self.send_command('shutdown')
if response:
print(response.get('message', 'Unknown response'))
print("Core daemon is shutting down. Exiting CLI.")
return True
def default(self, line):
"""Handle unknown commands by trying to execute them"""
self.do_execute(line)
def main():
ModuleShell().cmdloop()
if __name__ == "__main__":
main()