Skip to content

Commit

Permalink
Merge branch 'main' into fix/nexmark-properties-non-string
Browse files Browse the repository at this point in the history
  • Loading branch information
mergify[bot] authored Jul 11, 2022
2 parents a22a638 + f93d8e3 commit b6b5d76
Show file tree
Hide file tree
Showing 37 changed files with 512 additions and 332 deletions.
44 changes: 42 additions & 2 deletions e2e_test/streaming/struct_table.slt
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,48 @@ select * from mv1;
(1,2)
(1,3)


statement ok
create table t1 (v1 int, v2 struct<v3 int, v4 struct<v5 int, v6 int>>);

statement ok
insert into t1 values(4,(3,(2, 2)));

statement ok
insert into t1 values(1,(2,(1, 0)));

statement ok
insert into t1 values(2,(5,(4, 1)));

statement ok
insert into t1 values(3,(6,(3, 4)));

statement ok
flush;

query II
select * from t1 order by v1;
----
1 (2,(1,0))
2 (5,(4,1))
3 (6,(3,4))
4 (3,(2,2))

statement ok
create materialized view mv3 as select * from t1 order by (v2).v3;

# The result of select * from mv3 is not expected


statement ok
drop materialized view mv3;

statement ok
drop materialized view mv1
drop table t1;

statement ok
drop table st
drop materialized view mv1;

statement ok
drop table st;

6 changes: 6 additions & 0 deletions e2e_test/streaming/tpch/drop_views.slt.part
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ drop materialized view tpch_q2;
statement ok
drop materialized view tpch_q3;

statement ok
drop materialized view tpch_q4;

statement ok
drop materialized view tpch_q5;

Expand Down Expand Up @@ -40,6 +43,9 @@ drop materialized view tpch_q14;
statement ok
drop materialized view tpch_q17;

statement ok
drop materialized view tpch_q18;

statement ok
drop materialized view tpch_q19;

Expand Down
14 changes: 14 additions & 0 deletions e2e_test/streaming/tpch_snapshot.slt
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ include ./tpch/views/q17.slt.part
# include ./tpch/views/q18.slt.part
include ./tpch/views/q19.slt.part
# include ./tpch/views/q20.slt.part
include ./tpch/views/q22.slt.part

include ./tpch/q1.slt.part
include ./tpch/q2.slt.part
Expand All @@ -46,6 +47,7 @@ include ./tpch/q17.slt.part
# include ./tpch/q18.slt.part
include ./tpch/q19.slt.part
# include ./tpch/q20.slt.part
include ./tpch/q22.slt.part

statement ok
drop materialized view tpch_q1;
Expand All @@ -56,6 +58,9 @@ drop materialized view tpch_q2;
statement ok
drop materialized view tpch_q3;

# statement ok
# drop materialized view tpch_q4;

statement ok
drop materialized view tpch_q5;

Expand Down Expand Up @@ -89,7 +94,16 @@ drop materialized view tpch_q14;
statement ok
drop materialized view tpch_q17;

# statement ok
# drop materialized view tpch_q18;

statement ok
drop materialized view tpch_q19;

# statement ok
# drop materialized view tpch_q20;

statement ok
drop materialized view tpch_q22;

include ../tpch/drop_tables.slt.part
14 changes: 14 additions & 0 deletions e2e_test/streaming/tpch_upstream.slt
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ include ./tpch/views/q17.slt.part
# include ./tpch/views/q18.slt.part
include ./tpch/views/q19.slt.part
# include ./tpch/views/q20.slt.part
include ./tpch/views/q22.slt.part

include ./tpch/q1.slt.part
include ./tpch/q2.slt.part
Expand All @@ -49,6 +50,7 @@ include ./tpch/q17.slt.part
# include ./tpch/q18.slt.part
include ./tpch/q19.slt.part
# include ./tpch/q20.slt.part
include ./tpch/q22.slt.part

statement ok
drop materialized view tpch_q1;
Expand All @@ -59,6 +61,9 @@ drop materialized view tpch_q2;
statement ok
drop materialized view tpch_q3;

# statement ok
# drop materialized view tpch_q4;

statement ok
drop materialized view tpch_q5;

Expand Down Expand Up @@ -92,7 +97,16 @@ drop materialized view tpch_q14;
statement ok
drop materialized view tpch_q17;

# statement ok
# drop materialized view tpch_q18;

statement ok
drop materialized view tpch_q19;

# statement ok
# drop materialized view tpch_q20;

statement ok
drop materialized view tpch_q22;

include ../tpch/drop_tables.slt.part
9 changes: 5 additions & 4 deletions proto/stream_plan.proto
Original file line number Diff line number Diff line change
Expand Up @@ -164,10 +164,11 @@ message HashJoinNode {
message DynamicFilterNode {
uint32 left_key = 1;
// Must be one of <, <=, >, >=
expr.ExprNode condition = 3;
// Used for internal table states.
// catalog.Table left_table = 4;
// uint32 dist_key_l = 8;
expr.ExprNode condition = 2;
// Left table stores all states with predicate possibly not NULL.
catalog.Table left_table = 3;
// Right table stores single value from RHS of predicate.
catalog.Table right_table = 4;
// It is true when the right side of the inequality predicate is monotonically:
// - decreasing for <, <=, increasing for >, >=
// bool is_monotonic = 10;
Expand Down
3 changes: 2 additions & 1 deletion src/batch/src/executor/order_by.rs
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,8 @@ impl OrderByExecutor {
Interval,
NaiveDate,
NaiveTime,
NaiveDateTime
NaiveDateTime,
Struct
]
);
}
Expand Down
2 changes: 1 addition & 1 deletion src/bench/ss_bench/operations/write_batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ impl Operations {
let start = Instant::now();
let batch = batch
.into_iter()
.map(|(k, v)| (k, StorageValue::new(Default::default(), v)))
.map(|(k, v)| (k, StorageValue::new(v)))
.collect_vec();
let epoch = ctx.epoch.load(Ordering::Acquire);
store
Expand Down
5 changes: 0 additions & 5 deletions src/expr/src/expr/agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ pub enum AggKind {
Max,
Sum,
Count,
RowCount,
Avg,
StringAgg,
SingleValue,
Expand All @@ -38,7 +37,6 @@ impl std::fmt::Display for AggKind {
AggKind::Max => write!(f, "max"),
AggKind::Sum => write!(f, "sum"),
AggKind::Count => write!(f, "count"),
AggKind::RowCount => write!(f, "row_count"),
AggKind::Avg => write!(f, "avg"),
AggKind::StringAgg => write!(f, "string_agg"),
AggKind::SingleValue => write!(f, "single_value"),
Expand Down Expand Up @@ -75,9 +73,6 @@ impl AggKind {
Self::Count => Type::Count,
Self::StringAgg => Type::StringAgg,
Self::SingleValue => Type::SingleValue,
Self::RowCount => {
panic!("cannot convert RowCount to prost, TODO: remove RowCount from AggKind")
}
Self::ApproxCountDistinct => Type::ApproxCountDistinct,
}
}
Expand Down
10 changes: 0 additions & 10 deletions src/frontend/src/expr/agg_call.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,13 +52,6 @@ impl AggCall {
/// Infer the return type for the given agg call.
/// Returns error if not supported or the arguments are invalid.
pub fn infer_return_type(agg_kind: &AggKind, inputs: &[DataType]) -> Result<DataType> {
let unsupported = || {
let args = inputs.iter().map(|t| format!("{:?}", t)).join(", ");
Err(RwError::from(ErrorCode::NotImplemented(
format!("Unsupported aggregation: {}({})", agg_kind, args),
112.into(),
)))
};
let invalid = || {
let args = inputs.iter().map(|t| format!("{:?}", t)).join(", ");
Err(RwError::from(ErrorCode::InvalidInputSyntax(format!(
Expand Down Expand Up @@ -107,9 +100,6 @@ impl AggCall {
// SingleValue
(AggKind::SingleValue, [input]) => input.clone(),
(AggKind::SingleValue, _) => return invalid(),

// Others
_ => return unsupported(),
};

Ok(return_type)
Expand Down
5 changes: 1 addition & 4 deletions src/frontend/src/optimizer/plan_node/logical_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,9 +103,7 @@ impl PlanAggCall {
| AggKind::StringAgg
| AggKind::SingleValue => self.agg_kind.clone(),

AggKind::Count | AggKind::RowCount | AggKind::Sum | AggKind::ApproxCountDistinct => {
AggKind::Sum
}
AggKind::Count | AggKind::Sum | AggKind::ApproxCountDistinct => AggKind::Sum,
};
PlanAggCall {
agg_kind: total_agg_kind,
Expand Down Expand Up @@ -184,7 +182,6 @@ impl LogicalAgg {
}
AggKind::Sum
| AggKind::Count
| AggKind::RowCount
| AggKind::Avg
| AggKind::SingleValue
| AggKind::ApproxCountDistinct => {
Expand Down
61 changes: 61 additions & 0 deletions src/frontend/src/optimizer/plan_node/stream_dynamic_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,13 @@
// limitations under the License.
use std::fmt;

use risingwave_common::catalog::{DatabaseId, SchemaId};
use risingwave_common::util::sort_util::OrderType;
use risingwave_pb::stream_plan::stream_node::NodeBody;
use risingwave_pb::stream_plan::DynamicFilterNode;

use super::utils::TableCatalogBuilder;
use crate::catalog::TableCatalog;
use crate::expr::Expr;
use crate::optimizer::plan_node::{PlanBase, PlanTreeNodeBinary, ToStreamProst};
use crate::optimizer::PlanRef;
Expand Down Expand Up @@ -83,6 +87,63 @@ impl ToStreamProst for StreamDynamicFilter {
.predicate
.as_expr_unless_true()
.map(|x| x.to_expr_proto()),
left_table: Some(
infer_left_internal_table_catalog(self.clone().into(), self.left_index).to_prost(
SchemaId::placeholder() as u32,
DatabaseId::placeholder() as u32,
),
),
right_table: Some(
infer_right_internal_table_catalog(self.right.clone()).to_prost(
SchemaId::placeholder() as u32,
DatabaseId::placeholder() as u32,
),
),
})
}
}

fn infer_left_internal_table_catalog(input: PlanRef, left_key_index: usize) -> TableCatalog {
let base = input.plan_base();
let schema = &base.schema;

let append_only = input.append_only();
let dist_keys = base.dist.dist_column_indices().to_vec();

// The pk of dynamic filter internal table should be left_key + input_pk.
let mut pk_indices = vec![left_key_index];
// TODO(yuhao): dedup the dist key and pk.
pk_indices.extend(&base.pk_indices);

let mut internal_table_catalog_builder = TableCatalogBuilder::new();

schema.fields().iter().for_each(|field| {
internal_table_catalog_builder.add_column_desc_from_field_without_order_type(field)
});

pk_indices.iter().for_each(|idx| {
internal_table_catalog_builder.add_order_column(*idx, OrderType::Ascending)
});

internal_table_catalog_builder.build(dist_keys, append_only)
}

fn infer_right_internal_table_catalog(input: PlanRef) -> TableCatalog {
let base = input.plan_base();
let schema = &base.schema;

// We require that the right table has distribution `Single`
assert_eq!(
base.dist.dist_column_indices().to_vec(),
Vec::<usize>::new()
);

let mut internal_table_catalog_builder = TableCatalogBuilder::new();

schema.fields().iter().for_each(|field| {
internal_table_catalog_builder.add_column_desc_from_field_without_order_type(field)
});

// No distribution keys
internal_table_catalog_builder.build(vec![], false)
}
12 changes: 11 additions & 1 deletion src/frontend/src/stream_fragmenter/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,8 @@ impl StreamFragmenter {
NodeBody::HashAgg(_)
| NodeBody::HashJoin(_)
| NodeBody::DeltaIndexJoin(_)
| NodeBody::Chain(_) => {
| NodeBody::Chain(_)
| NodeBody::DynamicFilter(_) => {
if insert_exchange_flag {
let child_node =
self.rewrite_stream_node_inner(state, child_node, false)?;
Expand Down Expand Up @@ -355,6 +356,15 @@ impl StreamFragmenter {
append_only_top_n_node.table_id_h = state.gen_table_id();
}

NodeBody::DynamicFilter(dynamic_filter_node) => {
if let Some(left_table) = &mut dynamic_filter_node.left_table {
left_table.id = state.gen_table_id();
}
if let Some(right_table) = &mut dynamic_filter_node.right_table {
right_table.id = state.gen_table_id();
}
}

_ => {}
}
}
Expand Down
6 changes: 6 additions & 0 deletions src/meta/src/stream/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,12 @@ pub fn record_table_vnode_mappings(
hash_mapping_manager
.set_fragment_state_table(fragment_id, node.right_table.as_ref().unwrap().id);
}
NodeBody::DynamicFilter(node) => {
hash_mapping_manager
.set_fragment_state_table(fragment_id, node.left_table.as_ref().unwrap().id);
hash_mapping_manager
.set_fragment_state_table(fragment_id, node.right_table.as_ref().unwrap().id);
}
_ => {}
}
let input_nodes = stream_node.get_input();
Expand Down
Loading

0 comments on commit b6b5d76

Please sign in to comment.