Skip to content

Commit

Permalink
Fix mpp hang error if some error happens during compile of mpp plan i…
Browse files Browse the repository at this point in the history
…n TiFlash (#1533)
  • Loading branch information
windtalker committed Mar 11, 2021
1 parent c4ba277 commit 4538877
Show file tree
Hide file tree
Showing 4 changed files with 204 additions and 28 deletions.
49 changes: 29 additions & 20 deletions dbms/src/Common/FailPoint.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,26 +8,35 @@ namespace DB
{
std::unordered_map<String, std::shared_ptr<FailPointChannel>> FailPointHelper::fail_point_wait_channels;

#define APPLY_FOR_FAILPOINTS(M) \
M(exception_between_drop_meta_and_data) \
M(exception_between_alter_data_and_meta) \
M(exception_drop_table_during_remove_meta) \
M(exception_between_rename_table_data_and_metadata) \
M(exception_between_create_database_meta_and_directory) \
M(exception_before_rename_table_old_meta_removed) \
M(exception_after_step_1_in_exchange_partition) \
M(exception_before_step_2_rename_in_exchange_partition) \
M(exception_after_step_2_in_exchange_partition) \
M(exception_before_step_3_rename_in_exchange_partition) \
M(exception_after_step_3_in_exchange_partition) \
M(region_exception_after_read_from_storage_some_error) \
M(region_exception_after_read_from_storage_all_error) \
M(exception_before_dmfile_remove_encryption) \
M(exception_before_dmfile_remove_from_disk) \
M(force_enable_region_persister_compatible_mode) \
M(force_disable_region_persister_compatible_mode) \
M(force_triggle_background_merge_delta) \
M(force_triggle_foreground_flush)
#define APPLY_FOR_FAILPOINTS(M) \
M(exception_between_drop_meta_and_data) \
M(exception_between_alter_data_and_meta) \
M(exception_drop_table_during_remove_meta) \
M(exception_between_rename_table_data_and_metadata) \
M(exception_between_create_database_meta_and_directory) \
M(exception_before_rename_table_old_meta_removed) \
M(exception_after_step_1_in_exchange_partition) \
M(exception_before_step_2_rename_in_exchange_partition) \
M(exception_after_step_2_in_exchange_partition) \
M(exception_before_step_3_rename_in_exchange_partition) \
M(exception_after_step_3_in_exchange_partition) \
M(region_exception_after_read_from_storage_some_error) \
M(region_exception_after_read_from_storage_all_error) \
M(exception_before_dmfile_remove_encryption) \
M(exception_before_dmfile_remove_from_disk) \
M(force_enable_region_persister_compatible_mode) \
M(force_disable_region_persister_compatible_mode) \
M(force_triggle_background_merge_delta) \
M(force_triggle_foreground_flush) \
M(exception_before_mpp_register_non_root_mpp_task) \
M(exception_before_mpp_register_tunnel_for_non_root_mpp_task) \
M(exception_during_mpp_register_tunnel_for_non_root_mpp_task) \
M(exception_before_mpp_non_root_task_run) \
M(exception_during_mpp_non_root_task_run) \
M(exception_before_mpp_register_root_mpp_task) \
M(exception_before_mpp_register_tunnel_for_root_mpp_task) \
M(exception_before_mpp_root_task_run) \
M(exception_during_mpp_root_task_run)

#define APPLY_FOR_FAILPOINTS_WITH_CHANNEL(M) \
M(pause_after_learner_read) \
Expand Down
81 changes: 76 additions & 5 deletions dbms/src/Flash/Mpp/MPPHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,15 @@ namespace DB
namespace FailPoints
{
extern const char hang_in_execution[];
extern const char exception_before_mpp_register_non_root_mpp_task[];
extern const char exception_before_mpp_register_root_mpp_task[];
extern const char exception_before_mpp_register_tunnel_for_non_root_mpp_task[];
extern const char exception_before_mpp_register_tunnel_for_root_mpp_task[];
extern const char exception_during_mpp_register_tunnel_for_non_root_mpp_task[];
extern const char exception_before_mpp_non_root_task_run[];
extern const char exception_before_mpp_root_task_run[];
extern const char exception_during_mpp_non_root_task_run[];
extern const char exception_during_mpp_root_task_run[];
} // namespace FailPoints

bool MPPTaskProgress::isTaskHanging(const Context & context)
Expand Down Expand Up @@ -108,21 +117,37 @@ void MPPTask::prepare(const mpp::DispatchTaskRequest & task_request)
context.getTimezoneInfo().resetByDAGRequest(*dag_req);
context.setProgressCallback([this](const Progress & progress) { this->updateProgress(progress); });

dag_context = std::make_unique<DAGContext>(*dag_req, task_request.meta());
context.setDAGContext(dag_context.get());

// register task.
TMTContext & tmt_context = context.getTMTContext();
auto task_manager = tmt_context.getMPPTaskManager();
LOG_DEBUG(log, "begin to register the task " << id.toString());

if (dag_context->isRootMPPTask())
{
FAIL_POINT_TRIGGER_EXCEPTION(FailPoints::exception_before_mpp_register_root_mpp_task);
}
else
{
FAIL_POINT_TRIGGER_EXCEPTION(FailPoints::exception_before_mpp_register_non_root_mpp_task);
}
if (!task_manager->registerTask(shared_from_this()))
{
throw TiFlashException(std::string(__PRETTY_FUNCTION__) + ": Failed to register MPP Task", Errors::Coprocessor::BadRequest);
}


dag_context = std::make_unique<DAGContext>(*dag_req, task_request.meta());
context.setDAGContext(dag_context.get());

DAGQuerySource dag(context, regions, *dag_req, true);

if (dag_context->isRootMPPTask())
{
FAIL_POINT_TRIGGER_EXCEPTION(FailPoints::exception_before_mpp_register_tunnel_for_root_mpp_task);
}
else
{
FAIL_POINT_TRIGGER_EXCEPTION(FailPoints::exception_before_mpp_register_tunnel_for_non_root_mpp_task);
}
// register tunnels
MPPTunnelSetPtr tunnel_set = std::make_shared<MPPTunnelSet>();
const auto & exchangeSender = dag_req->root_executor().exchange_sender();
Expand All @@ -136,6 +161,10 @@ void MPPTask::prepare(const mpp::DispatchTaskRequest & task_request)
LOG_DEBUG(log, "begin to register the tunnel " << tunnel->tunnel_id);
registerTunnel(MPPTaskId{task_meta.start_ts(), task_meta.task_id()}, tunnel);
tunnel_set->tunnels.emplace_back(tunnel);
if (!dag_context->isRootMPPTask())
{
FAIL_POINT_TRIGGER_EXCEPTION(FailPoints::exception_during_mpp_register_tunnel_for_non_root_mpp_task);
}
}
// read index , this may take a long time.
io = executeQuery(dag, context, false, QueryProcessingStage::Complete);
Expand Down Expand Up @@ -202,6 +231,14 @@ void MPPTask::runImpl()
count += block.rows();
to->write(block);
FAIL_POINT_PAUSE(FailPoints::hang_in_execution);
if (dag_context->isRootMPPTask())
{
FAIL_POINT_TRIGGER_EXCEPTION(FailPoints::exception_during_mpp_root_task_run);
}
else
{
FAIL_POINT_TRIGGER_EXCEPTION(FailPoints::exception_during_mpp_non_root_task_run);
}
}

/// For outputting additional information in some formats.
Expand Down Expand Up @@ -278,14 +315,45 @@ void MPPTask::cancel()
LOG_WARNING(log, "Finish cancel task: " + id.toString());
}

void MPPHandler::handleError(MPPTaskPtr task, String error)
{
try
{
if (task != nullptr)
{
/// for root task, the tunnel is only connected after DispatchMPPTask
/// finishes without error, for non-root task, tunnel can be connected
/// even if the DispatchMPPTask fails, so for non-root task, we write
/// error to all tunnels, while for root task, we just close the tunnel.
if (!task->dag_context->isRootMPPTask())
task->writeErrToAllTunnel(error);
else
task->closeAllTunnel();
task->unregisterTask();
}
}
catch (...)
{
tryLogCurrentException(log, "Fail to handle error and clean task");
}
}
// execute is responsible for making plan , register tasks and tunnels and start the running thread.
grpc::Status MPPHandler::execute(Context & context, mpp::DispatchTaskResponse * response)
{
MPPTaskPtr task = nullptr;
try
{
Stopwatch stopwatch;
MPPTaskPtr task = std::make_shared<MPPTask>(task_request.meta(), context);
task = std::make_shared<MPPTask>(task_request.meta(), context);
task->prepare(task_request);
if (task->dag_context->isRootMPPTask())
{
FAIL_POINT_TRIGGER_EXCEPTION(FailPoints::exception_before_mpp_root_task_run);
}
else
{
FAIL_POINT_TRIGGER_EXCEPTION(FailPoints::exception_before_mpp_non_root_task_run);
}
task->memory_tracker = current_memory_tracker;
task->run();
LOG_INFO(log, "processing dispatch is over; the time cost is " << std::to_string(stopwatch.elapsedMilliseconds()) << " ms");
Expand All @@ -295,18 +363,21 @@ grpc::Status MPPHandler::execute(Context & context, mpp::DispatchTaskResponse *
LOG_ERROR(log, "dispatch task meet error : " << e.displayText());
auto * err = response->mutable_error();
err->set_msg(e.displayText());
handleError(task, e.displayText());
}
catch (std::exception & e)
{
LOG_ERROR(log, "dispatch task meet error : " << e.what());
auto * err = response->mutable_error();
err->set_msg(e.what());
handleError(task, e.what());
}
catch (...)
{
LOG_ERROR(log, "dispatch task meet fatal error");
auto * err = response->mutable_error();
err->set_msg("fatal error");
handleError(task, "fatal error");
}
return grpc::Status::OK;
}
Expand Down
33 changes: 30 additions & 3 deletions dbms/src/Flash/Mpp/MPPHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ struct MPPTunnel
}
catch (...)
{
LOG_WARNING(log, "Error in destructor function of MPPTunnel");
tryLogCurrentException(log, "Error in destructor function of MPPTunnel");
}
}

Expand Down Expand Up @@ -126,6 +126,18 @@ struct MPPTunnel
cv_for_finished.notify_all();
}

/// close() finishes the tunnel without checking the connect status, this function
/// should only be used when handling error if DispatchMPPTask fails for
/// root task. Because for root task, if DispatchMPPTask fails, TiDB does
/// not sending establish MPP connection request at all, it is meaningless
/// to check the connect status in this case, just finish the tunnel.
void close()
{
std::unique_lock<std::mutex> lk(mu);
finished = true;
cv_for_finished.notify_all();
}

// a MPPConn request has arrived. it will build connection by this tunnel;
void connect(::grpc::ServerWriter<::mpp::MPPDataPacket> * writer_)
{
Expand Down Expand Up @@ -256,7 +268,7 @@ struct MPPTask : std::enable_shared_from_this<MPPTask>, private boost::noncopyab
// which targeted task we should send data by which tunnel.
std::map<MPPTaskId, MPPTunnelPtr> tunnel_map;

MPPTaskManager * manager;
MPPTaskManager * manager = nullptr;

Logger * log;

Expand All @@ -279,6 +291,20 @@ struct MPPTask : std::enable_shared_from_this<MPPTask>, private boost::noncopyab

void cancel();

void closeAllTunnel()
{
try
{
for (auto & it : tunnel_map)
{
it.second->close();
}
}
catch (...)
{
tryLogCurrentException(log, "Failed to close all tunnels");
}
}
void writeErrToAllTunnel(const String & e)
{
try
Expand All @@ -294,7 +320,7 @@ struct MPPTask : std::enable_shared_from_this<MPPTask>, private boost::noncopyab
}
catch (...)
{
LOG_WARNING(log, "Failed to write error " + e + " to all tunnel");
tryLogCurrentException(log, "Failed to write error " + e + " to all tunnels");
}
}

Expand Down Expand Up @@ -516,6 +542,7 @@ class MPPHandler
public:
MPPHandler(const mpp::DispatchTaskRequest & task_request_) : task_request(task_request_), log(&Logger::get("MPPHandler")) {}
grpc::Status execute(Context & context, mpp::DispatchTaskResponse * response);
void handleError(MPPTaskPtr task, String error);
};

} // namespace DB
69 changes: 69 additions & 0 deletions tests/fullstack-test/mpp/mpp_fail.test
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
# Preparation.
=> DBGInvoke __init_fail_point()

mysql> drop table if exists test.t
mysql> create table test.t (id int, value varchar(64))
mysql> insert into test.t values(1,'a'),(2,'b'),(3,'c')
mysql> alter table test.t set tiflash replica 1

func> wait_table test t


# Data.

## exception before mpp register non root mpp task
=> DBGInvoke __enable_fail_point(exception_before_mpp_register_non_root_mpp_task)
mysql> use test; set @@tidb_isolation_read_engines='tiflash'; set @@tidb_allow_mpp=1; select count(value), id from t group by id;
ERROR 1105 (HY000) at line 1: DB::Exception: Fail point FailPoints::exception_before_mpp_register_non_root_mpp_task is triggered.
=> DBGInvoke __disable_fail_point(exception_before_mpp_register_non_root_mpp_task)

## exception before mpp register root mpp task
=> DBGInvoke __enable_fail_point(exception_before_mpp_register_root_mpp_task)
mysql> use test; set @@tidb_isolation_read_engines='tiflash'; set @@tidb_allow_mpp=1; select count(value), id from t group by id;
ERROR 1105 (HY000) at line 1: DB::Exception: Fail point FailPoints::exception_before_mpp_register_root_mpp_task is triggered.
=> DBGInvoke __disable_fail_point(exception_before_mpp_register_root_mpp_task)

## exception before mpp register tunnel for non root mpp task
=> DBGInvoke __enable_fail_point(exception_before_mpp_register_tunnel_for_non_root_mpp_task)
mysql> use test; set @@tidb_isolation_read_engines='tiflash'; set @@tidb_allow_mpp=1; select count(value), id from t group by id;
ERROR 1105 (HY000) at line 1: DB::Exception: Fail point FailPoints::exception_before_mpp_register_tunnel_for_non_root_mpp_task is triggered.
=> DBGInvoke __disable_fail_point(exception_before_mpp_register_tunnel_for_non_root_mpp_task)

## exception before mpp register tunnel for root mpp task
=> DBGInvoke __enable_fail_point(exception_before_mpp_register_tunnel_for_root_mpp_task)
mysql> use test; set @@tidb_isolation_read_engines='tiflash'; set @@tidb_allow_mpp=1; select count(value), id from t group by id;
ERROR 1105 (HY000) at line 1: DB::Exception: Fail point FailPoints::exception_before_mpp_register_tunnel_for_root_mpp_task is triggered.
=> DBGInvoke __disable_fail_point(exception_before_mpp_register_tunnel_for_root_mpp_task)

## exception during mpp register tunnel for non root mpp task
=> DBGInvoke __enable_fail_point(exception_during_mpp_register_tunnel_for_non_root_mpp_task)
mysql> use test; set @@tidb_isolation_read_engines='tiflash'; set @@tidb_allow_mpp=1; select count(value), id from t group by id;
ERROR 1105 (HY000) at line 1: DB::Exception: Fail point FailPoints::exception_during_mpp_register_tunnel_for_non_root_mpp_task is triggered.
=> DBGInvoke __disable_fail_point(exception_during_mpp_register_tunnel_for_non_root_mpp_task)

## exception before mpp run non root task
=> DBGInvoke __enable_fail_point(exception_before_mpp_non_root_task_run)
mysql> use test; set @@tidb_isolation_read_engines='tiflash'; set @@tidb_allow_mpp=1; select count(value), id from t group by id;
ERROR 1105 (HY000) at line 1: DB::Exception: Fail point FailPoints::exception_before_mpp_non_root_task_run is triggered.
=> DBGInvoke __disable_fail_point(exception_before_mpp_non_root_task_run)

## exception before mpp run root task
=> DBGInvoke __enable_fail_point(exception_before_mpp_root_task_run)
mysql> use test; set @@tidb_isolation_read_engines='tiflash'; set @@tidb_allow_mpp=1; select count(value), id from t group by id;
ERROR 1105 (HY000) at line 1: DB::Exception: Fail point FailPoints::exception_before_mpp_root_task_run is triggered.
=> DBGInvoke __disable_fail_point(exception_before_mpp_root_task_run)

## exception during mpp run non root task
=> DBGInvoke __enable_fail_point(exception_during_mpp_non_root_task_run)
mysql> use test; set @@tidb_isolation_read_engines='tiflash'; set @@tidb_allow_mpp=1; select count(value), id from t group by id;
ERROR 1105 (HY000) at line 1: other error for mpp stream: DB::Exception: exchange receiver meet error : DB::Exception: Fail point FailPoints::exception_during_mpp_non_root_task_run is triggered.
=> DBGInvoke __disable_fail_point(exception_during_mpp_non_root_task_run)

## exception during mpp run root task
=> DBGInvoke __enable_fail_point(exception_during_mpp_root_task_run)
mysql> use test; set @@tidb_isolation_read_engines='tiflash'; set @@tidb_allow_mpp=1; select count(value), id from t group by id;
ERROR 1105 (HY000) at line 1: other error for mpp stream: DB::Exception: Fail point FailPoints::exception_during_mpp_root_task_run is triggered.
=> DBGInvoke __disable_fail_point(exception_during_mpp_root_task_run)

# Clean up.
mysql> drop table if exists test.t

0 comments on commit 4538877

Please sign in to comment.