diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index d6ac9186c70734..369e249c802f47 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -1016,6 +1016,9 @@ DEFINE_mInt32(file_cache_exit_disk_resource_limit_mode_percent, "80"); DEFINE_mBool(enable_read_cache_file_directly, "false"); DEFINE_mBool(file_cache_enable_evict_from_other_queue_by_size, "false"); DEFINE_mInt64(file_cache_ttl_valid_check_interval_second, "0"); // zero for not checking +// If true, evict the TTL cache using LRU when it is full. +// Otherwise, only expiration can evict TTL blocks, and new data won't be added to the cache when it is full. +DEFINE_Bool(enable_ttl_cache_evict_using_lru, "true"); DEFINE_mInt32(index_cache_entry_stay_time_after_lookup_s, "1800"); DEFINE_mInt32(inverted_index_cache_stale_sweep_time_sec, "600"); diff --git a/be/src/common/config.h b/be/src/common/config.h index f93abcfd260608..dd44f56fd300a9 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -1064,6 +1064,9 @@ DECLARE_Int32(file_cache_exit_disk_resource_limit_mode_percent); DECLARE_mBool(enable_read_cache_file_directly); DECLARE_Bool(file_cache_enable_evict_from_other_queue_by_size); DECLARE_mInt64(file_cache_ttl_valid_check_interval_second); +// If true, evict the TTL cache using LRU when it is full. +// Otherwise, only expiration can evict TTL blocks, and new data won't be added to the cache when it is full. +DECLARE_Bool(enable_ttl_cache_evict_using_lru); // inverted index searcher cache // cache entry stay time after lookup diff --git a/be/src/io/cache/block_file_cache.cpp b/be/src/io/cache/block_file_cache.cpp index 64e443b6a999a3..6c320a21bb3630 100644 --- a/be/src/io/cache/block_file_cache.cpp +++ b/be/src/io/cache/block_file_cache.cpp @@ -56,6 +56,10 @@ BlockFileCache::BlockFileCache(const std::string& cache_base_path, _cache_base_path.c_str(), "file_cache_ttl_cache_size", 0); _cur_normal_queue_element_count_metrics = std::make_shared<bvar::Status<size_t>>( _cache_base_path.c_str(), "file_cache_normal_queue_element_count", 0); + _cur_ttl_cache_lru_queue_cache_size_metrics = std::make_shared<bvar::Status<size_t>>( + _cache_base_path.c_str(), "file_cache_ttl_cache_lru_queue_size", 0); + _cur_ttl_cache_lru_queue_element_count_metrics = std::make_shared<bvar::Status<size_t>>( + _cache_base_path.c_str(), "file_cache_ttl_cache_lru_queue_element_count", 0); _cur_normal_queue_cache_size_metrics = std::make_shared<bvar::Status<size_t>>( _cache_base_path.c_str(), "file_cache_normal_queue_cache_size", 0); _cur_index_queue_element_count_metrics = std::make_shared<bvar::Status<size_t>>( @@ -84,6 +88,8 @@ BlockFileCache::BlockFileCache(const std::string& cache_base_path, 7 * 24 * 60 * 60); _normal_queue = LRUQueue(cache_settings.query_queue_size, cache_settings.query_queue_elements, 24 * 60 * 60); + _ttl_queue = LRUQueue(std::numeric_limits<int>::max(), std::numeric_limits<int>::max(), + std::numeric_limits<int>::max()); LOG(INFO) << fmt::format( "file cache path={}, disposable queue size={} elements={}, index queue size={} " @@ -213,7 +219,8 @@ void BlockFileCache::use_cell(const FileBlockCell& cell, FileBlocks* result, boo result->push_back(cell.file_block); } - if (cell.file_block->cache_type() != FileCacheType::TTL) { + if (cell.file_block->cache_type() != FileCacheType::TTL || + config::enable_ttl_cache_evict_using_lru) { auto& queue = get_queue(cell.file_block->cache_type()); DCHECK(cell.queue_iterator) << "impossible"; /// Move to the end of the queue. The iterator remains valid.
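The flag's effect is easiest to see in the `use_cell` hunk above and the `add_cell` hunk below: a TTL block now participates in LRU bookkeeping whenever the flag is on. A standalone sketch of that gating rule; the enum and `config` namespace are stand-ins for the real Doris definitions, and `tracked_in_lru` is a hypothetical helper, not a function from the patch:

```cpp
#include <iostream>

enum class FileCacheType { INDEX, NORMAL, DISPOSABLE, TTL };

namespace config {
bool enable_ttl_cache_evict_using_lru = true;
} // namespace config

// A block is tracked in an LRU queue unless it is a TTL block and the flag is
// off; an untracked TTL block can only leave the cache via expiration.
bool tracked_in_lru(FileCacheType type) {
    return type != FileCacheType::TTL || config::enable_ttl_cache_evict_using_lru;
}

int main() {
    std::cout << tracked_in_lru(FileCacheType::TTL) << '\n'; // 1: evictable by LRU
    config::enable_ttl_cache_evict_using_lru = false;
    std::cout << tracked_in_lru(FileCacheType::TTL) << '\n'; // 0: expiration only
}
```

Note that `_ttl_queue` is constructed with effectively unlimited size, element count, and hot-data interval: the queue itself never forces an eviction, and the overall TTL footprint is still capped elsewhere by `max_ttl_cache_ratio`.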
@@ -280,7 +287,14 @@ FileBlocks BlockFileCache::get_impl(const UInt128Wrapper& hash, const CacheConte if (st.ok()) { auto& queue = get_queue(origin_type); queue.remove(cell.queue_iterator.value(), cache_lock); - cell.queue_iterator.reset(); + if (config::enable_ttl_cache_evict_using_lru) { + auto& ttl_queue = get_queue(FileCacheType::TTL); + cell.queue_iterator = ttl_queue.add( + cell.file_block->get_hash_value(), cell.file_block->offset(), + cell.file_block->range().size(), cache_lock); + } else { + cell.queue_iterator.reset(); + } st = cell.file_block->update_expiration_time(context.expiration_time); } if (!st.ok()) { @@ -311,6 +325,10 @@ FileBlocks BlockFileCache::get_impl(const UInt128Wrapper& hash, const CacheConte if (context.expiration_time == 0) { for (auto& [_, cell] : file_blocks) { if (cell.file_block->change_cache_type_by_mgr(FileCacheType::NORMAL)) { + if (config::enable_ttl_cache_evict_using_lru) { + auto& ttl_queue = get_queue(FileCacheType::TTL); + ttl_queue.remove(cell.queue_iterator.value(), cache_lock); + } auto& queue = get_queue(FileCacheType::NORMAL); cell.queue_iterator = queue.add(cell.file_block->get_hash_value(), cell.file_block->offset(), @@ -669,10 +687,11 @@ BlockFileCache::FileBlockCell* BlockFileCache::add_cell(const UInt128Wrapper& ha key.meta.type = context.cache_type; key.meta.expiration_time = context.expiration_time; FileBlockCell cell(std::make_shared<FileBlock>(key, size, this, state), cache_lock); - if (context.cache_type != FileCacheType::TTL) { + if (context.cache_type != FileCacheType::TTL || config::enable_ttl_cache_evict_using_lru) { auto& queue = get_queue(context.cache_type); cell.queue_iterator = queue.add(hash, offset, size, cache_lock); - } else { + } + if (context.cache_type == FileCacheType::TTL) { if (_key_to_time.find(hash) == _key_to_time.end()) { _key_to_time[hash] = context.expiration_time; _time_to_key.insert(std::make_pair(context.expiration_time, hash)); @@ -711,6 +730,8 @@ BlockFileCache::LRUQueue& BlockFileCache::get_queue(FileCacheType type) { return _disposable_queue; case FileCacheType::NORMAL: return _normal_queue; + case FileCacheType::TTL: + return _ttl_queue; default: DCHECK(false); }
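After the `add_cell` change, a TTL block can live in two structures at once: the `_ttl_queue` LRU (only when the flag is on) and the expiration maps (always). A simplified, compilable model of that dual bookkeeping, with standard containers standing in for the real `LRUQueue`, `_key_to_time`, and `_time_to_key`:

```cpp
#include <cstdint>
#include <list>
#include <map>
#include <unordered_map>

using Hash = uint64_t;

struct TtlBookkeeping {
    std::list<Hash> lru;                            // stands in for _ttl_queue
    std::unordered_map<Hash, int64_t> key_to_time;  // stands in for _key_to_time
    std::multimap<int64_t, Hash> time_to_key;       // stands in for _time_to_key
    bool lru_enabled = true;                        // enable_ttl_cache_evict_using_lru

    void add_ttl_block(Hash h, int64_t expiration) {
        if (lru_enabled) {
            lru.push_back(h);  // LRU membership makes the block evictable early
        }
        if (key_to_time.emplace(h, expiration).second) {
            time_to_key.emplace(expiration, h);  // reverse index for expiry sweeps
        }
    }
};
```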
@@ -725,19 +746,16 @@ const BlockFileCache::LRUQueue& BlockFileCache::get_queue(FileCacheType type) co return _disposable_queue; case FileCacheType::NORMAL: return _normal_queue; + case FileCacheType::TTL: + return _ttl_queue; default: DCHECK(false); } return _normal_queue; } -bool BlockFileCache::try_reserve_for_ttl(size_t size, std::lock_guard<std::mutex>& cache_lock) { - size_t removed_size = 0; - size_t cur_cache_size = _cur_cache_size; - auto limit = config::max_ttl_cache_ratio * _capacity; - if ((_cur_ttl_size + size) * 100 > limit) { - return false; - } +void BlockFileCache::remove_file_blocks(std::vector<FileBlockCell*>& to_evict, + std::lock_guard<std::mutex>& cache_lock) { auto remove_file_block_if = [&](FileBlockCell* cell) { FileBlockSPtr file_block = cell->file_block; if (file_block) { @@ -745,33 +763,53 @@ bool BlockFileCache::try_reserve_for_ttl(size_t size, std::lock_guard<std std::lock_guard block_lock(file_block->_mutex); remove(file_block, cache_lock, block_lock); } }; - size_t normal_queue_size = _normal_queue.get_capacity(cache_lock); - size_t disposable_queue_size = _disposable_queue.get_capacity(cache_lock); - size_t index_queue_size = _index_queue.get_capacity(cache_lock); - if (is_overflow(removed_size, size, cur_cache_size) && normal_queue_size == 0 && - disposable_queue_size == 0 && index_queue_size == 0) { - return false; - } - std::vector<FileBlockCell*> to_evict; - auto collect_eliminate_fragments = [&](LRUQueue& queue) { - for (const auto& [entry_key, entry_offset, entry_size] : queue) { - if (!is_overflow(removed_size, size, cur_cache_size)) { - break; + std::for_each(to_evict.begin(), to_evict.end(), remove_file_block_if); +} + +void BlockFileCache::remove_file_blocks_and_clean_time_maps( + std::vector<FileBlockCell*>& to_evict, std::lock_guard<std::mutex>& cache_lock) { + auto remove_file_block_and_clean_time_maps_if = [&](FileBlockCell* cell) { + FileBlockSPtr file_block = cell->file_block; + if (file_block) { + std::lock_guard block_lock(file_block->_mutex); + auto hash = cell->file_block->get_hash_value(); + remove(file_block, cache_lock, block_lock); + if (_files.find(hash) == _files.end()) { + if (auto iter = _key_to_time.find(hash); + _key_to_time.find(hash) != _key_to_time.end()) { + auto _time_to_key_iter = _time_to_key.equal_range(iter->second); + while (_time_to_key_iter.first != _time_to_key_iter.second) { + if (_time_to_key_iter.first->second == hash) { + _time_to_key_iter.first = _time_to_key.erase(_time_to_key_iter.first); + break; + } + _time_to_key_iter.first++; + } + _key_to_time.erase(hash); + } } - auto* cell = get_cell(entry_key, entry_offset, cache_lock); + } + }; + std::for_each(to_evict.begin(), to_evict.end(), remove_file_block_and_clean_time_maps_if); } - DCHECK(cell) << "Cache became inconsistent. UInt128Wrapper: " << entry_key.to_string() - << ", offset: " << entry_offset; +void BlockFileCache::find_evict_candidates(LRUQueue& queue, size_t size, size_t cur_cache_size, + size_t& removed_size, + std::vector<FileBlockCell*>& to_evict, + std::lock_guard<std::mutex>& cache_lock) { + for (const auto& [entry_key, entry_offset, entry_size] : queue) { + if (!is_overflow(removed_size, size, cur_cache_size)) { + break; + } + auto* cell = get_cell(entry_key, entry_offset, cache_lock); - size_t cell_size = cell->size(); - DCHECK(entry_size == cell_size); + DCHECK(cell) << "Cache became inconsistent. key: " << entry_key.to_string() + << ", offset: " << entry_offset; - /// It is guaranteed that cell is not removed from cache as long as - /// pointer to corresponding file block is hold by any other thread. - if (!cell->releasable()) { - continue; - } + size_t cell_size = cell->size(); + DCHECK(entry_size == cell_size); + if (cell->releasable()) { auto& file_block = cell->file_block; std::lock_guard block_lock(file_block->_mutex); @@ -779,6 +817,28 @@ bool BlockFileCache::try_reserve_for_ttl(size_t size, std::lock_guard<std +bool BlockFileCache::try_reserve_for_ttl_without_lru(size_t size, + std::lock_guard<std::mutex>& cache_lock) { + size_t removed_size = 0; + size_t cur_cache_size = _cur_cache_size; + auto limit = config::max_ttl_cache_ratio * _capacity; + if ((_cur_ttl_size + size) * 100 > limit) { + return false; + } + + size_t normal_queue_size = _normal_queue.get_capacity(cache_lock); + size_t disposable_queue_size = _disposable_queue.get_capacity(cache_lock); + size_t index_queue_size = _index_queue.get_capacity(cache_lock); + if (is_overflow(removed_size, size, cur_cache_size) && normal_queue_size == 0 && + disposable_queue_size == 0 && index_queue_size == 0) { + return false; + } + std::vector<FileBlockCell*> to_evict; + auto collect_eliminate_fragments = [&](LRUQueue& queue) { + find_evict_candidates(queue, size, cur_cache_size, removed_size, to_evict, cache_lock); }; if (disposable_queue_size != 0) { collect_eliminate_fragments(get_queue(FileCacheType::DISPOSABLE)); @@ -789,13 +849,31 @@ bool BlockFileCache::try_reserve_for_ttl(size_t size, std::lock_guard<std +bool BlockFileCache::try_reserve_for_ttl(size_t size, std::lock_guard<std::mutex>& cache_lock) { + if (try_reserve_for_ttl_without_lru(size, cache_lock)) { + return true; + } else if (config::enable_ttl_cache_evict_using_lru) { + auto& queue = get_queue(FileCacheType::TTL); + size_t removed_size = 0; + size_t cur_cache_size = _cur_cache_size; + + std::vector<FileBlockCell*> to_evict; + find_evict_candidates(queue, size, cur_cache_size, removed_size, to_evict, cache_lock); + remove_file_blocks_and_clean_time_maps(to_evict, cache_lock); + + return !is_overflow(removed_size, size, cur_cache_size); + } else { + return false; + } +} + // 1. if async load file cache not finish // a. evict from lru queue // 2. if ttl cache
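The reshaped reservation path above splits into two stages: the pre-existing logic (now `try_reserve_for_ttl_without_lru`), which only evicts from the non-TTL queues under the `max_ttl_cache_ratio` cap, and a new fallback that evicts cold TTL blocks themselves. A reduced, compilable model of that control flow, with stub functions standing in for the real ones:

```cpp
#include <cstddef>

namespace config {
bool enable_ttl_cache_evict_using_lru = true;
} // namespace config

// Stubs standing in for the two callees in the patch.
bool try_reserve_for_ttl_without_lru(size_t /*size*/) { return false; }
bool evict_from_ttl_lru_until_fit(size_t /*size*/) { return true; }

bool try_reserve_for_ttl(size_t size) {
    if (try_reserve_for_ttl_without_lru(size)) {
        return true;  // other queues yielded enough space under the ratio cap
    }
    if (!config::enable_ttl_cache_evict_using_lru) {
        return false; // legacy behavior: a full TTL cache skips new data
    }
    // New path: drop the coldest TTL blocks (and their expiration-map
    // entries) until the reservation fits or the queue is exhausted.
    return evict_from_ttl_lru_until_fit(size);
}
```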
@@ -909,6 +987,7 @@ bool BlockFileCache::try_reserve(const UInt128Wrapper& hash, const CacheContext& bool BlockFileCache::remove_if_ttl_file_unlock(const UInt128Wrapper& file_key, bool remove_directly, std::lock_guard<std::mutex>& cache_lock) { + auto& ttl_queue = get_queue(FileCacheType::TTL); if (auto iter = _key_to_time.find(file_key); _key_to_time.find(file_key) != _key_to_time.end()) { if (!remove_directly) { @@ -924,6 +1003,9 @@ bool BlockFileCache::remove_if_ttl_file_unlock(const UInt128Wrapper& file_key, b if (cell.file_block->cache_type() == FileCacheType::TTL) { auto st = cell.file_block->change_cache_type_by_mgr(FileCacheType::NORMAL); if (st.ok()) { + if (config::enable_ttl_cache_evict_using_lru) { + ttl_queue.remove(cell.queue_iterator.value(), cache_lock); + } auto& queue = get_queue(FileCacheType::NORMAL); cell.queue_iterator = queue.add( cell.file_block->get_hash_value(), cell.file_block->offset(), @@ -977,13 +1059,7 @@ void BlockFileCache::remove_if_cached(const UInt128Wrapper& file_key) { } } } - auto remove_file_block_if = [&](FileBlockCell* cell) { - if (FileBlockSPtr file_block = cell->file_block; file_block) { - std::lock_guard block_lock(file_block->_mutex); - remove(file_block, cache_lock, block_lock); - } - }; - std::for_each(to_remove.begin(), to_remove.end(), remove_file_block_if); + remove_file_blocks(to_remove, cache_lock); } } @@ -1049,15 +1125,7 @@ bool BlockFileCache::try_reserve_from_other_queue_by_hot_interval( } } } - auto remove_file_block_if = [&](FileBlockCell* cell) { - FileBlockSPtr file_block = cell->file_block; - if (file_block) { - std::lock_guard block_lock(file_block->_mutex); - remove(file_block, cache_lock, block_lock); - } - }; - - std::for_each(to_evict.begin(), to_evict.end(), remove_file_block_if); + remove_file_blocks(to_evict, cache_lock); return !is_overflow(removed_size, size, cur_cache_size); }
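The `remove_if_ttl_file_unlock` hunk encodes an ordering invariant: when a TTL block is demoted to NORMAL, its `_ttl_queue` entry must be dropped before the NORMAL-queue entry overwrites `queue_iterator`, or the old iterator would be lost while still live in the TTL queue. A compilable stand-in using `std::list` iterators in place of `LRUQueue`:

```cpp
#include <list>
#include <optional>

struct Cell {
    std::optional<std::list<int>::iterator> queue_iterator;
};

void demote_ttl_to_normal(Cell& cell, std::list<int>& ttl_queue,
                          std::list<int>& normal_queue, int block_id,
                          bool ttl_lru_enabled) {
    if (ttl_lru_enabled && cell.queue_iterator) {
        ttl_queue.erase(*cell.queue_iterator);  // leave the TTL LRU first
    }
    normal_queue.push_back(block_id);           // then join the NORMAL LRU
    cell.queue_iterator = std::prev(normal_queue.end());
}
```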
Key: " << entry_key.to_string() - << ", offset: " << entry_offset; - - size_t cell_size = cell->size(); - DCHECK(entry_size == cell_size); - - if (cell->releasable()) { - auto& file_block = cell->file_block; - - std::lock_guard segment_lock(file_block->_mutex); - DCHECK(file_block->_download_state == FileBlock::State::DOWNLOADED); - to_evict.push_back(cell); - removed_size += cell_size; - } - } + find_evict_candidates(queue, size, cur_cache_size, removed_size, to_evict, cache_lock); } - auto remove_file_block_if = [&](FileBlockCell* cell) { - FileBlockSPtr file_block = cell->file_block; - if (file_block) { - std::lock_guard segment_lock(file_block->_mutex); - remove(file_block, cache_lock, segment_lock); - } - }; - - std::for_each(to_evict.begin(), to_evict.end(), remove_file_block_if); - + remove_file_blocks(to_evict, cache_lock); return !is_overflow(removed_size, size, cur_cache_size); } @@ -1146,38 +1186,8 @@ bool BlockFileCache::try_reserve_for_lru(const UInt128Wrapper& hash, size_t cur_cache_size = _cur_cache_size; std::vector to_evict; - for (const auto& [entry_key, entry_offset, entry_size] : queue) { - if (!is_overflow(removed_size, size, cur_cache_size)) { - break; - } - auto* cell = get_cell(entry_key, entry_offset, cache_lock); - - DCHECK(cell) << "Cache became inconsistent. UInt128Wrapper: " << entry_key.to_string() - << ", offset: " << entry_offset; - - size_t cell_size = cell->size(); - DCHECK(entry_size == cell_size); - - if (cell->releasable()) { - auto& file_block = cell->file_block; - - std::lock_guard block_lock(file_block->_mutex); - DCHECK(file_block->_download_state == FileBlock::State::DOWNLOADED); - to_evict.push_back(cell); - - removed_size += cell_size; - } - } - - auto remove_file_block_if = [&](FileBlockCell* cell) { - FileBlockSPtr file_block = cell->file_block; - if (file_block) { - std::lock_guard block_lock(file_block->_mutex); - remove(file_block, cache_lock, block_lock); - } - }; - - std::for_each(to_evict.begin(), to_evict.end(), remove_file_block_if); + find_evict_candidates(queue, size, cur_cache_size, removed_size, to_evict, cache_lock); + remove_file_blocks(to_evict, cache_lock); if (is_overflow(removed_size, size, cur_cache_size)) { return false; @@ -1463,6 +1473,9 @@ void BlockFileCache::run_background_operation() { _index_queue.get_capacity(cache_lock) - _normal_queue.get_capacity(cache_lock) - _disposable_queue.get_capacity(cache_lock)); + _cur_ttl_cache_lru_queue_cache_size_metrics->set_value(_ttl_queue.get_capacity(cache_lock)); + _cur_ttl_cache_lru_queue_element_count_metrics->set_value( + _ttl_queue.get_elements_num(cache_lock)); _cur_normal_queue_cache_size_metrics->set_value(_normal_queue.get_capacity(cache_lock)); _cur_normal_queue_element_count_metrics->set_value( _normal_queue.get_elements_num(cache_lock)); @@ -1519,7 +1532,14 @@ void BlockFileCache::modify_expiration_time(const UInt128Wrapper& hash, if (st.ok()) { auto& queue = get_queue(origin_type); queue.remove(cell.queue_iterator.value(), cache_lock); - cell.queue_iterator.reset(); + if (config::enable_ttl_cache_evict_using_lru) { + auto& ttl_queue = get_queue(FileCacheType::TTL); + cell.queue_iterator = + ttl_queue.add(hash, cell.file_block->offset(), + cell.file_block->range().size(), cache_lock); + } else { + cell.queue_iterator.reset(); + } } if (!st.ok()) { LOG_WARNING("").error(st); @@ -1558,12 +1578,6 @@ BlockFileCache::get_hot_blocks_meta(const UInt128Wrapper& hash) const { bool BlockFileCache::try_reserve_for_lazy_load(size_t size, std::lock_guard& cache_lock) { size_t 
removed_size = 0; - auto remove_file_block_if = [&](FileBlockCell* cell) { - if (FileBlockSPtr file_block = cell->file_block; file_block) { - std::lock_guard block_lock(file_block->_mutex); - remove(file_block, cache_lock, block_lock); - } - }; size_t normal_queue_size = _normal_queue.get_capacity(cache_lock); size_t disposable_queue_size = _disposable_queue.get_capacity(cache_lock); size_t index_queue_size = _index_queue.get_capacity(cache_lock); @@ -1582,18 +1596,14 @@ bool BlockFileCache::try_reserve_for_lazy_load(size_t size, size_t cell_size = cell->size(); DCHECK(entry_size == cell_size); - /// It is guaranteed that cell is not removed from cache as long as - /// pointer to corresponding file block is hold by any other thread. + if (cell->releasable()) { + auto& file_block = cell->file_block; - if (!cell->releasable()) { - continue; + std::lock_guard block_lock(file_block->_mutex); + DCHECK(file_block->_download_state == FileBlock::State::DOWNLOADED); + to_evict.push_back(cell); + removed_size += cell_size; } - auto& file_block = cell->file_block; - - std::lock_guard block_lock(file_block->_mutex); - DCHECK(file_block->_download_state == FileBlock::State::DOWNLOADED); - to_evict.push_back(cell); - removed_size += cell_size; } }; if (disposable_queue_size != 0) { @@ -1605,7 +1615,7 @@ bool BlockFileCache::try_reserve_for_lazy_load(size_t size, if (index_queue_size != 0) { collect_eliminate_fragments(get_queue(FileCacheType::INDEX)); } - std::for_each(to_evict.begin(), to_evict.end(), remove_file_block_if); + remove_file_blocks(to_evict, cache_lock); return !_disk_resource_limit_mode || removed_size >= size; } @@ -1636,6 +1646,7 @@ std::string BlockFileCache::clear_file_cache_directly() { int64_t index_queue_size = _index_queue.get_elements_num(cache_lock); int64_t normal_queue_size = _normal_queue.get_elements_num(cache_lock); int64_t disposible_queue_size = _disposable_queue.get_elements_num(cache_lock); + int64_t ttl_queue_size = _ttl_queue.get_elements_num(cache_lock); _files.clear(); _cur_cache_size = 0; _cur_ttl_size = 0; @@ -1644,12 +1655,16 @@ std::string BlockFileCache::clear_file_cache_directly() { _index_queue.clear(cache_lock); _normal_queue.clear(cache_lock); _disposable_queue.clear(cache_lock); + _ttl_queue.clear(cache_lock); ss << "finish clear_file_cache_directly" << " path=" << _cache_base_path << " time_elapsed=" << duration_cast<milliseconds>(steady_clock::now() - start).count() << " num_files=" << num_files << " cache_size=" << cache_size << " index_queue_size=" << index_queue_size << " normal_queue_size=" << normal_queue_size << " disposible_queue_size=" << disposible_queue_size; + if (config::enable_ttl_cache_evict_using_lru) { + ss << " ttl_queue_size=" << ttl_queue_size; + } auto msg = ss.str(); LOG(INFO) << msg; return msg;
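The refactor funnels every eviction site through the same two-phase pattern: `find_evict_candidates` walks a queue and collects releasable cells, then `remove_file_blocks` (or the `_and_clean_time_maps` variant, which additionally erases the `_key_to_time` / `_time_to_key` entries once a hash's last block is gone) performs the removals, so no queue is mutated while it is being iterated. A compilable stand-in illustrating the shape of that contract; `Cell` and the overflow test approximate `FileBlockCell` and `is_overflow`:

```cpp
#include <cstddef>
#include <list>
#include <vector>

struct Cell {
    size_t size = 0;
    bool releasable = true;  // pinned blocks must be skipped, not evicted
};

// Phase one: collect candidates without touching the queue's structure.
void find_evict_candidates(std::list<Cell*>& queue, size_t need, size_t used,
                           size_t capacity, size_t& removed,
                           std::vector<Cell*>& to_evict) {
    for (Cell* cell : queue) {
        if (used + need <= capacity + removed) break;  // enough space freed
        if (cell->releasable) {
            to_evict.push_back(cell);
            removed += cell->size;
        }
    }
    // Phase two (separate function in the patch) erases to_evict entries,
    // which is safe because this iteration has already finished.
}
```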
diff --git a/be/src/io/cache/block_file_cache.h b/be/src/io/cache/block_file_cache.h index 86ce1dc1196c39..2db9b68438b0ef 100644 --- a/be/src/io/cache/block_file_cache.h +++ b/be/src/io/cache/block_file_cache.h @@ -341,6 +341,8 @@ class BlockFileCache { bool try_reserve_for_ttl(size_t size, std::lock_guard<std::mutex>& cache_lock); + bool try_reserve_for_ttl_without_lru(size_t size, std::lock_guard<std::mutex>& cache_lock); + FileBlocks split_range_into_cells(const UInt128Wrapper& hash, const CacheContext& context, size_t offset, size_t size, FileBlock::State state, std::lock_guard<std::mutex>& cache_lock); @@ -382,6 +384,14 @@ class BlockFileCache { bool is_overflow(size_t removed_size, size_t need_size, size_t cur_cache_size) const; + void remove_file_blocks(std::vector<FileBlockCell*>&, + std::lock_guard<std::mutex>&); + + void remove_file_blocks_and_clean_time_maps(std::vector<FileBlockCell*>&, + std::lock_guard<std::mutex>&); + + void find_evict_candidates(LRUQueue& queue, size_t size, size_t cur_cache_size, + size_t& removed_size, std::vector<FileBlockCell*>& to_evict, + std::lock_guard<std::mutex>& cache_lock); // info std::string _cache_base_path; size_t _capacity = 0; @@ -418,6 +428,7 @@ class BlockFileCache { LRUQueue _index_queue; LRUQueue _normal_queue; LRUQueue _disposable_queue; + LRUQueue _ttl_queue; // metrics size_t _num_read_blocks = 0; @@ -425,6 +436,8 @@ class BlockFileCache { size_t _num_removed_blocks = 0; std::shared_ptr<bvar::Status<size_t>> _cur_cache_size_metrics; std::shared_ptr<bvar::Status<size_t>> _cur_ttl_cache_size_metrics; + std::shared_ptr<bvar::Status<size_t>> _cur_ttl_cache_lru_queue_cache_size_metrics; + std::shared_ptr<bvar::Status<size_t>> _cur_ttl_cache_lru_queue_element_count_metrics; std::shared_ptr<bvar::Status<size_t>> _cur_normal_queue_element_count_metrics; std::shared_ptr<bvar::Status<size_t>> _cur_normal_queue_cache_size_metrics; std::shared_ptr<bvar::Status<size_t>> _cur_index_queue_element_count_metrics; diff --git a/be/src/io/cache/cached_remote_file_reader.cpp b/be/src/io/cache/cached_remote_file_reader.cpp index 9feddf089d0b46..d4b4157388aa26 100644 --- a/be/src/io/cache/cached_remote_file_reader.cpp +++ b/be/src/io/cache/cached_remote_file_reader.cpp @@ -43,6 +43,8 @@ namespace doris::io { bvar::Adder<uint64_t> s3_read_counter("cached_remote_reader_s3_read"); +bvar::LatencyRecorder g_skip_cache_num("cached_remote_reader_skip_cache_num"); +bvar::Adder<uint64_t> g_skip_cache_sum("cached_remote_reader_skip_cache_sum"); CachedRemoteFileReader::CachedRemoteFileReader(FileReaderSPtr remote_file_reader, const FileReaderOptions& opts) @@ -324,6 +326,9 @@ void CachedRemoteFileReader::_update_state(const ReadStatistics& read_stats, statis->num_skip_cache_io_total += read_stats.skip_cache; statis->bytes_write_into_cache += read_stats.bytes_write_into_file_cache; statis->write_cache_io_timer += read_stats.local_write_timer; + + g_skip_cache_num << read_stats.skip_cache; + g_skip_cache_sum << read_stats.skip_cache; } } // namespace doris::io
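Two flavors of counter are added above, and the distinction matters for the regression test later in this patch: `g_skip_cache_sum` is a plain `bvar::Adder` whose monotonic total can be diffed between two scrapes of `/brpc_metrics`, while `g_skip_cache_num` is a `bvar::LatencyRecorder`, which treats each per-read skip count as a sample and so exposes windowed averages and percentiles. A minimal sketch of the same pairing, assuming the bvar library from brpc; the metric names here are hypothetical:

```cpp
#include <bvar/bvar.h>
#include <cstdint>

// Distribution of skips per read (windowed average and percentiles).
bvar::LatencyRecorder skip_cache_num("example_reader_skip_cache_num");
// Running total of skipped blocks; easy to diff between two metric scrapes.
bvar::Adder<uint64_t> skip_cache_sum("example_reader_skip_cache_sum");

void on_read_finished(uint64_t blocks_skipped) {
    skip_cache_num << blocks_skipped;  // one sample per completed read
    skip_cache_sum << blocks_skipped;  // accumulate the monotonic total
}
```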
diff --git a/be/test/io/cache/block_file_cache_test.cpp b/be/test/io/cache/block_file_cache_test.cpp index 149241c4dbd911..d6e2cf6053e17b 100644 --- a/be/test/io/cache/block_file_cache_test.cpp +++ b/be/test/io/cache/block_file_cache_test.cpp @@ -121,6 +121,7 @@ class BlockFileCacheTest : public testing::Test { public: static void SetUpTestSuite() { config::file_cache_enter_disk_resource_limit_mode_percent = 99; + config::enable_ttl_cache_evict_using_lru = false; bool exists {false}; ASSERT_TRUE(global_local_filesystem()->exists(caches_dir, &exists).ok()); if (!exists) { @@ -4096,4 +4097,134 @@ TEST_F(BlockFileCacheTest, recyle_unvalid_ttl_async) { } } +TEST_F(BlockFileCacheTest, ttl_reserve_wo_evict_using_lru) { + config::file_cache_ttl_valid_check_interval_second = 4; + config::enable_ttl_cache_evict_using_lru = false; + + if (fs::exists(cache_base_path)) { + fs::remove_all(cache_base_path); + } + fs::create_directories(cache_base_path); + TUniqueId query_id; + query_id.hi = 1; + query_id.lo = 1; + io::FileCacheSettings settings; + settings.query_queue_size = 30; + settings.query_queue_elements = 5; + settings.index_queue_size = 30; + settings.index_queue_elements = 5; + settings.disposable_queue_size = 0; + settings.disposable_queue_elements = 0; + settings.capacity = 60; + settings.max_file_block_size = 30; + settings.max_query_cache_size = 30; + io::CacheContext context; + context.query_id = query_id; + auto key = io::BlockFileCache::hash("key1"); + io::BlockFileCache cache(cache_base_path, settings); + context.cache_type = io::FileCacheType::TTL; + context.expiration_time = UnixSeconds() + 3600; + + ASSERT_TRUE(cache.initialize()); + for (int i = 0; i < 100; i++) { + if (cache.get_lazy_open_success()) { + break; + } + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + } + for (int64_t offset = 0; offset < (60 * config::max_ttl_cache_ratio / 100 - 5); offset += 5) { + auto holder = cache.get_or_set(key, offset, 5, context); + auto segments = fromHolder(holder); + ASSERT_EQ(segments.size(), 1); + assert_range(1, segments[0], io::FileBlock::Range(offset, offset + 4), + io::FileBlock::State::EMPTY); + ASSERT_TRUE(segments[0]->get_or_set_downloader() == io::FileBlock::get_caller_id()); + download(segments[0]); + assert_range(1, segments[0], io::FileBlock::Range(offset, offset + 4), + io::FileBlock::State::DOWNLOADED); + } + context.cache_type = io::FileCacheType::TTL; + context.expiration_time = UnixSeconds() + 3600; + for (int64_t offset = 60; offset < 70; offset += 5) { + auto holder = cache.get_or_set(key, offset, 5, context); + auto segments = fromHolder(holder); + ASSERT_EQ(segments.size(), 1); + assert_range(1, segments[0], io::FileBlock::Range(offset, offset + 4), + io::FileBlock::State::SKIP_CACHE); + } + + EXPECT_EQ(cache._cur_cache_size, 50); + EXPECT_EQ(cache._ttl_queue.cache_size, 0); + if (fs::exists(cache_base_path)) { + fs::remove_all(cache_base_path); + } +} + +TEST_F(BlockFileCacheTest, ttl_reserve_with_evict_using_lru) { + config::file_cache_ttl_valid_check_interval_second = 4; + config::enable_ttl_cache_evict_using_lru = true; + + if (fs::exists(cache_base_path)) { + fs::remove_all(cache_base_path); + } + fs::create_directories(cache_base_path); + TUniqueId query_id; + query_id.hi = 1; + query_id.lo = 1; + io::FileCacheSettings settings; + settings.query_queue_size = 30; + settings.query_queue_elements = 5; + settings.index_queue_size = 30; + settings.index_queue_elements = 5; + settings.disposable_queue_size = 0; + settings.disposable_queue_elements = 0; + settings.capacity = 60; + settings.max_file_block_size = 30; + settings.max_query_cache_size = 30; + io::CacheContext context; + context.query_id = query_id; + auto key = io::BlockFileCache::hash("key1"); + io::BlockFileCache cache(cache_base_path, settings); + context.cache_type = io::FileCacheType::TTL; + context.expiration_time = UnixSeconds() + 3600; + + ASSERT_TRUE(cache.initialize()); + for (int i = 0; i < 100; i++) { + if (cache.get_lazy_open_success()) { + break; + } + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + } + for (int64_t offset = 0; offset < (60 * config::max_ttl_cache_ratio / 100); offset += 5) { + auto holder = cache.get_or_set(key, offset, 5, context); + auto segments = fromHolder(holder); + ASSERT_EQ(segments.size(), 1); + assert_range(1, segments[0], io::FileBlock::Range(offset, offset + 4), + io::FileBlock::State::EMPTY); + ASSERT_TRUE(segments[0]->get_or_set_downloader() == io::FileBlock::get_caller_id()); + download(segments[0]); + assert_range(1, segments[0], io::FileBlock::Range(offset, offset + 4), + io::FileBlock::State::DOWNLOADED); + } + context.cache_type = io::FileCacheType::TTL; + context.expiration_time = UnixSeconds() + 3600; + for (int64_t offset = 60; offset < 70; offset += 5) { + auto holder = cache.get_or_set(key, offset, 5, context); + auto segments = fromHolder(holder); + ASSERT_EQ(segments.size(), 1); + assert_range(1, segments[0], io::FileBlock::Range(offset, offset + 4), + io::FileBlock::State::EMPTY); + ASSERT_TRUE(segments[0]->get_or_set_downloader() ==
io::FileBlock::get_caller_id()); + download(segments[0]); + assert_range(1, segments[0], io::FileBlock::Range(offset, offset + 4), + io::FileBlock::State::DOWNLOADED); + } + + EXPECT_EQ(cache._cur_cache_size, 60); + EXPECT_EQ(cache._ttl_queue.cache_size, 60); + if (fs::exists(cache_base_path)) { + fs::remove_all(cache_base_path); + } +} + } // namespace doris::io diff --git a/regression-test/suites/cloud_p0/cache/ttl/test_ttl_lru_evict.groovy b/regression-test/suites/cloud_p0/cache/ttl/test_ttl_lru_evict.groovy new file mode 100644 index 00000000000000..5eee1b06bb7d1d --- /dev/null +++ b/regression-test/suites/cloud_p0/cache/ttl/test_ttl_lru_evict.groovy @@ -0,0 +1,340 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import org.codehaus.groovy.runtime.IOGroovyMethods +import org.apache.http.HttpRequest; +import org.apache.http.HttpResponse; +import org.apache.http.client.config.RequestConfig; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpPut; +import org.apache.http.client.methods.RequestBuilder; +import org.apache.http.entity.ContentType; +import org.apache.http.entity.StringEntity; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClients; +import org.apache.http.protocol.HttpContext; +import org.apache.http.util.EntityUtils; +import org.apache.http.client.RedirectStrategy; +import org.apache.http.impl.client.LaxRedirectStrategy; + + +// Note: in order to trigger TTL full (thus LRU eviction), +// we need smaller file cache for TTL (<20G). 
To achieve this goal: +// - set smaller total_size in be conf and/or +// - set smaller max_ttl_cache_ratio in this test + +suite("test_ttl_lru_evict") { + // sql """ use @regression_cluster_name1 """ + sql """ use @compute_cluster """ + def ttlProperties = """ PROPERTIES("file_cache_ttl_seconds"="180") """ + String[][] backends = sql """ show backends """ + String backendId; + def backendIdToBackendIP = [:] + def backendIdToBackendHttpPort = [:] + def backendIdToBackendBrpcPort = [:] + for (String[] backend in backends) { + // if (backend[9].equals("true") && backend[19].contains("regression_cluster_name1")) { + if (backend[9].equals("true") && backend[19].contains("compute_cluster")) { + backendIdToBackendIP.put(backend[0], backend[1]) + backendIdToBackendHttpPort.put(backend[0], backend[4]) + backendIdToBackendBrpcPort.put(backend[0], backend[5]) + } + } + assertEquals(backendIdToBackendIP.size(), 1) + + backendId = backendIdToBackendIP.keySet()[0] + def url = backendIdToBackendIP.get(backendId) + ":" + backendIdToBackendHttpPort.get(backendId) + """/api/clear_file_cache""" + logger.info(url) + def clearFileCache = { check_func -> + httpTest { + endpoint "" + uri url + op "post" + body "{\"sync\"=\"true\"}" + check check_func + } + } + + def curl = { String method, String uri /* param */-> + if (method != "GET" && method != "POST") + { + throw new Exception(String.format("invalid curl method: %s", method)) + } + if (uri.isBlank()) + { + throw new Exception("invalid curl url, blank") + } + + Integer timeout = 10; // 10 seconds + Integer maxRetries = 10; // Maximum number of retries + Integer retryCount = 0; // Current retry count + Integer sleepTime = 5000; // Sleep time in milliseconds + + String cmd = String.format("curl --max-time %d -X %s %s", timeout, method, uri).toString() + logger.info("curl cmd: " + cmd) + def process + int code + String err + String out + + while (retryCount < maxRetries) { + process = cmd.execute() + code = process.waitFor() + err = IOGroovyMethods.getText(new BufferedReader(new InputStreamReader(process.getErrorStream()))) + out = process.getText() + + // If the command was successful, break the loop + if (code == 0) { + break + } + + // If the command was not successful, increment the retry count, sleep for a while and try again + retryCount++ + sleep(sleepTime) + } + + // If the command was not successful after maxRetries attempts, log the failure and return the result + if (code != 0) { + logger.error("Command curl failed after " + maxRetries + " attempts.
code: " + code + ", err: " + err) + } + + return [code, out, err] + } + + def show_be_config = { String ip, String port /*param */ -> + return curl("GET", String.format("http://%s:%s/api/show_config", ip, port)) + } + + def get_be_param = { paramName -> + // assuming paramName on all BEs have save value + def (code, out, err) = show_be_config(backendIdToBackendIP.get(backendId), backendIdToBackendHttpPort.get(backendId)) + assertEquals(code, 0) + def configList = parseJson(out.trim()) + assert configList instanceof List + for (Object ele in (List) configList) { + assert ele instanceof List + if (((List) ele)[0] == paramName) { + return ((List) ele)[2] + } + } + } + + def set_be_param = { paramName, paramValue -> + // for eache BE node, set paramName=paramValue + for (String id in backendIdToBackendIP.keySet()) { + def beIp = backendIdToBackendIP.get(id) + def bePort = backendIdToBackendHttpPort.get(id) + def (code, out, err) = curl("POST", String.format("http://%s:%s/api/update_config?%s=%s", beIp, bePort, paramName, paramValue)) + assertTrue(out.contains("OK")) + } + } + + long org_max_ttl_cache_ratio = Long.parseLong(get_be_param.call("max_ttl_cache_ratio")) + + try { + set_be_param.call("max_ttl_cache_ratio", "2") + + def tables = [customer_ttl: 15000000] + def s3BucketName = getS3BucketName() + def s3WithProperties = """WITH S3 ( + |"AWS_ACCESS_KEY" = "${getS3AK()}", + |"AWS_SECRET_KEY" = "${getS3SK()}", + |"AWS_ENDPOINT" = "${getS3Endpoint()}", + |"AWS_REGION" = "${getS3Region()}", + |"provider" = "${getS3Provider()}") + |PROPERTIES( + |"exec_mem_limit" = "8589934592", + |"load_parallelism" = "3")""".stripMargin() + + + sql new File("""${context.file.parent}/../ddl/customer_ttl_delete.sql""").text + def load_customer_once = { String table -> + def uniqueID = Math.abs(UUID.randomUUID().hashCode()).toString() + sql (new File("""${context.file.parent}/../ddl/${table}.sql""").text + ttlProperties) + def loadLabel = table + "_" + uniqueID + // load data from cos + def loadSql = new File("""${context.file.parent}/../ddl/${table}_load.sql""").text.replaceAll("\\\$\\{s3BucketName\\}", s3BucketName) + loadSql = loadSql.replaceAll("\\\$\\{loadLabel\\}", loadLabel) + s3WithProperties + sql loadSql + + // check load state + while (true) { + def stateResult = sql "show load where Label = '${loadLabel}'" + def loadState = stateResult[stateResult.size() - 1][2].toString() + if ("CANCELLED".equalsIgnoreCase(loadState)) { + throw new IllegalStateException("load ${loadLabel} failed.") + } else if ("FINISHED".equalsIgnoreCase(loadState)) { + break + } + sleep(5000) + } + } + + def getMetricsMethod = { check_func -> + httpTest { + endpoint backendIdToBackendIP.get(backendId) + ":" + backendIdToBackendBrpcPort.get(backendId) + uri "/brpc_metrics" + op "get" + check check_func + } + } + + clearFileCache.call() { + respCode, body -> {} + } + + long ttl_cache_evict_size_begin = 0; + getMetricsMethod.call() { + respCode, body -> + assertEquals("${respCode}".toString(), "200") + String out = "${body}".toString() + def strs = out.split('\n') + Boolean flag1 = false; + for (String line in strs) { + if (flag1) break; + if (line.contains("ttl_cache_evict_size")) { + if (line.startsWith("#")) { + continue + } + def i = line.indexOf(' ') + ttl_cache_evict_size_begin = line.substring(i).toLong() + flag1 = true + } + } + assertTrue(flag1) + } + + // load for MANY times to this dup table to make cache full enough to evict + // It will takes huge amount of time, so it should be p1/p2 level case. 
+ // But someone told me it is the right place. Never mind, just do it! + for (int i = 0; i < 10; i++) { + load_customer_once("customer_ttl") + } + sleep(10000) // 10s + // first, we test whether TTL eviction happens actively + long ttl_cache_evict_size_end = 0; + getMetricsMethod.call() { + respCode, body -> + assertEquals("${respCode}".toString(), "200") + String out = "${body}".toString() + def strs = out.split('\n') + Boolean flag1 = false; + for (String line in strs) { + if (flag1) break; + if (line.contains("ttl_cache_evict_size")) { + if (line.startsWith("#")) { + continue + } + def i = line.indexOf(' ') + ttl_cache_evict_size_end = line.substring(i).toLong() + flag1 = true + } + } + assertTrue(flag1) + } + // Note: this only applies when the case is run + // sequentially, because we don't know what other cases are + // doing with TTL cache + logger.info("ttl evict diff:" + (ttl_cache_evict_size_end - ttl_cache_evict_size_begin).toString()) + assertTrue((ttl_cache_evict_size_end - ttl_cache_evict_size_begin) > 1073741824) + + // then we test the skip_cache count when querying while the TTL cache is full + // we expect it to be rare + long skip_cache_count_begin = 0 + getMetricsMethod.call() { + respCode, body -> + assertEquals("${respCode}".toString(), "200") + String out = "${body}".toString() + def strs = out.split('\n') + Boolean flag1 = false; + for (String line in strs) { + if (flag1) break; + if (line.contains("cached_remote_reader_skip_cache_sum")) { + if (line.startsWith("#")) { + continue + } + def i = line.indexOf(' ') + skip_cache_count_begin = line.substring(i).toLong() + flag1 = true + } + } + assertTrue(flag1) + } + + // another wave of load to flush the current TTL away + for (int i = 0; i < 10; i++) { + load_customer_once("customer_ttl") + } + // then we run a query and, hopefully, should not run into too many SKIP_CACHE blocks + sql """ select count(*) from customer_ttl where C_ADDRESS like '%ea%' and C_NAME like '%a%' and C_COMMENT like '%b%' """ + + long skip_cache_count_end = 0 + getMetricsMethod.call() { + respCode, body -> + assertEquals("${respCode}".toString(), "200") + String out = "${body}".toString() + def strs = out.split('\n') + Boolean flag1 = false; + for (String line in strs) { + if (flag1) break; + if (line.contains("cached_remote_reader_skip_cache_sum")) { + if (line.startsWith("#")) { + continue + } + def i = line.indexOf(' ') + skip_cache_count_end = line.substring(i).toLong() + flag1 = true + } + } + assertTrue(flag1) + } + // Note: this only applies when the case is run + // sequentially, because we don't know what other cases are + // doing with TTL cache + logger.info("skip cache diff:" + (skip_cache_count_end - skip_cache_count_begin).toString()) + assertTrue((skip_cache_count_end - skip_cache_count_begin) < 1000000) + + // finally, we test whether the LRU queue cleans itself up when all + // TTL cache entries are evicted after expiration + sleep(200000) + getMetricsMethod.call() { + respCode, body -> + assertEquals("${respCode}".toString(), "200") + String out = "${body}".toString() + def strs = out.split('\n') + Boolean flag1 = false; + for (String line in strs) { + if (flag1) break; + if (line.contains("ttl_cache_lru_queue_size")) { + if (line.startsWith("#")) { + continue + } + def i = line.indexOf(' ') + // all TTL entries will expire, so the TTL LRU queue size drops to 0 + // Note: this only applies when the case is run + // sequentially, because we don't know what other cases are + // doing with TTL cache + assertEquals(line.substring(i).toLong(), 0) + flag1 = true + } + } + assertTrue(flag1) + } + }
finally { + set_be_param.call("max_ttl_cache_ratio", org_max_ttl_cache_ratio.toString()) + } +}