Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

syncer(dm): implement start-task --start-time #4485

Merged
merged 12 commits into from
Feb 14, 2022
1 change: 1 addition & 0 deletions dm/_utils/terror_gen/errors_release.txt
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@ ErrOpenAPITaskConfigNotExist,[code=20051:class=config:scope=internal:level=low],
ErrConfigCollationCompatibleNotSupport,[code=20052:class=config:scope=internal:level=medium], "Message: collation compatible %s not supported, Workaround: Please check the `collation_compatible` config in task configuration file, which can be set to `loose`/`strict`."
ErrConfigInvalidLoadMode,[code=20053:class=config:scope=internal:level=medium], "Message: invalid load mode '%s', Workaround: Please choose a valid value in ['sql', 'loader']"
ErrConfigInvalidDuplicateResolution,[code=20054:class=config:scope=internal:level=medium], "Message: invalid load on-duplicate '%s', Workaround: Please choose a valid value in ['replace', 'error', 'ignore']"
ErrConfigStartTimeTooLate,[code=20055:class=config:scope=internal:level=high], "Message: start-time %s is too late, no binlog location matches it, Workaround: Please check the `--start-time` is expected or try again later."
ErrBinlogExtractPosition,[code=22001:class=binlog-op:scope=internal:level=high]
ErrBinlogInvalidFilename,[code=22002:class=binlog-op:scope=internal:level=high], "Message: invalid binlog filename"
ErrBinlogParsePosFromStr,[code=22003:class=binlog-op:scope=internal:level=high]
Expand Down
9 changes: 8 additions & 1 deletion dm/dm/config/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -482,7 +482,14 @@ const (
exprFilterIdx
)

// adjust adjusts and verifies config.
// Adjust adjusts and verifies config.
func (c *TaskConfig) Adjust() error {
if c == nil {
return terror.ErrConfigYamlTransform.New("task config is nil")
}
return c.adjust()
}

func (c *TaskConfig) adjust() error {
if len(c.Name) == 0 {
return terror.ErrConfigNeedUniqueTaskName.Generate()
Expand Down
11 changes: 8 additions & 3 deletions dm/dm/config/task_cli_args.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,11 @@ import (
"github.com/pingcap/tiflow/dm/pkg/terror"
)

const (
StartTimeFormat = "2006-01-02 15:04:05"
StartTimeFormat2 = "2006-01-02T15:04:05"
)

// TaskCliArgs is the task command line arguments, these arguments have higher priority than the config file and
// downstream checkpoint, but may need to be removed after the first time they take effect.
type TaskCliArgs struct {
Expand All @@ -46,10 +51,10 @@ func (t *TaskCliArgs) Verify() error {
if t.StartTime == "" {
return nil
}
_, err := time.Parse("2006-01-02 15:04:05", t.StartTime)
_, err := time.Parse(StartTimeFormat, t.StartTime)
if err == nil {
return nil
}
_, err = time.Parse("2006-01-02T15:04:05", t.StartTime)
return terror.Annotate(err, "error while parse start-time, expected in the format like '2006-01-02 15:04:05'")
_, err = time.Parse(StartTimeFormat2, t.StartTime)
return terror.Annotate(err, "error while parse start-time, expected in the format like '2006-01-02 15:04:05' or '2006-01-02T15:04:05'")
}
12 changes: 10 additions & 2 deletions dm/dm/master/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ import (
"github.com/pingcap/tiflow/dm/dm/master/workerrpc"
"github.com/pingcap/tiflow/dm/dm/pb"
"github.com/pingcap/tiflow/dm/dm/unit"
"github.com/pingcap/tiflow/dm/pkg/binlog"
"github.com/pingcap/tiflow/dm/pkg/conn"
tcontext "github.com/pingcap/tiflow/dm/pkg/context"
"github.com/pingcap/tiflow/dm/pkg/cputil"
Expand Down Expand Up @@ -1545,14 +1546,21 @@ func (s *Server) generateSubTask(
task string,
cliArgs *config.TaskCliArgs,
) (*config.TaskConfig, []*config.SubTaskConfig, error) {
var err error
cfg := config.NewTaskConfig()
// bypass the meta check by set any value. If start-time is specified, DM-worker will not use meta field.
if cliArgs != nil && cliArgs.StartTime != "" {
err = cfg.RawDecode(task)
if err != nil {
return nil, nil, terror.WithClass(err, terror.ClassDMMaster)
}
for _, inst := range cfg.MySQLInstances {
inst.Meta = &config.Meta{BinLogName: cliArgs.StartTime}
inst.Meta = &config.Meta{BinLogName: binlog.FakeBinlogName}
}
err = cfg.Adjust()
} else {
err = cfg.Decode(task)
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we set inst.Meta after cfg.Decode? So we no need RawDecode and Adjust, only depend on cfg.Decode.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cfg.Decode will call adjust internally, then it will report error about inst.Meta.

err := cfg.Decode(task)
if err != nil {
return nil, nil, terror.WithClass(err, terror.ClassDMMaster)
}
Expand Down
2 changes: 1 addition & 1 deletion dm/dm/worker/source_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -539,7 +539,7 @@ func (w *SourceWorker) StartSubTask(cfg *config.SubTaskConfig, expectStage pb.St
}

// directly put cfg into subTaskHolder
// the unique of subtask should be assured by etcd
// the uniqueness of subtask should be assured by etcd
st := NewSubTask(cfg, w.etcdClient, w.name)
w.subTaskHolder.recordSubTask(st)
if w.closed.Load() {
Expand Down
6 changes: 6 additions & 0 deletions dm/errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -1114,6 +1114,12 @@ description = ""
workaround = "Please choose a valid value in ['replace', 'error', 'ignore']"
tags = ["internal", "medium"]

[error.DM-config-20055]
message = "start-time %s is too late, no binlog location matches it"
description = ""
workaround = "Please check the `--start-time` is expected or try again later."
tags = ["internal", "high"]

[error.DM-binlog-op-22001]
message = ""
description = ""
Expand Down
3 changes: 3 additions & 0 deletions dm/pkg/binlog/pos_finder.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ import (
"github.com/pingcap/tiflow/dm/pkg/utils"
)

// FakeBinlogName is used to bypass the checking of meta in task config when start-task with --start-time.
const FakeBinlogName = "start-task with --start-time"

type binlogPosFinder struct {
remote bool
tctx *tcontext.Context
Expand Down
2 changes: 2 additions & 0 deletions dm/pkg/terror/error_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,7 @@ const (
codeCollationCompatibleNotSupport
codeConfigInvalidLoadMode
codeConfigInvalidLoadDuplicateResolution
codeConfigStartTimeTooLate
)

// Binlog operation error code list.
Expand Down Expand Up @@ -910,6 +911,7 @@ var (
ErrConfigCollationCompatibleNotSupport = New(codeCollationCompatibleNotSupport, ClassConfig, ScopeInternal, LevelMedium, "collation compatible %s not supported", "Please check the `collation_compatible` config in task configuration file, which can be set to `loose`/`strict`.")
ErrConfigInvalidLoadMode = New(codeConfigInvalidLoadMode, ClassConfig, ScopeInternal, LevelMedium, "invalid load mode '%s'", "Please choose a valid value in ['sql', 'loader']")
ErrConfigInvalidDuplicateResolution = New(codeConfigInvalidLoadDuplicateResolution, ClassConfig, ScopeInternal, LevelMedium, "invalid load on-duplicate '%s'", "Please choose a valid value in ['replace', 'error', 'ignore']")
ErrConfigStartTimeTooLate = New(codeConfigStartTimeTooLate, ClassConfig, ScopeInternal, LevelHigh, "start-time %s is too late, no binlog location matches it", "Please check the `--start-time` is expected or try again later.")

// Binlog operation error.
ErrBinlogExtractPosition = New(codeBinlogExtractPosition, ClassBinlogOp, ScopeInternal, LevelHigh, "", "")
Expand Down
40 changes: 38 additions & 2 deletions dm/syncer/checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"time"

"github.com/pingcap/errors"

"github.com/pingcap/tiflow/dm/dm/config"
"github.com/pingcap/tiflow/dm/pkg/binlog"
"github.com/pingcap/tiflow/dm/pkg/conn"
Expand Down Expand Up @@ -227,6 +228,9 @@ type CheckPoint interface {
// DeleteTablePoint deletes checkpoint for specified table in memory and storage
DeleteTablePoint(tctx *tcontext.Context, table *filter.Table) error

// DeleteAllTablePoint deletes all checkpoints for table in memory and storage
DeleteAllTablePoint(tctx *tcontext.Context) error

// DeleteSchemaPoint deletes checkpoint for specified schema
DeleteSchemaPoint(tctx *tcontext.Context, sourceSchema string) error

Expand All @@ -237,10 +241,13 @@ type CheckPoint interface {
// corresponding to Meta.Save
SaveGlobalPoint(point binlog.Location)

// SaveGlobalPointForcibly saves the global binlog stream's checkpoint forcibly.
SaveGlobalPointForcibly(location binlog.Location)

// Snapshot make a snapshot of current checkpoint
Snapshot(isSyncFlush bool) *SnapshotInfo

// FlushGlobalPointsExcept flushes the global checkpoint and tables'
// FlushPointsExcept flushes the global checkpoint and tables'
// checkpoints except exceptTables, it also flushes SQLs with Args providing
// by extraSQLs and extraArgs. Currently extraSQLs contain shard meta only.
// @exceptTables: [[schema, table]... ]
Expand Down Expand Up @@ -551,6 +558,26 @@ func (cp *RemoteCheckPoint) DeleteTablePoint(tctx *tcontext.Context, table *filt
return nil
}

// DeleteAllTablePoint implements CheckPoint.DeleteAllTablePoint.
func (cp *RemoteCheckPoint) DeleteAllTablePoint(tctx *tcontext.Context) error {
cp.Lock()
defer cp.Unlock()

tctx2, cancel := tctx.WithContext(context.Background()).WithTimeout(maxDMLConnectionDuration)
defer cancel()
cp.logCtx.L().Info("delete all table checkpoint")
_, err := cp.dbConn.ExecuteSQL(
tctx2,
[]string{`DELETE FROM ` + cp.tableName + ` WHERE id = ? AND is_global = ?`},
[]interface{}{cp.id, false},
)
if err != nil {
return err
}
cp.points = make(map[string]map[string]*binlogPoint)
return nil
}

// DeleteSchemaPoint implements CheckPoint.DeleteSchemaPoint.
func (cp *RemoteCheckPoint) DeleteSchemaPoint(tctx *tcontext.Context, sourceSchema string) error {
cp.Lock()
Expand Down Expand Up @@ -614,7 +641,16 @@ func (cp *RemoteCheckPoint) SaveGlobalPoint(location binlog.Location) {
}
}

// FlushPointsExcept implements CheckPoint.FlushSnapshotPointsExcept.
// SaveGlobalPointForcibly implements CheckPoint.SaveGlobalPointForcibly.
func (cp *RemoteCheckPoint) SaveGlobalPointForcibly(location binlog.Location) {
cp.Lock()
defer cp.Unlock()

cp.logCtx.L().Info("reset global checkpoint", zap.Stringer("location", location))
cp.globalPoint = newBinlogPoint(location, binlog.NewLocation(cp.cfg.Flavor), nil, nil, cp.cfg.EnableGTID)
}

// FlushPointsExcept implements CheckPoint.FlushPointsExcept.
func (cp *RemoteCheckPoint) FlushPointsExcept(
tctx *tcontext.Context,
snapshotID int,
Expand Down
103 changes: 96 additions & 7 deletions dm/syncer/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ type Syncer struct {

cfg *config.SubTaskConfig
syncCfg replication.BinlogSyncerConfig
cliArgs *config.TaskCliArgs

sgk *ShardingGroupKeeper // keeper to keep all sharding (sub) group in this syncer
pessimist *shardddl.Pessimist // shard DDL pessimist
Expand Down Expand Up @@ -441,11 +442,13 @@ func (s *Syncer) Init(ctx context.Context) (err error) {
}

// when Init syncer, set active relay log info
err = s.setInitActiveRelayLog(ctx)
if err != nil {
return err
if s.cfg.Meta == nil || s.cfg.Meta.BinLogName != binlog.FakeBinlogName {
err = s.setInitActiveRelayLog(ctx)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

seems setInitActiveRelayLog can move to sync.Run so that we don't need this check s.cfg.Meta.BinLogName != binlog.FakeBinlogName ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

when Init syncer, set active relay log info

we will risk the relay log being purged between Init and Run. I prefer we don't change old logic if we don't have enough thinking.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

but if we into this branch s.cfg.Meta.BinLogName != binlog.FakeBinlogName user still have this risk

so how about make setInitActiveRelayLog support set binlog user want to start in s.Init ? i mean call binlog finder in init

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Init should not have long-running tasks by definition, it will cause whole pipeline timeout.

if err != nil {
return err
}
rollbackHolder.Add(fr.FuncRollback{Name: "remove-active-realylog", Fn: s.removeActiveRelayLog})
}
rollbackHolder.Add(fr.FuncRollback{Name: "remove-active-realylog", Fn: s.removeActiveRelayLog})

s.reset()
return nil
Expand Down Expand Up @@ -1259,6 +1262,17 @@ func (s *Syncer) afterFlushCheckpoint(task *checkpointFlushTask) error {
s.lastCheckpointFlushedTime = now

s.logAndClearFilteredStatistics()

if s.cliArgs != nil && s.cliArgs.StartTime != "" {
clone := *s.cliArgs
clone.StartTime = ""
err2 := ha.PutTaskCliArgs(s.cli, s.cfg.Name, []string{s.cfg.SourceID}, clone)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not use DelTaskCliBySource?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we might have more args in near future, so for compatible I should only remove the least argument.

if err2 != nil {
s.tctx.L().Error("failed to clean start-time in task cli args", zap.Error(err2))
Ehco1996 marked this conversation as resolved.
Show resolved Hide resolved
} else {
s.cliArgs.StartTime = ""
}
}
return nil
}

Expand Down Expand Up @@ -1479,11 +1493,30 @@ func (s *Syncer) Run(ctx context.Context) (err error) {
}
}()

// some initialization that can't be put in Syncer.Init
fresh, err := s.IsFreshTask(runCtx)
if err != nil {
return err
} else if fresh {
}

// task command line arguments have the highest priority
// dm-syncer and other usage may not have a etcdCli, so we check it first
skipLoadMeta := false
if s.cli != nil {
Ehco1996 marked this conversation as resolved.
Show resolved Hide resolved
s.cliArgs, err = ha.GetTaskCliArgs(s.cli, s.cfg.Name, s.cfg.SourceID)
if err != nil {
s.tctx.L().Error("failed to get task cli args", zap.Error(err))
}
if s.cliArgs != nil && s.cliArgs.StartTime != "" {
err = s.setGlobalPointByTime(tctx, s.cliArgs.StartTime)
if terror.ErrConfigStartTimeTooLate.Equal(err) {
return err
}
skipLoadMeta = err == nil
}
}

// some initialization that can't be put in Syncer.Init
if fresh && !skipLoadMeta {
// for fresh task, we try to load checkpoints from meta (file or config item)
err = s.checkpoint.LoadMeta()
if err != nil {
Expand Down Expand Up @@ -3690,7 +3723,8 @@ func (s *Syncer) adjustGlobalPointGTID(tctx *tcontext.Context) (bool, error) {
// 1. GTID is not enabled
// 2. location already has GTID position
// 3. location is totally new, has no position info
if !s.cfg.EnableGTID || location.GTIDSetStr() != "" || location.Position.Name == "" {
// 4. location is too early thus not a COMMIT location, which happens when it's reset by other logic
if !s.cfg.EnableGTID || location.GTIDSetStr() != "" || location.Position.Name == "" || location.Position.Pos == 4 {
return false, nil
}
// set enableGTID to false for new streamerController
Expand Down Expand Up @@ -3782,3 +3816,58 @@ func (s *Syncer) flushOptimisticTableInfos(tctx *tcontext.Context) {
tctx.L().Error("failed to flush table points with table infos", log.ShortError(err))
}
}

func (s *Syncer) setGlobalPointByTime(tctx *tcontext.Context, timeStr string) error {
D3Hunter marked this conversation as resolved.
Show resolved Hide resolved
// we support two layout
t, err := time.ParseInLocation(config.StartTimeFormat, timeStr, s.timezone)
if err != nil {
t, err = time.ParseInLocation(config.StartTimeFormat2, timeStr, s.timezone)
}
if err != nil {
return err
}

var (
loc *binlog.Location
posTp binlog.PosType
)

if s.relay != nil {
subDir := s.relay.Status(nil).(*pb.RelayStatus).RelaySubDir
relayDir := path.Join(s.cfg.RelayDir, subDir)
finder := binlog.NewLocalBinlogPosFinder(tctx, s.cfg.EnableGTID, s.cfg.Flavor, relayDir)
loc, posTp, err = finder.FindByTimestamp(t.Unix())
} else {
finder := binlog.NewRemoteBinlogPosFinder(tctx, s.fromDB.BaseDB.DB, s.syncCfg, s.cfg.EnableGTID)
loc, posTp, err = finder.FindByTimestamp(t.Unix())
}
if err != nil {
s.tctx.L().Error("fail to find binlog position by timestamp",
zap.Time("time", t),
zap.Error(err))
return err
}

switch posTp {
case binlog.InRangeBinlogPos:
Ehco1996 marked this conversation as resolved.
Show resolved Hide resolved
s.tctx.L().Info("find binlog position by timestamp",
zap.String("time", timeStr),
zap.Stringer("pos", loc))
case binlog.BelowLowerBoundBinlogPos:
s.tctx.L().Warn("fail to find binlog location by timestamp because the timestamp is too early, will use the earliest binlog location",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will return error better?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

zap.String("time", timeStr),
zap.Any("location", loc))
case binlog.AboveUpperBoundBinlogPos:
return terror.ErrConfigStartTimeTooLate.Generate(timeStr)
}

err = s.checkpoint.DeleteAllTablePoint(tctx)
if err != nil {
return err
}
s.checkpoint.SaveGlobalPointForcibly(*loc)
s.tctx.L().Info("Will replicate from the specified time, the location recorded in checkpoint and config file will be ignored",
zap.String("time", timeStr),
zap.Any("locationOfTheTime", loc))
return nil
}
1 change: 1 addition & 0 deletions dm/tests/duplicate_event/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ function run_with_prepared_source_config() {

server_uuid=$(tail -n 1 $WORK_DIR/worker2/relay-dir/server-uuid.index)
relay_log_size=$(ls -al $WORK_DIR/worker2/relay-dir/$server_uuid/$binlog_file | awk '{print $5}')
echo "binlog_pos: $binlog_pos relay_log_size: $relay_log_size"
[ "$binlog_pos" -eq "$relay_log_size" ]

echo "============== run_with_prepared_source_config success ==================="
Expand Down
2 changes: 2 additions & 0 deletions dm/tests/start_task/conf/dm-worker2.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
name = "worker2"
join = "127.0.0.1:8261"
Loading