Skip to content

Commit

Permalink
Fix data race in tunnel_set/receiver_set and establish call data (#5650)
Browse files Browse the repository at this point in the history
ref #5095, close #5651
  • Loading branch information
windtalker authored Aug 18, 2022
1 parent 7a700a6 commit 7428d37
Show file tree
Hide file tree
Showing 6 changed files with 47 additions and 29 deletions.
8 changes: 6 additions & 2 deletions dbms/src/Flash/Coprocessor/DAGContext.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -223,8 +223,12 @@ void DAGContext::addCoprocessorReader(const CoprocessorReaderPtr & coprocessor_r
{
if (!isMPPTask())
return;
RUNTIME_ASSERT(mpp_receiver_set != nullptr, log, "MPPTask without receiver set");
return mpp_receiver_set->addCoprocessorReader(coprocessor_reader);
coprocessor_readers.push_back(coprocessor_reader);
}

/// Returns a mutable reference to the coprocessor readers stored in this DAGContext.
/// The reference aliases the `coprocessor_readers` member: no copy is made and no
/// lock is taken here.
/// NOTE(review): whether callers must synchronize access themselves is not visible from this function - confirm at the call sites.
std::vector<CoprocessorReaderPtr> & DAGContext::getCoprocessorReaders()
{
return coprocessor_readers;
}

bool DAGContext::containsRegionsInfoForTable(Int64 table_id) const
Expand Down
2 changes: 2 additions & 0 deletions dbms/src/Flash/Coprocessor/DAGContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,7 @@ class DAGContext
mpp_receiver_set = receiver_set;
}
void addCoprocessorReader(const CoprocessorReaderPtr & coprocessor_reader);
std::vector<CoprocessorReaderPtr> & getCoprocessorReaders();

void addSubquery(const String & subquery_id, SubqueryForSet && subquery);
bool hasSubquery() const { return !subqueries.empty(); }
Expand Down Expand Up @@ -373,6 +374,7 @@ class DAGContext
std::atomic<UInt64> warning_count;

MPPReceiverSetPtr mpp_receiver_set;
std::vector<CoprocessorReaderPtr> coprocessor_readers;
/// vector of SubqueriesForSets(such as join build subquery).
/// The order of the vector is also the order of the subquery.
std::vector<SubqueriesForSets> subqueries;
Expand Down
6 changes: 2 additions & 4 deletions dbms/src/Flash/EstablishCall.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -106,10 +106,8 @@ bool EstablishCallData::write(const mpp::MPPDataPacket & packet)
void EstablishCallData::writeErr(const mpp::MPPDataPacket & packet)
{
state = ERR_HANDLE;
if (write(packet))
err_status = grpc::Status::OK;
else
err_status = grpc::Status(grpc::StatusCode::UNKNOWN, "Write error message failed for unknown reason.");
err_status = grpc::Status::OK;
write(packet);
}

void EstablishCallData::setFinishState(const String & msg)
Expand Down
43 changes: 34 additions & 9 deletions dbms/src/Flash/Mpp/MPPTask.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -96,10 +96,12 @@ void MPPTask::abortTunnels(const String & message, AbortType abort_type)

void MPPTask::abortReceivers()
{
if (likely(receiver_set != nullptr))
{
receiver_set->cancel();
std::unique_lock lock(tunnel_and_receiver_mu);
if unlikely (receiver_set == nullptr)
return;
}
receiver_set->cancel();
}

void MPPTask::abortDataStreams(AbortType abort_type)
Expand All @@ -111,8 +113,12 @@ void MPPTask::abortDataStreams(AbortType abort_type)

void MPPTask::closeAllTunnels(const String & reason)
{
if (likely(tunnel_set))
tunnel_set->close(reason);
{
std::unique_lock lock(tunnel_and_receiver_mu);
if (unlikely(tunnel_set == nullptr))
return;
}
tunnel_set->close(reason);
}

void MPPTask::finishWrite()
Expand All @@ -128,7 +134,7 @@ void MPPTask::run()

void MPPTask::registerTunnels(const mpp::DispatchTaskRequest & task_request)
{
tunnel_set = std::make_shared<MPPTunnelSet>(log->identifier());
auto tunnel_set_local = std::make_shared<MPPTunnelSet>(log->identifier());
std::chrono::seconds timeout(task_request.timeout());
const auto & exchange_sender = dag_req.root_executor().exchange_sender();

Expand All @@ -144,17 +150,24 @@ void MPPTask::registerTunnels(const mpp::DispatchTaskRequest & task_request)
LOG_FMT_DEBUG(log, "begin to register the tunnel {}", tunnel->id());
if (status != INITIALIZING)
throw Exception(fmt::format("The tunnel {} can not be registered, because the task is not in initializing state", tunnel->id()));
tunnel_set->registerTunnel(MPPTaskId{task_meta.start_ts(), task_meta.task_id()}, tunnel);
tunnel_set_local->registerTunnel(MPPTaskId{task_meta.start_ts(), task_meta.task_id()}, tunnel);
if (!dag_context->isRootMPPTask())
{
FAIL_POINT_TRIGGER_EXCEPTION(FailPoints::exception_during_mpp_register_tunnel_for_non_root_mpp_task);
}
}
{
std::unique_lock lock(tunnel_and_receiver_mu);
if (status != INITIALIZING)
throw Exception(fmt::format("The tunnels can not be registered, because the task is not in initializing state"));
tunnel_set = std::move(tunnel_set_local);
}
dag_context->tunnel_set = tunnel_set;
}

void MPPTask::initExchangeReceivers()
{
receiver_set = std::make_shared<MPPReceiverSet>(log->identifier());
auto receiver_set_local = std::make_shared<MPPReceiverSet>(log->identifier());
traverseExecutors(&dag_req, [&](const tipb::Executor & executor) {
if (executor.tp() == tipb::ExecType::TypeExchangeReceiver)
{
Expand All @@ -177,11 +190,17 @@ void MPPTask::initExchangeReceivers()
if (status != RUNNING)
throw Exception("exchange receiver map can not be initialized, because the task is not in running state");

receiver_set->addExchangeReceiver(executor_id, exchange_receiver);
receiver_set_local->addExchangeReceiver(executor_id, exchange_receiver);
new_thread_count_of_exchange_receiver += exchange_receiver->computeNewThreadCount();
}
return true;
});
{
std::unique_lock lock(tunnel_and_receiver_mu);
if (status != RUNNING)
throw Exception("exchange receiver map can not be initialized, because the task is not in running state");
receiver_set = std::move(receiver_set_local);
}
dag_context->setMPPReceiverSet(receiver_set);
}

Expand Down Expand Up @@ -293,7 +312,6 @@ void MPPTask::prepare(const mpp::DispatchTaskRequest & task_request)
// register tunnels
registerTunnels(task_request);

dag_context->tunnel_set = tunnel_set;
// register task.
auto task_manager = tmt_context.getMPPTaskManager();
LOG_FMT_DEBUG(log, "begin to register the task {}", id.toString());
Expand All @@ -320,6 +338,13 @@ void MPPTask::preprocess()
auto start_time = Clock::now();
initExchangeReceivers();
executeQuery(*context);
{
std::unique_lock lock(tunnel_and_receiver_mu);
if (status != RUNNING)
throw Exception("task not in running state, may be cancelled");
for (auto & r : dag_context->getCoprocessorReaders())
receiver_set->addCoprocessorReader(r);
}
auto end_time = Clock::now();
dag_context->compile_time_ns = std::chrono::duration_cast<std::chrono::nanoseconds>(end_time - start_time).count();
mpp_task_statistics.setCompileTimestamp(start_time, end_time);
Expand Down
2 changes: 2 additions & 0 deletions dbms/src/Flash/Mpp/MPPTask.h
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,8 @@ class MPPTask : public std::enable_shared_from_this<MPPTask>

MPPTaskId id;

std::mutex tunnel_and_receiver_mu;

MPPTunnelSetPtr tunnel_set;

MPPReceiverSetPtr receiver_set;
Expand Down
15 changes: 1 addition & 14 deletions dbms/src/Flash/Mpp/MPPTunnel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,20 +70,7 @@ MPPTunnel::~MPPTunnel()
});
try
{
{
std::unique_lock lock(*mu);
if (status == TunnelStatus::Finished)
{
LOG_DEBUG(log, "already finished!");
return;
}

/// make sure to finish the tunnel after it is connected
waitUntilConnectedOrFinished(lock);
finishSendQueue();
}
LOG_FMT_TRACE(log, "waiting consumer finish!");
waitForSenderFinish(/*allow_throw=*/false);
close("");
}
catch (...)
{
Expand Down

0 comments on commit 7428d37

Please sign in to comment.