diff --git a/cdc/kv/region_worker.go b/cdc/kv/region_worker.go index c498c726cb0..b92d06dff07 100644 --- a/cdc/kv/region_worker.go +++ b/cdc/kv/region_worker.go @@ -235,7 +235,7 @@ func (w *regionWorker) checkShouldExit() error { return nil } -func (w *regionWorker) handleSingleRegionError(ctx context.Context, err error, state *regionFeedState) error { +func (w *regionWorker) handleSingleRegionError(err error, state *regionFeedState) error { if state.lastResolvedTs > state.sri.ts { state.sri.ts = state.lastResolvedTs } @@ -273,7 +273,9 @@ func (w *regionWorker) handleSingleRegionError(ctx context.Context, err error, s } revokeToken := !state.initialized - err2 := w.session.onRegionFail(ctx, regionErrorInfo{ + // since the context used in region worker will be cancelled after region + // worker exits, we must use the parent context to prevent regionErrorInfo loss. + err2 := w.session.onRegionFail(w.parentCtx, regionErrorInfo{ singleRegionInfo: state.sri, err: err, }, revokeToken) @@ -388,26 +390,25 @@ func (w *regionWorker) processEvent(ctx context.Context, event *regionStatefulEv case *cdcpb.Event_Entries_: err = w.handleEventEntry(ctx, x, event.state) if err != nil { - err = w.handleSingleRegionError(ctx, err, event.state) + err = w.handleSingleRegionError(err, event.state) } case *cdcpb.Event_Admin_: log.Info("receive admin event", zap.Stringer("event", event.changeEvent)) case *cdcpb.Event_Error: err = w.handleSingleRegionError( - ctx, cerror.WrapError(cerror.ErrEventFeedEventError, &eventError{err: x.Error}), event.state, ) case *cdcpb.Event_ResolvedTs: if err = w.handleResolvedTs(ctx, x.ResolvedTs, event.state); err != nil { - err = w.handleSingleRegionError(ctx, err, event.state) + err = w.handleSingleRegionError(err, event.state) } } } if event.resolvedTs != nil { if err = w.handleResolvedTs(ctx, event.resolvedTs.Ts, event.state); err != nil { - err = w.handleSingleRegionError(ctx, err, event.state) + err = w.handleSingleRegionError(err, event.state) } } event.state.lock.Unlock()