From 9aa186992d26aa9411582fdf57bfbba43e1f6440 Mon Sep 17 00:00:00 2001 From: August Date: Thu, 18 Aug 2022 18:16:44 +0800 Subject: [PATCH 1/9] feat: enforce chain parallelism and vnode mapping consistent with upstream actor --- proto/stream_plan.proto | 2 + src/compute/src/rpc/service/stream_service.rs | 4 +- .../stream_fragmenter/graph/fragment_graph.rs | 5 +++ src/frontend/src/stream_fragmenter/mod.rs | 1 + src/meta/src/manager/catalog/fragment.rs | 21 +++++++++ src/meta/src/model/stream.rs | 16 +++++++ .../src/rpc/service/notification_service.rs | 4 +- src/meta/src/stream/scheduler.rs | 1 + src/meta/src/stream/stream_graph.rs | 44 ++++++++----------- src/meta/src/stream/stream_manager.rs | 39 +++++++++------- src/meta/src/stream/test_fragmenter.rs | 3 ++ src/stream/src/task/stream_manager.rs | 14 ++---- 12 files changed, 99 insertions(+), 55 deletions(-) diff --git a/proto/stream_plan.proto b/proto/stream_plan.proto index ceae75fb70257..25eb75b80b7ab 100644 --- a/proto/stream_plan.proto +++ b/proto/stream_plan.proto @@ -454,6 +454,8 @@ message StreamFragmentGraph { bool is_singleton = 4; // Number of table ids (stateful states) for this fragment. uint32 table_ids_cnt = 5; + // Mark the dependent table id of this fragment. + uint32 dependent_table_id = 6; } message StreamFragmentEdge { diff --git a/src/compute/src/rpc/service/stream_service.rs b/src/compute/src/rpc/service/stream_service.rs index 699b539a13450..b334c86392050 100644 --- a/src/compute/src/rpc/service/stream_service.rs +++ b/src/compute/src/rpc/service/stream_service.rs @@ -82,9 +82,9 @@ impl StreamService for StreamServiceImpl { &self, request: Request, ) -> std::result::Result, Status> { - let table = request.into_inner(); + let req = request.into_inner(); - let res = self.mgr.update_actor_info(table); + let res = self.mgr.update_actor_info(&req.info); match res { Err(e) => { error!("failed to update actor info table actor {}", e); diff --git a/src/frontend/src/stream_fragmenter/graph/fragment_graph.rs b/src/frontend/src/stream_fragmenter/graph/fragment_graph.rs index a0a6294d31059..163190b63cd17 100644 --- a/src/frontend/src/stream_fragmenter/graph/fragment_graph.rs +++ b/src/frontend/src/stream_fragmenter/graph/fragment_graph.rs @@ -40,6 +40,9 @@ pub struct StreamFragment { /// Number of table ids (stateful states) for this fragment. pub table_ids_cnt: u32, + + /// Mark the dependent table id of this fragment. + pub dependent_table_id: Option, } /// An edge between the nodes in the fragment graph. 
@@ -66,6 +69,7 @@ impl StreamFragment { is_singleton: false, node: None, table_ids_cnt: 0, + dependent_table_id: None, } } @@ -76,6 +80,7 @@ impl StreamFragment { fragment_type: self.fragment_type as i32, is_singleton: self.is_singleton, table_ids_cnt: self.table_ids_cnt, + dependent_table_id: self.dependent_table_id.unwrap_or(0), } } } diff --git a/src/frontend/src/stream_fragmenter/mod.rs b/src/frontend/src/stream_fragmenter/mod.rs index 493deaced3745..3d0b7dd99b615 100644 --- a/src/frontend/src/stream_fragmenter/mod.rs +++ b/src/frontend/src/stream_fragmenter/mod.rs @@ -207,6 +207,7 @@ impl StreamFragmenter { state .dependent_table_ids .insert(TableId::new(node.table_id)); + current_fragment.dependent_table_id = Some(node.table_id); current_fragment.is_singleton = node.is_singleton; } diff --git a/src/meta/src/manager/catalog/fragment.rs b/src/meta/src/manager/catalog/fragment.rs index ee9407e3cbc82..4a52e4143fb20 100644 --- a/src/meta/src/manager/catalog/fragment.rs +++ b/src/meta/src/manager/catalog/fragment.rs @@ -18,6 +18,7 @@ use std::sync::Arc; use anyhow::anyhow; use itertools::Itertools; +use risingwave_common::buffer::Bitmap; use risingwave_common::catalog::TableId; use risingwave_common::types::ParallelUnitId; use risingwave_common::{bail, try_match_expand}; @@ -526,6 +527,26 @@ where Ok(info) } + pub async fn get_sink_vnode_mapping_info( + &self, + table_ids: &HashSet, + ) -> MetaResult)>>> { + let map = &self.core.read().await.table_fragments; + let mut info: HashMap)>> = HashMap::new(); + + for table_id in table_ids { + match map.get(table_id) { + Some(table_fragment) => { + info.insert(*table_id, table_fragment.sink_vnode_mapping_info()); + } + None => { + bail!("table_fragment not exist: id={}", table_id); + } + } + } + Ok(info) + } + pub async fn get_sink_parallel_unit_ids( &self, table_ids: &HashSet, diff --git a/src/meta/src/model/stream.rs b/src/meta/src/model/stream.rs index ad60cad8cbee9..f6ed9189f644e 100644 --- a/src/meta/src/model/stream.rs +++ b/src/meta/src/model/stream.rs @@ -15,6 +15,7 @@ use std::collections::{BTreeMap, HashMap, HashSet, VecDeque}; use itertools::Itertools; +use risingwave_common::buffer::Bitmap; use risingwave_common::catalog::TableId; use risingwave_common::types::ParallelUnitId; use risingwave_pb::common::{ParallelUnit, ParallelUnitMapping}; @@ -317,6 +318,21 @@ impl TableFragments { actor_map } + pub fn sink_vnode_mapping_info(&self) -> Vec<(ActorId, Option)> { + self.fragments + .values() + .filter(|fragment| fragment.fragment_type == FragmentType::Sink as i32) + .flat_map(|fragment| { + fragment.actors.iter().map(|actor| { + ( + actor.actor_id, + actor.vnode_bitmap.as_ref().map(Bitmap::from), + ) + }) + }) + .collect_vec() + } + pub fn parallel_unit_sink_actor_id(&self) -> BTreeMap { let sink_actor_ids = self.sink_actor_ids(); sink_actor_ids diff --git a/src/meta/src/rpc/service/notification_service.rs b/src/meta/src/rpc/service/notification_service.rs index 6ab9612d792cc..4480a6d8f40ca 100644 --- a/src/meta/src/rpc/service/notification_service.rs +++ b/src/meta/src/rpc/service/notification_service.rs @@ -101,7 +101,7 @@ where .collect_vec(); let hummock_snapshot = Some(self.hummock_manager.get_last_epoch().unwrap()); - let hummock_manager_rdguard = self.hummock_manager.get_read_guard().await; + let hummock_manager_guard = self.hummock_manager.get_read_guard().await; // Send the snapshot on subscription. After that we will send only updates. 
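Note on the new field: `dependent_table_id` is `Option<u32>` on the frontend but travels as a plain `uint32` in the protobuf message, with `0` standing in for "no upstream dependency" (see the `unwrap_or(0)` above; the meta side later in this series checks for `!= 0`). A minimal sketch of that convention, using hypothetical helper names rather than anything from the patch:

fn encode_dependent_table_id(id: Option<u32>) -> u32 {
    // 0 is reserved to mean "this fragment has no upstream table".
    id.unwrap_or(0)
}

fn decode_dependent_table_id(raw: u32) -> Option<u32> {
    if raw == 0 { None } else { Some(raw) }
}

fn main() {
    assert_eq!(encode_dependent_table_id(None), 0);
    assert_eq!(encode_dependent_table_id(Some(42)), 42);
    assert_eq!(decode_dependent_table_id(0), None);
}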
let meta_snapshot = match worker_type { @@ -133,7 +133,7 @@ where MetaSnapshot { tables, - hummock_version: Some(hummock_manager_rdguard.current_version.clone()), + hummock_version: Some(hummock_manager_guard.current_version.clone()), ..Default::default() } } diff --git a/src/meta/src/stream/scheduler.rs b/src/meta/src/stream/scheduler.rs index 8cf6ad0b94d98..b4f309e8fadb0 100644 --- a/src/meta/src/stream/scheduler.rs +++ b/src/meta/src/stream/scheduler.rs @@ -183,6 +183,7 @@ where // FIXME(Kexiang): select appropriate parallel_units, currently only support // `parallel_degree < parallel_units.size()` parallel_units.truncate(fragment.actors.len()); + // parallel_units.shuffle(&mut rand::thread_rng()); // Build vnode mapping according to the parallel units. let vnode_mapping = self.set_fragment_vnode_mapping(fragment, ¶llel_units)?; diff --git a/src/meta/src/stream/stream_graph.rs b/src/meta/src/stream/stream_graph.rs index e23507c26f3dd..893dfc157d410 100644 --- a/src/meta/src/stream/stream_graph.rs +++ b/src/meta/src/stream/stream_graph.rs @@ -775,8 +775,8 @@ impl BuildActorGraphState { /// [`ActorGraphBuilder`] generates the proto for interconnected actors for a streaming pipeline. pub struct ActorGraphBuilder { - /// GlobalFragmentId -> parallel_degree - parallelisms: HashMap, + /// Default parallelism. + default_parallelism: u32, fragment_graph: StreamFragmentGraph, } @@ -813,23 +813,8 @@ impl ActorGraphBuilder { let fragment_graph = StreamFragmentGraph::from_protobuf(fragment_graph.clone(), offset); - // TODO(Kexiang): now simply use Count(ParallelUnit) as parallelism of each fragment - let parallelisms: HashMap = fragment_graph - .fragments() - .iter() - .map(|(id, fragment)| { - let id = id.as_global_id(); - let parallel_degree = if fragment.is_singleton { - 1 - } else { - default_parallelism - }; - (id, parallel_degree) - }) - .collect(); - Ok(Self { - parallelisms, + default_parallelism, fragment_graph, }) } @@ -874,7 +859,7 @@ impl ActorGraphBuilder { state.stream_graph_builder.fill_info(info); // Generate actors of the streaming plan - self.build_actor_graph(&mut state, &self.fragment_graph)?; + self.build_actor_graph(&mut state, &self.fragment_graph, ctx)?; state }; @@ -925,6 +910,7 @@ impl ActorGraphBuilder { &self, state: &mut BuildActorGraphState, fragment_graph: &StreamFragmentGraph, + ctx: &CreateMaterializedViewContext, ) -> MetaResult<()> { // Use topological sort to build the graph from downstream to upstream. (The first fragment // poped out from the heap will be the top-most node in plan, or the sink in stream graph.) 
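The comment above describes the traversal order: a fragment becomes buildable only after every fragment downstream of it has been handled, so the sink (materialize) fragment is popped first. A self-contained sketch of that order, assuming a plain `fragment -> upstream fragments` map instead of the real `StreamFragmentGraph`:

use std::collections::{HashMap, VecDeque};

// upstreams[f] = fragments whose output fragment `f` consumes.
fn downstream_to_upstream_order(upstreams: &HashMap<u32, Vec<u32>>) -> Vec<u32> {
    // Count, for every fragment, how many downstream fragments consume it.
    let mut downstream_cnt: HashMap<u32, usize> =
        upstreams.keys().map(|&f| (f, 0)).collect();
    for ups in upstreams.values() {
        for &u in ups {
            *downstream_cnt.entry(u).or_insert(0) += 1;
        }
    }

    // Fragments with no downstream consumer (the sink) are actionable immediately.
    let mut queue: VecDeque<u32> = VecDeque::new();
    for (&f, &cnt) in &downstream_cnt {
        if cnt == 0 {
            queue.push_back(f);
        }
    }

    let mut order = Vec::new();
    while let Some(f) = queue.pop_front() {
        order.push(f);
        // Once `f` is built, each of its upstreams loses one pending downstream.
        for &u in upstreams.get(&f).into_iter().flatten() {
            let cnt = downstream_cnt.get_mut(&u).unwrap();
            *cnt -= 1;
            if *cnt == 0 {
                queue.push_back(u);
            }
        }
    }
    order
}

fn main() {
    // 1 (sink) <- 2 <- 3 (source); expected order: [1, 2, 3].
    let upstreams = HashMap::from([(1, vec![2]), (2, vec![3]), (3, vec![])]);
    assert_eq!(downstream_to_upstream_order(&upstreams), vec![1, 2, 3]);
}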
@@ -944,7 +930,7 @@ impl ActorGraphBuilder { while let Some(fragment_id) = actionable_fragment_id.pop_front() { // Build the actors corresponding to the fragment - self.build_actor_graph_fragment(fragment_id, state, fragment_graph)?; + self.build_actor_graph_fragment(fragment_id, state, fragment_graph, ctx)?; // Find if we can process more fragments for upstream_id in fragment_graph.get_upstreams(fragment_id).keys() { @@ -972,14 +958,22 @@ impl ActorGraphBuilder { fragment_id: GlobalFragmentId, state: &mut BuildActorGraphState, fragment_graph: &StreamFragmentGraph, + ctx: &CreateMaterializedViewContext, ) -> MetaResult<()> { let current_fragment = fragment_graph.get_fragment(fragment_id).unwrap().clone(); - let parallel_degree = self - .parallelisms - .get(&fragment_id.as_global_id()) - .unwrap() - .to_owned(); + let parallel_degree = if current_fragment.is_singleton { + 1 + } else if current_fragment.dependent_table_id != 0 { + // set fragment parallelism to the parallelism of its dependent table. + let upstream_actors = ctx + .table_sink_map + .get(&TableId::new(current_fragment.dependent_table_id)) + .expect("upstream actor should exist"); + upstream_actors.len() as u32 + } else { + self.default_parallelism + }; let node = Arc::new(current_fragment.node.unwrap()); let actor_ids = state diff --git a/src/meta/src/stream/stream_manager.rs b/src/meta/src/stream/stream_manager.rs index a521d3aa5894b..f8b993671c34d 100644 --- a/src/meta/src/stream/stream_manager.rs +++ b/src/meta/src/stream/stream_manager.rs @@ -17,6 +17,7 @@ use std::sync::Arc; use itertools::Itertools; use risingwave_common::bail; +use risingwave_common::buffer::Bitmap; use risingwave_common::catalog::TableId; use risingwave_common::types::{ParallelUnitId, VIRTUAL_NODE_COUNT}; use risingwave_pb::catalog::{Source, Table}; @@ -151,6 +152,8 @@ where struct Env<'a> { /// Records what's the corresponding actor of each parallel unit of one table. upstream_parallel_unit_info: &'a HashMap>, + /// Records each upstream mview actor's vnode mapping info. + upstream_vnode_mapping_info: &'a HashMap)>>, /// Records what's the actors on each worker of one table. tables_worker_actors: &'a HashMap>>, /// Schedule information of all actors. @@ -166,39 +169,36 @@ where &mut self, stream_node: &mut StreamNode, actor_id: ActorId, + vnode_mapping: &Option, same_worker_node_as_upstream: bool, is_singleton: bool, ) -> MetaResult<()> { let Some(NodeBody::Chain(ref mut chain)) = stream_node.node_body else { // If node is not chain node, recursively deal with input nodes for input in &mut stream_node.input { - self.resolve_chain_node_inner(input, actor_id, same_worker_node_as_upstream, is_singleton)?; + self.resolve_chain_node_inner(input, actor_id, vnode_mapping, same_worker_node_as_upstream, is_singleton)?; } return Ok(()); }; // get upstream table id let table_id = TableId::new(chain.table_id); - - // FIXME: We assume that the chain node is always on the same parallel unit as its - // upstream materialize node here to find the upstream actor. let upstream_actor_id = { - // 1. use table id to get upstream parallel_unit -> actor_id mapping - let upstream_parallel_actor_mapping = - &self.upstream_parallel_unit_info[&table_id]; + // 1. use table id to get upstream vnode mapping info: [(actor_id, + // option(vnode_mapping))] + let upstream_vnode_mapping_info = &self.upstream_vnode_mapping_info[&table_id]; if is_singleton { // Directly find the singleton actor id. 
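In the non-singleton branch just below, the chain actor is paired with the upstream materialize actor that carries an identical vnode bitmap, instead of assuming both sit on the same parallel unit. A standalone sketch of that pairing, with plain byte vectors standing in for the actual bitmap/buffer type:

type ActorId = u32;
type VnodeBitmap = Vec<u8>; // stand-in for the real bitmap type

// Given the upstream mview's sink actors and the chain actor's own bitmap,
// pick the upstream actor that owns exactly the same set of vnodes.
fn find_upstream_actor(
    upstream_info: &[(ActorId, Option<VnodeBitmap>)],
    chain_bitmap: &Option<VnodeBitmap>,
) -> Option<ActorId> {
    assert!(chain_bitmap.is_some(), "non-singleton chain must carry a bitmap");
    upstream_info
        .iter()
        .find(|(_, bitmap)| bitmap == chain_bitmap)
        .map(|(actor_id, _)| *actor_id)
}

fn main() {
    let upstream = vec![(1, Some(vec![0b1010])), (2, Some(vec![0b0101]))];
    assert_eq!(find_upstream_actor(&upstream, &Some(vec![0b0101])), Some(2));
}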
- *upstream_parallel_actor_mapping - .values() - .exactly_one() - .unwrap() + upstream_vnode_mapping_info.iter().exactly_one().unwrap().0 } else { - // 2. use our actor id to get parallel unit id of the chain actor - let parallel_unit_id = self.locations.actor_locations[&actor_id].id; - // 3. and use chain actor's parallel unit id to get the corresponding - // upstream actor id - upstream_parallel_actor_mapping[¶llel_unit_id] + // 2. find the upstream actor id by vnode mapping. + assert!(vnode_mapping.is_some()); + upstream_vnode_mapping_info + .iter() + .find(|(_, bitmap)| bitmap == vnode_mapping) + .unwrap() + .0 } }; @@ -282,6 +282,11 @@ where .get_sink_parallel_unit_ids(dependent_table_ids) .await?; + let upstream_vnode_mapping_info = &self + .fragment_manager + .get_sink_vnode_mapping_info(dependent_table_ids) + .await?; + let tables_worker_actors = &self .fragment_manager .get_tables_worker_actors(dependent_table_ids) @@ -289,6 +294,7 @@ where let mut env = Env { upstream_parallel_unit_info, + upstream_vnode_mapping_info, tables_worker_actors, locations, dispatchers, @@ -304,6 +310,7 @@ where env.resolve_chain_node_inner( stream_node, actor.actor_id, + &actor.vnode_bitmap.as_ref().map(Bitmap::from), actor.same_worker_node_as_upstream, is_singleton, )?; diff --git a/src/meta/src/stream/test_fragmenter.rs b/src/meta/src/stream/test_fragmenter.rs index 75cb01439c8f1..d84602c1f64c2 100644 --- a/src/meta/src/stream/test_fragmenter.rs +++ b/src/meta/src/stream/test_fragmenter.rs @@ -144,6 +144,7 @@ fn make_stream_fragments() -> Vec { fragment_type: FragmentType::Source as i32, is_singleton: false, table_ids_cnt: 0, + dependent_table_id: 0, }); // exchange node @@ -216,6 +217,7 @@ fn make_stream_fragments() -> Vec { fragment_type: FragmentType::Others as i32, is_singleton: false, table_ids_cnt: 0, + dependent_table_id: 0, }); // exchange node @@ -303,6 +305,7 @@ fn make_stream_fragments() -> Vec { fragment_type: FragmentType::Sink as i32, is_singleton: true, table_ids_cnt: 0, + dependent_table_id: 0, }); fragments diff --git a/src/stream/src/task/stream_manager.rs b/src/stream/src/task/stream_manager.rs index d28932f76a969..2bfd990ab3306 100644 --- a/src/stream/src/task/stream_manager.rs +++ b/src/stream/src/task/stream_manager.rs @@ -323,12 +323,9 @@ impl LocalStreamManager { /// This function could only be called once during the lifecycle of `LocalStreamManager` for /// now. 
- pub fn update_actor_info( - &self, - req: stream_service::BroadcastActorInfoTableRequest, - ) -> Result<()> { + pub fn update_actor_info(&self, actor_infos: &[ActorInfo]) -> Result<()> { let mut core = self.core.lock(); - core.update_actor_info(req) + core.update_actor_info(actor_infos) } /// This function could only be called once during the lifecycle of `LocalStreamManager` for @@ -644,12 +641,9 @@ impl LocalStreamManagerCore { .collect::>>() } - fn update_actor_info( - &mut self, - req: stream_service::BroadcastActorInfoTableRequest, - ) -> Result<()> { + fn update_actor_info(&mut self, new_actor_infos: &[ActorInfo]) -> Result<()> { let mut actor_infos = self.context.actor_infos.write(); - for actor in req.get_info() { + for actor in new_actor_infos { let ret = actor_infos.insert(actor.get_actor_id(), actor.clone()); if let Some(prev_actor) = ret && actor != &prev_actor{ return Err(ErrorCode::InternalError(format!( From 8239d5e5c9fe1479bb92f97d0bd4d5d5babaeff9 Mon Sep 17 00:00:00 2001 From: August Date: Fri, 19 Aug 2022 12:37:11 +0800 Subject: [PATCH 2/9] use buffer directly --- src/meta/src/manager/catalog/fragment.rs | 7 +++---- src/meta/src/model/stream.rs | 18 ++++++++---------- src/meta/src/stream/scheduler.rs | 1 - src/meta/src/stream/stream_manager.rs | 9 ++++----- 4 files changed, 15 insertions(+), 20 deletions(-) diff --git a/src/meta/src/manager/catalog/fragment.rs b/src/meta/src/manager/catalog/fragment.rs index 4a52e4143fb20..9fe55997a529e 100644 --- a/src/meta/src/manager/catalog/fragment.rs +++ b/src/meta/src/manager/catalog/fragment.rs @@ -18,11 +18,10 @@ use std::sync::Arc; use anyhow::anyhow; use itertools::Itertools; -use risingwave_common::buffer::Bitmap; use risingwave_common::catalog::TableId; use risingwave_common::types::ParallelUnitId; use risingwave_common::{bail, try_match_expand}; -use risingwave_pb::common::{ParallelUnit, ParallelUnitMapping, WorkerNode}; +use risingwave_pb::common::{Buffer, ParallelUnit, ParallelUnitMapping, WorkerNode}; use risingwave_pb::meta::table_fragments::ActorState; use risingwave_pb::stream_plan::{Dispatcher, FragmentType, StreamActor}; use tokio::sync::{RwLock, RwLockReadGuard}; @@ -530,9 +529,9 @@ where pub async fn get_sink_vnode_mapping_info( &self, table_ids: &HashSet, - ) -> MetaResult)>>> { + ) -> MetaResult)>>> { let map = &self.core.read().await.table_fragments; - let mut info: HashMap)>> = HashMap::new(); + let mut info: HashMap)>> = HashMap::new(); for table_id in table_ids { match map.get(table_id) { diff --git a/src/meta/src/model/stream.rs b/src/meta/src/model/stream.rs index f6ed9189f644e..a244357a5902a 100644 --- a/src/meta/src/model/stream.rs +++ b/src/meta/src/model/stream.rs @@ -15,10 +15,9 @@ use std::collections::{BTreeMap, HashMap, HashSet, VecDeque}; use itertools::Itertools; -use risingwave_common::buffer::Bitmap; use risingwave_common::catalog::TableId; use risingwave_common::types::ParallelUnitId; -use risingwave_pb::common::{ParallelUnit, ParallelUnitMapping}; +use risingwave_pb::common::{Buffer, ParallelUnit, ParallelUnitMapping}; use risingwave_pb::meta::table_fragments::{ActorState, ActorStatus, Fragment}; use risingwave_pb::meta::TableFragments as ProstTableFragments; use risingwave_pb::stream_plan::source_node::SourceType; @@ -318,17 +317,16 @@ impl TableFragments { actor_map } - pub fn sink_vnode_mapping_info(&self) -> Vec<(ActorId, Option)> { + /// Returns sink actor vnode mapping infos. 
+ pub fn sink_vnode_mapping_info(&self) -> Vec<(ActorId, Option)> { self.fragments .values() .filter(|fragment| fragment.fragment_type == FragmentType::Sink as i32) .flat_map(|fragment| { - fragment.actors.iter().map(|actor| { - ( - actor.actor_id, - actor.vnode_bitmap.as_ref().map(Bitmap::from), - ) - }) + fragment + .actors + .iter() + .map(|actor| (actor.actor_id, actor.vnode_bitmap.clone())) }) .collect_vec() } @@ -346,7 +344,7 @@ impl TableFragments { .collect() } - /// Generate toplogical order of fragments. If `index(a) < index(b)` in vec, then a is the + /// Generate topological order of fragments. If `index(a) < index(b)` in vec, then a is the /// downstream of b. pub fn generate_topological_order(&self) -> Vec { let mut actionable_fragment_id = VecDeque::new(); diff --git a/src/meta/src/stream/scheduler.rs b/src/meta/src/stream/scheduler.rs index b4f309e8fadb0..8cf6ad0b94d98 100644 --- a/src/meta/src/stream/scheduler.rs +++ b/src/meta/src/stream/scheduler.rs @@ -183,7 +183,6 @@ where // FIXME(Kexiang): select appropriate parallel_units, currently only support // `parallel_degree < parallel_units.size()` parallel_units.truncate(fragment.actors.len()); - // parallel_units.shuffle(&mut rand::thread_rng()); // Build vnode mapping according to the parallel units. let vnode_mapping = self.set_fragment_vnode_mapping(fragment, ¶llel_units)?; diff --git a/src/meta/src/stream/stream_manager.rs b/src/meta/src/stream/stream_manager.rs index f8b993671c34d..832ecd02aaabb 100644 --- a/src/meta/src/stream/stream_manager.rs +++ b/src/meta/src/stream/stream_manager.rs @@ -17,11 +17,10 @@ use std::sync::Arc; use itertools::Itertools; use risingwave_common::bail; -use risingwave_common::buffer::Bitmap; use risingwave_common::catalog::TableId; use risingwave_common::types::{ParallelUnitId, VIRTUAL_NODE_COUNT}; use risingwave_pb::catalog::{Source, Table}; -use risingwave_pb::common::{ActorInfo, ParallelUnitMapping, WorkerType}; +use risingwave_pb::common::{ActorInfo, Buffer, ParallelUnitMapping, WorkerType}; use risingwave_pb::meta::subscribe_response::{Info, Operation}; use risingwave_pb::meta::table_fragments::fragment::FragmentDistributionType; use risingwave_pb::meta::table_fragments::{ActorState, ActorStatus}; @@ -153,7 +152,7 @@ where /// Records what's the corresponding actor of each parallel unit of one table. upstream_parallel_unit_info: &'a HashMap>, /// Records each upstream mview actor's vnode mapping info. - upstream_vnode_mapping_info: &'a HashMap)>>, + upstream_vnode_mapping_info: &'a HashMap)>>, /// Records what's the actors on each worker of one table. tables_worker_actors: &'a HashMap>>, /// Schedule information of all actors. 
@@ -169,7 +168,7 @@ where &mut self, stream_node: &mut StreamNode, actor_id: ActorId, - vnode_mapping: &Option, + vnode_mapping: &Option, same_worker_node_as_upstream: bool, is_singleton: bool, ) -> MetaResult<()> { @@ -310,7 +309,7 @@ where env.resolve_chain_node_inner( stream_node, actor.actor_id, - &actor.vnode_bitmap.as_ref().map(Bitmap::from), + &actor.vnode_bitmap, actor.same_worker_node_as_upstream, is_singleton, )?; From ba9808def01b531613a5a7127158c9e23aa8d5ee Mon Sep 17 00:00:00 2001 From: August Date: Fri, 19 Aug 2022 19:51:05 +0800 Subject: [PATCH 3/9] schedule chain fragment when create mv --- proto/stream_plan.proto | 4 +- .../stream_fragmenter/graph/fragment_graph.rs | 8 +- src/frontend/src/stream_fragmenter/mod.rs | 2 +- src/meta/src/manager/catalog/fragment.rs | 32 +++- src/meta/src/model/stream.rs | 41 +++-- src/meta/src/stream/mod.rs | 6 +- src/meta/src/stream/scheduler.rs | 8 +- src/meta/src/stream/stream_graph.rs | 31 +++- src/meta/src/stream/stream_manager.rs | 155 ++++++++++-------- src/meta/src/stream/test_fragmenter.rs | 6 +- 10 files changed, 180 insertions(+), 113 deletions(-) diff --git a/proto/stream_plan.proto b/proto/stream_plan.proto index c4d08b3fa0538..66425ca16cd74 100644 --- a/proto/stream_plan.proto +++ b/proto/stream_plan.proto @@ -454,8 +454,8 @@ message StreamFragmentGraph { bool is_singleton = 4; // Number of table ids (stateful states) for this fragment. uint32 table_ids_cnt = 5; - // Mark the dependent table id of this fragment. - uint32 dependent_table_id = 6; + // Mark the upstream table ids of this fragment. + repeated uint32 upstream_table_ids = 6; } message StreamFragmentEdge { diff --git a/src/frontend/src/stream_fragmenter/graph/fragment_graph.rs b/src/frontend/src/stream_fragmenter/graph/fragment_graph.rs index 163190b63cd17..bb29d3872a4c8 100644 --- a/src/frontend/src/stream_fragmenter/graph/fragment_graph.rs +++ b/src/frontend/src/stream_fragmenter/graph/fragment_graph.rs @@ -41,8 +41,8 @@ pub struct StreamFragment { /// Number of table ids (stateful states) for this fragment. pub table_ids_cnt: u32, - /// Mark the dependent table id of this fragment. - pub dependent_table_id: Option, + /// Mark the upstream table ids of this fragment. + pub upstream_table_ids: Vec, } /// An edge between the nodes in the fragment graph. 
@@ -69,7 +69,7 @@ impl StreamFragment { is_singleton: false, node: None, table_ids_cnt: 0, - dependent_table_id: None, + upstream_table_ids: vec![], } } @@ -80,7 +80,7 @@ impl StreamFragment { fragment_type: self.fragment_type as i32, is_singleton: self.is_singleton, table_ids_cnt: self.table_ids_cnt, - dependent_table_id: self.dependent_table_id.unwrap_or(0), + upstream_table_ids: self.upstream_table_ids.clone(), } } } diff --git a/src/frontend/src/stream_fragmenter/mod.rs b/src/frontend/src/stream_fragmenter/mod.rs index 8b609bb3ee774..869e9869e4c8c 100644 --- a/src/frontend/src/stream_fragmenter/mod.rs +++ b/src/frontend/src/stream_fragmenter/mod.rs @@ -207,7 +207,7 @@ impl StreamFragmenter { state .dependent_table_ids .insert(TableId::new(node.table_id)); - current_fragment.dependent_table_id = Some(node.table_id); + current_fragment.upstream_table_ids.push(node.table_id); current_fragment.is_singleton = node.is_singleton; } diff --git a/src/meta/src/manager/catalog/fragment.rs b/src/meta/src/manager/catalog/fragment.rs index 9fe55997a529e..0ef56788714c2 100644 --- a/src/meta/src/manager/catalog/fragment.rs +++ b/src/meta/src/manager/catalog/fragment.rs @@ -19,7 +19,6 @@ use std::sync::Arc; use anyhow::anyhow; use itertools::Itertools; use risingwave_common::catalog::TableId; -use risingwave_common::types::ParallelUnitId; use risingwave_common::{bail, try_match_expand}; use risingwave_pb::common::{Buffer, ParallelUnit, ParallelUnitMapping, WorkerNode}; use risingwave_pb::meta::table_fragments::ActorState; @@ -76,6 +75,14 @@ pub struct ActorInfos { pub source_actor_maps: HashMap>, } +pub struct FragmentVNodeInfo { + /// actor id => parallel unit + pub actor_parallel_unit_maps: BTreeMap, + + /// fragment vnode mapping info + pub vnode_mapping: Option, +} + #[derive(Default)] pub struct BuildGraphInfo { pub table_node_actors: HashMap>>, @@ -526,17 +533,17 @@ where Ok(info) } - pub async fn get_sink_vnode_mapping_info( + pub async fn get_sink_vnode_bitmap_info( &self, table_ids: &HashSet, - ) -> MetaResult)>>> { + ) -> MetaResult>>> { let map = &self.core.read().await.table_fragments; - let mut info: HashMap)>> = HashMap::new(); + let mut info: HashMap>> = HashMap::new(); for table_id in table_ids { match map.get(table_id) { Some(table_fragment) => { - info.insert(*table_id, table_fragment.sink_vnode_mapping_info()); + info.insert(*table_id, table_fragment.sink_vnode_bitmap_info()); } None => { bail!("table_fragment not exist: id={}", table_id); @@ -546,18 +553,25 @@ where Ok(info) } - pub async fn get_sink_parallel_unit_ids( + pub async fn get_sink_fragment_vnode_info( &self, table_ids: &HashSet, - ) -> MetaResult>> { + ) -> MetaResult> { let map = &self.core.read().await.table_fragments; - let mut info: HashMap> = HashMap::new(); + let mut info: HashMap = HashMap::new(); for table_id in table_ids { match map.get(table_id) { Some(table_fragment) => { - info.insert(*table_id, table_fragment.parallel_unit_sink_actor_id()); + info.insert( + *table_id, + FragmentVNodeInfo { + actor_parallel_unit_maps: table_fragment.sink_actor_parallel_units(), + vnode_mapping: table_fragment.sink_vnode_mapping(), + }, + ); } + None => { bail!("table_fragment not exist: id={}", table_id); } diff --git a/src/meta/src/model/stream.rs b/src/meta/src/model/stream.rs index a244357a5902a..f4d1fa4d84d89 100644 --- a/src/meta/src/model/stream.rs +++ b/src/meta/src/model/stream.rs @@ -119,6 +119,16 @@ impl TableFragments { self.table_id } + /// Returns sink fragment vnode mapping. 
+ pub fn sink_vnode_mapping(&self) -> Option { + self.fragments + .values() + .find(|fragment| fragment.fragment_type == FragmentType::Sink as i32) + .unwrap() + .vnode_mapping + .clone() + } + /// Update state of all actors pub fn update_actors_state(&mut self, state: ActorState) { for actor_status in self.actor_status.values_mut() { @@ -317,8 +327,17 @@ impl TableFragments { actor_map } - /// Returns sink actor vnode mapping infos. - pub fn sink_vnode_mapping_info(&self) -> Vec<(ActorId, Option)> { + /// Returns fragment vnode mapping. + pub fn fragment_vnode_mapping(&self, fragment_id: FragmentId) -> Option { + if let Some(fragment) = self.fragments.get(&fragment_id) { + fragment.vnode_mapping.clone() + } else { + None + } + } + + /// Returns sink actor vnode bitmap infos. + pub fn sink_vnode_bitmap_info(&self) -> BTreeMap> { self.fragments .values() .filter(|fragment| fragment.fragment_type == FragmentType::Sink as i32) @@ -328,17 +347,20 @@ impl TableFragments { .iter() .map(|actor| (actor.actor_id, actor.vnode_bitmap.clone())) }) - .collect_vec() + .collect() } - pub fn parallel_unit_sink_actor_id(&self) -> BTreeMap { + pub fn sink_actor_parallel_units(&self) -> BTreeMap { let sink_actor_ids = self.sink_actor_ids(); sink_actor_ids .iter() .map(|actor_id| { ( - self.actor_status[actor_id].get_parallel_unit().unwrap().id, *actor_id, + self.actor_status[actor_id] + .get_parallel_unit() + .unwrap() + .clone(), ) }) .collect() @@ -425,15 +447,6 @@ impl TableFragments { result } - /// Update table fragment map, this should be called after fragment scheduled. - pub fn update_table_fragment_map(&mut self, fragment_id: FragmentId) { - if let Some(fragment) = self.fragments.get(&fragment_id) { - for table_id in &fragment.state_table_ids { - self.table_to_fragment_map.insert(*table_id, fragment_id); - } - } - } - /// Returns the internal table ids without the mview table. pub fn internal_table_ids(&self) -> Vec { self.fragments diff --git a/src/meta/src/stream/mod.rs b/src/meta/src/stream/mod.rs index 389d30fcf6100..68bac1f020adf 100644 --- a/src/meta/src/stream/mod.rs +++ b/src/meta/src/stream/mod.rs @@ -29,8 +29,8 @@ pub use stream_manager::*; use crate::MetaResult; -/// Record vnode mapping for stateful operators in meta. -pub fn record_table_vnode_mappings( +/// Record internal table ids for stateful operators in meta. 
+pub fn record_internal_state_tables( stream_node: &StreamNode, fragment: &mut Fragment, ) -> MetaResult<()> { @@ -80,7 +80,7 @@ pub fn record_table_vnode_mappings( } let input_nodes = stream_node.get_input(); for input_node in input_nodes { - record_table_vnode_mappings(input_node, fragment)?; + record_internal_state_tables(input_node, fragment)?; } Ok(()) } diff --git a/src/meta/src/stream/scheduler.rs b/src/meta/src/stream/scheduler.rs index 8cf6ad0b94d98..d6b52a2884ee3 100644 --- a/src/meta/src/stream/scheduler.rs +++ b/src/meta/src/stream/scheduler.rs @@ -24,8 +24,8 @@ use risingwave_common::util::compress::compress_data; use risingwave_pb::common::{ActorInfo, ParallelUnit, ParallelUnitMapping, WorkerNode}; use risingwave_pb::meta::table_fragments::fragment::FragmentDistributionType; use risingwave_pb::meta::table_fragments::Fragment; +use risingwave_pb::stream_plan::FragmentType; -use super::record_table_vnode_mappings; use crate::manager::{ClusterManagerRef, WorkerId, WorkerLocations}; use crate::model::ActorId; use crate::storage::MetaStore; @@ -238,11 +238,7 @@ where data, ..Default::default() }); - // Looking at the first actor is enough, since all actors in one fragment have identical - // state table id. - let actor = fragment.actors.first().unwrap(); - let stream_node = actor.get_nodes()?.clone(); - record_table_vnode_mappings(&stream_node, fragment)?; + Ok(vnode_mapping) } diff --git a/src/meta/src/stream/stream_graph.rs b/src/meta/src/stream/stream_graph.rs index 2f2604894b8ba..171071f3dd8f7 100644 --- a/src/meta/src/stream/stream_graph.rs +++ b/src/meta/src/stream/stream_graph.rs @@ -38,6 +38,7 @@ use crate::manager::{ }; use crate::model::{ActorId, FragmentId}; use crate::storage::MetaStore; +use crate::stream::record_internal_state_tables; use crate::MetaResult; /// Id of an Actor, maybe local or global @@ -828,8 +829,19 @@ impl ActorGraphBuilder { where S: MetaStore, { - self.generate_graph_inner(id_gen_manager, fragment_manager, ctx) - .await + let mut graph = self + .generate_graph_inner(id_gen_manager, fragment_manager, ctx) + .await?; + + // Record internal state table ids. + for fragment in graph.values_mut() { + // Looking at the first actor is enough, since all actors in one fragment have + // identical state table id. + let actor = fragment.actors.first().unwrap(); + let stream_node = actor.get_nodes()?.clone(); + record_internal_state_tables(&stream_node, fragment)?; + } + Ok(graph) } /// Build a stream graph by duplicating each fragment as parallel actors. @@ -910,7 +922,7 @@ impl ActorGraphBuilder { &self, state: &mut BuildActorGraphState, fragment_graph: &StreamFragmentGraph, - ctx: &CreateMaterializedViewContext, + ctx: &mut CreateMaterializedViewContext, ) -> MetaResult<()> { // Use topological sort to build the graph from downstream to upstream. (The first fragment // poped out from the heap will be the top-most node in plan, or the sink in stream graph.) @@ -958,18 +970,25 @@ impl ActorGraphBuilder { fragment_id: GlobalFragmentId, state: &mut BuildActorGraphState, fragment_graph: &StreamFragmentGraph, - ctx: &CreateMaterializedViewContext, + ctx: &mut CreateMaterializedViewContext, ) -> MetaResult<()> { let current_fragment = fragment_graph.get_fragment(fragment_id).unwrap().clone(); let parallel_degree = if current_fragment.is_singleton { 1 - } else if current_fragment.dependent_table_id != 0 { + } else if !current_fragment.upstream_table_ids.is_empty() { // set fragment parallelism to the parallelism of its dependent table. 
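The parallelism decision in the branches below can be summarized on its own: singleton fragments get one actor, a fragment containing a chain inherits the actor count of the mview it reads from, and everything else falls back to the default parallelism. A compact sketch of that rule as a hypothetical standalone function:

fn fragment_parallelism(
    is_singleton: bool,
    upstream_sink_actor_cnt: Option<u32>, // Some(n) when the fragment contains a chain
    default_parallelism: u32,
) -> u32 {
    if is_singleton {
        1
    } else if let Some(n) = upstream_sink_actor_cnt {
        // Colocate with the upstream mview: one chain actor per upstream sink actor.
        n
    } else {
        default_parallelism
    }
}

fn main() {
    assert_eq!(fragment_parallelism(true, None, 4), 1);
    assert_eq!(fragment_parallelism(false, Some(3), 4), 3);
    assert_eq!(fragment_parallelism(false, None, 4), 4);
}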
let upstream_actors = ctx .table_sink_map - .get(&TableId::new(current_fragment.dependent_table_id)) + .get(&TableId::from( + *current_fragment + .upstream_table_ids + .iter() + .exactly_one() + .unwrap(), + )) .expect("upstream actor should exist"); + ctx.colocate_fragment_set.insert(fragment_id.as_global_id()); upstream_actors.len() as u32 } else { self.default_parallelism diff --git a/src/meta/src/stream/stream_manager.rs b/src/meta/src/stream/stream_manager.rs index 832ecd02aaabb..795c6c40f8eaa 100644 --- a/src/meta/src/stream/stream_manager.rs +++ b/src/meta/src/stream/stream_manager.rs @@ -18,14 +18,16 @@ use std::sync::Arc; use itertools::Itertools; use risingwave_common::bail; use risingwave_common::catalog::TableId; -use risingwave_common::types::{ParallelUnitId, VIRTUAL_NODE_COUNT}; +use risingwave_common::types::VIRTUAL_NODE_COUNT; use risingwave_pb::catalog::{Source, Table}; use risingwave_pb::common::{ActorInfo, Buffer, ParallelUnitMapping, WorkerType}; use risingwave_pb::meta::subscribe_response::{Info, Operation}; use risingwave_pb::meta::table_fragments::fragment::FragmentDistributionType; use risingwave_pb::meta::table_fragments::{ActorState, ActorStatus}; use risingwave_pb::stream_plan::stream_node::NodeBody; -use risingwave_pb::stream_plan::{ActorMapping, Dispatcher, DispatcherType, StreamNode}; +use risingwave_pb::stream_plan::{ + ActorMapping, Dispatcher, DispatcherType, FragmentType, StreamNode, +}; use risingwave_pb::stream_service::{ BroadcastActorInfoTableRequest, BuildActorsRequest, HangingChannel, UpdateActorsRequest, }; @@ -37,10 +39,10 @@ use super::ScheduledLocations; use crate::barrier::{BarrierManagerRef, Command}; use crate::hummock::compaction_group::manager::CompactionGroupManagerRef; use crate::manager::{ - ClusterManagerRef, DatabaseId, FragmentManagerRef, MetaSrvEnv, NotificationManagerRef, - Relation, SchemaId, WorkerId, + ClusterManagerRef, DatabaseId, FragmentManagerRef, FragmentVNodeInfo, MetaSrvEnv, + NotificationManagerRef, Relation, SchemaId, WorkerId, }; -use crate::model::{ActorId, TableFragments}; +use crate::model::{ActorId, FragmentId, TableFragments}; use crate::storage::MetaStore; use crate::stream::{fetch_source_fragments, Scheduler, SourceManagerRef}; use crate::MetaResult; @@ -64,6 +66,8 @@ pub struct CreateMaterializedViewContext { pub table_id_offset: u32, /// Internal TableID to Table mapping pub internal_table_id_map: HashMap>, + /// Chain fragment set, used to determine when to schedule a chain fragment. + pub colocate_fragment_set: HashSet, /// SchemaId of mview pub schema_id: SchemaId, /// DatabaseId of mview @@ -145,20 +149,26 @@ where dependent_table_ids: &HashSet, dispatchers: &mut HashMap>, upstream_worker_actors: &mut HashMap>, - locations: &ScheduledLocations, + locations: &mut ScheduledLocations, + colocate_fragment_set: &HashSet, ) -> MetaResult<()> { // The closure environment. Used to simulate recursive closure. struct Env<'a> { - /// Records what's the corresponding actor of each parallel unit of one table. - upstream_parallel_unit_info: &'a HashMap>, - /// Records each upstream mview actor's vnode mapping info. - upstream_vnode_mapping_info: &'a HashMap)>>, + /// Records what's the corresponding parallel unit of each actor and mview vnode + /// mapping info of one table. + upstream_fragment_vnode_info: &'a HashMap, + /// Records each upstream mview actor's vnode bitmap info. + upstream_vnode_bitmap_info: &'a mut HashMap>>, /// Records what's the actors on each worker of one table. 
tables_worker_actors: &'a HashMap>>, /// Schedule information of all actors. - locations: &'a ScheduledLocations, + locations: &'a mut ScheduledLocations, /// New dispatchers for this mview. dispatchers: &'a mut HashMap>, + /// New vnode bitmaps for chain actors. + actor_vnode_bitmaps: &'a mut HashMap>, + /// Record fragment upstream table id of each fragment. + fragment_upstream_table_mapping: &'a mut HashMap, /// Upstream Materialize actor ids grouped by worker id. upstream_worker_actors: &'a mut HashMap>, } @@ -168,60 +178,53 @@ where &mut self, stream_node: &mut StreamNode, actor_id: ActorId, - vnode_mapping: &Option, - same_worker_node_as_upstream: bool, + fragment_id: FragmentId, + _same_worker_node_as_upstream: bool, is_singleton: bool, ) -> MetaResult<()> { let Some(NodeBody::Chain(ref mut chain)) = stream_node.node_body else { // If node is not chain node, recursively deal with input nodes for input in &mut stream_node.input { - self.resolve_chain_node_inner(input, actor_id, vnode_mapping, same_worker_node_as_upstream, is_singleton)?; + self.resolve_chain_node_inner(input, actor_id, fragment_id, _same_worker_node_as_upstream, is_singleton)?; } return Ok(()); }; // get upstream table id let table_id = TableId::new(chain.table_id); - let upstream_actor_id = { - // 1. use table id to get upstream vnode mapping info: [(actor_id, - // option(vnode_mapping))] - let upstream_vnode_mapping_info = &self.upstream_vnode_mapping_info[&table_id]; - - if is_singleton { - // Directly find the singleton actor id. - upstream_vnode_mapping_info.iter().exactly_one().unwrap().0 - } else { - // 2. find the upstream actor id by vnode mapping. - assert!(vnode_mapping.is_some()); - upstream_vnode_mapping_info - .iter() - .find(|(_, bitmap)| bitmap == vnode_mapping) - .unwrap() - .0 - } - }; - - // The current implementation already ensures chain and upstream are on the same - // worker node. So we do a sanity check here, in case that the logic get changed but - // `same_worker_node` constraint is not satisfied. - if same_worker_node_as_upstream { - // Parallel unit id is a globally unique id across all worker nodes. It can be - // seen as something like CPU core id. Therefore, we verify that actor's unit id - // == upstream's unit id. - - let actor_parallel_unit_id = - self.locations.actor_locations.get(&actor_id).unwrap().id; - - assert_eq!( - *self - .upstream_parallel_unit_info - .get(&table_id) - .unwrap() - .get(&actor_parallel_unit_id) - .unwrap(), - upstream_actor_id - ); + self.fragment_upstream_table_mapping + .insert(fragment_id, table_id); + // 1. use table id to get upstream vnode mapping info: [(actor_id, + // option(vnode_bitmap))] + let upstream_vnode_mapping_info = + &mut self.upstream_vnode_bitmap_info.get_mut(&table_id).unwrap(); + + if is_singleton { + // The upstream fragment should also be singleton. + assert_eq!(upstream_vnode_mapping_info.len(), 1); } + // Assign a upstream actor id to this chain node. + let (upstream_actor_id, upstream_vnode_bitmap) = upstream_vnode_mapping_info + .pop_first() + .expect("upstream actor not found"); + + // Here we force schedule the chain node to the same parallel unit as its upstream, + // so `same_worker_node_as_upstream` is not used here. If we want to + // support different parallel units, we need to keep the vnode bitmap and assign a + // new parallel unit with some other strategies. 
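Concretely, the forced colocating described above means the chain actor takes over both the upstream sink actor's parallel unit (its physical placement) and its vnode bitmap (its share of the key space); the code below does exactly that hand-over against the real meta structures. A reduced sketch with simplified stand-in types:

use std::collections::HashMap;

type ActorId = u32;

#[derive(Clone, Debug, PartialEq)]
struct ParallelUnit { id: u32, worker_node_id: u32 } // simplified stand-in

#[derive(Default)]
struct Placement {
    actor_locations: HashMap<ActorId, ParallelUnit>,
    actor_vnode_bitmaps: HashMap<ActorId, Option<Vec<u8>>>,
}

impl Placement {
    // Pin the chain actor to the same parallel unit as its upstream sink actor
    // and reuse the upstream actor's vnode bitmap verbatim.
    fn colocate_with_upstream(
        &mut self,
        chain_actor: ActorId,
        upstream_unit: &ParallelUnit,
        upstream_bitmap: Option<Vec<u8>>,
    ) {
        self.actor_locations.insert(chain_actor, upstream_unit.clone());
        self.actor_vnode_bitmaps.insert(chain_actor, upstream_bitmap);
    }
}

fn main() {
    let mut placement = Placement::default();
    let unit = ParallelUnit { id: 7, worker_node_id: 1 };
    placement.colocate_with_upstream(100, &unit, Some(vec![0b1111]));
    assert_eq!(placement.actor_locations[&100], unit);
}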
+ let upstream_parallel_unit = self + .upstream_fragment_vnode_info + .get(&table_id) + .unwrap() + .actor_parallel_unit_maps + .get(&upstream_actor_id) + .unwrap() + .clone(); + self.locations + .actor_locations + .insert(actor_id, upstream_parallel_unit); + self.actor_vnode_bitmaps + .insert(actor_id, upstream_vnode_bitmap); // fill upstream node-actor info for later use let upstream_table_worker_actors = @@ -276,14 +279,14 @@ where } } - let upstream_parallel_unit_info = &self + let upstream_fragment_vnode_info = &self .fragment_manager - .get_sink_parallel_unit_ids(dependent_table_ids) + .get_sink_fragment_vnode_info(dependent_table_ids) .await?; - let upstream_vnode_mapping_info = &self + let upstream_vnode_bitmap_info = &mut self .fragment_manager - .get_sink_vnode_mapping_info(dependent_table_ids) + .get_sink_vnode_bitmap_info(dependent_table_ids) .await?; let tables_worker_actors = &self @@ -292,15 +295,21 @@ where .await?; let mut env = Env { - upstream_parallel_unit_info, - upstream_vnode_mapping_info, + upstream_fragment_vnode_info, + upstream_vnode_bitmap_info, tables_worker_actors, locations, dispatchers, + actor_vnode_bitmaps: &mut Default::default(), + fragment_upstream_table_mapping: &mut Default::default(), upstream_worker_actors, }; for fragment in table_fragments.fragments.values_mut() { + if !colocate_fragment_set.contains(&fragment.fragment_id) { + continue; + } + let is_singleton = fragment.get_distribution_type()? == FragmentDistributionType::Single; @@ -309,11 +318,24 @@ where env.resolve_chain_node_inner( stream_node, actor.actor_id, - &actor.vnode_bitmap, + fragment.fragment_id, actor.same_worker_node_as_upstream, is_singleton, )?; + // setup actor vnode bitmap. + actor.vnode_bitmap = env.actor_vnode_bitmaps.remove(&actor.actor_id).unwrap(); } + // setup fragment vnode mapping. + let upstream_table_id = env + .fragment_upstream_table_mapping + .get(&fragment.fragment_id) + .unwrap(); + fragment.vnode_mapping = env + .upstream_fragment_vnode_info + .get(upstream_table_id) + .unwrap() + .vnode_mapping + .clone(); } Ok(()) } @@ -339,6 +361,7 @@ where dependent_table_ids, table_properties, internal_table_id_map, + colocate_fragment_set, affiliated_source, .. }: &mut CreateMaterializedViewContext, @@ -358,7 +381,7 @@ where // Schedule actors to parallel units. `locations` will record the parallel unit that an // actor is scheduled to, and the worker node this parallel unit is on. - let locations = { + let mut locations = { // List all running worker nodes. let workers = self .cluster_manager @@ -374,13 +397,14 @@ where // Create empty locations. let mut locations = ScheduledLocations::with_workers(workers); - // Schedule each fragment(actors) to nodes, recorded in `locations`. + // Schedule each fragment(actors) to nodes except chain, recorded in `locations`. // Vnode mapping in fragment will be filled in as well. 
let topological_order = table_fragments.generate_topological_order(); for fragment_id in topological_order { let fragment = table_fragments.fragments.get_mut(&fragment_id).unwrap(); - self.scheduler.schedule(fragment, &mut locations).await?; - table_fragments.update_table_fragment_map(fragment_id); + if !colocate_fragment_set.contains(&fragment_id) { + self.scheduler.schedule(fragment, &mut locations).await?; + } } locations @@ -394,7 +418,8 @@ where dependent_table_ids, dispatchers, upstream_worker_actors, - &locations, + &mut locations, + colocate_fragment_set, ) .await?; diff --git a/src/meta/src/stream/test_fragmenter.rs b/src/meta/src/stream/test_fragmenter.rs index 91d4581ded852..3b9491ebcc359 100644 --- a/src/meta/src/stream/test_fragmenter.rs +++ b/src/meta/src/stream/test_fragmenter.rs @@ -144,7 +144,7 @@ fn make_stream_fragments() -> Vec { fragment_type: FragmentType::Source as i32, is_singleton: false, table_ids_cnt: 0, - dependent_table_id: 0, + upstream_table_ids: vec![], }); // exchange node @@ -217,7 +217,7 @@ fn make_stream_fragments() -> Vec { fragment_type: FragmentType::Others as i32, is_singleton: false, table_ids_cnt: 0, - dependent_table_id: 0, + upstream_table_ids: vec![], }); // exchange node @@ -305,7 +305,7 @@ fn make_stream_fragments() -> Vec { fragment_type: FragmentType::Sink as i32, is_singleton: true, table_ids_cnt: 0, - dependent_table_id: 0, + upstream_table_ids: vec![], }); fragments From b113826fe020e28d855c5339908fc5c335398056 Mon Sep 17 00:00:00 2001 From: August Date: Fri, 19 Aug 2022 20:15:25 +0800 Subject: [PATCH 4/9] fix mutl-times upstream mview dependency --- src/meta/src/manager/catalog/fragment.rs | 4 +-- src/meta/src/model/stream.rs | 4 +-- src/meta/src/stream/scheduler.rs | 1 - src/meta/src/stream/stream_manager.rs | 44 ++++++++++++------------ 4 files changed, 26 insertions(+), 27 deletions(-) diff --git a/src/meta/src/manager/catalog/fragment.rs b/src/meta/src/manager/catalog/fragment.rs index 0ef56788714c2..7afb012d0c27a 100644 --- a/src/meta/src/manager/catalog/fragment.rs +++ b/src/meta/src/manager/catalog/fragment.rs @@ -536,9 +536,9 @@ where pub async fn get_sink_vnode_bitmap_info( &self, table_ids: &HashSet, - ) -> MetaResult>>> { + ) -> MetaResult)>>> { let map = &self.core.read().await.table_fragments; - let mut info: HashMap>> = HashMap::new(); + let mut info: HashMap)>> = HashMap::new(); for table_id in table_ids { match map.get(table_id) { diff --git a/src/meta/src/model/stream.rs b/src/meta/src/model/stream.rs index f4d1fa4d84d89..6bb452ef03f23 100644 --- a/src/meta/src/model/stream.rs +++ b/src/meta/src/model/stream.rs @@ -337,7 +337,7 @@ impl TableFragments { } /// Returns sink actor vnode bitmap infos. 
- pub fn sink_vnode_bitmap_info(&self) -> BTreeMap> { + pub fn sink_vnode_bitmap_info(&self) -> Vec<(ActorId, Option)> { self.fragments .values() .filter(|fragment| fragment.fragment_type == FragmentType::Sink as i32) @@ -347,7 +347,7 @@ impl TableFragments { .iter() .map(|actor| (actor.actor_id, actor.vnode_bitmap.clone())) }) - .collect() + .collect_vec() } pub fn sink_actor_parallel_units(&self) -> BTreeMap { diff --git a/src/meta/src/stream/scheduler.rs b/src/meta/src/stream/scheduler.rs index d6b52a2884ee3..7d8db97fc1eb8 100644 --- a/src/meta/src/stream/scheduler.rs +++ b/src/meta/src/stream/scheduler.rs @@ -24,7 +24,6 @@ use risingwave_common::util::compress::compress_data; use risingwave_pb::common::{ActorInfo, ParallelUnit, ParallelUnitMapping, WorkerNode}; use risingwave_pb::meta::table_fragments::fragment::FragmentDistributionType; use risingwave_pb::meta::table_fragments::Fragment; -use risingwave_pb::stream_plan::FragmentType; use crate::manager::{ClusterManagerRef, WorkerId, WorkerLocations}; use crate::model::ActorId; diff --git a/src/meta/src/stream/stream_manager.rs b/src/meta/src/stream/stream_manager.rs index 795c6c40f8eaa..5d9547c817223 100644 --- a/src/meta/src/stream/stream_manager.rs +++ b/src/meta/src/stream/stream_manager.rs @@ -25,9 +25,7 @@ use risingwave_pb::meta::subscribe_response::{Info, Operation}; use risingwave_pb::meta::table_fragments::fragment::FragmentDistributionType; use risingwave_pb::meta::table_fragments::{ActorState, ActorStatus}; use risingwave_pb::stream_plan::stream_node::NodeBody; -use risingwave_pb::stream_plan::{ - ActorMapping, Dispatcher, DispatcherType, FragmentType, StreamNode, -}; +use risingwave_pb::stream_plan::{ActorMapping, Dispatcher, DispatcherType, StreamNode}; use risingwave_pb::stream_service::{ BroadcastActorInfoTableRequest, BuildActorsRequest, HangingChannel, UpdateActorsRequest, }; @@ -158,7 +156,7 @@ where /// mapping info of one table. upstream_fragment_vnode_info: &'a HashMap, /// Records each upstream mview actor's vnode bitmap info. - upstream_vnode_bitmap_info: &'a mut HashMap>>, + upstream_vnode_bitmap_info: &'a mut HashMap)>>, /// Records what's the actors on each worker of one table. tables_worker_actors: &'a HashMap>>, /// Schedule information of all actors. @@ -179,13 +177,14 @@ where stream_node: &mut StreamNode, actor_id: ActorId, fragment_id: FragmentId, + upstream_actor_idx: usize, _same_worker_node_as_upstream: bool, is_singleton: bool, ) -> MetaResult<()> { let Some(NodeBody::Chain(ref mut chain)) = stream_node.node_body else { // If node is not chain node, recursively deal with input nodes for input in &mut stream_node.input { - self.resolve_chain_node_inner(input, actor_id, fragment_id, _same_worker_node_as_upstream, is_singleton)?; + self.resolve_chain_node_inner(input, actor_id, fragment_id, upstream_actor_idx, _same_worker_node_as_upstream, is_singleton)?; } return Ok(()); }; @@ -196,17 +195,17 @@ where .insert(fragment_id, table_id); // 1. use table id to get upstream vnode mapping info: [(actor_id, // option(vnode_bitmap))] - let upstream_vnode_mapping_info = - &mut self.upstream_vnode_bitmap_info.get_mut(&table_id).unwrap(); - - if is_singleton { - // The upstream fragment should also be singleton. - assert_eq!(upstream_vnode_mapping_info.len(), 1); - } - // Assign a upstream actor id to this chain node. 
- let (upstream_actor_id, upstream_vnode_bitmap) = upstream_vnode_mapping_info - .pop_first() - .expect("upstream actor not found"); + let upstream_vnode_mapping_info = &self.upstream_vnode_bitmap_info[&table_id]; + + let (upstream_actor_id, upstream_vnode_bitmap) = { + if is_singleton { + // The upstream fragment should also be singleton. + upstream_vnode_mapping_info.iter().exactly_one().unwrap() + } else { + // Assign a upstream actor id to this chain node. + &upstream_vnode_mapping_info[upstream_actor_idx] + } + }; // Here we force schedule the chain node to the same parallel unit as its upstream, // so `same_worker_node_as_upstream` is not used here. If we want to @@ -217,14 +216,14 @@ where .get(&table_id) .unwrap() .actor_parallel_unit_maps - .get(&upstream_actor_id) + .get(upstream_actor_id) .unwrap() .clone(); self.locations .actor_locations .insert(actor_id, upstream_parallel_unit); self.actor_vnode_bitmaps - .insert(actor_id, upstream_vnode_bitmap); + .insert(actor_id, upstream_vnode_bitmap.clone()); // fill upstream node-actor info for later use let upstream_table_worker_actors = @@ -235,7 +234,7 @@ where .flat_map(|(worker_id, actor_ids)| { actor_ids.iter().map(|actor_id| (*worker_id, *actor_id)) }) - .filter(|(_, actor_id)| upstream_actor_id == *actor_id) + .filter(|(_, actor_id)| upstream_actor_id == actor_id) .into_group_map(); for (worker_id, actor_ids) in chain_upstream_worker_actors { self.upstream_worker_actors @@ -255,7 +254,7 @@ where let Some(NodeBody::Merge(ref mut merge)) = merge_stream_node.node_body else { unreachable!("chain's input[0] should always be merge"); }; - merge.upstream_actor_id.push(upstream_actor_id); + merge.upstream_actor_id.push(*upstream_actor_id); // finally, we should also build dispatcher infos here. // @@ -264,7 +263,7 @@ where // `NoShuffle` dispatcher here. // TODO: support different parallel unit and distribution for new MV. self.dispatchers - .entry(upstream_actor_id) + .entry(*upstream_actor_id) .or_default() .push(Dispatcher { r#type: DispatcherType::NoShuffle as _, @@ -313,12 +312,13 @@ where let is_singleton = fragment.get_distribution_type()? == FragmentDistributionType::Single; - for actor in &mut fragment.actors { + for (idx, actor) in &mut fragment.actors.iter_mut().enumerate() { let stream_node = actor.nodes.as_mut().unwrap(); env.resolve_chain_node_inner( stream_node, actor.actor_id, fragment.fragment_id, + idx, actor.same_worker_node_as_upstream, is_singleton, )?; From 0bd0934731d022b444b78af72150011e607beaae Mon Sep 17 00:00:00 2001 From: August Date: Fri, 19 Aug 2022 20:55:41 +0800 Subject: [PATCH 5/9] fix singleton --- proto/stream_plan.proto | 1 + src/meta/src/model/stream.rs | 6 +++--- src/meta/src/stream/stream_graph.rs | 4 +++- 3 files changed, 7 insertions(+), 4 deletions(-) diff --git a/proto/stream_plan.proto b/proto/stream_plan.proto index a1fca4faa2c03..ab14cbc31f550 100644 --- a/proto/stream_plan.proto +++ b/proto/stream_plan.proto @@ -442,6 +442,7 @@ enum FragmentType { FRAGMENT_UNSPECIFIED = 0; OTHERS = 1; SOURCE = 2; + // TODO: change it to MATERIALIZED_VIEW or other name, since we have sink type now. SINK = 3; } diff --git a/src/meta/src/model/stream.rs b/src/meta/src/model/stream.rs index 6bb452ef03f23..0ce0bcfe954a9 100644 --- a/src/meta/src/model/stream.rs +++ b/src/meta/src/model/stream.rs @@ -120,13 +120,13 @@ impl TableFragments { } /// Returns sink fragment vnode mapping. 
+ /// Note that: the real sink fragment is also stored as `TableFragments`, it's possible that + /// there's no fragment with FragmentType::Sink exists. pub fn sink_vnode_mapping(&self) -> Option { self.fragments .values() .find(|fragment| fragment.fragment_type == FragmentType::Sink as i32) - .unwrap() - .vnode_mapping - .clone() + .and_then(|fragment| fragment.vnode_mapping.clone()) } /// Update state of all actors diff --git a/src/meta/src/stream/stream_graph.rs b/src/meta/src/stream/stream_graph.rs index 171071f3dd8f7..7717f7e2cf96b 100644 --- a/src/meta/src/stream/stream_graph.rs +++ b/src/meta/src/stream/stream_graph.rs @@ -973,6 +973,9 @@ impl ActorGraphBuilder { ctx: &mut CreateMaterializedViewContext, ) -> MetaResult<()> { let current_fragment = fragment_graph.get_fragment(fragment_id).unwrap().clone(); + if !current_fragment.upstream_table_ids.is_empty() { + ctx.colocate_fragment_set.insert(fragment_id.as_global_id()); + } let parallel_degree = if current_fragment.is_singleton { 1 @@ -988,7 +991,6 @@ impl ActorGraphBuilder { .unwrap(), )) .expect("upstream actor should exist"); - ctx.colocate_fragment_set.insert(fragment_id.as_global_id()); upstream_actors.len() as u32 } else { self.default_parallelism From b626e72de854202f0c33d60b5aa39bdff12f1ac5 Mon Sep 17 00:00:00 2001 From: August Date: Fri, 19 Aug 2022 21:00:34 +0800 Subject: [PATCH 6/9] clippy --- src/meta/src/model/stream.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/meta/src/model/stream.rs b/src/meta/src/model/stream.rs index 0ce0bcfe954a9..f218cde62cb09 100644 --- a/src/meta/src/model/stream.rs +++ b/src/meta/src/model/stream.rs @@ -121,7 +121,7 @@ impl TableFragments { /// Returns sink fragment vnode mapping. /// Note that: the real sink fragment is also stored as `TableFragments`, it's possible that - /// there's no fragment with FragmentType::Sink exists. + /// there's no fragment with `FragmentType::Sink` exists. 
pub fn sink_vnode_mapping(&self) -> Option { self.fragments .values() From a89fcfcc26dae672012c1be7e561e32256513430 Mon Sep 17 00:00:00 2001 From: August Date: Mon, 22 Aug 2022 13:21:30 +0800 Subject: [PATCH 7/9] code clean --- src/meta/src/stream/stream_graph.rs | 12 ++++++++++- src/meta/src/stream/stream_manager.rs | 29 +++++++++++---------------- 2 files changed, 23 insertions(+), 18 deletions(-) diff --git a/src/meta/src/stream/stream_graph.rs b/src/meta/src/stream/stream_graph.rs index 7717f7e2cf96b..22756cfdfc5d5 100644 --- a/src/meta/src/stream/stream_graph.rs +++ b/src/meta/src/stream/stream_graph.rs @@ -974,7 +974,17 @@ impl ActorGraphBuilder { ) -> MetaResult<()> { let current_fragment = fragment_graph.get_fragment(fragment_id).unwrap().clone(); if !current_fragment.upstream_table_ids.is_empty() { - ctx.colocate_fragment_set.insert(fragment_id.as_global_id()); + assert_eq!(current_fragment.upstream_table_ids.len(), 1); + ctx.fragment_upstream_table_map.insert( + fragment_id.as_global_id(), + TableId::new( + *current_fragment + .upstream_table_ids + .iter() + .exactly_one() + .unwrap(), + ), + ); } let parallel_degree = if current_fragment.is_singleton { diff --git a/src/meta/src/stream/stream_manager.rs b/src/meta/src/stream/stream_manager.rs index f31fe699b2fb8..70eea77ec8821 100644 --- a/src/meta/src/stream/stream_manager.rs +++ b/src/meta/src/stream/stream_manager.rs @@ -64,8 +64,11 @@ pub struct CreateMaterializedViewContext { pub table_id_offset: u32, /// Internal TableID to Table mapping pub internal_table_id_map: HashMap>, - /// Chain fragment set, used to determine when to schedule a chain fragment. - pub colocate_fragment_set: HashSet, + /// The set of fragments with their upstream table ids, these fragments need to be colocated + /// with their upstream tables. Specifically, they are fragments containing chain nodes. + /// + /// They are scheduled in `resolve_chain_node`. + pub fragment_upstream_table_map: HashMap, /// SchemaId of mview pub schema_id: SchemaId, /// DatabaseId of mview @@ -148,7 +151,7 @@ where dispatchers: &mut HashMap>, upstream_worker_actors: &mut HashMap>, locations: &mut ScheduledLocations, - colocate_fragment_set: &HashSet, + fragment_upstream_table_map: &HashMap, ) -> MetaResult<()> { // The closure environment. Used to simulate recursive closure. struct Env<'a> { @@ -165,8 +168,6 @@ where dispatchers: &'a mut HashMap>, /// New vnode bitmaps for chain actors. actor_vnode_bitmaps: &'a mut HashMap>, - /// Record fragment upstream table id of each fragment. - fragment_upstream_table_mapping: &'a mut HashMap, /// Upstream Materialize actor ids grouped by worker id. upstream_worker_actors: &'a mut HashMap>, } @@ -176,7 +177,6 @@ where &mut self, stream_node: &mut StreamNode, actor_id: ActorId, - fragment_id: FragmentId, upstream_actor_idx: usize, _same_worker_node_as_upstream: bool, is_singleton: bool, @@ -184,15 +184,13 @@ where let Some(NodeBody::Chain(ref mut chain)) = stream_node.node_body else { // If node is not chain node, recursively deal with input nodes for input in &mut stream_node.input { - self.resolve_chain_node_inner(input, actor_id, fragment_id, upstream_actor_idx, _same_worker_node_as_upstream, is_singleton)?; + self.resolve_chain_node_inner(input, actor_id, upstream_actor_idx, _same_worker_node_as_upstream, is_singleton)?; } return Ok(()); }; // get upstream table id let table_id = TableId::new(chain.table_id); - self.fragment_upstream_table_mapping - .insert(fragment_id, table_id); // 1. 
use table id to get upstream vnode mapping info: [(actor_id, // option(vnode_bitmap))] let upstream_vnode_mapping_info = &self.upstream_vnode_bitmap_info[&table_id]; @@ -300,12 +298,11 @@ where locations, dispatchers, actor_vnode_bitmaps: &mut Default::default(), - fragment_upstream_table_mapping: &mut Default::default(), upstream_worker_actors, }; for fragment in table_fragments.fragments.values_mut() { - if !colocate_fragment_set.contains(&fragment.fragment_id) { + if !fragment_upstream_table_map.contains_key(&fragment.fragment_id) { continue; } @@ -317,7 +314,6 @@ where env.resolve_chain_node_inner( stream_node, actor.actor_id, - fragment.fragment_id, idx, actor.same_worker_node_as_upstream, is_singleton, @@ -326,8 +322,7 @@ where actor.vnode_bitmap = env.actor_vnode_bitmaps.remove(&actor.actor_id).unwrap(); } // setup fragment vnode mapping. - let upstream_table_id = env - .fragment_upstream_table_mapping + let upstream_table_id = fragment_upstream_table_map .get(&fragment.fragment_id) .unwrap(); fragment.vnode_mapping = env @@ -361,7 +356,7 @@ where dependent_table_ids, table_properties, internal_table_id_map, - colocate_fragment_set, + fragment_upstream_table_map, affiliated_source, .. }: &mut CreateMaterializedViewContext, @@ -402,7 +397,7 @@ where let topological_order = table_fragments.generate_topological_order(); for fragment_id in topological_order { let fragment = table_fragments.fragments.get_mut(&fragment_id).unwrap(); - if !colocate_fragment_set.contains(&fragment_id) { + if !fragment_upstream_table_map.contains_key(&fragment_id) { self.scheduler.schedule(fragment, &mut locations).await?; } } @@ -419,7 +414,7 @@ where dispatchers, upstream_worker_actors, &mut locations, - colocate_fragment_set, + fragment_upstream_table_map, ) .await?; From 0b0848f6a0da07dc745c5b0652c7d863b31e9c35 Mon Sep 17 00:00:00 2001 From: August Date: Mon, 22 Aug 2022 16:23:42 +0800 Subject: [PATCH 8/9] some refactor --- proto/stream_plan.proto | 2 +- src/common/src/catalog/mod.rs | 7 ++ src/meta/src/stream/mod.rs | 71 -------------------- src/meta/src/stream/stream_graph.rs | 97 +++++++++++++++++++++------ src/meta/src/stream/stream_manager.rs | 5 +- 5 files changed, 85 insertions(+), 97 deletions(-) diff --git a/proto/stream_plan.proto b/proto/stream_plan.proto index 7fb6c0793b797..5d47726afd95f 100644 --- a/proto/stream_plan.proto +++ b/proto/stream_plan.proto @@ -472,7 +472,7 @@ message StreamFragmentGraph { bool is_singleton = 4; // Number of table ids (stateful states) for this fragment. uint32 table_ids_cnt = 5; - // Mark the upstream table ids of this fragment. + // Mark the upstream table ids of this fragment, Used for fragments with `Chain`s. 
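The chain-resolution steps above pair each chain actor with one upstream sink actor (by index) and reuse that actor's vnode bitmap, which is what keeps the chain fragment's distribution identical to its upstream table. A rough sketch of that pairing with placeholder `ActorId`/`Bitmap` aliases, illustrative only and not the actual `resolve_chain_node_inner` code:

use std::collections::HashMap;

type ActorId = u32;
type Bitmap = Vec<bool>;

// Pick the upstream sink actor at the same index as the chain actor and
// inherit its vnode bitmap, so both actors own exactly the same vnodes.
fn colocate_chain_actor(
    upstream_vnode_mapping_info: &[(ActorId, Option<Bitmap>)],
    upstream_actor_idx: usize,
    chain_actor_id: ActorId,
    actor_vnode_bitmaps: &mut HashMap<ActorId, Option<Bitmap>>,
) -> ActorId {
    let (upstream_actor_id, bitmap) = &upstream_vnode_mapping_info[upstream_actor_idx];
    actor_vnode_bitmaps.insert(chain_actor_id, bitmap.clone());
    *upstream_actor_id
}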
repeated uint32 upstream_table_ids = 6; } diff --git a/src/common/src/catalog/mod.rs b/src/common/src/catalog/mod.rs index 80c7a38e59a05..976e2c2fe5685 100644 --- a/src/common/src/catalog/mod.rs +++ b/src/common/src/catalog/mod.rs @@ -112,6 +112,13 @@ impl From for TableId { Self::new(id) } } + +impl From<&u32> for TableId { + fn from(id: &u32) -> Self { + Self::new(*id) + } +} + impl From for u32 { fn from(id: TableId) -> Self { id.table_id diff --git a/src/meta/src/stream/mod.rs b/src/meta/src/stream/mod.rs index ea09ad33f08b8..63b993359a766 100644 --- a/src/meta/src/stream/mod.rs +++ b/src/meta/src/stream/mod.rs @@ -19,78 +19,7 @@ mod stream_manager; #[cfg(test)] mod test_fragmenter; -use risingwave_pb::meta::table_fragments::Fragment; -use risingwave_pb::stream_plan::stream_node::NodeBody; -use risingwave_pb::stream_plan::StreamNode; pub use scheduler::*; pub use source_manager::*; pub use stream_graph::*; pub use stream_manager::*; - -use crate::MetaResult; - -/// Record internal table ids for stateful operators in meta. -pub fn record_internal_state_tables( - stream_node: &StreamNode, - fragment: &mut Fragment, -) -> MetaResult<()> { - // We only consider stateful operators with multiple parallel degrees here. Singleton stateful - // operators will not have vnode mappings, so that compactors could omit the unnecessary probing - // on vnode mappings. - match stream_node.get_node_body()? { - NodeBody::Materialize(node) => { - let table_id = node.get_table_id(); - fragment.state_table_ids.push(table_id); - } - NodeBody::Arrange(node) => { - let table_id = node.table.as_ref().unwrap().id; - fragment.state_table_ids.push(table_id); - } - NodeBody::HashAgg(node) => { - for table in &node.internal_tables { - fragment.state_table_ids.push(table.id); - } - } - NodeBody::GlobalSimpleAgg(node) => { - for table in &node.internal_tables { - fragment.state_table_ids.push(table.id); - } - } - NodeBody::HashJoin(node) => { - fragment - .state_table_ids - .push(node.left_table.as_ref().unwrap().id); - fragment - .state_table_ids - .push(node.right_table.as_ref().unwrap().id); - } - NodeBody::DynamicFilter(node) => { - fragment - .state_table_ids - .push(node.left_table.as_ref().unwrap().id); - fragment - .state_table_ids - .push(node.right_table.as_ref().unwrap().id); - } - NodeBody::AppendOnlyTopN(node) => { - fragment.state_table_ids.push(node.table_id_l); - fragment.state_table_ids.push(node.table_id_h); - } - NodeBody::GroupTopN(node) => { - fragment - .state_table_ids - .push(node.table.as_ref().unwrap().id); - } - NodeBody::TopN(node) => { - fragment - .state_table_ids - .push(node.table.as_ref().unwrap().id); - } - _ => {} - } - let input_nodes = stream_node.get_input(); - for input_node in input_nodes { - record_internal_state_tables(input_node, fragment)?; - } - Ok(()) -} diff --git a/src/meta/src/stream/stream_graph.rs b/src/meta/src/stream/stream_graph.rs index 8a44f1c9cb6c8..024b46671f0fe 100644 --- a/src/meta/src/stream/stream_graph.rs +++ b/src/meta/src/stream/stream_graph.rs @@ -38,7 +38,6 @@ use crate::manager::{ }; use crate::model::{ActorId, FragmentId}; use crate::storage::MetaStore; -use crate::stream::record_internal_state_tables; use crate::MetaResult; /// Id of an Actor, maybe local or global @@ -800,7 +799,7 @@ impl ActorGraphBuilder { // identical state table id. 
let actor = fragment.actors.first().unwrap(); let stream_node = actor.get_nodes()?.clone(); - record_internal_state_tables(&stream_node, fragment)?; + Self::record_internal_state_tables(&stream_node, fragment)?; } Ok(graph) } @@ -934,33 +933,24 @@ impl ActorGraphBuilder { ctx: &mut CreateMaterializedViewContext, ) -> MetaResult<()> { let current_fragment = fragment_graph.get_fragment(fragment_id).unwrap().clone(); - if !current_fragment.upstream_table_ids.is_empty() { - assert_eq!(current_fragment.upstream_table_ids.len(), 1); - ctx.fragment_upstream_table_map.insert( - fragment_id.as_global_id(), - TableId::new( - *current_fragment - .upstream_table_ids - .iter() - .exactly_one() - .unwrap(), - ), - ); + let upstream_table_id = current_fragment + .upstream_table_ids + .iter() + .at_most_one() + .unwrap() + .map(TableId::from); + if let Some(upstream_table_id) = upstream_table_id { + ctx.fragment_upstream_table_map + .insert(fragment_id.as_global_id(), upstream_table_id); } let parallel_degree = if current_fragment.is_singleton { 1 - } else if !current_fragment.upstream_table_ids.is_empty() { + } else if let Some(upstream_table_id) = upstream_table_id { // set fragment parallelism to the parallelism of its dependent table. let upstream_actors = ctx .table_sink_map - .get(&TableId::from( - *current_fragment - .upstream_table_ids - .iter() - .exactly_one() - .unwrap(), - )) + .get(&upstream_table_id) .expect("upstream actor should exist"); upstream_actors.len() as u32 } else { @@ -1016,6 +1006,69 @@ impl ActorGraphBuilder { Ok(()) } + + /// Record internal table ids for stateful operators in meta. + fn record_internal_state_tables( + stream_node: &StreamNode, + fragment: &mut Fragment, + ) -> MetaResult<()> { + match stream_node.get_node_body()? { + NodeBody::Materialize(node) => { + let table_id = node.get_table_id(); + fragment.state_table_ids.push(table_id); + } + NodeBody::Arrange(node) => { + let table_id = node.table.as_ref().unwrap().id; + fragment.state_table_ids.push(table_id); + } + NodeBody::HashAgg(node) => { + for table in &node.internal_tables { + fragment.state_table_ids.push(table.id); + } + } + NodeBody::GlobalSimpleAgg(node) => { + for table in &node.internal_tables { + fragment.state_table_ids.push(table.id); + } + } + NodeBody::HashJoin(node) => { + fragment + .state_table_ids + .push(node.left_table.as_ref().unwrap().id); + fragment + .state_table_ids + .push(node.right_table.as_ref().unwrap().id); + } + NodeBody::DynamicFilter(node) => { + fragment + .state_table_ids + .push(node.left_table.as_ref().unwrap().id); + fragment + .state_table_ids + .push(node.right_table.as_ref().unwrap().id); + } + NodeBody::AppendOnlyTopN(node) => { + fragment.state_table_ids.push(node.table_id_l); + fragment.state_table_ids.push(node.table_id_h); + } + NodeBody::GroupTopN(node) => { + fragment + .state_table_ids + .push(node.table.as_ref().unwrap().id); + } + NodeBody::TopN(node) => { + fragment + .state_table_ids + .push(node.table.as_ref().unwrap().id); + } + _ => {} + } + let input_nodes = stream_node.get_input(); + for input_node in input_nodes { + Self::record_internal_state_tables(input_node, fragment)?; + } + Ok(()) + } } #[derive(Default)] diff --git a/src/meta/src/stream/stream_manager.rs b/src/meta/src/stream/stream_manager.rs index b26ac80224975..2eb7e202c2eb4 100644 --- a/src/meta/src/stream/stream_manager.rs +++ b/src/meta/src/stream/stream_manager.rs @@ -64,8 +64,8 @@ pub struct CreateMaterializedViewContext { pub table_id_offset: u32, /// Internal TableID to Table mapping 
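The refactor above leans on `Itertools::at_most_one` to express the "zero or one upstream table per fragment" invariant in a single expression. A small illustrative sketch of how that call behaves, assuming only the `itertools` crate and not the actual builder code:

use itertools::Itertools;

// Ok(None) for an empty slice, Ok(Some(&id)) for exactly one element, and an
// Err if a fragment unexpectedly lists several upstream tables.
fn single_upstream_table(upstream_table_ids: &[u32]) -> Option<u32> {
    upstream_table_ids
        .iter()
        .at_most_one()
        .expect("a fragment should have at most one upstream table")
        .copied()
}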
pub internal_table_id_map: HashMap>, - /// The set of fragments with their upstream table ids, these fragments need to be colocated - /// with their upstream tables. Specifically, they are fragments containing chain nodes. + /// The upstream tables of all fragments containing chain nodes. + /// These fragments need to be colocated with their upstream tables. /// /// They are scheduled in `resolve_chain_node`. pub fragment_upstream_table_map: HashMap, @@ -259,7 +259,6 @@ where // Note: currently we ensure that the downstream chain operator has the same // parallel unit and distribution as the upstream mview, so we can simply use // `NoShuffle` dispatcher here. - // TODO: support different parallel unit and distribution for new MV. self.dispatchers .entry(*upstream_actor_id) .or_default() From ebde1f42c663451615222719ee75374f2d5b1392 Mon Sep 17 00:00:00 2001 From: August Date: Mon, 22 Aug 2022 17:03:53 +0800 Subject: [PATCH 9/9] rename --- src/meta/src/stream/stream_graph.rs | 2 +- src/meta/src/stream/stream_manager.rs | 14 +++++++------- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/src/meta/src/stream/stream_graph.rs b/src/meta/src/stream/stream_graph.rs index 024b46671f0fe..4ec7a458537c1 100644 --- a/src/meta/src/stream/stream_graph.rs +++ b/src/meta/src/stream/stream_graph.rs @@ -940,7 +940,7 @@ impl ActorGraphBuilder { .unwrap() .map(TableId::from); if let Some(upstream_table_id) = upstream_table_id { - ctx.fragment_upstream_table_map + ctx.chain_fragment_upstream_table_map .insert(fragment_id.as_global_id(), upstream_table_id); } diff --git a/src/meta/src/stream/stream_manager.rs b/src/meta/src/stream/stream_manager.rs index 2eb7e202c2eb4..7bc7dabb0cb8e 100644 --- a/src/meta/src/stream/stream_manager.rs +++ b/src/meta/src/stream/stream_manager.rs @@ -68,7 +68,7 @@ pub struct CreateMaterializedViewContext { /// These fragments need to be colocated with their upstream tables. /// /// They are scheduled in `resolve_chain_node`. - pub fragment_upstream_table_map: HashMap, + pub chain_fragment_upstream_table_map: HashMap, /// SchemaId of mview pub schema_id: SchemaId, /// DatabaseId of mview @@ -151,7 +151,7 @@ where dispatchers: &mut HashMap>, upstream_worker_actors: &mut HashMap>, locations: &mut ScheduledLocations, - fragment_upstream_table_map: &HashMap, + chain_fragment_upstream_table_map: &HashMap, ) -> MetaResult<()> { // The closure environment. Used to simulate recursive closure. struct Env<'a> { @@ -301,7 +301,7 @@ where }; for fragment in table_fragments.fragments.values_mut() { - if !fragment_upstream_table_map.contains_key(&fragment.fragment_id) { + if !chain_fragment_upstream_table_map.contains_key(&fragment.fragment_id) { continue; } @@ -321,7 +321,7 @@ where actor.vnode_bitmap = env.actor_vnode_bitmaps.remove(&actor.actor_id).unwrap(); } // setup fragment vnode mapping. - let upstream_table_id = fragment_upstream_table_map + let upstream_table_id = chain_fragment_upstream_table_map .get(&fragment.fragment_id) .unwrap(); fragment.vnode_mapping = env @@ -355,7 +355,7 @@ where dependent_table_ids, table_properties, internal_table_id_map, - fragment_upstream_table_map, + chain_fragment_upstream_table_map, affiliated_source, .. 
}: &mut CreateMaterializedViewContext, @@ -396,7 +396,7 @@ where let topological_order = table_fragments.generate_topological_order(); for fragment_id in topological_order { let fragment = table_fragments.fragments.get_mut(&fragment_id).unwrap(); - if !fragment_upstream_table_map.contains_key(&fragment_id) { + if !chain_fragment_upstream_table_map.contains_key(&fragment_id) { self.scheduler.schedule(fragment, &mut locations).await?; } } @@ -413,7 +413,7 @@ where dispatchers, upstream_worker_actors, &mut locations, - fragment_upstream_table_map, + chain_fragment_upstream_table_map, ) .await?;
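Taken together, the series ends with a two-phase placement: fragments recorded in `chain_fragment_upstream_table_map` are skipped by the generic scheduler loop above and instead pinned next to their upstream table's sink actors inside `resolve_chain_node`. A condensed sketch of that split, with placeholder types rather than the real stream-manager code:

use std::collections::HashMap;

type FragmentId = u32;
type TableId = u32;

// Split the topological order into fragments placed by the generic scheduler
// and chain fragments deferred to `resolve_chain_node`, where they are
// colocated with the sink actors of their upstream table.
fn split_for_scheduling(
    topological_order: &[FragmentId],
    chain_fragment_upstream_table_map: &HashMap<FragmentId, TableId>,
) -> (Vec<FragmentId>, Vec<FragmentId>) {
    topological_order
        .iter()
        .copied()
        .partition(|id| !chain_fragment_upstream_table_map.contains_key(id))
}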