diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index a08fdd5a6cc391..45a0aaccb7a351 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -505,6 +505,7 @@ DEFINE_Int32(tablet_writer_open_rpc_timeout_sec, "60"); // You can ignore brpc error '[E1011]The server is overcrowded' when writing data. DEFINE_mBool(tablet_writer_ignore_eovercrowded, "true"); DEFINE_mBool(exchange_sink_ignore_eovercrowded, "true"); +DEFINE_mInt64(exchange_sink_queue_capacity_factor, "64"); DEFINE_mInt32(slave_replica_writer_rpc_timeout_sec, "60"); // Whether to enable stream load record function, the default is false. // False: disable stream load record diff --git a/be/src/common/config.h b/be/src/common/config.h index 81685bc1e5cf7c..5264bac5ead2ee 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -554,6 +554,7 @@ DECLARE_Int32(tablet_writer_open_rpc_timeout_sec); // You can ignore brpc error '[E1011]The server is overcrowded' when writing data. DECLARE_mBool(tablet_writer_ignore_eovercrowded); DECLARE_mBool(exchange_sink_ignore_eovercrowded); +DECLARE_mInt64(exchange_sink_queue_capacity_factor); DECLARE_mInt32(slave_replica_writer_rpc_timeout_sec); // Whether to enable stream load record function, the default is false. // False: disable stream load record diff --git a/be/src/pipeline/exec/exchange_sink_buffer.cpp b/be/src/pipeline/exec/exchange_sink_buffer.cpp index ed7f18bfcb7c37..99357a67639a22 100644 --- a/be/src/pipeline/exec/exchange_sink_buffer.cpp +++ b/be/src/pipeline/exec/exchange_sink_buffer.cpp @@ -111,7 +111,8 @@ void ExchangeSinkBuffer::close() { template bool ExchangeSinkBuffer::can_write() const { - size_t max_package_size = QUEUE_CAPACITY_FACTOR * _instance_to_package_queue.size(); + size_t max_package_size = + config::exchange_sink_queue_capacity_factor * _instance_to_package_queue.size(); size_t total_package_size = 0; for (auto& [_, q] : _instance_to_package_queue) { total_package_size += q.size(); @@ -168,7 +169,7 @@ void ExchangeSinkBuffer::register_sink(TUniqueId fragment_instance_id) { std::queue, std::list>>(); _instance_to_broadcast_package_queue[low_id] = std::queue, std::list>>(); - _queue_capacity = QUEUE_CAPACITY_FACTOR * _instance_to_package_queue.size(); + _queue_capacity = config::exchange_sink_queue_capacity_factor * _instance_to_package_queue.size(); PUniqueId finst_id; finst_id.set_hi(fragment_instance_id.hi); finst_id.set_lo(fragment_instance_id.lo); diff --git a/be/src/pipeline/exec/exchange_sink_buffer.h b/be/src/pipeline/exec/exchange_sink_buffer.h index 0afa59bf731726..8c0375499c3f86 100644 --- a/be/src/pipeline/exec/exchange_sink_buffer.h +++ b/be/src/pipeline/exec/exchange_sink_buffer.h @@ -270,7 +270,6 @@ class ExchangeSinkBuffer : public HasTaskExecutionCtx { int64_t get_sum_rpc_time(); std::atomic _total_queue_size = 0; - static constexpr int QUEUE_CAPACITY_FACTOR = 64; std::shared_ptr _queue_dependency; std::shared_ptr _finish_dependency; std::atomic _should_stop {false};