diff --git a/cdc/kv/client.go b/cdc/kv/client.go index 0ba2f18c28a..e656570ced2 100644 --- a/cdc/kv/client.go +++ b/cdc/kv/client.go @@ -997,8 +997,7 @@ func (s *eventFeedSession) receiveFromStream( // always create a new region worker, because `receiveFromStream` is ensured // to call exactly once from outer code logic - worker := newRegionWorker(s.changefeed, s, addr) - + worker := newRegionWorker(s.changefeed, s, addr, pendingRegions) defer worker.evictAllRegions() g.Go(func() error { diff --git a/cdc/kv/region_worker.go b/cdc/kv/region_worker.go index 0f1e5eb63b9..08fd2a3c956 100644 --- a/cdc/kv/region_worker.go +++ b/cdc/kv/region_worker.go @@ -112,10 +112,13 @@ type regionWorker struct { // how many pending input events inputPending int32 + + pendingRegions *syncRegionFeedStateMap } func newRegionWorker( changefeedID model.ChangeFeedID, s *eventFeedSession, addr string, + pendingRegions *syncRegionFeedStateMap, ) *regionWorker { metrics := ®ionWorkerMetrics{} metrics.metricReceivedEventSize = eventSize.WithLabelValues("received") @@ -149,6 +152,8 @@ func newRegionWorker( concurrency: s.client.config.WorkerConcurrent, metrics: metrics, inputPending: 0, + + pendingRegions: pendingRegions, } } @@ -184,7 +189,7 @@ func (w *regionWorker) checkShouldExit() error { empty := w.checkRegionStateEmpty() // If there is no region maintained by this region worker, exit it and // cancel the gRPC stream. - if empty { + if empty && w.pendingRegions.len() == 0 { w.cancelStream(time.Duration(0)) return cerror.ErrRegionWorkerExit.GenWithStackByArgs() } diff --git a/cdc/kv/region_worker_test.go b/cdc/kv/region_worker_test.go index fef317d4c0d..c8fcebe4a18 100644 --- a/cdc/kv/region_worker_test.go +++ b/cdc/kv/region_worker_test.go @@ -154,7 +154,7 @@ func TestRegionWokerHandleEventEntryEventOutOfOrder(t *testing.T) { regionspan.ToComparableSpan(span), 0, &tikv.RPCContext{}), 0) state.start() - worker := newRegionWorker(model.ChangeFeedID{}, s, "") + worker := newRegionWorker(model.ChangeFeedID{}, s, "", newSyncRegionFeedStateMap()) require.Equal(t, 2, cap(worker.outputCh)) // Receive prewrite2 with empty value. @@ -313,7 +313,7 @@ func TestRegionWorkerHandleEventsBeforeStartTs(t *testing.T) { rpcCtx: &tikv.RPCContext{}, }, 0) s1.start() - w := newRegionWorker(model.ChangeFeedID{}, s, "") + w := newRegionWorker(model.ChangeFeedID{}, s, "", newSyncRegionFeedStateMap()) err := w.handleResolvedTs(ctx, &resolvedTsEvent{ resolvedTs: 5,