Skip to content

Commit

Permalink
sink(cdc): ddl sink errors shouldn't fail changefeed quickly (pingcap…
Browse files Browse the repository at this point in the history
  • Loading branch information
hicqu authored and 3AceShowHand committed Aug 29, 2023
1 parent b9a6e65 commit ed3b03a
Show file tree
Hide file tree
Showing 10 changed files with 16 additions and 22 deletions.
2 changes: 1 addition & 1 deletion cdc/entry/mounter.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ func (m *mounter) unmarshalAndMountRowChanged(ctx context.Context, raw *model.Ra
}
return nil, nil
}()
if err != nil && !cerror.IsChangefeedUnRetryableError(err) {
if err != nil && !cerror.ShouldFailChangefeed(err) {
log.Error("failed to mount and unmarshals entry, start to print debug info", zap.Error(err))
snap.PrintStatus(log.Error)
}
Expand Down
6 changes: 3 additions & 3 deletions cdc/model/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ type RunningError struct {
Message string `json:"message"`
}

// IsChangefeedUnRetryableError return true if a running error contains a changefeed not retry error.
func (r RunningError) IsChangefeedUnRetryableError() bool {
return cerror.IsChangefeedUnRetryableError(errors.New(r.Message + r.Code))
// ShouldFailChangefeed return true if a running error contains a changefeed not retry error.
func (r RunningError) ShouldFailChangefeed() bool {
return cerror.ShouldFailChangefeed(errors.New(r.Message + r.Code))
}
2 changes: 1 addition & 1 deletion cdc/model/errors_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,6 @@ func TestIsChangefeedNotRetryError(t *testing.T) {
}

for _, c := range cases {
require.Equal(t, c.result, c.err.IsChangefeedUnRetryableError())
require.Equal(t, c.result, c.err.ShouldFailChangefeed())
}
}
4 changes: 2 additions & 2 deletions cdc/owner/ddl_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ func (s *ddlSinkImpl) retrySinkAction(ctx context.Context, name string, action f
if err = action(); err == nil {
return nil
}
isRetryable := !cerror.IsChangefeedUnRetryableError(err) && errors.Cause(err) != context.Canceled
isRetryable := !cerror.ShouldFailChangefeed(err) && errors.Cause(err) != context.Canceled
log.Warn("owner ddl sink fails on action",
zap.String("namespace", s.changefeedID.Namespace),
zap.String("changefeed", s.changefeedID.ID),
Expand Down Expand Up @@ -397,7 +397,7 @@ func (s *ddlSinkImpl) emitSyncPoint(ctx context.Context, checkpointTs uint64) (e
if err == nil {
return nil
}
if !cerror.IsChangefeedUnRetryableError(err) && errors.Cause(err) != context.Canceled {
if !cerror.ShouldFailChangefeed(err) && errors.Cause(err) != context.Canceled {
// TODO(qupeng): retry it internally after async sink syncPoint is ready.
s.reportError(err)
return err
Expand Down
2 changes: 1 addition & 1 deletion cdc/owner/feed_state_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -520,7 +520,7 @@ func (m *feedStateManager) handleError(errs ...*model.RunningError) {
// and no need to patch other error to the changefeed info
for _, err := range errs {
if cerrors.IsChangefeedGCFastFailErrorCode(errors.RFCErrorCode(err.Code)) ||
err.IsChangefeedUnRetryableError() {
err.ShouldFailChangefeed() {
m.state.PatchInfo(func(info *model.ChangeFeedInfo) (*model.ChangeFeedInfo, bool, error) {
if info == nil {
return nil, false, nil
Expand Down
2 changes: 1 addition & 1 deletion cdc/processor/sinkmanager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,7 @@ func (m *SinkManager) Run(ctx context.Context, warnings ...chan<- error) (err er
}

// If the error is retryable, we should retry to re-establish the internal resources.
if !cerror.IsChangefeedUnRetryableError(err) && errors.Cause(err) != context.Canceled {
if !cerror.ShouldFailChangefeed(err) && errors.Cause(err) != context.Canceled {
select {
case <-m.managerCtx.Done():
case warnings[0] <- err:
Expand Down
8 changes: 1 addition & 7 deletions cdc/sink/ddlsink/mysql/mysql_ddl_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"net/url"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/log"
timodel "github.com/pingcap/tidb/parser/model"
Expand Down Expand Up @@ -100,12 +99,7 @@ func NewDDLSink(

// WriteDDLEvent writes a DDL event to the mysql database.
func (m *DDLSink) WriteDDLEvent(ctx context.Context, ddl *model.DDLEvent) error {
err := m.execDDLWithMaxRetries(ctx, ddl)
// we should not retry changefeed if DDL failed by return an unretryable error.
if !errorutil.IsRetryableDDLError(err) {
return cerror.WrapChangefeedUnretryableErr(err)
}
return errors.Trace(err)
return m.execDDLWithMaxRetries(ctx, ddl)
}

func (m *DDLSink) execDDLWithMaxRetries(ctx context.Context, ddl *model.DDLEvent) error {
Expand Down
4 changes: 2 additions & 2 deletions pkg/errors/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,8 @@ var changefeedUnRetryableErrors = []*errors.Error{
ErrStorageSinkInvalidConfig,
}

// IsChangefeedUnRetryableError returns true if an error is a changefeed not retry error.
func IsChangefeedUnRetryableError(err error) bool {
// ShouldFailChangefeed returns true if an error is a changefeed not retry error.
func ShouldFailChangefeed(err error) bool {
for _, e := range changefeedUnRetryableErrors {
if e.Equal(err) {
return true
Expand Down
6 changes: 3 additions & 3 deletions pkg/errors/helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ func TestChangefeedFastFailError(t *testing.T) {
require.Equal(t, false, IsChangefeedGCFastFailErrorCode(rfcCode))
}

func TestIsChangefeedUnRetryableError(t *testing.T) {
func TestShouldFailChangefeed(t *testing.T) {
t.Parallel()
cases := []struct {
err error
Expand Down Expand Up @@ -171,14 +171,14 @@ func TestIsChangefeedUnRetryableError(t *testing.T) {
}

for _, c := range cases {
require.Equal(t, c.expected, IsChangefeedUnRetryableError(c.err))
require.Equal(t, c.expected, ShouldFailChangefeed(c.err))
}

var code errors.RFCErrorCode
var ok bool
code, ok = RFCCode(ErrChangefeedUnretryable)
require.True(t, ok)
require.True(t, IsChangefeedUnRetryableError(errors.New(string(code))))
require.True(t, ShouldFailChangefeed(errors.New(string(code))))
}

func TestIsCliUnprintableError(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/filter/expr_filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -415,7 +415,7 @@ func (f *dmlExprFilter) shouldSkipDML(
for _, rule := range rules {
ignore, err := rule.shouldSkipDML(row, rawRow, ti)
if err != nil {
if cerror.IsChangefeedUnRetryableError(err) {
if cerror.ShouldFailChangefeed(err) {
return false, err
}
return false, cerror.WrapError(cerror.ErrFailedToFilterDML, err, row)
Expand Down

0 comments on commit ed3b03a

Please sign in to comment.