Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[enhancement](cloud) support TTL file cache evict through LRU #37312

Merged
merged 7 commits into from
Jul 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1016,6 +1016,9 @@ DEFINE_mInt32(file_cache_exit_disk_resource_limit_mode_percent, "80");
DEFINE_mBool(enable_read_cache_file_directly, "false");
DEFINE_mBool(file_cache_enable_evict_from_other_queue_by_size, "false");
DEFINE_mInt64(file_cache_ttl_valid_check_interval_second, "0"); // zero for not checking
// Controls what happens when the TTL file cache is full.
// If true, TTL cache entries are evicted using LRU to make room for new data.
// Otherwise, TTL entries are evicted only on expiration, and new data is not
// added to the cache while it is full.
// NOTE(review): this uses DEFINE_Bool while the neighbouring file-cache options
// use the DEFINE_m* variants -- confirm it is not meant to be changed at runtime.
DEFINE_Bool(enable_ttl_cache_evict_using_lru, "true");

DEFINE_mInt32(index_cache_entry_stay_time_after_lookup_s, "1800");
DEFINE_mInt32(inverted_index_cache_stale_sweep_time_sec, "600");
Expand Down
3 changes: 3 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -1064,6 +1064,9 @@ DECLARE_Int32(file_cache_exit_disk_resource_limit_mode_percent);
DECLARE_mBool(enable_read_cache_file_directly);
// Declared with the mutable-variant macro so that this declaration agrees with the
// DEFINE_mBool(file_cache_enable_evict_from_other_queue_by_size, ...) definition
// in config.cpp.
DECLARE_mBool(file_cache_enable_evict_from_other_queue_by_size);
DECLARE_mInt64(file_cache_ttl_valid_check_interval_second);
// Controls what happens when the TTL file cache is full.
// If true, TTL cache entries are evicted using LRU to make room for new data.
// Otherwise, TTL entries are evicted only on expiration, and new data is not
// added to the cache while it is full.
DECLARE_Bool(enable_ttl_cache_evict_using_lru);

// inverted index searcher cache
// cache entry stay time after lookup
Expand Down
275 changes: 145 additions & 130 deletions be/src/io/cache/block_file_cache.cpp

Large diffs are not rendered by default.

13 changes: 13 additions & 0 deletions be/src/io/cache/block_file_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,8 @@ class BlockFileCache {

bool try_reserve_for_ttl(size_t size, std::lock_guard<std::mutex>& cache_lock);

// NOTE(review): the definition lives in block_file_cache.cpp and is not visible from
// this header. Presumably this is the TTL reservation path used when
// config::enable_ttl_cache_evict_using_lru is false (TTL space reclaimed by expiration
// only) -- confirm against the implementation.
bool try_reserve_for_ttl_without_lru(size_t size, std::lock_guard<std::mutex>& cache_lock);

FileBlocks split_range_into_cells(const UInt128Wrapper& hash, const CacheContext& context,
size_t offset, size_t size, FileBlock::State state,
std::lock_guard<std::mutex>& cache_lock);
Expand Down Expand Up @@ -382,6 +384,14 @@ class BlockFileCache {

bool is_overflow(size_t removed_size, size_t need_size, size_t cur_cache_size) const;

// NOTE(review): the three helpers below are defined in block_file_cache.cpp, which is
// not visible from this header; the descriptions are inferred from the signatures
// and should be confirmed against the definitions.

// Presumably removes the given cells from the cache. The lock_guard parameter
// indicates the caller is expected to already hold the cache mutex.
void remove_file_blocks(std::vector<FileBlockCell*>&, std::lock_guard<std::mutex>&);

// Presumably like remove_file_blocks, but additionally cleans the time-keyed
// bookkeeping maps for the removed cells -- TODO confirm which maps are touched.
void remove_file_blocks_and_clean_time_maps(std::vector<FileBlockCell*>&,
std::lock_guard<std::mutex>&);

// Presumably scans `queue` for cells that can be evicted to make room for `size`
// bytes given `cur_cache_size`. `removed_size` and `to_evict` are non-const
// references, so they act as in/out parameters -- TODO confirm exact semantics.
void find_evict_candidates(LRUQueue& queue, size_t size, size_t cur_cache_size,
size_t& removed_size, std::vector<FileBlockCell*>& to_evict,
std::lock_guard<std::mutex>& cache_lock);
// info
std::string _cache_base_path;
size_t _capacity = 0;
Expand Down Expand Up @@ -418,13 +428,16 @@ class BlockFileCache {
LRUQueue _index_queue;
LRUQueue _normal_queue;
LRUQueue _disposable_queue;
// LRU queue for TTL blocks. The unit tests expect its cache_size to stay 0 when
// config::enable_ttl_cache_evict_using_lru is false and to track the cached TTL
// bytes when it is true.
LRUQueue _ttl_queue;

// metrics
size_t _num_read_blocks = 0;
size_t _num_hit_blocks = 0;
size_t _num_removed_blocks = 0;
std::shared_ptr<bvar::Status<size_t>> _cur_cache_size_metrics;
std::shared_ptr<bvar::Status<size_t>> _cur_ttl_cache_size_metrics;
// Status metrics for the TTL LRU queue; presumably they mirror the cache size and
// element count of _ttl_queue -- TODO confirm where they are updated.
std::shared_ptr<bvar::Status<size_t>> _cur_ttl_cache_lru_queue_cache_size_metrics;
std::shared_ptr<bvar::Status<size_t>> _cur_ttl_cache_lru_queue_element_count_metrics;
std::shared_ptr<bvar::Status<size_t>> _cur_normal_queue_element_count_metrics;
std::shared_ptr<bvar::Status<size_t>> _cur_normal_queue_cache_size_metrics;
std::shared_ptr<bvar::Status<size_t>> _cur_index_queue_element_count_metrics;
Expand Down
5 changes: 5 additions & 0 deletions be/src/io/cache/cached_remote_file_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@
namespace doris::io {

bvar::Adder<uint64_t> s3_read_counter("cached_remote_reader_s3_read");
// Skip-cache metrics. Both are fed read_stats.skip_cache in
// CachedRemoteFileReader::_update_state.
// NOTE(review): g_skip_cache_num is a bvar::LatencyRecorder even though it is named
// as a count and receives the same value as g_skip_cache_sum -- confirm this is the
// intended metric type.
bvar::LatencyRecorder g_skip_cache_num("cached_remote_reader_skip_cache_num");
bvar::Adder<uint64_t> g_skip_cache_sum("cached_remote_reader_skip_cache_sum");

CachedRemoteFileReader::CachedRemoteFileReader(FileReaderSPtr remote_file_reader,
const FileReaderOptions& opts)
Expand Down Expand Up @@ -324,6 +326,9 @@ void CachedRemoteFileReader::_update_state(const ReadStatistics& read_stats,
statis->num_skip_cache_io_total += read_stats.skip_cache;
statis->bytes_write_into_cache += read_stats.bytes_write_into_file_cache;
statis->write_cache_io_timer += read_stats.local_write_timer;

g_skip_cache_num << read_stats.skip_cache;
g_skip_cache_sum << read_stats.skip_cache;
}

} // namespace doris::io
131 changes: 131 additions & 0 deletions be/test/io/cache/block_file_cache_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ class BlockFileCacheTest : public testing::Test {
public:
static void SetUpTestSuite() {
config::file_cache_enter_disk_resource_limit_mode_percent = 99;
config::enable_ttl_cache_evict_using_lru = false;
bool exists {false};
ASSERT_TRUE(global_local_filesystem()->exists(caches_dir, &exists).ok());
if (!exists) {
Expand Down Expand Up @@ -4096,4 +4097,134 @@ TEST_F(BlockFileCacheTest, recyle_unvalid_ttl_async) {
}
}

// TTL reservation with LRU eviction of the TTL cache disabled.
//
// Fills the cache with downloaded 5-byte TTL blocks up to one block short of the
// TTL share of the capacity, then requests two more TTL blocks and expects them to
// be handed back as SKIP_CACHE. Finally checks that the cache holds 50 bytes and
// that the TTL LRU queue is unused.
// NOTE(review): the final size of 50 assumes a config::max_ttl_cache_ratio that
// admits exactly ten 5-byte blocks (e.g. 90) -- confirm the configured default.
TEST_F(BlockFileCacheTest, ttl_reserve_wo_evict_using_lru) {
    constexpr int64_t kCapacity = 60;
    constexpr int64_t kBlockSize = 5;

    config::file_cache_ttl_valid_check_interval_second = 4;
    config::enable_ttl_cache_evict_using_lru = false;

    // Drops any cache directory left behind by a previous run.
    const auto purge_cache_dir = [&] {
        if (fs::exists(cache_base_path)) {
            fs::remove_all(cache_base_path);
        }
    };
    purge_cache_dir();
    fs::create_directories(cache_base_path);

    TUniqueId query_id;
    query_id.hi = 1;
    query_id.lo = 1;

    io::FileCacheSettings settings;
    settings.capacity = kCapacity;
    settings.max_file_block_size = 30;
    settings.max_query_cache_size = 30;
    settings.query_queue_size = 30;
    settings.query_queue_elements = 5;
    settings.index_queue_size = 30;
    settings.index_queue_elements = 5;
    settings.disposable_queue_size = 0;
    settings.disposable_queue_elements = 0;

    io::CacheContext context;
    context.query_id = query_id;
    auto key = io::BlockFileCache::hash("key1");
    io::BlockFileCache cache(cache_base_path, settings);
    context.cache_type = io::FileCacheType::TTL;
    context.expiration_time = UnixSeconds() + 3600;

    ASSERT_TRUE(cache.initialize());
    // Give the lazy open up to ~100ms to finish before using the cache.
    for (int attempt = 0; attempt < 100 && !cache.get_lazy_open_success(); ++attempt) {
        std::this_thread::sleep_for(std::chrono::milliseconds(1));
    }

    // Phase 1: populate the cache, stopping one block short of the TTL share.
    const auto fill_limit = kCapacity * config::max_ttl_cache_ratio / 100 - kBlockSize;
    for (int64_t pos = 0; pos < fill_limit; pos += kBlockSize) {
        auto block_holder = cache.get_or_set(key, pos, kBlockSize, context);
        auto blocks = fromHolder(block_holder);
        ASSERT_EQ(blocks.size(), 1);
        auto& block = blocks[0];
        const io::FileBlock::Range expected_range(pos, pos + kBlockSize - 1);
        assert_range(1, block, expected_range, io::FileBlock::State::EMPTY);
        ASSERT_TRUE(block->get_or_set_downloader() == io::FileBlock::get_caller_id());
        download(block);
        assert_range(1, block, expected_range, io::FileBlock::State::DOWNLOADED);
    }

    // Phase 2: without LRU eviction, further TTL requests must bypass the cache.
    context.cache_type = io::FileCacheType::TTL;
    context.expiration_time = UnixSeconds() + 3600;
    for (int64_t pos = kCapacity; pos < kCapacity + 2 * kBlockSize; pos += kBlockSize) {
        auto block_holder = cache.get_or_set(key, pos, kBlockSize, context);
        auto blocks = fromHolder(block_holder);
        ASSERT_EQ(blocks.size(), 1);
        assert_range(1, blocks[0], io::FileBlock::Range(pos, pos + kBlockSize - 1),
                     io::FileBlock::State::SKIP_CACHE);
    }

    EXPECT_EQ(cache._cur_cache_size, 50);
    EXPECT_EQ(cache._ttl_queue.cache_size, 0);
    purge_cache_dir();
}

// TTL reservation with LRU eviction of the TTL cache enabled.
//
// Fills the cache with downloaded 5-byte TTL blocks up to the TTL share of the
// capacity, then requests two more TTL blocks and expects them to be admitted
// (EMPTY -> DOWNLOADED) instead of skipped. Finally checks that both the total
// cache size and the TTL LRU queue size equal the 60-byte capacity.
TEST_F(BlockFileCacheTest, ttl_reserve_with_evict_using_lru) {
    constexpr int64_t kCapacity = 60;
    constexpr int64_t kBlockSize = 5;

    config::file_cache_ttl_valid_check_interval_second = 4;
    config::enable_ttl_cache_evict_using_lru = true;

    // Drops any cache directory left behind by a previous run.
    const auto purge_cache_dir = [&] {
        if (fs::exists(cache_base_path)) {
            fs::remove_all(cache_base_path);
        }
    };
    purge_cache_dir();
    fs::create_directories(cache_base_path);

    TUniqueId query_id;
    query_id.hi = 1;
    query_id.lo = 1;

    io::FileCacheSettings settings;
    settings.capacity = kCapacity;
    settings.max_file_block_size = 30;
    settings.max_query_cache_size = 30;
    settings.query_queue_size = 30;
    settings.query_queue_elements = 5;
    settings.index_queue_size = 30;
    settings.index_queue_elements = 5;
    settings.disposable_queue_size = 0;
    settings.disposable_queue_elements = 0;

    io::CacheContext context;
    context.query_id = query_id;
    auto key = io::BlockFileCache::hash("key1");
    io::BlockFileCache cache(cache_base_path, settings);
    context.cache_type = io::FileCacheType::TTL;
    context.expiration_time = UnixSeconds() + 3600;

    ASSERT_TRUE(cache.initialize());
    // Give the lazy open up to ~100ms to finish before using the cache.
    for (int attempt = 0; attempt < 100 && !cache.get_lazy_open_success(); ++attempt) {
        std::this_thread::sleep_for(std::chrono::milliseconds(1));
    }

    // Phase 1: populate the cache up to the TTL share of the capacity.
    const auto fill_limit = kCapacity * config::max_ttl_cache_ratio / 100;
    for (int64_t pos = 0; pos < fill_limit; pos += kBlockSize) {
        auto block_holder = cache.get_or_set(key, pos, kBlockSize, context);
        auto blocks = fromHolder(block_holder);
        ASSERT_EQ(blocks.size(), 1);
        auto& block = blocks[0];
        const io::FileBlock::Range expected_range(pos, pos + kBlockSize - 1);
        assert_range(1, block, expected_range, io::FileBlock::State::EMPTY);
        ASSERT_TRUE(block->get_or_set_downloader() == io::FileBlock::get_caller_id());
        download(block);
        assert_range(1, block, expected_range, io::FileBlock::State::DOWNLOADED);
    }

    // Phase 2: with LRU eviction on, additional TTL blocks are still admitted.
    context.cache_type = io::FileCacheType::TTL;
    context.expiration_time = UnixSeconds() + 3600;
    for (int64_t pos = kCapacity; pos < kCapacity + 2 * kBlockSize; pos += kBlockSize) {
        auto block_holder = cache.get_or_set(key, pos, kBlockSize, context);
        auto blocks = fromHolder(block_holder);
        ASSERT_EQ(blocks.size(), 1);
        auto& block = blocks[0];
        const io::FileBlock::Range expected_range(pos, pos + kBlockSize - 1);
        assert_range(1, block, expected_range, io::FileBlock::State::EMPTY);
        ASSERT_TRUE(block->get_or_set_downloader() == io::FileBlock::get_caller_id());
        download(block);
        assert_range(1, block, expected_range, io::FileBlock::State::DOWNLOADED);
    }

    EXPECT_EQ(cache._cur_cache_size, 60);
    EXPECT_EQ(cache._ttl_queue.cache_size, 60);
    purge_cache_dir();
}

} // namespace doris::io
Loading
Loading