chatgpt/pastedb/pastedb01/data/data.py

123 lines
4.4 KiB
Python

"""
This module provides functions for managing and saving data in a dictionary object. It also supports encryption and decryption of the data when saving and loading it to/from disk. The functions in this module include:
set_encryption_key(key: bytes): sets the encryption key to be used for encrypting the data.
add_data(service_tag: str, key: str, md5sum: str) -> str: adds data for a service to the data dictionary.
save_data(filename: str, data: dict, key=None): writes the data dictionary to disk as a JSON object. If a key is provided, it encrypts the data using the AES symmetric encryption algorithm before writing to disk.
encrypt_data(data: bytes) -> bytes: encrypts data using the AES symmetric encryption algorithm.
load_data(file_path: str) -> Dict: loads the data from a file and returns it as a dictionary. If the file is encrypted, it uses the provided key to decrypt it before returning the data.
This module depends on the following packages: hashlib, Crypto, and collections.
"""
import os
import json
import hashlib
import time
from collections import defaultdict
from Cryptodome.Cipher import AES
from Cryptodome.Util import Padding
from cryptography.fernet import Fernet
data = defaultdict(lambda: {"timestamp": 0, "services": {}})
_ENCRYPTION_KEY = None
def set_encryption_key(key: bytes):
global _ENCRYPTION_KEY
_ENCRYPTION_KEY = key
def add_data(service_tag: str, key: str, md5sum: str) -> str:
"""
Adds data for a service to the `data` dictionary.
Parameters:
service_tag (str): A string representing the service being added.
key (str): A string representing the key for the service being added.
md5sum (str): A string representing the MD5 checksum for the service being added.
Returns:
str: A string representing the unique ID of the run that the data was added to.
"""
# Generate a unique ID for the run
run_id = f"run-{hashlib.sha256(str(data).encode()).hexdigest()[:6]}"
timestamp = int(time.time())
# Add the service data to the run
data[run_id]["timestamp"] = timestamp
data[run_id]["services"][service_tag] = {"key": key, "md5sum": md5sum}
return run_id
def save_data(filename: str, data: dict, key=None):
"""
Writes the data dictionary to disk as a JSON object.
Parameters:
filename (str): A string representing the filename to write the data to.
data (dict): A dictionary representing the data to be written to disk.
key (bytes): Optional bytes representing a key to use for encryption.
"""
with open(filename, "w") as f:
# Serialize the data dictionary as a JSON object
json_data = json.dumps(data)
# If a key is provided, encrypt the JSON data
if _ENCRYPTION_KEY:
# Encrypt the data using the key
encrypted_data = encrypt_data(json_data.encode())
# Write the encrypted data to the file
f.write(encrypted_data.hex())
else:
# Write the unencrypted JSON data to the file
print("you need to set the encryption key first.")
def encrypt_data(data: bytes) -> bytes:
"""
Encrypts data using the AES symmetric encryption algorithm.
Parameters:
data (bytes): A bytes object representing the data to be encrypted.
Returns:
bytes: A bytes object representing the encrypted data.
"""
# Generate a random initialization vector (IV)
iv = os.urandom(AES.block_size)
# Pad the data to a multiple of the block size
padded_data = Padding.pad(data, AES.block_size)
# Create an AES cipher object
cipher = AES.new(_ENCRYPTION_KEY, AES.MODE_CBC, iv)
# Encrypt the data using CBC mode
encrypted_data = cipher.encrypt(padded_data)
# Prepend the IV to the encrypted data
return iv + encrypted_data
def load_data(file_path: str, key=None) -> dict:
"""
Load the data from a file and return it as a dictionary.
:param file_path: The path to the file to load.
:param key: The key to use to decrypt the file.
:return: A dictionary representing the data from the file.
"""
if _ENCRYPTION_KEY:
with open(file_path, "rb") as f:
ciphertext = f.read()
fernet = Fernet(_ENCRYPTION_KEY.encode())
plaintext = fernet.decrypt(ciphertext)
return json.loads(plaintext.decode())
else:
print("you need to set the encryption key first.")
return '{"you need to set the encryption key first."}'