From 00f4d35f14bd6d8ff37258bddf0c2970c7a7ce7e Mon Sep 17 00:00:00 2001 From: chaoyli Date: Mon, 3 Aug 2020 19:00:50 +0800 Subject: [PATCH 1/3] [BUG] Using attachement strategy of brpc to send packet with big size. BRPC send packet should serialize it first and then send it. If we send one batch with big size, it will encounter a connection failed. So we can use attachment strategy to bypass the problem and eliminate the serialization cost. --- be/src/common/config.h | 2 ++ be/src/runtime/data_stream_sender.cpp | 18 ++++++++++-------- be/src/service/brpc.h | 1 + be/src/service/internal_service.cpp | 7 +++++++ 4 files changed, 20 insertions(+), 8 deletions(-) diff --git a/be/src/common/config.h b/be/src/common/config.h index 31a62b76a0c219..b69b70e37f59be 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -519,6 +519,8 @@ namespace config { CONF_Int64(brpc_max_body_size, "209715200"); // Max unwritten bytes in each socket, if the limit is reached, Socket.Write fails with EOVERCROWDED CONF_Int64(brpc_socket_max_unwritten_bytes, "67108864"); + // If batch size large than brpc_attachment_threashold, use attachment instead + CONF_Int64(brpc_attachment_threashold, "67108864"); // max number of txns for every txn_partition_map in txn manager // this is a self protection to avoid too many txns saving in manager diff --git a/be/src/runtime/data_stream_sender.cpp b/be/src/runtime/data_stream_sender.cpp index 73d642d677e44f..93c98e932d4dc6 100644 --- a/be/src/runtime/data_stream_sender.cpp +++ b/be/src/runtime/data_stream_sender.cpp @@ -164,7 +164,7 @@ class DataStreamSender::Channel { int64_t _packet_seq; // we're accumulating rows into this batch - boost::scoped_ptr _batch; + std::unique_ptr _batch; bool _need_close; int _be_number; @@ -227,7 +227,14 @@ Status DataStreamSender::Channel::send_batch(PRowBatch* batch, bool eos) { } _brpc_request.set_eos(eos); - if (batch != nullptr) { + if (batch != nullptr && RowBatch::get_batch_size(*batch) > config::brpc_socket_max_unwritten_bytes) { + std::string* tuple_data_to_attachment = batch->release_tuple_data(); + butil::IOBuf& io_buf = _closure->cntl.request_attachment(); + // append_user_data will not copy data instead of reference + io_buf.append_user_data(const_cast(tuple_data_to_attachment->c_str()), + tuple_data_to_attachment->size(), + [](void* address) { free(address); }); + batch->mutable_tuple_data(); // to padding the required tuple_data field in PB _brpc_request.set_allocated_row_batch(batch); } _brpc_request.set_packet_seq(_packet_seq++); @@ -270,12 +277,7 @@ Status DataStreamSender::Channel::add_row(TupleRow* row) { } Status DataStreamSender::Channel::send_current_batch(bool eos) { - { - SCOPED_TIMER(_parent->_serialize_batch_timer); - int uncompressed_bytes = _batch->serialize(&_pb_batch); - COUNTER_UPDATE(_parent->_bytes_sent_counter, RowBatch::get_batch_size(_pb_batch)); - COUNTER_UPDATE(_parent->_uncompressed_bytes_counter, uncompressed_bytes); - } + _parent->serialize_batch(_batch.get(), &_pb_batch, 1); _batch->reset(); RETURN_IF_ERROR(send_batch(&_pb_batch, eos)); return Status::OK(); diff --git a/be/src/service/brpc.h b/be/src/service/brpc.h index d3fa30f4811aac..c9325d31e861bf 100644 --- a/be/src/service/brpc.h +++ b/be/src/service/brpc.h @@ -56,3 +56,4 @@ #include #include #include +#include diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp index 68dfdfd63813ad..07372ba01e1cb7 100644 --- a/be/src/service/internal_service.cpp +++ b/be/src/service/internal_service.cpp @@ -49,6 +49,13 @@ void PInternalServiceImpl::transmit_data(google::protobuf::RpcController* cnt google::protobuf::Closure* done) { VLOG_ROW << "transmit data: fragment_instance_id=" << print_id(request->finst_id()) << " node=" << request->node_id(); + brpc::Controller* cntl = static_cast(cntl_base); + if (cntl->request_attachment().size() > 0) { + PRowBatch* batch = (const_cast(request))->mutable_row_batch(); + butil::IOBuf& io_buf = cntl->request_attachment(); + std::string* tuple_data = batch->mutable_tuple_data(); + io_buf.copy_to(tuple_data); + } _exec_env->stream_mgr()->transmit_data(request, &done); if (done != nullptr) { done->Run(); From 432aa46b8e22bfd9c9acfb5d79fff19be17a4e8d Mon Sep 17 00:00:00 2001 From: chaoyli Date: Tue, 4 Aug 2020 14:08:21 +0800 Subject: [PATCH 2/3] Fix memory leak --- be/src/common/config.h | 2 -- be/src/runtime/data_stream_sender.cpp | 8 ++------ 2 files changed, 2 insertions(+), 8 deletions(-) diff --git a/be/src/common/config.h b/be/src/common/config.h index b69b70e37f59be..31a62b76a0c219 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -519,8 +519,6 @@ namespace config { CONF_Int64(brpc_max_body_size, "209715200"); // Max unwritten bytes in each socket, if the limit is reached, Socket.Write fails with EOVERCROWDED CONF_Int64(brpc_socket_max_unwritten_bytes, "67108864"); - // If batch size large than brpc_attachment_threashold, use attachment instead - CONF_Int64(brpc_attachment_threashold, "67108864"); // max number of txns for every txn_partition_map in txn manager // this is a self protection to avoid too many txns saving in manager diff --git a/be/src/runtime/data_stream_sender.cpp b/be/src/runtime/data_stream_sender.cpp index 93c98e932d4dc6..8c38e06aee3555 100644 --- a/be/src/runtime/data_stream_sender.cpp +++ b/be/src/runtime/data_stream_sender.cpp @@ -227,13 +227,9 @@ Status DataStreamSender::Channel::send_batch(PRowBatch* batch, bool eos) { } _brpc_request.set_eos(eos); - if (batch != nullptr && RowBatch::get_batch_size(*batch) > config::brpc_socket_max_unwritten_bytes) { - std::string* tuple_data_to_attachment = batch->release_tuple_data(); + if (batch != nullptr) { butil::IOBuf& io_buf = _closure->cntl.request_attachment(); - // append_user_data will not copy data instead of reference - io_buf.append_user_data(const_cast(tuple_data_to_attachment->c_str()), - tuple_data_to_attachment->size(), - [](void* address) { free(address); }); + io_buf.append(batch->tuple_data()); batch->mutable_tuple_data(); // to padding the required tuple_data field in PB _brpc_request.set_allocated_row_batch(batch); } From f7d0a27a108a7573c2b3ed09b3113bfccd9bb98d Mon Sep 17 00:00:00 2001 From: chaoyli Date: Tue, 4 Aug 2020 14:27:51 +0800 Subject: [PATCH 3/3] Add clear tuple_data --- be/src/runtime/data_stream_sender.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/be/src/runtime/data_stream_sender.cpp b/be/src/runtime/data_stream_sender.cpp index 8c38e06aee3555..384e7606f9f58c 100644 --- a/be/src/runtime/data_stream_sender.cpp +++ b/be/src/runtime/data_stream_sender.cpp @@ -230,7 +230,7 @@ Status DataStreamSender::Channel::send_batch(PRowBatch* batch, bool eos) { if (batch != nullptr) { butil::IOBuf& io_buf = _closure->cntl.request_attachment(); io_buf.append(batch->tuple_data()); - batch->mutable_tuple_data(); // to padding the required tuple_data field in PB + batch->set_tuple_data(""); // to padding the required tuple_data field in PB _brpc_request.set_allocated_row_batch(batch); } _brpc_request.set_packet_seq(_packet_seq++);