diff --git a/crates/datafusion_ext/src/vars.rs b/crates/datafusion_ext/src/vars.rs index 041ba6792d..edc999b21b 100644 --- a/crates/datafusion_ext/src/vars.rs +++ b/crates/datafusion_ext/src/vars.rs @@ -95,6 +95,7 @@ impl SessionVars { is_cloud_instance: bool, dialect: Dialect, enable_experimental_scheduler: bool, + prefer_local_execution: bool, } } diff --git a/crates/datafusion_ext/src/vars/constants.rs b/crates/datafusion_ext/src/vars/constants.rs index 8993fdd67b..50c5421390 100644 --- a/crates/datafusion_ext/src/vars/constants.rs +++ b/crates/datafusion_ext/src/vars/constants.rs @@ -204,6 +204,14 @@ pub(super) const ENABLE_EXPERIMENTAL_SCHEDULER: ServerVar = ServerVar { description: "If the experimental query scheduler should be enabled", }; +pub(super) const PREFER_LOCAL_EXECUTION: ServerVar = ServerVar { + name: "prefer_local_execution", + value: &false, + group: "glaredb", + user_configurable: true, + description: "If the hybrid execution planner should prefer local execution", +}; + /// Note that these are not normally shown in the search path. 
pub(super) const IMPLICIT_SCHEMAS: [&str; 2] = [ POSTGRES_SCHEMA, diff --git a/crates/datafusion_ext/src/vars/inner.rs b/crates/datafusion_ext/src/vars/inner.rs index 178e4efc63..cc494968e6 100644 --- a/crates/datafusion_ext/src/vars/inner.rs +++ b/crates/datafusion_ext/src/vars/inner.rs @@ -48,6 +48,7 @@ pub struct SessionVarsInner { pub is_cloud_instance: SessionVar, pub dialect: SessionVar, pub enable_experimental_scheduler: SessionVar, + pub prefer_local_execution: SessionVar, } impl SessionVarsInner { @@ -114,6 +115,8 @@ impl SessionVarsInner { Ok(&self.dialect) } else if name.eq_ignore_ascii_case(ENABLE_EXPERIMENTAL_SCHEDULER.name) { Ok(&self.enable_experimental_scheduler) + } else if name.eq_ignore_ascii_case(PREFER_LOCAL_EXECUTION.name) { + Ok(&self.prefer_local_execution) } else { Err(VarError::UnknownVariable(name.to_string()).into()) } @@ -169,6 +172,8 @@ impl SessionVarsInner { self.dialect.set_from_str(val, setter) } else if name.eq_ignore_ascii_case(ENABLE_EXPERIMENTAL_SCHEDULER.name) { self.enable_experimental_scheduler.set_from_str(val, setter) + } else if name.eq_ignore_ascii_case(PREFER_LOCAL_EXECUTION.name) { + self.prefer_local_execution.set_from_str(val, setter) } else { Err(VarError::UnknownVariable(name.to_string()).into()) } @@ -230,6 +235,7 @@ impl Default for SessionVarsInner { is_cloud_instance: SessionVar::new(&IS_CLOUD_INSTANCE), dialect: SessionVar::new(&DIALECT), enable_experimental_scheduler: SessionVar::new(&ENABLE_EXPERIMENTAL_SCHEDULER), + prefer_local_execution: SessionVar::new(&PREFER_LOCAL_EXECUTION), } } } diff --git a/crates/sqlexec/src/planner/physical_plan/remote_scan.rs b/crates/sqlexec/src/planner/physical_plan/remote_scan.rs index e236e2209e..5ec8ab8fd0 100644 --- a/crates/sqlexec/src/planner/physical_plan/remote_scan.rs +++ b/crates/sqlexec/src/planner/physical_plan/remote_scan.rs @@ -14,7 +14,9 @@ use datafusion::physical_plan::{ }; use datafusion::prelude::Expr; use 
datafusion_ext::metrics::AggregateMetricsStreamAdapter; +use datafusion_ext::runtime::runtime_group::RuntimeGroupExec; use futures::{stream, TryStreamExt}; +use protogen::metastore::types::catalog::RuntimePreference; use std::any::Any; use std::fmt; use std::hash::Hash; @@ -78,21 +80,26 @@ pub struct RemoteScanExec { } impl RemoteScanExec { + /// Returns a remote scan exec wrapped inside a [`RuntimeGroupExec`] + /// asserting that this can only be run on the remote side. pub fn new( provider: ProviderReference, projected_schema: Arc, projection: Option>, filters: Vec, limit: Option, - ) -> Self { - Self { - provider, - projected_schema, - projection, - filters, - limit, - metrics: ExecutionPlanMetricsSet::new(), - } + ) -> RuntimeGroupExec { + RuntimeGroupExec::new( + RuntimePreference::Remote, + Arc::new(Self { + provider, + projected_schema, + projection, + filters, + limit, + metrics: ExecutionPlanMetricsSet::new(), + }), + ) } } diff --git a/crates/sqlexec/src/remote/planner.rs b/crates/sqlexec/src/remote/planner.rs index 03168218af..0840459828 100644 --- a/crates/sqlexec/src/remote/planner.rs +++ b/crates/sqlexec/src/remote/planner.rs @@ -80,7 +80,7 @@ impl ExtensionPlanner for DDLExtensionPlanner { ) -> Result>> { let extension_type = node.name().parse::().unwrap(); - match extension_type { + let runtime_group_exec = match extension_type { 
ExtensionType::AlterDatabase => { let lp = require_downcast_lp::(node); let exec = AlterDatabaseExec { @@ -88,7 +88,7 @@ impl ExtensionPlanner for DDLExtensionPlanner { name: lp.name.to_string(), operation: lp.operation.clone(), }; - Ok(Some(Arc::new(exec))) + RuntimeGroupExec::new(RuntimePreference::Remote, Arc::new(exec)) } ExtensionType::AlterTable => { let lp = require_downcast_lp::(node); @@ -98,7 +98,7 @@ impl ExtensionPlanner for DDLExtensionPlanner { name: lp.name.to_owned(), operation: lp.operation.clone(), }; - Ok(Some(Arc::new(exec))) + RuntimeGroupExec::new(RuntimePreference::Remote, Arc::new(exec)) } ExtensionType::AlterTunnelRotateKeys => { let lp = require_downcast_lp::(node); @@ -108,7 +108,7 @@ impl ExtensionPlanner for DDLExtensionPlanner { if_exists: lp.if_exists, new_ssh_key: lp.new_ssh_key.clone(), }; - Ok(Some(Arc::new(exec))) + RuntimeGroupExec::new(RuntimePreference::Remote, Arc::new(exec)) } ExtensionType::CreateCredentials => { let lp = require_downcast_lp::(node); @@ -118,78 +118,83 @@ impl ExtensionPlanner for DDLExtensionPlanner { options: lp.options.clone(), comment: lp.comment.clone(), }; - Ok(Some(Arc::new(exec))) + RuntimeGroupExec::new(RuntimePreference::Remote, Arc::new(exec)) } ExtensionType::CreateExternalDatabase => { let lp = require_downcast_lp::(node); - Ok(Some(Arc::new(CreateExternalDatabaseExec { + let exec = CreateExternalDatabaseExec { catalog_version: self.catalog.version(), database_name: lp.database_name.clone(), if_not_exists: lp.if_not_exists, options: lp.options.clone(), tunnel: lp.tunnel.clone(), - }))) + }; + RuntimeGroupExec::new(RuntimePreference::Remote, Arc::new(exec)) } ExtensionType::CreateExternalTable => { let lp = require_downcast_lp::(node); - Ok(Some(Arc::new(CreateExternalTableExec { + let exec = CreateExternalTableExec { catalog_version: self.catalog.version(), tbl_reference: lp.tbl_reference.clone(), or_replace: lp.or_replace, if_not_exists: lp.if_not_exists, tunnel: lp.tunnel.clone(), 
table_options: lp.table_options.clone(), - }))) + }; + RuntimeGroupExec::new(RuntimePreference::Remote, Arc::new(exec)) } ExtensionType::CreateSchema => { let lp = require_downcast_lp::(node); - Ok(Some(Arc::new(CreateSchemaExec { + let exec = CreateSchemaExec { catalog_version: self.catalog.version(), schema_reference: lp.schema_reference.clone(), if_not_exists: lp.if_not_exists, - }))) + }; + RuntimeGroupExec::new(RuntimePreference::Remote, Arc::new(exec)) } ExtensionType::CreateTable => { let lp = require_downcast_lp::(node); - Ok(Some(Arc::new(CreateTableExec { + let exec = CreateTableExec { catalog_version: self.catalog.version(), tbl_reference: lp.tbl_reference.clone(), if_not_exists: lp.if_not_exists, or_replace: lp.or_replace, arrow_schema: Arc::new(lp.schema.as_ref().into()), source: physical_inputs.get(0).cloned(), - }))) + }; + RuntimeGroupExec::new(RuntimePreference::Remote, Arc::new(exec)) } ExtensionType::CreateTempTable => { let lp = require_downcast_lp::(node); - let exec = Arc::new(CreateTempTableExec { + let exec = CreateTempTableExec { tbl_reference: lp.tbl_reference.clone(), if_not_exists: lp.if_not_exists, or_replace: lp.or_replace, arrow_schema: Arc::new(lp.schema.as_ref().into()), source: physical_inputs.get(0).cloned(), - }); - let exec = Arc::new(RuntimeGroupExec::new(RuntimePreference::Local, exec)); - Ok(Some(exec)) + }; + RuntimeGroupExec::new(RuntimePreference::Local, Arc::new(exec)) } ExtensionType::CreateTunnel => { let lp = require_downcast_lp::(node); - Ok(Some(Arc::new(CreateTunnelExec { + let exec = CreateTunnelExec { catalog_version: self.catalog.version(), name: lp.name.clone(), if_not_exists: lp.if_not_exists, options: lp.options.clone(), - }))) + }; + RuntimeGroupExec::new(RuntimePreference::Remote, Arc::new(exec)) } ExtensionType::CreateView => { let lp = require_downcast_lp::(node); - Ok(Some(Arc::new(CreateViewExec { + let exec = CreateViewExec { catalog_version: self.catalog.version(), view_reference: 
lp.view_reference.clone(), sql: lp.sql.clone(), columns: lp.columns.clone(), or_replace: lp.or_replace, - }))) + }; + RuntimeGroupExec::new(RuntimePreference::Remote, Arc::new(exec)) } ExtensionType::DescribeTable => { let DescribeTable { entry } = require_downcast_lp::(node); @@ -198,13 +203,10 @@ impl ExtensionPlanner for DDLExtensionPlanner { } else { RuntimePreference::Remote }; - let exec = Arc::new(DescribeTableExec { + let exec = DescribeTableExec { entry: entry.clone(), - }); - - let exec = Arc::new(RuntimeGroupExec::new(runtime, exec)); - - Ok(Some(exec)) + }; + RuntimeGroupExec::new(runtime, Arc::new(exec)) } ExtensionType::DropTables => { let tmp_catalog = session_state @@ -233,10 +235,7 @@ impl ExtensionPlanner for DDLExtensionPlanner { ))); } } - let exec: Arc = match ( - temp_table_drops.is_empty(), - drops.is_empty(), - ) { + match (temp_table_drops.is_empty(), drops.is_empty()) { // both temp and remote tables (false, false) => { return Err(DataFusionError::Plan("Unable to drop temp and native tables in the same statement. Please use separate statements.".to_string()))?; @@ -248,8 +247,7 @@ impl ExtensionPlanner for DDLExtensionPlanner { tbl_references: temp_table_drops, if_exists: plan.if_exists, }); - - Arc::new(RuntimeGroupExec::new(RuntimePreference::Local, tmp_exec)) + RuntimeGroupExec::new(RuntimePreference::Local, tmp_exec) } // only remote tables (true, false) => { @@ -258,24 +256,22 @@ impl ExtensionPlanner for DDLExtensionPlanner { tbl_references: drops, if_exists: plan.if_exists, }); - let exec = RuntimeGroupExec::new(RuntimePreference::Remote, exec); - Arc::new(exec) + RuntimeGroupExec::new(RuntimePreference::Remote, exec) } // no tables (true, true) => { return Err(DataFusionError::Plan("No tables to drop".to_string()))? 
} - }; - - Ok(Some(exec)) + } } ExtensionType::DropCredentials => { let lp = require_downcast_lp::(node); - Ok(Some(Arc::new(DropCredentialsExec { + let exec = DropCredentialsExec { catalog_version: self.catalog.version(), names: lp.names.clone(), if_exists: lp.if_exists, - }))) + }; + RuntimeGroupExec::new(RuntimePreference::Remote, Arc::new(exec)) } ExtensionType::DropDatabase => { let lp = require_downcast_lp::(node); @@ -284,7 +280,7 @@ impl ExtensionPlanner for DDLExtensionPlanner { names: lp.names.clone(), if_exists: lp.if_exists, }; - Ok(Some(Arc::new(exec))) + RuntimeGroupExec::new(RuntimePreference::Remote, Arc::new(exec)) } ExtensionType::DropSchemas => { let lp = require_downcast_lp::(node); @@ -294,16 +290,16 @@ impl ExtensionPlanner for DDLExtensionPlanner { if_exists: lp.if_exists, cascade: lp.cascade, }; - Ok(Some(Arc::new(exec))) + RuntimeGroupExec::new(RuntimePreference::Remote, Arc::new(exec)) } ExtensionType::DropTunnel => { let lp = require_downcast_lp::(node); - let exec: DropTunnelExec = DropTunnelExec { + let exec = DropTunnelExec { catalog_version: self.catalog.version(), names: lp.names.clone(), if_exists: lp.if_exists, }; - Ok(Some(Arc::new(exec))) + RuntimeGroupExec::new(RuntimePreference::Remote, Arc::new(exec)) } ExtensionType::DropViews => { let lp = require_downcast_lp::(node); @@ -313,7 +309,7 @@ impl ExtensionPlanner for DDLExtensionPlanner { view_references: lp.view_references.clone(), if_exists: lp.if_exists, }; - Ok(Some(Arc::new(exec))) + RuntimeGroupExec::new(RuntimePreference::Remote, Arc::new(exec)) } ExtensionType::SetVariable => { let lp = require_downcast_lp::(node); @@ -321,16 +317,14 @@ impl ExtensionPlanner for DDLExtensionPlanner { variable: lp.variable.clone(), values: lp.values.clone(), }; - let exec = RuntimeGroupExec::new(RuntimePreference::Local, Arc::new(exec)); - Ok(Some(Arc::new(exec))) + RuntimeGroupExec::new(RuntimePreference::Local, Arc::new(exec)) } ExtensionType::ShowVariable => { let lp = 
require_downcast_lp::(node); let exec = ShowVarExec { variable: lp.variable.clone(), }; - let exec = RuntimeGroupExec::new(RuntimePreference::Local, Arc::new(exec)); - Ok(Some(Arc::new(exec))) + RuntimeGroupExec::new(RuntimePreference::Local, Arc::new(exec)) } ExtensionType::CopyTo => { let lp = require_downcast_lp::(node); @@ -338,7 +332,6 @@ impl ExtensionPlanner for DDLExtensionPlanner { CopyToDestinationOptions::Local(_) => RuntimePreference::Local, _ => RuntimePreference::Remote, }; - let exec = Arc::new(CopyToExec { format: lp.format.clone(), dest: lp.dest.clone(), @@ -346,16 +339,16 @@ impl ExtensionPlanner for DDLExtensionPlanner { physical_inputs.get(0).unwrap().clone(), )), }); - let exec = Arc::new(RuntimeGroupExec::new(runtime, exec)); - Ok(Some(exec)) + RuntimeGroupExec::new(runtime, exec) } ExtensionType::Update => { let lp = require_downcast_lp::(node); - Ok(Some(Arc::new(UpdateExec { + let exec = UpdateExec { table: lp.table.clone(), updates: lp.updates.clone(), where_expr: lp.where_expr.clone(), - }))) + }; + RuntimeGroupExec::new(RuntimePreference::Remote, Arc::new(exec)) } ExtensionType::Insert => { let lp = require_downcast_lp::(node); @@ -373,17 +366,19 @@ impl ExtensionPlanner for DDLExtensionPlanner { physical_inputs.get(0).unwrap().clone(), )), }); - let exec = Arc::new(RuntimeGroupExec::new(lp.runtime_preference, exec)); - Ok(Some(exec)) + RuntimeGroupExec::new(lp.runtime_preference, exec) } ExtensionType::Delete => { let lp = require_downcast_lp::(node); - Ok(Some(Arc::new(DeleteExec { + let exec = DeleteExec { table: lp.table.clone(), where_expr: lp.where_expr.clone(), - }))) + }; + RuntimeGroupExec::new(RuntimePreference::Remote, Arc::new(exec)) } - } + }; + + Ok(Some(Arc::new(runtime_group_exec))) } } @@ -391,6 +386,7 @@ pub struct RemotePhysicalPlanner<'a> { pub database_id: Uuid, pub remote_client: RemoteSessionClient, pub catalog: &'a SessionCatalog, + pub prefer_local_execution: bool, } impl<'a> RemotePhysicalPlanner<'a> { @@ -408,7 
+404,7 @@ impl<'a> RemotePhysicalPlanner<'a> { original_plan: Arc, ) -> Result<(Arc, Vec)> { let mut sends = Vec::new(); - let pref = original_plan + let original_plan_pref = original_plan .as_any() .downcast_ref::() .map(|exec| exec.preference) @@ -420,11 +416,11 @@ impl<'a> RemotePhysicalPlanner<'a> { for child in plan.children() { match child.as_any().downcast_ref::() { - Some(exec) if exec.preference == RuntimePreference::Local => { - if matches!(pref, RuntimePreference::Local) { - // If the root of the plan is local, then we can just - // execute everything locally by omitting the remote - // execution exec. + Some(exec) if self.prefer_local_execution(exec.preference) => { + if self.prefer_local_execution(original_plan_pref) { + // If the root of the plan is local, then we can + // just execute everything locally by omitting the + // remote execution exec. new_children.push(child); continue; } @@ -474,6 +470,13 @@ impl<'a> RemotePhysicalPlanner<'a> { Ok((plan, sends)) } + + /// Returns true if the planner prefers local execution for the given + /// preference. + fn prefer_local_execution(&self, pref: RuntimePreference) -> bool { + matches!(pref, RuntimePreference::Local) + || matches!(pref, RuntimePreference::Unspecified if self.prefer_local_execution) + } } #[async_trait] @@ -504,28 +507,34 @@ impl<'a> PhysicalPlanner for RemotePhysicalPlanner<'a> { // TODO: We could probably have an option to configure this. Note that // if we do add this as an option, we'll need to change // `replace_local_runtime_groups` to handle the root of the plan too. 
- let physical = match physical.as_any().downcast_ref::() { - Some(exec) if exec.preference == RuntimePreference::Local && send_execs.is_empty() => { - physical + let exec_prefers_local = { + let exec = physical.as_any().downcast_ref::(); + let exec_preference = exec + .map(|e| e.preference) + .unwrap_or(RuntimePreference::Unspecified); + self.prefer_local_execution(exec_preference) && send_execs.is_empty() + }; + + let physical = if exec_prefers_local { + physical + } else { + let mut physical = physical; + // Temporary coalesce exec until our custom plans support partition. + if physical.output_partitioning().partition_count() != 1 { + physical = Arc::new(CoalescePartitionsExec::new(physical)); } - _ => { - let mut physical = physical; - // Temporary coalesce exec until our custom plans support partition. - if physical.output_partitioning().partition_count() != 1 { - physical = Arc::new(CoalescePartitionsExec::new(physical)); - } - // Wrap in exec that will send the plan to the remote machine. - let physical = Arc::new(RemoteExecutionExec::new( - self.remote_client.clone(), - physical, - )); + // Wrap in exec that will send the plan to the remote machine. 
+ let physical = Arc::new(RemoteExecutionExec::new( + self.remote_client.clone(), + physical, + )); - // Create a wrapper physical plan which drives both the - // result stream, and the send execs - Arc::new(SendRecvJoinExec::new(physical, send_execs)) - } + // Create a wrapper physical plan which drives both the + // result stream, and the send execs + Arc::new(SendRecvJoinExec::new(physical, send_execs)) }; + Ok(physical) } diff --git a/crates/sqlexec/src/session.rs b/crates/sqlexec/src/session.rs index af8a39b35e..d881606923 100644 --- a/crates/sqlexec/src/session.rs +++ b/crates/sqlexec/src/session.rs @@ -404,6 +404,7 @@ impl Session { database_id: self.ctx.get_database_id(), remote_client: client, catalog: self.ctx.get_session_catalog(), + prefer_local_execution: self.ctx.get_session_vars().prefer_local_execution(), }; let plan = planner.create_physical_plan(&plan, &state).await?; Ok(plan)