123 lines
4.4 KiB
Python
123 lines
4.4 KiB
Python
"""
|
|
This module provides functions for managing and saving data in a dictionary object. It also supports encryption and decryption of the data when saving and loading it to/from disk. The functions in this module include:
|
|
|
|
set_encryption_key(key: bytes): sets the encryption key to be used for encrypting the data.
|
|
add_data(service_tag: str, key: str, md5sum: str) -> str: adds data for a service to the data dictionary.
|
|
save_data(filename: str, data: dict, key=None): serializes the data dictionary to JSON, encrypts it with AES using the key set via set_encryption_key, and writes the result to disk as hex text. If no encryption key has been set, it prints a message instead of writing the data.
|
|
encrypt_data(data: bytes) -> bytes: encrypts data using the AES symmetric encryption algorithm.
|
|
load_data(file_path: str, key=None) -> dict: loads the data from a file, decrypts it with the key set via set_encryption_key, and returns it as a dictionary. If no encryption key has been set, it prints a message and returns a message string instead.
|
|
|
|
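This module depends on the following packages: os, json, hashlib, time, and collections from the standard library, plus the third-party packages Cryptodome (pycryptodomex) and cryptography.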
|
|
"""
|
|
import os
|
|
import json
|
|
import hashlib
|
|
import time
|
|
from collections import defaultdict
|
|
from Cryptodome.Cipher import AES
|
|
from Cryptodome.Util import Padding
|
|
from cryptography.fernet import Fernet
|
|
|
|
# In-memory store of runs, keyed by run ID. Looking up a missing run ID
# creates a fresh record with a zero timestamp and an empty services mapping.
# NOTE: the factory is deliberately kept as a lambda; add_data() hashes
# str(data), and that string includes the factory's repr.
data = defaultdict(lambda: dict(timestamp=0, services=dict()))

# Module-wide encryption key; stays None until set_encryption_key() is called.
_ENCRYPTION_KEY = None
|
|
|
|
def set_encryption_key(key: bytes) -> None:
    """
    Store `key` as the module-level encryption key.

    The value is assigned as-is to `_ENCRYPTION_KEY`; no validation is
    performed here.

    Parameters:
    key (bytes): The key that the other functions in this module will use.
    """
    # Rebind the module-level name through the module namespace.
    globals()["_ENCRYPTION_KEY"] = key
|
|
|
|
|
|
def add_data(service_tag: str, key: str, md5sum: str) -> str:
    """
    Record a service entry under a run in the module-level `data` mapping.

    The run ID is "run-" followed by the first six hex digits of the SHA-256
    of `str(data)` as it is at call time, so the ID depends on the current
    contents of `data`.

    Parameters:
    service_tag (str): Name under which the service entry is stored.
    key (str): Stored as the entry's "key" value.
    md5sum (str): Stored as the entry's "md5sum" value.

    Returns:
    str: The run ID the entry was stored under.
    """
    # Derive the run ID from a snapshot of the current store contents.
    snapshot = str(data).encode()
    digest = hashlib.sha256(snapshot).hexdigest()
    run_id = "run-" + digest[:6]
    now = int(time.time())

    # Fetching the record creates it (via the defaultdict factory) if needed.
    run_entry = data[run_id]
    run_entry["timestamp"] = now
    run_entry["services"][service_tag] = dict(key=key, md5sum=md5sum)

    return run_id
|
|
|
|
|
|
def save_data(filename: str, data: dict, key=None):
    """
    Serialize `data` to JSON, encrypt it, and write it to `filename`.

    The encrypted bytes returned by `encrypt_data` are written as a hex
    string. If no module-level encryption key has been set, a message is
    printed and the file is left untouched.

    Parameters:
    filename (str): Path of the file to write.
    data (dict): The JSON-serializable data to store.
    key (bytes): Accepted for backward compatibility but currently unused;
        encryption always uses the key set via `set_encryption_key`.
    """
    # Check the key before touching the file so that a missing key does not
    # truncate an existing file.
    if not _ENCRYPTION_KEY:
        print("you need to set the encryption key first.")
        return

    # Serialize and encrypt before opening the file: if either step raises,
    # whatever was previously stored at `filename` is preserved.
    encrypted_data = encrypt_data(json.dumps(data).encode())

    with open(filename, "w") as f:
        f.write(encrypted_data.hex())
|
|
|
|
|
|
def encrypt_data(data: bytes) -> bytes:
    """
    Encrypt `data` with AES in CBC mode using the module-level key.

    A fresh random IV is generated on every call and placed in front of the
    ciphertext, so the result is `iv + ciphertext`.

    Parameters:
    data (bytes): The plaintext bytes to encrypt.

    Returns:
    bytes: The IV (one AES block long) followed by the encrypted data.
    """
    block_len = AES.block_size

    # A new random IV per call.
    init_vector = os.urandom(block_len)

    # CBC works on whole blocks, so pad the plaintext up to a block boundary.
    plaintext_blocks = Padding.pad(data, block_len)

    cbc = AES.new(_ENCRYPTION_KEY, AES.MODE_CBC, init_vector)

    # The IV travels with the ciphertext, as its leading block.
    return b"".join((init_vector, cbc.encrypt(plaintext_blocks)))
|
|
|
|
|
|
def load_data(file_path: str, key=None) -> dict:
    """
    Load, decrypt and parse a file written by `save_data`.

    The file is expected to contain the hex text that `save_data` writes:
    an AES-CBC initialization vector (one AES block) followed by the
    ciphertext of the padded JSON document. This replaces the previous
    Fernet-based decryption, which could not read that format.

    Parameters:
    file_path (str): Path of the file to load.
    key: Accepted for backward compatibility but currently unused;
        decryption always uses the key set via `set_encryption_key`, the
        same key `save_data` encrypts with.

    Returns:
    dict: The decoded JSON data. If no encryption key has been set, a
        message is printed and a message string is returned instead (kept
        as-is for backward compatibility).

    Raises:
    ValueError: If the file content is not valid hex, or the padding is
        invalid after decryption (for example, when the key is wrong).
    """
    if not _ENCRYPTION_KEY:
        print("you need to set the encryption key first.")
        return '{"you need to set the encryption key first."}'

    with open(file_path, "r") as f:
        raw = bytes.fromhex(f.read())

    # encrypt_data() puts the IV in front of the ciphertext.
    iv, ciphertext = raw[:AES.block_size], raw[AES.block_size:]

    cipher = AES.new(_ENCRYPTION_KEY, AES.MODE_CBC, iv)
    plaintext = Padding.unpad(cipher.decrypt(ciphertext), AES.block_size)

    return json.loads(plaintext.decode())
|