diff --git a/crates/iceberg/Cargo.toml b/crates/iceberg/Cargo.toml index 6166d360d..4d0160949 100644 --- a/crates/iceberg/Cargo.toml +++ b/crates/iceberg/Cargo.toml @@ -67,6 +67,7 @@ opendal = { workspace = true } ordered-float = { workspace = true } parquet = { workspace = true, features = ["async"] } paste = { workspace = true } +rand = { workspace = true } reqwest = { workspace = true } rust_decimal = { workspace = true } serde = { workspace = true } diff --git a/crates/iceberg/src/arrow/reader.rs b/crates/iceberg/src/arrow/reader.rs index b058c8d25..43ab946bc 100644 --- a/crates/iceberg/src/arrow/reader.rs +++ b/crates/iceberg/src/arrow/reader.rs @@ -32,7 +32,7 @@ use fnv::FnvHashSet; use futures::channel::mpsc::{channel, Sender}; use futures::future::BoxFuture; use futures::{try_join, SinkExt, StreamExt, TryFutureExt, TryStreamExt}; -use parquet::arrow::arrow_reader::{ArrowPredicateFn, ArrowReaderOptions, RowFilter}; +use parquet::arrow::arrow_reader::{ArrowPredicateFn, ArrowReaderOptions, RowFilter, RowSelection}; use parquet::arrow::async_reader::{AsyncFileReader, MetadataLoader}; use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask, PARQUET_FIELD_ID_META_KEY}; use parquet::file::metadata::ParquetMetaData; @@ -41,6 +41,7 @@ use parquet::schema::types::{SchemaDescriptor, Type as ParquetType}; use crate::arrow::{arrow_schema_to_schema, get_arrow_datum}; use crate::error::Result; use crate::expr::visitors::bound_predicate_visitor::{visit, BoundPredicateVisitor}; +use crate::expr::visitors::page_index_evaluator::PageIndexEvaluator; use crate::expr::visitors::row_group_metrics_evaluator::RowGroupMetricsEvaluator; use crate::expr::{BoundPredicate, BoundReference}; use crate::io::{FileIO, FileMetadata, FileRead}; @@ -56,6 +57,7 @@ pub struct ArrowReaderBuilder { file_io: FileIO, concurrency_limit_data_files: usize, row_group_filtering_enabled: bool, + row_selection_enabled: bool, } impl ArrowReaderBuilder { @@ -68,6 +70,7 @@ impl ArrowReaderBuilder 
{ file_io, concurrency_limit_data_files: num_cpus, row_group_filtering_enabled: true, + row_selection_enabled: false, } } @@ -90,6 +93,12 @@ impl ArrowReaderBuilder { self } + /// Determines whether to enable row selection. + pub fn with_row_selection_enabled(mut self, row_selection_enabled: bool) -> Self { + self.row_selection_enabled = row_selection_enabled; + self + } + /// Build the ArrowReader. pub fn build(self) -> ArrowReader { ArrowReader { @@ -97,6 +106,7 @@ impl ArrowReaderBuilder { file_io: self.file_io, concurrency_limit_data_files: self.concurrency_limit_data_files, row_group_filtering_enabled: self.row_group_filtering_enabled, + row_selection_enabled: self.row_selection_enabled, } } } @@ -111,6 +121,7 @@ pub struct ArrowReader { concurrency_limit_data_files: usize, row_group_filtering_enabled: bool, + row_selection_enabled: bool, } impl ArrowReader { @@ -121,6 +132,7 @@ impl ArrowReader { let batch_size = self.batch_size; let concurrency_limit_data_files = self.concurrency_limit_data_files; let row_group_filtering_enabled = self.row_group_filtering_enabled; + let row_selection_enabled = self.row_selection_enabled; let (tx, rx) = channel(concurrency_limit_data_files); let mut channel_for_error = tx.clone(); @@ -142,6 +154,7 @@ impl ArrowReader { file_io, tx, row_group_filtering_enabled, + row_selection_enabled, ) .await }) @@ -168,6 +181,7 @@ impl ArrowReader { file_io: FileIO, mut tx: Sender>, row_group_filtering_enabled: bool, + row_selection_enabled: bool, ) -> Result<()> { // Get the metadata for the Parquet file we need to read and build // a reader for the data within @@ -176,11 +190,12 @@ impl ArrowReader { try_join!(parquet_file.metadata(), parquet_file.reader())?; let parquet_file_reader = ArrowFileReader::new(parquet_metadata, parquet_reader); + let should_load_page_index = row_selection_enabled && task.predicate().is_some(); + // Start creating the record batch stream, which wraps the parquet file reader let mut record_batch_stream_builder = 
ParquetRecordBatchStreamBuilder::new_with_options( parquet_file_reader, - // Page index will be required in upcoming row selection PR - ArrowReaderOptions::new().with_page_index(false), + ArrowReaderOptions::new().with_page_index(should_load_page_index), ) .await?; @@ -224,6 +239,19 @@ impl ArrowReader { selected_row_groups = Some(result); } + if row_selection_enabled { + let row_selection = Self::get_row_selection( + predicate, + record_batch_stream_builder.metadata(), + &selected_row_groups, + &field_id_map, + task.schema(), + )?; + + record_batch_stream_builder = + record_batch_stream_builder.with_row_selection(row_selection); + } + if let Some(selected_row_groups) = selected_row_groups { record_batch_stream_builder = record_batch_stream_builder.with_row_groups(selected_row_groups); @@ -377,6 +405,71 @@ impl ArrowReader { Ok(results) } + + fn get_row_selection( + predicate: &BoundPredicate, + parquet_metadata: &Arc, + selected_row_groups: &Option>, + field_id_map: &HashMap, + snapshot_schema: &Schema, + ) -> Result { + let Some(column_index) = parquet_metadata.column_index() else { + return Err(Error::new( + ErrorKind::Unexpected, + "Parquet file metadata does not contain a column index", + )); + }; + + let Some(offset_index) = parquet_metadata.offset_index() else { + return Err(Error::new( + ErrorKind::Unexpected, + "Parquet file metadata does not contain an offset index", + )); + }; + + let mut selected_row_groups_idx = 0; + + let page_index = column_index + .iter() + .enumerate() + .zip(offset_index) + .zip(parquet_metadata.row_groups()); + + let mut results = Vec::new(); + for (((idx, column_index), offset_index), row_group_metadata) in page_index { + if let Some(selected_row_groups) = selected_row_groups { + // skip row groups that aren't present in selected_row_groups + // guard: if every selected row group has already been consumed + // (including the `Some(vec![])` case), stop instead of indexing + // past the end of the slice, which would panic + if selected_row_groups_idx >= selected_row_groups.len() { + break; + } + if idx == selected_row_groups[selected_row_groups_idx] { + selected_row_groups_idx += 1; + } else { + continue; + } + } + + let selections_for_page = PageIndexEvaluator::eval( + predicate,
+ column_index, + offset_index, + row_group_metadata, + field_id_map, + snapshot_schema, + )?; + + results.push(selections_for_page); + + if let Some(selected_row_groups) = selected_row_groups { + if selected_row_groups_idx == selected_row_groups.len() { + break; + } + } + } + + Ok(results.into_iter().flatten().collect::>().into()) + } } /// Build the map of parquet field id to Parquet column index in the schema. diff --git a/crates/iceberg/src/expr/visitors/mod.rs b/crates/iceberg/src/expr/visitors/mod.rs index 06bfd8cda..69ddf4bbf 100644 --- a/crates/iceberg/src/expr/visitors/mod.rs +++ b/crates/iceberg/src/expr/visitors/mod.rs @@ -20,4 +20,5 @@ pub(crate) mod expression_evaluator; pub(crate) mod inclusive_metrics_evaluator; pub(crate) mod inclusive_projection; pub(crate) mod manifest_evaluator; +pub(crate) mod page_index_evaluator; pub(crate) mod row_group_metrics_evaluator; diff --git a/crates/iceberg/src/expr/visitors/page_index_evaluator.rs b/crates/iceberg/src/expr/visitors/page_index_evaluator.rs new file mode 100644 index 000000000..e8c1849a9 --- /dev/null +++ b/crates/iceberg/src/expr/visitors/page_index_evaluator.rs @@ -0,0 +1,1498 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! 
Evaluates predicates against a Parquet Page Index + +use std::collections::HashMap; + +use fnv::FnvHashSet; +use ordered_float::OrderedFloat; +use parquet::arrow::arrow_reader::{RowSelection, RowSelector}; +use parquet::file::metadata::RowGroupMetaData; +use parquet::file::page_index::index::Index; +use parquet::format::PageLocation; + +use crate::expr::visitors::bound_predicate_visitor::{visit, BoundPredicateVisitor}; +use crate::expr::{BoundPredicate, BoundReference}; +use crate::spec::{Datum, PrimitiveLiteral, PrimitiveType, Schema}; +use crate::{Error, ErrorKind, Result}; + +type OffsetIndex = Vec>; + +const IN_PREDICATE_LIMIT: usize = 200; + +enum MissingColBehavior { + CantMatch, + MightMatch, +} + +enum PageNullCount { + AllNull, + NoneNull, + SomeNull, + Unknown, +} + +impl PageNullCount { + fn from_row_and_null_counts(num_rows: usize, null_count: Option) -> Self { + match (num_rows, null_count) { + (x, Some(y)) if x == y as usize => PageNullCount::AllNull, + (_, Some(0)) => PageNullCount::NoneNull, + (_, Some(_)) => PageNullCount::SomeNull, + _ => PageNullCount::Unknown, + } + } +} + +pub(crate) struct PageIndexEvaluator<'a> { + column_index: &'a [Index], + offset_index: &'a OffsetIndex, + row_group_metadata: &'a RowGroupMetaData, + iceberg_field_id_to_parquet_column_index: &'a HashMap, + snapshot_schema: &'a Schema, +} + +impl<'a> PageIndexEvaluator<'a> { + pub(crate) fn new( + column_index: &'a [Index], + offset_index: &'a OffsetIndex, + row_group_metadata: &'a RowGroupMetaData, + field_id_map: &'a HashMap, + snapshot_schema: &'a Schema, + ) -> Self { + Self { + column_index, + offset_index, + row_group_metadata, + iceberg_field_id_to_parquet_column_index: field_id_map, + snapshot_schema, + } + } + + /// Evaluate this `PageIndexEvaluator`'s filter predicate against a + /// specific page's column index entry in a parquet file's page index. 
+ /// [`ArrowReader`] uses the resulting [`RowSelection`] to reject + /// pages within a parquet file's row group that cannot contain rows + /// matching the filter predicate. + pub(crate) fn eval( + filter: &'a BoundPredicate, + column_index: &'a [Index], + offset_index: &'a OffsetIndex, + row_group_metadata: &'a RowGroupMetaData, + field_id_map: &'a HashMap, + snapshot_schema: &'a Schema, + ) -> Result> { + if row_group_metadata.num_rows() == 0 { + return Ok(vec![]); + } + + let mut evaluator = Self::new( + column_index, + offset_index, + row_group_metadata, + field_id_map, + snapshot_schema, + ); + + Ok(visit(&mut evaluator, filter)?.iter().copied().collect()) + } + + fn select_all_rows(&self) -> Result { + Ok(vec![RowSelector::select( + self.row_group_metadata.num_rows() as usize + )] + .into()) + } + + fn skip_all_rows(&self) -> Result { + Ok(vec![RowSelector::skip( + self.row_group_metadata.num_rows() as usize + )] + .into()) + } + + fn calc_row_selection( + &self, + field_id: i32, + predicate: F, + missing_col_behavior: MissingColBehavior, + ) -> Result + where + F: Fn(Option, Option, PageNullCount) -> Result, + { + let Some(&parquet_column_index) = + self.iceberg_field_id_to_parquet_column_index.get(&field_id) + else { + // if the snapshot's column is not present in the row group, + // exit early + return match missing_col_behavior { + MissingColBehavior::CantMatch => self.skip_all_rows(), + MissingColBehavior::MightMatch => self.select_all_rows(), + }; + }; + + let Some(field) = self.snapshot_schema.field_by_id(field_id) else { + return Err(Error::new( + ErrorKind::Unexpected, + format!("Field with id {} missing from snapshot schema", field_id), + )); + }; + + let Some(field_type) = field.field_type.as_primitive_type() else { + return Err(Error::new( + ErrorKind::Unexpected, + format!( + "Field with id {} not convertible to primitive type", + field_id + ), + )); + }; + + let Some(column_index) = self.column_index.get(parquet_column_index) else { + // This 
should not happen, but we fail soft anyway so that the scan is still + // successful, just a bit slower + return self.select_all_rows(); + }; + + let Some(offset_index) = self.offset_index.get(parquet_column_index) else { + // if we have a column index, we should always have an offset index. + return Err(Error::new( + ErrorKind::Unexpected, + format!("Missing offset index for field id {}", field_id), + )); + }; + + // TODO: cache row_counts to avoid recalcing if the same column + // appears multiple times in the filter predicate + let row_counts = self.calc_row_counts(offset_index); + + let Some(page_filter) = Self::apply_predicate_to_column_index( + predicate, + field_type, + column_index, + &row_counts, + )? + else { + return self.select_all_rows(); + }; + + let row_selectors: Vec<_> = row_counts + .iter() + .zip(page_filter.iter()) + .map(|(&row_count, &is_selected)| { + if is_selected { + RowSelector::select(row_count) + } else { + RowSelector::skip(row_count) + } + }) + .collect(); + + Ok(row_selectors.into()) + } + + /// returns a list of row counts per page + fn calc_row_counts(&self, offset_index: &[PageLocation]) -> Vec { + let mut remaining_rows = self.row_group_metadata.num_rows() as usize; + let mut row_counts = Vec::with_capacity(self.offset_index.len()); + + for (idx, page_location) in offset_index.iter().enumerate() { + let row_count = if idx < offset_index.len() - 1 { + let row_count = (offset_index[idx + 1].first_row_index + - page_location.first_row_index) as usize; + remaining_rows -= row_count; + row_count + } else { + remaining_rows + }; + + row_counts.push(row_count); + } + + row_counts + } + + fn apply_predicate_to_column_index( + predicate: F, + field_type: &PrimitiveType, + column_index: &Index, + row_counts: &[usize], + ) -> Result>> + where + F: Fn(Option, Option, PageNullCount) -> Result, + { + let result: Result> = match column_index { + Index::NONE => { + return Ok(None); + } + Index::BOOLEAN(idx) => idx + .indexes + .iter() + 
.zip(row_counts.iter()) + .map(|(item, &row_count)| { + predicate( + // NOTE: the predicate closure contract is (min, max, nulls) — + // min must be passed first, matching the FLOAT/BYTE_ARRAY arms + item.min.map(|val| { + Datum::new(field_type.clone(), PrimitiveLiteral::Boolean(val)) + }), + item.max.map(|val| { + Datum::new(field_type.clone(), PrimitiveLiteral::Boolean(val)) + }), + PageNullCount::from_row_and_null_counts(row_count, item.null_count), + ) + }) + .collect(), + Index::INT32(idx) => idx + .indexes + .iter() + .zip(row_counts.iter()) + .map(|(item, &row_count)| { + predicate( + item.min + .map(|val| Datum::new(field_type.clone(), PrimitiveLiteral::Int(val))), + item.max + .map(|val| Datum::new(field_type.clone(), PrimitiveLiteral::Int(val))), + PageNullCount::from_row_and_null_counts(row_count, item.null_count), + ) + }) + .collect(), + Index::INT64(idx) => idx + .indexes + .iter() + .zip(row_counts.iter()) + .map(|(item, &row_count)| { + predicate( + item.min + .map(|val| Datum::new(field_type.clone(), PrimitiveLiteral::Long(val))), + item.max + .map(|val| Datum::new(field_type.clone(), PrimitiveLiteral::Long(val))), + PageNullCount::from_row_and_null_counts(row_count, item.null_count), + ) + }) + .collect(), + Index::FLOAT(idx) => idx + .indexes + .iter() + .zip(row_counts.iter()) + .map(|(item, &row_count)| { + predicate( + item.min.map(|val| { + Datum::new( + field_type.clone(), + PrimitiveLiteral::Float(OrderedFloat::from(val)), + ) + }), + item.max.map(|val| { + Datum::new( + field_type.clone(), + PrimitiveLiteral::Float(OrderedFloat::from(val)), + ) + }), + PageNullCount::from_row_and_null_counts(row_count, item.null_count), + ) + }) + .collect(), + Index::DOUBLE(idx) => idx + .indexes + .iter() + .zip(row_counts.iter()) + .map(|(item, &row_count)| { + predicate( + item.min.map(|val| { + Datum::new( + field_type.clone(), + PrimitiveLiteral::Double(OrderedFloat::from(val)), + ) + }), + item.max.map(|val| { + Datum::new( + field_type.clone(), + PrimitiveLiteral::Double(OrderedFloat::from(val)), + ) + }), + PageNullCount::from_row_and_null_counts(row_count, item.null_count),
+ ) + }) + .collect(), + Index::BYTE_ARRAY(idx) => idx + .indexes + .iter() + .zip(row_counts.iter()) + .map(|(item, &row_count)| { + predicate( + item.min.clone().map(|val| { + Datum::new( + field_type.clone(), + PrimitiveLiteral::String( + String::from_utf8(val.data().to_vec()).unwrap(), + ), + ) + }), + item.max.clone().map(|val| { + Datum::new( + field_type.clone(), + PrimitiveLiteral::String( + String::from_utf8(val.data().to_vec()).unwrap(), + ), + ) + }), + PageNullCount::from_row_and_null_counts(row_count, item.null_count), + ) + }) + .collect(), + Index::FIXED_LEN_BYTE_ARRAY(_) => { + return Err(Error::new( + ErrorKind::FeatureUnsupported, + "unsupported 'FIXED_LEN_BYTE_ARRAY' index type in column_index", + )) + } + Index::INT96(_) => { + return Err(Error::new( + ErrorKind::FeatureUnsupported, + "unsupported 'INT96' index type in column_index", + )) + } + }; + + Ok(Some(result?)) + } + + fn visit_inequality( + &mut self, + reference: &BoundReference, + datum: &Datum, + cmp_fn: fn(&Datum, &Datum) -> bool, + use_lower_bound: bool, + ) -> Result { + let field_id = reference.field().id; + + self.calc_row_selection( + field_id, + |min, max, null_count| { + if matches!(null_count, PageNullCount::AllNull) { + return Ok(false); + } + + if datum.is_nan() { + // NaN indicates unreliable bounds. 
+ return Ok(true); + } + + let bound = if use_lower_bound { min } else { max }; + + if let Some(bound) = bound { + if cmp_fn(&bound, datum) { + return Ok(true); + } + + return Ok(false); + } + + Ok(true) + }, + MissingColBehavior::MightMatch, + ) + } +} + +impl BoundPredicateVisitor for PageIndexEvaluator<'_> { + type T = RowSelection; + + fn always_true(&mut self) -> Result { + self.select_all_rows() + } + + fn always_false(&mut self) -> Result { + self.skip_all_rows() + } + + fn and(&mut self, lhs: RowSelection, rhs: RowSelection) -> Result { + Ok(lhs.intersection(&rhs)) + } + + fn or(&mut self, lhs: RowSelection, rhs: RowSelection) -> Result { + Ok(union_row_selections(&lhs, &rhs)) + } + + fn not(&mut self, _: RowSelection) -> Result { + Err(Error::new( + ErrorKind::Unexpected, + "NOT unsupported at this point. NOT-rewrite should be performed first", + )) + } + + fn is_null( + &mut self, + reference: &BoundReference, + _predicate: &BoundPredicate, + ) -> Result { + let field_id = reference.field().id; + + self.calc_row_selection( + field_id, + |_max, _min, null_count| Ok(!matches!(null_count, PageNullCount::NoneNull)), + MissingColBehavior::MightMatch, + ) + } + + fn not_null( + &mut self, + reference: &BoundReference, + _predicate: &BoundPredicate, + ) -> Result { + let field_id = reference.field().id; + + self.calc_row_selection( + field_id, + |_max, _min, null_count| Ok(!matches!(null_count, PageNullCount::AllNull)), + MissingColBehavior::CantMatch, + ) + } + + fn is_nan( + &mut self, + reference: &BoundReference, + _predicate: &BoundPredicate, + ) -> Result { + // NaN counts not present in ColumnChunkMetadata Statistics. + // Only float columns can be NaN. 
+ if reference.field().field_type.is_floating_type() { + self.select_all_rows() + } else { + self.skip_all_rows() + } + } + + fn not_nan( + &mut self, + _reference: &BoundReference, + _predicate: &BoundPredicate, + ) -> Result { + // NaN counts not present in ColumnChunkMetadata Statistics + self.select_all_rows() + } + + fn less_than( + &mut self, + reference: &BoundReference, + datum: &Datum, + _predicate: &BoundPredicate, + ) -> Result { + self.visit_inequality(reference, datum, PartialOrd::lt, true) + } + + fn less_than_or_eq( + &mut self, + reference: &BoundReference, + datum: &Datum, + _predicate: &BoundPredicate, + ) -> Result { + self.visit_inequality(reference, datum, PartialOrd::le, true) + } + + fn greater_than( + &mut self, + reference: &BoundReference, + datum: &Datum, + _predicate: &BoundPredicate, + ) -> Result { + self.visit_inequality(reference, datum, PartialOrd::gt, false) + } + + fn greater_than_or_eq( + &mut self, + reference: &BoundReference, + datum: &Datum, + _predicate: &BoundPredicate, + ) -> Result { + self.visit_inequality(reference, datum, PartialOrd::ge, false) + } + + fn eq( + &mut self, + reference: &BoundReference, + datum: &Datum, + _predicate: &BoundPredicate, + ) -> Result { + let field_id = reference.field().id; + + self.calc_row_selection( + field_id, + |min, max, nulls| { + if matches!(nulls, PageNullCount::AllNull) { + return Ok(false); + } + + if let Some(min) = min { + if min.gt(datum) { + return Ok(false); + } + } + + if let Some(max) = max { + if max.lt(datum) { + return Ok(false); + } + } + + Ok(true) + }, + MissingColBehavior::CantMatch, + ) + } + + fn not_eq( + &mut self, + _reference: &BoundReference, + _datum: &Datum, + _predicate: &BoundPredicate, + ) -> Result { + // Because the bounds are not necessarily a min or max value, + // this cannot be answered using them. notEq(col, X) with (X, Y) + // doesn't guarantee that X is a value in col. 
+ self.select_all_rows() + } + + fn starts_with( + &mut self, + reference: &BoundReference, + datum: &Datum, + _predicate: &BoundPredicate, + ) -> Result { + let field_id = reference.field().id; + + let PrimitiveLiteral::String(datum) = datum.literal() else { + return Err(Error::new( + ErrorKind::Unexpected, + "Cannot use StartsWith operator on non-string values", + )); + }; + + self.calc_row_selection( + field_id, + |min, max, nulls| { + if matches!(nulls, PageNullCount::AllNull) { + return Ok(false); + } + + if let Some(lower_bound) = min { + let PrimitiveLiteral::String(lower_bound) = lower_bound.literal() else { + return Err(Error::new( + ErrorKind::Unexpected, + "Cannot use StartsWith operator on non-string lower_bound value", + )); + }; + + let prefix_length = lower_bound.chars().count().min(datum.chars().count()); + + // truncate lower bound so that its length + // is not greater than the length of prefix + let truncated_lower_bound = + lower_bound.chars().take(prefix_length).collect::(); + if datum < &truncated_lower_bound { + return Ok(false); + } + } + + if let Some(upper_bound) = max { + let PrimitiveLiteral::String(upper_bound) = upper_bound.literal() else { + return Err(Error::new( + ErrorKind::Unexpected, + "Cannot use StartsWith operator on non-string upper_bound value", + )); + }; + + let prefix_length = upper_bound.chars().count().min(datum.chars().count()); + + // truncate upper bound so that its length + // is not greater than the length of prefix + let truncated_upper_bound = + upper_bound.chars().take(prefix_length).collect::(); + if datum > &truncated_upper_bound { + return Ok(false); + } + } + + Ok(true) + }, + MissingColBehavior::CantMatch, + ) + } + + fn not_starts_with( + &mut self, + reference: &BoundReference, + datum: &Datum, + _predicate: &BoundPredicate, + ) -> Result { + let field_id = reference.field().id; + + // notStartsWith will match unless all values must start with the prefix. 
+ // This happens when the lower and upper bounds both start with the prefix. + + let PrimitiveLiteral::String(prefix) = datum.literal() else { + return Err(Error::new( + ErrorKind::Unexpected, + "Cannot use StartsWith operator on non-string values", + )); + }; + + self.calc_row_selection( + field_id, + |min, max, nulls| { + if !matches!(nulls, PageNullCount::NoneNull) { + return Ok(true); + } + + let Some(lower_bound) = min else { + return Ok(true); + }; + + let PrimitiveLiteral::String(lower_bound_str) = lower_bound.literal() else { + return Err(Error::new( + ErrorKind::Unexpected, + "Cannot use NotStartsWith operator on non-string lower_bound value", + )); + }; + + let prefix_len = prefix.chars().count(); + + if lower_bound_str.chars().count() < prefix_len { + // if lower is shorter than the prefix then lower doesn't start with the prefix + return Ok(true); + } + + if lower_bound_str.chars().take(prefix_len).collect::() == *prefix { + // lower bound matches the prefix + + let Some(upper_bound) = max else { + return Ok(true); + }; + + let PrimitiveLiteral::String(upper_bound) = upper_bound.literal() else { + return Err(Error::new( + ErrorKind::Unexpected, + "Cannot use NotStartsWith operator on non-string upper_bound value", + )); + }; + + // if upper is shorter than the prefix then upper can't start with the prefix + if upper_bound.chars().count() < prefix_len { + return Ok(true); + } + + if upper_bound.chars().take(prefix_len).collect::() == *prefix { + // both bounds match the prefix, so all rows must match the + // prefix and therefore do not satisfy the predicate + return Ok(false); + } + } + + Ok(true) + }, + MissingColBehavior::MightMatch, + ) + } + + fn r#in( + &mut self, + reference: &BoundReference, + literals: &FnvHashSet, + _predicate: &BoundPredicate, + ) -> Result { + let field_id = reference.field().id; + + if literals.len() > IN_PREDICATE_LIMIT { + // skip evaluating the predicate if the number of values is too big + return self.select_all_rows(); + } +
self.calc_row_selection( + field_id, + |min, max, nulls| { + if matches!(nulls, PageNullCount::AllNull) { + return Ok(false); + } + + match (min, max) { + (Some(min), Some(max)) => { + if literals + .iter() + .all(|datum| datum.lt(&min) || datum.gt(&max)) + { + // if all values are outside the bounds, rows cannot match. + return Ok(false); + } + } + (Some(min), _) => { + if !literals.iter().any(|datum| datum.ge(&min)) { + // if none of the values are greater than the min bound, rows cant match + return Ok(false); + } + } + (_, Some(max)) => { + if !literals.iter().any(|datum| datum.le(&max)) { + // if all values are greater than upper bound, rows cannot match. + return Ok(false); + } + } + + _ => {} + } + + Ok(true) + }, + MissingColBehavior::CantMatch, + ) + } + + fn not_in( + &mut self, + _reference: &BoundReference, + _literals: &FnvHashSet, + _predicate: &BoundPredicate, + ) -> Result { + // Because the bounds are not necessarily a min or max value, + // this cannot be answered using them. notIn(col, {X, ...}) + // with (X, Y) doesn't guarantee that X is a value in col. 
+ self.select_all_rows() + } +} + +/// Combine two lists of `RowSelection` return the union of them +/// For example: +/// self: NNYYYYNNYYNYN +/// other: NYNNNNNNY +/// +/// returned: NYYYYYNNYYNYN +/// +/// This can be removed from here once RowSelection::union is in parquet::arrow +/// (Hopefully once https://github.com/apache/arrow-rs/pull/6308 gets merged) +fn union_row_selections(left: &RowSelection, right: &RowSelection) -> RowSelection { + let mut l_iter = left.iter().copied().peekable(); + let mut r_iter = right.iter().copied().peekable(); + + let iter = std::iter::from_fn(move || { + loop { + let l = l_iter.peek_mut(); + let r = r_iter.peek_mut(); + + match (l, r) { + (Some(a), _) if a.row_count == 0 => { + l_iter.next().unwrap(); + } + (_, Some(b)) if b.row_count == 0 => { + r_iter.next().unwrap(); + } + (Some(l), Some(r)) => { + return match (l.skip, r.skip) { + // Skip both ranges + (true, true) => { + if l.row_count < r.row_count { + let skip = l.row_count; + r.row_count -= l.row_count; + l_iter.next(); + Some(RowSelector::skip(skip)) + } else { + let skip = r.row_count; + l.row_count -= skip; + r_iter.next(); + Some(RowSelector::skip(skip)) + } + } + // Keep rows from left + (false, true) => { + if l.row_count < r.row_count { + r.row_count -= l.row_count; + l_iter.next() + } else { + let r_row_count = r.row_count; + l.row_count -= r_row_count; + r_iter.next(); + Some(RowSelector::select(r_row_count)) + } + } + // Keep rows from right + (true, false) => { + if l.row_count < r.row_count { + let l_row_count = l.row_count; + r.row_count -= l_row_count; + l_iter.next(); + Some(RowSelector::select(l_row_count)) + } else { + l.row_count -= r.row_count; + r_iter.next() + } + } + // Keep at least one + _ => { + if l.row_count < r.row_count { + r.row_count -= l.row_count; + l_iter.next() + } else { + l.row_count -= r.row_count; + r_iter.next() + } + } + }; + } + (Some(_), None) => return l_iter.next(), + (None, Some(_)) => return r_iter.next(), + (None, None) 
=> return None, + } + } + }); + + iter.collect() +} + +#[cfg(test)] +mod tests { + use std::collections::HashMap; + use std::sync::Arc; + + use parquet::arrow::arrow_reader::{RowSelection, RowSelector}; + use parquet::basic::{LogicalType as ParquetLogicalType, Type as ParquetPhysicalType}; + use parquet::data_type::ByteArray; + use parquet::file::metadata::{ColumnChunkMetaData, RowGroupMetaData}; + use parquet::file::page_index::index::{Index, NativeIndex, PageIndex}; + use parquet::file::statistics::Statistics; + use parquet::format::{BoundaryOrder, PageLocation}; + use parquet::schema::types::{ + ColumnDescriptor, ColumnPath, SchemaDescriptor, Type as parquetSchemaType, + }; + use rand::{thread_rng, Rng}; + + use super::{union_row_selections, PageIndexEvaluator}; + use crate::expr::{Bind, Reference}; + use crate::spec::{Datum, NestedField, PrimitiveType, Schema, Type}; + use crate::{ErrorKind, Result}; + + #[test] + fn test_union_row_selections() { + let selection = RowSelection::from(vec![RowSelector::select(1048576)]); + let result = union_row_selections(&selection, &selection); + assert_eq!(result, selection); + + // NYNYY + let a = RowSelection::from(vec![ + RowSelector::skip(10), + RowSelector::select(10), + RowSelector::skip(10), + RowSelector::select(20), + ]); + + // NNYYN + let b = RowSelection::from(vec![ + RowSelector::skip(20), + RowSelector::select(20), + RowSelector::skip(10), + ]); + + let result = union_row_selections(&a, &b); + + // NYYYY + assert_eq!(result.iter().collect::>(), vec![ + &RowSelector::skip(10), + &RowSelector::select(40) + ]); + } + + #[test] + fn eval_matches_no_rows_for_empty_row_group() -> Result<()> { + let row_group_metadata = create_row_group_metadata(0, 0, None, 0, None)?; + let (column_index, offset_index) = create_page_index()?; + + let (iceberg_schema_ref, field_id_map) = build_iceberg_schema_and_field_map()?; + + let filter = Reference::new("col_float") + .greater_than(Datum::float(1.0)) + 
.bind(iceberg_schema_ref.clone(), false)?; + + let result = PageIndexEvaluator::eval( + &filter, + &column_index, + &offset_index, + &row_group_metadata, + &field_id_map, + iceberg_schema_ref.as_ref(), + )?; + + let expected = vec![]; + + assert_eq!(result, expected); + + Ok(()) + } + + #[test] + fn eval_is_null_select_only_pages_with_nulls() -> Result<()> { + let row_group_metadata = create_row_group_metadata(4096, 1000, None, 1000, None)?; + let (column_index, offset_index) = create_page_index()?; + + let (iceberg_schema_ref, field_id_map) = build_iceberg_schema_and_field_map()?; + + let filter = Reference::new("col_float") + .is_null() + .bind(iceberg_schema_ref.clone(), false)?; + + let result = PageIndexEvaluator::eval( + &filter, + &column_index, + &offset_index, + &row_group_metadata, + &field_id_map, + iceberg_schema_ref.as_ref(), + )?; + + let expected = vec![ + RowSelector::select(1024), + RowSelector::skip(1024), + RowSelector::select(2048), + ]; + + assert_eq!(result, expected); + + Ok(()) + } + + #[test] + fn eval_is_not_null_dont_select_pages_with_all_nulls() -> Result<()> { + let row_group_metadata = create_row_group_metadata(4096, 1000, None, 1000, None)?; + let (column_index, offset_index) = create_page_index()?; + + let (iceberg_schema_ref, field_id_map) = build_iceberg_schema_and_field_map()?; + + let filter = Reference::new("col_float") + .is_not_null() + .bind(iceberg_schema_ref.clone(), false)?; + + let result = PageIndexEvaluator::eval( + &filter, + &column_index, + &offset_index, + &row_group_metadata, + &field_id_map, + iceberg_schema_ref.as_ref(), + )?; + + let expected = vec![RowSelector::skip(1024), RowSelector::select(3072)]; + + assert_eq!(result, expected); + + Ok(()) + } + + #[test] + fn eval_is_nan_select_all() -> Result<()> { + let row_group_metadata = create_row_group_metadata(4096, 1000, None, 1000, None)?; + let (column_index, offset_index) = create_page_index()?; + + let (iceberg_schema_ref, field_id_map) = 
build_iceberg_schema_and_field_map()?; + + let filter = Reference::new("col_float") + .is_nan() + .bind(iceberg_schema_ref.clone(), false)?; + + let result = PageIndexEvaluator::eval( + &filter, + &column_index, + &offset_index, + &row_group_metadata, + &field_id_map, + iceberg_schema_ref.as_ref(), + )?; + + let expected = vec![RowSelector::select(4096)]; + + assert_eq!(result, expected); + + Ok(()) + } + + #[test] + fn eval_not_nan_select_all() -> Result<()> { + let row_group_metadata = create_row_group_metadata(4096, 1000, None, 1000, None)?; + let (column_index, offset_index) = create_page_index()?; + + let (iceberg_schema_ref, field_id_map) = build_iceberg_schema_and_field_map()?; + + let filter = Reference::new("col_float") + .is_not_nan() + .bind(iceberg_schema_ref.clone(), false)?; + + let result = PageIndexEvaluator::eval( + &filter, + &column_index, + &offset_index, + &row_group_metadata, + &field_id_map, + iceberg_schema_ref.as_ref(), + )?; + + let expected = vec![RowSelector::select(4096)]; + + assert_eq!(result, expected); + + Ok(()) + } + + #[test] + fn eval_inequality_nan_datum_all_rows_except_all_null_pages() -> Result<()> { + let row_group_metadata = create_row_group_metadata(4096, 1000, None, 1000, None)?; + let (column_index, offset_index) = create_page_index()?; + + let (iceberg_schema_ref, field_id_map) = build_iceberg_schema_and_field_map()?; + + let filter = Reference::new("col_float") + .less_than(Datum::float(f32::NAN)) + .bind(iceberg_schema_ref.clone(), false)?; + + let result = PageIndexEvaluator::eval( + &filter, + &column_index, + &offset_index, + &row_group_metadata, + &field_id_map, + iceberg_schema_ref.as_ref(), + )?; + + let expected = vec![RowSelector::skip(1024), RowSelector::select(3072)]; + + assert_eq!(result, expected); + + Ok(()) + } + + #[test] + fn eval_inequality_pages_containing_value_except_all_null_pages() -> Result<()> { + let row_group_metadata = create_row_group_metadata(4096, 1000, None, 1000, None)?; + let 
(column_index, offset_index) = create_page_index()?; + + let (iceberg_schema_ref, field_id_map) = build_iceberg_schema_and_field_map()?; + + let filter = Reference::new("col_float") + .less_than(Datum::float(5.0)) + .bind(iceberg_schema_ref.clone(), false)?; + + let result = PageIndexEvaluator::eval( + &filter, + &column_index, + &offset_index, + &row_group_metadata, + &field_id_map, + iceberg_schema_ref.as_ref(), + )?; + + let expected = vec![ + RowSelector::skip(1024), + RowSelector::select(1024), + RowSelector::skip(1024), + RowSelector::select(1024), + ]; + + assert_eq!(result, expected); + + Ok(()) + } + + #[test] + fn eval_eq_pages_containing_value_except_all_null_pages() -> Result<()> { + let row_group_metadata = create_row_group_metadata(4096, 1000, None, 1000, None)?; + let (column_index, offset_index) = create_page_index()?; + + let (iceberg_schema_ref, field_id_map) = build_iceberg_schema_and_field_map()?; + + let filter = Reference::new("col_float") + .equal_to(Datum::float(5.0)) + .bind(iceberg_schema_ref.clone(), false)?; + + let result = PageIndexEvaluator::eval( + &filter, + &column_index, + &offset_index, + &row_group_metadata, + &field_id_map, + iceberg_schema_ref.as_ref(), + )?; + + let expected = vec![ + RowSelector::skip(1024), + RowSelector::select(1024), + RowSelector::skip(1024), + RowSelector::select(1024), + ]; + + assert_eq!(result, expected); + + Ok(()) + } + + #[test] + fn eval_not_eq_all_rows() -> Result<()> { + let row_group_metadata = create_row_group_metadata(4096, 1000, None, 1000, None)?; + let (column_index, offset_index) = create_page_index()?; + + let (iceberg_schema_ref, field_id_map) = build_iceberg_schema_and_field_map()?; + + let filter = Reference::new("col_float") + .not_equal_to(Datum::float(5.0)) + .bind(iceberg_schema_ref.clone(), false)?; + + let result = PageIndexEvaluator::eval( + &filter, + &column_index, + &offset_index, + &row_group_metadata, + &field_id_map, + iceberg_schema_ref.as_ref(), + )?; + + let expected 
= vec![RowSelector::select(4096)]; + + assert_eq!(result, expected); + + Ok(()) + } + + #[test] + fn eval_starts_with_error_float_col() -> Result<()> { + let row_group_metadata = create_row_group_metadata(4096, 1000, None, 1000, None)?; + let (column_index, offset_index) = create_page_index()?; + + let (iceberg_schema_ref, field_id_map) = build_iceberg_schema_and_field_map()?; + + let filter = Reference::new("col_float") + .starts_with(Datum::float(5.0)) + .bind(iceberg_schema_ref.clone(), false)?; + + let result = PageIndexEvaluator::eval( + &filter, + &column_index, + &offset_index, + &row_group_metadata, + &field_id_map, + iceberg_schema_ref.as_ref(), + ); + + assert_eq!(result.unwrap_err().kind(), ErrorKind::Unexpected); + + Ok(()) + } + + #[test] + fn eval_starts_with_pages_containing_value_except_all_null_pages() -> Result<()> { + let row_group_metadata = create_row_group_metadata(4096, 1000, None, 1000, None)?; + let (column_index, offset_index) = create_page_index()?; + + let (iceberg_schema_ref, field_id_map) = build_iceberg_schema_and_field_map()?; + + let filter = Reference::new("col_string") + .starts_with(Datum::string("B")) + .bind(iceberg_schema_ref.clone(), false)?; + + let result = PageIndexEvaluator::eval( + &filter, + &column_index, + &offset_index, + &row_group_metadata, + &field_id_map, + iceberg_schema_ref.as_ref(), + )?; + + let expected = vec![ + RowSelector::select(512), + RowSelector::skip(3536), + RowSelector::select(48), + ]; + + assert_eq!(result, expected); + + Ok(()) + } + + #[test] + fn eval_not_starts_with_pages_containing_value_except_pages_with_min_and_max_equal_to_prefix_and_all_null_pages( + ) -> Result<()> { + let row_group_metadata = create_row_group_metadata(4096, 1000, None, 1000, None)?; + let (column_index, offset_index) = create_page_index()?; + + let (iceberg_schema_ref, field_id_map) = build_iceberg_schema_and_field_map()?; + + let filter = Reference::new("col_string") + .not_starts_with(Datum::string("DE")) + 
.bind(iceberg_schema_ref.clone(), false)?; + + let result = PageIndexEvaluator::eval( + &filter, + &column_index, + &offset_index, + &row_group_metadata, + &field_id_map, + iceberg_schema_ref.as_ref(), + )?; + + let expected = vec![ + RowSelector::select(512), + RowSelector::skip(512), + RowSelector::select(3072), + ]; + + assert_eq!(result, expected); + + Ok(()) + } + + #[test] + fn eval_in_length_of_set_above_limit_all_rows() -> Result<()> { + let mut rng = thread_rng(); + + let row_group_metadata = create_row_group_metadata(4096, 1000, None, 1000, None)?; + let (column_index, offset_index) = create_page_index()?; + + let (iceberg_schema_ref, field_id_map) = build_iceberg_schema_and_field_map()?; + + let filter = Reference::new("col_float") + .is_in(std::iter::repeat_with(|| Datum::float(rng.gen_range(0.0..10.0))).take(1000)) + .bind(iceberg_schema_ref.clone(), false)?; + + let result = PageIndexEvaluator::eval( + &filter, + &column_index, + &offset_index, + &row_group_metadata, + &field_id_map, + iceberg_schema_ref.as_ref(), + )?; + + let expected = vec![RowSelector::select(4096)]; + + assert_eq!(result, expected); + + Ok(()) + } + + #[test] + fn eval_in_valid_set_size_some_rows() -> Result<()> { + let row_group_metadata = create_row_group_metadata(4096, 1000, None, 1000, None)?; + let (column_index, offset_index) = create_page_index()?; + + let (iceberg_schema_ref, field_id_map) = build_iceberg_schema_and_field_map()?; + + let filter = Reference::new("col_string") + .is_in([Datum::string("AARDVARK"), Datum::string("ICEBERG")]) + .bind(iceberg_schema_ref.clone(), false)?; + + let result = PageIndexEvaluator::eval( + &filter, + &column_index, + &offset_index, + &row_group_metadata, + &field_id_map, + iceberg_schema_ref.as_ref(), + )?; + + let expected = vec![ + RowSelector::select(512), + RowSelector::skip(512), + RowSelector::select(2976), + RowSelector::skip(48), + RowSelector::select(48), + ]; + + assert_eq!(result, expected); + + Ok(()) + } + + fn 
build_iceberg_schema_and_field_map() -> Result<(Arc, HashMap)> { + let iceberg_schema = Schema::builder() + .with_fields([ + Arc::new(NestedField::new( + 1, + "col_float", + Type::Primitive(PrimitiveType::Float), + false, + )), + Arc::new(NestedField::new( + 2, + "col_string", + Type::Primitive(PrimitiveType::String), + false, + )), + ]) + .build()?; + let iceberg_schema_ref = Arc::new(iceberg_schema); + + let field_id_map = HashMap::from_iter([(1, 0), (2, 1)]); + + Ok((iceberg_schema_ref, field_id_map)) + } + + fn build_parquet_schema_descriptor() -> Result> { + let field_1 = Arc::new( + parquetSchemaType::primitive_type_builder("col_float", ParquetPhysicalType::FLOAT) + .with_id(Some(1)) + .build()?, + ); + + let field_2 = Arc::new( + parquetSchemaType::primitive_type_builder( + "col_string", + ParquetPhysicalType::BYTE_ARRAY, + ) + .with_id(Some(2)) + .with_logical_type(Some(ParquetLogicalType::String)) + .build()?, + ); + + let group_type = Arc::new( + parquetSchemaType::group_type_builder("all") + .with_id(Some(1000)) + .with_fields(vec![field_1, field_2]) + .build()?, + ); + + let schema_descriptor = SchemaDescriptor::new(group_type); + let schema_descriptor_arc = Arc::new(schema_descriptor); + Ok(schema_descriptor_arc) + } + + fn create_row_group_metadata( + num_rows: i64, + col_1_num_vals: i64, + col_1_stats: Option, + col_2_num_vals: i64, + col_2_stats: Option, + ) -> Result { + let schema_descriptor_arc = build_parquet_schema_descriptor()?; + + let column_1_desc_ptr = Arc::new(ColumnDescriptor::new( + schema_descriptor_arc.column(0).self_type_ptr(), + 1, + 1, + ColumnPath::new(vec!["col_float".to_string()]), + )); + + let column_2_desc_ptr = Arc::new(ColumnDescriptor::new( + schema_descriptor_arc.column(1).self_type_ptr(), + 1, + 1, + ColumnPath::new(vec!["col_string".to_string()]), + )); + + let mut col_1_meta = + ColumnChunkMetaData::builder(column_1_desc_ptr).set_num_values(col_1_num_vals); + if let Some(stats1) = col_1_stats { + col_1_meta = 
col_1_meta.set_statistics(stats1) + } + + let mut col_2_meta = + ColumnChunkMetaData::builder(column_2_desc_ptr).set_num_values(col_2_num_vals); + if let Some(stats2) = col_2_stats { + col_2_meta = col_2_meta.set_statistics(stats2) + } + + let row_group_metadata = RowGroupMetaData::builder(schema_descriptor_arc) + .set_num_rows(num_rows) + .set_column_metadata(vec![ + col_1_meta.build()?, + // .set_statistics(Statistics::float(None, None, None, 1, false)) + col_2_meta.build()?, + ]) + .build(); + + Ok(row_group_metadata?) + } + + fn create_page_index() -> Result<(Vec, Vec>)> { + let idx_float = Index::FLOAT(NativeIndex:: { + indexes: vec![ + PageIndex { + min: None, + max: None, + null_count: Some(1024), + }, + PageIndex { + min: Some(0.0), + max: Some(10.0), + null_count: Some(0), + }, + PageIndex { + min: Some(10.0), + max: Some(20.0), + null_count: Some(1), + }, + PageIndex { + min: None, + max: None, + null_count: None, + }, + ], + boundary_order: BoundaryOrder(0), // UNORDERED + }); + + let idx_string = Index::BYTE_ARRAY(NativeIndex:: { + indexes: vec![ + PageIndex { + min: Some("AA".into()), + max: Some("DD".into()), + null_count: Some(0), + }, + PageIndex { + min: Some("DE".into()), + max: Some("DE".into()), + null_count: Some(0), + }, + PageIndex { + min: Some("DF".into()), + max: Some("UJ".into()), + null_count: Some(1), + }, + PageIndex { + min: None, + max: None, + null_count: Some(48), + }, + PageIndex { + min: None, + max: None, + null_count: None, + }, + ], + boundary_order: BoundaryOrder(0), // UNORDERED + }); + + let page_locs_float = vec![ + PageLocation::new(0, 1024, 0), + PageLocation::new(1024, 1024, 1024), + PageLocation::new(2048, 1024, 2048), + PageLocation::new(3072, 1024, 3072), + ]; + + let page_locs_string = vec![ + PageLocation::new(0, 512, 0), + PageLocation::new(512, 512, 512), + PageLocation::new(1024, 2976, 1024), + PageLocation::new(4000, 48, 4000), + PageLocation::new(4048, 48, 4048), + ]; + + Ok((vec![idx_float, idx_string], vec![ 
+ page_locs_float, + page_locs_string, + ])) + } +} diff --git a/crates/iceberg/src/scan.rs b/crates/iceberg/src/scan.rs index 45d7d4fd1..c5cc4cb12 100644 --- a/crates/iceberg/src/scan.rs +++ b/crates/iceberg/src/scan.rs @@ -61,6 +61,7 @@ pub struct TableScanBuilder<'a> { concurrency_limit_manifest_entries: usize, concurrency_limit_manifest_files: usize, row_group_filtering_enabled: bool, + row_selection_enabled: bool, } impl<'a> TableScanBuilder<'a> { @@ -78,6 +79,7 @@ impl<'a> TableScanBuilder<'a> { concurrency_limit_manifest_entries: num_cpus, concurrency_limit_manifest_files: num_cpus, row_group_filtering_enabled: true, + row_selection_enabled: false, } } @@ -157,6 +159,25 @@ impl<'a> TableScanBuilder<'a> { self } + /// Determines whether to enable row selection. + /// When enabled, if a read is performed with a filter predicate, + /// then (for row groups that have not been skipped) the page index + /// for each row group in a parquet file is parsed and evaluated + /// against the filter predicate to determine if ranges of rows + /// within a row group can be skipped, based upon the page-level + /// statistics for each column. + /// + /// Defaults to being disabled. Enabling requires parsing the parquet page + /// index, which can be slow enough that parsing the page index outweighs any + /// gains from the reduced number of rows that need scanning. + /// It is recommended to experiment with partitioning, sorting, row group size, + /// page size, and page row limit Iceberg settings on the table being scanned in + /// order to get the best performance from using row selection. + pub fn with_row_selection_enabled(mut self, row_selection_enabled: bool) -> Self { + self.row_selection_enabled = row_selection_enabled; + self + } + /// Build the table scan. 
pub fn build(self) -> Result { let snapshot = match self.snapshot_id { @@ -268,6 +289,7 @@ impl<'a> TableScanBuilder<'a> { concurrency_limit_manifest_entries: self.concurrency_limit_manifest_entries, concurrency_limit_manifest_files: self.concurrency_limit_manifest_files, row_group_filtering_enabled: self.row_group_filtering_enabled, + row_selection_enabled: self.row_selection_enabled, }) } } @@ -292,6 +314,7 @@ pub struct TableScan { concurrency_limit_data_files: usize, row_group_filtering_enabled: bool, + row_selection_enabled: bool, } /// PlanContext wraps a [`SnapshotRef`] alongside all the other @@ -378,7 +401,8 @@ impl TableScan { pub async fn to_arrow(&self) -> Result { let mut arrow_reader_builder = ArrowReaderBuilder::new(self.file_io.clone()) .with_data_file_concurrency_limit(self.concurrency_limit_data_files) - .with_row_group_filtering_enabled(self.row_group_filtering_enabled); + .with_row_group_filtering_enabled(self.row_group_filtering_enabled) + .with_row_selection_enabled(self.row_selection_enabled); if let Some(batch_size) = self.batch_size { arrow_reader_builder = arrow_reader_builder.with_batch_size(batch_size);