Skip to content

Commit

Permalink
feat: introduce watermark on source (#7750)
Browse files Browse the repository at this point in the history
Done:
- introduce watermark on source.

TODO:
- watermark expr display in frontend
- define multiple watermark on source
- define watermark on table

Approved-By: fuyufjh
  • Loading branch information
yuhao-su authored Feb 9, 2023
1 parent fa96b32 commit 42f37b5
Show file tree
Hide file tree
Showing 24 changed files with 464 additions and 56 deletions.
53 changes: 52 additions & 1 deletion dashboard/proto/gen/catalog.ts

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

46 changes: 24 additions & 22 deletions dashboard/proto/gen/stream_plan.ts

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 10 additions & 0 deletions proto/catalog.proto
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,13 @@ message ColumnIndex {
uint64 index = 1;
}

// Describes a watermark: the column it is attached to and the expression
// that produces the watermark value.
message WatermarkDesc {
// Index of the column the watermark is defined on.
uint32 watermark_idx = 1;
// The expression used to calculate the watermark value.
expr.ExprNode expr = 2;
}

message StreamSourceInfo {
plan_common.RowFormatType row_format = 1;
string row_schema_location = 2;
Expand Down Expand Up @@ -44,6 +51,9 @@ message Source {
uint32 owner = 9;

StreamSourceInfo info = 10;
// Watermarks defined on the source. The field is `repeated` only for forward
// compatibility; currently only one watermark is supported per source.
repeated WatermarkDesc watermark_descs = 11;
}

message Sink {
Expand Down
10 changes: 4 additions & 6 deletions proto/stream_plan.proto
Original file line number Diff line number Diff line change
Expand Up @@ -423,12 +423,10 @@ message LookupNode {

// WatermarkFilter needs to filter the upstream data by the watermark.
message WatermarkFilterNode {
// The expression to calculate the watermark value.
expr.ExprNode watermark_expr = 1;
// The column the event time belongs.
uint64 event_time_col_idx = 2;
// The table used to persist watermark, the key is vnode.
catalog.Table table = 3;
// The watermark descs
repeated catalog.WatermarkDesc watermark_descs = 1;
// The tables used to persist watermarks, the key is vnode.
repeated catalog.Table tables = 2;
}

// Acts like a merger, but on different inputs.
Expand Down
15 changes: 15 additions & 0 deletions src/frontend/planner_test/tests/testdata/watermark.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# This file is automatically generated. See `src/frontend/planner_test/README.md` for more information.
- name: watermark on source
sql: |
create source t (v1 timestamp with time zone, watermark for v1 as v1 - INTERVAL '1' SECOND) ROW FORMAT JSON;
select t.v1 - INTERVAL '2' SECOND as v1 from t;
logical_plan: |
LogicalProject { exprs: [(v1 - '00:00:02':Interval)] }
└─LogicalSource { source: t, columns: [v1, _row_id], time_range: [(Unbounded, Unbounded)] }
stream_plan: |
StreamMaterialize { columns: [v1, _row_id(hidden)], pk_columns: [_row_id] }
└─StreamExchange { dist: HashShard(_row_id) }
└─StreamProject { exprs: [(v1 - '00:00:02':Interval), _row_id], watermark_columns: [(v1 - '00:00:02':Interval)] }
└─StreamRowIdGen { row_id_index: 1 }
└─StreamWatermarkFilter
└─StreamSource { source: "t", columns: ["v1", "_row_id"] }
43 changes: 43 additions & 0 deletions src/frontend/src/binder/create.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
// Copyright 2023 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use itertools::Itertools;
use risingwave_common::catalog::{ColumnCatalog, Field};
use risingwave_common::error::Result;

use crate::Binder;

impl Binder {
    /// Registers the given columns in the bind context under the relation name `name`.
    ///
    /// Every catalog entry is converted into an `(is_hidden, Field)` pair, and the
    /// resulting list is forwarded to `bind_table_to_context` together with `name`
    /// and `None` as the last argument.
    pub fn bind_columns_to_context(
        &mut self,
        name: String,
        column_catalogs: Vec<ColumnCatalog>,
    ) -> Result<()> {
        let fields = column_catalogs
            .iter()
            .map(|catalog| {
                let hidden = catalog.is_hidden;
                (hidden, Field::from(&catalog.column_desc))
            })
            .collect_vec();
        self.bind_table_to_context(fields, name, None)
    }

    /// Looks up the binding index of `column_name`, qualified by `table_name`,
    /// in the current bind context.
    ///
    /// Any error reported by the context lookup is propagated to the caller.
    pub fn get_column_binding_index(
        &mut self,
        table_name: String,
        column_name: &String,
    ) -> Result<usize> {
        // The context lookup takes an optional qualifier; here it is always present.
        let qualifier = Some(table_name);
        let index = self
            .context
            .get_column_binding_index(&qualifier, column_name)?;
        Ok(index)
    }
}
2 changes: 1 addition & 1 deletion src/frontend/src/binder/expr/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ mod subquery;
mod value;

impl Binder {
pub(super) fn bind_expr(&mut self, expr: Expr) -> Result<ExprImpl> {
pub fn bind_expr(&mut self, expr: Expr) -> Result<ExprImpl> {
match expr {
// literal
Expr::Value(v) => Ok(ExprImpl::Literal(Box::new(self.bind_value(v)?))),
Expand Down
1 change: 1 addition & 0 deletions src/frontend/src/binder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use risingwave_common::session_config::SearchPath;
use risingwave_sqlparser::ast::Statement;

mod bind_context;
mod create;
mod delete;
mod expr;
mod insert;
Expand Down
5 changes: 4 additions & 1 deletion src/frontend/src/catalog/source_catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
use std::collections::HashMap;

use risingwave_common::catalog::ColumnCatalog;
use risingwave_pb::catalog::{Source as ProstSource, StreamSourceInfo};
use risingwave_pb::catalog::{Source as ProstSource, StreamSourceInfo, WatermarkDesc};

use super::{ColumnId, RelationCatalog, SourceId};
use crate::user::UserId;
Expand All @@ -34,6 +34,7 @@ pub struct SourceCatalog {
pub info: StreamSourceInfo,
pub row_id_index: Option<usize>,
pub properties: HashMap<String, String>,
pub watermark_descs: Vec<WatermarkDesc>,
}

impl From<&ProstSource> for SourceCatalog {
Expand All @@ -56,6 +57,7 @@ impl From<&ProstSource> for SourceCatalog {

let append_only = row_id_index.is_some();
let owner = prost.owner;
let watermark_descs = prost.get_watermark_descs().clone();

Self {
id,
Expand All @@ -67,6 +69,7 @@ impl From<&ProstSource> for SourceCatalog {
info: prost.info.clone().unwrap(),
row_id_index,
properties: with_options.into_inner(),
watermark_descs,
}
}
}
Expand Down
Loading

0 comments on commit 42f37b5

Please sign in to comment.