Skip to content

Commit

Permalink
refine log
Browse files Browse the repository at this point in the history
Signed-off-by: xufei <xufeixw@mail.ustc.edu.cn>
  • Loading branch information
windtalker committed Feb 9, 2023
1 parent dc7179d commit 198185c
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 19 deletions.
31 changes: 21 additions & 10 deletions dbms/src/Flash/Mpp/ExchangeReceiver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -834,6 +834,7 @@ void ExchangeReceiverBase<RPCContext>::connectionDone(
const LoggerPtr & log)
{
Int32 copy_live_connections;
String first_err_msg = local_err_msg;
{
std::lock_guard lock(mu);

Expand All @@ -843,29 +844,39 @@ void ExchangeReceiverBase<RPCContext>::connectionDone(
state = ExchangeReceiverState::ERROR;
if (err_msg.empty())
err_msg = local_err_msg;
else
first_err_msg = err_msg;
}

copy_live_connections = --live_connections;
}

LOG_DEBUG(
log,
"connection end. meet error: {}, err msg: {}, current alive connections: {}",
meet_error,
local_err_msg,
copy_live_connections);

if (meet_error)
{
LOG_WARNING(
log,
"connection end. meet error: {}, err msg: {}, current alive connections: {}",
meet_error,
local_err_msg,
copy_live_connections);
}
else
{
LOG_DEBUG(
log,
"connection end. Current alive connections: {}",
copy_live_connections);
}
assert(copy_live_connections >= 0);
if (copy_live_connections == 0)
{
LOG_DEBUG(log, "All threads end in ExchangeReceiver");
cv.notify_all();
}
else if (copy_live_connections < 0)
throw Exception("alive_connection_num should not be less than 0!");

if (meet_error || copy_live_connections == 0)
{
LOG_INFO(exc_log, "receiver channels finished");
LOG_INFO(exc_log, "receiver channels finished, meet error: {}, error message: {}", meet_error, first_err_msg);
finishAllMsgChannels();
}
}
Expand Down
18 changes: 9 additions & 9 deletions dbms/src/Flash/Mpp/MPPTunnel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ void MPPTunnel::write(TrackedMppDataPacketPtr && data)
{
std::unique_lock lk(mu);
waitUntilConnectedOrFinished(lk);
RUNTIME_CHECK_MSG(tunnel_sender != nullptr, "write to tunnel which is already closed.");
RUNTIME_CHECK_MSG(tunnel_sender != nullptr, "write to tunnel {} which is already closed.", tunnel_id);
}

auto pushed_data_size = data->getPacket().ByteSizeLong();
Expand All @@ -167,7 +167,7 @@ void MPPTunnel::write(TrackedMppDataPacketPtr && data)
updateConnProfileInfo(pushed_data_size);
return;
}
throw Exception(fmt::format("write to tunnel which is already closed,{}", tunnel_sender->isConsumerFinished() ? tunnel_sender->getConsumerFinishMsg() : ""));
throw Exception(fmt::format("write to tunnel {} which is already closed, {}", tunnel_id, tunnel_sender->isConsumerFinished() ? tunnel_sender->getConsumerFinishMsg() : ""));
}

/// done normally and being called exactly once after writing all packets
Expand All @@ -179,7 +179,7 @@ void MPPTunnel::writeDone()
/// make sure to finish the tunnel after it is connected
waitUntilConnectedOrFinished(lk);
if (tunnel_sender == nullptr)
throw Exception(fmt::format("write to tunnel which is already closed."));
throw Exception(fmt::format("write to tunnel {} which is already closed.", tunnel_id));
}
tunnel_sender->finish();
waitForSenderFinish(/*allow_throw=*/true);
Expand Down Expand Up @@ -208,8 +208,8 @@ void MPPTunnel::connectLocalV2(size_t source_index, LocalRequestHandler & local_
{
{
std::unique_lock lk(mu);
RUNTIME_CHECK_MSG(status == TunnelStatus::Unconnected, fmt::format("MPPTunnel has connected or finished: {}", statusToString()));
RUNTIME_CHECK_MSG(mode == TunnelSenderMode::LOCAL, "This should be a local tunnel");
RUNTIME_CHECK_MSG(status == TunnelStatus::Unconnected, fmt::format("MPPTunnel {} has connected or finished: {}", tunnel_id, statusToString()));
RUNTIME_CHECK_MSG(mode == TunnelSenderMode::LOCAL, "{} should be a local tunnel", tunnel_id);

LOG_TRACE(log, "ready to connect local tunnel version 2");
if (is_fine_grained)
Expand All @@ -233,7 +233,7 @@ void MPPTunnel::connectAsync(IAsyncCallData * call_data)
{
{
std::unique_lock lk(mu);
RUNTIME_CHECK_MSG(status == TunnelStatus::Unconnected, "MPPTunnel has connected or finished: {}", statusToString());
RUNTIME_CHECK_MSG(status == TunnelStatus::Unconnected, "MPPTunnel {} has connected or finished: {}", tunnel_id, statusToString());

LOG_TRACE(log, "ready to connect async");
RUNTIME_ASSERT(mode == TunnelSenderMode::ASYNC_GRPC, log, "mode {} is not async grpc in connectAsync", magic_enum::enum_name(mode));
Expand Down Expand Up @@ -283,7 +283,7 @@ void MPPTunnel::waitForSenderFinish(bool allow_throw)
status = TunnelStatus::Finished;
}
if (allow_throw && !err_msg.empty())
throw Exception("Consumer exits unexpected, " + err_msg);
throw Exception(fmt::format("{}: consumer exits unexpected, error message: {} ", tunnel_id, err_msg));
LOG_TRACE(log, "end wait for consumer finish!");
}

Expand Down Expand Up @@ -311,7 +311,7 @@ void MPPTunnel::waitUntilConnectedOrFinished(std::unique_lock<std::mutex> & lk)
LOG_TRACE(log, "end waitUntilConnectedOrFinished");
}
if (status == TunnelStatus::Unconnected)
throw Exception("MPPTunnel can not be connected because MPPTask is cancelled");
throw Exception(fmt::format("MPPTunnel {} can not be connected because MPPTask is cancelled", tunnel_id));
}

StringRef MPPTunnel::statusToString()
Expand Down Expand Up @@ -399,7 +399,7 @@ void MPPTunnel::connectLocalV1(PacketWriter * writer)
{
std::unique_lock lk(mu);
if (status != TunnelStatus::Unconnected)
throw Exception(fmt::format("MPPTunnel has connected or finished: {}", statusToString()));
throw Exception(fmt::format("MPPTunnel {} has connected or finished: {}", tunnel_id, statusToString()));

LOG_TRACE(log, "ready to connect local tunnel version 1");

Expand Down

0 comments on commit 198185c

Please sign in to comment.