From e8d228a284b693296e9b521e4fe407ffc6a1bfab Mon Sep 17 00:00:00 2001 From: joshua-spacetime Date: Thu, 31 Oct 2024 17:13:44 -0700 Subject: [PATCH] Nested loop join --- crates/execution/src/iter.rs | 87 +++++++++++++++++------------------- 1 file changed, 42 insertions(+), 45 deletions(-) diff --git a/crates/execution/src/iter.rs b/crates/execution/src/iter.rs index d6149ecc2d0..8db3e2c9c7c 100644 --- a/crates/execution/src/iter.rs +++ b/crates/execution/src/iter.rs @@ -101,14 +101,14 @@ pub enum IterOp { /// An index scan opcode takes 2 args: /// 1. An [IndexId] /// 2. A ptr to an [AlgebraicValue] - IndexScanEq(IndexId, u16), + IxScanEq(IndexId, u16), /// An index range scan opcode takes 3 args: /// 1. An [IndexId] /// 2. A ptr to the lower bound /// 3. A ptr to the upper bound - IndexScanRange(IndexId, Bound, Bound), - /// A cross join has 2 args, but its opcode has none - CrossJoin, + IxScanRange(IndexId, Bound, Bound), + /// Pops its 2 args from the stack + NLJoin, /// An index join opcode takes 2 args: /// 1. An [IndexId] /// 2. An instruction ptr @@ -162,21 +162,21 @@ impl CachedIterPlan { // Push delta scan stack.push(Iter::DeltaScan(tx.delta_scan_iter(table_id))); } - IterOp::IndexScanEq(index_id, ptr) => { + IterOp::IxScanEq(index_id, ptr) => { // Push index scan stack.push(Iter::IndexScan(tx.index_scan_iter(index_id, &self.constant(ptr)))); } - IterOp::IndexScanRange(index_id, lower, upper) => { + IterOp::IxScanRange(index_id, lower, upper) => { // Push range scan let lower = lower.map(|ptr| self.constant(ptr)); let upper = upper.map(|ptr| self.constant(ptr)); stack.push(Iter::IndexScan(tx.index_scan_iter(index_id, &(lower, upper)))); } - IterOp::CrossJoin => { - // Pop args and push cross join + IterOp::NLJoin => { + // Pop args and push nested loop join let rhs = stack.pop().unwrap(); let lhs = stack.pop().unwrap(); - stack.push(Iter::CrossJoin(CrossJoinIter::new(lhs, rhs))); + stack.push(Iter::NLJoin(NestedLoopJoin::new(lhs, rhs))); } IterOp::IxJoin(index_id, i, n) => { // Pop arg and push index join @@ -187,7 +187,7 @@ impl CachedIterPlan { let ops = &self.expr_ops[i..i + n as usize]; let program = ExprProgram::new(ops, &self.constants); let projection = ProgramEvaluator::from(program); - stack.push(Iter::IxJoin(IxJoin::Eq(IndexJoin::new( + stack.push(Iter::IxJoin(LeftDeepJoin::Eq(IndexJoin::new( input, index, table, blob_store, projection, )))); } @@ -200,7 +200,7 @@ impl CachedIterPlan { let ops = &self.expr_ops[i..i + n as usize]; let program = ExprProgram::new(ops, &self.constants); let projection = ProgramEvaluator::from(program); - stack.push(Iter::UniqueIxJoin(IxJoin::Eq(UniqueIndexJoin::new( + stack.push(Iter::UniqueIxJoin(LeftDeepJoin::Eq(UniqueIndexJoin::new( input, index, table, blob_store, projection, )))); } @@ -230,12 +230,12 @@ pub enum Iter<'a> { DeltaScan(DeltaScanIter<'a>), /// A [RowRef] index iterator IndexScan(IndexScanIter<'a>), - /// A cross product iterator - CrossJoin(CrossJoinIter<'a>), + /// A nested loop join iterator + NLJoin(NestedLoopJoin<'a>), /// A non-unique (constraint) index join iterator - IxJoin(IxJoin>), + IxJoin(LeftDeepJoin>), /// A unique (constraint) index join iterator - UniqueIxJoin(IxJoin>), + UniqueIxJoin(LeftDeepJoin>), /// A tuple-at-a-time filter iterator Filter(Filter<'a>), } @@ -269,7 +269,7 @@ impl<'a> Iterator for Iter<'a> { // Filter is a passthru iter.next() } - Self::CrossJoin(iter) => { + Self::NLJoin(iter) => { iter.next().map(|t| { match t { // A leaf join @@ -287,8 +287,7 @@ impl<'a> Iterator for Iter<'a> { // / \ // b c (Tuple::Row(r), Tuple::Join(mut rows)) => { - // Returns (n+1)-tuples, - // if the rhs returns n-tuples. + // Returns an (n+1)-tuple let mut pointers = vec![r]; pointers.append(&mut rows); Tuple::Join(pointers) @@ -300,8 +299,7 @@ impl<'a> Iterator for Iter<'a> { // / \ // a b (Tuple::Join(mut rows), Tuple::Row(r)) => { - // Returns (n+1)-tuples, - // if the lhs returns n-tuples. + // Returns an (n+1)-tuple rows.push(r); Tuple::Join(rows) } @@ -313,9 +311,7 @@ impl<'a> Iterator for Iter<'a> { // / \ / \ // a b c d (Tuple::Join(mut lhs), Tuple::Join(mut rhs)) => { - // Returns (n+m)-tuples, - // if the lhs returns n-tuples, - // if the rhs returns m-tuples. + // Returns an (n+m)-tuple lhs.append(&mut rhs); Tuple::Join(lhs) } @@ -326,8 +322,8 @@ impl<'a> Iterator for Iter<'a> { } } -/// An iterator for an index join -pub enum IxJoin { +/// An iterator for a left deep join tree +pub enum LeftDeepJoin { /// A standard join Eq(Iter), /// A semijoin that returns the lhs @@ -336,7 +332,7 @@ pub enum IxJoin { SemiRhs(Iter), } -impl<'a, Iter> Iterator for IxJoin +impl<'a, Iter> Iterator for LeftDeepJoin where Iter: Iterator, RowRef<'a>)>, { @@ -370,7 +366,7 @@ where // / \ // a b (Tuple::Join(mut rows), ptr) => { - // Returns an n+1 tuple + // Returns an (n+1)-tuple rows.push(Row::Ptr(ptr)); Tuple::Join(rows) } @@ -496,9 +492,8 @@ impl<'a> Iterator for IndexJoin<'a> { } } -/// A cross join returns the cross product of its two inputs. -/// It materializes the rhs and streams the lhs. -pub struct CrossJoinIter<'a> { +/// A nested loop join returns the cross product of its inputs +pub struct NestedLoopJoin<'a> { /// The lhs input lhs: Box>, /// The rhs input @@ -511,7 +506,7 @@ pub struct CrossJoinIter<'a> { rhs_ptr: usize, } -impl<'a> CrossJoinIter<'a> { +impl<'a> NestedLoopJoin<'a> { fn new(lhs: Iter<'a>, rhs: Iter<'a>) -> Self { Self { lhs: Box::new(lhs), @@ -523,25 +518,27 @@ impl<'a> CrossJoinIter<'a> { } } -impl<'a> Iterator for CrossJoinIter<'a> { +impl<'a> Iterator for NestedLoopJoin<'a> { type Item = (Tuple<'a>, Tuple<'a>); fn next(&mut self) -> Option { - // Materialize the rhs on the first call - if self.build.is_empty() { - self.build = self.rhs.as_mut().collect(); - self.lhs_row = self.lhs.next(); - self.rhs_ptr = 0; + for t in self.rhs.as_mut() { + self.build.push(t); } - // Reset the rhs pointer - if self.rhs_ptr == self.build.len() { - self.lhs_row = self.lhs.next(); - self.rhs_ptr = 0; + match self.build.get(self.rhs_ptr) { + Some(v) => { + self.rhs_ptr += 1; + self.lhs_row.as_ref().map(|u| (u.clone(), v.clone())) + } + None => { + self.rhs_ptr = 1; + self.lhs_row = self.lhs.next(); + self.lhs_row + .as_ref() + .zip(self.build.first()) + .map(|(u, v)| (u.clone(), v.clone())) + } } - self.lhs_row.as_ref().map(|lhs_tuple| { - self.rhs_ptr += 1; - (lhs_tuple.clone(), self.build[self.rhs_ptr - 1].clone()) - }) } }