diff --git a/be/src/vec/exec/scan/scanner.cpp b/be/src/vec/exec/scan/scanner.cpp index d321075435b2eb..04eb2e19dda47b 100644 --- a/be/src/vec/exec/scan/scanner.cpp +++ b/be/src/vec/exec/scan/scanner.cpp @@ -78,8 +78,39 @@ Status Scanner::init(RuntimeState* state, const VExprContextSPtrs& conjuncts) { Status Scanner::get_block_after_projects(RuntimeState* state, vectorized::Block* block, bool* eos) { auto& row_descriptor = _local_state->_parent->row_descriptor(); if (_output_row_descriptor) { - _origin_block.clear_column_data(row_descriptor.num_materialized_slots()); - RETURN_IF_ERROR(get_block(state, &_origin_block, eos)); + if (_alreay_eos) { + *eos = true; + _padding_block.swap(_origin_block); + } else { + _origin_block.clear_column_data(row_descriptor.num_materialized_slots()); + const auto min_batch_size = std::max(state->batch_size() / 2, 1); + while (_padding_block.rows() < min_batch_size && !*eos) { + RETURN_IF_ERROR(get_block(state, &_origin_block, eos)); + if (_origin_block.rows() >= min_batch_size) { + break; + } + + if (_origin_block.rows() + _padding_block.rows() <= state->batch_size()) { + RETURN_IF_ERROR(_merge_padding_block()); + _origin_block.clear_column_data(row_descriptor.num_materialized_slots()); + } else { + if (_origin_block.rows() < _padding_block.rows()) { + _padding_block.swap(_origin_block); + } + break; + } + } + } + + // first output the origin block change eos = false, next time output padding block + // set the eos to true + if (*eos && !_padding_block.empty() && !_origin_block.empty()) { + _alreay_eos = true; + *eos = false; + } + if (_origin_block.empty() && !_padding_block.empty()) { + _padding_block.swap(_origin_block); + } return _do_projections(&_origin_block, block); } else { return get_block(state, block, eos); diff --git a/be/src/vec/exec/scan/scanner.h b/be/src/vec/exec/scan/scanner.h index 26023c58b3b076..9aad37169464e1 100644 --- a/be/src/vec/exec/scan/scanner.h +++ b/be/src/vec/exec/scan/scanner.h @@ -103,6 +103,16 @@ class Scanner { // Subclass should implement this to return data. virtual Status _get_block_impl(RuntimeState* state, Block* block, bool* eof) = 0; + Status _merge_padding_block() { + if (_padding_block.empty()) { + _padding_block.swap(_origin_block); + } else if (_origin_block.rows()) { + RETURN_IF_ERROR( + MutableBlock::build_mutable_block(&_padding_block).merge(_origin_block)); + } + return Status::OK(); + } + // Update the counters before closing this scanner virtual void _collect_profile_before_close(); @@ -217,6 +227,8 @@ class Scanner { // Used in common subexpression elimination to compute intermediate results. std::vector _intermediate_projections; vectorized::Block _origin_block; + vectorized::Block _padding_block; + bool _alreay_eos = false; VExprContextSPtrs _common_expr_ctxs_push_down; // Late arriving runtime filters will update _conjuncts.