Skip to content

Commit

Permalink
changefeed: fix changefeed does not fast fail when occur ErrGCTTLExce…
Browse files Browse the repository at this point in the history
…eded error (#3120) (#3134)
  • Loading branch information
ti-chi-bot authored Oct 28, 2021
1 parent fe8a97e commit 35f2d40
Show file tree
Hide file tree
Showing 7 changed files with 159 additions and 24 deletions.
16 changes: 11 additions & 5 deletions cdc/owner/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,10 @@ func (c *changefeed) Tick(ctx cdcContext.Context, state *model.ChangefeedReactor
func (c *changefeed) checkStaleCheckpointTs(ctx cdcContext.Context, checkpointTs uint64) error {
state := c.state.Info.State
if state == model.StateNormal || state == model.StateStopped || state == model.StateError {
if err := c.gcManager.checkStaleCheckpointTs(ctx, checkpointTs); err != nil {
failpoint.Inject("InjectChangefeedFastFailError", func() error {
return cerror.ErrGCTTLExceeded.FastGen("InjectChangefeedFastFailError")
})
if err := c.gcManager.checkStaleCheckpointTs(ctx, c.id, checkpointTs); err != nil {
return errors.Trace(err)
}
}
Expand All @@ -132,16 +135,19 @@ func (c *changefeed) checkStaleCheckpointTs(ctx cdcContext.Context, checkpointTs
func (c *changefeed) tick(ctx cdcContext.Context, state *model.ChangefeedReactorState, captures map[model.CaptureID]*model.CaptureInfo) error {
c.state = state
c.feedStateManager.Tick(state)
if !c.feedStateManager.ShouldRunning() {
c.releaseResources()
return nil
}

checkpointTs := c.state.Info.GetCheckpointTs(c.state.Status)
// check stale checkPointTs must be called before `feedStateManager.ShouldRunning()`
// to ensure an error or stopped changefeed also be checked
if err := c.checkStaleCheckpointTs(ctx, checkpointTs); err != nil {
return errors.Trace(err)
}

if !c.feedStateManager.ShouldRunning() {
c.releaseResources()
return nil
}

if !c.preflightCheck(captures) {
return nil
}
Expand Down
13 changes: 8 additions & 5 deletions cdc/owner/gc_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ var gcSafepointUpdateInterval = 1 * time.Minute
type GcManager interface {
updateGCSafePoint(ctx cdcContext.Context, state *model.GlobalReactorState) error
currentTimeFromPDCached(ctx cdcContext.Context) (time.Time, error)
checkStaleCheckpointTs(ctx cdcContext.Context, checkpointTs model.Ts) error
checkStaleCheckpointTs(ctx cdcContext.Context, changefeedID model.ChangeFeedID, checkpointTs model.Ts) error
}

type gcManager struct {
Expand Down Expand Up @@ -128,18 +128,21 @@ func (m *gcManager) currentTimeFromPDCached(ctx cdcContext.Context) (time.Time,
return m.pdPhysicalTimeCache, nil
}

func (m *gcManager) checkStaleCheckpointTs(ctx cdcContext.Context, checkpointTs model.Ts) error {
func (m *gcManager) checkStaleCheckpointTs(
ctx cdcContext.Context, changefeedID model.ChangeFeedID, checkpointTs model.Ts,
) error {
gcSafepointUpperBound := checkpointTs - 1
if m.isTiCDCBlockGC {
pdTime, err := m.currentTimeFromPDCached(ctx)
if err != nil {
return errors.Trace(err)
}
if pdTime.Sub(oracle.GetTimeFromTS(checkpointTs)) > time.Duration(m.gcTTL)*time.Second {
return cerror.ErrGCTTLExceeded.GenWithStackByArgs(checkpointTs, ctx.ChangefeedVars().ID)
if pdTime.Sub(oracle.GetTimeFromTS(gcSafepointUpperBound)) > time.Duration(m.gcTTL)*time.Second {
return cerror.ErrGCTTLExceeded.GenWithStackByArgs(checkpointTs, changefeedID)
}
} else {
// if `isTiCDCBlockGC` is false, it means there is another service gc point less than the min checkpoint ts.
if checkpointTs < m.lastSafePointTs {
if gcSafepointUpperBound < m.lastSafePointTs {
return cerror.ErrSnapshotLostByGC.GenWithStackByArgs(checkpointTs, m.lastSafePointTs)
}
}
Expand Down
8 changes: 5 additions & 3 deletions cdc/owner/gc_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,14 +158,16 @@ func (s *gcManagerSuite) TestCheckStaleCheckpointTs(c *check.C) {
ctx := cdcContext.NewBackendContext4Test(true)
mockPDClient := &mockPDClient{}
ctx.GlobalVars().PDClient = mockPDClient
err := gcManager.checkStaleCheckpointTs(ctx, 10)
err := gcManager.checkStaleCheckpointTs(ctx, "cfID", 10)
c.Assert(cerror.ErrGCTTLExceeded.Equal(errors.Cause(err)), check.IsTrue)
c.Assert(cerror.ChangefeedFastFailError(err), check.IsTrue)

err = gcManager.checkStaleCheckpointTs(ctx, oracle.ComposeTS(oracle.GetPhysical(time.Now()), 0))
err = gcManager.checkStaleCheckpointTs(ctx, "cfID", oracle.ComposeTS(oracle.GetPhysical(time.Now()), 0))
c.Assert(err, check.IsNil)

gcManager.isTiCDCBlockGC = false
gcManager.lastSafePointTs = 20
err = gcManager.checkStaleCheckpointTs(ctx, 10)
err = gcManager.checkStaleCheckpointTs(ctx, "cfID", 10)
c.Assert(cerror.ErrSnapshotLostByGC.Equal(errors.Cause(err)), check.IsTrue)
c.Assert(cerror.ChangefeedFastFailError(err), check.IsTrue)
}
2 changes: 1 addition & 1 deletion cdc/owner/owner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ type mockGcManager struct {
GcManager
}

func (m *mockGcManager) checkStaleCheckpointTs(ctx cdcContext.Context, checkpointTs model.Ts) error {
func (m *mockGcManager) checkStaleCheckpointTs(ctx cdcContext.Context, changefeedID model.ChangeFeedID, checkpointTs model.Ts) error {
return cerror.ErrGCTTLExceeded.GenWithStackByArgs()
}

Expand Down
39 changes: 29 additions & 10 deletions pkg/errors/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,21 +30,40 @@ func WrapError(rfcError *errors.Error, err error) error {
return rfcError.Wrap(err).GenWithStackByCause()
}

// ChangefeedFastFailError checks the error, returns true if it is meaningless
// to retry on this error
// ChangeFeedFastFailError is read only.
// If this type of error occurs in a changefeed, it means that the data it
// wants to replicate has been or will be GC. So it makes no sense to try to
// resume the changefeed, and the changefeed should immediately be failed.
var ChangeFeedFastFailError = []*errors.Error{
ErrGCTTLExceeded, ErrSnapshotLostByGC, ErrStartTsBeforeGC,
}

// ChangefeedFastFailError checks if an error is a ChangefeedFastFailError
func ChangefeedFastFailError(err error) bool {
return ErrStartTsBeforeGC.Equal(errors.Cause(err)) || ErrSnapshotLostByGC.Equal(errors.Cause(err))
if err == nil {
return false
}
for _, e := range ChangeFeedFastFailError {
if e.Equal(err) {
return true
}
rfcCode, ok := RFCCode(err)
if ok && e.RFCCode() == rfcCode {
return true
}
}
return false
}

// ChangefeedFastFailErrorCode checks the error, returns true if it is meaningless
// to retry on this error
// ChangefeedFastFailErrorCode checks the error code, returns true if it is a
// ChangefeedFastFailError code
func ChangefeedFastFailErrorCode(errCode errors.RFCErrorCode) bool {
switch errCode {
case ErrStartTsBeforeGC.RFCCode(), ErrSnapshotLostByGC.RFCCode():
return true
default:
return false
for _, e := range ChangeFeedFastFailError {
if errCode == e.RFCCode() {
return true
}
}
return false
}

// RFCCode returns a RFCCode from an error
Expand Down
34 changes: 34 additions & 0 deletions pkg/errors/helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,3 +74,37 @@ func (s *helperSuite) TestIsRetryableError(c *check.C) {
c.Assert(ret, check.Equals, tt.want, check.Commentf("case:%s", tt.name))
}
}

func (s *helperSuite) TestChangefeedFastFailError(c *check.C) {
defer testleak.AfterTest(c)()

err := ErrGCTTLExceeded.FastGenByArgs()
rfcCode, _ := RFCCode(err)
c.Assert(ChangefeedFastFailError(err), check.IsTrue)
c.Assert(ChangefeedFastFailErrorCode(rfcCode), check.IsTrue)

err = ErrGCTTLExceeded.GenWithStack("aa")
rfcCode, _ = RFCCode(err)
c.Assert(ChangefeedFastFailError(err), check.IsTrue)
c.Assert(ChangefeedFastFailErrorCode(rfcCode), check.IsTrue)

err = ErrGCTTLExceeded.Wrap(errors.New("aa"))
rfcCode, _ = RFCCode(err)
c.Assert(ChangefeedFastFailError(err), check.IsTrue)
c.Assert(ChangefeedFastFailErrorCode(rfcCode), check.IsTrue)

err = ErrSnapshotLostByGC.FastGenByArgs()
rfcCode, _ = RFCCode(err)
c.Assert(ChangefeedFastFailError(err), check.IsTrue)
c.Assert(ChangefeedFastFailErrorCode(rfcCode), check.IsTrue)

err = ErrStartTsBeforeGC.FastGenByArgs()
rfcCode, _ = RFCCode(err)
c.Assert(ChangefeedFastFailError(err), check.IsTrue)
c.Assert(ChangefeedFastFailErrorCode(rfcCode), check.IsTrue)

err = ErrToTLSConfigFailed.FastGenByArgs()
rfcCode, _ = RFCCode(err)
c.Assert(ChangefeedFastFailError(err), check.IsFalse)
c.Assert(ChangefeedFastFailErrorCode(rfcCode), check.IsFalse)
}
71 changes: 71 additions & 0 deletions tests/changefeed_fast_fail/run.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
#!/bin/bash

set -e

CUR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
source $CUR/../_utils/test_prepare
WORK_DIR=$OUT_DIR/$TEST_NAME
CDC_BINARY=cdc.test
SINK_TYPE=$1
MAX_RETRIES=20

function check_changefeed_mark_failed_regex() {
endpoints=$1
changefeedid=$2
error_msg=$3
info=$(cdc cli changefeed query --pd=$endpoints -c $changefeedid -s)
echo "$info"
state=$(echo $info | jq -r '.state')
if [[ ! "$state" == "failed" ]]; then
echo "changefeed state $state does not equal to failed"
exit 1
fi
message=$(echo $info | jq -r '.error.message')
if [[ ! "$message" =~ $error_msg ]]; then
echo "error message '$message' does not match '$error_msg'"
exit 1
fi
}

export -f check_changefeed_mark_failed_regex

function run() {
# it is no need to test kafka
# the logic are all the same
if [ "$SINK_TYPE" == "kafka" ]; then
return
fi

rm -rf $WORK_DIR && mkdir -p $WORK_DIR

start_tidb_cluster --workdir $WORK_DIR

cd $WORK_DIR

start_ts=$(run_cdc_cli_tso_query ${UP_PD_HOST_1} ${UP_PD_PORT_1})
run_sql "CREATE DATABASE changefeed_error;" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
export GO_FAILPOINTS='github.com/pingcap/ticdc/cdc/owner/InjectChangefeedFastFailError=return(true)'
run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY

SINK_URI="mysql://normal:123456@127.0.0.1:3306/?max-txn-row=1"

changefeedid="changefeed-fast-fail"
run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" -c $changefeedid

ensure $MAX_RETRIES check_changefeed_mark_failed_regex http://${UP_PD_HOST_1}:${UP_PD_PORT_1} ${changefeedid} "ErrGCTTLExceeded"
run_cdc_cli changefeed remove -c $changefeedid
sleep 2
#result=$(curl -X GET "http://127.0.0.1:8300/api/v1/changefeeds")
result=$(cdc cli changefeed list)
if [[ ! "$result" == "[]" ]]; then
echo "changefeed remove failed"
exit 1
fi

cleanup_process $CDC_BINARY
}

trap stop_tidb_cluster EXIT
run $*
check_logs $WORK_DIR
echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>"

0 comments on commit 35f2d40

Please sign in to comment.