"""Manage per-run service records and persist them to disk, encrypted.

The module keeps an in-memory ``data`` mapping of run IDs to
``{"timestamp": int, "services": {tag: {"key": ..., "md5sum": ...}}}``
records and can write such a mapping to disk and read it back.

On-disk format: the JSON text is encrypted with AES in CBC mode using a
fresh random IV, the IV is prepended to the ciphertext, and the result is
written to the file as a hexadecimal string.

Functions:
    set_encryption_key(key): set the module-wide default encryption key.
    add_data(service_tag, key, md5sum): record a service under a new run ID.
    save_data(filename, data, key=None): encrypt ``data`` and write it.
    encrypt_data(data, key=None): AES-CBC encrypt bytes (IV prepended).
    load_data(file_path, key=None): read a file written by ``save_data``.

Third-party dependencies: the ``Cryptodome`` package (pycryptodomex) and
``cryptography``.
"""

import hashlib
import json
import os
import time
from collections import defaultdict

from Cryptodome.Cipher import AES
from Cryptodome.Util import Padding
# NOTE(review): Fernet is no longer used by this module (load_data now
# mirrors the AES-CBC format produced by save_data). The import is kept in
# case other code imports the name from here -- confirm before removing.
from cryptography.fernet import Fernet  # noqa: F401

# Run ID -> {"timestamp": <unix seconds>, "services": {<tag>: {...}}}.
data = defaultdict(lambda: {"timestamp": 0, "services": {}})

# Module-wide default key; set via set_encryption_key().
_ENCRYPTION_KEY = None


def set_encryption_key(key: bytes) -> None:
    """Set the default key used when no explicit key is passed.

    Parameters:
        key (bytes): The AES key. It is handed to ``AES.new`` unchanged, so
            it must have a length AES accepts (16, 24 or 32 bytes).
    """
    global _ENCRYPTION_KEY
    _ENCRYPTION_KEY = key


def _resolve_key(key=None):
    """Return the key to use as bytes, or None when no key is available.

    An explicit non-empty ``key`` wins over the module default. A ``str``
    key is UTF-8 encoded so that callers passing text keys still work.
    """
    chosen = key or _ENCRYPTION_KEY
    if isinstance(chosen, str):
        chosen = chosen.encode()
    return chosen or None


def add_data(service_tag: str, key: str, md5sum: str) -> str:
    """Add data for a service to the module-level ``data`` dictionary.

    Each call creates a new run whose ID is derived from a SHA-256 hash of
    the current contents of ``data``.

    Parameters:
        service_tag (str): Name of the service being added.
        key (str): Key associated with the service.
        md5sum (str): MD5 checksum associated with the service.

    Returns:
        str: The ID of the run the service was recorded under
        (``"run-"`` followed by six hex digits).
    """
    # The ID is only the first 6 hex digits of the digest, so distinct
    # states can collide; a collision adds the service to an existing run.
    digest = hashlib.sha256(str(data).encode()).hexdigest()
    run_id = f"run-{digest[:6]}"

    run = data[run_id]
    run["timestamp"] = int(time.time())
    run["services"][service_tag] = {"key": key, "md5sum": md5sum}
    return run_id


def save_data(filename: str, data: dict, key=None):
    """Encrypt ``data`` as JSON and write it to ``filename`` as hex text.

    If no key is available (neither ``key`` nor the module default), a
    message is printed and the file is left untouched.

    Parameters:
        filename (str): Path of the file to write.
        data (dict): JSON-serializable mapping to store.
        key (bytes): Optional key; defaults to the key installed with
            ``set_encryption_key``.

    Raises:
        TypeError: If ``data`` is not JSON-serializable.
        ValueError: If the key has a length AES does not accept.
    """
    active_key = _resolve_key(key)
    if active_key is None:
        print("you need to set the encryption key first.")
        return

    # Serialize and encrypt BEFORE opening the file: opening with "w"
    # truncates, so a failure here must not destroy an existing file.
    encrypted = encrypt_data(json.dumps(data).encode(), active_key)
    with open(filename, "w", encoding="ascii") as f:
        f.write(encrypted.hex())


def encrypt_data(data: bytes, key=None) -> bytes:
    """Encrypt ``data`` with AES in CBC mode.

    Parameters:
        data (bytes): Plaintext to encrypt.
        key (bytes): Optional key; defaults to the key installed with
            ``set_encryption_key``.

    Returns:
        bytes: A random IV of ``AES.block_size`` bytes followed by the
        ciphertext of the padded plaintext.

    Raises:
        ValueError: If no key is available or its length is not valid
            for AES.
    """
    active_key = _resolve_key(key)
    if active_key is None:
        raise ValueError("you need to set the encryption key first.")

    # A fresh IV per message; it is not secret and travels with the data.
    iv = os.urandom(AES.block_size)
    cipher = AES.new(active_key, AES.MODE_CBC, iv)
    # NOTE(review): CBC provides no integrity protection; tampering is
    # only detected if it happens to break the padding or the JSON.
    return iv + cipher.encrypt(Padding.pad(data, AES.block_size))


def _decrypt_data(blob: bytes, key: bytes) -> bytes:
    """Reverse ``encrypt_data``: split off the IV, decrypt and unpad."""
    iv, ciphertext = blob[:AES.block_size], blob[AES.block_size:]
    cipher = AES.new(key, AES.MODE_CBC, iv)
    return Padding.unpad(cipher.decrypt(ciphertext), AES.block_size)


def load_data(file_path: str, key=None) -> dict:
    """Load and decrypt a file written by ``save_data``.

    If no key is available (neither ``key`` nor the module default), a
    message is printed and an empty dictionary is returned.

    :param file_path: The path to the file to load.
    :param key: Optional key; defaults to the key installed with
        ``set_encryption_key``.
    :return: The decoded JSON content of the file.
    :raises ValueError: If the file is not valid hex, cannot be decrypted
        with the key (bad padding), or does not contain valid JSON.
    """
    active_key = _resolve_key(key)
    if active_key is None:
        print("you need to set the encryption key first.")
        return {}

    # save_data stores the ciphertext as hex text, so read text and decode.
    with open(file_path, "r", encoding="ascii") as f:
        hex_payload = f.read().strip()

    plaintext = _decrypt_data(bytes.fromhex(hex_payload), active_key)
    return json.loads(plaintext.decode())