Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(streaming): handle multiple edges with no-op fragment #9320

Merged
merged 9 commits into from
Apr 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions proto/stream_plan.proto
Original file line number Diff line number Diff line change
Expand Up @@ -544,6 +544,8 @@ message DedupNode {
repeated uint32 dedup_column_indices = 2;
}

// Body of a no-op stream node. It carries no fields.
message NoOpNode {}

message StreamNode {
oneof node_body {
SourceNode source = 100;
Expand Down Expand Up @@ -581,6 +583,7 @@ message StreamNode {
BarrierRecvNode barrier_recv = 132;
ValuesNode values = 133;
DedupNode append_only_dedup = 134;
NoOpNode no_op = 135;
}
// The id for the operator. This is local per mview.
// TODO: should better be a uint32.
Expand Down
108 changes: 108 additions & 0 deletions src/frontend/planner_test/tests/testdata/share.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -214,3 +214,111 @@
└─StreamProject { exprs: [id, _row_id] }
└─StreamRowIdGen { row_id_index: 10 }
└─StreamSource { source: "auction", columns: ["id", "item_name", "description", "initial_bid", "reserve", "date_time", "expires", "seller", "category", "extra", "_row_id"] }
- id: self_join_multiple_edges
sql: |
create table t (a int, b int);
with cte as (select a, sum(b) sum from t group by a) select count(*) from cte c1 join cte c2 on c1.a = c2.a;
stream_plan: |
StreamMaterialize { columns: [count], stream_key: [], pk_columns: [], pk_conflict: "NoCheck" }
└─StreamProject { exprs: [sum0(count)] }
└─StreamGlobalSimpleAgg { aggs: [sum0(count), count] }
└─StreamExchange { dist: Single }
└─StreamStatelessLocalSimpleAgg { aggs: [count] }
└─StreamHashJoin { type: Inner, predicate: t.a = t.a, output: all }
├─StreamShare { id = 4 }
| └─StreamProject { exprs: [t.a] }
| └─StreamHashAgg { group_key: [t.a], aggs: [count] }
| └─StreamExchange { dist: HashShard(t.a) }
| └─StreamTableScan { table: t, columns: [t.a, t._row_id], pk: [t._row_id], dist: UpstreamHashShard(t._row_id) }
└─StreamShare { id = 4 }
└─StreamProject { exprs: [t.a] }
└─StreamHashAgg { group_key: [t.a], aggs: [count] }
└─StreamExchange { dist: HashShard(t.a) }
└─StreamTableScan { table: t, columns: [t.a, t._row_id], pk: [t._row_id], dist: UpstreamHashShard(t._row_id) }
stream_dist_plan: |+
Fragment 0
StreamMaterialize { columns: [count], stream_key: [], pk_columns: [], pk_conflict: "NoCheck" }
├── materialized table: 4294967294
└── StreamProject { exprs: [sum0(count)] }
└── StreamGlobalSimpleAgg { aggs: [sum0(count), count] }
├── result table: 0
├── state tables: []
├── distinct tables: []
└── StreamExchange Single from 1

Fragment 1
StreamStatelessLocalSimpleAgg { aggs: [count] }
└── StreamHashJoin { type: Inner, predicate: t.a = t.a, output: all }
├── left table: 1
├── right table: 3
├── left degree table: 2
├── right degree table: 4
├── StreamExchange Hash([0]) from 2
└── StreamExchange Hash([0]) from 4

Fragment 2
StreamProject { exprs: [t.a] }
└── StreamHashAgg { group_key: [t.a], aggs: [count] }
├── result table: 5
├── state tables: []
├── distinct tables: []
└── StreamExchange Hash([0]) from 3

Fragment 3
Chain { table: t, columns: [t.a, t._row_id], pk: [t._row_id], dist: UpstreamHashShard(t._row_id) }
├── Upstream
└── BatchPlanNode

Fragment 4
StreamNoOp
└── StreamExchange NoShuffle from 2

Table 0
├── columns: [ sum0(count), count ]
├── primary key: []
├── value indices: [ 0, 1 ]
├── distribution key: []
└── read pk prefix len hint: 0

Table 1
├── columns: [ t_a ]
├── primary key: [ $0 ASC ]
├── value indices: [ 0 ]
├── distribution key: [ 0 ]
└── read pk prefix len hint: 1

Table 2
├── columns: [ t_a, _degree ]
├── primary key: [ $0 ASC ]
├── value indices: [ 1 ]
├── distribution key: [ 0 ]
└── read pk prefix len hint: 1

Table 3
├── columns: [ t_a ]
├── primary key: [ $0 ASC ]
├── value indices: [ 0 ]
├── distribution key: [ 0 ]
└── read pk prefix len hint: 1

Table 4
├── columns: [ t_a, _degree ]
├── primary key: [ $0 ASC ]
├── value indices: [ 1 ]
├── distribution key: [ 0 ]
└── read pk prefix len hint: 1

Table 5
├── columns: [ t_a, count ]
├── primary key: [ $0 ASC ]
├── value indices: [ 1 ]
├── distribution key: [ 0 ]
└── read pk prefix len hint: 1

Table 4294967294
├── columns: [ count ]
├── primary key: []
├── value indices: [ 0 ]
├── distribution key: []
└── read pk prefix len hint: 0

20 changes: 19 additions & 1 deletion src/frontend/src/stream_fragmenter/graph/fragment_graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,18 @@ impl StreamFragmentGraph {
downstream_id: LocalFragmentId,
edge: StreamFragmentEdge,
) {
self.try_add_edge(upstream_id, downstream_id, edge).unwrap();
}

/// Try to link upstream to downstream in the graph.
///
/// If the edge between upstream and downstream already exists, return an error.
pub fn try_add_edge(
&mut self,
upstream_id: LocalFragmentId,
downstream_id: LocalFragmentId,
edge: StreamFragmentEdge,
) -> Result<(), String> {
let edge = StreamFragmentEdgeProto {
upstream_id,
downstream_id,
Expand All @@ -140,6 +152,12 @@ impl StreamFragmentGraph {

self.edges
.try_insert((upstream_id, downstream_id), edge)
.unwrap();
.map(|_| ())
.map_err(|e| {
format!(
"edge between {} and {} already exists: {}",
upstream_id, downstream_id, e
)
})
}
}
71 changes: 68 additions & 3 deletions src/frontend/src/stream_fragmenter/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use risingwave_common::catalog::TableId;
use risingwave_common::error::Result;
use risingwave_pb::plan_common::JoinType;
use risingwave_pb::stream_plan::{
DispatchStrategy, DispatcherType, ExchangeNode, FragmentTypeFlag,
DispatchStrategy, DispatcherType, ExchangeNode, FragmentTypeFlag, NoOpNode,
StreamFragmentGraph as StreamFragmentGraphProto, StreamNode,
};

Expand Down Expand Up @@ -297,15 +297,80 @@ fn build_fragment(
// Exchange node should have only one input.
let [input]: [_; 1] = std::mem::take(&mut child_node.input).try_into().unwrap();
let child_fragment = build_and_add_fragment(state, input)?;
state.fragment_graph.add_edge(

let result = state.fragment_graph.try_add_edge(
child_fragment.fragment_id,
current_fragment.fragment_id,
StreamFragmentEdge {
dispatch_strategy: exchange_node_strategy,
dispatch_strategy: exchange_node_strategy.clone(),
// Always use the exchange operator id as the link id.
link_id: child_node.operator_id,
},
);

// It's possible that there are multiple edges between two fragments, which the
// meta service and the compute node do not expect. In this case, we
// manually insert a `NoOp` fragment between the two fragments.
if result.is_err() {
// Assign a new operator id for the `Exchange`, so we can distinguish it
// from duplicate edges and break the sharing.
child_node.operator_id = state.gen_operator_id() as u64;

// Take the upstream plan node as the reference for properties of `NoOp`.
let ref_fragment_node = child_fragment.node.as_ref().unwrap();
let no_shuffle_strategy = DispatchStrategy {
r#type: DispatcherType::NoShuffle as i32,
dist_key_indices: vec![],
output_indices: (0..ref_fragment_node.fields.len() as u32).collect(),
};

let no_op_operator_id = state.gen_operator_id() as u64;
let no_shuffle_exchange_operator_id = state.gen_operator_id() as u64;

let no_op_fragment = {
let node = StreamNode {
operator_id: no_op_operator_id,
identity: "StreamNoOp".into(),
node_body: Some(NodeBody::NoOp(NoOpNode {})),
input: vec![StreamNode {
operator_id: no_shuffle_exchange_operator_id,
identity: "StreamNoShuffleExchange".into(),
node_body: Some(NodeBody::Exchange(ExchangeNode {
strategy: Some(no_shuffle_strategy.clone()),
})),
input: vec![],
..*ref_fragment_node.clone()
}],
..*ref_fragment_node.clone()
};

let mut fragment = state.new_stream_fragment();
fragment.node = Some(node.into());
Rc::new(fragment)
};

state.fragment_graph.add_fragment(no_op_fragment.clone());

state.fragment_graph.add_edge(
child_fragment.fragment_id,
no_op_fragment.fragment_id,
StreamFragmentEdge {
// Use `NoShuffle` exchange strategy for upstream edge.
dispatch_strategy: no_shuffle_strategy,
link_id: no_shuffle_exchange_operator_id,
},
);
state.fragment_graph.add_edge(
no_op_fragment.fragment_id,
current_fragment.fragment_id,
StreamFragmentEdge {
// Use the original exchange strategy for downstream edge.
dispatch_strategy: exchange_node_strategy,
link_id: child_node.operator_id,
},
);
}

Ok(child_node)
}

Expand Down
2 changes: 2 additions & 0 deletions src/stream/src/executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ mod lookup_union;
mod managed_state;
mod merge;
mod mview;
mod no_op;
mod now;
mod over_window;
mod project;
Expand Down Expand Up @@ -125,6 +126,7 @@ pub use lookup::*;
pub use lookup_union::LookupUnionExecutor;
pub use merge::MergeExecutor;
pub use mview::*;
pub use no_op::NoOpExecutor;
pub use now::NowExecutor;
pub use project::ProjectExecutor;
pub use project_set::*;
Expand Down
53 changes: 53 additions & 0 deletions src/stream/src/executor/no_op.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
// Copyright 2023 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use risingwave_common::catalog::Schema;

use super::{ActorContextRef, BoxedExecutor, BoxedMessageStream, Executor, PkIndicesRef};

/// No-op executor directly forwards the input stream. Currently used to break the multiple edges in
/// the fragment graph.
pub struct NoOpExecutor {
    /// Actor context passed to the constructor. It is stored but not read anywhere in this file.
    _ctx: ActorContextRef,
    /// Identity string returned by `Executor::identity`.
    identity: String,
    /// Upstream executor whose message stream, schema and pk indices are exposed unchanged.
    input: BoxedExecutor,
}

impl NoOpExecutor {
    /// Creates a no-op executor wrapping `input`.
    ///
    /// * `ctx` - actor context; kept on the executor but not otherwise used here.
    /// * `input` - the single upstream executor whose output is forwarded as-is.
    /// * `executor_id` - rendered in upper-case hexadecimal as the suffix of the identity string.
    pub fn new(ctx: ActorContextRef, input: BoxedExecutor, executor_id: u64) -> Self {
        Self {
            _ctx: ctx,
            // Label the executor with its own type name so that its identity is not
            // confused with that of a different executor.
            identity: format!("NoOpExecutor {:X}", executor_id),
            input,
        }
    }
}

impl Executor for NoOpExecutor {
    /// Consumes the executor and hands back the upstream's message stream untouched.
    fn execute(self: Box<Self>) -> BoxedMessageStream {
        let upstream = self.input;
        upstream.execute()
    }

    /// The schema is exactly the upstream's schema.
    fn schema(&self) -> &Schema {
        let upstream = &self.input;
        upstream.schema()
    }

    /// The pk indices are exactly the upstream's pk indices.
    fn pk_indices(&self) -> PkIndicesRef<'_> {
        let upstream = &self.input;
        upstream.pk_indices()
    }

    /// Returns the identity string built in the constructor.
    fn identity(&self) -> &str {
        self.identity.as_str()
    }
}
3 changes: 3 additions & 0 deletions src/stream/src/from_proto/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ mod lookup;
mod lookup_union;
mod merge;
mod mview;
mod no_op;
mod now;
mod project;
mod project_set;
Expand Down Expand Up @@ -73,6 +74,7 @@ use self::lookup::*;
use self::lookup_union::*;
use self::merge::*;
use self::mview::*;
use self::no_op::*;
use self::now::NowExecutorBuilder;
use self::project::*;
use self::project_set::*;
Expand Down Expand Up @@ -161,5 +163,6 @@ pub async fn create_executor(
NodeBody::Values => ValuesExecutorBuilder,
NodeBody::BarrierRecv => BarrierRecvExecutorBuilder,
NodeBody::AppendOnlyDedup => AppendOnlyDedupExecutorBuilder,
NodeBody::NoOp => NoOpExecutorBuilder,
}
}
38 changes: 38 additions & 0 deletions src/stream/src/from_proto/no_op.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
// Copyright 2023 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use risingwave_pb::stream_plan::NoOpNode;
use risingwave_storage::StateStore;

use super::ExecutorBuilder;
use crate::error::StreamResult;
use crate::executor::{BoxedExecutor, Executor, NoOpExecutor};
use crate::task::{ExecutorParams, LocalStreamManagerCore};

/// Builds a [`NoOpExecutor`] from a [`NoOpNode`] plan node.
pub struct NoOpExecutorBuilder;

#[async_trait::async_trait]
impl ExecutorBuilder for NoOpExecutorBuilder {
    type Node = NoOpNode;

    /// Wraps the single input executor found in `params` in a boxed [`NoOpExecutor`].
    ///
    /// The node, the state store and the stream manager core are not used.
    ///
    /// # Panics
    ///
    /// Panics if `params.input` does not contain exactly one executor.
    async fn new_boxed_executor(
        params: ExecutorParams,
        _node: &NoOpNode,
        _store: impl StateStore,
        _stream: &mut LocalStreamManagerCore,
    ) -> StreamResult<BoxedExecutor> {
        // A no-op node forwards exactly one upstream, so anything else is a plan bug.
        let inputs: [BoxedExecutor; 1] = params.input.try_into().unwrap();
        let [upstream] = inputs;

        let executor = NoOpExecutor::new(params.actor_context, upstream, params.executor_id);
        Ok(executor.boxed())
    }
}