From 0c94d75254ad6f57bd3af649b9a77f299989e506 Mon Sep 17 00:00:00 2001 From: zhengyu Date: Tue, 10 Jun 2025 10:59:42 +0800 Subject: [PATCH] [fix](cloud) fix file cache types priority order (#51463) TTL is a first-class citizen in the Doris FileCache. However, the original check order let INDEX override TTL whenever the context happened to be index data belonging to TTL data. This PR checks the expiration time first, so that such data is cached as TTL. Signed-off-by: zhengyu --- be/src/io/cache/file_cache_common.h | 8 +- be/test/io/cache/block_file_cache_test.cpp | 153 +++++++++++++++++++++ 2 files changed, 157 insertions(+), 4 deletions(-) diff --git a/be/src/io/cache/file_cache_common.h b/be/src/io/cache/file_cache_common.h index 25df07b5ddff20..6e9396fb11acf8 100644 --- a/be/src/io/cache/file_cache_common.h +++ b/be/src/io/cache/file_cache_common.h @@ -128,13 +128,13 @@ FileCacheSettings get_file_cache_settings(size_t capacity, size_t max_query_cach struct CacheContext { CacheContext(const IOContext* io_context) { - if (io_context->is_index_data) { + if (io_context->expiration_time != 0) { + cache_type = FileCacheType::TTL; + expiration_time = io_context->expiration_time; + } else if (io_context->is_index_data) { cache_type = FileCacheType::INDEX; } else if (io_context->is_disposable) { cache_type = FileCacheType::DISPOSABLE; - } else if (io_context->expiration_time != 0) { - cache_type = FileCacheType::TTL; - expiration_time = io_context->expiration_time; } else { cache_type = FileCacheType::NORMAL; } diff --git a/be/test/io/cache/block_file_cache_test.cpp b/be/test/io/cache/block_file_cache_test.cpp index d0546787026208..0d6883e4c8b4d6 100644 --- a/be/test/io/cache/block_file_cache_test.cpp +++ b/be/test/io/cache/block_file_cache_test.cpp @@ -7752,4 +7752,157 @@ TEST_F(BlockFileCacheTest, test_upgrade_cache_dir_version) { } } +TEST_F(BlockFileCacheTest, cached_remote_file_reader_ttl_index) { + if (fs::exists(cache_base_path)) { + fs::remove_all(cache_base_path); + } + fs::create_directories(cache_base_path); + TUniqueId query_id; + 
query_id.hi = 1; + query_id.lo = 1; + io::FileCacheSettings settings; + settings.query_queue_size = 6291456; + settings.query_queue_elements = 6; + settings.index_queue_size = 1048576; + settings.index_queue_elements = 1; + settings.disposable_queue_size = 1048576; + settings.disposable_queue_elements = 1; + settings.capacity = 8388608; + settings.max_file_block_size = 1048576; + settings.max_query_cache_size = 0; + io::CacheContext context; + ReadStatistics rstats; + context.stats = &rstats; + context.query_id = query_id; + ASSERT_TRUE(FileCacheFactory::instance()->create_file_cache(cache_base_path, settings).ok()); + BlockFileCache* cache = FileCacheFactory::instance()->get_by_path(cache_base_path); + + for (int i = 0; i < 100; i++) { + if (cache->get_async_open_success()) { + break; + }; + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + } + + FileReaderSPtr local_reader; + ASSERT_TRUE(global_local_filesystem()->open_file(tmp_file, &local_reader)); + io::FileReaderOptions opts; + opts.cache_type = io::cache_type_from_string("file_block_cache"); + opts.is_doris_table = true; + CachedRemoteFileReader reader(local_reader, opts); + auto key = io::BlockFileCache::hash("tmp_file"); + EXPECT_EQ(reader._cache_hash, key); + EXPECT_EQ(local_reader->path().native(), reader.path().native()); + EXPECT_EQ(local_reader->size(), reader.size()); + EXPECT_FALSE(reader.closed()); + EXPECT_EQ(local_reader->path().native(), reader.get_remote_reader()->path().native()); + { + std::string buffer; + buffer.resize(64_kb); + IOContext io_ctx; + FileCacheStatistics stats; + io_ctx.file_cache_stats = &stats; + io_ctx.is_index_data = true; + int64_t cur_time = UnixSeconds(); + io_ctx.expiration_time = cur_time + 120; + size_t bytes_read {0}; + EXPECT_TRUE( + reader.read_at(0, Slice(buffer.data(), buffer.size()), &bytes_read, &io_ctx).ok()); + } + std::this_thread::sleep_for(std::chrono::seconds(3)); + LOG(INFO) << "ttl:" << cache->_ttl_queue.cache_size; + LOG(INFO) << "index:" 
<< cache->_index_queue.cache_size; + LOG(INFO) << "normal:" << cache->_normal_queue.cache_size; + LOG(INFO) << "disp:" << cache->_disposable_queue.cache_size; + EXPECT_EQ(cache->_ttl_queue.cache_size, 1048576); + EXPECT_EQ(cache->_index_queue.cache_size, 0); + + EXPECT_TRUE(reader.close().ok()); + EXPECT_TRUE(reader.closed()); + std::this_thread::sleep_for(std::chrono::seconds(1)); + if (fs::exists(cache_base_path)) { + fs::remove_all(cache_base_path); + } + FileCacheFactory::instance()->_caches.clear(); + FileCacheFactory::instance()->_path_to_cache.clear(); + FileCacheFactory::instance()->_capacity = 0; +} + +TEST_F(BlockFileCacheTest, cached_remote_file_reader_normal_index) { + if (fs::exists(cache_base_path)) { + fs::remove_all(cache_base_path); + } + fs::create_directories(cache_base_path); + TUniqueId query_id; + query_id.hi = 1; + query_id.lo = 1; + io::FileCacheSettings settings; + settings.query_queue_size = 6291456; + settings.query_queue_elements = 6; + settings.index_queue_size = 1048576; + settings.index_queue_elements = 1; + settings.disposable_queue_size = 1048576; + settings.disposable_queue_elements = 1; + settings.capacity = 8388608; + settings.max_file_block_size = 1048576; + settings.max_query_cache_size = 0; + io::CacheContext context; + ReadStatistics rstats; + context.stats = &rstats; + context.query_id = query_id; + ASSERT_TRUE(FileCacheFactory::instance()->create_file_cache(cache_base_path, settings).ok()); + BlockFileCache* cache = FileCacheFactory::instance()->get_by_path(cache_base_path); + + for (int i = 0; i < 100; i++) { + if (cache->get_async_open_success()) { + break; + }; + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + } + + FileReaderSPtr local_reader; + ASSERT_TRUE(global_local_filesystem()->open_file(tmp_file, &local_reader)); + io::FileReaderOptions opts; + opts.cache_type = io::cache_type_from_string("file_block_cache"); + opts.is_doris_table = true; + CachedRemoteFileReader reader(local_reader, opts); + auto 
key = io::BlockFileCache::hash("tmp_file"); + EXPECT_EQ(reader._cache_hash, key); + EXPECT_EQ(local_reader->path().native(), reader.path().native()); + EXPECT_EQ(local_reader->size(), reader.size()); + EXPECT_FALSE(reader.closed()); + EXPECT_EQ(local_reader->path().native(), reader.get_remote_reader()->path().native()); + + { + std::string buffer; + buffer.resize(64_kb); + IOContext io_ctx; + FileCacheStatistics stats; + io_ctx.file_cache_stats = &stats; + io_ctx.is_index_data = true; + // int64_t cur_time = UnixSeconds(); + // io_ctx.expiration_time = cur_time + 120; + size_t bytes_read {0}; + EXPECT_TRUE( + reader.read_at(0, Slice(buffer.data(), buffer.size()), &bytes_read, &io_ctx).ok()); + } + std::this_thread::sleep_for(std::chrono::seconds(3)); + LOG(INFO) << "ttl:" << cache->_ttl_queue.cache_size; + LOG(INFO) << "index:" << cache->_index_queue.cache_size; + LOG(INFO) << "normal:" << cache->_normal_queue.cache_size; + LOG(INFO) << "disp:" << cache->_disposable_queue.cache_size; + EXPECT_EQ(cache->_ttl_queue.cache_size, 0); + EXPECT_EQ(cache->_index_queue.cache_size, 1048576); + + EXPECT_TRUE(reader.close().ok()); + EXPECT_TRUE(reader.closed()); + std::this_thread::sleep_for(std::chrono::seconds(1)); + if (fs::exists(cache_base_path)) { + fs::remove_all(cache_base_path); + } + FileCacheFactory::instance()->_caches.clear(); + FileCacheFactory::instance()->_path_to_cache.clear(); + FileCacheFactory::instance()->_capacity = 0; +} + } // namespace doris::io