diff --git a/api/src/main/java/org/apache/iceberg/encryption/EncryptedOutputFile.java b/api/src/main/java/org/apache/iceberg/encryption/EncryptedOutputFile.java
index 1686342c776d..54dce5d3263e 100644
--- a/api/src/main/java/org/apache/iceberg/encryption/EncryptedOutputFile.java
+++ b/api/src/main/java/org/apache/iceberg/encryption/EncryptedOutputFile.java
@@ -37,4 +37,13 @@ public interface EncryptedOutputFile {
    * #encryptingOutputFile()}.
    */
   EncryptionKeyMetadata keyMetadata();
+
+  /**
+   * Underlying output file for native encryption.
+   *
+   * @throws UnsupportedOperationException unless the implementation overrides this method
+   */
+  default OutputFile rawOutputFile() {
+    throw new UnsupportedOperationException("Not implemented");
+  }
 }
diff --git a/core/src/main/java/org/apache/iceberg/CatalogProperties.java b/core/src/main/java/org/apache/iceberg/CatalogProperties.java
index cada23437528..9d350525cbf8 100644
--- a/core/src/main/java/org/apache/iceberg/CatalogProperties.java
+++ b/core/src/main/java/org/apache/iceberg/CatalogProperties.java
@@ -156,4 +156,7 @@ private CatalogProperties() {}
 
   public static final String AUTH_SESSION_TIMEOUT_MS = "auth.session-timeout-ms";
   public static final long AUTH_SESSION_TIMEOUT_MS_DEFAULT = TimeUnit.HOURS.toMillis(1);
+  public static final String ENCRYPTION_KMS_TYPE = "encryption.kms-type";
+  public static final String ENCRYPTION_KMS_CUSTOM_TYPE = "custom";
+  public static final String ENCRYPTION_KMS_CLIENT_IMPL = "encryption.kms.client-impl";
 }
diff --git a/core/src/main/java/org/apache/iceberg/TableOperations.java b/core/src/main/java/org/apache/iceberg/TableOperations.java
index 6822809d799e..7f3106253ca6 100644
--- a/core/src/main/java/org/apache/iceberg/TableOperations.java
+++ b/core/src/main/java/org/apache/iceberg/TableOperations.java
@@ -71,7 +71,7 @@ public interface TableOperations {
    * files.
    */
   default EncryptionManager encryption() {
-    return new PlaintextEncryptionManager();
+    return PlaintextEncryptionManager.instance();
   }
 
   /**
diff --git a/core/src/main/java/org/apache/iceberg/TableProperties.java b/core/src/main/java/org/apache/iceberg/TableProperties.java
index 03e1f3ce8897..fb5da6065839 100644
--- a/core/src/main/java/org/apache/iceberg/TableProperties.java
+++ b/core/src/main/java/org/apache/iceberg/TableProperties.java
@@ -369,4 +369,11 @@ private TableProperties() {}
 
   public static final String UPSERT_ENABLED = "write.upsert.enabled";
   public static final boolean UPSERT_ENABLED_DEFAULT = false;
+
+  public static final String ENCRYPTION_TABLE_KEY = "encryption.key-id";
+
+  public static final String ENCRYPTION_DEK_LENGTH = "encryption.data-key-length";
+  public static final int ENCRYPTION_DEK_LENGTH_DEFAULT = 16;
+
+  public static final int ENCRYPTION_AAD_LENGTH_DEFAULT = 16;
 }
diff --git a/core/src/main/java/org/apache/iceberg/encryption/EncryptedFiles.java b/core/src/main/java/org/apache/iceberg/encryption/EncryptedFiles.java
index c0fc41ca1385..912104a5305d 100644
--- a/core/src/main/java/org/apache/iceberg/encryption/EncryptedFiles.java
+++ b/core/src/main/java/org/apache/iceberg/encryption/EncryptedFiles.java
@@ -57,5 +57,10 @@ public static EncryptedOutputFile encryptedOutput(
         encryptedOutputFile, BaseEncryptionKeyMetadata.fromByteArray(keyMetadata));
   }
 
+  /** Wraps an output file together with {@link EncryptionKeyMetadata#empty()} key metadata. */
+  public static EncryptedOutputFile plainAsEncryptedOutput(OutputFile encryptingOutputFile) {
+    return new BaseEncryptedOutputFile(encryptingOutputFile, EncryptionKeyMetadata.empty());
+  }
+
   private EncryptedFiles() {}
 }
diff --git a/core/src/main/java/org/apache/iceberg/encryption/EncryptionUtil.java b/core/src/main/java/org/apache/iceberg/encryption/EncryptionUtil.java
new file mode 100644
index 000000000000..ec8156b139ca
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/encryption/EncryptionUtil.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.encryption;
+
+import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
+import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT;
+import static org.apache.iceberg.TableProperties.ENCRYPTION_DEK_LENGTH;
+import static org.apache.iceberg.TableProperties.ENCRYPTION_DEK_LENGTH_DEFAULT;
+import static org.apache.iceberg.TableProperties.ENCRYPTION_TABLE_KEY;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.common.DynConstructors;
+import org.apache.iceberg.util.PropertyUtil;
+
+/** Static helpers for creating key metadata, KMS clients and encryption managers. */
+public class EncryptionUtil {
+
+  private EncryptionUtil() {}
+
+  /** Parses serialized key metadata from the given buffer. */
+  public static KeyMetadata parseKeyMetadata(ByteBuffer metadataBuffer) {
+    return KeyMetadata.parse(metadataBuffer);
+  }
+
+  /** Creates key metadata holding the given encryption key and AAD prefix. */
+  public static EncryptionKeyMetadata createKeyMetadata(ByteBuffer key, ByteBuffer aadPrefix) {
+    return new KeyMetadata(key, aadPrefix);
+  }
+
+  /**
+   * Computes the length of an AES GCM stream that holds {@code plainFileLength} plaintext bytes.
+   *
+   * <p>The result is the stream header plus, for every block, the plaintext bytes with a nonce
+   * and a GCM tag. The arithmetic is done in {@code long} so that lengths beyond {@code
+   * Integer.MAX_VALUE} do not overflow.
+   *
+   * @param plainFileLength number of plaintext bytes
+   * @return number of bytes in the encrypted stream
+   */
+  public static long gcmEncryptionLength(long plainFileLength) {
+    long cipherBlockSize =
+        (long) Ciphers.PLAIN_BLOCK_SIZE + Ciphers.NONCE_LENGTH + Ciphers.GCM_TAG_LENGTH;
+    long numberOfFullBlocks = plainFileLength / Ciphers.PLAIN_BLOCK_SIZE;
+    long plainBytesInLastBlock = plainFileLength % Ciphers.PLAIN_BLOCK_SIZE;
+    // a trailing partial block still carries its own nonce and tag
+    long cipherBytesInLastBlock =
+        plainBytesInLastBlock == 0
+            ? 0
+            : plainBytesInLastBlock + Ciphers.NONCE_LENGTH + Ciphers.GCM_TAG_LENGTH;
+
+    return Ciphers.GCM_STREAM_HEADER_LENGTH
+        + numberOfFullBlocks * cipherBlockSize
+        + cipherBytesInLastBlock;
+  }
+
+  /**
+   * Creates and initializes a KMS client from catalog properties.
+   *
+   * <p>Only the {@code custom} KMS type is supported: the class named by {@link
+   * CatalogProperties#ENCRYPTION_KMS_CLIENT_IMPL} is instantiated through its no-arg constructor
+   * and then initialized with the catalog properties.
+   *
+   * @param catalogProperties catalog properties
+   * @return an initialized KMS client
+   * @throws IllegalStateException if the KMS type or the client class is not configured
+   * @throws UnsupportedOperationException if the KMS type is not {@code custom}
+   * @throws IllegalArgumentException if the class has no no-arg constructor or is not a {@link
+   *     KeyManagementClient}
+   */
+  public static KeyManagementClient createKmsClient(Map<String, String> catalogProperties) {
+    String kmsType = catalogProperties.get(CatalogProperties.ENCRYPTION_KMS_TYPE);
+
+    if (kmsType == null) {
+      throw new IllegalStateException(
+          "Cannot create StandardEncryptionManagerFactory without KMS type");
+    }
+
+    if (!kmsType.equals(CatalogProperties.ENCRYPTION_KMS_CUSTOM_TYPE)) {
+      // Currently support only custom types
+      throw new UnsupportedOperationException("Undefined KMS type " + kmsType);
+    }
+
+    String kmsClientImpl = catalogProperties.get(CatalogProperties.ENCRYPTION_KMS_CLIENT_IMPL);
+
+    if (kmsClientImpl == null) {
+      throw new IllegalStateException("Custom KMS client class is not defined");
+    }
+
+    KeyManagementClient kmsClient;
+    DynConstructors.Ctor<KeyManagementClient> ctor;
+    try {
+      ctor = DynConstructors.builder(KeyManagementClient.class).impl(kmsClientImpl).buildChecked();
+    } catch (NoSuchMethodException e) {
+      throw new IllegalArgumentException(
+          String.format(
+              "Cannot initialize KeyManagementClient, missing no-arg constructor for class %s",
+              kmsClientImpl),
+          e);
+    }
+
+    try {
+      kmsClient = ctor.newInstance();
+    } catch (ClassCastException e) {
+      throw new IllegalArgumentException(
+          String.format(
+              "Cannot initialize kms client, %s does not implement KeyManagementClient interface",
+              kmsClientImpl),
+          e);
+    }
+
+    kmsClient.initialize(catalogProperties);
+
+    return kmsClient;
+  }
+
+  /**
+   * Creates the encryption manager for a table.
+   *
+   * <p>A table without the {@code encryption.key-id} property is treated as unencrypted and gets
+   * the plaintext manager.
+   *
+   * @param tableProperties table properties
+   * @param kmsClient KMS client; must not be null for an encrypted table
+   * @return the encryption manager to use for the table
+   * @throws IllegalStateException if the table is encrypted and {@code kmsClient} is null
+   * @throws UnsupportedOperationException if the default file format is not parquet
+   */
+  public static EncryptionManager createEncryptionManager(
+      Map<String, String> tableProperties, KeyManagementClient kmsClient) {
+    String tableKeyId = tableProperties.get(ENCRYPTION_TABLE_KEY);
+
+    if (null == tableKeyId) {
+      // Unencrypted table
+      return PlaintextEncryptionManager.instance();
+    }
+
+    if (kmsClient == null) {
+      throw new IllegalStateException("Encrypted table. No KMS client is configured in catalog");
+    }
+
+    String fileFormat =
+        PropertyUtil.propertyAsString(
+            tableProperties, DEFAULT_FILE_FORMAT, DEFAULT_FILE_FORMAT_DEFAULT);
+
+    if (FileFormat.fromString(fileFormat) != FileFormat.PARQUET) {
+      throw new UnsupportedOperationException(
+          "Iceberg encryption currently supports only parquet format for data files");
+    }
+
+    int dataKeyLength =
+        PropertyUtil.propertyAsInt(
+            tableProperties, ENCRYPTION_DEK_LENGTH, ENCRYPTION_DEK_LENGTH_DEFAULT);
+
+    return new StandardEncryptionManager(tableKeyId, dataKeyLength, kmsClient);
+  }
+
+  /** Returns true if the key metadata is a {@link KeyMetadata} instance. */
+  public static boolean useNativeEncryption(EncryptionKeyMetadata keyMetadata) {
+    return keyMetadata instanceof KeyMetadata;
+  }
+}
diff --git a/core/src/main/java/org/apache/iceberg/encryption/KeyMetadata.java b/core/src/main/java/org/apache/iceberg/encryption/KeyMetadata.java
index 0d7ec43f6ebc..02873eb764d6 100644
--- a/core/src/main/java/org/apache/iceberg/encryption/KeyMetadata.java
+++ b/core/src/main/java/org/apache/iceberg/encryption/KeyMetadata.java
@@ -76,6 +76,26 @@ ByteBuffer aadPrefix() {
     return aadPrefix;
   }
 
+  /**
+   * Returns the given key metadata as a {@link KeyMetadata}, parsing its buffer when it is not
+   * already an instance of this class.
+   *
+   * @throws IllegalStateException if parsing is needed but the metadata buffer is null
+   */
+  static KeyMetadata castOrParse(EncryptionKeyMetadata keyMetadata) {
+    if (keyMetadata instanceof KeyMetadata) {
+      return (KeyMetadata) keyMetadata;
+    }
+
+    ByteBuffer kmBuffer = keyMetadata.buffer();
+
+    if (kmBuffer == null) {
+      throw new IllegalStateException("Null key metadata buffer");
+    }
+
+    return parse(kmBuffer);
+  }
+
   static KeyMetadata parse(ByteBuffer buffer) {
     try {
       return KEY_METADATA_DECODER.decode(buffer);
diff --git a/core/src/main/java/org/apache/iceberg/encryption/PlaintextEncryptionManager.java b/core/src/main/java/org/apache/iceberg/encryption/PlaintextEncryptionManager.java
index 4d8d8aa7aff9..336ebe9c0af7 100644
--- a/core/src/main/java/org/apache/iceberg/encryption/PlaintextEncryptionManager.java
+++ b/core/src/main/java/org/apache/iceberg/encryption/PlaintextEncryptionManager.java
@@ -18,26 +18,34 @@
  */
 package org.apache.iceberg.encryption;
 
-import java.nio.ByteBuffer;
 import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.io.OutputFile;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class PlaintextEncryptionManager implements EncryptionManager {
+  private static final EncryptionManager INSTANCE = new PlaintextEncryptionManager();
   private static final Logger LOG = LoggerFactory.getLogger(PlaintextEncryptionManager.class);
 
+  /** @deprecated will be removed in 1.6.0; use {@link #instance()} instead. */
+  @Deprecated
+  public PlaintextEncryptionManager() {}
+
+  /** Returns the shared {@link PlaintextEncryptionManager} instance. */
+  public static EncryptionManager instance() {
+    return INSTANCE;
+  }
+
   @Override
   public InputFile decrypt(EncryptedInputFile encrypted) {
     if (encrypted.keyMetadata().buffer() != null) {
-      LOG.warn(
-          "File encryption key metadata is present, but currently using PlaintextEncryptionManager.");
+      LOG.warn("File encryption key metadata is present, but no encryption has been configured.");
     }
     return encrypted.encryptedInputFile();
   }
 
   @Override
   public EncryptedOutputFile encrypt(OutputFile rawOutput) {
-    return EncryptedFiles.encryptedOutput(rawOutput, (ByteBuffer) null);
+    return EncryptedFiles.encryptedOutput(rawOutput, EncryptionKeyMetadata.empty());
   }
 }
diff --git a/core/src/main/java/org/apache/iceberg/encryption/StandardEncryptionManager.java b/core/src/main/java/org/apache/iceberg/encryption/StandardEncryptionManager.java
new file mode 100644
index 000000000000..7f95efd9ea60
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/encryption/StandardEncryptionManager.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.encryption;
+
+import java.nio.ByteBuffer;
+import java.security.SecureRandom;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.util.ByteBuffers;
+
+/**
+ * Encryption manager that generates a random data key and AAD prefix for every output file and
+ * wraps files in AES GCM streams.
+ */
+public class StandardEncryptionManager implements EncryptionManager {
+  private final transient KeyManagementClient kmsClient;
+  private final String tableKeyId;
+  private final int dataKeyLength;
+
+  // created on first use; transient, so it starts out null
+  private transient volatile SecureRandom lazyRNG = null;
+
+  /** Output file that keeps the raw file next to its encrypting wrapper and key metadata. */
+  static class StandardEncryptedOutputFile implements EncryptedOutputFile {
+
+    private final OutputFile encryptingOutputFile;
+
+    private final EncryptionKeyMetadata keyMetadata;
+    private final OutputFile rawOutputFile;
+
+    StandardEncryptedOutputFile(
+        OutputFile encryptingOutputFile,
+        EncryptionKeyMetadata keyMetadata,
+        OutputFile rawOutputFile) {
+      this.encryptingOutputFile = encryptingOutputFile;
+      this.keyMetadata = keyMetadata;
+      this.rawOutputFile = rawOutputFile;
+    }
+
+    @Override
+    public OutputFile encryptingOutputFile() {
+      return encryptingOutputFile;
+    }
+
+    @Override
+    public EncryptionKeyMetadata keyMetadata() {
+      return keyMetadata;
+    }
+
+    @Override
+    public OutputFile rawOutputFile() {
+      return rawOutputFile;
+    }
+  }
+
+  /**
+   * @param tableKeyId table encryption key id
+   * @param dataKeyLength length of data encryption key (16/24/32 bytes)
+   * @param kmsClient Client of KMS used to wrap/unwrap keys in envelope encryption
+   */
+  public StandardEncryptionManager(
+      String tableKeyId, int dataKeyLength, KeyManagementClient kmsClient) {
+    Preconditions.checkNotNull(tableKeyId, "Invalid encryption key ID: null");
+    Preconditions.checkNotNull(kmsClient, "Invalid KMS client: null");
+    this.tableKeyId = tableKeyId;
+    this.kmsClient = kmsClient;
+    this.dataKeyLength = dataKeyLength;
+  }
+
+  /**
+   * Wraps the raw output in an AES GCM stream keyed with a newly generated data key.
+   *
+   * <p>The returned file also exposes the raw output and the key metadata (key and AAD prefix).
+   */
+  @Override
+  public EncryptedOutputFile encrypt(OutputFile rawOutput) {
+    ByteBuffer fileDek = ByteBuffer.allocate(dataKeyLength);
+    workerRNG().nextBytes(fileDek.array());
+
+    ByteBuffer aadPrefix = ByteBuffer.allocate(TableProperties.ENCRYPTION_AAD_LENGTH_DEFAULT);
+    workerRNG().nextBytes(aadPrefix.array());
+
+    KeyMetadata encryptionMetadata = new KeyMetadata(fileDek, aadPrefix);
+
+    return new StandardEncryptedOutputFile(
+        new AesGcmOutputFile(rawOutput, fileDek.array(), aadPrefix.array()),
+        encryptionMetadata,
+        rawOutput);
+  }
+
+  /** Opens the encrypted file with the key and AAD prefix carried in its key metadata. */
+  @Override
+  public InputFile decrypt(EncryptedInputFile encrypted) {
+    KeyMetadata keyMetadata = KeyMetadata.castOrParse(encrypted.keyMetadata());
+
+    byte[] fileDek = ByteBuffers.toByteArray(keyMetadata.encryptionKey());
+    byte[] aadPrefix = ByteBuffers.toByteArray(keyMetadata.aadPrefix());
+
+    return new AesGcmInputFile(encrypted.encryptedInputFile(), fileDek, aadPrefix);
+  }
+
+  @Override
+  public Iterable<InputFile> decrypt(Iterable<EncryptedInputFile> encrypted) {
+    // Bulk decrypt is only applied to data files. Returning source input files for parquet.
+    return Iterables.transform(encrypted, this::getSourceFile);
+  }
+
+  private InputFile getSourceFile(EncryptedInputFile encryptedFile) {
+    return encryptedFile.encryptedInputFile();
+  }
+
+  private SecureRandom workerRNG() {
+    if (this.lazyRNG == null) {
+      this.lazyRNG = new SecureRandom();
+    }
+
+    return lazyRNG;
+  }
+
+  /**
+   * Wraps a secret key with the table key through the KMS client.
+   *
+   * @throws IllegalStateException if no KMS client is available
+   */
+  public ByteBuffer wrapKey(ByteBuffer secretKey) {
+    if (kmsClient == null) {
+      throw new IllegalStateException("Null KmsClient. WrapKey can't be called from workers");
+    }
+
+    return kmsClient.wrapKey(secretKey, tableKeyId);
+  }
+
+  /**
+   * Unwraps a wrapped secret key with the table key through the KMS client.
+   *
+   * @throws IllegalStateException if no KMS client is available
+   */
+  public ByteBuffer unwrapKey(ByteBuffer wrappedSecretKey) {
+    if (kmsClient == null) {
+      throw new IllegalStateException("Null KmsClient. UnwrapKey can't be called from workers");
+    }
+
+    return kmsClient.unwrapKey(wrappedSecretKey, tableKeyId);
+  }
+}