diff --git a/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp b/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp index a939d25654b4cc..0369cf75834ce9 100644 --- a/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp +++ b/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp @@ -62,6 +62,7 @@ Status LocalExchangeSinkOperatorX::init(ExchangeType type, const int num_buckets _num_partitions)); RETURN_IF_ERROR(_partitioner->init(_texprs)); } else if (_type == ExchangeType::BUCKET_HASH_SHUFFLE) { + DCHECK_GT(num_buckets, 0); _partitioner.reset( new vectorized::Crc32HashPartitioner(num_buckets)); RETURN_IF_ERROR(_partitioner->init(_texprs)); diff --git a/be/src/pipeline/local_exchange/local_exchanger.cpp b/be/src/pipeline/local_exchange/local_exchanger.cpp index c5f99ca5d6a4a5..647988f8b794cb 100644 --- a/be/src/pipeline/local_exchange/local_exchanger.cpp +++ b/be/src/pipeline/local_exchange/local_exchanger.cpp @@ -226,19 +226,6 @@ Status ShuffleExchanger::_split_rows(RuntimeState* state, const uint32_t* __rest new_block_wrapper->unref(local_state._shared_state, local_state._channel_id); } } - } else if (_num_senders != _num_sources) { - // In this branch, data just should be distributed equally into all instances. - new_block_wrapper->ref(_num_partitions); - for (size_t i = 0; i < _num_partitions; i++) { - uint32_t start = local_state._partition_rows_histogram[i]; - uint32_t size = local_state._partition_rows_histogram[i + 1] - start; - if (size > 0) { - _enqueue_data_and_set_ready(i % _num_sources, local_state, - {new_block_wrapper, {row_idx, start, size}}); - } else { - new_block_wrapper->unref(local_state._shared_state, local_state._channel_id); - } - } } else { DCHECK(!bucket_seq_to_instance_idx.empty()); new_block_wrapper->ref(_num_partitions); diff --git a/be/src/pipeline/local_exchange/local_exchanger.h b/be/src/pipeline/local_exchange/local_exchanger.h index bf052ac3b924ca..4912ab3369815b 100644 --- a/be/src/pipeline/local_exchange/local_exchanger.h +++ b/be/src/pipeline/local_exchange/local_exchanger.h @@ -220,9 +220,7 @@ class ShuffleExchanger : public Exchanger { ShuffleExchanger(int running_sink_operators, int num_sources, int num_partitions, int free_block_limit) : Exchanger(running_sink_operators, num_sources, num_partitions, - free_block_limit) { - _data_queue.resize(num_partitions); - } + free_block_limit) {} Status _split_rows(RuntimeState* state, const uint32_t* __restrict channel_ids, vectorized::Block* block, LocalExchangeSinkLocalState& local_state); }; @@ -232,7 +230,10 @@ class BucketShuffleExchanger final : public ShuffleExchanger { BucketShuffleExchanger(int running_sink_operators, int num_sources, int num_partitions, int free_block_limit) : ShuffleExchanger(running_sink_operators, num_sources, num_partitions, - free_block_limit) {} + free_block_limit) { + DCHECK_GT(num_partitions, 0); + _data_queue.resize(std::max(num_partitions, num_sources)); + } ~BucketShuffleExchanger() override = default; ExchangeType get_type() const override { return ExchangeType::BUCKET_HASH_SHUFFLE; } }; diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp index d14a0d0c3cd4a7..93aadec9976423 100644 --- a/be/src/pipeline/pipeline_fragment_context.cpp +++ b/be/src/pipeline/pipeline_fragment_context.cpp @@ -969,9 +969,9 @@ Status PipelineFragmentContext::_plan_local_exchange( // if 'num_buckets == 0' means the fragment is colocated by exchange node not the // scan node. so here use `_num_instance` to replace the `num_buckets` to prevent dividing 0 // still keep colocate plan after local shuffle - RETURN_IF_ERROR(_plan_local_exchange( - _use_serial_source || num_buckets == 0 ? _num_instances : num_buckets, pip_idx, - _pipelines[pip_idx], bucket_seq_to_instance_idx, shuffle_idx_to_instance_idx)); + RETURN_IF_ERROR(_plan_local_exchange(num_buckets, pip_idx, _pipelines[pip_idx], + bucket_seq_to_instance_idx, + shuffle_idx_to_instance_idx)); } return Status::OK(); }