Skip to content

Commit

Permalink
This is an automated cherry-pick of pingcap#4072
Browse files Browse the repository at this point in the history
Signed-off-by: ti-chi-bot <ti-community-prow-bot@tidb.io>
  • Loading branch information
overvenus authored and ti-chi-bot committed Dec 28, 2021
1 parent f4048d8 commit cdea444
Show file tree
Hide file tree
Showing 3 changed files with 86 additions and 57 deletions.
105 changes: 63 additions & 42 deletions cdc/kv/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,12 @@ const (
// frequency of creating new goroutine.
defaultRegionChanSize = 128

// initial size for region rate limit queue
// initial size for region rate limit queue.
defaultRegionRateLimitQueueSize = 128
// Interval of check region retry rate limit queue.
defaultCheckRegionRateLimitInterval = 50 * time.Millisecond
// Duration of warning region retry rate limited too long.
defaultLogRegionRateLimitDuration = 10 * time.Second
)

// time interval to force kv client to terminate gRPC stream and reconnect
Expand Down Expand Up @@ -135,6 +139,33 @@ func newSingleRegionInfo(verID tikv.RegionVerID, span regionspan.ComparableSpan,
type regionErrorInfo struct {
singleRegionInfo
err error

retryLimitTime *time.Time
logRateLimitDuration time.Duration
}

func newRegionErrorInfo(info singleRegionInfo, err error) regionErrorInfo {
return regionErrorInfo{
singleRegionInfo: info,
err: err,

logRateLimitDuration: defaultLogRegionRateLimitDuration,
}
}

func (r *regionErrorInfo) logRateLimitedHint() bool {
now := time.Now()
if r.retryLimitTime == nil {
// Caller should log on the first rate limited.
r.retryLimitTime = &now
return true
}
if now.Sub(*r.retryLimitTime) > r.logRateLimitDuration {
// Caller should log if it lasts too long.
r.retryLimitTime = &now
return true
}
return false
}

type regionFeedState struct {
Expand Down Expand Up @@ -519,21 +550,39 @@ func (s *eventFeedSession) eventFeed(ctx context.Context, ts uint64) error {
})

g.Go(func() error {
checkRateLimitInterval := 50 * time.Millisecond
timer := time.NewTimer(checkRateLimitInterval)
timer := time.NewTimer(defaultCheckRegionRateLimitInterval)
defer timer.Stop()
for {
select {
case <-ctx.Done():
return ctx.Err()
case <-timer.C:
s.handleRateLimit(ctx)
timer.Reset(checkRateLimitInterval)
timer.Reset(defaultCheckRegionRateLimitInterval)
case errInfo := <-s.errCh:
s.errChSizeGauge.Dec()
allowed := s.checkRateLimit(errInfo.singleRegionInfo.verID.GetID())
if !allowed {
<<<<<<< HEAD
// rate limit triggers, add the error info to the rate limit queue
=======
if errInfo.logRateLimitedHint() {
zapFieldAddr := zap.Skip()
if errInfo.singleRegionInfo.rpcCtx != nil {
// rpcCtx may be nil if we fails to get region info
// from pd. It could cause by pd down or the region
// has been merged.
zapFieldAddr = zap.String("addr", errInfo.singleRegionInfo.rpcCtx.Addr)
}
log.Info("EventFeed retry rate limited",
zap.Uint64("regionID", errInfo.singleRegionInfo.verID.GetID()),
zap.Uint64("ts", errInfo.singleRegionInfo.ts),
zap.String("changefeed", cfID), zap.Stringer("span", errInfo.span),
zap.Int64("tableID", tableID), zap.String("tableName", tableName),
zapFieldAddr)
}
// rate limit triggers, add the error info to the rate limit queue.
>>>>>>> ce2556ef6 (kv(ticdc): reduce eventfeed rate limited log (#4072))
s.rateLimitQueue = append(s.rateLimitQueue, errInfo)
} else {
err := s.handleError(ctx, errInfo)
Expand Down Expand Up @@ -626,14 +675,13 @@ func (s *eventFeedSession) scheduleRegionRequest(ctx context.Context, sri single
// onRegionFail handles a region's failure, which means, unlock the region's range and send the error to the errCh for
// error handling. This function is non blocking even if error channel is full.
// CAUTION: Note that this should only be called in a context that the region has locked it's range.
func (s *eventFeedSession) onRegionFail(ctx context.Context, errorInfo regionErrorInfo, revokeToken bool) error {
func (s *eventFeedSession) onRegionFail(ctx context.Context, errorInfo regionErrorInfo, revokeToken bool) {
log.Debug("region failed", zap.Uint64("regionID", errorInfo.verID.GetID()), zap.Error(errorInfo.err))
s.rangeLock.UnlockRange(errorInfo.span.Start, errorInfo.span.End, errorInfo.verID.GetID(), errorInfo.verID.GetVer(), errorInfo.ts)
if revokeToken {
s.regionRouter.Release(errorInfo.rpcCtx.Addr)
}
s.enqueueError(ctx, errorInfo)
return nil
}

// requestRegionToStore gets singleRegionInfo from regionRouter, which is a token
Expand Down Expand Up @@ -727,13 +775,8 @@ func (s *eventFeedSession) requestRegionToStore(
}
bo := tikv.NewBackoffer(ctx, tikvRequestMaxBackoff)
s.client.regionCache.OnSendFail(bo, rpcCtx, regionScheduleReload, err)
err = s.onRegionFail(ctx, regionErrorInfo{
singleRegionInfo: sri,
err: &connectToStoreErr{},
}, false /* revokeToken */)
if err != nil {
return errors.Trace(err)
}
errInfo := newRegionErrorInfo(sri, &connectToStoreErr{})
s.onRegionFail(ctx, errInfo, false /* revokeToken */)
continue
}
s.addStream(rpcCtx.Addr, stream, streamCancel)
Expand Down Expand Up @@ -787,15 +830,8 @@ func (s *eventFeedSession) requestRegionToStore(
continue
}

// Wait for a while and retry sending the request
time.Sleep(time.Millisecond * time.Duration(rand.Intn(100)))
err = s.onRegionFail(ctx, regionErrorInfo{
singleRegionInfo: sri,
err: &sendRequestToStoreErr{},
}, false /* revokeToken */)
if err != nil {
return errors.Trace(err)
}
errInfo := newRegionErrorInfo(sri, &sendRequestToStoreErr{})
s.onRegionFail(ctx, errInfo, false /* revokeToken */)
} else {
s.regionRouter.Acquire(rpcCtx.Addr)
}
Expand Down Expand Up @@ -856,15 +892,8 @@ func (s *eventFeedSession) dispatchRequest(
log.Info("cannot get rpcCtx, retry span",
zap.Uint64("regionID", sri.verID.GetID()),
zap.Stringer("span", sri.span))
err = s.onRegionFail(ctx, regionErrorInfo{
singleRegionInfo: sri,
err: &rpcCtxUnavailableErr{
verID: sri.verID,
},
}, false /* revokeToken */)
if err != nil {
return errors.Trace(err)
}
errInfo := newRegionErrorInfo(sri, &rpcCtxUnavailableErr{verID: sri.verID})
s.onRegionFail(ctx, errInfo, false /* revokeToken */)
continue
}
sri.rpcCtx = rpcCtx
Expand Down Expand Up @@ -1081,14 +1110,8 @@ func (s *eventFeedSession) receiveFromStream(

remainingRegions := pendingRegions.takeAll()
for _, state := range remainingRegions {
err := s.onRegionFail(ctx, regionErrorInfo{
singleRegionInfo: state.sri,
err: cerror.ErrPendingRegionCancel.GenWithStackByArgs(),
}, true /* revokeToken */)
if err != nil {
// The only possible is that the ctx is cancelled. Simply return.
return
}
errInfo := newRegionErrorInfo(state.sri, cerror.ErrPendingRegionCancel.FastGenByArgs())
s.onRegionFail(ctx, errInfo, true /* revokeToken */)
}
}()

Expand All @@ -1100,9 +1123,7 @@ func (s *eventFeedSession) receiveFromStream(
// to call exactly once from outter code logic
worker := newRegionWorker(s, addr)

defer func() {
worker.evictAllRegions() //nolint:errcheck
}()
defer worker.evictAllRegions()

g.Go(func() error {
return worker.run(ctx)
Expand Down
17 changes: 17 additions & 0 deletions cdc/kv/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import (
"github.com/pingcap/tiflow/pkg/txnutil"
"github.com/pingcap/tiflow/pkg/util"
"github.com/pingcap/tiflow/pkg/util/testleak"
"github.com/stretchr/testify/require"
"github.com/tikv/client-go/v2/oracle"
"github.com/tikv/client-go/v2/testutils"
"github.com/tikv/client-go/v2/tikv"
Expand Down Expand Up @@ -3497,3 +3498,19 @@ func (s *etcdSuite) TestHandleRateLimit(c *check.C) {
c.Assert(session.rateLimitQueue, check.HasLen, 0)
c.Assert(cap(session.rateLimitQueue), check.Equals, 128)
}

func TestRegionErrorInfoLogRateLimitedHint(t *testing.T) {
t.Parallel()

errInfo := newRegionErrorInfo(singleRegionInfo{}, nil)
errInfo.logRateLimitDuration = time.Second

// True on the first rate limited.
require.True(t, errInfo.logRateLimitedHint())
require.False(t, errInfo.logRateLimitedHint())

// True if it lasts too long.
time.Sleep(2 * errInfo.logRateLimitDuration)
require.True(t, errInfo.logRateLimitedHint())
require.False(t, errInfo.logRateLimitedHint())
}
21 changes: 6 additions & 15 deletions cdc/kv/region_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,13 +275,8 @@ func (w *regionWorker) handleSingleRegionError(err error, state *regionFeedState
revokeToken := !state.initialized
// since the context used in region worker will be cancelled after region
// worker exits, we must use the parent context to prevent regionErrorInfo loss.
err2 := w.session.onRegionFail(w.parentCtx, regionErrorInfo{
singleRegionInfo: state.sri,
err: err,
}, revokeToken)
if err2 != nil {
return err2
}
errInfo := newRegionErrorInfo(state.sri, err)
w.session.onRegionFail(w.parentCtx, errInfo, revokeToken)

return retErr
}
Expand Down Expand Up @@ -771,8 +766,7 @@ func (w *regionWorker) handleResolvedTs(

// evictAllRegions is used when gRPC stream meets error and re-establish, notify
// all existing regions to re-establish
func (w *regionWorker) evictAllRegions() error {
var err error
func (w *regionWorker) evictAllRegions() {
for _, states := range w.statesManager.states {
states.Range(func(_, value interface{}) bool {
state := value.(*regionFeedState)
Expand All @@ -792,14 +786,11 @@ func (w *regionWorker) evictAllRegions() error {
// since the context used in region worker will be cancelled after
// region worker exits, we must use the parent context to prevent
// regionErrorInfo loss.
err = w.session.onRegionFail(w.parentCtx, regionErrorInfo{
singleRegionInfo: state.sri,
err: cerror.ErrEventFeedAborted.FastGenByArgs(),
}, revokeToken)
return err == nil
errInfo := newRegionErrorInfo(state.sri, cerror.ErrEventFeedAborted.FastGenByArgs())
w.session.onRegionFail(w.parentCtx, errInfo, revokeToken)
return true
})
}
return err
}

func getWorkerPoolSize() (size int) {
Expand Down

0 comments on commit cdea444

Please sign in to comment.