kv(ticdc): reduce eventfeed rate limited log (#4072) #4110

Merged
113 changes: 64 additions & 49 deletions cdc/kv/client.go
@@ -77,8 +77,12 @@ const (
// frequency of creating new goroutine.
defaultRegionChanSize = 128

// initial size for region rate limit queue
// initial size for region rate limit queue.
defaultRegionRateLimitQueueSize = 128
// Interval at which the region retry rate limit queue is checked.
defaultCheckRegionRateLimitInterval = 50 * time.Millisecond
// Duration after which a still rate limited region retry is logged again.
defaultLogRegionRateLimitDuration = 10 * time.Second
)

// time interval to force kv client to terminate gRPC stream and reconnect
@@ -135,6 +139,33 @@ func newSingleRegionInfo(verID tikv.RegionVerID, span regionspan.ComparableSpan,
type regionErrorInfo struct {
singleRegionInfo
err error

retryLimitTime *time.Time
logRateLimitDuration time.Duration
}

func newRegionErrorInfo(info singleRegionInfo, err error) regionErrorInfo {
return regionErrorInfo{
singleRegionInfo: info,
err: err,

logRateLimitDuration: defaultLogRegionRateLimitDuration,
}
}

func (r *regionErrorInfo) logRateLimitedHint() bool {
now := time.Now()
if r.retryLimitTime == nil {
// Caller should log on the first rate limit hit.
r.retryLimitTime = &now
return true
}
if now.Sub(*r.retryLimitTime) > r.logRateLimitDuration {
// Caller should log again if rate limiting has lasted too long.
r.retryLimitTime = &now
return true
}
return false
}

type regionFeedState struct {
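
The new logRateLimitedHint is the heart of this PR: it tells the caller to log the first time a region's retry gets rate limited, then suppresses further logging until logRateLimitDuration has elapsed. A minimal standalone sketch of the same pattern — the names below are illustrative, not the PR's actual types:

```go
package main

import (
	"fmt"
	"time"
)

// logThrottle mirrors the retryLimitTime/logRateLimitDuration fields added to
// regionErrorInfo: shouldLog returns true at most once per window.
type logThrottle struct {
	last   *time.Time
	window time.Duration
}

func (t *logThrottle) shouldLog() bool {
	now := time.Now()
	if t.last == nil || now.Sub(*t.last) > t.window {
		t.last = &now
		return true // first hit, or the previous log is older than the window
	}
	return false
}

func main() {
	t := logThrottle{window: 100 * time.Millisecond}
	fmt.Println(t.shouldLog(), t.shouldLog()) // true false
	time.Sleep(150 * time.Millisecond)
	fmt.Println(t.shouldLog()) // true again after the window elapses
}
```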
@@ -518,22 +549,38 @@ func (s *eventFeedSession) eventFeed(ctx context.Context, ts uint64) error {
}
})

tableID, tableName := util.TableIDFromCtx(ctx)
cfID := util.ChangefeedIDFromCtx(ctx)
g.Go(func() error {
checkRateLimitInterval := 50 * time.Millisecond
timer := time.NewTimer(checkRateLimitInterval)
timer := time.NewTimer(defaultCheckRegionRateLimitInterval)
defer timer.Stop()
for {
select {
case <-ctx.Done():
return ctx.Err()
case <-timer.C:
s.handleRateLimit(ctx)
timer.Reset(checkRateLimitInterval)
timer.Reset(defaultCheckRegionRateLimitInterval)
case errInfo := <-s.errCh:
s.errChSizeGauge.Dec()
allowed := s.checkRateLimit(errInfo.singleRegionInfo.verID.GetID())
if !allowed {
// rate limit triggers, add the error info to the rate limit queue
if errInfo.logRateLimitedHint() {
zapFieldAddr := zap.Skip()
if errInfo.singleRegionInfo.rpcCtx != nil {
// rpcCtx may be nil if we fail to get region info
// from PD, e.g. because PD is down or the region
// has been merged.
zapFieldAddr = zap.String("addr", errInfo.singleRegionInfo.rpcCtx.Addr)
}
log.Info("EventFeed retry rate limited",
zap.Uint64("regionID", errInfo.singleRegionInfo.verID.GetID()),
zap.Uint64("ts", errInfo.singleRegionInfo.ts),
zap.String("changefeed", cfID), zap.Stringer("span", errInfo.span),
zap.Int64("tableID", tableID), zap.String("tableName", tableName),
zapFieldAddr)
}
// rate limit triggers, add the error info to the rate limit queue.
s.rateLimitQueue = append(s.rateLimitQueue, errInfo)
} else {
err := s.handleError(ctx, errInfo)
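
A small point on the logging change above: zap.Skip() is used as a no-op field so the store address is only attached when rpcCtx is available. A hedged sketch of that pattern in isolation:

```go
package main

import "go.uber.org/zap"

func main() {
	logger, _ := zap.NewDevelopment()
	defer logger.Sync() //nolint:errcheck

	var addr string // pretend rpcCtx was nil, so no address is known

	addrField := zap.Skip() // no-op field: adds nothing to the log line
	if addr != "" {
		addrField = zap.String("addr", addr)
	}
	logger.Info("EventFeed retry rate limited", zap.Uint64("regionID", 42), addrField)
}
```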
@@ -626,14 +673,13 @@ func (s *eventFeedSession) scheduleRegionRequest(ctx context.Context, sri single
// onRegionFail handles a region's failure, which means, unlock the region's range and send the error to the errCh for
// error handling. This function is non-blocking even if the error channel is full.
// CAUTION: Note that this should only be called in a context where the region has locked its range.
func (s *eventFeedSession) onRegionFail(ctx context.Context, errorInfo regionErrorInfo, revokeToken bool) error {
func (s *eventFeedSession) onRegionFail(ctx context.Context, errorInfo regionErrorInfo, revokeToken bool) {
log.Debug("region failed", zap.Uint64("regionID", errorInfo.verID.GetID()), zap.Error(errorInfo.err))
s.rangeLock.UnlockRange(errorInfo.span.Start, errorInfo.span.End, errorInfo.verID.GetID(), errorInfo.verID.GetVer(), errorInfo.ts)
if revokeToken {
s.regionRouter.Release(errorInfo.rpcCtx.Addr)
}
s.enqueueError(ctx, errorInfo)
return nil
}

// requestRegionToStore gets singleRegionInfo from regionRouter, which is a token
@@ -727,13 +773,8 @@ func (s *eventFeedSession) requestRegionToStore(
}
bo := tikv.NewBackoffer(ctx, tikvRequestMaxBackoff)
s.client.regionCache.OnSendFail(bo, rpcCtx, regionScheduleReload, err)
err = s.onRegionFail(ctx, regionErrorInfo{
singleRegionInfo: sri,
err: &connectToStoreErr{},
}, false /* revokeToken */)
if err != nil {
return errors.Trace(err)
}
errInfo := newRegionErrorInfo(sri, &connectToStoreErr{})
s.onRegionFail(ctx, errInfo, false /* revokeToken */)
continue
}
s.addStream(rpcCtx.Addr, stream, streamCancel)
@@ -787,15 +828,8 @@ func (s *eventFeedSession) requestRegionToStore(
continue
}

// Wait for a while and retry sending the request
time.Sleep(time.Millisecond * time.Duration(rand.Intn(100)))
err = s.onRegionFail(ctx, regionErrorInfo{
singleRegionInfo: sri,
err: &sendRequestToStoreErr{},
}, false /* revokeToken */)
if err != nil {
return errors.Trace(err)
}
errInfo := newRegionErrorInfo(sri, &sendRequestToStoreErr{})
s.onRegionFail(ctx, errInfo, false /* revokeToken */)
} else {
s.regionRouter.Acquire(rpcCtx.Addr)
}
@@ -856,15 +890,8 @@ func (s *eventFeedSession) dispatchRequest(
log.Info("cannot get rpcCtx, retry span",
zap.Uint64("regionID", sri.verID.GetID()),
zap.Stringer("span", sri.span))
err = s.onRegionFail(ctx, regionErrorInfo{
singleRegionInfo: sri,
err: &rpcCtxUnavailableErr{
verID: sri.verID,
},
}, false /* revokeToken */)
if err != nil {
return errors.Trace(err)
}
errInfo := newRegionErrorInfo(sri, &rpcCtxUnavailableErr{verID: sri.verID})
s.onRegionFail(ctx, errInfo, false /* revokeToken */)
continue
}
sri.rpcCtx = rpcCtx
@@ -980,15 +1007,11 @@ func (s *eventFeedSession) handleRateLimit(ctx context.Context) {
}

// checkRateLimit checks whether a region can be reconnected based on its rate limiter
func (s *eventFeedSession) checkRateLimit(regionID uint64) (allowed bool) {
func (s *eventFeedSession) checkRateLimit(regionID uint64) bool {
limiter := s.client.getRegionLimiter(regionID)
// Use Limiter.Allow here: if the rate limit is exceeded, we skip this region
// and retry it later.
allowed = limiter.Allow()
if !allowed {
log.Info("EventFeed retry rate limited", zap.Uint64("regionID", regionID))
}
return
return limiter.Allow()
}

// handleError handles error returned by a region. If some new EventFeed connection should be established, the region
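
For context on checkRateLimit: Limiter.Allow is non-blocking, so when a region has reconnected too often the error is parked in rateLimitQueue instead of being handled immediately. The sketch below assumes the per-region limiter is a golang.org/x/time/rate token bucket; the actual rate and burst used by the kv client are not shown in this diff:

```go
package main

import (
	"fmt"
	"time"

	"golang.org/x/time/rate"
)

func main() {
	// Illustrative numbers only: a burst of 2 reconnects, refilled every 500ms.
	limiter := rate.NewLimiter(rate.Every(500*time.Millisecond), 2)

	for i := 0; i < 4; i++ {
		fmt.Println(limiter.Allow()) // true, true, false, false
	}
	time.Sleep(600 * time.Millisecond)
	fmt.Println(limiter.Allow()) // true again once a token has been refilled
}
```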
@@ -1081,14 +1104,8 @@ func (s *eventFeedSession) receiveFromStream(

remainingRegions := pendingRegions.takeAll()
for _, state := range remainingRegions {
err := s.onRegionFail(ctx, regionErrorInfo{
singleRegionInfo: state.sri,
err: cerror.ErrPendingRegionCancel.GenWithStackByArgs(),
}, true /* revokeToken */)
if err != nil {
// The only possible is that the ctx is cancelled. Simply return.
return
}
errInfo := newRegionErrorInfo(state.sri, cerror.ErrPendingRegionCancel.FastGenByArgs())
s.onRegionFail(ctx, errInfo, true /* revokeToken */)
}
}()

@@ -1100,9 +1117,7 @@ func (s *eventFeedSession) receiveFromStream(
// to call exactly once from outer code logic
worker := newRegionWorker(s, addr)

defer func() {
worker.evictAllRegions() //nolint:errcheck
}()
defer worker.evictAllRegions()

g.Go(func() error {
return worker.run(ctx)
17 changes: 17 additions & 0 deletions cdc/kv/client_test.go
@@ -43,6 +43,7 @@ import (
"github.com/pingcap/tiflow/pkg/txnutil"
"github.com/pingcap/tiflow/pkg/util"
"github.com/pingcap/tiflow/pkg/util/testleak"
"github.com/stretchr/testify/require"
"github.com/tikv/client-go/v2/oracle"
"github.com/tikv/client-go/v2/testutils"
"github.com/tikv/client-go/v2/tikv"
@@ -3497,3 +3498,19 @@ func (s *etcdSuite) TestHandleRateLimit(c *check.C) {
c.Assert(session.rateLimitQueue, check.HasLen, 0)
c.Assert(cap(session.rateLimitQueue), check.Equals, 128)
}

func TestRegionErrorInfoLogRateLimitedHint(t *testing.T) {
t.Parallel()

errInfo := newRegionErrorInfo(singleRegionInfo{}, nil)
errInfo.logRateLimitDuration = time.Second

// True on the first rate limit hit.
require.True(t, errInfo.logRateLimitedHint())
require.False(t, errInfo.logRateLimitedHint())

// True again once rate limiting has lasted too long.
time.Sleep(2 * errInfo.logRateLimitDuration)
require.True(t, errInfo.logRateLimitedHint())
require.False(t, errInfo.logRateLimitedHint())
}
21 changes: 6 additions & 15 deletions cdc/kv/region_worker.go
@@ -275,13 +275,8 @@ func (w *regionWorker) handleSingleRegionError(err error, state *regionFeedState
revokeToken := !state.initialized
// since the context used in region worker will be cancelled after region
// worker exits, we must use the parent context to prevent regionErrorInfo loss.
err2 := w.session.onRegionFail(w.parentCtx, regionErrorInfo{
singleRegionInfo: state.sri,
err: err,
}, revokeToken)
if err2 != nil {
return err2
}
errInfo := newRegionErrorInfo(state.sri, err)
w.session.onRegionFail(w.parentCtx, errInfo, revokeToken)

return retErr
}
@@ -771,8 +766,7 @@ func (w *regionWorker) handleResolvedTs(

// evictAllRegions is used when the gRPC stream meets an error and is re-established;
// it notifies all existing regions to re-establish.
func (w *regionWorker) evictAllRegions() error {
var err error
func (w *regionWorker) evictAllRegions() {
for _, states := range w.statesManager.states {
states.Range(func(_, value interface{}) bool {
state := value.(*regionFeedState)
@@ -792,14 +786,11 @@ func (w *regionWorker) handleResolvedTs(
// since the context used in region worker will be cancelled after
// region worker exits, we must use the parent context to prevent
// regionErrorInfo loss.
err = w.session.onRegionFail(w.parentCtx, regionErrorInfo{
singleRegionInfo: state.sri,
err: cerror.ErrEventFeedAborted.FastGenByArgs(),
}, revokeToken)
return err == nil
errInfo := newRegionErrorInfo(state.sri, cerror.ErrEventFeedAborted.FastGenByArgs())
w.session.onRegionFail(w.parentCtx, errInfo, revokeToken)
return true
})
}
return err
}

func getWorkerPoolSize() (size int) {
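
Design note on the evictAllRegions change: the Range callback now always returns true, so iteration keeps going and every region is notified, whereas the old error-propagating version could stop at the first failure. A short sketch of that Range contract, assuming the states container behaves like sync.Map:

```go
package main

import (
	"fmt"
	"sync"
)

func main() {
	var states sync.Map
	for i := uint64(1); i <= 3; i++ {
		states.Store(i, fmt.Sprintf("region-%d", i))
	}

	// Returning true keeps the iteration going; returning false would stop it
	// at the current entry.
	states.Range(func(key, value interface{}) bool {
		fmt.Println("evict", key, value)
		return true
	})
}
```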