139 lines
5.5 KiB
Python
139 lines
5.5 KiB
Python
|
import reedsolo
|
||
|
from Cryptodome.Cipher import AES
|
||
|
from Cryptodome.Random import get_random_bytes
|
||
|
from Cryptodome.Protocol.KDF import scrypt
|
||
|
from Cryptodome.Util.Padding import pad
|
||
|
|
||
|
# Parameters for the scrypt key derivation function (KDF).
# NOTE(review): the salt is a fixed, hard-coded value, so the same password
# always derives the same key. It is kept unchanged here because altering it
# would change every key derived so far; consider a random per-message salt.
KDF_SALT = b"salt_for_key_derivation"
KDF_N = 2 ** 14  # CPU/memory cost parameter for scrypt KDF
KDF_r = 8  # Block size parameter for scrypt KDF
KDF_p = 1  # Parallelization parameter for scrypt KDF
KDF_KEY_LEN = 32  # Length in bytes of the derived encryption key
|
||
|
|
||
|
|
||
|
# Layout of every encrypted piece: nonce || tag || ciphertext.
_NONCE_LEN = 12  # Size in bytes of the random AES-GCM nonce stored in each piece
_TAG_LEN = 16  # Size in bytes of the AES-GCM authentication tag stored in each piece
_RS_BLOCK_SIZE = 255  # reedsolo's default codeword length; nsym must stay below it


def _resolve_key(password, keyfile):
    """Return the AES key: read from keyfile if given, else derived from password."""
    if keyfile is not None:
        return read_key_from_file(keyfile)
    if password is None:
        raise ValueError("Either a password or a keyfile must be provided")
    if isinstance(password, str):
        password = password.encode()
    try:
        return scrypt(password, KDF_SALT, KDF_KEY_LEN, N=KDF_N, r=KDF_r, p=KDF_p)
    except ValueError as kdf_error:
        raise ValueError(
            "Error during key derivation: {}".format(str(kdf_error))
        ) from kdf_error


def _check_piece_count(count):
    """Raise ValueError unless count is usable as a Reed-Solomon symbol count."""
    if not isinstance(count, int) or not 1 <= count < _RS_BLOCK_SIZE:
        raise ValueError(
            "Number of pieces must be an integer between 1 and {}".format(
                _RS_BLOCK_SIZE - 1
            )
        )


def _split_evenly(blob, count):
    """Cut blob into exactly count consecutive slices differing by at most one byte."""
    base, extra = divmod(len(blob), count)
    slices = []
    start = 0
    for index in range(count):
        end = start + base + (1 if index < extra else 0)
        slices.append(blob[start:end])
        start = end
    return slices


def _piece_aad(index, total):
    """Return the associated data binding a piece to its position and the piece count."""
    return "{}/{}".format(index, total).encode("ascii")


def encrypt_data(data, number, password=None, keyfile=None):
    """
    Protects data with Reed-Solomon coding and encrypts it with AES-GCM.

    The data is Reed-Solomon encoded with `number` error-correction symbols,
    the encoded stream is cut into `number` consecutive pieces, and each piece
    is encrypted with AES-GCM under a fresh random nonce.

    Args:
        data (bytes): The data to encrypt.
        number (int): The number of pieces to produce. The same value is used
            as the Reed-Solomon error-correction symbol count, so it must be
            between 1 and 254.
        password (bytes, str or None): The password used to derive the key
            with the scrypt KDF. Ignored when keyfile is given.
        keyfile (str or None): The path to the file containing the encryption
            key. If None, the key is derived from the password.
    Returns:
        A list of `number` bytes objects, each laid out as
        nonce (12 bytes) || tag (16 bytes) || ciphertext.
    Raises:
        ValueError: If `number` is out of range, if Reed-Solomon encoding
            fails, if neither password nor keyfile is given, or if an error
            occurs during key derivation.
        OSError: If the keyfile cannot be opened.
    """
    _check_piece_count(number)

    # Add Reed-Solomon parity to the data
    try:
        encoded = bytes(reedsolo.RSCodec(number).encode(data))
    except reedsolo.ReedSolomonError as reed_error:
        raise ValueError(
            "Error during Reed-Solomon encoding: {}".format(str(reed_error))
        ) from reed_error

    # Derive encryption key from password using scrypt KDF, or read from file if specified
    key = _resolve_key(password, keyfile)

    # Encrypt each piece using AES in GCM mode; GCM is a stream mode, so no padding is needed
    encrypted_pieces = []
    for index, piece in enumerate(_split_evenly(encoded, number)):
        nonce = get_random_bytes(_NONCE_LEN)
        aes = AES.new(key, AES.MODE_GCM, nonce=nonce)
        # Authenticating the position makes reordered or missing pieces fail verification
        aes.update(_piece_aad(index, number))
        ciphertext, tag = aes.encrypt_and_digest(piece)
        # The nonce and tag are required for decryption, so they travel with the piece
        encrypted_pieces.append(nonce + tag + ciphertext)

    return encrypted_pieces


def read_key_from_file(keyfile):
    """
    Reads an encryption key from a file.

    Args:
        keyfile (str): The path to the file containing the encryption key.
    Returns:
        A bytes object containing the encryption key.
    Raises:
        OSError: If the key file cannot be opened or read.
        ValueError: If the key length is invalid.
    """
    with open(keyfile, "rb") as key_file:
        raw_key = key_file.read()

    # A raw binary key may start or end with bytes that look like whitespace,
    # so only strip when the file is not already exactly one key long
    # (for example, when an editor appended a trailing newline).
    if len(raw_key) == KDF_KEY_LEN:
        return raw_key

    key = raw_key.strip()
    if len(key) != KDF_KEY_LEN:
        raise ValueError("Invalid key length")
    return key


def decrypt_data(data_pieces, key, keyfile=None, password=None):
    """
    Decrypts a list of AES-GCM encrypted pieces and returns the original data
    by joining the pieces and applying Reed-Solomon decoding.

    Each piece must be laid out as nonce (12 bytes) || tag (16 bytes) ||
    ciphertext, and the pieces must be given in their original order. The
    number of pieces is used as the Reed-Solomon error-correction symbol count.

    Args:
        data_pieces (list of bytes): The encrypted pieces to decrypt and reassemble.
        key (bytes or None): The encryption key to use for decrypting the pieces.
        keyfile (str, optional): Path to a file containing the encryption key.
            Only used when `key` is None.
        password (bytes or str, optional): Password from which the key is
            derived with the scrypt KDF. Only used when both `key` and
            `keyfile` are None.
    Returns:
        bytes: The original data.
    Raises:
        ValueError: If both `key` and `keyfile` are specified, or if none of
            `key`, `keyfile` and `password` is specified.
        OSError: If the keyfile cannot be read.
        ValueError: If the keyfile does not contain a valid key, if a piece is
            malformed or fails authentication (wrong key, corrupted, reordered
            or missing piece), or if Reed-Solomon decoding fails.
    """
    # Determine the encryption key
    if key is not None and keyfile is not None:
        raise ValueError("Must specify either key or keyfile, but not both")
    if key is None:
        if keyfile is None and password is None:
            raise ValueError("Must specify a key, a keyfile or a password")
        key = _resolve_key(password, keyfile)

    total = len(data_pieces)
    if total == 0:
        raise ValueError("No data pieces to decrypt")
    _check_piece_count(total)

    # Decrypt and authenticate each piece
    header_len = _NONCE_LEN + _TAG_LEN
    plain_parts = []
    for index, piece in enumerate(data_pieces):
        piece = bytes(piece)
        if len(piece) < header_len:
            raise ValueError("Encrypted piece {} is too short".format(index))
        nonce = piece[:_NONCE_LEN]
        tag = piece[_NONCE_LEN:header_len]
        ciphertext = piece[header_len:]
        aes = AES.new(key, AES.MODE_GCM, nonce=nonce)
        aes.update(_piece_aad(index, total))
        try:
            plain_parts.append(aes.decrypt_and_verify(ciphertext, tag))
        except ValueError as auth_error:
            raise ValueError(
                "Authentication failed for piece {}: wrong key or corrupted data".format(
                    index
                )
            ) from auth_error

    # Reassemble the Reed-Solomon codeword stream and strip the parity symbols
    try:
        decoded = reedsolo.RSCodec(total).decode(b"".join(plain_parts))
    except reedsolo.ReedSolomonError as reed_error:
        raise ValueError(
            "Error during Reed-Solomon decoding: {}".format(str(reed_error))
        ) from reed_error

    # Newer reedsolo releases return (message, message_with_ecc, errata_positions)
    if isinstance(decoded, tuple):
        decoded = decoded[0]
    return bytes(decoded)
|
||
|
|
||
|
|
||
|
if __name__ == "__main__":
    # Read encryption key from file
    key = read_key_from_file("./my_key.key")

    # Read data from file
    with open("./cat.png", "rb") as f:
        data = f.read()

    # encrypt_data returns a single list of encrypted pieces, so it must not be
    # unpacked into a (nonce, pieces) pair
    encrypted_pieces = encrypt_data(data, 6, keyfile="./my_key.key")

    # decrypt_data raises ValueError when both a key and a keyfile are given,
    # so only the key that was already loaded is passed
    decrypted_data = decrypt_data(encrypted_pieces, key)

    # Write decrypted data to file
    with open("restored_cat.png", "wb") as f:
        f.write(decrypted_data)
|