Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(streaming): use multiple dispatcher for mview creation #3758

Merged
merged 17 commits into from
Jul 11, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 0 additions & 42 deletions proto/data.proto
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ syntax = "proto3";
package data;

import "common.proto";
import "source.proto";

option optimize_for = SPEED;

Expand Down Expand Up @@ -118,57 +117,16 @@ enum Op {
UPDATE_DELETE = 3;
}

// A single message flowing through a stream: exactly one of a data chunk or
// a barrier.
message StreamMessage {
// The payload; at most one of the variants below is set.
oneof stream_message {
// A batch of rows with their change operations.
StreamChunk stream_chunk = 1;
// A barrier carrying an epoch and an optional mutation.
Barrier barrier = 2;
}
}

// A columnar chunk of stream data together with the operation applied to its
// rows.
message StreamChunk {
// for Column::from_protobuf(), may not need later
uint32 cardinality = 1;
// Change operations (see `Op`); presumably one entry per row -- TODO confirm.
repeated Op ops = 2;
// Column data of the chunk.
repeated Column columns = 3;
}

// Barrier mutation carried in the `stop` arm of `Barrier.mutation`.
message StopMutation {
// Actor ids the mutation applies to (presumably the actors to stop --
// TODO confirm against the consumer).
repeated uint32 actors = 1;
}

// Describes a change to one dispatcher, addressed by the pair
// (actor_id, dispatcher_id).
message DispatcherMutation {
// Id of the actor that owns the dispatcher (the upstream actor).
uint32 actor_id = 1;
// Id of the dispatcher within that actor.
uint64 dispatcher_id = 2;
// Info of the downstream actors associated with this dispatcher.
repeated common.ActorInfo info = 3;
}

// Barrier mutation carried in the `update` arm of `Barrier.mutation`.
message UpdateMutation {
// Per-dispatcher changes, one entry per (actor, dispatcher) pair.
repeated DispatcherMutation mutations = 1;
}

// Barrier mutation carried in the `add` arm of `Barrier.mutation`.
message AddMutation {
// Per-dispatcher changes, one entry per (actor, dispatcher) pair.
repeated DispatcherMutation mutations = 1;
// Connector splits keyed by actor id. Map iteration order is unspecified.
map<uint32, source.ConnectorSplits> actor_splits = 2;
}

// Barrier mutation carried in the `splits` arm of `Barrier.mutation`.
message SourceChangeSplitMutation {
// Connector splits keyed by actor id.
// NOTE: this message's only field uses number 2; number 1 is not used here.
map<uint32, source.ConnectorSplits> actor_splits = 2;
}

// A pair of epoch values attached to a barrier.
message Epoch {
// Presumably the current epoch -- TODO confirm.
uint64 curr = 1;
// Presumably the epoch preceding `curr` -- TODO confirm.
uint64 prev = 2;
}

// A barrier message: an epoch plus at most one mutation.
message Barrier {
// Epoch pair carried by this barrier.
Epoch epoch = 1;
// Optional mutation; at most one arm is set. Field numbers are not
// contiguous (2 is unused, 6 is taken by `span` below).
oneof mutation {
StopMutation stop = 3;
UpdateMutation update = 4;
AddMutation add = 5;
SourceChangeSplitMutation splits = 7;
}
// Opaque bytes; presumably a serialized tracing span -- TODO confirm.
bytes span = 6;
}

// Empty marker message; it carries no fields.
message Terminate {}
55 changes: 53 additions & 2 deletions proto/stream_plan.proto
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,60 @@ import "common.proto";
import "data.proto";
import "expr.proto";
import "plan_common.proto";
import "source.proto";

option optimize_for = SPEED;

// Barrier mutation carried in the `stop` arm of `Barrier.mutation`.
message StopMutation {
// Actor ids the mutation applies to (presumably the actors to stop --
// TODO confirm against the consumer).
repeated uint32 actors = 1;
}

// Describes a change to one dispatcher, addressed by the pair
// (actor_id, dispatcher_id).
message DispatcherMutation {
// Id of the actor that owns the dispatcher (the upstream actor).
uint32 actor_id = 1;
// Id of the dispatcher within that actor.
uint64 dispatcher_id = 2;
// Info of the downstream actors associated with this dispatcher.
repeated common.ActorInfo info = 3;
}

// Barrier mutation carried in the `update` arm of `Barrier.mutation`.
message UpdateMutation {
// Per-dispatcher changes, one entry per (actor, dispatcher) pair.
repeated DispatcherMutation mutations = 1;
}

// Barrier mutation carried in the `add` arm of `Barrier.mutation`.
message AddMutation {
// Per-dispatcher changes, one entry per (actor, dispatcher) pair.
repeated DispatcherMutation mutations = 1;
// Connector splits keyed by actor id. Map iteration order is unspecified.
map<uint32, source.ConnectorSplits> actor_splits = 2;
}

// Barrier mutation carried in the `splits` arm of `Barrier.mutation`.
message SourceChangeSplitMutation {
// Connector splits keyed by actor id.
// NOTE: this message's only field uses number 2; number 1 is not used here.
map<uint32, source.ConnectorSplits> actor_splits = 2;
}

message AddDispatcherMutation {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would you please document when each mutation will be used?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure. I'll do this along with refactoring the AddOutput mutation in the next PRs.

// Wrapper around a list of dispatchers. A map value cannot be `repeated`,
// so the list is wrapped in a message to serve as the value type of
// `actor_dispatchers`.
message Dispatchers {
// Dispatchers to add for one actor.
repeated Dispatcher dispatchers = 1;
}
map<uint32, Dispatchers> actor_dispatchers = 1;
map<uint32, source.ConnectorSplits> actor_splits = 2;
}

// A barrier message: an epoch plus at most one mutation.
message Barrier {
// Epoch pair carried by this barrier.
data.Epoch epoch = 1;
// Optional mutation; at most one arm is set. Field numbers are not
// contiguous or ordered (2 is unused, 6 is taken by `span` below, and 8 is
// declared before 7).
oneof mutation {
StopMutation stop = 3;
UpdateMutation update = 4;
AddMutation add = 5;
AddDispatcherMutation add_dispatcher = 8;
SourceChangeSplitMutation splits = 7;
}
// Opaque bytes; presumably a serialized tracing span -- TODO confirm.
bytes span = 6;
}

// A single message flowing through a stream: exactly one of a data chunk or
// a barrier.
message StreamMessage {
// The payload; at most one of the variants below is set.
oneof stream_message {
// A batch of rows with their change operations.
data.StreamChunk stream_chunk = 1;
// A barrier carrying an epoch and an optional mutation.
Barrier barrier = 2;
}
}

// Hash mapping for compute node. Stores mapping from virtual node to actor id.
message ActorMapping {
repeated uint64 original_indices = 1;
Expand Down Expand Up @@ -271,7 +322,8 @@ message StreamNode {
ExpandNode expand = 121;
DynamicFilterNode dynamic_filter = 122;
}
// The id for the operator.
// The id for the operator. This is local per mview.
// TODO: should better be a uint32.
uint64 operator_id = 1;
// Child node in plan aka. upstream nodes in the streaming DAG
repeated StreamNode input = 3;
Expand Down Expand Up @@ -312,7 +364,6 @@ message Dispatcher {
ActorMapping hash_mapping = 3;
// Dispatcher can be uniquely identified by a combination of actor id and dispatcher id.
// For dispatchers within actors, the id is the same as operator_id of the exchange plan node.
// For cross-MV dispatchers, there will only be one broadcast dispatcher of id 0.
uint64 dispatcher_id = 4;
// Number of downstreams decides how many endpoints a dispatcher should dispatch.
repeated uint32 downstream_actor_id = 5;
Expand Down
2 changes: 1 addition & 1 deletion proto/stream_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ message ForceStopActorsResponse {

message InjectBarrierRequest {
string request_id = 1;
data.Barrier barrier = 2;
stream_plan.Barrier barrier = 2;
repeated uint32 actor_ids_to_send = 3;
repeated uint32 actor_ids_to_collect = 4;
}
Expand Down
3 changes: 2 additions & 1 deletion proto/task_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package task_service;
import "batch_plan.proto";
import "common.proto";
import "data.proto";
import "stream_plan.proto";

option optimize_for = SPEED;

Expand Down Expand Up @@ -97,7 +98,7 @@ message GetDataRequest {
}

message GetStreamResponse {
data.StreamMessage message = 1;
stream_plan.StreamMessage message = 1;
}

service ExchangeService {
Expand Down
61 changes: 27 additions & 34 deletions src/meta/src/barrier/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,18 @@ use risingwave_common::catalog::TableId;
use risingwave_common::error::{Result, RwError};
use risingwave_common::util::epoch::Epoch;
use risingwave_connector::SplitImpl;
use risingwave_pb::common::ActorInfo;
use risingwave_pb::data::barrier::Mutation;
use risingwave_pb::data::{AddMutation, DispatcherMutation, StopMutation};
use risingwave_pb::source::{ConnectorSplit, ConnectorSplits};
use risingwave_pb::stream_plan::add_dispatcher_mutation::Dispatchers;
use risingwave_pb::stream_plan::barrier::Mutation;
use risingwave_pb::stream_plan::{AddDispatcherMutation, Dispatcher, StopMutation};
use risingwave_pb::stream_service::DropActorsRequest;
use risingwave_rpc_client::StreamClientPoolRef;
use uuid::Uuid;

use super::info::BarrierActorInfo;
use crate::barrier::ChangedTableState;
use crate::barrier::ChangedTableState::{Create, Drop, NoTable};
use crate::model::{ActorId, DispatcherId, TableFragments};
use crate::model::{ActorId, TableFragments};
use crate::storage::MetaStore;
use crate::stream::FragmentManagerRef;

Expand Down Expand Up @@ -64,7 +64,7 @@ pub enum Command {
CreateMaterializedView {
table_fragments: TableFragments,
table_sink_map: HashMap<TableId, Vec<ActorId>>,
dispatches: HashMap<(ActorId, DispatcherId), Vec<ActorInfo>>,
dispatchers: HashMap<ActorId, Vec<Dispatcher>>,
source_state: HashMap<ActorId, Vec<SplitImpl>>,
},
}
Expand Down Expand Up @@ -143,21 +143,21 @@ where
}

Command::CreateMaterializedView {
dispatches,
dispatchers,
source_state,
..
} => {
let mutations = dispatches
let actor_dispatchers = dispatchers
.iter()
.map(
|(&(up_actor_id, dispatcher_id), down_actor_infos)| DispatcherMutation {
actor_id: up_actor_id,
dispatcher_id,
info: down_actor_infos.to_vec(),
},
)
.map(|(&actor_id, dispatchers)| {
(
actor_id,
Dispatchers {
dispatchers: dispatchers.clone(),
},
)
})
.collect();

let actor_splits = source_state
.iter()
.filter(|(_, splits)| !splits.is_empty())
Expand All @@ -171,8 +171,8 @@ where
})
.collect();

Some(Mutation::Add(AddMutation {
mutations,
Some(Mutation::AddDispatcher(AddDispatcherMutation {
actor_dispatchers,
actor_splits,
}))
}
Expand All @@ -185,9 +185,10 @@ where
/// returns an empty set.
pub fn actors_to_track(&self) -> HashSet<ActorId> {
match &self.command {
Command::CreateMaterializedView { dispatches, .. } => dispatches
.iter()
.flat_map(|(_, down_actor_infos)| down_actor_infos.iter().map(|info| info.actor_id))
Command::CreateMaterializedView { dispatchers, .. } => dispatchers
.values()
.flatten()
.flat_map(|dispatcher| dispatcher.downstream_actor_id.iter().copied())
.collect(),

_ => Default::default(),
Expand Down Expand Up @@ -226,31 +227,23 @@ where

Command::CreateMaterializedView {
table_fragments,
dispatches,
dispatchers,
table_sink_map,
source_state: _,
} => {
let mut dependent_table_actors = Vec::with_capacity(table_sink_map.len());
for (table_id, actors) in table_sink_map {
let downstream_actors = dispatches
let downstream_actors = dispatchers
.iter()
.filter(|((upstream_actor_id, _), _)| actors.contains(upstream_actor_id))
.map(|((upstream_actor_id, _), downstream_actor_infos)| {
(
*upstream_actor_id,
downstream_actor_infos
.iter()
.map(|info| info.actor_id)
.collect(),
)
})
.collect::<HashMap<ActorId, Vec<ActorId>>>();
.filter(|(upstream_actor_id, _)| actors.contains(upstream_actor_id))
.map(|(&k, v)| (k, v.clone()))
.collect();
dependent_table_actors.push((*table_id, downstream_actors));
}
self.fragment_manager
.finish_create_table_fragments(
&table_fragments.table_id(),
&dependent_table_actors,
dependent_table_actors,
)
.await?;
}
Expand Down
2 changes: 1 addition & 1 deletion src/meta/src/barrier/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ use risingwave_common::util::epoch::{Epoch, INVALID_EPOCH};
use risingwave_hummock_sdk::LocalSstableInfo;
use risingwave_pb::common::worker_node::State::Running;
use risingwave_pb::common::WorkerType;
use risingwave_pb::data::Barrier;
use risingwave_pb::meta::table_fragments::ActorState;
use risingwave_pb::stream_plan::Barrier;
use risingwave_pb::stream_service::{
BarrierCompleteRequest, BarrierCompleteResponse, InjectBarrierRequest,
};
Expand Down
1 change: 1 addition & 0 deletions src/meta/src/model/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ use crate::storage::{self, MetaStore, Transaction};
pub type ActorId = u32;

/// Should be used together with `ActorId` to uniquely identify a dispatcher
#[expect(dead_code)]
pub type DispatcherId = u64;

/// A global, unique identifier of a fragment
Expand Down
30 changes: 17 additions & 13 deletions src/meta/src/stream/meta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use risingwave_common::try_match_expand;
use risingwave_common::types::{ParallelUnitId, VIRTUAL_NODE_COUNT};
use risingwave_common::util::compress::decompress_data;
use risingwave_pb::meta::table_fragments::ActorState;
use risingwave_pb::stream_plan::{FragmentType, StreamActor};
use risingwave_pb::stream_plan::{Dispatcher, FragmentType, StreamActor};
use tokio::sync::RwLock;

use crate::cluster::WorkerId;
Expand Down Expand Up @@ -163,7 +163,7 @@ where
pub async fn finish_create_table_fragments(
&self,
table_id: &TableId,
dependent_table_actors: &[(TableId, HashMap<ActorId, Vec<ActorId>>)],
dependent_table_actors: Vec<(TableId, HashMap<ActorId, Vec<Dispatcher>>)>,
) -> Result<()> {
let map = &mut self.core.write().await.table_fragments;

Expand All @@ -175,9 +175,9 @@ where
table_fragments.upsert_in_transaction(&mut transaction)?;

let mut dependent_tables = Vec::with_capacity(dependent_table_actors.len());
for (dependent_table_id, extra_downstream_actors) in dependent_table_actors {
for (dependent_table_id, mut new_dispatchers) in dependent_table_actors {
let mut dependent_table = map
.get(dependent_table_id)
.get(&dependent_table_id)
.ok_or_else(|| {
RwError::from(InternalError(format!(
"table_fragment not exist: id={}",
Expand All @@ -187,12 +187,9 @@ where
.clone();
for fragment in dependent_table.fragments.values_mut() {
for actor in &mut fragment.actors {
if let Some(downstream_actors) =
extra_downstream_actors.get(&actor.actor_id)
{
actor.dispatcher[0]
.downstream_actor_id
.extend(downstream_actors.iter().cloned());
// Extend new dispatchers to table fragments.
if let Some(new_dispatchers) = new_dispatchers.remove(&actor.actor_id) {
actor.dispatcher.extend(new_dispatchers);
}
}
}
Expand Down Expand Up @@ -240,9 +237,16 @@ where
for fragment in dependent_table.fragments.values_mut() {
if fragment.fragment_type == FragmentType::Sink as i32 {
for actor in &mut fragment.actors {
actor.dispatcher[0]
.downstream_actor_id
.retain(|x| !chain_actor_ids.contains(x));
// Remove these downstream actor ids from all dispatchers.
for dispatcher in &mut actor.dispatcher {
dispatcher
.downstream_actor_id
.retain(|x| !chain_actor_ids.contains(x));
}
// Remove empty dispatchers.
actor
.dispatcher
.retain(|d| !d.downstream_actor_id.is_empty());
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions src/meta/src/stream/source_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,11 @@ use risingwave_pb::catalog::source::Info::StreamSource;
use risingwave_pb::catalog::Source;
use risingwave_pb::common::worker_node::State::Running;
use risingwave_pb::common::WorkerType;
use risingwave_pb::data::barrier::Mutation;
use risingwave_pb::data::SourceChangeSplitMutation;
use risingwave_pb::source::{
ConnectorSplit, ConnectorSplits, SourceActorInfo as ProstSourceActorInfo,
};
use risingwave_pb::stream_plan::barrier::Mutation;
use risingwave_pb::stream_plan::SourceChangeSplitMutation;
use risingwave_pb::stream_service::{
CreateSourceRequest as ComputeNodeCreateSourceRequest,
DropSourceRequest as ComputeNodeDropSourceRequest,
Expand Down
Loading