Skip to content

Commit

Permalink
relay(dm): cancel when relay meet error to close goroutine (#6803)
Browse files Browse the repository at this point in the history
close #6193
  • Loading branch information
lance6716 authored Aug 19, 2022
1 parent 26b99a5 commit a9b5c0b
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 2 deletions.
6 changes: 5 additions & 1 deletion dm/relay/relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -574,7 +574,11 @@ func (r *Relay) handleEvents(
// 1. read events from upstream server
readTimer := time.Now()
rResult, err := reader2.GetEvent(ctx)
failpoint.Inject("RelayGetEventFailed", func(v failpoint.Value) {

failpoint.Inject("RelayGetEventFailed", func() {
err = errors.New("RelayGetEventFailed")
})
failpoint.Inject("RelayGetEventFailedAt", func(v failpoint.Value) {
if intVal, ok := v.(int); ok && intVal == eventIndex {
err = errors.New("fail point triggered")
_, gtid := r.meta.GTID()
Expand Down
2 changes: 1 addition & 1 deletion dm/tests/duplicate_event/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ function run_with_prepared_source_config() {

# with a 5 rows insert txn: 1 * FormatDesc + 1 * PreviousGTID + 1 * GTID + 1 * BEGIN + 5 * (Table_map + Write_rows) + 1 * XID
# here we fail at the third write rows event, sync should retry and auto recover without any duplicate event
export GO_FAILPOINTS="github.com/pingcap/tiflow/dm/relay/RelayGetEventFailed=return(3);github.com/pingcap/tiflow/dm/relay/RelayAllowRetry=return"
export GO_FAILPOINTS="github.com/pingcap/tiflow/dm/relay/RelayGetEventFailedAt=return(3);github.com/pingcap/tiflow/dm/relay/RelayAllowRetry=return"

run_dm_worker $WORK_DIR/worker2 $WORKER2_PORT $cur/conf/dm-worker2.toml
check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER2_PORT
Expand Down
33 changes: 33 additions & 0 deletions dm/tests/new_relay/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,38 @@ function test_restart_relay_status() {
echo ">>>>>>>>>>>>>>>>>>>>>>>>>>test test_restart_relay_status passed"
}

function test_relay_leak() {
cleanup_process
cleanup_data $TEST_NAME
export GO_FAILPOINTS="github.com/pingcap/tiflow/dm/relay/RelayGetEventFailed=return()"

run_dm_master $WORK_DIR/master $MASTER_PORT $cur/conf/dm-master.toml
check_rpc_alive $cur/../bin/check_master_online 127.0.0.1:$MASTER_PORT
run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml
check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT

cp $cur/conf/source1.yaml $WORK_DIR/source1.yaml
sed -i "/check-enable: false/d" $WORK_DIR/source1.yaml
sed -i "/checker:/d" $WORK_DIR/source1.yaml
dmctl_operate_source create $WORK_DIR/source1.yaml $SOURCE_ID1

run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"start-relay -s $SOURCE_ID1 worker1"

run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"query-status -s $SOURCE_ID1" \
"RelayGetEventFailed" 1

check_log_contain_with_retry 'dispatch auto resume relay' $WORK_DIR/worker1/log/dm-worker.log

count=$(curl "http://127.0.0.1:8262/debug/pprof/goroutine?debug=2" 2>/dev/null | grep -c doIntervalOps || true)
if [ $count -gt 1 ]; then
echo "relay goroutine leak detected, count expect 1 but got $count"
exit 1
fi
echo ">>>>>>>>>>>>>>>>>>>>>>>>>>test test_relay_leak passed"
}

function test_cant_dail_upstream() {
cleanup_process
cleanup_data $TEST_NAME
Expand Down Expand Up @@ -356,6 +388,7 @@ function test_relay_operations() {
}

function run() {
test_relay_leak
test_relay_operations
test_cant_dail_upstream
test_restart_relay_status
Expand Down
1 change: 1 addition & 0 deletions dm/worker/relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ func (h *realRelayHolder) run() {
h.setResult(nil) // clear previous result

r := h.relay.Process(h.ctx)
h.cancel()

h.setResult(&r)
for _, err := range r.Errors {
Expand Down

0 comments on commit a9b5c0b

Please sign in to comment.