package org.apache.iceberg.orc;

import java.io.IOException;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.function.Function;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.Schema;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.CloseableIterator;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.util.Pair;
import org.apache.orc.Reader;
import org.apache.orc.TypeDescription;
@@ -51,11 +55,13 @@ class OrcIterable<T> extends CloseableGroup implements CloseableIterable<T> {
5155 private final boolean caseSensitive ;
5256 private final Function <TypeDescription , OrcBatchReader <?>> batchReaderFunction ;
5357 private final int recordsPerBatch ;
58+ private final OrcRowFilter rowFilter ;
5459
5560 OrcIterable (InputFile file , Configuration config , Schema schema ,
5661 Long start , Long length ,
5762 Function <TypeDescription , OrcRowReader <?>> readerFunction , boolean caseSensitive , Expression filter ,
58- Function <TypeDescription , OrcBatchReader <?>> batchReaderFunction , int recordsPerBatch ) {
63+ Function <TypeDescription , OrcBatchReader <?>> batchReaderFunction , int recordsPerBatch ,
64+ OrcRowFilter rowFilter ) {
5965 this .schema = schema ;
6066 this .readerFunction = readerFunction ;
6167 this .file = file ;
@@ -66,6 +72,7 @@ class OrcIterable<T> extends CloseableGroup implements CloseableIterable<T> {
6672 this .filter = (filter == Expressions .alwaysTrue ()) ? null : filter ;
6773 this .batchReaderFunction = batchReaderFunction ;
6874 this .recordsPerBatch = recordsPerBatch ;
75+ this .rowFilter = rowFilter ;
6976 }
7077
7178 @ SuppressWarnings ("unchecked" )
@@ -81,16 +88,37 @@ public CloseableIterator<T> iterator() {
8188 sarg = ExpressionToSearchArgument .convert (boundFilter , readOrcSchema );
8289 }
8390
84- VectorizedRowBatchIterator rowBatchIterator = newOrcIterator (file , readOrcSchema , start , length , orcFileReader ,
85- sarg , recordsPerBatch );
86- if (batchReaderFunction != null ) {
87- OrcBatchReader <T > batchReader = (OrcBatchReader <T >) batchReaderFunction .apply (readOrcSchema );
88- return CloseableIterator .transform (rowBatchIterator , pair -> {
89- batchReader .setBatchContext (pair .second ());
90- return batchReader .read (pair .first ());
91- });
91+ if (rowFilter == null ) {
92+ VectorizedRowBatchIterator rowBatchIterator = newOrcIterator (file , readOrcSchema , start , length , orcFileReader ,
93+ sarg , recordsPerBatch );
94+ if (batchReaderFunction != null ) {
95+ OrcBatchReader <T > batchReader = (OrcBatchReader <T >) batchReaderFunction .apply (readOrcSchema );
96+ return CloseableIterator .transform (rowBatchIterator , pair -> {
97+ batchReader .setBatchContext (pair .second ());
98+ return batchReader .read (pair .first ());
99+ });
100+ } else {
101+ return new OrcRowIterator <>(rowBatchIterator , (OrcRowReader <T >) readerFunction .apply (readOrcSchema ),
102+ null , null );
103+ }
92104 } else {
93- return new OrcRowIterator <>(rowBatchIterator , (OrcRowReader <T >) readerFunction .apply (readOrcSchema ));
105+ Preconditions .checkArgument (batchReaderFunction == null ,
106+ "Row-level filtering not supported by vectorized reader" );
107+ Set <Integer > filterColumnIds = TypeUtil .getProjectedIds (rowFilter .requiredSchema ());
108+ Set <Integer > filterColumnIdsNotInReadSchema = Sets .difference (filterColumnIds ,
109+ TypeUtil .getProjectedIds (schema ));
110+ Schema extraFilterColumns = TypeUtil .select (rowFilter .requiredSchema (), filterColumnIdsNotInReadSchema );
111+ Schema finalReadSchema = TypeUtil .join (schema , extraFilterColumns );
112+
113+ TypeDescription finalReadOrcSchema = ORCSchemaUtil .buildOrcProjection (finalReadSchema ,
114+ orcFileReader .getSchema ());
115+ TypeDescription rowFilterOrcSchema = ORCSchemaUtil .buildOrcProjection (rowFilter .requiredSchema (),
116+ orcFileReader .getSchema ());
117+ RowFilterValueReader filterReader = new RowFilterValueReader (finalReadOrcSchema , rowFilterOrcSchema );
118+
119+ return new OrcRowIterator <>(
120+ newOrcIterator (file , finalReadOrcSchema , start , length , orcFileReader , sarg , recordsPerBatch ),
121+ (OrcRowReader <T >) readerFunction .apply (readOrcSchema ), rowFilter , filterReader );
94122 }
95123 }
96124
@@ -116,34 +144,67 @@ private static VectorizedRowBatchIterator newOrcIterator(InputFile file,
116144
117145 private static class OrcRowIterator <T > implements CloseableIterator <T > {
118146
119- private int nextRow ;
120- private VectorizedRowBatch current ;
147+ private int currentRow ;
148+ private VectorizedRowBatch currentBatch ;
149+ private boolean advanced = false ;
121150
122151 private final VectorizedRowBatchIterator batchIter ;
123152 private final OrcRowReader <T > reader ;
153+ private final OrcRowFilter filter ;
154+ private final RowFilterValueReader filterReader ;
124155
125- OrcRowIterator (VectorizedRowBatchIterator batchIter , OrcRowReader <T > reader ) {
156+ OrcRowIterator (VectorizedRowBatchIterator batchIter , OrcRowReader <T > reader , OrcRowFilter filter ,
157+ RowFilterValueReader filterReader ) {
126158 this .batchIter = batchIter ;
127159 this .reader = reader ;
128- current = null ;
129- nextRow = 0 ;
160+ this .filter = filter ;
161+ this .filterReader = filterReader ;
162+ currentBatch = null ;
163+ currentRow = 0 ;
164+ }
165+
166+ private void advance () {
167+ if (!advanced ) {
168+ while (true ) {
169+ currentRow ++;
170+ // if batch has been consumed, move to next batch
171+ if (currentBatch == null || currentRow >= currentBatch .size ) {
172+ if (batchIter .hasNext ()) {
173+ Pair <VectorizedRowBatch , Long > nextBatch = batchIter .next ();
174+ currentBatch = nextBatch .first ();
175+ currentRow = 0 ;
176+ reader .setBatchContext (nextBatch .second ());
177+ if (filterReader != null ) {
178+ filterReader .setBatchContext (nextBatch .second ());
179+ }
180+ } else {
181+ // no more batches left to process
182+ currentBatch = null ;
183+ currentRow = -1 ;
184+ break ;
185+ }
186+ }
187+ if (filter == null || filter .shouldKeep (filterReader .read (currentBatch , currentRow ))) {
188+ // we have found our row
189+ break ;
190+ }
191+ }
192+ advanced = true ;
193+ }
130194 }
131195
132196 @ Override
133197 public boolean hasNext () {
134- return (current != null && nextRow < current .size ) || batchIter .hasNext ();
198+ advance ();
199+ return currentBatch != null ;
135200 }
136201
137202 @ Override
138203 public T next () {
139- if (current == null || nextRow >= current .size ) {
140- Pair <VectorizedRowBatch , Long > nextBatch = batchIter .next ();
141- current = nextBatch .first ();
142- nextRow = 0 ;
143- this .reader .setBatchContext (nextBatch .second ());
144- }
145-
146- return this .reader .read (current , nextRow ++);
204+ advance ();
205+ // mark current row as used
206+ advanced = false ;
207+ return this .reader .read (currentBatch , currentRow );
147208 }
148209
149210 @ Override