Skip to content

Commit

Permalink
Add mutex to protect exchange receiver's async client (#5008) (#5010)
Browse files Browse the repository at this point in the history
close #4977
  • Loading branch information
ti-chi-bot authored May 27, 2022
1 parent decf03e commit b34873a
Show file tree
Hide file tree
Showing 3 changed files with 9 additions and 4 deletions.
11 changes: 9 additions & 2 deletions dbms/src/Flash/Mpp/ExchangeReceiver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -92,10 +92,14 @@ class AsyncRequestHandler : public UnaryCallback<bool>
switch (stage)
{
case AsyncRequestStage::WAIT_MAKE_READER:
{
// Use lock to ensure reader is created already in reactor thread
std::unique_lock lock(mu);
if (!ok)
reader.reset();
notifyReactor();
break;
}
case AsyncRequestStage::WAIT_BATCH_READ:
if (ok)
++read_packet_index;
Expand Down Expand Up @@ -227,6 +231,8 @@ class AsyncRequestHandler : public UnaryCallback<bool>
void start()
{
stage = AsyncRequestStage::WAIT_MAKE_READER;
// Use lock to ensure async reader is unreachable from grpc thread before this function returns
std::unique_lock lock(mu);
rpc_context->makeAsyncReader(*request, reader, thisAsUnaryCallback());
}

Expand Down Expand Up @@ -283,6 +289,7 @@ class AsyncRequestHandler : public UnaryCallback<bool>
size_t read_packet_index = 0;
Status finish_status = RPCContext::getStatusOK();
LoggerPtr log;
std::mutex mu;
};
} // namespace

Expand Down Expand Up @@ -393,10 +400,10 @@ void ExchangeReceiverBase<RPCContext>::reactor(const std::vector<Request> & asyn
MPMCQueue<AsyncHandler *> ready_requests(alive_async_connections * 2);
std::vector<AsyncHandler *> waiting_for_retry_requests;

std::vector<AsyncRequestHandler<RPCContext>> handlers;
std::vector<std::unique_ptr<AsyncHandler>> handlers;
handlers.reserve(alive_async_connections);
for (const auto & req : async_requests)
handlers.emplace_back(&ready_requests, &msg_channel, rpc_context, req, exc_log->identifier());
handlers.emplace_back(std::make_unique<AsyncHandler>(&ready_requests, &msg_channel, rpc_context, req, exc_log->identifier()));

while (alive_async_connections > 0)
{
Expand Down
1 change: 0 additions & 1 deletion dbms/src/Functions/FunctionsDateTime.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,6 @@ void registerFunctionsDateTime(FunctionFactory & factory)
factory.registerFunction<FunctionToTiDBToSeconds>();
factory.registerFunction<FunctionToTiDBToDays>();
factory.registerFunction<FunctionTiDBFromDays>();

factory.registerFunction<FunctionToTimeZone>();
factory.registerFunction<FunctionToLastDay>();
}
Expand Down
1 change: 0 additions & 1 deletion dbms/src/Functions/FunctionsDateTime.h
Original file line number Diff line number Diff line change
Expand Up @@ -3450,7 +3450,6 @@ using FunctionToTiDBDayOfYear = FunctionMyDateOrMyDateTimeToSomething<DataTypeUI
using FunctionToTiDBWeekOfYear = FunctionMyDateOrMyDateTimeToSomething<DataTypeUInt16, TiDBWeekOfYearTransformerImpl, return_nullable>;
using FunctionToTiDBToSeconds = FunctionMyDateOrMyDateTimeToSomething<DataTypeUInt64, TiDBToSecondsTransformerImpl, return_nullable>;
using FunctionToTiDBToDays = FunctionMyDateOrMyDateTimeToSomething<DataTypeUInt32, TiDBToDaysTransformerImpl, return_nullable>;

using FunctionToRelativeYearNum = FunctionDateOrDateTimeToSomething<DataTypeUInt16, ToRelativeYearNumImpl>;
using FunctionToRelativeQuarterNum = FunctionDateOrDateTimeToSomething<DataTypeUInt32, ToRelativeQuarterNumImpl>;
using FunctionToRelativeMonthNum = FunctionDateOrDateTimeToSomething<DataTypeUInt32, ToRelativeMonthNumImpl>;
Expand Down

0 comments on commit b34873a

Please sign in to comment.