This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

fix: do not block in load unit for stop/pause task (#1265) #1269

Merged · 1 commit merged on Nov 4, 2020
33 changes: 23 additions & 10 deletions loader/loader.go
@@ -95,18 +95,25 @@ type Worker struct {
}

// NewWorker returns a Worker.
func NewWorker(loader *Loader, id int) (worker *Worker, err error) {
func NewWorker(loader *Loader, id int) *Worker {
ctctx := loader.logCtx.WithLogger(loader.logCtx.L().WithFields(zap.Int("worker ID", id)))

return &Worker{
w := &Worker{
id: id,
cfg: loader.cfg,
checkPoint: loader.checkPoint,
conn: loader.toDBConns[id],
jobQueue: make(chan *dataJob, jobCount),
loader: loader,
tctx: ctctx,
}, nil
}

failpoint.Inject("workerChanSize", func(val failpoint.Value) {
size := val.(int)
w.tctx.L().Info("", zap.String("failpoint", "workerChanSize"), zap.Int("size", size))
w.jobQueue = make(chan *dataJob, size)
})

return w
}

// Close closes worker
@@ -142,6 +149,7 @@ func (w *Worker) run(ctx context.Context, fileJobQueue chan *fileJob, runFatalCh
ctctx := w.tctx.WithContext(newCtx)

doJob := func() {
hasError := false
for {
select {
case <-newCtx.Done():
@@ -156,6 +164,10 @@ func (w *Worker) run(ctx context.Context, fileJobQueue chan *fileJob, runFatalCh
w.tctx.L().Info("jobs are finished, execution goroutine exits")
return
}
if hasError {
continue // continue to read so that the sender will not be blocked
}

sqls := make([]string, 0, 3)
sqls = append(sqls, fmt.Sprintf("USE `%s`;", unescapePercent(job.schema, w.tctx.L())))
sqls = append(sqls, job.sql)
@@ -184,7 +196,12 @@ func (w *Worker) run(ctx context.Context, fileJobQueue chan *fileJob, runFatalCh
if !utils.IsContextCanceledError(err) {
runFatalChan <- unit.NewProcessError(err)
}
return
hasError = true
failpoint.Inject("returnDoJobError", func(_ failpoint.Value) {
w.tctx.L().Info("", zap.String("failpoint", "returnDoJobError"))
failpoint.Return()
})
continue
}
w.loader.checkPoint.UpdateOffset(job.file, job.offset)
w.loader.finishedDataSize.Add(job.offset - job.lastOffset)
@@ -844,11 +861,7 @@ func (l *Loader) genRouter(rules []*router.TableRule) error {

func (l *Loader) initAndStartWorkerPool(ctx context.Context) error {
for i := 0; i < l.cfg.PoolSize; i++ {
worker, err := NewWorker(l, i)
if err != nil {
return err
}

worker := NewWorker(l, i)
l.workerWg.Add(1) // for every worker goroutine, Add(1)
go func() {
defer l.workerWg.Done()
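Note on the loader.go change: instead of returning from doJob on the first execution error (which could leave the goroutine sending into jobQueue blocked forever on a full channel during stop/pause), the worker now sets hasError and keeps draining the channel. Below is a minimal, self-contained sketch of that drain-on-error pattern; dataJob, execute, and worker here are simplified stand-ins, not the real DM loader API.

```go
package main

import "fmt"

type dataJob struct{ sql string }

// execute is a stand-in for running a job against the downstream DB.
func execute(j dataJob) error {
	if j.sql == "bad" {
		return fmt.Errorf("execute failed for %q", j.sql)
	}
	return nil
}

// worker mimics doJob: after the first error it keeps receiving jobs and
// discards them, so the goroutine sending into jobQueue is never blocked.
func worker(jobQueue <-chan dataJob, done chan<- struct{}) {
	defer close(done)
	hasError := false
	for job := range jobQueue {
		if hasError {
			continue // keep draining so the sender does not block
		}
		if err := execute(job); err != nil {
			fmt.Println("worker hit error, draining remaining jobs:", err)
			hasError = true
			continue
		}
		fmt.Println("executed:", job.sql)
	}
}

func main() {
	jobQueue := make(chan dataJob, 2) // small buffer, like the workerChanSize failpoint
	done := make(chan struct{})
	go worker(jobQueue, done)

	// With the old "return on error" behavior this loop would block forever
	// once the buffer filled up after the failing job.
	for _, sql := range []string{"ok-1", "bad", "ok-2", "ok-3", "ok-4"} {
		jobQueue <- dataJob{sql: sql}
	}
	close(jobQueue)
	<-done
}
```

In the real worker the loop still exits on context cancellation or the returnDoJobError failpoint, as shown in the diff above.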
25 changes: 25 additions & 0 deletions tests/import_goroutine_leak/conf/diff_config.toml
@@ -0,0 +1,25 @@
[[check-tables]]
schema = "import_goroutine_leak"
tables = ["~t.*"]

[[table-config]]
schema = "import_goroutine_leak"
table = "t1"

[[table-config.source-tables]]
instance-id = "source-1"
schema = "import_goroutine_leak"
table = "t1"

[[source-db]]
host = "127.0.0.1"
port = 3306
user = "root"
password = "123456"
instance-id = "source-1"

[target-db]
host = "127.0.0.1"
port = 4000
user = "test"
password = "123456"
6 changes: 0 additions & 6 deletions tests/import_goroutine_leak/conf/dm-task.yaml
@@ -19,12 +19,6 @@ mysql-instances:
loader-config-name: "global"
syncer-config-name: "global"

- source-id: "mysql-replica-02"
block-allow-list: "instance"
mydumper-config-name: "global"
loader-config-name: "global"
syncer-config-name: "global"

block-allow-list:
instance:
do-dbs: ["import_goroutine_leak"]
2 changes: 0 additions & 2 deletions tests/import_goroutine_leak/conf/dm-worker2.toml

This file was deleted.

10 changes: 0 additions & 10 deletions tests/import_goroutine_leak/conf/source2.yaml

This file was deleted.

113 changes: 73 additions & 40 deletions tests/import_goroutine_leak/run.sh
@@ -6,31 +6,31 @@ cur=$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )
source $cur/../_utils/test_prepare
WORK_DIR=$TEST_DIR/$TEST_NAME

# only use one DM-worker instance to avoid re-scheduling after restarting the process.

COUNT=200
function prepare_datafile() {
for i in $(seq 2); do
data_file="$WORK_DIR/db$i.prepare.sql"
echo 'DROP DATABASE if exists import_goroutine_leak;' >> $data_file
echo 'CREATE DATABASE import_goroutine_leak;' >> $data_file
echo 'USE import_goroutine_leak;' >> $data_file
echo "CREATE TABLE t$i(i TINYINT, j INT UNIQUE KEY);" >> $data_file
for j in $(seq $COUNT); do
echo "INSERT INTO t$i VALUES ($i,${j}000$i),($i,${j}001$i);" >> $data_file
done
data_file="$WORK_DIR/db1.prepare.sql"
echo 'DROP DATABASE if exists import_goroutine_leak;' >> $data_file
echo 'CREATE DATABASE import_goroutine_leak;' >> $data_file
echo 'USE import_goroutine_leak;' >> $data_file
echo "CREATE TABLE t1(i TINYINT, j INT UNIQUE KEY);" >> $data_file
for j in $(seq $COUNT); do
echo "INSERT INTO t1 VALUES (1,${j}0001),(1,${j}0011);" >> $data_file
done
}

function run() {
prepare_datafile

run_sql_file $WORK_DIR/db1.prepare.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1
run_sql_file $WORK_DIR/db2.prepare.sql $MYSQL_HOST2 $MYSQL_PORT2 $MYSQL_PASSWORD2

echo "dm-worker panic, doJob of import unit workers don't exit"
# check doJobs of import unit worker exit
inject_points=("github.com/pingcap/dm/loader/dispatchError=return(1)"
"github.com/pingcap/dm/loader/LoadDataSlowDown=sleep(1000)"
# send to closed `runFatalChan`
inject_points=("github.com/pingcap/dm/loader/LoadDataSlowDown=sleep(1000)"
"github.com/pingcap/dm/loader/dispatchError=return(1)"
"github.com/pingcap/dm/loader/executeSQLError=return(1)"
"github.com/pingcap/dm/loader/returnDoJobError=return(1)"
"github.com/pingcap/dm/loader/workerCantClose=return(1)"
)
export GO_FAILPOINTS="$(join_string \; ${inject_points[@]})"
@@ -39,23 +39,16 @@ function run() {
check_rpc_alive $cur/../bin/check_master_online 127.0.0.1:$MASTER_PORT
run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml
check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT
run_dm_worker $WORK_DIR/worker2 $WORKER2_PORT $cur/conf/dm-worker2.toml
check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER2_PORT
# operate mysql config to worker
cp $cur/conf/source1.yaml $WORK_DIR/source1.yaml
cp $cur/conf/source2.yaml $WORK_DIR/source2.yaml
sed -i "/relay-binlog-name/i\relay-dir: $WORK_DIR/worker1/relay_log" $WORK_DIR/source1.yaml
sed -i "/relay-binlog-name/i\relay-dir: $WORK_DIR/worker2/relay_log" $WORK_DIR/source2.yaml
dmctl_operate_source create $WORK_DIR/source1.yaml $SOURCE_ID1
dmctl_operate_source create $WORK_DIR/source2.yaml $SOURCE_ID2

run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"start-task $cur/conf/dm-task.yaml" \
"\"source\": \"$SOURCE_ID1\"" 1 \
"\"source\": \"$SOURCE_ID2\"" 1 \

check_port_offline $WORKER1_PORT 20
check_port_offline $WORKER2_PORT 20

# dm-worker1 panics
err_cnt=`grep "panic" $WORK_DIR/worker1/log/stdout.log | wc -l`
@@ -64,27 +57,19 @@ function run() {
exit 2
fi

echo "dm-workers panic again, workers of import unit don't exit"
# check workers of import unit exit
inject_points=("github.com/pingcap/dm/loader/dontWaitWorkerExit=return(1)"
"github.com/pingcap/dm/loader/LoadDataSlowDown=sleep(1000)"
"github.com/pingcap/dm/loader/executeSQLError=return(1)"
echo "dm-worker panic again, workers of import unit don't exit"
# send to closed `runFatalChan`
inject_points=("github.com/pingcap/dm/loader/LoadDataSlowDown=sleep(1000)"
"github.com/pingcap/dm/loader/dispatchError=return(1)"
"github.com/pingcap/dm/loader/executeSQLError=return(1)"
"github.com/pingcap/dm/loader/returnDoJobError=return(1)"
"github.com/pingcap/dm/loader/dontWaitWorkerExit=return(1)"
)
export GO_FAILPOINTS="$(join_string \; ${inject_points[@]})"
run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml
run_dm_worker $WORK_DIR/worker2 $WORKER2_PORT $cur/conf/dm-worker2.toml

echo "start task after restarted dm-worker"
# TODO: check whether dm-worker has restarted and continued the subtask
# run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
# "start-task $cur/conf/dm-task.yaml" \
# "\"result\": true" 1 \
# "start sub task test: sub task test already exists" 2
sleep 2s

check_port_offline $WORKER1_PORT 20
check_port_offline $WORKER2_PORT 20
sleep 2

# dm-worker1 panics
err_cnt=`grep "panic" $WORK_DIR/worker1/log/stdout.log | wc -l`
@@ -94,18 +79,66 @@ function run() {
exit 2
fi

# check workers of import unit exit
inject_points=("github.com/pingcap/dm/loader/dispatchError=return(1)"
"github.com/pingcap/dm/loader/LoadDataSlowDown=sleep(1000)"
echo "restart dm-workers with errros to pause"
# paused with injected error
inject_points=("github.com/pingcap/dm/loader/LoadDataSlowDown=sleep(1000)"
"github.com/pingcap/dm/loader/dispatchError=return(1)"
"github.com/pingcap/dm/loader/executeSQLError=return(1)"
"github.com/pingcap/dm/loader/returnDoJobError=return(1)"
)
export GO_FAILPOINTS="$(join_string \; ${inject_points[@]})"
run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml
run_dm_worker $WORK_DIR/worker2 $WORKER2_PORT $cur/conf/dm-worker2.toml
check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT
check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER2_PORT

run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"query-status test" \
"inject failpoint dispatchError" 1

echo "restart dm-workers block in sending to chan"
ps aux | grep dm-worker |awk '{print $2}'|xargs kill || true
check_port_offline $WORKER1_PORT 20

# use a small job chan size to block the sender
inject_points=("github.com/pingcap/dm/loader/LoadDataSlowDown=sleep(1000)"
"github.com/pingcap/dm/loader/executeSQLError=return(1)"
"github.com/pingcap/dm/loader/returnDoJobError=return(1)"
"github.com/pingcap/dm/loader/workerChanSize=return(10)"
)
export GO_FAILPOINTS="$(join_string \; ${inject_points[@]})"
run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml
check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT

# wait until the task is running
run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"query-status test" \
'"stage": "Running"' 1
sleep 2 # wait to be blocked

# check that the sender is blocked
curl -X POST 127.0.0.1:$WORKER1_PORT/debug/pprof/goroutine?debug=2 > $WORK_DIR/goroutine.worker1
check_log_contains $WORK_DIR/goroutine.worker1 "chan send"

# try to kill, but can't kill (NOTE: the port will be shut down, but the process still exists)
ps aux | grep dm-worker |awk '{print $2}'|xargs kill || true
sleep 5
worker_cnt=`ps aux | grep dm-worker | grep -v "grep" | wc -l`
if [ $worker_cnt -lt 1 ]; then
echo "some dm-workers exit, remain count ${worker_cnt}"
exit 2
fi

echo "force to restart dm-workers without errors"
ps aux | grep dm-worker | grep -v "grep" |awk '{print $2}'|xargs kill -9 || true

export GO_FAILPOINTS=''
run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml
check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT

run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"query-status test" \
'"stage": "Finished"' 1

check_sync_diff $WORK_DIR $cur/conf/diff_config.toml
}

cleanup_data import_goroutine_leak
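Side note on the "chan send" check in run.sh: a goroutine blocked on sending into a full buffered channel is reported with the wait reason "chan send" in a goroutine dump, which is what the test greps for in the /debug/pprof/goroutine?debug=2 output. Below is a minimal sketch reproducing that signal locally; it is not the DM test harness, just plain Go runtime behavior.

```go
package main

import (
	"fmt"
	"runtime"
	"strings"
	"time"
)

func main() {
	jobs := make(chan int, 2) // small buffer, in the spirit of workerChanSize=return(10)

	// The producer keeps sending but nothing ever receives, so it blocks on
	// the third send, like the loader dispatcher when workers stop reading.
	go func() {
		for i := 0; ; i++ {
			jobs <- i
		}
	}()

	time.Sleep(100 * time.Millisecond) // give the producer time to block

	// Dump all goroutine stacks, similar to /debug/pprof/goroutine?debug=2.
	buf := make([]byte, 1<<20)
	n := runtime.Stack(buf, true)
	dump := string(buf[:n])

	if strings.Contains(dump, "chan send") {
		fmt.Println("found a goroutine blocked in chan send, as run.sh expects")
	} else {
		fmt.Println("no blocked sender found")
	}
}
```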