This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

loader: fix loader lost data when quickly starts/stops task #1378

Merged (2 commits) on Jan 19, 2021
23 changes: 20 additions & 3 deletions loader/loader.go
@@ -282,6 +282,14 @@ func (w *Worker) dispatchSQL(ctx context.Context, file string, offset int64, tab
 
 	tctx := tcontext.NewContext(ctx, w.logger)
 	err2 = w.checkPoint.Init(tctx, baseFile, finfo.Size())
+	failpoint.Inject("WaitLoaderStopAfterInitCheckpoint", func(v failpoint.Value) {
+		t := v.(int)
+		w.logger.Info("wait loader stop after init checkpoint")
+		w.wg.Add(1)
+		time.Sleep(time.Duration(t) * time.Second)
+		w.wg.Done()
+	})
+
 	if err2 != nil {
 		w.logger.Error("fail to initial checkpoint", zap.String("data file", file), zap.Int64("offset", offset), log.ShortError(err2))
 		return err2
@@ -649,6 +657,14 @@ func (l *Loader) Restore(ctx context.Context) error {
 		return err
 	}
 
+	failpoint.Inject("WaitLoaderStopBeforeLoadCheckpoint", func(v failpoint.Value) {
+		t := v.(int)
+		l.logger.Info("wait loader stop before load checkpoint")
+		l.wg.Add(1)
+		time.Sleep(time.Duration(t) * time.Second)
+		l.wg.Done()
+	})
+
 	// not update checkpoint in memory when restoring, so when re-Restore, we need to load checkpoint from DB
 	err := l.checkPoint.Load(tcontext.NewContext(ctx, l.logger))
 	if err != nil {
@@ -657,6 +673,7 @@ func (l *Loader) Restore(ctx context.Context) error {
 	err = l.checkPoint.CalcProgress(l.db2Tables)
 	if err != nil {
 		l.logger.Error("calc load process", log.ShortError(err))
+		return err
 	}
 	l.loadFinishedSize()
 
@@ -686,6 +703,9 @@ func (l *Loader) Restore(ctx context.Context) error {
 	if err == nil {
 		l.finish.Set(true)
 		l.logger.Info("all data files have been finished", zap.Duration("cost time", time.Since(begin)))
+		if l.cfg.CleanDumpFile && l.checkPoint.AllFinished() {
+			l.cleanDumpFiles()
+		}
 	} else if errors.Cause(err) != context.Canceled {
 		return err
 	}
@@ -714,9 +734,6 @@ func (l *Loader) Close() {
 	if err != nil {
 		l.logger.Error("close downstream DB error", log.ShortError(err))
 	}
-	if l.cfg.CleanDumpFile && l.checkPoint.AllFinished() {
-		l.cleanDumpFiles()
-	}
 	l.checkPoint.Close()
 	l.removeLabelValuesWithTaskInMetrics(l.cfg.Name)
 	l.closed.Set(true)
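
The functional change in loader.go is easy to miss among the failpoint hooks: dump-file cleanup moves out of Close() and into the success path of Restore(), and a CalcProgress error now aborts the restore instead of only being logged. Below is a minimal, hypothetical sketch of the resulting ordering; loaderLike and its methods are illustrative stand-ins, not DM's real Loader type.

package main

// Hypothetical condensed sketch of the ordering after this PR; names here
// (loaderLike, restore, close, cleanDumpFiles) are illustrative stand-ins.

type checkpoint interface {
	AllFinished() bool
}

type loaderLike struct {
	cleanDumpFile bool // corresponds to l.cfg.CleanDumpFile
	checkPoint    checkpoint
}

func (l *loaderLike) restore() error {
	// ... init checkpoint, dispatch data files, wait for workers ...

	// Cleanup now happens here, only once the whole restore succeeded and
	// the checkpoint confirms every data file finished.
	if l.cleanDumpFile && l.checkPoint.AllFinished() {
		l.cleanDumpFiles()
	}
	return nil
}

func (l *loaderLike) close() {
	// No dump-file cleanup here any more: Close may run right after a quick
	// stop-task, while the dump files are still needed for the next start.
}

func (l *loaderLike) cleanDumpFiles() { /* remove exported dump files */ }

func main() {
	l := &loaderLike{}
	_ = l.restore()
	l.close()
}
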
65 changes: 65 additions & 0 deletions tests/all_mode/run.sh
@@ -105,12 +105,77 @@ function test_query_timeout(){
     export GO_FAILPOINTS=''
 }
 
+function test_stop_task_before_checkpoint(){
+    run_sql_file $cur/data/db1.prepare.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1
+    check_contains 'Query OK, 2 rows affected'
+    run_sql_file $cur/data/db2.prepare.sql $MYSQL_HOST2 $MYSQL_PORT2 $MYSQL_PASSWORD2
+    check_contains 'Query OK, 3 rows affected'
+
+    # start DM worker and master
+    run_dm_master $WORK_DIR/master $MASTER_PORT $cur/conf/dm-master.toml
+    check_rpc_alive $cur/../bin/check_master_online 127.0.0.1:$MASTER_PORT
+
+    export GO_FAILPOINTS='github.com/pingcap/dm/loader/WaitLoaderStopAfterInitCheckpoint=return(5)'
+    run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml
+    check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT
+    run_dm_worker $WORK_DIR/worker2 $WORKER2_PORT $cur/conf/dm-worker2.toml
+    check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER2_PORT
+
+    # operate mysql config to worker
+    cp $cur/conf/source1.yaml $WORK_DIR/source1.yaml
+    cp $cur/conf/source2.yaml $WORK_DIR/source2.yaml
+    sed -i "/relay-binlog-name/i\relay-dir: $WORK_DIR/worker1/relay_log" $WORK_DIR/source1.yaml
+    sed -i "/relay-binlog-name/i\relay-dir: $WORK_DIR/worker2/relay_log" $WORK_DIR/source2.yaml
+    dmctl_operate_source create $WORK_DIR/source1.yaml $SOURCE_ID1
+    dmctl_operate_source create $WORK_DIR/source2.yaml $SOURCE_ID2
+
+    # generate uncomplete checkpoint
+    dmctl_start_task "$cur/conf/dm-task.yaml" "--remove-meta"
+    check_log_contain_with_retry 'wait loader stop after init checkpoint' $WORK_DIR/worker1/log/dm-worker.log
+    check_log_contain_with_retry 'wait loader stop after init checkpoint' $WORK_DIR/worker2/log/dm-worker.log
+    run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
+        "stop-task test"\
+        "\"result\": true" 3
+
+    # restart dm-worker
+    pkill -9 dm-worker.test 2>/dev/null || true
+    check_port_offline $WORKER1_PORT 20
+    check_port_offline $WORKER2_PORT 20
+
+    export GO_FAILPOINTS='github.com/pingcap/dm/loader/WaitLoaderStopBeforeLoadCheckpoint=return(5)'
+    run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml
+    run_dm_worker $WORK_DIR/worker2 $WORKER2_PORT $cur/conf/dm-worker2.toml
+    check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT
+    check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER2_PORT
+
+    # stop-task before load checkpoint
+    dmctl_start_task "$cur/conf/dm-task.yaml"
+    check_log_contain_with_retry 'wait loader stop before load checkpoint' $WORK_DIR/worker1/log/dm-worker.log
+    check_log_contain_with_retry 'wait loader stop before load checkpoint' $WORK_DIR/worker2/log/dm-worker.log
+    run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
+        "stop-task test"\
+        "\"result\": true" 3
+
+    dmctl_start_task "$cur/conf/dm-task.yaml"
+    check_sync_diff $WORK_DIR $cur/conf/diff_config.toml
+    run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
+        "stop-task test"\
+        "\"result\": true" 3
+
+    cleanup_data all_mode
+    cleanup_process $*
+
+    export GO_FAILPOINTS=''
+}
+
 function run() {
     run_sql_both_source "SET @@GLOBAL.SQL_MODE='ANSI_QUOTES,NO_AUTO_VALUE_ON_ZERO'"
 
     test_session_config
 
     test_query_timeout
 
+    test_stop_task_before_checkpoint
+
     export GO_FAILPOINTS="github.com/pingcap/dm/dm/worker/TaskCheckInterval=return(\"500ms\")"
 
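
For readers unfamiliar with the GO_FAILPOINTS exports in the test above: DM uses the github.com/pingcap/failpoint library, and the sketch below (with made-up function and failpoint names) shows the general pattern rather than DM's actual build setup. failpoint.Inject is an empty marker in a plain build; pingcap projects rewrite the markers with failpoint-ctl before building test binaries, and the GO_FAILPOINTS environment variable then selects at runtime which failpoints fire and with what value, using entries of the form <package path>/<failpoint name>=return(...), e.g. github.com/pingcap/dm/loader/WaitLoaderStopAfterInitCheckpoint=return(5) as in the script.

package main

// Minimal sketch of the failpoint pattern used in this PR, assuming the
// github.com/pingcap/failpoint library. The names initCheckpoint and
// WaitAfterInitCheckpoint are hypothetical.

import (
	"fmt"
	"time"

	"github.com/pingcap/failpoint"
)

func initCheckpoint() {
	fmt.Println("checkpoint initialized")

	// Same shape as WaitLoaderStopAfterInitCheckpoint in loader.go: when the
	// failpoint is enabled with return(5), v holds the int 5 and this code
	// sleeps for 5 seconds, opening a window for stop-task to arrive before
	// any data is loaded.
	failpoint.Inject("WaitAfterInitCheckpoint", func(v failpoint.Value) {
		t := v.(int)
		fmt.Println("failpoint active, sleeping", t, "seconds")
		time.Sleep(time.Duration(t) * time.Second)
	})
}

func main() {
	// In a failpoint-enabled build, something like
	//   GO_FAILPOINTS='<package path>/WaitAfterInitCheckpoint=return(5)' ./prog
	// activates the hook; in a plain build, or with GO_FAILPOINTS unset,
	// initCheckpoint just prints and returns.
	initCheckpoint()
}
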