Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kv-client(cdc): add more logs to help debug slow regions (#9981) #9982

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
147 changes: 108 additions & 39 deletions cdc/kv/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"io"
"math/rand"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
Expand All @@ -40,6 +41,7 @@ import (
"github.com/pingcap/tiflow/pkg/version"
"github.com/prometheus/client_golang/prometheus"
tidbkv "github.com/tikv/client-go/v2/kv"
"github.com/tikv/client-go/v2/oracle"
"github.com/tikv/client-go/v2/tikv"
pd "github.com/tikv/pd/client"
"go.uber.org/zap"
Expand Down Expand Up @@ -69,6 +71,8 @@ const (
// failed region will be reloaded via `BatchLoadRegionsWithKeyRange` API. So we
// don't need to force reload region anymore.
regionScheduleReload = false

resolveLockMinInterval = 10 * time.Second
)

// time interval to force kv client to terminate gRPC stream and reconnect
Expand Down Expand Up @@ -248,7 +252,8 @@ func (c *CDCClient) newStream(ctx context.Context, addr string, storeID uint64)
return nil
}
if c.config.Debug.EnableKVConnectBackOff {
newStreamErr = retry.Do(ctx, streamFunc, retry.WithBackoffBaseDelay(500),
newStreamErr = retry.Do(ctx, streamFunc,
retry.WithBackoffBaseDelay(100),
retry.WithMaxTries(2),
retry.WithIsRetryableErr(cerror.IsRetryableError),
)
Expand All @@ -268,7 +273,7 @@ func (c *CDCClient) EventFeed(
eventCh chan<- model.RegionFeedEvent,
) error {
s := newEventFeedSession(c, span, lockResolver, ts, eventCh)
return s.eventFeed(ctx, ts)
return s.eventFeed(ctx)
}

// RegionCount returns the number of captured regions.
Expand Down Expand Up @@ -362,7 +367,6 @@ type eventFeedSession struct {

type rangeRequestTask struct {
span regionspan.ComparableSpan
ts uint64
}

func newEventFeedSession(
Expand All @@ -372,9 +376,10 @@ func newEventFeedSession(
startTs uint64,
eventCh chan<- model.RegionFeedEvent,
) *eventFeedSession {
id := strconv.FormatUint(allocID(), 10)
id := allocID()
idStr := strconv.FormatUint(id, 10)
rangeLock := regionspan.NewRegionRangeLock(
totalSpan.Start, totalSpan.End, startTs,
id, totalSpan.Start, totalSpan.End, startTs,
client.changefeed.Namespace+"."+client.changefeed.ID)
return &eventFeedSession{
client: client,
Expand All @@ -387,7 +392,7 @@ func newEventFeedSession(
eventCh: eventCh,
rangeLock: rangeLock,
lockResolver: lockResolver,
id: id,
id: idStr,
regionChSizeGauge: clientChannelSize.WithLabelValues("region"),
errChSizeGauge: clientChannelSize.WithLabelValues("err"),
rangeChSizeGauge: clientChannelSize.WithLabelValues("range"),
Expand All @@ -403,7 +408,7 @@ func newEventFeedSession(
}
}

func (s *eventFeedSession) eventFeed(ctx context.Context, ts uint64) error {
func (s *eventFeedSession) eventFeed(ctx context.Context) error {
s.requestRangeCh = chann.NewDrainableChann[rangeRequestTask]()
s.regionCh = chann.NewDrainableChann[singleRegionInfo]()
s.regionRouter = chann.NewDrainableChann[singleRegionInfo]()
Expand All @@ -420,13 +425,11 @@ func (s *eventFeedSession) eventFeed(ctx context.Context, ts uint64) error {

g, ctx := errgroup.WithContext(ctx)

g.Go(func() error {
return s.dispatchRequest(ctx)
})
g.Go(func() error { return s.dispatchRequest(ctx) })

g.Go(func() error {
return s.requestRegionToStore(ctx, g)
})
g.Go(func() error { return s.requestRegionToStore(ctx, g) })

g.Go(func() error { return s.logSlowRegions(ctx) })

g.Go(func() error {
for {
Expand All @@ -444,7 +447,7 @@ func (s *eventFeedSession) eventFeed(ctx context.Context, ts uint64) error {
// Besides the count or frequency of range request is limited,
// we use ephemeral goroutine instead of permanent goroutine.
g.Go(func() error {
return s.divideAndSendEventFeedToRegions(ctx, task.span, task.ts)
return s.divideAndSendEventFeedToRegions(ctx, task.span)
})
}
}
Expand All @@ -465,24 +468,26 @@ func (s *eventFeedSession) eventFeed(ctx context.Context, ts uint64) error {
}
})

s.requestRangeCh.In() <- rangeRequestTask{span: s.totalSpan, ts: ts}
s.requestRangeCh.In() <- rangeRequestTask{span: s.totalSpan}
s.rangeChSizeGauge.Inc()

log.Info("event feed started",
zap.String("namespace", s.changefeed.Namespace),
zap.String("changefeed", s.changefeed.ID),
zap.Int64("tableID", s.tableID),
zap.String("tableName", s.tableName),
zap.Uint64("startTs", ts),
zap.Uint64("startTs", s.startTs),
zap.Stringer("span", s.totalSpan))

return g.Wait()
}

// scheduleDivideRegionAndRequest schedules a range to be divided by regions, and these regions will be then scheduled
// to send ChangeData requests.
func (s *eventFeedSession) scheduleDivideRegionAndRequest(ctx context.Context, span regionspan.ComparableSpan, ts uint64) {
task := rangeRequestTask{span: span, ts: ts}
// scheduleDivideRegionAndRequest schedules a range to be divided by regions,
// and these regions will be then scheduled to send ChangeData requests.
func (s *eventFeedSession) scheduleDivideRegionAndRequest(
ctx context.Context, span regionspan.ComparableSpan,
) {
task := rangeRequestTask{span: span}
select {
case s.requestRangeCh.In() <- task:
s.rangeChSizeGauge.Inc()
Expand All @@ -496,7 +501,7 @@ func (s *eventFeedSession) scheduleRegionRequest(ctx context.Context, sri single
handleResult := func(res regionspan.LockRangeResult) {
switch res.Status {
case regionspan.LockRangeStatusSuccess:
sri.resolvedTs = res.CheckpointTs
sri.lockedRange = res.LockedRange
select {
case s.regionCh.In() <- sri:
s.regionChSizeGauge.Inc()
Expand All @@ -508,12 +513,11 @@ func (s *eventFeedSession) scheduleRegionRequest(ctx context.Context, sri single
zap.String("changefeed", s.changefeed.ID),
zap.Uint64("regionID", sri.verID.GetID()),
zap.Stringer("span", sri.span),
zap.Uint64("resolvedTs", sri.resolvedTs),
zap.Any("retrySpans", res.RetryRanges))
for _, r := range res.RetryRanges {
// This call is always blocking, otherwise if scheduling in a new
// goroutine, it won't block the caller of `schedulerRegionRequest`.
s.scheduleDivideRegionAndRequest(ctx, r, sri.resolvedTs)
s.scheduleDivideRegionAndRequest(ctx, r)
}
case regionspan.LockRangeStatusCancel:
return
Expand All @@ -527,7 +531,7 @@ func (s *eventFeedSession) scheduleRegionRequest(ctx context.Context, sri single
// short sleep to wait region has split
time.Sleep(time.Second)
s.rangeLock.UnlockRange(sri.span.Start, sri.span.End,
sri.verID.GetID(), sri.verID.GetVer(), sri.resolvedTs)
sri.verID.GetID(), sri.verID.GetVer())
regionNum := val.(int)
retryRanges := make([]regionspan.ComparableSpan, 0, regionNum)
start := []byte("a")
Expand Down Expand Up @@ -556,7 +560,7 @@ func (s *eventFeedSession) scheduleRegionRequest(ctx context.Context, sri single
// CAUTION: Note that this should only be called in a context that the region has locked its range.
func (s *eventFeedSession) onRegionFail(ctx context.Context, errorInfo regionErrorInfo) {
s.rangeLock.UnlockRange(errorInfo.span.Start, errorInfo.span.End,
errorInfo.verID.GetID(), errorInfo.verID.GetVer(), errorInfo.resolvedTs)
errorInfo.verID.GetID(), errorInfo.verID.GetVer(), errorInfo.resolvedTs())
log.Info("region failed", zap.Stringer("span", &errorInfo.span),
zap.Any("regionId", errorInfo.verID.GetID()),
zap.Error(errorInfo.err))
Expand Down Expand Up @@ -606,7 +610,7 @@ func (s *eventFeedSession) requestRegionToStore(
RegionId: regionID,
RequestId: requestID,
RegionEpoch: regionEpoch,
CheckpointTs: sri.resolvedTs,
CheckpointTs: sri.resolvedTs(),
StartKey: sri.span.Start,
EndKey: sri.span.End,
ExtraOp: extraOp,
Expand Down Expand Up @@ -684,13 +688,13 @@ func (s *eventFeedSession) requestRegionToStore(
state := newRegionFeedState(sri, requestID)
pendingRegions.setByRequestID(requestID, state)

log.Debug("start new request",
log.Info("start new request",
zap.String("namespace", s.changefeed.Namespace),
zap.String("changefeed", s.changefeed.ID),
zap.Int64("tableID", s.tableID),
zap.String("tableName", s.tableName),
zap.String("addr", storeAddr),
zap.Any("request", req))
zap.Uint64("regionID", sri.verID.GetID()),
zap.String("addr", storeAddr))

err = stream.client.Send(req)

Expand Down Expand Up @@ -777,7 +781,7 @@ func (s *eventFeedSession) dispatchRequest(ctx context.Context) error {
Region: sri.verID.GetID(),
},
},
ResolvedTs: sri.resolvedTs,
ResolvedTs: sri.resolvedTs(),
},
}
select {
Expand All @@ -799,7 +803,7 @@ func (s *eventFeedSession) dispatchRequest(ctx context.Context) error {
zap.String("tableName", s.tableName),
zap.Uint64("regionID", sri.verID.GetID()),
zap.Stringer("span", sri.span),
zap.Uint64("resolvedTs", sri.resolvedTs))
zap.Uint64("resolvedTs", sri.resolvedTs()))
errInfo := newRegionErrorInfo(sri, &rpcCtxUnavailableErr{verID: sri.verID})
s.onRegionFail(ctx, errInfo)
continue
Expand All @@ -813,7 +817,7 @@ func (s *eventFeedSession) dispatchRequest(ctx context.Context) error {
// to region boundaries. When region merging happens, it's possible that it
// will produce some overlapping spans.
func (s *eventFeedSession) divideAndSendEventFeedToRegions(
ctx context.Context, span regionspan.ComparableSpan, ts uint64,
ctx context.Context, span regionspan.ComparableSpan,
) error {
limit := 20
nextSpan := span
Expand All @@ -826,7 +830,8 @@ func (s *eventFeedSession) divideAndSendEventFeedToRegions(
retryErr := retry.Do(ctx, func() error {
bo := tikv.NewBackoffer(ctx, tikvRequestMaxBackoff)
start := time.Now()
regions, err = s.client.regionCache.BatchLoadRegionsWithKeyRange(bo, nextSpan.Start, nextSpan.End, limit)
regions, err = s.client.regionCache.BatchLoadRegionsWithKeyRange(
bo, nextSpan.Start, nextSpan.End, limit)
scanRegionsDuration.Observe(time.Since(start).Seconds())
if err != nil {
return cerror.WrapError(cerror.ErrPDBatchLoadRegions, err)
Expand Down Expand Up @@ -855,15 +860,16 @@ func (s *eventFeedSession) divideAndSendEventFeedToRegions(

for _, tiRegion := range regions {
region := tiRegion.GetMeta()
partialSpan, err := regionspan.Intersect(s.totalSpan, regionspan.ComparableSpan{Start: region.StartKey, End: region.EndKey})
partialSpan, err := regionspan.Intersect(
s.totalSpan, regionspan.ComparableSpan{Start: region.StartKey, End: region.EndKey})
if err != nil {
return errors.Trace(err)
}
nextSpan.Start = region.EndKey
// the End key return by the PD API will be nil to represent the biggest key,
partialSpan = partialSpan.Hack()

sri := newSingleRegionInfo(tiRegion.VerID(), partialSpan, ts, nil)
sri := newSingleRegionInfo(tiRegion.VerID(), partialSpan, nil)
s.scheduleRegionRequest(ctx, sri)
// return if no more regions
if regionspan.EndCompare(nextSpan.Start, span.End) >= 0 {
Expand All @@ -882,17 +888,24 @@ func (s *eventFeedSession) handleError(ctx context.Context, errInfo regionErrorI
switch eerr := errors.Cause(err).(type) {
case *eventError:
innerErr := eerr.err
log.Info("cdc region error",
zap.String("namespace", s.changefeed.Namespace),
zap.String("changefeed", s.changefeed.ID),
zap.Int64("tableID", s.tableID),
zap.String("tableName", s.tableName),
zap.Stringer("error", innerErr))

if notLeader := innerErr.GetNotLeader(); notLeader != nil {
metricFeedNotLeaderCounter.Inc()
s.client.regionCache.UpdateLeader(errInfo.verID, notLeader.GetLeader(), errInfo.rpcCtx.AccessIdx)
} else if innerErr.GetEpochNotMatch() != nil {
// TODO: If only confver is updated, we don't need to reload the region from region cache.
metricFeedEpochNotMatchCounter.Inc()
s.scheduleDivideRegionAndRequest(ctx, errInfo.span, errInfo.resolvedTs)
s.scheduleDivideRegionAndRequest(ctx, errInfo.span)
return nil
} else if innerErr.GetRegionNotFound() != nil {
metricFeedRegionNotFoundCounter.Inc()
s.scheduleDivideRegionAndRequest(ctx, errInfo.span, errInfo.resolvedTs)
s.scheduleDivideRegionAndRequest(ctx, errInfo.span)
return nil
} else if duplicatedRequest := innerErr.GetDuplicateRequest(); duplicatedRequest != nil {
metricFeedDuplicateRequestCounter.Inc()
Expand Down Expand Up @@ -922,7 +935,7 @@ func (s *eventFeedSession) handleError(ctx context.Context, errInfo regionErrorI
}
case *rpcCtxUnavailableErr:
metricFeedRPCCtxUnavailable.Inc()
s.scheduleDivideRegionAndRequest(ctx, errInfo.span, errInfo.resolvedTs)
s.scheduleDivideRegionAndRequest(ctx, errInfo.span)
return nil
case *connectToStoreErr:
metricConnectToStoreErr.Inc()
Expand Down Expand Up @@ -1199,7 +1212,7 @@ func (s *eventFeedSession) sendRegionChangeEvents(
}
state.start()
worker.setRegionState(event.RegionId, state)
} else if state.isStopped() {
} else if state.isStale() {
log.Warn("drop event due to region feed stopped",
zap.String("namespace", s.changefeed.Namespace),
zap.String("changefeed", s.changefeed.ID),
Expand All @@ -1209,6 +1222,17 @@ func (s *eventFeedSession) sendRegionChangeEvents(
continue
}

switch x := event.Event.(type) {
case *cdcpb.Event_Error:
log.Info("event feed receives a region error",
zap.String("namespace", s.changefeed.Namespace),
zap.String("changefeed", s.changefeed.ID),
zap.Int64("tableID", s.tableID),
zap.String("tableName", s.tableName),
zap.Uint64("regionID", event.RegionId),
zap.Any("error", x.Error))
}

slot := worker.inputCalcSlot(event.RegionId)
statefulEvents[slot] = append(statefulEvents[slot], &regionStatefulEvent{
changeEvent: event,
Expand Down Expand Up @@ -1301,6 +1325,51 @@ func (s *eventFeedSession) getStreamCancel(storeAddr string) (cancel context.Can
return
}

// logSlowRegions periodically inspects the session's locked ranges and logs
// diagnostics for debugging: regions whose checkpoint has stalled, regions
// that are taking too long to initialize, and holes (uncovered gaps) in the
// captured key range. It runs until ctx is cancelled and returns ctx.Err().
func (s *eventFeedSession) logSlowRegions(ctx context.Context) error {
	tick := time.NewTicker(10 * time.Second)
	defer tick.Stop()
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-tick.C:
		}

		now := s.client.pdClock.CurrentTime()
		attrs := s.rangeLock.CollectLockedRangeAttrs(nil)
		switch {
		case attrs.SlowestRegion.Initialized:
			// An initialized region counts as slow once its checkpoint has
			// lagged behind PD time for two resolve-lock intervals.
			lag := now.Sub(oracle.GetTimeFromTS(attrs.SlowestRegion.CheckpointTs))
			if lag > 2*resolveLockMinInterval {
				log.Info("event feed finds a slow region",
					zap.String("namespace", s.changefeed.Namespace),
					zap.String("changefeed", s.changefeed.ID),
					zap.Int64("tableID", s.tableID),
					zap.String("tableName", s.tableName),
					zap.Any("slowRegion", attrs.SlowestRegion))
			}
		case now.Sub(attrs.SlowestRegion.Created) > 10*time.Minute:
			// Not initialized yet and created long ago: initialization is slow.
			log.Info("event feed initializes a region too slow",
				zap.String("namespace", s.changefeed.Namespace),
				zap.String("changefeed", s.changefeed.ID),
				zap.Int64("tableID", s.tableID),
				zap.String("tableName", s.tableName),
				zap.Any("slowRegion", attrs.SlowestRegion))
		}
		if len(attrs.Holes) > 0 {
			// Render every uncovered span as "[start,end)" for one joined line.
			spans := make([]string, 0, len(attrs.Holes))
			for _, hole := range attrs.Holes {
				spans = append(spans, fmt.Sprintf("[%s,%s)", hole.Start, hole.End))
			}
			log.Info("event feed holes exist",
				zap.String("namespace", s.changefeed.Namespace),
				zap.String("changefeed", s.changefeed.ID),
				zap.Int64("tableID", s.tableID),
				zap.String("tableName", s.tableName),
				zap.String("holes", strings.Join(spans, ", ")))
		}
	}
}

func assembleRowEvent(regionID uint64, entry *cdcpb.Event_Row) (model.RegionFeedEvent, error) {
var opType model.OpType
switch entry.GetOpType() {
Expand Down
Loading
Loading