diff --git a/dbms/src/Common/FailPoint.cpp b/dbms/src/Common/FailPoint.cpp index 034bfa786d3..14149de5862 100644 --- a/dbms/src/Common/FailPoint.cpp +++ b/dbms/src/Common/FailPoint.cpp @@ -8,26 +8,35 @@ namespace DB { std::unordered_map> FailPointHelper::fail_point_wait_channels; -#define APPLY_FOR_FAILPOINTS(M) \ - M(exception_between_drop_meta_and_data) \ - M(exception_between_alter_data_and_meta) \ - M(exception_drop_table_during_remove_meta) \ - M(exception_between_rename_table_data_and_metadata) \ - M(exception_between_create_database_meta_and_directory) \ - M(exception_before_rename_table_old_meta_removed) \ - M(exception_after_step_1_in_exchange_partition) \ - M(exception_before_step_2_rename_in_exchange_partition) \ - M(exception_after_step_2_in_exchange_partition) \ - M(exception_before_step_3_rename_in_exchange_partition) \ - M(exception_after_step_3_in_exchange_partition) \ - M(region_exception_after_read_from_storage_some_error) \ - M(region_exception_after_read_from_storage_all_error) \ - M(exception_before_dmfile_remove_encryption) \ - M(exception_before_dmfile_remove_from_disk) \ - M(force_enable_region_persister_compatible_mode) \ - M(force_disable_region_persister_compatible_mode) \ - M(force_triggle_background_merge_delta) \ - M(force_triggle_foreground_flush) +#define APPLY_FOR_FAILPOINTS(M) \ + M(exception_between_drop_meta_and_data) \ + M(exception_between_alter_data_and_meta) \ + M(exception_drop_table_during_remove_meta) \ + M(exception_between_rename_table_data_and_metadata) \ + M(exception_between_create_database_meta_and_directory) \ + M(exception_before_rename_table_old_meta_removed) \ + M(exception_after_step_1_in_exchange_partition) \ + M(exception_before_step_2_rename_in_exchange_partition) \ + M(exception_after_step_2_in_exchange_partition) \ + M(exception_before_step_3_rename_in_exchange_partition) \ + M(exception_after_step_3_in_exchange_partition) \ + M(region_exception_after_read_from_storage_some_error) \ + M(region_exception_after_read_from_storage_all_error) \ + M(exception_before_dmfile_remove_encryption) \ + M(exception_before_dmfile_remove_from_disk) \ + M(force_enable_region_persister_compatible_mode) \ + M(force_disable_region_persister_compatible_mode) \ + M(force_triggle_background_merge_delta) \ + M(force_triggle_foreground_flush) \ + M(exception_before_mpp_register_non_root_mpp_task) \ + M(exception_before_mpp_register_tunnel_for_non_root_mpp_task) \ + M(exception_during_mpp_register_tunnel_for_non_root_mpp_task) \ + M(exception_before_mpp_non_root_task_run) \ + M(exception_during_mpp_non_root_task_run) \ + M(exception_before_mpp_register_root_mpp_task) \ + M(exception_before_mpp_register_tunnel_for_root_mpp_task) \ + M(exception_before_mpp_root_task_run) \ + M(exception_during_mpp_root_task_run) #define APPLY_FOR_FAILPOINTS_WITH_CHANNEL(M) \ M(pause_after_learner_read) \ diff --git a/dbms/src/Flash/Mpp/MPPHandler.cpp b/dbms/src/Flash/Mpp/MPPHandler.cpp index 0dcc66c7780..2ad58606674 100644 --- a/dbms/src/Flash/Mpp/MPPHandler.cpp +++ b/dbms/src/Flash/Mpp/MPPHandler.cpp @@ -16,6 +16,15 @@ namespace DB namespace FailPoints { extern const char hang_in_execution[]; +extern const char exception_before_mpp_register_non_root_mpp_task[]; +extern const char exception_before_mpp_register_root_mpp_task[]; +extern const char exception_before_mpp_register_tunnel_for_non_root_mpp_task[]; +extern const char exception_before_mpp_register_tunnel_for_root_mpp_task[]; +extern const char exception_during_mpp_register_tunnel_for_non_root_mpp_task[]; +extern const char exception_before_mpp_non_root_task_run[]; +extern const char exception_before_mpp_root_task_run[]; +extern const char exception_during_mpp_non_root_task_run[]; +extern const char exception_during_mpp_root_task_run[]; } // namespace FailPoints bool MPPTaskProgress::isTaskHanging(const Context & context) @@ -108,21 +117,37 @@ void MPPTask::prepare(const mpp::DispatchTaskRequest & task_request) context.getTimezoneInfo().resetByDAGRequest(*dag_req); context.setProgressCallback([this](const Progress & progress) { this->updateProgress(progress); }); + dag_context = std::make_unique(*dag_req, task_request.meta()); + context.setDAGContext(dag_context.get()); + // register task. TMTContext & tmt_context = context.getTMTContext(); auto task_manager = tmt_context.getMPPTaskManager(); LOG_DEBUG(log, "begin to register the task " << id.toString()); + + if (dag_context->isRootMPPTask()) + { + FAIL_POINT_TRIGGER_EXCEPTION(FailPoints::exception_before_mpp_register_root_mpp_task); + } + else + { + FAIL_POINT_TRIGGER_EXCEPTION(FailPoints::exception_before_mpp_register_non_root_mpp_task); + } if (!task_manager->registerTask(shared_from_this())) { throw TiFlashException(std::string(__PRETTY_FUNCTION__) + ": Failed to register MPP Task", Errors::Coprocessor::BadRequest); } - - dag_context = std::make_unique(*dag_req, task_request.meta()); - context.setDAGContext(dag_context.get()); - DAGQuerySource dag(context, regions, *dag_req, true); + if (dag_context->isRootMPPTask()) + { + FAIL_POINT_TRIGGER_EXCEPTION(FailPoints::exception_before_mpp_register_tunnel_for_root_mpp_task); + } + else + { + FAIL_POINT_TRIGGER_EXCEPTION(FailPoints::exception_before_mpp_register_tunnel_for_non_root_mpp_task); + } // register tunnels MPPTunnelSetPtr tunnel_set = std::make_shared(); const auto & exchangeSender = dag_req->root_executor().exchange_sender(); @@ -136,6 +161,10 @@ void MPPTask::prepare(const mpp::DispatchTaskRequest & task_request) LOG_DEBUG(log, "begin to register the tunnel " << tunnel->tunnel_id); registerTunnel(MPPTaskId{task_meta.start_ts(), task_meta.task_id()}, tunnel); tunnel_set->tunnels.emplace_back(tunnel); + if (!dag_context->isRootMPPTask()) + { + FAIL_POINT_TRIGGER_EXCEPTION(FailPoints::exception_during_mpp_register_tunnel_for_non_root_mpp_task); + } } // read index , this may take a long time. io = executeQuery(dag, context, false, QueryProcessingStage::Complete); @@ -202,6 +231,14 @@ void MPPTask::runImpl() count += block.rows(); to->write(block); FAIL_POINT_PAUSE(FailPoints::hang_in_execution); + if (dag_context->isRootMPPTask()) + { + FAIL_POINT_TRIGGER_EXCEPTION(FailPoints::exception_during_mpp_root_task_run); + } + else + { + FAIL_POINT_TRIGGER_EXCEPTION(FailPoints::exception_during_mpp_non_root_task_run); + } } /// For outputting additional information in some formats. @@ -278,14 +315,45 @@ void MPPTask::cancel() LOG_WARNING(log, "Finish cancel task: " + id.toString()); } +void MPPHandler::handleError(MPPTaskPtr task, String error) +{ + try + { + if (task != nullptr) + { + /// for root task, the tunnel is only connected after DispatchMPPTask + /// finishes without error, for non-root task, tunnel can be connected + /// even if the DispatchMPPTask fails, so for non-root task, we write + /// error to all tunnels, while for root task, we just close the tunnel. + if (!task->dag_context->isRootMPPTask()) + task->writeErrToAllTunnel(error); + else + task->closeAllTunnel(); + task->unregisterTask(); + } + } + catch (...) + { + tryLogCurrentException(log, "Fail to handle error and clean task"); + } +} // execute is responsible for making plan , register tasks and tunnels and start the running thread. grpc::Status MPPHandler::execute(Context & context, mpp::DispatchTaskResponse * response) { + MPPTaskPtr task = nullptr; try { Stopwatch stopwatch; - MPPTaskPtr task = std::make_shared(task_request.meta(), context); + task = std::make_shared(task_request.meta(), context); task->prepare(task_request); + if (task->dag_context->isRootMPPTask()) + { + FAIL_POINT_TRIGGER_EXCEPTION(FailPoints::exception_before_mpp_root_task_run); + } + else + { + FAIL_POINT_TRIGGER_EXCEPTION(FailPoints::exception_before_mpp_non_root_task_run); + } task->memory_tracker = current_memory_tracker; task->run(); LOG_INFO(log, "processing dispatch is over; the time cost is " << std::to_string(stopwatch.elapsedMilliseconds()) << " ms"); @@ -295,18 +363,21 @@ grpc::Status MPPHandler::execute(Context & context, mpp::DispatchTaskResponse * LOG_ERROR(log, "dispatch task meet error : " << e.displayText()); auto * err = response->mutable_error(); err->set_msg(e.displayText()); + handleError(task, e.displayText()); } catch (std::exception & e) { LOG_ERROR(log, "dispatch task meet error : " << e.what()); auto * err = response->mutable_error(); err->set_msg(e.what()); + handleError(task, e.what()); } catch (...) { LOG_ERROR(log, "dispatch task meet fatal error"); auto * err = response->mutable_error(); err->set_msg("fatal error"); + handleError(task, "fatal error"); } return grpc::Status::OK; } diff --git a/dbms/src/Flash/Mpp/MPPHandler.h b/dbms/src/Flash/Mpp/MPPHandler.h index a126be6f07a..7552a7735b5 100644 --- a/dbms/src/Flash/Mpp/MPPHandler.h +++ b/dbms/src/Flash/Mpp/MPPHandler.h @@ -67,7 +67,7 @@ struct MPPTunnel } catch (...) { - LOG_WARNING(log, "Error in destructor function of MPPTunnel"); + tryLogCurrentException(log, "Error in destructor function of MPPTunnel"); } } @@ -126,6 +126,18 @@ struct MPPTunnel cv_for_finished.notify_all(); } + /// close() finishes the tunnel without checking the connect status, this function + /// should only be used when handling error if DispatchMPPTask fails for + /// root task. Because for root task, if DispatchMPPTask fails, TiDB does + /// not sending establish MPP connection request at all, it is meaningless + /// to check the connect status in this case, just finish the tunnel. + void close() + { + std::unique_lock lk(mu); + finished = true; + cv_for_finished.notify_all(); + } + // a MPPConn request has arrived. it will build connection by this tunnel; void connect(::grpc::ServerWriter<::mpp::MPPDataPacket> * writer_) { @@ -256,7 +268,7 @@ struct MPPTask : std::enable_shared_from_this, private boost::noncopyab // which targeted task we should send data by which tunnel. std::map tunnel_map; - MPPTaskManager * manager; + MPPTaskManager * manager = nullptr; Logger * log; @@ -279,6 +291,20 @@ struct MPPTask : std::enable_shared_from_this, private boost::noncopyab void cancel(); + void closeAllTunnel() + { + try + { + for (auto & it : tunnel_map) + { + it.second->close(); + } + } + catch (...) + { + tryLogCurrentException(log, "Failed to close all tunnels"); + } + } void writeErrToAllTunnel(const String & e) { try @@ -294,7 +320,7 @@ struct MPPTask : std::enable_shared_from_this, private boost::noncopyab } catch (...) { - LOG_WARNING(log, "Failed to write error " + e + " to all tunnel"); + tryLogCurrentException(log, "Failed to write error " + e + " to all tunnels"); } } @@ -516,6 +542,7 @@ class MPPHandler public: MPPHandler(const mpp::DispatchTaskRequest & task_request_) : task_request(task_request_), log(&Logger::get("MPPHandler")) {} grpc::Status execute(Context & context, mpp::DispatchTaskResponse * response); + void handleError(MPPTaskPtr task, String error); }; } // namespace DB diff --git a/tests/fullstack-test/mpp/mpp_fail.test b/tests/fullstack-test/mpp/mpp_fail.test new file mode 100644 index 00000000000..705bc8b228f --- /dev/null +++ b/tests/fullstack-test/mpp/mpp_fail.test @@ -0,0 +1,69 @@ +# Preparation. +=> DBGInvoke __init_fail_point() + +mysql> drop table if exists test.t +mysql> create table test.t (id int, value varchar(64)) +mysql> insert into test.t values(1,'a'),(2,'b'),(3,'c') +mysql> alter table test.t set tiflash replica 1 + +func> wait_table test t + + +# Data. + +## exception before mpp register non root mpp task +=> DBGInvoke __enable_fail_point(exception_before_mpp_register_non_root_mpp_task) +mysql> use test; set @@tidb_isolation_read_engines='tiflash'; set @@tidb_allow_mpp=1; select count(value), id from t group by id; +ERROR 1105 (HY000) at line 1: DB::Exception: Fail point FailPoints::exception_before_mpp_register_non_root_mpp_task is triggered. +=> DBGInvoke __disable_fail_point(exception_before_mpp_register_non_root_mpp_task) + +## exception before mpp register root mpp task +=> DBGInvoke __enable_fail_point(exception_before_mpp_register_root_mpp_task) +mysql> use test; set @@tidb_isolation_read_engines='tiflash'; set @@tidb_allow_mpp=1; select count(value), id from t group by id; +ERROR 1105 (HY000) at line 1: DB::Exception: Fail point FailPoints::exception_before_mpp_register_root_mpp_task is triggered. +=> DBGInvoke __disable_fail_point(exception_before_mpp_register_root_mpp_task) + +## exception before mpp register tunnel for non root mpp task +=> DBGInvoke __enable_fail_point(exception_before_mpp_register_tunnel_for_non_root_mpp_task) +mysql> use test; set @@tidb_isolation_read_engines='tiflash'; set @@tidb_allow_mpp=1; select count(value), id from t group by id; +ERROR 1105 (HY000) at line 1: DB::Exception: Fail point FailPoints::exception_before_mpp_register_tunnel_for_non_root_mpp_task is triggered. +=> DBGInvoke __disable_fail_point(exception_before_mpp_register_tunnel_for_non_root_mpp_task) + +## exception before mpp register tunnel for root mpp task +=> DBGInvoke __enable_fail_point(exception_before_mpp_register_tunnel_for_root_mpp_task) +mysql> use test; set @@tidb_isolation_read_engines='tiflash'; set @@tidb_allow_mpp=1; select count(value), id from t group by id; +ERROR 1105 (HY000) at line 1: DB::Exception: Fail point FailPoints::exception_before_mpp_register_tunnel_for_root_mpp_task is triggered. +=> DBGInvoke __disable_fail_point(exception_before_mpp_register_tunnel_for_root_mpp_task) + +## exception during mpp register tunnel for non root mpp task +=> DBGInvoke __enable_fail_point(exception_during_mpp_register_tunnel_for_non_root_mpp_task) +mysql> use test; set @@tidb_isolation_read_engines='tiflash'; set @@tidb_allow_mpp=1; select count(value), id from t group by id; +ERROR 1105 (HY000) at line 1: DB::Exception: Fail point FailPoints::exception_during_mpp_register_tunnel_for_non_root_mpp_task is triggered. +=> DBGInvoke __disable_fail_point(exception_during_mpp_register_tunnel_for_non_root_mpp_task) + +## exception before mpp run non root task +=> DBGInvoke __enable_fail_point(exception_before_mpp_non_root_task_run) +mysql> use test; set @@tidb_isolation_read_engines='tiflash'; set @@tidb_allow_mpp=1; select count(value), id from t group by id; +ERROR 1105 (HY000) at line 1: DB::Exception: Fail point FailPoints::exception_before_mpp_non_root_task_run is triggered. +=> DBGInvoke __disable_fail_point(exception_before_mpp_non_root_task_run) + +## exception before mpp run root task +=> DBGInvoke __enable_fail_point(exception_before_mpp_root_task_run) +mysql> use test; set @@tidb_isolation_read_engines='tiflash'; set @@tidb_allow_mpp=1; select count(value), id from t group by id; +ERROR 1105 (HY000) at line 1: DB::Exception: Fail point FailPoints::exception_before_mpp_root_task_run is triggered. +=> DBGInvoke __disable_fail_point(exception_before_mpp_root_task_run) + +## exception during mpp run non root task +=> DBGInvoke __enable_fail_point(exception_during_mpp_non_root_task_run) +mysql> use test; set @@tidb_isolation_read_engines='tiflash'; set @@tidb_allow_mpp=1; select count(value), id from t group by id; +ERROR 1105 (HY000) at line 1: other error for mpp stream: DB::Exception: exchange receiver meet error : DB::Exception: Fail point FailPoints::exception_during_mpp_non_root_task_run is triggered. +=> DBGInvoke __disable_fail_point(exception_during_mpp_non_root_task_run) + +## exception during mpp run root task +=> DBGInvoke __enable_fail_point(exception_during_mpp_root_task_run) +mysql> use test; set @@tidb_isolation_read_engines='tiflash'; set @@tidb_allow_mpp=1; select count(value), id from t group by id; +ERROR 1105 (HY000) at line 1: other error for mpp stream: DB::Exception: Fail point FailPoints::exception_during_mpp_root_task_run is triggered. +=> DBGInvoke __disable_fail_point(exception_during_mpp_root_task_run) + +# Clean up. +mysql> drop table if exists test.t