[FLINK-36690][runtime] Fix schema operator hanging under extreme parallelized pressure #3680
Conversation
Need more eyes on this PR, since it tweaks the schema evolution communication process. cc @leonardBang @ruanhang1993
Based on previous discussions, I've made the following changes:
As we're not going to merge this into 3.2.1, I would propose downgrading parallelized E2E cases to single-parallelism ones (#3718) to avoid breaking our CI pipeline.
Thanks @yuxiqian for the contribution, it's great that you've added the flow picture to make the PR easy to follow.
@leonardBang Will this PR be reviewed soon? I'm planning to implement FLINK-36763 based on this.
Added to my todo list.
Thanks for the great work! Left some minor comments.
Thanks for @Shawn-Hx's review! Addressed your comments. Sorry to make your review more painful, but considering that FLINK-36763 is expected to modify the current codebase anyway, I've baked the changes from this PR into #3801. Looking forward to your comments on that, too.
This closes FLINK-36690 by fixing a schema operator hanging glitch under extreme parallelized pressure.

After FLINK-36114, `SchemaOperator`s ask for schema evolution permission first, before sending `FlushEvent`s to downstream sinks. However, Flink regards `FlushEvent`s as normal data records and may hold them back to align checkpoint barriers. This can cause the following deadlock situation:

- `SchemaOperator` A has obtained schema evolution permission.
- `SchemaOperator` B does not get the permission and hangs.
- `SchemaOperator` A sends a `FlushEvent`, but only after a checkpoint barrier.
- `SchemaOperator` B receives a checkpoint barrier after the (blocked) schema change event.

Now neither A nor B can post any event records downstream, and the entire job blocks, leaving the iconic error message in the TaskManager logs.
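For intuition only, here is a compressed, hypothetical sketch of the pre-fix ordering that can produce this deadlock; the names `OldSchemaFlowSketch`, `requestPermission`, and `emitFlushEvent` are illustrative and do not correspond to the actual flink-cdc-runtime code:

```java
import java.util.concurrent.CompletableFuture;

// Pre-fix ordering: permission is requested BEFORE the flush is emitted. If subtask A
// is granted permission but its FlushEvent lands behind a checkpoint barrier that can
// never be aligned (because subtask B is still waiting for the same permission), the
// flush is never processed and neither subtask makes progress.
final class OldSchemaFlowSketch {

    CompletableFuture<Void> onSchemaChange(long nonce) {
        return requestPermission(nonce)                 // step 1: may wait while another subtask holds it
                .thenRun(() -> emitFlushEvent(nonce));  // step 2: flush only after permission is granted
    }

    private CompletableFuture<Void> requestPermission(long nonce) {
        return CompletableFuture.completedFuture(null); // stand-in for the coordinator round trip
    }

    private void emitFlushEvent(long nonce) {
        // In the real pipeline this travels as a stream record and, like any other
        // record, can be held back by checkpoint barrier alignment.
    }
}
```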
This PR changes the schema evolution permission requesting workflow as follows:

- `SchemaOperator`s emit a `FlushEvent` immediately when they receive a `SchemaChangeEvent`.
- By the time schema evolution permission comes into play, the requesting `SchemaOperator` has finished data flushing already.
- Since `FlushEvent`s might be emitted from multiple `SchemaOperator`s simultaneously, a nonce value that is uniquely bound to a schema change event is added into the `FlushEvent` payload (a minimal illustration follows this list).
- The `WAITING_FOR_FLUSH` stage is no longer necessary, since that state now blocks only one single `SchemaOperator` rather than the `SchemaRegistry`.
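For illustration only, a minimal sketch of what a flush payload carrying such a nonce could look like; `FlushEventSketch` and its fields are hypothetical names, not the actual `FlushEvent` class in flink-cdc-runtime:

```java
import java.io.Serializable;

/** Hypothetical flush payload: the nonce ties the flush to one schema change request. */
public final class FlushEventSketch implements Serializable {
    private final int sourceSubTaskId; // subtask that emitted this flush
    private final long nonce;          // uniquely bound to a single schema change event

    public FlushEventSketch(int sourceSubTaskId, long nonce) {
        this.sourceSubTaskId = sourceSubTaskId;
        this.nonce = nonce;
    }

    public int getSourceSubTaskId() {
        return sourceSubTaskId;
    }

    public long getNonce() {
        return nonce;
    }
}
```

With a payload like this, the registry side can attribute each incoming flush to the exact schema change request that triggered it, even when several subtasks flush at once.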
It should be noted that the current schema evolution design implicitly assumes that, for each table, schema evolution will not be in progress in subTask A while subTask B emits normal data change events for that table without blocking at the same time.

So, after a `SchemaOperator` has successfully triggered a flush event, no more uncommitted dirty data can get written down, since 1) any following data from this subTask is still being blocked, and 2) no other subTask can carry any data belonging to this tableId (according to the guarantee above).