Skip to content

Commit

Permalink
Merge branch 'master' into compact_by_delete_range
Browse files Browse the repository at this point in the history
  • Loading branch information
lidezhu committed Aug 23, 2021
2 parents bd457f0 + 707b512 commit 4e7a39b
Showing 1 changed file with 22 additions and 7 deletions.
29 changes: 22 additions & 7 deletions dbms/src/Flash/Mpp/MPPTunnelSet.cpp
Original file line number Diff line number Diff line change
@@ -1,9 +1,21 @@
#include <Common/Exception.h>
#include <Flash/Mpp/MPPTunnelSet.h>
#include <fmt/core.h>

namespace DB
{

namespace
{
inline mpp::MPPDataPacket serializeToPacket(const tipb::SelectResponse & response)
{
mpp::MPPDataPacket packet;
if (!response.SerializeToString(packet.mutable_data()))
throw Exception(fmt::format("Fail to serialize response, response size: {}", response.ByteSizeLong()));
return packet;
}
} // namespace

void MPPTunnelSet::clearExecutionSummaries(tipb::SelectResponse & response)
{
/// can not use response.clear_execution_summaries() because
Expand All @@ -19,20 +31,23 @@ void MPPTunnelSet::clearExecutionSummaries(tipb::SelectResponse & response)

void MPPTunnelSet::write(tipb::SelectResponse & response)
{
for (size_t i = 0; i < tunnels.size(); ++i)
write(response, i);
tunnels[0]->write(serializeToPacket(response));

if (tunnels.size() > 1)
{
clearExecutionSummaries(response);
auto packet = serializeToPacket(response);
for (size_t i = 1; i < tunnels.size(); ++i)
tunnels[i]->write(packet);
}
}

void MPPTunnelSet::write(tipb::SelectResponse & response, int16_t partition_id)
{
if (partition_id != 0)
clearExecutionSummaries(response);

mpp::MPPDataPacket packet;
if (!response.SerializeToString(packet.mutable_data()))
throw Exception("Fail to serialize response, response size: " + std::to_string(response.ByteSizeLong()));
tunnels[partition_id]->write(packet);
tunnels[partition_id]->write(serializeToPacket(response));
}

} // namespace DB

0 comments on commit 4e7a39b

Please sign in to comment.