diff --git a/be/src/vec/exec/scan/pip_scanner_context.h b/be/src/vec/exec/scan/pip_scanner_context.h index c74108b0cf3a28..f02e07a6f86b4d 100644 --- a/be/src/vec/exec/scan/pip_scanner_context.h +++ b/be/src/vec/exec/scan/pip_scanner_context.h @@ -84,8 +84,6 @@ class PipScannerContext : public vectorized::ScannerContext { *block = std::move(_blocks_queues[id].front()); _blocks_queues[id].pop_front(); - RETURN_IF_ERROR(validate_block_schema((*block).get())); - if (_blocks_queues[id].empty() && _data_dependency) { _data_dependency->block_reading(); } @@ -108,6 +106,10 @@ class PipScannerContext : public vectorized::ScannerContext { if (_need_colocate_distribute) { std::vector hash_vals; for (const auto& block : blocks) { + auto st = validate_block_schema(block.get()); + if (!st.ok()) { + set_status_on_error(st, false); + } // vectorized calculate hash int rows = block->rows(); const auto element_size = _num_parallel_instances; @@ -141,6 +143,10 @@ class PipScannerContext : public vectorized::ScannerContext { } } else { for (const auto& block : blocks) { + auto st = validate_block_schema(block.get()); + if (!st.ok()) { + set_status_on_error(st, false); + } local_bytes += block->allocated_bytes(); } diff --git a/be/src/vec/exec/scan/scanner_context.cpp b/be/src/vec/exec/scan/scanner_context.cpp index e1c29d569aa27f..a2ee93815c4077 100644 --- a/be/src/vec/exec/scan/scanner_context.cpp +++ b/be/src/vec/exec/scan/scanner_context.cpp @@ -190,6 +190,10 @@ void ScannerContext::append_blocks_to_queue(std::vector& std::lock_guard l(_transfer_lock); auto old_bytes_in_queue = _cur_bytes_in_queue; for (auto& b : blocks) { + auto st = validate_block_schema(b.get()); + if (!st.ok()) { + set_status_on_error(st, false); + } _cur_bytes_in_queue += b->allocated_bytes(); _blocks_queue.push_back(std::move(b)); } @@ -246,8 +250,6 @@ Status ScannerContext::get_block_from_queue(RuntimeState* state, vectorized::Blo *block = std::move(_blocks_queue.front()); _blocks_queue.pop_front(); - RETURN_IF_ERROR(validate_block_schema((*block).get())); - auto block_bytes = (*block)->allocated_bytes(); _cur_bytes_in_queue -= block_bytes;