From 0fa1910220a4d193018f04eceb8e01ad8fad6053 Mon Sep 17 00:00:00 2001
From: jiangjianyuan
Date: Tue, 19 Sep 2023 16:25:43 +0800
Subject: [PATCH 1/6] remove backoff and add rate limiter for newStream func

---
 cdc/kv/client.go                              | 38 ++++++++-----
 cdc/kv/client_bench_test.go                   |  4 +-
 cdc/kv/client_test.go                         | 54 +++++++++----------
 cdc/kv/region_worker.go                       |  2 +-
 cdc/kv/shared_client.go                       | 10 ++--
 cdc/processor/processor.go                    |  4 +-
 cdc/processor/sourcemanager/manager.go        |  6 +--
 .../sourcemanager/puller/puller_wrapper.go    |  2 +-
 cdc/puller/ddl_puller.go                      |  6 +--
 cdc/puller/puller.go                          |  2 +-
 cdc/puller/puller_test.go                     |  4 +-
 pkg/config/config_test_data.go                |  3 +-
 pkg/config/debug.go                           |  3 ++
 pkg/config/server_config.go                   |  3 +-
 14 files changed, 80 insertions(+), 61 deletions(-)

diff --git a/cdc/kv/client.go b/cdc/kv/client.go
index 596ba43cce2..09339cf0631 100644
--- a/cdc/kv/client.go
+++ b/cdc/kv/client.go
@@ -46,6 +46,7 @@ import (
 	pd "github.com/tikv/pd/client"
 	"go.uber.org/zap"
 	"golang.org/x/sync/errgroup"
+	"golang.org/x/time/rate"
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/status"
 )
@@ -156,7 +157,7 @@ var NewCDCKVClient = NewCDCClient
 type CDCClient struct {
 	pd pd.Client
 
-	config    *config.KVClientConfig
+	config    *config.ServerConfig
 	clusterID uint64
 
 	grpcPool GrpcPool
@@ -177,6 +178,8 @@ type CDCClient struct {
 	// filterLoop is used in BDR mode, when it is true, tikv cdc component
 	// will filter data that are written by another TiCDC.
 	filterLoop bool
+
+	ratelimiterMap sync.Map
 }
 
 type tableStoreStat struct {
@@ -192,7 +195,7 @@ func NewCDCClient(
 	grpcPool GrpcPool,
 	regionCache *tikv.RegionCache,
 	pdClock pdutil.Clock,
-	cfg *config.KVClientConfig,
+	cfg *config.ServerConfig,
 	changefeed model.ChangeFeedID,
 	tableID model.TableID,
 	tableName string,
@@ -208,17 +211,18 @@ func NewCDCClient(
 		regionCache: regionCache,
 		pdClock:     pdClock,
 
-		changefeed: changefeed,
-		tableID:    tableID,
-		tableName:  tableName,
-		filterLoop: filterLoop,
+		changefeed:     changefeed,
+		tableID:        tableID,
+		tableName:      tableName,
+		filterLoop:     filterLoop,
+		ratelimiterMap: sync.Map{},
 	}
 	c.tableStoreStats.v = make(map[string]*tableStoreStat)
 	return c
 }
 
 func (c *CDCClient) newStream(ctx context.Context, addr string, storeID uint64) (stream *eventFeedStream, newStreamErr error) {
-	newStreamErr = retry.Do(ctx, func() (err error) {
+	streamFunc := func() (err error) {
 		var conn *sharedConn
 		defer func() {
 			if err != nil && conn != nil {
@@ -248,10 +252,20 @@ func (c *CDCClient) newStream(ctx context.Context, addr string, storeID uint64)
 			zap.String("changefeed", c.changefeed.ID),
 			zap.String("addr", addr))
 		return nil
-	}, retry.WithBackoffBaseDelay(500),
-		retry.WithMaxTries(2),
-		retry.WithIsRetryableErr(cerror.IsRetryableError),
-	)
+	}
+	if c.config.Debug.EnableKVConnectBackOff {
+		newStreamErr = retry.Do(ctx, streamFunc,
+			retry.WithBackoffBaseDelay(100),
+			retry.WithMaxTries(2),
+			retry.WithIsRetryableErr(cerror.IsRetryableError),
+		)
+	}
+	limit, _ := c.ratelimiterMap.LoadOrStore(addr, rate.NewLimiter(rate.Limit(5), 1))
+	if !limit.(*rate.Limiter).Allow() {
+		newStreamErr = errors.Errorf("rate limit exceed, addr: %s", addr)
+		return
+	}
+	newStreamErr = streamFunc()
 	return
 }
 
@@ -842,7 +856,7 @@ func (s *eventFeedSession) divideAndSendEventFeedToRegions(
 			}
 			return nil
 		}, retry.WithBackoffMaxDelay(500),
-			retry.WithTotalRetryDuratoin(time.Duration(s.client.config.RegionRetryDuration)))
+			retry.WithTotalRetryDuratoin(time.Duration(s.client.config.KVClient.RegionRetryDuration)))
 		if retryErr != nil {
 			log.Warn("load regions failed",
 				zap.String("namespace", s.changefeed.Namespace),
diff --git a/cdc/kv/client_bench_test.go b/cdc/kv/client_bench_test.go
index 6aedabd6d2c..668677a4f96 100644
--- a/cdc/kv/client_bench_test.go
+++ b/cdc/kv/client_bench_test.go
@@ -199,7 +199,7 @@ func prepareBenchMultiStore(b *testing.B, storeNum, regionNum int) (
 	defer regionCache.Close()
 	cdcClient := NewCDCClient(
 		ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
-		config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
+		config.GetDefaultServerConfig(), changefeed, 0, "", false)
 	eventCh := make(chan model.RegionFeedEvent, 1000000)
 	wg.Add(1)
 	go func() {
@@ -293,7 +293,7 @@ func prepareBench(b *testing.B, regionNum int) (
 	defer regionCache.Close()
 	cdcClient := NewCDCClient(
 		ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
-		config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
+		config.GetDefaultServerConfig(), changefeed, 0, "", false)
 	eventCh := make(chan model.RegionFeedEvent, 1000000)
 	wg.Add(1)
 	go func() {
diff --git a/cdc/kv/client_test.go b/cdc/kv/client_test.go
index e79a54ca522..63338367802 100644
--- a/cdc/kv/client_test.go
+++ b/cdc/kv/client_test.go
@@ -72,7 +72,7 @@ func TestNewClient(t *testing.T) {
 	defer regionCache.Close()
 	cli := NewCDCClient(
 		context.Background(), pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
-		config.GetDefaultServerConfig().KVClient, model.DefaultChangeFeedID(""), 0, "", false)
+		config.GetDefaultServerConfig(), model.DefaultChangeFeedID(""), 0, "", false)
 	require.NotNil(t, cli)
 }
 
@@ -324,7 +324,7 @@ func TestConnectOfflineTiKV(t *testing.T) {
 	defer regionCache.Close()
 	cdcClient := NewCDCClient(
 		context.Background(), pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
-		config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
+		config.GetDefaultServerConfig(), changefeed, 0, "", false)
 	// Take care of the eventCh, it's used to output resolvedTs event or kv event
 	// It will stuck the normal routine
 	eventCh := make(chan model.RegionFeedEvent, 50)
@@ -426,7 +426,7 @@ func TestRecvLargeMessageSize(t *testing.T) {
 	defer regionCache.Close()
 	cdcClient := NewCDCClient(
 		ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
-		config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
+		config.GetDefaultServerConfig(), changefeed, 0, "", false)
 	eventCh := make(chan model.RegionFeedEvent, 50)
 	wg.Add(1)
 	go func() {
@@ -526,7 +526,7 @@ func TestHandleError(t *testing.T) {
 	defer regionCache.Close()
 	cdcClient := NewCDCClient(
 		ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
-		config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
+		config.GetDefaultServerConfig(), changefeed, 0, "", false)
 	eventCh := make(chan model.RegionFeedEvent, 50)
 	wg.Add(1)
 	go func() {
@@ -685,7 +685,7 @@ func TestCompatibilityWithSameConn(t *testing.T) {
 	defer regionCache.Close()
 	cdcClient := NewCDCClient(
 		ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
-		config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
+		config.GetDefaultServerConfig(), changefeed, 0, "", false)
 	eventCh := make(chan model.RegionFeedEvent, 50)
 	var wg2 sync.WaitGroup
 	wg2.Add(1)
@@ -752,7 +752,7 @@ func TestClusterIDMismatch(t *testing.T) {
 	defer regionCache.Close()
 	cdcClient := NewCDCClient(
 		ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
-		config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
+		config.GetDefaultServerConfig(), changefeed, 0, "", false)
 	eventCh := make(chan model.RegionFeedEvent, 50)
 
 	var wg2 sync.WaitGroup
@@ -821,7 +821,7 @@ func testHandleFeedEvent(t *testing.T) {
 	defer regionCache.Close()
 	cdcClient := NewCDCClient(
 		ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
-		config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
+		config.GetDefaultServerConfig(), changefeed, 0, "", false)
 	eventCh := make(chan model.RegionFeedEvent, 50)
 	wg.Add(1)
 	go func() {
@@ -1282,7 +1282,7 @@ func TestStreamSendWithError(t *testing.T) {
 	defer regionCache.Close()
 	cdcClient := NewCDCClient(
 		ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
-		config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
+		config.GetDefaultServerConfig(), changefeed, 0, "", false)
 	eventCh := make(chan model.RegionFeedEvent, 50)
 	wg.Add(1)
 	go func() {
@@ -1394,7 +1394,7 @@ func testStreamRecvWithError(t *testing.T, failpointStr string) {
 	defer regionCache.Close()
 	cdcClient := NewCDCClient(
 		ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
-		config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
+		config.GetDefaultServerConfig(), changefeed, 0, "", false)
 	eventCh := make(chan model.RegionFeedEvent, 50)
 	wg.Add(1)
 	go func() {
@@ -1527,7 +1527,7 @@ func TestStreamRecvWithErrorAndResolvedGoBack(t *testing.T) {
 	defer regionCache.Close()
 	cdcClient := NewCDCClient(
 		ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
-		config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
+		config.GetDefaultServerConfig(), changefeed, 0, "", false)
 	eventCh := make(chan model.RegionFeedEvent, 50)
 	wg.Add(1)
 	go func() {
@@ -1736,7 +1736,7 @@ func TestIncompatibleTiKV(t *testing.T) {
 	defer regionCache.Close()
 	cdcClient := NewCDCClient(
 		ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
-		config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
+		config.GetDefaultServerConfig(), changefeed, 0, "", false)
 	// NOTICE: eventCh may block the main logic of EventFeed
 	eventCh := make(chan model.RegionFeedEvent, 128)
 	wg.Add(1)
@@ -1813,7 +1813,7 @@ func TestNoPendingRegionError(t *testing.T) {
 	defer regionCache.Close()
 	cdcClient := NewCDCClient(
 		ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
-		config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
+		config.GetDefaultServerConfig(), changefeed, 0, "", false)
 	eventCh := make(chan model.RegionFeedEvent, 50)
 
 	wg.Add(1)
@@ -1892,7 +1892,7 @@ func TestDropStaleRequest(t *testing.T) {
 	defer regionCache.Close()
 	cdcClient := NewCDCClient(
 		ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
-		config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
+		config.GetDefaultServerConfig(), changefeed, 0, "", false)
 	eventCh := make(chan model.RegionFeedEvent, 50)
 	wg.Add(1)
 	go func() {
@@ -2006,7 +2006,7 @@ func TestResolveLock(t *testing.T) {
 	defer regionCache.Close()
 	cdcClient := NewCDCClient(
 		ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
-		config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
+		config.GetDefaultServerConfig(), changefeed, 0, "", false)
 	eventCh := make(chan model.RegionFeedEvent, 50)
 	wg.Add(1)
 	go func() {
@@ -2111,7 +2111,7 @@ func testEventCommitTsFallback(t *testing.T, events []*cdcpb.ChangeDataEvent) {
 	defer regionCache.Close()
 	cdcClient := NewCDCClient(
 		ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
-		config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
+		config.GetDefaultServerConfig(), changefeed, 0, "", false)
 	eventCh := make(chan model.RegionFeedEvent, 50)
 	var clientWg sync.WaitGroup
 	clientWg.Add(1)
@@ -2239,7 +2239,7 @@ func testEventAfterFeedStop(t *testing.T) {
 	defer regionCache.Close()
 	cdcClient := NewCDCClient(
 		ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
-		config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
+		config.GetDefaultServerConfig(), changefeed, 0, "", false)
 	eventCh := make(chan model.RegionFeedEvent, 50)
 	wg.Add(1)
 	go func() {
@@ -2426,7 +2426,7 @@ func TestOutOfRegionRangeEvent(t *testing.T) {
 	defer regionCache.Close()
 	cdcClient := NewCDCClient(
 		ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
-		config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
+		config.GetDefaultServerConfig(), changefeed, 0, "", false)
 	eventCh := make(chan model.RegionFeedEvent, 50)
 	wg.Add(1)
 	go func() {
@@ -2644,7 +2644,7 @@ func TestResolveLockNoCandidate(t *testing.T) {
 	defer regionCache.Close()
 	cdcClient := NewCDCClient(
 		ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
-		config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
+		config.GetDefaultServerConfig(), changefeed, 0, "", false)
 	eventCh := make(chan model.RegionFeedEvent, 50)
 	wg.Add(1)
 	go func() {
@@ -2740,7 +2740,7 @@ func TestFailRegionReentrant(t *testing.T) {
 	defer regionCache.Close()
 	cdcClient := NewCDCClient(
 		ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
-		config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
+		config.GetDefaultServerConfig(), changefeed, 0, "", false)
 	eventCh := make(chan model.RegionFeedEvent, 50)
 	wg.Add(1)
 	go func() {
@@ -2823,7 +2823,7 @@ func TestClientV1UnlockRangeReentrant(t *testing.T) {
 	defer regionCache.Close()
 	cdcClient := NewCDCClient(
 		ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
-		config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
+		config.GetDefaultServerConfig(), changefeed, 0, "", false)
 	eventCh := make(chan model.RegionFeedEvent, 50)
 	wg.Add(1)
 	go func() {
@@ -2891,7 +2891,7 @@ func testClientErrNoPendingRegion(t *testing.T) {
 	defer regionCache.Close()
 	cdcClient := NewCDCClient(
 		ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
-		config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
+		config.GetDefaultServerConfig(), changefeed, 0, "", false)
 	eventCh := make(chan model.RegionFeedEvent, 50)
 	wg.Add(1)
 	go func() {
@@ -2969,7 +2969,7 @@ func testKVClientForceReconnect(t *testing.T) {
 	defer regionCache.Close()
 	cdcClient := NewCDCClient(
 		ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
-		config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
+		config.GetDefaultServerConfig(), changefeed, 0, "", false)
 	eventCh := make(chan model.RegionFeedEvent, 50)
 	wg.Add(1)
 	go func() {
@@ -3120,7 +3120,7 @@ func TestConcurrentProcessRangeRequest(t *testing.T) {
 	defer regionCache.Close()
 	cdcClient := NewCDCClient(
 		ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
-		config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
+		config.GetDefaultServerConfig(), changefeed, 0, "", false)
 	eventCh := make(chan model.RegionFeedEvent, 100)
 	wg.Add(1)
 	go func() {
@@ -3237,7 +3237,7 @@ func TestEvTimeUpdate(t *testing.T) {
 	defer regionCache.Close()
 	cdcClient := NewCDCClient(
 		ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
-		config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
+		config.GetDefaultServerConfig(), changefeed, 0, "", false)
 	eventCh := make(chan model.RegionFeedEvent, 50)
 	wg.Add(1)
 	go func() {
@@ -3363,7 +3363,7 @@ func TestRegionWorkerExitWhenIsIdle(t *testing.T) {
 	defer regionCache.Close()
 	cdcClient := NewCDCClient(
 		ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
-		config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
+		config.GetDefaultServerConfig(), changefeed, 0, "", false)
 	eventCh := make(chan model.RegionFeedEvent, 50)
 	wg.Add(1)
 	go func() {
@@ -3455,7 +3455,7 @@ func TestPrewriteNotMatchError(t *testing.T) {
 	defer regionCache.Close()
 	cdcClient := NewCDCClient(
 		ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
-		config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
+		config.GetDefaultServerConfig(), changefeed, 0, "", false)
 	eventCh := make(chan model.RegionFeedEvent, 50)
 
 	baseAllocatedID := currentRequestID()
@@ -3537,7 +3537,7 @@ func TestPrewriteNotMatchError(t *testing.T) {
 
 func createFakeEventFeedSession() *eventFeedSession {
 	return newEventFeedSession(
-		&CDCClient{config: config.GetDefaultServerConfig().KVClient},
+		&CDCClient{config: config.GetDefaultServerConfig()},
 		tablepb.Span{StartKey: []byte("a"), EndKey: []byte("b")},
 		nil, /*lockResolver*/
 		100, /*startTs*/
diff --git a/cdc/kv/region_worker.go b/cdc/kv/region_worker.go
index 165230350dd..2b9b927ec14 100644
--- a/cdc/kv/region_worker.go
+++ b/cdc/kv/region_worker.go
@@ -157,7 +157,7 @@ func newRegionWorker(
 		rtsManager:    newRegionTsManager(),
 		rtsUpdateCh:   make(chan *rtsUpdateEvent, 1024),
 		storeAddr:     addr,
-		concurrency:   int(s.client.config.WorkerConcurrent),
+		concurrency:   int(s.client.config.KVClient.WorkerConcurrent),
 		metrics:       newRegionWorkerMetrics(changefeedID),
 		inputPending:  0,
 	}
diff --git a/cdc/kv/shared_client.go b/cdc/kv/shared_client.go
index 9b08d5c07d3..39a0b226f91 100644
--- a/cdc/kv/shared_client.go
+++ b/cdc/kv/shared_client.go
@@ -61,7 +61,7 @@ type MultiplexingEvent struct {
 // SharedClient is shared in many tables. Methods are thread-safe.
 type SharedClient struct {
 	changefeed model.ChangeFeedID
-	config     *config.KVClientConfig
+	config     *config.ServerConfig
 	metrics    sharedClientMetrics
 
 	clusterID uint64
@@ -150,7 +150,7 @@ type requestedTable struct {
 // NewSharedClient creates a client.
 func NewSharedClient(
 	changefeed model.ChangeFeedID,
-	cfg *config.KVClientConfig,
+	cfg *config.ServerConfig,
 	filterLoop bool,
 	pd pd.Client,
 	grpcPool GrpcPool,
@@ -254,8 +254,8 @@ func (s *SharedClient) Run(ctx context.Context) error {
 	s.clusterID = s.pd.GetClusterID(ctx)
 
 	g, ctx := errgroup.WithContext(ctx)
-	s.workers = make([]*sharedRegionWorker, 0, s.config.WorkerConcurrent)
-	for i := uint(0); i < s.config.WorkerConcurrent; i++ {
+	s.workers = make([]*sharedRegionWorker, 0, s.config.KVClient.WorkerConcurrent)
+	for i := uint(0); i < s.config.KVClient.WorkerConcurrent; i++ {
 		worker := newSharedRegionWorker(s)
 		g.Go(func() error { return worker.run(ctx) })
 		s.workers = append(s.workers, worker)
@@ -411,7 +411,7 @@ func (s *SharedClient) requestStore(
 	rs = &requestedStore{storeID: storeID, storeAddr: storeAddr}
 	s.requestedStores[storeAddr] = rs
 
-	for i := uint(0); i < s.config.GrpcStreamConcurrent; i++ {
+	for i := uint(0); i < s.config.KVClient.GrpcStreamConcurrent; i++ {
 		stream := s.newStream(ctx, g, rs)
 		rs.streams = append(rs.streams, stream)
 	}
diff --git a/cdc/processor/processor.go b/cdc/processor/processor.go
index d27155f54d9..504a629237f 100644
--- a/cdc/processor/processor.go
+++ b/cdc/processor/processor.go
@@ -789,7 +789,7 @@ func (p *processor) initDDLHandler(ctx context.Context) error {
 		return errors.Trace(err)
 	}
 
-	kvCfg := config.GetGlobalServerConfig().KVClient
+	serverCfg := config.GetGlobalServerConfig()
 	ddlPuller, err := puller.NewDDLJobPuller(
 		ctx,
 		p.upstream.PDClient,
@@ -798,7 +798,7 @@ func (p *processor) initDDLHandler(ctx context.Context) error {
 		p.upstream.KVStorage,
 		p.upstream.PDClock,
 		ddlStartTs,
-		kvCfg,
+		serverCfg,
 		p.changefeedID,
 		schemaStorage,
 		f,
diff --git a/cdc/processor/sourcemanager/manager.go b/cdc/processor/sourcemanager/manager.go
index 1870057feeb..fb912452f9f 100644
--- a/cdc/processor/sourcemanager/manager.go
+++ b/cdc/processor/sourcemanager/manager.go
@@ -198,15 +198,15 @@ func (m *SourceManager) GetTableSorterStats(span tablepb.Span) engine.TableStats
 // Run implements util.Runnable.
 func (m *SourceManager) Run(ctx context.Context, _ ...chan<- error) error {
 	if m.multiplexing {
-		clientConfig := config.GetGlobalServerConfig().KVClient
+		serverConfig := config.GetGlobalServerConfig()
 		client := kv.NewSharedClient(
-			m.changefeedID, clientConfig, m.bdrMode,
+			m.changefeedID, serverConfig, m.bdrMode,
 			m.up.PDClient, m.up.GrpcPool, m.up.RegionCache, m.up.PDClock,
 			txnutil.NewLockerResolver(m.up.KVStorage.(tikv.Storage), m.changefeedID),
 		)
 		m.multiplexingPuller.puller = pullerwrapper.NewMultiplexingPullerWrapper(
 			m.changefeedID, client, m.engine,
-			int(clientConfig.FrontierConcurrent),
+			int(serverConfig.KVClient.FrontierConcurrent),
 		)
 
 		close(m.ready)
diff --git a/cdc/processor/sourcemanager/puller/puller_wrapper.go b/cdc/processor/sourcemanager/puller/puller_wrapper.go
index 9f8070bde61..06749fab252 100644
--- a/cdc/processor/sourcemanager/puller/puller_wrapper.go
+++ b/cdc/processor/sourcemanager/puller/puller_wrapper.go
@@ -103,7 +103,7 @@ func (n *WrapperImpl) Start(
 		up.PDClock,
 		n.startTs,
 		[]tablepb.Span{n.span},
-		config.GetGlobalServerConfig().KVClient,
+		config.GetGlobalServerConfig(),
 		n.changefeed,
 		n.span.TableID,
 		n.tableName,
diff --git a/cdc/puller/ddl_puller.go b/cdc/puller/ddl_puller.go
index 3b8505c0f51..50ceb3df072 100644
--- a/cdc/puller/ddl_puller.go
+++ b/cdc/puller/ddl_puller.go
@@ -566,7 +566,7 @@ func NewDDLJobPuller(
 	kvStorage tidbkv.Storage,
 	pdClock pdutil.Clock,
 	checkpointTs uint64,
-	cfg *config.KVClientConfig,
+	cfg *config.ServerConfig,
 	changefeed model.ChangeFeedID,
 	schemaStorage entry.SchemaStorage,
 	filter filter.Filter,
@@ -586,7 +586,7 @@ func NewDDLJobPuller(
 
 	jobPuller := &ddlJobPullerImpl{
 		changefeedID:  changefeed,
-		multiplexing:  cfg.EnableMultiplexing,
+		multiplexing:  cfg.KVClient.EnableMultiplexing,
 		schemaStorage: schemaStorage,
 		kvStorage:     kvStorage,
 		filter:        filter,
@@ -669,7 +669,7 @@ func NewDDLPuller(ctx context.Context,
 	if up.KVStorage != nil {
 		puller, err = NewDDLJobPuller(
 			ctx, up.PDClient, up.GrpcPool, up.RegionCache, up.KVStorage, up.PDClock,
-			startTs, config.GetGlobalServerConfig().KVClient,
+			startTs, config.GetGlobalServerConfig(),
 			changefeed, schemaStorage, filter,
 			true, /* isOwner */
 		)
diff --git a/cdc/puller/puller.go b/cdc/puller/puller.go
index 30a0c38d980..f2d014b426a 100644
--- a/cdc/puller/puller.go
+++ b/cdc/puller/puller.go
@@ -82,7 +82,7 @@ func New(ctx context.Context,
 	pdClock pdutil.Clock,
 	checkpointTs uint64,
 	spans []tablepb.Span,
-	cfg *config.KVClientConfig,
+	cfg *config.ServerConfig,
 	changefeed model.ChangeFeedID,
 	tableID model.TableID,
 	tableName string,
diff --git a/cdc/puller/puller_test.go b/cdc/puller/puller_test.go
index 652a049ea12..26dfc555325 100644
--- a/cdc/puller/puller_test.go
+++ b/cdc/puller/puller_test.go
@@ -64,7 +64,7 @@ func newMockCDCKVClient(
 	grpcPool kv.GrpcPool,
 	regionCache *tikv.RegionCache,
 	pdClock pdutil.Clock,
-	cfg *config.KVClientConfig,
+	cfg *config.ServerConfig,
 	changefeed model.ChangeFeedID,
 	tableID model.TableID,
 	tableName string,
@@ -132,7 +132,7 @@ func newPullerForTest(
 	defer regionCache.Close()
 	plr := New(
 		ctx, pdCli, grpcPool, regionCache, store, pdutil.NewClock4Test(),
-		checkpointTs, spans, config.GetDefaultServerConfig().KVClient,
+		checkpointTs, spans, config.GetDefaultServerConfig(),
 		model.DefaultChangeFeedID("changefeed-id-test"), 0,
 		"table-test", false)
 	wg.Add(1)
diff --git a/pkg/config/config_test_data.go b/pkg/config/config_test_data.go
index 0a79cc042ba..4daa475e54b 100644
--- a/pkg/config/config_test_data.go
+++ b/pkg/config/config_test_data.go
@@ -155,7 +155,8 @@ const (
 			"max-task-concurrency": 10,
 			"check-balance-interval": 60000000000,
 			"add-table-batch-size": 50
-		}
+		},
+		"enable-kv-connect-backoff": false
 	},
 	"cluster-id": "default",
 	"max-memory-percentage": 70
diff --git a/pkg/config/debug.go b/pkg/config/debug.go
index 7e91188dafa..3a7815090bc 100644
--- a/pkg/config/debug.go
+++ b/pkg/config/debug.go
@@ -25,6 +25,9 @@ type DebugConfig struct {
 
 	// Scheduler is the configuration of the two-phase scheduler.
 	Scheduler *SchedulerConfig `toml:"scheduler" json:"scheduler"`
+
+	// EnableKVConnectBackOff enables the backoff for kv connect.
+	EnableKVConnectBackOff bool `toml:"enable-kv-connect-backoff" json:"enable-kv-connect-backoff"`
 }
 
 // ValidateAndAdjust validates and adjusts the debug configuration
diff --git a/pkg/config/server_config.go b/pkg/config/server_config.go
index feaff34fa03..5d36aec2c24 100644
--- a/pkg/config/server_config.go
+++ b/pkg/config/server_config.go
@@ -135,7 +135,8 @@ var defaultServerConfig = &ServerConfig{
 		},
 		Messages: defaultMessageConfig.Clone(),
 
-		Scheduler: NewDefaultSchedulerConfig(),
+		Scheduler:              NewDefaultSchedulerConfig(),
+		EnableKVConnectBackOff: false,
 	},
 	ClusterID:           "default",
 	MaxMemoryPercentage: DefaultMaxMemoryPercentage,

From 9d425d760e2c9bfa5bec84377404545a5876c70a Mon Sep 17 00:00:00 2001
From: jiangjianyuan
Date: Tue, 19 Sep 2023 16:40:40 +0800
Subject: [PATCH 2/6] remove backoff and add rate limiter for newStream func

---
 cdc/kv/client.go | 8 +++++---
 1 file changed, 5 insertions(+), 3 deletions(-)

diff --git a/cdc/kv/client.go b/cdc/kv/client.go
index 09339cf0631..84e9637fa53 100644
--- a/cdc/kv/client.go
+++ b/cdc/kv/client.go
@@ -179,7 +179,9 @@ type CDCClient struct {
 	// will filter data that are written by another TiCDC.
 	filterLoop bool
 
-	ratelimiterMap sync.Map
+	// rateLimiterMap is used to limit the rate of create new stream to connect to TiKV store
+	// return an error directly if the rate is limited, then kv client will try this region later.
+	rateLimiterMap sync.Map
 }
 
 type tableStoreStat struct {
@@ -215,7 +217,7 @@ func NewCDCClient(
 		tableID:        tableID,
 		tableName:      tableName,
 		filterLoop:     filterLoop,
-		ratelimiterMap: sync.Map{},
+		rateLimiterMap: sync.Map{},
 	}
 	c.tableStoreStats.v = make(map[string]*tableStoreStat)
 	return c
@@ -260,7 +262,7 @@ func (c *CDCClient) newStream(ctx context.Context, addr string, storeID uint64)
 			retry.WithMaxTries(2),
 			retry.WithIsRetryableErr(cerror.IsRetryableError),
 		)
 	}
-	limit, _ := c.ratelimiterMap.LoadOrStore(addr, rate.NewLimiter(rate.Limit(5), 1))
+	limit, _ := c.rateLimiterMap.LoadOrStore(addr, rate.NewLimiter(rate.Limit(5), 1))
 	if !limit.(*rate.Limiter).Allow() {
 		newStreamErr = errors.Errorf("rate limit exceed, addr: %s", addr)
 		return

From 87503a093d57266ed1023584acdaeceba17abf9b Mon Sep 17 00:00:00 2001
From: jiangjianyuan
Date: Tue, 19 Sep 2023 19:19:39 +0800
Subject: [PATCH 3/6] fix ut

---
 cdc/kv/shared_client_test.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/cdc/kv/shared_client_test.go b/cdc/kv/shared_client_test.go
index ea26364a0d5..8e4ec3e05df 100644
--- a/cdc/kv/shared_client_test.go
+++ b/cdc/kv/shared_client_test.go
@@ -126,7 +126,7 @@ func TestConnectToOfflineOrFailedTiKV(t *testing.T) {
 	cluster.Bootstrap(11, []uint64{1, 2, 3}, []uint64{4, 5, 6}, 6)
 
 	client := NewSharedClient(model.ChangeFeedID{ID: "test"},
-		&config.KVClientConfig{WorkerConcurrent: 1, GrpcStreamConcurrent: 1},
+		&config.ServerConfig{KVClient: &config.KVClientConfig{WorkerConcurrent: 1, GrpcStreamConcurrent: 1}},
 		false, pdClient, grpcPool, regionCache, pdClock, lockResolver)
 	defer client.Close()
 

From 7701a9a03cd838ae18b01dafcbf7aad4b8d32e8b Mon Sep 17 00:00:00 2001
From: jiangjianyuan
Date: Tue, 19 Sep 2023 23:10:30 +0800
Subject: [PATCH 4/6] fix ut

---
 cdc/kv/client.go | 1 +
 1 file changed, 1 insertion(+)

diff --git a/cdc/kv/client.go b/cdc/kv/client.go
index 84e9637fa53..7414b7305c4 100644
--- a/cdc/kv/client.go
+++ b/cdc/kv/client.go
@@ -261,6 +261,7 @@ func (c *CDCClient) newStream(ctx context.Context, addr string, storeID uint64)
 			retry.WithMaxTries(2),
 			retry.WithIsRetryableErr(cerror.IsRetryableError),
 		)
+		return
 	}
 	limit, _ := c.rateLimiterMap.LoadOrStore(addr, rate.NewLimiter(rate.Limit(5), 1))
 	if !limit.(*rate.Limiter).Allow() {

From 13bbe9d36eed050bd29c6d7ac82989225313afa4 Mon Sep 17 00:00:00 2001
From: jiangjianyuan
Date: Thu, 21 Sep 2023 19:49:29 +0800
Subject: [PATCH 5/6] address comment

---
 cdc/kv/client.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/cdc/kv/client.go b/cdc/kv/client.go
index 7414b7305c4..cb541dd0e6f 100644
--- a/cdc/kv/client.go
+++ b/cdc/kv/client.go
@@ -265,7 +265,7 @@ func (c *CDCClient) newStream(ctx context.Context, addr string, storeID uint64)
 	}
 	limit, _ := c.rateLimiterMap.LoadOrStore(addr, rate.NewLimiter(rate.Limit(5), 1))
 	if !limit.(*rate.Limiter).Allow() {
-		newStreamErr = errors.Errorf("rate limit exceed, addr: %s", addr)
+		newStreamErr = errors.Errorf("rate limit exceed 5 operations per second, addr: %s", addr)
 		return
 	}
 	newStreamErr = streamFunc()

From b4f223acc84ca466910335867c36c0280033be03 Mon Sep 17 00:00:00 2001
From: jiangjianyuan
Date: Tue, 26 Sep 2023 00:00:21 +0800
Subject: [PATCH 6/6] fix ut

---
 cdc/kv/client.go | 19 ++++---------------
 1 file changed, 4 insertions(+), 15 deletions(-)

diff --git a/cdc/kv/client.go b/cdc/kv/client.go
index cb541dd0e6f..8ad91b138fe 100644
--- a/cdc/kv/client.go
+++ b/cdc/kv/client.go
@@ -46,7 +46,6 @@ import (
 	pd "github.com/tikv/pd/client"
 	"go.uber.org/zap"
 	"golang.org/x/sync/errgroup"
-	"golang.org/x/time/rate"
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/status"
 )
@@ -178,10 +177,6 @@ type CDCClient struct {
 	// filterLoop is used in BDR mode, when it is true, tikv cdc component
 	// will filter data that are written by another TiCDC.
 	filterLoop bool
-
-	// rateLimiterMap is used to limit the rate of create new stream to connect to TiKV store
-	// return an error directly if the rate is limited, then kv client will try this region later.
-	rateLimiterMap sync.Map
 }
 
 type tableStoreStat struct {
@@ -213,11 +208,10 @@ func NewCDCClient(
 		regionCache: regionCache,
 		pdClock:     pdClock,
 
-		changefeed:     changefeed,
-		tableID:        tableID,
-		tableName:      tableName,
-		filterLoop:     filterLoop,
-		rateLimiterMap: sync.Map{},
+		changefeed: changefeed,
+		tableID:    tableID,
+		tableName:  tableName,
+		filterLoop: filterLoop,
 	}
 	c.tableStoreStats.v = make(map[string]*tableStoreStat)
 	return c
@@ -263,11 +257,6 @@ func (c *CDCClient) newStream(ctx context.Context, addr string, storeID uint64)
 		)
 		return
 	}
-	limit, _ := c.rateLimiterMap.LoadOrStore(addr, rate.NewLimiter(rate.Limit(5), 1))
-	if !limit.(*rate.Limiter).Allow() {
-		newStreamErr = errors.Errorf("rate limit exceed 5 operations per second, addr: %s", addr)
-		return
-	}
 	newStreamErr = streamFunc()
 	return
 }