From b164328c38a6158fcd6ba6b4897d282a314d2c55 Mon Sep 17 00:00:00 2001 From: Eduard Karacharov Date: Sat, 16 Dec 2023 19:11:40 +0300 Subject: [PATCH] post-merge fixes --- datafusion/physical-plan/src/joins/hash_join.rs | 5 +---- datafusion/physical-plan/src/joins/stream_join_utils.rs | 8 ++++---- 2 files changed, 5 insertions(+), 8 deletions(-) diff --git a/datafusion/physical-plan/src/joins/hash_join.rs b/datafusion/physical-plan/src/joins/hash_join.rs index f44995d73153..13ac06ee301c 100644 --- a/datafusion/physical-plan/src/joins/hash_join.rs +++ b/datafusion/physical-plan/src/joins/hash_join.rs @@ -1159,10 +1159,7 @@ impl HashJoinStream { fn poll_next_impl( &mut self, cx: &mut std::task::Context<'_>, - ) -> Poll>> - where - Self: Send, - { + ) -> Poll>> { loop { return match self.state { HashJoinStreamState::WaitBuildSide => { diff --git a/datafusion/physical-plan/src/joins/stream_join_utils.rs b/datafusion/physical-plan/src/joins/stream_join_utils.rs index f54f902b648b..64a976a1e39f 100644 --- a/datafusion/physical-plan/src/joins/stream_join_utils.rs +++ b/datafusion/physical-plan/src/joins/stream_join_utils.rs @@ -759,7 +759,7 @@ pub trait EagerJoinStream { match self.right_stream().next().await { Some(Ok(batch)) => { if batch.num_rows() == 0 { - return Ok(StreamJoinStateResult::Continue); + return Ok(StatefulStreamResult::Continue); } self.set_state(EagerJoinStreamState::PullLeft); @@ -788,7 +788,7 @@ pub trait EagerJoinStream { match self.left_stream().next().await { Some(Ok(batch)) => { if batch.num_rows() == 0 { - return Ok(StreamJoinStateResult::Continue); + return Ok(StatefulStreamResult::Continue); } self.set_state(EagerJoinStreamState::PullRight); self.process_batch_from_left(batch) @@ -817,7 +817,7 @@ pub trait EagerJoinStream { match self.left_stream().next().await { Some(Ok(batch)) => { if batch.num_rows() == 0 { - return Ok(StreamJoinStateResult::Continue); + return Ok(StatefulStreamResult::Continue); } self.process_batch_after_right_end(batch) } @@ -847,7 +847,7 @@ pub trait EagerJoinStream { match self.right_stream().next().await { Some(Ok(batch)) => { if batch.num_rows() == 0 { - return Ok(StreamJoinStateResult::Continue); + return Ok(StatefulStreamResult::Continue); } self.process_batch_after_left_end(batch) }