core_daemon/modules/message_bus.py

95 lines
3.0 KiB
Python

import queue
import threading
class MessageBus:
def __init__(self):
self.queues = {}
self.lock = threading.Lock()
def create_queue(self, queue_name):
with self.lock:
if queue_name not in self.queues:
self.queues[queue_name] = queue.Queue()
return True
return False
def delete_queue(self, queue_name):
with self.lock:
if queue_name in self.queues:
del self.queues[queue_name]
return True
return False
def send_message(self, queue_name, message):
with self.lock:
if queue_name in self.queues:
self.queues[queue_name].put(message)
return True
return False
def receive_message(self, queue_name, timeout=None):
with self.lock:
if queue_name in self.queues:
try:
return self.queues[queue_name].get(timeout=timeout)
except queue.Empty:
return None
return None
message_bus = MessageBus()
def do_create_queue(args):
"""Create a new message queue: create_queue <queue_name>"""
queue_name = args.strip()
if message_bus.create_queue(queue_name):
return f"Queue '{queue_name}' created successfully"
else:
return f"Queue '{queue_name}' already exists"
def do_delete_queue(args):
"""Delete a message queue: delete_queue <queue_name>"""
queue_name = args.strip()
if message_bus.delete_queue(queue_name):
return f"Queue '{queue_name}' deleted successfully"
else:
return f"Queue '{queue_name}' does not exist"
def do_send_message(args):
"""Send a message to a queue: send_message <queue_name> <message>"""
parts = args.split(maxsplit=1)
if len(parts) != 2:
return "Invalid arguments. Usage: send_message <queue_name> <message>"
queue_name, message = parts
if message_bus.send_message(queue_name, message):
return f"Message sent to queue '{queue_name}'"
else:
return f"Queue '{queue_name}' does not exist"
def do_receive_message(args):
"""Receive a message from a queue: receive_message <queue_name> [timeout]"""
parts = args.split()
if len(parts) not in (1, 2):
return "Invalid arguments. Usage: receive_message <queue_name> [timeout]"
queue_name = parts[0]
timeout = float(parts[1]) if len(parts) == 2 else None
message = message_bus.receive_message(queue_name, timeout)
if message is not None:
return f"Received message from queue '{queue_name}': {message}"
else:
return f"No message available in queue '{queue_name}'"
commands = {
'create_queue': do_create_queue,
'delete_queue': do_delete_queue,
'send_message': do_send_message,
'receive_message': do_receive_message,
}
def get_commands():
return commands
def initialize():
print("Message bus module initialized")
def shutdown():
print("Message bus module shut down")