Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: introduce watermark on source #7750

Merged
merged 13 commits into from
Feb 9, 2023
53 changes: 52 additions & 1 deletion dashboard/proto/gen/catalog.ts

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

46 changes: 24 additions & 22 deletions dashboard/proto/gen/stream_plan.ts

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 10 additions & 0 deletions proto/catalog.proto
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,13 @@ message ColumnIndex {
uint64 index = 1;
}

// Describes a single watermark: the column it is defined on and the
// expression used to compute its value.
message WatermarkDesc {
// Index of the column the watermark is defined on.
uint32 watermark_idx = 1;
// The expression to calculate the watermark value.
expr.ExprNode expr = 2;
}

message StreamSourceInfo {
plan_common.RowFormatType row_format = 1;
string row_schema_location = 2;
Expand Down Expand Up @@ -44,6 +51,9 @@ message Source {
uint32 owner = 9;

StreamSourceInfo info = 10;
// Define watermarks on the source. The field is `repeated` only for forward
// compatibility; currently there is only one watermark on the source.
repeated WatermarkDesc watermark_descs = 11;
}

message Sink {
Expand Down
10 changes: 4 additions & 6 deletions proto/stream_plan.proto
Original file line number Diff line number Diff line change
Expand Up @@ -423,12 +423,10 @@ message LookupNode {

// WatermarkFilter needs to filter the upstream data by the watermark.
message WatermarkFilterNode {
// The expression to calculate the watermark value.
expr.ExprNode watermark_expr = 1;
// The column the event time belongs to.
uint64 event_time_col_idx = 2;
// The table used to persist watermark, the key is vnode.
catalog.Table table = 3;
// The watermark descs
repeated catalog.WatermarkDesc watermark_descs = 1;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't worry. A repeated field is compatible with a non-repeated one, so you may leave them as non-repeated for now and change them when needed, in order to make the code cleaner.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Already changed🤣

// The tables used to persist watermarks, the key is vnode.
repeated catalog.Table tables = 2;
}

// Acts like a merger, but on different inputs.
Expand Down
44 changes: 44 additions & 0 deletions src/frontend/src/binder/create.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
// Copyright 2023 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use itertools::Itertools;
use risingwave_common::catalog::Field;
use risingwave_common::error::Result;

use crate::catalog::column_catalog::ColumnCatalog;
use crate::Binder;

impl Binder {
/// Binds the given columns into the binder's context under the table name `name`.
///
/// Every entry of `column_catalogs` is converted into an `(is_hidden, Field)`
/// pair, and the resulting list is handed to `bind_table_to_context` together
/// with `name` and `None` as the last argument.
///
/// # Errors
///
/// Returns whatever error `bind_table_to_context` reports.
pub fn bind_columns_to_context(
    &mut self,
    name: String,
    column_catalogs: Vec<ColumnCatalog>,
) -> Result<()> {
    // Pair each column's hidden flag with the field built from its descriptor.
    let to_binding = |catalog: &ColumnCatalog| {
        let field = Field::from(&catalog.column_desc);
        (catalog.is_hidden, field)
    };
    let bindings = column_catalogs.iter().map(to_binding).collect_vec();
    self.bind_table_to_context(bindings, name, None)
}

/// Looks up the binding index of `column_name` qualified by `table_name`.
///
/// The lookup is delegated to the binder context's
/// `get_column_binding_index`, with the table name wrapped in `Some`.
///
/// # Errors
///
/// Propagates the error returned by the context lookup.
pub fn get_column_binding_index(
    &mut self,
    table_name: String,
    column_name: &String,
) -> Result<usize> {
    let qualifier = Some(table_name);
    let index = self
        .context
        .get_column_binding_index(&qualifier, column_name)?;
    Ok(index)
}
st1page marked this conversation as resolved.
Show resolved Hide resolved
}
2 changes: 1 addition & 1 deletion src/frontend/src/binder/expr/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ mod subquery;
mod value;

impl Binder {
pub(super) fn bind_expr(&mut self, expr: Expr) -> Result<ExprImpl> {
pub fn bind_expr(&mut self, expr: Expr) -> Result<ExprImpl> {
match expr {
// literal
Expr::Value(v) => Ok(ExprImpl::Literal(Box::new(self.bind_value(v)?))),
Expand Down
1 change: 1 addition & 0 deletions src/frontend/src/binder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use risingwave_common::session_config::SearchPath;
use risingwave_sqlparser::ast::Statement;

mod bind_context;
mod create;
mod delete;
mod expr;
mod insert;
Expand Down
5 changes: 4 additions & 1 deletion src/frontend/src/catalog/source_catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

use std::collections::HashMap;

use risingwave_pb::catalog::{Source as ProstSource, StreamSourceInfo};
use risingwave_pb::catalog::{Source as ProstSource, StreamSourceInfo, WatermarkDesc};

use super::column_catalog::ColumnCatalog;
use super::{ColumnId, RelationCatalog, SourceId};
Expand All @@ -34,6 +34,7 @@ pub struct SourceCatalog {
pub info: StreamSourceInfo,
pub row_id_index: Option<usize>,
pub properties: HashMap<String, String>,
pub watermark_descs: Vec<WatermarkDesc>,
}

impl From<&ProstSource> for SourceCatalog {
Expand All @@ -56,6 +57,7 @@ impl From<&ProstSource> for SourceCatalog {

let append_only = row_id_index.is_some();
let owner = prost.owner;
let watermark_descs = prost.get_watermark_descs().clone();

Self {
id,
Expand All @@ -67,6 +69,7 @@ impl From<&ProstSource> for SourceCatalog {
info: prost.info.clone().unwrap(),
row_id_index,
properties: with_options.into_inner(),
watermark_descs,
}
}
}
Expand Down
Loading