Skip to content

Commit

Permalink
fix: Hybrid planning when the root plan has local preference
Browse files Browse the repository at this point in the history
Fixes: #2104

Signed-off-by: Vaibhav <vrongmeal@gmail.com>
  • Loading branch information
vrongmeal committed Dec 4, 2023
1 parent b34edb9 commit 73dad61
Show file tree
Hide file tree
Showing 3 changed files with 251 additions and 166 deletions.
10 changes: 10 additions & 0 deletions crates/datafusion_ext/src/transform.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,16 @@ pub trait TreeNodeExt: TreeNode {
let new_node = op(after_op_children)?.into();
Ok(new_node)
}

/// Like `transform_down`, but allows providing a closure with mutable access
/// to the environment.
fn transform_down_mut<F>(self, op: &mut F) -> Result<Self>
where
F: FnMut(Self) -> Result<Transformed<Self>>,
{
let after_op = op(self)?.into();
after_op.map_children(|node| node.transform_down_mut(op))
}
}

impl<T: DynTreeNode + ?Sized> TreeNodeExt for Arc<T> {}
26 changes: 17 additions & 9 deletions crates/sqlexec/src/planner/physical_plan/remote_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@ use datafusion::physical_plan::{
};
use datafusion::prelude::Expr;
use datafusion_ext::metrics::AggregateMetricsStreamAdapter;
use datafusion_ext::runtime::runtime_group::RuntimeGroupExec;
use futures::{stream, TryStreamExt};
use protogen::metastore::types::catalog::RuntimePreference;
use std::any::Any;
use std::fmt;
use std::hash::Hash;
Expand Down Expand Up @@ -78,21 +80,27 @@ pub struct RemoteScanExec {
}

impl RemoteScanExec {
/// Returns a remote scan exec wrapped inside a [`RuntimeGroupExec`]
/// asserting that this can only be run on the remote side.
#[allow(clippy::new_ret_no_self)]
pub fn new(
provider: ProviderReference,
projected_schema: Arc<Schema>,
projection: Option<Vec<usize>>,
filters: Vec<Expr>,
limit: Option<usize>,
) -> Self {
Self {
provider,
projected_schema,
projection,
filters,
limit,
metrics: ExecutionPlanMetricsSet::new(),
}
) -> RuntimeGroupExec {
RuntimeGroupExec::new(
RuntimePreference::Remote,
Arc::new(Self {
provider,
projected_schema,
projection,
filters,
limit,
metrics: ExecutionPlanMetricsSet::new(),
}),
)
}
}

Expand Down
Loading

0 comments on commit 73dad61

Please sign in to comment.