From 43848f2fb5646822903baee35b62c580498284e2 Mon Sep 17 00:00:00 2001
From: Jianyuan Jiang
Date: Tue, 26 Sep 2023 13:30:16 +0800
Subject: [PATCH] kv(ticdc): remove backoff from newStream func (#9771)

ref pingcap/tiflow#9741
---
 cdc/kv/client.go                            | 22 +++++---
 cdc/kv/client_bench_test.go                 |  4 +-
 cdc/kv/client_test.go                       | 54 +++++++++----------
 cdc/kv/region_worker.go                     |  2 +-
 cdc/kv/shared_client.go                     | 10 ++--
 cdc/kv/shared_client_test.go                |  2 +-
 cdc/processor/processor.go                  |  4 +-
 cdc/processor/sourcemanager/manager.go      |  6 +--
 .../sourcemanager/puller/puller_wrapper.go  |  2 +-
 cdc/puller/ddl_puller.go                    |  6 +--
 cdc/puller/puller.go                        |  2 +-
 cdc/puller/puller_test.go                   |  4 +-
 pkg/config/config_test_data.go              |  3 +-
 pkg/config/debug.go                         |  3 ++
 pkg/config/server_config.go                 |  3 +-
 15 files changed, 69 insertions(+), 58 deletions(-)

diff --git a/cdc/kv/client.go b/cdc/kv/client.go
index 29c490d85b5..f70ec5e0e97 100644
--- a/cdc/kv/client.go
+++ b/cdc/kv/client.go
@@ -156,7 +156,7 @@ var NewCDCKVClient = NewCDCClient
 type CDCClient struct {
     pd pd.Client

-    config    *config.KVClientConfig
+    config    *config.ServerConfig
     clusterID uint64

     grpcPool GrpcPool
@@ -192,7 +192,7 @@ func NewCDCClient(
     grpcPool GrpcPool,
     regionCache *tikv.RegionCache,
     pdClock pdutil.Clock,
-    cfg *config.KVClientConfig,
+    cfg *config.ServerConfig,
     changefeed model.ChangeFeedID,
     tableID model.TableID,
     tableName string,
@@ -218,7 +218,7 @@ func NewCDCClient(
 }

 func (c *CDCClient) newStream(ctx context.Context, addr string, storeID uint64) (stream *eventFeedStream, newStreamErr error) {
-    newStreamErr = retry.Do(ctx, func() (err error) {
+    streamFunc := func() (err error) {
         var conn *sharedConn
         defer func() {
             if err != nil && conn != nil {
@@ -248,10 +248,16 @@ func (c *CDCClient) newStream(ctx context.Context, addr string, storeID uint64)
             zap.String("changefeed", c.changefeed.ID),
             zap.String("addr", addr))
         return nil
-    }, retry.WithBackoffBaseDelay(500),
-        retry.WithMaxTries(2),
-        retry.WithIsRetryableErr(cerror.IsRetryableError),
-    )
+    }
+    if c.config.Debug.EnableKVConnectBackOff {
+        newStreamErr = retry.Do(ctx, streamFunc,
+            retry.WithBackoffBaseDelay(100),
+            retry.WithMaxTries(2),
+            retry.WithIsRetryableErr(cerror.IsRetryableError),
+        )
+        return
+    }
+    newStreamErr = streamFunc()
     return
 }

@@ -842,7 +848,7 @@ func (s *eventFeedSession) divideAndSendEventFeedToRegions(
         }
         return nil
     }, retry.WithBackoffMaxDelay(500),
-        retry.WithTotalRetryDuratoin(time.Duration(s.client.config.RegionRetryDuration)))
+        retry.WithTotalRetryDuratoin(time.Duration(s.client.config.KVClient.RegionRetryDuration)))
     if retryErr != nil {
         log.Warn("load regions failed",
             zap.String("namespace", s.changefeed.Namespace),
diff --git a/cdc/kv/client_bench_test.go b/cdc/kv/client_bench_test.go
index 6aedabd6d2c..668677a4f96 100644
--- a/cdc/kv/client_bench_test.go
+++ b/cdc/kv/client_bench_test.go
@@ -199,7 +199,7 @@ func prepareBenchMultiStore(b *testing.B, storeNum, regionNum int) (
     defer regionCache.Close()
     cdcClient := NewCDCClient(
         ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
-        config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
+        config.GetDefaultServerConfig(), changefeed, 0, "", false)
     eventCh := make(chan model.RegionFeedEvent, 1000000)
     wg.Add(1)
     go func() {
@@ -293,7 +293,7 @@ func prepareBench(b *testing.B, regionNum int) (
     defer regionCache.Close()
     cdcClient := NewCDCClient(
         ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
-        config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
+        config.GetDefaultServerConfig(), changefeed, 0, "", false)
     eventCh := make(chan model.RegionFeedEvent, 1000000)
     wg.Add(1)
     go func() {
diff --git a/cdc/kv/client_test.go b/cdc/kv/client_test.go
index b00a4b59db6..c2288e3705e 100644
--- a/cdc/kv/client_test.go
+++ b/cdc/kv/client_test.go
@@ -74,7 +74,7 @@ func TestNewClient(t *testing.T) {
     defer regionCache.Close()
     cli := NewCDCClient(
         context.Background(), pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
-        config.GetDefaultServerConfig().KVClient, model.DefaultChangeFeedID(""), 0, "", false)
+        config.GetDefaultServerConfig(), model.DefaultChangeFeedID(""), 0, "", false)
     require.NotNil(t, cli)
 }

@@ -326,7 +326,7 @@ func TestConnectOfflineTiKV(t *testing.T) {
     defer regionCache.Close()
     cdcClient := NewCDCClient(
         context.Background(), pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
-        config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
+        config.GetDefaultServerConfig(), changefeed, 0, "", false)
     // Take care of the eventCh, it's used to output resolvedTs event or kv event
     // It will stuck the normal routine
     eventCh := make(chan model.RegionFeedEvent, 50)
@@ -428,7 +428,7 @@ func TestRecvLargeMessageSize(t *testing.T) {
     defer regionCache.Close()
     cdcClient := NewCDCClient(
         ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
-        config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
+        config.GetDefaultServerConfig(), changefeed, 0, "", false)
     eventCh := make(chan model.RegionFeedEvent, 50)
     wg.Add(1)
     go func() {
@@ -528,7 +528,7 @@ func TestHandleError(t *testing.T) {
     defer regionCache.Close()
     cdcClient := NewCDCClient(
         ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
-        config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
+        config.GetDefaultServerConfig(), changefeed, 0, "", false)
     eventCh := make(chan model.RegionFeedEvent, 50)
     wg.Add(1)
     go func() {
@@ -687,7 +687,7 @@ func TestCompatibilityWithSameConn(t *testing.T) {
     defer regionCache.Close()
     cdcClient := NewCDCClient(
         ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
-        config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
+        config.GetDefaultServerConfig(), changefeed, 0, "", false)
     eventCh := make(chan model.RegionFeedEvent, 50)
     var wg2 sync.WaitGroup
     wg2.Add(1)
@@ -754,7 +754,7 @@ func TestClusterIDMismatch(t *testing.T) {
     defer regionCache.Close()
     cdcClient := NewCDCClient(
         ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
-        config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
+        config.GetDefaultServerConfig(), changefeed, 0, "", false)
     eventCh := make(chan model.RegionFeedEvent, 50)

     var wg2 sync.WaitGroup
@@ -823,7 +823,7 @@ func testHandleFeedEvent(t *testing.T) {
     defer regionCache.Close()
     cdcClient := NewCDCClient(
         ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
-        config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
+        config.GetDefaultServerConfig(), changefeed, 0, "", false)
     eventCh := make(chan model.RegionFeedEvent, 50)
     wg.Add(1)
     go func() {
@@ -1284,7 +1284,7 @@ func TestStreamSendWithError(t *testing.T) {
     defer regionCache.Close()
     cdcClient := NewCDCClient(
         ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
-        config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
+        config.GetDefaultServerConfig(), changefeed, 0, "", false)
     eventCh := make(chan model.RegionFeedEvent, 50)
     wg.Add(1)
     go func() {
@@ -1396,7 +1396,7 @@ func testStreamRecvWithError(t *testing.T, failpointStr string) {
     defer regionCache.Close()
     cdcClient := NewCDCClient(
         ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
-        config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
+        config.GetDefaultServerConfig(), changefeed, 0, "", false)
     eventCh := make(chan model.RegionFeedEvent, 50)
     wg.Add(1)
     go func() {
@@ -1529,7 +1529,7 @@ func TestStreamRecvWithErrorAndResolvedGoBack(t *testing.T) {
     defer regionCache.Close()
     cdcClient := NewCDCClient(
         ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
-        config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
+        config.GetDefaultServerConfig(), changefeed, 0, "", false)
     eventCh := make(chan model.RegionFeedEvent, 50)
     wg.Add(1)
     go func() {
@@ -1738,7 +1738,7 @@ func TestIncompatibleTiKV(t *testing.T) {
     defer regionCache.Close()
     cdcClient := NewCDCClient(
         ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
-        config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
+        config.GetDefaultServerConfig(), changefeed, 0, "", false)
     // NOTICE: eventCh may block the main logic of EventFeed
     eventCh := make(chan model.RegionFeedEvent, 128)
     wg.Add(1)
@@ -1815,7 +1815,7 @@ func TestNoPendingRegionError(t *testing.T) {
     defer regionCache.Close()
     cdcClient := NewCDCClient(
         ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
-        config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
+        config.GetDefaultServerConfig(), changefeed, 0, "", false)
     eventCh := make(chan model.RegionFeedEvent, 50)

     wg.Add(1)
@@ -1894,7 +1894,7 @@ func TestDropStaleRequest(t *testing.T) {
     defer regionCache.Close()
     cdcClient := NewCDCClient(
         ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
-        config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
+        config.GetDefaultServerConfig(), changefeed, 0, "", false)
     eventCh := make(chan model.RegionFeedEvent, 50)
     wg.Add(1)
     go func() {
@@ -2008,7 +2008,7 @@ func TestResolveLock(t *testing.T) {
     defer regionCache.Close()
     cdcClient := NewCDCClient(
         ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
-        config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
+        config.GetDefaultServerConfig(), changefeed, 0, "", false)
     eventCh := make(chan model.RegionFeedEvent, 50)
     wg.Add(1)
     go func() {
@@ -2113,7 +2113,7 @@ func testEventCommitTsFallback(t *testing.T, events []*cdcpb.ChangeDataEvent) {
     defer regionCache.Close()
     cdcClient := NewCDCClient(
         ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
-        config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
+        config.GetDefaultServerConfig(), changefeed, 0, "", false)
     eventCh := make(chan model.RegionFeedEvent, 50)
     var clientWg sync.WaitGroup
     clientWg.Add(1)
@@ -2241,7 +2241,7 @@ func testEventAfterFeedStop(t *testing.T) {
     defer regionCache.Close()
     cdcClient := NewCDCClient(
         ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
-        config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
+        config.GetDefaultServerConfig(), changefeed, 0, "", false)
     eventCh := make(chan model.RegionFeedEvent, 50)
     wg.Add(1)
     go func() {
@@ -2428,7 +2428,7 @@ func TestOutOfRegionRangeEvent(t *testing.T) {
     defer regionCache.Close()
     cdcClient := NewCDCClient(
         ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
-        config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
+        config.GetDefaultServerConfig(), changefeed, 0, "", false)
     eventCh := make(chan model.RegionFeedEvent, 50)
     wg.Add(1)
     go func() {
@@ -2646,7 +2646,7 @@ func TestResolveLockNoCandidate(t *testing.T) {
     defer regionCache.Close()
     cdcClient := NewCDCClient(
         ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
-        config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
+        config.GetDefaultServerConfig(), changefeed, 0, "", false)
     eventCh := make(chan model.RegionFeedEvent, 50)
     wg.Add(1)
     go func() {
@@ -2742,7 +2742,7 @@ func TestFailRegionReentrant(t *testing.T) {
     defer regionCache.Close()
     cdcClient := NewCDCClient(
         ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
-        config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
+        config.GetDefaultServerConfig(), changefeed, 0, "", false)
     eventCh := make(chan model.RegionFeedEvent, 50)
     wg.Add(1)
     go func() {
@@ -2825,7 +2825,7 @@ func TestClientV1UnlockRangeReentrant(t *testing.T) {
     defer regionCache.Close()
     cdcClient := NewCDCClient(
         ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
-        config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
+        config.GetDefaultServerConfig(), changefeed, 0, "", false)
     eventCh := make(chan model.RegionFeedEvent, 50)
     wg.Add(1)
     go func() {
@@ -2893,7 +2893,7 @@ func testClientErrNoPendingRegion(t *testing.T) {
     defer regionCache.Close()
     cdcClient := NewCDCClient(
         ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
-        config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
+        config.GetDefaultServerConfig(), changefeed, 0, "", false)
     eventCh := make(chan model.RegionFeedEvent, 50)
     wg.Add(1)
     go func() {
@@ -2971,7 +2971,7 @@ func testKVClientForceReconnect(t *testing.T) {
     defer regionCache.Close()
     cdcClient := NewCDCClient(
         ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
-        config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
+        config.GetDefaultServerConfig(), changefeed, 0, "", false)
     eventCh := make(chan model.RegionFeedEvent, 50)
     wg.Add(1)
     go func() {
@@ -3122,7 +3122,7 @@ func TestConcurrentProcessRangeRequest(t *testing.T) {
     defer regionCache.Close()
     cdcClient := NewCDCClient(
         ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
-        config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
+        config.GetDefaultServerConfig(), changefeed, 0, "", false)
     eventCh := make(chan model.RegionFeedEvent, 100)
     wg.Add(1)
     go func() {
@@ -3239,7 +3239,7 @@ func TestEvTimeUpdate(t *testing.T) {
     defer regionCache.Close()
     cdcClient := NewCDCClient(
         ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
-        config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
+        config.GetDefaultServerConfig(), changefeed, 0, "", false)
     eventCh := make(chan model.RegionFeedEvent, 50)
     wg.Add(1)
     go func() {
@@ -3365,7 +3365,7 @@ func TestRegionWorkerExitWhenIsIdle(t *testing.T) {
     defer regionCache.Close()
     cdcClient := NewCDCClient(
         ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
-        config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
+        config.GetDefaultServerConfig(), changefeed, 0, "", false)
     eventCh := make(chan model.RegionFeedEvent, 50)
     wg.Add(1)
     go func() {
@@ -3457,7 +3457,7 @@ func TestPrewriteNotMatchError(t *testing.T) {
     defer regionCache.Close()
     cdcClient := NewCDCClient(
         ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
-        config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
+        config.GetDefaultServerConfig(), changefeed, 0, "", false)
     eventCh := make(chan model.RegionFeedEvent, 50)

     baseAllocatedID := currentRequestID()
@@ -3539,7 +3539,7 @@ func TestPrewriteNotMatchError(t *testing.T) {

 func createFakeEventFeedSession() *eventFeedSession {
     return newEventFeedSession(
-        &CDCClient{config: config.GetDefaultServerConfig().KVClient},
+        &CDCClient{config: config.GetDefaultServerConfig()},
         tablepb.Span{StartKey: []byte("a"), EndKey: []byte("b")},
         nil, /*lockResolver*/
         100, /*startTs*/
diff --git a/cdc/kv/region_worker.go b/cdc/kv/region_worker.go
index 165230350dd..2b9b927ec14 100644
--- a/cdc/kv/region_worker.go
+++ b/cdc/kv/region_worker.go
@@ -157,7 +157,7 @@ func newRegionWorker(
         rtsManager:   newRegionTsManager(),
         rtsUpdateCh:  make(chan *rtsUpdateEvent, 1024),
         storeAddr:    addr,
-        concurrency:  int(s.client.config.WorkerConcurrent),
+        concurrency:  int(s.client.config.KVClient.WorkerConcurrent),
         metrics:      newRegionWorkerMetrics(changefeedID),
         inputPending: 0,
     }
diff --git a/cdc/kv/shared_client.go b/cdc/kv/shared_client.go
index a67f69f8701..e0e146b36fb 100644
--- a/cdc/kv/shared_client.go
+++ b/cdc/kv/shared_client.go
@@ -59,7 +59,7 @@ type MultiplexingEvent struct {
 // SharedClient is shared in many tables. Methods are thread-safe.
 type SharedClient struct {
     changefeed model.ChangeFeedID
-    config     *config.KVClientConfig
+    config     *config.ServerConfig
     metrics    sharedClientMetrics

     clusterID uint64
@@ -130,7 +130,7 @@ type requestedTable struct {
 // NewSharedClient creates a client.
 func NewSharedClient(
     changefeed model.ChangeFeedID,
-    cfg *config.KVClientConfig,
+    cfg *config.ServerConfig,
     filterLoop bool,
     pd pd.Client,
     grpcPool *sharedconn.ConnAndClientPool,
@@ -234,8 +234,8 @@ func (s *SharedClient) Run(ctx context.Context) error {
     s.clusterID = s.pd.GetClusterID(ctx)

     g, ctx := errgroup.WithContext(ctx)
-    s.workers = make([]*sharedRegionWorker, 0, s.config.WorkerConcurrent)
-    for i := uint(0); i < s.config.WorkerConcurrent; i++ {
+    s.workers = make([]*sharedRegionWorker, 0, s.config.KVClient.WorkerConcurrent)
+    for i := uint(0); i < s.config.KVClient.WorkerConcurrent; i++ {
         worker := newSharedRegionWorker(s)
         g.Go(func() error { return worker.run(ctx) })
         s.workers = append(s.workers, worker)
@@ -392,7 +392,7 @@ func (s *SharedClient) requestStore(
     rs = &requestedStore{storeID: storeID, storeAddr: storeAddr}
     s.requestedStores[storeAddr] = rs

-    for i := uint(0); i < s.config.GrpcStreamConcurrent; i++ {
+    for i := uint(0); i < s.config.KVClient.GrpcStreamConcurrent; i++ {
         stream := newStream(ctx, s, g, rs)
         rs.streams = append(rs.streams, stream)
     }
diff --git a/cdc/kv/shared_client_test.go b/cdc/kv/shared_client_test.go
index 89caa807025..18041a19a70 100644
--- a/cdc/kv/shared_client_test.go
+++ b/cdc/kv/shared_client_test.go
@@ -125,7 +125,7 @@ func TestConnectToOfflineOrFailedTiKV(t *testing.T) {
     cluster.Bootstrap(11, []uint64{1, 2, 3}, []uint64{4, 5, 6}, 6)

     client := NewSharedClient(model.ChangeFeedID{ID: "test"},
-        &config.KVClientConfig{WorkerConcurrent: 1, GrpcStreamConcurrent: 1},
+        &config.ServerConfig{KVClient: &config.KVClientConfig{WorkerConcurrent: 1, GrpcStreamConcurrent: 1}},
         false, pdClient, grpcPool, regionCache, pdClock, lockResolver)

     defer func() {
diff --git a/cdc/processor/processor.go b/cdc/processor/processor.go
index 56113e40225..af589634062 100644
--- a/cdc/processor/processor.go
+++ b/cdc/processor/processor.go
@@ -718,10 +718,10 @@ func (p *processor) initDDLHandler(ctx context.Context) error {
         return errors.Trace(err)
     }

-    kvCfg := config.GetGlobalServerConfig().KVClient
+    serverCfg := config.GetGlobalServerConfig()
     ddlPuller, err := puller.NewDDLJobPuller(
         ctx, p.upstream, ddlStartTs,
-        kvCfg, p.changefeedID, schemaStorage,
+        serverCfg, p.changefeedID, schemaStorage,
         f, false, /* isOwner */
     )
     if err != nil {
diff --git a/cdc/processor/sourcemanager/manager.go b/cdc/processor/sourcemanager/manager.go
index 66874542a1d..59a377366dd 100644
--- a/cdc/processor/sourcemanager/manager.go
+++ b/cdc/processor/sourcemanager/manager.go
@@ -199,16 +199,16 @@ func (m *SourceManager) GetTableSorterStats(span tablepb.Span) engine.TableStats
 // Run implements util.Runnable.
 func (m *SourceManager) Run(ctx context.Context, _ ...chan<- error) error {
     if m.multiplexing {
-        clientConfig := config.GetGlobalServerConfig().KVClient
+        serverConfig := config.GetGlobalServerConfig()
         grpcPool := sharedconn.NewConnAndClientPool(m.up.SecurityConfig)
         client := kv.NewSharedClient(
-            m.changefeedID, clientConfig, m.bdrMode,
+            m.changefeedID, serverConfig, m.bdrMode,
             m.up.PDClient, grpcPool, m.up.RegionCache, m.up.PDClock,
             txnutil.NewLockerResolver(m.up.KVStorage.(tikv.Storage), m.changefeedID),
         )
         m.multiplexingPuller.puller = pullerwrapper.NewMultiplexingPullerWrapper(
             m.changefeedID, client, m.engine,
-            int(clientConfig.FrontierConcurrent),
+            int(serverConfig.KVClient.FrontierConcurrent),
         )

         close(m.ready)
diff --git a/cdc/processor/sourcemanager/puller/puller_wrapper.go b/cdc/processor/sourcemanager/puller/puller_wrapper.go
index 9f8070bde61..06749fab252 100644
--- a/cdc/processor/sourcemanager/puller/puller_wrapper.go
+++ b/cdc/processor/sourcemanager/puller/puller_wrapper.go
@@ -103,7 +103,7 @@ func (n *WrapperImpl) Start(
         up.PDClock,
         n.startTs,
         []tablepb.Span{n.span},
-        config.GetGlobalServerConfig().KVClient,
+        config.GetGlobalServerConfig(),
         n.changefeed,
         n.span.TableID,
         n.tableName,
diff --git a/cdc/puller/ddl_puller.go b/cdc/puller/ddl_puller.go
index 06de203d161..4777e8645a6 100644
--- a/cdc/puller/ddl_puller.go
+++ b/cdc/puller/ddl_puller.go
@@ -561,7 +561,7 @@ func NewDDLJobPuller(
     ctx context.Context,
     up *upstream.Upstream,
     checkpointTs uint64,
-    cfg *config.KVClientConfig,
+    cfg *config.ServerConfig,
     changefeed model.ChangeFeedID,
     schemaStorage entry.SchemaStorage,
     filter filter.Filter,
@@ -586,7 +586,7 @@ func NewDDLJobPuller(

     jobPuller := &ddlJobPullerImpl{
         changefeedID:  changefeed,
-        multiplexing:  cfg.EnableMultiplexing,
+        multiplexing:  cfg.KVClient.EnableMultiplexing,
         schemaStorage: schemaStorage,
         kvStorage:     kvStorage,
         filter:        filter,
@@ -669,7 +669,7 @@ func NewDDLPuller(ctx context.Context,
     // storage can be nil only in the test
     if up.KVStorage != nil {
         puller, err = NewDDLJobPuller(
-            ctx, up, startTs, config.GetGlobalServerConfig().KVClient,
+            ctx, up, startTs, config.GetGlobalServerConfig(),
             changefeed, schemaStorage, filter,
             true, /* isOwner */
         )
diff --git a/cdc/puller/puller.go b/cdc/puller/puller.go
index 30a0c38d980..f2d014b426a 100644
--- a/cdc/puller/puller.go
+++ b/cdc/puller/puller.go
@@ -82,7 +82,7 @@ func New(ctx context.Context,
     pdClock pdutil.Clock,
     checkpointTs uint64,
     spans []tablepb.Span,
-    cfg *config.KVClientConfig,
+    cfg *config.ServerConfig,
     changefeed model.ChangeFeedID,
     tableID model.TableID,
     tableName string,
diff --git a/cdc/puller/puller_test.go b/cdc/puller/puller_test.go
index 652a049ea12..26dfc555325 100644
--- a/cdc/puller/puller_test.go
+++ b/cdc/puller/puller_test.go
@@ -64,7 +64,7 @@ func newMockCDCKVClient(
     grpcPool kv.GrpcPool,
     regionCache *tikv.RegionCache,
     pdClock pdutil.Clock,
-    cfg *config.KVClientConfig,
+    cfg *config.ServerConfig,
     changefeed model.ChangeFeedID,
     tableID model.TableID,
     tableName string,
@@ -132,7 +132,7 @@ func newPullerForTest(
     defer regionCache.Close()
     plr := New(
         ctx, pdCli, grpcPool, regionCache, store, pdutil.NewClock4Test(),
-        checkpointTs, spans, config.GetDefaultServerConfig().KVClient,
+        checkpointTs, spans, config.GetDefaultServerConfig(),
         model.DefaultChangeFeedID("changefeed-id-test"),
         0, "table-test", false)
     wg.Add(1)
diff --git a/pkg/config/config_test_data.go b/pkg/config/config_test_data.go
index 47766b69301..e1f45a861a6 100644
--- a/pkg/config/config_test_data.go
+++ b/pkg/config/config_test_data.go
@@ -155,7 +155,8 @@ const (
             "max-task-concurrency": 10,
             "check-balance-interval": 60000000000,
             "add-table-batch-size": 50
-        }
+        },
+        "enable-kv-connect-backoff": false
     },
     "cluster-id": "default",
     "max-memory-percentage": 0,
diff --git a/pkg/config/debug.go b/pkg/config/debug.go
index 7e91188dafa..3a7815090bc 100644
--- a/pkg/config/debug.go
+++ b/pkg/config/debug.go
@@ -25,6 +25,9 @@ type DebugConfig struct {

     // Scheduler is the configuration of the two-phase scheduler.
     Scheduler *SchedulerConfig `toml:"scheduler" json:"scheduler"`
+
+    // EnableKVConnectBackOff enables the backoff for kv connect.
+    EnableKVConnectBackOff bool `toml:"enable-kv-connect-backoff" json:"enable-kv-connect-backoff"`
 }

 // ValidateAndAdjust validates and adjusts the debug configuration
diff --git a/pkg/config/server_config.go b/pkg/config/server_config.go
index 8e9317303e9..71372ccf310 100644
--- a/pkg/config/server_config.go
+++ b/pkg/config/server_config.go
@@ -135,7 +135,8 @@ var defaultServerConfig = &ServerConfig{
         },
         Messages: defaultMessageConfig.Clone(),

-        Scheduler: NewDefaultSchedulerConfig(),
+        Scheduler:              NewDefaultSchedulerConfig(),
+        EnableKVConnectBackOff: false,
     },
     ClusterID:              "default",
     GcTunerMemoryThreshold: DisableMemoryLimit,
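
Note: the core behavioral change in the cdc/kv/client.go hunk is that the connect-with-backoff path is now opt-in via the new `debug.enable-kv-connect-backoff` option (default false), so by default newStream makes a single attempt. The following is a minimal, self-contained Go sketch of that gating pattern only; DebugConfig, withBackoff, connectOnce, and the TiKV address are illustrative stand-ins, not TiCDC's actual retry/cerror APIs.

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// DebugConfig is a stand-in for the debug section touched by this patch;
// only the flag relevant here is modeled.
type DebugConfig struct {
	EnableKVConnectBackOff bool
}

// withBackoff is a stand-in for retry.Do with base-delay and max-tries
// options: it retries fn up to maxTries times, sleeping baseDelay between
// attempts, and stops early if the context is canceled.
func withBackoff(ctx context.Context, fn func() error, baseDelay time.Duration, maxTries int) error {
	var err error
	for i := 0; i < maxTries; i++ {
		if err = fn(); err == nil {
			return nil
		}
		if i == maxTries-1 {
			break
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(baseDelay):
		}
	}
	return err
}

// newStream mirrors the control flow introduced in CDCClient.newStream:
// the connect closure is defined once, and the backoff wrapper is applied
// only when the debug flag is set; otherwise it runs a single attempt.
func newStream(ctx context.Context, dbg *DebugConfig, addr string) error {
	connectOnce := func() error {
		// Placeholder for dialing addr and creating the event feed stream.
		return errors.New("dial " + addr + " failed")
	}
	if dbg.EnableKVConnectBackOff {
		// Opt-in path: 100ms base delay, at most 2 tries, matching the
		// options used in the patched code.
		return withBackoff(ctx, connectOnce, 100*time.Millisecond, 2)
	}
	// Default path after this patch: one attempt, no backoff inside newStream.
	return connectOnce()
}

func main() {
	err := newStream(context.Background(), &DebugConfig{EnableKVConnectBackOff: false}, "tikv-1:20160")
	fmt.Println("newStream result:", err)
}

With the flag left at its default (false, as added to defaultServerConfig), a failed connection surfaces immediately to the caller instead of being retried inside newStream.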