
Commit 80e6317

shardulm94 authored and rzhang10 committed

Row level filtering: Allow table scans to pass a row level filter for ORC files

- ORC: Support NameMapping with row-level filtering (linkedin#53)

1 parent 5373637 commit 80e6317

File tree

11 files changed: +576 -34 lines changed


core/src/main/java/org/apache/iceberg/BaseFileScanTask.java

Lines changed: 4 additions & 0 deletions
@@ -232,5 +232,9 @@ public SplitScanTask merge(ScanTask other) {
       SplitScanTask that = (SplitScanTask) other;
       return new SplitScanTask(offset, len + that.length(), fileScanTask);
     }
+
+    public FileScanTask underlyingFileScanTask() {
+      return fileScanTask;
+    }
   }
 }
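The new accessor suggests that callers which receive split tasks need a way back to the originating FileScanTask, presumably so file-level state such as a row filter can be recovered after split planning. A minimal sketch of how a caller might unwrap it, assuming SplitScanTask is visible at the call site; the unwrap helper itself is hypothetical, not part of this commit:

// Hypothetical helper: recover the file-level task behind a split.
// underlyingFileScanTask() is the accessor added above; everything
// else here is illustrative.
static FileScanTask unwrap(FileScanTask task) {
  if (task instanceof BaseFileScanTask.SplitScanTask) {
    return ((BaseFileScanTask.SplitScanTask) task).underlyingFileScanTask();
  }
  return task;
}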
data/src/test/java/org/apache/iceberg/data/orc/TestOrcRowLevelFiltering.java

Lines changed: 183 additions & 0 deletions
@@ -0,0 +1,183 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.iceberg.data.orc;

import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.LongStream;
import org.apache.iceberg.Files;
import org.apache.iceberg.Schema;
import org.apache.iceberg.data.DataTestHelpers;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.orc.ORC;
import org.apache.iceberg.orc.OrcRowFilter;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.types.Types;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

import static org.apache.iceberg.types.Types.NestedField.optional;
import static org.apache.iceberg.types.Types.NestedField.required;

public class TestOrcRowLevelFiltering {

  @Rule
  public TemporaryFolder temp = new TemporaryFolder();

  private static final Schema SCHEMA = new Schema(
      required(100, "id", Types.LongType.get()),
      required(101, "data1", Types.StringType.get()),
      required(102, "data2", Types.StringType.get())
  );

  private static final List<Record> RECORDS = LongStream.range(0, 100).mapToObj(i -> {
    Record record = GenericRecord.create(SCHEMA);
    record.set(0, i);
    record.set(1, "data1:" + i);
    record.set(2, "data2:" + i);
    return record;
  }).collect(Collectors.toList());

  @Test
  public void testReadOrcWithRowFilterNoProjection() throws IOException {
    testReadOrcWithRowFilter(SCHEMA, rowFilterId(), RECORDS.subList(75, 100));
  }

  @Test
  public void testReadOrcWithRowFilterProjection() throws IOException {
    Schema projectedSchema = new Schema(
        required(101, "data1", Types.StringType.get())
    );

    List<Record> expected = RECORDS.subList(75, 100).stream().map(r -> {
      Record record = GenericRecord.create(projectedSchema);
      record.set(0, r.get(1));
      return record;
    }).collect(Collectors.toList());

    testReadOrcWithRowFilter(projectedSchema, rowFilterId(), expected);
  }

  @Test
  public void testReadOrcWithRowFilterPartialFilterColumns() throws IOException {
    Schema projectedSchema = new Schema(
        required(101, "data1", Types.StringType.get()),
        required(102, "data2", Types.StringType.get())
    );

    List<Record> expected = RECORDS.subList(25, 75).stream().map(r -> {
      Record record = GenericRecord.create(projectedSchema);
      record.set(0, r.get(1));
      record.set(1, r.get(2));
      return record;
    }).collect(Collectors.toList());

    testReadOrcWithRowFilter(projectedSchema, rowFilterIdAndData1(), expected);
  }

  @Test
  public void testReadOrcWithRowFilterNonExistentColumn() throws IOException {
    testReadOrcWithRowFilter(SCHEMA, rowFilterData3(), ImmutableList.of());
  }

  private void testReadOrcWithRowFilter(Schema schema, OrcRowFilter rowFilter, List<Record> expected)
      throws IOException {
    File testFile = temp.newFile();
    Assert.assertTrue("Delete should succeed", testFile.delete());

    try (FileAppender<Record> writer = ORC.write(Files.localOutput(testFile))
        .schema(SCHEMA)
        .createWriterFunc(GenericOrcWriter::buildWriter)
        .build()) {
      for (Record rec : RECORDS) {
        writer.add(rec);
      }
    }

    List<Record> rows;
    try (CloseableIterable<Record> reader = ORC.read(Files.localInput(testFile))
        .project(schema)
        .createReaderFunc(fileSchema -> GenericOrcReader.buildReader(schema, fileSchema))
        .rowFilter(rowFilter)
        .build()) {
      rows = Lists.newArrayList(reader);
    }

    // Guard against passing vacuously when the reader returns a different
    // number of rows than expected (e.g. the empty-result case above).
    Assert.assertEquals("Should read the expected number of rows", expected.size(), rows.size());
    for (int i = 0; i < expected.size(); i += 1) {
      DataTestHelpers.assertEquals(schema.asStruct(), expected.get(i), rows.get(i));
    }
  }

  private OrcRowFilter rowFilterId() {
    return new OrcRowFilter() {
      @Override
      public Schema requiredSchema() {
        return new Schema(
            required(100, "id", Types.LongType.get())
        );
      }

      @Override
      public boolean shouldKeep(Object[] values) {
        return (Long) values[0] >= 75;
      }
    };
  }

  private OrcRowFilter rowFilterIdAndData1() {
    return new OrcRowFilter() {
      @Override
      public Schema requiredSchema() {
        return new Schema(
            SCHEMA.findField("id"),
            SCHEMA.findField("data1")
        );
      }

      @Override
      public boolean shouldKeep(Object[] values) {
        return (Long) values[0] >= 25 && ((String) values[1]).compareTo("data1:75") < 0;
      }
    };
  }

  private OrcRowFilter rowFilterData3() {
    return new OrcRowFilter() {
      @Override
      public Schema requiredSchema() {
        return new Schema(
            optional(104, "data3", Types.LongType.get())
        );
      }

      @Override
      public boolean shouldKeep(Object[] values) {
        return values[0] != null && (Long) values[0] >= 25;
      }
    };
  }
}
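The anonymous classes above exercise the whole OrcRowFilter contract; from that usage, the interface presumably looks something like this sketch (method names and semantics are taken from the calls above; the comments are inferred, not quoted from the commit):

package org.apache.iceberg.orc;

import org.apache.iceberg.Schema;

public interface OrcRowFilter {
  // Columns the filter needs. The reader materializes these for every row
  // and passes their values to shouldKeep() in this field order.
  Schema requiredSchema();

  // Return true to keep the row. values aligns with requiredSchema();
  // a field missing from the file arrives as null (see rowFilterData3 above).
  boolean shouldKeep(Object[] values);
}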

orc/src/main/java/org/apache/iceberg/orc/ORC.java

Lines changed: 7 additions & 1 deletion
@@ -578,6 +578,7 @@ public static class ReadBuilder {
     private Expression filter = null;
     private boolean caseSensitive = true;
     private NameMapping nameMapping = null;
+    private OrcRowFilter rowFilter = null;

     private Function<TypeDescription, OrcRowReader<?>> readerFunc;
     private Function<TypeDescription, OrcBatchReader<?>> batchedReaderFunc;
@@ -654,10 +655,15 @@ public ReadBuilder withNameMapping(NameMapping newNameMapping) {
       return this;
     }

+    public ReadBuilder rowFilter(OrcRowFilter newRowFilter) {
+      this.rowFilter = newRowFilter;
+      return this;
+    }
+
     public <D> CloseableIterable<D> build() {
       Preconditions.checkNotNull(schema, "Schema is required");
       return new OrcIterable<>(file, conf, schema, nameMapping, start, length, readerFunc, caseSensitive, filter,
-          batchedReaderFunc, recordsPerBatch);
+          batchedReaderFunc, recordsPerBatch, rowFilter);
     }
   }
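Putting the pieces together, a read with row-level filtering would presumably be wired up like this (a minimal sketch built from the builder method above and the generic reader used in the tests; dataFile, process, and the schema and predicate are illustrative):

Schema schema = new Schema(
    Types.NestedField.required(100, "id", Types.LongType.get()),
    Types.NestedField.required(101, "data1", Types.StringType.get()));

OrcRowFilter keepHighIds = new OrcRowFilter() {
  @Override
  public Schema requiredSchema() {
    // Only the column the predicate inspects.
    return new Schema(Types.NestedField.required(100, "id", Types.LongType.get()));
  }

  @Override
  public boolean shouldKeep(Object[] values) {
    return (Long) values[0] >= 75;  // values follow requiredSchema() order
  }
};

try (CloseableIterable<Record> reader = ORC.read(Files.localInput(dataFile))
    .project(schema)
    .createReaderFunc(fileSchema -> GenericOrcReader.buildReader(schema, fileSchema))
    .rowFilter(keepHighIds)  // new in this commit
    .build()) {
  reader.forEach(row -> process(row));  // only rows with id >= 75 arrive here
}

Note that rowFilter() composes with project(): the filter's requiredSchema() columns are read for evaluating the predicate even when they are absent from the projected schema, which is what testReadOrcWithRowFilterPartialFilterColumns verifies above.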
