reset internal state of feedStateManager when shouldBeRunning is false (#9893)

close #9892
CharlesCheung96 authored Oct 16, 2023
1 parent 0f4a8a1 commit 432828b
Showing 4 changed files with 167 additions and 39 deletions.
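At a high level, the change below makes the manager's cleanUp (previously cleanUpTaskPositions) clear the cached progress fields whenever the changefeed should not be running, so that a later resume does not compare against stale timestamps. The following is a minimal, self-contained sketch of that idea, not the patch itself: model.Ts is simplified to uint64, unrelated fields are omitted, and the tick/main driver is illustrative only.

package main

import (
	"fmt"
	"time"
)

// feedStateManager keeps only the progress fields touched by this commit
// (model.Ts simplified to uint64 for the sketch).
type feedStateManager struct {
	shouldBeRunning      bool
	checkpointTs         uint64
	resolvedTs           uint64
	checkpointTsAdvanced time.Time
}

// cleanUp resets the cached progress so a later resume starts from a clean
// slate instead of comparing against stale values.
func (m *feedStateManager) cleanUp() {
	m.checkpointTs = 0
	m.checkpointTsAdvanced = time.Time{}
	m.resolvedTs = 0
}

// tick mirrors the pattern used in Tick: update the cached values while the
// changefeed is running, and clean them up once it should stop.
func (m *feedStateManager) tick(checkpointTs, resolvedTs uint64) {
	defer func() {
		if !m.shouldBeRunning {
			m.cleanUp()
		}
	}()
	if m.checkpointTs < checkpointTs {
		m.checkpointTs = checkpointTs
		m.checkpointTsAdvanced = time.Now()
	}
	if m.resolvedTs < resolvedTs {
		m.resolvedTs = resolvedTs
	}
	if m.checkpointTs >= m.resolvedTs {
		m.checkpointTsAdvanced = time.Now()
	}
}

func main() {
	m := &feedStateManager{shouldBeRunning: true}
	m.tick(200, 400)

	m.shouldBeRunning = false
	m.tick(210, 400) // the deferred cleanUp fires here

	fmt.Println(m.checkpointTs, m.resolvedTs) // 0 0
}

Since the new handleWarning relies on checkpointTsAdvanced for its stuck-checkpoint check, clearing it on shutdown presumably prevents a stale value from tripping that check right after a resume (see #9892).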
87 changes: 50 additions & 37 deletions cdc/owner/feed_state_manager.go
@@ -63,11 +63,10 @@ type feedStateManager struct {

// resolvedTs and initCheckpointTs is for checking whether resolved timestamp
// has been advanced or not.
resolvedTs model.Ts
initCheckpointTs model.Ts
resolvedTs model.Ts
checkpointTs model.Ts
checkpointTsAdvanced time.Time

checkpointTsAdvanced time.Time
lastCheckpointTs model.Ts
changefeedErrorStuckDuration time.Duration
}

@@ -86,9 +85,28 @@ func newFeedStateManager(up *upstream.Upstream, cfg *config.ReplicaConfig) *feed
m.changefeedErrorStuckDuration = *cfg.ChangefeedErrorStuckDuration

m.resetErrRetry()
m.isRetrying = false
return m
}

func (m *feedStateManager) shouldRetry() bool {
// changefeed should not retry within [m.lastErrorRetryTime, m.lastErrorRetryTime + m.backoffInterval).
return time.Since(m.lastErrorRetryTime) >= m.backoffInterval
}

func (m *feedStateManager) shouldFailWhenRetry() bool {
// retry the changefeed
m.backoffInterval = m.errBackoff.NextBackOff()
// NextBackOff() will return -1 once the MaxElapsedTime has elapsed,
// set the changefeed to failed state.
if m.backoffInterval == m.errBackoff.Stop {
return true
}

m.lastErrorRetryTime = time.Now()
return false
}

// resetErrRetry reset the error retry related fields
func (m *feedStateManager) resetErrRetry() {
m.errBackoff.Reset()
@@ -100,25 +118,25 @@ func (m *feedStateManager) Tick(
state *orchestrator.ChangefeedReactorState,
resolvedTs model.Ts,
) (adminJobPending bool) {
m.checkAndInitLastRetryCheckpointTs(state.Status)

if state.Status != nil {
if m.lastCheckpointTs < state.Status.CheckpointTs {
m.lastCheckpointTs = state.Status.CheckpointTs
if m.checkpointTs < state.Status.CheckpointTs {
m.checkpointTs = state.Status.CheckpointTs
m.checkpointTsAdvanced = time.Now()
}
if m.state == nil || m.state.Status == nil {
// It's the first time `m.state.Status` gets filled.
m.initCheckpointTs = state.Status.CheckpointTs
if m.resolvedTs < resolvedTs {
m.resolvedTs = resolvedTs
}
if m.checkpointTs >= m.resolvedTs {
m.checkpointTsAdvanced = time.Now()
}
}

m.checkAndInitLastRetryCheckpointTs(state.Status)

m.state = state
m.resolvedTs = resolvedTs
m.shouldBeRunning = true
defer func() {
if !m.shouldBeRunning {
m.cleanUpTaskPositions()
m.cleanUp()
}
}()

@@ -141,16 +159,12 @@ func (m *feedStateManager) Tick(
m.shouldBeRunning = false
return
case model.StatePending:
if time.Since(m.lastErrorRetryTime) < m.backoffInterval {
if !m.shouldRetry() {
m.shouldBeRunning = false
return
}
// retry the changefeed
oldBackoffInterval := m.backoffInterval
m.backoffInterval = m.errBackoff.NextBackOff()
// NextBackOff() will return -1 once the MaxElapsedTime has elapsed,
// set the changefeed to failed state.
if m.backoffInterval == m.errBackoff.Stop {

if m.shouldFailWhenRetry() {
log.Error("The changefeed won't be restarted as it has been experiencing failures for "+
"an extended duration",
zap.Duration("maxElapsedTime", m.errBackoff.MaxElapsedTime),
@@ -164,18 +178,17 @@ func (m *feedStateManager) Tick(
return
}

m.lastErrorRetryTime = time.Now()
// retry the changefeed
m.shouldBeRunning = true
if m.state.Status != nil {
m.lastErrorRetryCheckpointTs = m.state.Status.CheckpointTs
}
m.shouldBeRunning = true
m.patchState(model.StateWarning)
log.Info("changefeed retry backoff interval is elapsed,"+
"chengefeed will be restarted",
zap.String("namespace", m.state.ID.Namespace),
zap.String("changefeed", m.state.ID.ID),
zap.Time("lastErrorRetryTime", m.lastErrorRetryTime),
zap.Duration("lastRetryInterval", oldBackoffInterval),
zap.Duration("nextRetryInterval", m.backoffInterval))
case model.StateNormal, model.StateWarning:
m.checkAndChangeState()
@@ -284,6 +297,7 @@ func (m *feedStateManager) handleAdminJob() (jobsPending bool) {
m.shouldBeRunning = true
// when the changefeed is manually resumed, we must reset the backoff
m.resetErrRetry()
m.isRetrying = false
jobsPending = true
m.patchState(model.StateNormal)

@@ -416,12 +430,15 @@ func (m *feedStateManager) patchState(feedState model.FeedState) {
})
}

func (m *feedStateManager) cleanUpTaskPositions() {
func (m *feedStateManager) cleanUp() {
for captureID := range m.state.TaskPositions {
m.state.PatchTaskPosition(captureID, func(position *model.TaskPosition) (*model.TaskPosition, bool, error) {
return nil, position != nil, nil
})
}
m.checkpointTs = 0
m.checkpointTsAdvanced = time.Time{}
m.resolvedTs = 0
}

func (m *feedStateManager) errorsReportedByProcessors() []*model.RunningError {
@@ -546,12 +563,12 @@ func (m *feedStateManager) handleError(errs ...*model.RunningError) {
info.Error = lastError
return info, true, nil
})
}

// The errBackoff needs to be reset before the first retry.
if !m.isRetrying {
m.resetErrRetry()
m.isRetrying = true
// The errBackoff needs to be reset before the first retry.
if !m.isRetrying {
m.resetErrRetry()
m.isRetrying = true
}
}
}

@@ -565,13 +582,9 @@ func (m *feedStateManager) handleWarning(errs ...*model.RunningError) {
currTime := m.upstream.PDClock.CurrentTime()
ckptTime := oracle.GetTimeFromTS(m.state.Status.CheckpointTs)
m.lastWarningReportCheckpointTs = m.state.Status.CheckpointTs
// Conditions:
// 1. checkpoint lag is large enough;
// 2. checkpoint hasn't been advanced for a long while;
// 3. the changefeed has been initialized.
if currTime.Sub(ckptTime) > m.changefeedErrorStuckDuration &&
time.Since(m.checkpointTsAdvanced) > m.changefeedErrorStuckDuration &&
m.resolvedTs > m.initCheckpointTs {

checkpointTsStuck := time.Since(m.checkpointTsAdvanced) > m.changefeedErrorStuckDuration
if checkpointTsStuck {
log.Info("changefeed retry on warning for a very long time and does not resume, "+
"it will be failed", zap.String("changefeed", m.state.ID.ID),
zap.Uint64("checkpointTs", m.state.Status.CheckpointTs),
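The other half of the feed_state_manager.go diff above splits Tick's retry decision into two helpers: shouldRetry gates a retry until the current backoff interval has elapsed, and shouldFailWhenRetry draws the next interval from the exponential backoff and gives up once it returns the stop value; handleWarning's failure condition is likewise reduced to checking how long checkpointTsAdvanced has been stagnant. Below is a rough sketch of that gating pattern, assuming errBackoff is an ExponentialBackOff from github.com/cenkalti/backoff/v4 (which is what the real errBackoff field appears to wrap); the retryGate type and the intervals in main are illustrative, not from the patch.

package main

import (
	"fmt"
	"time"

	"github.com/cenkalti/backoff/v4"
)

// retryGate mirrors the backoff bookkeeping kept by feedStateManager
// (errBackoff, backoffInterval, lastErrorRetryTime); the name is illustrative.
type retryGate struct {
	errBackoff         *backoff.ExponentialBackOff
	backoffInterval    time.Duration
	lastErrorRetryTime time.Time
}

// shouldRetry reports whether the current backoff interval has elapsed, i.e.
// no retry happens within [lastErrorRetryTime, lastErrorRetryTime+backoffInterval).
func (g *retryGate) shouldRetry() bool {
	return time.Since(g.lastErrorRetryTime) >= g.backoffInterval
}

// shouldFailWhenRetry draws the next interval; NextBackOff returns the stop
// value once MaxElapsedTime has elapsed, in which case the caller gives up.
func (g *retryGate) shouldFailWhenRetry() bool {
	g.backoffInterval = g.errBackoff.NextBackOff()
	if g.backoffInterval == g.errBackoff.Stop {
		return true
	}
	g.lastErrorRetryTime = time.Now()
	return false
}

func main() {
	eb := backoff.NewExponentialBackOff()
	eb.InitialInterval = 100 * time.Millisecond // illustrative values
	eb.MaxElapsedTime = time.Second
	eb.Reset() // apply the tuned intervals before the first NextBackOff

	g := &retryGate{errBackoff: eb}
	for i := 0; i < 10; i++ {
		if !g.shouldRetry() {
			continue
		}
		if g.shouldFailWhenRetry() {
			fmt.Println("giving up: backoff exhausted")
			return
		}
		fmt.Println("retrying after", g.backoffInterval)
		time.Sleep(g.backoffInterval)
	}
}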
116 changes: 116 additions & 0 deletions cdc/owner/feed_state_manager_test.go
@@ -64,6 +64,8 @@ func newFeedStateManager4Test(

f.resetErrRetry()

f.changefeedErrorStuckDuration = time.Second * 3

return f
}

@@ -208,6 +210,7 @@ func TestResumeChangefeedWithCheckpointTs(t *testing.T) {
require.Equal(t, state.Status.AdminJobType, model.AdminStop)

// resume the changefeed in failed state
manager.isRetrying = true
manager.PushAdminJob(&model.AdminJob{
CfID: ctx.ChangefeedVars().ID,
Type: model.AdminResume,
@@ -220,6 +223,7 @@ func TestResumeChangefeedWithCheckpointTs(t *testing.T) {
require.Equal(t, state.Info.State, model.StateNormal)
require.Equal(t, state.Info.AdminJobType, model.AdminNone)
require.Equal(t, state.Status.AdminJobType, model.AdminNone)
require.False(t, manager.isRetrying)
}

func TestMarkFinished(t *testing.T) {
@@ -742,6 +746,7 @@ func TestUpdateChangefeedEpoch(t *testing.T) {
func TestHandleWarning(t *testing.T) {
ctx := cdcContext.NewBackendContext4Test(true)
manager := newFeedStateManager4Test(200, 1600, 0, 2.0)
manager.changefeedErrorStuckDuration = 100 * time.Millisecond
state := orchestrator.NewChangefeedReactorState(etcd.DefaultCDCClusterID,
ctx.ChangefeedVars().ID)
tester := orchestrator.NewReactorStateTester(t, state, nil)
@@ -934,3 +939,114 @@ func TestErrorAfterWarning(t *testing.T) {
require.Equal(t, model.StateWarning, state.Info.State)
require.True(t, manager.ShouldRunning())
}

func TestHandleWarningWhileAdvanceResolvedTs(t *testing.T) {
t.Parallel()

maxElapsedTimeInMs := 2000
ctx := cdcContext.NewBackendContext4Test(true)
manager := newFeedStateManager4Test(200, 1600, maxElapsedTimeInMs, 2.0)
state := orchestrator.NewChangefeedReactorState(etcd.DefaultCDCClusterID,
ctx.ChangefeedVars().ID)
tester := orchestrator.NewReactorStateTester(t, state, nil)
state.PatchInfo(func(info *model.ChangeFeedInfo) (*model.ChangeFeedInfo, bool, error) {
require.Nil(t, info)
return &model.ChangeFeedInfo{SinkURI: "123", Config: &config.ReplicaConfig{}}, true, nil
})
state.PatchStatus(func(status *model.ChangeFeedStatus) (*model.ChangeFeedStatus, bool, error) {
require.Nil(t, status)
return &model.ChangeFeedStatus{
CheckpointTs: 200,
}, true, nil
})

tester.MustApplyPatches()
manager.Tick(state, 200)
tester.MustApplyPatches()
require.Equal(t, model.StateNormal, state.Info.State)
require.True(t, manager.ShouldRunning())

// 1. test when a warning occurs, the changefeed state will be changed to warning
// and it will still keep running
state.PatchTaskPosition(ctx.GlobalVars().CaptureInfo.ID,
func(position *model.TaskPosition) (*model.TaskPosition, bool, error) {
return &model.TaskPosition{Warning: &model.RunningError{
Addr: ctx.GlobalVars().CaptureInfo.AdvertiseAddr,
Code: "[CDC:ErrSinkManagerRunError]", // it is fake error
Message: "fake error for test",
}}, true, nil
})
tester.MustApplyPatches()
manager.Tick(state, 200)
// some patches will be generated when the manager.Tick is called
// so we need to apply the patches before we check the state
tester.MustApplyPatches()
require.Equal(t, model.StateWarning, state.Info.State)
require.True(t, manager.ShouldRunning())

// 2. test when the changefeed is in warning state and the resolvedTs and checkpointTs are not progressing,
// the changefeed state will remain warning when a new warning is encountered.
time.Sleep(manager.changefeedErrorStuckDuration + 10)
state.PatchStatus(func(status *model.ChangeFeedStatus) (*model.ChangeFeedStatus, bool, error) {
require.NotNil(t, status)
return &model.ChangeFeedStatus{
CheckpointTs: 200,
}, true, nil
})
state.PatchTaskPosition(ctx.GlobalVars().CaptureInfo.ID,
func(position *model.TaskPosition) (*model.TaskPosition, bool, error) {
return &model.TaskPosition{Warning: &model.RunningError{
Addr: ctx.GlobalVars().CaptureInfo.AdvertiseAddr,
Code: "[CDC:ErrSinkManagerRunError]", // it is fake error
Message: "fake error for test",
}}, true, nil
})
tester.MustApplyPatches()
manager.Tick(state, 200)
tester.MustApplyPatches()
require.Equal(t, model.StateWarning, state.Info.State)
require.True(t, manager.ShouldRunning())

// 3. Test that the changefeed remains in warning state when resolvedTs is progressing, even after being stuck beyond the detection time.
state.PatchStatus(func(status *model.ChangeFeedStatus) (*model.ChangeFeedStatus, bool, error) {
require.NotNil(t, status)
return &model.ChangeFeedStatus{
CheckpointTs: 200,
}, true, nil
})
state.PatchTaskPosition(ctx.GlobalVars().CaptureInfo.ID,
func(position *model.TaskPosition) (*model.TaskPosition, bool, error) {
return &model.TaskPosition{Warning: &model.RunningError{
Addr: ctx.GlobalVars().CaptureInfo.AdvertiseAddr,
Code: "[CDC:ErrSinkManagerRunError]", // it is fake error
Message: "fake error for test",
}}, true, nil
})
tester.MustApplyPatches()
manager.Tick(state, 400)
tester.MustApplyPatches()
require.Equal(t, model.StateWarning, state.Info.State)
require.True(t, manager.ShouldRunning())

// 4. Test that the changefeed fails when checkpointTs has not progressed for changefeedErrorStuckDuration.
time.Sleep(manager.changefeedErrorStuckDuration + 10)
state.PatchStatus(func(status *model.ChangeFeedStatus) (*model.ChangeFeedStatus, bool, error) {
require.NotNil(t, status)
return &model.ChangeFeedStatus{
CheckpointTs: 200,
}, true, nil
})
state.PatchTaskPosition(ctx.GlobalVars().CaptureInfo.ID,
func(position *model.TaskPosition) (*model.TaskPosition, bool, error) {
return &model.TaskPosition{Warning: &model.RunningError{
Addr: ctx.GlobalVars().CaptureInfo.AdvertiseAddr,
Code: "[CDC:ErrSinkManagerRunError]", // it is fake error
Message: "fake error for test",
}}, true, nil
})
tester.MustApplyPatches()
manager.Tick(state, 400)
tester.MustApplyPatches()
require.Equal(t, model.StateFailed, state.Info.State)
require.False(t, manager.ShouldRunning())
}
2 changes: 1 addition & 1 deletion cdc/owner/owner.go
@@ -155,7 +155,7 @@ func (o *ownerImpl) Tick(stdCtx context.Context, rawState orchestrator.ReactorSt
// Tick all changefeeds.
ctx := stdCtx.(cdcContext.Context)
for changefeedID, changefeedState := range state.Changefeeds {
// check if we are the changefeed owner to handle this changefeed
// check if we are the changefeed owner to handle this changefeed
if !o.shouldHandleChangefeed(changefeedState) {
continue
}
1 change: 0 additions & 1 deletion pkg/orchestrator/reactor_state.go
@@ -262,7 +262,6 @@ func (s *ChangefeedReactorState) UpdateCDCKey(key *etcd.CDCKey, value []byte) er
return errors.Trace(err)
}
if key.Tp == etcd.CDCKeyTypeChangefeedInfo {
log.Info("update changefeed info", zap.Any("info", s.Info))
s.Info.VerifyAndComplete()
}
return nil
