Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

Commit

Permalink
loader/syncer: filter context cancel error while executing sqls (#355) (
Browse files Browse the repository at this point in the history
  • Loading branch information
lichunzhu authored and csuzhangxc committed Nov 29, 2019
1 parent 751f8fd commit 60a2688
Show file tree
Hide file tree
Showing 9 changed files with 115 additions and 14 deletions.
4 changes: 2 additions & 2 deletions loader/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func (conn *DBConn) querySQL(ctx *tcontext.Context, query string, args ...interf
return ret, err
})
if err != nil {
ctx.L().Error("query statement failed after retry",
ctx.L().ErrorFilterContextCanceled("query statement failed after retry",
zap.String("query", utils.TruncateString(query, -1)),
zap.String("argument", utils.TruncateInterface(args, -1)),
log.ShortError(err))
Expand Down Expand Up @@ -169,7 +169,7 @@ func (conn *DBConn) executeSQL(ctx *tcontext.Context, queries []string, args ...
})

if err != nil {
ctx.L().Error("execute statements failed after retry",
ctx.L().ErrorFilterContextCanceled("execute statements failed after retry",
zap.String("queries", utils.TruncateInterface(queries, -1)),
zap.String("arguments", utils.TruncateInterface(args, -1)),
log.ShortError(err))
Expand Down
6 changes: 4 additions & 2 deletions loader/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ import (
"github.com/pingcap/failpoint"
cm "github.com/pingcap/tidb-tools/pkg/column-mapping"
"github.com/pingcap/tidb-tools/pkg/filter"
"github.com/pingcap/tidb-tools/pkg/table-router"
router "github.com/pingcap/tidb-tools/pkg/table-router"
"github.com/siddontang/go/sync2"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -174,7 +174,9 @@ func (w *Worker) run(ctx context.Context, fileJobQueue chan *fileJob, runFatalCh
if err != nil {
// expect pause rather than exit
err = terror.WithScope(terror.Annotatef(err, "file %s", job.file), terror.ScopeDownstream)
runFatalChan <- unit.NewProcessError(pb.ErrorType_ExecSQL, err)
if !utils.IsContextCanceledError(err) {
runFatalChan <- unit.NewProcessError(pb.ErrorType_ExecSQL, err)
}
return
}
w.loader.finishedDataSize.Add(job.offset - job.lastOffset)
Expand Down
4 changes: 2 additions & 2 deletions pkg/conn/baseconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func (conn *BaseConn) QuerySQL(tctx *tcontext.Context, query string, args ...int
rows, err := conn.DBConn.QueryContext(tctx.Context(), query, args...)

if err != nil {
tctx.L().Error("query statement failed",
tctx.L().ErrorFilterContextCanceled("query statement failed",
zap.String("query", utils.TruncateString(query, -1)),
zap.String("argument", utils.TruncateInterface(args, -1)),
log.ShortError(err))
Expand Down Expand Up @@ -144,7 +144,7 @@ func (conn *BaseConn) ExecuteSQLWithIgnoreError(tctx *tcontext.Context, ignoreEr
continue
}

tctx.L().Error("execute statement failed",
tctx.L().ErrorFilterContextCanceled("execute statement failed",
zap.String("query", utils.TruncateString(query, -1)),
zap.String("argument", utils.TruncateInterface(arg, -1)), log.ShortError(err))

Expand Down
21 changes: 21 additions & 0 deletions pkg/log/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,11 @@
package log

import (
"context"
"fmt"
"strings"

"github.com/pingcap/errors"
pclog "github.com/pingcap/log"
"github.com/pingcap/tidb/util/logutil"
"go.uber.org/zap"
Expand Down Expand Up @@ -72,6 +75,24 @@ func (l Logger) WithFields(fields ...zap.Field) Logger {
return Logger{l.With(fields...)}
}

// ErrorFilterContextCanceled wraps Logger.Error() and will filter error log when error is context.Canceled
func (l Logger) ErrorFilterContextCanceled(msg string, fields ...zap.Field) {
for _, field := range fields {
switch field.Type {
case zapcore.StringType:
if field.Key == "error" && strings.Contains(field.String, context.Canceled.Error()) {
return
}
case zapcore.ErrorType:
err, ok := field.Interface.(error)
if ok && errors.Cause(err) == context.Canceled {
return
}
}
}
l.Logger.Error(msg, fields...)
}

// logger for DM
var (
appLogger = Logger{zap.NewNop()}
Expand Down
67 changes: 67 additions & 0 deletions pkg/log/log_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
// Copyright 2019 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package log

import (
"context"
"testing"

"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"go.uber.org/zap/zaptest"

. "github.com/pingcap/check"
"github.com/pingcap/errors"
)

func TestLog(t *testing.T) {
TestingT(t)
}

type testLogSuite struct{}

var _ = Suite(&testLogSuite{})

func (s *testLogSuite) TestTestLogger(c *C) {
logger, buffer := makeTestLogger()
logger.Warn("the message", zap.Int("number", 123456), zap.Ints("array", []int{7, 8, 9}))
c.Assert(
buffer.Stripped(), Equals,
`{"$lvl":"WARN","$msg":"the message","number":123456,"array":[7,8,9]}`,
)
buffer.Reset()
logger.ErrorFilterContextCanceled("the message", zap.Int("number", 123456),
zap.Ints("array", []int{7, 8, 9}), zap.Error(context.Canceled))
c.Assert(buffer.Stripped(), Equals, "")
buffer.Reset()
logger.ErrorFilterContextCanceled("the message", zap.Int("number", 123456),
zap.Ints("array", []int{7, 8, 9}), ShortError(errors.Annotate(context.Canceled, "extra info")))
c.Assert(buffer.Stripped(), Equals, "")
}

// makeTestLogger creates a Logger instance which produces JSON logs.
func makeTestLogger() (Logger, *zaptest.Buffer) {
buffer := new(zaptest.Buffer)
logger := zap.New(zapcore.NewCore(
zapcore.NewJSONEncoder(zapcore.EncoderConfig{
LevelKey: "$lvl",
MessageKey: "$msg",
EncodeLevel: zapcore.CapitalLevelEncoder,
EncodeDuration: zapcore.StringDurationEncoder,
}),
buffer,
zap.DebugLevel,
))
return Logger{Logger: logger}, buffer
}
7 changes: 7 additions & 0 deletions pkg/utils/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,15 @@
package utils

import (
"context"
"math"
"os"
"regexp"
"strconv"
"strings"
"time"

"github.com/pingcap/errors"
"github.com/siddontang/go-mysql/mysql"

"github.com/pingcap/dm/pkg/terror"
Expand Down Expand Up @@ -152,6 +154,11 @@ func WaitSomething(backoff int, waitTime time.Duration, fn func() bool) bool {
return false
}

// IsContextCanceledError checks whether err is context.Canceled
func IsContextCanceledError(err error) bool {
return errors.Cause(err) == context.Canceled
}

// IsBuildInSkipDDL return true when checked sql that will be skipped for syncer
func IsBuildInSkipDDL(sql string) bool {
return builtInSkipDDLPatterns.FindStringIndex(sql) != nil
Expand Down
4 changes: 2 additions & 2 deletions syncer/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ func (conn *DBConn) querySQL(tctx *tcontext.Context, query string, args ...inter
)

if err != nil {
tctx.L().Error("query statement failed after retry",
tctx.L().ErrorFilterContextCanceled("query statement failed after retry",
zap.String("query", utils.TruncateString(query, -1)),
zap.String("argument", utils.TruncateInterface(args, -1)),
log.ShortError(err))
Expand Down Expand Up @@ -266,7 +266,7 @@ func (conn *DBConn) executeSQLWithIgnore(tctx *tcontext.Context, ignoreError fun
})

if err != nil {
tctx.L().Error("execute statements failed after retry",
tctx.L().ErrorFilterContextCanceled("execute statements failed after retry",
zap.String("queries", utils.TruncateInterface(queries, -1)),
zap.String("arguments", utils.TruncateInterface(args, -1)),
log.ShortError(err))
Expand Down
4 changes: 3 additions & 1 deletion syncer/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -921,7 +921,9 @@ func (s *Syncer) syncDDL(ctx *tcontext.Context, queueBucket string, db *DBConn,
s.jobWg.Done()
if err != nil {
s.execErrorDetected.Set(true)
s.runFatalChan <- unit.NewProcessError(pb.ErrorType_ExecSQL, err)
if !utils.IsContextCanceledError(err) {
s.runFatalChan <- unit.NewProcessError(pb.ErrorType_ExecSQL, err)
}
continue
}
s.addCount(true, queueBucket, sqlJob.tp, int64(len(sqlJob.ddls)))
Expand Down
12 changes: 7 additions & 5 deletions syncer/warning.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,13 @@ func (s *Syncer) Error() interface{} {

errors := make([]*pb.SyncSQLError, 0, len(s.execErrors.errors))
for _, ctx := range s.execErrors.errors {
errors = append(errors, &pb.SyncSQLError{
Msg: ctx.err.Error(),
FailedBinlogPosition: fmt.Sprintf("%s:%d", ctx.pos.Name, ctx.pos.Pos),
ErrorSQL: ctx.jobs,
})
if !utils.IsContextCanceledError(ctx.err) {
errors = append(errors, &pb.SyncSQLError{
Msg: ctx.err.Error(),
FailedBinlogPosition: fmt.Sprintf("%s:%d", ctx.pos.Name, ctx.pos.Pos),
ErrorSQL: ctx.jobs,
})
}
}

return &pb.SyncError{Errors: errors}
Expand Down

0 comments on commit 60a2688

Please sign in to comment.