Skip to content

Commit

Permalink
[bug](shared scan) Fix use-after-free when enable pipeline shared sca…
Browse files Browse the repository at this point in the history
…nning (apache#26199)

When enable shared scan, all scanners will be created by one instance. When the main instance reach eos and quit, all states of it will be released. But other instances are still possible to get block from those scanners. So we must assure scanners will not be dependent on any states of the main instance after it quit.
  • Loading branch information
Gabriel39 authored and seawinde committed Nov 12, 2023
1 parent 634c048 commit 8890893
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 4 deletions.
10 changes: 8 additions & 2 deletions be/src/vec/exec/scan/pip_scanner_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,6 @@ class PipScannerContext : public vectorized::ScannerContext {
*block = std::move(_blocks_queues[id].front());
_blocks_queues[id].pop_front();

RETURN_IF_ERROR(validate_block_schema((*block).get()));

if (_blocks_queues[id].empty() && _data_dependency) {
_data_dependency->block_reading();
}
Expand All @@ -108,6 +106,10 @@ class PipScannerContext : public vectorized::ScannerContext {
if (_need_colocate_distribute) {
std::vector<uint32_t> hash_vals;
for (const auto& block : blocks) {
auto st = validate_block_schema(block.get());
if (!st.ok()) {
set_status_on_error(st, false);
}
// vectorized calculate hash
int rows = block->rows();
const auto element_size = _num_parallel_instances;
Expand Down Expand Up @@ -141,6 +143,10 @@ class PipScannerContext : public vectorized::ScannerContext {
}
} else {
for (const auto& block : blocks) {
auto st = validate_block_schema(block.get());
if (!st.ok()) {
set_status_on_error(st, false);
}
local_bytes += block->allocated_bytes();
}

Expand Down
6 changes: 4 additions & 2 deletions be/src/vec/exec/scan/scanner_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,10 @@ void ScannerContext::append_blocks_to_queue(std::vector<vectorized::BlockUPtr>&
std::lock_guard l(_transfer_lock);
auto old_bytes_in_queue = _cur_bytes_in_queue;
for (auto& b : blocks) {
auto st = validate_block_schema(b.get());
if (!st.ok()) {
set_status_on_error(st, false);
}
_cur_bytes_in_queue += b->allocated_bytes();
_blocks_queue.push_back(std::move(b));
}
Expand Down Expand Up @@ -246,8 +250,6 @@ Status ScannerContext::get_block_from_queue(RuntimeState* state, vectorized::Blo
*block = std::move(_blocks_queue.front());
_blocks_queue.pop_front();

RETURN_IF_ERROR(validate_block_schema((*block).get()));

auto block_bytes = (*block)->allocated_bytes();
_cur_bytes_in_queue -= block_bytes;

Expand Down

0 comments on commit 8890893

Please sign in to comment.