feat: enforce chain parallelism and fill proper upstream with same vnode range #4740

Merged · 15 commits · Aug 22, 2022
Changes from 4 commits
2 changes: 2 additions & 0 deletions proto/stream_plan.proto
@@ -454,6 +454,8 @@ message StreamFragmentGraph {
bool is_singleton = 4;
// Number of table ids (stateful states) for this fragment.
uint32 table_ids_cnt = 5;
+ // Mark the dependent table id of this fragment.
+ uint32 dependent_table_id = 6;
}

message StreamFragmentEdge {
4 changes: 2 additions & 2 deletions src/compute/src/rpc/service/stream_service.rs
@@ -82,9 +82,9 @@ impl StreamService for StreamServiceImpl {
&self,
request: Request<BroadcastActorInfoTableRequest>,
) -> std::result::Result<Response<BroadcastActorInfoTableResponse>, Status> {
- let table = request.into_inner();
+ let req = request.into_inner();

- let res = self.mgr.update_actor_info(table);
+ let res = self.mgr.update_actor_info(&req.info);
match res {
Err(e) => {
error!("failed to update actor info table actor {}", e);
5 changes: 5 additions & 0 deletions src/frontend/src/stream_fragmenter/graph/fragment_graph.rs
@@ -40,6 +40,9 @@ pub struct StreamFragment {

/// Number of table ids (stateful states) for this fragment.
pub table_ids_cnt: u32,

+ /// Mark the dependent table id of this fragment.
+ pub dependent_table_id: Option<u32>,
}

/// An edge between the nodes in the fragment graph.
@@ -66,6 +69,7 @@ impl StreamFragment {
is_singleton: false,
node: None,
table_ids_cnt: 0,
+ dependent_table_id: None,
}
}

@@ -76,6 +80,7 @@
fragment_type: self.fragment_type as i32,
is_singleton: self.is_singleton,
table_ids_cnt: self.table_ids_cnt,
+ dependent_table_id: self.dependent_table_id.unwrap_or(0),
}
}
}
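
The `unwrap_or(0)` above works because proto3 scalars carry no presence bit: `None` is encoded as `0`, on the assumption that `0` is never a real dependent table id. A minimal sketch of the intended round-trip (hypothetical helpers, not code from this PR):

    // Sketch of the Option<u32> <-> proto3 sentinel encoding used above.
    // Assumption: 0 is never a valid dependent table id.
    fn to_proto(dependent_table_id: Option<u32>) -> u32 {
        dependent_table_id.unwrap_or(0)
    }

    fn from_proto(raw: u32) -> Option<u32> {
        if raw == 0 { None } else { Some(raw) }
    }

    fn main() {
        assert_eq!(to_proto(None), 0);
        assert_eq!(from_proto(to_proto(Some(42))), Some(42));
    }
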
1 change: 1 addition & 0 deletions src/frontend/src/stream_fragmenter/mod.rs
@@ -207,6 +207,7 @@ impl StreamFragmenter {
state
.dependent_table_ids
.insert(TableId::new(node.table_id));
+ current_fragment.dependent_table_id = Some(node.table_id);
current_fragment.is_singleton = node.is_singleton;
}

22 changes: 21 additions & 1 deletion src/meta/src/manager/catalog/fragment.rs
@@ -21,7 +21,7 @@ use itertools::Itertools;
use risingwave_common::catalog::TableId;
use risingwave_common::types::ParallelUnitId;
use risingwave_common::{bail, try_match_expand};
- use risingwave_pb::common::{ParallelUnit, ParallelUnitMapping, WorkerNode};
+ use risingwave_pb::common::{Buffer, ParallelUnit, ParallelUnitMapping, WorkerNode};
use risingwave_pb::meta::table_fragments::ActorState;
use risingwave_pb::stream_plan::{Dispatcher, FragmentType, StreamActor};
use tokio::sync::{RwLock, RwLockReadGuard};
@@ -526,6 +526,26 @@ where
Ok(info)
}

+ pub async fn get_sink_vnode_mapping_info(
+ &self,
+ table_ids: &HashSet<TableId>,
+ ) -> MetaResult<HashMap<TableId, Vec<(ActorId, Option<Buffer>)>>> {
+ let map = &self.core.read().await.table_fragments;
+ let mut info: HashMap<TableId, Vec<(ActorId, Option<Buffer>)>> = HashMap::new();
+
+ for table_id in table_ids {
+ match map.get(table_id) {
+ Some(table_fragment) => {
+ info.insert(*table_id, table_fragment.sink_vnode_mapping_info());
+ }
+ None => {
+ bail!("table_fragment not exist: id={}", table_id);
+ }
+ }
+ }
+ Ok(info)
+ }
+
pub async fn get_sink_parallel_unit_ids(
&self,
table_ids: &HashSet<TableId>,
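
The new `get_sink_vnode_mapping_info` returns, per dependent table, each sink actor paired with its optional vnode bitmap. A hypothetical caller (simplified stand-in types, not code from this PR) could derive per-table sink parallelism from it — which is exactly what chain fragments adopt in stream_graph.rs below:

    use std::collections::HashMap;

    type TableId = u32;
    type ActorId = u32;
    type Buffer = Vec<u8>; // stand-in for the serialized vnode bitmap proto

    // Hypothetical consumer: the number of sink actors per table is the
    // parallelism a downstream chain fragment should adopt.
    fn sink_parallelisms(
        info: &HashMap<TableId, Vec<(ActorId, Option<Buffer>)>>,
    ) -> HashMap<TableId, u32> {
        info.iter()
            .map(|(table_id, actors)| (*table_id, actors.len() as u32))
            .collect()
    }

    fn main() {
        let mut info = HashMap::new();
        info.insert(7u32, vec![(100u32, Some(vec![0b01u8])), (101, Some(vec![0b10]))]);
        assert_eq!(sink_parallelisms(&info)[&7], 2);
    }
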
18 changes: 16 additions & 2 deletions src/meta/src/model/stream.rs
@@ -17,7 +17,7 @@ use std::collections::{BTreeMap, HashMap, HashSet, VecDeque};
use itertools::Itertools;
use risingwave_common::catalog::TableId;
use risingwave_common::types::ParallelUnitId;
- use risingwave_pb::common::{ParallelUnit, ParallelUnitMapping};
+ use risingwave_pb::common::{Buffer, ParallelUnit, ParallelUnitMapping};
use risingwave_pb::meta::table_fragments::{ActorState, ActorStatus, Fragment};
use risingwave_pb::meta::TableFragments as ProstTableFragments;
use risingwave_pb::stream_plan::source_node::SourceType;
@@ -317,6 +317,20 @@ impl TableFragments {
actor_map
}

+ /// Returns sink actor vnode mapping infos.
+ pub fn sink_vnode_mapping_info(&self) -> Vec<(ActorId, Option<Buffer>)> {
+ self.fragments
+ .values()
+ .filter(|fragment| fragment.fragment_type == FragmentType::Sink as i32)
+ .flat_map(|fragment| {
+ fragment
+ .actors
+ .iter()
+ .map(|actor| (actor.actor_id, actor.vnode_bitmap.clone()))
+ })
+ .collect_vec()
+ }
+
pub fn parallel_unit_sink_actor_id(&self) -> BTreeMap<ParallelUnitId, ActorId> {
let sink_actor_ids = self.sink_actor_ids();
sink_actor_ids
@@ -330,7 +344,7 @@
.collect()
}

- /// Generate toplogical order of fragments. If `index(a) < index(b)` in vec, then a is the
+ /// Generate topological order of fragments. If `index(a) < index(b)` in vec, then a is the
/// downstream of b.
pub fn generate_topological_order(&self) -> Vec<FragmentId> {
let mut actionable_fragment_id = VecDeque::new();
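
The body of `generate_topological_order` is collapsed in this view. For context, a generic Kahn-style sketch that satisfies the documented contract, assuming (upstream, downstream) edge pairs as input — an illustration, not the actual implementation:

    use std::collections::{HashMap, VecDeque};

    type FragmentId = u32;

    // Kahn-style sketch: fragments with no downstream (the sinks) are popped
    // first, so index(a) < index(b) implies a is downstream of b.
    fn topological_order(
        fragments: &[FragmentId],
        edges: &[(FragmentId, FragmentId)], // (upstream, downstream)
    ) -> Vec<FragmentId> {
        let mut downstream_cnt: HashMap<FragmentId, usize> =
            fragments.iter().map(|&f| (f, 0)).collect();
        let mut upstreams_of: HashMap<FragmentId, Vec<FragmentId>> = HashMap::new();
        for &(up, down) in edges {
            *downstream_cnt.get_mut(&up).unwrap() += 1;
            upstreams_of.entry(down).or_default().push(up);
        }
        let mut queue: VecDeque<FragmentId> = downstream_cnt
            .iter()
            .filter_map(|(&f, &c)| (c == 0).then_some(f))
            .collect();
        let mut order = Vec::new();
        while let Some(f) = queue.pop_front() {
            order.push(f);
            for &up in upstreams_of.get(&f).into_iter().flatten() {
                let cnt = downstream_cnt.get_mut(&up).unwrap();
                *cnt -= 1;
                if *cnt == 0 {
                    queue.push_back(up);
                }
            }
        }
        order
    }

    fn main() {
        // source(3) -> agg(2) -> sink(1): the sink comes out first.
        assert_eq!(topological_order(&[1, 2, 3], &[(3, 2), (2, 1)]), vec![1, 2, 3]);
    }
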
4 changes: 2 additions & 2 deletions src/meta/src/rpc/service/notification_service.rs
@@ -101,7 +101,7 @@ where
.collect_vec();
let hummock_snapshot = Some(self.hummock_manager.get_last_epoch().unwrap());

- let hummock_manager_rdguard = self.hummock_manager.get_read_guard().await;
+ let hummock_manager_guard = self.hummock_manager.get_read_guard().await;

// Send the snapshot on subscription. After that we will send only updates.
let meta_snapshot = match worker_type {
@@ -133,7 +133,7 @@

MetaSnapshot {
tables,
- hummock_version: Some(hummock_manager_rdguard.current_version.clone()),
+ hummock_version: Some(hummock_manager_guard.current_version.clone()),
..Default::default()
}
}
44 changes: 19 additions & 25 deletions src/meta/src/stream/stream_graph.rs
@@ -775,8 +775,8 @@ impl BuildActorGraphState {

/// [`ActorGraphBuilder`] generates the proto for interconnected actors for a streaming pipeline.
pub struct ActorGraphBuilder {
- /// GlobalFragmentId -> parallel_degree
- parallelisms: HashMap<FragmentId, u32>,
+ /// Default parallelism.
+ default_parallelism: u32,

fragment_graph: StreamFragmentGraph,
}
@@ -813,23 +813,8 @@ impl ActorGraphBuilder {

let fragment_graph = StreamFragmentGraph::from_protobuf(fragment_graph.clone(), offset);

- // TODO(Kexiang): now simply use Count(ParallelUnit) as parallelism of each fragment
- let parallelisms: HashMap<FragmentId, u32> = fragment_graph
- .fragments()
- .iter()
- .map(|(id, fragment)| {
- let id = id.as_global_id();
- let parallel_degree = if fragment.is_singleton {
- 1
- } else {
- default_parallelism
- };
- (id, parallel_degree)
- })
- .collect();
-
Ok(Self {
- parallelisms,
+ default_parallelism,
fragment_graph,
})
}
@@ -874,7 +859,7 @@ impl ActorGraphBuilder {
state.stream_graph_builder.fill_info(info);

// Generate actors of the streaming plan
- self.build_actor_graph(&mut state, &self.fragment_graph)?;
+ self.build_actor_graph(&mut state, &self.fragment_graph, ctx)?;
state
};

@@ -925,6 +910,7 @@ impl ActorGraphBuilder {
&self,
state: &mut BuildActorGraphState,
fragment_graph: &StreamFragmentGraph,
+ ctx: &CreateMaterializedViewContext,
) -> MetaResult<()> {
// Use topological sort to build the graph from downstream to upstream. (The first fragment
// popped out from the heap will be the top-most node in plan, or the sink in stream graph.)
@@ -944,7 +930,7 @@

while let Some(fragment_id) = actionable_fragment_id.pop_front() {
// Build the actors corresponding to the fragment
- self.build_actor_graph_fragment(fragment_id, state, fragment_graph)?;
+ self.build_actor_graph_fragment(fragment_id, state, fragment_graph, ctx)?;

// Find if we can process more fragments
for upstream_id in fragment_graph.get_upstreams(fragment_id).keys() {
@@ -972,14 +958,22 @@ impl ActorGraphBuilder {
fragment_id: GlobalFragmentId,
state: &mut BuildActorGraphState,
fragment_graph: &StreamFragmentGraph,
+ ctx: &CreateMaterializedViewContext,
) -> MetaResult<()> {
let current_fragment = fragment_graph.get_fragment(fragment_id).unwrap().clone();

- let parallel_degree = self
- .parallelisms
- .get(&fragment_id.as_global_id())
- .unwrap()
- .to_owned();
+ let parallel_degree = if current_fragment.is_singleton {
+ 1
+ } else if current_fragment.dependent_table_id != 0 {
+ // set fragment parallelism to the parallelism of its dependent table.
+ let upstream_actors = ctx
+ .table_sink_map
+ .get(&TableId::new(current_fragment.dependent_table_id))
+ .expect("upstream actor should exist");
+ upstream_actors.len() as u32
+ } else {
+ self.default_parallelism
+ };

let node = Arc::new(current_fragment.node.unwrap());
let actor_ids = state
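
This is the core of the parallelism enforcement: a singleton gets 1, a chain fragment inherits the parallelism of the mview it scans (the number of that table's sink actors), and everything else falls back to the default. The same rule as a standalone sketch with simplified types, where `table_sink_map` stands in for the field of `CreateMaterializedViewContext`:

    use std::collections::HashMap;

    type TableId = u32;
    type ActorId = u32;

    // Sketch of the fragment-parallelism rule introduced by this PR:
    // singletons run with 1, chain fragments inherit the parallelism of the
    // upstream mview they scan, and everything else gets the default.
    fn decide_parallelism(
        is_singleton: bool,
        dependent_table_id: u32, // 0 means "no dependent table"
        table_sink_map: &HashMap<TableId, Vec<ActorId>>,
        default_parallelism: u32,
    ) -> u32 {
        if is_singleton {
            1
        } else if dependent_table_id != 0 {
            table_sink_map
                .get(&dependent_table_id)
                .expect("upstream actors should exist")
                .len() as u32
        } else {
            default_parallelism
        }
    }

    fn main() {
        let mut map = HashMap::new();
        map.insert(7u32, vec![100u32, 101, 102]); // upstream mview has 3 sink actors
        assert_eq!(decide_parallelism(false, 7, &map, 8), 3); // chain follows upstream
        assert_eq!(decide_parallelism(false, 0, &map, 8), 8); // default parallelism
        assert_eq!(decide_parallelism(true, 0, &map, 8), 1); // singleton
    }
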
40 changes: 23 additions & 17 deletions src/meta/src/stream/stream_manager.rs
@@ -20,7 +20,7 @@ use risingwave_common::bail;
use risingwave_common::catalog::TableId;
use risingwave_common::types::{ParallelUnitId, VIRTUAL_NODE_COUNT};
use risingwave_pb::catalog::{Source, Table};
- use risingwave_pb::common::{ActorInfo, ParallelUnitMapping, WorkerType};
+ use risingwave_pb::common::{ActorInfo, Buffer, ParallelUnitMapping, WorkerType};
use risingwave_pb::meta::subscribe_response::{Info, Operation};
use risingwave_pb::meta::table_fragments::fragment::FragmentDistributionType;
use risingwave_pb::meta::table_fragments::{ActorState, ActorStatus};
@@ -151,6 +151,8 @@ where
struct Env<'a> {
/// Records what's the corresponding actor of each parallel unit of one table.
upstream_parallel_unit_info: &'a HashMap<TableId, BTreeMap<ParallelUnitId, ActorId>>,
+ /// Records each upstream mview actor's vnode mapping info.
+ upstream_vnode_mapping_info: &'a HashMap<TableId, Vec<(ActorId, Option<Buffer>)>>,
/// Records what's the actors on each worker of one table.
tables_worker_actors: &'a HashMap<TableId, BTreeMap<WorkerId, Vec<ActorId>>>,
/// Schedule information of all actors.
@@ -166,39 +168,36 @@
&mut self,
stream_node: &mut StreamNode,
actor_id: ActorId,
+ vnode_mapping: &Option<Buffer>,
same_worker_node_as_upstream: bool,
is_singleton: bool,
) -> MetaResult<()> {
let Some(NodeBody::Chain(ref mut chain)) = stream_node.node_body else {
// If node is not chain node, recursively deal with input nodes
for input in &mut stream_node.input {
- self.resolve_chain_node_inner(input, actor_id, same_worker_node_as_upstream, is_singleton)?;
+ self.resolve_chain_node_inner(input, actor_id, vnode_mapping, same_worker_node_as_upstream, is_singleton)?;
}
return Ok(());
};

// get upstream table id
let table_id = TableId::new(chain.table_id);

- // FIXME: We assume that the chain node is always on the same parallel unit as its
- // upstream materialize node here to find the upstream actor.
let upstream_actor_id = {
- // 1. use table id to get upstream parallel_unit -> actor_id mapping
- let upstream_parallel_actor_mapping =
- &self.upstream_parallel_unit_info[&table_id];
+ // 1. use table id to get upstream vnode mapping info: [(actor_id,
+ // option(vnode_mapping))]
+ let upstream_vnode_mapping_info = &self.upstream_vnode_mapping_info[&table_id];

if is_singleton {
// Directly find the singleton actor id.
- *upstream_parallel_actor_mapping
- .values()
- .exactly_one()
- .unwrap()
+ upstream_vnode_mapping_info.iter().exactly_one().unwrap().0
} else {
- // 2. use our actor id to get parallel unit id of the chain actor
- let parallel_unit_id = self.locations.actor_locations[&actor_id].id;
- // 3. and use chain actor's parallel unit id to get the corresponding
- // upstream actor id
- upstream_parallel_actor_mapping[&parallel_unit_id]
+ // 2. find the upstream actor id by vnode mapping.
+ assert!(vnode_mapping.is_some());
+ upstream_vnode_mapping_info
+ .iter()
+ .find(|(_, bitmap)| bitmap == vnode_mapping)
+ .unwrap()
+ .0
}
};

@@ -282,13 +281,19 @@ where
.get_sink_parallel_unit_ids(dependent_table_ids)
.await?;

+ let upstream_vnode_mapping_info = &self
+ .fragment_manager
+ .get_sink_vnode_mapping_info(dependent_table_ids)
+ .await?;
+
let tables_worker_actors = &self
.fragment_manager
.get_tables_worker_actors(dependent_table_ids)
.await?;

let mut env = Env {
upstream_parallel_unit_info,
upstream_vnode_mapping_info,
tables_worker_actors,
locations,
dispatchers,
Expand All @@ -304,6 +309,7 @@ where
env.resolve_chain_node_inner(
stream_node,
actor.actor_id,
+ &actor.vnode_bitmap,
actor.same_worker_node_as_upstream,
is_singleton,
)?;
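
The net effect: instead of assuming the chain actor shares a parallel unit with its upstream materialize actor, the lookup now matches on the vnode bitmap itself, so each chain actor pairs with the sink actor that owns exactly its vnode range. A minimal self-contained sketch of that matching (not code from this PR), with `Buffer` simplified to raw bytes:

    type ActorId = u32;
    type Buffer = Vec<u8>; // stand-in for the serialized vnode bitmap proto

    // Sketch of the upstream lookup: a chain actor finds the mview sink actor
    // that owns exactly the same vnode range by comparing bitmaps.
    fn find_upstream(
        upstream_info: &[(ActorId, Option<Buffer>)],
        chain_bitmap: &Option<Buffer>,
        is_singleton: bool,
    ) -> ActorId {
        if is_singleton {
            // A singleton mview has exactly one sink actor.
            assert_eq!(upstream_info.len(), 1);
            upstream_info[0].0
        } else {
            assert!(chain_bitmap.is_some());
            upstream_info
                .iter()
                .find(|(_, bitmap)| bitmap == chain_bitmap)
                .expect("no upstream actor with the same vnode range")
                .0
        }
    }

    fn main() {
        let upstream: Vec<(ActorId, Option<Buffer>)> =
            vec![(1, Some(vec![0b1010])), (2, Some(vec![0b0101]))];
        assert_eq!(find_upstream(&upstream, &Some(vec![0b0101]), false), 2);
    }
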
3 changes: 3 additions & 0 deletions src/meta/src/stream/test_fragmenter.rs
@@ -144,6 +144,7 @@ fn make_stream_fragments() -> Vec<StreamFragment> {
fragment_type: FragmentType::Source as i32,
is_singleton: false,
table_ids_cnt: 0,
+ dependent_table_id: 0,
});

// exchange node
@@ -216,6 +217,7 @@ fn make_stream_fragments() -> Vec<StreamFragment> {
fragment_type: FragmentType::Others as i32,
is_singleton: false,
table_ids_cnt: 0,
+ dependent_table_id: 0,
});

// exchange node
@@ -303,6 +305,7 @@ fn make_stream_fragments() -> Vec<StreamFragment> {
fragment_type: FragmentType::Sink as i32,
is_singleton: true,
table_ids_cnt: 0,
+ dependent_table_id: 0,
});

fragments
14 changes: 4 additions & 10 deletions src/stream/src/task/stream_manager.rs
@@ -323,12 +323,9 @@ impl LocalStreamManager {

/// This function could only be called once during the lifecycle of `LocalStreamManager` for
/// now.
- pub fn update_actor_info(
- &self,
- req: stream_service::BroadcastActorInfoTableRequest,
- ) -> Result<()> {
+ pub fn update_actor_info(&self, actor_infos: &[ActorInfo]) -> Result<()> {
let mut core = self.core.lock();
- core.update_actor_info(req)
+ core.update_actor_info(actor_infos)
}

/// This function could only be called once during the lifecycle of `LocalStreamManager` for
@@ -644,12 +641,9 @@ impl LocalStreamManagerCore {
.collect::<Result<Vec<_>>>()
}

- fn update_actor_info(
- &mut self,
- req: stream_service::BroadcastActorInfoTableRequest,
- ) -> Result<()> {
+ fn update_actor_info(&mut self, new_actor_infos: &[ActorInfo]) -> Result<()> {
let mut actor_infos = self.context.actor_infos.write();
- for actor in req.get_info() {
+ for actor in new_actor_infos {
let ret = actor_infos.insert(actor.get_actor_id(), actor.clone());
if let Some(prev_actor) = ret && actor != &prev_actor {
return Err(ErrorCode::InternalError(format!(
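
With the request type unwrapped at the RPC boundary, the core update is a map merge that tolerates repeated broadcasts of identical info but rejects conflicting registrations for a known actor id. A self-contained sketch of that check (simplified `ActorInfo`, hypothetical string error, not code from this PR):

    use std::collections::HashMap;

    type ActorId = u32;

    // Simplified stand-in for the ActorInfo proto: an id plus its host.
    #[derive(Clone, PartialEq, Debug)]
    struct ActorInfo {
        actor_id: ActorId,
        host: String,
    }

    // Sketch of the conflict check: re-broadcasting identical info is a no-op,
    // but registering different info for a known actor id is an error.
    fn update_actor_info(
        actor_infos: &mut HashMap<ActorId, ActorInfo>,
        new_actor_infos: &[ActorInfo],
    ) -> Result<(), String> {
        for actor in new_actor_infos {
            if let Some(prev) = actor_infos.insert(actor.actor_id, actor.clone()) {
                if actor != &prev {
                    return Err(format!("actor {} is already registered", actor.actor_id));
                }
            }
        }
        Ok(())
    }

    fn main() {
        let mut infos = HashMap::new();
        let a = ActorInfo { actor_id: 1, host: "worker-0".into() };
        assert!(update_actor_info(&mut infos, &[a.clone()]).is_ok());
        assert!(update_actor_info(&mut infos, &[a]).is_ok()); // idempotent
    }
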