Skip to content

Commit

Permalink
feat: Add option to prefer local execution over remote
Browse files Browse the repository at this point in the history
Fixes: #2104

Signed-off-by: Vaibhav <vrongmeal@gmail.com>
  • Loading branch information
vrongmeal committed Nov 30, 2023
1 parent 739505f commit c8f92ee
Show file tree
Hide file tree
Showing 6 changed files with 131 additions and 93 deletions.
1 change: 1 addition & 0 deletions crates/datafusion_ext/src/vars.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ impl SessionVars {
is_cloud_instance: bool,
dialect: Dialect,
enable_experimental_scheduler: bool,
prefer_local_execution: bool,
}
}

Expand Down
8 changes: 8 additions & 0 deletions crates/datafusion_ext/src/vars/constants.rs
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,14 @@ pub(super) const ENABLE_EXPERIMENTAL_SCHEDULER: ServerVar<bool> = ServerVar {
description: "If the experimental query scheduler should be enabled",
};

pub(super) const PREFER_LOCAL_EXECUTION: ServerVar<bool> = ServerVar {
name: "prefer_local_execution",
value: &false,
group: "glaredb",
user_configurable: true,
description: "If the hybrid execution planner should prefer local execution",
};

/// Note that these are not normally shown in the search path.
pub(super) const IMPLICIT_SCHEMAS: [&str; 2] = [
POSTGRES_SCHEMA,
Expand Down
6 changes: 6 additions & 0 deletions crates/datafusion_ext/src/vars/inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ pub struct SessionVarsInner {
pub is_cloud_instance: SessionVar<bool>,
pub dialect: SessionVar<Dialect>,
pub enable_experimental_scheduler: SessionVar<bool>,
pub prefer_local_execution: SessionVar<bool>,
}

impl SessionVarsInner {
Expand Down Expand Up @@ -114,6 +115,8 @@ impl SessionVarsInner {
Ok(&self.dialect)
} else if name.eq_ignore_ascii_case(ENABLE_EXPERIMENTAL_SCHEDULER.name) {
Ok(&self.enable_experimental_scheduler)
} else if name.eq_ignore_ascii_case(PREFER_LOCAL_EXECUTION.name) {
Ok(&self.prefer_local_execution)
} else {
Err(VarError::UnknownVariable(name.to_string()).into())
}
Expand Down Expand Up @@ -169,6 +172,8 @@ impl SessionVarsInner {
self.dialect.set_from_str(val, setter)
} else if name.eq_ignore_ascii_case(ENABLE_EXPERIMENTAL_SCHEDULER.name) {
self.enable_experimental_scheduler.set_from_str(val, setter)
} else if name.eq_ignore_ascii_case(PREFER_LOCAL_EXECUTION.name) {
self.prefer_local_execution.set_from_str(val, setter)
} else {
Err(VarError::UnknownVariable(name.to_string()).into())
}
Expand Down Expand Up @@ -230,6 +235,7 @@ impl Default for SessionVarsInner {
is_cloud_instance: SessionVar::new(&IS_CLOUD_INSTANCE),
dialect: SessionVar::new(&DIALECT),
enable_experimental_scheduler: SessionVar::new(&ENABLE_EXPERIMENTAL_SCHEDULER),
prefer_local_execution: SessionVar::new(&PREFER_LOCAL_EXECUTION),
}
}
}
Expand Down
25 changes: 16 additions & 9 deletions crates/sqlexec/src/planner/physical_plan/remote_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@ use datafusion::physical_plan::{
};
use datafusion::prelude::Expr;
use datafusion_ext::metrics::AggregateMetricsStreamAdapter;
use datafusion_ext::runtime::runtime_group::RuntimeGroupExec;
use futures::{stream, TryStreamExt};
use protogen::metastore::types::catalog::RuntimePreference;
use std::any::Any;
use std::fmt;
use std::hash::Hash;
Expand Down Expand Up @@ -78,21 +80,26 @@ pub struct RemoteScanExec {
}

impl RemoteScanExec {
/// Returns a remote scan exec wrapped inside a [`RuntimeGroupExec`]
/// asserting that this can only be run on the remote side.
pub fn new(
provider: ProviderReference,
projected_schema: Arc<Schema>,
projection: Option<Vec<usize>>,
filters: Vec<Expr>,
limit: Option<usize>,
) -> Self {
Self {
provider,
projected_schema,
projection,
filters,
limit,
metrics: ExecutionPlanMetricsSet::new(),
}
) -> RuntimeGroupExec {
RuntimeGroupExec::new(
RuntimePreference::Remote,
Arc::new(Self {
provider,
projected_schema,
projection,
filters,
limit,
metrics: ExecutionPlanMetricsSet::new(),
}),
)
}
}

Expand Down
Loading

0 comments on commit c8f92ee

Please sign in to comment.