Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add more information to the error message in MPPTunnel #6787

Merged
merged 2 commits into from
Feb 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 21 additions & 10 deletions dbms/src/Flash/Mpp/ExchangeReceiver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -834,6 +834,7 @@ void ExchangeReceiverBase<RPCContext>::connectionDone(
const LoggerPtr & log)
{
Int32 copy_live_connections;
String first_err_msg = local_err_msg;
{
std::lock_guard lock(mu);

Expand All @@ -843,29 +844,39 @@ void ExchangeReceiverBase<RPCContext>::connectionDone(
state = ExchangeReceiverState::ERROR;
if (err_msg.empty())
err_msg = local_err_msg;
else
first_err_msg = err_msg;
}

copy_live_connections = --live_connections;
}

LOG_DEBUG(
log,
"connection end. meet error: {}, err msg: {}, current alive connections: {}",
meet_error,
local_err_msg,
copy_live_connections);

if (meet_error)
{
LOG_WARNING(
log,
"connection end. meet error: {}, err msg: {}, current alive connections: {}",
meet_error,
local_err_msg,
copy_live_connections);
}
else
{
LOG_DEBUG(
log,
"connection end. Current alive connections: {}",
copy_live_connections);
}
assert(copy_live_connections >= 0);
if (copy_live_connections == 0)
{
LOG_DEBUG(log, "All threads end in ExchangeReceiver");
cv.notify_all();
}
else if (copy_live_connections < 0)
throw Exception("alive_connection_num should not be less than 0!");

if (meet_error || copy_live_connections == 0)
{
LOG_INFO(exc_log, "receiver channels finished");
LOG_INFO(exc_log, "receiver channels finished, meet error: {}, error message: {}", meet_error, first_err_msg);
finishAllMsgChannels();
}
}
Expand Down
18 changes: 9 additions & 9 deletions dbms/src/Flash/Mpp/MPPTunnel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ void MPPTunnel::write(TrackedMppDataPacketPtr && data)
{
std::unique_lock lk(mu);
waitUntilConnectedOrFinished(lk);
RUNTIME_CHECK_MSG(tunnel_sender != nullptr, "write to tunnel which is already closed.");
RUNTIME_CHECK_MSG(tunnel_sender != nullptr, "write to tunnel {} which is already closed.", tunnel_id);
}

auto pushed_data_size = data->getPacket().ByteSizeLong();
Expand All @@ -167,7 +167,7 @@ void MPPTunnel::write(TrackedMppDataPacketPtr && data)
updateConnProfileInfo(pushed_data_size);
return;
}
throw Exception(fmt::format("write to tunnel which is already closed,{}", tunnel_sender->isConsumerFinished() ? tunnel_sender->getConsumerFinishMsg() : ""));
throw Exception(fmt::format("write to tunnel {} which is already closed, {}", tunnel_id, tunnel_sender->isConsumerFinished() ? tunnel_sender->getConsumerFinishMsg() : ""));
}

/// done normally and being called exactly once after writing all packets
Expand All @@ -179,7 +179,7 @@ void MPPTunnel::writeDone()
/// make sure to finish the tunnel after it is connected
waitUntilConnectedOrFinished(lk);
if (tunnel_sender == nullptr)
throw Exception(fmt::format("write to tunnel which is already closed."));
throw Exception(fmt::format("write to tunnel {} which is already closed.", tunnel_id));
}
tunnel_sender->finish();
waitForSenderFinish(/*allow_throw=*/true);
Expand Down Expand Up @@ -208,8 +208,8 @@ void MPPTunnel::connectLocalV2(size_t source_index, LocalRequestHandler & local_
{
{
std::unique_lock lk(mu);
RUNTIME_CHECK_MSG(status == TunnelStatus::Unconnected, fmt::format("MPPTunnel has connected or finished: {}", statusToString()));
RUNTIME_CHECK_MSG(mode == TunnelSenderMode::LOCAL, "This should be a local tunnel");
RUNTIME_CHECK_MSG(status == TunnelStatus::Unconnected, fmt::format("MPPTunnel {} has connected or finished: {}", tunnel_id, statusToString()));
RUNTIME_CHECK_MSG(mode == TunnelSenderMode::LOCAL, "{} should be a local tunnel", tunnel_id);

LOG_TRACE(log, "ready to connect local tunnel version 2");
if (is_fine_grained)
Expand All @@ -233,7 +233,7 @@ void MPPTunnel::connectAsync(IAsyncCallData * call_data)
{
{
std::unique_lock lk(mu);
RUNTIME_CHECK_MSG(status == TunnelStatus::Unconnected, "MPPTunnel has connected or finished: {}", statusToString());
RUNTIME_CHECK_MSG(status == TunnelStatus::Unconnected, "MPPTunnel {} has connected or finished: {}", tunnel_id, statusToString());

LOG_TRACE(log, "ready to connect async");
RUNTIME_ASSERT(mode == TunnelSenderMode::ASYNC_GRPC, log, "mode {} is not async grpc in connectAsync", magic_enum::enum_name(mode));
Expand Down Expand Up @@ -283,7 +283,7 @@ void MPPTunnel::waitForSenderFinish(bool allow_throw)
status = TunnelStatus::Finished;
}
if (allow_throw && !err_msg.empty())
throw Exception("Consumer exits unexpected, " + err_msg);
throw Exception(fmt::format("{}: consumer exits unexpected, error message: {} ", tunnel_id, err_msg));
LOG_TRACE(log, "end wait for consumer finish!");
}

Expand Down Expand Up @@ -311,7 +311,7 @@ void MPPTunnel::waitUntilConnectedOrFinished(std::unique_lock<std::mutex> & lk)
LOG_TRACE(log, "end waitUntilConnectedOrFinished");
}
if (status == TunnelStatus::Unconnected)
throw Exception("MPPTunnel can not be connected because MPPTask is cancelled");
throw Exception(fmt::format("MPPTunnel {} can not be connected because MPPTask is cancelled", tunnel_id));
}

StringRef MPPTunnel::statusToString()
Expand Down Expand Up @@ -399,7 +399,7 @@ void MPPTunnel::connectLocalV1(PacketWriter * writer)
{
std::unique_lock lk(mu);
if (status != TunnelStatus::Unconnected)
throw Exception(fmt::format("MPPTunnel has connected or finished: {}", statusToString()));
throw Exception(fmt::format("MPPTunnel {} has connected or finished: {}", tunnel_id, statusToString()));

LOG_TRACE(log, "ready to connect local tunnel version 1");

Expand Down
22 changes: 11 additions & 11 deletions dbms/src/Flash/Mpp/tests/gtest_mpptunnel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -377,7 +377,7 @@ TEST_F(TestMPPTunnel, SyncWriteAfterUnconnectFinished)
}
catch (Exception & e)
{
GTEST_ASSERT_EQ(e.message(), "Check tunnel_sender != nullptr failed: write to tunnel which is already closed.");
GTEST_ASSERT_EQ(e.message(), "Check tunnel_sender != nullptr failed: write to tunnel 0000_0001 which is already closed.");
}
}

Expand All @@ -392,7 +392,7 @@ TEST_F(TestMPPTunnel, SyncWriteDoneAfterUnconnectFinished)
}
catch (Exception & e)
{
GTEST_ASSERT_EQ(e.message(), "write to tunnel which is already closed.");
GTEST_ASSERT_EQ(e.message(), "write to tunnel 0000_0001 which is already closed.");
}
}

Expand Down Expand Up @@ -460,7 +460,7 @@ TEST_F(TestMPPTunnel, SyncWriteError)
}
catch (Exception & e)
{
GTEST_ASSERT_EQ(e.message(), "Consumer exits unexpected, 0000_0001 meet error: grpc writes failed.");
GTEST_ASSERT_EQ(e.message(), "0000_0001: consumer exits unexpected, error message: 0000_0001 meet error: grpc writes failed. ");
}
}

Expand All @@ -481,7 +481,7 @@ TEST_F(TestMPPTunnel, SyncWriteAfterFinished)
}
catch (Exception & e)
{
GTEST_ASSERT_EQ(e.message(), "write to tunnel which is already closed,");
GTEST_ASSERT_EQ(e.message(), "write to tunnel 0000_0001 which is already closed, ");
}
if (mpp_tunnel_ptr != nullptr)
mpp_tunnel_ptr->waitForFinish();
Expand Down Expand Up @@ -573,7 +573,7 @@ TEST_F(TestMPPTunnel, AsyncWriteError)
}
catch (Exception & e)
{
GTEST_ASSERT_EQ(e.message(), "Consumer exits unexpected, 0000_0001 meet error: grpc writes failed.");
GTEST_ASSERT_EQ(e.message(), "0000_0001: consumer exits unexpected, error message: 0000_0001 meet error: grpc writes failed. ");
}
}

Expand Down Expand Up @@ -660,7 +660,7 @@ try
}
catch (Exception & e)
{
GTEST_ASSERT_EQ(e.message(), "Check status == TunnelStatus::Unconnected failed: MPPTunnel has connected or finished: Finished");
GTEST_ASSERT_EQ(e.message(), "Check status == TunnelStatus::Unconnected failed: MPPTunnel 0000_0001 has connected or finished: Finished");
}

TEST_F(TestMPPTunnel, LocalConnectWhenConnected)
Expand All @@ -679,7 +679,7 @@ try
}
catch (Exception & e)
{
GTEST_ASSERT_EQ(e.message(), "Check status == TunnelStatus::Unconnected failed: MPPTunnel has connected or finished: Connected");
GTEST_ASSERT_EQ(e.message(), "Check status == TunnelStatus::Unconnected failed: MPPTunnel 0000_0001 has connected or finished: Connected");
}

TEST_F(TestMPPTunnel, LocalCloseBeforeConnect)
Expand Down Expand Up @@ -713,7 +713,7 @@ try
}
catch (Exception & e)
{
GTEST_ASSERT_EQ(e.message(), "Check tunnel_sender != nullptr failed: write to tunnel which is already closed.");
GTEST_ASSERT_EQ(e.message(), "Check tunnel_sender != nullptr failed: write to tunnel 0000_0001 which is already closed.");
}

TEST_F(TestMPPTunnel, LocalWriteDoneAfterUnconnectFinished)
Expand All @@ -726,7 +726,7 @@ try
}
catch (Exception & e)
{
GTEST_ASSERT_EQ(e.message(), "write to tunnel which is already closed.");
GTEST_ASSERT_EQ(e.message(), "write to tunnel 0000_0001 which is already closed.");
}

TEST_F(TestMPPTunnel, LocalWriteError)
Expand Down Expand Up @@ -756,7 +756,7 @@ try
}
catch (Exception & e)
{
GTEST_ASSERT_EQ(e.message(), "Consumer exits unexpected, err");
GTEST_ASSERT_EQ(e.message(), "0000_0001: consumer exits unexpected, error message: err ");
}

TEST_F(TestMPPTunnel, LocalWriteAfterFinished)
Expand All @@ -775,7 +775,7 @@ TEST_F(TestMPPTunnel, LocalWriteAfterFinished)
}
catch (Exception & e)
{
GTEST_ASSERT_EQ(e.message(), "write to tunnel which is already closed,");
GTEST_ASSERT_EQ(e.message(), "write to tunnel 0000_0001 which is already closed, ");
}
if (tunnel != nullptr)
tunnel->waitForFinish();
Expand Down