95 lines
3.0 KiB
Python
95 lines
3.0 KiB
Python
|
import queue
|
||
|
import threading
|
||
|
|
||
|
class MessageBus:
|
||
|
def __init__(self):
|
||
|
self.queues = {}
|
||
|
self.lock = threading.Lock()
|
||
|
|
||
|
def create_queue(self, queue_name):
|
||
|
with self.lock:
|
||
|
if queue_name not in self.queues:
|
||
|
self.queues[queue_name] = queue.Queue()
|
||
|
return True
|
||
|
return False
|
||
|
|
||
|
def delete_queue(self, queue_name):
|
||
|
with self.lock:
|
||
|
if queue_name in self.queues:
|
||
|
del self.queues[queue_name]
|
||
|
return True
|
||
|
return False
|
||
|
|
||
|
def send_message(self, queue_name, message):
|
||
|
with self.lock:
|
||
|
if queue_name in self.queues:
|
||
|
self.queues[queue_name].put(message)
|
||
|
return True
|
||
|
return False
|
||
|
|
||
|
def receive_message(self, queue_name, timeout=None):
|
||
|
with self.lock:
|
||
|
if queue_name in self.queues:
|
||
|
try:
|
||
|
return self.queues[queue_name].get(timeout=timeout)
|
||
|
except queue.Empty:
|
||
|
return None
|
||
|
return None
|
||
|
|
||
|
message_bus = MessageBus()
|
||
|
|
||
|
def do_create_queue(args):
|
||
|
"""Create a new message queue: create_queue <queue_name>"""
|
||
|
queue_name = args.strip()
|
||
|
if message_bus.create_queue(queue_name):
|
||
|
return f"Queue '{queue_name}' created successfully"
|
||
|
else:
|
||
|
return f"Queue '{queue_name}' already exists"
|
||
|
|
||
|
def do_delete_queue(args):
|
||
|
"""Delete a message queue: delete_queue <queue_name>"""
|
||
|
queue_name = args.strip()
|
||
|
if message_bus.delete_queue(queue_name):
|
||
|
return f"Queue '{queue_name}' deleted successfully"
|
||
|
else:
|
||
|
return f"Queue '{queue_name}' does not exist"
|
||
|
|
||
|
def do_send_message(args):
|
||
|
"""Send a message to a queue: send_message <queue_name> <message>"""
|
||
|
parts = args.split(maxsplit=1)
|
||
|
if len(parts) != 2:
|
||
|
return "Invalid arguments. Usage: send_message <queue_name> <message>"
|
||
|
queue_name, message = parts
|
||
|
if message_bus.send_message(queue_name, message):
|
||
|
return f"Message sent to queue '{queue_name}'"
|
||
|
else:
|
||
|
return f"Queue '{queue_name}' does not exist"
|
||
|
|
||
|
def do_receive_message(args):
|
||
|
"""Receive a message from a queue: receive_message <queue_name> [timeout]"""
|
||
|
parts = args.split()
|
||
|
if len(parts) not in (1, 2):
|
||
|
return "Invalid arguments. Usage: receive_message <queue_name> [timeout]"
|
||
|
queue_name = parts[0]
|
||
|
timeout = float(parts[1]) if len(parts) == 2 else None
|
||
|
message = message_bus.receive_message(queue_name, timeout)
|
||
|
if message is not None:
|
||
|
return f"Received message from queue '{queue_name}': {message}"
|
||
|
else:
|
||
|
return f"No message available in queue '{queue_name}'"
|
||
|
|
||
|
commands = {
|
||
|
'create_queue': do_create_queue,
|
||
|
'delete_queue': do_delete_queue,
|
||
|
'send_message': do_send_message,
|
||
|
'receive_message': do_receive_message,
|
||
|
}
|
||
|
|
||
|
def get_commands():
|
||
|
return commands
|
||
|
|
||
|
def initialize():
|
||
|
print("Message bus module initialized")
|
||
|
|
||
|
def shutdown():
|
||
|
print("Message bus module shut down")
|