From ef53c2de056243de7cb9a1cd99e33360d3ce35a8 Mon Sep 17 00:00:00 2001 From: Gidon Gershinsky Date: Mon, 20 Feb 2023 08:35:53 +0200 Subject: [PATCH 01/14] initial commit --- .../encryption/DefaultEncryptionManager.java | 96 +++++++++++++++++++ 1 file changed, 96 insertions(+) create mode 100644 core/src/main/java/org/apache/iceberg/encryption/DefaultEncryptionManager.java diff --git a/core/src/main/java/org/apache/iceberg/encryption/DefaultEncryptionManager.java b/core/src/main/java/org/apache/iceberg/encryption/DefaultEncryptionManager.java new file mode 100644 index 000000000000..96cb6196c449 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/encryption/DefaultEncryptionManager.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.encryption; + +import java.nio.ByteBuffer; +import java.security.SecureRandom; +import java.util.Map; +import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.util.PropertyUtil; + +public class DefaultEncryptionManager implements EncryptionManager { + public static final String ENCRYPTION_TABLE_KEY = "encryption.table.key.id"; + + public static final String ENCRYPTION_DEK_LENGTH = "encryption.data.key.length"; + public static final int ENCRYPTION_DEK_LENGTH_DEFAULT = 16; + + public static final int ENCRYPTION_AAD_LENGTH_DEFAULT = 16; + + /** Implementation of the KMS client for envelope encryption */ + public static final String ENCRYPTION_KMS_CLIENT_IMPL = "encryption.kms.client-impl"; + + private final KeyManagementClient kmsClient; + private String tableKeyId; + private int dataKeyLength; + private boolean kmsGeneratedKeys; + + private transient volatile SecureRandom workerRNG = null; + + /** + * @param tableKeyId table encryption key id + * @param kmsClient Client of KMS used to wrap/unwrap keys in envelope encryption + * @param encryptionProperties encryption properties + */ + public DefaultEncryptionManager( + String tableKeyId, KeyManagementClient kmsClient, Map encryptionProperties) { + Preconditions.checkNotNull( + tableKeyId, + "Cannot create EnvelopeEncryptionManager because table encryption key ID is not specified"); + Preconditions.checkNotNull( + kmsClient, "Cannot create EnvelopeEncryptionManager because kmsClient is null"); + Preconditions.checkNotNull( + encryptionProperties, + "Cannot create EnvelopeEncryptionManager because encryptionProperties are not passed"); + this.tableKeyId = tableKeyId; + this.kmsClient = kmsClient; + this.kmsGeneratedKeys = kmsClient.supportsKeyGeneration(); + + this.dataKeyLength = + PropertyUtil.propertyAsInt( + encryptionProperties, ENCRYPTION_DEK_LENGTH, ENCRYPTION_DEK_LENGTH_DEFAULT); + } + + @Override + public EncryptedOutputFile encrypt(OutputFile rawOutput) { + if (null == workerRNG) { + workerRNG = new SecureRandom(); + } + + ByteBuffer fileDek = ByteBuffer.allocate(dataKeyLength); + workerRNG.nextBytes(fileDek.array()); + + ByteBuffer aadPrefix = ByteBuffer.allocate(ENCRYPTION_AAD_LENGTH_DEFAULT); + workerRNG.nextBytes(aadPrefix.array()); + + KeyMetadata fileEnvelopeMetadata = new KeyMetadata(fileDek, null, aadPrefix); + + return new BaseEncryptedOutputFile(rawOutput, fileEnvelopeMetadata); + } + + @Override + public InputFile decrypt(EncryptedInputFile encrypted) { + if (encrypted.keyMetadata() == null || encrypted.keyMetadata().buffer() == null) { + throw new RuntimeException( + "Unencrypted file " + encrypted.encryptedInputFile().location() + " in encrypted table"); + } + return encrypted.encryptedInputFile(); + } +} From 20fa8cd5f1cc6201b9c6de87ff094d59bdd82580 Mon Sep 17 00:00:00 2001 From: Gidon Gershinsky Date: Wed, 8 Mar 2023 10:37:07 +0200 Subject: [PATCH 02/14] Update and add factory Co-Authored-By: Jian Tang --- .../encryption/DefaultEncryptionManager.java | 25 +++-- .../DefaultEncryptionManagerFactory.java | 97 +++++++++++++++++++ .../encryption/EncryptionManagerFactory.java | 54 +++++++++++ 3 files changed, 163 insertions(+), 13 deletions(-) create mode 100644 core/src/main/java/org/apache/iceberg/encryption/DefaultEncryptionManagerFactory.java create mode 100644 core/src/main/java/org/apache/iceberg/encryption/EncryptionManagerFactory.java diff --git a/core/src/main/java/org/apache/iceberg/encryption/DefaultEncryptionManager.java b/core/src/main/java/org/apache/iceberg/encryption/DefaultEncryptionManager.java index 96cb6196c449..1fd07233efa6 100644 --- a/core/src/main/java/org/apache/iceberg/encryption/DefaultEncryptionManager.java +++ b/core/src/main/java/org/apache/iceberg/encryption/DefaultEncryptionManager.java @@ -27,16 +27,6 @@ import org.apache.iceberg.util.PropertyUtil; public class DefaultEncryptionManager implements EncryptionManager { - public static final String ENCRYPTION_TABLE_KEY = "encryption.table.key.id"; - - public static final String ENCRYPTION_DEK_LENGTH = "encryption.data.key.length"; - public static final int ENCRYPTION_DEK_LENGTH_DEFAULT = 16; - - public static final int ENCRYPTION_AAD_LENGTH_DEFAULT = 16; - - /** Implementation of the KMS client for envelope encryption */ - public static final String ENCRYPTION_KMS_CLIENT_IMPL = "encryption.kms.client-impl"; - private final KeyManagementClient kmsClient; private String tableKeyId; private int dataKeyLength; @@ -65,19 +55,21 @@ public DefaultEncryptionManager( this.dataKeyLength = PropertyUtil.propertyAsInt( - encryptionProperties, ENCRYPTION_DEK_LENGTH, ENCRYPTION_DEK_LENGTH_DEFAULT); + encryptionProperties, + EncryptionProperties.ENCRYPTION_DEK_LENGTH, + EncryptionProperties.ENCRYPTION_DEK_LENGTH_DEFAULT); } @Override public EncryptedOutputFile encrypt(OutputFile rawOutput) { if (null == workerRNG) { - workerRNG = new SecureRandom(); + createSecureRandomGenerator(); } ByteBuffer fileDek = ByteBuffer.allocate(dataKeyLength); workerRNG.nextBytes(fileDek.array()); - ByteBuffer aadPrefix = ByteBuffer.allocate(ENCRYPTION_AAD_LENGTH_DEFAULT); + ByteBuffer aadPrefix = ByteBuffer.allocate(EncryptionProperties.ENCRYPTION_AAD_LENGTH_DEFAULT); workerRNG.nextBytes(aadPrefix.array()); KeyMetadata fileEnvelopeMetadata = new KeyMetadata(fileDek, null, aadPrefix); @@ -91,6 +83,13 @@ public InputFile decrypt(EncryptedInputFile encrypted) { throw new RuntimeException( "Unencrypted file " + encrypted.encryptedInputFile().location() + " in encrypted table"); } + + // Native decryption: simply return the input file. Parquet decryption will get the key from key + // metadata. return encrypted.encryptedInputFile(); } + + private void createSecureRandomGenerator() { + workerRNG = new SecureRandom(); + } } diff --git a/core/src/main/java/org/apache/iceberg/encryption/DefaultEncryptionManagerFactory.java b/core/src/main/java/org/apache/iceberg/encryption/DefaultEncryptionManagerFactory.java new file mode 100644 index 000000000000..76de31163fd5 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/encryption/DefaultEncryptionManagerFactory.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.encryption; + +import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT; +import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT; + +import java.io.IOException; +import java.util.Map; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.TableMetadata; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.util.PropertyUtil; + +public class DefaultEncryptionManagerFactory implements EncryptionManagerFactory { + private KeyManagementClient kmsClient; + private Map catalogPropertyMap; + + @Override + public void initialize(Map catalogProperties) { + this.catalogPropertyMap = catalogProperties; + } + + @Override + public EncryptionManager create(TableMetadata tableMetadata) { + if (tableMetadata == null) { + return PlaintextEncryptionManager.INSTANCE; + } + + Map tableProperties = tableMetadata.properties(); + String fileFormat = + PropertyUtil.propertyAsString( + tableProperties, DEFAULT_FILE_FORMAT, DEFAULT_FILE_FORMAT_DEFAULT); + if (FileFormat.fromString(fileFormat) != FileFormat.PARQUET) { + throw new UnsupportedOperationException( + "Iceberg encryption currently supports only parquet format for data files"); + } + + final Map encryptionProperties = Maps.newHashMap(); + encryptionProperties.putAll(tableProperties); + + // Important: put catalog properties after table properties. Former overrides the latter. + encryptionProperties.putAll(catalogPropertyMap); + + String tableKeyId = encryptionProperties.get(EncryptionProperties.ENCRYPTION_TABLE_KEY); + + if (null == tableKeyId) { + // Unencrypted table + return PlaintextEncryptionManager.INSTANCE; + } else { + return new DefaultEncryptionManager( + tableKeyId, kmsClient(encryptionProperties), encryptionProperties); + } + } + + private synchronized KeyManagementClient kmsClient(Map encryptionProperties) { + if (kmsClient == null) { + String kmsImpl = encryptionProperties.get(EncryptionProperties.ENCRYPTION_KMS_CLIENT_IMPL); + + Preconditions.checkArgument( + null != kmsImpl, + "KMS Client implementation class is not set (via " + + EncryptionProperties.ENCRYPTION_KMS_CLIENT_IMPL + + " catalog property or table property)"); + + kmsClient = EncryptionUtil.createKmsClient(kmsImpl); + kmsClient.initialize(encryptionProperties); + } + + return kmsClient; + } + + @Override + public synchronized void close() throws IOException { + if (kmsClient != null) { + kmsClient.close(); + kmsClient = null; + } + } +} diff --git a/core/src/main/java/org/apache/iceberg/encryption/EncryptionManagerFactory.java b/core/src/main/java/org/apache/iceberg/encryption/EncryptionManagerFactory.java new file mode 100644 index 000000000000..a146faa4f2e7 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/encryption/EncryptionManagerFactory.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.encryption; + +import java.io.Closeable; +import java.io.IOException; +import java.util.Map; +import org.apache.iceberg.TableMetadata; + +public interface EncryptionManagerFactory extends Closeable { + + EncryptionManagerFactory NO_ENCRYPTION = tableMetadata -> PlaintextEncryptionManager.INSTANCE; + + /** + * Initialize EncryptionManagerFactory from catalog properties. + * + * @param properties catalog properties + */ + default void initialize(Map properties) {} + + /** + * Create encryption manager from table metadata. + * + * @param tableMetadata table metadata + * @return created encryption manager instance. + */ + EncryptionManager create(TableMetadata tableMetadata); + + /** + * Close EncryptionManagerFactory to release underlying resources. + * + *

Calling this method is only required when this EncryptionManagerFactory instance is no + * longer expected to be used, and the resources it holds need to be explicitly released to avoid + * resource leaks. + */ + @Override + default void close() throws IOException {} +} From 94ab0cc7f35d8d552caaf129535118cb2f89e2c1 Mon Sep 17 00:00:00 2001 From: Gidon Gershinsky Date: Wed, 8 Mar 2023 10:39:26 +0200 Subject: [PATCH 03/14] Encryption properties class --- .../encryption/EncryptionProperties.java | 34 +++++++++++++++++++ 1 file changed, 34 insertions(+) create mode 100644 core/src/main/java/org/apache/iceberg/encryption/EncryptionProperties.java diff --git a/core/src/main/java/org/apache/iceberg/encryption/EncryptionProperties.java b/core/src/main/java/org/apache/iceberg/encryption/EncryptionProperties.java new file mode 100644 index 000000000000..0596ad3ccf97 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/encryption/EncryptionProperties.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.encryption; + +public class EncryptionProperties { + + private EncryptionProperties() {} + + public static final String ENCRYPTION_TABLE_KEY = "encryption.table.key.id"; + + public static final String ENCRYPTION_DEK_LENGTH = "encryption.data.key.length"; + public static final int ENCRYPTION_DEK_LENGTH_DEFAULT = 16; + + public static final int ENCRYPTION_AAD_LENGTH_DEFAULT = 16; + + /** Implementation of the KMS client for envelope encryption */ + public static final String ENCRYPTION_KMS_CLIENT_IMPL = "encryption.kms.client-impl"; +} From c2907f80b1c83a09901a08d675863098c3b8a32e Mon Sep 17 00:00:00 2001 From: Gidon Gershinsky Date: Tue, 21 Mar 2023 11:29:03 +0200 Subject: [PATCH 04/14] add plaintext mng instance --- .../apache/iceberg/encryption/PlaintextEncryptionManager.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/core/src/main/java/org/apache/iceberg/encryption/PlaintextEncryptionManager.java b/core/src/main/java/org/apache/iceberg/encryption/PlaintextEncryptionManager.java index 4d8d8aa7aff9..33f19224e69c 100644 --- a/core/src/main/java/org/apache/iceberg/encryption/PlaintextEncryptionManager.java +++ b/core/src/main/java/org/apache/iceberg/encryption/PlaintextEncryptionManager.java @@ -25,6 +25,8 @@ import org.slf4j.LoggerFactory; public class PlaintextEncryptionManager implements EncryptionManager { + public static final EncryptionManager INSTANCE = new PlaintextEncryptionManager(); + private static final Logger LOG = LoggerFactory.getLogger(PlaintextEncryptionManager.class); @Override From ee70332b03b172d405dd7cf4e891c3f49e484eb8 Mon Sep 17 00:00:00 2001 From: Gidon Gershinsky Date: Thu, 20 Apr 2023 15:01:21 +0300 Subject: [PATCH 05/14] post-review changes --- ...er.java => StandardEncryptionManager.java} | 41 +++++++++++++++---- ... => StandardEncryptionManagerFactory.java} | 4 +- 2 files changed, 35 insertions(+), 10 deletions(-) rename core/src/main/java/org/apache/iceberg/encryption/{DefaultEncryptionManager.java => StandardEncryptionManager.java} (66%) rename core/src/main/java/org/apache/iceberg/encryption/{DefaultEncryptionManagerFactory.java => StandardEncryptionManagerFactory.java} (96%) diff --git a/core/src/main/java/org/apache/iceberg/encryption/DefaultEncryptionManager.java b/core/src/main/java/org/apache/iceberg/encryption/StandardEncryptionManager.java similarity index 66% rename from core/src/main/java/org/apache/iceberg/encryption/DefaultEncryptionManager.java rename to core/src/main/java/org/apache/iceberg/encryption/StandardEncryptionManager.java index 1fd07233efa6..ae5f5c4d9f29 100644 --- a/core/src/main/java/org/apache/iceberg/encryption/DefaultEncryptionManager.java +++ b/core/src/main/java/org/apache/iceberg/encryption/StandardEncryptionManager.java @@ -26,7 +26,7 @@ import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.util.PropertyUtil; -public class DefaultEncryptionManager implements EncryptionManager { +public class StandardEncryptionManager implements EncryptionManager { private final KeyManagementClient kmsClient; private String tableKeyId; private int dataKeyLength; @@ -39,7 +39,7 @@ public class DefaultEncryptionManager implements EncryptionManager { * @param kmsClient Client of KMS used to wrap/unwrap keys in envelope encryption * @param encryptionProperties encryption properties */ - public DefaultEncryptionManager( + public StandardEncryptionManager( String tableKeyId, KeyManagementClient kmsClient, Map encryptionProperties) { Preconditions.checkNotNull( tableKeyId, @@ -72,9 +72,24 @@ public EncryptedOutputFile encrypt(OutputFile rawOutput) { ByteBuffer aadPrefix = ByteBuffer.allocate(EncryptionProperties.ENCRYPTION_AAD_LENGTH_DEFAULT); workerRNG.nextBytes(aadPrefix.array()); - KeyMetadata fileEnvelopeMetadata = new KeyMetadata(fileDek, null, aadPrefix); + // For data files + KeyMetadata dataEncryptionMetadata = new KeyMetadata(fileDek, null, aadPrefix); - return new BaseEncryptedOutputFile(rawOutput, fileEnvelopeMetadata); + // For metadata files + // This is an expensive operation, RPC to KMS server + ByteBuffer wrappedDek = kmsClient.wrapKey(fileDek, tableKeyId); + KeyMetadata manifestEncryptionMetadata = new KeyMetadata(wrappedDek, tableKeyId, aadPrefix); + + // We don't know which key metadata to use, because we don't know what file we encrypt. + // Works for data files: + // return new BaseEncryptedOutputFile(new AesGcmOutputFile(rawOutput, fileDek, aadPrefix), + // dataEncryptionMetadata, rawOutput); + // Works for manifest files: + // return new BaseEncryptedOutputFile(new AesGcmOutputFile(rawOutput, fileDek, aadPrefix), + // manifestEncryptionMetadata, rawOutput); + + // Temp return, for parquet data only. TODO - remove + return new BaseEncryptedOutputFile(null, dataEncryptionMetadata, rawOutput); } @Override @@ -84,12 +99,22 @@ public InputFile decrypt(EncryptedInputFile encrypted) { "Unencrypted file " + encrypted.encryptedInputFile().location() + " in encrypted table"); } - // Native decryption: simply return the input file. Parquet decryption will get the key from key - // metadata. - return encrypted.encryptedInputFile(); + KeyMetadata keyMetadata = KeyMetadata.parse(encrypted.keyMetadata().buffer()); + + ByteBuffer fileDek; + if (keyMetadata.wrappingKeyId() == null) { + fileDek = keyMetadata.encryptionKey(); + } else { + fileDek = kmsClient.unwrapKey(keyMetadata.encryptionKey(), keyMetadata.wrappingKeyId()); + } + + // return new AesGcmInputFile(encrypted.encryptedInputFile(), fileDek, keyMetadata.aadPrefix()); + + // Temp return null - decrypt is not called for parquet data files. TODO - remove + return null; } private void createSecureRandomGenerator() { - workerRNG = new SecureRandom(); + this.workerRNG = new SecureRandom(); } } diff --git a/core/src/main/java/org/apache/iceberg/encryption/DefaultEncryptionManagerFactory.java b/core/src/main/java/org/apache/iceberg/encryption/StandardEncryptionManagerFactory.java similarity index 96% rename from core/src/main/java/org/apache/iceberg/encryption/DefaultEncryptionManagerFactory.java rename to core/src/main/java/org/apache/iceberg/encryption/StandardEncryptionManagerFactory.java index 76de31163fd5..58902ac0c54b 100644 --- a/core/src/main/java/org/apache/iceberg/encryption/DefaultEncryptionManagerFactory.java +++ b/core/src/main/java/org/apache/iceberg/encryption/StandardEncryptionManagerFactory.java @@ -29,7 +29,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.util.PropertyUtil; -public class DefaultEncryptionManagerFactory implements EncryptionManagerFactory { +public class StandardEncryptionManagerFactory implements EncryptionManagerFactory { private KeyManagementClient kmsClient; private Map catalogPropertyMap; @@ -65,7 +65,7 @@ public EncryptionManager create(TableMetadata tableMetadata) { // Unencrypted table return PlaintextEncryptionManager.INSTANCE; } else { - return new DefaultEncryptionManager( + return new StandardEncryptionManager( tableKeyId, kmsClient(encryptionProperties), encryptionProperties); } } From 4c6fb3aefb2384d868c98f81171efbdbe179b435 Mon Sep 17 00:00:00 2001 From: Gidon Gershinsky Date: Wed, 2 Aug 2023 15:16:00 +0300 Subject: [PATCH 06/14] address review comments --- .../encryption/EncryptionManagerFactory.java | 2 +- .../encryption/EncryptionProperties.java | 4 +- .../PlaintextEncryptionManager.java | 14 ++-- .../encryption/StandardEncryptionManager.java | 82 ++++++++----------- .../StandardEncryptionManagerFactory.java | 28 ++++--- 5 files changed, 62 insertions(+), 68 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/encryption/EncryptionManagerFactory.java b/core/src/main/java/org/apache/iceberg/encryption/EncryptionManagerFactory.java index a146faa4f2e7..dad42eb711d7 100644 --- a/core/src/main/java/org/apache/iceberg/encryption/EncryptionManagerFactory.java +++ b/core/src/main/java/org/apache/iceberg/encryption/EncryptionManagerFactory.java @@ -25,7 +25,7 @@ public interface EncryptionManagerFactory extends Closeable { - EncryptionManagerFactory NO_ENCRYPTION = tableMetadata -> PlaintextEncryptionManager.INSTANCE; + EncryptionManagerFactory NO_ENCRYPTION = tableMetadata -> PlaintextEncryptionManager.instance(); /** * Initialize EncryptionManagerFactory from catalog properties. diff --git a/core/src/main/java/org/apache/iceberg/encryption/EncryptionProperties.java b/core/src/main/java/org/apache/iceberg/encryption/EncryptionProperties.java index 0596ad3ccf97..26e2d120a813 100644 --- a/core/src/main/java/org/apache/iceberg/encryption/EncryptionProperties.java +++ b/core/src/main/java/org/apache/iceberg/encryption/EncryptionProperties.java @@ -22,9 +22,9 @@ public class EncryptionProperties { private EncryptionProperties() {} - public static final String ENCRYPTION_TABLE_KEY = "encryption.table.key.id"; + public static final String ENCRYPTION_TABLE_KEY = "encryption.table-key-id"; - public static final String ENCRYPTION_DEK_LENGTH = "encryption.data.key.length"; + public static final String ENCRYPTION_DEK_LENGTH = "encryption.data-key-length"; public static final int ENCRYPTION_DEK_LENGTH_DEFAULT = 16; public static final int ENCRYPTION_AAD_LENGTH_DEFAULT = 16; diff --git a/core/src/main/java/org/apache/iceberg/encryption/PlaintextEncryptionManager.java b/core/src/main/java/org/apache/iceberg/encryption/PlaintextEncryptionManager.java index 33f19224e69c..8ab94e6c5c72 100644 --- a/core/src/main/java/org/apache/iceberg/encryption/PlaintextEncryptionManager.java +++ b/core/src/main/java/org/apache/iceberg/encryption/PlaintextEncryptionManager.java @@ -18,20 +18,24 @@ */ package org.apache.iceberg.encryption; -import java.nio.ByteBuffer; import org.apache.iceberg.io.InputFile; import org.apache.iceberg.io.OutputFile; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class PlaintextEncryptionManager implements EncryptionManager { - public static final EncryptionManager INSTANCE = new PlaintextEncryptionManager(); - + private static final EncryptionManager INSTANCE = new PlaintextEncryptionManager(); private static final Logger LOG = LoggerFactory.getLogger(PlaintextEncryptionManager.class); + public static EncryptionManager instance() { + return INSTANCE; + } + @Override public InputFile decrypt(EncryptedInputFile encrypted) { - if (encrypted.keyMetadata().buffer() != null) { + if (encrypted.keyMetadata() != null + && encrypted.keyMetadata().buffer() != null + && encrypted.keyMetadata().buffer().capacity() > 0) { LOG.warn( "File encryption key metadata is present, but currently using PlaintextEncryptionManager."); } @@ -40,6 +44,6 @@ public InputFile decrypt(EncryptedInputFile encrypted) { @Override public EncryptedOutputFile encrypt(OutputFile rawOutput) { - return EncryptedFiles.encryptedOutput(rawOutput, (ByteBuffer) null); + return EncryptedFiles.encryptedOutput(rawOutput, EncryptionKeyMetadata.empty()); } } diff --git a/core/src/main/java/org/apache/iceberg/encryption/StandardEncryptionManager.java b/core/src/main/java/org/apache/iceberg/encryption/StandardEncryptionManager.java index ae5f5c4d9f29..f5816c95b062 100644 --- a/core/src/main/java/org/apache/iceberg/encryption/StandardEncryptionManager.java +++ b/core/src/main/java/org/apache/iceberg/encryption/StandardEncryptionManager.java @@ -24,13 +24,11 @@ import org.apache.iceberg.io.InputFile; import org.apache.iceberg.io.OutputFile; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; -import org.apache.iceberg.util.PropertyUtil; public class StandardEncryptionManager implements EncryptionManager { private final KeyManagementClient kmsClient; private String tableKeyId; private int dataKeyLength; - private boolean kmsGeneratedKeys; private transient volatile SecureRandom workerRNG = null; @@ -40,31 +38,22 @@ public class StandardEncryptionManager implements EncryptionManager { * @param encryptionProperties encryption properties */ public StandardEncryptionManager( - String tableKeyId, KeyManagementClient kmsClient, Map encryptionProperties) { - Preconditions.checkNotNull( - tableKeyId, - "Cannot create EnvelopeEncryptionManager because table encryption key ID is not specified"); - Preconditions.checkNotNull( - kmsClient, "Cannot create EnvelopeEncryptionManager because kmsClient is null"); - Preconditions.checkNotNull( - encryptionProperties, - "Cannot create EnvelopeEncryptionManager because encryptionProperties are not passed"); + String tableKeyId, + int dataKeyLength, + KeyManagementClient kmsClient, + Map encryptionProperties) { + Preconditions.checkNotNull(tableKeyId, "Invalid encryption key ID: null"); + Preconditions.checkNotNull(kmsClient, "Invalid KMS client: null"); + Preconditions.checkNotNull(encryptionProperties, "Invalid encryption properties: null"); this.tableKeyId = tableKeyId; this.kmsClient = kmsClient; - this.kmsGeneratedKeys = kmsClient.supportsKeyGeneration(); - this.dataKeyLength = - PropertyUtil.propertyAsInt( - encryptionProperties, - EncryptionProperties.ENCRYPTION_DEK_LENGTH, - EncryptionProperties.ENCRYPTION_DEK_LENGTH_DEFAULT); + this.dataKeyLength = dataKeyLength; } @Override public EncryptedOutputFile encrypt(OutputFile rawOutput) { - if (null == workerRNG) { - createSecureRandomGenerator(); - } + lazyCreateRNG(); ByteBuffer fileDek = ByteBuffer.allocate(dataKeyLength); workerRNG.nextBytes(fileDek.array()); @@ -72,49 +61,42 @@ public EncryptedOutputFile encrypt(OutputFile rawOutput) { ByteBuffer aadPrefix = ByteBuffer.allocate(EncryptionProperties.ENCRYPTION_AAD_LENGTH_DEFAULT); workerRNG.nextBytes(aadPrefix.array()); - // For data files - KeyMetadata dataEncryptionMetadata = new KeyMetadata(fileDek, null, aadPrefix); - - // For metadata files - // This is an expensive operation, RPC to KMS server - ByteBuffer wrappedDek = kmsClient.wrapKey(fileDek, tableKeyId); - KeyMetadata manifestEncryptionMetadata = new KeyMetadata(wrappedDek, tableKeyId, aadPrefix); + KeyMetadata encryptionMetadata = new KeyMetadata(fileDek, aadPrefix); - // We don't know which key metadata to use, because we don't know what file we encrypt. - // Works for data files: - // return new BaseEncryptedOutputFile(new AesGcmOutputFile(rawOutput, fileDek, aadPrefix), - // dataEncryptionMetadata, rawOutput); - // Works for manifest files: - // return new BaseEncryptedOutputFile(new AesGcmOutputFile(rawOutput, fileDek, aadPrefix), - // manifestEncryptionMetadata, rawOutput); - - // Temp return, for parquet data only. TODO - remove - return new BaseEncryptedOutputFile(null, dataEncryptionMetadata, rawOutput); + return new BaseEncryptedOutputFile( + new AesGcmOutputFile(rawOutput, fileDek.array(), aadPrefix.array()), + encryptionMetadata, + rawOutput); } @Override public InputFile decrypt(EncryptedInputFile encrypted) { - if (encrypted.keyMetadata() == null || encrypted.keyMetadata().buffer() == null) { - throw new RuntimeException( + if (encrypted.keyMetadata() == null + || encrypted.keyMetadata().buffer() == null + || encrypted.keyMetadata().buffer().capacity() == 0) { + throw new SecurityException( "Unencrypted file " + encrypted.encryptedInputFile().location() + " in encrypted table"); } KeyMetadata keyMetadata = KeyMetadata.parse(encrypted.keyMetadata().buffer()); - ByteBuffer fileDek; - if (keyMetadata.wrappingKeyId() == null) { - fileDek = keyMetadata.encryptionKey(); - } else { - fileDek = kmsClient.unwrapKey(keyMetadata.encryptionKey(), keyMetadata.wrappingKeyId()); - } + byte[] fileDek = keyMetadata.encryptionKey().array(); + byte[] aadPrefix = keyMetadata.aadPrefix().array(); - // return new AesGcmInputFile(encrypted.encryptedInputFile(), fileDek, keyMetadata.aadPrefix()); + return new AesGcmInputFile(encrypted.encryptedInputFile(), fileDek, aadPrefix); + } + + private void lazyCreateRNG() { + if (this.workerRNG == null) { + this.workerRNG = new SecureRandom(); + } + } - // Temp return null - decrypt is not called for parquet data files. TODO - remove - return null; + public ByteBuffer wrapKey(ByteBuffer secretKey) { + return kmsClient.wrapKey(secretKey, tableKeyId); } - private void createSecureRandomGenerator() { - this.workerRNG = new SecureRandom(); + public ByteBuffer unwrapKey(ByteBuffer wrappedSecretKey) { + return kmsClient.unwrapKey(wrappedSecretKey, tableKeyId); } } diff --git a/core/src/main/java/org/apache/iceberg/encryption/StandardEncryptionManagerFactory.java b/core/src/main/java/org/apache/iceberg/encryption/StandardEncryptionManagerFactory.java index 58902ac0c54b..d5a95058e7b5 100644 --- a/core/src/main/java/org/apache/iceberg/encryption/StandardEncryptionManagerFactory.java +++ b/core/src/main/java/org/apache/iceberg/encryption/StandardEncryptionManagerFactory.java @@ -41,17 +41,10 @@ public void initialize(Map catalogProperties) { @Override public EncryptionManager create(TableMetadata tableMetadata) { if (tableMetadata == null) { - return PlaintextEncryptionManager.INSTANCE; + return PlaintextEncryptionManager.instance(); } Map tableProperties = tableMetadata.properties(); - String fileFormat = - PropertyUtil.propertyAsString( - tableProperties, DEFAULT_FILE_FORMAT, DEFAULT_FILE_FORMAT_DEFAULT); - if (FileFormat.fromString(fileFormat) != FileFormat.PARQUET) { - throw new UnsupportedOperationException( - "Iceberg encryption currently supports only parquet format for data files"); - } final Map encryptionProperties = Maps.newHashMap(); encryptionProperties.putAll(tableProperties); @@ -63,10 +56,25 @@ public EncryptionManager create(TableMetadata tableMetadata) { if (null == tableKeyId) { // Unencrypted table - return PlaintextEncryptionManager.INSTANCE; + return PlaintextEncryptionManager.instance(); } else { + String fileFormat = + PropertyUtil.propertyAsString( + tableProperties, DEFAULT_FILE_FORMAT, DEFAULT_FILE_FORMAT_DEFAULT); + + if (FileFormat.fromString(fileFormat) != FileFormat.PARQUET) { + throw new UnsupportedOperationException( + "Iceberg encryption currently supports only parquet format for data files"); + } + + int dataKeyLength = + PropertyUtil.propertyAsInt( + encryptionProperties, + EncryptionProperties.ENCRYPTION_DEK_LENGTH, + EncryptionProperties.ENCRYPTION_DEK_LENGTH_DEFAULT); + return new StandardEncryptionManager( - tableKeyId, kmsClient(encryptionProperties), encryptionProperties); + tableKeyId, dataKeyLength, kmsClient(encryptionProperties), encryptionProperties); } } From a9e42711a1d2c30d2f8941ae161bc5323f9a9b56 Mon Sep 17 00:00:00 2001 From: Gidon Gershinsky Date: Tue, 8 Aug 2023 08:20:41 +0300 Subject: [PATCH 07/14] no encryption in format 1 --- .../iceberg/encryption/PlaintextEncryptionManager.java | 4 +--- .../apache/iceberg/encryption/StandardEncryptionManager.java | 4 +--- .../iceberg/encryption/StandardEncryptionManagerFactory.java | 5 +++++ 3 files changed, 7 insertions(+), 6 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/encryption/PlaintextEncryptionManager.java b/core/src/main/java/org/apache/iceberg/encryption/PlaintextEncryptionManager.java index 8ab94e6c5c72..cc35080f1fb1 100644 --- a/core/src/main/java/org/apache/iceberg/encryption/PlaintextEncryptionManager.java +++ b/core/src/main/java/org/apache/iceberg/encryption/PlaintextEncryptionManager.java @@ -33,9 +33,7 @@ public static EncryptionManager instance() { @Override public InputFile decrypt(EncryptedInputFile encrypted) { - if (encrypted.keyMetadata() != null - && encrypted.keyMetadata().buffer() != null - && encrypted.keyMetadata().buffer().capacity() > 0) { + if (encrypted.keyMetadata().buffer() != null) { LOG.warn( "File encryption key metadata is present, but currently using PlaintextEncryptionManager."); } diff --git a/core/src/main/java/org/apache/iceberg/encryption/StandardEncryptionManager.java b/core/src/main/java/org/apache/iceberg/encryption/StandardEncryptionManager.java index f5816c95b062..edfca5d2099a 100644 --- a/core/src/main/java/org/apache/iceberg/encryption/StandardEncryptionManager.java +++ b/core/src/main/java/org/apache/iceberg/encryption/StandardEncryptionManager.java @@ -71,9 +71,7 @@ public EncryptedOutputFile encrypt(OutputFile rawOutput) { @Override public InputFile decrypt(EncryptedInputFile encrypted) { - if (encrypted.keyMetadata() == null - || encrypted.keyMetadata().buffer() == null - || encrypted.keyMetadata().buffer().capacity() == 0) { + if (encrypted.keyMetadata().buffer() == null) { throw new SecurityException( "Unencrypted file " + encrypted.encryptedInputFile().location() + " in encrypted table"); } diff --git a/core/src/main/java/org/apache/iceberg/encryption/StandardEncryptionManagerFactory.java b/core/src/main/java/org/apache/iceberg/encryption/StandardEncryptionManagerFactory.java index d5a95058e7b5..3fb9ef21c076 100644 --- a/core/src/main/java/org/apache/iceberg/encryption/StandardEncryptionManagerFactory.java +++ b/core/src/main/java/org/apache/iceberg/encryption/StandardEncryptionManagerFactory.java @@ -67,6 +67,11 @@ public EncryptionManager create(TableMetadata tableMetadata) { "Iceberg encryption currently supports only parquet format for data files"); } + if (tableMetadata.formatVersion() < 2) { + throw new UnsupportedOperationException( + "Iceberg encryption works only with table format 2 or higher"); + } + int dataKeyLength = PropertyUtil.propertyAsInt( encryptionProperties, From aa3e56261e47edd2e7a0f2ed832153092ae34d0a Mon Sep 17 00:00:00 2001 From: Gidon Gershinsky Date: Mon, 4 Sep 2023 09:11:31 +0300 Subject: [PATCH 08/14] null keymetadata --- .../apache/iceberg/encryption/StandardEncryptionManager.java | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/encryption/StandardEncryptionManager.java b/core/src/main/java/org/apache/iceberg/encryption/StandardEncryptionManager.java index edfca5d2099a..04828d5f024f 100644 --- a/core/src/main/java/org/apache/iceberg/encryption/StandardEncryptionManager.java +++ b/core/src/main/java/org/apache/iceberg/encryption/StandardEncryptionManager.java @@ -71,10 +71,7 @@ public EncryptedOutputFile encrypt(OutputFile rawOutput) { @Override public InputFile decrypt(EncryptedInputFile encrypted) { - if (encrypted.keyMetadata().buffer() == null) { - throw new SecurityException( - "Unencrypted file " + encrypted.encryptedInputFile().location() + " in encrypted table"); - } + Preconditions.checkNotNull(encrypted.keyMetadata().buffer(), "Invalid key metadata: null"); KeyMetadata keyMetadata = KeyMetadata.parse(encrypted.keyMetadata().buffer()); From 7e383961ab252eab4c5d4b962200684d76a8fa16 Mon Sep 17 00:00:00 2001 From: Gidon Gershinsky Date: Wed, 20 Sep 2023 09:45:30 +0300 Subject: [PATCH 09/14] address review comments --- .../org/apache/iceberg/CatalogProperties.java | 3 + .../org/apache/iceberg/TableOperations.java | 13 +-- .../iceberg/encryption/EncryptedFiles.java | 11 +++ .../encryption/EncryptionManagerFactory.java | 54 ----------- .../encryption/EncryptionProperties.java | 5 +- .../iceberg/encryption/KeyMetadata.java | 14 +++ .../PlaintextEncryptionManager.java | 6 +- .../encryption/StandardEncryptionManager.java | 62 +++++++----- .../StandardEncryptionManagerFactory.java | 94 +++++++------------ 9 files changed, 107 insertions(+), 155 deletions(-) delete mode 100644 core/src/main/java/org/apache/iceberg/encryption/EncryptionManagerFactory.java diff --git a/core/src/main/java/org/apache/iceberg/CatalogProperties.java b/core/src/main/java/org/apache/iceberg/CatalogProperties.java index cada23437528..9d350525cbf8 100644 --- a/core/src/main/java/org/apache/iceberg/CatalogProperties.java +++ b/core/src/main/java/org/apache/iceberg/CatalogProperties.java @@ -156,4 +156,7 @@ private CatalogProperties() {} public static final String AUTH_SESSION_TIMEOUT_MS = "auth.session-timeout-ms"; public static final long AUTH_SESSION_TIMEOUT_MS_DEFAULT = TimeUnit.HOURS.toMillis(1); + public static final String ENCRYPTION_KMS_TYPE = "encryption.kms-type"; + public static final String ENCRYPTION_KMS_CUSTOM_TYPE = "custom"; + public static final String ENCRYPTION_KMS_CLIENT_IMPL = "encryption.kms.client-impl"; } diff --git a/core/src/main/java/org/apache/iceberg/TableOperations.java b/core/src/main/java/org/apache/iceberg/TableOperations.java index 6822809d799e..2280a9417723 100644 --- a/core/src/main/java/org/apache/iceberg/TableOperations.java +++ b/core/src/main/java/org/apache/iceberg/TableOperations.java @@ -20,7 +20,6 @@ import org.apache.iceberg.encryption.EncryptionManager; import org.apache.iceberg.encryption.PlaintextEncryptionManager; -import org.apache.iceberg.exceptions.CleanableFailure; import org.apache.iceberg.io.FileIO; import org.apache.iceberg.io.LocationProvider; @@ -71,7 +70,7 @@ public interface TableOperations { * files. */ default EncryptionManager encryption() { - return new PlaintextEncryptionManager(); + return PlaintextEncryptionManager.instance(); } /** @@ -116,14 +115,4 @@ default TableOperations temp(TableMetadata uncommittedMetadata) { default long newSnapshotId() { return SnapshotIdGeneratorUtil.generateSnapshotID(); } - - /** - * Whether to clean up uncommitted metadata files only when a commit fails with a {@link - * CleanableFailure} exception. - * - *

This defaults to false: any unexpected exception will cause metadata files to be cleaned up. - */ - default boolean requireStrictCleanup() { - return false; - } } diff --git a/core/src/main/java/org/apache/iceberg/encryption/EncryptedFiles.java b/core/src/main/java/org/apache/iceberg/encryption/EncryptedFiles.java index c0fc41ca1385..4014734cdb65 100644 --- a/core/src/main/java/org/apache/iceberg/encryption/EncryptedFiles.java +++ b/core/src/main/java/org/apache/iceberg/encryption/EncryptedFiles.java @@ -45,6 +45,13 @@ public static EncryptedOutputFile encryptedOutput( return new BaseEncryptedOutputFile(encryptingOutputFile, keyMetadata); } + public static EncryptedOutputFile encryptedOutput( + OutputFile encryptingOutputFile, + EncryptionKeyMetadata keyMetadata, + OutputFile rawOutputFile) { + return new BaseEncryptedOutputFile(encryptingOutputFile, keyMetadata, rawOutputFile); + } + public static EncryptedOutputFile encryptedOutput( OutputFile encryptingOutputFile, ByteBuffer keyMetadata) { return encryptedOutput( @@ -57,5 +64,9 @@ public static EncryptedOutputFile encryptedOutput( encryptedOutputFile, BaseEncryptionKeyMetadata.fromByteArray(keyMetadata)); } + public static EncryptedOutputFile plainAsEncryptedOutput(OutputFile encryptingOutputFile) { + return new BaseEncryptedOutputFile(encryptingOutputFile, EncryptionKeyMetadata.empty()); + } + private EncryptedFiles() {} } diff --git a/core/src/main/java/org/apache/iceberg/encryption/EncryptionManagerFactory.java b/core/src/main/java/org/apache/iceberg/encryption/EncryptionManagerFactory.java deleted file mode 100644 index dad42eb711d7..000000000000 --- a/core/src/main/java/org/apache/iceberg/encryption/EncryptionManagerFactory.java +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.iceberg.encryption; - -import java.io.Closeable; -import java.io.IOException; -import java.util.Map; -import org.apache.iceberg.TableMetadata; - -public interface EncryptionManagerFactory extends Closeable { - - EncryptionManagerFactory NO_ENCRYPTION = tableMetadata -> PlaintextEncryptionManager.instance(); - - /** - * Initialize EncryptionManagerFactory from catalog properties. - * - * @param properties catalog properties - */ - default void initialize(Map properties) {} - - /** - * Create encryption manager from table metadata. - * - * @param tableMetadata table metadata - * @return created encryption manager instance. - */ - EncryptionManager create(TableMetadata tableMetadata); - - /** - * Close EncryptionManagerFactory to release underlying resources. - * - *

Calling this method is only required when this EncryptionManagerFactory instance is no - * longer expected to be used, and the resources it holds need to be explicitly released to avoid - * resource leaks. - */ - @Override - default void close() throws IOException {} -} diff --git a/core/src/main/java/org/apache/iceberg/encryption/EncryptionProperties.java b/core/src/main/java/org/apache/iceberg/encryption/EncryptionProperties.java index 26e2d120a813..23398cbc2143 100644 --- a/core/src/main/java/org/apache/iceberg/encryption/EncryptionProperties.java +++ b/core/src/main/java/org/apache/iceberg/encryption/EncryptionProperties.java @@ -22,13 +22,10 @@ public class EncryptionProperties { private EncryptionProperties() {} - public static final String ENCRYPTION_TABLE_KEY = "encryption.table-key-id"; + public static final String ENCRYPTION_TABLE_KEY = "encryption.key-id"; public static final String ENCRYPTION_DEK_LENGTH = "encryption.data-key-length"; public static final int ENCRYPTION_DEK_LENGTH_DEFAULT = 16; public static final int ENCRYPTION_AAD_LENGTH_DEFAULT = 16; - - /** Implementation of the KMS client for envelope encryption */ - public static final String ENCRYPTION_KMS_CLIENT_IMPL = "encryption.kms.client-impl"; } diff --git a/core/src/main/java/org/apache/iceberg/encryption/KeyMetadata.java b/core/src/main/java/org/apache/iceberg/encryption/KeyMetadata.java index 0d7ec43f6ebc..02873eb764d6 100644 --- a/core/src/main/java/org/apache/iceberg/encryption/KeyMetadata.java +++ b/core/src/main/java/org/apache/iceberg/encryption/KeyMetadata.java @@ -76,6 +76,20 @@ ByteBuffer aadPrefix() { return aadPrefix; } + static KeyMetadata castOrParse(EncryptionKeyMetadata keyMetadata) { + if (keyMetadata instanceof KeyMetadata) { + return (KeyMetadata) keyMetadata; + } + + ByteBuffer kmBuffer = keyMetadata.buffer(); + + if (kmBuffer == null) { + throw new IllegalStateException("Null key metadata buffer"); + } + + return parse(kmBuffer); + } + static KeyMetadata parse(ByteBuffer buffer) { try { return KEY_METADATA_DECODER.decode(buffer); diff --git a/core/src/main/java/org/apache/iceberg/encryption/PlaintextEncryptionManager.java b/core/src/main/java/org/apache/iceberg/encryption/PlaintextEncryptionManager.java index cc35080f1fb1..e80c32116ac8 100644 --- a/core/src/main/java/org/apache/iceberg/encryption/PlaintextEncryptionManager.java +++ b/core/src/main/java/org/apache/iceberg/encryption/PlaintextEncryptionManager.java @@ -27,6 +27,9 @@ public class PlaintextEncryptionManager implements EncryptionManager { private static final EncryptionManager INSTANCE = new PlaintextEncryptionManager(); private static final Logger LOG = LoggerFactory.getLogger(PlaintextEncryptionManager.class); + @Deprecated + public PlaintextEncryptionManager() {} + public static EncryptionManager instance() { return INSTANCE; } @@ -34,8 +37,7 @@ public static EncryptionManager instance() { @Override public InputFile decrypt(EncryptedInputFile encrypted) { if (encrypted.keyMetadata().buffer() != null) { - LOG.warn( - "File encryption key metadata is present, but currently using PlaintextEncryptionManager."); + LOG.warn("File encryption key metadata is present, but no encryption has been configured."); } return encrypted.encryptedInputFile(); } diff --git a/core/src/main/java/org/apache/iceberg/encryption/StandardEncryptionManager.java b/core/src/main/java/org/apache/iceberg/encryption/StandardEncryptionManager.java index 04828d5f024f..2956031b81dc 100644 --- a/core/src/main/java/org/apache/iceberg/encryption/StandardEncryptionManager.java +++ b/core/src/main/java/org/apache/iceberg/encryption/StandardEncryptionManager.java @@ -20,50 +20,44 @@ import java.nio.ByteBuffer; import java.security.SecureRandom; -import java.util.Map; import org.apache.iceberg.io.InputFile; import org.apache.iceberg.io.OutputFile; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Iterables; +import org.apache.iceberg.util.ByteBuffers; public class StandardEncryptionManager implements EncryptionManager { - private final KeyManagementClient kmsClient; - private String tableKeyId; - private int dataKeyLength; + private final transient KeyManagementClient kmsClient; + private final String tableKeyId; + private final int dataKeyLength; - private transient volatile SecureRandom workerRNG = null; + private transient volatile SecureRandom lazyRNG = null; /** * @param tableKeyId table encryption key id + * @param dataKeyLength length of data encryption key (16/24/32 bytes) * @param kmsClient Client of KMS used to wrap/unwrap keys in envelope encryption - * @param encryptionProperties encryption properties */ public StandardEncryptionManager( - String tableKeyId, - int dataKeyLength, - KeyManagementClient kmsClient, - Map encryptionProperties) { + String tableKeyId, int dataKeyLength, KeyManagementClient kmsClient) { Preconditions.checkNotNull(tableKeyId, "Invalid encryption key ID: null"); Preconditions.checkNotNull(kmsClient, "Invalid KMS client: null"); - Preconditions.checkNotNull(encryptionProperties, "Invalid encryption properties: null"); this.tableKeyId = tableKeyId; this.kmsClient = kmsClient; - this.dataKeyLength = dataKeyLength; } @Override public EncryptedOutputFile encrypt(OutputFile rawOutput) { - lazyCreateRNG(); - ByteBuffer fileDek = ByteBuffer.allocate(dataKeyLength); - workerRNG.nextBytes(fileDek.array()); + workerRNG().nextBytes(fileDek.array()); ByteBuffer aadPrefix = ByteBuffer.allocate(EncryptionProperties.ENCRYPTION_AAD_LENGTH_DEFAULT); - workerRNG.nextBytes(aadPrefix.array()); + workerRNG().nextBytes(aadPrefix.array()); KeyMetadata encryptionMetadata = new KeyMetadata(fileDek, aadPrefix); - return new BaseEncryptedOutputFile( + return EncryptedFiles.encryptedOutput( new AesGcmOutputFile(rawOutput, fileDek.array(), aadPrefix.array()), encryptionMetadata, rawOutput); @@ -71,27 +65,45 @@ public EncryptedOutputFile encrypt(OutputFile rawOutput) { @Override public InputFile decrypt(EncryptedInputFile encrypted) { - Preconditions.checkNotNull(encrypted.keyMetadata().buffer(), "Invalid key metadata: null"); + KeyMetadata keyMetadata = KeyMetadata.castOrParse(encrypted.keyMetadata()); - KeyMetadata keyMetadata = KeyMetadata.parse(encrypted.keyMetadata().buffer()); - - byte[] fileDek = keyMetadata.encryptionKey().array(); - byte[] aadPrefix = keyMetadata.aadPrefix().array(); + byte[] fileDek = ByteBuffers.toByteArray(keyMetadata.encryptionKey()); + byte[] aadPrefix = ByteBuffers.toByteArray(keyMetadata.aadPrefix()); return new AesGcmInputFile(encrypted.encryptedInputFile(), fileDek, aadPrefix); } - private void lazyCreateRNG() { - if (this.workerRNG == null) { - this.workerRNG = new SecureRandom(); + @Override + public Iterable decrypt(Iterable encrypted) { + // Bulk decrypt is only applied to data files. Returning source input files for parquet. + return Iterables.transform(encrypted, this::getSourceFile); + } + + private InputFile getSourceFile(EncryptedInputFile encryptedFile) { + return encryptedFile.encryptedInputFile(); + } + + private SecureRandom workerRNG() { + if (this.lazyRNG == null) { + this.lazyRNG = new SecureRandom(); } + + return lazyRNG; } public ByteBuffer wrapKey(ByteBuffer secretKey) { + if (kmsClient == null) { + throw new IllegalStateException("Null KmsClient. WrapKey can't be called from workers"); + } + return kmsClient.wrapKey(secretKey, tableKeyId); } public ByteBuffer unwrapKey(ByteBuffer wrappedSecretKey) { + if (kmsClient == null) { + throw new IllegalStateException("Null KmsClient. UnwrapKey can't be called from workers"); + } + return kmsClient.unwrapKey(wrappedSecretKey, tableKeyId); } } diff --git a/core/src/main/java/org/apache/iceberg/encryption/StandardEncryptionManagerFactory.java b/core/src/main/java/org/apache/iceberg/encryption/StandardEncryptionManagerFactory.java index 3fb9ef21c076..1c24ca964b27 100644 --- a/core/src/main/java/org/apache/iceberg/encryption/StandardEncryptionManagerFactory.java +++ b/core/src/main/java/org/apache/iceberg/encryption/StandardEncryptionManagerFactory.java @@ -21,90 +21,68 @@ import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT; import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT; +import java.io.Closeable; import java.io.IOException; import java.util.Map; +import org.apache.iceberg.CatalogProperties; import org.apache.iceberg.FileFormat; -import org.apache.iceberg.TableMetadata; -import org.apache.iceberg.relocated.com.google.common.base.Preconditions; -import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.util.PropertyUtil; -public class StandardEncryptionManagerFactory implements EncryptionManagerFactory { - private KeyManagementClient kmsClient; - private Map catalogPropertyMap; +public class StandardEncryptionManagerFactory implements Closeable { + private final KeyManagementClient kmsClient; - @Override - public void initialize(Map catalogProperties) { - this.catalogPropertyMap = catalogProperties; - } + public StandardEncryptionManagerFactory(Map catalogProperties) { + String kmsType = catalogProperties.get(CatalogProperties.ENCRYPTION_KMS_TYPE); - @Override - public EncryptionManager create(TableMetadata tableMetadata) { - if (tableMetadata == null) { - return PlaintextEncryptionManager.instance(); + if (kmsType == null) { + kmsClient = null; + } else if (kmsType.equals(CatalogProperties.ENCRYPTION_KMS_CUSTOM_TYPE)) { + String kmsClientImpl = catalogProperties.get(CatalogProperties.ENCRYPTION_KMS_CLIENT_IMPL); + if (kmsClientImpl == null) { + throw new IllegalStateException("Custom KMS client class is not defined"); + } + kmsClient = EncryptionUtil.createKmsClient(kmsClientImpl); + kmsClient.initialize(catalogProperties); + } else { + // Currently support only custom types + throw new UnsupportedOperationException("Undefined KMS type " + kmsType); } + } - Map tableProperties = tableMetadata.properties(); - - final Map encryptionProperties = Maps.newHashMap(); - encryptionProperties.putAll(tableProperties); - - // Important: put catalog properties after table properties. Former overrides the latter. - encryptionProperties.putAll(catalogPropertyMap); - - String tableKeyId = encryptionProperties.get(EncryptionProperties.ENCRYPTION_TABLE_KEY); + public EncryptionManager create(Map tableProperties) { + String tableKeyId = tableProperties.get(EncryptionProperties.ENCRYPTION_TABLE_KEY); if (null == tableKeyId) { // Unencrypted table return PlaintextEncryptionManager.instance(); - } else { - String fileFormat = - PropertyUtil.propertyAsString( - tableProperties, DEFAULT_FILE_FORMAT, DEFAULT_FILE_FORMAT_DEFAULT); - - if (FileFormat.fromString(fileFormat) != FileFormat.PARQUET) { - throw new UnsupportedOperationException( - "Iceberg encryption currently supports only parquet format for data files"); - } - - if (tableMetadata.formatVersion() < 2) { - throw new UnsupportedOperationException( - "Iceberg encryption works only with table format 2 or higher"); - } - - int dataKeyLength = - PropertyUtil.propertyAsInt( - encryptionProperties, - EncryptionProperties.ENCRYPTION_DEK_LENGTH, - EncryptionProperties.ENCRYPTION_DEK_LENGTH_DEFAULT); - - return new StandardEncryptionManager( - tableKeyId, dataKeyLength, kmsClient(encryptionProperties), encryptionProperties); } - } - private synchronized KeyManagementClient kmsClient(Map encryptionProperties) { if (kmsClient == null) { - String kmsImpl = encryptionProperties.get(EncryptionProperties.ENCRYPTION_KMS_CLIENT_IMPL); + throw new IllegalStateException("Encrypted table. No KMS client is configured in catalog"); + } - Preconditions.checkArgument( - null != kmsImpl, - "KMS Client implementation class is not set (via " - + EncryptionProperties.ENCRYPTION_KMS_CLIENT_IMPL - + " catalog property or table property)"); + String fileFormat = + PropertyUtil.propertyAsString( + tableProperties, DEFAULT_FILE_FORMAT, DEFAULT_FILE_FORMAT_DEFAULT); - kmsClient = EncryptionUtil.createKmsClient(kmsImpl); - kmsClient.initialize(encryptionProperties); + if (FileFormat.fromString(fileFormat) != FileFormat.PARQUET) { + throw new UnsupportedOperationException( + "Iceberg encryption currently supports only parquet format for data files"); } - return kmsClient; + int dataKeyLength = + PropertyUtil.propertyAsInt( + tableProperties, + EncryptionProperties.ENCRYPTION_DEK_LENGTH, + EncryptionProperties.ENCRYPTION_DEK_LENGTH_DEFAULT); + + return new StandardEncryptionManager(tableKeyId, dataKeyLength, kmsClient); } @Override public synchronized void close() throws IOException { if (kmsClient != null) { kmsClient.close(); - kmsClient = null; } } } From 87ba3ec1210ee31e88bf80c6849f97d4d330dd69 Mon Sep 17 00:00:00 2001 From: Gidon Gershinsky Date: Wed, 20 Sep 2023 09:51:45 +0300 Subject: [PATCH 10/14] cleanup --- .../main/java/org/apache/iceberg/TableOperations.java | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/core/src/main/java/org/apache/iceberg/TableOperations.java b/core/src/main/java/org/apache/iceberg/TableOperations.java index 2280a9417723..7f3106253ca6 100644 --- a/core/src/main/java/org/apache/iceberg/TableOperations.java +++ b/core/src/main/java/org/apache/iceberg/TableOperations.java @@ -20,6 +20,7 @@ import org.apache.iceberg.encryption.EncryptionManager; import org.apache.iceberg.encryption.PlaintextEncryptionManager; +import org.apache.iceberg.exceptions.CleanableFailure; import org.apache.iceberg.io.FileIO; import org.apache.iceberg.io.LocationProvider; @@ -115,4 +116,14 @@ default TableOperations temp(TableMetadata uncommittedMetadata) { default long newSnapshotId() { return SnapshotIdGeneratorUtil.generateSnapshotID(); } + + /** + * Whether to clean up uncommitted metadata files only when a commit fails with a {@link + * CleanableFailure} exception. + * + *

This defaults to false: any unexpected exception will cause metadata files to be cleaned up. + */ + default boolean requireStrictCleanup() { + return false; + } } From 925c73a5b406d00ba4439149f5da696eef3cc488 Mon Sep 17 00:00:00 2001 From: Gidon Gershinsky Date: Wed, 20 Sep 2023 11:17:09 +0300 Subject: [PATCH 11/14] enable SEMFactory only if kms type is set --- .../encryption/StandardEncryptionManagerFactory.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/encryption/StandardEncryptionManagerFactory.java b/core/src/main/java/org/apache/iceberg/encryption/StandardEncryptionManagerFactory.java index 1c24ca964b27..9e59fceb3501 100644 --- a/core/src/main/java/org/apache/iceberg/encryption/StandardEncryptionManagerFactory.java +++ b/core/src/main/java/org/apache/iceberg/encryption/StandardEncryptionManagerFactory.java @@ -35,8 +35,10 @@ public StandardEncryptionManagerFactory(Map catalogProperties) { String kmsType = catalogProperties.get(CatalogProperties.ENCRYPTION_KMS_TYPE); if (kmsType == null) { - kmsClient = null; - } else if (kmsType.equals(CatalogProperties.ENCRYPTION_KMS_CUSTOM_TYPE)) { + throw new IllegalStateException("Cannot create StandardEncryptionManagerFactory without KMS type"); + } + + if (kmsType.equals(CatalogProperties.ENCRYPTION_KMS_CUSTOM_TYPE)) { String kmsClientImpl = catalogProperties.get(CatalogProperties.ENCRYPTION_KMS_CLIENT_IMPL); if (kmsClientImpl == null) { throw new IllegalStateException("Custom KMS client class is not defined"); From 726e56822fc3e445ace4c9f8fc1dd6e38b953f33 Mon Sep 17 00:00:00 2001 From: Gidon Gershinsky Date: Mon, 16 Oct 2023 10:12:37 +0300 Subject: [PATCH 12/14] address review comments --- .../org/apache/iceberg/TableProperties.java | 7 + .../iceberg/encryption/EncryptedFiles.java | 7 - .../encryption/EncryptionProperties.java | 31 ---- .../iceberg/encryption/EncryptionUtil.java | 137 ++++++++++++++++++ .../PlaintextEncryptionManager.java | 1 + .../encryption/StandardEncryptionManager.java | 37 ++++- .../StandardEncryptionManagerFactory.java | 90 ------------ 7 files changed, 180 insertions(+), 130 deletions(-) delete mode 100644 core/src/main/java/org/apache/iceberg/encryption/EncryptionProperties.java create mode 100644 core/src/main/java/org/apache/iceberg/encryption/EncryptionUtil.java delete mode 100644 core/src/main/java/org/apache/iceberg/encryption/StandardEncryptionManagerFactory.java diff --git a/core/src/main/java/org/apache/iceberg/TableProperties.java b/core/src/main/java/org/apache/iceberg/TableProperties.java index 03e1f3ce8897..fb5da6065839 100644 --- a/core/src/main/java/org/apache/iceberg/TableProperties.java +++ b/core/src/main/java/org/apache/iceberg/TableProperties.java @@ -369,4 +369,11 @@ private TableProperties() {} public static final String UPSERT_ENABLED = "write.upsert.enabled"; public static final boolean UPSERT_ENABLED_DEFAULT = false; + + public static final String ENCRYPTION_TABLE_KEY = "encryption.key-id"; + + public static final String ENCRYPTION_DEK_LENGTH = "encryption.data-key-length"; + public static final int ENCRYPTION_DEK_LENGTH_DEFAULT = 16; + + public static final int ENCRYPTION_AAD_LENGTH_DEFAULT = 16; } diff --git a/core/src/main/java/org/apache/iceberg/encryption/EncryptedFiles.java b/core/src/main/java/org/apache/iceberg/encryption/EncryptedFiles.java index 4014734cdb65..912104a5305d 100644 --- a/core/src/main/java/org/apache/iceberg/encryption/EncryptedFiles.java +++ b/core/src/main/java/org/apache/iceberg/encryption/EncryptedFiles.java @@ -45,13 +45,6 @@ public static EncryptedOutputFile encryptedOutput( return new BaseEncryptedOutputFile(encryptingOutputFile, keyMetadata); } - public static EncryptedOutputFile encryptedOutput( - OutputFile encryptingOutputFile, - EncryptionKeyMetadata keyMetadata, - OutputFile rawOutputFile) { - return new BaseEncryptedOutputFile(encryptingOutputFile, keyMetadata, rawOutputFile); - } - public static EncryptedOutputFile encryptedOutput( OutputFile encryptingOutputFile, ByteBuffer keyMetadata) { return encryptedOutput( diff --git a/core/src/main/java/org/apache/iceberg/encryption/EncryptionProperties.java b/core/src/main/java/org/apache/iceberg/encryption/EncryptionProperties.java deleted file mode 100644 index 23398cbc2143..000000000000 --- a/core/src/main/java/org/apache/iceberg/encryption/EncryptionProperties.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.iceberg.encryption; - -public class EncryptionProperties { - - private EncryptionProperties() {} - - public static final String ENCRYPTION_TABLE_KEY = "encryption.key-id"; - - public static final String ENCRYPTION_DEK_LENGTH = "encryption.data-key-length"; - public static final int ENCRYPTION_DEK_LENGTH_DEFAULT = 16; - - public static final int ENCRYPTION_AAD_LENGTH_DEFAULT = 16; -} diff --git a/core/src/main/java/org/apache/iceberg/encryption/EncryptionUtil.java b/core/src/main/java/org/apache/iceberg/encryption/EncryptionUtil.java new file mode 100644 index 000000000000..ec8156b139ca --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/encryption/EncryptionUtil.java @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.encryption; + +import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT; +import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT; +import static org.apache.iceberg.TableProperties.ENCRYPTION_DEK_LENGTH; +import static org.apache.iceberg.TableProperties.ENCRYPTION_DEK_LENGTH_DEFAULT; +import static org.apache.iceberg.TableProperties.ENCRYPTION_TABLE_KEY; + +import java.nio.ByteBuffer; +import java.util.Map; +import org.apache.iceberg.CatalogProperties; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.common.DynConstructors; +import org.apache.iceberg.util.PropertyUtil; + +public class EncryptionUtil { + + private EncryptionUtil() {} + + public static KeyMetadata parseKeyMetadata(ByteBuffer metadataBuffer) { + return KeyMetadata.parse(metadataBuffer); + } + + public static EncryptionKeyMetadata createKeyMetadata(ByteBuffer key, ByteBuffer aadPrefix) { + return new KeyMetadata(key, aadPrefix); + } + + public static long gcmEncryptionLength(long plainFileLength) { + int numberOfFullBlocks = Math.toIntExact(plainFileLength / Ciphers.PLAIN_BLOCK_SIZE); + int plainBytesInLastBlock = + Math.toIntExact(plainFileLength - numberOfFullBlocks * Ciphers.PLAIN_BLOCK_SIZE); + boolean fullBlocksOnly = (0 == plainBytesInLastBlock); + int cipherBytesInLastBlock = + fullBlocksOnly ? 0 : plainBytesInLastBlock + Ciphers.NONCE_LENGTH + Ciphers.GCM_TAG_LENGTH; + int cipherBlockSize = Ciphers.PLAIN_BLOCK_SIZE + Ciphers.NONCE_LENGTH + Ciphers.GCM_TAG_LENGTH; + return (long) Ciphers.GCM_STREAM_HEADER_LENGTH + + numberOfFullBlocks * cipherBlockSize + + cipherBytesInLastBlock; + } + + public static KeyManagementClient createKmsClient(Map catalogProperties) { + String kmsType = catalogProperties.get(CatalogProperties.ENCRYPTION_KMS_TYPE); + + if (kmsType == null) { + throw new IllegalStateException( + "Cannot create StandardEncryptionManagerFactory without KMS type"); + } + + if (!kmsType.equals(CatalogProperties.ENCRYPTION_KMS_CUSTOM_TYPE)) { + // Currently support only custom types + throw new UnsupportedOperationException("Undefined KMS type " + kmsType); + } + + String kmsClientImpl = catalogProperties.get(CatalogProperties.ENCRYPTION_KMS_CLIENT_IMPL); + + if (kmsClientImpl == null) { + throw new IllegalStateException("Custom KMS client class is not defined"); + } + + KeyManagementClient kmsClient; + DynConstructors.Ctor ctor; + try { + ctor = DynConstructors.builder(KeyManagementClient.class).impl(kmsClientImpl).buildChecked(); + } catch (NoSuchMethodException e) { + throw new IllegalArgumentException( + String.format( + "Cannot initialize KeyManagementClient, missing no-arg constructor for class %s", + kmsClientImpl), + e); + } + + try { + kmsClient = ctor.newInstance(); + } catch (ClassCastException e) { + throw new IllegalArgumentException( + String.format( + "Cannot initialize kms client, %s does not implement KeyManagementClient interface", + kmsClientImpl), + e); + } + + kmsClient.initialize(catalogProperties); + + return kmsClient; + } + + public static EncryptionManager createEncryptionManager( + Map tableProperties, KeyManagementClient kmsClient) { + String tableKeyId = tableProperties.get(ENCRYPTION_TABLE_KEY); + + if (null == tableKeyId) { + // Unencrypted table + return PlaintextEncryptionManager.instance(); + } + + if (kmsClient == null) { + throw new IllegalStateException("Encrypted table. No KMS client is configured in catalog"); + } + + String fileFormat = + PropertyUtil.propertyAsString( + tableProperties, DEFAULT_FILE_FORMAT, DEFAULT_FILE_FORMAT_DEFAULT); + + if (FileFormat.fromString(fileFormat) != FileFormat.PARQUET) { + throw new UnsupportedOperationException( + "Iceberg encryption currently supports only parquet format for data files"); + } + + int dataKeyLength = + PropertyUtil.propertyAsInt( + tableProperties, ENCRYPTION_DEK_LENGTH, ENCRYPTION_DEK_LENGTH_DEFAULT); + + return new StandardEncryptionManager(tableKeyId, dataKeyLength, kmsClient); + } + + public static boolean useNativeEncryption(EncryptionKeyMetadata keyMetadata) { + return keyMetadata != null && keyMetadata instanceof KeyMetadata; + } +} diff --git a/core/src/main/java/org/apache/iceberg/encryption/PlaintextEncryptionManager.java b/core/src/main/java/org/apache/iceberg/encryption/PlaintextEncryptionManager.java index e80c32116ac8..336ebe9c0af7 100644 --- a/core/src/main/java/org/apache/iceberg/encryption/PlaintextEncryptionManager.java +++ b/core/src/main/java/org/apache/iceberg/encryption/PlaintextEncryptionManager.java @@ -27,6 +27,7 @@ public class PlaintextEncryptionManager implements EncryptionManager { private static final EncryptionManager INSTANCE = new PlaintextEncryptionManager(); private static final Logger LOG = LoggerFactory.getLogger(PlaintextEncryptionManager.class); + /** @deprecated will be removed in 1.6.0. use {@link #instance()} instead. */ @Deprecated public PlaintextEncryptionManager() {} diff --git a/core/src/main/java/org/apache/iceberg/encryption/StandardEncryptionManager.java b/core/src/main/java/org/apache/iceberg/encryption/StandardEncryptionManager.java index 2956031b81dc..7f95efd9ea60 100644 --- a/core/src/main/java/org/apache/iceberg/encryption/StandardEncryptionManager.java +++ b/core/src/main/java/org/apache/iceberg/encryption/StandardEncryptionManager.java @@ -20,6 +20,7 @@ import java.nio.ByteBuffer; import java.security.SecureRandom; +import org.apache.iceberg.TableProperties; import org.apache.iceberg.io.InputFile; import org.apache.iceberg.io.OutputFile; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; @@ -33,6 +34,38 @@ public class StandardEncryptionManager implements EncryptionManager { private transient volatile SecureRandom lazyRNG = null; + class StandardEncryptedOutputFile implements EncryptedOutputFile { + + private final OutputFile encryptingOutputFile; + + private final EncryptionKeyMetadata keyMetadata; + private final OutputFile rawOutputFile; + + StandardEncryptedOutputFile( + OutputFile encryptingOutputFile, + EncryptionKeyMetadata keyMetadata, + OutputFile rawOutputFile) { + this.encryptingOutputFile = encryptingOutputFile; + this.keyMetadata = keyMetadata; + this.rawOutputFile = rawOutputFile; + } + + @Override + public OutputFile encryptingOutputFile() { + return encryptingOutputFile; + } + + @Override + public EncryptionKeyMetadata keyMetadata() { + return keyMetadata; + } + + @Override + public OutputFile rawOutputFile() { + return rawOutputFile; + } + } + /** * @param tableKeyId table encryption key id * @param dataKeyLength length of data encryption key (16/24/32 bytes) @@ -52,12 +85,12 @@ public EncryptedOutputFile encrypt(OutputFile rawOutput) { ByteBuffer fileDek = ByteBuffer.allocate(dataKeyLength); workerRNG().nextBytes(fileDek.array()); - ByteBuffer aadPrefix = ByteBuffer.allocate(EncryptionProperties.ENCRYPTION_AAD_LENGTH_DEFAULT); + ByteBuffer aadPrefix = ByteBuffer.allocate(TableProperties.ENCRYPTION_AAD_LENGTH_DEFAULT); workerRNG().nextBytes(aadPrefix.array()); KeyMetadata encryptionMetadata = new KeyMetadata(fileDek, aadPrefix); - return EncryptedFiles.encryptedOutput( + return new StandardEncryptedOutputFile( new AesGcmOutputFile(rawOutput, fileDek.array(), aadPrefix.array()), encryptionMetadata, rawOutput); diff --git a/core/src/main/java/org/apache/iceberg/encryption/StandardEncryptionManagerFactory.java b/core/src/main/java/org/apache/iceberg/encryption/StandardEncryptionManagerFactory.java deleted file mode 100644 index 9e59fceb3501..000000000000 --- a/core/src/main/java/org/apache/iceberg/encryption/StandardEncryptionManagerFactory.java +++ /dev/null @@ -1,90 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.iceberg.encryption; - -import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT; -import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT; - -import java.io.Closeable; -import java.io.IOException; -import java.util.Map; -import org.apache.iceberg.CatalogProperties; -import org.apache.iceberg.FileFormat; -import org.apache.iceberg.util.PropertyUtil; - -public class StandardEncryptionManagerFactory implements Closeable { - private final KeyManagementClient kmsClient; - - public StandardEncryptionManagerFactory(Map catalogProperties) { - String kmsType = catalogProperties.get(CatalogProperties.ENCRYPTION_KMS_TYPE); - - if (kmsType == null) { - throw new IllegalStateException("Cannot create StandardEncryptionManagerFactory without KMS type"); - } - - if (kmsType.equals(CatalogProperties.ENCRYPTION_KMS_CUSTOM_TYPE)) { - String kmsClientImpl = catalogProperties.get(CatalogProperties.ENCRYPTION_KMS_CLIENT_IMPL); - if (kmsClientImpl == null) { - throw new IllegalStateException("Custom KMS client class is not defined"); - } - kmsClient = EncryptionUtil.createKmsClient(kmsClientImpl); - kmsClient.initialize(catalogProperties); - } else { - // Currently support only custom types - throw new UnsupportedOperationException("Undefined KMS type " + kmsType); - } - } - - public EncryptionManager create(Map tableProperties) { - String tableKeyId = tableProperties.get(EncryptionProperties.ENCRYPTION_TABLE_KEY); - - if (null == tableKeyId) { - // Unencrypted table - return PlaintextEncryptionManager.instance(); - } - - if (kmsClient == null) { - throw new IllegalStateException("Encrypted table. No KMS client is configured in catalog"); - } - - String fileFormat = - PropertyUtil.propertyAsString( - tableProperties, DEFAULT_FILE_FORMAT, DEFAULT_FILE_FORMAT_DEFAULT); - - if (FileFormat.fromString(fileFormat) != FileFormat.PARQUET) { - throw new UnsupportedOperationException( - "Iceberg encryption currently supports only parquet format for data files"); - } - - int dataKeyLength = - PropertyUtil.propertyAsInt( - tableProperties, - EncryptionProperties.ENCRYPTION_DEK_LENGTH, - EncryptionProperties.ENCRYPTION_DEK_LENGTH_DEFAULT); - - return new StandardEncryptionManager(tableKeyId, dataKeyLength, kmsClient); - } - - @Override - public synchronized void close() throws IOException { - if (kmsClient != null) { - kmsClient.close(); - } - } -} From 06fbac611795b7526c1511aabac18bac73ee6333 Mon Sep 17 00:00:00 2001 From: Gidon Gershinsky Date: Mon, 16 Oct 2023 10:18:34 +0300 Subject: [PATCH 13/14] add changes in EncryptedOutputFile --- .../org/apache/iceberg/encryption/EncryptedOutputFile.java | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/api/src/main/java/org/apache/iceberg/encryption/EncryptedOutputFile.java b/api/src/main/java/org/apache/iceberg/encryption/EncryptedOutputFile.java index 1686342c776d..54dce5d3263e 100644 --- a/api/src/main/java/org/apache/iceberg/encryption/EncryptedOutputFile.java +++ b/api/src/main/java/org/apache/iceberg/encryption/EncryptedOutputFile.java @@ -37,4 +37,9 @@ public interface EncryptedOutputFile { * #encryptingOutputFile()}. */ EncryptionKeyMetadata keyMetadata(); + + /** Underlying output file for native encryption. */ + default OutputFile rawOutputFile() { + throw new UnsupportedOperationException("Not implemented"); + }; } From 86d7e17f72aff0271ae7e5fff3b59261f2d2e528 Mon Sep 17 00:00:00 2001 From: Ryan Blue Date: Sun, 10 Dec 2023 15:48:10 -0800 Subject: [PATCH 14/14] Review fixes. --- .../encryption/EncryptedOutputFile.java | 4 +- .../org/apache/iceberg/CatalogProperties.java | 4 +- .../iceberg/encryption/EncryptedFiles.java | 4 - .../iceberg/encryption/EncryptionUtil.java | 82 +++------ .../encryption/KeyMetadataDecoder.java | 12 +- .../encryption/KeyMetadataEncoder.java | 18 +- .../encryption/StandardEncryptionManager.java | 167 ++++++++++++------ ...Metadata.java => StandardKeyMetadata.java} | 24 +-- ...ava => TestStandardKeyMetadataParser.java} | 9 +- 9 files changed, 173 insertions(+), 151 deletions(-) rename core/src/main/java/org/apache/iceberg/encryption/{KeyMetadata.java => StandardKeyMetadata.java} (84%) rename core/src/test/java/org/apache/iceberg/encryption/{TestKeyMetadataParser.java => TestStandardKeyMetadataParser.java} (84%) diff --git a/api/src/main/java/org/apache/iceberg/encryption/EncryptedOutputFile.java b/api/src/main/java/org/apache/iceberg/encryption/EncryptedOutputFile.java index 54dce5d3263e..300c88d18862 100644 --- a/api/src/main/java/org/apache/iceberg/encryption/EncryptedOutputFile.java +++ b/api/src/main/java/org/apache/iceberg/encryption/EncryptedOutputFile.java @@ -39,7 +39,7 @@ public interface EncryptedOutputFile { EncryptionKeyMetadata keyMetadata(); /** Underlying output file for native encryption. */ - default OutputFile rawOutputFile() { + default OutputFile plainOutputFile() { throw new UnsupportedOperationException("Not implemented"); - }; + } } diff --git a/core/src/main/java/org/apache/iceberg/CatalogProperties.java b/core/src/main/java/org/apache/iceberg/CatalogProperties.java index 9d350525cbf8..b6fd990f0ac6 100644 --- a/core/src/main/java/org/apache/iceberg/CatalogProperties.java +++ b/core/src/main/java/org/apache/iceberg/CatalogProperties.java @@ -156,7 +156,7 @@ private CatalogProperties() {} public static final String AUTH_SESSION_TIMEOUT_MS = "auth.session-timeout-ms"; public static final long AUTH_SESSION_TIMEOUT_MS_DEFAULT = TimeUnit.HOURS.toMillis(1); + public static final String ENCRYPTION_KMS_TYPE = "encryption.kms-type"; - public static final String ENCRYPTION_KMS_CUSTOM_TYPE = "custom"; - public static final String ENCRYPTION_KMS_CLIENT_IMPL = "encryption.kms.client-impl"; + public static final String ENCRYPTION_KMS_IMPL = "encryption.kms-impl"; } diff --git a/core/src/main/java/org/apache/iceberg/encryption/EncryptedFiles.java b/core/src/main/java/org/apache/iceberg/encryption/EncryptedFiles.java index 912104a5305d..c0fc41ca1385 100644 --- a/core/src/main/java/org/apache/iceberg/encryption/EncryptedFiles.java +++ b/core/src/main/java/org/apache/iceberg/encryption/EncryptedFiles.java @@ -57,9 +57,5 @@ public static EncryptedOutputFile encryptedOutput( encryptedOutputFile, BaseEncryptionKeyMetadata.fromByteArray(keyMetadata)); } - public static EncryptedOutputFile plainAsEncryptedOutput(OutputFile encryptingOutputFile) { - return new BaseEncryptedOutputFile(encryptingOutputFile, EncryptionKeyMetadata.empty()); - } - private EncryptedFiles() {} } diff --git a/core/src/main/java/org/apache/iceberg/encryption/EncryptionUtil.java b/core/src/main/java/org/apache/iceberg/encryption/EncryptionUtil.java index ec8156b139ca..ad1deecb8da1 100644 --- a/core/src/main/java/org/apache/iceberg/encryption/EncryptionUtil.java +++ b/core/src/main/java/org/apache/iceberg/encryption/EncryptionUtil.java @@ -18,72 +18,40 @@ */ package org.apache.iceberg.encryption; -import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT; -import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT; -import static org.apache.iceberg.TableProperties.ENCRYPTION_DEK_LENGTH; -import static org.apache.iceberg.TableProperties.ENCRYPTION_DEK_LENGTH_DEFAULT; -import static org.apache.iceberg.TableProperties.ENCRYPTION_TABLE_KEY; - -import java.nio.ByteBuffer; import java.util.Map; import org.apache.iceberg.CatalogProperties; import org.apache.iceberg.FileFormat; +import org.apache.iceberg.TableProperties; import org.apache.iceberg.common.DynConstructors; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.util.PropertyUtil; public class EncryptionUtil { private EncryptionUtil() {} - public static KeyMetadata parseKeyMetadata(ByteBuffer metadataBuffer) { - return KeyMetadata.parse(metadataBuffer); - } - - public static EncryptionKeyMetadata createKeyMetadata(ByteBuffer key, ByteBuffer aadPrefix) { - return new KeyMetadata(key, aadPrefix); - } - - public static long gcmEncryptionLength(long plainFileLength) { - int numberOfFullBlocks = Math.toIntExact(plainFileLength / Ciphers.PLAIN_BLOCK_SIZE); - int plainBytesInLastBlock = - Math.toIntExact(plainFileLength - numberOfFullBlocks * Ciphers.PLAIN_BLOCK_SIZE); - boolean fullBlocksOnly = (0 == plainBytesInLastBlock); - int cipherBytesInLastBlock = - fullBlocksOnly ? 0 : plainBytesInLastBlock + Ciphers.NONCE_LENGTH + Ciphers.GCM_TAG_LENGTH; - int cipherBlockSize = Ciphers.PLAIN_BLOCK_SIZE + Ciphers.NONCE_LENGTH + Ciphers.GCM_TAG_LENGTH; - return (long) Ciphers.GCM_STREAM_HEADER_LENGTH - + numberOfFullBlocks * cipherBlockSize - + cipherBytesInLastBlock; - } - public static KeyManagementClient createKmsClient(Map catalogProperties) { String kmsType = catalogProperties.get(CatalogProperties.ENCRYPTION_KMS_TYPE); + String kmsImpl = catalogProperties.get(CatalogProperties.ENCRYPTION_KMS_IMPL); - if (kmsType == null) { - throw new IllegalStateException( - "Cannot create StandardEncryptionManagerFactory without KMS type"); - } + Preconditions.checkArgument( + kmsType == null || kmsImpl == null, + "Cannot set both KMS type (%s) and KMS impl (%s)", + kmsType, + kmsImpl); - if (!kmsType.equals(CatalogProperties.ENCRYPTION_KMS_CUSTOM_TYPE)) { - // Currently support only custom types - throw new UnsupportedOperationException("Undefined KMS type " + kmsType); - } - - String kmsClientImpl = catalogProperties.get(CatalogProperties.ENCRYPTION_KMS_CLIENT_IMPL); - - if (kmsClientImpl == null) { - throw new IllegalStateException("Custom KMS client class is not defined"); - } + // TODO: Add KMS implementations + Preconditions.checkArgument(kmsType == null, "Unsupported KMS type: %s", kmsType); KeyManagementClient kmsClient; DynConstructors.Ctor ctor; try { - ctor = DynConstructors.builder(KeyManagementClient.class).impl(kmsClientImpl).buildChecked(); + ctor = DynConstructors.builder(KeyManagementClient.class).impl(kmsImpl).buildChecked(); } catch (NoSuchMethodException e) { throw new IllegalArgumentException( String.format( "Cannot initialize KeyManagementClient, missing no-arg constructor for class %s", - kmsClientImpl), + kmsImpl), e); } @@ -93,7 +61,7 @@ public static KeyManagementClient createKmsClient(Map catalogPro throw new IllegalArgumentException( String.format( "Cannot initialize kms client, %s does not implement KeyManagementClient interface", - kmsClientImpl), + kmsImpl), e); } @@ -104,20 +72,19 @@ public static KeyManagementClient createKmsClient(Map catalogPro public static EncryptionManager createEncryptionManager( Map tableProperties, KeyManagementClient kmsClient) { - String tableKeyId = tableProperties.get(ENCRYPTION_TABLE_KEY); + Preconditions.checkArgument(kmsClient != null, "Invalid KMS client: null"); + String tableKeyId = tableProperties.get(TableProperties.ENCRYPTION_TABLE_KEY); if (null == tableKeyId) { // Unencrypted table return PlaintextEncryptionManager.instance(); } - if (kmsClient == null) { - throw new IllegalStateException("Encrypted table. No KMS client is configured in catalog"); - } - String fileFormat = PropertyUtil.propertyAsString( - tableProperties, DEFAULT_FILE_FORMAT, DEFAULT_FILE_FORMAT_DEFAULT); + tableProperties, + TableProperties.DEFAULT_FILE_FORMAT, + TableProperties.DEFAULT_FILE_FORMAT_DEFAULT); if (FileFormat.fromString(fileFormat) != FileFormat.PARQUET) { throw new UnsupportedOperationException( @@ -126,12 +93,15 @@ public static EncryptionManager createEncryptionManager( int dataKeyLength = PropertyUtil.propertyAsInt( - tableProperties, ENCRYPTION_DEK_LENGTH, ENCRYPTION_DEK_LENGTH_DEFAULT); + tableProperties, + TableProperties.ENCRYPTION_DEK_LENGTH, + TableProperties.ENCRYPTION_DEK_LENGTH_DEFAULT); - return new StandardEncryptionManager(tableKeyId, dataKeyLength, kmsClient); - } + Preconditions.checkState( + dataKeyLength == 16 || dataKeyLength == 24 || dataKeyLength == 32, + "Invalid data key length: %s (must be 16, 24, or 32)", + dataKeyLength); - public static boolean useNativeEncryption(EncryptionKeyMetadata keyMetadata) { - return keyMetadata != null && keyMetadata instanceof KeyMetadata; + return new StandardEncryptionManager(tableKeyId, dataKeyLength, kmsClient); } } diff --git a/core/src/main/java/org/apache/iceberg/encryption/KeyMetadataDecoder.java b/core/src/main/java/org/apache/iceberg/encryption/KeyMetadataDecoder.java index 674685c30164..7e57163d73ea 100644 --- a/core/src/main/java/org/apache/iceberg/encryption/KeyMetadataDecoder.java +++ b/core/src/main/java/org/apache/iceberg/encryption/KeyMetadataDecoder.java @@ -28,9 +28,9 @@ import org.apache.iceberg.data.avro.RawDecoder; import org.apache.iceberg.relocated.com.google.common.collect.MapMaker; -class KeyMetadataDecoder extends MessageDecoder.BaseDecoder { +class KeyMetadataDecoder extends MessageDecoder.BaseDecoder { private final org.apache.iceberg.Schema readSchema; - private final Map> decoders = new MapMaker().makeMap(); + private final Map> decoders = new MapMaker().makeMap(); /** * Creates a new decoder that constructs key metadata instances described by schema version. @@ -39,11 +39,11 @@ class KeyMetadataDecoder extends MessageDecoder.BaseDecoder { * instances created by this class will are described by the expected schema. */ KeyMetadataDecoder(byte readSchemaVersion) { - this.readSchema = KeyMetadata.supportedSchemaVersions().get(readSchemaVersion); + this.readSchema = StandardKeyMetadata.supportedSchemaVersions().get(readSchemaVersion); } @Override - public KeyMetadata decode(InputStream stream, KeyMetadata reuse) { + public StandardKeyMetadata decode(InputStream stream, StandardKeyMetadata reuse) { byte writeSchemaVersion; try { @@ -56,14 +56,14 @@ public KeyMetadata decode(InputStream stream, KeyMetadata reuse) { throw new RuntimeException("Version byte - end of stream reached"); } - Schema writeSchema = KeyMetadata.supportedAvroSchemaVersions().get(writeSchemaVersion); + Schema writeSchema = StandardKeyMetadata.supportedAvroSchemaVersions().get(writeSchemaVersion); if (writeSchema == null) { throw new UnsupportedOperationException( "Cannot resolve schema for version: " + writeSchemaVersion); } - RawDecoder decoder = decoders.get(writeSchemaVersion); + RawDecoder decoder = decoders.get(writeSchemaVersion); if (decoder == null) { decoder = new RawDecoder<>(readSchema, GenericAvroReader::create, writeSchema); diff --git a/core/src/main/java/org/apache/iceberg/encryption/KeyMetadataEncoder.java b/core/src/main/java/org/apache/iceberg/encryption/KeyMetadataEncoder.java index faab6a47c814..6f9d1769f07d 100644 --- a/core/src/main/java/org/apache/iceberg/encryption/KeyMetadataEncoder.java +++ b/core/src/main/java/org/apache/iceberg/encryption/KeyMetadataEncoder.java @@ -29,18 +29,18 @@ import org.apache.avro.message.MessageEncoder; import org.apache.iceberg.avro.GenericAvroWriter; -class KeyMetadataEncoder implements MessageEncoder { +class KeyMetadataEncoder implements MessageEncoder { private static final ThreadLocal TEMP = ThreadLocal.withInitial(BufferOutputStream::new); private static final ThreadLocal ENCODER = new ThreadLocal<>(); private final byte schemaVersion; private final boolean copyOutputBytes; - private final DatumWriter writer; + private final DatumWriter writer; /** - * Creates a new {@link MessageEncoder} that will deconstruct {@link KeyMetadata} instances - * described by the schema version. + * Creates a new {@link MessageEncoder} that will deconstruct {@link StandardKeyMetadata} + * instances described by the schema version. * *

Buffers returned by {@code encode} are copied and will not be modified by future calls to * {@code encode}. @@ -50,8 +50,8 @@ class KeyMetadataEncoder implements MessageEncoder { } /** - * Creates a new {@link MessageEncoder} that will deconstruct {@link KeyMetadata} instances - * described by the schema version. + * Creates a new {@link MessageEncoder} that will deconstruct {@link StandardKeyMetadata} + * instances described by the schema version. * *

If {@code shouldCopy} is true, then buffers returned by {@code encode} are copied and will * not be modified by future calls to {@code encode}. @@ -62,7 +62,7 @@ class KeyMetadataEncoder implements MessageEncoder { * next call to {@code encode}. */ KeyMetadataEncoder(byte schemaVersion, boolean shouldCopy) { - Schema writeSchema = KeyMetadata.supportedAvroSchemaVersions().get(schemaVersion); + Schema writeSchema = StandardKeyMetadata.supportedAvroSchemaVersions().get(schemaVersion); if (writeSchema == null) { throw new UnsupportedOperationException( @@ -75,7 +75,7 @@ class KeyMetadataEncoder implements MessageEncoder { } @Override - public ByteBuffer encode(KeyMetadata datum) throws IOException { + public ByteBuffer encode(StandardKeyMetadata datum) throws IOException { BufferOutputStream temp = TEMP.get(); temp.reset(); temp.write(schemaVersion); @@ -89,7 +89,7 @@ public ByteBuffer encode(KeyMetadata datum) throws IOException { } @Override - public void encode(KeyMetadata datum, OutputStream stream) throws IOException { + public void encode(StandardKeyMetadata datum, OutputStream stream) throws IOException { BinaryEncoder encoder = EncoderFactory.get().directBinaryEncoder(stream, ENCODER.get()); ENCODER.set(encoder); writer.write(datum, encoder); diff --git a/core/src/main/java/org/apache/iceberg/encryption/StandardEncryptionManager.java b/core/src/main/java/org/apache/iceberg/encryption/StandardEncryptionManager.java index 7f95efd9ea60..63f89e7661b3 100644 --- a/core/src/main/java/org/apache/iceberg/encryption/StandardEncryptionManager.java +++ b/core/src/main/java/org/apache/iceberg/encryption/StandardEncryptionManager.java @@ -23,6 +23,7 @@ import org.apache.iceberg.TableProperties; import org.apache.iceberg.io.InputFile; import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.io.SeekableInputStream; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.Iterables; import org.apache.iceberg.util.ByteBuffers; @@ -34,38 +35,6 @@ public class StandardEncryptionManager implements EncryptionManager { private transient volatile SecureRandom lazyRNG = null; - class StandardEncryptedOutputFile implements EncryptedOutputFile { - - private final OutputFile encryptingOutputFile; - - private final EncryptionKeyMetadata keyMetadata; - private final OutputFile rawOutputFile; - - StandardEncryptedOutputFile( - OutputFile encryptingOutputFile, - EncryptionKeyMetadata keyMetadata, - OutputFile rawOutputFile) { - this.encryptingOutputFile = encryptingOutputFile; - this.keyMetadata = keyMetadata; - this.rawOutputFile = rawOutputFile; - } - - @Override - public OutputFile encryptingOutputFile() { - return encryptingOutputFile; - } - - @Override - public EncryptionKeyMetadata keyMetadata() { - return keyMetadata; - } - - @Override - public OutputFile rawOutputFile() { - return rawOutputFile; - } - } - /** * @param tableKeyId table encryption key id * @param dataKeyLength length of data encryption key (16/24/32 bytes) @@ -74,6 +43,10 @@ public OutputFile rawOutputFile() { public StandardEncryptionManager( String tableKeyId, int dataKeyLength, KeyManagementClient kmsClient) { Preconditions.checkNotNull(tableKeyId, "Invalid encryption key ID: null"); + Preconditions.checkArgument( + dataKeyLength == 16 || dataKeyLength == 24 || dataKeyLength == 32, + "Invalid data key length: %s (must be 16, 24, or 32)", + dataKeyLength); Preconditions.checkNotNull(kmsClient, "Invalid KMS client: null"); this.tableKeyId = tableKeyId; this.kmsClient = kmsClient; @@ -81,39 +54,20 @@ public StandardEncryptionManager( } @Override - public EncryptedOutputFile encrypt(OutputFile rawOutput) { - ByteBuffer fileDek = ByteBuffer.allocate(dataKeyLength); - workerRNG().nextBytes(fileDek.array()); - - ByteBuffer aadPrefix = ByteBuffer.allocate(TableProperties.ENCRYPTION_AAD_LENGTH_DEFAULT); - workerRNG().nextBytes(aadPrefix.array()); - - KeyMetadata encryptionMetadata = new KeyMetadata(fileDek, aadPrefix); - - return new StandardEncryptedOutputFile( - new AesGcmOutputFile(rawOutput, fileDek.array(), aadPrefix.array()), - encryptionMetadata, - rawOutput); + public EncryptedOutputFile encrypt(OutputFile plainOutput) { + return new StandardEncryptedOutputFile(plainOutput, dataKeyLength); } @Override public InputFile decrypt(EncryptedInputFile encrypted) { - KeyMetadata keyMetadata = KeyMetadata.castOrParse(encrypted.keyMetadata()); - - byte[] fileDek = ByteBuffers.toByteArray(keyMetadata.encryptionKey()); - byte[] aadPrefix = ByteBuffers.toByteArray(keyMetadata.aadPrefix()); - - return new AesGcmInputFile(encrypted.encryptedInputFile(), fileDek, aadPrefix); + // this input file will lazily parse key metadata in case the file is not an AES GCM stream. + return new StandardDecryptedInputFile(encrypted); } @Override public Iterable decrypt(Iterable encrypted) { // Bulk decrypt is only applied to data files. Returning source input files for parquet. - return Iterables.transform(encrypted, this::getSourceFile); - } - - private InputFile getSourceFile(EncryptedInputFile encryptedFile) { - return encryptedFile.encryptedInputFile(); + return Iterables.transform(encrypted, this::decrypt); } private SecureRandom workerRNG() { @@ -126,7 +80,8 @@ private SecureRandom workerRNG() { public ByteBuffer wrapKey(ByteBuffer secretKey) { if (kmsClient == null) { - throw new IllegalStateException("Null KmsClient. WrapKey can't be called from workers"); + throw new IllegalStateException( + "Cannot wrap key after called after serialization (missing KMS client)"); } return kmsClient.wrapKey(secretKey, tableKeyId); @@ -134,9 +89,105 @@ public ByteBuffer wrapKey(ByteBuffer secretKey) { public ByteBuffer unwrapKey(ByteBuffer wrappedSecretKey) { if (kmsClient == null) { - throw new IllegalStateException("Null KmsClient. UnwrapKey can't be called from workers"); + throw new IllegalStateException( + "Cannot wrap key after called after serialization (missing KMS client)"); } return kmsClient.unwrapKey(wrappedSecretKey, tableKeyId); } + + private class StandardEncryptedOutputFile implements EncryptedOutputFile { + private final OutputFile plainOutputFile; + private final int dataKeyLength; + private StandardKeyMetadata lazyKeyMetadata = null; + private OutputFile lazyEncryptingOutputFile = null; + + StandardEncryptedOutputFile(OutputFile plainOutputFile, int dataKeyLength) { + this.plainOutputFile = plainOutputFile; + this.dataKeyLength = dataKeyLength; + } + + @Override + public StandardKeyMetadata keyMetadata() { + if (null == lazyKeyMetadata) { + byte[] fileDek = new byte[dataKeyLength]; + workerRNG().nextBytes(fileDek); + + byte[] aadPrefix = new byte[TableProperties.ENCRYPTION_AAD_LENGTH_DEFAULT]; + workerRNG().nextBytes(aadPrefix); + + this.lazyKeyMetadata = new StandardKeyMetadata(fileDek, aadPrefix); + } + + return lazyKeyMetadata; + } + + @Override + public OutputFile encryptingOutputFile() { + if (null == lazyEncryptingOutputFile) { + this.lazyEncryptingOutputFile = + new AesGcmOutputFile( + plainOutputFile, + ByteBuffers.toByteArray(keyMetadata().encryptionKey()), + ByteBuffers.toByteArray(keyMetadata().aadPrefix())); + } + + return lazyEncryptingOutputFile; + } + + @Override + public OutputFile plainOutputFile() { + return plainOutputFile; + } + } + + private static class StandardDecryptedInputFile implements InputFile { + private final EncryptedInputFile encryptedInputFile; + private StandardKeyMetadata lazyKeyMetadata = null; + private AesGcmInputFile lazyDecryptedInputFile = null; + + private StandardDecryptedInputFile(EncryptedInputFile encryptedInputFile) { + this.encryptedInputFile = encryptedInputFile; + } + + private StandardKeyMetadata keyMetadata() { + if (null == lazyKeyMetadata) { + this.lazyKeyMetadata = StandardKeyMetadata.castOrParse(encryptedInputFile.keyMetadata()); + } + + return lazyKeyMetadata; + } + + private AesGcmInputFile decrypted() { + if (null == lazyDecryptedInputFile) { + this.lazyDecryptedInputFile = + new AesGcmInputFile( + encryptedInputFile.encryptedInputFile(), + ByteBuffers.toByteArray(keyMetadata().encryptionKey()), + ByteBuffers.toByteArray(keyMetadata().aadPrefix())); + } + + return lazyDecryptedInputFile; + } + + @Override + public long getLength() { + return decrypted().getLength(); + } + + @Override + public SeekableInputStream newStream() { + return decrypted().newStream(); + } + + @Override + public String location() { + return decrypted().location(); + } + + @Override + public boolean exists() { + return decrypted().exists(); + } + } } diff --git a/core/src/main/java/org/apache/iceberg/encryption/KeyMetadata.java b/core/src/main/java/org/apache/iceberg/encryption/StandardKeyMetadata.java similarity index 84% rename from core/src/main/java/org/apache/iceberg/encryption/KeyMetadata.java rename to core/src/main/java/org/apache/iceberg/encryption/StandardKeyMetadata.java index 02873eb764d6..2ac70aebc316 100644 --- a/core/src/main/java/org/apache/iceberg/encryption/KeyMetadata.java +++ b/core/src/main/java/org/apache/iceberg/encryption/StandardKeyMetadata.java @@ -31,14 +31,14 @@ import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.types.Types; -class KeyMetadata implements EncryptionKeyMetadata, IndexedRecord { +class StandardKeyMetadata implements EncryptionKeyMetadata, IndexedRecord { private static final byte V1 = 1; private static final Schema SCHEMA_V1 = new Schema( required(0, "encryption_key", Types.BinaryType.get()), optional(1, "aad_prefix", Types.BinaryType.get())); private static final org.apache.avro.Schema AVRO_SCHEMA_V1 = - AvroSchemaUtil.convert(SCHEMA_V1, KeyMetadata.class.getCanonicalName()); + AvroSchemaUtil.convert(SCHEMA_V1, StandardKeyMetadata.class.getCanonicalName()); private static final Map schemaVersions = ImmutableMap.of(V1, SCHEMA_V1); private static final Map avroSchemaVersions = @@ -52,9 +52,14 @@ class KeyMetadata implements EncryptionKeyMetadata, IndexedRecord { private org.apache.avro.Schema avroSchema; /** Used by Avro reflection to instantiate this class * */ - KeyMetadata() {} + StandardKeyMetadata() {} - KeyMetadata(ByteBuffer encryptionKey, ByteBuffer aadPrefix) { + StandardKeyMetadata(byte[] key, byte[] aad) { + this.encryptionKey = ByteBuffer.wrap(key); + this.aadPrefix = ByteBuffer.wrap(aad); + } + + private StandardKeyMetadata(ByteBuffer encryptionKey, ByteBuffer aadPrefix) { this.encryptionKey = encryptionKey; this.aadPrefix = aadPrefix; this.avroSchema = AVRO_SCHEMA_V1; @@ -76,9 +81,9 @@ ByteBuffer aadPrefix() { return aadPrefix; } - static KeyMetadata castOrParse(EncryptionKeyMetadata keyMetadata) { - if (keyMetadata instanceof KeyMetadata) { - return (KeyMetadata) keyMetadata; + static StandardKeyMetadata castOrParse(EncryptionKeyMetadata keyMetadata) { + if (keyMetadata instanceof StandardKeyMetadata) { + return (StandardKeyMetadata) keyMetadata; } ByteBuffer kmBuffer = keyMetadata.buffer(); @@ -90,7 +95,7 @@ static KeyMetadata castOrParse(EncryptionKeyMetadata keyMetadata) { return parse(kmBuffer); } - static KeyMetadata parse(ByteBuffer buffer) { + static StandardKeyMetadata parse(ByteBuffer buffer) { try { return KEY_METADATA_DECODER.decode(buffer); } catch (IOException e) { @@ -109,8 +114,7 @@ public ByteBuffer buffer() { @Override public EncryptionKeyMetadata copy() { - KeyMetadata metadata = new KeyMetadata(encryptionKey(), aadPrefix()); - return metadata; + return new StandardKeyMetadata(encryptionKey(), aadPrefix()); } @Override diff --git a/core/src/test/java/org/apache/iceberg/encryption/TestKeyMetadataParser.java b/core/src/test/java/org/apache/iceberg/encryption/TestStandardKeyMetadataParser.java similarity index 84% rename from core/src/test/java/org/apache/iceberg/encryption/TestKeyMetadataParser.java rename to core/src/test/java/org/apache/iceberg/encryption/TestStandardKeyMetadataParser.java index 27b00b1829c6..889506cb93e4 100644 --- a/core/src/test/java/org/apache/iceberg/encryption/TestKeyMetadataParser.java +++ b/core/src/test/java/org/apache/iceberg/encryption/TestStandardKeyMetadataParser.java @@ -24,16 +24,17 @@ import org.junit.Assert; import org.junit.Test; -public class TestKeyMetadataParser { +public class TestStandardKeyMetadataParser { @Test public void testParser() { ByteBuffer encryptionKey = ByteBuffer.wrap("0123456789012345".getBytes(StandardCharsets.UTF_8)); ByteBuffer aadPrefix = ByteBuffer.wrap("1234567890123456".getBytes(StandardCharsets.UTF_8)); - KeyMetadata metadata = new KeyMetadata(encryptionKey, aadPrefix); + StandardKeyMetadata metadata = + new StandardKeyMetadata(encryptionKey.array(), aadPrefix.array()); ByteBuffer serialized = metadata.buffer(); - KeyMetadata parsedMetadata = KeyMetadata.parse(serialized); + StandardKeyMetadata parsedMetadata = StandardKeyMetadata.parse(serialized); Assert.assertEquals(parsedMetadata.encryptionKey(), encryptionKey); Assert.assertEquals(parsedMetadata.aadPrefix(), aadPrefix); } @@ -41,7 +42,7 @@ public void testParser() { @Test public void testUnsupportedVersion() { ByteBuffer badBuffer = ByteBuffer.wrap(new byte[] {0x02}); - Assertions.assertThatThrownBy(() -> KeyMetadata.parse(badBuffer)) + Assertions.assertThatThrownBy(() -> StandardKeyMetadata.parse(badBuffer)) .isInstanceOf(UnsupportedOperationException.class) .hasMessage("Cannot resolve schema for version: 2"); }