Compare commits

...

2 Commits

Author SHA1 Message Date
kalzu rekku
63c10720c6 Foobar WTF 2024-02-11 09:05:43 +02:00
kalzu rekku
1f542b1720 First test runs with multiprocessing. 2024-02-09 09:14:24 +02:00
4 changed files with 240 additions and 110 deletions

49
foobar/kiss.py Normal file
View File

@ -0,0 +1,49 @@
import queue
import time
import threading
def main():
print("Hello There")
que = queue.Queue()
event = threading.Event()
#input(que)
input_thread = threading.Thread(target=input, args=(que,event))
input_thread.daemon = True
input_thread.start()
#output(que)
output_thread = threading.Thread(target=output, args=(que,event))
output_thread.daemon = True
output_thread.start()
for num in range(21):
print(f"wait till full: {num}/20", end='\r')
time.sleep(0.5)
print("\nDone!")
event.set()
def input(que, event):
loop = 0
while not event:
message = f"{time.time()} | Hi! {loop}"
que.put(message)
loop = loop + 1
time.sleep(1)
def output(que, event):
file = '/tmp/test_out'
with open(file, 'a') as out_file:
while not event:
message = que.get()
out_file.write(f"{message}\r\n")
out_file.flush()
time.sleep(1)
out_file.close()
if __name__ == "__main__":
main()
print("Bye now!")
exit(0)

View File

@ -1,54 +1,65 @@
import threading
import os
"""
My attempt to mimic Suckless.org's ii (irc it) software
"""
import sys
import time
import socket
import multiprocessing
from ircthing_core import irc_router, connect_to_irc_server
from ircthing_utils import read_config
from ircthing_utils import read_config, cli_args, base_path
Threads = {}
Stop_Toggle = threading.Event()
Processes = {}
Stop_Toggle = multiprocessing.Event()
def clean_exit(fifo_files, socket):
socket.send(bytes(f"QUIT :Bye\r\n", "UTF-8"))
def clean_exit():
"""
Sets the Stop_Toggle event to signal clean exit.
"""
Stop_Toggle.set()
for file in fifo_files:
try:
os.unlink(file)
except FileNotFoundError:
# We are okay if some fifo files has been removed before this.
pass
def main():
my_name = sys.argv[0]
my_name_pyless, _ = os.path.splitext(my_name)
base_path = f'/tmp/{my_name_pyless}'
"""
Main function to initialize irc connections specified on the configuration file.
config_path = '../ircthing3.ini'
This will run each irc server connection in invidual sub process.
Hold the process handlers on list.
Kill them with Stop_Toggle.
"""
root_path = base_path()
# Read configurations for all topics
# Get configuration file path if given
config_path = "config.ini"
argument = cli_args()
if argument:
config_path = argument
# Read configuration
network_configs = read_config(config_path)
## Get irc socket for each network in configuration
## Start thread for each socket
## Start thread for each socket
for network in network_configs:
net_name = network["net_name"]
print(f"{time.time()} | Found configs for {net_name} network.")
server = network["server"]
port = network["port"]
nickname = network["nickname"]
password = network["password"]
irc_socket, fifo_files = connect_to_irc_server(base_path, net_name, server, port, nickname, password)
router_instance = irc_router(fifo_files, irc_socket, server, nickname)
Threads[net_name] = threading.Thread(target=router_instance.start)
Threads[net_name].daemon = True
Threads[net_name].start()
print(f"{time.time()} | Found configs for {net_name} network.")
irc_socket, fifo_files, network_dir = connect_to_irc_server(
root_path, net_name, server, port, nickname, password
)
for thread in Threads.values():
print(thread)
thread.join()
router_instance = irc_router(
fifo_files, irc_socket, server, nickname, network_dir
)
Processes[net_name] = multiprocessing.Process(target=router_instance.start)
Processes[net_name].daemon = True
Processes[net_name].start()
# Wait for all the sub processes to end.
for process in Processes.values():
process.join()
if __name__ == "__main__":
@ -58,5 +69,6 @@ if __name__ == "__main__":
except Exception as e:
print(f"Got error {e}")
finally:
clean_exit()
print(f"{time.time()} | Bye!")
exit(0)
sys.exit(0)

View File

@ -1,9 +1,10 @@
import time
import re
import asyncio
import socket
import os
from ircthing_utils import make_files
from threading import Thread, Event
def connect_to_irc_server(
base_path,
@ -29,95 +30,112 @@ def connect_to_irc_server(
irc_socket.send(bytes(f"NICK {nickname}\r\n", "UTF-8"))
# Create directories for the server and channel
fifo_files = make_files(base_path, net_name)
fifo_files, network_path = make_files(base_path, net_name)
return irc_socket, fifo_files
return irc_socket, fifo_files, network_path
class irc_router:
def __init__(self, fifo_files, irc_socket, server, nickname):
def __init__(self, fifo_files, irc_socket, server, nickname, network_dir):
self.fifo_files = fifo_files
self.irc_socket = irc_socket
self.server = server
self.nickname = nickname
self.network_dir = network_dir
self.channels = []
self._loop = asyncio.new_event_loop()
self.threads = []
self.disconnect_toggle = Event()
def start(self):
try:
asyncio.set_event_loop(self._loop)
tasks = [self.user_listener(), self.irc_message_loop()]
self._loop.run_until_complete(asyncio.gather(*tasks))
finally:
for file in self.fifo_files:
try:
os.close(file)
except:
# Errors are okay at this point.
pass
self._loop.close()
self.start_threads()
## Main loop for reading irc inflow and routing to correct _out fifos.
## Also handle ping/pong here
async def irc_message_loop(self):
output_files=[]
loop = asyncio.get_running_loop()
while True:
for file in self.fifo_files:
try:
os.close(file)
except:
# Errors are okay at this point.
pass
self.irc_send("QUIT")
for output in self.fifo_files:
if output.endswith('out'):
if output not in output_files:
print(f"{time.time()} | {self.server} has output file: {output}")
output_files.append(output)
def start_threads(self):
user_listener_thread = Thread(target=self.user_listener)
irc_listener_thread = Thread(target=self.irc_message_loop)
self.threads.append(user_listener_thread)
self.threads.append(irc_listener_thread)
# start the threads
for thread in self.threads:
thread.start()
# wait for all the threads to reach end
for thread in self.threads:
thread.join()
def get_output_fifos(self):
output_fifos = []
for fifo in self.fifo_files:
if fifo.endswith("out") and fifo not in output_fifos:
output_fifos.append(fifo)
return output_fifos
def write_to_server_out(self, message):
server_out = os.path.join(self.network_dir, "out")
if os.access(server_out, os.W_OK):
self.write_to_out(server_out, message)
else:
print(f"{time.time()} | {self.server} | {message}")
def irc_message_loop(self):
while not self.disconnect_toggle.is_set():
try:
irc_input = await loop.sock_recv(self.irc_socket, 2048)
irc_input = self.irc_socket.recv(2048)
irc_input = irc_input.decode("utf-8")
except Exception as e:
print(f"Error reading from socket: {e}")
continue
if irc_input:
##
## The main logic hapens here.
## support for channel joins
## and spliting the input to channel specifig channel/[in, out] locations
##
if "PING" in irc_input:
self.irc_send(f"PONG :{self.server}")
self.write_to_out(output_files[0], f"{time.time()} | ping/pong")
continue
self.handle_irc_input(irc_input)
time.sleep(0.5)
if f":{self.server} 353" in irc_input:
## We have joined a channel!
match = re.search(r'353 \S+ = (#\S+)', irc_input)
channel_join = match.group(1)
await self.make_files(channel_join)
continue
def handle_irc_input(self, irc_input):
if "PING" in irc_input:
self.irc_send(f"PONG :{self.server}")
self.write_to_server_out("ping/pong")
elif f":{self.server} 353" in irc_input:
self.handle_channel_join(irc_input)
elif "PRIVMSG" in irc_input:
self.handle_privmsg(irc_input)
else:
self.write_to_server_out(irc_input)
if "PRIVMSG" in irc_input:
if self.if_input_belongs_to_channel(irc_input):
continue
def handle_channel_join(self, irc_input):
## We have joined a channel!
channel = re.search(r"353 \S+ = (#\S+)", irc_input).group(1)
self.make_files(channel)
self.write_to_out(output_files[0], irc_input)
def handle_privmsg(self, irc_input):
if not self.input_belongs_to_channel(irc_input):
self.write_to_server_out(irc_input)
await asyncio.sleep(0.1)
def if_input_belongs_to_channel(self, input):
def input_belongs_to_channel(self, input):
output_files = self.get_output_fifos()
# We know that input includes "PRIVMSG"
# But to where it shoudl go?
message = re.search(r':.*:(.*)', input).group(1)
input_user = re.search(r'^:(\S+)', input).group(1)
input_channel = re.search(r'PRIVMSG (\S+) :', input)
done=False
for file in self.fifo_files:
if input_channel.group(1) in file:
if file.endswith("out"):
self.write_to_out(file, f"{input_user}: {message}")
done=True
message = re.search(r":.*:(.*)", input).group(1)
input_user = re.search(r"^:(\S+)", input).group(1)
input_channel = re.search(r"PRIVMSG (\S+) :", input).group(1)
done = False
for file in output_files:
if input_channel in file:
self.write_to_out(file, f"{input_user}: {message}")
done = True
return done
async def make_files(self, channel):
def make_files(self, channel):
network_path = os.path.dirname(self.fifo_files[1])
channel_dir = os.path.join(network_path, channel)
os.makedirs(channel_dir, exist_ok=True)
@ -133,7 +151,6 @@ class irc_router:
fifo_files.append(f"{channel_dir}/in")
fifo_files.append(f"{channel_dir}/out")
self.fifo_files.extend(fifo_files)
await asyncio.sleep(1)
def write_to_out(self, file, input):
if file == None:
@ -148,29 +165,28 @@ class irc_router:
self.irc_socket.send(bytes(f"{message}\r\n", "utf-8"))
print(f"{time.time()} | Send: {message}")
async def async_readline(self, file, queue):
def async_readline(self, file, queue):
print(f"Trying to read from file {file}")
try:
while True:
with open(file, mode='r') as in_file:
with open(file, mode="r") as in_file:
line = in_file.readline().strip()
if not line:
print("There was no line!")
continue
await queue.put(line)
queue.put(line)
in_file.close()
await asyncio.sleep(0.5)
except Exception as e:
print(f"ERROR IN async_readline func: {e}")
async def process_user_input(self, queue):
def process_user_input(self, queue):
print("Processing of user input may start!")
while True:
line = await queue.get()
print(f"{time.time()} | trying to send: {line} to {self.server}")
self.irc_send(line)
await asyncio.sleep(0.2)
time.sleep(0.5)
async def user_listener(self):
print("User Listener is HERE!")
@ -188,5 +204,4 @@ class irc_router:
# Wait for all tasks to complete
await asyncio.gather(*tasks)
finally:
self.irc_send('QUIT')
self.irc_send("QUIT")

View File

@ -1,7 +1,53 @@
import configparser
import os
"""
ircthing_utils
def read_config(config_path):
Some functions that make life nicer.
"""
import configparser
import argparse
import os
import sys
from typing import Union, List, Tuple
def base_path() -> str:
"""
Returns location that the script will use as root for its dir structure
Returns:
str: path for run time files
"""
my_name = sys.argv[0]
my_name_pyless, _ = os.path.splitext(my_name)
return f"/tmp/{my_name_pyless}"
def cli_args() -> Union[str, None]:
"""
Parse arguments and provide small usage message
Returns:
str or None: path to config file
"""
parser = argparse.ArgumentParser(
description="Usage: python3.11 ircthing.py /myconfig.ini"
)
parser.add_argument("config", help="Path to the configuration file.")
args = parser.parse_args()
if args.config:
return args.config
return None
def read_config(config_path: str) -> List[dict]:
"""
Read configuration file and return list of dicts.
Dictionaries hold information about irc networks/server to connecto to.
args:
str: Path to the configuration file.
Returns:
list[{}, {}]: List of dictionaries.
"""
config = configparser.ConfigParser()
config.read(config_path)
@ -27,12 +73,21 @@ def read_config(config_path):
}
network_configs.append(network_config)
return network_configs
except Exception as e:
print(f"Failure while reading configuration file. {e}")
exit(1)
except Exception as config_read_error:
print(f"Failure while reading configuration file. {config_read_error}")
sys.exit(1)
def make_files(path, net_name):
def make_files(path: str, net_name: str) -> Tuple[List[str], str]:
"""
Make directories and fifo files need to make irc server connection.
Args:
str: The root path for run time files.
str: The network name of the irc server we are joining
Returns:
list: List containing the fio files created.
str: Path to the directory created for the network connection.
"""
os.makedirs(path, exist_ok=True)
server_dir = os.path.join(path, net_name)
os.makedirs(server_dir, exist_ok=True)
@ -47,5 +102,4 @@ def make_files(path, net_name):
fifo_files = []
fifo_files.append(f"{server_dir}/in")
fifo_files.append(f"{server_dir}/out")
return fifo_files
return fifo_files, server_dir