Skip to content

Commit

Permalink
This is an automated cherry-pick of pingcap#9771
Browse files Browse the repository at this point in the history
Signed-off-by: ti-chi-bot <ti-community-prow-bot@tidb.io>
  • Loading branch information
sdojjy authored and ti-chi-bot committed Sep 26, 2023
1 parent d4ca8ef commit 5156cd8
Show file tree
Hide file tree
Showing 15 changed files with 1,147 additions and 44 deletions.
22 changes: 14 additions & 8 deletions cdc/kv/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ var NewCDCKVClient = NewCDCClient
type CDCClient struct {
pd pd.Client

config *config.KVClientConfig
config *config.ServerConfig
clusterID uint64

grpcPool GrpcPool
Expand Down Expand Up @@ -192,7 +192,7 @@ func NewCDCClient(
grpcPool GrpcPool,
regionCache *tikv.RegionCache,
pdClock pdutil.Clock,
cfg *config.KVClientConfig,
cfg *config.ServerConfig,
changefeed model.ChangeFeedID,
tableID model.TableID,
tableName string,
Expand All @@ -218,7 +218,7 @@ func NewCDCClient(
}

func (c *CDCClient) newStream(ctx context.Context, addr string, storeID uint64) (stream *eventFeedStream, newStreamErr error) {
newStreamErr = retry.Do(ctx, func() (err error) {
streamFunc := func() (err error) {
var conn *sharedConn
defer func() {
if err != nil && conn != nil {
Expand Down Expand Up @@ -248,10 +248,16 @@ func (c *CDCClient) newStream(ctx context.Context, addr string, storeID uint64)
zap.String("changefeed", c.changefeed.ID),
zap.String("addr", addr))
return nil
}, retry.WithBackoffBaseDelay(500),
retry.WithMaxTries(2),
retry.WithIsRetryableErr(cerror.IsRetryableError),
)
}
if c.config.Debug.EnableKVConnectBackOff {
newStreamErr = retry.Do(ctx, streamFunc,
retry.WithBackoffBaseDelay(100),
retry.WithMaxTries(2),
retry.WithIsRetryableErr(cerror.IsRetryableError),
)
return
}
newStreamErr = streamFunc()
return
}

Expand Down Expand Up @@ -843,7 +849,7 @@ func (s *eventFeedSession) divideAndSendEventFeedToRegions(
}
return nil
}, retry.WithBackoffMaxDelay(500),
retry.WithTotalRetryDuratoin(time.Duration(s.client.config.RegionRetryDuration)))
retry.WithTotalRetryDuratoin(time.Duration(s.client.config.KVClient.RegionRetryDuration)))
if retryErr != nil {
log.Warn("load regions failed",
zap.String("namespace", s.changefeed.Namespace),
Expand Down
4 changes: 2 additions & 2 deletions cdc/kv/client_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ func prepareBenchMultiStore(b *testing.B, storeNum, regionNum int) (
defer regionCache.Close()
cdcClient := NewCDCClient(
ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
config.GetDefaultServerConfig(), changefeed, 0, "", false)
eventCh := make(chan model.RegionFeedEvent, 1000000)
wg.Add(1)
go func() {
Expand Down Expand Up @@ -290,7 +290,7 @@ func prepareBench(b *testing.B, regionNum int) (
defer regionCache.Close()
cdcClient := NewCDCClient(
ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
config.GetDefaultServerConfig(), changefeed, 0, "", false)
eventCh := make(chan model.RegionFeedEvent, 1000000)
wg.Add(1)
go func() {
Expand Down
54 changes: 27 additions & 27 deletions cdc/kv/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func TestNewClient(t *testing.T) {
defer regionCache.Close()
cli := NewCDCClient(
context.Background(), pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
config.GetDefaultServerConfig().KVClient, model.DefaultChangeFeedID(""), 0, "", false)
config.GetDefaultServerConfig(), model.DefaultChangeFeedID(""), 0, "", false)
require.NotNil(t, cli)
}

Expand Down Expand Up @@ -320,7 +320,7 @@ func TestConnectOfflineTiKV(t *testing.T) {
defer regionCache.Close()
cdcClient := NewCDCClient(
context.Background(), pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
config.GetDefaultServerConfig(), changefeed, 0, "", false)
// Take care of the eventCh, it's used to output resolvedTs event or kv event
// It will stuck the normal routine
eventCh := make(chan model.RegionFeedEvent, 50)
Expand Down Expand Up @@ -422,7 +422,7 @@ func TestRecvLargeMessageSize(t *testing.T) {
defer regionCache.Close()
cdcClient := NewCDCClient(
ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
config.GetDefaultServerConfig(), changefeed, 0, "", false)
eventCh := make(chan model.RegionFeedEvent, 50)
wg.Add(1)
go func() {
Expand Down Expand Up @@ -522,7 +522,7 @@ func TestHandleError(t *testing.T) {
defer regionCache.Close()
cdcClient := NewCDCClient(
ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
config.GetDefaultServerConfig(), changefeed, 0, "", false)
eventCh := make(chan model.RegionFeedEvent, 50)
wg.Add(1)
go func() {
Expand Down Expand Up @@ -681,7 +681,7 @@ func TestCompatibilityWithSameConn(t *testing.T) {
defer regionCache.Close()
cdcClient := NewCDCClient(
ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
config.GetDefaultServerConfig(), changefeed, 0, "", false)
eventCh := make(chan model.RegionFeedEvent, 50)
var wg2 sync.WaitGroup
wg2.Add(1)
Expand Down Expand Up @@ -748,7 +748,7 @@ func TestClusterIDMismatch(t *testing.T) {
defer regionCache.Close()
cdcClient := NewCDCClient(
ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
config.GetDefaultServerConfig(), changefeed, 0, "", false)
eventCh := make(chan model.RegionFeedEvent, 50)

var wg2 sync.WaitGroup
Expand Down Expand Up @@ -817,7 +817,7 @@ func testHandleFeedEvent(t *testing.T) {
defer regionCache.Close()
cdcClient := NewCDCClient(
ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
config.GetDefaultServerConfig(), changefeed, 0, "", false)
eventCh := make(chan model.RegionFeedEvent, 50)
wg.Add(1)
go func() {
Expand Down Expand Up @@ -1278,7 +1278,7 @@ func TestStreamSendWithError(t *testing.T) {
defer regionCache.Close()
cdcClient := NewCDCClient(
ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
config.GetDefaultServerConfig(), changefeed, 0, "", false)
eventCh := make(chan model.RegionFeedEvent, 50)
wg.Add(1)
go func() {
Expand Down Expand Up @@ -1390,7 +1390,7 @@ func testStreamRecvWithError(t *testing.T, failpointStr string) {
defer regionCache.Close()
cdcClient := NewCDCClient(
ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
config.GetDefaultServerConfig(), changefeed, 0, "", false)
eventCh := make(chan model.RegionFeedEvent, 50)
wg.Add(1)
go func() {
Expand Down Expand Up @@ -1523,7 +1523,7 @@ func TestStreamRecvWithErrorAndResolvedGoBack(t *testing.T) {
defer regionCache.Close()
cdcClient := NewCDCClient(
ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
config.GetDefaultServerConfig(), changefeed, 0, "", false)
eventCh := make(chan model.RegionFeedEvent, 50)
wg.Add(1)
go func() {
Expand Down Expand Up @@ -1732,7 +1732,7 @@ func TestIncompatibleTiKV(t *testing.T) {
defer regionCache.Close()
cdcClient := NewCDCClient(
ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
config.GetDefaultServerConfig(), changefeed, 0, "", false)
// NOTICE: eventCh may block the main logic of EventFeed
eventCh := make(chan model.RegionFeedEvent, 128)
wg.Add(1)
Expand Down Expand Up @@ -1809,7 +1809,7 @@ func TestNoPendingRegionError(t *testing.T) {
defer regionCache.Close()
cdcClient := NewCDCClient(
ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
config.GetDefaultServerConfig(), changefeed, 0, "", false)
eventCh := make(chan model.RegionFeedEvent, 50)

wg.Add(1)
Expand Down Expand Up @@ -1888,7 +1888,7 @@ func TestDropStaleRequest(t *testing.T) {
defer regionCache.Close()
cdcClient := NewCDCClient(
ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
config.GetDefaultServerConfig(), changefeed, 0, "", false)
eventCh := make(chan model.RegionFeedEvent, 50)
wg.Add(1)
go func() {
Expand Down Expand Up @@ -2002,7 +2002,7 @@ func TestResolveLock(t *testing.T) {
defer regionCache.Close()
cdcClient := NewCDCClient(
ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
config.GetDefaultServerConfig(), changefeed, 0, "", false)
eventCh := make(chan model.RegionFeedEvent, 50)
wg.Add(1)
go func() {
Expand Down Expand Up @@ -2108,7 +2108,7 @@ func testEventCommitTsFallback(t *testing.T, events []*cdcpb.ChangeDataEvent) {
defer regionCache.Close()
cdcClient := NewCDCClient(
ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
config.GetDefaultServerConfig(), changefeed, 0, "", false)
eventCh := make(chan model.RegionFeedEvent, 50)
var clientWg sync.WaitGroup
clientWg.Add(1)
Expand Down Expand Up @@ -2236,7 +2236,7 @@ func testEventAfterFeedStop(t *testing.T) {
defer regionCache.Close()
cdcClient := NewCDCClient(
ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
config.GetDefaultServerConfig(), changefeed, 0, "", false)
eventCh := make(chan model.RegionFeedEvent, 50)
wg.Add(1)
go func() {
Expand Down Expand Up @@ -2423,7 +2423,7 @@ func TestOutOfRegionRangeEvent(t *testing.T) {
defer regionCache.Close()
cdcClient := NewCDCClient(
ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
config.GetDefaultServerConfig(), changefeed, 0, "", false)
eventCh := make(chan model.RegionFeedEvent, 50)
wg.Add(1)
go func() {
Expand Down Expand Up @@ -2641,7 +2641,7 @@ func TestResolveLockNoCandidate(t *testing.T) {
defer regionCache.Close()
cdcClient := NewCDCClient(
ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
config.GetDefaultServerConfig(), changefeed, 0, "", false)
eventCh := make(chan model.RegionFeedEvent, 50)
wg.Add(1)
go func() {
Expand Down Expand Up @@ -2737,7 +2737,7 @@ func TestFailRegionReentrant(t *testing.T) {
defer regionCache.Close()
cdcClient := NewCDCClient(
ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
config.GetDefaultServerConfig(), changefeed, 0, "", false)
eventCh := make(chan model.RegionFeedEvent, 50)
wg.Add(1)
go func() {
Expand Down Expand Up @@ -2820,7 +2820,7 @@ func TestClientV1UnlockRangeReentrant(t *testing.T) {
defer regionCache.Close()
cdcClient := NewCDCClient(
ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
config.GetDefaultServerConfig(), changefeed, 0, "", false)
eventCh := make(chan model.RegionFeedEvent, 50)
wg.Add(1)
go func() {
Expand Down Expand Up @@ -2888,7 +2888,7 @@ func testClientErrNoPendingRegion(t *testing.T) {
defer regionCache.Close()
cdcClient := NewCDCClient(
ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
config.GetDefaultServerConfig(), changefeed, 0, "", false)
eventCh := make(chan model.RegionFeedEvent, 50)
wg.Add(1)
go func() {
Expand Down Expand Up @@ -2966,7 +2966,7 @@ func testKVClientForceReconnect(t *testing.T) {
defer regionCache.Close()
cdcClient := NewCDCClient(
ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
config.GetDefaultServerConfig(), changefeed, 0, "", false)
eventCh := make(chan model.RegionFeedEvent, 50)
wg.Add(1)
go func() {
Expand Down Expand Up @@ -3117,7 +3117,7 @@ func TestConcurrentProcessRangeRequest(t *testing.T) {
defer regionCache.Close()
cdcClient := NewCDCClient(
ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
config.GetDefaultServerConfig(), changefeed, 0, "", false)
eventCh := make(chan model.RegionFeedEvent, 100)
wg.Add(1)
go func() {
Expand Down Expand Up @@ -3234,7 +3234,7 @@ func TestEvTimeUpdate(t *testing.T) {
defer regionCache.Close()
cdcClient := NewCDCClient(
ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
config.GetDefaultServerConfig(), changefeed, 0, "", false)
eventCh := make(chan model.RegionFeedEvent, 50)
wg.Add(1)
go func() {
Expand Down Expand Up @@ -3360,7 +3360,7 @@ func TestRegionWorkerExitWhenIsIdle(t *testing.T) {
defer regionCache.Close()
cdcClient := NewCDCClient(
ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
config.GetDefaultServerConfig(), changefeed, 0, "", false)
eventCh := make(chan model.RegionFeedEvent, 50)
wg.Add(1)
go func() {
Expand Down Expand Up @@ -3452,7 +3452,7 @@ func TestPrewriteNotMatchError(t *testing.T) {
defer regionCache.Close()
cdcClient := NewCDCClient(
ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
config.GetDefaultServerConfig(), changefeed, 0, "", false)
eventCh := make(chan model.RegionFeedEvent, 50)
baseAllocatedID := currentRequestID()

Expand Down Expand Up @@ -3534,7 +3534,7 @@ func TestPrewriteNotMatchError(t *testing.T) {

func createFakeEventFeedSession() *eventFeedSession {
return newEventFeedSession(
&CDCClient{config: config.GetDefaultServerConfig().KVClient},
&CDCClient{config: config.GetDefaultServerConfig()},
tablepb.Span{StartKey: []byte("a"), EndKey: []byte("b")},
nil, /*lockResolver*/
100, /*startTs*/
Expand Down
5 changes: 5 additions & 0 deletions cdc/kv/region_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,8 +147,13 @@ func newRegionWorker(
rtsManager: newRegionTsManager(),
rtsUpdateCh: make(chan *rtsUpdateEvent, 1024),
storeAddr: addr,
<<<<<<< HEAD
concurrency: s.client.config.WorkerConcurrent,
metrics: metrics,
=======
concurrency: int(s.client.config.KVClient.WorkerConcurrent),
metrics: newRegionWorkerMetrics(changefeedID),
>>>>>>> 43848f2fb5 (kv(ticdc): remove backoff from newStream func (#9771))
inputPending: 0,
}
}
Expand Down
Loading

0 comments on commit 5156cd8

Please sign in to comment.