feat(streaming): use multiple dispatcher for mview creation (#3758)
* move mutation and barrier to stream_plan.proto

Signed-off-by: Bugen Zhao <i@bugenzhao.com>

* add dispatcher when creating mview

Signed-off-by: Bugen Zhao <i@bugenzhao.com>

* from/to proto of add dispatcher

Signed-off-by: Bugen Zhao <i@bugenzhao.com>

* put actor info to shared context

Signed-off-by: Bugen Zhao <i@bugenzhao.com>

* add dispatcher on compute node

Signed-off-by: Bugen Zhao <i@bugenzhao.com>

* clean up empty dispatcher

Signed-off-by: Bugen Zhao <i@bugenzhao.com>

* cleanup

Signed-off-by: Bugen Zhao <i@bugenzhao.com>

* fix clippy

Signed-off-by: Bugen Zhao <i@bugenzhao.com>

* fix tests

Signed-off-by: Bugen Zhao <i@bugenzhao.com>

* use add dispatcher for source executor

Signed-off-by: Bugen Zhao <i@bugenzhao.com>

* rename dispatch to dispatcher

Signed-off-by: Bugen Zhao <i@bugenzhao.com>

* add comments

Signed-off-by: Bugen Zhao <i@bugenzhao.com>

* fix unique dispatcher id

Signed-off-by: Bugen Zhao <i@bugenzhao.com>

* fix it

Signed-off-by: Bugen Zhao <i@bugenzhao.com>

* use expect dead code

Signed-off-by: Bugen Zhao <i@bugenzhao.com>
BugenZhao authored Jul 11, 2022
1 parent 59579ce commit 097e3eb
Showing 18 changed files with 420 additions and 297 deletions.
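
The commits above walk through the core idea: creating a materialized view now attaches one dedicated dispatcher per upstream sink actor through a new AddDispatcher barrier mutation, instead of reusing a single broadcast dispatcher with id 0. The compute-node side of this change is among the files not expanded below, so the following Rust sketch only illustrates the intent, with simplified stand-in types (Actor, apply_add_dispatcher) that are not the actual RisingWave APIs.

use std::collections::HashMap;

type ActorId = u32;

// Simplified stand-ins for the stream_plan types; NOT the generated structs.
struct Dispatcher {
    dispatcher_id: u64,
    downstream_actor_id: Vec<ActorId>,
}

struct AddDispatcherMutation {
    // actor id -> new dispatchers to append to that actor's output
    actor_dispatchers: HashMap<ActorId, Vec<Dispatcher>>,
}

struct Actor {
    actor_id: ActorId,
    dispatchers: Vec<Dispatcher>,
}

impl Actor {
    // On receiving the barrier, each upstream actor takes the dispatchers
    // addressed to it and appends them; actors not mentioned are unaffected.
    fn apply_add_dispatcher(&mut self, mutation: &mut AddDispatcherMutation) {
        if let Some(new_dispatchers) = mutation.actor_dispatchers.remove(&self.actor_id) {
            self.dispatchers.extend(new_dispatchers);
        }
    }
}
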
42 changes: 0 additions & 42 deletions proto/data.proto
@@ -3,7 +3,6 @@ syntax = "proto3";
package data;

import "common.proto";
import "source.proto";

option optimize_for = SPEED;

@@ -118,57 +117,16 @@ enum Op {
UPDATE_DELETE = 3;
}

message StreamMessage {
oneof stream_message {
StreamChunk stream_chunk = 1;
Barrier barrier = 2;
}
}

message StreamChunk {
// for Column::from_protobuf(), may not need later
uint32 cardinality = 1;
repeated Op ops = 2;
repeated Column columns = 3;
}

message StopMutation {
repeated uint32 actors = 1;
}

message DispatcherMutation {
uint32 actor_id = 1;
uint64 dispatcher_id = 2;
repeated common.ActorInfo info = 3;
}

message UpdateMutation {
repeated DispatcherMutation mutations = 1;
}

message AddMutation {
repeated DispatcherMutation mutations = 1;
map<uint32, source.ConnectorSplits> actor_splits = 2;
}

message SourceChangeSplitMutation {
map<uint32, source.ConnectorSplits> actor_splits = 2;
}

message Epoch {
uint64 curr = 1;
uint64 prev = 2;
}

message Barrier {
Epoch epoch = 1;
oneof mutation {
StopMutation stop = 3;
UpdateMutation update = 4;
AddMutation add = 5;
SourceChangeSplitMutation splits = 7;
}
bytes span = 6;
}

message Terminate {}
55 changes: 53 additions & 2 deletions proto/stream_plan.proto
@@ -7,9 +7,60 @@ import "common.proto";
import "data.proto";
import "expr.proto";
import "plan_common.proto";
import "source.proto";

option optimize_for = SPEED;

message StopMutation {
repeated uint32 actors = 1;
}

message DispatcherMutation {
uint32 actor_id = 1;
uint64 dispatcher_id = 2;
repeated common.ActorInfo info = 3;
}

message UpdateMutation {
repeated DispatcherMutation mutations = 1;
}

message AddMutation {
repeated DispatcherMutation mutations = 1;
map<uint32, source.ConnectorSplits> actor_splits = 2;
}

message SourceChangeSplitMutation {
map<uint32, source.ConnectorSplits> actor_splits = 2;
}

message AddDispatcherMutation {
message Dispatchers {
repeated Dispatcher dispatchers = 1;
}
map<uint32, Dispatchers> actor_dispatchers = 1;
map<uint32, source.ConnectorSplits> actor_splits = 2;
}

message Barrier {
data.Epoch epoch = 1;
oneof mutation {
StopMutation stop = 3;
UpdateMutation update = 4;
AddMutation add = 5;
AddDispatcherMutation add_dispatcher = 8;
SourceChangeSplitMutation splits = 7;
}
bytes span = 6;
}

message StreamMessage {
oneof stream_message {
data.StreamChunk stream_chunk = 1;
Barrier barrier = 2;
}
}

// Hash mapping for compute node. Stores mapping from virtual node to actor id.
message ActorMapping {
repeated uint64 original_indices = 1;
@@ -271,7 +322,8 @@ message StreamNode {
ExpandNode expand = 121;
DynamicFilterNode dynamic_filter = 122;
}
// The id for the operator.
// The id for the operator. This is local per mview.
// TODO: should better be a uint32.
uint64 operator_id = 1;
// Child node in plan aka. upstream nodes in the streaming DAG
repeated StreamNode input = 3;
@@ -312,7 +364,6 @@ message Dispatcher {
ActorMapping hash_mapping = 3;
// Dispatcher can be uniquely identified by a combination of actor id and dispatcher id.
// For dispatchers within actors, the id is the same as operator_id of the exchange plan node.
// For cross-MV dispatchers, there will only be one broadcast dispatcher of id 0.
uint64 dispatcher_id = 4;
// Number of downstreams decides how many endpoints a dispatcher should dispatch.
repeated uint32 downstream_actor_id = 5;
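
Note the nested Dispatchers wrapper introduced above: proto3 map values cannot be repeated, so the per-actor list of new dispatchers has to be wrapped in its own message. Below is a minimal sketch of building the new mutation from the prost-generated types; the type and field names are the ones visible in this diff, but the helper itself is illustrative and not code from this commit.

use std::collections::HashMap;

use risingwave_pb::stream_plan::add_dispatcher_mutation::Dispatchers;
use risingwave_pb::stream_plan::barrier::Mutation;
use risingwave_pb::stream_plan::{AddDispatcherMutation, Dispatcher};

// Wrap each actor's dispatcher list into the map-of-message form the proto
// requires and build the barrier mutation.
fn build_add_dispatcher(new_dispatchers: HashMap<u32, Vec<Dispatcher>>) -> Mutation {
    let actor_dispatchers = new_dispatchers
        .into_iter()
        .map(|(actor_id, dispatchers)| (actor_id, Dispatchers { dispatchers }))
        .collect();

    Mutation::AddDispatcher(AddDispatcherMutation {
        actor_dispatchers,
        actor_splits: HashMap::new(), // no source splits in this illustration
    })
}

The meta-side construction in src/meta/src/barrier/command.rs below follows the same shape and additionally fills actor_splits with the source split assignments.
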
2 changes: 1 addition & 1 deletion proto/stream_service.proto
@@ -63,7 +63,7 @@ message ForceStopActorsResponse {

message InjectBarrierRequest {
string request_id = 1;
data.Barrier barrier = 2;
stream_plan.Barrier barrier = 2;
repeated uint32 actor_ids_to_send = 3;
repeated uint32 actor_ids_to_collect = 4;
}
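
Barrier injection is the main consumer of the relocated type: the meta node fills this request with a stream_plan.Barrier plus the sets of actors that should receive the barrier and report its completion. A hedged sketch using the prost-generated structs, assuming their default field layout; the request_id value here is a placeholder (the real code derives it from a UUID).

use risingwave_pb::data::Epoch;
use risingwave_pb::stream_plan::Barrier;
use risingwave_pb::stream_service::InjectBarrierRequest;

// Assemble an injection request for a plain barrier with no mutation attached.
fn inject_barrier_request(
    prev: u64,
    curr: u64,
    actor_ids_to_send: Vec<u32>,
    actor_ids_to_collect: Vec<u32>,
) -> InjectBarrierRequest {
    InjectBarrierRequest {
        request_id: "placeholder-request-id".to_string(),
        barrier: Some(Barrier {
            epoch: Some(Epoch { curr, prev }),
            mutation: None, // e.g. Mutation::AddDispatcher(..) while an mview is being created
            ..Default::default()
        }),
        actor_ids_to_send,
        actor_ids_to_collect,
    }
}
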
3 changes: 2 additions & 1 deletion proto/task_service.proto
@@ -5,6 +5,7 @@ package task_service;
import "batch_plan.proto";
import "common.proto";
import "data.proto";
import "stream_plan.proto";

option optimize_for = SPEED;

@@ -97,7 +98,7 @@ message GetDataRequest {
}

message GetStreamResponse {
data.StreamMessage message = 1;
stream_plan.StreamMessage message = 1;
}

service ExchangeService {
61 changes: 27 additions & 34 deletions src/meta/src/barrier/command.rs
@@ -20,18 +20,18 @@ use risingwave_common::catalog::TableId;
use risingwave_common::error::{Result, RwError};
use risingwave_common::util::epoch::Epoch;
use risingwave_connector::SplitImpl;
use risingwave_pb::common::ActorInfo;
use risingwave_pb::data::barrier::Mutation;
use risingwave_pb::data::{AddMutation, DispatcherMutation, StopMutation};
use risingwave_pb::source::{ConnectorSplit, ConnectorSplits};
use risingwave_pb::stream_plan::add_dispatcher_mutation::Dispatchers;
use risingwave_pb::stream_plan::barrier::Mutation;
use risingwave_pb::stream_plan::{AddDispatcherMutation, Dispatcher, StopMutation};
use risingwave_pb::stream_service::DropActorsRequest;
use risingwave_rpc_client::StreamClientPoolRef;
use uuid::Uuid;

use super::info::BarrierActorInfo;
use crate::barrier::ChangedTableState;
use crate::barrier::ChangedTableState::{Create, Drop, NoTable};
use crate::model::{ActorId, DispatcherId, TableFragments};
use crate::model::{ActorId, TableFragments};
use crate::storage::MetaStore;
use crate::stream::FragmentManagerRef;

@@ -64,7 +64,7 @@ pub enum Command {
CreateMaterializedView {
table_fragments: TableFragments,
table_sink_map: HashMap<TableId, Vec<ActorId>>,
dispatches: HashMap<(ActorId, DispatcherId), Vec<ActorInfo>>,
dispatchers: HashMap<ActorId, Vec<Dispatcher>>,
source_state: HashMap<ActorId, Vec<SplitImpl>>,
},
}
@@ -143,21 +143,21 @@
}

Command::CreateMaterializedView {
dispatches,
dispatchers,
source_state,
..
} => {
let mutations = dispatches
let actor_dispatchers = dispatchers
.iter()
.map(
|(&(up_actor_id, dispatcher_id), down_actor_infos)| DispatcherMutation {
actor_id: up_actor_id,
dispatcher_id,
info: down_actor_infos.to_vec(),
},
)
.map(|(&actor_id, dispatchers)| {
(
actor_id,
Dispatchers {
dispatchers: dispatchers.clone(),
},
)
})
.collect();

let actor_splits = source_state
.iter()
.filter(|(_, splits)| !splits.is_empty())
@@ -171,8 +171,8 @@
})
.collect();

Some(Mutation::Add(AddMutation {
mutations,
Some(Mutation::AddDispatcher(AddDispatcherMutation {
actor_dispatchers,
actor_splits,
}))
}
@@ -185,9 +185,10 @@
/// returns an empty set.
pub fn actors_to_track(&self) -> HashSet<ActorId> {
match &self.command {
Command::CreateMaterializedView { dispatches, .. } => dispatches
.iter()
.flat_map(|(_, down_actor_infos)| down_actor_infos.iter().map(|info| info.actor_id))
Command::CreateMaterializedView { dispatchers, .. } => dispatchers
.values()
.flatten()
.flat_map(|dispatcher| dispatcher.downstream_actor_id.iter().copied())
.collect(),

_ => Default::default(),
@@ -226,31 +227,23 @@

Command::CreateMaterializedView {
table_fragments,
dispatches,
dispatchers,
table_sink_map,
source_state: _,
} => {
let mut dependent_table_actors = Vec::with_capacity(table_sink_map.len());
for (table_id, actors) in table_sink_map {
let downstream_actors = dispatches
let downstream_actors = dispatchers
.iter()
.filter(|((upstream_actor_id, _), _)| actors.contains(upstream_actor_id))
.map(|((upstream_actor_id, _), downstream_actor_infos)| {
(
*upstream_actor_id,
downstream_actor_infos
.iter()
.map(|info| info.actor_id)
.collect(),
)
})
.collect::<HashMap<ActorId, Vec<ActorId>>>();
.filter(|(upstream_actor_id, _)| actors.contains(upstream_actor_id))
.map(|(&k, v)| (k, v.clone()))
.collect();
dependent_table_actors.push((*table_id, downstream_actors));
}
self.fragment_manager
.finish_create_table_fragments(
&table_fragments.table_id(),
&dependent_table_actors,
dependent_table_actors,
)
.await?;
}
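
The reworked actors_to_track above now reads straight off the dispatchers map: the actors tracked during mview creation are simply the union of downstream_actor_id over all newly added dispatchers. A tiny self-contained illustration of that logic, using plain Vec<u32> in place of the real Dispatcher message:

use std::collections::{HashMap, HashSet};

// Each Vec<u32> stands in for one Dispatcher, keeping only downstream_actor_id.
fn actors_to_track(dispatchers: &HashMap<u32, Vec<Vec<u32>>>) -> HashSet<u32> {
    dispatchers
        .values()
        .flatten()
        .flat_map(|downstream| downstream.iter().copied())
        .collect()
}

fn main() {
    // Two upstream actors, each with one freshly added dispatcher.
    let dispatchers = HashMap::from([(1, vec![vec![100, 101]]), (2, vec![vec![101, 102]])]);
    assert_eq!(actors_to_track(&dispatchers), HashSet::from([100, 101, 102]));
}
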
2 changes: 1 addition & 1 deletion src/meta/src/barrier/mod.rs
@@ -29,8 +29,8 @@ use risingwave_common::util::epoch::{Epoch, INVALID_EPOCH};
use risingwave_hummock_sdk::LocalSstableInfo;
use risingwave_pb::common::worker_node::State::Running;
use risingwave_pb::common::WorkerType;
use risingwave_pb::data::Barrier;
use risingwave_pb::meta::table_fragments::ActorState;
use risingwave_pb::stream_plan::Barrier;
use risingwave_pb::stream_service::{
BarrierCompleteRequest, BarrierCompleteResponse, InjectBarrierRequest,
};
1 change: 1 addition & 0 deletions src/meta/src/model/mod.rs
@@ -36,6 +36,7 @@ use crate::storage::{self, MetaStore, Transaction};
pub type ActorId = u32;

/// Should be used together with `ActorId` to uniquely identify a dispatcher
#[expect(dead_code)]
pub type DispatcherId = u64;

/// A global, unique identifier of a fragment
30 changes: 17 additions & 13 deletions src/meta/src/stream/meta.rs
@@ -23,7 +23,7 @@ use risingwave_common::try_match_expand;
use risingwave_common::types::{ParallelUnitId, VIRTUAL_NODE_COUNT};
use risingwave_common::util::compress::decompress_data;
use risingwave_pb::meta::table_fragments::ActorState;
use risingwave_pb::stream_plan::{FragmentType, StreamActor};
use risingwave_pb::stream_plan::{Dispatcher, FragmentType, StreamActor};
use tokio::sync::RwLock;

use crate::cluster::WorkerId;
@@ -163,7 +163,7 @@
pub async fn finish_create_table_fragments(
&self,
table_id: &TableId,
dependent_table_actors: &[(TableId, HashMap<ActorId, Vec<ActorId>>)],
dependent_table_actors: Vec<(TableId, HashMap<ActorId, Vec<Dispatcher>>)>,
) -> Result<()> {
let map = &mut self.core.write().await.table_fragments;

@@ -175,9 +175,9 @@
table_fragments.upsert_in_transaction(&mut transaction)?;

let mut dependent_tables = Vec::with_capacity(dependent_table_actors.len());
for (dependent_table_id, extra_downstream_actors) in dependent_table_actors {
for (dependent_table_id, mut new_dispatchers) in dependent_table_actors {
let mut dependent_table = map
.get(dependent_table_id)
.get(&dependent_table_id)
.ok_or_else(|| {
RwError::from(InternalError(format!(
"table_fragment not exist: id={}",
@@ -187,12 +187,9 @@
.clone();
for fragment in dependent_table.fragments.values_mut() {
for actor in &mut fragment.actors {
if let Some(downstream_actors) =
extra_downstream_actors.get(&actor.actor_id)
{
actor.dispatcher[0]
.downstream_actor_id
.extend(downstream_actors.iter().cloned());
// Extend new dispatchers to table fragments.
if let Some(new_dispatchers) = new_dispatchers.remove(&actor.actor_id) {
actor.dispatcher.extend(new_dispatchers);
}
}
}
@@ -240,9 +237,16 @@
for fragment in dependent_table.fragments.values_mut() {
if fragment.fragment_type == FragmentType::Sink as i32 {
for actor in &mut fragment.actors {
actor.dispatcher[0]
.downstream_actor_id
.retain(|x| !chain_actor_ids.contains(x));
// Remove these downstream actor ids from all dispatchers.
for dispatcher in &mut actor.dispatcher {
dispatcher
.downstream_actor_id
.retain(|x| !chain_actor_ids.contains(x));
}
// Remove empty dispatchers.
actor
.dispatcher
.retain(|d| !d.downstream_actor_id.is_empty());
}
}
}
4 changes: 2 additions & 2 deletions src/meta/src/stream/source_manager.rs
@@ -29,11 +29,11 @@ use risingwave_pb::catalog::source::Info::StreamSource;
use risingwave_pb::catalog::Source;
use risingwave_pb::common::worker_node::State::Running;
use risingwave_pb::common::WorkerType;
use risingwave_pb::data::barrier::Mutation;
use risingwave_pb::data::SourceChangeSplitMutation;
use risingwave_pb::source::{
ConnectorSplit, ConnectorSplits, SourceActorInfo as ProstSourceActorInfo,
};
use risingwave_pb::stream_plan::barrier::Mutation;
use risingwave_pb::stream_plan::SourceChangeSplitMutation;
use risingwave_pb::stream_service::{
CreateSourceRequest as ComputeNodeCreateSourceRequest,
DropSourceRequest as ComputeNodeDropSourceRequest,