Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

split large data when using brpc streaming #1947

Open
wants to merge 2 commits into
base: master
Choose a base branch
from

Conversation

Aaaaaaron
Copy link
Member

https://github.com/apache/incubator-brpc/blob/master/docs/cn/streaming_rpc.md

"目前的实现还没有自动切割过大的消息,同一个tcp连接上的多个Stream之间可能有Head-of-line blocking问题,请尽量避免过大的单个消息,实现自动切割后我们会告知并更新文档"

尝试解决这个 todo.

@Aaaaaaron Aaaaaaron force-pushed the stream_split_mesg branch 3 times, most recently from 04a6930 to 67b6b29 Compare October 9, 2022 03:00
src/brpc/stream.cpp Show resolved Hide resolved
int packet_num = ceil((double)length / (double)trans_unit);

butil::IOBuf split_data;
for (int j = 0; j < packet_num; j++) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

用while (!data->empty()) 来判断是不是更直接。也不用计算packet_num了。

data_list[i]->clear();
fm.set_has_continuation(has_continuation);
policy::PackStreamMessage(&out, fm, &split_data);
WriteToHostSocket(&out);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

如果has_continuation再Write
否则放到最后统一Write

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里是指如果消息没有达到 max_stream_data_frame_size, 就还是和之前一样放到最后统一 write 吗

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

是的

docs/en/streaming_rpc.md Outdated Show resolved Hide resolved
src/brpc/stream.cpp Outdated Show resolved Hide resolved
@wwbmmm
Copy link
Contributor

wwbmmm commented Oct 11, 2022

另外,建议补充下对应单测

@Aaaaaaron
Copy link
Member Author

@wwbmmm 多谢 review, ut 一直想加的, 没有想到合适的测法, 请问你这边有啥建议吗

@wwbmmm
Copy link
Contributor

wwbmmm commented Oct 12, 2022

@wwbmmm 多谢 review, ut 一直想加的, 没有想到合适的测法, 请问你这边有啥建议吗

可以把max_stream_data_frame_size设置成一个比较小的值,然后发一个比较长的包,看是否有分多次发送(通过Socket的GetOrNewSharedPart()->out_num_messages可以获取write的次数)

@jenrryyou
Copy link
Contributor

jenrryyou commented Oct 24, 2022

想到一个问题,如果还没有收到带结束标记的数据帧(has_continuation=false),这个stream就关闭了(收到close或者主动关闭),_pending_buf已经保存的iobuf数据没有释放的相关代码,可能存在内存泄漏。

@wwbmmm
Copy link
Contributor

wwbmmm commented Nov 23, 2022

@Aaaaaaron 这个PR和master有冲突了,能否解决一下。并补充单测,谢谢!

@Aaaaaaron
Copy link
Member Author

@wwbmmm 好的 谢谢 不好意思最近比较忙😂

@Superskyyy
Copy link
Member

@Aaaaaaron 请问这个可以合并了么

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants