From 02d9083372f544a2a903d736dbf0ede6badee1a3 Mon Sep 17 00:00:00 2001 From: August Date: Mon, 1 Aug 2022 16:34:06 +0800 Subject: [PATCH 1/4] fix: only schedule dml to cn contains table source writer --- src/frontend/src/binder/insert.rs | 12 +++++++++- src/frontend/src/binder/update.rs | 22 +++++++++++++------ src/frontend/src/handler/dml.rs | 11 ++++++++-- .../scheduler/distributed/query_manager.rs | 21 +++++++++++++++++- 4 files changed, 55 insertions(+), 11 deletions(-) diff --git a/src/frontend/src/binder/insert.rs b/src/frontend/src/binder/insert.rs index 70eb96426ffef..a983b5c5d775d 100644 --- a/src/frontend/src/binder/insert.rs +++ b/src/frontend/src/binder/insert.rs @@ -14,7 +14,7 @@ use itertools::Itertools; use risingwave_common::error::{ErrorCode, Result}; -use risingwave_common::types::DataType; +use risingwave_common::types::{DataType, ParallelUnitId}; use risingwave_sqlparser::ast::{Ident, ObjectName, Query, SetExpr}; use super::{BoundQuery, BoundSetExpr}; @@ -26,6 +26,9 @@ pub struct BoundInsert { /// Used for injecting deletion chunks to the source. pub table_source: BoundTableSource, + /// Used for scheduling. + pub vnode_mapping: Option>, + pub source: BoundQuery, /// Used as part of an extra `Project` when the column types of `source` query does not match @@ -40,6 +43,7 @@ impl Binder { _columns: Vec, source: Query, ) -> Result { + let (schema_name, table_name) = Self::resolve_table_name(source_name.clone())?; let table_source = self.bind_table_source(source_name)?; let expected_types = table_source @@ -48,6 +52,11 @@ impl Binder { .map(|c| c.data_type.clone()) .collect(); + let vnode_mapping = self + .bind_table(&schema_name, &table_name, None)? + .table_catalog + .vnode_mapping; + // When the column types of `source` query does not match `expected_types`, casting is // needed. // @@ -112,6 +121,7 @@ impl Binder { let insert = BoundInsert { table_source, + vnode_mapping, source, cast_exprs, }; diff --git a/src/frontend/src/binder/update.rs b/src/frontend/src/binder/update.rs index bf25106121813..b569189e2c90b 100644 --- a/src/frontend/src/binder/update.rs +++ b/src/frontend/src/binder/update.rs @@ -19,6 +19,7 @@ use std::collections::HashMap; use itertools::Itertools; use risingwave_common::ensure; use risingwave_common::error::{ErrorCode, Result}; +use risingwave_common::types::ParallelUnitId; use risingwave_sqlparser::ast::{Assignment, Expr, TableFactor, TableWithJoins}; use super::{Binder, BoundTableSource, Relation}; @@ -29,6 +30,9 @@ pub struct BoundUpdate { /// Used for injecting new chunks to the source. pub table_source: BoundTableSource, + /// Used for scheduling. + pub vnode_mapping: Option>, + /// Used for scanning the records to update with the `selection`. pub table: Relation, @@ -46,14 +50,13 @@ impl Binder { assignments: Vec, selection: Option, ) -> Result { - let table_source = { - ensure!(table.joins.is_empty()); - let name = match &table.relation { - TableFactor::Table { name, .. } => name.clone(), - _ => unreachable!(), - }; - self.bind_table_source(name)? + ensure!(table.joins.is_empty()); + let source_name = match &table.relation { + TableFactor::Table { name, .. } => name.clone(), + _ => unreachable!(), }; + let (schema_name, table_name) = Self::resolve_table_name(source_name.clone())?; + let table_source = self.bind_table_source(source_name)?; if table_source.append_only { return Err(ErrorCode::BindError( @@ -61,6 +64,10 @@ impl Binder { ) .into()); } + let vnode_mapping = self + .bind_table(&schema_name, &table_name, None)? + .table_catalog + .vnode_mapping; let table = self.bind_vec_table_with_joins(vec![table])?.unwrap(); assert_matches!(table, Relation::BaseTable(_)); @@ -123,6 +130,7 @@ impl Binder { Ok(BoundUpdate { table_source, + vnode_mapping, table, selection, exprs, diff --git a/src/frontend/src/handler/dml.rs b/src/frontend/src/handler/dml.rs index 456740c9813db..9ec293e36e928 100644 --- a/src/frontend/src/handler/dml.rs +++ b/src/frontend/src/handler/dml.rs @@ -17,7 +17,7 @@ use pgwire::pg_response::{PgResponse, StatementType}; use risingwave_common::error::Result; use risingwave_sqlparser::ast::Statement; -use crate::binder::Binder; +use crate::binder::{Binder, BoundStatement}; use crate::handler::privilege::{check_privileges, resolve_privileges}; use crate::handler::util::{to_pg_field, to_pg_rows}; use crate::planner::Planner; @@ -39,6 +39,13 @@ pub async fn handle_dml(context: OptimizerContext, stmt: Statement) -> Result insert.vnode_mapping.clone(), + BoundStatement::Update(update) => update.vnode_mapping.clone(), + BoundStatement::Delete(delete) => delete.table.table_catalog.vnode_mapping.clone(), + BoundStatement::Query(_) => unreachable!(), + }; + let (plan, pg_descs) = { // Subblock to make sure PlanRef (an Rc) is dropped before `await` below. let root = Planner::new(context.into()).plan(bound)?; @@ -54,7 +61,7 @@ pub async fn handle_dml(context: OptimizerContext, stmt: Statement) -> Result>, ) -> SchedulerResult { - let worker_node_addr = self.worker_node_manager.next_random()?.host.unwrap(); + let worker_node_addr = match vnodes { + Some(mut parallel_unit_ids) => { + parallel_unit_ids.dedup(); + let candidates = self + .worker_node_manager + .get_workers_by_parallel_unit_ids(¶llel_unit_ids)?; + let mut rng = rand::thread_rng(); + let die = Uniform::from(0..candidates.len()); + candidates + .get(die.sample(&mut rng)) + .unwrap() + .clone() + .host + .unwrap() + } + None => self.worker_node_manager.next_random()?.host.unwrap(), + }; let compute_client = self .compute_client_pool From 7c9130cf5a2b407ee9e9c63b92f6a1c736ee3d6c Mon Sep 17 00:00:00 2001 From: August Date: Mon, 1 Aug 2022 17:22:10 +0800 Subject: [PATCH 2/4] fix duplicate bind --- src/frontend/src/binder/insert.rs | 7 ++++--- src/frontend/src/binder/update.rs | 23 ++++++++++++----------- 2 files changed, 16 insertions(+), 14 deletions(-) diff --git a/src/frontend/src/binder/insert.rs b/src/frontend/src/binder/insert.rs index a983b5c5d775d..8919e30cef198 100644 --- a/src/frontend/src/binder/insert.rs +++ b/src/frontend/src/binder/insert.rs @@ -53,9 +53,10 @@ impl Binder { .collect(); let vnode_mapping = self - .bind_table(&schema_name, &table_name, None)? - .table_catalog - .vnode_mapping; + .catalog + .get_table_by_name(&self.db_name, &schema_name, &table_name)? + .vnode_mapping + .clone(); // When the column types of `source` query does not match `expected_types`, casting is // needed. diff --git a/src/frontend/src/binder/update.rs b/src/frontend/src/binder/update.rs index b569189e2c90b..53b52aa8ea615 100644 --- a/src/frontend/src/binder/update.rs +++ b/src/frontend/src/binder/update.rs @@ -12,7 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::assert_matches::assert_matches; use std::collections::hash_map::Entry; use std::collections::HashMap; @@ -51,12 +50,14 @@ impl Binder { selection: Option, ) -> Result { ensure!(table.joins.is_empty()); - let source_name = match &table.relation { - TableFactor::Table { name, .. } => name.clone(), - _ => unreachable!(), + let table_source = { + ensure!(table.joins.is_empty()); + let name = match &table.relation { + TableFactor::Table { name, .. } => name.clone(), + _ => unreachable!(), + }; + self.bind_table_source(name)? }; - let (schema_name, table_name) = Self::resolve_table_name(source_name.clone())?; - let table_source = self.bind_table_source(source_name)?; if table_source.append_only { return Err(ErrorCode::BindError( @@ -64,13 +65,13 @@ impl Binder { ) .into()); } - let vnode_mapping = self - .bind_table(&schema_name, &table_name, None)? - .table_catalog - .vnode_mapping; let table = self.bind_vec_table_with_joins(vec![table])?.unwrap(); - assert_matches!(table, Relation::BaseTable(_)); + let vnode_mapping = if let Relation::BaseTable(base_table) = &table { + base_table.table_catalog.vnode_mapping.clone() + } else { + unreachable!() + }; let selection = selection.map(|expr| self.bind_expr(expr)).transpose()?; From 9d276b7b1067c7889a089310a5566dbf11f7a547 Mon Sep 17 00:00:00 2001 From: August Date: Mon, 1 Aug 2022 17:30:56 +0800 Subject: [PATCH 3/4] choose --- src/frontend/src/scheduler/distributed/query_manager.rs | 6 ++---- src/frontend/src/scheduler/worker_node_manager.rs | 9 +++++---- 2 files changed, 7 insertions(+), 8 deletions(-) diff --git a/src/frontend/src/scheduler/distributed/query_manager.rs b/src/frontend/src/scheduler/distributed/query_manager.rs index 4078cb89f1e19..02588edfbeb58 100644 --- a/src/frontend/src/scheduler/distributed/query_manager.rs +++ b/src/frontend/src/scheduler/distributed/query_manager.rs @@ -17,7 +17,7 @@ use std::fmt::{Debug, Formatter}; use futures::StreamExt; use futures_async_stream::try_stream; use log::debug; -use rand::distributions::{Distribution as RandDistribution, Uniform}; +use rand::seq::SliceRandom; use risingwave_common::array::DataChunk; use risingwave_common::error::RwError; use risingwave_common::types::ParallelUnitId; @@ -82,10 +82,8 @@ impl QueryManager { let candidates = self .worker_node_manager .get_workers_by_parallel_unit_ids(¶llel_unit_ids)?; - let mut rng = rand::thread_rng(); - let die = Uniform::from(0..candidates.len()); candidates - .get(die.sample(&mut rng)) + .choose(&mut rand::thread_rng()) .unwrap() .clone() .host diff --git a/src/frontend/src/scheduler/worker_node_manager.rs b/src/frontend/src/scheduler/worker_node_manager.rs index 0f8a7f8780d57..962a6073b6a54 100644 --- a/src/frontend/src/scheduler/worker_node_manager.rs +++ b/src/frontend/src/scheduler/worker_node_manager.rs @@ -14,7 +14,7 @@ use std::sync::{Arc, RwLock}; -use rand::distributions::{Distribution as RandDistribution, Uniform}; +use rand::seq::SliceRandom; use risingwave_common::bail; use risingwave_common::types::ParallelUnitId; use risingwave_common::util::worker_util::get_pu_to_worker_mapping; @@ -67,14 +67,15 @@ impl WorkerNodeManager { /// Get a random worker node. pub fn next_random(&self) -> SchedulerResult { let current_nodes = self.worker_nodes.read().unwrap(); - let mut rng = rand::thread_rng(); if current_nodes.is_empty() { tracing::error!("No worker node available."); bail!("No worker node available"); } - let die = Uniform::from(0..current_nodes.len()); - Ok(current_nodes.get(die.sample(&mut rng)).unwrap().clone()) + Ok(current_nodes + .choose(&mut rand::thread_rng()) + .unwrap() + .clone()) } pub fn worker_node_count(&self) -> usize { From a066bb5ea1ea3a24ff6d55cb52099072f5397ca1 Mon Sep 17 00:00:00 2001 From: August Date: Mon, 1 Aug 2022 17:32:28 +0800 Subject: [PATCH 4/4] fix --- src/frontend/src/binder/update.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/frontend/src/binder/update.rs b/src/frontend/src/binder/update.rs index 53b52aa8ea615..da3aaba7c0e54 100644 --- a/src/frontend/src/binder/update.rs +++ b/src/frontend/src/binder/update.rs @@ -49,7 +49,6 @@ impl Binder { assignments: Vec, selection: Option, ) -> Result { - ensure!(table.joins.is_empty()); let table_source = { ensure!(table.joins.is_empty()); let name = match &table.relation {