[Refactor](Exec) Support one rpc send muti blocks #50113
Thank you for your contribution to Apache Doris. Please clearly describe your PR:

run buildall

run buildall

TeamCity cloud ut coverage result:
TPC-H: Total hot run time: 34103 ms
TPC-DS: Total hot run time: 192556 ms
ClickBench: Total hot run time: 30.12 s
BE UT Coverage Report: Increment line coverage, Increment coverage report

run buildall

TeamCity cloud ut coverage result:
TPC-H: Total hot run time: 34200 ms
TPC-DS: Total hot run time: 186310 ms
ClickBench: Total hot run time: 29.65 s
BE UT Coverage Report: Increment line coverage, Increment coverage report

run buildall

TeamCity cloud ut coverage result:
TPC-H: Total hot run time: 34185 ms
TPC-DS: Total hot run time: 186560 ms
ClickBench: Total hot run time: 29.69 s
BE UT Coverage Report: Increment line coverage, Increment coverage report
BE Regression P0 && UT Coverage Report: Increment line coverage, Increment coverage report

run buildall

TeamCity cloud ut coverage result:
TPC-H: Total hot run time: 34059 ms
TPC-DS: Total hot run time: 185536 ms
ClickBench: Total hot run time: 29.82 s
BE UT Coverage Report: Increment line coverage, Increment coverage report

8c350b9 to 0166d9a

run buildall

run buildall

TeamCity cloud ut coverage result:
TPC-H: Total hot run time: 34097 ms
TPC-DS: Total hot run time: 192406 ms
ClickBench: Total hot run time: 29.76 s
BE UT Coverage Report: Increment line coverage, Increment coverage report
BE Regression P0 && UT Coverage Report: Increment line coverage, Increment coverage report

PR approved by at least one committer and no changes requested.

PR approved by anyone and no changes requested.
### What problem does this PR solve?

#50113

transmit_block should pass done only when transmitting the last block. Otherwise, an earlier block might hit the memory limit and set done to null, and the next add_block call then fails the check below.

```
F20250430 11:49:29.413803 2176884 vdata_stream_recvr.cpp:200] Check failed: *done != nullptr
*** Check failure stack trace: ***
    @ 0x558196be1956 google::LogMessage::SendToLog()
    @ 0x558196bde3a0 google::LogMessage::Flush()
    @ 0x558196be2199 google::LogMessageFatal::~LogMessageFatal()
    @ 0x558193b1659c doris::vectorized::VDataStreamRecvr::SenderQueue::add_block()
    @ 0x558193b1f7b5 doris::vectorized::VDataStreamRecvr::add_block()
    @ 0x558193af6cf8 doris::vectorized::VDataStreamMgr::transmit_block()
    @ 0x558157f7a685 doris::pipeline::DataStreamRecvrTest_transmit_block_Test::TestBody()
    @ 0x5581973c8c0b testing::internal::HandleSehExceptionsInMethodIfSupported<>()
    @ 0x5581973c2a69 testing::internal::HandleExceptionsInMethodIfSupported<>()
    @ 0x55819739943a testing::Test::Run()
    @ 0x558197399e5e testing::TestInfo::Run()
    @ 0x55819739a71e testing::TestSuite::Run()
    @ 0x5581973a9dde testing::internal::UnitTestImpl::RunAllTests()
    @ 0x5581973c9a56 testing::internal::HandleSehExceptionsInMethodIfSupported<>()
    @ 0x5581973c3a61 testing::internal::HandleExceptionsInMethodIfSupported<>()
    @ 0x5581973a85d3 testing::UnitTest::Run()
    @ 0x5581593d8653 RUN_ALL_TESTS()
```

### Release note

None

### Check List (For Author)

- Test <!-- At least one of them must be included. -->
    - [ ] Regression test
    - [x] Unit Test
    - [ ] Manual test (add detailed scripts or steps below)
    - [ ] No need to test or manual test. Explain why:
        - [ ] This is a refactor/code format and no logic has been changed.
        - [ ] Previous test can cover this change.
        - [ ] No code files have been changed.
        - [ ] Other reason <!-- Add your reason? -->

- Behavior changed:
    - [x] No.
    - [ ] Yes. <!-- Explain the behavior change -->

- Does this need documentation?
    - [x] No.
    - [ ] Yes. <!-- Add document PR link here. eg: apache/doris-website#1214 -->

### Check List (For Reviewer who merge this PR)

- [ ] Confirm the release note
- [ ] Confirm test cases
- [ ] Confirm document
- [ ] Add branch pick label <!-- Add branch pick label that this PR should merge into -->
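The failure mode described above (an earlier block hitting the memory limit and nulling done before the last block arrives) can be modeled in a few lines. This is only a simplified sketch and not the Doris implementation: `Closure`, `SketchSenderQueue`, `transmit`, the byte-based limit, and the `done_on_last_only` flag are all hypothetical names introduced for illustration, and the `false` return stands in for the fatal `Check failed: *done != nullptr`.

```cpp
#include <cstddef>
#include <functional>
#include <utility>
#include <vector>

// Hypothetical stand-in for google::protobuf::Closure.
using Closure = std::function<void()>;

// Simplified model of a receiver queue that defers the RPC closure
// once it is over its memory limit (assumption: limit counted in bytes).
struct SketchSenderQueue {
    std::size_t mem_limit;
    std::size_t mem_used = 0;
    std::vector<Closure> pending;

    explicit SketchSenderQueue(std::size_t limit) : mem_limit(limit) {}

    // Returns false where the real code would hit the fatal check.
    bool add_block(std::size_t block_bytes, Closure* done) {
        mem_used += block_bytes;
        if (done == nullptr) return true;  // not the last block, nothing to defer
        if (!*done) return false;          // models "Check failed: *done != nullptr"
        if (mem_used > mem_limit) {
            pending.push_back(std::move(*done));  // queue keeps the closure
            *done = nullptr;                      // caller's slot is now empty
        }
        return true;
    }
};

// Sends every block; passes `done` either with every block or only with the last.
bool transmit(SketchSenderQueue& q, const std::vector<std::size_t>& sizes,
              Closure* done, bool done_on_last_only) {
    for (std::size_t i = 0; i < sizes.size(); ++i) {
        const bool last = (i + 1 == sizes.size());
        Closure* slot = (done_on_last_only && !last) ? nullptr : done;
        if (!q.add_block(sizes[i], slot)) return false;
    }
    return true;
}
```

In this model, passing done with every block fails as soon as one block crosses the limit and another block follows, while passing it only with the last block lets the queue defer the closure exactly once.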
1. Remove the useless channel ptr in the trans struct to reduce memory consumption.
2. Send multiple blocks at once in one RPC.
…#50560)
We previously had a crash. The cause: the request must not be accessed
after calling add_block(...), because add_block may enqueue a closure
that runs on another thread and frees the request.
```
==730145==ERROR: AddressSanitizer: heap-buffer-overflow on address 0x7be1efd803a0 at pc 0x556b38d9d625 bp 0x7b16bf0193f0 sp 0x7b16bf0193e8
READ of size 4 at 0x7be1efd803a0 thread T1559
#0 0x556b38d9d624 in google::protobuf::internal::RepeatedPtrFieldBase::size() const /home/zcp/repo_center/doris_master/doris/thirdparty/installed/include/google/protobuf/repeated_ptr_field.h:185:29
#1 0x556b408ab062 in google::protobuf::RepeatedPtrField<doris::PBlock>::size() const /home/zcp/repo_center/doris_master/doris/thirdparty/installed/include/google/protobuf/repeated_ptr_field.h:1248:32
#2 0x556b408aaff4 in doris::PTransmitDataParams::_internal_blocks_size() const /home/zcp/repo_center/doris_master/doris/be/../gensrc/build/gen_cpp/internal_service.pb.h:32149:25
#3 0x556b4089731c in doris::PTransmitDataParams::blocks_size() const /home/zcp/repo_center/doris_master/doris/be/../gensrc/build/gen_cpp/internal_service.pb.h:32152:10
#4 0x556b60a83c17 in doris::vectorized::VDataStreamMgr::transmit_block(doris::PTransmitDataParams const*, google::protobuf::Closure**, long) /home/zcp/repo_center/doris_master/doris/be/src/vec/runtime/vdata_stream_mgr.cpp:150:38
#5 0x556b407f7408 in doris::PInternalService::_transmit_block(google::protobuf::RpcController*, doris::PTransmitDataParams const*, doris::PTransmitDataResult*, google::protobuf::Closure*, doris::Status const&, long) /home/zcp/repo_center/doris_master/doris/be/src/service/internal_service.cpp:1673:40
#6 0x556b407f52bb in doris::PInternalService::transmit_block(google::protobuf::RpcController*, doris::PTransmitDataParams const*, doris::PTransmitDataResult*, google::protobuf::Closure*) /home/zcp/repo_center/doris_master/doris/be/src/service/internal_service.cpp:1610:9
#7 0x556b43fceba2 in doris::PBackendService::CallMethod(google::protobuf::MethodDescriptor const*, google::protobuf::RpcController*, google::protobuf::Message const*, google::protobuf::Message*, google::protobuf::Closure*) /home/zcp/repo_center/doris_master/doris/gensrc/build/gen_cpp/internal_service.pb.cc:49452:7
#8 0x556b6736273e in brpc::policy::ProcessRpcRequest(brpc::InputMessageBase*) (/mnt/hdd01/selectdb-cloud-chaos/cluster0/be/lib/doris_be+0x770bd73e)
#9 0x556b67357426 in brpc::ProcessInputMessage(void*) (/mnt/hdd01/selectdb-cloud-chaos/cluster0/be/lib/doris_be+0x770b2426)
#10 0x556b67357f20 in brpc::InputMessenger::InputMessageClosure::~InputMessageClosure() (/mnt/hdd01/selectdb-cloud-chaos/cluster0/be/lib/doris_be+0x770b2f20)
#11 0x556b673588dd in brpc::InputMessenger::OnNewMessages(brpc::Socket*) (/mnt/hdd01/selectdb-cloud-chaos/cluster0/be/lib/doris_be+0x770b38dd)
#12 0x556b674a0adc in brpc::Socket::ProcessEvent(void*) (/mnt/hdd01/selectdb-cloud-chaos/cluster0/be/lib/doris_be+0x771fbadc)
#13 0x556b672e0f76 in bthread::TaskGroup::task_runner(long) (/mnt/hdd01/selectdb-cloud-chaos/cluster0/be/lib/doris_be+0x7703bf76)
#14 0x556b672cbbe0 in bthread_make_fcontext (/mnt/hdd01/selectdb-cloud-chaos/cluster0/be/lib/doris_be+0x77026be0)
```
```
Note: The done pointer will be saved in add_block and may be called in another thread via done->Run().
For example, when blocks_size == 1, the process is as follows:
transmit_block (i=0)
└─> recvr->add_block(..., done, ...) // Pass done
└─> SenderQueue::add_block
└─> _pending_closures.push(done) // done is saved
get_batch() [another thread]
└─> closure_pair.first->Run() // ⚠️ done->Run() is called
└─> brpc releases request and response
transmit_block (i=1) [original thread continues]
└─> request->blocks_size() // ⚠️ request has already been released!
At this point, a use-after-free issue occurs.
TODO: We should consider refactoring this part because add_block may release the request.
We should not access the request after calling add_block.
```
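One way to respect the rule in the note is to read everything needed from the request before the call that hands over done. The sketch below is an illustration under stated assumptions, not the actual fix: `FakeRequest`, `FakeRecvr`, `transmit_blocks`, and `demo_request_freed_safely` are hypothetical stand-ins, `std::function` replaces `google::protobuf::Closure`, and the closure is run immediately inside `add_block` to simulate the worst case where the other thread frees the request right away.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for PTransmitDataParams (not the real protobuf type).
struct FakeRequest {
    std::vector<std::string> blocks;
};

// Hypothetical receiver: once it is handed `done`, the request may be freed.
struct FakeRecvr {
    std::size_t blocks_received = 0;
    std::size_t bytes_received = 0;

    void add_block(const std::string& block, std::function<void()>* done) {
        bytes_received += block.size();  // use the block before the closure can run
        ++blocks_received;
        if (done != nullptr && *done) {
            std::function<void()> closure = std::move(*done);
            *done = nullptr;
            closure();  // worst case: runs at once and frees the request
        }
    }
};

// Reads the block count once, and never touches `request` after the
// add_block call that receives `done`.
std::size_t transmit_blocks(const FakeRequest* request, FakeRecvr& recvr,
                            std::function<void()>* done) {
    assert(request != nullptr);
    const std::size_t n = request->blocks.size();  // cached before any add_block
    for (std::size_t i = 0; i < n; ++i) {
        const bool last = (i + 1 == n);
        recvr.add_block(request->blocks[i], last ? done : nullptr);
        // The loop condition compares against n, not request->blocks.size().
    }
    return n;
}

// Simulates brpc releasing the request when the closure runs.
bool demo_request_freed_safely() {
    auto request = std::make_unique<FakeRequest>();
    request->blocks = {"b0", "b1", "b2"};
    FakeRecvr recvr;
    std::function<void()> done = [&request] { request.reset(); };
    const std::size_t sent = transmit_blocks(request.get(), recvr, &done);
    return sent == 3 && recvr.blocks_received == 3 && request == nullptr;
}
```

Caching the count makes the loop condition independent of the request's lifetime, which is the access the ASan trace points at (`blocks_size()` called from `transmit_block`).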
#50113
…57888)