From 840f47b9bf59f2f83ca5f39236f9141641396725 Mon Sep 17 00:00:00 2001 From: Gabriel Date: Wed, 1 Nov 2023 15:51:20 +0800 Subject: [PATCH] [bug](shared scan) Fix use-after-free when enable pipeline shared scanning (#26199) When enable shared scan, all scanners will be created by one instance. When the main instance reach eos and quit, all states of it will be released. But other instances are still possible to get block from those scanners. So we must assure scanners will not be dependent on any states of the main instance after it quit. --- be/src/vec/exec/scan/pip_scanner_context.h | 10 ++++++++-- be/src/vec/exec/scan/scanner_context.cpp | 6 ++++-- 2 files changed, 12 insertions(+), 4 deletions(-) diff --git a/be/src/vec/exec/scan/pip_scanner_context.h b/be/src/vec/exec/scan/pip_scanner_context.h index c74108b0cf3a28..f02e07a6f86b4d 100644 --- a/be/src/vec/exec/scan/pip_scanner_context.h +++ b/be/src/vec/exec/scan/pip_scanner_context.h @@ -84,8 +84,6 @@ class PipScannerContext : public vectorized::ScannerContext { *block = std::move(_blocks_queues[id].front()); _blocks_queues[id].pop_front(); - RETURN_IF_ERROR(validate_block_schema((*block).get())); - if (_blocks_queues[id].empty() && _data_dependency) { _data_dependency->block_reading(); } @@ -108,6 +106,10 @@ class PipScannerContext : public vectorized::ScannerContext { if (_need_colocate_distribute) { std::vector hash_vals; for (const auto& block : blocks) { + auto st = validate_block_schema(block.get()); + if (!st.ok()) { + set_status_on_error(st, false); + } // vectorized calculate hash int rows = block->rows(); const auto element_size = _num_parallel_instances; @@ -141,6 +143,10 @@ class PipScannerContext : public vectorized::ScannerContext { } } else { for (const auto& block : blocks) { + auto st = validate_block_schema(block.get()); + if (!st.ok()) { + set_status_on_error(st, false); + } local_bytes += block->allocated_bytes(); } diff --git a/be/src/vec/exec/scan/scanner_context.cpp b/be/src/vec/exec/scan/scanner_context.cpp index e1c29d569aa27f..a2ee93815c4077 100644 --- a/be/src/vec/exec/scan/scanner_context.cpp +++ b/be/src/vec/exec/scan/scanner_context.cpp @@ -190,6 +190,10 @@ void ScannerContext::append_blocks_to_queue(std::vector& std::lock_guard l(_transfer_lock); auto old_bytes_in_queue = _cur_bytes_in_queue; for (auto& b : blocks) { + auto st = validate_block_schema(b.get()); + if (!st.ok()) { + set_status_on_error(st, false); + } _cur_bytes_in_queue += b->allocated_bytes(); _blocks_queue.push_back(std::move(b)); } @@ -246,8 +250,6 @@ Status ScannerContext::get_block_from_queue(RuntimeState* state, vectorized::Blo *block = std::move(_blocks_queue.front()); _blocks_queue.pop_front(); - RETURN_IF_ERROR(validate_block_schema((*block).get())); - auto block_bytes = (*block)->allocated_bytes(); _cur_bytes_in_queue -= block_bytes;