Skip to content

Commit

Permalink
kv/client: fix region loss in single region handler (#3281)
Browse files Browse the repository at this point in the history
  • Loading branch information
amyangfei authored Nov 5, 2021
1 parent 37bac66 commit 2a60472
Showing 1 changed file with 7 additions and 6 deletions.
13 changes: 7 additions & 6 deletions cdc/kv/region_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ func (w *regionWorker) checkShouldExit() error {
return nil
}

func (w *regionWorker) handleSingleRegionError(ctx context.Context, err error, state *regionFeedState) error {
func (w *regionWorker) handleSingleRegionError(err error, state *regionFeedState) error {
if state.lastResolvedTs > state.sri.ts {
state.sri.ts = state.lastResolvedTs
}
Expand Down Expand Up @@ -273,7 +273,9 @@ func (w *regionWorker) handleSingleRegionError(ctx context.Context, err error, s
}

revokeToken := !state.initialized
err2 := w.session.onRegionFail(ctx, regionErrorInfo{
// since the context used in region worker will be cancelled after region
// worker exits, we must use the parent context to prevent regionErrorInfo loss.
err2 := w.session.onRegionFail(w.parentCtx, regionErrorInfo{
singleRegionInfo: state.sri,
err: err,
}, revokeToken)
Expand Down Expand Up @@ -388,26 +390,25 @@ func (w *regionWorker) processEvent(ctx context.Context, event *regionStatefulEv
case *cdcpb.Event_Entries_:
err = w.handleEventEntry(ctx, x, event.state)
if err != nil {
err = w.handleSingleRegionError(ctx, err, event.state)
err = w.handleSingleRegionError(err, event.state)
}
case *cdcpb.Event_Admin_:
log.Info("receive admin event", zap.Stringer("event", event.changeEvent))
case *cdcpb.Event_Error:
err = w.handleSingleRegionError(
ctx,
cerror.WrapError(cerror.ErrEventFeedEventError, &eventError{err: x.Error}),
event.state,
)
case *cdcpb.Event_ResolvedTs:
if err = w.handleResolvedTs(ctx, x.ResolvedTs, event.state); err != nil {
err = w.handleSingleRegionError(ctx, err, event.state)
err = w.handleSingleRegionError(err, event.state)
}
}
}

if event.resolvedTs != nil {
if err = w.handleResolvedTs(ctx, event.resolvedTs.Ts, event.state); err != nil {
err = w.handleSingleRegionError(ctx, err, event.state)
err = w.handleSingleRegionError(err, event.state)
}
}
event.state.lock.Unlock()
Expand Down

0 comments on commit 2a60472

Please sign in to comment.