From 37c711ac4011420f7a00a2bab964a00c02f46c3a Mon Sep 17 00:00:00 2001 From: qupeng Date: Mon, 4 Dec 2023 13:42:34 +0800 Subject: [PATCH 1/4] init Signed-off-by: qupeng --- cdc/kv/sharedconn/conn_and_client_test.go | 34 +++++++++++++++++++++++ 1 file changed, 34 insertions(+) diff --git a/cdc/kv/sharedconn/conn_and_client_test.go b/cdc/kv/sharedconn/conn_and_client_test.go index 991ddbb61cc..797eb095601 100644 --- a/cdc/kv/sharedconn/conn_and_client_test.go +++ b/cdc/kv/sharedconn/conn_and_client_test.go @@ -152,6 +152,40 @@ func TestConnectToUnavailable(t *testing.T) { require.Nil(t, conn.Close()) } +func TestCancelStream(t *testing.T) { + service := make(chan *grpc.Server, 1) + var addr string + var wg sync.WaitGroup + defer wg.Wait() + wg.Add(1) + go func() { + defer wg.Done() + require.Nil(t, runGrpcService(&srv{}, &addr, service)) + }() + + svc := <-service + require.NotNil(t, svc) + defer svc.GracefulStop() + + connCtx, connCancel := context.WithCancel(context.Background()) + defer connCancel() + + pool := newConnAndClientPool(&security.Credential{}, nil, 1) + conn, err := pool.connect(connCtx, addr) + require.NotNil(t, conn) + require.Nil(t, err) + + rpcCtx, rpcCancel := context.WithCancel(context.Background()) + rpc := cdcpb.NewChangeDataClient(conn) + client, err := rpc.EventFeed(rpcCtx) + require.Nil(t, err) + + rpcCancel() + _, err = client.Recv() + require.Equal(t, grpccodes.Canceled, grpcstatus.Code(err)) + require.Nil(t, conn.Close()) +} + func runGrpcService(srv cdcpb.ChangeDataServer, addr *string, service chan<- *grpc.Server) error { defer close(service) lis, err := net.Listen("tcp", "127.0.0.1:0") From 83f4e90d7e0cba306aa8c4eb2decf9b3148f887d Mon Sep 17 00:00:00 2001 From: qupeng Date: Mon, 4 Dec 2023 14:29:17 +0800 Subject: [PATCH 2/4] fix Signed-off-by: qupeng --- cdc/kv/client.go | 2 +- cdc/kv/region_worker.go | 7 ------- 2 files changed, 1 insertion(+), 8 deletions(-) diff --git a/cdc/kv/client.go b/cdc/kv/client.go index 0b8aa86e7a9..265b63c4be8 100644 --- a/cdc/kv/client.go +++ b/cdc/kv/client.go @@ -1062,7 +1062,7 @@ func (s *eventFeedSession) receiveFromStream( }) if err != nil { if status.Code(errors.Cause(err)) == codes.Canceled { - log.Debug( + log.Info( "receive from stream canceled", zap.String("namespace", s.changefeed.Namespace), zap.String("changefeed", s.changefeed.ID), diff --git a/cdc/kv/region_worker.go b/cdc/kv/region_worker.go index 32cc523c90f..febd42ccfbb 100644 --- a/cdc/kv/region_worker.go +++ b/cdc/kv/region_worker.go @@ -193,13 +193,6 @@ func (w *regionWorker) checkRegionStateEmpty() (empty bool) { // checkShouldExit checks whether the region worker should exit, if exit return an error func (w *regionWorker) checkShouldExit() error { - empty := w.checkRegionStateEmpty() - // If there is no region maintained by this region worker, exit it and - // cancel the gRPC stream. - if empty { - w.cancelStream(time.Duration(0)) - return cerror.ErrRegionWorkerExit.GenWithStackByArgs() - } return nil } From e05fad6218c1e64181eb1e61b67645273386ae58 Mon Sep 17 00:00:00 2001 From: qupeng Date: Mon, 4 Dec 2023 14:46:34 +0800 Subject: [PATCH 3/4] fix Signed-off-by: qupeng --- cdc/kv/client.go | 2 +- cdc/kv/region_worker.go | 12 ++++++++++++ 2 files changed, 13 insertions(+), 1 deletion(-) diff --git a/cdc/kv/client.go b/cdc/kv/client.go index 265b63c4be8..fe4fde0c377 100644 --- a/cdc/kv/client.go +++ b/cdc/kv/client.go @@ -1016,7 +1016,7 @@ func (s *eventFeedSession) receiveFromStream( // always create a new region worker, because `receiveFromStream` is ensured // to call exactly once from outer code logic - worker := newRegionWorker(parentCtx, s.changefeed, s, addr) + worker := newRegionWorker(parentCtx, s.changefeed, s, addr, pendingRegions) defer worker.evictAllRegions() ctx, cancel := context.WithCancel(parentCtx) diff --git a/cdc/kv/region_worker.go b/cdc/kv/region_worker.go index febd42ccfbb..01d425558de 100644 --- a/cdc/kv/region_worker.go +++ b/cdc/kv/region_worker.go @@ -114,6 +114,8 @@ type regionWorker struct { // how many pending input events inputPending int32 + + pendingRegions *syncRegionFeedStateMap } func newRegionWorkerMetrics(changefeedID model.ChangeFeedID) *regionWorkerMetrics { @@ -147,6 +149,7 @@ func newRegionWorkerMetrics(changefeedID model.ChangeFeedID) *regionWorkerMetric func newRegionWorker( ctx context.Context, changefeedID model.ChangeFeedID, s *eventFeedSession, addr string, + pendingRegions *syncRegionFeedStateMap, ) *regionWorker { return ®ionWorker{ parentCtx: ctx, @@ -161,6 +164,8 @@ func newRegionWorker( concurrency: int(s.client.config.KVClient.WorkerConcurrent), metrics: newRegionWorkerMetrics(changefeedID), inputPending: 0, + + pendingRegions: pendingRegions, } } @@ -193,6 +198,13 @@ func (w *regionWorker) checkRegionStateEmpty() (empty bool) { // checkShouldExit checks whether the region worker should exit, if exit return an error func (w *regionWorker) checkShouldExit() error { + empty := w.checkRegionStateEmpty() + // If there is no region maintained by this region worker, exit it and + // cancel the gRPC stream. + if empty && w.pendingRegions.len() == 0 { + w.cancelStream(time.Duration(0)) + return cerror.ErrRegionWorkerExit.GenWithStackByArgs() + } return nil } From 7fe7aae3de84ace60e582a1f6ae55902184b2150 Mon Sep 17 00:00:00 2001 From: qupeng Date: Mon, 4 Dec 2023 14:49:54 +0800 Subject: [PATCH 4/4] fix tests Signed-off-by: qupeng --- cdc/kv/region_worker_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cdc/kv/region_worker_test.go b/cdc/kv/region_worker_test.go index 9438d518651..c8041246a96 100644 --- a/cdc/kv/region_worker_test.go +++ b/cdc/kv/region_worker_test.go @@ -159,7 +159,7 @@ func TestRegionWokerHandleEventEntryEventOutOfOrder(t *testing.T) { &tikv.RPCContext{}), 0) state.sri.lockedRange = ®ionlock.LockedRange{} state.start() - worker := newRegionWorker(ctx, model.ChangeFeedID{}, s, "") + worker := newRegionWorker(ctx, model.ChangeFeedID{}, s, "", newSyncRegionFeedStateMap()) require.Equal(t, 2, cap(worker.outputCh)) // Receive prewrite2 with empty value. @@ -322,7 +322,7 @@ func TestRegionWorkerHandleEventsBeforeStartTs(t *testing.T) { s1.sri.lockedRange = ®ionlock.LockedRange{} s1.sri.lockedRange.CheckpointTs.Store(9) s1.start() - w := newRegionWorker(ctx, model.ChangeFeedID{}, s, "") + w := newRegionWorker(ctx, model.ChangeFeedID{}, s, "", newSyncRegionFeedStateMap()) err := w.handleResolvedTs(ctx, &resolvedTsEvent{ resolvedTs: 5,