feat: add kafka backfill frontend #15602
Looks good
proto/catalog.proto (outdated)
// Only used when `has_streaming_job` is `true`.
// If `false`, `requires_singleton` will be set in the stream plan.
bool is_distributed = 15;
reserved "cdc_source_job"; // deprecated
Deprecating `cdc_source_job` and adding `has_streaming_job` will break backward compatibility of existing CDC jobs, right? Why not rename `cdc_source_job` to `has_streaming_job`?
`has_streaming_job` is misleading, too. I think we should mention source to indicate that it controls what happens on `create source`, e.g. `source_has_streaming_job`.
Changing the field name could be breaking under SQL meta backend. 😐 Not sure why it's not linted.
> Why not renaming cdc_source_job to has_streaming_job?
This is indeed the change I made. Number `13` is renamed.
> Changing the field name could be breaking under SQL meta backend.
Indeed. 😕 Need to come up with another plan.
Do you already have one? Keeping it or introducing another one dedicated to this case both LGTM.
New plan:
- Keep `cdc_source_job`. New & old CDC sources will both set this field. This field is only used to set `require_singleton`.
- Change `is_shared` to `optional` (explicit presence). Old CDC sources will have this field set to `None`. New CDC sources will set it to `Some(true)`.
  - Added method `is_shared_compatible` to test `cdc_source_job` first. TBH, this is a little awkward. Better ideas are welcome.
  - Strictly speaking, it's not necessary to use `optional`; `(cdc_source_job, is_shared) = (true, false)` can also work. But I feel this can make it (just slightly) less error-prone.
Old plan:
- Rename `cdc_source_job` to `is_shared`. This is more elegant, but breaks JSON compatibility for SQL meta backend.
- Added `is_distributed` to set `require_singleton` in fragmenter for CDC source.
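For readers following the new plan, here is a minimal Rust sketch of how the compatibility check could look. `SourceInfo` is a hypothetical, trimmed-down stand-in (not the real catalog type), and the body of `is_shared_compatible` is an assumption based only on the description above: test `cdc_source_job` first, then fall back to the optional `is_shared`.

```rust
/// Hypothetical, trimmed-down stand-in for the source catalog entry.
/// Only the two fields discussed in this thread are modeled.
#[derive(Debug, Clone, Default)]
pub struct SourceInfo {
    /// Set by both old and new CDC sources; only drives `require_singleton`.
    pub cdc_source_job: bool,
    /// Explicit presence: `None` for old CDC sources, `Some(true)` for new ones.
    pub is_shared: Option<bool>,
}

impl SourceInfo {
    /// Tests `cdc_source_job` first, so old CDC sources (where `is_shared`
    /// is `None`) are still treated as shared. Assumed logic, not the PR's code.
    pub fn is_shared_compatible(&self) -> bool {
        self.cdc_source_job || self.is_shared.unwrap_or(false)
    }

    /// `cdc_source_job` is the only input for the singleton requirement.
    pub fn requires_singleton(&self) -> bool {
        self.cdc_source_job
    }
}
```

Under these assumptions, an old CDC source (`cdc_source_job = true`, `is_shared = None`) and a new shared MQ source (`cdc_source_job = false`, `is_shared = Some(true)`) both report as shared, while only the CDC source requires a singleton.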
> Rename `cdc_source_job` to `is_shared`. This is more elegant, but breaks JSON compatibility for SQL meta backend.
Maybe off-topic... I think storing protobuf as JSON sounds bad.
https://protobuf.dev/programming-guides/dos-donts/#text-format-interchange
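To illustrate why the rename is safe for the binary encoding but not for JSON, here is a toy Rust sketch. It does not use any protobuf library: the two maps only model the difference between looking a field up by number and looking it up by name. The key strings `cdcSourceJob` and `isShared` are illustrative assumptions about how the stored JSON might be keyed; field number `13` is the one mentioned above.

```rust
use std::collections::HashMap;

/// Binary-style lookup: fields are identified by number, so a rename is invisible.
pub fn read_by_number(stored: &HashMap<u32, bool>, field_number: u32) -> bool {
    stored.get(&field_number).copied().unwrap_or(false)
}

/// JSON-style lookup: fields are identified by name, so data written under
/// the old name is not found and the reader falls back to the default.
pub fn read_by_name(stored: &HashMap<String, bool>, field_name: &str) -> bool {
    stored.get(field_name).copied().unwrap_or(false)
}

/// Simulates reading a value written before the rename with code built after it.
/// Returns (value seen via field number, value seen via the new field name).
pub fn read_after_rename() -> (bool, bool) {
    let mut binary = HashMap::new();
    binary.insert(13u32, true); // written as field 13, whatever its name was

    let mut json = HashMap::new();
    json.insert("cdcSourceJob".to_string(), true); // written under the old name

    (read_by_number(&binary, 13), read_by_name(&json, "isShared"))
}
```

In this model the number-keyed read still sees `true`, while the name-keyed read silently falls back to `false`, which is the kind of breakage the SQL meta backend concern is about.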
Had an offline discussion with @xxchan: if we enable the useable source, there will be real actors running that keep fetching data even if there are no subsequent MVs. We should have an optimization to pause the source in this case to save resources; it is OK to implement it in the next PR.
Rest of meta part LGTM. Can we run the single node test in this PR? I'm not sure whether it would fail.
@@ -294,6 +294,7 @@ fn build_fragment(
}
NodeBody::StreamCdcScan(_) => {
// XXX: Should we use a different flag for CDC scan?
Depending on whether we find the need to distinguish between them.
I think we do need to distinguish them. See the awkward code in `tracking_progress_actor_ids`. But I'm worried about compatibility, so I don't want to change it.
Changes since last review (It should be OK to review commits since last review)
Meta part LGTM!
LGTM
// XXX: do we need to include partition and offset cols here? It's needed by Backfill's input, but maybe not output?
// But the source's "schema" contains the hidden columns.
This is by design, so that
- users can optionally use these columns in their batch query.
- users can optionally use these columns in their streaming query if they specify `include` in source DDL.
@@ -1313,12 +1313,20 @@ pub async fn handle_create_source(
let sql_pk_names = bind_sql_pk_names(&stmt.columns, &stmt.constraints)?;
let create_cdc_source_job = with_properties.is_backfillable_cdc_connector();
I saw you renamed `is_backfillable_cdc_connector` to `is_shared_cdc_connector` but reverted. 😄 Personally, I will +1 for this renaming to reduce our terminology.
I guess I reverted it by mistake when resolving conflicts 🤡
Wait, it does have a reason for not using "shared". #15635 (comment)
Maybe we can call it "shareable" instead of "backfillable"
proto/catalog.proto (outdated)
// - Direct CDC sources (mysql & postgresql). Backwards compat note: For old CDC job it's `None`; For new CDC job it's `Some(true)`.
// - MQ sources (Kafka, Pulsar, Kinesis, etc.)
//
// **Should also test `cdc_source_job` for backwards compatibility. Use `is_shared_compatible()` instead of this field directly.**
Looks error-prone to me. I would rather just reuse `cdc_source_job` even without renaming.
😄 Sure that's acceptable to me
Had better reuse and rename 😄
I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.
What's changed and what's your intention?
Frontend/meta part of #14172 risingwavelabs/rfcs#72
Tracking issue #16003
How it works
- `enable_shared_source` to gate the feature
- planner:
  - `Logical/StreamSourceBackfill`. They are created when planning MV on source. When `to_stream_prost`, it becomes `merge -> backfill`
  - `CREATE SOURCE`, create stream job (with one single node `LogicalSource`)
- fragmenter: `SourceBackfill`
- meta/source manager:
- meta/build actor graph:
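The planner step above can be pictured with a small Rust sketch. The types here are hypothetical simplifications, not the real plan node types, and the non-shared branch (a plain source node when the gate is off) is an assumption for illustration; only the `merge -> backfill` shape and the `enable_shared_source` gate come from the description.

```rust
/// Hypothetical, simplified stream plan shapes for an MV on a source.
#[derive(Debug, PartialEq)]
pub enum StreamPlan {
    /// Assumed fallback: the MV reads the connector through its own source node.
    Source,
    /// The MV reads from the shared source job: `merge -> backfill`.
    MergeThenBackfill,
}

/// Picks the plan shape for an MV on a source, gated by `enable_shared_source`.
pub fn plan_mv_on_source(enable_shared_source: bool) -> StreamPlan {
    if enable_shared_source {
        StreamPlan::MergeThenBackfill
    } else {
        StreamPlan::Source
    }
}
```

Keeping the gate in one place like this would make it easy to fall back to the previous behavior while the feature is still experimental.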
Checklist
- `./risedev check` (or alias, `./risedev c`)
Documentation
Release note
If this PR includes changes that directly affect users or other significant modifications relevant to the community, kindly draft a release note to provide a concise summary of these changes. Please prioritize highlighting the impact these changes will have on users.