From 645d825cd36d20626aa5ac220d946efd79533e9d Mon Sep 17 00:00:00 2001 From: qupeng Date: Wed, 22 Nov 2023 22:30:11 +0800 Subject: [PATCH] kv-client(cdc): add more logs and metrics for kv-client (#10135) Signed-off-by: qupeng Co-authored-by: Ti Chi Robot --- cdc/kv/client.go | 398 +++++++++++------- cdc/kv/client_bench_test.go | 4 +- cdc/kv/client_test.go | 54 +-- cdc/kv/region_state.go | 91 ++-- cdc/kv/region_state_bench_test.go | 12 +- cdc/kv/region_worker.go | 247 +++++------ cdc/kv/region_worker_test.go | 44 +- cdc/kv/regionlock/region_range_lock.go | 148 ++++++- cdc/kv/regionlock/region_range_lock_test.go | 8 +- cdc/processor/processor.go | 4 +- .../sourcemanager/puller/puller_wrapper.go | 2 +- cdc/puller/ddl_puller.go | 9 +- cdc/puller/puller.go | 2 +- cdc/puller/puller_test.go | 4 +- pkg/config/config_test_data.go | 3 +- pkg/config/debug.go | 3 + pkg/config/server_config.go | 3 +- 17 files changed, 634 insertions(+), 402 deletions(-) diff --git a/cdc/kv/client.go b/cdc/kv/client.go index ab3e3f33839..a419c787ecd 100644 --- a/cdc/kv/client.go +++ b/cdc/kv/client.go @@ -19,6 +19,7 @@ import ( "io" "math/rand" "strconv" + "strings" "sync" "sync/atomic" "time" @@ -42,6 +43,7 @@ import ( "github.com/pingcap/tiflow/pkg/version" "github.com/prometheus/client_golang/prometheus" tidbkv "github.com/tikv/client-go/v2/kv" + "github.com/tikv/client-go/v2/oracle" "github.com/tikv/client-go/v2/tikv" pd "github.com/tikv/pd/client" "go.uber.org/zap" @@ -156,7 +158,7 @@ var NewCDCKVClient = NewCDCClient type CDCClient struct { pd pd.Client - config *config.KVClientConfig + config *config.ServerConfig clusterID uint64 grpcPool GrpcPool @@ -192,7 +194,7 @@ func NewCDCClient( grpcPool GrpcPool, regionCache *tikv.RegionCache, pdClock pdutil.Clock, - cfg *config.KVClientConfig, + cfg *config.ServerConfig, changefeed model.ChangeFeedID, tableID model.TableID, tableName string, @@ -218,7 +220,7 @@ func NewCDCClient( } func (c *CDCClient) newStream(ctx context.Context, addr string, storeID uint64) (stream *eventFeedStream, newStreamErr error) { - newStreamErr = retry.Do(ctx, func() (err error) { + streamFunc := func() (err error) { var conn *sharedConn defer func() { if err != nil && conn != nil { @@ -248,10 +250,16 @@ func (c *CDCClient) newStream(ctx context.Context, addr string, storeID uint64) zap.String("changefeed", c.changefeed.ID), zap.String("addr", addr)) return nil - }, retry.WithBackoffBaseDelay(500), - retry.WithMaxTries(2), - retry.WithIsRetryableErr(cerror.IsRetryableError), - ) + } + if c.config.Debug.EnableKVConnectBackOff { + newStreamErr = retry.Do(ctx, streamFunc, + retry.WithBackoffBaseDelay(100), + retry.WithMaxTries(2), + retry.WithIsRetryableErr(cerror.IsRetryableError), + ) + return + } + newStreamErr = streamFunc() return } @@ -265,7 +273,7 @@ func (c *CDCClient) EventFeed( eventCh chan<- model.RegionFeedEvent, ) error { s := newEventFeedSession(c, span, lockResolver, ts, eventCh) - return s.eventFeed(ctx, ts) + return s.eventFeed(ctx) } // RegionCount returns the number of captured regions. 
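The hunk above makes stream creation backoff optional: streamFunc only goes through retry.Do when Debug.EnableKVConnectBackOff is enabled, and is otherwise called exactly once. Below is a minimal standalone sketch of that pattern using only the standard library; the connectWithOptionalBackoff helper, its parameters, and the delays are illustrative stand-ins, not tiflow's retry package.

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// connectWithOptionalBackoff calls connect once, or retries it a bounded
// number of times with a fixed base delay when backoff is enabled.
// It mirrors the shape of the change above, not its exact implementation.
func connectWithOptionalBackoff(
	ctx context.Context,
	connect func(ctx context.Context) error,
	enableBackoff bool,
	maxTries int,
	baseDelay time.Duration,
) error {
	if !enableBackoff {
		return connect(ctx)
	}
	var lastErr error
	for i := 0; i < maxTries; i++ {
		lastErr = connect(ctx)
		if lastErr == nil {
			return nil
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(baseDelay):
		}
	}
	return lastErr
}

func main() {
	attempts := 0
	err := connectWithOptionalBackoff(context.Background(), func(ctx context.Context) error {
		attempts++
		if attempts < 2 {
			return errors.New("transient dial error")
		}
		return nil
	}, true /* enableBackoff */, 2, 100*time.Millisecond)
	fmt.Println(attempts, err) // 2 <nil>
}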
@@ -359,7 +367,6 @@ type eventFeedSession struct { type rangeRequestTask struct { span tablepb.Span - ts uint64 } func newEventFeedSession( @@ -369,9 +376,10 @@ func newEventFeedSession( startTs uint64, eventCh chan<- model.RegionFeedEvent, ) *eventFeedSession { - id := strconv.FormatUint(allocID(), 10) + id := allocID() + idStr := strconv.FormatUint(id, 10) rangeLock := regionlock.NewRegionRangeLock( - totalSpan.StartKey, totalSpan.EndKey, startTs, + id, totalSpan.StartKey, totalSpan.EndKey, startTs, client.changefeed.Namespace+"."+client.changefeed.ID) return &eventFeedSession{ client: client, @@ -384,7 +392,7 @@ func newEventFeedSession( eventCh: eventCh, rangeLock: rangeLock, lockResolver: lockResolver, - id: id, + id: idStr, regionChSizeGauge: clientChannelSize.WithLabelValues("region"), errChSizeGauge: clientChannelSize.WithLabelValues("err"), rangeChSizeGauge: clientChannelSize.WithLabelValues("range"), @@ -400,7 +408,7 @@ func newEventFeedSession( } } -func (s *eventFeedSession) eventFeed(ctx context.Context, ts uint64) error { +func (s *eventFeedSession) eventFeed(ctx context.Context) error { s.requestRangeCh = chann.NewAutoDrainChann[rangeRequestTask]() s.regionCh = chann.NewAutoDrainChann[singleRegionInfo]() s.regionRouter = chann.NewAutoDrainChann[singleRegionInfo]() @@ -417,13 +425,11 @@ func (s *eventFeedSession) eventFeed(ctx context.Context, ts uint64) error { g, ctx := errgroup.WithContext(ctx) - g.Go(func() error { - return s.dispatchRequest(ctx) - }) + g.Go(func() error { return s.dispatchRequest(ctx) }) - g.Go(func() error { - return s.requestRegionToStore(ctx, g) - }) + g.Go(func() error { return s.requestRegionToStore(ctx, g) }) + + g.Go(func() error { return s.logSlowRegions(ctx) }) g.Go(func() error { for { @@ -441,7 +447,7 @@ func (s *eventFeedSession) eventFeed(ctx context.Context, ts uint64) error { // Besides the count or frequency of range request is limited, // we use ephemeral goroutine instead of permanent goroutine. g.Go(func() error { - return s.divideAndSendEventFeedToRegions(ctx, task.span, task.ts) + return s.divideAndSendEventFeedToRegions(ctx, task.span) }) } } @@ -462,7 +468,7 @@ func (s *eventFeedSession) eventFeed(ctx context.Context, ts uint64) error { } }) - s.requestRangeCh.In() <- rangeRequestTask{span: s.totalSpan, ts: ts} + s.requestRangeCh.In() <- rangeRequestTask{span: s.totalSpan} s.rangeChSizeGauge.Inc() log.Info("event feed started", @@ -470,7 +476,7 @@ func (s *eventFeedSession) eventFeed(ctx context.Context, ts uint64) error { zap.String("changefeed", s.changefeed.ID), zap.Int64("tableID", s.tableID), zap.String("tableName", s.tableName), - zap.Uint64("startTs", ts), + zap.Uint64("startTs", s.startTs), zap.Stringer("span", &s.totalSpan)) return g.Wait() @@ -479,9 +485,9 @@ func (s *eventFeedSession) eventFeed(ctx context.Context, ts uint64) error { // scheduleDivideRegionAndRequest schedules a range to be divided by regions, // and these regions will be then scheduled to send ChangeData requests. 
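In the hunk above, eventFeed now launches its background loops (dispatchRequest, requestRegionToStore, and the new logSlowRegions checker) through one errgroup, so they share a context and the first failure cancels the others. A self-contained sketch of that coordination pattern, assuming nothing beyond golang.org/x/sync/errgroup; the loop bodies are placeholders.

package main

import (
	"context"
	"fmt"
	"time"

	"golang.org/x/sync/errgroup"
)

func run(ctx context.Context) error {
	g, ctx := errgroup.WithContext(ctx)

	// A worker loop, comparable to dispatchRequest / requestRegionToStore.
	g.Go(func() error {
		<-ctx.Done()
		return ctx.Err()
	})

	// A periodic checker, comparable to logSlowRegions: it ticks until the
	// shared context is canceled, then returns the context error.
	g.Go(func() error {
		ticker := time.NewTicker(50 * time.Millisecond)
		defer ticker.Stop()
		for {
			select {
			case <-ctx.Done():
				return ctx.Err()
			case <-ticker.C:
				fmt.Println("periodic check")
			}
		}
	})

	return g.Wait()
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 120*time.Millisecond)
	defer cancel()
	fmt.Println(run(ctx)) // context deadline exceeded
}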
func (s *eventFeedSession) scheduleDivideRegionAndRequest( - ctx context.Context, span tablepb.Span, ts uint64, + ctx context.Context, span tablepb.Span, ) { - task := rangeRequestTask{span: span, ts: ts} + task := rangeRequestTask{span: span} select { case s.requestRangeCh.In() <- task: s.rangeChSizeGauge.Inc() @@ -495,7 +501,7 @@ func (s *eventFeedSession) scheduleRegionRequest(ctx context.Context, sri single handleResult := func(res regionlock.LockRangeResult) { switch res.Status { case regionlock.LockRangeStatusSuccess: - sri.resolvedTs = res.CheckpointTs + sri.lockedRange = res.LockedRange select { case s.regionCh.In() <- sri: s.regionChSizeGauge.Inc() @@ -507,12 +513,11 @@ func (s *eventFeedSession) scheduleRegionRequest(ctx context.Context, sri single zap.String("changefeed", s.changefeed.ID), zap.Uint64("regionID", sri.verID.GetID()), zap.Stringer("span", &sri.span), - zap.Uint64("resolvedTs", sri.resolvedTs), zap.Any("retrySpans", res.RetryRanges)) for _, r := range res.RetryRanges { // This call is always blocking, otherwise if scheduling in a new // goroutine, it won't block the caller of `schedulerRegionRequest`. - s.scheduleDivideRegionAndRequest(ctx, r, sri.resolvedTs) + s.scheduleDivideRegionAndRequest(ctx, r) } case regionlock.LockRangeStatusCancel: return @@ -523,11 +528,12 @@ func (s *eventFeedSession) scheduleRegionRequest(ctx context.Context, sri single res := s.rangeLock.LockRange( ctx, sri.span.StartKey, sri.span.EndKey, sri.verID.GetID(), sri.verID.GetVer()) + failpoint.Inject("kvClientMockRangeLock", func(val failpoint.Value) { // short sleep to wait region has split time.Sleep(time.Second) s.rangeLock.UnlockRange(sri.span.StartKey, sri.span.EndKey, - sri.verID.GetID(), sri.verID.GetVer(), sri.resolvedTs) + sri.verID.GetID(), sri.verID.GetVer()) regionNum := val.(int) retryRanges := make([]tablepb.Span, 0, regionNum) start := []byte("a") @@ -555,8 +561,15 @@ func (s *eventFeedSession) scheduleRegionRequest(ctx context.Context, sri single // CAUTION: Note that this should only be called in a context that the region has locked its range. 
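With this change, LockRange hands back a LockedRange pointer that is stored on the region info, so the range lock and the region feed state read and advance the same checkpoint. A minimal sketch of such a shared handle, under the assumption that it only needs an atomic checkpoint and an initialization flag (the real regionlock type carries more).

package main

import (
	"fmt"
	"sync/atomic"
)

// lockedRange is shared between the range lock and the region feed state,
// so both sides observe the same checkpoint and initialization flag.
type lockedRange struct {
	CheckpointTs atomic.Uint64
	Initialized  atomic.Bool
}

type regionState struct {
	locked *lockedRange
}

func (s *regionState) resolvedTs() uint64 { return s.locked.CheckpointTs.Load() }

func main() {
	lr := &lockedRange{}
	lr.CheckpointTs.Store(100)

	state := &regionState{locked: lr}

	// Advancing the shared handle is immediately visible through the state.
	lr.CheckpointTs.Store(120)
	lr.Initialized.Store(true)
	fmt.Println(state.resolvedTs(), lr.Initialized.Load()) // 120 true
}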
func (s *eventFeedSession) onRegionFail(ctx context.Context, errorInfo regionErrorInfo) { s.rangeLock.UnlockRange(errorInfo.span.StartKey, errorInfo.span.EndKey, - errorInfo.verID.GetID(), errorInfo.verID.GetVer(), errorInfo.resolvedTs) - s.enqueueError(ctx, errorInfo) + errorInfo.verID.GetID(), errorInfo.verID.GetVer(), errorInfo.resolvedTs()) + log.Info("region failed", zap.Stringer("span", &errorInfo.span), + zap.Any("regionId", errorInfo.verID.GetID()), + zap.Error(errorInfo.err)) + select { + case s.errCh.In() <- errorInfo: + s.errChSizeGauge.Inc() + case <-ctx.Done(): + } } // requestRegionToStore gets singleRegionInfo from regionRouter, which is a token @@ -598,7 +611,7 @@ func (s *eventFeedSession) requestRegionToStore( RegionId: regionID, RequestId: requestID, RegionEpoch: regionEpoch, - CheckpointTs: sri.resolvedTs, + CheckpointTs: sri.resolvedTs(), StartKey: sri.span.StartKey, EndKey: sri.span.EndKey, ExtraOp: extraOp, @@ -658,7 +671,7 @@ func (s *eventFeedSession) requestRegionToStore( g.Go(func() error { defer s.deleteStream(storeAddr) - return s.receiveFromStream(ctx, g, storeAddr, storeID, stream.client, pendingRegions) + return s.receiveFromStream(ctx, storeAddr, storeID, stream.client, pendingRegions) }) } @@ -676,13 +689,13 @@ func (s *eventFeedSession) requestRegionToStore( state := newRegionFeedState(sri, requestID) pendingRegions.setByRequestID(requestID, state) - log.Debug("start new request", + log.Info("start new request", zap.String("namespace", s.changefeed.Namespace), zap.String("changefeed", s.changefeed.ID), zap.Int64("tableID", s.tableID), zap.String("tableName", s.tableName), - zap.String("addr", storeAddr), - zap.Any("request", req)) + zap.Uint64("regionID", sri.verID.GetID()), + zap.String("addr", storeAddr)) err = stream.client.Send(req) @@ -769,7 +782,7 @@ func (s *eventFeedSession) dispatchRequest(ctx context.Context) error { Region: sri.verID.GetID(), }, }, - ResolvedTs: sri.resolvedTs, + ResolvedTs: sri.resolvedTs(), }, } select { @@ -791,7 +804,7 @@ func (s *eventFeedSession) dispatchRequest(ctx context.Context) error { zap.String("tableName", s.tableName), zap.Uint64("regionID", sri.verID.GetID()), zap.Stringer("span", &sri.span), - zap.Uint64("resolvedTs", sri.resolvedTs)) + zap.Uint64("resolvedTs", sri.resolvedTs())) errInfo := newRegionErrorInfo(sri, &rpcCtxUnavailableErr{verID: sri.verID}) s.onRegionFail(ctx, errInfo) continue @@ -805,7 +818,7 @@ func (s *eventFeedSession) dispatchRequest(ctx context.Context) error { // to region boundaries. When region merging happens, it's possible that it // will produce some overlapping spans. 
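onRegionFail above now unlocks the range, logs the failure, and pushes the error onto the session's error channel in one place, blocking until the send succeeds or the context ends. A small sketch of that hand-off; the regionError type and enqueue helper are hypothetical names used only for illustration.

package main

import (
	"context"
	"fmt"
)

type regionError struct {
	regionID uint64
	err      error
}

// enqueue blocks until the error is accepted or the context is canceled,
// which keeps failed regions from being silently dropped.
func enqueue(ctx context.Context, ch chan<- regionError, e regionError) bool {
	select {
	case ch <- e:
		return true
	case <-ctx.Done():
		return false
	}
}

func main() {
	ch := make(chan regionError, 1)
	ok := enqueue(context.Background(), ch, regionError{regionID: 7})
	fmt.Println(ok, (<-ch).regionID) // true 7
}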
func (s *eventFeedSession) divideAndSendEventFeedToRegions( - ctx context.Context, span tablepb.Span, ts uint64, + ctx context.Context, span tablepb.Span, ) error { limit := 20 nextSpan := span @@ -836,7 +849,7 @@ func (s *eventFeedSession) divideAndSendEventFeedToRegions( } return nil }, retry.WithBackoffMaxDelay(500), - retry.WithTotalRetryDuratoin(time.Duration(s.client.config.RegionRetryDuration))) + retry.WithTotalRetryDuratoin(time.Duration(s.client.config.KVClient.RegionRetryDuration))) if retryErr != nil { log.Warn("load regions failed", zap.String("namespace", s.changefeed.Namespace), @@ -857,7 +870,7 @@ func (s *eventFeedSession) divideAndSendEventFeedToRegions( // the End key return by the PD API will be nil to represent the biggest key, partialSpan = spanz.HackSpan(partialSpan) - sri := newSingleRegionInfo(tiRegion.VerID(), partialSpan, ts, nil) + sri := newSingleRegionInfo(tiRegion.VerID(), partialSpan, nil) s.scheduleRegionRequest(ctx, sri) // return if no more regions if spanz.EndCompare(nextSpan.StartKey, span.EndKey) >= 0 { @@ -867,16 +880,6 @@ func (s *eventFeedSession) divideAndSendEventFeedToRegions( } } -// enqueueError sends error to the eventFeedSession's error channel in a none blocking way -// TODO: refactor enqueueError to avoid too many goroutines spawned when a lot of regions meet error. -func (s *eventFeedSession) enqueueError(ctx context.Context, errorInfo regionErrorInfo) { - select { - case s.errCh.In() <- errorInfo: - s.errChSizeGauge.Inc() - case <-ctx.Done(): - } -} - // handleError handles error returned by a region. If some new EventFeed connection should be established, the region // info will be sent to `regionCh`. Note if region channel is full, this function will be blocked. // CAUTION: Note that this should only be invoked in a context that the region is not locked, otherwise use onRegionFail @@ -886,17 +889,24 @@ func (s *eventFeedSession) handleError(ctx context.Context, errInfo regionErrorI switch eerr := errors.Cause(err).(type) { case *eventError: innerErr := eerr.err + log.Info("cdc region error", + zap.String("namespace", s.changefeed.Namespace), + zap.String("changefeed", s.changefeed.ID), + zap.Int64("tableID", s.tableID), + zap.String("tableName", s.tableName), + zap.Stringer("error", innerErr)) + if notLeader := innerErr.GetNotLeader(); notLeader != nil { metricFeedNotLeaderCounter.Inc() s.client.regionCache.UpdateLeader(errInfo.verID, notLeader.GetLeader(), errInfo.rpcCtx.AccessIdx) } else if innerErr.GetEpochNotMatch() != nil { // TODO: If only confver is updated, we don't need to reload the region from region cache. 
metricFeedEpochNotMatchCounter.Inc() - s.scheduleDivideRegionAndRequest(ctx, errInfo.span, errInfo.resolvedTs) + s.scheduleDivideRegionAndRequest(ctx, errInfo.span) return nil } else if innerErr.GetRegionNotFound() != nil { metricFeedRegionNotFoundCounter.Inc() - s.scheduleDivideRegionAndRequest(ctx, errInfo.span, errInfo.resolvedTs) + s.scheduleDivideRegionAndRequest(ctx, errInfo.span) return nil } else if duplicatedRequest := innerErr.GetDuplicateRequest(); duplicatedRequest != nil { metricFeedDuplicateRequestCounter.Inc() @@ -926,7 +936,7 @@ func (s *eventFeedSession) handleError(ctx context.Context, errInfo regionErrorI } case *rpcCtxUnavailableErr: metricFeedRPCCtxUnavailable.Inc() - s.scheduleDivideRegionAndRequest(ctx, errInfo.span, errInfo.resolvedTs) + s.scheduleDivideRegionAndRequest(ctx, errInfo.span) return nil case *connectToStoreErr: metricConnectToStoreErr.Inc() @@ -964,8 +974,7 @@ func (s *eventFeedSession) getRPCContextForRegion(ctx context.Context, id tikv.R // 2. pending regions: call `s.onRegionFail` for each pending region before this // routine exits to establish these regions. func (s *eventFeedSession) receiveFromStream( - ctx context.Context, - g *errgroup.Group, + parentCtx context.Context, addr string, storeID uint64, stream cdcpb.ChangeData_EventFeedClient, @@ -994,7 +1003,7 @@ func (s *eventFeedSession) receiveFromStream( remainingRegions := pendingRegions.takeAll() for _, state := range remainingRegions { errInfo := newRegionErrorInfo(state.sri, cerror.ErrPendingRegionCancel.FastGenByArgs()) - s.onRegionFail(ctx, errInfo) + s.onRegionFail(parentCtx, errInfo) } }() @@ -1003,121 +1012,147 @@ func (s *eventFeedSession) receiveFromStream( // always create a new region worker, because `receiveFromStream` is ensured // to call exactly once from outer code logic - worker := newRegionWorker(s.changefeed, s, addr) - + worker := newRegionWorker(parentCtx, s.changefeed, s, addr) defer worker.evictAllRegions() - g.Go(func() error { - return worker.run(ctx) + ctx, cancel := context.WithCancel(parentCtx) + var retErr error + once := sync.Once{} + handleExit := func(err error) error { + once.Do(func() { + cancel() + retErr = err + }) + return err + } + + eg, ctx := errgroup.WithContext(ctx) + eg.Go(func() error { + err := handleExit(worker.run()) + if err != nil { + log.Error("region worker exited with error", zap.Error(err), + zap.Any("changefeed", s.changefeed), + zap.Any("addr", addr), + zap.Any("storeID", storeID)) + } + return err }) - maxCommitTs := model.Ts(0) - for { - cevent, err := stream.Recv() + receiveEvents := func() error { + maxCommitTs := model.Ts(0) + for { + cevent, err := stream.Recv() - failpoint.Inject("kvClientRegionReentrantError", func(op failpoint.Value) { - if op.(string) == "error" { - _ = worker.sendEvents(ctx, []*regionStatefulEvent{nil}) - } - }) - failpoint.Inject("kvClientStreamRecvError", func(msg failpoint.Value) { - errStr := msg.(string) - if errStr == io.EOF.Error() { - err = io.EOF - } else { - err = errors.New(errStr) - } - }) - if err != nil { - if status.Code(errors.Cause(err)) == codes.Canceled { - log.Debug( - "receive from stream canceled", - zap.String("namespace", s.changefeed.Namespace), - zap.String("changefeed", s.changefeed.ID), - zap.String("addr", addr), - zap.Uint64("storeID", storeID), - ) - } else { - log.Warn( - "failed to receive from stream", - zap.String("namespace", s.changefeed.Namespace), - zap.String("changefeed", s.changefeed.ID), - zap.String("addr", addr), - zap.Uint64("storeID", storeID), - zap.Error(err), - 
) - // Note that pd need at lease 10s+ to tag a kv node as disconnect if kv node down - // tikv raft need wait (raft-base-tick-interval * raft-election-timeout-ticks) 10s to start a new - // election - } + failpoint.Inject("kvClientRegionReentrantError", func(op failpoint.Value) { + if op.(string) == "error" { + _ = worker.sendEvents(ctx, []*regionStatefulEvent{nil}) + } + }) + failpoint.Inject("kvClientStreamRecvError", func(msg failpoint.Value) { + errStr := msg.(string) + if errStr == io.EOF.Error() { + err = io.EOF + } else { + err = errors.New(errStr) + } + }) + if err != nil { + if status.Code(errors.Cause(err)) == codes.Canceled { + log.Debug( + "receive from stream canceled", + zap.String("namespace", s.changefeed.Namespace), + zap.String("changefeed", s.changefeed.ID), + zap.String("addr", addr), + zap.Uint64("storeID", storeID), + ) + } else { + log.Warn( + "failed to receive from stream", + zap.String("namespace", s.changefeed.Namespace), + zap.String("changefeed", s.changefeed.ID), + zap.String("addr", addr), + zap.Uint64("storeID", storeID), + zap.Error(err), + ) + // Note that pd need at lease 10s+ to tag a kv node as disconnect if kv node down + // tikv raft need wait (raft-base-tick-interval * raft-election-timeout-ticks) 10s to start a new + // election + } - // Use the same delay mechanism as `stream.Send` error handling, since - // these two errors often mean upstream store suffers an accident, which - // needs time to recover, kv client doesn't need to retry frequently. - // TODO: add a better retry backoff or rate limitter - time.Sleep(time.Millisecond * time.Duration(rand.Intn(100))) + // Use the same delay mechanism as `stream.Send` error handling, since + // these two errors often mean upstream store suffers an accident, which + // needs time to recover, kv client doesn't need to retry frequently. + // TODO: add a better retry backoff or rate limitter + time.Sleep(time.Millisecond * time.Duration(rand.Intn(100))) - // TODO: better to closes the send direction of the stream to notify - // the other side, but it is not safe to call CloseSend concurrently - // with SendMsg, in future refactor we should refine the recv loop - s.deleteStream(addr) + // TODO: better to closes the send direction of the stream to notify + // the other side, but it is not safe to call CloseSend concurrently + // with SendMsg, in future refactor we should refine the recv loop + s.deleteStream(addr) - // send nil regionStatefulEvent to signal worker exit - err = worker.sendEvents(ctx, []*regionStatefulEvent{nil}) - if err != nil { - return err - } + // send nil regionStatefulEvent to signal worker exit + err = worker.sendEvents(ctx, []*regionStatefulEvent{nil}) + if err != nil { + return err + } - // Do no return error but gracefully stop the goroutine here. Then the whole job will not be canceled and - // connection will be retried. - return nil - } + // Do no return error but gracefully stop the goroutine here. Then the whole job will not be canceled and + // connection will be retried. 
+ return nil + } - size := cevent.Size() - if size > warnRecvMsgSizeThreshold { - regionCount := 0 - if cevent.ResolvedTs != nil { - regionCount = len(cevent.ResolvedTs.Regions) + size := cevent.Size() + if size > warnRecvMsgSizeThreshold { + regionCount := 0 + if cevent.ResolvedTs != nil { + regionCount = len(cevent.ResolvedTs.Regions) + } + log.Warn("change data event size too large", + zap.String("namespace", s.changefeed.Namespace), + zap.String("changefeed", s.changefeed.ID), + zap.Int("size", size), zap.Int("eventLen", len(cevent.Events)), + zap.Int("resolvedRegionCount", regionCount)) } - log.Warn("change data event size too large", - zap.String("namespace", s.changefeed.Namespace), - zap.String("changefeed", s.changefeed.ID), - zap.Int("size", size), zap.Int("eventLen", len(cevent.Events)), - zap.Int("resolvedRegionCount", regionCount)) - } - if len(cevent.Events) != 0 { - if entries, ok := cevent.Events[0].Event.(*cdcpb.Event_Entries_); ok { - commitTs := entries.Entries.Entries[0].CommitTs - if maxCommitTs < commitTs { - maxCommitTs = commitTs + if len(cevent.Events) != 0 { + if entries, ok := cevent.Events[0].Event.(*cdcpb.Event_Entries_); ok && len(entries.Entries.Entries) > 0 { + commitTs := entries.Entries.Entries[0].CommitTs + if maxCommitTs < commitTs { + maxCommitTs = commitTs + } } } - } - err = s.sendRegionChangeEvents(ctx, cevent.Events, worker, pendingRegions, addr) - if err != nil { - return err - } - if cevent.ResolvedTs != nil { - metricSendEventBatchResolvedSize.Observe(float64(len(cevent.ResolvedTs.Regions))) - err = s.sendResolvedTs(ctx, cevent.ResolvedTs, worker) + err = s.sendRegionChangeEvents(ctx, cevent.Events, worker, pendingRegions, addr) if err != nil { return err } - // NOTE(qupeng): what if all regions are removed from the store? - // TiKV send resolved ts events every second by default. - // We check and update region count here to save CPU. - tsStat.regionCount.Store(uint64(worker.statesManager.regionCount())) - tsStat.resolvedTs.Store(cevent.ResolvedTs.Ts) - if maxCommitTs == 0 { - // In case, there is no write for the table, - // we use resolved ts as maxCommitTs to make the stats meaningful. - tsStat.commitTs.Store(cevent.ResolvedTs.Ts) - } else { - tsStat.commitTs.Store(maxCommitTs) + if cevent.ResolvedTs != nil { + metricSendEventBatchResolvedSize.Observe(float64(len(cevent.ResolvedTs.Regions))) + err = s.sendResolvedTs(ctx, cevent.ResolvedTs, worker) + if err != nil { + return err + } + // NOTE(qupeng): what if all regions are removed from the store? + // TiKV send resolved ts events every second by default. + // We check and update region count here to save CPU. + tsStat.regionCount.Store(uint64(worker.statesManager.regionCount())) + tsStat.resolvedTs.Store(cevent.ResolvedTs.Ts) + if maxCommitTs == 0 { + // In case, there is no write for the table, + // we use resolved ts as maxCommitTs to make the stats meaningful. 
+ tsStat.commitTs.Store(cevent.ResolvedTs.Ts) + } else { + tsStat.commitTs.Store(maxCommitTs) + } } } } + eg.Go(func() error { + return handleExit(receiveEvents()) + }) + + _ = eg.Wait() + return retErr } func (s *eventFeedSession) sendRegionChangeEvents( @@ -1178,7 +1213,7 @@ func (s *eventFeedSession) sendRegionChangeEvents( } state.start() worker.setRegionState(event.RegionId, state) - } else if state.isStopped() { + } else if state.isStale() { log.Warn("drop event due to region feed stopped", zap.String("namespace", s.changefeed.Namespace), zap.String("changefeed", s.changefeed.ID), @@ -1188,6 +1223,17 @@ func (s *eventFeedSession) sendRegionChangeEvents( continue } + switch x := event.Event.(type) { + case *cdcpb.Event_Error: + log.Info("event feed receives a region error", + zap.String("namespace", s.changefeed.Namespace), + zap.String("changefeed", s.changefeed.ID), + zap.Int64("tableID", s.tableID), + zap.String("tableName", s.tableName), + zap.Uint64("regionID", event.RegionId), + zap.Any("error", x.Error)) + } + slot := worker.inputCalcSlot(event.RegionId) statefulEvents[slot] = append(statefulEvents[slot], ®ionStatefulEvent{ changeEvent: event, @@ -1280,6 +1326,54 @@ func (s *eventFeedSession) getStreamCancel(storeAddr string) (cancel context.Can return } +func (s *eventFeedSession) logSlowRegions(ctx context.Context) error { + ticker := time.NewTicker(10 * time.Second) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + return ctx.Err() + case <-ticker.C: + } + + currTime, err := s.client.pdClock.CurrentTime() + if err != nil { + continue + } + attr := s.rangeLock.CollectLockedRangeAttrs(nil) + if attr.SlowestRegion.Initialized { + ckptTime := oracle.GetTimeFromTS(attr.SlowestRegion.CheckpointTs) + if currTime.Sub(ckptTime) > 20*time.Second { + log.Info("event feed finds a slow region", + zap.String("namespace", s.changefeed.Namespace), + zap.String("changefeed", s.changefeed.ID), + zap.Int64("tableID", s.tableID), + zap.String("tableName", s.tableName), + zap.Any("slowRegion", attr.SlowestRegion)) + } + } else if currTime.Sub(attr.SlowestRegion.Created) > 10*time.Minute { + log.Info("event feed initializes a region too slow", + zap.String("namespace", s.changefeed.Namespace), + zap.String("changefeed", s.changefeed.ID), + zap.Int64("tableID", s.tableID), + zap.String("tableName", s.tableName), + zap.Any("slowRegion", attr.SlowestRegion)) + } + if len(attr.Holes) > 0 { + holes := make([]string, 0, len(attr.Holes)) + for _, hole := range attr.Holes { + holes = append(holes, fmt.Sprintf("[%s,%s)", hole.StartKey, hole.EndKey)) + } + log.Info("event feed holes exist", + zap.String("namespace", s.changefeed.Namespace), + zap.String("changefeed", s.changefeed.ID), + zap.Int64("tableID", s.tableID), + zap.String("tableName", s.tableName), + zap.String("holes", strings.Join(holes, ", "))) + } + } +} + func assembleRowEvent(regionID uint64, entry *cdcpb.Event_Row) (model.RegionFeedEvent, error) { var opType model.OpType switch entry.GetOpType() { diff --git a/cdc/kv/client_bench_test.go b/cdc/kv/client_bench_test.go index ce97494ab33..fd813903df2 100644 --- a/cdc/kv/client_bench_test.go +++ b/cdc/kv/client_bench_test.go @@ -196,7 +196,7 @@ func prepareBenchMultiStore(b *testing.B, storeNum, regionNum int) ( defer regionCache.Close() cdcClient := NewCDCClient( ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(), - config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false) + config.GetDefaultServerConfig(), changefeed, 0, "", false) eventCh := make(chan 
model.RegionFeedEvent, 1000000) wg.Add(1) go func() { @@ -290,7 +290,7 @@ func prepareBench(b *testing.B, regionNum int) ( defer regionCache.Close() cdcClient := NewCDCClient( ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(), - config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false) + config.GetDefaultServerConfig(), changefeed, 0, "", false) eventCh := make(chan model.RegionFeedEvent, 1000000) wg.Add(1) go func() { diff --git a/cdc/kv/client_test.go b/cdc/kv/client_test.go index b1862dec8d5..9cc31215b14 100644 --- a/cdc/kv/client_test.go +++ b/cdc/kv/client_test.go @@ -72,7 +72,7 @@ func TestNewClient(t *testing.T) { defer regionCache.Close() cli := NewCDCClient( context.Background(), pdClient, grpcPool, regionCache, pdutil.NewClock4Test(), - config.GetDefaultServerConfig().KVClient, model.DefaultChangeFeedID(""), 0, "", false) + config.GetDefaultServerConfig(), model.DefaultChangeFeedID(""), 0, "", false) require.NotNil(t, cli) } @@ -320,7 +320,7 @@ func TestConnectOfflineTiKV(t *testing.T) { defer regionCache.Close() cdcClient := NewCDCClient( context.Background(), pdClient, grpcPool, regionCache, pdutil.NewClock4Test(), - config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false) + config.GetDefaultServerConfig(), changefeed, 0, "", false) // Take care of the eventCh, it's used to output resolvedTs event or kv event // It will stuck the normal routine eventCh := make(chan model.RegionFeedEvent, 50) @@ -422,7 +422,7 @@ func TestRecvLargeMessageSize(t *testing.T) { defer regionCache.Close() cdcClient := NewCDCClient( ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(), - config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false) + config.GetDefaultServerConfig(), changefeed, 0, "", false) eventCh := make(chan model.RegionFeedEvent, 50) wg.Add(1) go func() { @@ -522,7 +522,7 @@ func TestHandleError(t *testing.T) { defer regionCache.Close() cdcClient := NewCDCClient( ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(), - config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false) + config.GetDefaultServerConfig(), changefeed, 0, "", false) eventCh := make(chan model.RegionFeedEvent, 50) wg.Add(1) go func() { @@ -681,7 +681,7 @@ func TestCompatibilityWithSameConn(t *testing.T) { defer regionCache.Close() cdcClient := NewCDCClient( ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(), - config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false) + config.GetDefaultServerConfig(), changefeed, 0, "", false) eventCh := make(chan model.RegionFeedEvent, 50) var wg2 sync.WaitGroup wg2.Add(1) @@ -748,7 +748,7 @@ func TestClusterIDMismatch(t *testing.T) { defer regionCache.Close() cdcClient := NewCDCClient( ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(), - config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false) + config.GetDefaultServerConfig(), changefeed, 0, "", false) eventCh := make(chan model.RegionFeedEvent, 50) var wg2 sync.WaitGroup @@ -817,7 +817,7 @@ func testHandleFeedEvent(t *testing.T) { defer regionCache.Close() cdcClient := NewCDCClient( ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(), - config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false) + config.GetDefaultServerConfig(), changefeed, 0, "", false) eventCh := make(chan model.RegionFeedEvent, 50) wg.Add(1) go func() { @@ -1278,7 +1278,7 @@ func TestStreamSendWithError(t *testing.T) { defer regionCache.Close() cdcClient := NewCDCClient( ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(), - 
config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false) + config.GetDefaultServerConfig(), changefeed, 0, "", false) eventCh := make(chan model.RegionFeedEvent, 50) wg.Add(1) go func() { @@ -1390,7 +1390,7 @@ func testStreamRecvWithError(t *testing.T, failpointStr string) { defer regionCache.Close() cdcClient := NewCDCClient( ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(), - config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false) + config.GetDefaultServerConfig(), changefeed, 0, "", false) eventCh := make(chan model.RegionFeedEvent, 50) wg.Add(1) go func() { @@ -1523,7 +1523,7 @@ func TestStreamRecvWithErrorAndResolvedGoBack(t *testing.T) { defer regionCache.Close() cdcClient := NewCDCClient( ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(), - config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false) + config.GetDefaultServerConfig(), changefeed, 0, "", false) eventCh := make(chan model.RegionFeedEvent, 50) wg.Add(1) go func() { @@ -1732,7 +1732,7 @@ func TestIncompatibleTiKV(t *testing.T) { defer regionCache.Close() cdcClient := NewCDCClient( ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(), - config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false) + config.GetDefaultServerConfig(), changefeed, 0, "", false) // NOTICE: eventCh may block the main logic of EventFeed eventCh := make(chan model.RegionFeedEvent, 128) wg.Add(1) @@ -1809,7 +1809,7 @@ func TestNoPendingRegionError(t *testing.T) { defer regionCache.Close() cdcClient := NewCDCClient( ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(), - config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false) + config.GetDefaultServerConfig(), changefeed, 0, "", false) eventCh := make(chan model.RegionFeedEvent, 50) wg.Add(1) @@ -1888,7 +1888,7 @@ func TestDropStaleRequest(t *testing.T) { defer regionCache.Close() cdcClient := NewCDCClient( ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(), - config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false) + config.GetDefaultServerConfig(), changefeed, 0, "", false) eventCh := make(chan model.RegionFeedEvent, 50) wg.Add(1) go func() { @@ -2002,7 +2002,7 @@ func TestResolveLock(t *testing.T) { defer regionCache.Close() cdcClient := NewCDCClient( ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(), - config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false) + config.GetDefaultServerConfig(), changefeed, 0, "", false) eventCh := make(chan model.RegionFeedEvent, 50) wg.Add(1) go func() { @@ -2108,7 +2108,7 @@ func testEventCommitTsFallback(t *testing.T, events []*cdcpb.ChangeDataEvent) { defer regionCache.Close() cdcClient := NewCDCClient( ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(), - config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false) + config.GetDefaultServerConfig(), changefeed, 0, "", false) eventCh := make(chan model.RegionFeedEvent, 50) var clientWg sync.WaitGroup clientWg.Add(1) @@ -2236,7 +2236,7 @@ func testEventAfterFeedStop(t *testing.T) { defer regionCache.Close() cdcClient := NewCDCClient( ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(), - config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false) + config.GetDefaultServerConfig(), changefeed, 0, "", false) eventCh := make(chan model.RegionFeedEvent, 50) wg.Add(1) go func() { @@ -2423,7 +2423,7 @@ func TestOutOfRegionRangeEvent(t *testing.T) { defer regionCache.Close() cdcClient := NewCDCClient( ctx, pdClient, grpcPool, regionCache, 
pdutil.NewClock4Test(), - config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false) + config.GetDefaultServerConfig(), changefeed, 0, "", false) eventCh := make(chan model.RegionFeedEvent, 50) wg.Add(1) go func() { @@ -2641,7 +2641,7 @@ func TestResolveLockNoCandidate(t *testing.T) { defer regionCache.Close() cdcClient := NewCDCClient( ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(), - config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false) + config.GetDefaultServerConfig(), changefeed, 0, "", false) eventCh := make(chan model.RegionFeedEvent, 50) wg.Add(1) go func() { @@ -2737,7 +2737,7 @@ func TestFailRegionReentrant(t *testing.T) { defer regionCache.Close() cdcClient := NewCDCClient( ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(), - config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false) + config.GetDefaultServerConfig(), changefeed, 0, "", false) eventCh := make(chan model.RegionFeedEvent, 50) wg.Add(1) go func() { @@ -2820,7 +2820,7 @@ func TestClientV1UnlockRangeReentrant(t *testing.T) { defer regionCache.Close() cdcClient := NewCDCClient( ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(), - config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false) + config.GetDefaultServerConfig(), changefeed, 0, "", false) eventCh := make(chan model.RegionFeedEvent, 50) wg.Add(1) go func() { @@ -2888,7 +2888,7 @@ func testClientErrNoPendingRegion(t *testing.T) { defer regionCache.Close() cdcClient := NewCDCClient( ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(), - config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false) + config.GetDefaultServerConfig(), changefeed, 0, "", false) eventCh := make(chan model.RegionFeedEvent, 50) wg.Add(1) go func() { @@ -2966,7 +2966,7 @@ func testKVClientForceReconnect(t *testing.T) { defer regionCache.Close() cdcClient := NewCDCClient( ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(), - config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false) + config.GetDefaultServerConfig(), changefeed, 0, "", false) eventCh := make(chan model.RegionFeedEvent, 50) wg.Add(1) go func() { @@ -3117,7 +3117,7 @@ func TestConcurrentProcessRangeRequest(t *testing.T) { defer regionCache.Close() cdcClient := NewCDCClient( ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(), - config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false) + config.GetDefaultServerConfig(), changefeed, 0, "", false) eventCh := make(chan model.RegionFeedEvent, 100) wg.Add(1) go func() { @@ -3234,7 +3234,7 @@ func TestEvTimeUpdate(t *testing.T) { defer regionCache.Close() cdcClient := NewCDCClient( ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(), - config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false) + config.GetDefaultServerConfig(), changefeed, 0, "", false) eventCh := make(chan model.RegionFeedEvent, 50) wg.Add(1) go func() { @@ -3360,7 +3360,7 @@ func TestRegionWorkerExitWhenIsIdle(t *testing.T) { defer regionCache.Close() cdcClient := NewCDCClient( ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(), - config.GetDefaultServerConfig().KVClient, changefeed, 0, "", false) + config.GetDefaultServerConfig(), changefeed, 0, "", false) eventCh := make(chan model.RegionFeedEvent, 50) wg.Add(1) go func() { @@ -3452,7 +3452,7 @@ func TestPrewriteNotMatchError(t *testing.T) { defer regionCache.Close() cdcClient := NewCDCClient( ctx, pdClient, grpcPool, regionCache, pdutil.NewClock4Test(), - config.GetDefaultServerConfig().KVClient, 
changefeed, 0, "", false) + config.GetDefaultServerConfig(), changefeed, 0, "", false) eventCh := make(chan model.RegionFeedEvent, 50) baseAllocatedID := currentRequestID() @@ -3534,7 +3534,7 @@ func TestPrewriteNotMatchError(t *testing.T) { func createFakeEventFeedSession() *eventFeedSession { return newEventFeedSession( - &CDCClient{config: config.GetDefaultServerConfig().KVClient}, + &CDCClient{config: config.GetDefaultServerConfig()}, tablepb.Span{StartKey: []byte("a"), EndKey: []byte("b")}, nil, /*lockResolver*/ 100, /*startTs*/ diff --git a/cdc/kv/region_state.go b/cdc/kv/region_state.go index 52113f6c970..520c6366e8a 100644 --- a/cdc/kv/region_state.go +++ b/cdc/kv/region_state.go @@ -16,9 +16,8 @@ package kv import ( "runtime" "sync" - "sync/atomic" - "time" + "github.com/pingcap/tiflow/cdc/kv/regionlock" "github.com/pingcap/tiflow/cdc/processor/tablepb" "github.com/tikv/client-go/v2/tikv" ) @@ -26,68 +25,84 @@ import ( const ( minRegionStateBucket = 4 maxRegionStateBucket = 16 + + stateNormal uint32 = 0 + stateStopped uint32 = 1 + stateRemoved uint32 = 2 ) type singleRegionInfo struct { - verID tikv.RegionVerID - span tablepb.Span - resolvedTs uint64 - rpcCtx *tikv.RPCContext + verID tikv.RegionVerID + span tablepb.Span + rpcCtx *tikv.RPCContext + + lockedRange *regionlock.LockedRange } func newSingleRegionInfo( verID tikv.RegionVerID, span tablepb.Span, - ts uint64, rpcCtx *tikv.RPCContext, ) singleRegionInfo { return singleRegionInfo{ - verID: verID, - span: span, - resolvedTs: ts, - rpcCtx: rpcCtx, + verID: verID, + span: span, + rpcCtx: rpcCtx, } } +func (s singleRegionInfo) resolvedTs() uint64 { + return s.lockedRange.CheckpointTs.Load() +} + type regionFeedState struct { sri singleRegionInfo requestID uint64 - stopped int32 - - initialized atomic.Bool - matcher *matcher - startFeedTime time.Time - lastResolvedTs uint64 + matcher *matcher + + // Transform: normal -> stopped -> removed. + // normal: the region is in replicating. + // stopped: some error happens. + // removed: the region is returned into the pending list, + // will be re-resolved and re-scheduled later. + state struct { + sync.RWMutex + v uint32 + } } func newRegionFeedState(sri singleRegionInfo, requestID uint64) *regionFeedState { return ®ionFeedState{ sri: sri, requestID: requestID, - stopped: 0, } } func (s *regionFeedState) start() { - s.startFeedTime = time.Now() - s.lastResolvedTs = s.sri.resolvedTs s.matcher = newMatcher() } +// mark regionFeedState as stopped with the given error if possible. 
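The region state in this hunk replaces the old stopped flag with a small lifecycle, stateNormal to stateStopped to stateRemoved, guarded by an RWMutex. A standalone sketch of the transition rules; the constants mirror the hunk, while the lifecycle type and its methods are illustrative.

package main

import (
	"fmt"
	"sync"
)

const (
	stateNormal  uint32 = 0
	stateStopped uint32 = 1
	stateRemoved uint32 = 2
)

type lifecycle struct {
	mu sync.RWMutex
	v  uint32
}

// markStopped only moves a normal region to stopped; stopped or removed
// regions keep their current state.
func (l *lifecycle) markStopped() {
	l.mu.Lock()
	defer l.mu.Unlock()
	if l.v == stateNormal {
		l.v = stateStopped
	}
}

// isStale reports whether the region should no longer accept events.
func (l *lifecycle) isStale() bool {
	l.mu.RLock()
	defer l.mu.RUnlock()
	return l.v == stateStopped || l.v == stateRemoved
}

func main() {
	var lc lifecycle
	fmt.Println(lc.isStale()) // false
	lc.markStopped()
	fmt.Println(lc.isStale()) // true
}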
func (s *regionFeedState) markStopped() { - atomic.StoreInt32(&s.stopped, 1) + s.state.Lock() + defer s.state.Unlock() + if s.state.v == stateNormal { + s.state.v = stateStopped + } } -func (s *regionFeedState) isStopped() bool { - return atomic.LoadInt32(&s.stopped) > 0 +func (s *regionFeedState) isStale() bool { + s.state.RLock() + defer s.state.RUnlock() + return s.state.v == stateStopped || s.state.v == stateRemoved } func (s *regionFeedState) isInitialized() bool { - return s.initialized.Load() + return s.sri.lockedRange.Initialzied.Load() } func (s *regionFeedState) setInitialized() { - s.initialized.Store(true) + s.sri.lockedRange.Initialzied.Store(true) } func (s *regionFeedState) getRegionID() uint64 { @@ -95,31 +110,29 @@ func (s *regionFeedState) getRegionID() uint64 { } func (s *regionFeedState) getLastResolvedTs() uint64 { - return atomic.LoadUint64(&s.lastResolvedTs) + return s.sri.lockedRange.CheckpointTs.Load() } // updateResolvedTs update the resolved ts of the current region feed func (s *regionFeedState) updateResolvedTs(resolvedTs uint64) { - if resolvedTs > s.getLastResolvedTs() { - atomic.StoreUint64(&s.lastResolvedTs, resolvedTs) - } -} - -// setRegionInfoResolvedTs is only called when the region disconnect, -// to update the `singleRegionInfo` which is reused by reconnect. -func (s *regionFeedState) setRegionInfoResolvedTs() { - if s.getLastResolvedTs() <= s.sri.resolvedTs { - return + state := s.sri.lockedRange + for { + last := state.CheckpointTs.Load() + if last > resolvedTs { + return + } + if state.CheckpointTs.CompareAndSwap(last, resolvedTs) { + break + } } - s.sri.resolvedTs = s.lastResolvedTs } func (s *regionFeedState) getRegionInfo() singleRegionInfo { return s.sri } -func (s *regionFeedState) getRegionMeta() (uint64, tablepb.Span, time.Time, string) { - return s.sri.verID.GetID(), s.sri.span, s.startFeedTime, s.sri.rpcCtx.Addr +func (s *regionFeedState) getRegionMeta() (uint64, tablepb.Span, string) { + return s.sri.verID.GetID(), s.sri.span, s.sri.rpcCtx.Addr } type syncRegionFeedStateMap struct { diff --git a/cdc/kv/region_state_bench_test.go b/cdc/kv/region_state_bench_test.go index 5b9fe592080..250f265d28c 100644 --- a/cdc/kv/region_state_bench_test.go +++ b/cdc/kv/region_state_bench_test.go @@ -21,6 +21,7 @@ import ( "testing" "time" + "github.com/pingcap/tiflow/cdc/kv/regionlock" "github.com/pingcap/tiflow/pkg/spanz" "github.com/tikv/client-go/v2/tikv" ) @@ -40,9 +41,9 @@ func TestSyncRegionFeedStateMapConcurrentAccess(t *testing.T) { return default: } - m.setByRequestID(1, ®ionFeedState{}) - m.setByRequestID(2, ®ionFeedState{}) - m.setByRequestID(3, ®ionFeedState{}) + m.setByRequestID(1, ®ionFeedState{sri: singleRegionInfo{lockedRange: ®ionlock.LockedRange{}}}) + m.setByRequestID(2, ®ionFeedState{sri: singleRegionInfo{lockedRange: ®ionlock.LockedRange{}}}) + m.setByRequestID(3, ®ionFeedState{sri: singleRegionInfo{lockedRange: ®ionlock.LockedRange{}}}) } }() wg.Add(1) @@ -55,7 +56,7 @@ func TestSyncRegionFeedStateMapConcurrentAccess(t *testing.T) { default: } m.iter(func(requestID uint64, state *regionFeedState) bool { - _ = state.initialized.Load() + state.isInitialized() return true }) } @@ -118,7 +119,8 @@ func benchmarkGetRegionState(b *testing.B, bench func(b *testing.B, sm regionSta state := newRegionFeedState(newSingleRegionInfo( tikv.RegionVerID{}, spanz.ToSpan([]byte{}, spanz.UpperBoundKey), - 0, &tikv.RPCContext{}), 0) + &tikv.RPCContext{}), 0) + state.sri.lockedRange = ®ionlock.LockedRange{} regionCount := []int{100, 1000, 10000, 20000, 
40000, 80000, 160000, 320000} for _, count := range regionCount { diff --git a/cdc/kv/region_worker.go b/cdc/kv/region_worker.go index 6493cf9b535..5a3771d65cb 100644 --- a/cdc/kv/region_worker.go +++ b/cdc/kv/region_worker.go @@ -61,19 +61,18 @@ const ( ) type regionWorkerMetrics struct { - // kv events related metrics - metricReceivedEventSize prometheus.Observer - metricDroppedEventSize prometheus.Observer + metricReceivedEventSize prometheus.Observer + metricDroppedEventSize prometheus.Observer + metricPullEventInitializedCounter prometheus.Counter + metricPullEventCommittedCounter prometheus.Counter metricPullEventPrewriteCounter prometheus.Counter metricPullEventCommitCounter prometheus.Counter - metricPullEventCommittedCounter prometheus.Counter metricPullEventRollbackCounter prometheus.Counter - metricSendEventResolvedCounter prometheus.Counter - metricSendEventCommitCounter prometheus.Counter - metricSendEventCommittedCounter prometheus.Counter - // TODO: add region runtime related metrics + metricSendEventResolvedCounter prometheus.Counter + metricSendEventCommitCounter prometheus.Counter + metricSendEventCommittedCounter prometheus.Counter } /* @@ -114,22 +113,22 @@ type regionWorker struct { inputPending int32 } -func newRegionWorker( - changefeedID model.ChangeFeedID, s *eventFeedSession, addr string, -) *regionWorker { +func newRegionWorkerMetrics(changefeedID model.ChangeFeedID) *regionWorkerMetrics { metrics := ®ionWorkerMetrics{} metrics.metricReceivedEventSize = eventSize.WithLabelValues("received") metrics.metricDroppedEventSize = eventSize.WithLabelValues("dropped") + metrics.metricPullEventInitializedCounter = pullEventCounter. WithLabelValues(cdcpb.Event_INITIALIZED.String(), changefeedID.Namespace, changefeedID.ID) metrics.metricPullEventCommittedCounter = pullEventCounter. WithLabelValues(cdcpb.Event_COMMITTED.String(), changefeedID.Namespace, changefeedID.ID) - metrics.metricPullEventCommitCounter = pullEventCounter. - WithLabelValues(cdcpb.Event_COMMIT.String(), changefeedID.Namespace, changefeedID.ID) metrics.metricPullEventPrewriteCounter = pullEventCounter. WithLabelValues(cdcpb.Event_PREWRITE.String(), changefeedID.Namespace, changefeedID.ID) + metrics.metricPullEventCommitCounter = pullEventCounter. + WithLabelValues(cdcpb.Event_COMMIT.String(), changefeedID.Namespace, changefeedID.ID) metrics.metricPullEventRollbackCounter = pullEventCounter. WithLabelValues(cdcpb.Event_ROLLBACK.String(), changefeedID.Namespace, changefeedID.ID) + metrics.metricSendEventResolvedCounter = sendEventCounter. WithLabelValues("native-resolved", changefeedID.Namespace, changefeedID.ID) metrics.metricSendEventCommitCounter = sendEventCounter. @@ -137,7 +136,14 @@ func newRegionWorker( metrics.metricSendEventCommittedCounter = sendEventCounter. 
WithLabelValues("committed", changefeedID.Namespace, changefeedID.ID) + return metrics +} + +func newRegionWorker( + ctx context.Context, changefeedID model.ChangeFeedID, s *eventFeedSession, addr string, +) *regionWorker { return ®ionWorker{ + parentCtx: ctx, session: s, inputCh: make(chan []*regionStatefulEvent, regionWorkerInputChanSize), outputCh: s.eventCh, @@ -146,8 +152,8 @@ func newRegionWorker( rtsManager: newRegionTsManager(), rtsUpdateCh: make(chan *rtsUpdateEvent, 1024), storeAddr: addr, - concurrency: s.client.config.WorkerConcurrent, - metrics: metrics, + concurrency: s.client.config.KVClient.WorkerConcurrent, + metrics: newRegionWorkerMetrics(changefeedID), inputPending: 0, } } @@ -192,21 +198,22 @@ func (w *regionWorker) checkShouldExit() error { } func (w *regionWorker) handleSingleRegionError(err error, state *regionFeedState) error { - state.setRegionInfoResolvedTs() regionID := state.getRegionID() + isStale := state.isStale() log.Info("single region event feed disconnected", zap.String("namespace", w.session.client.changefeed.Namespace), zap.String("changefeed", w.session.client.changefeed.ID), zap.Uint64("regionID", regionID), zap.Uint64("requestID", state.requestID), zap.Stringer("span", &state.sri.span), - zap.Uint64("resolvedTs", state.sri.resolvedTs), + zap.Uint64("resolvedTs", state.sri.resolvedTs()), + zap.Bool("isStale", isStale), zap.Error(err)) // if state is already marked stopped, it must have been or would be processed by `onRegionFail` - if state.isStopped() { + if isStale { return w.checkShouldExit() } - // We need to ensure when the error is handled, `isStopped` must be set. So set it before sending the error. + // We need to ensure when the error is handled, `isStale` must be set. So set it before sending the error. state.markStopped() w.delRegionState(regionID) failpoint.Inject("kvClientSingleFeedProcessDelay", nil) @@ -294,7 +301,7 @@ func (w *regionWorker) resolveLock(ctx context.Context) error { maxVersion := oracle.ComposeTS(oracle.GetPhysical(currentTimeFromPD.Add(-10*time.Second)), 0) for _, rts := range expired { state, ok := w.getRegionState(rts.regionID) - if !ok || state.isStopped() { + if !ok || state.isStale() { // state is already deleted or stopped, just continue, // and don't need to push resolved ts back to heap. continue @@ -335,7 +342,7 @@ func (w *regionWorker) resolveLock(ctx context.Context) error { zap.Uint64("resolvedTs", lastResolvedTs), ) } - err = w.session.lockResolver.Resolve(ctx, rts.regionID, maxVersion) + err := w.session.lockResolver.Resolve(ctx, rts.regionID, maxVersion) if err != nil { log.Warn("failed to resolve lock", zap.Uint64("regionID", rts.regionID), @@ -354,6 +361,12 @@ func (w *regionWorker) resolveLock(ctx context.Context) error { } func (w *regionWorker) processEvent(ctx context.Context, event *regionStatefulEvent) error { + // event.state is nil when resolvedTsEvent is not nil + skipEvent := event.state != nil && event.state.isStale() + if skipEvent { + return nil + } + if event.finishedCallbackCh != nil { event.finishedCallbackCh <- struct{}{} return nil @@ -416,41 +429,40 @@ func (w *regionWorker) onHandleExit(err error) { } func (w *regionWorker) eventHandler(ctx context.Context) error { - preprocess := func(event *regionStatefulEvent, ok bool) ( - exitEventHandler bool, - skipEvent bool, - ) { - // event == nil means the region worker should exit and re-establish - // all existing regions. 
- if !ok || event == nil { + pollEvents := func() ([]*regionStatefulEvent, error) { + exitFn := func() error { log.Info("region worker closed by error", zap.String("namespace", w.session.client.changefeed.Namespace), zap.String("changefeed", w.session.client.changefeed.ID)) - exitEventHandler = true - return + return cerror.ErrRegionWorkerExit.GenWithStackByArgs() } - // event.state is nil when resolvedTsEvent is not nil - if event.state != nil && event.state.isStopped() { - skipEvent = true - } - return - } - pollEvents := func() (events []*regionStatefulEvent, ok bool, err error) { + select { case <-ctx.Done(): - err = errors.Trace(ctx.Err()) - case err = <-w.errorCh: - case events, ok = <-w.inputCh: - if ok && len(events) == 0 { + return nil, errors.Trace(ctx.Err()) + case err := <-w.errorCh: + return nil, errors.Trace(err) + case events, ok := <-w.inputCh: + if !ok { + return nil, exitFn() + } + if len(events) == 0 { log.Panic("regionWorker.inputCh doesn't accept empty slice") } + for _, event := range events { + // event == nil means the region worker should exit and re-establish + // all existing regions. + if event == nil { + return nil, exitFn() + } + } + return events, nil } - return } highWatermarkMet := false for { - events, ok, err := pollEvents() + events, err := pollEvents() if err != nil { return err } @@ -504,15 +516,9 @@ func (w *regionWorker) eventHandler(ctx context.Context) error { // throughput. Otherwise, we process event in local region worker to // ensure low processing latency. for _, event := range events { - exitEventHandler, skipEvent := preprocess(event, ok) - if exitEventHandler { - return cerror.ErrRegionWorkerExit.GenWithStackByArgs() - } - if !skipEvent { - err = w.processEvent(ctx, event) - if err != nil { - return err - } + err = w.processEvent(ctx, event) + if err != nil { + return err } } } @@ -574,14 +580,13 @@ func (w *regionWorker) cancelStream(delay time.Duration) { } } -func (w *regionWorker) run(parentCtx context.Context) error { +func (w *regionWorker) run() error { defer func() { for _, h := range w.handles { h.Unregister() } }() - w.parentCtx = parentCtx - ctx, cancel := context.WithCancel(parentCtx) + ctx, cancel := context.WithCancel(w.parentCtx) wg, ctx := errgroup.WithContext(ctx) w.initPoolHandles() @@ -617,52 +622,50 @@ func (w *regionWorker) handleEventEntry( x *cdcpb.Event_Entries_, state *regionFeedState, ) error { - regionID, regionSpan, startTime, _ := state.getRegionMeta() + emit := func(assembled model.RegionFeedEvent) bool { + select { + case w.outputCh <- assembled: + return true + case <-ctx.Done(): + return false + } + } + return handleEventEntry(x, w.session.startTs, state, w.metrics, emit) +} + +func handleEventEntry( + x *cdcpb.Event_Entries_, + startTs uint64, + state *regionFeedState, + metrics *regionWorkerMetrics, + emit func(assembled model.RegionFeedEvent) bool, +) error { + regionID, regionSpan, _ := state.getRegionMeta() for _, entry := range x.Entries.GetEntries() { - // if a region with kv range [a, z), and we only want the get [b, c) from this region, - // tikv will return all key events in the region, although specified [b, c) int the request. - // we can make tikv only return the events about the keys in the specified range. + // NOTE: from TiKV 7.0.0, entries are already filtered out in TiKV side. + // We can remove the check in future. 
comparableKey := spanz.ToComparableKey(entry.GetKey()) - // key for initialized event is nil if entry.Type != cdcpb.Event_INITIALIZED && !spanz.KeyInSpan(comparableKey, regionSpan) { - w.metrics.metricDroppedEventSize.Observe(float64(entry.Size())) + metrics.metricDroppedEventSize.Observe(float64(entry.Size())) continue } switch entry.Type { case cdcpb.Event_INITIALIZED: - if time.Since(startTime) > 20*time.Second { - log.Warn("The time cost of initializing is too much", - zap.String("namespace", w.session.client.changefeed.Namespace), - zap.String("changefeed", w.session.client.changefeed.ID), - zap.Duration("duration", time.Since(startTime)), - zap.Uint64("regionID", regionID)) - } - w.metrics.metricPullEventInitializedCounter.Inc() - + metrics.metricPullEventInitializedCounter.Inc() state.setInitialized() - // state is just initialized, so we know this must be true - cachedEvents := state.matcher.matchCachedRow(true) - for _, cachedEvent := range cachedEvents { + for _, cachedEvent := range state.matcher.matchCachedRow(true) { revent, err := assembleRowEvent(regionID, cachedEvent) if err != nil { return errors.Trace(err) } - select { - case w.outputCh <- revent: - w.metrics.metricSendEventCommitCounter.Inc() - case <-ctx.Done(): - return errors.Trace(ctx.Err()) + if !emit(revent) { + return nil } + metrics.metricSendEventCommitCounter.Inc() } state.matcher.matchCachedRollbackRow(true) case cdcpb.Event_COMMITTED: - w.metrics.metricPullEventCommittedCounter.Inc() - revent, err := assembleRowEvent(regionID, entry) - if err != nil { - return errors.Trace(err) - } - resolvedTs := state.getLastResolvedTs() if entry.CommitTs <= resolvedTs { logPanic("The CommitTs must be greater than the resolvedTs", @@ -672,30 +675,22 @@ func (w *regionWorker) handleEventEntry( zap.Uint64("regionID", regionID)) return errUnreachable } - select { - case w.outputCh <- revent: - w.metrics.metricSendEventCommittedCounter.Inc() - case <-ctx.Done(): - return errors.Trace(ctx.Err()) + + metrics.metricPullEventCommittedCounter.Inc() + revent, err := assembleRowEvent(regionID, entry) + if err != nil { + return errors.Trace(err) } + if !emit(revent) { + return nil + } + metrics.metricSendEventCommittedCounter.Inc() case cdcpb.Event_PREWRITE: - w.metrics.metricPullEventPrewriteCounter.Inc() + metrics.metricPullEventPrewriteCounter.Inc() state.matcher.putPrewriteRow(entry) case cdcpb.Event_COMMIT: - w.metrics.metricPullEventCommitCounter.Inc() - // NOTE: state.getLastResolvedTs() will never less than session.startTs. - resolvedTs := state.getLastResolvedTs() - // TiKV can send events with StartTs/CommitTs less than startTs. - isStaleEvent := entry.CommitTs <= w.session.startTs - if entry.CommitTs <= resolvedTs && !isStaleEvent { - logPanic("The CommitTs must be greater than the resolvedTs", - zap.String("EventType", "COMMIT"), - zap.Uint64("CommitTs", entry.CommitTs), - zap.Uint64("resolvedTs", resolvedTs), - zap.Uint64("regionID", regionID)) - return errUnreachable - } - + metrics.metricPullEventCommitCounter.Inc() + // NOTE: matchRow should always be called even if the event is stale. 
if !state.matcher.matchRow(entry, state.isInitialized()) { if !state.isInitialized() { state.matcher.cacheCommitRow(entry) @@ -707,20 +702,33 @@ func (w *regionWorker) handleEventEntry( entry.GetType(), entry.GetOpType()) } - if !isStaleEvent { - revent, err := assembleRowEvent(regionID, entry) - if err != nil { - return errors.Trace(err) - } - select { - case w.outputCh <- revent: - w.metrics.metricSendEventCommitCounter.Inc() - case <-ctx.Done(): - return errors.Trace(ctx.Err()) - } + // TiKV can send events with StartTs/CommitTs less than startTs. + isStaleEvent := entry.CommitTs <= startTs + if isStaleEvent { + continue + } + + // NOTE: state.getLastResolvedTs() will never less than startTs. + resolvedTs := state.getLastResolvedTs() + if entry.CommitTs <= resolvedTs { + logPanic("The CommitTs must be greater than the resolvedTs", + zap.String("EventType", "COMMIT"), + zap.Uint64("CommitTs", entry.CommitTs), + zap.Uint64("resolvedTs", resolvedTs), + zap.Uint64("regionID", regionID)) + return errUnreachable + } + + revent, err := assembleRowEvent(regionID, entry) + if err != nil { + return errors.Trace(err) + } + if !emit(revent) { + return nil } + metrics.metricSendEventCommitCounter.Inc() case cdcpb.Event_ROLLBACK: - w.metrics.metricPullEventRollbackCounter.Inc() + metrics.metricPullEventRollbackCounter.Inc() if !state.isInitialized() { state.matcher.cacheRollbackRow(entry) continue @@ -740,7 +748,7 @@ func (w *regionWorker) handleResolvedTs( regions := make([]uint64, 0, len(revents.regions)) for _, state := range revents.regions { - if state.isStopped() || !state.isInitialized() { + if state.isStale() || !state.isInitialized() { continue } regionID := state.getRegionID() @@ -775,7 +783,7 @@ func (w *regionWorker) handleResolvedTs( default: } for _, state := range revents.regions { - if state.isStopped() || !state.isInitialized() { + if state.isStale() || !state.isInitialized() { continue } state.updateResolvedTs(resolvedTs) @@ -801,7 +809,7 @@ func (w *regionWorker) evictAllRegions() { for _, states := range w.statesManager.states { deletes = deletes[:0] states.iter(func(regionID uint64, regionState *regionFeedState) bool { - if regionState.isStopped() { + if regionState.isStale() { return true } regionState.markStopped() @@ -815,7 +823,6 @@ func (w *regionWorker) evictAllRegions() { }) for _, del := range deletes { w.delRegionState(del.regionID) - del.regionState.setRegionInfoResolvedTs() // since the context used in region worker will be cancelled after // region worker exits, we must use the parent context to prevent // regionErrorInfo loss. 
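A notable refactor in region_worker.go above is that entry handling moves into a free handleEventEntry function that receives an emit callback instead of writing to the worker's output channel directly, which keeps the assembly logic independent of the worker. A sketch of that callback-injection shape; rowEvent and processEntries are stand-ins rather than the real tiflow types.

package main

import (
	"context"
	"fmt"
)

type rowEvent struct {
	regionID uint64
	commitTs uint64
}

// processEntries assembles events and hands them to emit; it stops early if
// emit reports that the receiver is gone (for example, a canceled context).
func processEntries(regionID uint64, commitTsList []uint64, emit func(rowEvent) bool) int {
	sent := 0
	for _, ts := range commitTsList {
		if !emit(rowEvent{regionID: regionID, commitTs: ts}) {
			return sent
		}
		sent++
	}
	return sent
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	out := make(chan rowEvent, 4)
	emit := func(ev rowEvent) bool {
		select {
		case out <- ev:
			return true
		case <-ctx.Done():
			return false
		}
	}

	n := processEntries(42, []uint64{101, 102, 103}, emit)
	fmt.Println(n, len(out)) // 3 3
}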
diff --git a/cdc/kv/region_worker_test.go b/cdc/kv/region_worker_test.go index 4b9cd73f89a..9438d518651 100644 --- a/cdc/kv/region_worker_test.go +++ b/cdc/kv/region_worker_test.go @@ -22,6 +22,7 @@ import ( "time" "github.com/pingcap/kvproto/pkg/cdcpb" + "github.com/pingcap/tiflow/cdc/kv/regionlock" "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/pkg/config" "github.com/pingcap/tiflow/pkg/spanz" @@ -49,7 +50,11 @@ func TestRegionStateManagerThreadSafe(t *testing.T) { for i := 0; i < regionCount; i++ { regionID := uint64(1000 + i) regionIDs[i] = regionID - rsm.setState(regionID, ®ionFeedState{requestID: uint64(i + 1), lastResolvedTs: uint64(1000)}) + + state := ®ionFeedState{requestID: uint64(i + 1)} + state.sri.lockedRange = ®ionlock.LockedRange{} + state.updateResolvedTs(1000) + rsm.setState(regionID, state) } var wg sync.WaitGroup @@ -91,8 +96,8 @@ func TestRegionStateManagerThreadSafe(t *testing.T) { for _, regionID := range regionIDs { s, ok := rsm.getState(regionID) require.True(t, ok) - require.Greater(t, s.lastResolvedTs, uint64(1000)) - totalResolvedTs += s.lastResolvedTs + require.Greater(t, s.getLastResolvedTs(), uint64(1000)) + totalResolvedTs += s.getLastResolvedTs() } } @@ -151,9 +156,10 @@ func TestRegionWokerHandleEventEntryEventOutOfOrder(t *testing.T) { state := newRegionFeedState(newSingleRegionInfo( tikv.RegionVerID{}, spanz.ToSpan([]byte{}, spanz.UpperBoundKey), - 0, &tikv.RPCContext{}), 0) + &tikv.RPCContext{}), 0) + state.sri.lockedRange = ®ionlock.LockedRange{} state.start() - worker := newRegionWorker(model.ChangeFeedID{}, s, "") + worker := newRegionWorker(ctx, model.ChangeFeedID{}, s, "") require.Equal(t, 2, cap(worker.outputCh)) // Receive prewrite2 with empty value. @@ -268,28 +274,30 @@ func TestRegionWorkerHandleResolvedTs(t *testing.T) { s1 := newRegionFeedState(singleRegionInfo{ verID: tikv.NewRegionVerID(1, 1, 1), }, 1) - s1.initialized.Store(true) - s1.lastResolvedTs = 9 + s1.sri.lockedRange = ®ionlock.LockedRange{} + s1.setInitialized() + s1.updateResolvedTs(9) s2 := newRegionFeedState(singleRegionInfo{ verID: tikv.NewRegionVerID(2, 2, 2), }, 2) - s2.initialized.Store(true) - s2.lastResolvedTs = 11 + s2.sri.lockedRange = ®ionlock.LockedRange{} + s2.setInitialized() + s2.updateResolvedTs(11) s3 := newRegionFeedState(singleRegionInfo{ verID: tikv.NewRegionVerID(3, 3, 3), }, 3) - s3.initialized.Store(false) - s3.lastResolvedTs = 8 + s3.sri.lockedRange = ®ionlock.LockedRange{} + s3.updateResolvedTs(8) err := w.handleResolvedTs(ctx, &resolvedTsEvent{ resolvedTs: 10, regions: []*regionFeedState{s1, s2, s3}, }) require.Nil(t, err) - require.Equal(t, uint64(10), s1.lastResolvedTs) - require.Equal(t, uint64(11), s2.lastResolvedTs) - require.Equal(t, uint64(8), s3.lastResolvedTs) + require.Equal(t, uint64(10), s1.getLastResolvedTs()) + require.Equal(t, uint64(11), s2.getLastResolvedTs()) + require.Equal(t, uint64(8), s3.getLastResolvedTs()) re := <-w.rtsUpdateCh require.Equal(t, uint64(10), re.resolvedTs) @@ -309,17 +317,19 @@ func TestRegionWorkerHandleEventsBeforeStartTs(t *testing.T) { s1 := newRegionFeedState(newSingleRegionInfo( tikv.RegionVerID{}, spanz.ToSpan([]byte{}, spanz.UpperBoundKey), - 9, &tikv.RPCContext{}), + &tikv.RPCContext{}), 0) + s1.sri.lockedRange = ®ionlock.LockedRange{} + s1.sri.lockedRange.CheckpointTs.Store(9) s1.start() - w := newRegionWorker(model.ChangeFeedID{}, s, "") + w := newRegionWorker(ctx, model.ChangeFeedID{}, s, "") err := w.handleResolvedTs(ctx, &resolvedTsEvent{ resolvedTs: 5, regions: 
[]*regionFeedState{s1}, }) require.Nil(t, err) - require.Equal(t, uint64(9), s1.lastResolvedTs) + require.Equal(t, uint64(9), s1.getLastResolvedTs()) timer := time.NewTimer(time.Second) select { diff --git a/cdc/kv/regionlock/region_range_lock.go b/cdc/kv/regionlock/region_range_lock.go index 1fa5762173b..3c375f843df 100644 --- a/cdc/kv/regionlock/region_range_lock.go +++ b/cdc/kv/regionlock/region_range_lock.go @@ -21,10 +21,12 @@ import ( "math" "sync" "sync/atomic" + "time" "github.com/google/btree" "github.com/pingcap/log" "github.com/pingcap/tiflow/cdc/processor/tablepb" + "github.com/pingcap/tiflow/pkg/spanz" "go.uber.org/zap" ) @@ -116,6 +118,7 @@ type rangeLockEntry struct { regionID uint64 version uint64 waiters []chan<- interface{} + state LockedRange } func rangeLockEntryWithKey(key []byte) *rangeLockEntry { @@ -137,36 +140,36 @@ func (e *rangeLockEntry) String() string { len(e.waiters)) } -var currentID uint64 = 0 - -func allocID() uint64 { - return atomic.AddUint64(¤tID, 1) -} - // RegionRangeLock is specifically used for kv client to manage exclusive region ranges. Acquiring lock will be blocked // if part of its range is already locked. It also manages checkpoint ts of all ranges. The ranges are marked by a // version number, which should comes from the Region's Epoch version. The version is used to compare which range is // new and which is old if two ranges are overlapping. type RegionRangeLock struct { + // ID to identify different RegionRangeLock instances, so logs of different instances can be distinguished. + id uint64 + totalSpan tablepb.Span changefeedLogInfo string + mu sync.Mutex rangeCheckpointTs *rangeTsMap rangeLock *btree.BTreeG[*rangeLockEntry] regionIDLock map[uint64]*rangeLockEntry - // ID to identify different RegionRangeLock instances, so logs of different instances can be distinguished. - id uint64 + stopped bool + refCount uint64 } // NewRegionRangeLock creates a new RegionRangeLock. 
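
The test updates above all follow the same pattern: the resolved ts and the initialized flag now live in a regionlock.LockedRange hanging off the state's singleRegionInfo, so a hand-built regionFeedState must have a LockedRange attached before updateResolvedTs or setInitialized is called. A hypothetical helper capturing that setup:

package kv

import "github.com/pingcap/tiflow/cdc/kv/regionlock"

// newTestRegionFeedState mirrors the fixture setup used by the updated unit
// tests: attach a LockedRange first, then seed the resolved ts through it.
func newTestRegionFeedState(requestID, resolvedTs uint64) *regionFeedState {
	state := &regionFeedState{requestID: requestID}
	state.sri.lockedRange = &regionlock.LockedRange{}
	state.updateResolvedTs(resolvedTs)
	return state
}
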
func NewRegionRangeLock( + id uint64, startKey, endKey []byte, startTs uint64, changefeedLogInfo string, ) *RegionRangeLock { return &RegionRangeLock{ + id: id, + totalSpan: tablepb.Span{StartKey: startKey, EndKey: endKey}, changefeedLogInfo: changefeedLogInfo, rangeCheckpointTs: newRangeTsMap(startKey, endKey, startTs), rangeLock: btree.NewG(16, rangeLockEntryLess), regionIDLock: make(map[uint64]*rangeLockEntry), - id: allocID(), } } @@ -208,6 +211,9 @@ func (l *RegionRangeLock) getOverlappedEntries(startKey, endKey []byte, regionID func (l *RegionRangeLock) tryLockRange(startKey, endKey []byte, regionID, version uint64) (LockRangeResult, []<-chan interface{}) { l.mu.Lock() defer l.mu.Unlock() + if l.stopped { + return LockRangeResult{Status: LockRangeStatusCancel}, nil + } overlappingEntries := l.getOverlappedEntries(startKey, endKey, regionID) @@ -219,6 +225,8 @@ func (l *RegionRangeLock) tryLockRange(startKey, endKey []byte, regionID, versio regionID: regionID, version: version, } + newEntry.state.CheckpointTs.Store(checkpointTs) + newEntry.state.Created = time.Now() l.rangeLock.ReplaceOrInsert(newEntry) l.regionIDLock[regionID] = newEntry @@ -230,9 +238,11 @@ func (l *RegionRangeLock) tryLockRange(startKey, endKey []byte, regionID, versio zap.String("startKey", hex.EncodeToString(startKey)), zap.String("endKey", hex.EncodeToString(endKey))) + l.refCount += 1 return LockRangeResult{ Status: LockRangeStatusSuccess, CheckpointTs: checkpointTs, + LockedRange: &newEntry.state, }, nil } @@ -308,7 +318,9 @@ func (l *RegionRangeLock) tryLockRange(startKey, endKey []byte, regionID, versio } // LockRange locks a range with specified version. -func (l *RegionRangeLock) LockRange(ctx context.Context, startKey, endKey []byte, regionID, version uint64) LockRangeResult { +func (l *RegionRangeLock) LockRange( + ctx context.Context, startKey, endKey []byte, regionID, version uint64, +) LockRangeResult { res, signalChs := l.tryLockRange(startKey, endKey, regionID, version) if res.Status != LockRangeStatusWait { @@ -337,22 +349,23 @@ func (l *RegionRangeLock) LockRange(ctx context.Context, startKey, endKey []byte } // UnlockRange unlocks a range and update checkpointTs of the range to specified value. -func (l *RegionRangeLock) UnlockRange(startKey, endKey []byte, regionID, version uint64, checkpointTs uint64) { +// If it returns true it means it is stopped and all ranges are unlocked correctly. 
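
On a successful lock, tryLockRange seeds newEntry.state and hands the same LockedRange pointer back through LockRangeResult, so checkpoint updates made by the caller are visible to later attribute collection and to UnlockRange's fallback checkpoint. A hedged sketch of that sharing; exampleSharedLockedRange is illustrative only:

package regionlock

import "context"

// exampleSharedLockedRange shows that the *LockedRange returned on success is
// the object stored inside the lock entry: storing a new checkpoint ts here
// is immediately visible when the lock's attributes are collected.
func exampleSharedLockedRange(ctx context.Context) uint64 {
	l := NewRegionRangeLock(42, []byte("a"), []byte("z"), 100, "ns.changefeed")
	res := l.LockRange(ctx, []byte("a"), []byte("m"), 1, 1)
	if res.Status != LockRangeStatusSuccess {
		return 0
	}
	res.LockedRange.CheckpointTs.Store(120)

	attrs := l.CollectLockedRangeAttrs(nil)
	return attrs.SlowestRegion.CheckpointTs // 120
}
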
+func (l *RegionRangeLock) UnlockRange( + startKey, endKey []byte, regionID, version uint64, + checkpointTs ...uint64, +) (drained bool) { l.mu.Lock() defer l.mu.Unlock() entry, ok := l.rangeLock.Get(rangeLockEntryWithKey(startKey)) - if !ok { log.Panic("unlocking a not locked range", zap.String("changefeed", l.changefeedLogInfo), zap.Uint64("regionID", regionID), zap.String("startKey", hex.EncodeToString(startKey)), zap.String("endKey", hex.EncodeToString(endKey)), - zap.Uint64("version", version), - zap.Uint64("checkpointTs", checkpointTs)) + zap.Uint64("version", version)) } - if entry.regionID != regionID { log.Panic("unlocked a range but regionID mismatch", zap.String("changefeed", l.changefeedLogInfo), @@ -369,6 +382,8 @@ func (l *RegionRangeLock) UnlockRange(startKey, endKey []byte, regionID, version zap.String("regionIDLockEntry", l.regionIDLock[regionID].String())) } delete(l.regionIDLock, regionID) + l.refCount -= 1 + drained = l.stopped && l.refCount == 0 if entry.version != version || !bytes.Equal(entry.endKey, endKey) { log.Panic("unlocking region doesn't match the locked region", @@ -377,7 +392,6 @@ func (l *RegionRangeLock) UnlockRange(startKey, endKey []byte, regionID, version zap.String("startKey", hex.EncodeToString(startKey)), zap.String("endKey", hex.EncodeToString(endKey)), zap.Uint64("version", version), - zap.Uint64("checkpointTs", checkpointTs), zap.String("foundLockEntry", entry.String())) } @@ -385,17 +399,40 @@ func (l *RegionRangeLock) UnlockRange(startKey, endKey []byte, regionID, version ch <- nil } - _, ok = l.rangeLock.Delete(entry) - if !ok { + if entry, ok = l.rangeLock.Delete(entry); !ok { panic("unreachable") } - l.rangeCheckpointTs.Set(startKey, endKey, checkpointTs) + + var newCheckpointTs uint64 + if len(checkpointTs) > 0 { + newCheckpointTs = checkpointTs[0] + } else { + newCheckpointTs = entry.state.CheckpointTs.Load() + } + + l.rangeCheckpointTs.Set(startKey, endKey, newCheckpointTs) log.Debug("unlocked range", zap.String("changefeed", l.changefeedLogInfo), zap.Uint64("lockID", l.id), zap.Uint64("regionID", entry.regionID), - zap.Uint64("checkpointTs", checkpointTs), + zap.Uint64("checkpointTs", newCheckpointTs), zap.String("startKey", hex.EncodeToString(startKey)), zap.String("endKey", hex.EncodeToString(endKey))) + return +} + +// RefCount returns how many ranges are locked. +func (l *RegionRangeLock) RefCount() uint64 { + l.mu.Lock() + defer l.mu.Unlock() + return l.refCount +} + +// Stop stops the instance. +func (l *RegionRangeLock) Stop() (drained bool) { + l.mu.Lock() + defer l.mu.Unlock() + l.stopped = true + return l.stopped && l.refCount == 0 } const ( @@ -410,15 +447,82 @@ const ( ) // LockRangeResult represents the result of LockRange method of RegionRangeLock. -// If Status is LockRangeStatusSuccess, the CheckpointTs field will be the minimal checkpoint ts among the locked -// range. +// If Status is LockRangeStatusSuccess: +// - CheckpointTs will be the minimal checkpoint ts among the locked range; +// - LockedRange is for recording real-time state changes; +// // If Status is LockRangeStatusWait, it means the lock cannot be acquired immediately. WaitFn must be invoked to // continue waiting and acquiring the lock. +// // If Status is LockRangeStatusStale, it means the LockRange request is stale because there's already a overlapping // locked range, whose version is greater or equals to the requested one. 
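
Stop and the drained flag returned by UnlockRange are meant to be used together: Stop marks the lock so no new range can be acquired, and the UnlockRange call that drops refCount to zero is the one that reports drained. A hedged lifecycle sketch with arbitrary keys and checkpoints:

package regionlock

import (
	"context"
	"math"
)

func exampleDrainOnStop(ctx context.Context) bool {
	l := NewRegionRangeLock(7, []byte("a"), []byte("z"), math.MaxUint64, "ns.changefeed")
	l.LockRange(ctx, []byte("a"), []byte("m"), 1, 1)
	l.LockRange(ctx, []byte("m"), []byte("z"), 2, 1)

	// Stop only marks the lock; two ranges are still held, so not drained yet.
	_ = l.Stop()

	// Omitting the checkpoint ts exercises the new variadic parameter: the
	// value recorded in the range's LockedRange state is used instead.
	_ = l.UnlockRange([]byte("a"), []byte("m"), 1, 1)
	drained := l.UnlockRange([]byte("m"), []byte("z"), 2, 1, 200)
	return drained // true: stopped and refCount has dropped to zero
}
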
type LockRangeResult struct { Status int CheckpointTs uint64 + LockedRange *LockedRange WaitFn func() LockRangeResult RetryRanges []tablepb.Span } + +// LockedRange is returned by `RegionRangeLock.LockRange`, which can be used to +// collect informations for the range. And collected informations can be accessed +// by iterating `RegionRangeLock`. +type LockedRange struct { + CheckpointTs atomic.Uint64 + Initialzied atomic.Bool + Created time.Time +} + +// CollectLockedRangeAttrs collects locked range attributes. +func (l *RegionRangeLock) CollectLockedRangeAttrs( + action func(regionID uint64, state *LockedRange), +) (r CollectedLockedRangeAttrs) { + l.mu.Lock() + defer l.mu.Unlock() + r.FastestRegion.CheckpointTs = 0 + r.SlowestRegion.CheckpointTs = math.MaxUint64 + + lastEnd := l.totalSpan.StartKey + l.rangeLock.Ascend(func(item *rangeLockEntry) bool { + if action != nil { + action(item.regionID, &item.state) + } + if spanz.EndCompare(lastEnd, item.startKey) < 0 { + r.Holes = append(r.Holes, tablepb.Span{StartKey: lastEnd, EndKey: item.startKey}) + } + ckpt := item.state.CheckpointTs.Load() + if ckpt > r.FastestRegion.CheckpointTs { + r.FastestRegion.RegionID = item.regionID + r.FastestRegion.CheckpointTs = ckpt + r.FastestRegion.Initialized = item.state.Initialzied.Load() + r.FastestRegion.Created = item.state.Created + } + if ckpt < r.SlowestRegion.CheckpointTs { + r.SlowestRegion.RegionID = item.regionID + r.SlowestRegion.CheckpointTs = ckpt + r.SlowestRegion.Initialized = item.state.Initialzied.Load() + r.SlowestRegion.Created = item.state.Created + } + lastEnd = item.endKey + return true + }) + if spanz.EndCompare(lastEnd, l.totalSpan.EndKey) < 0 { + r.Holes = append(r.Holes, tablepb.Span{StartKey: lastEnd, EndKey: l.totalSpan.EndKey}) + } + return +} + +// CollectedLockedRangeAttrs returns by `RegionRangeLock.CollectedLockedRangeAttrs`. +type CollectedLockedRangeAttrs struct { + Holes []tablepb.Span + FastestRegion LockedRangeAttrs + SlowestRegion LockedRangeAttrs +} + +// LockedRangeAttrs is like `LockedRange`, but only contains some read-only attributes. 
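
CollectLockedRangeAttrs ascends the locked ranges in key order, so any gap between consecutive entries, or between an entry and the span boundaries, is reported as a hole, while the optional callback can observe every region's live state. A hedged usage sketch:

package regionlock

import (
	"context"
	"fmt"
)

func exampleCollectAttrs(ctx context.Context) {
	l := NewRegionRangeLock(9, []byte("a"), []byte("z"), 100, "ns.changefeed")
	l.LockRange(ctx, []byte("a"), []byte("f"), 1, 1)
	l.LockRange(ctx, []byte("h"), []byte("z"), 2, 1)

	attrs := l.CollectLockedRangeAttrs(func(regionID uint64, state *LockedRange) {
		// A real caller might export state.CheckpointTs.Load() as a metric here.
	})
	// The range ["f", "h") is not locked, so it shows up as a hole.
	fmt.Println(len(attrs.Holes), attrs.SlowestRegion.CheckpointTs) // 1 100
}
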
+type LockedRangeAttrs struct { + RegionID uint64 + CheckpointTs uint64 + Initialized bool + Created time.Time +} diff --git a/cdc/kv/regionlock/region_range_lock_test.go b/cdc/kv/regionlock/region_range_lock_test.go index 8b1a5690190..af887248164 100644 --- a/cdc/kv/regionlock/region_range_lock_test.go +++ b/cdc/kv/regionlock/region_range_lock_test.go @@ -90,7 +90,7 @@ func TestRegionRangeLock(t *testing.T) { t.Parallel() ctx := context.TODO() - l := NewRegionRangeLock([]byte("a"), []byte("h"), math.MaxUint64, "") + l := NewRegionRangeLock(1, []byte("a"), []byte("h"), math.MaxUint64, "") mustLockRangeSuccess(ctx, t, l, "a", "e", 1, 1, math.MaxUint64) unlockRange(l, "a", "e", 1, 1, 100) @@ -107,7 +107,7 @@ func TestRegionRangeLock(t *testing.T) { func TestRegionRangeLockStale(t *testing.T) { t.Parallel() - l := NewRegionRangeLock([]byte("a"), []byte("z"), math.MaxUint64, "") + l := NewRegionRangeLock(1, []byte("a"), []byte("z"), math.MaxUint64, "") ctx := context.TODO() mustLockRangeSuccess(ctx, t, l, "c", "g", 1, 10, math.MaxUint64) mustLockRangeSuccess(ctx, t, l, "j", "n", 2, 8, math.MaxUint64) @@ -130,7 +130,7 @@ func TestRegionRangeLockLockingRegionID(t *testing.T) { t.Parallel() ctx := context.TODO() - l := NewRegionRangeLock([]byte("a"), []byte("z"), math.MaxUint64, "") + l := NewRegionRangeLock(1, []byte("a"), []byte("z"), math.MaxUint64, "") mustLockRangeSuccess(ctx, t, l, "c", "d", 1, 10, math.MaxUint64) mustLockRangeStale(ctx, t, l, "e", "f", 1, 5, "e", "f") @@ -166,7 +166,7 @@ func TestRegionRangeLockCanBeCancelled(t *testing.T) { t.Parallel() ctx, cancel := context.WithCancel(context.Background()) - l := NewRegionRangeLock([]byte("a"), []byte("z"), math.MaxUint64, "") + l := NewRegionRangeLock(1, []byte("a"), []byte("z"), math.MaxUint64, "") mustLockRangeSuccess(ctx, t, l, "g", "h", 1, 10, math.MaxUint64) wait := mustLockRangeWait(ctx, t, l, "g", "h", 1, 12) cancel() diff --git a/cdc/processor/processor.go b/cdc/processor/processor.go index eb3a64b2529..78a7b0b6dd2 100644 --- a/cdc/processor/processor.go +++ b/cdc/processor/processor.go @@ -788,7 +788,7 @@ func (p *processor) initDDLHandler(ctx context.Context) error { return errors.Trace(err) } - kvCfg := config.GetGlobalServerConfig().KVClient + serverCfg := config.GetGlobalServerConfig() ctx = contextutil.PutTableInfoInCtx(ctx, -1, puller.DDLPullerTableName) ddlPuller, err := puller.NewDDLJobPuller( ctx, @@ -798,7 +798,7 @@ func (p *processor) initDDLHandler(ctx context.Context) error { p.upstream.KVStorage, p.upstream.PDClock, ddlStartTs, - kvCfg, + serverCfg, p.changefeedID, schemaStorage, f, diff --git a/cdc/processor/sourcemanager/puller/puller_wrapper.go b/cdc/processor/sourcemanager/puller/puller_wrapper.go index c9143a7c64c..8861dacccac 100644 --- a/cdc/processor/sourcemanager/puller/puller_wrapper.go +++ b/cdc/processor/sourcemanager/puller/puller_wrapper.go @@ -103,7 +103,7 @@ func (n *WrapperImpl) Start( up.PDClock, n.startTs, []tablepb.Span{n.span}, - config.GetGlobalServerConfig().KVClient, + config.GetGlobalServerConfig(), n.changefeed, n.span.TableID, n.tableName, diff --git a/cdc/puller/ddl_puller.go b/cdc/puller/ddl_puller.go index 2a30a1cc1a2..9f184b0a5ab 100644 --- a/cdc/puller/ddl_puller.go +++ b/cdc/puller/ddl_puller.go @@ -460,7 +460,7 @@ func NewDDLJobPuller( kvStorage tidbkv.Storage, pdClock pdutil.Clock, checkpointTs uint64, - cfg *config.KVClientConfig, + cfg *config.ServerConfig, changefeed model.ChangeFeedID, schemaStorage entry.SchemaStorage, filter filter.Filter, @@ -544,11 +544,8 @@ func 
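
LockedRangeAttrs is the plain snapshot form of the live LockedRange; converting between the two is a field-by-field copy with the atomic fields loaded once. An illustrative helper (the Initialzied spelling follows the field name introduced above):

package regionlock

// snapshotLockedRange copies the live, atomically-updated LockedRange into
// the read-only LockedRangeAttrs form used in collected attributes. It is an
// illustrative helper, not part of the patch.
func snapshotLockedRange(regionID uint64, state *LockedRange) LockedRangeAttrs {
	return LockedRangeAttrs{
		RegionID:     regionID,
		CheckpointTs: state.CheckpointTs.Load(),
		Initialized:  state.Initialzied.Load(),
		Created:      state.Created,
	}
}
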
NewDDLPuller(ctx context.Context, up.RegionCache, storage, up.PDClock, - startTs, - config.GetGlobalServerConfig().KVClient, - changefeed, - schemaStorage, - filter, + startTs, config.GetGlobalServerConfig(), + changefeed, schemaStorage, filter, ) if err != nil { return nil, errors.Trace(err) diff --git a/cdc/puller/puller.go b/cdc/puller/puller.go index cd4a2caca9e..7853275a287 100644 --- a/cdc/puller/puller.go +++ b/cdc/puller/puller.go @@ -83,7 +83,7 @@ func New(ctx context.Context, pdClock pdutil.Clock, checkpointTs uint64, spans []tablepb.Span, - cfg *config.KVClientConfig, + cfg *config.ServerConfig, changefeed model.ChangeFeedID, tableID model.TableID, tableName string, diff --git a/cdc/puller/puller_test.go b/cdc/puller/puller_test.go index 1845559c930..4291ce30675 100644 --- a/cdc/puller/puller_test.go +++ b/cdc/puller/puller_test.go @@ -64,7 +64,7 @@ func newMockCDCKVClient( grpcPool kv.GrpcPool, regionCache *tikv.RegionCache, pdClock pdutil.Clock, - cfg *config.KVClientConfig, + cfg *config.ServerConfig, changefeed model.ChangeFeedID, tableID model.TableID, tableName string, @@ -132,7 +132,7 @@ func newPullerForTest( defer regionCache.Close() plr := New( ctx, pdCli, grpcPool, regionCache, store, pdutil.NewClock4Test(), - checkpointTs, spans, config.GetDefaultServerConfig().KVClient, + checkpointTs, spans, config.GetDefaultServerConfig(), model.DefaultChangeFeedID("changefeed-id-test"), 0, "table-test", false, false) wg.Add(1) diff --git a/pkg/config/config_test_data.go b/pkg/config/config_test_data.go index a83b5b5529b..d58484b9aae 100644 --- a/pkg/config/config_test_data.go +++ b/pkg/config/config_test_data.go @@ -146,7 +146,8 @@ const ( "max-task-concurrency": 10, "check-balance-interval": 60000000000, "add-table-batch-size": 50 - } + }, + "enable-kv-connect-backoff": false }, "cluster-id": "default", "max-memory-percentage": 70 diff --git a/pkg/config/debug.go b/pkg/config/debug.go index 7e91188dafa..3a7815090bc 100644 --- a/pkg/config/debug.go +++ b/pkg/config/debug.go @@ -25,6 +25,9 @@ type DebugConfig struct { // Scheduler is the configuration of the two-phase scheduler. Scheduler *SchedulerConfig `toml:"scheduler" json:"scheduler"` + + // EnableKVConnectBackOff enables the backoff for kv connect. + EnableKVConnectBackOff bool `toml:"enable-kv-connect-backoff" json:"enable-kv-connect-backoff"` } // ValidateAndAdjust validates and adjusts the debug configuration diff --git a/pkg/config/server_config.go b/pkg/config/server_config.go index 7e00b4bb209..7707c903f46 100644 --- a/pkg/config/server_config.go +++ b/pkg/config/server_config.go @@ -138,7 +138,8 @@ var defaultServerConfig = &ServerConfig{ }, Messages: defaultMessageConfig.Clone(), - Scheduler: NewDefaultSchedulerConfig(), + Scheduler: NewDefaultSchedulerConfig(), + EnableKVConnectBackOff: false, }, ClusterID: "default", MaxMemoryPercentage: DefaultMaxMemoryPercentage,
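
Since the pullers and the kv client now receive the whole *config.ServerConfig, the new debug switch travels alongside the kv-client settings and defaults to off, as the updated config_test_data shows. A hedged sketch that prints the serialized default debug section, where "enable-kv-connect-backoff": false should appear:

package main

import (
	"encoding/json"
	"fmt"

	"github.com/pingcap/tiflow/pkg/config"
)

func main() {
	// Serializing the default debug section should include the new key,
	// "enable-kv-connect-backoff": false, matching the updated test data.
	out, err := json.Marshal(config.GetDefaultServerConfig().Debug)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(out))
}
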