From 40fbebd84d107f25a02028174e0887c4d7f954eb Mon Sep 17 00:00:00 2001
From: qupeng
Date: Mon, 4 Dec 2023 17:05:55 +0800
Subject: [PATCH] This is an automated cherry-pick of #10237

Signed-off-by: ti-chi-bot
---
 cdc/kv/client.go                          |   4 +-
 cdc/kv/region_worker.go                   |   7 +-
 cdc/kv/region_worker_test.go              |   4 +-
 cdc/kv/sharedconn/conn_and_client_test.go | 236 ++++++++++++++++++++++
 4 files changed, 246 insertions(+), 5 deletions(-)
 create mode 100644 cdc/kv/sharedconn/conn_and_client_test.go

diff --git a/cdc/kv/client.go b/cdc/kv/client.go
index 5b78e727006..781c12d8fb8 100644
--- a/cdc/kv/client.go
+++ b/cdc/kv/client.go
@@ -1011,7 +1011,7 @@ func (s *eventFeedSession) receiveFromStream(
 
     // always create a new region worker, because `receiveFromStream` is ensured
     // to call exactly once from outer code logic
-    worker := newRegionWorker(parentCtx, s.changefeed, s, addr)
+    worker := newRegionWorker(parentCtx, s.changefeed, s, addr, pendingRegions)
     defer worker.evictAllRegions()
 
     ctx, cancel := context.WithCancel(parentCtx)
@@ -1057,7 +1057,7 @@
             })
             if err != nil {
                 if status.Code(errors.Cause(err)) == codes.Canceled {
-                    log.Debug(
+                    log.Info(
                         "receive from stream canceled",
                         zap.String("namespace", s.changefeed.Namespace),
                         zap.String("changefeed", s.changefeed.ID),
diff --git a/cdc/kv/region_worker.go b/cdc/kv/region_worker.go
index 60627768450..e7a4b24eea1 100644
--- a/cdc/kv/region_worker.go
+++ b/cdc/kv/region_worker.go
@@ -112,6 +112,8 @@ type regionWorker struct {
 
     // how many pending input events
     inputPending int32
+
+    pendingRegions *syncRegionFeedStateMap
 }
 
 func newRegionWorkerMetrics(changefeedID model.ChangeFeedID) *regionWorkerMetrics {
@@ -142,6 +144,7 @@ func newRegionWorkerMetrics(changefeedID model.ChangeFeedID) *regionWorkerMetric
 
 func newRegionWorker(
     ctx context.Context, changefeedID model.ChangeFeedID, s *eventFeedSession, addr string,
+    pendingRegions *syncRegionFeedStateMap,
 ) *regionWorker {
     return &regionWorker{
         parentCtx: ctx,
@@ -156,6 +159,8 @@
         concurrency: s.client.config.KVClient.WorkerConcurrent,
         metrics: newRegionWorkerMetrics(changefeedID),
         inputPending: 0,
+
+        pendingRegions: pendingRegions,
     }
 }
 
@@ -191,7 +196,7 @@ func (w *regionWorker) checkShouldExit() error {
     empty := w.checkRegionStateEmpty()
     // If there is no region maintained by this region worker, exit it and
     // cancel the gRPC stream.
-    if empty {
+    if empty && w.pendingRegions.len() == 0 {
         w.cancelStream(time.Duration(0))
         return cerror.ErrRegionWorkerExit.GenWithStackByArgs()
     }
diff --git a/cdc/kv/region_worker_test.go b/cdc/kv/region_worker_test.go
index b88875aea6c..c14c00e8fe9 100644
--- a/cdc/kv/region_worker_test.go
+++ b/cdc/kv/region_worker_test.go
@@ -159,7 +159,7 @@ func TestRegionWokerHandleEventEntryEventOutOfOrder(t *testing.T) {
         &tikv.RPCContext{}), 0)
     state.sri.lockedRange = &regionspan.LockedRange{}
     state.start()
-    worker := newRegionWorker(ctx, model.ChangeFeedID{}, s, "")
+    worker := newRegionWorker(ctx, model.ChangeFeedID{}, s, "", newSyncRegionFeedStateMap())
     require.Equal(t, 2, cap(worker.outputCh))
 
     // Receive prewrite2 with empty value.
@@ -323,7 +323,7 @@ func TestRegionWorkerHandleEventsBeforeStartTs(t *testing.T) {
     s1.sri.lockedRange = &regionspan.LockedRange{}
     s1.sri.lockedRange.CheckpointTs.Store(9)
     s1.start()
-    w := newRegionWorker(ctx, model.ChangeFeedID{}, s, "")
+    w := newRegionWorker(ctx, model.ChangeFeedID{}, s, "", newSyncRegionFeedStateMap())
 
     err := w.handleResolvedTs(ctx, &resolvedTsEvent{
         resolvedTs: 5,
diff --git a/cdc/kv/sharedconn/conn_and_client_test.go b/cdc/kv/sharedconn/conn_and_client_test.go
new file mode 100644
index 00000000000..797eb095601
--- /dev/null
+++ b/cdc/kv/sharedconn/conn_and_client_test.go
@@ -0,0 +1,236 @@
+// Copyright 2023 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package sharedconn
+
+import (
+    "context"
+    "net"
+    "sync"
+    "testing"
+    "time"
+
+    "github.com/pingcap/kvproto/pkg/cdcpb"
+    "github.com/pingcap/tiflow/pkg/security"
+    "github.com/stretchr/testify/require"
+    "google.golang.org/grpc"
+    "google.golang.org/grpc/codes"
+    grpccodes "google.golang.org/grpc/codes"
+    "google.golang.org/grpc/keepalive"
+    "google.golang.org/grpc/status"
+    grpcstatus "google.golang.org/grpc/status"
+)
+
+func TestConnAndClientPool(t *testing.T) {
+    service := make(chan *grpc.Server, 1)
+    var addr string
+
+    var wg sync.WaitGroup
+    defer wg.Wait()
+    wg.Add(1)
+    go func() {
+        defer wg.Done()
+        require.Nil(t, runGrpcService(&srv{}, &addr, service))
+    }()
+
+    svc := <-service
+    require.NotNil(t, svc)
+    defer svc.GracefulStop()
+
+    pool := newConnAndClientPool(&security.Credential{}, nil, 2)
+    cc1, err := pool.Connect(context.Background(), addr)
+    require.Nil(t, err)
+    require.NotNil(t, cc1)
+    require.Equal(t, 1, len(cc1.array.conns))
+    require.Equal(t, 1, cc1.conn.streams)
+    require.False(t, cc1.Multiplexing())
+
+    cc2, err := pool.Connect(context.Background(), addr)
+    require.Nil(t, err)
+    require.NotNil(t, cc2)
+    require.Equal(t, 1, len(cc2.array.conns))
+    require.Equal(t, 2, cc2.conn.streams)
+    require.False(t, cc2.Multiplexing())
+
+    cc3, err := pool.Connect(context.Background(), addr)
+    require.Nil(t, err)
+    require.NotNil(t, cc3)
+    require.Equal(t, 2, len(cc3.array.conns))
+    require.Equal(t, 1, cc3.conn.streams)
+    require.False(t, cc3.Multiplexing())
+
+    cc1.Release()
+    cc1.Release()
+    cc2.Release()
+    require.Equal(t, 1, len(cc3.array.conns))
+    require.Equal(t, 1, cc3.conn.streams)
+
+    cc3.Release()
+    require.Equal(t, 0, len(pool.stores))
+}
+
+func TestConnAndClientPoolForV2(t *testing.T) {
+    service := make(chan *grpc.Server, 1)
+    var addr string
+
+    var wg sync.WaitGroup
+    defer wg.Wait()
+    wg.Add(1)
+    go func() {
+        defer wg.Done()
+        require.Nil(t, runGrpcService(&srv{v2: true}, &addr, service))
+    }()
+
+    svc := <-service
+    require.NotNil(t, svc)
+    defer svc.GracefulStop()
+
+    pool := newConnAndClientPool(&security.Credential{}, nil, 2)
+    cc1, err := pool.Connect(context.Background(), addr)
+    require.Nil(t, err)
+    require.NotNil(t, cc1)
+    require.True(t, cc1.Multiplexing())
+
+    cc1.Release()
+    require.Equal(t, 0, len(pool.stores))
+}
+
+func TestConnectToUnavailable(t *testing.T) {
+    pool := newConnAndClientPool(&security.Credential{}, nil, 1)
+
+    targets := []string{"127.0.0.1:9999", "2.2.2.2:9999"}
+    for _, target := range targets {
+        ctx := context.Background()
+        conn, err := pool.connect(ctx, target)
+        require.NotNil(t, conn)
+        require.Nil(t, err)
+
+        rpc := cdcpb.NewChangeDataClient(conn)
+        _, err = rpc.EventFeedV2(ctx)
+        require.NotNil(t, err)
+
+        require.Nil(t, conn.Close())
+    }
+
+    service := make(chan *grpc.Server, 1)
+    var addr string
+
+    var wg sync.WaitGroup
+    defer wg.Wait()
+    wg.Add(1)
+    go func() {
+        defer wg.Done()
+        require.Nil(t, runGrpcService(&srv{}, &addr, service))
+    }()
+
+    svc := <-service
+    require.NotNil(t, svc)
+    defer svc.GracefulStop()
+
+    conn, err := pool.connect(context.Background(), addr)
+    require.NotNil(t, conn)
+    require.Nil(t, err)
+
+    rpc := cdcpb.NewChangeDataClient(conn)
+    client, err := rpc.EventFeedV2(context.Background())
+    require.Nil(t, err)
+    _ = client.CloseSend()
+
+    _, err = client.Recv()
+    require.Equal(t, codes.Unimplemented, status.Code(err))
+
+    require.Nil(t, conn.Close())
+}
+
+func TestCancelStream(t *testing.T) {
+    service := make(chan *grpc.Server, 1)
+    var addr string
+    var wg sync.WaitGroup
+    defer wg.Wait()
+    wg.Add(1)
+    go func() {
+        defer wg.Done()
+        require.Nil(t, runGrpcService(&srv{}, &addr, service))
+    }()
+
+    svc := <-service
+    require.NotNil(t, svc)
+    defer svc.GracefulStop()
+
+    connCtx, connCancel := context.WithCancel(context.Background())
+    defer connCancel()
+
+    pool := newConnAndClientPool(&security.Credential{}, nil, 1)
+    conn, err := pool.connect(connCtx, addr)
+    require.NotNil(t, conn)
+    require.Nil(t, err)
+
+    rpcCtx, rpcCancel := context.WithCancel(context.Background())
+    rpc := cdcpb.NewChangeDataClient(conn)
+    client, err := rpc.EventFeed(rpcCtx)
+    require.Nil(t, err)
+
+    rpcCancel()
+    _, err = client.Recv()
+    require.Equal(t, grpccodes.Canceled, grpcstatus.Code(err))
+    require.Nil(t, conn.Close())
+}
+
+func runGrpcService(srv cdcpb.ChangeDataServer, addr *string, service chan<- *grpc.Server) error {
+    defer close(service)
+    lis, err := net.Listen("tcp", "127.0.0.1:0")
+    if err != nil {
+        return err
+    }
+    defer lis.Close()
+
+    kaep := keepalive.EnforcementPolicy{
+        MinTime: 3 * time.Second,
+        PermitWithoutStream: true,
+    }
+    kasp := keepalive.ServerParameters{
+        MaxConnectionIdle: 10 * time.Second,
+        MaxConnectionAge: 10 * time.Second,
+        MaxConnectionAgeGrace: 5 * time.Second,
+        Time: 3 * time.Second,
+        Timeout: 1 * time.Second,
+    }
+    grpcServer := grpc.NewServer(grpc.KeepaliveEnforcementPolicy(kaep), grpc.KeepaliveParams(kasp))
+    cdcpb.RegisterChangeDataServer(grpcServer, srv)
+    *addr = lis.Addr().String()
+    service <- grpcServer
+    return grpcServer.Serve(lis)
+}
+
+type srv struct {
+    v2 bool
+}
+
+func (s *srv) EventFeed(server cdcpb.ChangeData_EventFeedServer) error {
+    for {
+        if _, err := server.Recv(); err != nil {
+            return err
+        }
+    }
+}
+
+func (s *srv) EventFeedV2(server cdcpb.ChangeData_EventFeedV2Server) error {
+    if !s.v2 {
+        return grpcstatus.Error(grpccodes.Unimplemented, "srv")
+    }
+    for {
+        if _, err := server.Recv(); err != nil {
+            return err
+        }
+    }
+}
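Note (not part of the patch): the region_worker.go hunks above give each regionWorker a reference to the pendingRegions map shared with its gRPC stream, and checkShouldExit now cancels that stream only when the worker maintains no region states and nothing is still pending on the stream, presumably so a stream with region requests still in flight is not torn down early. Below is a minimal, standalone Go sketch of that exit guard; pendingRegionMap and demoWorker are simplified, hypothetical stand-ins, not the real syncRegionFeedStateMap and regionWorker types.

// exit_guard_sketch.go: an illustrative, simplified model of the new exit
// condition in regionWorker.checkShouldExit (not code from the patch).
package main

import "fmt"

// pendingRegionMap stands in for syncRegionFeedStateMap: regions requested on
// the stream but whose events have not yet been handed to a worker.
type pendingRegionMap struct {
    regions map[uint64]struct{}
}

func (m *pendingRegionMap) len() int { return len(m.regions) }

// demoWorker stands in for regionWorker; it shares the pending map with the
// stream that created it.
type demoWorker struct {
    states  map[uint64]struct{} // regions currently maintained by this worker
    pending *pendingRegionMap   // regions still pending on the shared stream
}

// shouldExit mirrors the patched condition: the worker may cancel the shared
// gRPC stream only when it maintains no regions AND nothing is pending.
func (w *demoWorker) shouldExit() bool {
    return len(w.states) == 0 && w.pending.len() == 0
}

func main() {
    pending := &pendingRegionMap{regions: map[uint64]struct{}{42: {}}}
    w := &demoWorker{states: map[uint64]struct{}{}, pending: pending}

    // The worker itself holds no regions, but region 42 is still pending on
    // the stream, so the worker must not exit and cancel the stream yet.
    fmt.Println(w.shouldExit()) // false

    delete(pending.regions, 42)
    fmt.Println(w.shouldExit()) // true: nothing maintained, nothing pending
}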