feat: introduce watermark on source #7750
generally LGTM
fn bind_source_watermark(
    session: &SessionImpl,
    name: String,
    source_watermarks: Vec<SourceWatermark>,
    column_catalogs: &[ColumnCatalog],
) -> Result<Vec<WatermarkDesc>> {
    let mut binder = Binder::new(session);
    binder.bind_column_defs(name.clone(), column_catalogs.to_vec())?;

    let watermark_descs = source_watermarks
        .into_iter()
        .map(|source_watermark| {
            let col_name = source_watermark.column.real_value();
            let watermark_idx = binder.get_column_binding_index(name.clone(), &col_name)?;

            let expr = binder.bind_expr(source_watermark.expr)?.to_expr_proto();

            Ok::<_, RwError>(WatermarkDesc {
                watermark_idx: Some(ProstColumnIndex {
                    index: watermark_idx as u64,
                }),
                expr: Some(expr),
            })
        })
        .try_collect()?;
    Ok(watermark_descs)
}
Maybe constructing a new binder here is too heavy. We can extract some of the expression-binding logic later.
Maybe in a later PR.
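For readers outside the codebase, the shape of the binding step quoted above (resolve each watermark column to an index, then collect fallibly) can be sketched in standalone Rust. This is only an illustration: `WatermarkDef`, `BoundWatermark`, `UnknownColumn`, and `bind_watermarks` are hypothetical stand-ins, not RisingWave types, and the expression is kept as an opaque string instead of being bound.

```rust
/// Hypothetical stand-in for a parsed `WATERMARK FOR <column> AS <expr>` clause.
struct WatermarkDef {
    column: String,
    expr: String,
}

/// Hypothetical stand-in for the bound descriptor (column index plus expression).
#[derive(Debug, PartialEq)]
struct BoundWatermark {
    column_index: usize,
    expr: String,
}

/// Error returned when a watermark refers to a column the source does not have.
#[derive(Debug, PartialEq)]
struct UnknownColumn(String);

/// Resolve every watermark column to its index; the first unknown column aborts the whole bind.
fn bind_watermarks(
    columns: &[&str],
    defs: Vec<WatermarkDef>,
) -> Result<Vec<BoundWatermark>, UnknownColumn> {
    defs.into_iter()
        .map(|def| {
            let column_index = columns
                .iter()
                .position(|c| *c == def.column)
                .ok_or_else(|| UnknownColumn(def.column.clone()))?;
            Ok(BoundWatermark { column_index, expr: def.expr })
        })
        // Stable-Rust equivalent of the fallible collect in the snippet above.
        .collect()
}

fn main() {
    let columns = ["auction", "price", "date_time"];
    let defs = vec![WatermarkDef {
        column: "date_time".to_string(),
        expr: "date_time - INTERVAL '5' SECOND".to_string(),
    }];
    let bound = bind_watermarks(&columns, defs).unwrap();
    assert_eq!(bound[0].column_index, 2);

    let bad = vec![WatermarkDef { column: "missing".to_string(), expr: String::new() }];
    assert_eq!(bind_watermarks(&columns, bad), Err(UnknownColumn("missing".to_string())));
}
```

Collecting into `Result<Vec<_>, _>` keeps the short-circuit behaviour without needing a full binder, which is roughly the kind of lighter-weight extraction the comment above hints at.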
Codecov Report
@@            Coverage Diff            @@
##             main    #7750    +/-   ##
========================================
  Coverage   71.67%   71.68%
========================================
  Files        1101     1103     +2
  Lines      175867   176072   +205
========================================
+ Hits       126058   126214   +156
- Misses      49809    49858    +49
…hao/watermark_source
Do we have any simple solution for this? @st1page If not, we may defer it until real-world cases appear.
Generally LGTM
@@ -356,7 +357,10 @@ impl ToBatch for LogicalSource {
impl ToStream for LogicalSource {
    fn to_stream(&self, _ctx: &mut ToStreamContext) -> Result<PlanRef> {
        let mut plan: PlanRef = StreamSource::new(self.clone()).into();
        if let Some(row_id_index) = self.core.row_id_index && self.core.gen_row_id{
        if let Some(catalog) = self.source_catalog() && !catalog.watermark_descs.is_empty(){
            plan = StreamWatermarkFilter::new(plan, catalog.watermark_descs.clone()).into();
How to test this in e2e or planner test? 🤔
(Perhaps after supporting watermark on append-only tables?)
Added a planner test.
We can check the state cleaning in the e2e test, because our state cleaning is based on the consistent range_delete.
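The planner-level behaviour discussed in this thread (wrap the source in a watermark filter only when the catalog has watermark descriptors) can be mimicked with a toy plan type, which also shows what a planner test would assert. This is a hedged sketch: the `Plan` enum and this `to_stream` function are hypothetical and much simpler than RisingWave's `PlanRef` machinery.

```rust
/// Toy plan tree; the real planner uses reference-counted plan nodes.
#[derive(Debug, PartialEq)]
enum Plan {
    Source,
    WatermarkFilter { input: Box<Plan>, descs: Vec<String> },
}

/// Wrap the source in a watermark filter only if at least one watermark is defined.
fn to_stream(watermark_descs: &[String]) -> Plan {
    let mut plan = Plan::Source;
    if !watermark_descs.is_empty() {
        plan = Plan::WatermarkFilter {
            input: Box::new(plan),
            descs: watermark_descs.to_vec(),
        };
    }
    plan
}

fn main() {
    // Without watermarks the plan is just the source.
    assert_eq!(to_stream(&[]), Plan::Source);

    // With a watermark, the filter sits directly on top of the source.
    let descs = vec!["date_time - INTERVAL '5' SECOND".to_string()];
    let plan = to_stream(&descs);
    assert_eq!(
        plan,
        Plan::WatermarkFilter { input: Box::new(Plan::Source), descs }
    );
}
```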
// The table used to persist watermark, the key is vnode.
catalog.Table table = 3;
// The watermark descs
repeated catalog.WatermarkDesc watermark_descs = 1;
Don't worry. A repeated field is compatible with a non-repeated one, so you may leave them as non-repeated for now and change them only when needed, in order to make the code cleaner.
Already changed🤣
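The compatibility point above rests on the protobuf wire format: for message, string, and bytes fields, a repeated field is encoded as the same tag appearing once per element, so bytes written with a singular field read back as a one-element list. The hand-rolled sketch below illustrates just that; `encode_len_field` and `decode_repeated` are hypothetical helpers written for this example (a real project would use a protobuf library), and they assume every field is length-delimited with a one-byte tag and one-byte length.

```rust
/// Encode one length-delimited field (wire type 2). The sketch assumes a field
/// number below 16 and a payload under 128 bytes, so tag and length take one byte each.
fn encode_len_field(field_no: u8, payload: &[u8]) -> Vec<u8> {
    assert!(field_no < 16 && payload.len() < 128);
    let mut out = vec![(field_no << 3) | 2, payload.len() as u8];
    out.extend_from_slice(payload);
    out
}

/// Collect every occurrence of `field_no`, which is how a reader that declares
/// the field as `repeated` sees the bytes. Panics on truncated input.
fn decode_repeated(mut buf: &[u8], field_no: u8) -> Vec<Vec<u8>> {
    let wanted_tag = (field_no << 3) | 2;
    let mut items = Vec::new();
    while buf.len() >= 2 {
        let (tag, len) = (buf[0], buf[1] as usize);
        let (body, rest) = buf[2..].split_at(len);
        if tag == wanted_tag {
            items.push(body.to_vec());
        }
        buf = rest;
    }
    items
}

fn main() {
    // A writer that treats field 1 as singular emits it exactly once.
    let singular = encode_len_field(1, b"wm-a");
    // A reader that treats field 1 as repeated gets a one-element list.
    assert_eq!(decode_repeated(&singular, 1), vec![b"wm-a".to_vec()]);

    // A writer that treats it as repeated simply emits the same tag again.
    let mut repeated = singular.clone();
    repeated.extend(encode_len_field(1, b"wm-b"));
    assert_eq!(decode_repeated(&repeated, 1).len(), 2);
}
```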
The only thing we need to do is to refine the
I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.
What's changed and what's your intention?
Done:
TODO:
Checklist
./risedev check (or alias, ./risedev c)
Documentation
We can now create at most 1 watermark on the source column by specifying WATERMARK FOR [column] as <expr>. Then a WatermarkFilterExecutor will be generated after the SourceExecutor.
Just a simple example, not the actual use case:
To create a watermark on bid.auction
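For intuition about what a watermark filter placed after a source does, here is a minimal standalone Rust sketch. It is not the WatermarkFilterExecutor from this PR: the `WatermarkFilter` struct, the fixed-delay rule (watermark = largest value seen minus a delay), and dropping rows that fall below the watermark are assumptions chosen for illustration.

```rust
/// Toy watermark filter over an integer event-time column.
/// Assumed rule: watermark = (largest event time seen so far) - delay.
struct WatermarkFilter {
    delay: i64,
    watermark: Option<i64>,
}

impl WatermarkFilter {
    fn new(delay: i64) -> Self {
        WatermarkFilter { delay, watermark: None }
    }

    /// Returns true if the row is kept. A row whose event time is already
    /// below the current watermark is treated as late and dropped.
    fn accept(&mut self, event_time: i64) -> bool {
        if matches!(self.watermark, Some(w) if event_time < w) {
            return false;
        }
        // The watermark only ever moves forward.
        let candidate = event_time - self.delay;
        self.watermark = Some(self.watermark.map_or(candidate, |w| w.max(candidate)));
        true
    }
}

fn main() {
    let mut filter = WatermarkFilter::new(5);
    let kept: Vec<bool> = [10, 20, 12, 16].iter().map(|&t| filter.accept(t)).collect();
    // 12 arrives after the watermark has advanced to 15, so it is dropped.
    assert_eq!(kept, vec![true, true, false, true]);
    assert_eq!(filter.watermark, Some(15));
}
```

Downstream operators can rely on never seeing a value below the emitted watermark, which is what makes watermark-based state cleaning possible.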
Release note
Please create a release note for your changes. In the release note, focus on the impact on users, and mention the environment or conditions where the impact may occur.
Refer to a related PR or issue link (optional)
#6952
#6750