Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(streaming): support StreamValues #8751

Merged
merged 21 commits into from
Mar 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 37 additions & 0 deletions e2e_test/streaming/values.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
statement ok
create materialized view mvb as values (233, 'risingwave'), (233, 'risingwave');

statement ok
flush;

query IR
select * from mvb;
----
233 risingwave
233 risingwave

statement ok
create table t (v int, c varchar);

statement ok
insert into t values (1, 'China'), (0, 'United States');

statement ok
create materialized view mv as
with dict(abbr, real) as (values ('cn', 'China'), ('us', 'United States'))
select * from t join dict on t.c = dict.real;

query IRRR
select * from mv order by v;
----
0 United States us United States
1 China cn China

statement ok
drop materialized view mvb;

statement ok
drop materialized view mv;

statement ok
drop table t;
24 changes: 17 additions & 7 deletions proto/stream_plan.proto
Original file line number Diff line number Diff line change
Expand Up @@ -530,6 +530,14 @@ message NowNode {
catalog.Table state_table = 1;
}

// Stream plan node for a literal `VALUES` clause: the executor evaluates the
// expression tuples below and emits them as a fixed, finite set of rows.
message ValuesNode {
  // One row of the VALUES clause; each cell is an expression tree
  // (the frontend also appends a trailing `_row_id` literal cell).
  message ExprTuple {
    repeated expr.ExprNode cells = 1;
  }
  // The rows to emit, in order.
  repeated ExprTuple tuples = 1;
  // Output schema: one field per column of each tuple.
  repeated plan_common.Field fields = 2;
}

message StreamNode {
oneof node_body {
SourceNode source = 100;
Expand Down Expand Up @@ -565,6 +573,7 @@ message StreamNode {
GroupTopNNode append_only_group_top_n = 130;
TemporalJoinNode temporal_join = 131;
BarrierRecvNode barrier_recv = 132;
ValuesNode values = 133;
}
// The id for the operator. This is local per mview.
// TODO: should better be a uint32.
Expand Down Expand Up @@ -646,13 +655,14 @@ message StreamActor {
}

enum FragmentTypeFlag {
FRAGMENT_UNSPECIFIED = 0;
SOURCE = 1;
MVIEW = 2;
SINK = 4;
NOW = 8; // TODO: Remove this and insert a `BarrierRecv` instead.
CHAIN_NODE = 16;
BARRIER_RECV = 32;
FRAGMENT_TYPE_FLAG_FRAGMENT_UNSPECIFIED = 0;
FRAGMENT_TYPE_FLAG_SOURCE = 1;
FRAGMENT_TYPE_FLAG_MVIEW = 2;
FRAGMENT_TYPE_FLAG_SINK = 4;
FRAGMENT_TYPE_FLAG_NOW = 8; // TODO: Remove this and insert a `BarrierRecv` instead.
FRAGMENT_TYPE_FLAG_CHAIN_NODE = 16;
FRAGMENT_TYPE_FLAG_BARRIER_RECV = 32;
FRAGMENT_TYPE_FLAG_VALUES = 64;
}

// The environment associated with a stream plan
Expand Down
3 changes: 3 additions & 0 deletions src/frontend/planner_test/tests/testdata/basic_query.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@
- sql: values (11, 22), (33+(1+2), 44);
batch_plan: |
BatchValues { rows: [[11:Int32, 22:Int32], [36:Int32, 44:Int32]] }
stream_plan: |
StreamMaterialize { columns: [*VALUES*_0.column_0, *VALUES*_0.column_1, _row_id(hidden)], stream_key: [_row_id], pk_columns: [_row_id], pk_conflict: "NoCheck", watermark_columns: [*VALUES*_0.column_0, *VALUES*_0.column_1] }
└─StreamValues { rows: [[11:Int32, 22:Int32, 0:Int64], [(33:Int32 + (1:Int32 + 2:Int32)), 44:Int32, 1:Int64]] }
- sql: select * from t
binder_error: 'Catalog error: table or source not found: t'
- sql: |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,3 +68,19 @@
└─LogicalShare { id = 2 }
└─LogicalProject { exprs: [t1.x, t1.y] }
└─LogicalScan { table: t1, columns: [t1.x, t1.y, t1._row_id] }
- sql: |
create table t (v int, c varchar);
with dict(abbr, real) as (values ('cn', 'China'), ('us', 'United States')) select * from t join dict on t.c = dict.abbr;
logical_plan: |
LogicalProject { exprs: [t.v, t.c, *VALUES*_0.column_0, *VALUES*_0.column_1] }
└─LogicalJoin { type: Inner, on: (t.c = *VALUES*_0.column_0), output: all }
├─LogicalScan { table: t, columns: [t.v, t.c, t._row_id] }
└─LogicalShare { id = 1 }
└─LogicalValues { rows: [['cn':Varchar, 'China':Varchar], ['us':Varchar, 'United States':Varchar]], schema: Schema { fields: [*VALUES*_0.column_0:Varchar, *VALUES*_0.column_1:Varchar] } }
stream_plan: |
StreamMaterialize { columns: [v, c, abbr, real, t._row_id(hidden), _row_id(hidden)], stream_key: [t._row_id, _row_id, c], pk_columns: [t._row_id, _row_id, c], pk_conflict: "NoCheck" }
└─StreamHashJoin { type: Inner, predicate: t.c = *VALUES*_0.column_0, output: [t.v, t.c, *VALUES*_0.column_0, *VALUES*_0.column_1, t._row_id, _row_id] }
├─StreamExchange { dist: HashShard(t.c) }
| └─StreamTableScan { table: t, columns: [t.v, t.c, t._row_id], pk: [t._row_id], dist: UpstreamHashShard(t._row_id) }
└─StreamExchange { dist: HashShard(*VALUES*_0.column_0) }
└─StreamValues { rows: [['cn':Varchar, 'China':Varchar, 0:Int64], ['us':Varchar, 'United States':Varchar, 1:Int64]] }
5 changes: 3 additions & 2 deletions src/frontend/planner_test/tests/testdata/subquery.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -247,8 +247,9 @@
| └─BatchScan { table: auction, columns: [auction.date_time], distribution: SomeShard }
└─BatchValues { rows: [[]] }
stream_error: |-
Feature is not yet implemented: Stream values executor is unimplemented!
No tracking issue yet. Feel free to submit a feature request at https://github.com/risingwavelabs/risingwave/issues/new?labels=type%2Ffeature&template=feature_request.yml
Not supported: streaming nested-loop join
HINT: The non-equal join in the query requires a nested-loop join executor, which could be very expensive to run. Consider rewriting the query to use dynamic filter as a substitute if possible.
See also: https://github.com/risingwavelabs/rfcs/blob/main/rfcs/0033-dynamic-filter.md
st1page marked this conversation as resolved.
Show resolved Hide resolved
- sql: |
CREATE TABLE t (v int);
SELECT 1 FROM t AS t_inner WHERE EXISTS ( SELECT 1 HAVING t_inner.v > 1);
Expand Down
1 change: 1 addition & 0 deletions src/frontend/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#![feature(slice_internals)]
#![feature(min_specialization)]
#![feature(is_some_and)]
#![feature(extend_one)]
#![recursion_limit = "256"]

#[macro_use]
Expand Down
57 changes: 45 additions & 12 deletions src/frontend/src/optimizer/plan_node/logical_values.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,16 @@
use std::sync::Arc;
use std::{fmt, vec};

use risingwave_common::catalog::Schema;
use risingwave_common::error::{ErrorCode, Result, RwError};
use itertools::Itertools;
use risingwave_common::catalog::{Field, Schema};
use risingwave_common::error::Result;
use risingwave_common::types::{DataType, ScalarImpl};

use super::{
BatchValues, ColPrunable, ExprRewritable, LogicalFilter, PlanBase, PlanRef, PredicatePushdown,
ToBatch, ToStream,
StreamValues, ToBatch, ToStream,
};
use crate::expr::{Expr, ExprImpl, ExprRewriter};
use crate::expr::{Expr, ExprImpl, ExprRewriter, Literal};
use crate::optimizer::optimizer_context::OptimizerContextRef;
use crate::optimizer::plan_node::{
ColumnPruningContext, PredicatePushdownContext, RewriteStreamContext, ToStreamContext,
Expand Down Expand Up @@ -53,6 +55,26 @@ impl LogicalValues {
}
}

/// Used only by `LogicalValues.rewrite_logical_for_stream`, set the `_row_id` column as pk
fn new_with_pk(
    rows: Vec<Vec<ExprImpl>>,
    schema: Schema,
    ctx: OptimizerContextRef,
    pk_index: usize,
) -> Self {
    // Sanity check: every cell's return type must match the declared type of
    // its column in the schema.
    for row in &rows {
        for (col, cell) in row.iter().enumerate() {
            assert_eq!(schema.fields()[col].data_type(), cell.return_type())
        }
    }
    let fd_set = FunctionalDependencySet::new(schema.len());
    let base = PlanBase::new_logical(ctx, schema, vec![pk_index], fd_set);
    Self {
        base,
        rows: rows.into(),
    }
}

/// Create a [`LogicalValues`] node. Used by planner.
pub fn create(rows: Vec<Vec<ExprImpl>>, schema: Schema, ctx: OptimizerContextRef) -> PlanRef {
// No additional checks after binder.
Expand Down Expand Up @@ -132,20 +154,31 @@ impl ToBatch for LogicalValues {

impl ToStream for LogicalValues {
fn to_stream(&self, _ctx: &mut ToStreamContext) -> Result<PlanRef> {
Err(RwError::from(ErrorCode::NotImplemented(
"Stream values executor is unimplemented!".to_string(),
None.into(),
)))
Ok(StreamValues::new(self.clone()).into())
}

fn logical_rewrite_for_stream(
&self,
_ctx: &mut RewriteStreamContext,
) -> Result<(PlanRef, ColIndexMapping)> {
Err(RwError::from(ErrorCode::NotImplemented(
"Stream values executor is unimplemented!".to_string(),
None.into(),
)))
let row_id_index = self.schema().len();
let col_index_mapping = ColIndexMapping::identity_or_none(row_id_index, row_id_index + 1);
let ctx = self.ctx();
let mut schema = self.schema().clone();
schema
.fields
.push(Field::with_name(DataType::Int64, "_row_id"));
let rows = self.rows().to_owned();
let row_with_id = rows
.into_iter()
.enumerate()
.map(|(i, mut r)| {
r.push(Literal::new(Some(ScalarImpl::Int64(i as i64)), DataType::Int64).into());
r
})
.collect_vec();
let logical_values = Self::new_with_pk(row_with_id, schema, ctx, row_id_index);
Ok((logical_values.into(), col_index_mapping))
}
}

Expand Down
4 changes: 4 additions & 0 deletions src/frontend/src/optimizer/plan_node/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -667,6 +667,7 @@ mod stream_sink;
mod stream_source;
mod stream_table_scan;
mod stream_topn;
mod stream_values;
mod stream_watermark_filter;

mod derive;
Expand Down Expand Up @@ -745,6 +746,7 @@ pub use stream_table_scan::StreamTableScan;
pub use stream_temporal_join::StreamTemporalJoin;
pub use stream_topn::StreamTopN;
pub use stream_union::StreamUnion;
pub use stream_values::StreamValues;
pub use stream_watermark_filter::StreamWatermarkFilter;

use crate::expr::{ExprImpl, ExprRewriter, InputRef, Literal};
Expand Down Expand Up @@ -841,6 +843,7 @@ macro_rules! for_all_plan_nodes {
, { Stream, Share }
, { Stream, WatermarkFilter }
, { Stream, TemporalJoin }
, { Stream, Values }
}
};
}
Expand Down Expand Up @@ -941,6 +944,7 @@ macro_rules! for_stream_plan_nodes {
, { Stream, Share }
, { Stream, WatermarkFilter }
, { Stream, TemporalJoin }
, { Stream, Values }
}
};
}
Expand Down
92 changes: 92 additions & 0 deletions src/frontend/src/optimizer/plan_node/stream_values.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
// Copyright 2023 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::fmt;

use fixedbitset::FixedBitSet;
use risingwave_pb::stream_plan::stream_node::NodeBody as ProstStreamNode;
use risingwave_pb::stream_plan::values_node::ExprTuple;
use risingwave_pb::stream_plan::ValuesNode;

use super::{ExprRewritable, LogicalValues, PlanBase, StreamNode};
use crate::expr::{Expr, ExprImpl};
use crate::optimizer::property::Distribution;
use crate::stream_fragmenter::BuildFragmentGraphState;

/// `StreamValues` implements `LogicalValues.to_stream()`
///
/// Stream plan node for a literal `VALUES` clause; always planned with
/// `Distribution::Single` (enforced in `StreamValues::new`).
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct StreamValues {
    // Shared plan-node state (schema, pk, distribution, watermark columns, ...).
    pub base: PlanBase,
    // The logical node this was derived from; owns the expression rows.
    logical: LogicalValues,
}

impl_plan_tree_node_for_leaf! { StreamValues }

impl StreamValues {
    /// `StreamValues` should enforce `Distribution::Single`
    pub fn new(logical: LogicalValues) -> Self {
        let ctx = logical.ctx();
        let n_cols = logical.schema().len();
        // Mark every column except the last one (the appended `_row_id`)
        // as a watermark column.
        let mut watermark_columns = FixedBitSet::with_capacity(n_cols);
        for idx in 0..n_cols - 1 {
            watermark_columns.set(idx, true);
        }
        let base = PlanBase::new_stream(
            ctx,
            logical.schema().clone(),
            logical.logical_pk().to_vec(),
            logical.functional_dependency().clone(),
            Distribution::Single,
            false,
            watermark_columns,
        );
        Self { base, logical }
    }

    /// The logical node this stream node was derived from.
    pub fn logical(&self) -> &LogicalValues {
        &self.logical
    }

    /// Serialize one row of expressions into its protobuf tuple.
    fn row_to_protobuf(&self, row: &[ExprImpl]) -> ExprTuple {
        ExprTuple {
            cells: row.iter().map(|cell| cell.to_expr_proto()).collect(),
        }
    }
}

impl fmt::Display for StreamValues {
    // Renders as `StreamValues { rows: ... }` via the standard debug builder.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut builder = f.debug_struct("StreamValues");
        builder.field("rows", &self.logical.rows());
        builder.finish()
    }
}

impl StreamNode for StreamValues {
    fn to_stream_prost_body(&self, _state: &mut BuildFragmentGraphState) -> ProstStreamNode {
        // Serialize each row into an `ExprTuple` and each schema field into
        // its protobuf form; together they make up the `ValuesNode` body.
        let logical = &self.logical;
        let tuples = logical
            .rows()
            .iter()
            .map(|row| self.row_to_protobuf(row))
            .collect();
        let fields = logical
            .schema()
            .fields()
            .iter()
            .map(|field| field.to_prost())
            .collect();
        ProstStreamNode::Values(ValuesNode { tuples, fields })
    }
}

// Relies on the trait's default methods — NOTE(review): the literal rows do
// contain expressions; confirm they never need rewriting at this stage.
impl ExprRewritable for StreamValues {}
5 changes: 5 additions & 0 deletions src/frontend/src/stream_fragmenter/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,11 @@ fn build_fragment(
current_fragment.requires_singleton = true;
}

NodeBody::Values(_) => {
current_fragment.fragment_type_mask |= FragmentTypeFlag::Values as u32;
current_fragment.requires_singleton = true;
}

_ => {}
};

Expand Down
7 changes: 6 additions & 1 deletion src/meta/src/barrier/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -458,10 +458,15 @@ where
/// returns an empty set.
pub fn actors_to_track(&self) -> HashSet<ActorId> {
match &self.command {
Command::CreateStreamingJob { dispatchers, .. } => dispatchers
Command::CreateStreamingJob {
dispatchers,
table_fragments,
..
} => dispatchers
.values()
.flatten()
.flat_map(|dispatcher| dispatcher.downstream_actor_id.iter().copied())
.chain(table_fragments.values_actor_ids().into_iter())
.collect(),

_ => Default::default(),
Expand Down
8 changes: 8 additions & 0 deletions src/meta/src/model/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,7 @@ impl TableFragments {
(fragment_type_mask
& (FragmentTypeFlag::Source as u32
| FragmentTypeFlag::Now as u32
| FragmentTypeFlag::Values as u32
| FragmentTypeFlag::BarrierRecv as u32))
!= 0
})
Expand All @@ -273,6 +274,13 @@ impl TableFragments {
})
}

/// Returns the actor ids of fragments whose type mask carries
/// `FragmentTypeFlag::Values`.
pub fn values_actor_ids(&self) -> Vec<ActorId> {
    Self::filter_actor_ids(self, |mask| {
        (mask & (FragmentTypeFlag::Values as u32)) != 0
    })
}

/// Returns the fragment with the `Mview` type flag.
pub fn mview_fragment(&self) -> Option<Fragment> {
self.fragments
Expand Down
Loading