Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 22 additions & 1 deletion be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -386,7 +386,7 @@ DEFINE_mBool(enable_segment_rows_consistency_check, "false");
DEFINE_mBool(enable_segment_rows_check_core, "false");
// ATTENTION: For test only. In test environment, there are no historical data,
// so all rowset meta should have segment rows info.
DEFINE_mBool(fail_when_segment_rows_not_in_rowset_meta,"false");
DEFINE_mBool(fail_when_segment_rows_not_in_rowset_meta, "false");
DEFINE_String(row_cache_mem_limit, "20%");

// Cache for storage page size
Expand Down Expand Up @@ -1458,6 +1458,21 @@ DEFINE_mInt64(string_overflow_size, "4294967295"); // std::numic_limits<uint32_t
DEFINE_Int64(num_buffered_reader_prefetch_thread_pool_min_thread, "16");
// The max thread num for BufferedReaderPrefetchThreadPool
DEFINE_Int64(num_buffered_reader_prefetch_thread_pool_max_thread, "64");

DEFINE_mBool(enable_segment_prefetch_verbose_log, "false");
// The thread num for SegmentPrefetchThreadPool
DEFINE_Int64(segment_prefetch_thread_pool_thread_num_min, "32");
DEFINE_Int64(segment_prefetch_thread_pool_thread_num_max, "2000");

DEFINE_mInt32(segment_file_cache_consume_rowids_batch_size, "8000");
// Enable segment file cache block prefetch for query
DEFINE_mBool(enable_query_segment_file_cache_prefetch, "false");
// Number of blocks to prefetch ahead in segment iterator for query
DEFINE_mInt32(query_segment_file_cache_prefetch_block_size, "2");
// Enable segment file cache block prefetch for compaction
DEFINE_mBool(enable_compaction_segment_file_cache_prefetch, "false");
// Number of blocks to prefetch ahead in segment iterator for compaction
DEFINE_mInt32(compaction_segment_file_cache_prefetch_block_size, "2");
// The min thread num for S3FileUploadThreadPool
DEFINE_Int64(num_s3_file_upload_thread_pool_min_thread, "16");
// The max thread num for S3FileUploadThreadPool
Expand Down Expand Up @@ -1580,6 +1595,12 @@ DEFINE_mBool(enable_wal_tde, "false");
DEFINE_mBool(enable_prefill_output_dbm_agg_cache_after_compaction, "true");
DEFINE_mBool(enable_prefill_all_dbm_agg_cache_after_compaction, "true");

// Concurrency stats dump configuration
DEFINE_mBool(enable_concurrency_stats_dump, "false");
DEFINE_mInt32(concurrency_stats_dump_interval_ms, "100");
DEFINE_Validator(concurrency_stats_dump_interval_ms,
[](const int32_t config) -> bool { return config >= 10; });

// clang-format off
#ifdef BE_TEST
// test s3
Expand Down
19 changes: 19 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -1516,6 +1516,21 @@ DECLARE_mInt64(string_overflow_size);
DECLARE_Int64(num_buffered_reader_prefetch_thread_pool_min_thread);
// The max thread num for BufferedReaderPrefetchThreadPool
DECLARE_Int64(num_buffered_reader_prefetch_thread_pool_max_thread);

DECLARE_mBool(enable_segment_prefetch_verbose_log);
// The thread num for SegmentPrefetchThreadPool
DECLARE_Int64(segment_prefetch_thread_pool_thread_num_min);
DECLARE_Int64(segment_prefetch_thread_pool_thread_num_max);

DECLARE_mInt32(segment_file_cache_consume_rowids_batch_size);
// Enable segment file cache block prefetch for query
DECLARE_mBool(enable_query_segment_file_cache_prefetch);
// Number of blocks to prefetch ahead in segment iterator for query
DECLARE_mInt32(query_segment_file_cache_prefetch_block_size);
// Enable segment file cache block prefetch for compaction
DECLARE_mBool(enable_compaction_segment_file_cache_prefetch);
// Number of blocks to prefetch ahead in segment iterator for compaction
DECLARE_mInt32(compaction_segment_file_cache_prefetch_block_size);
// The min thread num for S3FileUploadThreadPool
DECLARE_Int64(num_s3_file_upload_thread_pool_min_thread);
// The max thread num for S3FileUploadThreadPool
Expand Down Expand Up @@ -1641,6 +1656,10 @@ DECLARE_mBool(enable_wal_tde);
DECLARE_mBool(enable_prefill_output_dbm_agg_cache_after_compaction);
DECLARE_mBool(enable_prefill_all_dbm_agg_cache_after_compaction);

// Concurrency stats dump configuration
DECLARE_mBool(enable_concurrency_stats_dump);
DECLARE_mInt32(concurrency_stats_dump_interval_ms);

#ifdef BE_TEST
// test s3
DECLARE_String(test_s3_resource);
Expand Down
3 changes: 3 additions & 0 deletions be/src/io/cache/block_file_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
#include "io/cache/file_cache_common.h"
#include "io/cache/fs_file_cache_storage.h"
#include "io/cache/mem_file_cache_storage.h"
#include "util/concurrency_stats.h"
#include "util/runtime_profile.h"
#include "util/stopwatch.hpp"
#include "util/thread.h"
Expand Down Expand Up @@ -736,7 +737,9 @@ FileBlocksHolder BlockFileCache::get_or_set(const UInt128Wrapper& hash, size_t o
DCHECK(stats != nullptr);
MonotonicStopWatch sw;
sw.start();
ConcurrencyStatsManager::instance().cached_remote_reader_get_or_set_wait_lock->increment();
std::lock_guard cache_lock(_mutex);
ConcurrencyStatsManager::instance().cached_remote_reader_get_or_set_wait_lock->decrement();
stats->lock_wait_timer += sw.elapsed_time();
FileBlocks file_blocks;
int64_t duration = 0;
Expand Down
92 changes: 76 additions & 16 deletions be/src/io/cache/cached_remote_file_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
#include "io/io_common.h"
#include "runtime/exec_env.h"
#include "util/bit_util.h"
#include "util/concurrency_stats.h"
#include "util/doris_metrics.h"
#include "util/runtime_profile.h"
#include "vec/exec/scan/scanner_scheduler.h"
Expand Down Expand Up @@ -135,6 +136,8 @@ std::pair<size_t, size_t> CachedRemoteFileReader::s_align_size(size_t offset, si

Status CachedRemoteFileReader::read_at_impl(size_t offset, Slice result, size_t* bytes_read,
const IOContext* io_ctx) {
SCOPED_CONCURRENCY_COUNT(ConcurrencyStatsManager::instance().cached_remote_reader_read_at);

g_read_at_req_bytes << result.size;
const bool is_dryrun = io_ctx->is_dryrun;
DCHECK(!closed());
Expand Down Expand Up @@ -240,8 +243,12 @@ Status CachedRemoteFileReader::read_at_impl(size_t offset, Slice result, size_t*
cache_context.stats = &stats;
MonotonicStopWatch sw;
sw.start();

ConcurrencyStatsManager::instance().cached_remote_reader_get_or_set->increment();
FileBlocksHolder holder =
_cache->get_or_set(_cache_hash, align_left, align_size, cache_context);
ConcurrencyStatsManager::instance().cached_remote_reader_get_or_set->decrement();

stats.cache_get_or_set_timer += sw.elapsed_time();
std::vector<FileBlockSPtr> empty_blocks;
for (auto& block : holder.file_blocks) {
Expand Down Expand Up @@ -279,23 +286,28 @@ Status CachedRemoteFileReader::read_at_impl(size_t offset, Slice result, size_t*
RETURN_IF_ERROR(_remote_file_reader->read_at(empty_start, Slice(buffer.get(), size),
&size, io_ctx));
}
for (auto& block : empty_blocks) {
if (block->state() == FileBlock::State::SKIP_CACHE) {
continue;
}
SCOPED_RAW_TIMER(&stats.local_write_timer);
char* cur_ptr = buffer.get() + block->range().left - empty_start;
size_t block_size = block->range().size();
Status st = block->append(Slice(cur_ptr, block_size));
if (st.ok()) {
st = block->finalize();
}
if (!st.ok()) {
LOG_EVERY_N(WARNING, 100) << "Write data to file cache failed. err=" << st.msg();
} else {
_insert_file_reader(block);
{
SCOPED_CONCURRENCY_COUNT(
ConcurrencyStatsManager::instance().cached_remote_reader_write_back);
for (auto& block : empty_blocks) {
if (block->state() == FileBlock::State::SKIP_CACHE) {
continue;
}
SCOPED_RAW_TIMER(&stats.local_write_timer);
char* cur_ptr = buffer.get() + block->range().left - empty_start;
size_t block_size = block->range().size();
Status st = block->append(Slice(cur_ptr, block_size));
if (st.ok()) {
st = block->finalize();
}
if (!st.ok()) {
LOG_EVERY_N(WARNING, 100)
<< "Write data to file cache failed. err=" << st.msg();
} else {
_insert_file_reader(block);
}
stats.bytes_write_into_file_cache += block_size;
}
stats.bytes_write_into_file_cache += block_size;
}
// copy from memory directly
size_t right_offset = offset + bytes_req - 1;
Expand Down Expand Up @@ -333,6 +345,8 @@ Status CachedRemoteFileReader::read_at_impl(size_t offset, Slice result, size_t*
static int64_t max_wait_time = 10;
TEST_SYNC_POINT_CALLBACK("CachedRemoteFileReader::max_wait_time", &max_wait_time);
if (block_state != FileBlock::State::DOWNLOADED) {
SCOPED_CONCURRENCY_COUNT(
ConcurrencyStatsManager::instance().cached_remote_reader_blocking);
do {
SCOPED_RAW_TIMER(&stats.remote_read_timer);
TEST_SYNC_POINT_CALLBACK("CachedRemoteFileReader::DOWNLOADING");
Expand All @@ -358,6 +372,8 @@ Status CachedRemoteFileReader::read_at_impl(size_t offset, Slice result, size_t*
} else {
size_t file_offset = current_offset - left;
SCOPED_RAW_TIMER(&stats.local_read_timer);
SCOPED_CONCURRENCY_COUNT(
ConcurrencyStatsManager::instance().cached_remote_reader_local_read);
st = block->read(Slice(result.data + (current_offset - offset), read_size),
file_offset);
}
Expand Down Expand Up @@ -423,4 +439,48 @@ void CachedRemoteFileReader::_update_stats(const ReadStatistics& read_stats,
g_skip_cache_sum << read_stats.skip_cache;
}

void CachedRemoteFileReader::prefetch_range(size_t offset, size_t size, const IOContext* io_ctx) {
if (offset >= this->size() || size == 0) {
return;
}

size = std::min(size, this->size() - offset);

ThreadPool* pool = ExecEnv::GetInstance()->segment_prefetch_thread_pool();
if (pool == nullptr) {
return;
}

IOContext dryrun_ctx;
if (io_ctx != nullptr) {
dryrun_ctx = *io_ctx;
}
dryrun_ctx.is_dryrun = true;
dryrun_ctx.query_id = nullptr;
dryrun_ctx.file_cache_stats = nullptr;
dryrun_ctx.file_reader_stats = nullptr;

LOG_IF(INFO, config::enable_segment_prefetch_verbose_log)
<< fmt::format("[verbose] Submitting prefetch task for offset={} size={}, file={}",
offset, size, path().filename().native());
std::weak_ptr<CachedRemoteFileReader> weak_this = shared_from_this();
auto st = pool->submit_func([weak_this, offset, size, dryrun_ctx]() {
auto self = weak_this.lock();
if (self == nullptr) {
return;
}
size_t bytes_read;
Slice dummy_buffer((char*)nullptr, size);
(void)self->read_at_impl(offset, dummy_buffer, &bytes_read, &dryrun_ctx);
LOG_IF(INFO, config::enable_segment_prefetch_verbose_log)
<< fmt::format("[verbose] Prefetch task completed for offset={} size={}, file={}",
offset, size, self->path().filename().native());
});

if (!st.ok()) {
VLOG_DEBUG << "Failed to submit prefetch task for offset=" << offset << " size=" << size
<< " error=" << st.to_string();
}
}

} // namespace doris::io
15 changes: 14 additions & 1 deletion be/src/io/cache/cached_remote_file_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ namespace doris::io {
struct IOContext;
struct FileCacheStatistics;

class CachedRemoteFileReader final : public FileReader {
class CachedRemoteFileReader final : public FileReader,
public std::enable_shared_from_this<CachedRemoteFileReader> {
public:
CachedRemoteFileReader(FileReaderSPtr remote_file_reader, const FileReaderOptions& opts);

Expand All @@ -54,6 +55,18 @@ class CachedRemoteFileReader final : public FileReader {

static std::pair<size_t, size_t> s_align_size(size_t offset, size_t size, size_t length);

// Asynchronously prefetch a range of file cache blocks.
// This method triggers read file cache in dryrun mode to warm up the cache
// without actually reading the data into user buffers.
//
// Parameters:
// offset: Starting offset in the file
// size: Number of bytes to prefetch
// io_ctx: IO context (can be nullptr, will create a dryrun context internally)
//
// Note: This is a best-effort operation. Errors are logged but not returned.
void prefetch_range(size_t offset, size_t size, const IOContext* io_ctx = nullptr);

protected:
Status read_at_impl(size_t offset, Slice result, size_t* bytes_read,
const IOContext* io_ctx) override;
Expand Down
3 changes: 3 additions & 0 deletions be/src/io/fs/s3_file_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
#include "runtime/thread_context.h"
#include "runtime/workload_management/io_throttle.h"
#include "util/bvar_helper.h"
#include "util/concurrency_stats.h"
#include "util/debug_points.h"
#include "util/doris_metrics.h"
#include "util/runtime_profile.h"
Expand Down Expand Up @@ -124,6 +125,8 @@ Status S3FileReader::read_at_impl(size_t offset, Slice result, size_t* bytes_rea
return Status::InternalError("init s3 client error");
}

SCOPED_CONCURRENCY_COUNT(ConcurrencyStatsManager::instance().s3_file_reader_read);

int retry_count = 0;
const int base_wait_time = config::s3_read_base_wait_time_ms; // Base wait time in milliseconds
const int max_wait_time = config::s3_read_max_wait_time_ms; // Maximum wait time in milliseconds
Expand Down
1 change: 1 addition & 0 deletions be/src/olap/olap_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -416,6 +416,7 @@ struct OlapReaderStatistics {
int64_t segment_iterator_init_return_column_iterators_timer_ns = 0;
int64_t segment_iterator_init_bitmap_index_iterators_timer_ns = 0;
int64_t segment_iterator_init_inverted_index_iterators_timer_ns = 0;
int64_t segment_iterator_init_segment_prefetchers_timer_ns = 0;

int64_t segment_create_column_readers_timer_ns = 0;
int64_t segment_load_index_timer_ns = 0;
Expand Down
45 changes: 45 additions & 0 deletions be/src/olap/rowset/segment_v2/column_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
#include "util/binary_cast.hpp"
#include "util/bitmap.h"
#include "util/block_compression.h"
#include "util/concurrency_stats.h"
#include "util/rle_encoding.h" // for RleDecoder
#include "util/slice.h"
#include "vec/columns/column.h"
Expand Down Expand Up @@ -363,6 +364,7 @@ Status ColumnReader::new_inverted_index_iterator(
Status ColumnReader::read_page(const ColumnIteratorOptions& iter_opts, const PagePointer& pp,
PageHandle* handle, Slice* page_body, PageFooterPB* footer,
BlockCompressionCodec* codec) const {
SCOPED_CONCURRENCY_COUNT(ConcurrencyStatsManager::instance().column_reader_read_page);
iter_opts.sanity_check();
PageReadOptions opts(iter_opts.io_ctx);
opts.verify_checksum = _opts.verify_checksum;
Expand Down Expand Up @@ -726,6 +728,16 @@ Status ColumnReader::seek_at_or_before(ordinal_t ordinal, OrdinalPageIndexIterat
return Status::OK();
}

Status ColumnReader::get_ordinal_index_reader(OrdinalIndexReader*& reader,
OlapReaderStatistics* index_load_stats) {
CHECK(_ordinal_index) << fmt::format("ordinal index is null for column reader of type {}",
std::to_string(int(_meta_type)));
RETURN_IF_ERROR(
_ordinal_index->load(_use_index_page_cache, _opts.kept_in_memory, index_load_stats));
reader = _ordinal_index.get();
return Status::OK();
}

Status ColumnReader::new_iterator(ColumnIteratorUPtr* iterator, const TabletColumn* tablet_column) {
return new_iterator(iterator, tablet_column, nullptr);
}
Expand Down Expand Up @@ -1328,7 +1340,22 @@ Status FileColumnIterator::seek_to_first() {
return Status::OK();
}

void FileColumnIterator::_trigger_prefetch_if_eligible(ordinal_t ord) {
std::vector<BlockRange> ranges;
if (_prefetcher->need_prefetch(ord, &ranges)) {
for (const auto& range : ranges) {
_cached_remote_file_reader->prefetch_range(range.offset, range.size, &_opts.io_ctx);
}
}
}

Status FileColumnIterator::seek_to_ordinal(ordinal_t ord) {
LOG_IF(INFO, config::enable_segment_prefetch_verbose_log) << fmt::format(
"[verbose] FileColumnIterator::seek_to_ordinal seek to ordinal {}, enable_prefetch={}",
ord, _enable_prefetch);
if (_enable_prefetch) {
_trigger_prefetch_if_eligible(ord);
}
// if current page contains this row, we don't need to seek
if (!_page || !_page.contains(ord) || !_page_iter.valid()) {
RETURN_IF_ERROR(_reader->seek_at_or_before(ord, &_page_iter, _opts));
Expand Down Expand Up @@ -1611,6 +1638,24 @@ Status FileColumnIterator::get_row_ranges_by_dict(const AndBlockColumnPredicate*
return Status::OK();
}

Status FileColumnIterator::init_prefetcher(const SegmentPrefetchParams& params) {
if (_cached_remote_file_reader =
std::dynamic_pointer_cast<io::CachedRemoteFileReader>(_reader->_file_reader);
!_cached_remote_file_reader) {
return Status::OK();
}
_enable_prefetch = true;
_prefetcher = std::make_unique<SegmentPrefetcher>(params.config);
RETURN_IF_ERROR(_prefetcher->init(params.row_bitmap, _reader, params.read_options));
return Status::OK();
}

void FileColumnIterator::collect_prefetchers(std::vector<SegmentPrefetcher*>& prefetchers) {
if (_prefetcher) {
prefetchers.emplace_back(_prefetcher.get());
}
}

Status DefaultValueColumnIterator::init(const ColumnIteratorOptions& opts) {
_opts = opts;
// be consistent with segment v1
Expand Down
Loading
Loading