diff --git a/dm/dm/master/openapi_controller.go b/dm/dm/master/openapi_controller.go
index 500b591048f..4b43c6b5a0c 100644
--- a/dm/dm/master/openapi_controller.go
+++ b/dm/dm/master/openapi_controller.go
@@ -22,6 +22,10 @@ import (
 	"context"
 	"encoding/json"
 	"fmt"
+	"strings"
+
+	"github.com/pingcap/log"
+	"go.uber.org/zap"
 
 	"github.com/pingcap/tiflow/dm/checker"
 	dmcommon "github.com/pingcap/tiflow/dm/dm/common"
@@ -445,14 +449,29 @@ func (s *Server) deleteTask(ctx context.Context, taskName string, force bool) er
 	}
 	defer release()
 
+	ignoreCannotConnectError := func(err error) bool {
+		if err == nil {
+			return true
+		}
+		if force && strings.Contains(err.Error(), "connect: connection refused") {
+			log.L().Warn("connect downstream error when force delete task", zap.Error(err))
+			return true
+		}
+		return false
+	}
+
 	toDBCfg := config.GetTargetDBCfgFromOpenAPITask(task)
 	if adjustErr := adjustTargetDB(ctx, toDBCfg); adjustErr != nil {
-		return adjustErr
+		if !ignoreCannotConnectError(adjustErr) {
+			return adjustErr
+		}
 	}
 	metaSchema := *task.MetaSchema
 	err = s.removeMetaData(ctx, taskName, metaSchema, toDBCfg)
 	if err != nil {
-		return terror.Annotate(err, "while removing metadata")
+		if !ignoreCannotConnectError(err) {
+			return terror.Annotate(err, "while removing metadata")
+		}
 	}
 	release()
 	sourceNameList := s.getTaskSourceNameList(taskName)
diff --git a/dm/tests/_utils/test_prepare b/dm/tests/_utils/test_prepare
index b4ea29bf170..dfcd0075483 100644
--- a/dm/tests/_utils/test_prepare
+++ b/dm/tests/_utils/test_prepare
@@ -35,6 +35,14 @@ function cleanup_process() {
 	wait_process_exit dm-syncer.test
 }
 
+function cleanup_tidb_server() {
+	tidb_server_num=$(ps aux >temp && grep "tidb-server" temp | wc -l && rm temp)
+	echo "$tidb_server_num tidb-server alive"
+	pkill -hup tidb-server 2>/dev/null || true
+
+	wait_process_exit tidb-server
+}
+
 function wait_pattern_exit() {
 	pattern=$1
 	while true
diff --git a/dm/tests/openapi/run.sh b/dm/tests/openapi/run.sh
index 97fd9fc705b..8dece3028bf 100644
--- a/dm/tests/openapi/run.sh
+++ b/dm/tests/openapi/run.sh
@@ -598,6 +598,42 @@ function test_task_with_ignore_check_items() {
 	echo ">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>TEST OPENAPI: TEST TASK WITH IGNORE CHECK ITEMS SUCCESS"
 }
 
+function test_delete_task_with_stopped_downstream() {
+	echo ">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>START TEST OPENAPI: DELETE TASK WITH STOPPED DOWNSTREAM"
+	prepare_database
+
+	task_name="test-no-shard"
+	target_table_name="" # empty means no route
+
+	# create source successfully
+	openapi_source_check "create_source1_success"
+	# create source successfully
+	openapi_source_check "create_source2_success"
+	# get source list success
+	openapi_source_check "list_source_success" 2
+	# create no shard task success
+	openapi_task_check "create_noshard_task_success" $task_name $target_table_name
+	run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
+		"query-status $task_name" \
+		"\"stage\": \"Stopped\"" 2
+
+	# stop downstream
+	cleanup_tidb_server
+
+	# delete task failed because downstream is stopped.
+	openapi_task_check "delete_task_failed" "$task_name"
+
+	# delete task success with force
+	openapi_task_check "delete_task_with_force_success" "$task_name"
+	openapi_task_check "get_task_list" 0
+
+	# restart downstream
+	run_tidb_server 4000 $TIDB_PASSWORD
+	sleep 2
+	clean_cluster_sources_and_tasks
+	echo ">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>TEST OPENAPI: DELETE TASK WITH STOPPED DOWNSTREAM SUCCESS"
+}
+
 function test_cluster() {
 	# list master and worker node
 	openapi_cluster_check "list_master_success" 2
@@ -642,6 +678,7 @@ function run() {
 	test_noshard_task_dump_status
 	test_complex_operations_of_source_and_task
 	test_task_with_ignore_check_items
+	test_delete_task_with_stopped_downstream
 
 	# NOTE: this test case MUST running at last, because it will offline some members of cluster
 	test_cluster