kv(ticdc): remove backoff from newStream func (#9771)
ref #9741
sdojjy authored Sep 26, 2023
1 parent 865a40c commit 43848f2
Showing 15 changed files with 69 additions and 58 deletions.
22 changes: 14 additions & 8 deletions cdc/kv/client.go
@@ -156,7 +156,7 @@ var NewCDCKVClient = NewCDCClient
type CDCClient struct {
pd pd.Client

- config *config.KVClientConfig
+ config *config.ServerConfig
clusterID uint64

grpcPool GrpcPool
@@ -192,7 +192,7 @@ func NewCDCClient(
grpcPool GrpcPool,
regionCache *tikv.RegionCache,
pdClock pdutil.Clock,
- cfg *config.KVClientConfig,
+ cfg *config.ServerConfig,
changefeed model.ChangeFeedID,
tableID model.TableID,
tableName string,
@@ -218,7 +218,7 @@ func NewCDCClient(
}

func (c *CDCClient) newStream(ctx context.Context, addr string, storeID uint64) (stream *eventFeedStream, newStreamErr error) {
- newStreamErr = retry.Do(ctx, func() (err error) {
+ streamFunc := func() (err error) {
var conn *sharedConn
defer func() {
if err != nil && conn != nil {
@@ -248,10 +248,16 @@ func (c *CDCClient) newStream(ctx context.Context, addr string, storeID uint64)
zap.String("changefeed", c.changefeed.ID),
zap.String("addr", addr))
return nil
- }, retry.WithBackoffBaseDelay(500),
-     retry.WithMaxTries(2),
-     retry.WithIsRetryableErr(cerror.IsRetryableError),
- )
+ }
+ if c.config.Debug.EnableKVConnectBackOff {
+     newStreamErr = retry.Do(ctx, streamFunc,
+         retry.WithBackoffBaseDelay(100),
+         retry.WithMaxTries(2),
+         retry.WithIsRetryableErr(cerror.IsRetryableError),
+     )
+     return
+ }
+ newStreamErr = streamFunc()
return
}
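The heart of the commit is the hunk above: `newStream` no longer wraps every connection attempt in `retry.Do`. The body is extracted into `streamFunc`, and the retry-with-backoff path runs only when the `Debug.EnableKVConnectBackOff` server flag is set, now with a 100ms base delay instead of 500ms; otherwise the function dials exactly once. A minimal, self-contained sketch of that control flow, with a hypothetical `retryDo` helper standing in for tiflow's `retry.Do`:

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// retryDo is a hypothetical stand-in for tiflow's retry.Do: retry f up to
// maxTries times, sleeping baseDelay between attempts.
func retryDo(f func() error, baseDelay time.Duration, maxTries int) error {
	var err error
	for i := 0; i < maxTries; i++ {
		if err = f(); err == nil {
			return nil
		}
		time.Sleep(baseDelay)
	}
	return err
}

// newStream mirrors the new control flow: retry with backoff only when the
// debug flag is on, otherwise dial exactly once.
func newStream(enableBackOff bool, dial func() error) error {
	streamFunc := func() error { return dial() }
	if enableBackOff {
		// Matches retry.WithBackoffBaseDelay(100) and retry.WithMaxTries(2).
		return retryDo(streamFunc, 100*time.Millisecond, 2)
	}
	return streamFunc()
}

func main() {
	dial := func() error { return errors.New("connection refused") }
	fmt.Println(newStream(false, dial)) // single attempt, fails fast
	fmt.Println(newStream(true, dial))  // two attempts with backoff
}
```

With the flag off, a dial failure surfaces to the caller immediately instead of blocking inside `newStream`, which is presumably the point of removing the default backoff.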

@@ -842,7 +848,7 @@ func (s *eventFeedSession) divideAndSendEventFeedToRegions(
}
return nil
}, retry.WithBackoffMaxDelay(500),
- retry.WithTotalRetryDuratoin(time.Duration(s.client.config.RegionRetryDuration)))
+ retry.WithTotalRetryDuratoin(time.Duration(s.client.config.KVClient.RegionRetryDuration)))
if retryErr != nil {
log.Warn("load regions failed",
zap.String("namespace", s.changefeed.Namespace),
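Most of the remaining hunks are mechanical fallout of one type change: `CDCClient.config` (and, below, `SharedClient.config`) becomes a `*config.ServerConfig` rather than a `*config.KVClientConfig`, so KV-client knobs are now read through the nested `KVClient` section, and the new backoff switch lives under `Debug`. A heavily simplified sketch of the assumed shape; the real tiflow structs carry many more fields, TOML tags, and wrapper types:

```go
// Hypothetical, heavily simplified sketch of the relevant config shape.
package config

import "time"

type KVClientConfig struct {
	WorkerConcurrent     uint
	GrpcStreamConcurrent uint
	RegionRetryDuration  time.Duration // a wrapper type in the real code, hence the explicit conversion above
}

type DebugConfig struct {
	EnableKVConnectBackOff bool
}

type ServerConfig struct {
	KVClient *KVClientConfig
	Debug    *DebugConfig
}
```

This is why every call site below changes from passing `config.GetDefaultServerConfig().KVClient` to passing `config.GetDefaultServerConfig()` whole.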
4 changes: 2 additions & 2 deletions cdc/kv/client_bench_test.go
@@ -199,7 +199,7 @@ func prepareBenchMultiStore(b *testing.B, storeNum, regionNum int) (
defer regionCache.Close()
cdcClient := NewCDCClient(
ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
- config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
+ config.GetDefaultServerConfig(), changefeed, 0, "", false)
eventCh := make(chan model.RegionFeedEvent, 1000000)
wg.Add(1)
go func() {
@@ -293,7 +293,7 @@ func prepareBench(b *testing.B, regionNum int) (
defer regionCache.Close()
cdcClient := NewCDCClient(
ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
- config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
+ config.GetDefaultServerConfig(), changefeed, 0, "", false)
eventCh := make(chan model.RegionFeedEvent, 1000000)
wg.Add(1)
go func() {
54 changes: 27 additions & 27 deletions cdc/kv/client_test.go
@@ -74,7 +74,7 @@ func TestNewClient(t *testing.T) {
defer regionCache.Close()
cli := NewCDCClient(
context.Background(), pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
- config.GetDefaultServerConfig().KVClient, model.DefaultChangeFeedID(""), 0, "", false)
+ config.GetDefaultServerConfig(), model.DefaultChangeFeedID(""), 0, "", false)
require.NotNil(t, cli)
}

@@ -326,7 +326,7 @@ func TestConnectOfflineTiKV(t *testing.T) {
defer regionCache.Close()
cdcClient := NewCDCClient(
context.Background(), pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
- config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
+ config.GetDefaultServerConfig(), changefeed, 0, "", false)
// Take care of the eventCh, it's used to output resolvedTs event or kv event
// It will stuck the normal routine
eventCh := make(chan model.RegionFeedEvent, 50)
@@ -428,7 +428,7 @@ func TestRecvLargeMessageSize(t *testing.T) {
defer regionCache.Close()
cdcClient := NewCDCClient(
ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
- config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
+ config.GetDefaultServerConfig(), changefeed, 0, "", false)
eventCh := make(chan model.RegionFeedEvent, 50)
wg.Add(1)
go func() {
@@ -528,7 +528,7 @@ func TestHandleError(t *testing.T) {
defer regionCache.Close()
cdcClient := NewCDCClient(
ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
- config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
+ config.GetDefaultServerConfig(), changefeed, 0, "", false)
eventCh := make(chan model.RegionFeedEvent, 50)
wg.Add(1)
go func() {
@@ -687,7 +687,7 @@ func TestCompatibilityWithSameConn(t *testing.T) {
defer regionCache.Close()
cdcClient := NewCDCClient(
ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
- config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
+ config.GetDefaultServerConfig(), changefeed, 0, "", false)
eventCh := make(chan model.RegionFeedEvent, 50)
var wg2 sync.WaitGroup
wg2.Add(1)
@@ -754,7 +754,7 @@ func TestClusterIDMismatch(t *testing.T) {
defer regionCache.Close()
cdcClient := NewCDCClient(
ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
- config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
+ config.GetDefaultServerConfig(), changefeed, 0, "", false)
eventCh := make(chan model.RegionFeedEvent, 50)

var wg2 sync.WaitGroup
@@ -823,7 +823,7 @@ func testHandleFeedEvent(t *testing.T) {
defer regionCache.Close()
cdcClient := NewCDCClient(
ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
- config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
+ config.GetDefaultServerConfig(), changefeed, 0, "", false)
eventCh := make(chan model.RegionFeedEvent, 50)
wg.Add(1)
go func() {
@@ -1284,7 +1284,7 @@ func TestStreamSendWithError(t *testing.T) {
defer regionCache.Close()
cdcClient := NewCDCClient(
ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
- config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
+ config.GetDefaultServerConfig(), changefeed, 0, "", false)
eventCh := make(chan model.RegionFeedEvent, 50)
wg.Add(1)
go func() {
@@ -1396,7 +1396,7 @@ func testStreamRecvWithError(t *testing.T, failpointStr string) {
defer regionCache.Close()
cdcClient := NewCDCClient(
ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
- config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
+ config.GetDefaultServerConfig(), changefeed, 0, "", false)
eventCh := make(chan model.RegionFeedEvent, 50)
wg.Add(1)
go func() {
@@ -1529,7 +1529,7 @@ func TestStreamRecvWithErrorAndResolvedGoBack(t *testing.T) {
defer regionCache.Close()
cdcClient := NewCDCClient(
ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
- config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
+ config.GetDefaultServerConfig(), changefeed, 0, "", false)
eventCh := make(chan model.RegionFeedEvent, 50)
wg.Add(1)
go func() {
@@ -1738,7 +1738,7 @@ func TestIncompatibleTiKV(t *testing.T) {
defer regionCache.Close()
cdcClient := NewCDCClient(
ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
- config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
+ config.GetDefaultServerConfig(), changefeed, 0, "", false)
// NOTICE: eventCh may block the main logic of EventFeed
eventCh := make(chan model.RegionFeedEvent, 128)
wg.Add(1)
@@ -1815,7 +1815,7 @@ func TestNoPendingRegionError(t *testing.T) {
defer regionCache.Close()
cdcClient := NewCDCClient(
ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
- config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
+ config.GetDefaultServerConfig(), changefeed, 0, "", false)
eventCh := make(chan model.RegionFeedEvent, 50)

wg.Add(1)
@@ -1894,7 +1894,7 @@ func TestDropStaleRequest(t *testing.T) {
defer regionCache.Close()
cdcClient := NewCDCClient(
ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
- config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
+ config.GetDefaultServerConfig(), changefeed, 0, "", false)
eventCh := make(chan model.RegionFeedEvent, 50)
wg.Add(1)
go func() {
@@ -2008,7 +2008,7 @@ func TestResolveLock(t *testing.T) {
defer regionCache.Close()
cdcClient := NewCDCClient(
ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
- config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
+ config.GetDefaultServerConfig(), changefeed, 0, "", false)
eventCh := make(chan model.RegionFeedEvent, 50)
wg.Add(1)
go func() {
@@ -2113,7 +2113,7 @@ func testEventCommitTsFallback(t *testing.T, events []*cdcpb.ChangeDataEvent) {
defer regionCache.Close()
cdcClient := NewCDCClient(
ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
- config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
+ config.GetDefaultServerConfig(), changefeed, 0, "", false)
eventCh := make(chan model.RegionFeedEvent, 50)
var clientWg sync.WaitGroup
clientWg.Add(1)
@@ -2241,7 +2241,7 @@ func testEventAfterFeedStop(t *testing.T) {
defer regionCache.Close()
cdcClient := NewCDCClient(
ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
- config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
+ config.GetDefaultServerConfig(), changefeed, 0, "", false)
eventCh := make(chan model.RegionFeedEvent, 50)
wg.Add(1)
go func() {
@@ -2428,7 +2428,7 @@ func TestOutOfRegionRangeEvent(t *testing.T) {
defer regionCache.Close()
cdcClient := NewCDCClient(
ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
- config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
+ config.GetDefaultServerConfig(), changefeed, 0, "", false)
eventCh := make(chan model.RegionFeedEvent, 50)
wg.Add(1)
go func() {
@@ -2646,7 +2646,7 @@ func TestResolveLockNoCandidate(t *testing.T) {
defer regionCache.Close()
cdcClient := NewCDCClient(
ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
- config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
+ config.GetDefaultServerConfig(), changefeed, 0, "", false)
eventCh := make(chan model.RegionFeedEvent, 50)
wg.Add(1)
go func() {
@@ -2742,7 +2742,7 @@ func TestFailRegionReentrant(t *testing.T) {
defer regionCache.Close()
cdcClient := NewCDCClient(
ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
- config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
+ config.GetDefaultServerConfig(), changefeed, 0, "", false)
eventCh := make(chan model.RegionFeedEvent, 50)
wg.Add(1)
go func() {
@@ -2825,7 +2825,7 @@ func TestClientV1UnlockRangeReentrant(t *testing.T) {
defer regionCache.Close()
cdcClient := NewCDCClient(
ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
- config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
+ config.GetDefaultServerConfig(), changefeed, 0, "", false)
eventCh := make(chan model.RegionFeedEvent, 50)
wg.Add(1)
go func() {
@@ -2893,7 +2893,7 @@ func testClientErrNoPendingRegion(t *testing.T) {
defer regionCache.Close()
cdcClient := NewCDCClient(
ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
- config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
+ config.GetDefaultServerConfig(), changefeed, 0, "", false)
eventCh := make(chan model.RegionFeedEvent, 50)
wg.Add(1)
go func() {
@@ -2971,7 +2971,7 @@ func testKVClientForceReconnect(t *testing.T) {
defer regionCache.Close()
cdcClient := NewCDCClient(
ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
- config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
+ config.GetDefaultServerConfig(), changefeed, 0, "", false)
eventCh := make(chan model.RegionFeedEvent, 50)
wg.Add(1)
go func() {
@@ -3122,7 +3122,7 @@ func TestConcurrentProcessRangeRequest(t *testing.T) {
defer regionCache.Close()
cdcClient := NewCDCClient(
ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
- config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
+ config.GetDefaultServerConfig(), changefeed, 0, "", false)
eventCh := make(chan model.RegionFeedEvent, 100)
wg.Add(1)
go func() {
@@ -3239,7 +3239,7 @@ func TestEvTimeUpdate(t *testing.T) {
defer regionCache.Close()
cdcClient := NewCDCClient(
ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
- config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
+ config.GetDefaultServerConfig(), changefeed, 0, "", false)
eventCh := make(chan model.RegionFeedEvent, 50)
wg.Add(1)
go func() {
@@ -3365,7 +3365,7 @@ func TestRegionWorkerExitWhenIsIdle(t *testing.T) {
defer regionCache.Close()
cdcClient := NewCDCClient(
ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
- config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
+ config.GetDefaultServerConfig(), changefeed, 0, "", false)
eventCh := make(chan model.RegionFeedEvent, 50)
wg.Add(1)
go func() {
@@ -3457,7 +3457,7 @@ func TestPrewriteNotMatchError(t *testing.T) {
defer regionCache.Close()
cdcClient := NewCDCClient(
ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(),
- config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false)
+ config.GetDefaultServerConfig(), changefeed, 0, "", false)
eventCh := make(chan model.RegionFeedEvent, 50)
baseAllocatedID := currentRequestID()

@@ -3539,7 +3539,7 @@

func createFakeEventFeedSession() *eventFeedSession {
return newEventFeedSession(
- &CDCClient{config: config.GetDefaultServerConfig().KVClient},
+ &CDCClient{config: config.GetDefaultServerConfig()},
tablepb.Span{StartKey: []byte("a"), EndKey: []byte("b")},
nil, /*lockResolver*/
100, /*startTs*/
2 changes: 1 addition & 1 deletion cdc/kv/region_worker.go
@@ -157,7 +157,7 @@ func newRegionWorker(
rtsManager: newRegionTsManager(),
rtsUpdateCh: make(chan *rtsUpdateEvent, 1024),
storeAddr: addr,
- concurrency: int(s.client.config.WorkerConcurrent),
+ concurrency: int(s.client.config.KVClient.WorkerConcurrent),
metrics: newRegionWorkerMetrics(changefeedID),
inputPending: 0,
}
10 changes: 5 additions & 5 deletions cdc/kv/shared_client.go
@@ -59,7 +59,7 @@ type MultiplexingEvent struct {
// SharedClient is shared in many tables. Methods are thread-safe.
type SharedClient struct {
changefeed model.ChangeFeedID
- config *config.KVClientConfig
+ config *config.ServerConfig
metrics sharedClientMetrics

clusterID uint64
@@ -130,7 +130,7 @@ type requestedTable struct {
// NewSharedClient creates a client.
func NewSharedClient(
changefeed model.ChangeFeedID,
- cfg *config.KVClientConfig,
+ cfg *config.ServerConfig,
filterLoop bool,
pd pd.Client,
grpcPool *sharedconn.ConnAndClientPool,
@@ -234,8 +234,8 @@ func (s *SharedClient) Run(ctx context.Context) error {
s.clusterID = s.pd.GetClusterID(ctx)

g, ctx := errgroup.WithContext(ctx)
- s.workers = make([]*sharedRegionWorker, 0, s.config.WorkerConcurrent)
- for i := uint(0); i < s.config.WorkerConcurrent; i++ {
+ s.workers = make([]*sharedRegionWorker, 0, s.config.KVClient.WorkerConcurrent)
+ for i := uint(0); i < s.config.KVClient.WorkerConcurrent; i++ {
worker := newSharedRegionWorker(s)
g.Go(func() error { return worker.run(ctx) })
s.workers = append(s.workers, worker)
@@ -392,7 +392,7 @@ func (s *SharedClient) requestStore(

rs = &requestedStore{storeID: storeID, storeAddr: storeAddr}
s.requestedStores[storeAddr] = rs
- for i := uint(0); i < s.config.GrpcStreamConcurrent; i++ {
+ for i := uint(0); i < s.config.KVClient.GrpcStreamConcurrent; i++ {
stream := newStream(ctx, s, g, rs)
rs.streams = append(rs.streams, stream)
}
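For context on the `WorkerConcurrent` and `GrpcStreamConcurrent` hunks above: `SharedClient.Run` starts one region worker per configured slot inside an `errgroup`, and `requestStore` opens one gRPC stream per configured slot in the same style. A small runnable sketch of that fan-out pattern; the worker body and config wiring here are placeholders, not the real tiflow code:

```go
package main

import (
	"context"
	"fmt"
	"time"

	"golang.org/x/sync/errgroup"
)

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
	defer cancel()

	workerConcurrent := uint(3) // stands in for s.config.KVClient.WorkerConcurrent

	g, ctx := errgroup.WithContext(ctx)
	// Fan out one goroutine per configured worker, as SharedClient.Run does.
	for i := uint(0); i < workerConcurrent; i++ {
		i := i
		g.Go(func() error {
			fmt.Println("worker", i, "running")
			<-ctx.Done() // a real worker would process region events here
			return ctx.Err()
		})
	}
	fmt.Println("exited:", g.Wait())
}
```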
2 changes: 1 addition & 1 deletion cdc/kv/shared_client_test.go
@@ -125,7 +125,7 @@ func TestConnectToOfflineOrFailedTiKV(t *testing.T) {
cluster.Bootstrap(11, []uint64{1, 2, 3}, []uint64{4, 5, 6}, 6)

client := NewSharedClient(model.ChangeFeedID{ID: "test"},
- &config.KVClientConfig{WorkerConcurrent: 1, GrpcStreamConcurrent: 1},
+ &config.ServerConfig{KVClient: &config.KVClientConfig{WorkerConcurrent: 1, GrpcStreamConcurrent: 1}},
false, pdClient, grpcPool, regionCache, pdClock, lockResolver)

defer func() {
4 changes: 2 additions & 2 deletions cdc/processor/processor.go
@@ -718,10 +718,10 @@ func (p *processor) initDDLHandler(ctx context.Context) error {
return errors.Trace(err)
}

- kvCfg := config.GetGlobalServerConfig().KVClient
+ serverCfg := config.GetGlobalServerConfig()
ddlPuller, err := puller.NewDDLJobPuller(
ctx, p.upstream, ddlStartTs,
- kvCfg, p.changefeedID, schemaStorage,
+ serverCfg, p.changefeedID, schemaStorage,
f, false, /* isOwner */
)
if err != nil {
(Diffs for the remaining 8 of the 15 changed files were not loaded in this view.)