From 5156cd83ffd3ee935c7526d1956ccafc5af7a245 Mon Sep 17 00:00:00 2001 From: Jianyuan Jiang Date: Tue, 26 Sep 2023 13:30:16 +0800 Subject: [PATCH] This is an automated cherry-pick of #9771 Signed-off-by: ti-chi-bot --- cdc/kv/client.go | 22 +- cdc/kv/client_bench_test.go | 4 +- cdc/kv/client_test.go | 54 +- cdc/kv/region_worker.go | 5 + cdc/kv/shared_client.go | 803 ++++++++++++++++++ cdc/kv/shared_client_test.go | 243 ++++++ cdc/processor/processor.go | 8 + cdc/processor/sourcemanager/manager.go | 21 + .../sourcemanager/puller/puller_wrapper.go | 2 +- cdc/puller/ddl_puller.go | 14 +- cdc/puller/puller.go | 2 +- cdc/puller/puller_test.go | 4 +- pkg/config/config_test_data.go | 3 +- pkg/config/debug.go | 3 + pkg/config/server_config.go | 3 +- 15 files changed, 1147 insertions(+), 44 deletions(-) create mode 100644 cdc/kv/shared_client.go create mode 100644 cdc/kv/shared_client_test.go diff --git a/cdc/kv/client.go b/cdc/kv/client.go index 4ffc3bbe974..8805713e234 100644 --- a/cdc/kv/client.go +++ b/cdc/kv/client.go @@ -156,7 +156,7 @@ var NewCDCKVClient = NewCDCClient type CDCClient struct { pd pd.Client - config *config.KVClientConfig + config *config.ServerConfig clusterID uint64 grpcPool GrpcPool @@ -192,7 +192,7 @@ func NewCDCClient( grpcPool GrpcPool, regionCache *tikv.RegionCache, pdClock pdutil.Clock, - cfg *config.KVClientConfig, + cfg *config.ServerConfig, changefeed model.ChangeFeedID, tableID model.TableID, tableName string, @@ -218,7 +218,7 @@ func NewCDCClient( } func (c *CDCClient) newStream(ctx context.Context, addr string, storeID uint64) (stream *eventFeedStream, newStreamErr error) { - newStreamErr = retry.Do(ctx, func() (err error) { + streamFunc := func() (err error) { var conn *sharedConn defer func() { if err != nil && conn != nil { @@ -248,10 +248,16 @@ func (c *CDCClient) newStream(ctx context.Context, addr string, storeID uint64) zap.String("changefeed", c.changefeed.ID), zap.String("addr", addr)) return nil - }, 
retry.WithBackoffBaseDelay(500), - retry.WithMaxTries(2), - retry.WithIsRetryableErr(cerror.IsRetryableError), - ) + } + if c.config.Debug.EnableKVConnectBackOff { + newStreamErr = retry.Do(ctx, streamFunc, + retry.WithBackoffBaseDelay(100), + retry.WithMaxTries(2), + retry.WithIsRetryableErr(cerror.IsRetryableError), + ) + return + } + newStreamErr = streamFunc() return } @@ -843,7 +849,7 @@ func (s *eventFeedSession) divideAndSendEventFeedToRegions( } return nil }, retry.WithBackoffMaxDelay(500), - retry.WithTotalRetryDuratoin(time.Duration(s.client.config.RegionRetryDuration))) + retry.WithTotalRetryDuratoin(time.Duration(s.client.config.KVClient.RegionRetryDuration))) if retryErr != nil { log.Warn("load regions failed", zap.String("namespace", s.changefeed.Namespace), diff --git a/cdc/kv/client_bench_test.go b/cdc/kv/client_bench_test.go index ce97494ab33..fd813903df2 100644 --- a/cdc/kv/client_bench_test.go +++ b/cdc/kv/client_bench_test.go @@ -196,7 +196,7 @@ func prepareBenchMultiStore(b *testing.B, storeNum, regionNum int) ( defer regionCache.Close() cdcClient := NewCDCClient( ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(), - config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false) + config.GetDefaultServerConfig(), changefeed, 0, "", false) eventCh := make(chan model.RegionFeedEvent, 1000000) wg.Add(1) go func() { @@ -290,7 +290,7 @@ func prepareBench(b *testing.B, regionNum int) ( defer regionCache.Close() cdcClient := NewCDCClient( ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(), - config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false) + config.GetDefaultServerConfig(), changefeed, 0, "", false) eventCh := make(chan model.RegionFeedEvent, 1000000) wg.Add(1) go func() { diff --git a/cdc/kv/client_test.go b/cdc/kv/client_test.go index b1862dec8d5..9cc31215b14 100644 --- a/cdc/kv/client_test.go +++ b/cdc/kv/client_test.go @@ -72,7 +72,7 @@ func TestNewClient(t *testing.T) { defer regionCache.Close() cli := 
NewCDCClient( context.Background(), pdClient, grpcPool, regionCache, pdutil.NewClock4Test(), - config.GetDefaultServerConfig().KVClient, model.DefaultChangeFeedID(""), 0, "", false) + config.GetDefaultServerConfig(), model.DefaultChangeFeedID(""), 0, "", false) require.NotNil(t, cli) } @@ -320,7 +320,7 @@ func TestConnectOfflineTiKV(t *testing.T) { defer regionCache.Close() cdcClient := NewCDCClient( context.Background(), pdClient, grpcPool, regionCache, pdutil.NewClock4Test(), - config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false) + config.GetDefaultServerConfig(), changefeed, 0, "", false) // Take care of the eventCh, it's used to output resolvedTs event or kv event // It will stuck the normal routine eventCh := make(chan model.RegionFeedEvent, 50) @@ -422,7 +422,7 @@ func TestRecvLargeMessageSize(t *testing.T) { defer regionCache.Close() cdcClient := NewCDCClient( ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(), - config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false) + config.GetDefaultServerConfig(), changefeed, 0, "", false) eventCh := make(chan model.RegionFeedEvent, 50) wg.Add(1) go func() { @@ -522,7 +522,7 @@ func TestHandleError(t *testing.T) { defer regionCache.Close() cdcClient := NewCDCClient( ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(), - config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false) + config.GetDefaultServerConfig(), changefeed, 0, "", false) eventCh := make(chan model.RegionFeedEvent, 50) wg.Add(1) go func() { @@ -681,7 +681,7 @@ func TestCompatibilityWithSameConn(t *testing.T) { defer regionCache.Close() cdcClient := NewCDCClient( ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(), - config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false) + config.GetDefaultServerConfig(), changefeed, 0, "", false) eventCh := make(chan model.RegionFeedEvent, 50) var wg2 sync.WaitGroup wg2.Add(1) @@ -748,7 +748,7 @@ func TestClusterIDMismatch(t *testing.T) { defer 
regionCache.Close() cdcClient := NewCDCClient( ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(), - config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false) + config.GetDefaultServerConfig(), changefeed, 0, "", false) eventCh := make(chan model.RegionFeedEvent, 50) var wg2 sync.WaitGroup @@ -817,7 +817,7 @@ func testHandleFeedEvent(t *testing.T) { defer regionCache.Close() cdcClient := NewCDCClient( ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(), - config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false) + config.GetDefaultServerConfig(), changefeed, 0, "", false) eventCh := make(chan model.RegionFeedEvent, 50) wg.Add(1) go func() { @@ -1278,7 +1278,7 @@ func TestStreamSendWithError(t *testing.T) { defer regionCache.Close() cdcClient := NewCDCClient( ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(), - config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false) + config.GetDefaultServerConfig(), changefeed, 0, "", false) eventCh := make(chan model.RegionFeedEvent, 50) wg.Add(1) go func() { @@ -1390,7 +1390,7 @@ func testStreamRecvWithError(t *testing.T, failpointStr string) { defer regionCache.Close() cdcClient := NewCDCClient( ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(), - config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false) + config.GetDefaultServerConfig(), changefeed, 0, "", false) eventCh := make(chan model.RegionFeedEvent, 50) wg.Add(1) go func() { @@ -1523,7 +1523,7 @@ func TestStreamRecvWithErrorAndResolvedGoBack(t *testing.T) { defer regionCache.Close() cdcClient := NewCDCClient( ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(), - config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false) + config.GetDefaultServerConfig(), changefeed, 0, "", false) eventCh := make(chan model.RegionFeedEvent, 50) wg.Add(1) go func() { @@ -1732,7 +1732,7 @@ func TestIncompatibleTiKV(t *testing.T) { defer regionCache.Close() cdcClient := NewCDCClient( ctx, 
pdClient, grpcPool, regionCache, pdutil.NewClock4Test(), - config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false) + config.GetDefaultServerConfig(), changefeed, 0, "", false) // NOTICE: eventCh may block the main logic of EventFeed eventCh := make(chan model.RegionFeedEvent, 128) wg.Add(1) @@ -1809,7 +1809,7 @@ func TestNoPendingRegionError(t *testing.T) { defer regionCache.Close() cdcClient := NewCDCClient( ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(), - config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false) + config.GetDefaultServerConfig(), changefeed, 0, "", false) eventCh := make(chan model.RegionFeedEvent, 50) wg.Add(1) @@ -1888,7 +1888,7 @@ func TestDropStaleRequest(t *testing.T) { defer regionCache.Close() cdcClient := NewCDCClient( ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(), - config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false) + config.GetDefaultServerConfig(), changefeed, 0, "", false) eventCh := make(chan model.RegionFeedEvent, 50) wg.Add(1) go func() { @@ -2002,7 +2002,7 @@ func TestResolveLock(t *testing.T) { defer regionCache.Close() cdcClient := NewCDCClient( ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(), - config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false) + config.GetDefaultServerConfig(), changefeed, 0, "", false) eventCh := make(chan model.RegionFeedEvent, 50) wg.Add(1) go func() { @@ -2108,7 +2108,7 @@ func testEventCommitTsFallback(t *testing.T, events []*cdcpb.ChangeDataEvent) { defer regionCache.Close() cdcClient := NewCDCClient( ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(), - config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false) + config.GetDefaultServerConfig(), changefeed, 0, "", false) eventCh := make(chan model.RegionFeedEvent, 50) var clientWg sync.WaitGroup clientWg.Add(1) @@ -2236,7 +2236,7 @@ func testEventAfterFeedStop(t *testing.T) { defer regionCache.Close() cdcClient := NewCDCClient( ctx, 
pdClient, grpcPool, regionCache, pdutil.NewClock4Test(), - config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false) + config.GetDefaultServerConfig(), changefeed, 0, "", false) eventCh := make(chan model.RegionFeedEvent, 50) wg.Add(1) go func() { @@ -2423,7 +2423,7 @@ func TestOutOfRegionRangeEvent(t *testing.T) { defer regionCache.Close() cdcClient := NewCDCClient( ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(), - config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false) + config.GetDefaultServerConfig(), changefeed, 0, "", false) eventCh := make(chan model.RegionFeedEvent, 50) wg.Add(1) go func() { @@ -2641,7 +2641,7 @@ func TestResolveLockNoCandidate(t *testing.T) { defer regionCache.Close() cdcClient := NewCDCClient( ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(), - config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false) + config.GetDefaultServerConfig(), changefeed, 0, "", false) eventCh := make(chan model.RegionFeedEvent, 50) wg.Add(1) go func() { @@ -2737,7 +2737,7 @@ func TestFailRegionReentrant(t *testing.T) { defer regionCache.Close() cdcClient := NewCDCClient( ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(), - config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false) + config.GetDefaultServerConfig(), changefeed, 0, "", false) eventCh := make(chan model.RegionFeedEvent, 50) wg.Add(1) go func() { @@ -2820,7 +2820,7 @@ func TestClientV1UnlockRangeReentrant(t *testing.T) { defer regionCache.Close() cdcClient := NewCDCClient( ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(), - config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false) + config.GetDefaultServerConfig(), changefeed, 0, "", false) eventCh := make(chan model.RegionFeedEvent, 50) wg.Add(1) go func() { @@ -2888,7 +2888,7 @@ func testClientErrNoPendingRegion(t *testing.T) { defer regionCache.Close() cdcClient := NewCDCClient( ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(), - 
config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false) + config.GetDefaultServerConfig(), changefeed, 0, "", false) eventCh := make(chan model.RegionFeedEvent, 50) wg.Add(1) go func() { @@ -2966,7 +2966,7 @@ func testKVClientForceReconnect(t *testing.T) { defer regionCache.Close() cdcClient := NewCDCClient( ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(), - config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false) + config.GetDefaultServerConfig(), changefeed, 0, "", false) eventCh := make(chan model.RegionFeedEvent, 50) wg.Add(1) go func() { @@ -3117,7 +3117,7 @@ func TestConcurrentProcessRangeRequest(t *testing.T) { defer regionCache.Close() cdcClient := NewCDCClient( ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(), - config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false) + config.GetDefaultServerConfig(), changefeed, 0, "", false) eventCh := make(chan model.RegionFeedEvent, 100) wg.Add(1) go func() { @@ -3234,7 +3234,7 @@ func TestEvTimeUpdate(t *testing.T) { defer regionCache.Close() cdcClient := NewCDCClient( ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(), - config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false) + config.GetDefaultServerConfig(), changefeed, 0, "", false) eventCh := make(chan model.RegionFeedEvent, 50) wg.Add(1) go func() { @@ -3360,7 +3360,7 @@ func TestRegionWorkerExitWhenIsIdle(t *testing.T) { defer regionCache.Close() cdcClient := NewCDCClient( ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(), - config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false) + config.GetDefaultServerConfig(), changefeed, 0, "", false) eventCh := make(chan model.RegionFeedEvent, 50) wg.Add(1) go func() { @@ -3452,7 +3452,7 @@ func TestPrewriteNotMatchError(t *testing.T) { defer regionCache.Close() cdcClient := NewCDCClient( ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(), - config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false) 
+ config.GetDefaultServerConfig(), changefeed, 0, "", false) eventCh := make(chan model.RegionFeedEvent, 50) baseAllocatedID := currentRequestID() @@ -3534,7 +3534,7 @@ func TestPrewriteNotMatchError(t *testing.T) { func createFakeEventFeedSession() *eventFeedSession { return newEventFeedSession( - &CDCClient{config: config.GetDefaultServerConfig().KVClient}, + &CDCClient{config: config.GetDefaultServerConfig()}, tablepb.Span{StartKey: []byte("a"), EndKey: []byte("b")}, nil, /*lockResolver*/ 100, /*startTs*/ diff --git a/cdc/kv/region_worker.go b/cdc/kv/region_worker.go index ae24974c42e..3783eb56e18 100644 --- a/cdc/kv/region_worker.go +++ b/cdc/kv/region_worker.go @@ -147,8 +147,8 @@ func newRegionWorker( rtsManager: newRegionTsManager(), rtsUpdateCh: make(chan *rtsUpdateEvent, 1024), storeAddr: addr, - concurrency: s.client.config.WorkerConcurrent, + concurrency: s.client.config.KVClient.WorkerConcurrent, metrics: metrics, inputPending: 0, } } diff --git a/cdc/kv/shared_client.go b/cdc/kv/shared_client.go new file mode 100644 index 00000000000..e0e146b36fb --- /dev/null +++ b/cdc/kv/shared_client.go @@ -0,0 +1,803 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License.
+ +package kv + +import ( + "context" + "encoding/binary" + "sync" + "sync/atomic" + "time" + + "blainsmith.com/go/seahash" + "github.com/pingcap/errors" + "github.com/pingcap/kvproto/pkg/cdcpb" + "github.com/pingcap/kvproto/pkg/kvrpcpb" + "github.com/pingcap/kvproto/pkg/metapb" + "github.com/pingcap/log" + "github.com/pingcap/tiflow/cdc/kv/regionlock" + "github.com/pingcap/tiflow/cdc/kv/sharedconn" + "github.com/pingcap/tiflow/cdc/model" + "github.com/pingcap/tiflow/cdc/processor/tablepb" + "github.com/pingcap/tiflow/pkg/chann" + "github.com/pingcap/tiflow/pkg/config" + cerror "github.com/pingcap/tiflow/pkg/errors" + "github.com/pingcap/tiflow/pkg/pdutil" + "github.com/pingcap/tiflow/pkg/spanz" + "github.com/pingcap/tiflow/pkg/txnutil" + "github.com/pingcap/tiflow/pkg/util" + "github.com/pingcap/tiflow/pkg/version" + "github.com/prometheus/client_golang/prometheus" + kvclientv2 "github.com/tikv/client-go/v2/kv" + "github.com/tikv/client-go/v2/tikv" + pd "github.com/tikv/pd/client" + "go.uber.org/zap" + "golang.org/x/sync/errgroup" +) + +// SubscriptionID comes from `SharedClient.AllocSubscriptionID`. +type SubscriptionID uint64 + +// MultiplexingEvent is like model.RegionFeedEvent. +type MultiplexingEvent struct { + model.RegionFeedEvent + SubscriptionID SubscriptionID + Start time.Time +} + +// SharedClient is shared in many tables. Methods are thread-safe. +type SharedClient struct { + changefeed model.ChangeFeedID + config *config.ServerConfig + metrics sharedClientMetrics + + clusterID uint64 + filterLoop bool + + pd pd.Client + grpcPool *sharedconn.ConnAndClientPool + regionCache *tikv.RegionCache + pdClock pdutil.Clock + lockResolver txnutil.LockResolver + + // requestRangeCh is used to retrieve subscribed table tasks. + requestRangeCh *chann.DrainableChann[rangeTask] + // regionCh is used to cache region tasks after ranges locked. + regionCh *chann.DrainableChann[singleRegionInfo] + // regionRouter is used to cache region tasks with rpcCtx attached. 
+ regionRouter *chann.DrainableChann[singleRegionInfo] + // resolveLockCh is used to retrieve resolve lock tasks. + resolveLockCh *chann.DrainableChann[resolveLockTask] + errCh *chann.DrainableChann[regionErrorInfo] + + workers []*sharedRegionWorker + + // only modified in requestRegionToStore so lock is unnecessary. + requestedStores map[string]*requestedStore + + totalSpans struct { + sync.RWMutex + v map[SubscriptionID]*requestedTable + } +} + +type resolveLockTask struct { + regionID uint64 + maxVersion uint64 + state *regionlock.LockedRange + enter time.Time +} + +type rangeTask struct { + span tablepb.Span + requestedTable *requestedTable +} + +type requestedStore struct { + storeID uint64 + storeAddr string + nextStream atomic.Uint32 + streams []*requestedStream +} + +type requestedTable struct { + subscriptionID SubscriptionID + + span tablepb.Span + startTs model.Ts + rangeLock *regionlock.RegionRangeLock + eventCh chan<- MultiplexingEvent + + // To handle table removing. + stopped atomic.Bool + + // To handle lock resolvings. + postUpdateRegionResolvedTs func(regionID uint64, state *regionlock.LockedRange) + staleLocksVersion atomic.Uint64 +} + +// NewSharedClient creates a client. 
+func NewSharedClient( + changefeed model.ChangeFeedID, + cfg *config.ServerConfig, + filterLoop bool, + pd pd.Client, + grpcPool *sharedconn.ConnAndClientPool, + regionCache *tikv.RegionCache, + pdClock pdutil.Clock, + lockResolver txnutil.LockResolver, +) *SharedClient { + s := &SharedClient{ + changefeed: changefeed, + config: cfg, + clusterID: 0, + filterLoop: filterLoop, + + pd: pd, + grpcPool: grpcPool, + regionCache: regionCache, + pdClock: pdClock, + lockResolver: lockResolver, + + requestRangeCh: chann.NewAutoDrainChann[rangeTask](), + regionCh: chann.NewAutoDrainChann[singleRegionInfo](), + regionRouter: chann.NewAutoDrainChann[singleRegionInfo](), + resolveLockCh: chann.NewAutoDrainChann[resolveLockTask](), + errCh: chann.NewAutoDrainChann[regionErrorInfo](), + + requestedStores: make(map[string]*requestedStore), + } + s.totalSpans.v = make(map[SubscriptionID]*requestedTable) + s.initMetrics() + return s +} + +// AllocSubscriptionID gets an ID can be used in `Subscribe`. +func (s *SharedClient) AllocSubscriptionID() SubscriptionID { + return SubscriptionID(subscriptionIDGen.Add(1)) +} + +// Subscribe the given table span. +// NOTE: `span.TableID` must be set correctly. 
+func (s *SharedClient) Subscribe(subID SubscriptionID, span tablepb.Span, startTs uint64, eventCh chan<- MultiplexingEvent) { + if span.TableID == 0 { + log.Panic("event feed subscribe with zero tablepb.Span.TableID", + zap.String("namespace", s.changefeed.Namespace), + zap.String("changefeed", s.changefeed.ID)) + } + + rt := s.newRequestedTable(subID, span, startTs, eventCh) + s.totalSpans.Lock() + s.totalSpans.v[subID] = rt + s.totalSpans.Unlock() + + s.requestRangeCh.In() <- rangeTask{span: span, requestedTable: rt} + log.Info("event feed subscribes table success", + zap.String("namespace", s.changefeed.Namespace), + zap.String("changefeed", s.changefeed.ID), + zap.Any("subscriptionID", rt.subscriptionID), + zap.String("span", rt.span.String())) +} + +// Unsubscribe the given table span. All covered regions will be deregistered asynchronously. +// NOTE: `span.TableID` must be set correctly. +func (s *SharedClient) Unsubscribe(subID SubscriptionID) { + // NOTE: `subID` is cleared from `s.totalSpans` in `onTableDrained`. + s.totalSpans.Lock() + rt := s.totalSpans.v[subID] + s.totalSpans.Unlock() + if rt != nil { + s.setTableStopped(rt) + } + + log.Info("event feed unsubscribes table", + zap.String("namespace", s.changefeed.Namespace), + zap.String("changefeed", s.changefeed.ID), + zap.Any("subscriptionID", rt.subscriptionID), + zap.Bool("exists", rt != nil)) +} + +// ResolveLock is a function. If outsider subscribers find a span resolved timestamp is +// advanced slowly or stopped, they can try to resolve locks in the given span. +func (s *SharedClient) ResolveLock(subID SubscriptionID, maxVersion uint64) { + s.totalSpans.Lock() + rt := s.totalSpans.v[subID] + s.totalSpans.Unlock() + if rt != nil { + rt.updateStaleLocks(s, maxVersion) + } +} + +// RegionCount returns subscribed region count for the span. 
+func (s *SharedClient) RegionCount(subID SubscriptionID) uint64 { + s.totalSpans.RLock() + defer s.totalSpans.RUnlock() + if rt := s.totalSpans.v[subID]; rt != nil { + return rt.rangeLock.RefCount() + } + return 0 +} + +// Run the client. +func (s *SharedClient) Run(ctx context.Context) error { + s.clusterID = s.pd.GetClusterID(ctx) + + g, ctx := errgroup.WithContext(ctx) + s.workers = make([]*sharedRegionWorker, 0, s.config.KVClient.WorkerConcurrent) + for i := uint(0); i < s.config.KVClient.WorkerConcurrent; i++ { + worker := newSharedRegionWorker(s) + g.Go(func() error { return worker.run(ctx) }) + s.workers = append(s.workers, worker) + } + + g.Go(func() error { return s.handleRequestRanges(ctx, g) }) + g.Go(func() error { return s.dispatchRequest(ctx) }) + g.Go(func() error { return s.requestRegionToStore(ctx, g) }) + g.Go(func() error { return s.handleErrors(ctx) }) + g.Go(func() error { return s.resolveLock(ctx) }) + + log.Info("event feed started", + zap.String("namespace", s.changefeed.Namespace), + zap.String("changefeed", s.changefeed.ID)) + defer log.Info("event feed exits", + zap.String("namespace", s.changefeed.Namespace), + zap.String("changefeed", s.changefeed.ID)) + return g.Wait() +} + +// Close closes the client. Must be called after `Run` returns. +func (s *SharedClient) Close() { + s.requestRangeCh.CloseAndDrain() + s.regionCh.CloseAndDrain() + s.regionRouter.CloseAndDrain() + s.resolveLockCh.CloseAndDrain() + s.errCh.CloseAndDrain() + s.clearMetrics() + + for _, rs := range s.requestedStores { + for _, stream := range rs.streams { + stream.requests.CloseAndDrain() + } + } +} + +// GetPDClock returns a pdutil.Clock. 
+func (s *SharedClient) GetPDClock() pdutil.Clock { + return s.pdClock +} + +func (s *SharedClient) setTableStopped(rt *requestedTable) { + log.Info("event feed starts to stop table", + zap.String("namespace", s.changefeed.Namespace), + zap.String("changefeed", s.changefeed.ID), + zap.Any("subscriptionID", rt.subscriptionID), + zap.String("span", rt.span.String())) + + // Set stopped to true so we can stop handling region events from the table. + // Then send a special singleRegionInfo to regionRouter to deregister the table + // from all TiKV instances. + if rt.stopped.CompareAndSwap(false, true) { + s.regionRouter.In() <- singleRegionInfo{requestedTable: rt} + if rt.rangeLock.Stop() { + s.onTableDrained(rt) + } + } +} + +func (s *SharedClient) onTableDrained(rt *requestedTable) { + log.Info("event feed stop table is finished", + zap.String("namespace", s.changefeed.Namespace), + zap.String("changefeed", s.changefeed.ID), + zap.Any("subscriptionID", rt.subscriptionID), + zap.String("span", rt.span.String())) + + s.totalSpans.Lock() + defer s.totalSpans.Unlock() + delete(s.totalSpans.v, rt.subscriptionID) +} + +func (s *SharedClient) onRegionFail(errInfo regionErrorInfo) { + s.errCh.In() <- errInfo +} + +func (s *SharedClient) dispatchRequest(ctx context.Context) error { + attachCtx := func(ctx context.Context, sri singleRegionInfo) { + rpcCtx, err := s.getRPCContextForRegion(ctx, sri.verID) + if rpcCtx != nil { + sri.rpcCtx = rpcCtx + locateTime := time.Since(sri.lockedRange.Created).Milliseconds() + s.metrics.regionLocateDuration.Observe(float64(locateTime)) + s.regionRouter.In() <- sri + return + } + if err != nil { + log.Debug("event feed get RPC context fail", + zap.String("namespace", s.changefeed.Namespace), + zap.String("changefeed", s.changefeed.ID), + zap.Any("subscriptionID", sri.requestedTable.subscriptionID), + zap.Uint64("regionID", sri.verID.GetID()), + zap.Error(err)) + } + s.onRegionFail(newRegionErrorInfo(sri, &rpcCtxUnavailableErr{verID: 
sri.verID})) + } + + for { + select { + case <-ctx.Done(): + return errors.Trace(ctx.Err()) + case sri := <-s.regionCh.Out(): + attachCtx(ctx, sri) + } + } +} + +func (s *SharedClient) requestRegionToStore(ctx context.Context, g *errgroup.Group) error { + for { + var sri singleRegionInfo + select { + case <-ctx.Done(): + return errors.Trace(ctx.Err()) + case sri = <-s.regionRouter.Out(): + } + // If lockedRange is nil it means it's a special task from stopping the table. + if sri.lockedRange == nil { + for _, rs := range s.requestedStores { + rs.broadcastRequest(sri) + } + continue + } + + storeID := sri.rpcCtx.Peer.StoreId + storeAddr := sri.rpcCtx.Addr + log.Debug("event feed will request a region", + zap.String("namespace", s.changefeed.Namespace), + zap.String("changefeed", s.changefeed.ID), + zap.Any("table", sri.requestedTable.span), + zap.Any("subscriptionID", sri.requestedTable.subscriptionID), + zap.Uint64("regionID", sri.verID.GetID()), + zap.Uint64("storeID", storeID), + zap.String("addr", storeAddr)) + s.requestStore(ctx, g, storeID, storeAddr).appendRequest(sri) + } +} + +func (s *SharedClient) getRPCContextForRegion(ctx context.Context, id tikv.RegionVerID) (*tikv.RPCContext, error) { + bo := tikv.NewBackoffer(ctx, tikvRequestMaxBackoff) + rpcCtx, err := s.regionCache.GetTiKVRPCContext(bo, id, kvclientv2.ReplicaReadLeader, 0) + if err != nil { + return nil, errors.Trace(err) + } + return rpcCtx, nil +} + +func (s *SharedClient) requestStore( + ctx context.Context, g *errgroup.Group, + storeID uint64, storeAddr string, +) *requestedStore { + var rs *requestedStore + if rs = s.requestedStores[storeAddr]; rs != nil { + return rs + } + + rs = &requestedStore{storeID: storeID, storeAddr: storeAddr} + s.requestedStores[storeAddr] = rs + for i := uint(0); i < s.config.KVClient.GrpcStreamConcurrent; i++ { + stream := newStream(ctx, s, g, rs) + rs.streams = append(rs.streams, stream) + } + + return rs +} + +func (s *SharedClient) createRegionRequest(sri 
singleRegionInfo) *cdcpb.ChangeDataRequest { + return &cdcpb.ChangeDataRequest{ + Header: &cdcpb.Header{ClusterId: s.clusterID, TicdcVersion: version.ReleaseSemver()}, + RegionId: sri.verID.GetID(), + RequestId: uint64(sri.requestedTable.subscriptionID), + RegionEpoch: sri.rpcCtx.Meta.RegionEpoch, + CheckpointTs: sri.resolvedTs(), + StartKey: sri.span.StartKey, + EndKey: sri.span.EndKey, + ExtraOp: kvrpcpb.ExtraOp_ReadOldValue, + FilterLoop: s.filterLoop, + } +} + +func (r *requestedStore) appendRequest(sri singleRegionInfo) { + offset := r.nextStream.Add(1) % uint32(len(r.streams)) + r.streams[offset].requests.In() <- sri +} + +func (r *requestedStore) broadcastRequest(sri singleRegionInfo) { + for _, stream := range r.streams { + stream.requests.In() <- sri + } +} + +func (s *SharedClient) handleRequestRanges(ctx context.Context, g *errgroup.Group) error { + for { + select { + case <-ctx.Done(): + return ctx.Err() + case task := <-s.requestRangeCh.Out(): + g.Go(func() error { return s.divideAndScheduleRegions(ctx, task.span, task.requestedTable) }) + } + } +} + +func (s *SharedClient) divideAndScheduleRegions( + ctx context.Context, + span tablepb.Span, + requestedTable *requestedTable, +) error { + limit := 1024 + nextSpan := span + backoffBeforeLoad := false + for { + if backoffBeforeLoad { + if err := util.Hang(ctx, 5*time.Second); err != nil { + return err + } + backoffBeforeLoad = false + } + log.Debug("event feed is going to load regions", + zap.String("namespace", s.changefeed.Namespace), + zap.String("changefeed", s.changefeed.ID), + zap.Any("subscriptionID", requestedTable.subscriptionID), + zap.Any("span", nextSpan)) + + bo := tikv.NewBackoffer(ctx, tikvRequestMaxBackoff) + regions, err := s.regionCache.BatchLoadRegionsWithKeyRange(bo, nextSpan.StartKey, nextSpan.EndKey, limit) + if err != nil { + log.Warn("event feed load regions failed", + zap.String("namespace", s.changefeed.Namespace), + zap.String("changefeed", s.changefeed.ID), + 
zap.Any("subscriptionID", requestedTable.subscriptionID), + zap.Any("span", nextSpan), + zap.Error(err)) + backoffBeforeLoad = true + continue + } + + metas := make([]*metapb.Region, 0, len(regions)) + for _, region := range regions { + if meta := region.GetMeta(); meta != nil { + metas = append(metas, meta) + } + } + metas = regionlock.CutRegionsLeftCoverSpan(metas, nextSpan) + if len(metas) == 0 { + log.Warn("event feed load regions with holes", + zap.String("namespace", s.changefeed.Namespace), + zap.String("changefeed", s.changefeed.ID), + zap.Any("subscriptionID", requestedTable.subscriptionID), + zap.Any("span", nextSpan)) + backoffBeforeLoad = true + continue + } + + for _, region := range metas { + // NOTE: the End key return by the PD API will be nil to represent the biggest key. + regionSpan := tablepb.Span{StartKey: region.StartKey, EndKey: region.EndKey} + regionSpan = spanz.HackSpan(regionSpan) + partialSpan, err := spanz.Intersect(requestedTable.span, regionSpan) + if err != nil { + log.Panic("event feed check spans intersect shouldn't fail", + zap.String("namespace", s.changefeed.Namespace), + zap.String("changefeed", s.changefeed.ID), + zap.Any("subscriptionID", requestedTable.subscriptionID)) + } + verID := tikv.NewRegionVerID(region.Id, region.RegionEpoch.ConfVer, region.RegionEpoch.Version) + sri := newSingleRegionInfo(verID, partialSpan, nil) + sri.requestedTable = requestedTable + s.scheduleRegionRequest(ctx, sri) + + nextSpan.StartKey = region.EndKey + if spanz.EndCompare(nextSpan.StartKey, span.EndKey) >= 0 { + return nil + } + } + } +} + +func (s *SharedClient) scheduleRegionRequest(ctx context.Context, sri singleRegionInfo) { + handleResult := func(res regionlock.LockRangeResult) { + switch res.Status { + case regionlock.LockRangeStatusSuccess: + sri.lockedRange = res.LockedRange + lockTime := time.Since(sri.lockedRange.Created).Milliseconds() + s.metrics.regionLockDuration.Observe(float64(lockTime)) + select { + case s.regionCh.In() <- 
sri: + case <-ctx.Done(): + } + case regionlock.LockRangeStatusStale: + for _, r := range res.RetryRanges { + s.scheduleRangeRequest(ctx, r, sri.requestedTable) + } + default: + return + } + } + + rangeLock := sri.requestedTable.rangeLock + res := rangeLock.LockRange(ctx, sri.span.StartKey, sri.span.EndKey, sri.verID.GetID(), sri.verID.GetVer()) + if res.Status == regionlock.LockRangeStatusWait { + res = res.WaitFn() + } + handleResult(res) +} + +func (s *SharedClient) scheduleRangeRequest( + ctx context.Context, span tablepb.Span, + requestedTable *requestedTable, +) { + select { + case s.requestRangeCh.In() <- rangeTask{span: span, requestedTable: requestedTable}: + case <-ctx.Done(): + } +} + +func (s *SharedClient) handleErrors(ctx context.Context) error { + for { + select { + case <-ctx.Done(): + return ctx.Err() + case errInfo := <-s.errCh.Out(): + if err := s.handleError(ctx, errInfo); err != nil { + return err + } + } + } +} + +func (s *SharedClient) handleError(ctx context.Context, errInfo regionErrorInfo) error { + if errInfo.requestedTable.rangeLock.UnlockRange( + errInfo.span.StartKey, errInfo.span.EndKey, + errInfo.verID.GetID(), errInfo.verID.GetVer(), errInfo.resolvedTs()) { + s.onTableDrained(errInfo.requestedTable) + return nil + } + + err := errors.Cause(errInfo.err) + switch eerr := err.(type) { + case *eventError: + innerErr := eerr.err + log.Debug("cdc error", + zap.String("namespace", s.changefeed.Namespace), + zap.String("changefeed", s.changefeed.ID), + zap.Any("subscriptionID", errInfo.requestedTable.subscriptionID), + zap.Stringer("error", innerErr)) + + if notLeader := innerErr.GetNotLeader(); notLeader != nil { + metricFeedNotLeaderCounter.Inc() + s.regionCache.UpdateLeader(errInfo.verID, notLeader.GetLeader(), errInfo.rpcCtx.AccessIdx) + s.scheduleRegionRequest(ctx, errInfo.singleRegionInfo) + return nil + } + if innerErr.GetEpochNotMatch() != nil { + metricFeedEpochNotMatchCounter.Inc() + s.scheduleRangeRequest(ctx, errInfo.span, 
errInfo.requestedTable) + return nil + } + if innerErr.GetRegionNotFound() != nil { + metricFeedRegionNotFoundCounter.Inc() + s.scheduleRangeRequest(ctx, errInfo.span, errInfo.requestedTable) + return nil + } + if duplicated := innerErr.GetDuplicateRequest(); duplicated != nil { + metricFeedDuplicateRequestCounter.Inc() + // TODO(qupeng): It's better to add a new machanism to deregister one region. + return errors.New("duplicate request") + } + if compatibility := innerErr.GetCompatibility(); compatibility != nil { + return cerror.ErrVersionIncompatible.GenWithStackByArgs(compatibility) + } + if mismatch := innerErr.GetClusterIdMismatch(); mismatch != nil { + return cerror.ErrClusterIDMismatch.GenWithStackByArgs(mismatch.Current, mismatch.Request) + } + + log.Warn("empty or unknown cdc error", + zap.String("namespace", s.changefeed.Namespace), + zap.String("changefeed", s.changefeed.ID), + zap.Any("subscriptionID", errInfo.requestedTable.subscriptionID), + zap.Stringer("error", innerErr)) + metricFeedUnknownErrorCounter.Inc() + s.scheduleRegionRequest(ctx, errInfo.singleRegionInfo) + return nil + case *rpcCtxUnavailableErr: + metricFeedRPCCtxUnavailable.Inc() + s.scheduleRangeRequest(ctx, errInfo.span, errInfo.requestedTable) + return nil + case *sendRequestToStoreErr: + metricStoreSendRequestErr.Inc() + bo := tikv.NewBackoffer(ctx, tikvRequestMaxBackoff) + s.regionCache.OnSendFail(bo, errInfo.rpcCtx, regionScheduleReload, err) + s.scheduleRegionRequest(ctx, errInfo.singleRegionInfo) + return nil + default: + // TODO(qupeng): for some errors it's better to just deregister the region from TiKVs. 
+		log.Warn("event feed meets an internal error, fail the changefeed",
+			zap.String("namespace", s.changefeed.Namespace),
+			zap.String("changefeed", s.changefeed.ID),
+			zap.Any("subscriptionID", errInfo.requestedTable.subscriptionID),
+			zap.String("span", errInfo.requestedTable.span.String()),
+			zap.Error(err))
+		return err
+	}
+}
+
+func (s *SharedClient) resolveLock(ctx context.Context) error {
+	resolveLastRun := make(map[uint64]time.Time)
+
+	gcResolveLastRun := func() {
+		if len(resolveLastRun) > 1024 {
+			copied := make(map[uint64]time.Time)
+			now := time.Now()
+			for regionID, lastRun := range resolveLastRun {
+				if now.Sub(lastRun) < resolveLockMinInterval {
+					copied[regionID] = lastRun
+				}
+			}
+			resolveLastRun = copied
+		}
+	}
+
+	doResolve := func(regionID uint64, state *regionlock.LockedRange, maxVersion uint64) {
+		if state.CheckpointTs.Load() > maxVersion || !state.Initialzied.Load() {
+			return
+		}
+		if lastRun, ok := resolveLastRun[regionID]; ok {
+			if time.Since(lastRun) < resolveLockMinInterval {
+				return
+			}
+		}
+		start := time.Now()
+		defer func() { s.metrics.lockResolveRunDuration.Observe(float64(time.Since(start).Milliseconds())) }()
+
+		if err := s.lockResolver.Resolve(ctx, regionID, maxVersion); err != nil {
+			log.Warn("event feed resolve lock fail",
+				zap.String("namespace", s.changefeed.Namespace),
+				zap.String("changefeed", s.changefeed.ID),
+				zap.Uint64("regionID", regionID),
+				zap.Error(err))
+		}
+		resolveLastRun[regionID] = time.Now()
+	}
+
+	gcTicker := time.NewTicker(resolveLockMinInterval * 3 / 2)
+	defer gcTicker.Stop()
+	for {
+		var task resolveLockTask
+		select {
+		case <-ctx.Done():
+			return ctx.Err()
+		case <-gcTicker.C:
+			gcResolveLastRun()
+		case task = <-s.resolveLockCh.Out():
+			s.metrics.lockResolveWaitDuration.Observe(float64(time.Since(task.enter).Milliseconds()))
+			doResolve(task.regionID, task.state, task.maxVersion)
+		}
+	}
+}
+
+func (s *SharedClient) newRequestedTable(
+	subID SubscriptionID, span tablepb.Span, startTs uint64,
+ eventCh chan<- MultiplexingEvent, +) *requestedTable { + cfName := s.changefeed.String() + rangeLock := regionlock.NewRegionRangeLock(span.StartKey, span.EndKey, startTs, cfName) + + rt := &requestedTable{ + subscriptionID: subID, + span: span, + startTs: startTs, + rangeLock: rangeLock, + eventCh: eventCh, + } + + rt.postUpdateRegionResolvedTs = func(regionID uint64, state *regionlock.LockedRange) { + maxVersion := rt.staleLocksVersion.Load() + if state.CheckpointTs.Load() <= maxVersion && state.Initialzied.Load() { + enter := time.Now() + s.resolveLockCh.In() <- resolveLockTask{regionID, maxVersion, state, enter} + } + } + return rt +} + +func (r *requestedTable) associateSubscriptionID(event model.RegionFeedEvent) MultiplexingEvent { + return MultiplexingEvent{ + RegionFeedEvent: event, + SubscriptionID: r.subscriptionID, + Start: time.Now(), + } +} + +func (r *requestedTable) updateStaleLocks(s *SharedClient, maxVersion uint64) { + for { + old := r.staleLocksVersion.Load() + if old >= maxVersion { + return + } + if r.staleLocksVersion.CompareAndSwap(old, maxVersion) { + break + } + } + + res := r.rangeLock.CollectLockedRangeAttrs(r.postUpdateRegionResolvedTs) + log.Warn("event feed finds slow locked ranges", + zap.String("namespace", s.changefeed.Namespace), + zap.String("changefeed", s.changefeed.ID), + zap.Any("subscriptionID", r.subscriptionID), + zap.String("span", r.span.String()), + zap.Any("ranges", res)) +} + +type sharedClientMetrics struct { + regionLockDuration prometheus.Observer + regionLocateDuration prometheus.Observer + regionConnectDuration prometheus.Observer + batchResolvedSize prometheus.Observer + lockResolveWaitDuration prometheus.Observer + lockResolveRunDuration prometheus.Observer +} + +func (s *SharedClient) initMetrics() { + s.metrics.regionLockDuration = regionConnectDuration. + WithLabelValues(s.changefeed.Namespace, s.changefeed.ID, "lock") + s.metrics.regionLocateDuration = regionConnectDuration. 
+ WithLabelValues(s.changefeed.Namespace, s.changefeed.ID, "locate") + s.metrics.regionConnectDuration = regionConnectDuration. + WithLabelValues(s.changefeed.Namespace, s.changefeed.ID, "connect") + + s.metrics.lockResolveWaitDuration = lockResolveDuration. + WithLabelValues(s.changefeed.Namespace, s.changefeed.ID, "wait") + s.metrics.lockResolveRunDuration = lockResolveDuration. + WithLabelValues(s.changefeed.Namespace, s.changefeed.ID, "run") + + s.metrics.batchResolvedSize = batchResolvedEventSize. + WithLabelValues(s.changefeed.Namespace, s.changefeed.ID) +} + +func (s *SharedClient) clearMetrics() { + regionConnectDuration.DeleteLabelValues(s.changefeed.Namespace, s.changefeed.ID, "lock") + regionConnectDuration.DeleteLabelValues(s.changefeed.Namespace, s.changefeed.ID, "locate") + regionConnectDuration.DeleteLabelValues(s.changefeed.Namespace, s.changefeed.ID, "connect") + + lockResolveDuration.DeleteLabelValues(s.changefeed.Namespace, s.changefeed.ID, "wait") + lockResolveDuration.DeleteLabelValues(s.changefeed.Namespace, s.changefeed.ID, "run") + + batchResolvedEventSize.DeleteLabelValues(s.changefeed.Namespace, s.changefeed.ID) +} + +func hashRegionID(regionID uint64, slots int) int { + b := make([]byte, 8) + binary.LittleEndian.PutUint64(b, regionID) + return int(seahash.Sum64(b) % uint64(slots)) +} + +var ( + // To generate an ID for a new subscription. And the subscription ID will also be used as + // `RequestId` in region requests of the table. + subscriptionIDGen atomic.Uint64 + + // To generate a streamID in `newStream`. + streamIDGen atomic.Uint64 +) + +const ( + resolveLockMinInterval time.Duration = 10 * time.Second + invalidSubscriptionID SubscriptionID = SubscriptionID(0) +) diff --git a/cdc/kv/shared_client_test.go b/cdc/kv/shared_client_test.go new file mode 100644 index 00000000000..18041a19a70 --- /dev/null +++ b/cdc/kv/shared_client_test.go @@ -0,0 +1,243 @@ +// Copyright 2020 PingCAP, Inc. 
+// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package kv + +import ( + "context" + "sync" + "testing" + "time" + + "github.com/pingcap/kvproto/pkg/cdcpb" + "github.com/pingcap/tidb/store/mockstore/mockcopr" + "github.com/pingcap/tiflow/cdc/kv/regionlock" + "github.com/pingcap/tiflow/cdc/kv/sharedconn" + "github.com/pingcap/tiflow/cdc/model" + "github.com/pingcap/tiflow/cdc/processor/tablepb" + "github.com/pingcap/tiflow/pkg/chann" + "github.com/pingcap/tiflow/pkg/config" + "github.com/pingcap/tiflow/pkg/errors" + "github.com/pingcap/tiflow/pkg/pdutil" + "github.com/pingcap/tiflow/pkg/security" + "github.com/pingcap/tiflow/pkg/txnutil" + "github.com/stretchr/testify/require" + "github.com/tikv/client-go/v2/oracle" + "github.com/tikv/client-go/v2/testutils" + "github.com/tikv/client-go/v2/tikv" +) + +func TestRequestedStreamRequestedRegions(t *testing.T) { + stream := &requestedStream{streamID: 100, requests: chann.NewAutoDrainChann[singleRegionInfo]()} + defer stream.requests.CloseAndDrain() + stream.requestedRegions.m = make(map[SubscriptionID]map[uint64]*regionFeedState) + + require.Nil(t, stream.getState(1, 2)) + require.Nil(t, stream.takeState(1, 2)) + + stream.setState(1, 2, ®ionFeedState{sri: singleRegionInfo{requestedTable: &requestedTable{}}}) + require.NotNil(t, stream.getState(1, 2)) + require.NotNil(t, stream.takeState(1, 2)) + require.Nil(t, stream.getState(1, 2)) +} + +func TestRequestedTable(t *testing.T) { + s := &SharedClient{resolveLockCh: chann.NewAutoDrainChann[resolveLockTask]()} + span := 
tablepb.Span{TableID: 1, StartKey: []byte{'a'}, EndKey: []byte{'z'}} + table := s.newRequestedTable(SubscriptionID(1), span, 100, nil) + s.totalSpans.v = make(map[SubscriptionID]*requestedTable) + s.totalSpans.v[SubscriptionID(1)] = table + s.pdClock = pdutil.NewClock4Test() + + // Lock a range, and then ResolveLock will trigger a task for it. + res := table.rangeLock.LockRange(context.Background(), []byte{'b'}, []byte{'c'}, 1, 100) + require.Equal(t, regionlock.LockRangeStatusSuccess, res.Status) + res.LockedRange.Initialzied.Store(true) + + s.ResolveLock(SubscriptionID(1), 200) + select { + case <-s.resolveLockCh.Out(): + case <-time.After(100 * time.Millisecond): + require.True(t, false, "must get a resolve lock task") + } + + // Lock another range, no task will be triggered before initialized. + res = table.rangeLock.LockRange(context.Background(), []byte{'c'}, []byte{'d'}, 2, 100) + require.Equal(t, regionlock.LockRangeStatusSuccess, res.Status) + state := newRegionFeedState(singleRegionInfo{lockedRange: res.LockedRange, requestedTable: table}, 1) + select { + case <-s.resolveLockCh.Out(): + require.True(t, false, "shouldn't get a resolve lock task") + case <-time.After(100 * time.Millisecond): + } + + // Task will be triggered after initialized. 
+ state.setInitialized() + state.updateResolvedTs(101) + select { + case <-s.resolveLockCh.Out(): + case <-time.After(100 * time.Millisecond): + require.True(t, false, "must get a resolve lock task") + } + + s.resolveLockCh.CloseAndDrain() +} + +func TestConnectToOfflineOrFailedTiKV(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + wg := &sync.WaitGroup{} + + events1 := make(chan *cdcpb.ChangeDataEvent, 10) + events2 := make(chan *cdcpb.ChangeDataEvent, 10) + srv1 := newMockChangeDataServer(events1) + server1, addr1 := newMockService(ctx, t, srv1, wg) + srv2 := newMockChangeDataServer(events2) + server2, addr2 := newMockService(ctx, t, srv2, wg) + + rpcClient, cluster, pdClient, _ := testutils.NewMockTiKV("", mockcopr.NewCoprRPCHandler()) + + pdClient = &mockPDClient{Client: pdClient, versionGen: defaultVersionGen} + + grpcPool := sharedconn.NewConnAndClientPool(&security.Credential{}) + + regionCache := tikv.NewRegionCache(pdClient) + + pdClock := pdutil.NewClock4Test() + + kvStorage, err := tikv.NewTestTiKVStore(rpcClient, pdClient, nil, nil, 0) + require.Nil(t, err) + lockResolver := txnutil.NewLockerResolver(kvStorage, model.ChangeFeedID{}) + + invalidStore := "localhost:1" + cluster.AddStore(1, addr1) + cluster.AddStore(2, addr2) + cluster.AddStore(3, invalidStore) + cluster.Bootstrap(11, []uint64{1, 2, 3}, []uint64{4, 5, 6}, 6) + + client := NewSharedClient(model.ChangeFeedID{ID: "test"}, + &config.ServerConfig{KVClient: &config.KVClientConfig{WorkerConcurrent: 1, GrpcStreamConcurrent: 1}}, + false, pdClient, grpcPool, regionCache, pdClock, lockResolver) + + defer func() { + cancel() + client.Close() + _ = kvStorage.Close() + regionCache.Close() + pdClient.Close() + srv1.wg.Wait() + srv2.wg.Wait() + server1.Stop() + server2.Stop() + wg.Wait() + }() + + wg.Add(1) + go func() { + defer wg.Done() + err := client.Run(ctx) + require.Equal(t, context.Canceled, errors.Cause(err)) + }() + + subID := client.AllocSubscriptionID() + span := 
tablepb.Span{TableID: 1, StartKey: []byte("a"), EndKey: []byte("b")} + eventCh := make(chan MultiplexingEvent, 50) + client.Subscribe(subID, span, 1, eventCh) + + makeTsEvent := func(regionID, ts, requestID uint64) *cdcpb.ChangeDataEvent { + return &cdcpb.ChangeDataEvent{ + Events: []*cdcpb.Event{ + { + RegionId: regionID, + RequestId: requestID, + Event: &cdcpb.Event_ResolvedTs{ResolvedTs: ts}, + }, + }, + } + } + + checkTsEvent := func(event model.RegionFeedEvent, ts uint64) { + require.Equal(t, ts, event.Resolved.ResolvedTs) + } + + events1 <- mockInitializedEvent(11, uint64(subID)) + ts := oracle.GoTimeToTS(pdClock.CurrentTime()) + events1 <- makeTsEvent(11, ts, uint64(subID)) + // After trying to receive something from the invalid store, + // it should auto switch to other stores and fetch events finally. + select { + case event := <-eventCh: + checkTsEvent(event.RegionFeedEvent, ts) + case <-time.After(5 * time.Second): + require.True(t, false, "reconnection not succeed in 5 second") + } + + // Stop server1 and the client needs to handle it. + server1.Stop() + + events2 <- mockInitializedEvent(11, uint64(subID)) + ts = oracle.GoTimeToTS(pdClock.CurrentTime()) + events2 <- makeTsEvent(11, ts, uint64(subID)) + // After trying to receive something from a failed store, + // it should auto switch to other stores and fetch events finally. 
+ select { + case event := <-eventCh: + checkTsEvent(event.RegionFeedEvent, ts) + case <-time.After(5 * time.Second): + require.True(t, false, "reconnection not succeed in 5 second") + } +} + +type mockChangeDataServer struct { + ch chan *cdcpb.ChangeDataEvent + wg sync.WaitGroup +} + +func newMockChangeDataServer(ch chan *cdcpb.ChangeDataEvent) *mockChangeDataServer { + return &mockChangeDataServer{ch: ch} +} + +func (m *mockChangeDataServer) EventFeed(s cdcpb.ChangeData_EventFeedServer) error { + closed := make(chan struct{}) + m.wg.Add(1) + go func() { + defer m.wg.Done() + defer close(closed) + for { + if _, err := s.Recv(); err != nil { + return + } + } + }() + m.wg.Add(1) + defer m.wg.Done() + ticker := time.NewTicker(20 * time.Millisecond) + defer ticker.Stop() + for { + select { + case <-closed: + return nil + case <-ticker.C: + } + select { + case event := <-m.ch: + if err := s.Send(event); err != nil { + return err + } + default: + } + } +} + +func (m *mockChangeDataServer) EventFeedV2(s cdcpb.ChangeData_EventFeedV2Server) error { + return m.EventFeed(s) +} diff --git a/cdc/processor/processor.go b/cdc/processor/processor.go index cdf8cdbeb3d..80343495281 100644 --- a/cdc/processor/processor.go +++ b/cdc/processor/processor.go @@ -787,6 +787,7 @@ func (p *processor) initDDLHandler(ctx context.Context) error { return errors.Trace(err) } +<<<<<<< HEAD kvCfg := config.GetGlobalServerConfig().KVClient ctx = contextutil.PutTableInfoInCtx(ctx, -1, puller.DDLPullerTableName) ddlPuller, err := puller.NewDDLJobPuller( @@ -801,6 +802,13 @@ func (p *processor) initDDLHandler(ctx context.Context) error { p.changefeedID, schemaStorage, f, +======= + serverCfg := config.GetGlobalServerConfig() + ddlPuller, err := puller.NewDDLJobPuller( + ctx, p.upstream, ddlStartTs, + serverCfg, p.changefeedID, schemaStorage, + f, false, /* isOwner */ +>>>>>>> 43848f2fb5 (kv(ticdc): remove backoff from newStream func (#9771)) ) if err != nil { return errors.Trace(err) diff --git 
a/cdc/processor/sourcemanager/manager.go b/cdc/processor/sourcemanager/manager.go index 1ef797dda19..f8005bb6270 100644 --- a/cdc/processor/sourcemanager/manager.go +++ b/cdc/processor/sourcemanager/manager.go @@ -159,7 +159,28 @@ func (m *SourceManager) GetTableSorterStats(span tablepb.Span) engine.TableStats // Run implements util.Runnable. func (m *SourceManager) Run(ctx context.Context, _ ...chan<- error) error { +<<<<<<< HEAD m.ctx = ctx +======= + if m.multiplexing { + serverConfig := config.GetGlobalServerConfig() + grpcPool := sharedconn.NewConnAndClientPool(m.up.SecurityConfig) + client := kv.NewSharedClient( + m.changefeedID, serverConfig, m.bdrMode, + m.up.PDClient, grpcPool, m.up.RegionCache, m.up.PDClock, + txnutil.NewLockerResolver(m.up.KVStorage.(tikv.Storage), m.changefeedID), + ) + m.multiplexingPuller.puller = pullerwrapper.NewMultiplexingPullerWrapper( + m.changefeedID, client, m.engine, + int(serverConfig.KVClient.FrontierConcurrent), + ) + + close(m.ready) + return m.multiplexingPuller.puller.Run(ctx) + } + + m.tablePullers.ctx = ctx +>>>>>>> 43848f2fb5 (kv(ticdc): remove backoff from newStream func (#9771)) close(m.ready) select { case err := <-m.errChan: diff --git a/cdc/processor/sourcemanager/puller/puller_wrapper.go b/cdc/processor/sourcemanager/puller/puller_wrapper.go index c9143a7c64c..8861dacccac 100644 --- a/cdc/processor/sourcemanager/puller/puller_wrapper.go +++ b/cdc/processor/sourcemanager/puller/puller_wrapper.go @@ -103,7 +103,7 @@ func (n *WrapperImpl) Start( up.PDClock, n.startTs, []tablepb.Span{n.span}, - config.GetGlobalServerConfig().KVClient, + config.GetGlobalServerConfig(), n.changefeed, n.span.TableID, n.tableName, diff --git a/cdc/puller/ddl_puller.go b/cdc/puller/ddl_puller.go index b808a3e04b2..d80fbb8913b 100644 --- a/cdc/puller/ddl_puller.go +++ b/cdc/puller/ddl_puller.go @@ -529,7 +529,7 @@ func NewDDLJobPuller( kvStorage tidbkv.Storage, pdClock pdutil.Clock, checkpointTs uint64, - cfg *config.KVClientConfig, + 
cfg *config.ServerConfig, changefeed model.ChangeFeedID, schemaStorage entry.SchemaStorage, filter filter.Filter, @@ -540,6 +540,12 @@ func NewDDLJobPuller( } return &ddlJobPullerImpl{ changefeedID: changefeed, +<<<<<<< HEAD +======= + multiplexing: cfg.KVClient.EnableMultiplexing, + schemaStorage: schemaStorage, + kvStorage: kvStorage, +>>>>>>> 43848f2fb5 (kv(ticdc): remove backoff from newStream func (#9771)) filter: filter, schemaStorage: schemaStorage, puller: New( @@ -607,6 +613,7 @@ func NewDDLPuller(ctx context.Context, // storage can be nil only in the test if storage != nil { puller, err = NewDDLJobPuller( +<<<<<<< HEAD ctx, up.PDClient, up.GrpcPool, @@ -618,6 +625,11 @@ func NewDDLPuller(ctx context.Context, changefeed, schemaStorage, filter, +======= + ctx, up, startTs, config.GetGlobalServerConfig(), + changefeed, schemaStorage, filter, + true, /* isOwner */ +>>>>>>> 43848f2fb5 (kv(ticdc): remove backoff from newStream func (#9771)) ) if err != nil { return nil, errors.Trace(err) diff --git a/cdc/puller/puller.go b/cdc/puller/puller.go index cd4a2caca9e..7853275a287 100644 --- a/cdc/puller/puller.go +++ b/cdc/puller/puller.go @@ -83,7 +83,7 @@ func New(ctx context.Context, pdClock pdutil.Clock, checkpointTs uint64, spans []tablepb.Span, - cfg *config.KVClientConfig, + cfg *config.ServerConfig, changefeed model.ChangeFeedID, tableID model.TableID, tableName string, diff --git a/cdc/puller/puller_test.go b/cdc/puller/puller_test.go index 1845559c930..4291ce30675 100644 --- a/cdc/puller/puller_test.go +++ b/cdc/puller/puller_test.go @@ -64,7 +64,7 @@ func newMockCDCKVClient( grpcPool kv.GrpcPool, regionCache *tikv.RegionCache, pdClock pdutil.Clock, - cfg *config.KVClientConfig, + cfg *config.ServerConfig, changefeed model.ChangeFeedID, tableID model.TableID, tableName string, @@ -132,7 +132,7 @@ func newPullerForTest( defer regionCache.Close() plr := New( ctx, pdCli, grpcPool, regionCache, store, pdutil.NewClock4Test(), - checkpointTs, spans, 
config.GetDefaultServerConfig().KVClient, + checkpointTs, spans, config.GetDefaultServerConfig(), model.DefaultChangeFeedID("changefeed-id-test"), 0, "table-test", false, false) wg.Add(1) diff --git a/pkg/config/config_test_data.go b/pkg/config/config_test_data.go index 32ffd71357b..cb859db9d97 100644 --- a/pkg/config/config_test_data.go +++ b/pkg/config/config_test_data.go @@ -147,7 +147,8 @@ const ( "max-task-concurrency": 10, "check-balance-interval": 60000000000, "add-table-batch-size": 50 - } + }, + "enable-kv-connect-backoff": false }, "cluster-id": "default", "max-memory-percentage": 70 diff --git a/pkg/config/debug.go b/pkg/config/debug.go index 7e91188dafa..3a7815090bc 100644 --- a/pkg/config/debug.go +++ b/pkg/config/debug.go @@ -25,6 +25,9 @@ type DebugConfig struct { // Scheduler is the configuration of the two-phase scheduler. Scheduler *SchedulerConfig `toml:"scheduler" json:"scheduler"` + + // EnableKVConnectBackOff enables the backoff for kv connect. + EnableKVConnectBackOff bool `toml:"enable-kv-connect-backoff" json:"enable-kv-connect-backoff"` } // ValidateAndAdjust validates and adjusts the debug configuration diff --git a/pkg/config/server_config.go b/pkg/config/server_config.go index 7e00b4bb209..7707c903f46 100644 --- a/pkg/config/server_config.go +++ b/pkg/config/server_config.go @@ -138,7 +138,8 @@ var defaultServerConfig = &ServerConfig{ }, Messages: defaultMessageConfig.Clone(), - Scheduler: NewDefaultSchedulerConfig(), + Scheduler: NewDefaultSchedulerConfig(), + EnableKVConnectBackOff: false, }, ClusterID: "default", MaxMemoryPercentage: DefaultMaxMemoryPercentage,