Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

transform: plumb through OptimizerFeatures #25628

Merged
21 changes: 16 additions & 5 deletions src/adapter/src/optimize/copy_to.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use mz_storage_types::sinks::S3UploadInfo;
use mz_transform::dataflow::DataflowMetainfo;
use mz_transform::normalize_lets::normalize_lets;
use mz_transform::typecheck::{empty_context, SharedContext as TypecheckContext};
use mz_transform::StatisticsOracle;
use mz_transform::{StatisticsOracle, TransformCtx};
use timely::progress::Antichain;
use tracing::warn;

Expand Down Expand Up @@ -103,6 +103,7 @@ pub struct Unresolved;
#[derive(Clone)]
pub struct LocalMirPlan<T = Unresolved> {
expr: MirRelationExpr,
df_meta: DataflowMetainfo,
context: T,
}

Expand Down Expand Up @@ -147,11 +148,15 @@ impl Optimize<HirRelationExpr> for Optimizer {
let expr = expr.lower(&self.config)?;

// MIR ⇒ MIR optimization (local)
let expr = optimize_mir_local(expr, &self.typecheck_ctx)?.into_inner();
let mut df_meta = DataflowMetainfo::default();
let mut transform_ctx =
TransformCtx::local(&self.config.features, &self.typecheck_ctx, &mut df_meta);
let expr = optimize_mir_local(expr, &mut transform_ctx)?.into_inner();

// Return the (sealed) plan at the end of this optimization step.
Ok(LocalMirPlan {
expr,
df_meta,
context: Unresolved,
})
}
Expand All @@ -168,6 +173,7 @@ impl LocalMirPlan<Unresolved> {
) -> LocalMirPlan<Resolved> {
LocalMirPlan {
expr: self.expr,
df_meta: self.df_meta,
context: Resolved {
timestamp_ctx,
session,
Expand All @@ -183,6 +189,7 @@ impl<'s> Optimize<LocalMirPlan<Resolved<'s>>> for Optimizer {
fn optimize(&mut self, plan: LocalMirPlan<Resolved<'s>>) -> Result<Self::To, OptimizerError> {
let LocalMirPlan {
expr,
mut df_meta,
context:
Resolved {
timestamp_ctx,
Expand Down Expand Up @@ -279,12 +286,16 @@ impl<'s> Optimize<LocalMirPlan<Resolved<'s>>> for Optimizer {
}
}

let df_meta = mz_transform::optimize_dataflow(
&mut df_desc,
// Construct TransformCtx for global optimization.
let mut transform_ctx = TransformCtx::global(
&df_builder,
&*stats,
&self.config.features,
)?;
&self.typecheck_ctx,
&mut df_meta,
);
// Run global optimization.
mz_transform::optimize_dataflow(&mut df_desc, &mut transform_ctx)?;

if self.config.mode == OptimizeMode::Explain {
// Collect the list of indexes used by the dataflow at this point.
Expand Down
19 changes: 12 additions & 7 deletions src/adapter/src/optimize/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ use mz_transform::dataflow::DataflowMetainfo;
use mz_transform::normalize_lets::normalize_lets;
use mz_transform::notice::{IndexAlreadyExists, IndexKeyEmpty};
use mz_transform::typecheck::{empty_context, SharedContext as TypecheckContext};
use mz_transform::TransformCtx;

use crate::catalog::Catalog;
use crate::optimize::dataflows::{
Expand All @@ -48,7 +49,7 @@ use crate::optimize::{

pub struct Optimizer {
/// A typechecking context to use throughout the optimizer pipeline.
_typecheck_ctx: TypecheckContext,
typecheck_ctx: TypecheckContext,
/// A snapshot of the catalog state.
catalog: Arc<Catalog>,
/// A snapshot of the cluster that will run the dataflows.
Expand All @@ -67,7 +68,7 @@ impl Optimizer {
config: OptimizerConfig,
) -> Self {
Self {
_typecheck_ctx: empty_context(),
typecheck_ctx: empty_context(),
catalog,
compute_instance,
exported_index_id,
Expand Down Expand Up @@ -166,13 +167,17 @@ impl Optimize<Index> for Optimizer {
|s| prep_scalar_expr(s, style),
)?;

// Optimize the dataflow across views, and any other ways that appeal.
let mut df_meta = mz_transform::optimize_dataflow(
&mut df_desc,
// Construct TransformCtx for global optimization.
let mut df_meta = DataflowMetainfo::default();
let mut transform_ctx = TransformCtx::global(
&df_builder,
&mz_transform::EmptyStatisticsOracle,
&mz_transform::EmptyStatisticsOracle, // TODO: wire proper stats
&self.config.features,
)?;
&self.typecheck_ctx,
&mut df_meta,
);
// Run global optimization.
mz_transform::optimize_dataflow(&mut df_desc, &mut transform_ctx)?;

if self.config.mode == OptimizeMode::Explain {
// Collect the list of indexes used by the dataflow at this point.
Expand Down
23 changes: 17 additions & 6 deletions src/adapter/src/optimize/materialized_view.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ use mz_sql::plan::HirRelationExpr;
use mz_transform::dataflow::DataflowMetainfo;
use mz_transform::normalize_lets::normalize_lets;
use mz_transform::typecheck::{empty_context, SharedContext as TypecheckContext};
use mz_transform::TransformCtx;
use timely::progress::Antichain;

use crate::catalog::Catalog;
Expand Down Expand Up @@ -104,6 +105,7 @@ impl Optimizer {
#[derive(Clone, Debug)]
pub struct LocalMirPlan {
expr: MirRelationExpr,
df_meta: DataflowMetainfo,
}

/// The (sealed intermediate) result after:
Expand Down Expand Up @@ -158,10 +160,13 @@ impl Optimize<HirRelationExpr> for Optimizer {
let expr = expr.lower(&self.config)?;

// MIR ⇒ MIR optimization (local)
let expr = optimize_mir_local(expr, &self.typecheck_ctx)?.into_inner();
let mut df_meta = DataflowMetainfo::default();
let mut transform_ctx =
TransformCtx::local(&self.config.features, &self.typecheck_ctx, &mut df_meta);
let expr = optimize_mir_local(expr, &mut transform_ctx)?.into_inner();

// Return the (sealed) plan at the end of this optimization step.
Ok(LocalMirPlan { expr })
Ok(LocalMirPlan { expr, df_meta })
}
}

Expand All @@ -178,7 +183,8 @@ impl Optimize<OptimizedMirRelationExpr> for Optimizer {

fn optimize(&mut self, expr: OptimizedMirRelationExpr) -> Result<Self::To, OptimizerError> {
let expr = expr.into_inner();
self.optimize(LocalMirPlan { expr })
let df_meta = DataflowMetainfo::default();
self.optimize(LocalMirPlan { expr, df_meta })
}
}

Expand All @@ -187,6 +193,7 @@ impl Optimize<LocalMirPlan> for Optimizer {

fn optimize(&mut self, plan: LocalMirPlan) -> Result<Self::To, OptimizerError> {
let expr = OptimizedMirRelationExpr(plan.expr);
let mut df_meta = plan.df_meta;

let mut rel_typ = expr.typ();
for &i in self.non_null_assertions.iter() {
Expand Down Expand Up @@ -225,12 +232,16 @@ impl Optimize<LocalMirPlan> for Optimizer {
|s| prep_scalar_expr(s, style),
)?;

let df_meta = mz_transform::optimize_dataflow(
&mut df_desc,
// Construct TransformCtx for global optimization.
let mut transform_ctx = TransformCtx::global(
&df_builder,
&mz_transform::EmptyStatisticsOracle, // TODO: wire proper stats
&self.config.features,
)?;
&self.typecheck_ctx,
&mut df_meta,
);
// Run global optimization.
mz_transform::optimize_dataflow(&mut df_desc, &mut transform_ctx)?;

if self.config.mode == OptimizeMode::Explain {
// Collect the list of indexes used by the dataflow at this point.
Expand Down
9 changes: 4 additions & 5 deletions src/adapter/src/optimize/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,7 @@ use mz_repr::optimize::{OptimizerFeatureOverrides, OptimizerFeatures, OverrideFr
use mz_repr::GlobalId;
use mz_sql::plan::PlanError;
use mz_sql::session::vars::SystemVars;
use mz_transform::typecheck::SharedContext as TypecheckContext;
use mz_transform::TransformError;
use mz_transform::{TransformCtx, TransformError};

// Alias types
// -----------
Expand Down Expand Up @@ -338,11 +337,11 @@ impl From<anyhow::Error> for OptimizerError {
#[mz_ore::instrument(target = "optimizer", level = "debug", name = "local")]
fn optimize_mir_local(
expr: MirRelationExpr,
typecheck_ctx: &TypecheckContext,
ctx: &mut TransformCtx,
) -> Result<OptimizedMirRelationExpr, OptimizerError> {
#[allow(deprecated)]
let optimizer = mz_transform::Optimizer::logical_optimizer(typecheck_ctx);
let expr = optimizer.optimize(expr)?;
let optimizer = mz_transform::Optimizer::logical_optimizer(ctx);
let expr = optimizer.optimize(expr, ctx)?;

// Trace the result of this phase.
mz_repr::explain::trace_plan(expr.as_inner());
Expand Down
21 changes: 16 additions & 5 deletions src/adapter/src/optimize/peek.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use mz_sql::session::metadata::SessionMetadata;
use mz_transform::dataflow::DataflowMetainfo;
use mz_transform::normalize_lets::normalize_lets;
use mz_transform::typecheck::{empty_context, SharedContext as TypecheckContext};
use mz_transform::StatisticsOracle;
use mz_transform::{StatisticsOracle, TransformCtx};
use timely::progress::Antichain;
use tracing::{debug_span, warn};

Expand Down Expand Up @@ -115,6 +115,7 @@ pub struct Unresolved;
#[derive(Clone)]
pub struct LocalMirPlan<T = Unresolved> {
expr: MirRelationExpr,
df_meta: DataflowMetainfo,
context: T,
}

Expand Down Expand Up @@ -155,11 +156,15 @@ impl Optimize<HirRelationExpr> for Optimizer {
let expr = expr.lower(&self.config)?;

// MIR ⇒ MIR optimization (local)
let expr = optimize_mir_local(expr, &self.typecheck_ctx)?.into_inner();
let mut df_meta = DataflowMetainfo::default();
let mut transform_ctx =
TransformCtx::local(&self.config.features, &self.typecheck_ctx, &mut df_meta);
let expr = optimize_mir_local(expr, &mut transform_ctx)?.into_inner();

// Return the (sealed) plan at the end of this optimization step.
Ok(LocalMirPlan {
expr,
df_meta,
context: Unresolved,
})
}
Expand All @@ -176,6 +181,7 @@ impl LocalMirPlan<Unresolved> {
) -> LocalMirPlan<Resolved> {
LocalMirPlan {
expr: self.expr,
df_meta: self.df_meta,
context: Resolved {
timestamp_ctx,
session,
Expand All @@ -191,6 +197,7 @@ impl<'s> Optimize<LocalMirPlan<Resolved<'s>>> for Optimizer {
fn optimize(&mut self, plan: LocalMirPlan<Resolved<'s>>) -> Result<Self::To, OptimizerError> {
let LocalMirPlan {
expr,
mut df_meta,
context:
Resolved {
timestamp_ctx,
Expand Down Expand Up @@ -268,12 +275,16 @@ impl<'s> Optimize<LocalMirPlan<Resolved<'s>>> for Optimizer {
}
}

let df_meta = mz_transform::optimize_dataflow(
&mut df_desc,
// Construct TransformCtx for global optimization.
let mut transform_ctx = TransformCtx::global(
&df_builder,
&*stats,
&self.config.features,
)?;
&self.typecheck_ctx,
&mut df_meta,
);
// Run global optimization.
mz_transform::optimize_dataflow(&mut df_desc, &mut transform_ctx)?;

if self.config.mode == OptimizeMode::Explain {
// Collect the list of indexes used by the dataflow at this point.
Expand Down
22 changes: 13 additions & 9 deletions src/adapter/src/optimize/subscribe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use mz_sql::plan::SubscribeFrom;
use mz_transform::dataflow::DataflowMetainfo;
use mz_transform::normalize_lets::normalize_lets;
use mz_transform::typecheck::{empty_context, SharedContext as TypecheckContext};
use mz_transform::TransformCtx;
use timely::progress::Antichain;

use crate::catalog::Catalog;
Expand Down Expand Up @@ -177,8 +178,9 @@ impl Optimize<SubscribeFrom> for Optimizer {
DataflowBuilder::new(catalog, compute).with_config(&self.config)
};
let mut df_desc = MirDataflowDescription::new(self.debug_name.clone());
let mut df_meta = DataflowMetainfo::default();

let mut df_desc = match plan {
match plan {
SubscribeFrom::Id(from_id) => {
let from = self.catalog.get_entry(&from_id);
let from_desc = from
Expand Down Expand Up @@ -207,8 +209,6 @@ impl Optimize<SubscribeFrom> for Optimizer {
refresh_schedule: None,
};
df_desc.export_sink(self.sink_id, sink_description);

df_desc
}
SubscribeFrom::Query { expr, desc } => {
// TODO: Change the `expr` type to be `HirRelationExpr` and run
Expand All @@ -218,7 +218,9 @@ impl Optimize<SubscribeFrom> for Optimizer {
// let expr = expr.lower(&self.config)?;

// MIR ⇒ MIR optimization (local)
let expr = optimize_mir_local(expr, &self.typecheck_ctx)?;
let mut transform_ctx =
TransformCtx::local(&self.config.features, &self.typecheck_ctx, &mut df_meta);
let expr = optimize_mir_local(expr, &mut transform_ctx)?;

df_builder.import_view_into_dataflow(&self.view_id, &expr, &mut df_desc)?;
df_builder.maybe_reoptimize_imported_views(&mut df_desc, &self.config)?;
Expand All @@ -236,8 +238,6 @@ impl Optimize<SubscribeFrom> for Optimizer {
refresh_schedule: None,
};
df_desc.export_sink(self.sink_id, sink_description);

df_desc
}
};

Expand All @@ -248,12 +248,16 @@ impl Optimize<SubscribeFrom> for Optimizer {
|s| prep_scalar_expr(s, style),
)?;

let df_meta = mz_transform::optimize_dataflow(
&mut df_desc,
// Construct TransformCtx for global optimization.
let mut transform_ctx = TransformCtx::global(
&df_builder,
&mz_transform::EmptyStatisticsOracle, // TODO: wire proper stats
&self.config.features,
)?;
&self.typecheck_ctx,
&mut df_meta,
);
// Run global optimization.
mz_transform::optimize_dataflow(&mut df_desc, &mut transform_ctx)?;

if self.config.mode == OptimizeMode::Explain {
// Collect the list of indexes used by the dataflow at this point.
Expand Down
7 changes: 6 additions & 1 deletion src/adapter/src/optimize/view.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@

use mz_expr::OptimizedMirRelationExpr;
use mz_sql::plan::HirRelationExpr;
use mz_transform::dataflow::DataflowMetainfo;
use mz_transform::typecheck::{empty_context, SharedContext as TypecheckContext};
use mz_transform::TransformCtx;

use crate::optimize::{optimize_mir_local, trace_plan, Optimize, OptimizerConfig, OptimizerError};

Expand Down Expand Up @@ -42,7 +44,10 @@ impl Optimize<HirRelationExpr> for Optimizer {
let expr = expr.lower(&self.config)?;

// MIR ⇒ MIR optimization (local)
let expr = optimize_mir_local(expr, &self.typecheck_ctx)?;
let mut df_meta = DataflowMetainfo::default();
let mut transform_ctx =
TransformCtx::local(&self.config.features, &self.typecheck_ctx, &mut df_meta);
let expr = optimize_mir_local(expr, &mut transform_ctx)?;

// Return the resulting OptimizedMirRelationExpr.
Ok(expr)
Expand Down
Loading