core_daemon/modules/user_auth.py

126 lines
4.1 KiB
Python
Raw Permalink Normal View History

import hashlib
import uuid
import time
class UserAuth:
def __init__(self):
self.users = {} # Store users as {username: {password_hash, salt}}
self.sessions = {} # Store sessions as {session_id: {username, expiry}}
self.session_duration = 3600 # 1 hour
def hash_password(self, password, salt=None):
if salt is None:
salt = uuid.uuid4().hex
return hashlib.sha256((password + salt).encode()).hexdigest(), salt
def register_user(self, username, password):
if username in self.users:
return False, "User already exists"
password_hash, salt = self.hash_password(password)
self.users[username] = {"password_hash": password_hash, "salt": salt}
return True, "User registered successfully"
def authenticate(self, username, password):
if username not in self.users:
return False, "User not found"
user = self.users[username]
password_hash, _ = self.hash_password(password, user["salt"])
if password_hash == user["password_hash"]:
session_id = uuid.uuid4().hex
expiry = time.time() + self.session_duration
self.sessions[session_id] = {"username": username, "expiry": expiry}
return True, session_id
return False, "Invalid password"
def authenticate_request(self, session_id, action):
if action in ['register', 'login']: # These actions don't require authentication
return True, None
success, result = self.validate_session(session_id)
if not success:
return False, "Authentication required"
return True, result # result here is the username
def validate_session(self, session_id):
if session_id not in self.sessions:
return False, "Invalid session"
session = self.sessions[session_id]
if time.time() > session["expiry"]:
del self.sessions[session_id]
return False, "Session expired"
return True, session["username"]
def logout(self, session_id):
if session_id in self.sessions:
del self.sessions[session_id]
return True, "Logged out successfully"
return False, "Invalid session"
user_auth = UserAuth()
def do_register(args):
"""Register a new user: register <username> <password>"""
try:
username, password = args.split()
success, message = user_auth.register_user(username, password)
return message
except ValueError:
return "Invalid arguments. Usage: register <username> <password>"
def do_login(args):
"""Login a user: login <username> <password>"""
try:
username, password = args.split()
success, result = user_auth.authenticate(username, password)
if success:
return f"Login successful. Session ID: {result}"
return result
except ValueError:
return "Invalid arguments. Usage: login <username> <password>"
def do_validate(args):
"""Validate a session: validate <session_id>"""
success, result = user_auth.validate_session(args)
if success:
return f"Valid session for user: {result}"
return result
def do_logout(args):
"""Logout a user: logout <session_id>"""
success, message = user_auth.logout(args)
return message
def auth_pre_command_hook(command):
action = command.get('action')
session_id = command.get('session_id')
if action in ['register', 'login']:
return True
success, result = user_auth.validate_session(session_id)
if not success:
return False
return True
commands = {
'register': do_register,
'login': do_login,
'validate': do_validate,
'logout': do_logout,
}
def get_commands():
return commands
def initialize():
print("User authentication module initialized")
def shutdown():
print("User authentication module shut down")
def register_hooks(hook_manager):
hook_manager.register_hook('pre_command', auth_pre_command_hook)
def unregister_hooks(hook_manager):
hook_manager.unregister_hook('pre_command', auth_pre_command_hook)