Skip to content

Commit

Permalink
Merge branch 'master' into openapi-config-template-dmctl
Browse files Browse the repository at this point in the history
  • Loading branch information
Ehco1996 authored Jan 17, 2022
2 parents 0f3dd42 + e0caddb commit 69c8a71
Show file tree
Hide file tree
Showing 65 changed files with 2,069 additions and 719 deletions.
1 change: 0 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -370,7 +370,6 @@ dm_coverage: tools/bin/gocovmerge tools/bin/goveralls
go tool cover -html "$(DM_TEST_DIR)/all_cov.out" -o "$(DM_TEST_DIR)/all_cov.html"
go tool cover -html "$(DM_TEST_DIR)/unit_test.out" -o "$(DM_TEST_DIR)/unit_test_cov.html"


tools/bin/failpoint-ctl: tools/check/go.mod
cd tools/check && $(GO) build -mod=mod -o ../bin/failpoint-ctl github.com/pingcap/failpoint/failpoint-ctl

Expand Down
101 changes: 60 additions & 41 deletions cdc/kv/client_test.go

Large diffs are not rendered by default.

68 changes: 4 additions & 64 deletions cdc/model/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"math"
"net/url"
"regexp"
"sort"
"time"

"github.com/pingcap/errors"
Expand Down Expand Up @@ -93,19 +92,6 @@ func (s FeedState) IsNeeded(need string) bool {
return need == string(s)
}

const (
// errorHistoryGCInterval represents how long we keep error record in changefeed info
errorHistoryGCInterval = time.Minute * 10

// errorHistoryCheckInterval represents time window for failure check
errorHistoryCheckInterval = time.Minute * 2

// ErrorHistoryThreshold represents failure upper limit in time window.
// Before a changefeed is initialized, check the the failure count of this
// changefeed, if it is less than ErrorHistoryThreshold, then initialize it.
ErrorHistoryThreshold = 3
)

// ChangeFeedInfo describes the detail of a ChangeFeed
type ChangeFeedInfo struct {
SinkURI string `json:"sink-uri"`
Expand All @@ -123,10 +109,9 @@ type ChangeFeedInfo struct {
// but can be fetched for backward compatibility
SortDir string `json:"sort-dir"`

Config *config.ReplicaConfig `json:"config"`
State FeedState `json:"state"`
ErrorHis []int64 `json:"history"`
Error *RunningError `json:"error"`
Config *config.ReplicaConfig `json:"config"`
State FeedState `json:"state"`
Error *RunningError `json:"error"`

SyncPointEnabled bool `json:"sync-point-enabled"`
SyncPointInterval time.Duration `json:"sync-point-interval"`
Expand Down Expand Up @@ -266,6 +251,7 @@ func (info *ChangeFeedInfo) VerifyAndComplete() error {
if info.Config.Consistent == nil {
info.Config.Consistent = defaultConfig.Consistent
}

return nil
}

Expand Down Expand Up @@ -374,56 +360,10 @@ func (info *ChangeFeedInfo) fixSinkProtocol() {
}
}

// CheckErrorHistory checks error history of a changefeed
// if having error record older than GC interval, set needSave to true.
// if error counts reach threshold, set canInit to false.
func (info *ChangeFeedInfo) CheckErrorHistory() (needSave bool, canInit bool) {
i := sort.Search(len(info.ErrorHis), func(i int) bool {
ts := info.ErrorHis[i]
return time.Since(time.Unix(ts/1e3, (ts%1e3)*1e6)) < errorHistoryGCInterval
})
info.ErrorHis = info.ErrorHis[i:]

if i > 0 {
needSave = true
}

i = sort.Search(len(info.ErrorHis), func(i int) bool {
ts := info.ErrorHis[i]
return time.Since(time.Unix(ts/1e3, (ts%1e3)*1e6)) < errorHistoryCheckInterval
})
canInit = len(info.ErrorHis)-i < ErrorHistoryThreshold
return
}

// HasFastFailError returns true if the error in changefeed is fast-fail
func (info *ChangeFeedInfo) HasFastFailError() bool {
if info.Error == nil {
return false
}
return cerror.ChangefeedFastFailErrorCode(errors.RFCErrorCode(info.Error.Code))
}

// findActiveErrors finds all errors occurring within errorHistoryCheckInterval
func (info *ChangeFeedInfo) findActiveErrors() []int64 {
i := sort.Search(len(info.ErrorHis), func(i int) bool {
ts := info.ErrorHis[i]
// ts is a errors occurrence time, here to find all errors occurring within errorHistoryCheckInterval
return time.Since(time.Unix(ts/1e3, (ts%1e3)*1e6)) < errorHistoryCheckInterval
})
return info.ErrorHis[i:]
}

// ErrorsReachedThreshold checks error history of a changefeed
// returns true if error counts reach threshold
func (info *ChangeFeedInfo) ErrorsReachedThreshold() bool {
return len(info.findActiveErrors()) >= ErrorHistoryThreshold
}

// CleanUpOutdatedErrorHistory cleans up the outdated error history
// return true if the ErrorHis changed
func (info *ChangeFeedInfo) CleanUpOutdatedErrorHistory() bool {
lastLenOfErrorHis := len(info.ErrorHis)
info.ErrorHis = info.findActiveErrors()
return lastLenOfErrorHis != len(info.ErrorHis)
}
28 changes: 0 additions & 28 deletions cdc/model/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -641,34 +641,6 @@ func TestChangeFeedInfoClone(t *testing.T) {
require.True(t, info.Config.EnableOldValue)
}

func TestCheckErrorHistory(t *testing.T) {
t.Parallel()

now := time.Now()
info := &ChangeFeedInfo{
ErrorHis: []int64{},
}
for i := 0; i < 5; i++ {
tm := now.Add(-errorHistoryGCInterval)
info.ErrorHis = append(info.ErrorHis, tm.UnixNano()/1e6)
time.Sleep(time.Millisecond)
}
for i := 0; i < ErrorHistoryThreshold-1; i++ {
info.ErrorHis = append(info.ErrorHis, time.Now().UnixNano()/1e6)
time.Sleep(time.Millisecond)
}
time.Sleep(time.Millisecond)
needSave, canInit := info.CheckErrorHistory()
require.True(t, needSave)
require.True(t, canInit)
require.Equal(t, ErrorHistoryThreshold-1, len(info.ErrorHis))

info.ErrorHis = append(info.ErrorHis, time.Now().UnixNano()/1e6)
needSave, canInit = info.CheckErrorHistory()
require.False(t, needSave)
require.False(t, canInit)
}

func TestChangefeedInfoStringer(t *testing.T) {
t.Parallel()

Expand Down
2 changes: 1 addition & 1 deletion cdc/owner/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func newChangefeed(id model.ChangeFeedID, gcManager gc.Manager) *changefeed {
// The scheduler will be created lazily.
scheduler: nil,
barriers: newBarriers(),
feedStateManager: new(feedStateManager),
feedStateManager: newFeedStateManager(),
gcManager: gcManager,

errCh: make(chan error, defaultErrChSize),
Expand Down
140 changes: 128 additions & 12 deletions cdc/owner/feed_state_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package owner
import (
"time"

"github.com/cenkalti/backoff/v4"
"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/tiflow/cdc/model"
Expand All @@ -24,6 +25,24 @@ import (
"go.uber.org/zap"
)

const (
// When errors occurred and we need to do backoff, we start an exponential backoff
// with an interval from 10s to 30min (10s, 20s, 40s, 80s, 160s, 320s, 640s, 1280s, 1800s).
// And the backoff will be stopped after 72 min (about 9 tries) because if we do another 30min backoff,
// the total duration (72+30=102min) will exceeds the MaxElapsedTime (90min).
// To avoid thunderherd, a random factor is also added.
defaultBackoffInitInterval = 10 * time.Second
defaultBackoffMaxInterval = 30 * time.Minute
defaultBackoffMaxElapsedTime = 90 * time.Minute
defaultBackoffRandomizationFactor = 0.1
defaultBackoffMultiplier = 2.0

// If all states recorded in window are 'normal', it can be assumed that the changfeed
// is running steady. And then if we enter a state other than normal at next tick,
// the backoff must be reset.
defaultStateWindowSize = 512
)

// feedStateManager manages the ReactorState of a changefeed
// when an error or an admin job occurs, the feedStateManager is responsible for controlling the ReactorState
type feedStateManager struct {
Expand All @@ -34,7 +53,71 @@ type feedStateManager struct {
// shouldBeRemoved = false means the changefeed is paused
shouldBeRemoved bool

adminJobQueue []*model.AdminJob
adminJobQueue []*model.AdminJob
stateHistory [defaultStateWindowSize]model.FeedState
lastErrorTime time.Time // time of last error for a changefeed
backoffInterval time.Duration // the interval for restarting a changefeed in 'error' state
errBackoff *backoff.ExponentialBackOff // an exponential backoff for restarting a changefeed
}

// newFeedStateManager creates feedStateManager and initialize the exponential backoff
func newFeedStateManager() *feedStateManager {
f := new(feedStateManager)

f.errBackoff = backoff.NewExponentialBackOff()
f.errBackoff.InitialInterval = defaultBackoffInitInterval
f.errBackoff.MaxInterval = defaultBackoffMaxInterval
f.errBackoff.MaxElapsedTime = defaultBackoffMaxElapsedTime
f.errBackoff.Multiplier = defaultBackoffMultiplier
f.errBackoff.RandomizationFactor = defaultBackoffRandomizationFactor

f.resetErrBackoff()
f.lastErrorTime = time.Unix(0, 0)

return f
}

// newFeedStateManager4Test creates feedStateManager for test
func newFeedStateManager4Test() *feedStateManager {
f := new(feedStateManager)

f.errBackoff = backoff.NewExponentialBackOff()
f.errBackoff.InitialInterval = 200 * time.Millisecond
f.errBackoff.MaxInterval = 1600 * time.Millisecond
f.errBackoff.MaxElapsedTime = 6 * time.Second
f.errBackoff.Multiplier = 2.0
f.errBackoff.RandomizationFactor = 0

f.resetErrBackoff()
f.lastErrorTime = time.Unix(0, 0)

return f
}

// resetErrBackoff reset the backoff-related fields
func (m *feedStateManager) resetErrBackoff() {
m.errBackoff.Reset()
m.backoffInterval = m.errBackoff.NextBackOff()
}

// isChangefeedStable check if there are states other than 'normal' in this sliding window.
func (m *feedStateManager) isChangefeedStable() bool {
for _, val := range m.stateHistory {
if val != model.StateNormal {
return false
}
}

return true
}

// shiftStateWindow shift the sliding window
func (m *feedStateManager) shiftStateWindow(state model.FeedState) {
for i := 0; i < defaultStateWindowSize-1; i++ {
m.stateHistory[i] = m.stateHistory[i+1]
}

m.stateHistory[defaultStateWindowSize-1] = state
}

func (m *feedStateManager) Tick(state *orchestrator.ChangefeedReactorState) {
Expand Down Expand Up @@ -145,16 +228,18 @@ func (m *feedStateManager) handleAdminJob() (jobsPending bool) {
return
}
m.shouldBeRunning = true
// when the changefeed is manually resumed, we must reset the backoff
m.resetErrBackoff()
// The lastErrorTime also needs to be cleared before a fresh run.
m.lastErrorTime = time.Unix(0, 0)
jobsPending = true
m.patchState(model.StateNormal)
// remove error history to make sure the changefeed can running in next tick
m.state.PatchInfo(func(info *model.ChangeFeedInfo) (*model.ChangeFeedInfo, bool, error) {
if info == nil {
return nil, false, nil
}
if info.Error != nil || len(info.ErrorHis) != 0 {
if info.Error != nil {
info.Error = nil
info.ErrorHis = nil
return info, true, nil
}
return info, false, nil
Expand Down Expand Up @@ -286,8 +371,6 @@ func (m *feedStateManager) handleError(errs ...*model.RunningError) {
return nil, false, nil
}
info.Error = err
info.ErrorHis = append(info.ErrorHis, time.Now().UnixNano()/1e6)
info.CleanUpOutdatedErrorHistory()
return info, true, nil
})
m.shouldBeRunning = false
Expand All @@ -302,16 +385,49 @@ func (m *feedStateManager) handleError(errs ...*model.RunningError) {
}
for _, err := range errs {
info.Error = err
info.ErrorHis = append(info.ErrorHis, time.Now().UnixNano()/1e6)
}
changed := info.CleanUpOutdatedErrorHistory()
return info, changed || len(errs) > 0, nil
return info, len(errs) > 0, nil
})

// if the number of errors has reached the error threshold, stop the changefeed
if m.state.Info.ErrorsReachedThreshold() {
// If we enter into an abnormal state ('error', 'failed') for this changefeed now
// but haven't seen abnormal states in a sliding window (512 ticks),
// it can be assumed that this changefeed meets a sudden change from a stable condition.
// So we can reset the exponential backoff and re-backoff from the InitialInterval.
// TODO: this detection policy should be added into unit test.
if len(errs) > 0 {
m.lastErrorTime = time.Now()
if m.isChangefeedStable() {
m.resetErrBackoff()
}
} else {
if m.state.Info.State == model.StateNormal {
m.lastErrorTime = time.Unix(0, 0)
}
}
m.shiftStateWindow(m.state.Info.State)

if m.lastErrorTime == time.Unix(0, 0) {
return
}

if time.Since(m.lastErrorTime) < m.backoffInterval {
m.shouldBeRunning = false
m.patchState(model.StateError)
return
} else {
oldBackoffInterval := m.backoffInterval
m.backoffInterval = m.errBackoff.NextBackOff()
m.lastErrorTime = time.Unix(0, 0)

// if the duration since backoff start exceeds MaxElapsedTime,
// we set the state of changefeed to "failed" and don't let it run again unless it is manually resumed.
if m.backoffInterval == backoff.Stop {
log.Warn("changefeed will not be restarted because it has been failing for a long time period",
zap.Duration("maxElapsedTime", m.errBackoff.MaxElapsedTime))
m.shouldBeRunning = false
m.patchState(model.StateFailed)
} else {
log.Info("changefeed restart backoff interval is changed", zap.String("changefeed", m.state.ID),
zap.Duration("oldInterval", oldBackoffInterval), zap.Duration("newInterval", m.backoffInterval))
}
}
}
Loading

0 comments on commit 69c8a71

Please sign in to comment.