Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

loader/syncer: filter context cancel error while executing sqls #355

Merged
merged 16 commits into from
Nov 28, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions loader/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func (conn *DBConn) querySQL(ctx *tcontext.Context, query string, args ...interf
return ret, err
})
if err != nil {
ctx.L().Error("query statement failed after retry",
ctx.L().ErrorFilterContextCanceled("query statement failed after retry",
zap.String("query", utils.TruncateString(query, -1)),
zap.String("argument", utils.TruncateInterface(args, -1)),
log.ShortError(err))
Expand Down Expand Up @@ -169,7 +169,7 @@ func (conn *DBConn) executeSQL(ctx *tcontext.Context, queries []string, args ...
})

if err != nil {
ctx.L().Error("execute statements failed after retry",
ctx.L().ErrorFilterContextCanceled("execute statements failed after retry",
zap.String("queries", utils.TruncateInterface(queries, -1)),
zap.String("arguments", utils.TruncateInterface(args, -1)),
log.ShortError(err))
Expand Down
6 changes: 4 additions & 2 deletions loader/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ import (
"github.com/pingcap/failpoint"
cm "github.com/pingcap/tidb-tools/pkg/column-mapping"
"github.com/pingcap/tidb-tools/pkg/filter"
"github.com/pingcap/tidb-tools/pkg/table-router"
router "github.com/pingcap/tidb-tools/pkg/table-router"
"github.com/siddontang/go/sync2"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -174,7 +174,9 @@ func (w *Worker) run(ctx context.Context, fileJobQueue chan *fileJob, runFatalCh
if err != nil {
// expect pause rather than exit
err = terror.WithScope(terror.Annotatef(err, "file %s", job.file), terror.ScopeDownstream)
runFatalChan <- unit.NewProcessError(pb.ErrorType_ExecSQL, err)
if !utils.IsContextCanceledError(err) {
runFatalChan <- unit.NewProcessError(pb.ErrorType_ExecSQL, err)
}
return
}
w.loader.finishedDataSize.Add(job.offset - job.lastOffset)
Expand Down
4 changes: 2 additions & 2 deletions pkg/conn/baseconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func (conn *BaseConn) QuerySQL(tctx *tcontext.Context, query string, args ...int
rows, err := conn.DBConn.QueryContext(tctx.Context(), query, args...)

if err != nil {
tctx.L().Error("query statement failed",
tctx.L().ErrorFilterContextCanceled("query statement failed",
zap.String("query", utils.TruncateString(query, -1)),
zap.String("argument", utils.TruncateInterface(args, -1)),
log.ShortError(err))
Expand Down Expand Up @@ -144,7 +144,7 @@ func (conn *BaseConn) ExecuteSQLWithIgnoreError(tctx *tcontext.Context, ignoreEr
continue
}

tctx.L().Error("execute statement failed",
tctx.L().ErrorFilterContextCanceled("execute statement failed",
zap.String("query", utils.TruncateString(query, -1)),
zap.String("argument", utils.TruncateInterface(arg, -1)), log.ShortError(err))

Expand Down
11 changes: 11 additions & 0 deletions pkg/log/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package log

import (
"context"
"fmt"

pclog "github.com/pingcap/log"
Expand Down Expand Up @@ -72,6 +73,16 @@ func (l Logger) WithFields(fields ...zap.Field) Logger {
return Logger{l.With(fields...)}
}

// ErrorFilterContextCanceled wraps Logger.Error() and will filter error log when error is context.Canceled
func (l Logger) ErrorFilterContextCanceled(msg string, fields ...zap.Field) {
for _, field := range fields {
if field.Key == "error" && field.String == context.Canceled.Error() {
return
}
}
l.Logger.Error(msg, fields...)
}

// logger for DM
var (
appLogger = Logger{zap.NewNop()}
Expand Down
7 changes: 7 additions & 0 deletions pkg/utils/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,15 @@
package utils

import (
"context"
"math"
"os"
"regexp"
"strconv"
"strings"
"time"

"github.com/pingcap/errors"
"github.com/siddontang/go-mysql/mysql"

"github.com/pingcap/dm/pkg/terror"
Expand Down Expand Up @@ -152,6 +154,11 @@ func WaitSomething(backoff int, waitTime time.Duration, fn func() bool) bool {
return false
}

// IsContextCanceledError checks whether err is context.Canceled
func IsContextCanceledError(err error) bool {
return errors.Cause(err) == context.Canceled
}

// IsBuildInSkipDDL return true when checked sql that will be skipped for syncer
func IsBuildInSkipDDL(sql string) bool {
return builtInSkipDDLPatterns.FindStringIndex(sql) != nil
Expand Down
4 changes: 2 additions & 2 deletions syncer/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ func (conn *DBConn) querySQL(tctx *tcontext.Context, query string, args ...inter
)

if err != nil {
tctx.L().Error("query statement failed after retry",
tctx.L().ErrorFilterContextCanceled("query statement failed after retry",
zap.String("query", utils.TruncateString(query, -1)),
zap.String("argument", utils.TruncateInterface(args, -1)),
log.ShortError(err))
Expand Down Expand Up @@ -266,7 +266,7 @@ func (conn *DBConn) executeSQLWithIgnore(tctx *tcontext.Context, ignoreError fun
})

if err != nil {
tctx.L().Error("execute statements failed after retry",
tctx.L().ErrorFilterContextCanceled("execute statements failed after retry",
zap.String("queries", utils.TruncateInterface(queries, -1)),
zap.String("arguments", utils.TruncateInterface(args, -1)),
log.ShortError(err))
Expand Down
4 changes: 3 additions & 1 deletion syncer/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -921,7 +921,9 @@ func (s *Syncer) syncDDL(ctx *tcontext.Context, queueBucket string, db *DBConn,
s.jobWg.Done()
if err != nil {
s.execErrorDetected.Set(true)
s.runFatalChan <- unit.NewProcessError(pb.ErrorType_ExecSQL, err)
if !utils.IsContextCanceledError(err) {
s.runFatalChan <- unit.NewProcessError(pb.ErrorType_ExecSQL, err)
}
continue
}
s.addCount(true, queueBucket, sqlJob.tp, int64(len(sqlJob.ddls)))
Expand Down
12 changes: 7 additions & 5 deletions syncer/warning.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,13 @@ func (s *Syncer) Error() interface{} {

errors := make([]*pb.SyncSQLError, 0, len(s.execErrors.errors))
for _, ctx := range s.execErrors.errors {
errors = append(errors, &pb.SyncSQLError{
Msg: ctx.err.Error(),
FailedBinlogPosition: fmt.Sprintf("%s:%d", ctx.pos.Name, ctx.pos.Pos),
ErrorSQL: ctx.jobs,
})
if !utils.IsContextCanceledError(ctx.err) {
errors = append(errors, &pb.SyncSQLError{
Msg: ctx.err.Error(),
FailedBinlogPosition: fmt.Sprintf("%s:%d", ctx.pos.Name, ctx.pos.Pos),
ErrorSQL: ctx.jobs,
})
}
}

return &pb.SyncError{Errors: errors}
Expand Down