scheduler(cdc): add ProcessorEpoch #4768
Conversation
[REVIEW NOTIFICATION] This pull request has been approved. To complete the pull request process, please ask the reviewers in the list to review. The full list of commands accepted by this bot can be found here. A reviewer can indicate their review by submitting an approval review.
/run-leak-tests
Codecov Report

Flags with carried forward coverage won't be shown.

@@             Coverage Diff             @@
##            master      #4768       +/-  ##
================================================
- Coverage   55.6402%   55.4682%   -0.1720%
================================================
  Files           494        521        +27
  Lines         61283      64289      +3006
================================================
+ Hits          34098      35660      +1562
- Misses        23750      25109      +1359
- Partials       3435       3520        +85
@@ -272,6 +289,7 @@ func (a *BaseAgent) processOperations(ctx context.Context) error {
 	for tableID, op := range a.tableOperations {
 		switch op.status {
 		case operationReceived:
+			a.logger.Info("Agent start processing operation", zap.Any("op", op))
Is it acceptable that the operation-related log is O(#tables) in scale?
Without this log, it would be difficult to trace scheduling problems.
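Not part of this change, but one hypothetical way to bound the log volume discussed above is to aggregate the operations into a single summary line per tick. A rough sketch against the PR's BaseAgent (logOperationsSummary is an illustrative name, not in the PR):

// logOperationsSummary is a hypothetical alternative to the per-operation
// log: it emits one aggregated line per processOperations tick instead of
// one line per table operation.
func (a *BaseAgent) logOperationsSummary() {
	counts := make(map[operationStatus]int, len(a.tableOperations))
	for _, op := range a.tableOperations {
		counts[op.status]++
	}
	// zap.Any serializes the per-status counts via reflection.
	a.logger.Info("Agent operations summary", zap.Any("statusCounts", counts))
}

The PR keeps the per-operation log instead, since aggregate counts alone would not identify which table's operation stalled.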
	a.epochMu.Lock()
	defer a.epochMu.Unlock()

	a.epoch = uuid.New().String()
We only need this epoch to be unique; we don't need any serialization guarantee. Should we add a comment about that?
Sure.
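For illustration, the requested comment might look like this on the epoch reset (resetEpoch is an illustrative name; the lock and uuid lines are from the diff above, with uuid assumed to be github.com/google/uuid):

// NOTE: the epoch only needs to be unique across restarts of the
// processor; no ordering or serialization guarantee is required, so a
// random UUID is sufficient.
func (a *BaseAgent) resetEpoch() {
	a.epochMu.Lock()
	defer a.epochMu.Unlock()
	a.epoch = uuid.New().String()
}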
/merge
This pull request has been accepted and is ready to merge. Commit hash: 7c093be
/run-verify
/run-integration-tests
Signed-off-by: ti-chi-bot <ti-community-prow-bot@tidb.io>
In response to a cherrypick label: new pull request created: #4789.
What problem does this PR solve?
Issue Number: close #4769
What is changed and how it works?
Adds epoch in Sync and DispatchTable messages, so that outdated dispatches will be ignored by the processor. A minimal sketch of the processor-side check is shown below.
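The sketch uses illustrative names, not the PR's exact types: the agent generates a fresh epoch when it (re)starts and announces it in Sync; the owner stamps each subsequent DispatchTable message with that epoch, and the agent drops any dispatch carrying a stale one.

// All names below are illustrative, not the PR's exact types.
type dispatchTableMessage struct {
	Epoch   string // epoch the owner saw in the agent's latest Sync
	TableID int64
}

type agent struct {
	epoch string // regenerated (e.g. a random UUID) on every re-sync
}

// onDispatchTable ignores dispatches issued against a stale epoch, such
// as messages delayed in the network across an agent restart.
func (a *agent) onDispatchTable(msg dispatchTableMessage) {
	if msg.Epoch != a.epoch {
		// The owner had not yet processed our latest Sync; drop it.
		return
	}
	// ... proceed to add or remove the table ...
}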
Check List

Tests
Restarts all TiCDC nodes at once, with 2000ms of latency injected between each pair of them. The changefeed did not pause or report an error, and it recovered quickly.
Side effects
Related changes
Release note