Description
Is your idea related to a problem? Please describe.
The problem is that awswrangler does not support client-side encryption for the Parquet format, although PyArrow supports this feature: https://arrow.apache.org/docs/python/parquet.html#kms-connection-configuration
This feature is important for writing sensitive data to S3.
Describe the solution you'd like
Since PyArrow already supports this feature, implementing it in awswrangler should not be very costly.
Writing client-side encrypted Parquet to S3
The s3.to_parquet method already exposes the pyarrow_additional_kwargs parameter. Through this parameter we can pass encryption_properties built with a custom implementation of the PyArrow KmsClient (https://arrow.apache.org/docs/python/generated/pyarrow.parquet.encryption.KmsClient.html#).
I already tested this and it works when writing the dataframe to a single file. When writing concurrently, it raises OSError: Re-using encryption properties for another file. This happens because the same writer with the same encryption configuration is used to write all chunks (https://github.com/aws/aws-sdk-pandas/blob/main/awswrangler/s3/_write_parquet.py#L116), which PyArrow does not permit.
Reading client-side encrypted Parquet from S3
Similarly, PyArrow exposes a decryption configuration that can be passed to the PyArrow reader (https://arrow.apache.org/docs/python/parquet.html#decryption-configuration). The pyarrow_additional_kwargs parameter is exposed in awswrangler.s3.read_parquet; however, it is only forwarded to the to_pandas method, so the decryption configuration never reaches the reader.
An example of a PyArrow KmsClient implementation using AWS KMS:
import base64

import boto3
import pyarrow.parquet.encryption as pe


# Not shown in the original snippet; assumed to be a plain Exception subclass.
class AwsKmsClientException(Exception):
    pass


class AwsKmsClient(pe.KmsClient):
    def __init__(self, kms_connection_config):
        pe.KmsClient.__init__(self)
        # The AWS region is read from the custom KMS configuration.
        self.kms_client = boto3.client(
            "kms",
            region_name=kms_connection_config.custom_kms_conf[
                "aws_region_name"
            ],
        )

    def wrap_key(
        self, key_bytes: bytes, master_key_identifier: str
    ) -> bytes:
        # Encrypt the data key with the KMS master key, then base64-encode it.
        try:
            response = self.kms_client.encrypt(
                KeyId=master_key_identifier, Plaintext=key_bytes
            )
            cipher_text_blob = response["CiphertextBlob"]
            return base64.b64encode(cipher_text_blob)
        except Exception as e:
            raise AwsKmsClientException(
                f"Failed to wrap key with master key {master_key_identifier}. Error: {e}"
            ) from e

    def unwrap_key(
        self, wrapped_key: str, master_key_identifier: str
    ) -> bytes:
        # Base64-decode the wrapped key, then decrypt it with the KMS master key.
        try:
            cipher_text_blob = base64.b64decode(wrapped_key)
            response = self.kms_client.decrypt(
                CiphertextBlob=cipher_text_blob,
                KeyId=master_key_identifier,
            )
            # boto3 returns the decrypted key as bytes.
            return response["Plaintext"]
        except Exception as e:
            raise AwsKmsClientException(
                f"Failed to unwrap key with master key {master_key_identifier}. Error: {e}"
            ) from e
I could propose a PR to address this if you agree with my investigation and with including this feature in awswrangler.s3.
Thank you!