diff --git a/be/src/io/cache/block_file_cache_factory.cpp b/be/src/io/cache/block_file_cache_factory.cpp index 2d0d25735fe2fd..d43e3acea14daf 100644 --- a/be/src/io/cache/block_file_cache_factory.cpp +++ b/be/src/io/cache/block_file_cache_factory.cpp @@ -92,16 +92,13 @@ Status FileCacheFactory::create_file_cache(const std::string& cache_base_path, LOG_ERROR("").tag("file cache path", cache_base_path).tag("error", strerror(errno)); return Status::IOError("{} statfs error {}", cache_base_path, strerror(errno)); } - size_t disk_capacity = static_cast( - static_cast(stat.f_blocks) * static_cast(stat.f_bsize) * - (static_cast(config::file_cache_enter_disk_resource_limit_mode_percent) / - 100)); + size_t disk_capacity = static_cast(static_cast(stat.f_blocks) * + static_cast(stat.f_bsize)); if (file_cache_settings.capacity == 0 || disk_capacity < file_cache_settings.capacity) { LOG_INFO( - "The cache {} config size {} is larger than {}% disk size {} or zero, recalc " + "The cache {} config size {} is larger than disk size {} or zero, recalc " "it.", - cache_base_path, file_cache_settings.capacity, - config::file_cache_enter_disk_resource_limit_mode_percent, disk_capacity); + cache_base_path, file_cache_settings.capacity, disk_capacity); file_cache_settings = get_file_cache_settings(disk_capacity, file_cache_settings.max_query_cache_size); } @@ -174,16 +171,59 @@ std::vector FileCacheFactory::get_base_paths() { return paths; } +std::string validate_capacity(const std::string& path, int64_t new_capacity, + int64_t& valid_capacity) { + struct statfs stat; + if (statfs(path.c_str(), &stat) < 0) { + auto ret = fmt::format("reset capacity {} statfs error {}. ", path, strerror(errno)); + LOG_ERROR(ret); + valid_capacity = 0; // caller will handle the error + return ret; + } + size_t disk_capacity = static_cast(static_cast(stat.f_blocks) * + static_cast(stat.f_bsize)); + if (new_capacity == 0 || disk_capacity < new_capacity) { + auto ret = fmt::format( + "The cache {} config size {} is larger than disk size {} or zero, recalc " + "it to disk size. ", + path, new_capacity, disk_capacity); + valid_capacity = disk_capacity; + LOG_WARNING(ret); + return ret; + } + valid_capacity = new_capacity; + return ""; +} + std::string FileCacheFactory::reset_capacity(const std::string& path, int64_t new_capacity) { + std::stringstream ss; + size_t total_capacity = 0; if (path.empty()) { - std::stringstream ss; - for (auto& [_, cache] : _path_to_cache) { - ss << cache->reset_capacity(new_capacity); + for (auto& [p, cache] : _path_to_cache) { + int64_t valid_capacity = 0; + ss << validate_capacity(p, new_capacity, valid_capacity); + if (valid_capacity <= 0) { + return ss.str(); + } + ss << cache->reset_capacity(valid_capacity); + total_capacity += cache->capacity(); } + _capacity = total_capacity; return ss.str(); } else { if (auto iter = _path_to_cache.find(path); iter != _path_to_cache.end()) { - return iter->second->reset_capacity(new_capacity); + int64_t valid_capacity = 0; + ss << validate_capacity(path, new_capacity, valid_capacity); + if (valid_capacity <= 0) { + return ss.str(); + } + ss << iter->second->reset_capacity(valid_capacity); + + for (auto& [p, cache] : _path_to_cache) { + total_capacity += cache->capacity(); + } + _capacity = total_capacity; + return ss.str(); } } return "Unknown the cache path " + path; diff --git a/be/test/io/cache/block_file_cache_test.cpp b/be/test/io/cache/block_file_cache_test.cpp index 0d6883e4c8b4d6..1408919fe1784a 100644 --- a/be/test/io/cache/block_file_cache_test.cpp +++ b/be/test/io/cache/block_file_cache_test.cpp @@ -7905,4 +7905,81 @@ TEST_F(BlockFileCacheTest, cached_remote_file_reader_normal_index) { FileCacheFactory::instance()->_capacity = 0; } +TEST_F(BlockFileCacheTest, test_reset_capacity) { + std::string cache_path2 = caches_dir / "cache2" / ""; + + if (fs::exists(cache_base_path)) { + fs::remove_all(cache_base_path); + } + if (fs::exists(cache_path2)) { + fs::remove_all(cache_path2); + } + + io::FileCacheSettings settings; + settings.query_queue_size = 30; + settings.query_queue_elements = 5; + settings.index_queue_size = 30; + settings.index_queue_elements = 5; + settings.disposable_queue_size = 30; + settings.disposable_queue_elements = 5; + settings.capacity = 90; + settings.max_file_block_size = 30; + settings.max_query_cache_size = 30; + ASSERT_TRUE(FileCacheFactory::instance()->create_file_cache(cache_base_path, settings).ok()); + ASSERT_TRUE(FileCacheFactory::instance()->create_file_cache(cache_path2, settings).ok()); + EXPECT_EQ(FileCacheFactory::instance()->get_cache_instance_size(), 2); + EXPECT_EQ(FileCacheFactory::instance()->get_capacity(), 180); + + // valid path + valid capacity + auto s = FileCacheFactory::instance()->reset_capacity(cache_base_path, 80); + LOG(INFO) << s; + EXPECT_EQ(FileCacheFactory::instance()->get_capacity(), 170); + + // empty path + valid capacity + s = FileCacheFactory::instance()->reset_capacity("", 70); + LOG(INFO) << s; + EXPECT_EQ(FileCacheFactory::instance()->get_capacity(), 140); + + // invalid path + valid capacity + s = FileCacheFactory::instance()->reset_capacity("/not/exist/haha", 70); + LOG(INFO) << s; + EXPECT_EQ(FileCacheFactory::instance()->get_capacity(), 140); + + // valid path + invalid capacity + s = FileCacheFactory::instance()->reset_capacity(cache_base_path, INT64_MAX); + LOG(INFO) << s; + EXPECT_LT(FileCacheFactory::instance()->get_capacity(), INT64_MAX); + EXPECT_GT(FileCacheFactory::instance()->get_capacity(), 70); + + // valid path + zero capacity + s = FileCacheFactory::instance()->reset_capacity(cache_base_path, 0); + LOG(INFO) << s; + EXPECT_LT(FileCacheFactory::instance()->get_capacity(), INT64_MAX); + EXPECT_GT(FileCacheFactory::instance()->get_capacity(), 70); + + // empty path + invalid capacity + s = FileCacheFactory::instance()->reset_capacity("", INT64_MAX); + LOG(INFO) << s; + EXPECT_LT(FileCacheFactory::instance()->get_capacity(), INT64_MAX); + EXPECT_GT(FileCacheFactory::instance()->get_capacity(), 70); + + // empty path + zero capacity + s = FileCacheFactory::instance()->reset_capacity("", 0); + LOG(INFO) << s; + EXPECT_LT(FileCacheFactory::instance()->get_capacity(), INT64_MAX); + EXPECT_GT(FileCacheFactory::instance()->get_capacity(), 70); + + FileCacheFactory::instance()->clear_file_caches(true); + std::this_thread::sleep_for(std::chrono::seconds(1)); + if (fs::exists(cache_base_path)) { + fs::remove_all(cache_base_path); + } + if (fs::exists(cache_path2)) { + fs::remove_all(cache_path2); + } + FileCacheFactory::instance()->_caches.clear(); + FileCacheFactory::instance()->_path_to_cache.clear(); + FileCacheFactory::instance()->_capacity = 0; +} + } // namespace doris::io