Skip to content

Commit a066242

Browse files
committed
feat(wip): add ManifestEvalVisitor and InclusiveProjection
1 parent e246b2e commit a066242

File tree

3 files changed

+68
-22
lines changed

3 files changed

+68
-22
lines changed

crates/iceberg/src/expr/predicate.rs

Lines changed: 36 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ impl<T: Debug, const N: usize> Debug for LogicalExpression<T, N> {
4646
}
4747

4848
impl<T, const N: usize> LogicalExpression<T, N> {
49-
fn new(inputs: [Box<T>; N]) -> Self {
49+
pub(crate) fn new(inputs: [Box<T>; N]) -> Self {
5050
Self { inputs }
5151
}
5252

@@ -82,9 +82,9 @@ where
8282
#[derive(PartialEq)]
8383
pub struct UnaryExpression<T> {
8484
/// Operator of this predicate, must be single operand operator.
85-
op: PredicateOperator,
85+
pub(crate) op: PredicateOperator,
8686
/// Term of this predicate, for example, `a` in `a IS NULL`.
87-
term: T,
87+
pub(crate) term: T,
8888
}
8989

9090
impl<T: Debug> Debug for UnaryExpression<T> {
@@ -116,13 +116,17 @@ impl<T> UnaryExpression<T> {
116116
debug_assert!(op.is_unary());
117117
Self { op, term }
118118
}
119+
120+
pub(crate) fn field_id(&self) -> i32 {
121+
self.term.field().id
122+
}
119123
}
120124

121125
/// Binary predicate, for example, `a > 10`.
122126
#[derive(PartialEq)]
123127
pub struct BinaryExpression<T> {
124128
/// Operator of this predicate, must be binary operator, such as `=`, `>`, `<`, etc.
125-
op: PredicateOperator,
129+
pub(crate) op: PredicateOperator,
126130
/// Term of this predicate, for example, `a` in `a > 10`.
127131
term: T,
128132
/// Literal of this predicate, for example, `10` in `a > 10`.
@@ -144,6 +148,10 @@ impl<T> BinaryExpression<T> {
144148
debug_assert!(op.is_binary());
145149
Self { op, term, literal }
146150
}
151+
152+
pub(crate) fn field_id(&self) -> i32 {
153+
self.term.field().id
154+
}
147155
}
148156

149157
impl<T: Display> Display for BinaryExpression<T> {
@@ -165,7 +173,7 @@ impl<T: Bind> Bind for BinaryExpression<T> {
165173
#[derive(PartialEq)]
166174
pub struct SetExpression<T> {
167175
/// Operator of this predicate, must be set operator, such as `IN`, `NOT IN`, etc.
168-
op: PredicateOperator,
176+
pub(crate) op: PredicateOperator,
169177
/// Term of this predicate, for example, `a` in `a in (1, 2, 3)`.
170178
term: T,
171179
/// Literals of this predicate, for example, `(1, 2, 3)` in `a in (1, 2, 3)`.
@@ -187,6 +195,10 @@ impl<T> SetExpression<T> {
187195
debug_assert!(op.is_set());
188196
Self { op, term, literals }
189197
}
198+
199+
pub(crate) fn field_id(&self) -> i32 {
200+
self.term.field().id
201+
}
190202
}
191203

192204
impl<T: Bind> Bind for SetExpression<T> {
@@ -209,6 +221,9 @@ impl<T: Display + Debug> Display for SetExpression<T> {
209221
/// Unbound predicate expression before binding to a schema.
210222
#[derive(Debug, PartialEq)]
211223
pub enum Predicate {
224+
AlwaysTrue,
225+
AlwaysFalse,
226+
212227
/// And predicate, for example, `a > 10 AND b < 20`.
213228
And(LogicalExpression<Predicate, 2>),
214229
/// Or predicate, for example, `a > 10 OR b < 20`.
@@ -366,6 +381,12 @@ impl Bind for Predicate {
366381
impl Display for Predicate {
367382
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
368383
match self {
384+
Predicate::AlwaysTrue => {
385+
write!(f, "TRUE")
386+
}
387+
Predicate::AlwaysFalse => {
388+
write!(f, "FALSE")
389+
}
369390
Predicate::And(expr) => {
370391
write!(f, "({}) AND ({})", expr.inputs()[0], expr.inputs()[1])
371392
}
@@ -453,6 +474,8 @@ impl Predicate {
453474
/// ```
454475
pub fn negate(self) -> Predicate {
455476
match self {
477+
Predicate::AlwaysTrue => Predicate::AlwaysFalse,
478+
Predicate::AlwaysFalse => Predicate::AlwaysTrue,
456479
Predicate::And(expr) => Predicate::Or(LogicalExpression::new(
457480
expr.inputs.map(|expr| Box::new(expr.negate())),
458481
)),
@@ -599,6 +622,14 @@ impl Display for BoundPredicate {
599622
}
600623
}
601624

625+
pub(crate) trait PredicateVisitor<T> {
626+
fn visit(predicate: Predicate) -> T;
627+
}
628+
629+
pub(crate) trait BoundPredicateVisitor<T> {
630+
fn visit(predicate: BoundPredicate) -> T;
631+
}
632+
602633
#[cfg(test)]
603634
mod tests {
604635
use std::ops::Not;

crates/iceberg/src/scan.rs

Lines changed: 25 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -19,22 +19,21 @@
1919
2020
use crate::arrow::ArrowReaderBuilder;
2121
use crate::expr::BoundPredicate::AlwaysTrue;
22-
use crate::expr::{Bind, BoundPredicate, BoundReference, LogicalExpression, Predicate, PredicateOperator};
22+
use crate::expr::{Bind, BoundPredicate, LogicalExpression, Predicate, PredicateOperator};
2323
use crate::io::FileIO;
2424
use crate::spec::{
25-
DataContentType, FieldSummary, Manifest, ManifestEntry, ManifestEntryRef, ManifestFile,
26-
PartitionField, PartitionSpec, PartitionSpecRef, Schema, SchemaRef, SnapshotRef,
27-
TableMetadataRef, Transform,
25+
DataContentType, FieldSummary, ManifestEntryRef, ManifestFile, PartitionField,
26+
PartitionSpecRef, Schema, SchemaRef, SnapshotRef, TableMetadataRef,
2827
};
2928
use crate::table::Table;
3029
use crate::{Error, ErrorKind};
3130
use arrow_array::RecordBatch;
31+
use async_stream::try_stream;
3232
use futures::stream::{iter, BoxStream};
3333
use futures::StreamExt;
3434
use std::collections::HashMap;
3535
use std::ops::Deref;
3636
use std::sync::Arc;
37-
use async_stream::try_stream;
3837

3938
/// Builder to create table scan.
4039
pub struct TableScanBuilder<'a> {
@@ -173,7 +172,6 @@ pub type FileScanTaskStream = BoxStream<'static, crate::Result<FileScanTask>>;
173172
impl TableScan {
174173
/// Returns a stream of file scan tasks.
175174
176-
177175
pub async fn plan_files(&self) -> crate::Result<FileScanTaskStream> {
178176
// Cache `PartitionEvaluator`s created as part of this scan
179177
let mut partition_evaluator_cache: HashMap<i32, PartitionEvaluator> = HashMap::new();
@@ -201,7 +199,7 @@ impl TableScan {
201199
Some(
202200
partition_evaluator_cache
203201
.entry(manifest.partition_spec_id())
204-
.or_insert_with_key(self.create_partition_evaluator(filter))
202+
.or_insert_with_key(|key| self.create_partition_evaluator(key, filter))
205203
.deref(),
206204
)
207205
} else {
@@ -236,17 +234,19 @@ impl TableScan {
236234
}
237235
}
238236
}
239-
}.boxed())
237+
}
238+
.boxed())
240239
}
241240

242-
fn create_partition_evaluator(&self, filter: &Predicate) -> fn(&i32) -> crate::Result<PartitionEvaluator> {
243-
|&id| {
244-
// TODO: predicate binding not yet merged to main
245-
let bound_predicate = filter.bind(self.schema.clone(), self.case_sensitive)?;
241+
fn create_partition_evaluator(&self, id: &i32, filter: &Predicate) -> PartitionEvaluator {
242+
// TODO: predicate binding not yet merged to main
243+
let bound_predicate = filter
244+
.bind(self.schema.clone(), self.case_sensitive)
245+
.unwrap();
246246

247-
let partition_spec = self.table_metadata.partition_spec_by_id(id).unwrap();
248-
PartitionEvaluator::new(partition_spec.clone(), bound_predicate, self.schema.clone())
249-
}
247+
let partition_spec = self.table_metadata.partition_spec_by_id(*id).unwrap();
248+
PartitionEvaluator::new(partition_spec.clone(), bound_predicate, self.schema.clone())
249+
.unwrap()
250250
}
251251

252252
pub async fn to_arrow(&self) -> crate::Result<ArrowRecordBatchStream> {
@@ -315,7 +315,11 @@ struct ManifestEvalVisitor {
315315
}
316316

317317
impl ManifestEvalVisitor {
318-
fn new(partition_schema: SchemaRef, partition_filter: Predicate, case_sensitive: bool) -> crate::Result<Self> {
318+
fn new(
319+
partition_schema: SchemaRef,
320+
partition_filter: Predicate,
321+
case_sensitive: bool,
322+
) -> crate::Result<Self> {
319323
let partition_filter = partition_filter.bind(partition_schema.clone(), case_sensitive)?;
320324

321325
Ok(Self {
@@ -485,7 +489,11 @@ impl InclusiveProjection {
485489
}
486490

487491
parts.iter().fold(Predicate::AlwaysTrue, |res, &part| {
488-
res.and(part.transform.project(&part.name, &predicate))
492+
if let Some(pred_for_part) = part.transform.project(&part.name, &predicate)? {
493+
res.and(pred_for_part)
494+
} else {
495+
res
496+
}
489497
})
490498
}
491499
}

crates/iceberg/src/spec/transform.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
//! Transforms in iceberg.
1919
2020
use crate::error::{Error, Result};
21+
use crate::expr::{BoundPredicate, Predicate};
2122
use crate::spec::datatypes::{PrimitiveType, Type};
2223
use crate::ErrorKind;
2324
use serde::{Deserialize, Deserializer, Serialize, Serializer};
@@ -261,6 +262,12 @@ impl Transform {
261262
_ => self == other,
262263
}
263264
}
265+
266+
pub fn project(&self, name: &str, predicate: &BoundPredicate) -> Result<Option<Predicate>> {
267+
// Waiting on https://github.com/apache/iceberg-rust/pull/269
268+
// to deliver https://github.com/apache/iceberg-rust/issues/264
269+
todo!()
270+
}
264271
}
265272

266273
impl Display for Transform {

0 commit comments

Comments
 (0)