feat(frontend): separate plan_fragmenter into two phases (#7581)
To solve #7439, we need to perform async operations in `plan_fragmenter`. To make that possible, I separated `plan_fragmenter` into two phases **so that we can do async operations in phase 2**:

- phase 1: `BatchPlanFragmenter.split(batch_node) -> PreStageGraph`
- phase 2: `PreStageGraph.complete() -> StageGraph`

The difference between `PreStageGraph` and `StageGraph` is that `StageGraph` also contains the `exchange_info` and parallelism; this information is filled in during phase 2.
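As a minimal sketch of how a caller drives the two phases after this change (mirroring the updated `src/frontend/src/handler/query.rs` below; error handling and surrounding context are elided):

```rust
// Phase 1 (synchronous): `BatchPlanFragmenter::new` now takes the plan and
// splits it eagerly, yielding an intermediate graph that still lacks
// exchange_info and parallelism.
let plan_fragmenter = BatchPlanFragmenter::new(
    session.env().worker_node_manager_ref(),
    session.env().catalog_reader().clone(),
    plan,
)?;

// Phase 2 (asynchronous): fill in exchange_info and parallelism to obtain
// the complete, schedulable query.
let query = plan_fragmenter.generate_complete_query().await?;
```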

Approved-By: liurenjie1024
ZENOTME authored Feb 6, 2023
1 parent 2678067 commit c3bb027
Showing 11 changed files with 431 additions and 193 deletions.
5 changes: 3 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions src/frontend/Cargo.toml
```diff
@@ -12,6 +12,7 @@ repository = { workspace = true }
 anyhow = "1"
 arc-swap = "1"
 assert-impl = "0.1"
+async-recursion = "1.0.2"
 async-trait = "0.1"
 bk-tree = "0.4.0"
 byteorder = "1.4"
```
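The new `async-recursion` dependency suggests that the async phase-2 code recurses over the stage graph: Rust rejects a directly recursive `async fn` because its future type would be infinitely sized, and the crate's attribute macro boxes the returned future to break the cycle. A minimal, self-contained illustration (the function below is hypothetical, not code from this PR):

```rust
use async_recursion::async_recursion;

// A toy stage tree standing in for the fragmenter's stage graph.
struct Stage {
    children: Vec<Stage>,
}

// Without the attribute, rustc rejects this with "recursion in an `async fn`
// requires boxing"; `#[async_recursion]` rewrites it to return a boxed future.
#[async_recursion]
async fn complete_stage(stage: &Stage) -> usize {
    let mut completed = 1; // this stage
    for child in &stage.children {
        completed += complete_stage(child).await;
    }
    completed
}
```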
3 changes: 2 additions & 1 deletion src/frontend/planner_test/src/lib.rs
```diff
@@ -441,7 +441,8 @@ impl TestCase {
                 if result.is_some() {
                     panic!("two queries in one test case");
                 }
-                let rsp = explain::handle_explain(handler_args, *statement, options, analyze)?;
+                let rsp =
+                    explain::handle_explain(handler_args, *statement, options, analyze).await?;
 
                 let explain_output = get_explain_output(rsp).await;
                 let ret = TestCaseResult {
```
232 changes: 120 additions & 112 deletions src/frontend/src/handler/explain.rs
```diff
@@ -34,7 +34,7 @@ use crate::scheduler::BatchPlanFragmenter;
 use crate::stream_fragmenter::build_graph;
 use crate::utils::explain_stream_graph;
 
-pub fn handle_explain(
+pub async fn handle_explain(
     handler_args: HandlerArgs,
     stmt: Statement,
     options: ExplainOptions,
@@ -48,132 +48,140 @@ pub fn handle_explain(
 
     let session = context.session_ctx().clone();
 
-    let plan = match stmt {
-        Statement::CreateView {
-            or_replace: false,
-            materialized: true,
-            query,
-            name,
-            columns,
-            ..
-        } => gen_create_mv_plan(&session, context.into(), *query, name, columns)?.0,
+    let mut plan_fragmenter = None;
+    let mut rows = {
+        let plan = match stmt {
+            Statement::CreateView {
+                or_replace: false,
+                materialized: true,
+                query,
+                name,
+                columns,
+                ..
+            } => gen_create_mv_plan(&session, context.into(), *query, name, columns)?.0,
 
-        Statement::CreateSink { stmt } => gen_sink_plan(&session, context.into(), stmt)?.0,
+            Statement::CreateSink { stmt } => gen_sink_plan(&session, context.into(), stmt)?.0,
 
-        Statement::CreateTable {
-            name,
-            columns,
-            constraints,
-            source_schema,
-            ..
-        } => match check_create_table_with_source(&handler_args.with_options, source_schema)? {
-            Some(_) => {
-                return Err(ErrorCode::NotImplemented(
-                    "explain create table with a connector".to_string(),
-                    None.into(),
-                )
-                .into())
-            }
-            None => {
-                gen_create_table_plan(
-                    context,
-                    name,
-                    columns,
-                    constraints,
-                    ColumnIdGenerator::new_initial(),
-                )?
-                .0
-            }
-        },
+            Statement::CreateTable {
+                name,
+                columns,
+                constraints,
+                source_schema,
+                ..
+            } => match check_create_table_with_source(&handler_args.with_options, source_schema)? {
+                Some(_) => {
+                    return Err(ErrorCode::NotImplemented(
+                        "explain create table with a connector".to_string(),
+                        None.into(),
+                    )
+                    .into())
+                }
+                None => {
+                    gen_create_table_plan(
+                        context,
+                        name,
+                        columns,
+                        constraints,
+                        ColumnIdGenerator::new_initial(),
+                    )?
+                    .0
+                }
+            },
 
-        Statement::CreateIndex {
-            name,
-            table_name,
-            columns,
-            include,
-            distributed_by,
-            ..
-        } => {
-            gen_create_index_plan(
-                &session,
-                context.into(),
-                name,
-                table_name,
-                columns,
-                include,
-                distributed_by,
-            )?
-            .0
-        }
+            Statement::CreateIndex {
+                name,
+                table_name,
+                columns,
+                include,
+                distributed_by,
+                ..
+            } => {
+                gen_create_index_plan(
+                    &session,
+                    context.into(),
+                    name,
+                    table_name,
+                    columns,
+                    include,
+                    distributed_by,
+                )?
+                .0
+            }
 
-        stmt => gen_batch_query_plan(&session, context.into(), stmt)?.0,
-    };
+            stmt => gen_batch_query_plan(&session, context.into(), stmt)?.0,
+        };
 
-    let ctx = plan.plan_base().ctx.clone();
-    let explain_trace = ctx.is_explain_trace();
-    let explain_verbose = ctx.is_explain_verbose();
+        let ctx = plan.plan_base().ctx.clone();
+        let explain_trace = ctx.is_explain_trace();
+        let explain_verbose = ctx.is_explain_verbose();
 
-    let mut rows = if explain_trace {
-        let trace = ctx.take_trace();
-        trace
-            .iter()
-            .flat_map(|s| s.lines())
-            .map(|s| Row::new(vec![Some(s.to_string().into())]))
-            .collect::<Vec<_>>()
-    } else {
-        vec![]
-    };
+        let mut rows = if explain_trace {
+            let trace = ctx.take_trace();
+            trace
+                .iter()
+                .flat_map(|s| s.lines())
+                .map(|s| Row::new(vec![Some(s.to_string().into())]))
+                .collect::<Vec<_>>()
+        } else {
+            vec![]
+        };
 
-    match options.explain_type {
-        ExplainType::DistSql => match plan.convention() {
-            Convention::Logical => unreachable!(),
-            Convention::Batch => {
-                let plan_fragmenter = BatchPlanFragmenter::new(
-                    session.env().worker_node_manager_ref(),
-                    session.env().catalog_reader().clone(),
-                );
-                let query = plan_fragmenter.split(plan)?;
-                let stage_graph_json = serde_json::to_string_pretty(&query.stage_graph).unwrap();
-                rows.extend(
-                    vec![stage_graph_json]
-                        .iter()
-                        .flat_map(|s| s.lines())
-                        .map(|s| Row::new(vec![Some(s.to_string().into())])),
-                );
-            }
-            Convention::Stream => {
-                let graph = build_graph(plan);
-                rows.extend(
-                    explain_stream_graph(&graph, explain_verbose)?
-                        .lines()
-                        .map(|s| Row::new(vec![Some(s.to_string().into())])),
-                );
-            }
-        },
-        ExplainType::Physical => {
-            // if explain trace is open, the plan has been in the rows
-            if !explain_trace {
-                let output = plan.explain_to_string()?;
-                rows.extend(
-                    output
-                        .lines()
-                        .map(|s| Row::new(vec![Some(s.to_string().into())])),
-                );
-            }
-        }
-        ExplainType::Logical => {
-            // if explain trace is open, the plan has been in the rows
-            if !explain_trace {
-                let output = plan.ctx().take_logical().ok_or_else(|| {
-                    ErrorCode::InternalError("Logical plan not found for query".into())
-                })?;
-                rows.extend(
-                    output
-                        .lines()
-                        .map(|s| Row::new(vec![Some(s.to_string().into())])),
-                );
-            }
-        }
-    }
+        match options.explain_type {
+            ExplainType::DistSql => match plan.convention() {
+                Convention::Logical => unreachable!(),
+                Convention::Batch => {
+                    plan_fragmenter = Some(BatchPlanFragmenter::new(
+                        session.env().worker_node_manager_ref(),
+                        session.env().catalog_reader().clone(),
+                        plan,
+                    )?);
+                }
+                Convention::Stream => {
+                    let graph = build_graph(plan);
+                    rows.extend(
+                        explain_stream_graph(&graph, explain_verbose)?
+                            .lines()
+                            .map(|s| Row::new(vec![Some(s.to_string().into())])),
+                    );
+                }
+            },
+            ExplainType::Physical => {
+                // if explain trace is open, the plan has been in the rows
+                if !explain_trace {
+                    let output = plan.explain_to_string()?;
+                    rows.extend(
+                        output
+                            .lines()
+                            .map(|s| Row::new(vec![Some(s.to_string().into())])),
+                    );
+                }
+            }
+            ExplainType::Logical => {
+                // if explain trace is open, the plan has been in the rows
+                if !explain_trace {
+                    let output = plan.ctx().take_logical().ok_or_else(|| {
+                        ErrorCode::InternalError("Logical plan not found for query".into())
+                    })?;
+                    rows.extend(
+                        output
+                            .lines()
+                            .map(|s| Row::new(vec![Some(s.to_string().into())])),
+                    );
+                }
+            }
+        }
+        rows
+    };
+
+    if let Some(plan_fragmenter) = plan_fragmenter {
+        let query = plan_fragmenter.generate_complete_query().await?;
+        let stage_graph_json = serde_json::to_string_pretty(&query.stage_graph).unwrap();
+        rows.extend(
+            vec![stage_graph_json]
+                .iter()
+                .flat_map(|s| s.lines())
+                .map(|s| Row::new(vec![Some(s.to_string().into())])),
+        );
+    }
 
     Ok(PgResponse::new_for_stream(
```
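One structural point worth noting in the rewritten `handle_explain`: only the `DistSql` + `Batch` path needs phase 2, so the fragmenter is stashed in an `Option` inside the synchronous block and awaited only afterwards, keeping every other explain path free of async work. A stripped-down sketch of that shape (illustrative types, not the real ones):

```rust
struct Fragmenter;

impl Fragmenter {
    // Stands in for `generate_complete_query`; phase 2 could await RPCs here.
    async fn complete(self) -> Vec<String> {
        vec!["stage graph json".to_string()]
    }
}

async fn explain(dist_sql_batch: bool) -> Vec<String> {
    let mut fragmenter = None;
    let mut rows = {
        // Synchronous planning happens in this block; no `.await` yet.
        if dist_sql_batch {
            fragmenter = Some(Fragmenter);
        }
        vec!["plan".to_string()]
    };
    // Phase 2 runs only when the branch above asked for it.
    if let Some(f) = fragmenter {
        rows.extend(f.complete().await);
    }
    rows
}
```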
2 changes: 1 addition & 1 deletion src/frontend/src/handler/mod.rs
```diff
@@ -160,7 +160,7 @@ pub async fn handle(
             statement,
             analyze,
             options,
-        } => explain::handle_explain(handler_args, *statement, options, analyze),
+        } => explain::handle_explain(handler_args, *statement, options, analyze).await,
         Statement::CreateSource { stmt } => {
             create_source::handle_create_source(handler_args, stmt).await
         }
```
9 changes: 5 additions & 4 deletions src/frontend/src/handler/query.rs
```diff
@@ -103,7 +103,7 @@ pub async fn handle_query(
     let mut notice = String::new();
 
     // Subblock to make sure PlanRef (an Rc) is dropped before `await` below.
-    let (query, query_mode, output_schema) = {
+    let (plan_fragmenter, query_mode, output_schema) = {
         let context = OptimizerContext::from_handler_args(handler_args);
         let (plan, query_mode, schema) = gen_batch_query_plan(&session, context.into(), stmt)?;
 
@@ -116,11 +116,12 @@
         let plan_fragmenter = BatchPlanFragmenter::new(
             session.env().worker_node_manager_ref(),
             session.env().catalog_reader().clone(),
-        );
-        let query = plan_fragmenter.split(plan)?;
+            plan,
+        )?;
         context.append_notice(&mut notice);
-        (query, query_mode, schema)
+        (plan_fragmenter, query_mode, schema)
     };
+    let query = plan_fragmenter.generate_complete_query().await?;
     tracing::trace!("Generated query after plan fragmenter: {:?}", &query);
 
     let pg_descs = output_schema
```
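The pre-existing comment about dropping `PlanRef` (an `Rc`) before the `await` reflects a general async-Rust constraint this change has to respect: a future that holds an `Rc` across an `.await` is not `Send`, so it could not run on a multi-threaded executor. A self-contained sketch of the pattern (illustrative only, not RisingWave code):

```rust
use std::rc::Rc;

async fn do_async_work() {}

// Mimics the `Send` bound that `tokio::spawn` places on a future.
fn assert_send<F: std::future::Future + Send>(f: F) -> F {
    f
}

async fn handler() -> i32 {
    // Keep the non-`Send` value inside a sub-block...
    let derived = {
        let rc = Rc::new(42);
        *rc // ...and move only `Send` data out before any `.await`.
    };
    do_async_work().await;
    derived
}

fn main() {
    // Compiles only because `handler` holds no `Rc` across an `.await`.
    let _future = assert_send(handler());
}
```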
27 changes: 15 additions & 12 deletions src/frontend/src/optimizer/property/distribution.rs
```diff
@@ -57,10 +57,11 @@ use risingwave_pb::batch_plan::exchange_info::{
 use risingwave_pb::batch_plan::ExchangeInfo;
 
 use super::super::plan_node::*;
+use crate::catalog::catalog_service::CatalogReader;
 use crate::optimizer::plan_node::stream::StreamPlanRef;
 use crate::optimizer::property::Order;
 use crate::optimizer::PlanRef;
-use crate::scheduler::BatchPlanFragmenter;
+use crate::scheduler::worker_node_manager::WorkerNodeManagerRef;
 
 /// the distribution property provided by a operator.
 #[derive(Debug, Clone, PartialEq)]
@@ -108,7 +109,12 @@ pub enum RequiredDist {
 }
 
 impl Distribution {
-    pub fn to_prost(&self, output_count: u32, fragmenter: &BatchPlanFragmenter) -> ExchangeInfo {
+    pub fn to_prost(
+        &self,
+        output_count: u32,
+        catalog_reader: &CatalogReader,
+        worker_node_manager: &WorkerNodeManagerRef,
+    ) -> ExchangeInfo {
         ExchangeInfo {
             mode: match self {
                 Distribution::Single => DistributionMode::Single,
@@ -139,8 +145,9 @@ impl Distribution {
                     "hash key should not be empty, use `Single` instead"
                 );
 
-                let vnode_mapping = Self::get_vnode_mapping(fragmenter, table_id)
-                    .expect("vnode_mapping of UpstreamHashShard should not be none");
+                let vnode_mapping =
+                    Self::get_vnode_mapping(catalog_reader, worker_node_manager, table_id)
+                        .expect("vnode_mapping of UpstreamHashShard should not be none");
 
                 let pu2id_map: HashMap<ParallelUnitId, u32> = vnode_mapping
                     .iter_unique()
@@ -194,18 +201,14 @@ impl Distribution {
 
     #[inline(always)]
     fn get_vnode_mapping(
-        fragmenter: &BatchPlanFragmenter,
+        catalog_reader: &CatalogReader,
+        worker_node_manager: &WorkerNodeManagerRef,
         table_id: &TableId,
     ) -> Option<ParallelUnitMapping> {
-        fragmenter
-            .catalog_reader()
+        catalog_reader
             .read_guard()
             .get_table_by_id(table_id)
-            .map(|table| {
-                fragmenter
-                    .worker_node_manager()
-                    .get_fragment_mapping(&table.fragment_id)
-            })
+            .map(|table| worker_node_manager.get_fragment_mapping(&table.fragment_id))
             .ok()
             .flatten()
     }
```
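The net effect in `distribution.rs` is dependency narrowing: `to_prost` and `get_vnode_mapping` no longer borrow the whole `BatchPlanFragmenter`, only the two handles they actually use, so phase 2 can build `ExchangeInfo` without keeping the fragmenter in scope. A tiny self-contained sketch of the refactoring pattern (mock types; the real signatures are in the diff above):

```rust
// Mock handles standing in for CatalogReader and WorkerNodeManagerRef.
struct Catalog;
struct Workers;

// Before: fn to_prost(&self, output_count: u32, fragmenter: &BatchPlanFragmenter)
// After: only the narrow handles the function actually reads are passed in.
fn to_prost(output_count: u32, _catalog: &Catalog, _workers: &Workers) -> String {
    format!("ExchangeInfo with {output_count} outputs")
}

fn main() {
    let (catalog, workers) = (Catalog, Workers);
    println!("{}", to_prost(4, &catalog, &workers));
}
```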
(Diffs for the remaining changed files are not shown.)
