From 1df2f9a3e6b252c10a6a5b3627b496f252c40a6c Mon Sep 17 00:00:00 2001 From: taojiatao <245915794@qq.com> Date: Wed, 12 Oct 2022 17:44:07 +0800 Subject: [PATCH 1/2] Split large data when using brpc streaming --- docs/cn/streaming_rpc.md | 3 +-- docs/en/streaming_rpc.md | 3 +-- src/brpc/stream.cpp | 27 +++++++++++++++++++-------- 3 files changed, 21 insertions(+), 12 deletions(-) diff --git a/docs/cn/streaming_rpc.md b/docs/cn/streaming_rpc.md index 987ce37b17..ddbd5ff385 100644 --- a/docs/cn/streaming_rpc.md +++ b/docs/cn/streaming_rpc.md @@ -16,8 +16,7 @@ Streaming RPC保证: - 全双工。 - 支持流控。 - 提供超时提醒 - -目前的实现还没有自动切割过大的消息,同一个tcp连接上的多个Stream之间可能有[Head-of-line blocking](https://en.wikipedia.org/wiki/Head-of-line_blocking)问题,请尽量避免过大的单个消息,实现自动切割后我们会告知并更新文档。 +- 自动切割过大的消息 例子见[example/streaming_echo_c++](https://github.com/brpc/brpc/tree/master/example/streaming_echo_c++/)。 diff --git a/docs/en/streaming_rpc.md b/docs/en/streaming_rpc.md index 8ea32e2318..4bd5d602c3 100644 --- a/docs/en/streaming_rpc.md +++ b/docs/en/streaming_rpc.md @@ -16,8 +16,7 @@ Streaming RPC ensures/provides: - Full duplex - Flow control - Notification on timeout - -We do not support segment large messages automatically so that multiple Streams on a single TCP connection may lead to [Head-of-line blocking](https://en.wikipedia.org/wiki/Head-of-line_blocking) problem. Please avoid putting huge data into single message until we provide automatic segmentation. +- support segment large messages automaticall For examples please refer to [example/streaming_echo_c++](https://github.com/brpc/brpc/tree/master/example/streaming_echo_c++/). diff --git a/src/brpc/stream.cpp b/src/brpc/stream.cpp index 67b3541ac9..c84184bb38 100644 --- a/src/brpc/stream.cpp +++ b/src/brpc/stream.cpp @@ -35,7 +35,8 @@ namespace brpc { DECLARE_bool(usercode_in_pthread); - +DEFINE_uint64(max_trans_unit_size, 64 * 1024 * 1024, + "Maximum size of a transmission unit that we used to cut the message."); const static butil::IOBuf *TIMEOUT_TASK = (butil::IOBuf*)-1L; Stream::Stream() @@ -140,20 +141,30 @@ ssize_t Stream::CutMessageIntoFileDescriptor(int /*fd*/, errno = EBADF; return -1; } - butil::IOBuf out; ssize_t len = 0; for (size_t i = 0; i < size; ++i) { + butil::IOBuf *data = data_list[i]; + size_t length = data->length(); + uint64_t trans_unit = FLAGS_max_trans_unit_size; + int packet_num = ceil((double)length / (double)trans_unit); + + butil::IOBuf split_data; + for (int j = 0; j < packet_num; j++) { + butil::IOBuf out; + data->cutn(&split_data, trans_unit); + bool has_continuation = (j != packet_num - 1); StreamFrameMeta fm; fm.set_stream_id(_remote_settings.stream_id()); fm.set_source_stream_id(id()); fm.set_frame_type(FRAME_TYPE_DATA); - // TODO: split large data - fm.set_has_continuation(false); - policy::PackStreamMessage(&out, fm, data_list[i]); - len += data_list[i]->length(); - data_list[i]->clear(); + fm.set_has_continuation(has_continuation); + policy::PackStreamMessage(&out, fm, &split_data); + WriteToHostSocket(&out); + len += (ssize_t)split_data.length(); + split_data.clear(); + } + data->clear(); } - WriteToHostSocket(&out); return len; } From 1589af6ab7d99f5804404ac408b55aff3db8d05f Mon Sep 17 00:00:00 2001 From: taojiatao <245915794@qq.com> Date: Wed, 12 Oct 2022 18:57:35 +0800 Subject: [PATCH 2/2] review update --- docs/en/streaming_rpc.md | 2 +- src/brpc/stream.cpp | 6 ++++-- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/docs/en/streaming_rpc.md b/docs/en/streaming_rpc.md index 4bd5d602c3..74c090edf5 100644 --- a/docs/en/streaming_rpc.md +++ b/docs/en/streaming_rpc.md @@ -16,7 +16,7 @@ Streaming RPC ensures/provides: - Full duplex - Flow control - Notification on timeout -- support segment large messages automaticall +- support segment large messages automatically For examples please refer to [example/streaming_echo_c++](https://github.com/brpc/brpc/tree/master/example/streaming_echo_c++/). diff --git a/src/brpc/stream.cpp b/src/brpc/stream.cpp index c84184bb38..d81da2381e 100644 --- a/src/brpc/stream.cpp +++ b/src/brpc/stream.cpp @@ -35,7 +35,7 @@ namespace brpc { DECLARE_bool(usercode_in_pthread); -DEFINE_uint64(max_trans_unit_size, 64 * 1024 * 1024, +DEFINE_uint64(max_stream_data_frame_size, 64 * 1024 * 1024, "Maximum size of a transmission unit that we used to cut the message."); const static butil::IOBuf *TIMEOUT_TASK = (butil::IOBuf*)-1L; @@ -145,7 +145,9 @@ ssize_t Stream::CutMessageIntoFileDescriptor(int /*fd*/, for (size_t i = 0; i < size; ++i) { butil::IOBuf *data = data_list[i]; size_t length = data->length(); - uint64_t trans_unit = FLAGS_max_trans_unit_size; + uint64_t trans_unit = FLAGS_max_stream_data_frame_size == 0 + ? length + : FLAGS_max_stream_data_frame_size; int packet_num = ceil((double)length / (double)trans_unit); butil::IOBuf split_data;