diff --git a/aws/src/integration/java/org/apache/iceberg/aws/TestKeyManagementClient.java b/aws/src/integration/java/org/apache/iceberg/aws/TestKeyManagementClient.java new file mode 100644 index 000000000000..83bacf2601cd --- /dev/null +++ b/aws/src/integration/java/org/apache/iceberg/aws/TestKeyManagementClient.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.aws; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.nio.ByteBuffer; +import java.util.Map; +import org.apache.iceberg.encryption.KeyManagementClient; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.condition.EnabledIfEnvironmentVariable; +import org.junit.jupiter.api.condition.EnabledIfEnvironmentVariables; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; +import org.junit.jupiter.params.provider.NullSource; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import software.amazon.awssdk.services.kms.KmsClient; +import software.amazon.awssdk.services.kms.model.CreateKeyRequest; +import software.amazon.awssdk.services.kms.model.CreateKeyResponse; +import software.amazon.awssdk.services.kms.model.DataKeySpec; +import software.amazon.awssdk.services.kms.model.KeySpec; +import software.amazon.awssdk.services.kms.model.ScheduleKeyDeletionRequest; +import software.amazon.awssdk.services.kms.model.ScheduleKeyDeletionResponse; + +@EnabledIfEnvironmentVariables({ + @EnabledIfEnvironmentVariable(named = AwsIntegTestUtil.AWS_ACCESS_KEY_ID, matches = ".*"), + @EnabledIfEnvironmentVariable(named = AwsIntegTestUtil.AWS_SECRET_ACCESS_KEY, matches = ".*"), + @EnabledIfEnvironmentVariable(named = AwsIntegTestUtil.AWS_SESSION_TOKEN, matches = ".*"), + @EnabledIfEnvironmentVariable(named = AwsIntegTestUtil.AWS_TEST_ACCOUNT_ID, matches = "\\d{12}") +}) +public class TestKeyManagementClient { + + private static final Logger LOG = LoggerFactory.getLogger(TestKeyManagementClient.class); + + private static KmsClient kmsClient; + private static String keyId; + + @BeforeAll + public static void beforeClass() { + kmsClient = AwsClientFactories.defaultFactory().kms(); + CreateKeyRequest createKeyRequest = + CreateKeyRequest.builder() + .keySpec(KeySpec.SYMMETRIC_DEFAULT) + .description( + "Iceberg integration test key for " + TestKeyManagementClient.class.getName()) + .build(); + CreateKeyResponse response = kmsClient.createKey(createKeyRequest); + keyId = response.keyMetadata().keyId(); + } + + @AfterAll + public static void afterClass() { + // AWS KMS doesn't allow instant deletion. Keys can be put to pendingDeletion state instead, + // with a minimum of 7 days until final removal. + ScheduleKeyDeletionRequest deletionRequest = + ScheduleKeyDeletionRequest.builder().keyId(keyId).pendingWindowInDays(7).build(); + + ScheduleKeyDeletionResponse deletionResponse = kmsClient.scheduleKeyDeletion(deletionRequest); + LOG.info( + "Deletion of test key {} will be finalized at {}", keyId, deletionResponse.deletionDate()); + + try { + kmsClient.close(); + } catch (Exception e) { + LOG.error("Error closing KMS client", e); + } + } + + @Test + public void testKeyWrapping() { + try (AwsKeyManagementClient keyManagementClient = new AwsKeyManagementClient()) { + keyManagementClient.initialize(ImmutableMap.of()); + + ByteBuffer key = ByteBuffer.wrap(new String("super-secret-table-master-key").getBytes()); + ByteBuffer encryptedKey = keyManagementClient.wrapKey(key, keyId); + + assertThat(keyManagementClient.unwrapKey(encryptedKey, keyId)).isEqualTo(key); + } + } + + @ParameterizedTest + @NullSource + @EnumSource( + value = DataKeySpec.class, + names = {"AES_128", "AES_256"}) + public void testKeyGeneration(DataKeySpec dataKeySpec) { + try (AwsKeyManagementClient keyManagementClient = new AwsKeyManagementClient()) { + Map properties = + dataKeySpec == null + ? ImmutableMap.of() + : ImmutableMap.of(AwsProperties.KMS_DATA_KEY_SPEC, dataKeySpec.name()); + keyManagementClient.initialize(properties); + KeyManagementClient.KeyGenerationResult result = keyManagementClient.generateKey(keyId); + + assertThat(keyManagementClient.unwrapKey(result.wrappedKey(), keyId)).isEqualTo(result.key()); + assertThat(result.key().limit()).isEqualTo(expectedLength(dataKeySpec)); + } + } + + private static int expectedLength(DataKeySpec spec) { + if (DataKeySpec.AES_128.equals(spec)) { + return 128 / 8; + } else { + return 256 / 8; + } + } +} diff --git a/aws/src/main/java/org/apache/iceberg/aws/AwsKeyManagementClient.java b/aws/src/main/java/org/apache/iceberg/aws/AwsKeyManagementClient.java new file mode 100644 index 000000000000..6d2671f4e26d --- /dev/null +++ b/aws/src/main/java/org/apache/iceberg/aws/AwsKeyManagementClient.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.aws; + +import java.nio.ByteBuffer; +import java.util.Map; +import org.apache.iceberg.encryption.KeyManagementClient; +import software.amazon.awssdk.core.SdkBytes; +import software.amazon.awssdk.services.kms.KmsClient; +import software.amazon.awssdk.services.kms.model.DataKeySpec; +import software.amazon.awssdk.services.kms.model.DecryptRequest; +import software.amazon.awssdk.services.kms.model.DecryptResponse; +import software.amazon.awssdk.services.kms.model.EncryptRequest; +import software.amazon.awssdk.services.kms.model.EncryptResponse; +import software.amazon.awssdk.services.kms.model.EncryptionAlgorithmSpec; +import software.amazon.awssdk.services.kms.model.GenerateDataKeyRequest; +import software.amazon.awssdk.services.kms.model.GenerateDataKeyResponse; + +/** + * Key management client implementation that uses AWS Key Management Service. To be used for + * encrypting/decrypting keys with a KMS-managed master key, (by referencing its key ID), and for + * the generation of new encryption keys. + */ +public class AwsKeyManagementClient implements KeyManagementClient { + + private KmsClient kmsClient; + private EncryptionAlgorithmSpec encryptionAlgorithmSpec; + private DataKeySpec dataKeySpec; + + @Override + public void initialize(Map properties) { + AwsClientFactory clientFactory = AwsClientFactories.from(properties); + this.kmsClient = clientFactory.kms(); + + AwsProperties awsProperties = new AwsProperties(properties); + this.encryptionAlgorithmSpec = awsProperties.kmsEncryptionAlgorithmSpec(); + this.dataKeySpec = awsProperties.kmsDataKeySpec(); + } + + @Override + public ByteBuffer wrapKey(ByteBuffer key, String wrappingKeyId) { + EncryptRequest request = + EncryptRequest.builder() + .keyId(wrappingKeyId) + .encryptionAlgorithm(encryptionAlgorithmSpec) + .plaintext(SdkBytes.fromByteBuffer(key)) + .build(); + + EncryptResponse result = kmsClient.encrypt(request); + return result.ciphertextBlob().asByteBuffer(); + } + + @Override + public boolean supportsKeyGeneration() { + return true; + } + + @Override + public KeyGenerationResult generateKey(String wrappingKeyId) { + GenerateDataKeyRequest request = + GenerateDataKeyRequest.builder().keyId(wrappingKeyId).keySpec(dataKeySpec).build(); + + GenerateDataKeyResponse response = kmsClient.generateDataKey(request); + KeyGenerationResult result = + new KeyGenerationResult( + response.plaintext().asByteBuffer(), response.ciphertextBlob().asByteBuffer()); + return result; + } + + @Override + public ByteBuffer unwrapKey(ByteBuffer wrappedKey, String wrappingKeyId) { + DecryptRequest request = + DecryptRequest.builder() + .keyId(wrappingKeyId) + .encryptionAlgorithm(encryptionAlgorithmSpec) + .ciphertextBlob(SdkBytes.fromByteBuffer(wrappedKey)) + .build(); + + DecryptResponse result = kmsClient.decrypt(request); + return result.plaintext().asByteBuffer(); + } + + @Override + public void close() { + if (kmsClient != null) { + kmsClient.close(); + } + } +} diff --git a/aws/src/main/java/org/apache/iceberg/aws/AwsProperties.java b/aws/src/main/java/org/apache/iceberg/aws/AwsProperties.java index 1a8db990578a..62d541da0c54 100644 --- a/aws/src/main/java/org/apache/iceberg/aws/AwsProperties.java +++ b/aws/src/main/java/org/apache/iceberg/aws/AwsProperties.java @@ -41,6 +41,8 @@ import software.amazon.awssdk.regions.providers.DefaultAwsRegionProviderChain; import software.amazon.awssdk.services.dynamodb.DynamoDbClientBuilder; import software.amazon.awssdk.services.glue.GlueClientBuilder; +import software.amazon.awssdk.services.kms.model.DataKeySpec; +import software.amazon.awssdk.services.kms.model.EncryptionAlgorithmSpec; public class AwsProperties implements Serializable { @@ -206,6 +208,17 @@ public class AwsProperties implements Serializable { */ public static final String REST_SESSION_TOKEN = "rest.session-token"; + /** Encryption algorithm used to encrypt/decrypt master table keys */ + public static final String KMS_ENCRYPTION_ALGORITHM_SPEC = "kms.encryption-algorithm-spec"; + + public static final EncryptionAlgorithmSpec KMS_ENCRYPTION_ALGORITHM_SPEC_DEFAULT = + EncryptionAlgorithmSpec.SYMMETRIC_DEFAULT; + + /** Length of data key generated by KMS */ + public static final String KMS_DATA_KEY_SPEC = "kms.data-key-spec"; + + public static final DataKeySpec KMS_DATA_KEY_SPEC_DEFAULT = DataKeySpec.AES_256; + private final Set stsClientAssumeRoleTags; private final String clientAssumeRoleArn; @@ -230,6 +243,8 @@ public class AwsProperties implements Serializable { private String restAccessKeyId; private String restSecretAccessKey; private String restSessionToken; + private EncryptionAlgorithmSpec kmsEncryptionAlgorithmSpec; + private DataKeySpec kmsDataKeySpec; public AwsProperties() { this.stsClientAssumeRoleTags = Sets.newHashSet(); @@ -252,6 +267,9 @@ public AwsProperties() { this.dynamoDbTableName = DYNAMODB_TABLE_NAME_DEFAULT; this.restSigningName = REST_SIGNING_NAME_DEFAULT; + + this.kmsEncryptionAlgorithmSpec = KMS_ENCRYPTION_ALGORITHM_SPEC_DEFAULT; + this.kmsDataKeySpec = KMS_DATA_KEY_SPEC_DEFAULT; } @SuppressWarnings("MethodLength") @@ -293,6 +311,14 @@ public AwsProperties(Map properties) { this.restAccessKeyId = properties.get(REST_ACCESS_KEY_ID); this.restSecretAccessKey = properties.get(REST_SECRET_ACCESS_KEY); this.restSessionToken = properties.get(REST_SESSION_TOKEN); + + this.kmsEncryptionAlgorithmSpec = + EncryptionAlgorithmSpec.fromValue( + properties.getOrDefault( + KMS_ENCRYPTION_ALGORITHM_SPEC, KMS_ENCRYPTION_ALGORITHM_SPEC_DEFAULT.toString())); + this.kmsDataKeySpec = + DataKeySpec.fromValue( + properties.getOrDefault(KMS_DATA_KEY_SPEC, KMS_DATA_KEY_SPEC_DEFAULT.toString())); } public Set stsClientAssumeRoleTags() { @@ -402,6 +428,14 @@ public AwsCredentialsProvider restCredentialsProvider() { this.restAccessKeyId, this.restSecretAccessKey, this.restSessionToken); } + public EncryptionAlgorithmSpec kmsEncryptionAlgorithmSpec() { + return this.kmsEncryptionAlgorithmSpec; + } + + public DataKeySpec kmsDataKeySpec() { + return this.kmsDataKeySpec; + } + private Set toStsTags( Map properties, String prefix) { return PropertyUtil.propertiesWithPrefix(properties, prefix).entrySet().stream() diff --git a/core/src/main/java/org/apache/iceberg/encryption/KeyManagementClient.java b/core/src/main/java/org/apache/iceberg/encryption/KeyManagementClient.java index a7fb494cc8e1..2d97def180af 100644 --- a/core/src/main/java/org/apache/iceberg/encryption/KeyManagementClient.java +++ b/core/src/main/java/org/apache/iceberg/encryption/KeyManagementClient.java @@ -24,7 +24,7 @@ import java.util.Map; /** A minimum client interface to connect to a key management service (KMS). */ -interface KeyManagementClient extends Serializable, Closeable { +public interface KeyManagementClient extends Serializable, Closeable { /** * Wrap a secret key, using a wrapping/master key which is stored in KMS and referenced by an ID. @@ -94,7 +94,7 @@ class KeyGenerationResult { private final ByteBuffer key; private final ByteBuffer wrappedKey; - KeyGenerationResult(ByteBuffer key, ByteBuffer wrappedKey) { + public KeyGenerationResult(ByteBuffer key, ByteBuffer wrappedKey) { this.key = key; this.wrappedKey = wrappedKey; }