-
Notifications
You must be signed in to change notification settings - Fork 2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[mysql] Snapshot split the chunks asynchronously (ververica#915) #931
[mysql] Snapshot split the chunks asynchronously (ververica#915) #931
Conversation
executor.submit( | ||
() -> { | ||
while (remainingTables.size() > 0) { | ||
TableId nextTable = remainingTables.pollFirst(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How we make sure the table splits generation has finished if failover happened ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you for your comments. I have revised the code again.
2058b7c
to
73e8520
Compare
c375bdf
to
00b97c6
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @fuyun2024 for the contribution, could you also add some unit tests to cover the change?
@@ -180,27 +191,60 @@ private void captureNewlyAddedTables() { | |||
} | |||
} | |||
|
|||
private void asynchronouslySplitIfNeed() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
private void asynchronouslySplitIfNeed() { | |
private void startaAsynchronouslySplit() { |
if (!remainingTables.isEmpty()) { | ||
if (executor == null) { | ||
ThreadFactory threadFactory = | ||
new ThreadFactoryBuilder().setNameFormat("snapshot-split").build(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
new ThreadFactoryBuilder().setNameFormat("snapshot-split").build(); | |
new ThreadFactoryBuilder().setNameFormat("snapshot-splitting").build(); |
} | ||
return getNext(); | ||
} else { | ||
return Optional.empty(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could we release the executor resource after splitting finished ?
} catch (InterruptedException e) { | ||
e.printStackTrace(); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
please throw FlinkRuntimeException
with a detail error message.
return getNext(); | ||
} else { | ||
return Optional.empty(); | ||
} else if (!remainingTables.isEmpty()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There's a race condition that remainingTables
is empty but the asynchronous thread just added the snapshot splits of last table to remainingSplits
, and then the last table will be missed.
f8d9e5b
to
0af1600
Compare
ec6c681
to
c8836ba
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @fuyun2024 for the contribution, LGTM
No description provided.