Compare commits

..

No commits in common. "multithread_rewrite" and "main" have entirely different histories.

5 changed files with 206 additions and 367 deletions

View File

@ -1,74 +0,0 @@
import queue
import os
import time
import threading
import re
def main():
print("Hello There")
que = queue.Queue()
event = threading.Event()
config = {'output': '/tmp/test_out', 'input': '/tmp/test_in'}
#input(que)
input_thread = threading.Thread(target=handle_input, args=(que, event, config))
input_thread.daemon = True
input_thread.start()
#output(que)
output_thread = threading.Thread(target=handle_output, args=(que, event, config))
output_thread.daemon = True
output_thread.start()
countdown(config, event)
def countdown(config, event):
print(f"Speak to: {config['input']}")
print(f"Output is going to be: {config['output']}")
num = 0
while not event.is_set():
print(f"wait till full: {num}/~", end='\r')
time.sleep(0.5)
num = num + 1
print("\nDone!")
def handle_input(que, event, config):
file = config['input']
try:
os.mkfifo(file)
except FileExistsError:
pass
with open(file, 'r') as in_fifo:
while not event.is_set():
fifo_read = in_fifo.readline().rsplit('\n')
if not fifo_read:
continue
message = fifo_read
que.put(message)
time.sleep(1)
def handle_output(que, event, config):
file = config['output']
with open(file, 'a') as out_file:
while not event.is_set():
try:
message = que.get()
except queue.Empty:
continue
if message == "":
continue
elif 'EXIT' in message:
event.set()
out_file.write(f"{time.time()} | Bye!\r\n")
out_file.flush()
else:
out_file.write(f"{time.time()} | {message[0]}\r\n")
out_file.flush()
time.sleep(1)
if __name__ == "__main__":
main()
print("Bye now!")
exit(0)

View File

@ -1,74 +0,0 @@
"""
My attempt to mimic Suckless.org's ii (irc it) software
"""
import sys
import time
import multiprocessing
from ircthing_core import irc_router, connect_to_irc_server
from ircthing_utils import read_config, cli_args, base_path
Processes = {}
Stop_Toggle = multiprocessing.Event()
def clean_exit():
"""
Sets the Stop_Toggle event to signal clean exit.
"""
Stop_Toggle.set()
def main():
"""
Main function to initialize irc connections specified on the configuration file.
This will run each irc server connection in invidual sub process.
Hold the process handlers on list.
Kill them with Stop_Toggle.
"""
root_path = base_path()
# Get configuration file path if given
config_path = "config.ini"
argument = cli_args()
if argument:
config_path = argument
# Read configuration
network_configs = read_config(config_path)
## Get irc socket for each network in configuration
## Start thread for each socket
for network in network_configs:
net_name = network["net_name"]
server = network["server"]
port = network["port"]
nickname = network["nickname"]
password = network["password"]
print(f"{time.time()} | Found configs for {net_name} network.")
irc_socket, fifo_files, network_dir = connect_to_irc_server(
root_path, net_name, server, port, nickname, password
)
router_instance = irc_router(
fifo_files, irc_socket, server, nickname, network_dir
)
Processes[net_name] = multiprocessing.Process(target=router_instance.start)
Processes[net_name].daemon = True
Processes[net_name].start()
# Wait for all the sub processes to end.
for process in Processes.values():
process.join()
if __name__ == "__main__":
print(f"{time.time()} | Lets start!")
try:
main()
except Exception as e:
print(f"Got error {e}")
finally:
clean_exit()
print(f"{time.time()} | Bye!")
sys.exit(0)

138
ircthing3.py Normal file
View File

@ -0,0 +1,138 @@
import threading
import os
import sys
import time
import socket
import configparser
from ircthing_core import irc_router
Threads = {}
Stop_Toggle = threading.Event()
def read_config(config_path):
config = configparser.ConfigParser()
config.read(config_path)
network_configs = []
try:
# Collect information from each topic / network
for topic in config.sections():
network = config[topic]
server = network.get("server")
port = network.getint("port")
channels = network.get("channels", fallback=None)
nickname = network.get("nickname")
password = network.get("password", fallback=None)
network_config = {
"net_name": network.name,
"server": server,
"port": port,
"channels": channels,
"nickname": nickname,
"password": password,
}
network_configs.append(network_config)
return network_configs
except Exception as e:
print(f"Failure while reading configuration file. {e}")
exit(1)
def clean_exit(fifo_files, socket):
socket.send(bytes(f"QUIT :Bye\r\n", "UTF-8"))
Stop_Toggle.set()
for file in fifo_files:
try:
os.unlink(file)
except FileNotFoundError:
# We are okay if some fifo files has been removed before this.
pass
def make_files(path, net_name):
os.makedirs(path, exist_ok=True)
server_dir = os.path.join(path, net_name)
os.makedirs(server_dir, exist_ok=True)
try:
os.mkfifo(f"{server_dir}/in")
except FileExistsError:
pass
try:
os.mkfifo(f"{server_dir}/out")
except FileExistsError:
pass
fifo_files = []
fifo_files.append(f"{server_dir}/in")
fifo_files.append(f"{server_dir}/out")
return fifo_files
def connect_to_irc_server(
base_path,
net_name,
server,
port,
nickname,
password=None,
):
print(f"Going to connect to: {server}")
# Create a socket connection to the IRC server
irc_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
irc_socket.connect((server, port))
# Send the server password if provided
if password:
irc_socket.send(bytes(f"PASS {password}\r\n", "UTF-8"))
print(f"Going to use username: {nickname}")
# Send user and nickname information to the server
irc_socket.send(bytes(f"USER {nickname} 0 * :{nickname}\r\n", "UTF-8"))
irc_socket.send(bytes(f"NICK {nickname}\r\n", "UTF-8"))
# Create directories for the server and channel
fifo_files = make_files(base_path, net_name)
return irc_socket, fifo_files
def main():
my_name = sys.argv[0]
my_name_pyless, _ = os.path.splitext(my_name)
base_path = f'/tmp/{my_name_pyless}'
config_path = '../ircthing3.ini'
# Read configurations for all topics
network_configs = read_config(config_path)
## Get irc socket for each network in configuration
## Start thread for each socket
for network in network_configs:
net_name = network["net_name"]
print(f"{time.time()} | Found configs for {net_name} network.")
server = network["server"]
port = network["port"]
nickname = network["nickname"]
password = network["password"]
irc_socket, fifo_files = connect_to_irc_server(base_path, net_name, server, port, nickname, password)
router_instance = irc_router(fifo_files, irc_socket, server, nickname)
Threads[net_name] = threading.Thread(target=router_instance.start)
Threads[net_name].daemon = True
Threads[net_name].start()
for thread in Threads.values():
print(thread)
thread.join()
if __name__ == "__main__":
print(f"{time.time()} | Lets start!")
try:
main()
except Exception as e:
print(f"Got error {e}")
finally:
print(f"{time.time()} | Bye!")
exit(0)

View File

@ -1,141 +1,92 @@
import time
import re
import socket
import asyncio
import os
from ircthing_utils import make_files
from threading import Thread, Event
def connect_to_irc_server(
base_path,
net_name,
server,
port,
nickname,
password=None,
):
print(f"Going to connect to: {server}")
# Create a socket connection to the IRC server
irc_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
irc_socket.connect((server, port))
# Send the server password if provided
if password:
irc_socket.send(bytes(f"PASS {password}\r\n", "UTF-8"))
print(f"Going to use username: {nickname}")
# Send user and nickname information to the server
irc_socket.send(bytes(f"USER {nickname} 0 * :{nickname}\r\n", "UTF-8"))
irc_socket.send(bytes(f"NICK {nickname}\r\n", "UTF-8"))
# Create directories for the server and channel
fifo_files, network_path = make_files(base_path, net_name)
return irc_socket, fifo_files, network_path
class irc_router:
def __init__(self, fifo_files, irc_socket, server, nickname, network_dir):
def __init__(self, fifo_files, irc_socket, server, nickname):
self.fifo_files = fifo_files
self.irc_socket = irc_socket
self.server = server
self.nickname = nickname
self.network_dir = network_dir
self.channels = []
self.threads = []
self.disconnect_toggle = Event()
self._loop = asyncio.new_event_loop()
def start(self):
self.start_threads()
try:
asyncio.set_event_loop(self._loop)
tasks = [self.user_listener(), self.irc_message_loop()]
self._loop.run_until_complete(asyncio.gather(*tasks))
finally:
for file in self.fifo_files:
try:
os.close(file)
except:
# Errors are okay at this point.
pass
self.irc_send("QUIT")
self._loop.close()
def start_threads(self):
user_listener_thread = Thread(target=self.user_listener)
irc_listener_thread = Thread(target=self.irc_message_loop)
## Main loop for reading irc inflow and routing to correct _out fifos.
## Also handle ping/pong here
async def irc_message_loop(self):
output_files=[]
loop = asyncio.get_running_loop()
while True:
self.threads.append(user_listener_thread)
self.threads.append(irc_listener_thread)
# start the threads
for thread in self.threads:
thread.start()
# wait for all the threads to reach end
for thread in self.threads:
thread.join()
def get_output_fifos(self):
output_fifos = []
for fifo in self.fifo_files:
if fifo.endswith("out") and fifo not in output_fifos:
output_fifos.append(fifo)
return output_fifos
def write_to_server_out(self, message):
server_out = os.path.join(self.network_dir, "out")
if os.access(server_out, os.W_OK):
self.write_to_out(server_out, message)
else:
print(f"{time.time()} | {self.server} | {message}")
def irc_message_loop(self):
while not self.disconnect_toggle.is_set():
for output in self.fifo_files:
if output.endswith('out'):
if output not in output_files:
print(f"{time.time()} | {self.server} has output file: {output}")
output_files.append(output)
try:
irc_input = self.irc_socket.recv(2048)
irc_input = await loop.sock_recv(self.irc_socket, 2048)
irc_input = irc_input.decode("utf-8")
except Exception as e:
print(f"Error reading from socket: {e}")
continue
if irc_input:
self.handle_irc_input(irc_input)
time.sleep(0.5)
def handle_irc_input(self, irc_input):
##
## The main logic hapens here.
## support for channel joins
## and spliting the input to channel specifig channel/[in, out] locations
##
if "PING" in irc_input:
self.irc_send(f"PONG :{self.server}")
self.write_to_server_out("ping/pong")
elif f":{self.server} 353" in irc_input:
self.handle_channel_join(irc_input)
elif "PRIVMSG" in irc_input:
self.handle_privmsg(irc_input)
else:
self.write_to_server_out(irc_input)
self.write_to_out(output_files[0], f"{time.time()} | ping/pong")
continue
def handle_channel_join(self, irc_input):
if f":{self.server} 353" in irc_input:
## We have joined a channel!
channel = re.search(r"353 \S+ = (#\S+)", irc_input).group(1)
self.make_files(channel)
match = re.search(r'353 \S+ = (#\S+)', irc_input)
channel_join = match.group(1)
await self.make_files(channel_join)
continue
def handle_privmsg(self, irc_input):
if not self.input_belongs_to_channel(irc_input):
self.write_to_server_out(irc_input)
if "PRIVMSG" in irc_input:
if self.if_input_belongs_to_channel(irc_input):
continue
def input_belongs_to_channel(self, input):
output_files = self.get_output_fifos()
self.write_to_out(output_files[0], irc_input)
await asyncio.sleep(0.1)
def if_input_belongs_to_channel(self, input):
# We know that input includes "PRIVMSG"
# But to where it shoudl go?
message = re.search(r":.*:(.*)", input).group(1)
input_user = re.search(r"^:(\S+)", input).group(1)
input_channel = re.search(r"PRIVMSG (\S+) :", input).group(1)
message = re.search(r':.*:(.*)', input).group(1)
input_user = re.search(r'^:(\S+)', input).group(1)
input_channel = re.search(r'PRIVMSG (\S+) :', input)
done=False
for file in output_files:
if input_channel in file:
for file in self.fifo_files:
if input_channel.group(1) in file:
if file.endswith("out"):
self.write_to_out(file, f"{input_user}: {message}")
done=True
return done
def make_files(self, channel):
async def make_files(self, channel):
network_path = os.path.dirname(self.fifo_files[1])
channel_dir = os.path.join(network_path, channel)
os.makedirs(channel_dir, exist_ok=True)
@ -151,6 +102,7 @@ class irc_router:
fifo_files.append(f"{channel_dir}/in")
fifo_files.append(f"{channel_dir}/out")
self.fifo_files.extend(fifo_files)
await asyncio.sleep(1)
def write_to_out(self, file, input):
if file == None:
@ -165,28 +117,29 @@ class irc_router:
self.irc_socket.send(bytes(f"{message}\r\n", "utf-8"))
print(f"{time.time()} | Send: {message}")
def async_readline(self, file, queue):
async def async_readline(self, file, queue):
print(f"Trying to read from file {file}")
try:
while True:
with open(file, mode="r") as in_file:
with open(file, mode='r') as in_file:
line = in_file.readline().strip()
if not line:
print("There was no line!")
continue
queue.put(line)
await queue.put(line)
in_file.close()
await asyncio.sleep(0.5)
except Exception as e:
print(f"ERROR IN async_readline func: {e}")
def process_user_input(self, queue):
async def process_user_input(self, queue):
print("Processing of user input may start!")
while True:
line = await queue.get()
print(f"{time.time()} | trying to send: {line} to {self.server}")
self.irc_send(line)
time.sleep(0.5)
await asyncio.sleep(0.2)
async def user_listener(self):
print("User Listener is HERE!")
@ -204,4 +157,5 @@ class irc_router:
# Wait for all tasks to complete
await asyncio.gather(*tasks)
finally:
self.irc_send("QUIT")
self.irc_send('QUIT')

View File

@ -1,105 +0,0 @@
"""
ircthing_utils
Some functions that make life nicer.
"""
import configparser
import argparse
import os
import sys
from typing import Union, List, Tuple
def base_path() -> str:
"""
Returns location that the script will use as root for its dir structure
Returns:
str: path for run time files
"""
my_name = sys.argv[0]
my_name_pyless, _ = os.path.splitext(my_name)
return f"/tmp/{my_name_pyless}"
def cli_args() -> Union[str, None]:
"""
Parse arguments and provide small usage message
Returns:
str or None: path to config file
"""
parser = argparse.ArgumentParser(
description="Usage: python3.11 ircthing.py /myconfig.ini"
)
parser.add_argument("config", help="Path to the configuration file.")
args = parser.parse_args()
if args.config:
return args.config
return None
def read_config(config_path: str) -> List[dict]:
"""
Read configuration file and return list of dicts.
Dictionaries hold information about irc networks/server to connecto to.
args:
str: Path to the configuration file.
Returns:
list[{}, {}]: List of dictionaries.
"""
config = configparser.ConfigParser()
config.read(config_path)
network_configs = []
try:
# Collect information from each topic / network
for topic in config.sections():
network = config[topic]
server = network.get("server")
port = network.getint("port")
channels = network.get("channels", fallback=None)
nickname = network.get("nickname")
password = network.get("password", fallback=None)
network_config = {
"net_name": network.name,
"server": server,
"port": port,
"channels": channels,
"nickname": nickname,
"password": password,
}
network_configs.append(network_config)
return network_configs
except Exception as config_read_error:
print(f"Failure while reading configuration file. {config_read_error}")
sys.exit(1)
def make_files(path: str, net_name: str) -> Tuple[List[str], str]:
"""
Make directories and fifo files need to make irc server connection.
Args:
str: The root path for run time files.
str: The network name of the irc server we are joining
Returns:
list: List containing the fio files created.
str: Path to the directory created for the network connection.
"""
os.makedirs(path, exist_ok=True)
server_dir = os.path.join(path, net_name)
os.makedirs(server_dir, exist_ok=True)
try:
os.mkfifo(f"{server_dir}/in")
except FileExistsError:
pass
try:
os.mkfifo(f"{server_dir}/out")
except FileExistsError:
pass
fifo_files = []
fifo_files.append(f"{server_dir}/in")
fifo_files.append(f"{server_dir}/out")
return fifo_files, server_dir