Skip to content

Commit

Permalink
fix(kvclient): fix the problem that region merging would block the pr…
Browse files Browse the repository at this point in the history
…ogress of resolveLock.

close  pingcap#3061
  • Loading branch information
maxshuang authored and ti-chi-bot committed Oct 19, 2021
1 parent e123b03 commit d2be754
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 14 deletions.
32 changes: 21 additions & 11 deletions cdc/kv/region_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -369,10 +369,11 @@ func (w *regionWorker) resolveLock(ctx context.Context) error {
if rts.ts.penalty < resolveLockPenalty {
if lastResolvedTs > rts.ts.resolvedTs {
rts.ts.resolvedTs = lastResolvedTs
rts.ts.eventTime = time.Now()
rts.ts.penalty = 0
rts.ts.eventTime = time.Now()
w.rtsManager.Upsert(rts)
}
w.rtsManager.Upsert(rts)

continue
}
log.Warn("region not receiving resolved event from tikv or resolved ts is not pushing for too long time, try to resolve lock",
Expand All @@ -387,9 +388,10 @@ func (w *regionWorker) resolveLock(ctx context.Context) error {
log.Warn("failed to resolve lock", zap.Uint64("regionID", rts.regionID), zap.Error(err))
continue
}
} else {
rts.ts.resolvedTs = lastResolvedTs
w.rtsManager.Upsert(rts)
}
rts.ts.resolvedTs = lastResolvedTs
w.rtsManager.Upsert(rts)
}
}
}
Expand Down Expand Up @@ -743,14 +745,29 @@ func (w *regionWorker) handleResolvedTs(
return nil
}
regionID := state.sri.verID.GetID()

// Send resolved ts update in non blocking way, since we can re-query real
// resolved ts from region state even if resolved ts update is discarded.
// NOTICE: We send any regionTsInfo to resolveLock thread to give us a chance to trigger resolveLock logic
// (1) if it is a fallback resolvedTs event, it will be discarded and accumulate penalty on the progress;
// (2) if it is normal one, update rtsManager and check sinceLastResolvedTs
select {
case w.rtsUpdateCh <- &regionTsInfo{regionID: regionID, ts: newResolvedTsItem(resolvedTs)}:
default:
}

if resolvedTs < state.lastResolvedTs {
log.Warn("The resolvedTs is fallen back in kvclient",
zap.String("Event Type", "RESOLVED"),
zap.Uint64("resolvedTs", resolvedTs),
zap.Uint64("lastResolvedTs", state.lastResolvedTs),
zap.Uint64("regionID", regionID))

return nil
}

state.lastResolvedTs = resolvedTs

// emit a checkpointTs
revent := model.RegionFeedEvent{
RegionID: regionID,
Expand All @@ -759,13 +776,6 @@ func (w *regionWorker) handleResolvedTs(
ResolvedTs: resolvedTs,
},
}
state.lastResolvedTs = resolvedTs
// Send resolved ts update in non blocking way, since we can re-query real
// resolved ts from region state even if resolved ts update is discarded.
select {
case w.rtsUpdateCh <- &regionTsInfo{regionID: regionID, ts: newResolvedTsItem(resolvedTs)}:
default:
}

select {
case w.outputCh <- revent:
Expand Down
4 changes: 2 additions & 2 deletions cdc/kv/resolvedts_heap.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,9 +91,9 @@ func newRegionTsManager() *regionTsManager {
// Upsert implements insert and update on duplicated key
func (rm *regionTsManager) Upsert(item *regionTsInfo) {
if old, ok := rm.m[item.regionID]; ok {
// in a single resolved ts manager, the resolved ts of a region should not be fallen back
// in a single resolved ts manager, we use the fallback resolved ts item to increase penalty
if !item.ts.sortByEvTime {
if item.ts.resolvedTs == old.ts.resolvedTs && item.ts.eventTime.After(old.ts.eventTime) {
if item.ts.resolvedTs <= old.ts.resolvedTs && item.ts.eventTime.After(old.ts.eventTime) {
old.ts.penalty++
old.ts.eventTime = item.ts.eventTime
heap.Fix(&rm.h, old.index)
Expand Down
2 changes: 1 addition & 1 deletion pkg/errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ var (
ErrMySQLTxnError = errors.Normalize("MySQL txn error", errors.RFCCodeText("CDC:ErrMySQLTxnError"))
ErrMySQLQueryError = errors.Normalize("MySQL query error", errors.RFCCodeText("CDC:ErrMySQLQueryError"))
ErrMySQLConnectionError = errors.Normalize("MySQL connection error", errors.RFCCodeText("CDC:ErrMySQLConnectionError"))
ErrMySQLInvalidConfig = errors.Normalize("MySQL config invaldi", errors.RFCCodeText("CDC:ErrMySQLInvalidConfig"))
ErrMySQLInvalidConfig = errors.Normalize("MySQL config invalid", errors.RFCCodeText("CDC:ErrMySQLInvalidConfig"))
ErrMySQLWorkerPanic = errors.Normalize("MySQL worker panic", errors.RFCCodeText("CDC:ErrMySQLWorkerPanic"))
ErrAvroToEnvelopeError = errors.Normalize("to envelope failed", errors.RFCCodeText("CDC:ErrAvroToEnvelopeError"))
ErrAvroUnknownType = errors.Normalize("unknown type for Avro: %v", errors.RFCCodeText("CDC:ErrAvroUnknownType"))
Expand Down

0 comments on commit d2be754

Please sign in to comment.