Skip to content

Commit

Permalink
[config](exchange) add BE config exchange_sink_queue_capacity_factor
Browse files Browse the repository at this point in the history
  • Loading branch information
kaijchen committed Mar 7, 2024
1 parent 73d1fdc commit 442e872
Show file tree
Hide file tree
Showing 4 changed files with 5 additions and 3 deletions.
1 change: 1 addition & 0 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -505,6 +505,7 @@ DEFINE_Int32(tablet_writer_open_rpc_timeout_sec, "60");
// You can ignore brpc error '[E1011]The server is overcrowded' when writing data.
DEFINE_mBool(tablet_writer_ignore_eovercrowded, "true");
DEFINE_mBool(exchange_sink_ignore_eovercrowded, "true");
DEFINE_mInt64(exchange_sink_queue_capacity_factor, "64");
DEFINE_mInt32(slave_replica_writer_rpc_timeout_sec, "60");
// Whether to enable stream load record function, the default is false.
// False: disable stream load record
Expand Down
1 change: 1 addition & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -554,6 +554,7 @@ DECLARE_Int32(tablet_writer_open_rpc_timeout_sec);
// You can ignore brpc error '[E1011]The server is overcrowded' when writing data.
DECLARE_mBool(tablet_writer_ignore_eovercrowded);
DECLARE_mBool(exchange_sink_ignore_eovercrowded);
DECLARE_mInt64(exchange_sink_queue_capacity_factor);
DECLARE_mInt32(slave_replica_writer_rpc_timeout_sec);
// Whether to enable stream load record function, the default is false.
// False: disable stream load record
Expand Down
5 changes: 3 additions & 2 deletions be/src/pipeline/exec/exchange_sink_buffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,8 @@ void ExchangeSinkBuffer<Parent>::close() {

template <typename Parent>
bool ExchangeSinkBuffer<Parent>::can_write() const {
size_t max_package_size = QUEUE_CAPACITY_FACTOR * _instance_to_package_queue.size();
size_t max_package_size =
config::exchange_sink_queue_capacity_factor * _instance_to_package_queue.size();
size_t total_package_size = 0;
for (auto& [_, q] : _instance_to_package_queue) {
total_package_size += q.size();
Expand Down Expand Up @@ -168,7 +169,7 @@ void ExchangeSinkBuffer<Parent>::register_sink(TUniqueId fragment_instance_id) {
std::queue<TransmitInfo<Parent>, std::list<TransmitInfo<Parent>>>();
_instance_to_broadcast_package_queue[low_id] =
std::queue<BroadcastTransmitInfo<Parent>, std::list<BroadcastTransmitInfo<Parent>>>();
_queue_capacity = QUEUE_CAPACITY_FACTOR * _instance_to_package_queue.size();
_queue_capacity = config::exchange_sink_queue_capacity_factor * _instance_to_package_queue.size();
PUniqueId finst_id;
finst_id.set_hi(fragment_instance_id.hi);
finst_id.set_lo(fragment_instance_id.lo);
Expand Down
1 change: 0 additions & 1 deletion be/src/pipeline/exec/exchange_sink_buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,6 @@ class ExchangeSinkBuffer : public HasTaskExecutionCtx {
int64_t get_sum_rpc_time();

std::atomic<int> _total_queue_size = 0;
static constexpr int QUEUE_CAPACITY_FACTOR = 64;
std::shared_ptr<Dependency> _queue_dependency;
std::shared_ptr<Dependency> _finish_dependency;
std::atomic<bool> _should_stop {false};
Expand Down

0 comments on commit 442e872

Please sign in to comment.