diff --git a/dbms/src/Flash/Mpp/ExchangeReceiver.cpp b/dbms/src/Flash/Mpp/ExchangeReceiver.cpp index 6e254bd62dc..9f0bd8e99dc 100644 --- a/dbms/src/Flash/Mpp/ExchangeReceiver.cpp +++ b/dbms/src/Flash/Mpp/ExchangeReceiver.cpp @@ -834,6 +834,7 @@ void ExchangeReceiverBase::connectionDone( const LoggerPtr & log) { Int32 copy_live_connections; + String first_err_msg = local_err_msg; { std::lock_guard lock(mu); @@ -843,29 +844,39 @@ void ExchangeReceiverBase::connectionDone( state = ExchangeReceiverState::ERROR; if (err_msg.empty()) err_msg = local_err_msg; + else + first_err_msg = err_msg; } copy_live_connections = --live_connections; } - LOG_DEBUG( - log, - "connection end. meet error: {}, err msg: {}, current alive connections: {}", - meet_error, - local_err_msg, - copy_live_connections); - + if (meet_error) + { + LOG_WARNING( + log, + "connection end. meet error: {}, err msg: {}, current alive connections: {}", + meet_error, + local_err_msg, + copy_live_connections); + } + else + { + LOG_DEBUG( + log, + "connection end. Current alive connections: {}", + copy_live_connections); + } + assert(copy_live_connections >= 0); if (copy_live_connections == 0) { LOG_DEBUG(log, "All threads end in ExchangeReceiver"); cv.notify_all(); } - else if (copy_live_connections < 0) - throw Exception("alive_connection_num should not be less than 0!"); if (meet_error || copy_live_connections == 0) { - LOG_INFO(exc_log, "receiver channels finished"); + LOG_INFO(exc_log, "receiver channels finished, meet error: {}, error message: {}", meet_error, first_err_msg); finishAllMsgChannels(); } } diff --git a/dbms/src/Flash/Mpp/MPPTunnel.cpp b/dbms/src/Flash/Mpp/MPPTunnel.cpp index 5a56f47368e..4978e6f4f27 100644 --- a/dbms/src/Flash/Mpp/MPPTunnel.cpp +++ b/dbms/src/Flash/Mpp/MPPTunnel.cpp @@ -157,7 +157,7 @@ void MPPTunnel::write(TrackedMppDataPacketPtr && data) { std::unique_lock lk(mu); waitUntilConnectedOrFinished(lk); - RUNTIME_CHECK_MSG(tunnel_sender != nullptr, "write to tunnel which is already closed."); + RUNTIME_CHECK_MSG(tunnel_sender != nullptr, "write to tunnel {} which is already closed.", tunnel_id); } auto pushed_data_size = data->getPacket().ByteSizeLong(); @@ -167,7 +167,7 @@ void MPPTunnel::write(TrackedMppDataPacketPtr && data) updateConnProfileInfo(pushed_data_size); return; } - throw Exception(fmt::format("write to tunnel which is already closed,{}", tunnel_sender->isConsumerFinished() ? tunnel_sender->getConsumerFinishMsg() : "")); + throw Exception(fmt::format("write to tunnel {} which is already closed, {}", tunnel_id, tunnel_sender->isConsumerFinished() ? tunnel_sender->getConsumerFinishMsg() : "")); } /// done normally and being called exactly once after writing all packets @@ -179,7 +179,7 @@ void MPPTunnel::writeDone() /// make sure to finish the tunnel after it is connected waitUntilConnectedOrFinished(lk); if (tunnel_sender == nullptr) - throw Exception(fmt::format("write to tunnel which is already closed.")); + throw Exception(fmt::format("write to tunnel {} which is already closed.", tunnel_id)); } tunnel_sender->finish(); waitForSenderFinish(/*allow_throw=*/true); @@ -208,8 +208,8 @@ void MPPTunnel::connectLocalV2(size_t source_index, LocalRequestHandler & local_ { { std::unique_lock lk(mu); - RUNTIME_CHECK_MSG(status == TunnelStatus::Unconnected, fmt::format("MPPTunnel has connected or finished: {}", statusToString())); - RUNTIME_CHECK_MSG(mode == TunnelSenderMode::LOCAL, "This should be a local tunnel"); + RUNTIME_CHECK_MSG(status == TunnelStatus::Unconnected, fmt::format("MPPTunnel {} has connected or finished: {}", tunnel_id, statusToString())); + RUNTIME_CHECK_MSG(mode == TunnelSenderMode::LOCAL, "{} should be a local tunnel", tunnel_id); LOG_TRACE(log, "ready to connect local tunnel version 2"); if (is_fine_grained) @@ -233,7 +233,7 @@ void MPPTunnel::connectAsync(IAsyncCallData * call_data) { { std::unique_lock lk(mu); - RUNTIME_CHECK_MSG(status == TunnelStatus::Unconnected, "MPPTunnel has connected or finished: {}", statusToString()); + RUNTIME_CHECK_MSG(status == TunnelStatus::Unconnected, "MPPTunnel {} has connected or finished: {}", tunnel_id, statusToString()); LOG_TRACE(log, "ready to connect async"); RUNTIME_ASSERT(mode == TunnelSenderMode::ASYNC_GRPC, log, "mode {} is not async grpc in connectAsync", magic_enum::enum_name(mode)); @@ -283,7 +283,7 @@ void MPPTunnel::waitForSenderFinish(bool allow_throw) status = TunnelStatus::Finished; } if (allow_throw && !err_msg.empty()) - throw Exception("Consumer exits unexpected, " + err_msg); + throw Exception(fmt::format("{}: consumer exits unexpected, error message: {} ", tunnel_id, err_msg)); LOG_TRACE(log, "end wait for consumer finish!"); } @@ -311,7 +311,7 @@ void MPPTunnel::waitUntilConnectedOrFinished(std::unique_lock & lk) LOG_TRACE(log, "end waitUntilConnectedOrFinished"); } if (status == TunnelStatus::Unconnected) - throw Exception("MPPTunnel can not be connected because MPPTask is cancelled"); + throw Exception(fmt::format("MPPTunnel {} can not be connected because MPPTask is cancelled", tunnel_id)); } StringRef MPPTunnel::statusToString() @@ -399,7 +399,7 @@ void MPPTunnel::connectLocalV1(PacketWriter * writer) { std::unique_lock lk(mu); if (status != TunnelStatus::Unconnected) - throw Exception(fmt::format("MPPTunnel has connected or finished: {}", statusToString())); + throw Exception(fmt::format("MPPTunnel {} has connected or finished: {}", tunnel_id, statusToString())); LOG_TRACE(log, "ready to connect local tunnel version 1"); diff --git a/dbms/src/Flash/Mpp/tests/gtest_mpptunnel.cpp b/dbms/src/Flash/Mpp/tests/gtest_mpptunnel.cpp index 5fa98e93a26..28bdc3921ab 100644 --- a/dbms/src/Flash/Mpp/tests/gtest_mpptunnel.cpp +++ b/dbms/src/Flash/Mpp/tests/gtest_mpptunnel.cpp @@ -377,7 +377,7 @@ TEST_F(TestMPPTunnel, SyncWriteAfterUnconnectFinished) } catch (Exception & e) { - GTEST_ASSERT_EQ(e.message(), "Check tunnel_sender != nullptr failed: write to tunnel which is already closed."); + GTEST_ASSERT_EQ(e.message(), "Check tunnel_sender != nullptr failed: write to tunnel 0000_0001 which is already closed."); } } @@ -392,7 +392,7 @@ TEST_F(TestMPPTunnel, SyncWriteDoneAfterUnconnectFinished) } catch (Exception & e) { - GTEST_ASSERT_EQ(e.message(), "write to tunnel which is already closed."); + GTEST_ASSERT_EQ(e.message(), "write to tunnel 0000_0001 which is already closed."); } } @@ -460,7 +460,7 @@ TEST_F(TestMPPTunnel, SyncWriteError) } catch (Exception & e) { - GTEST_ASSERT_EQ(e.message(), "Consumer exits unexpected, 0000_0001 meet error: grpc writes failed."); + GTEST_ASSERT_EQ(e.message(), "0000_0001: consumer exits unexpected, error message: 0000_0001 meet error: grpc writes failed. "); } } @@ -481,7 +481,7 @@ TEST_F(TestMPPTunnel, SyncWriteAfterFinished) } catch (Exception & e) { - GTEST_ASSERT_EQ(e.message(), "write to tunnel which is already closed,"); + GTEST_ASSERT_EQ(e.message(), "write to tunnel 0000_0001 which is already closed, "); } if (mpp_tunnel_ptr != nullptr) mpp_tunnel_ptr->waitForFinish(); @@ -573,7 +573,7 @@ TEST_F(TestMPPTunnel, AsyncWriteError) } catch (Exception & e) { - GTEST_ASSERT_EQ(e.message(), "Consumer exits unexpected, 0000_0001 meet error: grpc writes failed."); + GTEST_ASSERT_EQ(e.message(), "0000_0001: consumer exits unexpected, error message: 0000_0001 meet error: grpc writes failed. "); } } @@ -660,7 +660,7 @@ try } catch (Exception & e) { - GTEST_ASSERT_EQ(e.message(), "Check status == TunnelStatus::Unconnected failed: MPPTunnel has connected or finished: Finished"); + GTEST_ASSERT_EQ(e.message(), "Check status == TunnelStatus::Unconnected failed: MPPTunnel 0000_0001 has connected or finished: Finished"); } TEST_F(TestMPPTunnel, LocalConnectWhenConnected) @@ -679,7 +679,7 @@ try } catch (Exception & e) { - GTEST_ASSERT_EQ(e.message(), "Check status == TunnelStatus::Unconnected failed: MPPTunnel has connected or finished: Connected"); + GTEST_ASSERT_EQ(e.message(), "Check status == TunnelStatus::Unconnected failed: MPPTunnel 0000_0001 has connected or finished: Connected"); } TEST_F(TestMPPTunnel, LocalCloseBeforeConnect) @@ -713,7 +713,7 @@ try } catch (Exception & e) { - GTEST_ASSERT_EQ(e.message(), "Check tunnel_sender != nullptr failed: write to tunnel which is already closed."); + GTEST_ASSERT_EQ(e.message(), "Check tunnel_sender != nullptr failed: write to tunnel 0000_0001 which is already closed."); } TEST_F(TestMPPTunnel, LocalWriteDoneAfterUnconnectFinished) @@ -726,7 +726,7 @@ try } catch (Exception & e) { - GTEST_ASSERT_EQ(e.message(), "write to tunnel which is already closed."); + GTEST_ASSERT_EQ(e.message(), "write to tunnel 0000_0001 which is already closed."); } TEST_F(TestMPPTunnel, LocalWriteError) @@ -756,7 +756,7 @@ try } catch (Exception & e) { - GTEST_ASSERT_EQ(e.message(), "Consumer exits unexpected, err"); + GTEST_ASSERT_EQ(e.message(), "0000_0001: consumer exits unexpected, error message: err "); } TEST_F(TestMPPTunnel, LocalWriteAfterFinished) @@ -775,7 +775,7 @@ TEST_F(TestMPPTunnel, LocalWriteAfterFinished) } catch (Exception & e) { - GTEST_ASSERT_EQ(e.message(), "write to tunnel which is already closed,"); + GTEST_ASSERT_EQ(e.message(), "write to tunnel 0000_0001 which is already closed, "); } if (tunnel != nullptr) tunnel->waitForFinish();