Skip to content

Commit

Permalink
*: try check every returned err are handled (pingcap#1817)
Browse files Browse the repository at this point in the history
  • Loading branch information
lance6716 authored and ti-chi-bot committed Jul 6, 2021
1 parent 44b369c commit 695a021
Show file tree
Hide file tree
Showing 20 changed files with 149 additions and 40 deletions.
5 changes: 5 additions & 0 deletions _utils/terror_gen/errors_release.txt
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,11 @@ ErrConfigExprFilterManyExpr,[code=20039:class=config:scope=internal:level=high],
ErrConfigExprFilterNotFound,[code=20040:class=config:scope=internal:level=high], "Message: mysql-instance(%d)'s expression-filters %s not exist in expression-filter, Workaround: Please check the `expression-filters` config in task configuration file."
ErrConfigExprFilterWrongGrammar,[code=20041:class=config:scope=internal:level=high], "Message: expression-filter name(%s) SQL(%s) has wrong grammar: %v, Workaround: Please check the `expression-filters` config in task configuration file."
ErrConfigExprFilterEmptyName,[code=20042:class=config:scope=internal:level=high], "Message: expression-filter %s has empty %s, Workaround: Please check the `expression-filters` config in task configuration file."
ErrConfigCheckerMaxTooSmall,[code=20043:class=config:scope=internal:level=high], "Message: `backoff-max` value %v is less than `backoff-min` value %v, Workaround: Please increase `backoff-max` config in task configuration file."
ErrConfigGenBAList,[code=20044:class=config:scope=internal:level=high], "Message: generate block allow list error, Workaround: Please check the `block-allow-list` config in task configuration file."
ErrConfigGenTableRouter,[code=20045:class=config:scope=internal:level=high], "Message: generate table router error, Workaround: Please check the `routes` config in task configuration file."
ErrConfigGenColumnMapping,[code=20046:class=config:scope=internal:level=high], "Message: generate column mapping error, Workaround: Please check the `column-mappings` config in task configuration file."
ErrConfigInvalidChunkFileSize,[code=20047:class=config:scope=internal:level=high], "Message: invalid `chunk-filesize` %v, Workaround: Please check the `chunk-filesize` config in task configuration file."
ErrBinlogExtractPosition,[code=22001:class=binlog-op:scope=internal:level=high]
ErrBinlogInvalidFilename,[code=22002:class=binlog-op:scope=internal:level=high], "Message: invalid binlog filename"
ErrBinlogParsePosFromStr,[code=22003:class=binlog-op:scope=internal:level=high]
Expand Down
2 changes: 1 addition & 1 deletion checker/checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func NewChecker(cfgs []*config.SubTaskConfig, checkingItems map[string]string, e
}

for _, cfg := range cfgs {
// we have verify it in subtask config
// we have verify it in SubTaskConfig.Adjust
replica, _ := cfg.DecryptPassword()
c.instances = append(c.instances, &mysqlInstance{
cfg: replica,
Expand Down
4 changes: 4 additions & 0 deletions dm/config/source_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,10 @@ func (c *SourceConfig) Verify() error {
return terror.ErrConfigBinlogEventFilter.Delegate(err)
}

if c.Checker.BackoffMax.Duration < c.Checker.BackoffMin.Duration {
return terror.ErrConfigCheckerMaxTooSmall.Generate(c.Checker.BackoffMax.Duration, c.Checker.BackoffMin.Duration)
}

return nil
}

Expand Down
21 changes: 20 additions & 1 deletion dm/config/subtask.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
router "github.com/pingcap/tidb-tools/pkg/table-router"
"go.uber.org/zap"

"github.com/pingcap/dm/pkg/dumpling"
"github.com/pingcap/dm/pkg/log"
"github.com/pingcap/dm/pkg/terror"
"github.com/pingcap/dm/pkg/utils"
Expand Down Expand Up @@ -248,7 +249,7 @@ func (c *SubTaskConfig) Decode(data string, verifyDecryptPassword bool) error {
return c.Adjust(verifyDecryptPassword)
}

// Adjust adjusts configs.
// Adjust adjusts and verifies configs.
func (c *SubTaskConfig) Adjust(verifyDecryptPassword bool) error {
if c.Name == "" {
return terror.ErrConfigTaskNameEmpty.Generate()
Expand Down Expand Up @@ -308,6 +309,24 @@ func (c *SubTaskConfig) Adjust(verifyDecryptPassword bool) error {
c.BAList = c.BWList
}

if _, err := filter.New(c.CaseSensitive, c.BAList); err != nil {
return terror.ErrConfigGenBAList.Delegate(err)
}
if _, err := router.NewTableRouter(c.CaseSensitive, c.RouteRules); err != nil {
return terror.ErrConfigGenTableRouter.Delegate(err)
}
// NewMapping will fill arguments with the default values.
if _, err := column.NewMapping(c.CaseSensitive, c.ColumnMappingRules); err != nil {
return terror.ErrConfigGenColumnMapping.Delegate(err)
}
if _, err := dumpling.ParseFileSize(c.MydumperConfig.ChunkFilesize, 0); err != nil {
return terror.ErrConfigInvalidChunkFileSize.Generate(c.MydumperConfig.ChunkFilesize)
}

// TODO: check every member
// TODO: since we checked here, we could remove other terror like ErrSyncerUnitGenBAList
// TODO: or we should check at task config and source config rather than this subtask config, to reduce duplication

return nil
}

Expand Down
6 changes: 3 additions & 3 deletions dm/config/task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -571,7 +571,7 @@ func (t *testConfig) TestGenAndFromSubTaskConfigs(c *C) {
}
routeRule1 = router.TableRule{
SchemaPattern: "db*",
TablePattern: "tbl*",
TargetSchema: "db",
}
routeRule2 = router.TableRule{
SchemaPattern: "db*",
Expand All @@ -580,8 +580,8 @@ func (t *testConfig) TestGenAndFromSubTaskConfigs(c *C) {
TargetTable: "tbl",
}
routeRule3 = router.TableRule{
SchemaPattern: "database*",
TablePattern: "table*",
SchemaPattern: "schema*",
TargetSchema: "schema",
}
routeRule4 = router.TableRule{
SchemaPattern: "schema*",
Expand Down
2 changes: 1 addition & 1 deletion dm/ctl/master/unlock_ddl_lock.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func unlockDDLLockFunc(cmd *cobra.Command, _ []string) error {

sources, _ := common.GetSourceArgs(cmd)
if len(sources) > 0 {
fmt.Println("shoud not specify any sources")
fmt.Println("should not specify any sources")
return errors.New("please check output to see error")
}

Expand Down
16 changes: 8 additions & 8 deletions dm/master/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -867,8 +867,8 @@ func (s *Server) PurgeWorkerRelay(ctx context.Context, req *pb.PurgeWorkerRelayR
wg.Add(1)
go func(worker *scheduler.Worker, source string) {
defer wg.Done()
resp, err3 := worker.SendRequest(ctx, workerReq, s.cfg.RPCTimeout)
var workerResp *pb.CommonWorkerResponse
resp, err3 := worker.SendRequest(ctx, workerReq, s.cfg.RPCTimeout)
if err3 != nil {
workerResp = errorCommonWorkerResponse(err3.Error(), source, worker.BaseInfo().Name)
} else {
Expand Down Expand Up @@ -1019,8 +1019,8 @@ func (s *Server) getStatusFromWorkers(ctx context.Context, sources []string, tas
sourceID := args[0].(string)
w, _ := args[1].(*scheduler.Worker)

resp, err := w.SendRequest(ctx, workerReq, s.cfg.RPCTimeout)
var workerStatus *pb.QueryStatusResponse
resp, err := w.SendRequest(ctx, workerReq, s.cfg.RPCTimeout)
if err != nil {
workerStatus = &pb.QueryStatusResponse{
Result: false,
Expand Down Expand Up @@ -1174,6 +1174,9 @@ func (s *Server) OperateSource(ctx context.Context, req *pb.OperateSourceRequest
var (
cfgs []*config.SourceConfig
err error
resp = &pb.OperateSourceResponse{
Result: false,
}
)
switch req.Op {
case pb.SourceOp_StartSource, pb.SourceOp_UpdateSource:
Expand All @@ -1182,9 +1185,6 @@ func (s *Server) OperateSource(ctx context.Context, req *pb.OperateSourceRequest
// don't check the upstream connections, because upstream may be inaccessible
cfgs, err = parseSourceConfig(req.Config)
}
resp := &pb.OperateSourceResponse{
Result: false,
}
if err != nil {
resp.Msg = err.Error()
// nolint:nilerr
Expand Down Expand Up @@ -1536,8 +1536,8 @@ func (s *Server) waitOperationOk(ctx context.Context, cli *scheduler.Worker, tas
}
}
}
resp, err := cli.SendRequest(ctx, req, s.cfg.RPCTimeout)
var queryResp *pb.QueryStatusResponse
resp, err := cli.SendRequest(ctx, req, s.cfg.RPCTimeout)
if err != nil {
log.L().Error("fail to query operation", zap.Int("retryNum", num), zap.String("task", taskName),
zap.String("source", sourceID), zap.Stringer("expect", expect), log.ShortError(err))
Expand Down Expand Up @@ -1896,8 +1896,8 @@ func (s *Server) OperateSchema(ctx context.Context, req *pb.OperateSchemaRequest
},
}

resp, err := worker.SendRequest(ctx, &workerReq, s.cfg.RPCTimeout)
var workerResp *pb.CommonWorkerResponse
resp, err := worker.SendRequest(ctx, &workerReq, s.cfg.RPCTimeout)
if err != nil {
workerResp = errorCommonWorkerResponse(err.Error(), source, worker.BaseInfo().Name)
} else {
Expand Down Expand Up @@ -2115,8 +2115,8 @@ func (s *Server) HandleError(ctx context.Context, req *pb.HandleErrorRequest) (*
workerRespCh <- errorCommonWorkerResponse(fmt.Sprintf("source %s relevant worker-client not found", source), source, "")
return
}
resp, err := worker.SendRequest(ctx, &workerReq, s.cfg.RPCTimeout)
var workerResp *pb.CommonWorkerResponse
resp, err := worker.SendRequest(ctx, &workerReq, s.cfg.RPCTimeout)
if err != nil {
workerResp = errorCommonWorkerResponse(err.Error(), source, worker.BaseInfo().Name)
} else {
Expand Down
2 changes: 1 addition & 1 deletion dm/master/shardddl/optimist.go
Original file line number Diff line number Diff line change
Expand Up @@ -634,9 +634,9 @@ func (o *Optimist) handleOperationPut(ctx context.Context, opCh <-chan optimism.

// handleLock handles a single shard DDL lock.
func (o *Optimist) handleLock(info optimism.Info, tts []optimism.TargetTable, skipDone bool) error {
lockID, newDDLs, cols, err := o.lk.TrySync(o.cli, info, tts)
cfStage := optimism.ConflictNone
cfMsg := ""
lockID, newDDLs, cols, err := o.lk.TrySync(o.cli, info, tts)
switch {
case info.IgnoreConflict:
o.logger.Warn("error occur when trying to sync for shard DDL info, this often means shard DDL conflict detected",
Expand Down
9 changes: 6 additions & 3 deletions dumpling/dumpling.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/pingcap/dm/dm/pb"
"github.com/pingcap/dm/dm/unit"
"github.com/pingcap/dm/pkg/conn"
dutils "github.com/pingcap/dm/pkg/dumpling"
"github.com/pingcap/dm/pkg/log"
"github.com/pingcap/dm/pkg/terror"
"github.com/pingcap/dm/pkg/utils"
Expand Down Expand Up @@ -58,11 +59,13 @@ func NewDumpling(cfg *config.SubTaskConfig) *Dumpling {
// Init implements Unit.Init.
func (m *Dumpling) Init(ctx context.Context) error {
var err error
m.dumpConfig, err = m.constructArgs()
if m.dumpConfig, err = m.constructArgs(); err != nil {
return err
}
m.detectSQLMode(ctx)
m.dumpConfig.SessionParams["time_zone"] = "+00:00"
m.logger.Info("create dumpling", zap.Stringer("config", m.dumpConfig))
return err
return nil
}

// Process implements Unit.Process.
Expand Down Expand Up @@ -229,7 +232,7 @@ func (m *Dumpling) constructArgs() (*export.Config, error) {
dumpConfig.Threads = cfg.Threads
}
if cfg.ChunkFilesize != "" {
dumpConfig.FileSize, err = parseFileSize(cfg.ChunkFilesize)
dumpConfig.FileSize, err = dutils.ParseFileSize(cfg.ChunkFilesize, export.UnspecifiedSize)
if err != nil {
m.logger.Warn("parsed some unsupported arguments", zap.Error(err))
return nil, err
Expand Down
19 changes: 2 additions & 17 deletions dumpling/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,13 @@ package dumpling
import (
"errors"
"fmt"
"strconv"
"strings"

"github.com/docker/go-units"
"github.com/pingcap/dumpling/v4/export"
filter "github.com/pingcap/tidb-tools/pkg/table-filter"
"github.com/spf13/pflag"

dutils "github.com/pingcap/dm/pkg/dumpling"
"github.com/pingcap/dm/pkg/log"
)

Expand Down Expand Up @@ -96,7 +95,7 @@ func parseExtraArgs(logger *log.Logger, dumpCfg *export.Config, args []string) e
}

if fileSizeStr != "" {
dumpCfg.FileSize, err = parseFileSize(fileSizeStr)
dumpCfg.FileSize, err = dutils.ParseFileSize(fileSizeStr, export.UnspecifiedSize)
if err != nil {
return err
}
Expand All @@ -114,20 +113,6 @@ func parseExtraArgs(logger *log.Logger, dumpCfg *export.Config, args []string) e
return nil
}

func parseFileSize(fileSizeStr string) (uint64, error) {
var fileSize uint64
if len(fileSizeStr) == 0 {
fileSize = export.UnspecifiedSize
} else if fileSizeMB, err := strconv.ParseUint(fileSizeStr, 10, 64); err == nil {
fileSize = fileSizeMB * units.MiB
} else if size, err := units.RAMInBytes(fileSizeStr); err == nil {
fileSize = uint64(size)
} else {
return 0, err
}
return fileSize, nil
}

// parseTableFilter parses `--tables-list` and `--filter`.
// copy (and update) from https://github.com/pingcap/dumpling/blob/6f74c686e54183db7b869775af1c32df46462a6a/cmd/dumpling/main.go#L222.
func parseTableFilter(tablesList, filters []string) (filter.Filter, error) {
Expand Down
30 changes: 30 additions & 0 deletions errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -1036,6 +1036,36 @@ description = ""
workaround = "Please check the `expression-filters` config in task configuration file."
tags = ["internal", "high"]

[error.DM-config-20043]
message = "`backoff-max` value %v is less than `backoff-min` value %v"
description = ""
workaround = "Please increase `backoff-max` config in task configuration file."
tags = ["internal", "high"]

[error.DM-config-20044]
message = "generate block allow list error"
description = ""
workaround = "Please check the `block-allow-list` config in task configuration file."
tags = ["internal", "high"]

[error.DM-config-20045]
message = "generate table router error"
description = ""
workaround = "Please check the `routes` config in task configuration file."
tags = ["internal", "high"]

[error.DM-config-20046]
message = "generate column mapping error"
description = ""
workaround = "Please check the `column-mappings` config in task configuration file."
tags = ["internal", "high"]

[error.DM-config-20047]
message = "invalid `chunk-filesize` %v"
description = ""
workaround = "Please check the `chunk-filesize` config in task configuration file."
tags = ["internal", "high"]

[error.DM-binlog-op-22001]
message = ""
description = ""
Expand Down
3 changes: 1 addition & 2 deletions loader/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -337,12 +337,11 @@ func (w *Worker) dispatchSQL(ctx context.Context, file string, offset int64, tab
// do nothing
}
line, err := br.ReadString('\n')
cur += int64(len(line))

if err == io.EOF {
w.logger.Info("data are scanned finished.", zap.String("data file", file), zap.Int64("offset", offset))
break
}
cur += int64(len(line))

realLine := strings.TrimSpace(line[:len(line)-1])
if len(realLine) == 0 {
Expand Down
3 changes: 3 additions & 0 deletions pkg/binlog/event/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,9 @@ func GenCommonGTIDEvent(flavor string, serverID uint32, latestPos uint32, gSet g
return nil, terror.ErrBinlogMariaDBServerIDMismatch.Generate(mariaGTID.ServerID, header.ServerID)
}
gtidEv, err = GenMariaDBGTIDEvent(header, latestPos, mariaGTID.SequenceNumber, mariaGTID.DomainID)
if err != nil {
return gtidEv, err
}
// in go-mysql, set ServerID in parseEvent. we try to set it directly
gtidEvBody := gtidEv.Event.(*replication.MariadbGTIDEvent)
gtidEvBody.GTID.ServerID = header.ServerID
Expand Down
16 changes: 16 additions & 0 deletions pkg/dumpling/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"strconv"
"strings"

"github.com/docker/go-units"
"github.com/go-mysql-org/go-mysql/mysql"

"github.com/pingcap/dm/pkg/binlog"
Expand Down Expand Up @@ -184,3 +185,18 @@ func readFollowingGTIDs(br *bufio.Reader, flavor string) (string, error) {
following.WriteString(line)
}
}

// ParseFileSize parses the size in MiB from input.
func ParseFileSize(fileSizeStr string, defaultSize uint64) (uint64, error) {
var fileSize uint64
if len(fileSizeStr) == 0 {
fileSize = defaultSize
} else if fileSizeMB, err := strconv.ParseUint(fileSizeStr, 10, 64); err == nil {
fileSize = fileSizeMB * units.MiB
} else if size, err := units.RAMInBytes(fileSizeStr); err == nil {
fileSize = uint64(size)
} else {
return 0, err
}
return fileSize, nil
}
3 changes: 3 additions & 0 deletions pkg/ha/relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,9 @@ func GetRelayConfig(cli *clientv3.Client, worker string) (*config.SourceConfig,
var rev2 int64
sourceResp := txnResp.Responses[0].GetResponseRange()
newSource, rev2, err = getSourceIDFromResp((*clientv3.GetResponse)(sourceResp))
if err != nil {
return nil, 0, err
}

if newSource != source {
log.L().Warn("relay config has been changed, will take a retry",
Expand Down
4 changes: 2 additions & 2 deletions pkg/streamer/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -377,10 +377,10 @@ func (r *BinlogReader) parseDirAsPossible(ctx context.Context, s *LocalStreamer,
firstParse = true // new relay log file need to parse
}
needSwitch, latestPos, nextUUID, nextBinlogName, err = r.parseFileAsPossible(ctx, s, relayLogFile, offset, dir, firstParse, currentUUID, i == len(files)-1)
firstParse = false // already parsed
if err != nil {
return false, "", "", terror.Annotatef(err, "parse relay log file %s from offset %d in dir %s", relayLogFile, offset, dir)
}
firstParse = false // already parsed
if needSwitch {
// need switch to next relay sub directory
return true, nextUUID, nextBinlogName, nil
Expand Down Expand Up @@ -408,10 +408,10 @@ func (r *BinlogReader) parseFileAsPossible(ctx context.Context, s *LocalStreamer
default:
}
needSwitch, needReParse, latestPos, nextUUID, nextBinlogName, replaceWithHeartbeat, err = r.parseFile(ctx, s, relayLogFile, latestPos, relayLogDir, firstParse, currentUUID, possibleLast, replaceWithHeartbeat)
firstParse = false // set to false to handle the `continue` below
if err != nil {
return false, 0, "", "", terror.Annotatef(err, "parse relay log file %s from offset %d in dir %s", relayLogFile, latestPos, relayLogDir)
}
firstParse = false // set to false to handle the `continue` below
if needReParse {
r.tctx.L().Debug("continue to re-parse relay log file", zap.String("file", relayLogFile), zap.String("directory", relayLogDir))
continue // should continue to parse this file
Expand Down
Loading

0 comments on commit 695a021

Please sign in to comment.