diff --git a/core/src/main/java/org/apache/iceberg/InternalData.java b/core/src/main/java/org/apache/iceberg/InternalData.java
index fa39d23e43fe..e96d32722f9a 100644
--- a/core/src/main/java/org/apache/iceberg/InternalData.java
+++ b/core/src/main/java/org/apache/iceberg/InternalData.java
@@ -25,6 +25,7 @@
 import org.apache.iceberg.avro.InternalReader;
 import org.apache.iceberg.avro.InternalWriter;
 import org.apache.iceberg.common.DynMethods;
+import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.FileAppender;
 import org.apache.iceberg.io.InputFile;
@@ -163,6 +164,11 @@ public interface ReadBuilder {
     /** Set a custom class for in-memory objects at the given field ID. */
     ReadBuilder setCustomType(int fieldId, Class<? extends StructLike> structClass);
 
+    /** Set a filter to apply on result rows if applicable. */
+    default ReadBuilder filter(Expression newFilter) {
+      return this;
+    }
+
     /** Build the configured reader. */
     <D> CloseableIterable<D> build();
   }
diff --git a/core/src/test/java/org/apache/iceberg/TestInternalData.java b/core/src/test/java/org/apache/iceberg/TestInternalData.java
index ae1678e04d5f..e93de8338bd1 100644
--- a/core/src/test/java/org/apache/iceberg/TestInternalData.java
+++ b/core/src/test/java/org/apache/iceberg/TestInternalData.java
@@ -18,13 +18,16 @@
  */
 package org.apache.iceberg;
 
+import static org.apache.iceberg.TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES;
 import static org.assertj.core.api.Assertions.assertThat;
 
 import java.io.IOException;
 import java.nio.file.Path;
 import java.util.Arrays;
 import java.util.List;
+import org.apache.iceberg.data.GenericRecord;
 import org.apache.iceberg.data.Record;
+import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.FileAppender;
 import org.apache.iceberg.io.FileIO;
@@ -155,4 +158,47 @@ public void testCustomTypeForNestedField() throws IOException {
       }
     }
   }
+
+  @TestTemplate
+  public void testFilter() throws IOException {
+    OutputFile outputFile = fileIO.newOutputFile(tempDir.resolve("test." + format).toString());
+
+    int numRecords = 1000;
+    List<Record> testData = Lists.newArrayListWithExpectedSize(numRecords);
+    for (int i = 0; i < numRecords; i += 1) {
+      Record record = GenericRecord.create(SIMPLE_SCHEMA.asStruct());
+      record.set(0, (long) i);
+      record.set(1, "some_str");
+      testData.add(record);
+    }
+
+    int numRowBatch = 8 * 10; // one row group holds 10 longs (8 bytes each)
+    try (FileAppender<Record> appender =
+        InternalData.write(format, outputFile)
+            .set(PARQUET_ROW_GROUP_SIZE_BYTES, Integer.toString(numRowBatch))
+            .schema(SIMPLE_SCHEMA)
+            .build()) {
+      appender.addAll(testData);
+    }
+
+    InputFile inputFile = fileIO.newInputFile(outputFile.location());
+    List<PartitionData> readRecords = Lists.newArrayList();
+
+    try (CloseableIterable<PartitionData> reader =
+        InternalData.read(format, inputFile)
+            .project(SIMPLE_SCHEMA)
+            .setRootType(PartitionData.class)
+            .filter(Expressions.lessThan("id", 100))
+            .build()) {
+      for (PartitionData record : reader) {
+        readRecords.add(record);
+      }
+    }
+
+    if (format.equals(FileFormat.PARQUET)) {
+      assertThat(readRecords).hasSize(100);
+    } else {
+      assertThat(readRecords).hasSameSizeAs(testData);
+    }
+  }
 }
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
index 2b2e460ee994..92dc9d1126e7 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
@@ -1279,6 +1279,7 @@ public ReadBuilder filterRecords(boolean newFilterRecords) {
       return this;
     }
 
+    @Override
     public ReadBuilder filter(Expression newFilter) {
       this.filter = newFilter;
       return this;
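
Because the default `filter(Expression)` implementation is a no-op, the filter is advisory: a format that does not override it (the Avro path in the test above) still returns every row, which is why the test asserts an exact row count only for Parquet. A caller that needs exact results has to apply a residual filter on top of the reader. A minimal sketch of that pattern follows; the helper name `applyResidual` is hypothetical, while `Evaluator` and `CloseableIterable.filter` are existing Iceberg APIs:

import org.apache.iceberg.Schema;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.expressions.Evaluator;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.io.CloseableIterable;

class ResidualFiltering {
  // Hypothetical helper: wraps a reader and drops rows that the filter rejects.
  static <T extends StructLike> CloseableIterable<T> applyResidual(
      Schema schema, Expression filter, CloseableIterable<T> reader) {
    // Evaluator binds the unbound expression to the schema's struct type once;
    // eval(...) then tests each row against the bound predicate.
    Evaluator evaluator = new Evaluator(schema.asStruct(), filter);
    return CloseableIterable.filter(reader, evaluator::eval);
  }
}

Wrapping the reader in the test above, e.g. applyResidual(SIMPLE_SCHEMA, Expressions.lessThan("id", 100), reader), would make both format branches yield exactly 100 rows. Binding happens once in the Evaluator constructor, so the per-row cost is a single predicate evaluation.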