diff --git a/cdc/kv/client.go b/cdc/kv/client.go index 0b8aa86e7a9..fe4fde0c377 100644 --- a/cdc/kv/client.go +++ b/cdc/kv/client.go @@ -1016,7 +1016,7 @@ func (s *eventFeedSession) receiveFromStream( // always create a new region worker, because `receiveFromStream` is ensured // to call exactly once from outer code logic - worker := newRegionWorker(parentCtx, s.changefeed, s, addr) + worker := newRegionWorker(parentCtx, s.changefeed, s, addr, pendingRegions) defer worker.evictAllRegions() ctx, cancel := context.WithCancel(parentCtx) @@ -1062,7 +1062,7 @@ func (s *eventFeedSession) receiveFromStream( }) if err != nil { if status.Code(errors.Cause(err)) == codes.Canceled { - log.Debug( + log.Info( "receive from stream canceled", zap.String("namespace", s.changefeed.Namespace), zap.String("changefeed", s.changefeed.ID), diff --git a/cdc/kv/region_worker.go b/cdc/kv/region_worker.go index 32cc523c90f..01d425558de 100644 --- a/cdc/kv/region_worker.go +++ b/cdc/kv/region_worker.go @@ -114,6 +114,8 @@ type regionWorker struct { // how many pending input events inputPending int32 + + pendingRegions *syncRegionFeedStateMap } func newRegionWorkerMetrics(changefeedID model.ChangeFeedID) *regionWorkerMetrics { @@ -147,6 +149,7 @@ func newRegionWorkerMetrics(changefeedID model.ChangeFeedID) *regionWorkerMetric func newRegionWorker( ctx context.Context, changefeedID model.ChangeFeedID, s *eventFeedSession, addr string, + pendingRegions *syncRegionFeedStateMap, ) *regionWorker { return ®ionWorker{ parentCtx: ctx, @@ -161,6 +164,8 @@ func newRegionWorker( concurrency: int(s.client.config.KVClient.WorkerConcurrent), metrics: newRegionWorkerMetrics(changefeedID), inputPending: 0, + + pendingRegions: pendingRegions, } } @@ -196,7 +201,7 @@ func (w *regionWorker) checkShouldExit() error { empty := w.checkRegionStateEmpty() // If there is no region maintained by this region worker, exit it and // cancel the gRPC stream. - if empty { + if empty && w.pendingRegions.len() == 0 { w.cancelStream(time.Duration(0)) return cerror.ErrRegionWorkerExit.GenWithStackByArgs() } diff --git a/cdc/kv/region_worker_test.go b/cdc/kv/region_worker_test.go index 9438d518651..c8041246a96 100644 --- a/cdc/kv/region_worker_test.go +++ b/cdc/kv/region_worker_test.go @@ -159,7 +159,7 @@ func TestRegionWokerHandleEventEntryEventOutOfOrder(t *testing.T) { &tikv.RPCContext{}), 0) state.sri.lockedRange = ®ionlock.LockedRange{} state.start() - worker := newRegionWorker(ctx, model.ChangeFeedID{}, s, "") + worker := newRegionWorker(ctx, model.ChangeFeedID{}, s, "", newSyncRegionFeedStateMap()) require.Equal(t, 2, cap(worker.outputCh)) // Receive prewrite2 with empty value. @@ -322,7 +322,7 @@ func TestRegionWorkerHandleEventsBeforeStartTs(t *testing.T) { s1.sri.lockedRange = ®ionlock.LockedRange{} s1.sri.lockedRange.CheckpointTs.Store(9) s1.start() - w := newRegionWorker(ctx, model.ChangeFeedID{}, s, "") + w := newRegionWorker(ctx, model.ChangeFeedID{}, s, "", newSyncRegionFeedStateMap()) err := w.handleResolvedTs(ctx, &resolvedTsEvent{ resolvedTs: 5, diff --git a/cdc/kv/sharedconn/conn_and_client_test.go b/cdc/kv/sharedconn/conn_and_client_test.go index 991ddbb61cc..797eb095601 100644 --- a/cdc/kv/sharedconn/conn_and_client_test.go +++ b/cdc/kv/sharedconn/conn_and_client_test.go @@ -152,6 +152,40 @@ func TestConnectToUnavailable(t *testing.T) { require.Nil(t, conn.Close()) } +func TestCancelStream(t *testing.T) { + service := make(chan *grpc.Server, 1) + var addr string + var wg sync.WaitGroup + defer wg.Wait() + wg.Add(1) + go func() { + defer wg.Done() + require.Nil(t, runGrpcService(&srv{}, &addr, service)) + }() + + svc := <-service + require.NotNil(t, svc) + defer svc.GracefulStop() + + connCtx, connCancel := context.WithCancel(context.Background()) + defer connCancel() + + pool := newConnAndClientPool(&security.Credential{}, nil, 1) + conn, err := pool.connect(connCtx, addr) + require.NotNil(t, conn) + require.Nil(t, err) + + rpcCtx, rpcCancel := context.WithCancel(context.Background()) + rpc := cdcpb.NewChangeDataClient(conn) + client, err := rpc.EventFeed(rpcCtx) + require.Nil(t, err) + + rpcCancel() + _, err = client.Recv() + require.Equal(t, grpccodes.Canceled, grpcstatus.Code(err)) + require.Nil(t, conn.Close()) +} + func runGrpcService(srv cdcpb.ChangeDataServer, addr *string, service chan<- *grpc.Server) error { defer close(service) lis, err := net.Listen("tcp", "127.0.0.1:0")