diff --git a/cdc/kv/client.go b/cdc/kv/client.go
index 423eea05c5d..2397db4b391 100644
--- a/cdc/kv/client.go
+++ b/cdc/kv/client.go
@@ -156,7 +156,7 @@ var NewCDCKVClient = NewCDCClient
 type CDCClient struct {
     pd pd.Client
 
-    config    *config.KVClientConfig
+    config    *config.ServerConfig
     clusterID uint64
 
     grpcPool GrpcPool
@@ -192,7 +192,7 @@ func NewCDCClient(
     grpcPool GrpcPool,
     regionCache *tikv.RegionCache,
     pdClock pdutil.Clock,
-    cfg *config.KVClientConfig,
+    cfg *config.ServerConfig,
     changefeed model.ChangeFeedID,
     tableID model.TableID,
     tableName string,
@@ -218,7 +218,7 @@ func NewCDCClient(
 }
 
 func (c *CDCClient) newStream(ctx context.Context, addr string, storeID uint64) (stream *eventFeedStream, newStreamErr error) {
-    newStreamErr = retry.Do(ctx, func() (err error) {
+    streamFunc := func() (err error) {
         var conn *sharedConn
         defer func() {
             if err != nil && conn != nil {
@@ -248,10 +248,15 @@ func (c *CDCClient) newStream(ctx context.Context, addr string, storeID uint64)
             zap.String("changefeed", c.changefeed.ID),
             zap.String("addr", addr))
         return nil
-    }, retry.WithBackoffBaseDelay(500),
-        retry.WithMaxTries(2),
-        retry.WithIsRetryableErr(cerror.IsRetryableError),
-    )
+    }
+    if c.config.Debug.EnableKVConnectBackOff {
+        newStreamErr = retry.Do(ctx, streamFunc, retry.WithBackoffBaseDelay(500),
+            retry.WithMaxTries(2),
+            retry.WithIsRetryableErr(cerror.IsRetryableError),
+        )
+        return
+    }
+    newStreamErr = streamFunc()
     return
 }
 
@@ -835,7 +840,7 @@ func (s *eventFeedSession) divideAndSendEventFeedToRegions(
         }
         return nil
     }, retry.WithBackoffMaxDelay(500),
-        retry.WithTotalRetryDuratoin(time.Duration(s.client.config.RegionRetryDuration)))
+        retry.WithTotalRetryDuratoin(time.Duration(s.client.config.KVClient.RegionRetryDuration)))
     if retryErr != nil {
         log.Warn("load regions failed",
             zap.String("namespace", s.changefeed.Namespace),
diff --git a/cdc/kv/client_bench_test.go b/cdc/kv/client_bench_test.go
index 3b31c125e0d..4c00a77064c 100644
--- a/cdc/kv/client_bench_test.go
+++ b/cdc/kv/client_bench_test.go
@@ -196,7 +196,7 @@ func prepareBenchMultiStore(b *testing.B, storeNum, regionNum int) (
     defer regionCache.Close()
     cdcClient := NewCDCClient(
         ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
-        config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
+        config.GetDefaultServerConfig(), changefeed, 0, "", false)
     eventCh := make(chan model.RegionFeedEvent, 1000000)
     wg.Add(1)
     go func() {
@@ -290,7 +290,7 @@ func prepareBench(b *testing.B, regionNum int) (
     defer regionCache.Close()
     cdcClient := NewCDCClient(
         ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
-        config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
+        config.GetDefaultServerConfig(), changefeed, 0, "", false)
     eventCh := make(chan model.RegionFeedEvent, 1000000)
     wg.Add(1)
     go func() {
diff --git a/cdc/kv/client_test.go b/cdc/kv/client_test.go
index 8a0bb559097..b9f9cf98e60 100644
--- a/cdc/kv/client_test.go
+++ b/cdc/kv/client_test.go
@@ -72,7 +72,7 @@ func TestNewClient(t *testing.T) {
     defer regionCache.Close()
     cli := NewCDCClient(
         context.Background(), pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
-        config.GetDefaultServerConfig().KVClient, model.DefaultChangeFeedID(""), 0, "", false)
+        config.GetDefaultServerConfig(), model.DefaultChangeFeedID(""), 0, "", false)
     require.NotNil(t, cli)
 }
 
@@ -320,7 +320,7 @@ func TestConnectOfflineTiKV(t *testing.T) {
     defer regionCache.Close()
     cdcClient := NewCDCClient(
         context.Background(), pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
-        config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
+        config.GetDefaultServerConfig(), changefeed, 0, "", false)
     // Take care of the eventCh, it's used to output resolvedTs event or kv event
     // It will stuck the normal routine
     eventCh := make(chan model.RegionFeedEvent, 50)
@@ -422,7 +422,7 @@ func TestRecvLargeMessageSize(t *testing.T) {
     defer regionCache.Close()
     cdcClient := NewCDCClient(
         ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
-        config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
+        config.GetDefaultServerConfig(), changefeed, 0, "", false)
     eventCh := make(chan model.RegionFeedEvent, 50)
     wg.Add(1)
     go func() {
@@ -522,7 +522,7 @@ func TestHandleError(t *testing.T) {
     defer regionCache.Close()
     cdcClient := NewCDCClient(
         ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
-        config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
+        config.GetDefaultServerConfig(), changefeed, 0, "", false)
     eventCh := make(chan model.RegionFeedEvent, 50)
     wg.Add(1)
     go func() {
@@ -681,7 +681,7 @@ func TestCompatibilityWithSameConn(t *testing.T) {
     defer regionCache.Close()
     cdcClient := NewCDCClient(
         ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
-        config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
+        config.GetDefaultServerConfig(), changefeed, 0, "", false)
     eventCh := make(chan model.RegionFeedEvent, 50)
     var wg2 sync.WaitGroup
     wg2.Add(1)
@@ -748,7 +748,7 @@ func TestClusterIDMismatch(t *testing.T) {
     defer regionCache.Close()
     cdcClient := NewCDCClient(
         ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
-        config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
+        config.GetDefaultServerConfig(), changefeed, 0, "", false)
     eventCh := make(chan model.RegionFeedEvent, 50)
 
     var wg2 sync.WaitGroup
@@ -817,7 +817,7 @@ func testHandleFeedEvent(t *testing.T) {
     defer regionCache.Close()
     cdcClient := NewCDCClient(
         ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
-        config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
+        config.GetDefaultServerConfig(), changefeed, 0, "", false)
     eventCh := make(chan model.RegionFeedEvent, 50)
     wg.Add(1)
     go func() {
@@ -1278,7 +1278,7 @@ func TestStreamSendWithError(t *testing.T) {
     defer regionCache.Close()
     cdcClient := NewCDCClient(
         ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
-        config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
+        config.GetDefaultServerConfig(), changefeed, 0, "", false)
     eventCh := make(chan model.RegionFeedEvent, 50)
     wg.Add(1)
     go func() {
@@ -1390,7 +1390,7 @@ func testStreamRecvWithError(t *testing.T, failpointStr string) {
     defer regionCache.Close()
     cdcClient := NewCDCClient(
         ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
-        config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
+        config.GetDefaultServerConfig(), changefeed, 0, "", false)
     eventCh := make(chan model.RegionFeedEvent, 50)
     wg.Add(1)
     go func() {
@@ -1523,7 +1523,7 @@ func TestStreamRecvWithErrorAndResolvedGoBack(t *testing.T) {
     defer regionCache.Close()
     cdcClient := NewCDCClient(
         ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
-        config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
+        config.GetDefaultServerConfig(), changefeed, 0, "", false)
     eventCh := make(chan model.RegionFeedEvent, 50)
     wg.Add(1)
     go func() {
@@ -1732,7 +1732,7 @@ func TestIncompatibleTiKV(t *testing.T) {
     defer regionCache.Close()
     cdcClient := NewCDCClient(
         ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
-        config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
+        config.GetDefaultServerConfig(), changefeed, 0, "", false)
     // NOTICE: eventCh may block the main logic of EventFeed
     eventCh := make(chan model.RegionFeedEvent, 128)
     wg.Add(1)
@@ -1809,7 +1809,7 @@ func TestNoPendingRegionError(t *testing.T) {
     defer regionCache.Close()
     cdcClient := NewCDCClient(
         ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
-        config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
+        config.GetDefaultServerConfig(), changefeed, 0, "", false)
     eventCh := make(chan model.RegionFeedEvent, 50)
 
     wg.Add(1)
@@ -1888,7 +1888,7 @@ func TestDropStaleRequest(t *testing.T) {
     defer regionCache.Close()
     cdcClient := NewCDCClient(
         ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
-        config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
+        config.GetDefaultServerConfig(), changefeed, 0, "", false)
     eventCh := make(chan model.RegionFeedEvent, 50)
     wg.Add(1)
     go func() {
@@ -2002,7 +2002,7 @@ func TestResolveLock(t *testing.T) {
     defer regionCache.Close()
     cdcClient := NewCDCClient(
         ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
-        config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
+        config.GetDefaultServerConfig(), changefeed, 0, "", false)
     eventCh := make(chan model.RegionFeedEvent, 50)
     wg.Add(1)
     go func() {
@@ -2108,7 +2108,7 @@ func testEventCommitTsFallback(t *testing.T, events []*cdcpb.ChangeDataEvent) {
     defer regionCache.Close()
     cdcClient := NewCDCClient(
         ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
-        config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
+        config.GetDefaultServerConfig(), changefeed, 0, "", false)
     eventCh := make(chan model.RegionFeedEvent, 50)
     var clientWg sync.WaitGroup
     clientWg.Add(1)
@@ -2236,7 +2236,7 @@ func testEventAfterFeedStop(t *testing.T) {
     defer regionCache.Close()
     cdcClient := NewCDCClient(
         ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
-        config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
+        config.GetDefaultServerConfig(), changefeed, 0, "", false)
     eventCh := make(chan model.RegionFeedEvent, 50)
     wg.Add(1)
     go func() {
@@ -2423,7 +2423,7 @@ func TestOutOfRegionRangeEvent(t *testing.T) {
     defer regionCache.Close()
     cdcClient := NewCDCClient(
         ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
-        config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
+        config.GetDefaultServerConfig(), changefeed, 0, "", false)
     eventCh := make(chan model.RegionFeedEvent, 50)
     wg.Add(1)
     go func() {
@@ -2641,7 +2641,7 @@ func TestResolveLockNoCandidate(t *testing.T) {
     defer regionCache.Close()
     cdcClient := NewCDCClient(
         ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
-        config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
+        config.GetDefaultServerConfig(), changefeed, 0, "", false)
     eventCh := make(chan model.RegionFeedEvent, 50)
     wg.Add(1)
     go func() {
@@ -2737,7 +2737,7 @@ func TestFailRegionReentrant(t *testing.T) {
     defer regionCache.Close()
     cdcClient := NewCDCClient(
         ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
-        config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
+        config.GetDefaultServerConfig(), changefeed, 0, "", false)
     eventCh := make(chan model.RegionFeedEvent, 50)
     wg.Add(1)
     go func() {
@@ -2820,7 +2820,7 @@ func TestClientV1UnlockRangeReentrant(t *testing.T) {
     defer regionCache.Close()
     cdcClient := NewCDCClient(
         ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
-        config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
+        config.GetDefaultServerConfig(), changefeed, 0, "", false)
     eventCh := make(chan model.RegionFeedEvent, 50)
     wg.Add(1)
     go func() {
@@ -2888,7 +2888,7 @@ func testClientErrNoPendingRegion(t *testing.T) {
     defer regionCache.Close()
     cdcClient := NewCDCClient(
         ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
-        config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
+        config.GetDefaultServerConfig(), changefeed, 0, "", false)
     eventCh := make(chan model.RegionFeedEvent, 50)
     wg.Add(1)
     go func() {
@@ -2966,7 +2966,7 @@ func testKVClientForceReconnect(t *testing.T) {
     defer regionCache.Close()
     cdcClient := NewCDCClient(
         ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
-        config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
+        config.GetDefaultServerConfig(), changefeed, 0, "", false)
     eventCh := make(chan model.RegionFeedEvent, 50)
     wg.Add(1)
     go func() {
@@ -3117,7 +3117,7 @@ func TestConcurrentProcessRangeRequest(t *testing.T) {
     defer regionCache.Close()
     cdcClient := NewCDCClient(
         ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
-        config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
+        config.GetDefaultServerConfig(), changefeed, 0, "", false)
     eventCh := make(chan model.RegionFeedEvent, 100)
     wg.Add(1)
     go func() {
@@ -3234,7 +3234,7 @@ func TestEvTimeUpdate(t *testing.T) {
     defer regionCache.Close()
     cdcClient := NewCDCClient(
         ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
-        config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
+        config.GetDefaultServerConfig(), changefeed, 0, "", false)
     eventCh := make(chan model.RegionFeedEvent, 50)
     wg.Add(1)
     go func() {
@@ -3360,7 +3360,7 @@ func TestRegionWorkerExitWhenIsIdle(t *testing.T) {
     defer regionCache.Close()
     cdcClient := NewCDCClient(
         ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
-        config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
+        config.GetDefaultServerConfig(), changefeed, 0, "", false)
     eventCh := make(chan model.RegionFeedEvent, 50)
     wg.Add(1)
     go func() {
@@ -3452,7 +3452,7 @@ func TestPrewriteNotMatchError(t *testing.T) {
     defer regionCache.Close()
     cdcClient := NewCDCClient(
         ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
-        config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
+        config.GetDefaultServerConfig(), changefeed, 0, "", false)
     eventCh := make(chan model.RegionFeedEvent, 50)
 
     baseAllocatedID := currentRequestID()
@@ -3534,7 +3534,7 @@ func TestPrewriteNotMatchError(t *testing.T) {
 
 func createFakeEventFeedSession() *eventFeedSession {
     return newEventFeedSession(
-        &CDCClient{config: config.GetDefaultServerConfig().KVClient},
+        &CDCClient{config: config.GetDefaultServerConfig()},
         regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")},
         nil, /*lockResolver*/
         100, /*startTs*/
diff --git a/cdc/kv/region_worker.go b/cdc/kv/region_worker.go
index 08fd2a3c956..d336e03b4a5 100644
--- a/cdc/kv/region_worker.go
+++ b/cdc/kv/region_worker.go
@@ -149,7 +149,7 @@ func newRegionWorker(
         rtsManager:    newRegionTsManager(),
         rtsUpdateCh:   make(chan *rtsUpdateEvent, 1024),
         storeAddr:     addr,
-        concurrency:   s.client.config.WorkerConcurrent,
+        concurrency:   s.client.config.KVClient.WorkerConcurrent,
         metrics:       metrics,
         inputPending:  0,
diff --git a/cdc/processor/pipeline/puller.go b/cdc/processor/pipeline/puller.go
index a6f2ce031d3..188b0167e97 100644
--- a/cdc/processor/pipeline/puller.go
+++ b/cdc/processor/pipeline/puller.go
@@ -68,7 +68,7 @@ func (n *pullerNode) startWithSorterNode(ctx pipeline.NodeContext,
     ctxC, cancel := context.WithCancel(ctx)
     ctxC = contextutil.PutCaptureAddrInCtx(ctxC, ctx.GlobalVars().CaptureInfo.AdvertiseAddr)
     ctxC = contextutil.PutRoleInCtx(ctxC, util.RoleProcessor)
-    kvCfg := config.GetGlobalServerConfig().KVClient
+    serverCfg := config.GetGlobalServerConfig()
     // NOTICE: always pull the old value internally
     // See also: https://github.com/pingcap/tiflow/issues/2301.
     n.plr = puller.New(
@@ -80,7 +80,7 @@ func (n *pullerNode) startWithSorterNode(ctx pipeline.NodeContext,
         up.PDClock,
         n.startTs,
         n.tableSpan(),
-        kvCfg,
+        serverCfg,
         n.changefeed,
         n.tableID,
         n.tableName,
diff --git a/cdc/processor/processor.go b/cdc/processor/processor.go
index 8289c0579f1..8a724ac7bc0 100644
--- a/cdc/processor/processor.go
+++ b/cdc/processor/processor.go
@@ -959,7 +959,7 @@ func (p *processor) createAndDriveSchemaStorage(ctx cdcContext.Context) (entry.S
         ddlStartTs = checkpointTs - 1
     }
 
-    kvCfg := config.GetGlobalServerConfig().KVClient
+    serverCfg := config.GetGlobalServerConfig()
     stdCtx := contextutil.PutTableInfoInCtx(ctx, -1, puller.DDLPullerTableName)
     stdCtx = contextutil.PutChangefeedIDInCtx(stdCtx, p.changefeedID)
     stdCtx = contextutil.PutRoleInCtx(stdCtx, util.RoleProcessor)
@@ -987,7 +987,7 @@ func (p *processor) createAndDriveSchemaStorage(ctx cdcContext.Context) (entry.S
         p.upstream.KVStorage,
         p.upstream.PDClock,
         ddlStartTs,
-        kvCfg,
+        serverCfg,
         p.changefeedID,
         schemaStorage,
         f,
diff --git a/cdc/processor/sourcemanager/puller/puller_wrapper.go b/cdc/processor/sourcemanager/puller/puller_wrapper.go
index c2f0f64cc4b..5aa38008a97 100644
--- a/cdc/processor/sourcemanager/puller/puller_wrapper.go
+++ b/cdc/processor/sourcemanager/puller/puller_wrapper.go
@@ -85,7 +85,7 @@ func (n *Wrapper) Start(
     ctxC, cancel := context.WithCancel(ctx)
     ctxC = contextutil.PutCaptureAddrInCtx(ctxC, ctx.GlobalVars().CaptureInfo.AdvertiseAddr)
     ctxC = contextutil.PutRoleInCtx(ctxC, util.RoleProcessor)
-    kvCfg := config.GetGlobalServerConfig().KVClient
+    serverConfig := config.GetGlobalServerConfig()
     // NOTICE: always pull the old value internally
     // See also: https://github.com/pingcap/tiflow/issues/2301.
     n.p = puller.New(
@@ -97,7 +97,7 @@ func (n *Wrapper) Start(
         up.PDClock,
         n.startTs,
         n.tableSpan(),
-        kvCfg,
+        serverConfig,
         n.changefeed,
         n.tableID,
         n.tableName,
diff --git a/cdc/puller/ddl_puller.go b/cdc/puller/ddl_puller.go
index a9ce95c2954..520378e6aae 100644
--- a/cdc/puller/ddl_puller.go
+++ b/cdc/puller/ddl_puller.go
@@ -459,7 +459,7 @@ func NewDDLJobPuller(
     kvStorage tidbkv.Storage,
     pdClock pdutil.Clock,
     checkpointTs uint64,
-    cfg *config.KVClientConfig,
+    cfg *config.ServerConfig,
     changefeed model.ChangeFeedID,
     schemaStorage entry.SchemaStorage,
     filter filter.Filter,
@@ -541,7 +541,7 @@ func NewDDLPuller(ctx context.Context,
         storage,
         up.PDClock,
         startTs,
-        config.GetGlobalServerConfig().KVClient,
+        config.GetGlobalServerConfig(),
         changefeed,
         schemaStorage,
         filter,
diff --git a/cdc/puller/puller.go b/cdc/puller/puller.go
index b147ebbbc4f..30cdbda9769 100644
--- a/cdc/puller/puller.go
+++ b/cdc/puller/puller.go
@@ -84,7 +84,7 @@ func New(ctx context.Context,
     pdClock pdutil.Clock,
     checkpointTs uint64,
     spans []regionspan.Span,
-    cfg *config.KVClientConfig,
+    cfg *config.ServerConfig,
     changefeed model.ChangeFeedID,
     tableID model.TableID,
     tableName string,
diff --git a/cdc/puller/puller_test.go b/cdc/puller/puller_test.go
index 60407042b94..408a86ba903 100644
--- a/cdc/puller/puller_test.go
+++ b/cdc/puller/puller_test.go
@@ -62,7 +62,7 @@ func newMockCDCKVClient(
     grpcPool kv.GrpcPool,
     regionCache *tikv.RegionCache,
     pdClock pdutil.Clock,
-    cfg *config.KVClientConfig,
+    cfg *config.ServerConfig,
     changefeed model.ChangeFeedID,
     tableID model.TableID,
     tableName string,
@@ -130,7 +130,7 @@ func newPullerForTest(
     defer regionCache.Close()
     plr := New(
         ctx, pdCli, grpcPool, regionCache, store, pdutil.NewClock4Test(),
-        checkpointTs, spans, config.GetDefaultServerConfig().KVClient,
+        checkpointTs, spans, config.GetDefaultServerConfig(),
         model.DefaultChangeFeedID("changefeed-id-test"), 0, "table-test", false)
     wg.Add(1)
     go func() {
diff --git a/pkg/config/config_test_data.go b/pkg/config/config_test_data.go
index 029a6984279..a0b2349a953 100644
--- a/pkg/config/config_test_data.go
+++ b/pkg/config/config_test_data.go
@@ -145,7 +145,8 @@ const (
 			"check-balance-interval": 60000000000,
 			"add-table-batch-size": 50
 		},
-		"enable-new-sink": true
+		"enable-new-sink": true,
+		"enable-kv-connect-backoff": false
 	},
 	"cluster-id": "default",
 	"max-memory-percentage": 70
diff --git a/pkg/config/debug.go b/pkg/config/debug.go
index e4ed8b8d0ec..e1a76f18230 100644
--- a/pkg/config/debug.go
+++ b/pkg/config/debug.go
@@ -48,6 +48,9 @@ type DebugConfig struct {
     // EnableNewSink enables the new sink.
     // The default value is true.
     EnableNewSink bool `toml:"enable-new-sink" json:"enable-new-sink"`
+
+    // EnableKVConnectBackOff enables the backoff for kv connect.
+    EnableKVConnectBackOff bool `toml:"enable-kv-connect-backoff" json:"enable-kv-connect-backoff"`
 }
 
 // ValidateAndAdjust validates and adjusts the debug configuration
diff --git a/pkg/config/server_config.go b/pkg/config/server_config.go
index 2d327103032..fc7ed53a8d6 100644
--- a/pkg/config/server_config.go
+++ b/pkg/config/server_config.go
@@ -155,9 +155,10 @@ var defaultServerConfig = &ServerConfig{
         },
         Messages: defaultMessageConfig.Clone(),
 
-        Scheduler:           NewDefaultSchedulerConfig(),
-        EnableNewSink:       true,
-        EnablePullBasedSink: true,
+        Scheduler:              NewDefaultSchedulerConfig(),
+        EnableNewSink:          true,
+        EnablePullBasedSink:    true,
+        EnableKVConnectBackOff: false,
     },
     ClusterID:           "default",
     MaxMemoryPercentage: DefaultMaxMemoryPercentage,
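
Usage sketch (not part of the patch; it relies only on identifiers visible in this diff): the new debug flag defaults to false, so CDCClient.newStream keeps the previous single-attempt behaviour unless the flag is switched on, in which case stream creation is wrapped in retry.Do with backoff. Programmatically, the flag would be flipped on the *config.ServerConfig that is now threaded through to NewCDCClient:

    package main

    import (
        "fmt"

        "github.com/pingcap/tiflow/pkg/config"
    )

    func main() {
        // Start from the defaults shown in server_config.go; this diff sets
        // EnableKVConnectBackOff to false there.
        cfg := config.GetDefaultServerConfig()

        // Opt in to the backoff path guarded in CDCClient.newStream.
        cfg.Debug.EnableKVConnectBackOff = true

        fmt.Println(cfg.Debug.EnableKVConnectBackOff) // true
    }

In a deployment, the same switch is presumably surfaced as the "enable-kv-connect-backoff" key of the debug section of the server configuration (see the toml tag added in pkg/config/debug.go); keeping it off by default preserves the old no-backoff connect behaviour.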