From decdcd4d32c793b4ed024c5a09a64282619c181a Mon Sep 17 00:00:00 2001
From: ti-srebot <66930949+ti-srebot@users.noreply.github.com>
Date: Tue, 19 Jan 2021 11:56:21 +0800
Subject: [PATCH] cherry pick #1378 to release-2.0 (#1389)

---
 loader/loader.go      | 23 +++++++++++++--
 tests/all_mode/run.sh | 65 +++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 85 insertions(+), 3 deletions(-)

diff --git a/loader/loader.go b/loader/loader.go
index 2ee7ed402b..32cfbf55d8 100644
--- a/loader/loader.go
+++ b/loader/loader.go
@@ -282,6 +282,14 @@ func (w *Worker) dispatchSQL(ctx context.Context, file string, offset int64, tab
 	tctx := tcontext.NewContext(ctx, w.logger)
 
 	err2 = w.checkPoint.Init(tctx, baseFile, finfo.Size())
+	failpoint.Inject("WaitLoaderStopAfterInitCheckpoint", func(v failpoint.Value) {
+		t := v.(int)
+		w.logger.Info("wait loader stop after init checkpoint")
+		w.wg.Add(1)
+		time.Sleep(time.Duration(t) * time.Second)
+		w.wg.Done()
+	})
+
 	if err2 != nil {
 		w.logger.Error("fail to initial checkpoint", zap.String("data file", file), zap.Int64("offset", offset), log.ShortError(err2))
 		return err2
@@ -649,6 +657,14 @@ func (l *Loader) Restore(ctx context.Context) error {
 		return err
 	}
 
+	failpoint.Inject("WaitLoaderStopBeforeLoadCheckpoint", func(v failpoint.Value) {
+		t := v.(int)
+		l.logger.Info("wait loader stop before load checkpoint")
+		l.wg.Add(1)
+		time.Sleep(time.Duration(t) * time.Second)
+		l.wg.Done()
+	})
+
 	// not update checkpoint in memory when restoring, so when re-Restore, we need to load checkpoint from DB
 	err := l.checkPoint.Load(tcontext.NewContext(ctx, l.logger))
 	if err != nil {
@@ -657,6 +673,7 @@ func (l *Loader) Restore(ctx context.Context) error {
 	err = l.checkPoint.CalcProgress(l.db2Tables)
 	if err != nil {
 		l.logger.Error("calc load process", log.ShortError(err))
+		return err
 	}
 
 	l.loadFinishedSize()
@@ -686,6 +703,9 @@ func (l *Loader) Restore(ctx context.Context) error {
 	if err == nil {
 		l.finish.Set(true)
 		l.logger.Info("all data files have been finished", zap.Duration("cost time", time.Since(begin)))
+		if l.cfg.CleanDumpFile && l.checkPoint.AllFinished() {
+			l.cleanDumpFiles()
+		}
 	} else if errors.Cause(err) != context.Canceled {
 		return err
 	}
@@ -714,9 +734,6 @@ func (l *Loader) Close() {
 	if err != nil {
 		l.logger.Error("close downstream DB error", log.ShortError(err))
 	}
-	if l.cfg.CleanDumpFile && l.checkPoint.AllFinished() {
-		l.cleanDumpFiles()
-	}
 	l.checkPoint.Close()
 	l.removeLabelValuesWithTaskInMetrics(l.cfg.Name)
 	l.closed.Set(true)
diff --git a/tests/all_mode/run.sh b/tests/all_mode/run.sh
index 9b1fab4086..45e6c84ad4 100755
--- a/tests/all_mode/run.sh
+++ b/tests/all_mode/run.sh
@@ -105,12 +105,77 @@ function test_query_timeout(){
     export GO_FAILPOINTS=''
 }
 
+function test_stop_task_before_checkpoint(){
+    run_sql_file $cur/data/db1.prepare.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1
+    check_contains 'Query OK, 2 rows affected'
+    run_sql_file $cur/data/db2.prepare.sql $MYSQL_HOST2 $MYSQL_PORT2 $MYSQL_PASSWORD2
+    check_contains 'Query OK, 3 rows affected'
+
+    # start DM worker and master
+    run_dm_master $WORK_DIR/master $MASTER_PORT $cur/conf/dm-master.toml
+    check_rpc_alive $cur/../bin/check_master_online 127.0.0.1:$MASTER_PORT
+
+    export GO_FAILPOINTS='github.com/pingcap/dm/loader/WaitLoaderStopAfterInitCheckpoint=return(5)'
+    run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml
+    check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT
+    run_dm_worker $WORK_DIR/worker2 $WORKER2_PORT $cur/conf/dm-worker2.toml
+    check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER2_PORT
+
+    # operate mysql config to worker
+    cp $cur/conf/source1.yaml $WORK_DIR/source1.yaml
+    cp $cur/conf/source2.yaml $WORK_DIR/source2.yaml
+    sed -i "/relay-binlog-name/i\relay-dir: $WORK_DIR/worker1/relay_log" $WORK_DIR/source1.yaml
+    sed -i "/relay-binlog-name/i\relay-dir: $WORK_DIR/worker2/relay_log" $WORK_DIR/source2.yaml
+    dmctl_operate_source create $WORK_DIR/source1.yaml $SOURCE_ID1
+    dmctl_operate_source create $WORK_DIR/source2.yaml $SOURCE_ID2
+
+    # generate an incomplete checkpoint
+    dmctl_start_task "$cur/conf/dm-task.yaml" "--remove-meta"
+    check_log_contain_with_retry 'wait loader stop after init checkpoint' $WORK_DIR/worker1/log/dm-worker.log
+    check_log_contain_with_retry 'wait loader stop after init checkpoint' $WORK_DIR/worker2/log/dm-worker.log
+    run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
+        "stop-task test" \
+        "\"result\": true" 3
+
+    # restart dm-worker
+    pkill -9 dm-worker.test 2>/dev/null || true
+    check_port_offline $WORKER1_PORT 20
+    check_port_offline $WORKER2_PORT 20
+
+    export GO_FAILPOINTS='github.com/pingcap/dm/loader/WaitLoaderStopBeforeLoadCheckpoint=return(5)'
+    run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml
+    run_dm_worker $WORK_DIR/worker2 $WORKER2_PORT $cur/conf/dm-worker2.toml
+    check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT
+    check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER2_PORT
+
+    # stop-task before loading checkpoint
+    dmctl_start_task "$cur/conf/dm-task.yaml"
+    check_log_contain_with_retry 'wait loader stop before load checkpoint' $WORK_DIR/worker1/log/dm-worker.log
+    check_log_contain_with_retry 'wait loader stop before load checkpoint' $WORK_DIR/worker2/log/dm-worker.log
+    run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
+        "stop-task test" \
+        "\"result\": true" 3
+
+    dmctl_start_task "$cur/conf/dm-task.yaml"
+    check_sync_diff $WORK_DIR $cur/conf/diff_config.toml
+    run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
+        "stop-task test" \
+        "\"result\": true" 3
+
+    cleanup_data all_mode
+    cleanup_process $*
+
+    export GO_FAILPOINTS=''
+}
+
 function run() {
     run_sql_both_source "SET @@GLOBAL.SQL_MODE='ANSI_QUOTES,NO_AUTO_VALUE_ON_ZERO'"
 
     test_session_config
 
     test_query_timeout
+
+    test_stop_task_before_checkpoint
 
     export GO_FAILPOINTS="github.com/pingcap/dm/dm/worker/TaskCheckInterval=return(\"500ms\")"