Skip to content

Commit

Permalink
Merge branch 'main' into zhidong/show-params
Browse files Browse the repository at this point in the history
  • Loading branch information
mergify[bot] authored Feb 14, 2023
2 parents a2a41ae + 680bc23 commit 76d08bc
Show file tree
Hide file tree
Showing 30 changed files with 1,185 additions and 55 deletions.
130 changes: 128 additions & 2 deletions dashboard/proto/gen/stream_plan.ts

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions proto/stream_plan.proto
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,7 @@ message SimpleAggNode {
// Whether to optimize for append only stream.
// It is true when the input is append-only
bool is_append_only = 5;
map<uint32, catalog.Table> distinct_dedup_tables = 6;
}

message HashAggNode {
Expand All @@ -224,6 +225,7 @@ message HashAggNode {
// Whether to optimize for append only stream.
// It is true when the input is append-only
bool is_append_only = 5;
map<uint32, catalog.Table> distinct_dedup_tables = 6;
}

message TopNNode {
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/planner_test/tests/testdata/watermark.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,5 @@
└─StreamExchange { dist: HashShard(_row_id) }
└─StreamProject { exprs: [(v1 - '00:00:02':Interval), _row_id], watermark_columns: [(v1 - '00:00:02':Interval)] }
└─StreamRowIdGen { row_id_index: 1 }
└─StreamWatermarkFilter
└─StreamWatermarkFilter { watermark_descs: [idx: 0, expr: (v1 - '00:00:01':Interval)] }
└─StreamSource { source: "t", columns: ["v1", "_row_id"] }
17 changes: 17 additions & 0 deletions src/frontend/src/expr/function_call.rs
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,23 @@ impl FunctionCall {
pub fn inputs_mut(&mut self) -> &mut [ExprImpl] {
self.inputs.as_mut()
}

pub(super) fn from_expr_proto(
function_call: &risingwave_pb::expr::FunctionCall,
expr_type: ExprType,
ret_type: DataType,
) -> Result<Self> {
let inputs: Vec<_> = function_call
.get_children()
.iter()
.map(ExprImpl::from_expr_proto)
.try_collect()?;
Ok(Self {
func_type: expr_type,
return_type: ret_type,
inputs,
})
}
}

impl Expr for FunctionCall {
Expand Down
10 changes: 10 additions & 0 deletions src/frontend/src/expr/input_ref.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,16 @@ impl InputRef {
r#type: Some(self.data_type.to_protobuf()),
}
}

pub(super) fn from_expr_proto(
input_ref: &risingwave_pb::expr::InputRefExpr,
ret_type: DataType,
) -> risingwave_common::error::Result<Self> {
Ok(Self {
index: input_ref.get_column_idx() as usize,
data_type: ret_type,
})
}
}

impl Expr for InputRef {
Expand Down
33 changes: 31 additions & 2 deletions src/frontend/src/expr/literal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@
use risingwave_common::array::list_array::display_for_explain;
use risingwave_common::types::to_text::ToText;
use risingwave_common::types::{literal_type_match, DataType, Datum};
use risingwave_common::util::value_encoding::serialize_datum;
use risingwave_common::util::value_encoding::{deserialize_datum, serialize_datum};
use risingwave_pb::data::Datum as ProstDatum;
use risingwave_pb::expr::expr_node::RexNode;

use super::Expr;
Expand Down Expand Up @@ -77,6 +78,16 @@ impl Literal {
pub fn get_data(&self) -> &Datum {
&self.data
}

pub(super) fn from_expr_proto(
proto: &risingwave_pb::expr::ExprNode,
) -> risingwave_common::error::Result<Self> {
let data_type = proto.get_return_type()?;
Ok(Self {
data: value_encoding_to_literal(&proto.rex_node, &data_type.into())?,
data_type: data_type.into(),
})
}
}

impl Expr for Literal {
Expand All @@ -99,12 +110,30 @@ fn literal_to_value_encoding(d: &Datum) -> Option<RexNode> {
if d.is_none() {
return None;
}
use risingwave_pb::data::Datum as ProstDatum;

let body = serialize_datum(d.as_ref());
Some(RexNode::Constant(ProstDatum { body }))
}

/// Convert protobuf into a literal value (datum).
fn value_encoding_to_literal(
proto: &Option<RexNode>,
ty: &DataType,
) -> risingwave_common::error::Result<Datum> {
if let Some(rex_node) = proto {
if let RexNode::Constant(prost_datum) = rex_node {
let datum = deserialize_datum(prost_datum.body.as_ref(), ty)?;
// remove unwrap
// https://github.com/risingwavelabs/risingwave/issues/7862
Ok(Some(datum.unwrap()))
} else {
unreachable!()
}
} else {
Ok(None)
}
}

#[cfg(test)]
mod tests {
use risingwave_common::array::{ListValue, StructValue};
Expand Down
19 changes: 19 additions & 0 deletions src/frontend/src/expr/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use risingwave_common::array::ListValue;
use risingwave_common::error::Result;
use risingwave_common::types::{DataType, Datum, Scalar};
use risingwave_expr::expr::{build_from_prost, AggKind};
use risingwave_pb::expr::expr_node::RexNode;
use risingwave_pb::expr::{ExprNode, ProjectSetSelectItem};

mod agg_call;
Expand Down Expand Up @@ -726,6 +727,24 @@ impl ExprImpl {
}),
}
}

pub fn from_expr_proto(proto: &ExprNode) -> Result<Self> {
let rex_node = proto.get_rex_node()?;
let ret_type = proto.get_return_type()?.into();
let expr_type = proto.get_expr_type()?;
Ok(match rex_node {
RexNode::InputRef(input_ref) => {
Self::InputRef(Box::new(InputRef::from_expr_proto(input_ref, ret_type)?))
}
RexNode::Constant(_) => Self::Literal(Box::new(Literal::from_expr_proto(proto)?)),
RexNode::Udf(udf) => Self::UserDefinedFunction(Box::new(
UserDefinedFunction::from_expr_proto(udf, ret_type)?,
)),
RexNode::FuncCall(function_call) => Self::FunctionCall(Box::new(
FunctionCall::from_expr_proto(function_call, expr_type, ret_type)?,
)),
})
}
}

impl Expr for ExprImpl {
Expand Down
32 changes: 32 additions & 0 deletions src/frontend/src/expr/user_defined_function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

use std::sync::Arc;

use itertools::Itertools;
use risingwave_common::catalog::FunctionId;
use risingwave_common::types::DataType;

use super::{Expr, ExprImpl};
Expand All @@ -29,6 +31,36 @@ impl UserDefinedFunction {
pub fn new(catalog: Arc<FunctionCatalog>, args: Vec<ExprImpl>) -> Self {
Self { args, catalog }
}

pub(super) fn from_expr_proto(
udf: &risingwave_pb::expr::UserDefinedFunction,
ret_type: DataType,
) -> risingwave_common::error::Result<Self> {
let args: Vec<_> = udf
.get_children()
.iter()
.map(ExprImpl::from_expr_proto)
.try_collect()?;

// function catalog
let arg_types = udf.get_arg_types().iter().map_into().collect_vec();
let catalog = FunctionCatalog {
// FIXME(yuhao): function id is not in udf proto.
id: FunctionId::placeholder(),
name: udf.get_name().clone(),
// FIXME(yuhao): owner is not in udf proto.
owner: u32::MAX - 1,
arg_types,
return_type: ret_type,
language: udf.get_language().clone(),
path: udf.get_path().clone(),
};

Ok(Self {
args,
catalog: Arc::new(catalog),
})
}
}

impl Expr for UserDefinedFunction {
Expand Down
Loading

0 comments on commit 76d08bc

Please sign in to comment.