Skip to content

Commit

Permalink
[opt](scanner) optimize the number of threads of scanners
Browse files Browse the repository at this point in the history
  • Loading branch information
AshinGau committed Dec 20, 2023
1 parent 9d5b9cc commit 6ca1e07
Show file tree
Hide file tree
Showing 5 changed files with 15 additions and 13 deletions.
11 changes: 9 additions & 2 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
#include "common/status.h"
#include "io/fs/file_writer.h"
#include "io/fs/local_file_system.h"
#include "util/cpu_info.h"

namespace doris {
namespace config {
Expand Down Expand Up @@ -235,8 +236,14 @@ DEFINE_Bool(doris_enable_scanner_thread_pool_per_disk, "true");
DEFINE_mInt64(doris_blocking_priority_queue_wait_timeout_ms, "500");
// number of threads in the scanner thread pool for olap tables;
// also used to size the remote scanner thread pool.
// if equal to -1, the value is resolved to std::max(48, CpuInfo::num_cores() * 4)
DEFINE_Int32(doris_scanner_thread_pool_thread_num, "48");
DEFINE_Int32(doris_max_remote_scanner_thread_pool_thread_num, "-1");
DEFINE_Int32(doris_scanner_thread_pool_thread_num, "-1");
// Validator for doris_scanner_thread_pool_thread_num.
// The sentinel value -1 requests automatic sizing: the config is overwritten
// with four threads per core as reported by CpuInfo::num_cores(), but never
// fewer than 48. Any other value is accepted unchanged. Always returns true.
DEFINE_Validator(doris_scanner_thread_pool_thread_num, [](const int config) -> bool {
    if (config != -1) {
        // Explicit value supplied: keep it as-is.
        return true;
    }
    // CpuInfo is initialized here, before the core count is queried.
    CpuInfo::init();
    const int threads_by_cores = CpuInfo::num_cores() * 4;
    doris_scanner_thread_pool_thread_num = (48 < threads_by_cores) ? threads_by_cores : 48;
    return true;
});
// queue size of the olap scanner thread pool
DEFINE_Int32(doris_scanner_thread_pool_queue_size, "102400");
// default thrift client connect timeout(in seconds)
Expand Down
5 changes: 1 addition & 4 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -283,10 +283,7 @@ DECLARE_Bool(doris_enable_scanner_thread_pool_per_disk);
DECLARE_mInt64(doris_blocking_priority_queue_wait_timeout_ms);
// number of threads in the scanner thread pool for olap tables;
// also used to size the remote scanner thread pool.
// if equal to -1, value is std::max(48, CpuInfo::num_cores() * 4)
DECLARE_Int32(doris_scanner_thread_pool_thread_num);
// max number of remote scanner thread pool size
// if equal to -1, value is std::max(512, CpuInfo::num_cores() * 10)
DECLARE_Int32(doris_max_remote_scanner_thread_pool_thread_num);
DECLARE_mInt32(doris_scanner_thread_pool_thread_num);
// number of olap scanner thread pool queue size
DECLARE_Int32(doris_scanner_thread_pool_queue_size);
// default thrift client connect timeout(in seconds)
Expand Down
4 changes: 2 additions & 2 deletions be/src/io/fs/buffered_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -131,8 +131,8 @@ class MergeRangeFileReader : public io::FileReader {
static constexpr size_t READ_SLICE_SIZE = 8 * 1024 * 1024; // 8MB
static constexpr size_t BOX_SIZE = 1 * 1024 * 1024; // 1MB
static constexpr size_t SMALL_IO = 2 * 1024 * 1024; // 2MB
static constexpr size_t HDFS_MIN_IO_SIZE = 4 * 1024; // 4KB
static constexpr size_t OSS_MIN_IO_SIZE = 512 * 1024; // 512KB
static constexpr size_t HDFS_MIN_IO_SIZE = 8 * 1024; // 8KB
static constexpr size_t OSS_MIN_IO_SIZE = 1 * 1024 * 1024; // 1MB
static constexpr size_t NUM_BOX = TOTAL_BUFFER_SIZE / BOX_SIZE; // 128

MergeRangeFileReader(RuntimeProfile* profile, io::FileReaderSPtr reader,
Expand Down
4 changes: 1 addition & 3 deletions be/src/vec/exec/scan/scanner_scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -123,9 +123,7 @@ Status ScannerScheduler::init(ExecEnv* env) {
config::doris_scanner_thread_pool_queue_size, "local_scan");

// 3. remote scan thread pool
_remote_thread_pool_max_size = config::doris_max_remote_scanner_thread_pool_thread_num != -1
? config::doris_max_remote_scanner_thread_pool_thread_num
: std::max(512, CpuInfo::num_cores() * 10);
_remote_thread_pool_max_size = config::doris_scanner_thread_pool_thread_num;
_remote_scan_thread_pool = std::make_unique<PriorityThreadPool>(
_remote_thread_pool_max_size, config::doris_remote_scanner_thread_pool_queue_size,
"RemoteScanThreadPool");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,8 @@ public class HudiJniScanner extends JniScanner {
private static final ScheduledExecutorService cleanResolverService = Executors.newScheduledThreadPool(1);

static {
int numThreads = Math.max(Runtime.getRuntime().availableProcessors() * 2 + 1, 4);
if (numThreads > 32) {
int numThreads = Math.max(Runtime.getRuntime().availableProcessors() * 2, 4);
if (numThreads > 48) {
numThreads = Runtime.getRuntime().availableProcessors();
}
avroReadPool = Executors.newFixedThreadPool(numThreads,
Expand Down

0 comments on commit 6ca1e07

Please sign in to comment.