diff --git a/crates/datafusion_ext/src/transform.rs b/crates/datafusion_ext/src/transform.rs index 5fd5d56241..f54dacc281 100644 --- a/crates/datafusion_ext/src/transform.rs +++ b/crates/datafusion_ext/src/transform.rs @@ -15,6 +15,16 @@ pub trait TreeNodeExt: TreeNode { let new_node = op(after_op_children)?.into(); Ok(new_node) } + + /// Like `transform_down`, but allows providing a closure with mutable access + /// to the environment. + fn transform_down_mut(self, op: &mut F) -> Result + where + F: FnMut(Self) -> Result>, + { + let after_op = op(self)?.into(); + after_op.map_children(|node| node.transform_down_mut(op)) + } } impl TreeNodeExt for Arc {} diff --git a/crates/sqlexec/src/planner/physical_plan/remote_scan.rs b/crates/sqlexec/src/planner/physical_plan/remote_scan.rs index e236e2209e..05366f2e71 100644 --- a/crates/sqlexec/src/planner/physical_plan/remote_scan.rs +++ b/crates/sqlexec/src/planner/physical_plan/remote_scan.rs @@ -14,7 +14,9 @@ use datafusion::physical_plan::{ }; use datafusion::prelude::Expr; use datafusion_ext::metrics::AggregateMetricsStreamAdapter; +use datafusion_ext::runtime::runtime_group::RuntimeGroupExec; use futures::{stream, TryStreamExt}; +use protogen::metastore::types::catalog::RuntimePreference; use std::any::Any; use std::fmt; use std::hash::Hash; @@ -78,21 +80,27 @@ pub struct RemoteScanExec { } impl RemoteScanExec { + /// Returns a remote scan exec wrapped inside a [`RuntimeGroupExec`] + /// asserting that this can only be run on the remote side. + #[allow(clippy::new_ret_no_self)] pub fn new( provider: ProviderReference, projected_schema: Arc, projection: Option>, filters: Vec, limit: Option, - ) -> Self { - Self { - provider, - projected_schema, - projection, - filters, - limit, - metrics: ExecutionPlanMetricsSet::new(), - } + ) -> RuntimeGroupExec { + RuntimeGroupExec::new( + RuntimePreference::Remote, + Arc::new(Self { + provider, + projected_schema, + projection, + filters, + limit, + metrics: ExecutionPlanMetricsSet::new(), + }), + ) } } diff --git a/crates/sqlexec/src/remote/planner.rs b/crates/sqlexec/src/remote/planner.rs index aac6857faa..d0ad7962d8 100644 --- a/crates/sqlexec/src/remote/planner.rs +++ b/crates/sqlexec/src/remote/planner.rs @@ -1,4 +1,5 @@ use async_trait::async_trait; +use catalog::session_catalog::SessionCatalog; use datafusion::arrow::datatypes::Schema; use datafusion::common::tree_node::Transformed; use datafusion::common::DFSchema; @@ -57,7 +58,6 @@ use crate::planner::physical_plan::send_recv::SendRecvJoinExec; use crate::planner::physical_plan::set_var::SetVarExec; use crate::planner::physical_plan::show_var::ShowVarExec; use crate::planner::physical_plan::update::UpdateExec; -use catalog::session_catalog::SessionCatalog; use super::client::RemoteSessionClient; @@ -81,7 +81,7 @@ impl ExtensionPlanner for DDLExtensionPlanner { ) -> Result>> { let extension_type = node.name().parse::().unwrap(); - match extension_type { + let runtime_group_exec = match extension_type { ExtensionType::AlterDatabase => { let lp = require_downcast_lp::(node); let exec = AlterDatabaseExec { @@ -89,7 +89,7 @@ impl ExtensionPlanner for DDLExtensionPlanner { name: lp.name.to_string(), operation: lp.operation.clone(), }; - Ok(Some(Arc::new(exec))) + RuntimeGroupExec::new(RuntimePreference::Remote, Arc::new(exec)) } ExtensionType::AlterTable => { let lp = require_downcast_lp::(node); @@ -99,7 +99,7 @@ impl ExtensionPlanner for DDLExtensionPlanner { name: lp.name.to_owned(), operation: lp.operation.clone(), }; - Ok(Some(Arc::new(exec))) + RuntimeGroupExec::new(RuntimePreference::Remote, Arc::new(exec)) } ExtensionType::AlterTunnelRotateKeys => { let lp = require_downcast_lp::(node); @@ -109,7 +109,7 @@ impl ExtensionPlanner for DDLExtensionPlanner { if_exists: lp.if_exists, new_ssh_key: lp.new_ssh_key.clone(), }; - Ok(Some(Arc::new(exec))) + RuntimeGroupExec::new(RuntimePreference::Remote, Arc::new(exec)) } ExtensionType::CreateCredential => { let lp = require_downcast_lp::(node); @@ -120,7 +120,7 @@ impl ExtensionPlanner for DDLExtensionPlanner { comment: lp.comment.clone(), or_replace: lp.or_replace, }; - Ok(Some(Arc::new(exec))) + RuntimeGroupExec::new(RuntimePreference::Remote, Arc::new(exec)) } ExtensionType::CreateCredentials => { let lp = require_downcast_lp::(node); @@ -131,78 +131,83 @@ impl ExtensionPlanner for DDLExtensionPlanner { comment: lp.comment.clone(), or_replace: lp.or_replace, }; - Ok(Some(Arc::new(exec))) + RuntimeGroupExec::new(RuntimePreference::Remote, Arc::new(exec)) } ExtensionType::CreateExternalDatabase => { let lp = require_downcast_lp::(node); - Ok(Some(Arc::new(CreateExternalDatabaseExec { + let exec = CreateExternalDatabaseExec { catalog_version: self.catalog.version(), database_name: lp.database_name.clone(), if_not_exists: lp.if_not_exists, options: lp.options.clone(), tunnel: lp.tunnel.clone(), - }))) + }; + RuntimeGroupExec::new(RuntimePreference::Remote, Arc::new(exec)) } ExtensionType::CreateExternalTable => { let lp = require_downcast_lp::(node); - Ok(Some(Arc::new(CreateExternalTableExec { + let exec = CreateExternalTableExec { catalog_version: self.catalog.version(), tbl_reference: lp.tbl_reference.clone(), or_replace: lp.or_replace, if_not_exists: lp.if_not_exists, tunnel: lp.tunnel.clone(), table_options: lp.table_options.clone(), - }))) + }; + RuntimeGroupExec::new(RuntimePreference::Remote, Arc::new(exec)) } ExtensionType::CreateSchema => { let lp = require_downcast_lp::(node); - Ok(Some(Arc::new(CreateSchemaExec { + let exec = CreateSchemaExec { catalog_version: self.catalog.version(), schema_reference: lp.schema_reference.clone(), if_not_exists: lp.if_not_exists, - }))) + }; + RuntimeGroupExec::new(RuntimePreference::Remote, Arc::new(exec)) } ExtensionType::CreateTable => { let lp = require_downcast_lp::(node); - Ok(Some(Arc::new(CreateTableExec { + let exec = CreateTableExec { catalog_version: self.catalog.version(), tbl_reference: lp.tbl_reference.clone(), if_not_exists: lp.if_not_exists, or_replace: lp.or_replace, arrow_schema: Arc::new(lp.schema.as_ref().into()), source: physical_inputs.get(0).cloned(), - }))) + }; + RuntimeGroupExec::new(RuntimePreference::Remote, Arc::new(exec)) } ExtensionType::CreateTempTable => { let lp = require_downcast_lp::(node); - let exec = Arc::new(CreateTempTableExec { + let exec = CreateTempTableExec { tbl_reference: lp.tbl_reference.clone(), if_not_exists: lp.if_not_exists, or_replace: lp.or_replace, arrow_schema: Arc::new(lp.schema.as_ref().into()), source: physical_inputs.get(0).cloned(), - }); - let exec = Arc::new(RuntimeGroupExec::new(RuntimePreference::Local, exec)); - Ok(Some(exec)) + }; + RuntimeGroupExec::new(RuntimePreference::Local, Arc::new(exec)) } ExtensionType::CreateTunnel => { let lp = require_downcast_lp::(node); - Ok(Some(Arc::new(CreateTunnelExec { + let exec = CreateTunnelExec { catalog_version: self.catalog.version(), name: lp.name.clone(), if_not_exists: lp.if_not_exists, options: lp.options.clone(), - }))) + }; + RuntimeGroupExec::new(RuntimePreference::Remote, Arc::new(exec)) } ExtensionType::CreateView => { let lp = require_downcast_lp::(node); - Ok(Some(Arc::new(CreateViewExec { + let exec = CreateViewExec { catalog_version: self.catalog.version(), view_reference: lp.view_reference.clone(), sql: lp.sql.clone(), columns: lp.columns.clone(), or_replace: lp.or_replace, - }))) + }; + RuntimeGroupExec::new(RuntimePreference::Remote, Arc::new(exec)) } ExtensionType::DescribeTable => { let DescribeTable { entry } = require_downcast_lp::(node); @@ -211,13 +216,10 @@ impl ExtensionPlanner for DDLExtensionPlanner { } else { RuntimePreference::Remote }; - let exec = Arc::new(DescribeTableExec { + let exec = DescribeTableExec { entry: entry.clone(), - }); - - let exec = Arc::new(RuntimeGroupExec::new(runtime, exec)); - - Ok(Some(exec)) + }; + RuntimeGroupExec::new(runtime, Arc::new(exec)) } ExtensionType::DropTables => { let plan = require_downcast_lp::(node); @@ -241,10 +243,7 @@ impl ExtensionPlanner for DDLExtensionPlanner { ))); } } - let exec: Arc = match ( - temp_table_drops.is_empty(), - drops.is_empty(), - ) { + match (temp_table_drops.is_empty(), drops.is_empty()) { // both temp and remote tables (false, false) => { return Err(DataFusionError::Plan("Unable to drop temp and native tables in the same statement. Please use separate statements.".to_string()))?; @@ -256,8 +255,7 @@ impl ExtensionPlanner for DDLExtensionPlanner { tbl_references: temp_table_drops, if_exists: plan.if_exists, }); - - Arc::new(RuntimeGroupExec::new(RuntimePreference::Local, tmp_exec)) + RuntimeGroupExec::new(RuntimePreference::Local, tmp_exec) } // only remote tables (true, false) => { @@ -266,24 +264,22 @@ impl ExtensionPlanner for DDLExtensionPlanner { tbl_references: drops, if_exists: plan.if_exists, }); - let exec = RuntimeGroupExec::new(RuntimePreference::Remote, exec); - Arc::new(exec) + RuntimeGroupExec::new(RuntimePreference::Remote, exec) } // no tables (true, true) => { return Err(DataFusionError::Plan("No tables to drop".to_string()))? } - }; - - Ok(Some(exec)) + } } ExtensionType::DropCredentials => { let lp = require_downcast_lp::(node); - Ok(Some(Arc::new(DropCredentialsExec { + let exec = DropCredentialsExec { catalog_version: self.catalog.version(), names: lp.names.clone(), if_exists: lp.if_exists, - }))) + }; + RuntimeGroupExec::new(RuntimePreference::Remote, Arc::new(exec)) } ExtensionType::DropDatabase => { let lp = require_downcast_lp::(node); @@ -292,7 +288,7 @@ impl ExtensionPlanner for DDLExtensionPlanner { names: lp.names.clone(), if_exists: lp.if_exists, }; - Ok(Some(Arc::new(exec))) + RuntimeGroupExec::new(RuntimePreference::Remote, Arc::new(exec)) } ExtensionType::DropSchemas => { let lp = require_downcast_lp::(node); @@ -302,16 +298,16 @@ impl ExtensionPlanner for DDLExtensionPlanner { if_exists: lp.if_exists, cascade: lp.cascade, }; - Ok(Some(Arc::new(exec))) + RuntimeGroupExec::new(RuntimePreference::Remote, Arc::new(exec)) } ExtensionType::DropTunnel => { let lp = require_downcast_lp::(node); - let exec: DropTunnelExec = DropTunnelExec { + let exec = DropTunnelExec { catalog_version: self.catalog.version(), names: lp.names.clone(), if_exists: lp.if_exists, }; - Ok(Some(Arc::new(exec))) + RuntimeGroupExec::new(RuntimePreference::Remote, Arc::new(exec)) } ExtensionType::DropViews => { let lp = require_downcast_lp::(node); @@ -321,7 +317,7 @@ impl ExtensionPlanner for DDLExtensionPlanner { view_references: lp.view_references.clone(), if_exists: lp.if_exists, }; - Ok(Some(Arc::new(exec))) + RuntimeGroupExec::new(RuntimePreference::Remote, Arc::new(exec)) } ExtensionType::SetVariable => { let lp = require_downcast_lp::(node); @@ -329,16 +325,14 @@ impl ExtensionPlanner for DDLExtensionPlanner { variable: lp.variable.clone(), values: lp.values.clone(), }; - let exec = RuntimeGroupExec::new(RuntimePreference::Local, Arc::new(exec)); - Ok(Some(Arc::new(exec))) + RuntimeGroupExec::new(RuntimePreference::Local, Arc::new(exec)) } ExtensionType::ShowVariable => { let lp = require_downcast_lp::(node); let exec = ShowVarExec { variable: lp.variable.clone(), }; - let exec = RuntimeGroupExec::new(RuntimePreference::Local, Arc::new(exec)); - Ok(Some(Arc::new(exec))) + RuntimeGroupExec::new(RuntimePreference::Local, Arc::new(exec)) } ExtensionType::CopyTo => { let lp = require_downcast_lp::(node); @@ -346,7 +340,6 @@ impl ExtensionPlanner for DDLExtensionPlanner { CopyToDestinationOptions::Local(_) => RuntimePreference::Local, _ => RuntimePreference::Remote, }; - let exec = Arc::new(CopyToExec { format: lp.format.clone(), dest: lp.dest.clone(), @@ -354,16 +347,16 @@ impl ExtensionPlanner for DDLExtensionPlanner { physical_inputs.get(0).unwrap().clone(), )), }); - let exec = Arc::new(RuntimeGroupExec::new(runtime, exec)); - Ok(Some(exec)) + RuntimeGroupExec::new(runtime, exec) } ExtensionType::Update => { let lp = require_downcast_lp::(node); - Ok(Some(Arc::new(UpdateExec { + let exec = UpdateExec { table: lp.table.clone(), updates: lp.updates.clone(), where_expr: lp.where_expr.clone(), - }))) + }; + RuntimeGroupExec::new(RuntimePreference::Remote, Arc::new(exec)) } ExtensionType::Insert => { let lp = require_downcast_lp::(node); @@ -381,17 +374,19 @@ impl ExtensionPlanner for DDLExtensionPlanner { physical_inputs.get(0).unwrap().clone(), )), }); - let exec = Arc::new(RuntimeGroupExec::new(lp.runtime_preference, exec)); - Ok(Some(exec)) + RuntimeGroupExec::new(lp.runtime_preference, exec) } ExtensionType::Delete => { let lp = require_downcast_lp::(node); - Ok(Some(Arc::new(DeleteExec { + let exec = DeleteExec { table: lp.table.clone(), where_expr: lp.where_expr.clone(), - }))) + }; + RuntimeGroupExec::new(RuntimePreference::Remote, Arc::new(exec)) } - } + }; + + Ok(Some(Arc::new(runtime_group_exec))) } } @@ -403,85 +398,183 @@ pub struct RemotePhysicalPlanner<'a> { } impl<'a> RemotePhysicalPlanner<'a> { - /// Replace all local runtime groups that are not the root of the plan with - /// equivalent client recv execs. - /// - /// The modifed execution plan with client recv execs will be returned, - /// along with the send execs that will be responsible for pushing batches - /// to the remote node. - /// - /// This should be ran after all optimizations have been made to the - /// physical plan. - fn replace_local_runtime_groups( + fn pushdown_remote_pref( &self, - original_plan: Arc, - ) -> Result<(Arc, Vec)> { - let mut sends = Vec::new(); - let pref = original_plan + root: Arc, + ) -> Result<(RuntimeGroupExec, Vec)> { + let root_pref = root .as_any() .downcast_ref::() - .map(|exec| exec.preference) + .map(|r| r.preference) .unwrap_or(RuntimePreference::Unspecified); - let plan = original_plan.transform_up_mut(&mut |plan| { - let mut new_children: Vec> = Vec::new(); - let mut did_modify = false; - - for child in plan.children() { - match child.as_any().downcast_ref::() { - Some(exec) if exec.preference == RuntimePreference::Local => { - if matches!(pref, RuntimePreference::Local) { - // If the root of the plan is local, then we can just - // execute everything locally by omitting the remote - // execution exec. - new_children.push(child); - continue; + if matches!(root_pref, RuntimePreference::Remote) { + let mut sends = Vec::new(); + + // Replace all "local" execs with recv-send pairs. + let plan = root.transform_up_mut(&mut |plan| { + let mut did_modify = false; + let mut new_children: Vec> = Vec::new(); + + for child in plan.children() { + match child.as_any().downcast_ref::() { + Some(exec) if matches!(exec.preference, RuntimePreference::Local) => { + did_modify = true; + + let work_id = Uuid::new_v4(); + debug!(%work_id, "creating send and recv execs"); + + let mut input = exec.child.clone(); + + // Create the receive exec. This will be executed on the + // remote node. + let recv = ClientExchangeRecvExec { + work_id, + schema: input.schema(), + }; + + // Temporary coalesce exec until our custom plans support partition. + if input.output_partitioning().partition_count() != 1 { + input = Arc::new(CoalescePartitionsExec::new(input)); + } + + // And create the associated send exec. This will be + // executed locally, and pushes batches over the + // broadcast endpoint. + let send = ClientExchangeSendExec { + database_id: self.database_id, + work_id, + client: self.remote_client.clone(), + input, + }; + sends.push(send); + + new_children.push(Arc::new(recv)); } + _ => new_children.push(child), + } + } - did_modify = true; - - let work_id = Uuid::new_v4(); - debug!(%work_id, "creating send and recv execs"); + let transformed = if did_modify { + let new_plan = plan.with_new_children(new_children)?; + Transformed::Yes(new_plan) + } else { + Transformed::No(plan) + }; - let mut input = exec.child.clone(); + Ok(transformed) + })?; - // Create the receive exec. This will be executed on the - // remote node. - let recv = ClientExchangeRecvExec { - work_id, - schema: input.schema(), - }; + Ok(( + RuntimeGroupExec::new(RuntimePreference::Remote, plan), + sends, + )) + } else { + let mut children_and_sends = Vec::new(); + let mut any_local = matches!(root_pref, RuntimePreference::Local); + let mut none_remote = true; - // Temporary coalesce exec until our custom plans support partition. - if input.output_partitioning().partition_count() != 1 { - input = Arc::new(CoalescePartitionsExec::new(input)); - } + for child in root.children() { + let (new_child, sends) = self.pushdown_remote_pref(child)?; - // And create the associated send exec. This will be - // executed locally, and pushes batches over the - // broadcast endpoint. - let send = ClientExchangeSendExec { - database_id: self.database_id, - work_id, - client: self.remote_client.clone(), - input, - }; - sends.push(send); - - new_children.push(Arc::new(recv)); + match new_child.preference { + RuntimePreference::Local => { + any_local = true; } - _ => new_children.push(child), + RuntimePreference::Remote => { + none_remote = false; + } + _ => (), } + + children_and_sends.push((new_child, sends)); } - if !did_modify { - return Ok(Transformed::No(plan)); + if any_local { + // Create the remote execs as they are. + let children: Vec> = children_and_sends + .into_iter() + .map(|(child, sends)| { + if matches!(child.preference, RuntimePreference::Remote) { + self.create_join_exec(child.child, sends) + } else { + assert!(sends.is_empty()); + child.child + } + }) + .collect(); + + let plan = if children.is_empty() { + root + } else { + root.with_new_children(children)? + }; + + // Mark this as being executed on local. + Ok(( + RuntimeGroupExec::new(RuntimePreference::Local, plan), + Vec::new(), + )) + } else if none_remote { + // This implies that the whole plan can be executed on local. + // Keep the actual preference intact. + + let children: Vec<_> = children_and_sends + .into_iter() + .map(|(child, _)| child.child) + .collect(); + + let plan = if children.is_empty() { + root + } else { + root.with_new_children(children)? + }; + + Ok(( + RuntimeGroupExec::new(RuntimePreference::Unspecified, plan), + Vec::new(), + )) + } else { + // We can "pull up" the remote preference now. + let (children, sends): (Vec<_>, Vec<_>) = children_and_sends + .into_iter() + .map(|(child, sends)| (child.child, sends)) + .unzip(); + + let sends: Vec<_> = sends.into_iter().flatten().collect(); + + let plan = if children.is_empty() { + root + } else { + root.with_new_children(children)? + }; + + Ok(( + RuntimeGroupExec::new(RuntimePreference::Remote, plan), + sends, + )) } - let new_plan = plan.with_new_children(new_children)?; - Ok(Transformed::Yes(new_plan)) - })?; + } + } - Ok((plan, sends)) + fn create_join_exec( + &self, + mut physical: Arc, + sends: Vec, + ) -> Arc { + // Temporary coalesce exec until our custom plans support partition. + if physical.output_partitioning().partition_count() != 1 { + physical = Arc::new(CoalescePartitionsExec::new(physical)); + } + + // Wrap in exec that will send the plan to the remote machine. + let physical = Arc::new(RemoteExecutionExec::new( + self.remote_client.clone(), + physical, + self.query_text.to_owned(), + )); + + Arc::new(SendRecvJoinExec::new(physical, sends)) } } @@ -495,48 +588,22 @@ impl<'a> PhysicalPlanner for RemotePhysicalPlanner<'a> { // Rewrite any DDL that needs to process locally so we can only send the // query on remote and process it later on. - // Create the physical plans. This will call `scan` on - // the custom table providers meaning we'll have the - // correct exec refs. + // Create the physical plans. This will call `scan` on the custom table + // providers meaning we'll have the correct exec refs. let physical = DefaultPhysicalPlanner::with_extension_planners(vec![Arc::new( DDLExtensionPlanner::new(self.catalog.clone()), )]) .create_physical_plan(logical_plan, session_state) .await?; - let (physical, send_execs) = self.replace_local_runtime_groups(physical)?; - - // If the root of the plan indicates a local runtime preference, then we - // can just execute everything locally by omitting the remote execution - // exec. - // - // TODO: We could probably have an option to configure this. Note that - // if we do add this as an option, we'll need to change - // `replace_local_runtime_groups` to handle the root of the plan too. - let physical = match physical.as_any().downcast_ref::() { - Some(exec) if exec.preference == RuntimePreference::Local && send_execs.is_empty() => { - physical - } - _ => { - let mut physical = physical; - // Temporary coalesce exec until our custom plans support partition. - if physical.output_partitioning().partition_count() != 1 { - physical = Arc::new(CoalescePartitionsExec::new(physical)); - } - // Wrap in exec that will send the plan to the remote machine. - let physical = Arc::new(RemoteExecutionExec::new( - self.remote_client.clone(), - physical, - self.query_text.to_string(), - )); + // Root of the plan should be local so all the send execs are wrapped up + // in joins if required. + let local_physical = RuntimeGroupExec::new(RuntimePreference::Local, physical); + let (physical, sends) = self.pushdown_remote_pref(Arc::new(local_physical))?; + assert!(sends.is_empty()); - // Create a wrapper physical plan which drives both the - // result stream, and the send execs - Arc::new(SendRecvJoinExec::new(physical, send_execs)) - } - }; - Ok(physical) + Ok(Arc::new(physical)) } fn create_physical_expr(