-
Notifications
You must be signed in to change notification settings - Fork 287
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
scheduler,processor(ticdc): update capture liveness by heartbeat #6613
scheduler,processor(ticdc): update capture liveness by heartbeat #6613
Conversation
Signed-off-by: Neil Shen <overvenus@gmail.com>
[REVIEW NOTIFICATION] This pull request has been approved by:
To complete the pull request process, please ask the reviewers in the list to review by filling The full list of commands accepted by this bot can be found here. Reviewer can indicate their review by submitting an approval review. |
func (a *agent) handleLivenessUpdate(liveness model.Liveness) { | ||
currentLiveness := a.liveness.Load() | ||
if currentLiveness != liveness { | ||
if currentLiveness == model.LivenessCaptureStopping { | ||
// No way to go back, once it becomes shutting down, | ||
return | ||
} | ||
log.Info("schedulerv3: agent liveness changed", | ||
zap.String("old", a.liveness.String()), | ||
zap.String("new", liveness.String()), | ||
zap.String("source", source)) | ||
a.liveness = liveness | ||
log.Info("schedulerv3: agent updates liveness", | ||
zap.String("old", currentLiveness.String()), | ||
zap.String("new", liveness.String())) | ||
a.liveness.Store(liveness) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could there be a data race? For example, a.liveness
is changed between the Load()
and Store()
calls by another thread?
@@ -77,7 +77,7 @@ type processor struct { | |||
|
|||
lazyInit func(ctx cdcContext.Context) error | |||
createTablePipeline func(ctx cdcContext.Context, tableID model.TableID, replicaInfo *model.TableReplicaInfo) (pipeline.TablePipeline, error) | |||
newAgent func(ctx cdcContext.Context) (scheduler.Agent, error) | |||
newAgent func(cdcContext.Context, *model.Liveness) (scheduler.Agent, error) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you consider making an AgentFactory
interface? So that we can put whatever extra arguments we need into the implementation of an AgentFactory
, so that further changes will not affect how the interface method NewAgent
is called.
Just a suggestion. If you think this part of code is not subject to anymore significant changes, I think the current way is fine.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think it will change anytime soon, let's add the factory when necessary.
Signed-off-by: Neil Shen <overvenus@gmail.com>
Codecov Report
Flags with carried forward coverage won't be shown. Click here to find out more. @@ Coverage Diff @@
## master #6613 +/- ##
================================================
+ Coverage 59.2138% 59.2508% +0.0370%
================================================
Files 780 780
Lines 88405 88505 +100
================================================
+ Hits 52348 52440 +92
- Misses 31415 31422 +7
- Partials 4642 4643 +1 |
/merge |
This pull request has been accepted and is ready to merge. Commit hash: 2441c06
|
What problem does this PR solve?
Issue Number: close #4757
What is changed and how it works?
Set liveness to stopping when it is drained by DrainCapture API.
It prevents unnecessary owner switch during rolling upgrade.
Check List
Tests
Questions
Will it cause performance regression or break compatibility?
Do you need to update user documentation, design documentation or monitoring documentation?
Release note