From 86e7e4c511b34e9fa6dd1a74ceacdeefa4d77abf Mon Sep 17 00:00:00 2001
From: ti-srebot <66930949+ti-srebot@users.noreply.github.com>
Date: Wed, 4 Nov 2020 11:05:33 +0800
Subject: [PATCH] fix: do not block in load unit for stop/pause task (#1265) (#1269)

Signed-off-by: ti-srebot
Co-authored-by: Xuecheng Zhang
---
 loader/loader.go                              |  33 +++--
 .../conf/diff_config.toml                     |  25 ++++
 tests/import_goroutine_leak/conf/dm-task.yaml |   6 -
 .../conf/dm-worker2.toml                      |   2 -
 tests/import_goroutine_leak/conf/source2.yaml |  10 --
 tests/import_goroutine_leak/run.sh            | 113 +++++++++++-------
 6 files changed, 121 insertions(+), 68 deletions(-)
 create mode 100644 tests/import_goroutine_leak/conf/diff_config.toml
 delete mode 100644 tests/import_goroutine_leak/conf/dm-worker2.toml
 delete mode 100644 tests/import_goroutine_leak/conf/source2.yaml

diff --git a/loader/loader.go b/loader/loader.go
index b25c78ef6d..a2c5a5c1de 100644
--- a/loader/loader.go
+++ b/loader/loader.go
@@ -95,10 +95,9 @@ type Worker struct {
 }
 
 // NewWorker returns a Worker.
-func NewWorker(loader *Loader, id int) (worker *Worker, err error) {
+func NewWorker(loader *Loader, id int) *Worker {
 	ctctx := loader.logCtx.WithLogger(loader.logCtx.L().WithFields(zap.Int("worker ID", id)))
-
-	return &Worker{
+	w := &Worker{
 		id:         id,
 		cfg:        loader.cfg,
 		checkPoint: loader.checkPoint,
@@ -106,7 +105,15 @@ func NewWorker(loader *Loader, id int) (worker *Worker, err error) {
 		jobQueue:   make(chan *dataJob, jobCount),
 		loader:     loader,
 		tctx:       ctctx,
-	}, nil
+	}
+
+	failpoint.Inject("workerChanSize", func(val failpoint.Value) {
+		size := val.(int)
+		w.tctx.L().Info("", zap.String("failpoint", "workerChanSize"), zap.Int("size", size))
+		w.jobQueue = make(chan *dataJob, size)
+	})
+
+	return w
 }
 
 // Close closes worker
@@ -142,6 +149,7 @@ func (w *Worker) run(ctx context.Context, fileJobQueue chan *fileJob, runFatalCh
 	ctctx := w.tctx.WithContext(newCtx)
 
 	doJob := func() {
+		hasError := false
 		for {
 			select {
 			case <-newCtx.Done():
@@ -156,6 +164,10 @@ func (w *Worker) run(ctx context.Context, fileJobQueue chan *fileJob, runFatalCh
 					w.tctx.L().Info("jobs are finished, execution goroutine exits")
 					return
 				}
+				if hasError {
+					continue // continue to read so that the sender will not be blocked
+				}
+
 				sqls := make([]string, 0, 3)
 				sqls = append(sqls, fmt.Sprintf("USE `%s`;", unescapePercent(job.schema, w.tctx.L())))
 				sqls = append(sqls, job.sql)
@@ -184,7 +196,12 @@ func (w *Worker) run(ctx context.Context, fileJobQueue chan *fileJob, runFatalCh
 					if !utils.IsContextCanceledError(err) {
 						runFatalChan <- unit.NewProcessError(err)
 					}
-					return
+					hasError = true
+					failpoint.Inject("returnDoJobError", func(_ failpoint.Value) {
+						w.tctx.L().Info("", zap.String("failpoint", "returnDoJobError"))
+						failpoint.Return()
+					})
+					continue
 				}
 				w.loader.checkPoint.UpdateOffset(job.file, job.offset)
 				w.loader.finishedDataSize.Add(job.offset - job.lastOffset)
@@ -844,11 +861,7 @@ func (l *Loader) genRouter(rules []*router.TableRule) error {
 
 func (l *Loader) initAndStartWorkerPool(ctx context.Context) error {
 	for i := 0; i < l.cfg.PoolSize; i++ {
-		worker, err := NewWorker(l, i)
-		if err != nil {
-			return err
-		}
-
+		worker := NewWorker(l, i)
 		l.workerWg.Add(1) // for every worker goroutine, Add(1)
 		go func() {
 			defer l.workerWg.Done()
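Note on the fix above: the essential change in loader/loader.go is a drain-on-error pattern. On a fatal job error, doJob no longer returns; it sets hasError and keeps receiving from jobQueue, so the goroutine that dispatches jobs into the queue can never be left parked on a channel send when the task is stopped or paused. Below is a minimal standalone sketch of that pattern (illustrative names only, not DM code), assuming a single producer and a buffered job channel as in the loader:

// drain_pattern.go: why the worker must keep reading its job queue after an
// error. If the consumer returned instead of draining, the producer would
// block forever on `jobQueue <- job` once the buffer filled up.
package main

import (
	"errors"
	"fmt"
	"sync"
)

type dataJob struct{ id int }

// execute stands in for executing the job's SQL; job 3 fails fatally.
func execute(j *dataJob) error {
	if j.id == 3 {
		return errors.New("execute failed")
	}
	return nil
}

func main() {
	jobQueue := make(chan *dataJob, 2) // small buffer, like the workerChanSize failpoint
	var wg sync.WaitGroup

	wg.Add(1)
	go func() { // consumer, analogous to Worker.run's doJob loop
		defer wg.Done()
		hasError := false
		for job := range jobQueue {
			if hasError {
				continue // keep draining so the sender is never blocked
			}
			if err := execute(job); err != nil {
				fmt.Println("fatal error, draining remaining jobs:", err)
				hasError = true // remember the error instead of returning
				continue
			}
			fmt.Println("executed job", job.id)
		}
	}()

	// producer, analogous to the loader dispatching file jobs
	for i := 1; i <= 10; i++ {
		jobQueue <- &dataJob{id: i}
	}
	close(jobQueue)
	wg.Wait()
}

If the consumer returned on the first error instead of draining, the producer would deadlock once the two-slot buffer filled up — exactly the leaked "chan send" goroutine this patch's integration test hunts for.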
"import_goroutine_leak" +tables = ["~t.*"] + +[[table-config]] +schema = "import_goroutine_leak" +table = "t1" + +[[table-config.source-tables]] +instance-id = "source-1" +schema = "import_goroutine_leak" +table = "t1" + +[[source-db]] +host = "127.0.0.1" +port = 3306 +user = "root" +password = "123456" +instance-id = "source-1" + +[target-db] +host = "127.0.0.1" +port = 4000 +user = "test" +password = "123456" diff --git a/tests/import_goroutine_leak/conf/dm-task.yaml b/tests/import_goroutine_leak/conf/dm-task.yaml index dd060f6bf1..232e2b27ab 100644 --- a/tests/import_goroutine_leak/conf/dm-task.yaml +++ b/tests/import_goroutine_leak/conf/dm-task.yaml @@ -19,12 +19,6 @@ mysql-instances: loader-config-name: "global" syncer-config-name: "global" - - source-id: "mysql-replica-02" - block-allow-list: "instance" - mydumper-config-name: "global" - loader-config-name: "global" - syncer-config-name: "global" - block-allow-list: instance: do-dbs: ["import_goroutine_leak"] diff --git a/tests/import_goroutine_leak/conf/dm-worker2.toml b/tests/import_goroutine_leak/conf/dm-worker2.toml deleted file mode 100644 index 010e21c73e..0000000000 --- a/tests/import_goroutine_leak/conf/dm-worker2.toml +++ /dev/null @@ -1,2 +0,0 @@ -name = "worker2" -join = "127.0.0.1:8261" diff --git a/tests/import_goroutine_leak/conf/source2.yaml b/tests/import_goroutine_leak/conf/source2.yaml deleted file mode 100644 index bd68886439..0000000000 --- a/tests/import_goroutine_leak/conf/source2.yaml +++ /dev/null @@ -1,10 +0,0 @@ -source-id: mysql-replica-02 -flavor: '' -enable-gtid: false -relay-binlog-name: '' -relay-binlog-gtid: '' -from: - host: 127.0.0.1 - user: root - password: /Q7B9DizNLLTTfiZHv9WoEAKamfpIUs= - port: 3307 diff --git a/tests/import_goroutine_leak/run.sh b/tests/import_goroutine_leak/run.sh index 5235f906bc..a192007b96 100644 --- a/tests/import_goroutine_leak/run.sh +++ b/tests/import_goroutine_leak/run.sh @@ -6,17 +6,17 @@ cur=$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd ) source $cur/../_utils/test_prepare WORK_DIR=$TEST_DIR/$TEST_NAME +# only use one DM-worker instance to avoid re-schedule after restart process. 
+
 COUNT=200
 function prepare_datafile() {
-    for i in $(seq 2); do
-        data_file="$WORK_DIR/db$i.prepare.sql"
-        echo 'DROP DATABASE if exists import_goroutine_leak;' >> $data_file
-        echo 'CREATE DATABASE import_goroutine_leak;' >> $data_file
-        echo 'USE import_goroutine_leak;' >> $data_file
-        echo "CREATE TABLE t$i(i TINYINT, j INT UNIQUE KEY);" >> $data_file
-        for j in $(seq $COUNT); do
-            echo "INSERT INTO t$i VALUES ($i,${j}000$i),($i,${j}001$i);" >> $data_file
-        done
+    data_file="$WORK_DIR/db1.prepare.sql"
+    echo 'DROP DATABASE if exists import_goroutine_leak;' >> $data_file
+    echo 'CREATE DATABASE import_goroutine_leak;' >> $data_file
+    echo 'USE import_goroutine_leak;' >> $data_file
+    echo "CREATE TABLE t1(i TINYINT, j INT UNIQUE KEY);" >> $data_file
+    for j in $(seq $COUNT); do
+        echo "INSERT INTO t1 VALUES (1,${j}0001),(1,${j}0011);" >> $data_file
     done
 }
 
@@ -24,13 +24,13 @@ function run() {
     prepare_datafile
 
     run_sql_file $WORK_DIR/db1.prepare.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1
-    run_sql_file $WORK_DIR/db2.prepare.sql $MYSQL_HOST2 $MYSQL_PORT2 $MYSQL_PASSWORD2
 
     echo "dm-worker panic, doJob of import unit workers don't exit"
-    # check doJobs of import unit worker exit
-    inject_points=("github.com/pingcap/dm/loader/dispatchError=return(1)"
-                   "github.com/pingcap/dm/loader/LoadDataSlowDown=sleep(1000)"
+    # send to closed `runFatalChan`
+    inject_points=("github.com/pingcap/dm/loader/LoadDataSlowDown=sleep(1000)"
+                   "github.com/pingcap/dm/loader/dispatchError=return(1)"
                    "github.com/pingcap/dm/loader/executeSQLError=return(1)"
+                   "github.com/pingcap/dm/loader/returnDoJobError=return(1)"
                    "github.com/pingcap/dm/loader/workerCantClose=return(1)"
                    )
     export GO_FAILPOINTS="$(join_string \; ${inject_points[@]})"
@@ -39,23 +39,16 @@ function run() {
     check_rpc_alive $cur/../bin/check_master_online 127.0.0.1:$MASTER_PORT
     run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml
     check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT
-    run_dm_worker $WORK_DIR/worker2 $WORKER2_PORT $cur/conf/dm-worker2.toml
-    check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER2_PORT
 
     # operate mysql config to worker
     cp $cur/conf/source1.yaml $WORK_DIR/source1.yaml
-    cp $cur/conf/source2.yaml $WORK_DIR/source2.yaml
     sed -i "/relay-binlog-name/i\relay-dir: $WORK_DIR/worker1/relay_log" $WORK_DIR/source1.yaml
-    sed -i "/relay-binlog-name/i\relay-dir: $WORK_DIR/worker2/relay_log" $WORK_DIR/source2.yaml
     dmctl_operate_source create $WORK_DIR/source1.yaml $SOURCE_ID1
-    dmctl_operate_source create $WORK_DIR/source2.yaml $SOURCE_ID2
 
     run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
         "start-task $cur/conf/dm-task.yaml" \
         "\"source\": \"$SOURCE_ID1\"" 1 \
-        "\"source\": \"$SOURCE_ID2\"" 1 \
 
     check_port_offline $WORKER1_PORT 20
-    check_port_offline $WORKER2_PORT 20
 
     # dm-worker1 panics
     err_cnt=`grep "panic" $WORK_DIR/worker1/log/stdout.log | wc -l`
@@ -64,27 +57,19 @@ function run() {
         exit 2
     fi
 
-    echo "dm-workers panic again, workers of import unit don't exit"
-    # check workers of import unit exit
-    inject_points=("github.com/pingcap/dm/loader/dontWaitWorkerExit=return(1)"
-                   "github.com/pingcap/dm/loader/LoadDataSlowDown=sleep(1000)"
-                   "github.com/pingcap/dm/loader/executeSQLError=return(1)"
+    echo "dm-worker panics again, workers of import unit don't exit"
+    # send to closed `runFatalChan`
+    inject_points=("github.com/pingcap/dm/loader/LoadDataSlowDown=sleep(1000)"
                    "github.com/pingcap/dm/loader/dispatchError=return(1)"
+                   "github.com/pingcap/dm/loader/executeSQLError=return(1)"
+                   "github.com/pingcap/dm/loader/returnDoJobError=return(1)"
+                   "github.com/pingcap/dm/loader/dontWaitWorkerExit=return(1)"
                    )
     export GO_FAILPOINTS="$(join_string \; ${inject_points[@]})"
     run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml
-    run_dm_worker $WORK_DIR/worker2 $WORKER2_PORT $cur/conf/dm-worker2.toml
-
-    echo "start task after restarted dm-worker"
-    # TODO: check whether dm-worker has restarted and continued the subtask
-    # run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
-    #     "start-task $cur/conf/dm-task.yaml" \
-    #     "\"result\": true" 1 \
-    #     "start sub task test: sub task test already exists" 2
-    sleep 2s
     check_port_offline $WORKER1_PORT 20
-    check_port_offline $WORKER2_PORT 20
+    sleep 2
 
     # dm-worker1 panics
    err_cnt=`grep "panic" $WORK_DIR/worker1/log/stdout.log | wc -l`
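The inject_points arrays above are consumed by pingcap/failpoint through the GO_FAILPOINTS environment variable. As a hedged sketch of that mechanism: DM's build rewrites failpoint.Inject sites with failpoint-ctl, and the rewritten code boils down to the library's runtime Enable/Eval calls shown below. The failpoint path "example/workerChanSize" is illustrative, not a real DM failpoint:

// failpoints.go: sketch of GO_FAILPOINTS-style activation with
// github.com/pingcap/failpoint. The test sets GO_FAILPOINTS="path=return(10)"
// before starting dm-worker; here we enable the failpoint programmatically
// with the same term syntax.
package main

import (
	"fmt"

	"github.com/pingcap/failpoint"
)

func main() {
	if err := failpoint.Enable("example/workerChanSize", "return(10)"); err != nil {
		panic(err)
	}
	defer failpoint.Disable("example/workerChanSize")

	size := 1024 // stand-in for the default jobCount buffer size
	if val, err := failpoint.Eval("example/workerChanSize"); err == nil {
		size = val.(int) // same type assertion as the patch's workerChanSize hook
	}
	fmt.Println("job queue size:", size) // prints 10 while the failpoint is active
}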
"github.com/pingcap/dm/loader/returnDoJobError=return(1)" + "github.com/pingcap/dm/loader/dontWaitWorkerExit=return(1)" ) export GO_FAILPOINTS="$(join_string \; ${inject_points[@]})" run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml - run_dm_worker $WORK_DIR/worker2 $WORKER2_PORT $cur/conf/dm-worker2.toml - - echo "start task after restarted dm-worker" - # TODO: check whether dm-worker has restarted and continued the subtask - # run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \ - # "start-task $cur/conf/dm-task.yaml" \ - # "\"result\": true" 1 \ - # "start sub task test: sub task test already exists" 2 - sleep 2s check_port_offline $WORKER1_PORT 20 - check_port_offline $WORKER2_PORT 20 + sleep 2 # dm-worker1 panics err_cnt=`grep "panic" $WORK_DIR/worker1/log/stdout.log | wc -l` @@ -94,18 +79,66 @@ function run() { exit 2 fi - # check workers of import unit exit - inject_points=("github.com/pingcap/dm/loader/dispatchError=return(1)" - "github.com/pingcap/dm/loader/LoadDataSlowDown=sleep(1000)" + echo "restart dm-workers with errros to pause" + # paused with injected error + inject_points=("github.com/pingcap/dm/loader/LoadDataSlowDown=sleep(1000)" + "github.com/pingcap/dm/loader/dispatchError=return(1)" "github.com/pingcap/dm/loader/executeSQLError=return(1)" + "github.com/pingcap/dm/loader/returnDoJobError=return(1)" ) export GO_FAILPOINTS="$(join_string \; ${inject_points[@]})" run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml - run_dm_worker $WORK_DIR/worker2 $WORKER2_PORT $cur/conf/dm-worker2.toml check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT - check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER2_PORT + + run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "query-status test" \ + "inject failpoint dispatchError" 1 + + echo "restart dm-workers block in sending to chan" + ps aux | grep dm-worker |awk '{print $2}'|xargs kill || true + check_port_offline $WORKER1_PORT 20 + + # use a small job chan size to block the sender + inject_points=("github.com/pingcap/dm/loader/LoadDataSlowDown=sleep(1000)" + "github.com/pingcap/dm/loader/executeSQLError=return(1)" + "github.com/pingcap/dm/loader/returnDoJobError=return(1)" + "github.com/pingcap/dm/loader/workerChanSize=return(10)" + ) + export GO_FAILPOINTS="$(join_string \; ${inject_points[@]})" + run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml + check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT + + # wait until the task running + run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "query-status test" \ + '"stage": "Running"' 1 + sleep 2 # wait to be blocked + + # check to be blocked + curl -X POST 127.0.0.1:$WORKER1_PORT/debug/pprof/goroutine?debug=2 > $WORK_DIR/goroutine.worker1 + check_log_contains $WORK_DIR/goroutine.worker1 "chan send" + + # try to kill, but can't kill (NOTE: the port will be shutdown, but the process still exists) + ps aux | grep dm-worker |awk '{print $2}'|xargs kill || true + sleep 5 + worker_cnt=`ps aux | grep dm-worker | grep -v "grep" | wc -l` + if [ $worker_cnt -lt 1 ]; then + echo "some dm-workers exit, remain count ${worker_cnt}" + exit 2 + fi + + echo "force to restart dm-workers without errors" + ps aux | grep dm-worker | grep -v "grep" |awk '{print $2}'|xargs kill -9 || true export GO_FAILPOINTS='' + run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml + check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT + + run_dm_ctl_with_retry 
$WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "query-status test" \ + '"stage": "Finished"' 1 + + check_sync_diff $WORK_DIR $cur/conf/diff_config.toml } cleanup_data import_goroutine_leak