From 198185c250bc7e98e988696af1e3b43e59721ff6 Mon Sep 17 00:00:00 2001 From: xufei Date: Thu, 9 Feb 2023 12:34:09 +0800 Subject: [PATCH] refine log Signed-off-by: xufei --- dbms/src/Flash/Mpp/ExchangeReceiver.cpp | 31 +++++++++++++++++-------- dbms/src/Flash/Mpp/MPPTunnel.cpp | 18 +++++++------- 2 files changed, 30 insertions(+), 19 deletions(-) diff --git a/dbms/src/Flash/Mpp/ExchangeReceiver.cpp b/dbms/src/Flash/Mpp/ExchangeReceiver.cpp index 6e254bd62dc..9f0bd8e99dc 100644 --- a/dbms/src/Flash/Mpp/ExchangeReceiver.cpp +++ b/dbms/src/Flash/Mpp/ExchangeReceiver.cpp @@ -834,6 +834,7 @@ void ExchangeReceiverBase::connectionDone( const LoggerPtr & log) { Int32 copy_live_connections; + String first_err_msg = local_err_msg; { std::lock_guard lock(mu); @@ -843,29 +844,39 @@ void ExchangeReceiverBase::connectionDone( state = ExchangeReceiverState::ERROR; if (err_msg.empty()) err_msg = local_err_msg; + else + first_err_msg = err_msg; } copy_live_connections = --live_connections; } - LOG_DEBUG( - log, - "connection end. meet error: {}, err msg: {}, current alive connections: {}", - meet_error, - local_err_msg, - copy_live_connections); - + if (meet_error) + { + LOG_WARNING( + log, + "connection end. meet error: {}, err msg: {}, current alive connections: {}", + meet_error, + local_err_msg, + copy_live_connections); + } + else + { + LOG_DEBUG( + log, + "connection end. Current alive connections: {}", + copy_live_connections); + } + assert(copy_live_connections >= 0); if (copy_live_connections == 0) { LOG_DEBUG(log, "All threads end in ExchangeReceiver"); cv.notify_all(); } - else if (copy_live_connections < 0) - throw Exception("alive_connection_num should not be less than 0!"); if (meet_error || copy_live_connections == 0) { - LOG_INFO(exc_log, "receiver channels finished"); + LOG_INFO(exc_log, "receiver channels finished, meet error: {}, error message: {}", meet_error, first_err_msg); finishAllMsgChannels(); } } diff --git a/dbms/src/Flash/Mpp/MPPTunnel.cpp b/dbms/src/Flash/Mpp/MPPTunnel.cpp index 5a56f47368e..4978e6f4f27 100644 --- a/dbms/src/Flash/Mpp/MPPTunnel.cpp +++ b/dbms/src/Flash/Mpp/MPPTunnel.cpp @@ -157,7 +157,7 @@ void MPPTunnel::write(TrackedMppDataPacketPtr && data) { std::unique_lock lk(mu); waitUntilConnectedOrFinished(lk); - RUNTIME_CHECK_MSG(tunnel_sender != nullptr, "write to tunnel which is already closed."); + RUNTIME_CHECK_MSG(tunnel_sender != nullptr, "write to tunnel {} which is already closed.", tunnel_id); } auto pushed_data_size = data->getPacket().ByteSizeLong(); @@ -167,7 +167,7 @@ void MPPTunnel::write(TrackedMppDataPacketPtr && data) updateConnProfileInfo(pushed_data_size); return; } - throw Exception(fmt::format("write to tunnel which is already closed,{}", tunnel_sender->isConsumerFinished() ? tunnel_sender->getConsumerFinishMsg() : "")); + throw Exception(fmt::format("write to tunnel {} which is already closed, {}", tunnel_id, tunnel_sender->isConsumerFinished() ? tunnel_sender->getConsumerFinishMsg() : "")); } /// done normally and being called exactly once after writing all packets @@ -179,7 +179,7 @@ void MPPTunnel::writeDone() /// make sure to finish the tunnel after it is connected waitUntilConnectedOrFinished(lk); if (tunnel_sender == nullptr) - throw Exception(fmt::format("write to tunnel which is already closed.")); + throw Exception(fmt::format("write to tunnel {} which is already closed.", tunnel_id)); } tunnel_sender->finish(); waitForSenderFinish(/*allow_throw=*/true); @@ -208,8 +208,8 @@ void MPPTunnel::connectLocalV2(size_t source_index, LocalRequestHandler & local_ { { std::unique_lock lk(mu); - RUNTIME_CHECK_MSG(status == TunnelStatus::Unconnected, fmt::format("MPPTunnel has connected or finished: {}", statusToString())); - RUNTIME_CHECK_MSG(mode == TunnelSenderMode::LOCAL, "This should be a local tunnel"); + RUNTIME_CHECK_MSG(status == TunnelStatus::Unconnected, fmt::format("MPPTunnel {} has connected or finished: {}", tunnel_id, statusToString())); + RUNTIME_CHECK_MSG(mode == TunnelSenderMode::LOCAL, "{} should be a local tunnel", tunnel_id); LOG_TRACE(log, "ready to connect local tunnel version 2"); if (is_fine_grained) @@ -233,7 +233,7 @@ void MPPTunnel::connectAsync(IAsyncCallData * call_data) { { std::unique_lock lk(mu); - RUNTIME_CHECK_MSG(status == TunnelStatus::Unconnected, "MPPTunnel has connected or finished: {}", statusToString()); + RUNTIME_CHECK_MSG(status == TunnelStatus::Unconnected, "MPPTunnel {} has connected or finished: {}", tunnel_id, statusToString()); LOG_TRACE(log, "ready to connect async"); RUNTIME_ASSERT(mode == TunnelSenderMode::ASYNC_GRPC, log, "mode {} is not async grpc in connectAsync", magic_enum::enum_name(mode)); @@ -283,7 +283,7 @@ void MPPTunnel::waitForSenderFinish(bool allow_throw) status = TunnelStatus::Finished; } if (allow_throw && !err_msg.empty()) - throw Exception("Consumer exits unexpected, " + err_msg); + throw Exception(fmt::format("{}: consumer exits unexpected, error message: {} ", tunnel_id, err_msg)); LOG_TRACE(log, "end wait for consumer finish!"); } @@ -311,7 +311,7 @@ void MPPTunnel::waitUntilConnectedOrFinished(std::unique_lock & lk) LOG_TRACE(log, "end waitUntilConnectedOrFinished"); } if (status == TunnelStatus::Unconnected) - throw Exception("MPPTunnel can not be connected because MPPTask is cancelled"); + throw Exception(fmt::format("MPPTunnel {} can not be connected because MPPTask is cancelled", tunnel_id)); } StringRef MPPTunnel::statusToString() @@ -399,7 +399,7 @@ void MPPTunnel::connectLocalV1(PacketWriter * writer) { std::unique_lock lk(mu); if (status != TunnelStatus::Unconnected) - throw Exception(fmt::format("MPPTunnel has connected or finished: {}", statusToString())); + throw Exception(fmt::format("MPPTunnel {} has connected or finished: {}", tunnel_id, statusToString())); LOG_TRACE(log, "ready to connect local tunnel version 1");