Skip to content

Commit 92a264b

Browse files
authored
Use bulk decryption interface in ArrowReader (#2720)
1 parent f81d8ad commit 92a264b

File tree

3 files changed

+21
-10
lines changed

3 files changed

+21
-10
lines changed

arrow/src/main/java/org/apache/iceberg/arrow/vectorized/ArrowReader.java

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,11 @@
2020
package org.apache.iceberg.arrow.vectorized;
2121

2222
import java.io.IOException;
23+
import java.nio.ByteBuffer;
2324
import java.util.Collection;
24-
import java.util.Collections;
2525
import java.util.Iterator;
2626
import java.util.List;
2727
import java.util.Map;
28-
import java.util.function.Function;
2928
import java.util.stream.Collectors;
3029
import java.util.stream.Stream;
3130
import java.util.stream.StreamSupport;
@@ -38,6 +37,7 @@
3837
import org.apache.iceberg.Schema;
3938
import org.apache.iceberg.TableScan;
4039
import org.apache.iceberg.encryption.EncryptedFiles;
40+
import org.apache.iceberg.encryption.EncryptedInputFile;
4141
import org.apache.iceberg.encryption.EncryptionManager;
4242
import org.apache.iceberg.io.CloseableGroup;
4343
import org.apache.iceberg.io.CloseableIterable;
@@ -49,6 +49,7 @@
4949
import org.apache.iceberg.parquet.TypeWithSchemaVisitor;
5050
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
5151
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
52+
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
5253
import org.apache.iceberg.types.Types;
5354
import org.apache.parquet.schema.MessageType;
5455

@@ -210,11 +211,21 @@ private static final class VectorizedCombinedScanIterator implements CloseableIt
210211
.flatMap(Collection::stream)
211212
.collect(Collectors.toList());
212213
this.fileItr = fileTasks.iterator();
213-
this.inputFiles = Collections.unmodifiableMap(fileTasks.stream()
214+
215+
Map<String, ByteBuffer> keyMetadata = Maps.newHashMap();
216+
fileTasks.stream()
214217
.flatMap(fileScanTask -> Stream.concat(Stream.of(fileScanTask.file()), fileScanTask.deletes().stream()))
215-
.map(file -> EncryptedFiles.encryptedInput(io.newInputFile(file.path().toString()), file.keyMetadata()))
216-
.map(encryptionManager::decrypt)
217-
.collect(Collectors.toMap(InputFile::location, Function.identity())));
218+
.forEach(file -> keyMetadata.put(file.path().toString(), file.keyMetadata()));
219+
220+
Stream<EncryptedInputFile> encrypted = keyMetadata.entrySet().stream()
221+
.map(entry -> EncryptedFiles.encryptedInput(io.newInputFile(entry.getKey()), entry.getValue()));
222+
223+
// decrypt with the batch call to avoid multiple RPCs to a key server, if possible
224+
Iterable<InputFile> decryptedFiles = encryptionManager.decrypt(encrypted::iterator);
225+
226+
Map<String, InputFile> files = Maps.newHashMapWithExpectedSize(fileTasks.size());
227+
decryptedFiles.forEach(decrypted -> files.putIfAbsent(decrypted.location(), decrypted));
228+
this.inputFiles = ImmutableMap.copyOf(files);
218229
this.currentIterator = CloseableIterator.empty();
219230
this.expectedSchema = expectedSchema;
220231
this.nameMapping = nameMapping;

flink/src/main/java/org/apache/iceberg/flink/source/DataIterator.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
import java.io.IOException;
2323
import java.io.UncheckedIOException;
2424
import java.nio.ByteBuffer;
25-
import java.util.Collections;
2625
import java.util.Iterator;
2726
import java.util.Map;
2827
import java.util.stream.Stream;
@@ -35,6 +34,7 @@
3534
import org.apache.iceberg.io.FileIO;
3635
import org.apache.iceberg.io.InputFile;
3736
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
37+
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
3838
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
3939

4040
/**
@@ -64,7 +64,7 @@ abstract class DataIterator<T> implements CloseableIterator<T> {
6464

6565
Map<String, InputFile> files = Maps.newHashMapWithExpectedSize(task.files().size());
6666
decryptedFiles.forEach(decrypted -> files.putIfAbsent(decrypted.location(), decrypted));
67-
this.inputFiles = Collections.unmodifiableMap(files);
67+
this.inputFiles = ImmutableMap.copyOf(files);
6868

6969
this.currentIterator = CloseableIterator.empty();
7070
}

spark/src/main/java/org/apache/iceberg/spark/source/BaseDataReader.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
import java.io.IOException;
2424
import java.math.BigDecimal;
2525
import java.nio.ByteBuffer;
26-
import java.util.Collections;
2726
import java.util.Iterator;
2827
import java.util.Map;
2928
import java.util.stream.Stream;
@@ -38,6 +37,7 @@
3837
import org.apache.iceberg.io.FileIO;
3938
import org.apache.iceberg.io.InputFile;
4039
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
40+
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
4141
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
4242
import org.apache.iceberg.types.Type;
4343
import org.apache.iceberg.util.ByteBuffers;
@@ -76,7 +76,7 @@ abstract class BaseDataReader<T> implements Closeable {
7676

7777
Map<String, InputFile> files = Maps.newHashMapWithExpectedSize(task.files().size());
7878
decryptedFiles.forEach(decrypted -> files.putIfAbsent(decrypted.location(), decrypted));
79-
this.inputFiles = Collections.unmodifiableMap(files);
79+
this.inputFiles = ImmutableMap.copyOf(files);
8080

8181
this.currentIterator = CloseableIterator.empty();
8282
}

0 commit comments

Comments
 (0)