kv(ticdc): Remove store connect backoff (#9740) (#10387)
sdojjy committed Dec 29, 2023
1 parent 3e07924 commit 69a9085
Showing 13 changed files with 63 additions and 53 deletions.
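In short: opening a gRPC event-feed stream to a TiKV store previously always went through retry.Do with backoff; after this change newStream dials once and fails fast, and the old backoff path survives only behind a debug option. That option is defined in the server config package (one of the 13 changed files, but not shown in this excerpt). A minimal sketch of what the field plausibly looks like — the field name matches its usage in cdc/kv/client.go below, while the struct tags and the default of false are assumptions:

// Sketch only; the real definition lives in tiflow's pkg/config, which this
// excerpt does not include.
package config

type DebugConfig struct {
	// EnableKVConnectBackOff re-enables retry-with-backoff when the KV client
	// dials a TiKV store. With it off (assumed default), a failed dial is
	// returned to the caller immediately.
	EnableKVConnectBackOff bool `toml:"enable-kv-connect-back-off" json:"enable-kv-connect-back-off"`
}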
21 changes: 13 additions & 8 deletions cdc/kv/client.go
@@ -156,7 +156,7 @@ var NewCDCKVClient = NewCDCClient
type CDCClient struct {
pd pd.Client

- config *config.KVClientConfig
+ config *config.ServerConfig
clusterID uint64

grpcPool GrpcPool
@@ -192,7 +192,7 @@ func NewCDCClient(
grpcPool GrpcPool,
regionCache *tikv.RegionCache,
pdClock pdutil.Clock,
- cfg *config.KVClientConfig,
+ cfg *config.ServerConfig,
changefeed model.ChangeFeedID,
tableID model.TableID,
tableName string,
@@ -218,7 +218,7 @@
}

func (c *CDCClient) newStream(ctx context.Context, addr string, storeID uint64) (stream *eventFeedStream, newStreamErr error) {
- newStreamErr = retry.Do(ctx, func() (err error) {
+ streamFunc := func() (err error) {
var conn *sharedConn
defer func() {
if err != nil && conn != nil {
@@ -248,10 +248,15 @@ func (c *CDCClient) newStream(ctx context.Context, addr string, storeID uint64)
zap.String("changefeed", c.changefeed.ID),
zap.String("addr", addr))
return nil
- }, retry.WithBackoffBaseDelay(500),
- retry.WithMaxTries(2),
- retry.WithIsRetryableErr(cerror.IsRetryableError),
- )
+ }
+ if c.config.Debug.EnableKVConnectBackOff {
+ newStreamErr = retry.Do(ctx, streamFunc, retry.WithBackoffBaseDelay(500),
+ retry.WithMaxTries(2),
+ retry.WithIsRetryableErr(cerror.IsRetryableError),
+ )
+ return
+ }
+ newStreamErr = streamFunc()
return
}
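With the flag off (the new default), streamFunc runs exactly once and any dial error goes straight back to the caller; with it on, the previous retry options are preserved verbatim — 500 ms base backoff, at most two attempts, retrying only errors that pass cerror.IsRetryableError — so the old behavior can still be recovered for debugging. The point, presumably, is to stop stacking a per-store connect backoff on top of the region-level retries that already wrap these calls (see the divideAndSendEventFeedToRegions hunk just below).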

@@ -835,7 +840,7 @@ func (s *eventFeedSession) divideAndSendEventFeedToRegions(
}
return nil
}, retry.WithBackoffMaxDelay(500),
- retry.WithTotalRetryDuratoin(time.Duration(s.client.config.RegionRetryDuration)))
+ retry.WithTotalRetryDuratoin(time.Duration(s.client.config.KVClient.RegionRetryDuration)))
if retryErr != nil {
log.Warn("load regions failed",
zap.String("namespace", s.changefeed.Namespace),
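Because the client now reads both the KVClient sub-section (region retry duration, worker concurrency) and the Debug section, NewCDCClient accepts the whole *config.ServerConfig instead of *config.KVClientConfig, and every call site below simply drops the trailing .KVClient. A toy illustration of the two access paths, assuming the usual tiflow import path (it is not the real wiring):

package main

import (
	"fmt"
	"time"

	"github.com/pingcap/tiflow/pkg/config"
)

func main() {
	cfg := config.GetDefaultServerConfig() // callers used to pass cfg.KVClient only

	// The KV client settings are still available, one level deeper ...
	fmt.Println(time.Duration(cfg.KVClient.RegionRetryDuration))
	// ... and the Debug section is what newStream now consults.
	fmt.Println(cfg.Debug.EnableKVConnectBackOff)
}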
4 changes: 2 additions & 2 deletions cdc/kv/client_bench_test.go
@@ -196,7 +196,7 @@ func prepareBenchMultiStore(b *testing.B, storeNum, regionNum int) (
defer regionCache.Close()
cdcClient := NewCDCClient(
ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
- config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
+ config.GetDefaultServerConfig(), changefeed, 0, "", false)
eventCh := make(chan model.RegionFeedEvent, 1000000)
wg.Add(1)
go func() {
@@ -290,7 +290,7 @@ func prepareBench(b *testing.B, regionNum int) (
defer regionCache.Close()
cdcClient := NewCDCClient(
ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
- config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
+ config.GetDefaultServerConfig(), changefeed, 0, "", false)
eventCh := make(chan model.RegionFeedEvent, 1000000)
wg.Add(1)
go func() {
54 changes: 27 additions & 27 deletions cdc/kv/client_test.go
@@ -72,7 +72,7 @@ func TestNewClient(t *testing.T) {
defer regionCache.Close()
cli := NewCDCClient(
context.Background(), pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
- config.GetDefaultServerConfig().KVClient, model.DefaultChangeFeedID(""), 0, "", false)
+ config.GetDefaultServerConfig(), model.DefaultChangeFeedID(""), 0, "", false)
require.NotNil(t, cli)
}

@@ -320,7 +320,7 @@ func TestConnectOfflineTiKV(t *testing.T) {
defer regionCache.Close()
cdcClient := NewCDCClient(
context.Background(), pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
- config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
+ config.GetDefaultServerConfig(), changefeed, 0, "", false)
// Take care of the eventCh, it's used to output resolvedTs event or kv event
// It will stuck the normal routine
eventCh := make(chan model.RegionFeedEvent, 50)
@@ -422,7 +422,7 @@ func TestRecvLargeMessageSize(t *testing.T) {
defer regionCache.Close()
cdcClient := NewCDCClient(
ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
- config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
+ config.GetDefaultServerConfig(), changefeed, 0, "", false)
eventCh := make(chan model.RegionFeedEvent, 50)
wg.Add(1)
go func() {
@@ -522,7 +522,7 @@ func TestHandleError(t *testing.T) {
defer regionCache.Close()
cdcClient := NewCDCClient(
ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
- config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
+ config.GetDefaultServerConfig(), changefeed, 0, "", false)
eventCh := make(chan model.RegionFeedEvent, 50)
wg.Add(1)
go func() {
@@ -681,7 +681,7 @@ func TestCompatibilityWithSameConn(t *testing.T) {
defer regionCache.Close()
cdcClient := NewCDCClient(
ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
- config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
+ config.GetDefaultServerConfig(), changefeed, 0, "", false)
eventCh := make(chan model.RegionFeedEvent, 50)
var wg2 sync.WaitGroup
wg2.Add(1)
@@ -748,7 +748,7 @@ func TestClusterIDMismatch(t *testing.T) {
defer regionCache.Close()
cdcClient := NewCDCClient(
ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
- config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
+ config.GetDefaultServerConfig(), changefeed, 0, "", false)
eventCh := make(chan model.RegionFeedEvent, 50)

var wg2 sync.WaitGroup
@@ -817,7 +817,7 @@ func testHandleFeedEvent(t *testing.T) {
defer regionCache.Close()
cdcClient := NewCDCClient(
ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
- config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
+ config.GetDefaultServerConfig(), changefeed, 0, "", false)
eventCh := make(chan model.RegionFeedEvent, 50)
wg.Add(1)
go func() {
@@ -1278,7 +1278,7 @@ func TestStreamSendWithError(t *testing.T) {
defer regionCache.Close()
cdcClient := NewCDCClient(
ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
- config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
+ config.GetDefaultServerConfig(), changefeed, 0, "", false)
eventCh := make(chan model.RegionFeedEvent, 50)
wg.Add(1)
go func() {
@@ -1390,7 +1390,7 @@ func testStreamRecvWithError(t *testing.T, failpointStr string) {
defer regionCache.Close()
cdcClient := NewCDCClient(
ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
- config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
+ config.GetDefaultServerConfig(), changefeed, 0, "", false)
eventCh := make(chan model.RegionFeedEvent, 50)
wg.Add(1)
go func() {
@@ -1523,7 +1523,7 @@ func TestStreamRecvWithErrorAndResolvedGoBack(t *testing.T) {
defer regionCache.Close()
cdcClient := NewCDCClient(
ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
- config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
+ config.GetDefaultServerConfig(), changefeed, 0, "", false)
eventCh := make(chan model.RegionFeedEvent, 50)
wg.Add(1)
go func() {
@@ -1732,7 +1732,7 @@ func TestIncompatibleTiKV(t *testing.T) {
defer regionCache.Close()
cdcClient := NewCDCClient(
ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
- config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
+ config.GetDefaultServerConfig(), changefeed, 0, "", false)
// NOTICE: eventCh may block the main logic of EventFeed
eventCh := make(chan model.RegionFeedEvent, 128)
wg.Add(1)
@@ -1809,7 +1809,7 @@ func TestNoPendingRegionError(t *testing.T) {
defer regionCache.Close()
cdcClient := NewCDCClient(
ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
- config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
+ config.GetDefaultServerConfig(), changefeed, 0, "", false)
eventCh := make(chan model.RegionFeedEvent, 50)

wg.Add(1)
@@ -1888,7 +1888,7 @@ func TestDropStaleRequest(t *testing.T) {
defer regionCache.Close()
cdcClient := NewCDCClient(
ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
- config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
+ config.GetDefaultServerConfig(), changefeed, 0, "", false)
eventCh := make(chan model.RegionFeedEvent, 50)
wg.Add(1)
go func() {
@@ -2002,7 +2002,7 @@ func TestResolveLock(t *testing.T) {
defer regionCache.Close()
cdcClient := NewCDCClient(
ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
- config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
+ config.GetDefaultServerConfig(), changefeed, 0, "", false)
eventCh := make(chan model.RegionFeedEvent, 50)
wg.Add(1)
go func() {
@@ -2108,7 +2108,7 @@ func testEventCommitTsFallback(t *testing.T, events []*cdcpb.ChangeDataEvent) {
defer regionCache.Close()
cdcClient := NewCDCClient(
ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
- config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
+ config.GetDefaultServerConfig(), changefeed, 0, "", false)
eventCh := make(chan model.RegionFeedEvent, 50)
var clientWg sync.WaitGroup
clientWg.Add(1)
@@ -2236,7 +2236,7 @@ func testEventAfterFeedStop(t *testing.T) {
defer regionCache.Close()
cdcClient := NewCDCClient(
ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
- config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
+ config.GetDefaultServerConfig(), changefeed, 0, "", false)
eventCh := make(chan model.RegionFeedEvent, 50)
wg.Add(1)
go func() {
@@ -2423,7 +2423,7 @@ func TestOutOfRegionRangeEvent(t *testing.T) {
defer regionCache.Close()
cdcClient := NewCDCClient(
ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
- config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
+ config.GetDefaultServerConfig(), changefeed, 0, "", false)
eventCh := make(chan model.RegionFeedEvent, 50)
wg.Add(1)
go func() {
@@ -2641,7 +2641,7 @@ func TestResolveLockNoCandidate(t *testing.T) {
defer regionCache.Close()
cdcClient := NewCDCClient(
ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
- config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
+ config.GetDefaultServerConfig(), changefeed, 0, "", false)
eventCh := make(chan model.RegionFeedEvent, 50)
wg.Add(1)
go func() {
@@ -2737,7 +2737,7 @@ func TestFailRegionReentrant(t *testing.T) {
defer regionCache.Close()
cdcClient := NewCDCClient(
ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
- config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
+ config.GetDefaultServerConfig(), changefeed, 0, "", false)
eventCh := make(chan model.RegionFeedEvent, 50)
wg.Add(1)
go func() {
@@ -2820,7 +2820,7 @@ func TestClientV1UnlockRangeReentrant(t *testing.T) {
defer regionCache.Close()
cdcClient := NewCDCClient(
ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
- config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
+ config.GetDefaultServerConfig(), changefeed, 0, "", false)
eventCh := make(chan model.RegionFeedEvent, 50)
wg.Add(1)
go func() {
@@ -2888,7 +2888,7 @@ func testClientErrNoPendingRegion(t *testing.T) {
defer regionCache.Close()
cdcClient := NewCDCClient(
ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
- config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
+ config.GetDefaultServerConfig(), changefeed, 0, "", false)
eventCh := make(chan model.RegionFeedEvent, 50)
wg.Add(1)
go func() {
@@ -2966,7 +2966,7 @@ func testKVClientForceReconnect(t *testing.T) {
defer regionCache.Close()
cdcClient := NewCDCClient(
ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
- config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
+ config.GetDefaultServerConfig(), changefeed, 0, "", false)
eventCh := make(chan model.RegionFeedEvent, 50)
wg.Add(1)
go func() {
@@ -3117,7 +3117,7 @@ func TestConcurrentProcessRangeRequest(t *testing.T) {
defer regionCache.Close()
cdcClient := NewCDCClient(
ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
- config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
+ config.GetDefaultServerConfig(), changefeed, 0, "", false)
eventCh := make(chan model.RegionFeedEvent, 100)
wg.Add(1)
go func() {
@@ -3234,7 +3234,7 @@ func TestEvTimeUpdate(t *testing.T) {
defer regionCache.Close()
cdcClient := NewCDCClient(
ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
- config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
+ config.GetDefaultServerConfig(), changefeed, 0, "", false)
eventCh := make(chan model.RegionFeedEvent, 50)
wg.Add(1)
go func() {
@@ -3360,7 +3360,7 @@ func TestRegionWorkerExitWhenIsIdle(t *testing.T) {
defer regionCache.Close()
cdcClient := NewCDCClient(
ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
- config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
+ config.GetDefaultServerConfig(), changefeed, 0, "", false)
eventCh := make(chan model.RegionFeedEvent, 50)
wg.Add(1)
go func() {
@@ -3452,7 +3452,7 @@ func TestPrewriteNotMatchError(t *testing.T) {
defer regionCache.Close()
cdcClient := NewCDCClient(
ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
- config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
+ config.GetDefaultServerConfig(), changefeed, 0, "", false)
eventCh := make(chan model.RegionFeedEvent, 50)
baseAllocatedID := currentRequestID()

@@ -3534,7 +3534,7 @@ func TestPrewriteNotMatchError(t *testing.T) {

func createFakeEventFeedSession() *eventFeedSession {
return newEventFeedSession(
- &CDCClient{config: config.GetDefaultServerConfig().KVClient},
+ &CDCClient{config: config.GetDefaultServerConfig()},
regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")},
nil, /*lockResolver*/
100, /*startTs*/
2 changes: 1 addition & 1 deletion cdc/kv/region_worker.go
@@ -149,7 +149,7 @@ func newRegionWorker(
rtsManager: newRegionTsManager(),
rtsUpdateCh: make(chan *rtsUpdateEvent, 1024),
storeAddr: addr,
- concurrency: s.client.config.WorkerConcurrent,
+ concurrency: s.client.config.KVClient.WorkerConcurrent,
metrics: metrics,
inputPending: 0,

4 changes: 2 additions & 2 deletions cdc/processor/pipeline/puller.go
@@ -68,7 +68,7 @@ func (n *pullerNode) startWithSorterNode(ctx pipeline.NodeContext,
ctxC, cancel := context.WithCancel(ctx)
ctxC = contextutil.PutCaptureAddrInCtx(ctxC, ctx.GlobalVars().CaptureInfo.AdvertiseAddr)
ctxC = contextutil.PutRoleInCtx(ctxC, util.RoleProcessor)
- kvCfg := config.GetGlobalServerConfig().KVClient
+ serverCfg := config.GetGlobalServerConfig()
// NOTICE: always pull the old value internally
// See also: https://github.com/pingcap/tiflow/issues/2301.
n.plr = puller.New(
@@ -80,7 +80,7 @@
up.PDClock,
n.startTs,
n.tableSpan(),
- kvCfg,
+ serverCfg,
n.changefeed,
n.tableID,
n.tableName,
4 changes: 2 additions & 2 deletions cdc/processor/processor.go
@@ -959,7 +959,7 @@ func (p *processor) createAndDriveSchemaStorage(ctx cdcContext.Context) (entry.S
ddlStartTs = checkpointTs - 1
}

- kvCfg := config.GetGlobalServerConfig().KVClient
+ serverCfg := config.GetGlobalServerConfig()
stdCtx := contextutil.PutTableInfoInCtx(ctx, -1, puller.DDLPullerTableName)
stdCtx = contextutil.PutChangefeedIDInCtx(stdCtx, p.changefeedID)
stdCtx = contextutil.PutRoleInCtx(stdCtx, util.RoleProcessor)
@@ -987,7 +987,7 @@ func (p *processor) createAndDriveSchemaStorage(ctx cdcContext.Context) (entry.S
p.upstream.KVStorage,
p.upstream.PDClock,
ddlStartTs,
- kvCfg,
+ serverCfg,
p.changefeedID,
schemaStorage,
f,
4 changes: 2 additions & 2 deletions cdc/processor/sourcemanager/puller/puller_wrapper.go
@@ -85,7 +85,7 @@ func (n *Wrapper) Start(
ctxC, cancel := context.WithCancel(ctx)
ctxC = contextutil.PutCaptureAddrInCtx(ctxC, ctx.GlobalVars().CaptureInfo.AdvertiseAddr)
ctxC = contextutil.PutRoleInCtx(ctxC, util.RoleProcessor)
- kvCfg := config.GetGlobalServerConfig().KVClient
+ serverConfig := config.GetGlobalServerConfig()
// NOTICE: always pull the old value internally
// See also: https://github.com/pingcap/tiflow/issues/2301.
n.p = puller.New(
@@ -97,7 +97,7 @@
up.PDClock,
n.startTs,
n.tableSpan(),
- kvCfg,
+ serverConfig,
n.changefeed,
n.tableID,
n.tableName,
4 changes: 2 additions & 2 deletions cdc/puller/ddl_puller.go
@@ -459,7 +459,7 @@ func NewDDLJobPuller(
kvStorage tidbkv.Storage,
pdClock pdutil.Clock,
checkpointTs uint64,
- cfg *config.KVClientConfig,
+ cfg *config.ServerConfig,
changefeed model.ChangeFeedID,
schemaStorage entry.SchemaStorage,
filter filter.Filter,
@@ -541,7 +541,7 @@ func NewDDLPuller(ctx context.Context,
storage,
up.PDClock,
startTs,
- config.GetGlobalServerConfig().KVClient,
+ config.GetGlobalServerConfig(),
changefeed,
schemaStorage,
filter,
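The same substitution runs through the puller layer above: pullerNode.startWithSorterNode, the source-manager Wrapper, and NewDDLJobPuller/NewDDLPuller now hand config.GetGlobalServerConfig() (rather than its .KVClient field) down to puller.New and ultimately to NewCDCClient. If the old connect backoff is ever needed again — say, in an integration test — the knob is the debug field shown earlier; a hedged sketch, where StoreGlobalServerConfig as the helper that publishes the global config is an assumption:

package main

import "github.com/pingcap/tiflow/pkg/config"

func main() {
	cfg := config.GetDefaultServerConfig()
	// Grounded in the diff: the field consulted by newStream.
	cfg.Debug.EnableKVConnectBackOff = true // restores the retry.Do path
	// Assumption: this helper publishes the config later read via
	// config.GetGlobalServerConfig() by the pullers and processors above.
	config.StoreGlobalServerConfig(cfg)
}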