fix: do not block in load unit for stop/pause task (#1265) (#1269)
Signed-off-by: ti-srebot <ti-srebot@pingcap.com>

Co-authored-by: Xuecheng Zhang <csuzhangxc@gmail.com>
ti-srebot and csuzhangxc authored Nov 4, 2020
1 parent a8a59e6 commit 86e7e4c
Showing 6 changed files with 121 additions and 68 deletions.
33 changes: 23 additions & 10 deletions loader/loader.go
@@ -95,18 +95,25 @@ type Worker struct {
}

// NewWorker returns a Worker.
func NewWorker(loader *Loader, id int) (worker *Worker, err error) {
func NewWorker(loader *Loader, id int) *Worker {
ctctx := loader.logCtx.WithLogger(loader.logCtx.L().WithFields(zap.Int("worker ID", id)))

return &Worker{
w := &Worker{
id: id,
cfg: loader.cfg,
checkPoint: loader.checkPoint,
conn: loader.toDBConns[id],
jobQueue: make(chan *dataJob, jobCount),
loader: loader,
tctx: ctctx,
}, nil
}

failpoint.Inject("workerChanSize", func(val failpoint.Value) {
size := val.(int)
w.tctx.L().Info("", zap.String("failpoint", "workerChanSize"), zap.Int("size", size))
w.jobQueue = make(chan *dataJob, size)
})

return w
}

// Close closes worker
@@ -142,6 +149,7 @@ func (w *Worker) run(ctx context.Context, fileJobQueue chan *fileJob, runFatalCh
ctctx := w.tctx.WithContext(newCtx)

doJob := func() {
hasError := false
for {
select {
case <-newCtx.Done():
@@ -156,6 +164,10 @@
w.tctx.L().Info("jobs are finished, execution goroutine exits")
return
}
if hasError {
continue // continue to read so that the sender will not be blocked
}

sqls := make([]string, 0, 3)
sqls = append(sqls, fmt.Sprintf("USE `%s`;", unescapePercent(job.schema, w.tctx.L())))
sqls = append(sqls, job.sql)
@@ -184,7 +196,12 @@ func (w *Worker) run(ctx context.Context, fileJobQueue chan *fileJob, runFatalCh
if !utils.IsContextCanceledError(err) {
runFatalChan <- unit.NewProcessError(err)
}
return
hasError = true
failpoint.Inject("returnDoJobError", func(_ failpoint.Value) {
w.tctx.L().Info("", zap.String("failpoint", "returnDoJobError"))
failpoint.Return()
})
continue
}
w.loader.checkPoint.UpdateOffset(job.file, job.offset)
w.loader.finishedDataSize.Add(job.offset - job.lastOffset)
@@ -844,11 +861,7 @@ func (l *Loader) genRouter(rules []*router.TableRule) error {

func (l *Loader) initAndStartWorkerPool(ctx context.Context) error {
for i := 0; i < l.cfg.PoolSize; i++ {
worker, err := NewWorker(l, i)
if err != nil {
return err
}

worker := NewWorker(l, i)
l.workerWg.Add(1) // for every worker goroutine, Add(1)
go func() {
defer l.workerWg.Done()
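The heart of the fix is in the run loop above: on a SQL error the worker no longer returns from its goroutine; it records the failure in hasError and keeps reading from jobQueue, so the dispatcher that sends into the channel is never left blocked on a full buffer when the task is paused or stopped. A minimal self-contained sketch of that drain-on-error pattern (illustrative only, not the DM source; runWorker, dataJob, and the execute callback are made-up names):

package main

import (
	"context"
	"errors"
	"fmt"
)

type dataJob struct{ sql string }

// runWorker mirrors the pattern from the patch: after the first failure it
// keeps draining jobQueue instead of returning, so the sender never blocks.
func runWorker(ctx context.Context, jobQueue <-chan *dataJob, execute func(*dataJob) error) {
	hasError := false
	for {
		select {
		case <-ctx.Done():
			return
		case job, ok := <-jobQueue:
			if !ok {
				return // channel closed: every job has been dispatched
			}
			if hasError {
				continue // keep reading so that the sender is never blocked
			}
			if err := execute(job); err != nil {
				fmt.Println("job failed:", err) // DM reports this on a fatal-error channel instead
				hasError = true                 // remember the failure rather than returning
				continue
			}
			// on success DM would update the checkpoint and finished data size here
		}
	}
}

func main() {
	jobs := make(chan *dataJob, 2) // small buffer: a dead consumer would block the producer
	go func() {
		for i := 0; i < 5; i++ {
			jobs <- &dataJob{sql: fmt.Sprintf("INSERT ... -- job %d", i)}
		}
		close(jobs)
	}()
	runWorker(context.Background(), jobs, func(*dataJob) error {
		return errors.New("injected failure") // every job fails, yet the producer still finishes
	})
}

Draining instead of returning keeps the producer/consumer contract simple: the sender stops only when its context is cancelled or it has sent everything, never because a consumer exited early.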
25 changes: 25 additions & 0 deletions tests/import_goroutine_leak/conf/diff_config.toml
@@ -0,0 +1,25 @@
[[check-tables]]
schema = "import_goroutine_leak"
tables = ["~t.*"]

[[table-config]]
schema = "import_goroutine_leak"
table = "t1"

[[table-config.source-tables]]
instance-id = "source-1"
schema = "import_goroutine_leak"
table = "t1"

[[source-db]]
host = "127.0.0.1"
port = 3306
user = "root"
password = "123456"
instance-id = "source-1"

[target-db]
host = "127.0.0.1"
port = 4000
user = "test"
password = "123456"
6 changes: 0 additions & 6 deletions tests/import_goroutine_leak/conf/dm-task.yaml
@@ -19,12 +19,6 @@ mysql-instances:
loader-config-name: "global"
syncer-config-name: "global"

- source-id: "mysql-replica-02"
block-allow-list: "instance"
mydumper-config-name: "global"
loader-config-name: "global"
syncer-config-name: "global"

block-allow-list:
instance:
do-dbs: ["import_goroutine_leak"]
2 changes: 0 additions & 2 deletions tests/import_goroutine_leak/conf/dm-worker2.toml

This file was deleted.

10 changes: 0 additions & 10 deletions tests/import_goroutine_leak/conf/source2.yaml

This file was deleted.

113 changes: 73 additions & 40 deletions tests/import_goroutine_leak/run.sh
@@ -6,31 +6,31 @@ cur=$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )
source $cur/../_utils/test_prepare
WORK_DIR=$TEST_DIR/$TEST_NAME

# only use one DM-worker instance to avoid re-schedule after restart process.

COUNT=200
function prepare_datafile() {
for i in $(seq 2); do
data_file="$WORK_DIR/db$i.prepare.sql"
echo 'DROP DATABASE if exists import_goroutine_leak;' >> $data_file
echo 'CREATE DATABASE import_goroutine_leak;' >> $data_file
echo 'USE import_goroutine_leak;' >> $data_file
echo "CREATE TABLE t$i(i TINYINT, j INT UNIQUE KEY);" >> $data_file
for j in $(seq $COUNT); do
echo "INSERT INTO t$i VALUES ($i,${j}000$i),($i,${j}001$i);" >> $data_file
done
data_file="$WORK_DIR/db1.prepare.sql"
echo 'DROP DATABASE if exists import_goroutine_leak;' >> $data_file
echo 'CREATE DATABASE import_goroutine_leak;' >> $data_file
echo 'USE import_goroutine_leak;' >> $data_file
echo "CREATE TABLE t1(i TINYINT, j INT UNIQUE KEY);" >> $data_file
for j in $(seq $COUNT); do
echo "INSERT INTO t1 VALUES (1,${j}0001),(1,${j}0011);" >> $data_file
done
}

function run() {
prepare_datafile

run_sql_file $WORK_DIR/db1.prepare.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1
run_sql_file $WORK_DIR/db2.prepare.sql $MYSQL_HOST2 $MYSQL_PORT2 $MYSQL_PASSWORD2

echo "dm-worker panic, doJob of import unit workers don't exit"
# check doJobs of import unit worker exit
inject_points=("github.com/pingcap/dm/loader/dispatchError=return(1)"
"github.com/pingcap/dm/loader/LoadDataSlowDown=sleep(1000)"
# send to closed `runFatalChan`
inject_points=("github.com/pingcap/dm/loader/LoadDataSlowDown=sleep(1000)"
"github.com/pingcap/dm/loader/dispatchError=return(1)"
"github.com/pingcap/dm/loader/executeSQLError=return(1)"
"github.com/pingcap/dm/loader/returnDoJobError=return(1)"
"github.com/pingcap/dm/loader/workerCantClose=return(1)"
)
export GO_FAILPOINTS="$(join_string \; ${inject_points[@]})"
@@ -39,23 +39,16 @@ function run() {
check_rpc_alive $cur/../bin/check_master_online 127.0.0.1:$MASTER_PORT
run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml
check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT
run_dm_worker $WORK_DIR/worker2 $WORKER2_PORT $cur/conf/dm-worker2.toml
check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER2_PORT
# operate mysql config to worker
cp $cur/conf/source1.yaml $WORK_DIR/source1.yaml
cp $cur/conf/source2.yaml $WORK_DIR/source2.yaml
sed -i "/relay-binlog-name/i\relay-dir: $WORK_DIR/worker1/relay_log" $WORK_DIR/source1.yaml
sed -i "/relay-binlog-name/i\relay-dir: $WORK_DIR/worker2/relay_log" $WORK_DIR/source2.yaml
dmctl_operate_source create $WORK_DIR/source1.yaml $SOURCE_ID1
dmctl_operate_source create $WORK_DIR/source2.yaml $SOURCE_ID2

run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"start-task $cur/conf/dm-task.yaml" \
"\"source\": \"$SOURCE_ID1\"" 1 \
"\"source\": \"$SOURCE_ID2\"" 1 \

check_port_offline $WORKER1_PORT 20
check_port_offline $WORKER2_PORT 20

# dm-worker1 panics
err_cnt=`grep "panic" $WORK_DIR/worker1/log/stdout.log | wc -l`
@@ -64,27 +57,19 @@ function run() {
exit 2
fi

echo "dm-workers panic again, workers of import unit don't exit"
# check workers of import unit exit
inject_points=("github.com/pingcap/dm/loader/dontWaitWorkerExit=return(1)"
"github.com/pingcap/dm/loader/LoadDataSlowDown=sleep(1000)"
"github.com/pingcap/dm/loader/executeSQLError=return(1)"
echo "dm-worker panic again, workers of import unit don't exit"
# send to closed `runFatalChan`
inject_points=("github.com/pingcap/dm/loader/LoadDataSlowDown=sleep(1000)"
"github.com/pingcap/dm/loader/dispatchError=return(1)"
"github.com/pingcap/dm/loader/executeSQLError=return(1)"
"github.com/pingcap/dm/loader/returnDoJobError=return(1)"
"github.com/pingcap/dm/loader/dontWaitWorkerExit=return(1)"
)
export GO_FAILPOINTS="$(join_string \; ${inject_points[@]})"
run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml
run_dm_worker $WORK_DIR/worker2 $WORKER2_PORT $cur/conf/dm-worker2.toml

echo "start task after restarted dm-worker"
# TODO: check whether dm-worker has restarted and continued the subtask
# run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
# "start-task $cur/conf/dm-task.yaml" \
# "\"result\": true" 1 \
# "start sub task test: sub task test already exists" 2
sleep 2s

check_port_offline $WORKER1_PORT 20
check_port_offline $WORKER2_PORT 20
sleep 2

# dm-worker1 panics
err_cnt=`grep "panic" $WORK_DIR/worker1/log/stdout.log | wc -l`
@@ -94,18 +79,66 @@ function run() {
exit 2
fi

# check workers of import unit exit
inject_points=("github.com/pingcap/dm/loader/dispatchError=return(1)"
"github.com/pingcap/dm/loader/LoadDataSlowDown=sleep(1000)"
echo "restart dm-workers with errros to pause"
# paused with injected error
inject_points=("github.com/pingcap/dm/loader/LoadDataSlowDown=sleep(1000)"
"github.com/pingcap/dm/loader/dispatchError=return(1)"
"github.com/pingcap/dm/loader/executeSQLError=return(1)"
"github.com/pingcap/dm/loader/returnDoJobError=return(1)"
)
export GO_FAILPOINTS="$(join_string \; ${inject_points[@]})"
run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml
run_dm_worker $WORK_DIR/worker2 $WORKER2_PORT $cur/conf/dm-worker2.toml
check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT
check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER2_PORT

run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"query-status test" \
"inject failpoint dispatchError" 1

echo "restart dm-workers block in sending to chan"
ps aux | grep dm-worker |awk '{print $2}'|xargs kill || true
check_port_offline $WORKER1_PORT 20

# use a small job chan size to block the sender
inject_points=("github.com/pingcap/dm/loader/LoadDataSlowDown=sleep(1000)"
"github.com/pingcap/dm/loader/executeSQLError=return(1)"
"github.com/pingcap/dm/loader/returnDoJobError=return(1)"
"github.com/pingcap/dm/loader/workerChanSize=return(10)"
)
export GO_FAILPOINTS="$(join_string \; ${inject_points[@]})"
run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml
check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT

# wait until the task running
run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"query-status test" \
'"stage": "Running"' 1
sleep 2 # wait to be blocked

# check to be blocked
curl -X POST 127.0.0.1:$WORKER1_PORT/debug/pprof/goroutine?debug=2 > $WORK_DIR/goroutine.worker1
check_log_contains $WORK_DIR/goroutine.worker1 "chan send"

# try to kill, but can't kill (NOTE: the port will be shutdown, but the process still exists)
ps aux | grep dm-worker |awk '{print $2}'|xargs kill || true
sleep 5
worker_cnt=`ps aux | grep dm-worker | grep -v "grep" | wc -l`
if [ $worker_cnt -lt 1 ]; then
echo "some dm-workers exit, remain count ${worker_cnt}"
exit 2
fi

echo "force to restart dm-workers without errors"
ps aux | grep dm-worker | grep -v "grep" |awk '{print $2}'|xargs kill -9 || true

export GO_FAILPOINTS=''
run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml
check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT

run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"query-status test" \
'"stage": "Finished"' 1

check_sync_diff $WORK_DIR $cur/conf/diff_config.toml
}

cleanup_data import_goroutine_leak
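The last scenario in run.sh uses the new workerChanSize failpoint to shrink the job channel so the dispatcher really parks in a channel send, then pulls /debug/pprof/goroutine?debug=2 and greps the dump for "chan send". A rough standalone illustration of that check (not the DM worker's own wiring; the port and the deliberately blocked goroutine are assumptions for this demo):

package main

import (
	"fmt"
	"io"
	"net/http"
	_ "net/http/pprof" // registers /debug/pprof/* handlers on the default mux
	"strings"
	"time"
)

func main() {
	// expose a debug HTTP endpoint on a local port (the test curls the worker's own port for the same path)
	go http.ListenAndServe("127.0.0.1:8262", nil)

	// park one goroutine in a channel send forever: no buffer, no receiver
	blocked := make(chan struct{})
	go func() { blocked <- struct{}{} }()

	time.Sleep(500 * time.Millisecond) // give the server and the goroutine time to settle

	resp, err := http.Get("http://127.0.0.1:8262/debug/pprof/goroutine?debug=2")
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	dump, _ := io.ReadAll(resp.Body)

	// the dump lists each goroutine with its state, e.g. "goroutine 7 [chan send]:"
	fmt.Println("found a goroutine blocked in chan send:", strings.Contains(string(dump), "chan send"))
}

A goroutine stuck like this never makes progress on its own, which matches the script's note that a plain kill leaves the process alive (graceful shutdown waits on it) and only kill -9, or the drain fix above, actually gets the worker unstuck.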
