From 2bbc565f0c55722d5abd74d33be792def636697b Mon Sep 17 00:00:00 2001 From: Gidon Gershinsky Date: Mon, 5 Jun 2023 09:00:53 +0300 Subject: [PATCH 1/4] draft commit update snapshot producer update the patch clean up fix for previous patch address review comments move key wrapping to metadata encryption encrypt manifest list key metadata new aad util null key needs no encryption comment; clearer method/var names use key encryption key for manifest list keys add encryption util changes update EncryptionTestHelpers handle api change remove unused lines revert revapi.yml KEK cache unitest update rename var address review comments fix timeout default change writer kek timeout default Updates from review. cache unwrapped keys compile and ci fixes spotless apply re-apply unwrapped cache patch Co-authored-by: Ryan Blue caching limit address review comments spotless apply sync with spec changes add missing method rm empty line pass encrypted output a better unitest rm type, rename class --- .palantir/revapi.yml | 5 + .../org/apache/iceberg/ManifestListFile.java | 40 +++++ .../java/org/apache/iceberg/Snapshot.java | 10 ++ .../iceberg/encryption/EncryptingFileIO.java | 13 +- .../java/org/apache/iceberg/io/FileIO.java | 10 ++ .../apache/iceberg/BaseManifestListFile.java | 70 +++++++++ .../java/org/apache/iceberg/BaseSnapshot.java | 4 +- .../apache/iceberg/ManifestListWriter.java | 54 ++++++- .../org/apache/iceberg/ManifestLists.java | 35 ++++- .../org/apache/iceberg/ManifestWriter.java | 18 ++- .../org/apache/iceberg/SnapshotParser.java | 28 +++- .../org/apache/iceberg/SnapshotProducer.java | 36 ++++- .../iceberg/encryption/AesGcmInputFile.java | 24 ++- .../iceberg/encryption/EncryptionUtil.java | 46 ++++++ .../NativeEncryptionKeyMetadata.java | 17 +++ .../encryption/SnapshotEncryptionKey.java | 57 +++++++ .../encryption/StandardEncryptionManager.java | 109 ++++++++++++-- .../encryption/StandardKeyMetadata.java | 43 +++++- .../iceberg/TestManifestListEncryption.java | 142 ++++++++++++++++++ .../encryption/EncryptionTestHelpers.java | 1 - .../hadoop/TestCatalogUtilDropTable.java | 4 + 21 files changed, 726 insertions(+), 40 deletions(-) create mode 100644 api/src/main/java/org/apache/iceberg/ManifestListFile.java create mode 100644 core/src/main/java/org/apache/iceberg/BaseManifestListFile.java create mode 100644 core/src/main/java/org/apache/iceberg/encryption/SnapshotEncryptionKey.java create mode 100644 core/src/test/java/org/apache/iceberg/TestManifestListEncryption.java diff --git a/.palantir/revapi.yml b/.palantir/revapi.yml index 8929c86d55ae..1a2c295eb197 100644 --- a/.palantir/revapi.yml +++ b/.palantir/revapi.yml @@ -1063,6 +1063,11 @@ acceptedBreaks: new: "method void org.apache.iceberg.encryption.PlaintextEncryptionManager::()" justification: "Deprecations for 1.6.0 release" "1.6.0": + org.apache.iceberg:iceberg-api: + - code: "java.class.defaultSerializationChanged" + old: "class org.apache.iceberg.encryption.EncryptingFileIO" + new: "class org.apache.iceberg.encryption.EncryptingFileIO" + justification: "New method for Manifest List reading" org.apache.iceberg:iceberg-common: - code: "java.method.removed" old: "method org.apache.iceberg.common.DynFields.StaticField org.apache.iceberg.common.DynFields.Builder::buildStaticChecked()\ diff --git a/api/src/main/java/org/apache/iceberg/ManifestListFile.java b/api/src/main/java/org/apache/iceberg/ManifestListFile.java new file mode 100644 index 000000000000..03519fbeb8d6 --- /dev/null +++ b/api/src/main/java/org/apache/iceberg/ManifestListFile.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg; + +import java.nio.ByteBuffer; +import org.apache.iceberg.encryption.EncryptionManager; + +public interface ManifestListFile { + + /** Location of manifest list file. */ + String location(); + + /** Snapshot ID of the manifest list. */ + long snapshotId(); + + /** The manifest list key metadata is encrypted. Returns the ID of the encryption key */ + String metadataEncryptionKeyID(); + + /** Returns the encrypted manifest list key metadata */ + ByteBuffer encryptedKeyMetadata(); + + /** Decrypt and return the encrypted key metadata */ + ByteBuffer decryptKeyMetadata(EncryptionManager em); +} diff --git a/api/src/main/java/org/apache/iceberg/Snapshot.java b/api/src/main/java/org/apache/iceberg/Snapshot.java index 097806639b24..cd47d6901249 100644 --- a/api/src/main/java/org/apache/iceberg/Snapshot.java +++ b/api/src/main/java/org/apache/iceberg/Snapshot.java @@ -162,6 +162,16 @@ default Iterable removedDeleteFiles(FileIO io) { */ String manifestListLocation(); + /** + * This snapshot's manifest list file info: size, encryption key metadata and location + * + * @return manifest list file info + */ + default ManifestListFile manifestListFile() { + throw new UnsupportedOperationException( + this.getClass().getName() + " doesn't implement manifestListFile method"); + } + /** * Return the id of the schema used when this snapshot was created, or null if this information is * not available. diff --git a/api/src/main/java/org/apache/iceberg/encryption/EncryptingFileIO.java b/api/src/main/java/org/apache/iceberg/encryption/EncryptingFileIO.java index d3de7b1f84a3..018034ea9399 100644 --- a/api/src/main/java/org/apache/iceberg/encryption/EncryptingFileIO.java +++ b/api/src/main/java/org/apache/iceberg/encryption/EncryptingFileIO.java @@ -28,6 +28,7 @@ import org.apache.iceberg.DataFile; import org.apache.iceberg.DeleteFile; import org.apache.iceberg.ManifestFile; +import org.apache.iceberg.ManifestListFile; import org.apache.iceberg.io.FileIO; import org.apache.iceberg.io.InputFile; import org.apache.iceberg.io.OutputFile; @@ -108,13 +109,21 @@ public InputFile newInputFile(ManifestFile manifest) { } } + @Override + public InputFile newInputFile(ManifestListFile manifestList) { + if (manifestList.encryptedKeyMetadata() != null) { + ByteBuffer keyMetadata = manifestList.decryptKeyMetadata(em); + return newDecryptingInputFile(manifestList.location(), keyMetadata); + } else { + return newInputFile(manifestList.location()); + } + } + public InputFile newDecryptingInputFile(String path, ByteBuffer buffer) { return em.decrypt(wrap(io.newInputFile(path), buffer)); } public InputFile newDecryptingInputFile(String path, long length, ByteBuffer buffer) { - // TODO: is the length correct for the encrypted file? It may be the length of the plaintext - // stream return em.decrypt(wrap(io.newInputFile(path, length), buffer)); } diff --git a/api/src/main/java/org/apache/iceberg/io/FileIO.java b/api/src/main/java/org/apache/iceberg/io/FileIO.java index f5404b9e5a78..cc6b689f50e7 100644 --- a/api/src/main/java/org/apache/iceberg/io/FileIO.java +++ b/api/src/main/java/org/apache/iceberg/io/FileIO.java @@ -24,6 +24,7 @@ import org.apache.iceberg.DataFile; import org.apache.iceberg.DeleteFile; import org.apache.iceberg.ManifestFile; +import org.apache.iceberg.ManifestListFile; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; /** @@ -70,6 +71,15 @@ default InputFile newInputFile(ManifestFile manifest) { return newInputFile(manifest.path(), manifest.length()); } + default InputFile newInputFile(ManifestListFile manifestList) { + Preconditions.checkArgument( + manifestList.encryptedKeyMetadata() == null, + "Cannot decrypt manifest list: %s (use EncryptingFileIO)", + manifestList.location()); + // cannot pass length because it is not tracked outside of key metadata + return newInputFile(manifestList.location()); + } + /** Get a {@link OutputFile} instance to write bytes to the file at the given path. */ OutputFile newOutputFile(String path); diff --git a/core/src/main/java/org/apache/iceberg/BaseManifestListFile.java b/core/src/main/java/org/apache/iceberg/BaseManifestListFile.java new file mode 100644 index 000000000000..8b1a0dcc81eb --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/BaseManifestListFile.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg; + +import java.io.Serializable; +import java.nio.ByteBuffer; +import org.apache.iceberg.encryption.EncryptionManager; +import org.apache.iceberg.encryption.EncryptionUtil; +import org.apache.iceberg.util.ByteBuffers; + +class BaseManifestListFile implements ManifestListFile, Serializable { + private final String location; + private final long snapshotId; + private final String metadataEncryptionKeyID; + // stored as a byte array to be Serializable + private final byte[] encryptedKeyMetadata; + + BaseManifestListFile( + String location, long snapshotId, String encryptionKeyId, ByteBuffer encryptedKeyMetadata) { + this.location = location; + this.snapshotId = snapshotId; + this.encryptedKeyMetadata = ByteBuffers.toByteArray(encryptedKeyMetadata); + this.metadataEncryptionKeyID = encryptionKeyId; + } + + @Override + public String location() { + return location; + } + + @Override + public long snapshotId() { + return snapshotId; + } + + @Override + public String metadataEncryptionKeyID() { + return metadataEncryptionKeyID; + } + + @Override + public ByteBuffer encryptedKeyMetadata() { + if (encryptedKeyMetadata == null) { + return null; + } + + return ByteBuffer.wrap(encryptedKeyMetadata); + } + + @Override + public ByteBuffer decryptKeyMetadata(EncryptionManager em) { + return EncryptionUtil.decryptSnapshotKeyMetadata(this, em); + } +} diff --git a/core/src/main/java/org/apache/iceberg/BaseSnapshot.java b/core/src/main/java/org/apache/iceberg/BaseSnapshot.java index b97b15d65221..28a45d2c7821 100644 --- a/core/src/main/java/org/apache/iceberg/BaseSnapshot.java +++ b/core/src/main/java/org/apache/iceberg/BaseSnapshot.java @@ -182,7 +182,9 @@ private void cacheManifests(FileIO fileIO) { if (allManifests == null) { // if manifests isn't set, then the snapshotFile is set and should be read to get the list - this.allManifests = ManifestLists.read(fileIO.newInputFile(manifestListLocation)); + this.allManifests = + ManifestLists.read( + fileIO.newInputFile(new BaseManifestListFile(manifestListLocation, keyId))); } if (dataManifests == null || deleteManifests == null) { diff --git a/core/src/main/java/org/apache/iceberg/ManifestListWriter.java b/core/src/main/java/org/apache/iceberg/ManifestListWriter.java index c2cb1bf8c85d..a7644d3bef2a 100644 --- a/core/src/main/java/org/apache/iceberg/ManifestListWriter.java +++ b/core/src/main/java/org/apache/iceberg/ManifestListWriter.java @@ -21,6 +21,10 @@ import java.io.IOException; import java.util.Iterator; import java.util.Map; +import org.apache.iceberg.encryption.EncryptionManager; +import org.apache.iceberg.encryption.NativeEncryptionKeyMetadata; +import org.apache.iceberg.encryption.NativeEncryptionOutputFile; +import org.apache.iceberg.encryption.StandardEncryptionManager; import org.apache.iceberg.exceptions.RuntimeIOException; import org.apache.iceberg.io.FileAppender; import org.apache.iceberg.io.OutputFile; @@ -29,9 +33,25 @@ abstract class ManifestListWriter implements FileAppender { private final FileAppender writer; + private final StandardEncryptionManager standardEncryptionManager; + private final NativeEncryptionKeyMetadata manifestListKeyMetadata; + private final OutputFile outputFile; + + private ManifestListWriter( + OutputFile file, EncryptionManager encryptionManager, Map meta) { + if (encryptionManager instanceof StandardEncryptionManager) { + // ability to encrypt the manifest list key is introduced for standard encryption. + this.standardEncryptionManager = (StandardEncryptionManager) encryptionManager; + NativeEncryptionOutputFile encryptedFile = this.standardEncryptionManager.encrypt(file); + this.outputFile = encryptedFile.encryptingOutputFile(); + this.manifestListKeyMetadata = encryptedFile.keyMetadata(); + } else { + this.standardEncryptionManager = null; + this.outputFile = file; + this.manifestListKeyMetadata = null; + } - private ManifestListWriter(OutputFile file, Map meta) { - this.writer = newAppender(file, meta); + this.writer = newAppender(outputFile, meta); } protected abstract ManifestFile prepare(ManifestFile manifest); @@ -73,18 +93,31 @@ public Long nextRowId() { return null; } + public ManifestListFile toManifestListFile() { + if (manifestListKeyMetadata != null && manifestListKeyMetadata.encryptionKey() != null) { + manifestListKeyMetadata.copyWithLength(writer.length()); + String manifestListKeyID = + standardEncryptionManager.addManifestListKeyMetadata(manifestListKeyMetadata); + return new BaseManifestListFile(outputFile.location(), manifestListKeyID); + } else { + return new BaseManifestListFile(outputFile.location(), null); + } + } + static class V3Writer extends ManifestListWriter { private final V3Metadata.ManifestFileWrapper wrapper; private Long nextRowId; V3Writer( OutputFile snapshotFile, + EncryptionManager encryptionManager, long snapshotId, Long parentSnapshotId, long sequenceNumber, long firstRowId) { super( snapshotFile, + encryptionManager, ImmutableMap.of( "snapshot-id", String.valueOf(snapshotId), "parent-snapshot-id", String.valueOf(parentSnapshotId), @@ -134,15 +167,22 @@ public Long nextRowId() { static class V2Writer extends ManifestListWriter { private final V2Metadata.ManifestFileWrapper wrapper; - V2Writer(OutputFile snapshotFile, long snapshotId, Long parentSnapshotId, long sequenceNumber) { + V2Writer( + OutputFile snapshotFile, + EncryptionManager encryptionManager, + long snapshotId, + Long parentSnapshotId, + long sequenceNumber) { super( snapshotFile, + encryptionManager, ImmutableMap.of( "snapshot-id", String.valueOf(snapshotId), "parent-snapshot-id", String.valueOf(parentSnapshotId), "sequence-number", String.valueOf(sequenceNumber), "format-version", "2")); this.wrapper = new V2Metadata.ManifestFileWrapper(snapshotId, sequenceNumber); + // todo encryption only in v3? throw exception if e.manager is not plaintext? } @Override @@ -170,13 +210,19 @@ protected FileAppender newAppender(OutputFile file, Map read(InputFile manifestList) { } } + // or should we modify all related tests (to pass PlaintextEncryptionManager)? + @VisibleForTesting static ManifestListWriter write( int formatVersion, OutputFile manifestListFile, @@ -54,19 +59,43 @@ static ManifestListWriter write( Long parentSnapshotId, long sequenceNumber, Long firstRowId) { + return write( + formatVersion, + manifestListFile, + PlaintextEncryptionManager.instance(), + snapshotId, + parentSnapshotId, + sequenceNumber, + firstRowId); + } + + static ManifestListWriter write( + int formatVersion, + OutputFile manifestListFile, + EncryptionManager encryptionManager, + long snapshotId, + Long parentSnapshotId, + long sequenceNumber, + Long firstRowId) { switch (formatVersion) { case 1: Preconditions.checkArgument( sequenceNumber == TableMetadata.INITIAL_SEQUENCE_NUMBER, "Invalid sequence number for v1 manifest list: %s", sequenceNumber); - return new ManifestListWriter.V1Writer(manifestListFile, snapshotId, parentSnapshotId); + return new ManifestListWriter.V1Writer( + manifestListFile, encryptionManager, snapshotId, parentSnapshotId); case 2: return new ManifestListWriter.V2Writer( - manifestListFile, snapshotId, parentSnapshotId, sequenceNumber); + manifestListFile, encryptionManager, snapshotId, parentSnapshotId, sequenceNumber); case 3: return new ManifestListWriter.V3Writer( - manifestListFile, snapshotId, parentSnapshotId, sequenceNumber, firstRowId); + manifestListFile, + encryptionManager, + snapshotId, + parentSnapshotId, + sequenceNumber, + firstRowId); } throw new UnsupportedOperationException( "Cannot write manifest list for table version: " + formatVersion); diff --git a/core/src/main/java/org/apache/iceberg/ManifestWriter.java b/core/src/main/java/org/apache/iceberg/ManifestWriter.java index 95064759ebe9..06293964ca39 100644 --- a/core/src/main/java/org/apache/iceberg/ManifestWriter.java +++ b/core/src/main/java/org/apache/iceberg/ManifestWriter.java @@ -21,6 +21,8 @@ import java.io.IOException; import java.nio.ByteBuffer; import org.apache.iceberg.encryption.EncryptedOutputFile; +import org.apache.iceberg.encryption.EncryptionKeyMetadata; +import org.apache.iceberg.encryption.NativeEncryptionKeyMetadata; import org.apache.iceberg.exceptions.RuntimeIOException; import org.apache.iceberg.io.FileAppender; import org.apache.iceberg.io.OutputFile; @@ -38,7 +40,7 @@ public abstract class ManifestWriter> implements FileAp static final long UNASSIGNED_SEQ = -1L; private final OutputFile file; - private final ByteBuffer keyMetadataBuffer; + private final EncryptionKeyMetadata keyMetadata; private final int specId; private final FileAppender> writer; private final Long snapshotId; @@ -65,7 +67,7 @@ private ManifestWriter( new GenericManifestEntry<>(V1Metadata.entrySchema(spec.partitionType()).asStruct()); this.stats = new PartitionSummary(spec); this.firstRowId = firstRowId; - this.keyMetadataBuffer = (file.keyMetadata() == null) ? null : file.keyMetadata().buffer(); + this.keyMetadata = file.keyMetadata(); } protected abstract ManifestEntry prepare(ManifestEntry entry); @@ -192,6 +194,18 @@ public long length() { public ManifestFile toManifestFile() { Preconditions.checkState(closed, "Cannot build ManifestFile, writer is not closed"); + + // if key metadata can store the length, add it + ByteBuffer keyMetadataBuffer; + if (keyMetadata instanceof NativeEncryptionKeyMetadata) { + keyMetadataBuffer = + ((NativeEncryptionKeyMetadata) keyMetadata).copyWithLength(length()).buffer(); + } else if (keyMetadata != null) { + keyMetadataBuffer = keyMetadata.buffer(); + } else { + keyMetadataBuffer = null; + } + // if the minSequenceNumber is null, then no manifests with a sequence number have been written, // so the min data sequence number is the one that will be assigned when this is committed. // pass UNASSIGNED_SEQ to inherit it. diff --git a/core/src/main/java/org/apache/iceberg/SnapshotParser.java b/core/src/main/java/org/apache/iceberg/SnapshotParser.java index 53cec16dcd87..df44a27ef6fe 100644 --- a/core/src/main/java/org/apache/iceberg/SnapshotParser.java +++ b/core/src/main/java/org/apache/iceberg/SnapshotParser.java @@ -21,8 +21,10 @@ import com.fasterxml.jackson.core.JsonGenerator; import com.fasterxml.jackson.databind.JsonNode; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.Iterator; import java.util.Map; +import org.apache.iceberg.encryption.SnapshotEncryptionKey; import org.apache.iceberg.io.FileIO; import org.apache.iceberg.io.InputFile; import org.apache.iceberg.io.OutputFile; @@ -82,10 +84,10 @@ static void toJson(Snapshot snapshot, JsonGenerator generator) throws IOExceptio generator.writeEndObject(); } - String manifestList = snapshot.manifestListLocation(); - if (manifestList != null) { + ManifestListFile manifestList = snapshot.manifestListFile(); + if (manifestList.location() != null) { // write just the location. manifests should not be embedded in JSON along with a list - generator.writeStringField(MANIFEST_LIST, manifestList); + generator.writeStringField(MANIFEST_LIST, manifestList.location()); } else { // embed the manifest list in the JSON, v1 only JsonUtil.writeStringArray( @@ -122,6 +124,10 @@ public static String toJson(Snapshot snapshot, boolean pretty) { } static Snapshot fromJson(JsonNode node) { + return fromJson(node, null); + } + + static Snapshot fromJson(JsonNode node, Map encryptionKeys) { Preconditions.checkArgument( node.isObject(), "Cannot parse table version from a non-object: %s", node); @@ -179,6 +185,20 @@ static Snapshot fromJson(JsonNode node) { if (node.has(MANIFEST_LIST)) { // the manifest list is stored in a manifest list file String manifestList = JsonUtil.getString(MANIFEST_LIST, node); + + // If manifest list is encrypted, its key metadata are taken from encryption keys table + String encryptionKeyId = null; + ByteBuffer encryptedKeyMetadata = null; + if (encryptionKeys != null) { + String snapshotKeyID = Long.toString(snapshotId); + SnapshotEncryptionKey snapshotKey = encryptionKeys.get(snapshotKeyID); + encryptionKeyId = snapshotKey.encryptionKeyID(); + encryptedKeyMetadata = snapshotKey.keyPayloadBytes(); + } + + ManifestListFile manifestListFile = + new BaseManifestListFile(manifestList, snapshotId, encryptionKeyId, encryptedKeyMetadata); + return new BaseSnapshot( sequenceNumber, snapshotId, @@ -187,7 +207,7 @@ static Snapshot fromJson(JsonNode node) { operation, summary, schemaId, - manifestList, + manifestListFile, firstRowId, addedRows, keyId); diff --git a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java index 118ae0b328a5..6cb7d0b00d83 100644 --- a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java +++ b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java @@ -266,6 +266,7 @@ public Snapshot apply() { ManifestLists.write( ops.current().formatVersion(), manifestList, + ops.encryption(), snapshotId(), parentSnapshotId, sequenceNumber, @@ -323,7 +324,7 @@ public Snapshot apply() { manifestList.location(), nextRowId, assignedRows, - null); + writer.toManifestListFile().encryptionKeyID()); } protected abstract Map summary(); @@ -799,4 +800,37 @@ private static void updateTotal( } } } + + /** + * A wrapper to set the dataSequenceNumber of a DeleteFile. + * + * @deprecated will be removed in 1.10.0; use {@link Delegates#pendingDeleteFile(DeleteFile, + * Long)} instead. + */ + @Deprecated + protected static class PendingDeleteFile extends Delegates.PendingDeleteFile { + /** + * Wrap a delete file for commit with a given data sequence number. + * + * @param deleteFile delete file + * @param dataSequenceNumber data sequence number to apply + */ + PendingDeleteFile(DeleteFile deleteFile, long dataSequenceNumber) { + super(deleteFile, dataSequenceNumber); + } + + /** + * Wrap a delete file for commit with the latest sequence number. + * + * @param deleteFile delete file + */ + PendingDeleteFile(DeleteFile deleteFile) { + super(deleteFile, null); + } + + @Override + PendingDeleteFile wrap(DeleteFile file) { + return new PendingDeleteFile(file, dataSequenceNumber()); + } + } } diff --git a/core/src/main/java/org/apache/iceberg/encryption/AesGcmInputFile.java b/core/src/main/java/org/apache/iceberg/encryption/AesGcmInputFile.java index a43643fcc779..b03944859b6e 100644 --- a/core/src/main/java/org/apache/iceberg/encryption/AesGcmInputFile.java +++ b/core/src/main/java/org/apache/iceberg/encryption/AesGcmInputFile.java @@ -26,20 +26,34 @@ public class AesGcmInputFile implements InputFile { private final InputFile sourceFile; private final byte[] dataKey; private final byte[] fileAADPrefix; - private long plaintextLength; + private Long encryptedLength; + private Long plaintextLength; public AesGcmInputFile(InputFile sourceFile, byte[] dataKey, byte[] fileAADPrefix) { + this(sourceFile, dataKey, fileAADPrefix, null); + } + + public AesGcmInputFile(InputFile sourceFile, byte[] dataKey, byte[] fileAADPrefix, Long length) { this.sourceFile = sourceFile; this.dataKey = dataKey; this.fileAADPrefix = fileAADPrefix; - this.plaintextLength = -1; + this.encryptedLength = length; + this.plaintextLength = null; + } + + private long encryptedLength() { + if (encryptedLength == null) { + this.encryptedLength = sourceFile.getLength(); + } + + return encryptedLength; } @Override public long getLength() { - if (plaintextLength == -1) { + if (plaintextLength == null) { // Presumes all streams use hard-coded plaintext block size. - plaintextLength = AesGcmInputStream.calculatePlaintextLength(sourceFile.getLength()); + plaintextLength = AesGcmInputStream.calculatePlaintextLength(encryptedLength()); } return plaintextLength; @@ -47,7 +61,7 @@ public long getLength() { @Override public SeekableInputStream newStream() { - long ciphertextLength = sourceFile.getLength(); + long ciphertextLength = encryptedLength(); Preconditions.checkState( ciphertextLength >= Ciphers.MIN_STREAM_LENGTH, "Invalid encrypted stream: %d is shorter than the minimum possible stream length", diff --git a/core/src/main/java/org/apache/iceberg/encryption/EncryptionUtil.java b/core/src/main/java/org/apache/iceberg/encryption/EncryptionUtil.java index 922cac455d3d..976d0d626790 100644 --- a/core/src/main/java/org/apache/iceberg/encryption/EncryptionUtil.java +++ b/core/src/main/java/org/apache/iceberg/encryption/EncryptionUtil.java @@ -18,13 +18,17 @@ */ package org.apache.iceberg.encryption; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; import java.util.List; import java.util.Map; import org.apache.iceberg.CatalogProperties; +import org.apache.iceberg.ManifestListFile; import org.apache.iceberg.TableProperties; import org.apache.iceberg.common.DynConstructors; import org.apache.iceberg.io.OutputFile; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.util.ByteBuffers; import org.apache.iceberg.util.PropertyUtil; public class EncryptionUtil { @@ -110,4 +114,46 @@ static EncryptionManager createEncryptionManager( public static EncryptedOutputFile plainAsEncryptedOutput(OutputFile encryptingOutputFile) { return new BaseEncryptedOutputFile(encryptingOutputFile, EncryptionKeyMetadata.empty()); } + + /** + * Decrypt the key metadata for a manifest list. + * + * @param manifestList a ManifestListFile + * @param em the table's EncryptionManager + * @return a decrypted key metadata buffer + */ + public static ByteBuffer decryptManifestListKeyMetadata( + ManifestListFile manifestList, EncryptionManager em) { + Preconditions.checkState( + em instanceof StandardEncryptionManager, + "Snapshot key metadata encryption requires a StandardEncryptionManager"); + StandardEncryptionManager sem = (StandardEncryptionManager) em; + String manifestListKeyId = manifestList.encryptionKeyID(); + ByteBuffer keyEncryptionKey = sem.encryptedById(manifestListKeyId); + ByteBuffer encryptedKeyMetadata = sem.encryptedKeyMetadata(manifestListKeyId); + + Ciphers.AesGcmDecryptor decryptor = + new Ciphers.AesGcmDecryptor(ByteBuffers.toByteArray(keyEncryptionKey)); + byte[] keyMetadataBytes = ByteBuffers.toByteArray(encryptedKeyMetadata); + byte[] decryptedKeyMetadata = + decryptor.decrypt(keyMetadataBytes, manifestListKeyId.getBytes(StandardCharsets.UTF_8)); + return ByteBuffer.wrap(decryptedKeyMetadata); + } + + /** + * Encrypts the key metadata for a snapshot. + * + * @param key key encryption key bytes + * @param keyId ID of the manifest list key + * @param keyMetadata manifest list EncryptionKeyMetadata buffer + * @return encrypted key metadata + */ + static ByteBuffer encryptManifestListKeyMetadata( + ByteBuffer key, String keyId, EncryptionKeyMetadata keyMetadata) { + Ciphers.AesGcmEncryptor encryptor = new Ciphers.AesGcmEncryptor(ByteBuffers.toByteArray(key)); + byte[] keyMetadataBytes = ByteBuffers.toByteArray(keyMetadata.buffer()); + byte[] encryptedKeyMetadata = + encryptor.encrypt(keyMetadataBytes, keyId.getBytes(StandardCharsets.UTF_8)); + return ByteBuffer.wrap(encryptedKeyMetadata); + } } diff --git a/core/src/main/java/org/apache/iceberg/encryption/NativeEncryptionKeyMetadata.java b/core/src/main/java/org/apache/iceberg/encryption/NativeEncryptionKeyMetadata.java index c2ed9d564d1e..2188378a4e87 100644 --- a/core/src/main/java/org/apache/iceberg/encryption/NativeEncryptionKeyMetadata.java +++ b/core/src/main/java/org/apache/iceberg/encryption/NativeEncryptionKeyMetadata.java @@ -27,4 +27,21 @@ public interface NativeEncryptionKeyMetadata extends EncryptionKeyMetadata { /** Additional authentication data as a {@link ByteBuffer} */ ByteBuffer aadPrefix(); + + /** Encrypted file length */ + default Long fileLength() { + throw new UnsupportedOperationException( + this.getClass().getName() + " doesn't implement fileLength"); + } + + /** + * Copy this key metadata and set the file length. + * + * @param length length of the encrypted file in bytes + * @return a copy of this key metadata (key and AAD) with the file length + */ + default NativeEncryptionKeyMetadata copyWithLength(long length) { + throw new UnsupportedOperationException( + this.getClass().getName() + " doesn't implement copyWithLength"); + } } diff --git a/core/src/main/java/org/apache/iceberg/encryption/SnapshotEncryptionKey.java b/core/src/main/java/org/apache/iceberg/encryption/SnapshotEncryptionKey.java new file mode 100644 index 000000000000..e57647d569cc --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/encryption/SnapshotEncryptionKey.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.encryption; + +import java.io.Serializable; +import java.nio.ByteBuffer; +import java.util.Base64; + +/** + * Encryption keys and metadata required for decrypting the manifest list files in snapshots of + * encrypted tables. + */ +public class SnapshotEncryptionKey implements Serializable { + private final String id; + private final String keyPayload; + private final ByteBuffer keyPayloadBytes; + private final String encryptionKeyID; + + public SnapshotEncryptionKey(String id, String keyPayload, String encryptionKeyID) { + this.id = id; + this.keyPayload = keyPayload; + this.keyPayloadBytes = ByteBuffer.wrap(Base64.getDecoder().decode(keyPayload)); + this.encryptionKeyID = encryptionKeyID; + } + + public String id() { + return id; + } + + public String keyPayload() { + return keyPayload; + } + + public ByteBuffer keyPayloadBytes() { + return keyPayloadBytes; + } + + public String encryptionKeyID() { + return encryptionKeyID; + } +} diff --git a/core/src/main/java/org/apache/iceberg/encryption/StandardEncryptionManager.java b/core/src/main/java/org/apache/iceberg/encryption/StandardEncryptionManager.java index 119d2a5f9ae2..75229f4e331e 100644 --- a/core/src/main/java/org/apache/iceberg/encryption/StandardEncryptionManager.java +++ b/core/src/main/java/org/apache/iceberg/encryption/StandardEncryptionManager.java @@ -18,21 +18,47 @@ */ package org.apache.iceberg.encryption; +import com.github.benmanes.caffeine.cache.Caffeine; +import com.github.benmanes.caffeine.cache.LoadingCache; import java.nio.ByteBuffer; import java.security.SecureRandom; +import java.util.Base64; +import java.util.Map; +import java.util.concurrent.TimeUnit; import org.apache.iceberg.TableProperties; import org.apache.iceberg.io.InputFile; import org.apache.iceberg.io.OutputFile; import org.apache.iceberg.io.SeekableInputStream; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.Iterables; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.util.ByteBuffers; public class StandardEncryptionManager implements EncryptionManager { - private final transient KeyManagementClient kmsClient; private final String tableKeyId; private final int dataKeyLength; + // a holder class of metadata that is not available after serialization + private class TransientEncryptionState { + private final KeyManagementClient kmsClient; + private final Map encryptionKeys; + private final LoadingCache unwrappedKeyCache; + private String currentKeyID; + + private TransientEncryptionState(KeyManagementClient kmsClient) { + this.kmsClient = kmsClient; + this.encryptionKeys = Maps.newLinkedHashMap(); + this.unwrappedKeyCache = + Caffeine.newBuilder() + .expireAfterWrite(1, TimeUnit.HOURS) + .build( + keyId -> + kmsClient.unwrapKey(encryptionKeys.get(keyId).keyPayloadBytes(), tableKeyId)); + } + } + + private final transient TransientEncryptionState transientState; + private transient volatile SecureRandom lazyRNG = null; /** @@ -49,7 +75,7 @@ public StandardEncryptionManager( dataKeyLength); Preconditions.checkNotNull(kmsClient, "Invalid KMS client: null"); this.tableKeyId = tableKeyId; - this.kmsClient = kmsClient; + this.transientState = new TransientEncryptionState(kmsClient); this.dataKeyLength = dataKeyLength; } @@ -81,22 +107,86 @@ private SecureRandom workerRNG() { return lazyRNG; } + /** + * @deprecated will be removed in 1.9.0. + */ + @Deprecated public ByteBuffer wrapKey(ByteBuffer secretKey) { - if (kmsClient == null) { + if (transientState == null) { throw new IllegalStateException( "Cannot wrap key after called after serialization (missing KMS client)"); } - return kmsClient.wrapKey(secretKey, tableKeyId); + return transientState.kmsClient.wrapKey(secretKey, tableKeyId); } + /** + * @deprecated will be removed in 1.9.0; use {@link #unwrapKey(String)}} instead. + */ + @Deprecated public ByteBuffer unwrapKey(ByteBuffer wrappedSecretKey) { - if (kmsClient == null) { - throw new IllegalStateException( - "Cannot wrap key after called after serialization (missing KMS client)"); + throw new UnsupportedOperationException("Use unwrapKey(String) instead"); + } + + public ByteBuffer unwrapKey(String keyId) { + if (transientState == null) { + throw new IllegalStateException("Cannot unwrap key after serialization"); + } + + return transientState.unwrappedKeyCache.get(keyId); + } + + public String currentKeyID() { + if (transientState == null) { + throw new IllegalStateException("Cannot return the current key after serialization"); + } + + if (transientState.currentKeyID == null) { + createNewEncryptionKey(); + } + + return transientState.currentKeyID; + } + + public void addSnapshotKeyMetadata(SnapshotEncryptionKey key) { + if (transientState == null) { + throw new IllegalStateException("Cannot add key metadata after serialization"); } - return kmsClient.unwrapKey(wrappedSecretKey, tableKeyId); + if (transientState.encryptionKeys.containsKey(key.id())) { + throw new IllegalStateException("key metadata already exists for snapshot " + key.id()); + } + + transientState.encryptionKeys.put(key.id(), key); + } + + private ByteBuffer newKey() { + byte[] newKey = new byte[dataKeyLength]; + workerRNG().nextBytes(newKey); + return ByteBuffer.wrap(newKey); + } + + private String newKeyId() { + byte[] idBytes = new byte[8]; + workerRNG().nextBytes(idBytes); + return "k" + Base64.getEncoder().encodeToString(idBytes); + } + + private void createNewEncryptionKey() { + if (transientState == null) { + throw new IllegalStateException("Cannot create encryption keys after serialization"); + } + + ByteBuffer unwrapped = newKey(); + ByteBuffer wrapped = transientState.kmsClient.wrapKey(unwrapped, tableKeyId); + SnapshotEncryptionKey key = + new SnapshotEncryptionKey( + newKeyId(), Base64.getEncoder().encodeToString(ByteBuffers.toByteArray(wrapped)), ""); + + // update internal tracking + transientState.unwrappedKeyCache.put(key.id(), unwrapped); + transientState.encryptionKeys.put(key.id(), key); + transientState.currentKeyID = key.id(); } private class StandardEncryptedOutputFile implements NativeEncryptionOutputFile { @@ -173,7 +263,8 @@ private AesGcmInputFile decrypted() { new AesGcmInputFile( encryptedInputFile.encryptedInputFile(), ByteBuffers.toByteArray(keyMetadata().encryptionKey()), - ByteBuffers.toByteArray(keyMetadata().aadPrefix())); + ByteBuffers.toByteArray(keyMetadata().aadPrefix()), + keyMetadata().fileLength()); } return lazyDecryptedInputFile; diff --git a/core/src/main/java/org/apache/iceberg/encryption/StandardKeyMetadata.java b/core/src/main/java/org/apache/iceberg/encryption/StandardKeyMetadata.java index 98f87c65d95f..6ddea184d8c4 100644 --- a/core/src/main/java/org/apache/iceberg/encryption/StandardKeyMetadata.java +++ b/core/src/main/java/org/apache/iceberg/encryption/StandardKeyMetadata.java @@ -36,7 +36,8 @@ class StandardKeyMetadata implements NativeEncryptionKeyMetadata, IndexedRecord private static final Schema SCHEMA_V1 = new Schema( required(0, "encryption_key", Types.BinaryType.get()), - optional(1, "aad_prefix", Types.BinaryType.get())); + optional(1, "aad_prefix", Types.BinaryType.get()), + optional(2, "file_length", Types.LongType.get())); private static final org.apache.avro.Schema AVRO_SCHEMA_V1 = AvroSchemaUtil.convert(SCHEMA_V1, StandardKeyMetadata.class.getCanonicalName()); @@ -49,20 +50,31 @@ class StandardKeyMetadata implements NativeEncryptionKeyMetadata, IndexedRecord private ByteBuffer encryptionKey; private ByteBuffer aadPrefix; - private org.apache.avro.Schema avroSchema; + private Long fileLength; /** Used by Avro reflection to instantiate this class * */ StandardKeyMetadata() {} StandardKeyMetadata(byte[] key, byte[] aad) { + this(key, aad, null); + } + + StandardKeyMetadata(byte[] key, byte[] aad, Long fileLength) { this.encryptionKey = ByteBuffer.wrap(key); this.aadPrefix = ByteBuffer.wrap(aad); + this.fileLength = fileLength; } - private StandardKeyMetadata(ByteBuffer encryptionKey, ByteBuffer aadPrefix) { - this.encryptionKey = encryptionKey; - this.aadPrefix = aadPrefix; - this.avroSchema = AVRO_SCHEMA_V1; + /** + * Copy constructor. + * + * @param toCopy a StandardKeymetadata to copy + * @param fileLength file length that overrides toCopy if not null + */ + private StandardKeyMetadata(StandardKeyMetadata toCopy, Long fileLength) { + this.encryptionKey = toCopy.encryptionKey; + this.aadPrefix = toCopy.aadPrefix; + this.fileLength = fileLength != null ? fileLength : toCopy.fileLength; } static Map supportedSchemaVersions() { @@ -83,6 +95,11 @@ public ByteBuffer aadPrefix() { return aadPrefix; } + @Override + public Long fileLength() { + return fileLength; + } + static StandardKeyMetadata castOrParse(EncryptionKeyMetadata keyMetadata) { if (keyMetadata instanceof StandardKeyMetadata) { return (StandardKeyMetadata) keyMetadata; @@ -116,7 +133,12 @@ public ByteBuffer buffer() { @Override public EncryptionKeyMetadata copy() { - return new StandardKeyMetadata(encryptionKey(), aadPrefix()); + return new StandardKeyMetadata(this, null); + } + + @Override + public NativeEncryptionKeyMetadata copyWithLength(long length) { + return new StandardKeyMetadata(this, length); } @Override @@ -128,6 +150,9 @@ public void put(int i, Object v) { case 1: this.aadPrefix = (ByteBuffer) v; return; + case 2: + this.fileLength = (Long) v; + return; default: // ignore the object, it must be from a newer version of the format } @@ -140,6 +165,8 @@ public Object get(int i) { return encryptionKey; case 1: return aadPrefix; + case 2: + return fileLength; default: throw new UnsupportedOperationException("Unknown field ordinal: " + i); } @@ -147,6 +174,6 @@ public Object get(int i) { @Override public org.apache.avro.Schema getSchema() { - return avroSchema; + return AVRO_SCHEMA_V1; } } diff --git a/core/src/test/java/org/apache/iceberg/TestManifestListEncryption.java b/core/src/test/java/org/apache/iceberg/TestManifestListEncryption.java new file mode 100644 index 000000000000..22e227ebb9ce --- /dev/null +++ b/core/src/test/java/org/apache/iceberg/TestManifestListEncryption.java @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.List; +import java.util.UUID; +import org.apache.avro.InvalidAvroMagicException; +import org.apache.iceberg.encryption.EncryptingFileIO; +import org.apache.iceberg.encryption.EncryptionManager; +import org.apache.iceberg.encryption.EncryptionTestHelpers; +import org.apache.iceberg.exceptions.RuntimeIOException; +import org.apache.iceberg.inmemory.InMemoryFileIO; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.types.Conversions; +import org.apache.iceberg.types.Types; +import org.junit.jupiter.api.Test; + +public class TestManifestListEncryption { + private static final String PATH = "s3://bucket/table/m1.avro"; + private static final long LENGTH = 1024L; + private static final int SPEC_ID = 1; + private static final long SEQ_NUM = 34L; + private static final long MIN_SEQ_NUM = 10L; + private static final long SNAPSHOT_ID = 987134631982734L; + private static final int ADDED_FILES = 2; + private static final long ADDED_ROWS = 5292L; + private static final int EXISTING_FILES = 343; + private static final long EXISTING_ROWS = 857273L; + private static final int DELETED_FILES = 1; + private static final long DELETED_ROWS = 22910L; + + private static final ByteBuffer FIRST_SUMMARY_LOWER_BOUND = + Conversions.toByteBuffer(Types.IntegerType.get(), 10); + private static final ByteBuffer FIRST_SUMMARY_UPPER_BOUND = + Conversions.toByteBuffer(Types.IntegerType.get(), 100); + private static final ByteBuffer SECOND_SUMMARY_LOWER_BOUND = + Conversions.toByteBuffer(Types.IntegerType.get(), 20); + private static final ByteBuffer SECOND_SUMMARY_UPPER_BOUND = + Conversions.toByteBuffer(Types.IntegerType.get(), 200); + + private static final List PARTITION_SUMMARIES = + Lists.newArrayList( + new GenericPartitionFieldSummary( + false, FIRST_SUMMARY_LOWER_BOUND, FIRST_SUMMARY_UPPER_BOUND), + new GenericPartitionFieldSummary( + true, false, SECOND_SUMMARY_LOWER_BOUND, SECOND_SUMMARY_UPPER_BOUND)); + private static final ByteBuffer MANIFEST_KEY_METADATA = ByteBuffer.allocate(100); + + private static final ManifestFile TEST_MANIFEST = + new GenericManifestFile( + PATH, + LENGTH, + SPEC_ID, + ManifestContent.DATA, + SEQ_NUM, + MIN_SEQ_NUM, + SNAPSHOT_ID, + ADDED_FILES, + ADDED_ROWS, + EXISTING_FILES, + EXISTING_ROWS, + DELETED_FILES, + DELETED_ROWS, + PARTITION_SUMMARIES, + MANIFEST_KEY_METADATA); + + private static final EncryptionManager ENCRYPTION_MANAGER = + EncryptionTestHelpers.createEncryptionManager(); + + @Test + public void testV2Write() throws IOException { + ManifestFile manifest = writeAndReadEncryptedManifestList(); + + assertThat(manifest.path()).isEqualTo(PATH); + assertThat(manifest.length()).isEqualTo(LENGTH); + assertThat(manifest.partitionSpecId()).isEqualTo(SPEC_ID); + assertThat(manifest.content()).isEqualTo(ManifestContent.DATA); + assertThat(manifest.sequenceNumber()).isEqualTo(SEQ_NUM); + assertThat(manifest.minSequenceNumber()).isEqualTo(MIN_SEQ_NUM); + assertThat((long) manifest.snapshotId()).isEqualTo(SNAPSHOT_ID); + assertThat((int) manifest.addedFilesCount()).isEqualTo(ADDED_FILES); + assertThat((long) manifest.addedRowsCount()).isEqualTo(ADDED_ROWS); + assertThat((int) manifest.existingFilesCount()).isEqualTo(EXISTING_FILES); + assertThat((long) manifest.existingRowsCount()).isEqualTo(EXISTING_ROWS); + assertThat((int) manifest.deletedFilesCount()).isEqualTo(DELETED_FILES); + assertThat((long) manifest.deletedRowsCount()).isEqualTo(DELETED_ROWS); + assertThat(manifest.content()).isEqualTo(ManifestContent.DATA); + } + + private ManifestFile writeAndReadEncryptedManifestList() throws IOException { + FileIO io = new InMemoryFileIO(); + EncryptingFileIO encryptingFileIO = EncryptingFileIO.combine(io, ENCRYPTION_MANAGER); + OutputFile outputFile = io.newOutputFile("memory:" + UUID.randomUUID()); + + ManifestListWriter writer = + ManifestLists.write( + 2, + encryptingFileIO.encryptionManager(), + outputFile, + SNAPSHOT_ID, + SNAPSHOT_ID - 1, + SEQ_NUM); + writer.add(TEST_MANIFEST); + writer.close(); + ManifestListFile manifestListFile = writer.toManifestListFile(); + + // First try to read without decryption + assertThatThrownBy(() -> ManifestLists.read(outputFile.toInputFile())) + .isInstanceOf(RuntimeIOException.class) + .hasMessageContaining("Failed to open file") + .hasCauseInstanceOf(InvalidAvroMagicException.class); + + List manifests = + ManifestLists.read(encryptingFileIO.newInputFile(manifestListFile)); + assertThat(manifests.size()).isEqualTo(1); + + return manifests.get(0); + } +} diff --git a/core/src/test/java/org/apache/iceberg/encryption/EncryptionTestHelpers.java b/core/src/test/java/org/apache/iceberg/encryption/EncryptionTestHelpers.java index 6d4be7671157..901f8080ff1e 100644 --- a/core/src/test/java/org/apache/iceberg/encryption/EncryptionTestHelpers.java +++ b/core/src/test/java/org/apache/iceberg/encryption/EncryptionTestHelpers.java @@ -34,7 +34,6 @@ public static EncryptionManager createEncryptionManager() { CatalogProperties.ENCRYPTION_KMS_IMPL, UnitestKMS.class.getCanonicalName()); Map tableProperties = Maps.newHashMap(); tableProperties.put(TableProperties.ENCRYPTION_TABLE_KEY, UnitestKMS.MASTER_KEY_NAME1); - tableProperties.put(TableProperties.FORMAT_VERSION, "2"); return EncryptionUtil.createEncryptionManager( List.of(), tableProperties, EncryptionUtil.createKmsClient(catalogProperties)); diff --git a/core/src/test/java/org/apache/iceberg/hadoop/TestCatalogUtilDropTable.java b/core/src/test/java/org/apache/iceberg/hadoop/TestCatalogUtilDropTable.java index a107a72ce63c..bd10f476f9d8 100644 --- a/core/src/test/java/org/apache/iceberg/hadoop/TestCatalogUtilDropTable.java +++ b/core/src/test/java/org/apache/iceberg/hadoop/TestCatalogUtilDropTable.java @@ -37,6 +37,7 @@ import org.apache.iceberg.GenericStatisticsFile; import org.apache.iceberg.ImmutableGenericPartitionStatisticsFile; import org.apache.iceberg.ManifestFile; +import org.apache.iceberg.ManifestListFile; import org.apache.iceberg.PartitionStatisticsFile; import org.apache.iceberg.Snapshot; import org.apache.iceberg.StatisticsFile; @@ -197,6 +198,9 @@ private static FileIO createMockFileIO(FileIO wrapped) { .thenAnswer( invocation -> wrapped.newInputFile(invocation.getArgument(0), invocation.getArgument(1))); + Mockito.when(mockIO.newInputFile(Mockito.any(ManifestListFile.class))) + .thenAnswer( + invocation -> wrapped.newInputFile((ManifestListFile) invocation.getArgument(0))); Mockito.when(mockIO.newInputFile(Mockito.any(ManifestFile.class))) .thenAnswer(invocation -> wrapped.newInputFile((ManifestFile) invocation.getArgument(0))); Mockito.when(mockIO.newInputFile(Mockito.any(DataFile.class))) From 95e97d5023b49381bbae9739c4db7f64f14046b8 Mon Sep 17 00:00:00 2001 From: Gidon Gershinsky Date: Fri, 9 May 2025 11:23:13 +0300 Subject: [PATCH 2/4] post rebase changes --- .palantir/revapi.yml | 9 +- .../org/apache/iceberg/ManifestListFile.java | 12 +-- .../java/org/apache/iceberg/Snapshot.java | 10 -- .../iceberg/encryption/EncryptingFileIO.java | 2 +- .../java/org/apache/iceberg/io/FileIO.java | 2 +- .../apache/iceberg/BaseManifestListFile.java | 33 ++----- .../apache/iceberg/RewriteTablePathUtil.java | 9 ++ .../org/apache/iceberg/SnapshotParser.java | 28 +----- .../encryption/SnapshotEncryptionKey.java | 57 ----------- .../encryption/StandardEncryptionManager.java | 94 +++++++++++-------- .../iceberg/TestManifestListEncryption.java | 8 +- .../org/apache/iceberg/TestSnapshotJson.java | 1 - 12 files changed, 88 insertions(+), 177 deletions(-) delete mode 100644 core/src/main/java/org/apache/iceberg/encryption/SnapshotEncryptionKey.java diff --git a/.palantir/revapi.yml b/.palantir/revapi.yml index 1a2c295eb197..acd1866ba59b 100644 --- a/.palantir/revapi.yml +++ b/.palantir/revapi.yml @@ -1063,11 +1063,6 @@ acceptedBreaks: new: "method void org.apache.iceberg.encryption.PlaintextEncryptionManager::()" justification: "Deprecations for 1.6.0 release" "1.6.0": - org.apache.iceberg:iceberg-api: - - code: "java.class.defaultSerializationChanged" - old: "class org.apache.iceberg.encryption.EncryptingFileIO" - new: "class org.apache.iceberg.encryption.EncryptingFileIO" - justification: "New method for Manifest List reading" org.apache.iceberg:iceberg-common: - code: "java.method.removed" old: "method org.apache.iceberg.common.DynFields.StaticField org.apache.iceberg.common.DynFields.Builder::buildStaticChecked()\ @@ -1182,6 +1177,10 @@ acceptedBreaks: old: "class org.apache.iceberg.Metrics" new: "class org.apache.iceberg.Metrics" justification: "Java serialization across versions is not guaranteed" + - code: "java.class.defaultSerializationChanged" + old: "class org.apache.iceberg.encryption.EncryptingFileIO" + new: "class org.apache.iceberg.encryption.EncryptingFileIO" + justification: "New method for Manifest List reading" org.apache.iceberg:iceberg-core: - code: "java.method.removed" old: "method java.lang.String[] org.apache.iceberg.hadoop.Util::blockLocations(org.apache.iceberg.CombinedScanTask,\ diff --git a/api/src/main/java/org/apache/iceberg/ManifestListFile.java b/api/src/main/java/org/apache/iceberg/ManifestListFile.java index 03519fbeb8d6..e727a35a4e09 100644 --- a/api/src/main/java/org/apache/iceberg/ManifestListFile.java +++ b/api/src/main/java/org/apache/iceberg/ManifestListFile.java @@ -26,15 +26,9 @@ public interface ManifestListFile { /** Location of manifest list file. */ String location(); - /** Snapshot ID of the manifest list. */ - long snapshotId(); + /** The manifest list key metadata can be encrypted. Returns ID of encryption key */ + String encryptionKeyID(); - /** The manifest list key metadata is encrypted. Returns the ID of the encryption key */ - String metadataEncryptionKeyID(); - - /** Returns the encrypted manifest list key metadata */ - ByteBuffer encryptedKeyMetadata(); - - /** Decrypt and return the encrypted key metadata */ + /** Decrypt and return the manifest list key metadata */ ByteBuffer decryptKeyMetadata(EncryptionManager em); } diff --git a/api/src/main/java/org/apache/iceberg/Snapshot.java b/api/src/main/java/org/apache/iceberg/Snapshot.java index cd47d6901249..097806639b24 100644 --- a/api/src/main/java/org/apache/iceberg/Snapshot.java +++ b/api/src/main/java/org/apache/iceberg/Snapshot.java @@ -162,16 +162,6 @@ default Iterable removedDeleteFiles(FileIO io) { */ String manifestListLocation(); - /** - * This snapshot's manifest list file info: size, encryption key metadata and location - * - * @return manifest list file info - */ - default ManifestListFile manifestListFile() { - throw new UnsupportedOperationException( - this.getClass().getName() + " doesn't implement manifestListFile method"); - } - /** * Return the id of the schema used when this snapshot was created, or null if this information is * not available. diff --git a/api/src/main/java/org/apache/iceberg/encryption/EncryptingFileIO.java b/api/src/main/java/org/apache/iceberg/encryption/EncryptingFileIO.java index 018034ea9399..a4c708570dba 100644 --- a/api/src/main/java/org/apache/iceberg/encryption/EncryptingFileIO.java +++ b/api/src/main/java/org/apache/iceberg/encryption/EncryptingFileIO.java @@ -111,7 +111,7 @@ public InputFile newInputFile(ManifestFile manifest) { @Override public InputFile newInputFile(ManifestListFile manifestList) { - if (manifestList.encryptedKeyMetadata() != null) { + if (manifestList.encryptionKeyID() != null) { ByteBuffer keyMetadata = manifestList.decryptKeyMetadata(em); return newDecryptingInputFile(manifestList.location(), keyMetadata); } else { diff --git a/api/src/main/java/org/apache/iceberg/io/FileIO.java b/api/src/main/java/org/apache/iceberg/io/FileIO.java index cc6b689f50e7..78b61f60be6b 100644 --- a/api/src/main/java/org/apache/iceberg/io/FileIO.java +++ b/api/src/main/java/org/apache/iceberg/io/FileIO.java @@ -73,7 +73,7 @@ default InputFile newInputFile(ManifestFile manifest) { default InputFile newInputFile(ManifestListFile manifestList) { Preconditions.checkArgument( - manifestList.encryptedKeyMetadata() == null, + manifestList.encryptionKeyID() == null, "Cannot decrypt manifest list: %s (use EncryptingFileIO)", manifestList.location()); // cannot pass length because it is not tracked outside of key metadata diff --git a/core/src/main/java/org/apache/iceberg/BaseManifestListFile.java b/core/src/main/java/org/apache/iceberg/BaseManifestListFile.java index 8b1a0dcc81eb..e0ecfd50c863 100644 --- a/core/src/main/java/org/apache/iceberg/BaseManifestListFile.java +++ b/core/src/main/java/org/apache/iceberg/BaseManifestListFile.java @@ -22,21 +22,14 @@ import java.nio.ByteBuffer; import org.apache.iceberg.encryption.EncryptionManager; import org.apache.iceberg.encryption.EncryptionUtil; -import org.apache.iceberg.util.ByteBuffers; class BaseManifestListFile implements ManifestListFile, Serializable { private final String location; - private final long snapshotId; - private final String metadataEncryptionKeyID; - // stored as a byte array to be Serializable - private final byte[] encryptedKeyMetadata; + private final String encryptionKeyID; - BaseManifestListFile( - String location, long snapshotId, String encryptionKeyId, ByteBuffer encryptedKeyMetadata) { + BaseManifestListFile(String location, String encryptionKeyID) { this.location = location; - this.snapshotId = snapshotId; - this.encryptedKeyMetadata = ByteBuffers.toByteArray(encryptedKeyMetadata); - this.metadataEncryptionKeyID = encryptionKeyId; + this.encryptionKeyID = encryptionKeyID; } @Override @@ -45,26 +38,12 @@ public String location() { } @Override - public long snapshotId() { - return snapshotId; - } - - @Override - public String metadataEncryptionKeyID() { - return metadataEncryptionKeyID; - } - - @Override - public ByteBuffer encryptedKeyMetadata() { - if (encryptedKeyMetadata == null) { - return null; - } - - return ByteBuffer.wrap(encryptedKeyMetadata); + public String encryptionKeyID() { + return encryptionKeyID; } @Override public ByteBuffer decryptKeyMetadata(EncryptionManager em) { - return EncryptionUtil.decryptSnapshotKeyMetadata(this, em); + return EncryptionUtil.decryptManifestListKeyMetadata(this, em); } } diff --git a/core/src/main/java/org/apache/iceberg/RewriteTablePathUtil.java b/core/src/main/java/org/apache/iceberg/RewriteTablePathUtil.java index bd57c6277528..8cb32e395d20 100644 --- a/core/src/main/java/org/apache/iceberg/RewriteTablePathUtil.java +++ b/core/src/main/java/org/apache/iceberg/RewriteTablePathUtil.java @@ -30,6 +30,9 @@ import org.apache.iceberg.data.Record; import org.apache.iceberg.deletes.PositionDelete; import org.apache.iceberg.deletes.PositionDeleteWriter; +import org.apache.iceberg.encryption.EncryptingFileIO; +import org.apache.iceberg.encryption.EncryptionManager; +import org.apache.iceberg.encryption.PlaintextEncryptionManager; import org.apache.iceberg.exceptions.RuntimeIOException; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.io.CloseableIterator; @@ -248,10 +251,16 @@ public static RewriteResult rewriteManifestList( mf.path(), sourcePrefix)); + EncryptionManager encryptionManager = + (io instanceof EncryptingFileIO) + ? ((EncryptingFileIO) io).encryptionManager() + : PlaintextEncryptionManager.instance(); + try (FileAppender writer = ManifestLists.write( tableMetadata.formatVersion(), outputFile, + encryptionManager, snapshot.snapshotId(), snapshot.parentId(), snapshot.sequenceNumber(), diff --git a/core/src/main/java/org/apache/iceberg/SnapshotParser.java b/core/src/main/java/org/apache/iceberg/SnapshotParser.java index df44a27ef6fe..53cec16dcd87 100644 --- a/core/src/main/java/org/apache/iceberg/SnapshotParser.java +++ b/core/src/main/java/org/apache/iceberg/SnapshotParser.java @@ -21,10 +21,8 @@ import com.fasterxml.jackson.core.JsonGenerator; import com.fasterxml.jackson.databind.JsonNode; import java.io.IOException; -import java.nio.ByteBuffer; import java.util.Iterator; import java.util.Map; -import org.apache.iceberg.encryption.SnapshotEncryptionKey; import org.apache.iceberg.io.FileIO; import org.apache.iceberg.io.InputFile; import org.apache.iceberg.io.OutputFile; @@ -84,10 +82,10 @@ static void toJson(Snapshot snapshot, JsonGenerator generator) throws IOExceptio generator.writeEndObject(); } - ManifestListFile manifestList = snapshot.manifestListFile(); - if (manifestList.location() != null) { + String manifestList = snapshot.manifestListLocation(); + if (manifestList != null) { // write just the location. manifests should not be embedded in JSON along with a list - generator.writeStringField(MANIFEST_LIST, manifestList.location()); + generator.writeStringField(MANIFEST_LIST, manifestList); } else { // embed the manifest list in the JSON, v1 only JsonUtil.writeStringArray( @@ -124,10 +122,6 @@ public static String toJson(Snapshot snapshot, boolean pretty) { } static Snapshot fromJson(JsonNode node) { - return fromJson(node, null); - } - - static Snapshot fromJson(JsonNode node, Map encryptionKeys) { Preconditions.checkArgument( node.isObject(), "Cannot parse table version from a non-object: %s", node); @@ -185,20 +179,6 @@ static Snapshot fromJson(JsonNode node, Map encry if (node.has(MANIFEST_LIST)) { // the manifest list is stored in a manifest list file String manifestList = JsonUtil.getString(MANIFEST_LIST, node); - - // If manifest list is encrypted, its key metadata are taken from encryption keys table - String encryptionKeyId = null; - ByteBuffer encryptedKeyMetadata = null; - if (encryptionKeys != null) { - String snapshotKeyID = Long.toString(snapshotId); - SnapshotEncryptionKey snapshotKey = encryptionKeys.get(snapshotKeyID); - encryptionKeyId = snapshotKey.encryptionKeyID(); - encryptedKeyMetadata = snapshotKey.keyPayloadBytes(); - } - - ManifestListFile manifestListFile = - new BaseManifestListFile(manifestList, snapshotId, encryptionKeyId, encryptedKeyMetadata); - return new BaseSnapshot( sequenceNumber, snapshotId, @@ -207,7 +187,7 @@ static Snapshot fromJson(JsonNode node, Map encry operation, summary, schemaId, - manifestListFile, + manifestList, firstRowId, addedRows, keyId); diff --git a/core/src/main/java/org/apache/iceberg/encryption/SnapshotEncryptionKey.java b/core/src/main/java/org/apache/iceberg/encryption/SnapshotEncryptionKey.java deleted file mode 100644 index e57647d569cc..000000000000 --- a/core/src/main/java/org/apache/iceberg/encryption/SnapshotEncryptionKey.java +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.iceberg.encryption; - -import java.io.Serializable; -import java.nio.ByteBuffer; -import java.util.Base64; - -/** - * Encryption keys and metadata required for decrypting the manifest list files in snapshots of - * encrypted tables. - */ -public class SnapshotEncryptionKey implements Serializable { - private final String id; - private final String keyPayload; - private final ByteBuffer keyPayloadBytes; - private final String encryptionKeyID; - - public SnapshotEncryptionKey(String id, String keyPayload, String encryptionKeyID) { - this.id = id; - this.keyPayload = keyPayload; - this.keyPayloadBytes = ByteBuffer.wrap(Base64.getDecoder().decode(keyPayload)); - this.encryptionKeyID = encryptionKeyID; - } - - public String id() { - return id; - } - - public String keyPayload() { - return keyPayload; - } - - public ByteBuffer keyPayloadBytes() { - return keyPayloadBytes; - } - - public String encryptionKeyID() { - return encryptionKeyID; - } -} diff --git a/core/src/main/java/org/apache/iceberg/encryption/StandardEncryptionManager.java b/core/src/main/java/org/apache/iceberg/encryption/StandardEncryptionManager.java index 75229f4e331e..9283a402475c 100644 --- a/core/src/main/java/org/apache/iceberg/encryption/StandardEncryptionManager.java +++ b/core/src/main/java/org/apache/iceberg/encryption/StandardEncryptionManager.java @@ -35,15 +35,16 @@ import org.apache.iceberg.util.ByteBuffers; public class StandardEncryptionManager implements EncryptionManager { + private static final String KEY_ENCRYPTION_KEY_ID = "KEY_ENCRYPTION_KEY_ID"; + private final String tableKeyId; private final int dataKeyLength; // a holder class of metadata that is not available after serialization private class TransientEncryptionState { private final KeyManagementClient kmsClient; - private final Map encryptionKeys; + private final Map encryptionKeys; private final LoadingCache unwrappedKeyCache; - private String currentKeyID; private TransientEncryptionState(KeyManagementClient kmsClient) { this.kmsClient = kmsClient; @@ -53,7 +54,8 @@ private TransientEncryptionState(KeyManagementClient kmsClient) { .expireAfterWrite(1, TimeUnit.HOURS) .build( keyId -> - kmsClient.unwrapKey(encryptionKeys.get(keyId).keyPayloadBytes(), tableKeyId)); + kmsClient.unwrapKey( + encryptionKeys.get(keyId).encryptedKeyMetadata(), tableKeyId)); } } @@ -108,7 +110,7 @@ private SecureRandom workerRNG() { } /** - * @deprecated will be removed in 1.9.0. + * @deprecated will be removed in 1.11.0. */ @Deprecated public ByteBuffer wrapKey(ByteBuffer secretKey) { @@ -121,72 +123,86 @@ public ByteBuffer wrapKey(ByteBuffer secretKey) { } /** - * @deprecated will be removed in 1.9.0; use {@link #unwrapKey(String)}} instead. + * @deprecated will be removed in 1.11.0. */ @Deprecated public ByteBuffer unwrapKey(ByteBuffer wrappedSecretKey) { - throw new UnsupportedOperationException("Use unwrapKey(String) instead"); - } - - public ByteBuffer unwrapKey(String keyId) { if (transientState == null) { throw new IllegalStateException("Cannot unwrap key after serialization"); } - return transientState.unwrappedKeyCache.get(keyId); + return transientState.kmsClient.unwrapKey(wrappedSecretKey, tableKeyId); } - public String currentKeyID() { + private String keyEncryptionKeyID() { if (transientState == null) { throw new IllegalStateException("Cannot return the current key after serialization"); } - if (transientState.currentKeyID == null) { - createNewEncryptionKey(); + if (!transientState.encryptionKeys.containsKey(KEY_ENCRYPTION_KEY_ID)) { + ByteBuffer unwrapped = newKey(); + ByteBuffer wrapped = transientState.kmsClient.wrapKey(unwrapped, tableKeyId); + EncryptedKey key = new BaseEncryptedKey(KEY_ENCRYPTION_KEY_ID, wrapped, tableKeyId, null); + + // update internal tracking + transientState.unwrappedKeyCache.put(key.keyId(), unwrapped); + transientState.encryptionKeys.put(key.keyId(), key); } - return transientState.currentKeyID; + return KEY_ENCRYPTION_KEY_ID; } - public void addSnapshotKeyMetadata(SnapshotEncryptionKey key) { + ByteBuffer encryptedById(String manifestListKeyID) { if (transientState == null) { - throw new IllegalStateException("Cannot add key metadata after serialization"); + throw new IllegalStateException("Cannot find key encryption key after serialization"); } - if (transientState.encryptionKeys.containsKey(key.id())) { - throw new IllegalStateException("key metadata already exists for snapshot " + key.id()); + EncryptedKey encryptedKeyMetadata = transientState.encryptionKeys.get(manifestListKeyID); + if (encryptedKeyMetadata == null) { + throw new IllegalStateException( + "Cannot find manifest list key metadata with id " + manifestListKeyID); } - transientState.encryptionKeys.put(key.id(), key); + return transientState.unwrappedKeyCache.get(encryptedKeyMetadata.encryptedById()); } - private ByteBuffer newKey() { - byte[] newKey = new byte[dataKeyLength]; - workerRNG().nextBytes(newKey); - return ByteBuffer.wrap(newKey); - } + ByteBuffer encryptedKeyMetadata(String manifestListKeyID) { + if (transientState == null) { + throw new IllegalStateException("Cannot find encrypted key metadata after serialization"); + } - private String newKeyId() { - byte[] idBytes = new byte[8]; - workerRNG().nextBytes(idBytes); - return "k" + Base64.getEncoder().encodeToString(idBytes); + return transientState.encryptionKeys.get(manifestListKeyID).encryptedKeyMetadata(); } - private void createNewEncryptionKey() { + public String addManifestListKeyMetadata(NativeEncryptionKeyMetadata keyMetadata) { if (transientState == null) { - throw new IllegalStateException("Cannot create encryption keys after serialization"); + throw new IllegalStateException("Cannot add key metadata after serialization"); } - ByteBuffer unwrapped = newKey(); - ByteBuffer wrapped = transientState.kmsClient.wrapKey(unwrapped, tableKeyId); - SnapshotEncryptionKey key = - new SnapshotEncryptionKey( - newKeyId(), Base64.getEncoder().encodeToString(ByteBuffers.toByteArray(wrapped)), ""); + String manifestListKeyID = generateKeyId(); + ByteBuffer encryptedKeyMetadata = + EncryptionUtil.encryptManifestListKeyMetadata( + transientState.unwrappedKeyCache.get(keyEncryptionKeyID()), + manifestListKeyID, + keyMetadata); + BaseEncryptedKey key = + new BaseEncryptedKey(manifestListKeyID, encryptedKeyMetadata, keyEncryptionKeyID(), null); + + transientState.encryptionKeys.put(key.keyId(), key); + + return manifestListKeyID; + } + + private String generateKeyId() { + byte[] idBytes = new byte[16]; + workerRNG().nextBytes(idBytes); + return Base64.getEncoder().encodeToString(idBytes); + } - // update internal tracking - transientState.unwrappedKeyCache.put(key.id(), unwrapped); - transientState.encryptionKeys.put(key.id(), key); - transientState.currentKeyID = key.id(); + private ByteBuffer newKey() { + byte[] newKey = new byte[dataKeyLength]; + workerRNG().nextBytes(newKey); + return ByteBuffer.wrap(newKey); } private class StandardEncryptedOutputFile implements NativeEncryptionOutputFile { diff --git a/core/src/test/java/org/apache/iceberg/TestManifestListEncryption.java b/core/src/test/java/org/apache/iceberg/TestManifestListEncryption.java index 22e227ebb9ce..f05fb1b95e67 100644 --- a/core/src/test/java/org/apache/iceberg/TestManifestListEncryption.java +++ b/core/src/test/java/org/apache/iceberg/TestManifestListEncryption.java @@ -51,6 +51,7 @@ public class TestManifestListEncryption { private static final long EXISTING_ROWS = 857273L; private static final int DELETED_FILES = 1; private static final long DELETED_ROWS = 22910L; + private static final long SNAPSHOT_FIRST_ROW_ID = 130L; private static final ByteBuffer FIRST_SUMMARY_LOWER_BOUND = Conversions.toByteBuffer(Types.IntegerType.get(), 10); @@ -117,12 +118,13 @@ private ManifestFile writeAndReadEncryptedManifestList() throws IOException { ManifestListWriter writer = ManifestLists.write( - 2, - encryptingFileIO.encryptionManager(), + 3, outputFile, + encryptingFileIO.encryptionManager(), SNAPSHOT_ID, SNAPSHOT_ID - 1, - SEQ_NUM); + SEQ_NUM, + SNAPSHOT_FIRST_ROW_ID); writer.add(TEST_MANIFEST); writer.close(); ManifestListFile manifestListFile = writer.toManifestListFile(); diff --git a/core/src/test/java/org/apache/iceberg/TestSnapshotJson.java b/core/src/test/java/org/apache/iceberg/TestSnapshotJson.java index b7f60920a6a5..926667d9476e 100644 --- a/core/src/test/java/org/apache/iceberg/TestSnapshotJson.java +++ b/core/src/test/java/org/apache/iceberg/TestSnapshotJson.java @@ -59,7 +59,6 @@ public void testJsonConversion() throws IOException { Snapshot snapshot = SnapshotParser.fromJson(json); assertThat(snapshot.snapshotId()).isEqualTo(expected.snapshotId()); - assertThat(snapshot.allManifests(ops.io())).isEqualTo(expected.allManifests(ops.io())); assertThat(snapshot.operation()).isNull(); assertThat(snapshot.summary()).isNull(); assertThat(snapshot.schemaId()).isEqualTo(1); From 836a9c0780df2b831572bd26a2a5fbf4dc6ada32 Mon Sep 17 00:00:00 2001 From: Gidon Gershinsky Date: Fri, 9 May 2025 11:44:52 +0300 Subject: [PATCH 3/4] clean up --- .../org/apache/iceberg/encryption/EncryptionUtil.java | 6 +++--- .../iceberg/encryption/StandardEncryptionManager.java | 10 ++++++++-- .../org/apache/iceberg/TestManifestListEncryption.java | 6 ++++-- 3 files changed, 15 insertions(+), 7 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/encryption/EncryptionUtil.java b/core/src/main/java/org/apache/iceberg/encryption/EncryptionUtil.java index 976d0d626790..380a257bdfd3 100644 --- a/core/src/main/java/org/apache/iceberg/encryption/EncryptionUtil.java +++ b/core/src/main/java/org/apache/iceberg/encryption/EncryptionUtil.java @@ -129,7 +129,7 @@ public static ByteBuffer decryptManifestListKeyMetadata( "Snapshot key metadata encryption requires a StandardEncryptionManager"); StandardEncryptionManager sem = (StandardEncryptionManager) em; String manifestListKeyId = manifestList.encryptionKeyID(); - ByteBuffer keyEncryptionKey = sem.encryptedById(manifestListKeyId); + ByteBuffer keyEncryptionKey = sem.encryptedByKey(manifestListKeyId); ByteBuffer encryptedKeyMetadata = sem.encryptedKeyMetadata(manifestListKeyId); Ciphers.AesGcmDecryptor decryptor = @@ -141,11 +141,11 @@ public static ByteBuffer decryptManifestListKeyMetadata( } /** - * Encrypts the key metadata for a snapshot. + * Encrypts the key metadata for a manifest list. * * @param key key encryption key bytes * @param keyId ID of the manifest list key - * @param keyMetadata manifest list EncryptionKeyMetadata buffer + * @param keyMetadata manifest list key metadata * @return encrypted key metadata */ static ByteBuffer encryptManifestListKeyMetadata( diff --git a/core/src/main/java/org/apache/iceberg/encryption/StandardEncryptionManager.java b/core/src/main/java/org/apache/iceberg/encryption/StandardEncryptionManager.java index 9283a402475c..8ffe964015e3 100644 --- a/core/src/main/java/org/apache/iceberg/encryption/StandardEncryptionManager.java +++ b/core/src/main/java/org/apache/iceberg/encryption/StandardEncryptionManager.java @@ -152,7 +152,7 @@ private String keyEncryptionKeyID() { return KEY_ENCRYPTION_KEY_ID; } - ByteBuffer encryptedById(String manifestListKeyID) { + ByteBuffer encryptedByKey(String manifestListKeyID) { if (transientState == null) { throw new IllegalStateException("Cannot find key encryption key after serialization"); } @@ -171,7 +171,13 @@ ByteBuffer encryptedKeyMetadata(String manifestListKeyID) { throw new IllegalStateException("Cannot find encrypted key metadata after serialization"); } - return transientState.encryptionKeys.get(manifestListKeyID).encryptedKeyMetadata(); + EncryptedKey encryptedKeyMetadata = transientState.encryptionKeys.get(manifestListKeyID); + if (encryptedKeyMetadata == null) { + throw new IllegalStateException( + "Cannot find manifest list key metadata with id " + manifestListKeyID); + } + + return encryptedKeyMetadata.encryptedKeyMetadata(); } public String addManifestListKeyMetadata(NativeEncryptionKeyMetadata keyMetadata) { diff --git a/core/src/test/java/org/apache/iceberg/TestManifestListEncryption.java b/core/src/test/java/org/apache/iceberg/TestManifestListEncryption.java index f05fb1b95e67..8682e56d4a2f 100644 --- a/core/src/test/java/org/apache/iceberg/TestManifestListEncryption.java +++ b/core/src/test/java/org/apache/iceberg/TestManifestListEncryption.java @@ -51,6 +51,7 @@ public class TestManifestListEncryption { private static final long EXISTING_ROWS = 857273L; private static final int DELETED_FILES = 1; private static final long DELETED_ROWS = 22910L; + private static final long FIRST_ROW_ID = 100L; private static final long SNAPSHOT_FIRST_ROW_ID = 130L; private static final ByteBuffer FIRST_SUMMARY_LOWER_BOUND = @@ -79,14 +80,15 @@ public class TestManifestListEncryption { SEQ_NUM, MIN_SEQ_NUM, SNAPSHOT_ID, + PARTITION_SUMMARIES, + MANIFEST_KEY_METADATA, ADDED_FILES, ADDED_ROWS, EXISTING_FILES, EXISTING_ROWS, DELETED_FILES, DELETED_ROWS, - PARTITION_SUMMARIES, - MANIFEST_KEY_METADATA); + FIRST_ROW_ID); private static final EncryptionManager ENCRYPTION_MANAGER = EncryptionTestHelpers.createEncryptionManager(); From 530b0ae076fbdd79bf29a2e602ddfbeb8b81720a Mon Sep 17 00:00:00 2001 From: Sreesh Maheshwar Date: Tue, 3 Jun 2025 21:57:40 +0100 Subject: [PATCH 4/4] Encryption for REST catalog --- .../iceberg/encryption/EncryptionUtil.java | 11 +- .../encryption/KeyManagementClient.java | 2 +- .../encryption/StandardEncryptionManager.java | 36 ++- .../iceberg/rest/RESTSessionCatalog.java | 11 + .../iceberg/rest/RESTTableOperations.java | 113 ++++++++- .../spark/sql/TestRestTableEncryption.java | 234 ++++++++++++++++++ 6 files changed, 396 insertions(+), 11 deletions(-) create mode 100644 spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestRestTableEncryption.java diff --git a/core/src/main/java/org/apache/iceberg/encryption/EncryptionUtil.java b/core/src/main/java/org/apache/iceberg/encryption/EncryptionUtil.java index 380a257bdfd3..359d439e6f95 100644 --- a/core/src/main/java/org/apache/iceberg/encryption/EncryptionUtil.java +++ b/core/src/main/java/org/apache/iceberg/encryption/EncryptionUtil.java @@ -87,7 +87,7 @@ public static EncryptionManager createEncryptionManager( return createEncryptionManager(List.of(), tableProperties, kmsClient); } - static EncryptionManager createEncryptionManager( + public static EncryptionManager createEncryptionManager( List keys, Map tableProperties, KeyManagementClient kmsClient) { Preconditions.checkArgument(kmsClient != null, "Invalid KMS client: null"); String tableKeyId = tableProperties.get(TableProperties.ENCRYPTION_TABLE_KEY); @@ -108,7 +108,7 @@ static EncryptionManager createEncryptionManager( "Invalid data key length: %s (must be 16, 24, or 32)", dataKeyLength); - return new StandardEncryptionManager(tableKeyId, dataKeyLength, kmsClient); + return new StandardEncryptionManager(keys, tableKeyId, dataKeyLength, kmsClient); } public static EncryptedOutputFile plainAsEncryptedOutput(OutputFile encryptingOutputFile) { @@ -156,4 +156,11 @@ static ByteBuffer encryptManifestListKeyMetadata( encryptor.encrypt(keyMetadataBytes, keyId.getBytes(StandardCharsets.UTF_8)); return ByteBuffer.wrap(encryptedKeyMetadata); } + + public static Map encryptionKeys(EncryptionManager em) { + Preconditions.checkState( + em instanceof StandardEncryptionManager, + "Encryption keys are only available for StandardEncryptionManager"); + return ((StandardEncryptionManager) em).encryptionKeys(); + } } diff --git a/core/src/main/java/org/apache/iceberg/encryption/KeyManagementClient.java b/core/src/main/java/org/apache/iceberg/encryption/KeyManagementClient.java index a7fb494cc8e1..6f834c69ed86 100644 --- a/core/src/main/java/org/apache/iceberg/encryption/KeyManagementClient.java +++ b/core/src/main/java/org/apache/iceberg/encryption/KeyManagementClient.java @@ -24,7 +24,7 @@ import java.util.Map; /** A minimum client interface to connect to a key management service (KMS). */ -interface KeyManagementClient extends Serializable, Closeable { +public interface KeyManagementClient extends Serializable, Closeable { /** * Wrap a secret key, using a wrapping/master key which is stored in KMS and referenced by an ID. diff --git a/core/src/main/java/org/apache/iceberg/encryption/StandardEncryptionManager.java b/core/src/main/java/org/apache/iceberg/encryption/StandardEncryptionManager.java index 8ffe964015e3..180114021ceb 100644 --- a/core/src/main/java/org/apache/iceberg/encryption/StandardEncryptionManager.java +++ b/core/src/main/java/org/apache/iceberg/encryption/StandardEncryptionManager.java @@ -23,6 +23,7 @@ import java.nio.ByteBuffer; import java.security.SecureRandom; import java.util.Base64; +import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; import org.apache.iceberg.TableProperties; @@ -46,9 +47,16 @@ private class TransientEncryptionState { private final Map encryptionKeys; private final LoadingCache unwrappedKeyCache; - private TransientEncryptionState(KeyManagementClient kmsClient) { + private TransientEncryptionState(List keys, KeyManagementClient kmsClient) { this.kmsClient = kmsClient; this.encryptionKeys = Maps.newLinkedHashMap(); + + for (EncryptedKey key : keys) { + Preconditions.checkArgument( + key.keyId() != null, "Key id cannot be null"); // Required by spec. + encryptionKeys.put(key.keyId(), key); + } + this.unwrappedKeyCache = Caffeine.newBuilder() .expireAfterWrite(1, TimeUnit.HOURS) @@ -64,12 +72,26 @@ private TransientEncryptionState(KeyManagementClient kmsClient) { private transient volatile SecureRandom lazyRNG = null; /** + * @deprecated will be removed in 1.11.0; use {@link #StandardEncryptionManager(List, String, int, + * KeyManagementClient)} instead. + */ + @Deprecated + public StandardEncryptionManager( + String tableKeyId, int dataKeyLength, KeyManagementClient kmsClient) { + this(List.of(), tableKeyId, dataKeyLength, kmsClient); + } + + /** + * @param keys a list of existing {@link EncryptedKey}s for this {@link EncryptionManager} to use * @param tableKeyId table encryption key id * @param dataKeyLength length of data encryption key (16/24/32 bytes) * @param kmsClient Client of KMS used to wrap/unwrap keys in envelope encryption */ public StandardEncryptionManager( - String tableKeyId, int dataKeyLength, KeyManagementClient kmsClient) { + List keys, + String tableKeyId, + int dataKeyLength, + KeyManagementClient kmsClient) { Preconditions.checkNotNull(tableKeyId, "Invalid encryption key ID: null"); Preconditions.checkArgument( dataKeyLength == 16 || dataKeyLength == 24 || dataKeyLength == 32, @@ -77,7 +99,7 @@ public StandardEncryptionManager( dataKeyLength); Preconditions.checkNotNull(kmsClient, "Invalid KMS client: null"); this.tableKeyId = tableKeyId; - this.transientState = new TransientEncryptionState(kmsClient); + this.transientState = new TransientEncryptionState(keys, kmsClient); this.dataKeyLength = dataKeyLength; } @@ -199,6 +221,14 @@ public String addManifestListKeyMetadata(NativeEncryptionKeyMetadata keyMetadata return manifestListKeyID; } + public Map encryptionKeys() { + if (transientState == null) { + throw new IllegalStateException("Cannot return encryption keys after serialization"); + } + + return transientState.encryptionKeys; + } + private String generateKeyId() { byte[] idBytes = new byte[16]; workerRNG().nextBytes(idBytes); diff --git a/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java b/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java index edcbc5229362..85ecff083f8e 100644 --- a/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java +++ b/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java @@ -47,6 +47,8 @@ import org.apache.iceberg.catalog.Namespace; import org.apache.iceberg.catalog.TableCommit; import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.encryption.EncryptionUtil; +import org.apache.iceberg.encryption.KeyManagementClient; import org.apache.iceberg.exceptions.AlreadyExistsException; import org.apache.iceberg.exceptions.NoSuchNamespaceException; import org.apache.iceberg.exceptions.NoSuchTableException; @@ -158,6 +160,7 @@ public class RESTSessionCatalog extends BaseViewSessionCatalog private Integer pageSize = null; private CloseableGroup closeables = null; private Set endpoints; + private KeyManagementClient kmsClient = null; enum SnapshotMode { ALL, @@ -251,6 +254,9 @@ public void initialize(String name, Map unresolved) { this.reportingViaRestEnabled = PropertyUtil.propertyAsBoolean(mergedProps, REST_METRICS_REPORTING_ENABLED, true); + if (mergedProps.containsKey(CatalogProperties.ENCRYPTION_KMS_IMPL)) { + this.kmsClient = EncryptionUtil.createKmsClient(mergedProps); + } super.initialize(name, mergedProps); } @@ -446,6 +452,7 @@ public Table loadTable(SessionContext context, TableIdentifier identifier) { paths.table(finalIdentifier), Map::of, tableFileIO(context, tableConf, response.credentials()), + kmsClient, tableMetadata, endpoints); @@ -525,6 +532,7 @@ public Table registerTable( paths.table(ident), Map::of, tableFileIO(context, tableConf, response.credentials()), + kmsClient, response.tableMetadata(), endpoints); @@ -784,6 +792,7 @@ public Table create() { paths.table(ident), Map::of, tableFileIO(context, tableConf, response.credentials()), + kmsClient, response.tableMetadata(), endpoints); @@ -811,6 +820,7 @@ public Transaction createTransaction() { paths.table(ident), Map::of, tableFileIO(context, tableConf, response.credentials()), + kmsClient, RESTTableOperations.UpdateType.CREATE, createChanges(meta), meta, @@ -874,6 +884,7 @@ public Transaction replaceTransaction() { paths.table(ident), Map::of, tableFileIO(context, tableConf, response.credentials()), + kmsClient, RESTTableOperations.UpdateType.REPLACE, changes.build(), base, diff --git a/core/src/main/java/org/apache/iceberg/rest/RESTTableOperations.java b/core/src/main/java/org/apache/iceberg/rest/RESTTableOperations.java index 5f6c28b32337..bd12d88f77ec 100644 --- a/core/src/main/java/org/apache/iceberg/rest/RESTTableOperations.java +++ b/core/src/main/java/org/apache/iceberg/rest/RESTTableOperations.java @@ -31,12 +31,19 @@ import org.apache.iceberg.TableProperties; import org.apache.iceberg.UpdateRequirement; import org.apache.iceberg.UpdateRequirements; +import org.apache.iceberg.encryption.EncryptedKey; +import org.apache.iceberg.encryption.EncryptingFileIO; import org.apache.iceberg.encryption.EncryptionManager; +import org.apache.iceberg.encryption.EncryptionUtil; +import org.apache.iceberg.encryption.KeyManagementClient; +import org.apache.iceberg.encryption.PlaintextEncryptionManager; +import org.apache.iceberg.encryption.StandardEncryptionManager; import org.apache.iceberg.io.FileIO; import org.apache.iceberg.io.LocationProvider; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.rest.requests.UpdateTableRequest; import org.apache.iceberg.rest.responses.ErrorResponse; import org.apache.iceberg.rest.responses.LoadTableResponse; @@ -55,20 +62,37 @@ enum UpdateType { private final String path; private final Supplier> headers; private final FileIO io; + private final KeyManagementClient kmsClient; private final List createChanges; private final TableMetadata replaceBase; private final Set endpoints; private UpdateType updateType; private TableMetadata current; + private EncryptionManager encryptionManager; + private EncryptingFileIO encryptingFileIO; + private String encryptionKeyId; + private int encryptionDekLength; + private List encryptedKeysFromMetadata; + RESTTableOperations( RESTClient client, String path, Supplier> headers, FileIO io, + KeyManagementClient kmsClient, TableMetadata current, Set endpoints) { - this(client, path, headers, io, UpdateType.SIMPLE, Lists.newArrayList(), current, endpoints); + this( + client, + path, + headers, + io, + kmsClient, + UpdateType.SIMPLE, + Lists.newArrayList(), + current, + endpoints); } RESTTableOperations( @@ -76,6 +100,7 @@ enum UpdateType { String path, Supplier> headers, FileIO io, + KeyManagementClient kmsClient, UpdateType updateType, List createChanges, TableMetadata current, @@ -84,6 +109,7 @@ enum UpdateType { this.path = path; this.headers = headers; this.io = io; + this.kmsClient = kmsClient; this.updateType = updateType; this.createChanges = createChanges; this.replaceBase = current; @@ -93,6 +119,10 @@ enum UpdateType { this.current = current; } this.endpoints = endpoints; + + // N.B. We don't use this.current because for tables-to-be-created, because it would be null, + // and ee still want encrypted properties in this case for its TableOperations. + encryptionPropsFromMetadata(current); } @Override @@ -113,6 +143,18 @@ public void commit(TableMetadata base, TableMetadata metadata) { Consumer errorHandler; List requirements; List updates; + + TableMetadata metadataToCommit = metadata; + if (encryption() instanceof StandardEncryptionManager) { + TableMetadata.Builder builder = TableMetadata.buildFrom(metadata); + for (Map.Entry entry : + EncryptionUtil.encryptionKeys(encryption()).entrySet()) { + builder.addEncryptionKey(entry.getValue()); + } + metadataToCommit = builder.build(); + // TODO(smaheshwar): Think about requirements. + } + switch (updateType) { case CREATE: Preconditions.checkState( @@ -120,7 +162,7 @@ public void commit(TableMetadata base, TableMetadata metadata) { updates = ImmutableList.builder() .addAll(createChanges) - .addAll(metadata.changes()) + .addAll(metadataToCommit.changes()) .build(); requirements = UpdateRequirements.forCreateTable(updates); errorHandler = ErrorHandlers.tableErrorHandler(); // throws NoSuchTableException @@ -131,7 +173,7 @@ public void commit(TableMetadata base, TableMetadata metadata) { updates = ImmutableList.builder() .addAll(createChanges) - .addAll(metadata.changes()) + .addAll(metadataToCommit.changes()) .build(); // use the original replace base metadata because the transaction will refresh requirements = UpdateRequirements.forReplaceTable(replaceBase, updates); @@ -140,7 +182,7 @@ public void commit(TableMetadata base, TableMetadata metadata) { case SIMPLE: Preconditions.checkState(base != null, "Invalid base metadata: null"); - updates = metadata.changes(); + updates = metadataToCommit.changes(); requirements = UpdateRequirements.forUpdateTable(base, updates); errorHandler = ErrorHandlers.tableCommitHandler(); break; @@ -166,7 +208,67 @@ public void commit(TableMetadata base, TableMetadata metadata) { @Override public FileIO io() { - return io; + if (encryptionKeyId == null) { + return io; + } + + if (encryptingFileIO == null) { + encryptingFileIO = EncryptingFileIO.combine(io, encryption()); + } + + return encryptingFileIO; + } + + @Override + public EncryptionManager encryption() { + if (encryptionManager != null) { + return encryptionManager; + } + + if (encryptionKeyId != null) { + if (kmsClient == null) { + throw new RuntimeException( + "Cant create encryption manager, because key management client is not set"); + } + + Map tableProperties = Maps.newHashMap(); + tableProperties.put(TableProperties.ENCRYPTION_TABLE_KEY, encryptionKeyId); + tableProperties.put( + TableProperties.ENCRYPTION_DEK_LENGTH, String.valueOf(encryptionDekLength)); + encryptionManager = + EncryptionUtil.createEncryptionManager( + encryptedKeysFromMetadata, tableProperties, kmsClient); + } else { + return PlaintextEncryptionManager.instance(); + } + + return encryptionManager; + } + + private void encryptionPropsFromMetadata(TableMetadata metadata) { + // TODO(smaheshwar): Check generally for changed encryption-related properties! + if (metadata == null || metadata.properties() == null) { + return; + } + + encryptedKeysFromMetadata = metadata.encryptionKeys(); + + Map tableProperties = metadata.properties(); + if (encryptionKeyId == null) { + encryptionKeyId = tableProperties.get(TableProperties.ENCRYPTION_TABLE_KEY); + } + + if (encryptionKeyId != null && encryptionDekLength <= 0) { + String dekLength = tableProperties.get(TableProperties.ENCRYPTION_DEK_LENGTH); + encryptionDekLength = + (dekLength == null) + ? TableProperties.ENCRYPTION_DEK_LENGTH_DEFAULT + : Integer.parseInt(dekLength); + } + + // Force re-creation of encryptingFileIO and encryptionManager + encryptingFileIO = null; + encryptionManager = null; } private TableMetadata updateCurrentMetadata(LoadTableResponse response) { @@ -175,6 +277,7 @@ private TableMetadata updateCurrentMetadata(LoadTableResponse response) { // safely ignored. there is no requirement to update config on refresh or commit. if (current == null || !Objects.equals(current.metadataFileLocation(), response.metadataLocation())) { + encryptionPropsFromMetadata(response.tableMetadata()); this.current = response.tableMetadata(); } diff --git a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestRestTableEncryption.java b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestRestTableEncryption.java new file mode 100644 index 000000000000..b3ca0f7e7dee --- /dev/null +++ b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestRestTableEncryption.java @@ -0,0 +1,234 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.sql; + +import static org.apache.iceberg.Files.localInput; +import static org.apache.iceberg.types.Types.NestedField.optional; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import java.io.File; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import org.apache.iceberg.CatalogProperties; +import org.apache.iceberg.MetadataTableType; +import org.apache.iceberg.Parameters; +import org.apache.iceberg.Schema; +import org.apache.iceberg.encryption.Ciphers; +import org.apache.iceberg.encryption.UnitestKMS; +import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.io.SeekableInputStream; +import org.apache.iceberg.parquet.Parquet; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.relocated.com.google.common.collect.Streams; +import org.apache.iceberg.spark.CatalogTestBase; +import org.apache.iceberg.spark.SparkCatalogConfig; +import org.apache.iceberg.types.Types; +import org.apache.parquet.crypto.ParquetCryptoRuntimeException; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.TestTemplate; + +// TODO(smaheshwar): This test is taken from https://github.com/apache/iceberg/pull/13066, with the +// exception of testCtas, but adapted for the REST catalog. When that merges, we can directly use +// those tests for the REST catalog as well by adding to the parameters method there, to have a +// single test class for table encryption. +public class TestRestTableEncryption extends CatalogTestBase { + private static Map appendCatalogEncryptionProperties(Map props) { + Map newProps = Maps.newHashMap(); + newProps.putAll(props); + newProps.put(CatalogProperties.ENCRYPTION_KMS_IMPL, UnitestKMS.class.getCanonicalName()); + return newProps; + } + + @Parameters(name = "catalogName = {0}, implementation = {1}, config = {2}") + protected static Object[][] parameters() { + return new Object[][] { + { + SparkCatalogConfig.REST.catalogName(), + SparkCatalogConfig.REST.implementation(), + appendCatalogEncryptionProperties( + ImmutableMap.builder() + .putAll(SparkCatalogConfig.REST.properties()) + .put(CatalogProperties.URI, restCatalog.properties().get(CatalogProperties.URI)) + .build()) + } + }; + } + + @BeforeEach + public void createTables() { + sql( + "CREATE TABLE %s (id bigint, data string, float float) USING iceberg " + + "TBLPROPERTIES ( " + + "'encryption.key-id'='%s')", + tableName, UnitestKMS.MASTER_KEY_NAME1); + + sql("INSERT INTO %s VALUES (1, 'a', 1.0), (2, 'b', 2.0), (3, 'c', float('NaN'))", tableName); + } + + @AfterEach + public void removeTables() { + sql("DROP TABLE IF EXISTS %s", tableName); + } + + @TestTemplate + public void testSelect() { + List expected = + ImmutableList.of(row(1L, "a", 1.0F), row(2L, "b", 2.0F), row(3L, "c", Float.NaN)); + + assertEquals("Should return all expected rows", expected, sql("SELECT * FROM %s", tableName)); + } + + @TestTemplate + public void testInsertAndDelete() { + sql("INSERT INTO %s VALUES (4, 'd', 4.0), (5, 'e', 5.0), (6, 'f', float('NaN'))", tableName); + + List expected = + ImmutableList.of( + row(1L, "a", 1.0F), + row(2L, "b", 2.0F), + row(3L, "c", Float.NaN), + row(4L, "d", 4.0F), + row(5L, "e", 5.0F), + row(6L, "f", Float.NaN)); + + assertEquals( + "Should return all expected rows", + expected, + sql("SELECT * FROM %s ORDER BY id", tableName)); + + sql("DELETE FROM %s WHERE id < 4", tableName); + + expected = ImmutableList.of(row(4L, "d", 4.0F), row(5L, "e", 5.0F), row(6L, "f", Float.NaN)); + + assertEquals( + "Should return all expected rows", + expected, + sql("SELECT * FROM %s ORDER BY id", tableName)); + } + + @Disabled("We don't yet check changes to encryption properties") + @TestTemplate + public void testKeyDelete() { + assertThatThrownBy( + () -> sql("ALTER TABLE %s UNSET TBLPROPERTIES (`encryption.key-id`)", tableName)) + .hasMessageContaining("Cannot remove key in encrypted table"); + } + + @TestTemplate + public void testDirectDataFileRead() { + List dataFileTable = + sql("SELECT file_path FROM %s.%s", tableName, MetadataTableType.ALL_DATA_FILES); + List dataFiles = + Streams.concat(dataFileTable.stream()) + .map(row -> (String) row[0]) + .collect(Collectors.toList()); + + if (dataFiles.isEmpty()) { + throw new RuntimeException("No data files found for table " + tableName); + } + + Schema schema = new Schema(optional(0, "id", Types.IntegerType.get())); + for (String filePath : dataFiles) { + assertThatThrownBy( + () -> + Parquet.read(localInput(filePath)) + .project(schema) + .callInit() + .build() + .iterator() + .next()) + .isInstanceOf(ParquetCryptoRuntimeException.class) + .hasMessageContaining("Trying to read file with encrypted footer. No keys available"); + } + } + + @TestTemplate + public void testManifestEncryption() throws IOException { + List manifestFileTable = + sql("SELECT path FROM %s.%s", tableName, MetadataTableType.MANIFESTS); + + List manifestFiles = + Streams.concat(manifestFileTable.stream()) + .map(row -> (String) row[0]) + .collect(Collectors.toList()); + + if (manifestFiles.isEmpty()) { + throw new RuntimeException("No manifest files found for table " + tableName); + } + + String metadataFolderPath = null; + + // Check encryption of manifest files + for (String manifestFilePath : manifestFiles) { + checkMetadataFileEncryption(localInput(manifestFilePath)); + + if (metadataFolderPath == null) { + metadataFolderPath = new File(manifestFilePath).getParent().replaceFirst("file:", ""); + } + } + + if (metadataFolderPath == null) { + throw new RuntimeException("No metadata folder found for table " + tableName); + } + + // Find manifest list and metadata files; check their encryption + File[] listOfMetadataFiles = new File(metadataFolderPath).listFiles(); + boolean foundManifestListFile = false; + + for (File metadataFile : listOfMetadataFiles) { + if (metadataFile.getName().startsWith("snap-")) { + foundManifestListFile = true; + checkMetadataFileEncryption(localInput(metadataFile)); + } + } + + if (!foundManifestListFile) { + throw new RuntimeException("No manifest list files found for table " + tableName); + } + } + + @TestTemplate + public void testCtas() { + String tableName = this.tableName + "_ctas"; + sql( + "CREATE TABLE %s USING iceberg " + + "TBLPROPERTIES ( " + + "'encryption.key-id'='%s') AS SELECT * FROM VALUES (1, 'a', 1.0), (2, 'b', 2.0)" + + " AS t(id, data, float)", + tableName, UnitestKMS.MASTER_KEY_NAME1); + + assertThat(sql("SELECT * FROM %s", tableName).size()).isEqualTo(2); + } + + private void checkMetadataFileEncryption(InputFile file) throws IOException { + SeekableInputStream stream = file.newStream(); + byte[] magic = new byte[4]; + stream.read(magic); + stream.close(); + assertThat(magic).isEqualTo(Ciphers.GCM_STREAM_MAGIC_STRING.getBytes(StandardCharsets.UTF_8)); + } +}