cherry pick pingcap#1378 to release-2.0
Signed-off-by: ti-srebot <ti-srebot@pingcap.com>
GMHDBJD authored and ti-srebot committed Jan 19, 2021
1 parent b4265f1 commit 9103de9
Showing 2 changed files with 85 additions and 3 deletions.
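Summary of the change: this cherry-pick adds two loader failpoints (WaitLoaderStopAfterInitCheckpoint and WaitLoaderStopBeforeLoadCheckpoint) so integration tests can pause the load unit around checkpoint handling, makes Restore return the error from CalcProgress instead of only logging it, and moves dump-file cleanup from Loader.Close into the success path of Loader.Restore (still guarded by CleanDumpFile and checkPoint.AllFinished). A new test_stop_task_before_checkpoint case in tests/all_mode/run.sh drives stop-task during both pauses.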
23 changes: 20 additions & 3 deletions loader/loader.go
@@ -282,6 +282,14 @@ func (w *Worker) dispatchSQL(ctx context.Context, file string, offset int64, tab
 
 	tctx := tcontext.NewContext(ctx, w.logger)
 	err2 = w.checkPoint.Init(tctx, baseFile, finfo.Size())
+	failpoint.Inject("WaitLoaderStopAfterInitCheckpoint", func(v failpoint.Value) {
+		t := v.(int)
+		w.logger.Info("wait loader stop after init checkpoint")
+		w.wg.Add(1)
+		time.Sleep(time.Duration(t) * time.Second)
+		w.wg.Done()
+	})
+
 	if err2 != nil {
 		w.logger.Error("fail to initial checkpoint", zap.String("data file", file), zap.Int64("offset", offset), log.ShortError(err2))
 		return err2
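For context: this block uses github.com/pingcap/failpoint, so the injected closure is a no-op unless the failpoint is activated in a failpoint-enabled build. The shell test further down activates it through the GO_FAILPOINTS environment variable; the sketch below shows roughly how the same pause could be driven from a Go test instead. Everything except the failpoint path and the return(5) term is hypothetical and not part of this commit.

package loader_test

import (
	"testing"

	"github.com/pingcap/failpoint"
)

func TestStopWhileLoaderPausedAfterInitCheckpoint(t *testing.T) {
	fp := "github.com/pingcap/dm/loader/WaitLoaderStopAfterInitCheckpoint"

	// return(5) makes the injected closure receive v.(int) == 5,
	// i.e. the worker sleeps for 5 seconds after the checkpoint is initialized.
	if err := failpoint.Enable(fp, "return(5)"); err != nil {
		t.Fatal(err)
	}
	defer failpoint.Disable(fp) //nolint:errcheck

	// ... start the load task here and issue stop-task while the worker
	// is still sleeping inside the failpoint ...
}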
@@ -649,6 +657,14 @@ func (l *Loader) Restore(ctx context.Context) error {
 		return err
 	}
 
+	failpoint.Inject("WaitLoaderStopBeforeLoadCheckpoint", func(v failpoint.Value) {
+		t := v.(int)
+		l.logger.Info("wait loader stop before load checkpoint")
+		l.wg.Add(1)
+		time.Sleep(time.Duration(t) * time.Second)
+		l.wg.Done()
+	})
+
 	// not update checkpoint in memory when restoring, so when re-Restore, we need to load checkpoint from DB
 	err := l.checkPoint.Load(tcontext.NewContext(ctx, l.logger))
 	if err != nil {
@@ -657,6 +673,7 @@
 	err = l.checkPoint.CalcProgress(l.db2Tables)
 	if err != nil {
 		l.logger.Error("calc load process", log.ShortError(err))
+		return err
 	}
 	l.loadFinishedSize()
 
@@ -686,6 +703,9 @@ func (l *Loader) Restore(ctx context.Context) error {
 	if err == nil {
 		l.finish.Set(true)
 		l.logger.Info("all data files have been finished", zap.Duration("cost time", time.Since(begin)))
+		if l.cfg.CleanDumpFile && l.checkPoint.AllFinished() {
+			l.cleanDumpFiles()
+		}
 	} else if errors.Cause(err) != context.Canceled {
 		return err
 	}
@@ -714,9 +734,6 @@ func (l *Loader) Close() {
 	if err != nil {
 		l.logger.Error("close downstream DB error", log.ShortError(err))
 	}
-	if l.cfg.CleanDumpFile && l.checkPoint.AllFinished() {
-		l.cleanDumpFiles()
-	}
 	l.checkPoint.Close()
 	l.removeLabelValuesWithTaskInMetrics(l.cfg.Name)
 	l.closed.Set(true)
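Note on the last two hunks: the dump-file cleanup (cfg.CleanDumpFile plus checkPoint.AllFinished()) now runs at the end of a successful Restore, right after all data files have finished, instead of in Close. Close is also reached when a task is stopped partway through the load, where the in-memory checkpoint may not yet reflect real progress, so deferring the cleanup to a completed Restore keeps the dump files available for a later resume; that is the scenario the new test below exercises with the two failpoints.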
65 changes: 65 additions & 0 deletions tests/all_mode/run.sh
@@ -105,12 +105,77 @@ function test_query_timeout(){
 	export GO_FAILPOINTS=''
 }
 
+function test_stop_task_before_checkpoint(){
+	run_sql_file $cur/data/db1.prepare.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1
+	check_contains 'Query OK, 2 rows affected'
+	run_sql_file $cur/data/db2.prepare.sql $MYSQL_HOST2 $MYSQL_PORT2 $MYSQL_PASSWORD2
+	check_contains 'Query OK, 3 rows affected'
+
+	# start DM worker and master
+	run_dm_master $WORK_DIR/master $MASTER_PORT $cur/conf/dm-master.toml
+	check_rpc_alive $cur/../bin/check_master_online 127.0.0.1:$MASTER_PORT
+
+	export GO_FAILPOINTS='github.com/pingcap/dm/loader/WaitLoaderStopAfterInitCheckpoint=return(5)'
+	run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml
+	check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT
+	run_dm_worker $WORK_DIR/worker2 $WORKER2_PORT $cur/conf/dm-worker2.toml
+	check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER2_PORT
+
+	# operate mysql config to worker
+	cp $cur/conf/source1.yaml $WORK_DIR/source1.yaml
+	cp $cur/conf/source2.yaml $WORK_DIR/source2.yaml
+	sed -i "/relay-binlog-name/i\relay-dir: $WORK_DIR/worker1/relay_log" $WORK_DIR/source1.yaml
+	sed -i "/relay-binlog-name/i\relay-dir: $WORK_DIR/worker2/relay_log" $WORK_DIR/source2.yaml
+	dmctl_operate_source create $WORK_DIR/source1.yaml $SOURCE_ID1
+	dmctl_operate_source create $WORK_DIR/source2.yaml $SOURCE_ID2
+
+	# generate uncomplete checkpoint
+	dmctl_start_task "$cur/conf/dm-task.yaml" "--remove-meta"
+	check_log_contain_with_retry 'wait loader stop after init checkpoint' $WORK_DIR/worker1/log/dm-worker.log
+	check_log_contain_with_retry 'wait loader stop after init checkpoint' $WORK_DIR/worker2/log/dm-worker.log
+	run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
+		"stop-task test" \
+		"\"result\": true" 3
+
+	# restart dm-worker
+	pkill -9 dm-worker.test 2>/dev/null || true
+	check_port_offline $WORKER1_PORT 20
+	check_port_offline $WORKER2_PORT 20
+
+	export GO_FAILPOINTS='github.com/pingcap/dm/loader/WaitLoaderStopBeforeLoadCheckpoint=return(5)'
+	run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml
+	run_dm_worker $WORK_DIR/worker2 $WORKER2_PORT $cur/conf/dm-worker2.toml
+	check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT
+	check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER2_PORT
+
+	# stop-task before load checkpoint
+	dmctl_start_task "$cur/conf/dm-task.yaml"
+	check_log_contain_with_retry 'wait loader stop before load checkpoint' $WORK_DIR/worker1/log/dm-worker.log
+	check_log_contain_with_retry 'wait loader stop before load checkpoint' $WORK_DIR/worker2/log/dm-worker.log
+	run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
+		"stop-task test" \
+		"\"result\": true" 3
+
+	dmctl_start_task "$cur/conf/dm-task.yaml"
+	check_sync_diff $WORK_DIR $cur/conf/diff_config.toml
+	run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
+		"stop-task test" \
+		"\"result\": true" 3
+
+	cleanup_data all_mode
+	cleanup_process $*
+
+	export GO_FAILPOINTS=''
+}
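What this case covers: it pauses the loader twice via the failpoints added in loader.go, once right after the checkpoint is initialized (first worker start, WaitLoaderStopAfterInitCheckpoint) and once before the checkpoint is loaded (after the worker restart, WaitLoaderStopBeforeLoadCheckpoint). stop-task is issued during each pause, then the task is started a third time without --remove-meta and check_sync_diff confirms the data still ends up consistent.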

 function run() {
 	run_sql_both_source "SET @@GLOBAL.SQL_MODE='ANSI_QUOTES,NO_AUTO_VALUE_ON_ZERO'"
 
 	test_session_config
 
 	test_query_timeout
 
+	test_stop_task_before_checkpoint
+
 	export GO_FAILPOINTS="github.com/pingcap/dm/dm/worker/TaskCheckInterval=return(\"500ms\")"
 