diff --git a/be/CMakeLists.txt b/be/CMakeLists.txt index 1d5b3d86b7bb92..56c201ff52192d 100644 --- a/be/CMakeLists.txt +++ b/be/CMakeLists.txt @@ -142,6 +142,12 @@ option(WITH_MYSQL "Support access MySQL" ON) option(BUILD_FS_BENCHMARK "ON for building fs benchmark tool or OFF for not" OFF) message(STATUS "build fs benchmark tool: ${BUILD_FS_BENCHMARK}") +option(BUILD_TASK_EXECUTOR_SIMULATOR "ON for building task executor simulator or OFF for not" OFF) +message(STATUS "build task executor simulator: ${BUILD_TASK_EXECUTOR_SIMULATOR}") + +option(BUILD_FILE_CACHE_LRU_TOOL "ON for building file cache lru tool or OFF for not" OFF) +message(STATUS "build file cache lru tool: ${BUILD_FILE_CACHE_LRU_TOOL}") + set(CMAKE_SKIP_RPATH TRUE) set(Boost_USE_STATIC_LIBS ON) set(Boost_USE_STATIC_RUNTIME ON) diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index 34b68cfe0215a4..74905762605e72 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -1135,6 +1135,12 @@ DEFINE_mBool(enable_reader_dryrun_when_download_file_cache, "true"); DEFINE_mInt64(file_cache_background_monitor_interval_ms, "5000"); DEFINE_mInt64(file_cache_background_ttl_gc_interval_ms, "3000"); DEFINE_mInt64(file_cache_background_ttl_gc_batch, "1000"); +DEFINE_mInt64(file_cache_background_lru_dump_interval_ms, "60000"); +// dump queue only if the queue update specific times through several dump intervals +DEFINE_mInt64(file_cache_background_lru_dump_update_cnt_threshold, "1000"); +DEFINE_mInt64(file_cache_background_lru_dump_tail_record_num, "5000000"); +DEFINE_mInt64(file_cache_background_lru_log_replay_interval_ms, "1000"); +DEFINE_mBool(enable_evaluate_shadow_queue_diff, "false"); DEFINE_Int32(file_cache_downloader_thread_num_min, "32"); DEFINE_Int32(file_cache_downloader_thread_num_max, "32"); diff --git a/be/src/common/config.h b/be/src/common/config.h index cc324c11b6bdc3..f75936fe5c568f 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -1171,6 +1171,13 @@ 
DECLARE_mInt64(file_cache_background_ttl_gc_interval_ms); DECLARE_mInt64(file_cache_background_ttl_gc_batch); DECLARE_Int32(file_cache_downloader_thread_num_min); DECLARE_Int32(file_cache_downloader_thread_num_max); +// used to persist lru information before be reboot and load the info back +DECLARE_mInt64(file_cache_background_lru_dump_interval_ms); +// dump queue only if the queue update specific times through several dump intervals +DECLARE_mInt64(file_cache_background_lru_dump_update_cnt_threshold); +DECLARE_mInt64(file_cache_background_lru_dump_tail_record_num); +DECLARE_mInt64(file_cache_background_lru_log_replay_interval_ms); +DECLARE_mBool(enable_evaluate_shadow_queue_diff); DECLARE_mBool(enable_reader_dryrun_when_download_file_cache); diff --git a/be/src/http/action/file_cache_action.cpp b/be/src/http/action/file_cache_action.cpp index 740bac46edf2a7..dbd8dcc3b5e1d9 100644 --- a/be/src/http/action/file_cache_action.cpp +++ b/be/src/http/action/file_cache_action.cpp @@ -54,6 +54,7 @@ constexpr static std::string_view CAPACITY = "capacity"; constexpr static std::string_view RELEASE = "release"; constexpr static std::string_view BASE_PATH = "base_path"; constexpr static std::string_view RELEASED_ELEMENTS = "released_elements"; +constexpr static std::string_view DUMP = "dump"; constexpr static std::string_view VALUE = "value"; Status FileCacheAction::_handle_header(HttpRequest* req, std::string* json_metrics) { @@ -127,6 +128,8 @@ Status FileCacheAction::_handle_header(HttpRequest* req, std::string* json_metri *json_metrics = json.ToString(); } } + } else if (operation == DUMP) { + io::FileCacheFactory::instance()->dump_all_caches(); } else { st = Status::InternalError("invalid operation: {}", operation); } diff --git a/be/src/http/action/shrink_mem_action.cpp b/be/src/http/action/shrink_mem_action.cpp index e18e552cbbee0b..98e30efc7f3d68 100644 --- a/be/src/http/action/shrink_mem_action.cpp +++ b/be/src/http/action/shrink_mem_action.cpp @@ -36,6 +36,8 @@ void 
ShrinkMemAction::handle(HttpRequest* req) { MemoryReclamation::process_minor_gc(); LOG(INFO) << "shrink memory triggered, using Process Minor GC Free Memory"; HttpChannel::send_reply(req, HttpStatus::OK, "shrinking"); + + ExecEnv::GetInstance()->set_is_upgrading(); } } // namespace doris diff --git a/be/src/io/CMakeLists.txt b/be/src/io/CMakeLists.txt index 02b34f2f0ea861..df04638ba52310 100644 --- a/be/src/io/CMakeLists.txt +++ b/be/src/io/CMakeLists.txt @@ -56,3 +56,28 @@ if (${BUILD_FS_BENCHMARK} STREQUAL "ON") ) endif() + +if (${BUILD_FILE_CACHE_LRU_TOOL} STREQUAL "ON") + add_executable(file_cache_lru_tool + cache/file_cache_lru_tool.cpp + ) + + pch_reuse(file_cache_lru_tool) + + # This permits libraries loaded by dlopen to link to the symbols in the program. + set_target_properties(file_cache_lru_tool PROPERTIES ENABLE_EXPORTS 1) + + target_link_libraries(file_cache_lru_tool + ${DORIS_LINK_LIBS} + ) + + install(DIRECTORY DESTINATION ${OUTPUT_DIR}/lib/) + install(TARGETS file_cache_lru_tool DESTINATION ${OUTPUT_DIR}/lib/) + + add_custom_command(TARGET file_cache_lru_tool POST_BUILD + COMMAND ${CMAKE_OBJCOPY} --only-keep-debug $ $.dbg + COMMAND ${CMAKE_STRIP} --strip-debug --strip-unneeded $ + COMMAND ${CMAKE_OBJCOPY} --add-gnu-debuglink=$.dbg $ + ) + +endif() diff --git a/be/src/io/cache/block_file_cache.cpp b/be/src/io/cache/block_file_cache.cpp index 1e16d79dc5c057..af24095a82a7a4 100644 --- a/be/src/io/cache/block_file_cache.cpp +++ b/be/src/io/cache/block_file_cache.cpp @@ -20,8 +20,13 @@ #include "io/cache/block_file_cache.h" +#include +#include + #include "common/status.h" #include "cpp/sync_point.h" +#include "gen_cpp/file_cache.pb.h" +#include "runtime/exec_env.h" #if defined(__APPLE__) #include @@ -42,6 +47,7 @@ #include "io/cache/mem_file_cache_storage.h" #include "util/runtime_profile.h" #include "util/stopwatch.hpp" +#include "util/thread.h" #include "util/time.h" #include "vec/common/sip_hash.h" #include "vec/common/uint128.h" @@ -220,11 +226,14 @@ 
BlockFileCache::BlockFileCache(const std::string& cache_base_path, _cache_base_path.c_str(), "file_cache_storage_async_remove_latency_us"); _evict_in_advance_latency_us = std::make_shared( _cache_base_path.c_str(), "file_cache_evict_in_advance_latency_us"); - + _lru_dump_latency_us = std::make_shared( + _cache_base_path.c_str(), "file_cache_lru_dump_latency_us"); _recycle_keys_length_recorder = std::make_shared( _cache_base_path.c_str(), "file_cache_recycle_keys_length"); _ttl_gc_latency_us = std::make_shared(_cache_base_path.c_str(), "file_cache_ttl_gc_latency_us"); + _shadow_queue_levenshtein_distance = std::make_shared( + _cache_base_path.c_str(), "file_cache_shadow_queue_levenshtein_distance"); _disposable_queue = LRUQueue(cache_settings.disposable_queue_size, cache_settings.disposable_queue_elements, 60 * 60); @@ -234,6 +243,9 @@ BlockFileCache::BlockFileCache(const std::string& cache_base_path, 24 * 60 * 60); _ttl_queue = LRUQueue(cache_settings.ttl_queue_size, cache_settings.ttl_queue_elements, std::numeric_limits::max()); + + _lru_recorder = std::make_unique(this); + _lru_dumper = std::make_unique(this, _lru_recorder.get()); if (cache_settings.storage == "memory") { _storage = std::make_unique(); _cache_base_path = "memory"; @@ -250,32 +262,6 @@ UInt128Wrapper BlockFileCache::hash(const std::string& path) { return UInt128Wrapper(value); } -std::string BlockFileCache::cache_type_to_string(FileCacheType type) { - switch (type) { - case FileCacheType::INDEX: - return "_idx"; - case FileCacheType::DISPOSABLE: - return "_disposable"; - case FileCacheType::NORMAL: - return ""; - case FileCacheType::TTL: - return "_ttl"; - } - return ""; -} - -FileCacheType BlockFileCache::string_to_cache_type(const std::string& str) { - if (str == "idx") { - return FileCacheType::INDEX; - } else if (str == "disposable") { - return FileCacheType::DISPOSABLE; - } else if (str == "ttl") { - return FileCacheType::TTL; - } - DCHECK(false) << "The string is " << str; - return 
FileCacheType::DISPOSABLE; -} - BlockFileCache::QueryFileCacheContextHolderPtr BlockFileCache::get_query_context_holder( const TUniqueId& query_id) { SCOPED_CACHE_LOCK(_mutex, this); @@ -348,6 +334,15 @@ Status BlockFileCache::initialize() { Status BlockFileCache::initialize_unlocked(std::lock_guard& cache_lock) { DCHECK(!_is_initialized); _is_initialized = true; + if (config::file_cache_background_lru_dump_tail_record_num > 0) { + // requirements: + // 1. restored data should not overwrite the last dump + // 2. restore should happen before load and async load + // 3. all queues should be restored sequentially to avoid conflict + // TODO(zhengyu): we can parallelize them but will increase complexity, so let's check the time cost + // to see if any improvement is necessary + restore_lru_queues_from_disk(cache_lock); + } RETURN_IF_ERROR(_storage->init(this)); _cache_background_monitor_thread = std::thread(&BlockFileCache::run_background_monitor, this); _cache_background_ttl_gc_thread = std::thread(&BlockFileCache::run_background_ttl_gc, this); @@ -355,6 +350,11 @@ Status BlockFileCache::initialize_unlocked(std::lock_guard& cache_lo _cache_background_evict_in_advance_thread = std::thread(&BlockFileCache::run_background_evict_in_advance, this); + // Start the LRU dump and LRU log replay background threads + _cache_background_lru_dump_thread = std::thread(&BlockFileCache::run_background_lru_dump, this); + _cache_background_lru_log_replay_thread = + std::thread(&BlockFileCache::run_background_lru_log_replay, this); + return Status::OK(); } @@ -368,6 +368,9 @@ void BlockFileCache::use_cell(const FileBlockCell& cell, FileBlocks* result, boo /// Move to the end of the queue. The iterator remains valid. 
if (cell.queue_iterator && move_iter_flag) { queue.move_to_end(*cell.queue_iterator, cache_lock); + _lru_recorder->record_queue_event(cell.file_block->cache_type(), + CacheLRULogType::MOVETOBACK, cell.file_block->_key.hash, + cell.file_block->_key.offset, cell.size()); } cell.update_atime(); @@ -442,10 +445,16 @@ FileBlocks BlockFileCache::get_impl(const UInt128Wrapper& hash, const CacheConte if (st.ok()) { auto& queue = get_queue(origin_type); queue.remove(cell.queue_iterator.value(), cache_lock); + _lru_recorder->record_queue_event(origin_type, CacheLRULogType::REMOVE, + cell.file_block->get_hash_value(), + cell.file_block->offset(), cell.size()); auto& ttl_queue = get_queue(FileCacheType::TTL); cell.queue_iterator = ttl_queue.add(cell.file_block->get_hash_value(), cell.file_block->offset(), cell.file_block->range().size(), cache_lock); + _lru_recorder->record_queue_event(FileCacheType::TTL, CacheLRULogType::ADD, + cell.file_block->get_hash_value(), + cell.file_block->offset(), cell.size()); } else { LOG_WARNING("Failed to change key meta").error(st); } @@ -482,11 +491,18 @@ FileBlocks BlockFileCache::get_impl(const UInt128Wrapper& hash, const CacheConte if (cell.queue_iterator) { auto& ttl_queue = get_queue(FileCacheType::TTL); ttl_queue.remove(cell.queue_iterator.value(), cache_lock); + _lru_recorder->record_queue_event(FileCacheType::TTL, + CacheLRULogType::REMOVE, + cell.file_block->get_hash_value(), + cell.file_block->offset(), cell.size()); } auto& queue = get_queue(FileCacheType::NORMAL); cell.queue_iterator = queue.add(cell.file_block->get_hash_value(), cell.file_block->offset(), cell.file_block->range().size(), cache_lock); + _lru_recorder->record_queue_event(FileCacheType::NORMAL, CacheLRULogType::ADD, + cell.file_block->get_hash_value(), + cell.file_block->offset(), cell.size()); } else { LOG_WARNING("Failed to change key meta").error(st); } @@ -556,6 +572,7 @@ FileBlocks BlockFileCache::get_impl(const UInt128Wrapper& hash, const CacheConte std::string 
BlockFileCache::clear_file_cache_async() { LOG(INFO) << "start clear_file_cache_async, path=" << _cache_base_path; + _lru_dumper->remove_lru_dump_files(); int64_t num_cells_all = 0; int64_t num_cells_to_delete = 0; int64_t num_cells_wait_recycle = 0; @@ -597,6 +614,7 @@ std::string BlockFileCache::clear_file_cache_async() { << " num_cells_wait_recycle=" << num_cells_wait_recycle; auto msg = ss.str(); LOG(INFO) << msg; + _lru_dumper->remove_lru_dump_files(); return msg; } @@ -766,10 +784,13 @@ BlockFileCache::FileBlockCell* BlockFileCache::add_cell(const UInt128Wrapper& ha } auto& offsets = _files[hash]; - DCHECK_EQ(offsets.count(offset), 0) - << "Cache already exists for hash: " << hash.to_string() << ", offset: " << offset - << ", size: " << size - << ".\nCurrent cache structure: " << dump_structure_unlocked(hash, cache_lock); + auto itr = offsets.find(offset); + if (itr != offsets.end()) { + VLOG_DEBUG << "Cache already exists for hash: " << hash.to_string() + << ", offset: " << offset << ", size: " << size + << ".\nCurrent cache structure: " << dump_structure_unlocked(hash, cache_lock); + return &(itr->second); + } FileCacheKey key; key.hash = hash; @@ -791,6 +812,9 @@ BlockFileCache::FileBlockCell* BlockFileCache::add_cell(const UInt128Wrapper& ha auto& queue = get_queue(cell.file_block->cache_type()); cell.queue_iterator = queue.add(hash, offset, size, cache_lock); + _lru_recorder->record_queue_event(cell.file_block->cache_type(), CacheLRULogType::ADD, + cell.file_block->get_hash_value(), cell.file_block->offset(), + cell.size()); if (cell.file_block->cache_type() == FileCacheType::TTL) { if (_key_to_time.find(hash) == _key_to_time.end()) { @@ -828,7 +852,7 @@ size_t BlockFileCache::try_release() { return trash.size(); } -BlockFileCache::LRUQueue& BlockFileCache::get_queue(FileCacheType type) { +LRUQueue& BlockFileCache::get_queue(FileCacheType type) { switch (type) { case FileCacheType::INDEX: return _index_queue; @@ -844,7 +868,7 @@ BlockFileCache::LRUQueue& 
BlockFileCache::get_queue(FileCacheType type) { return _normal_queue; } -const BlockFileCache::LRUQueue& BlockFileCache::get_queue(FileCacheType type) const { +const LRUQueue& BlockFileCache::get_queue(FileCacheType type) const { switch (type) { case FileCacheType::INDEX: return _index_queue; @@ -971,7 +995,7 @@ bool BlockFileCache::try_reserve(const UInt128Wrapper& hash, const CacheContext& size_t cur_cache_size = _cur_cache_size; size_t query_context_cache_size = query_context->get_cache_size(cache_lock); - std::vector ghost; + std::vector ghost; std::vector to_evict; size_t max_size = queue.get_max_size(); @@ -1074,11 +1098,19 @@ bool BlockFileCache::remove_if_ttl_file_blocks(const UInt128Wrapper& file_key, b if (st.ok()) { if (cell.queue_iterator) { ttl_queue.remove(cell.queue_iterator.value(), cache_lock); + _lru_recorder->record_queue_event( + FileCacheType::TTL, CacheLRULogType::REMOVE, + cell.file_block->get_hash_value(), cell.file_block->offset(), + cell.size()); } auto& queue = get_queue(FileCacheType::NORMAL); cell.queue_iterator = queue.add( cell.file_block->get_hash_value(), cell.file_block->offset(), cell.file_block->range().size(), cache_lock); + _lru_recorder->record_queue_event(FileCacheType::NORMAL, + CacheLRULogType::ADD, + cell.file_block->get_hash_value(), + cell.file_block->offset(), cell.size()); } else { LOG_WARNING("Failed to change cache type to normal").error(st); } @@ -1367,6 +1399,9 @@ void BlockFileCache::remove(FileBlockSPtr file_block, T& cache_lock, U& block_lo if (cell->queue_iterator) { auto& queue = get_queue(file_block->cache_type()); queue.remove(*cell->queue_iterator, cache_lock); + _lru_recorder->record_queue_event(file_block->cache_type(), CacheLRULogType::REMOVE, + cell->file_block->get_hash_value(), + cell->file_block->offset(), cell->size()); } *_queue_evict_size_metrics[static_cast(file_block->cache_type())] << file_block->range().size(); @@ -1482,46 +1517,38 @@ BlockFileCache::FileBlockCell::FileBlockCell(FileBlockSPtr 
file_block, } } -BlockFileCache::LRUQueue::Iterator BlockFileCache::LRUQueue::add( - const UInt128Wrapper& hash, size_t offset, size_t size, - std::lock_guard& /* cache_lock */) { +LRUQueue::Iterator LRUQueue::add(const UInt128Wrapper& hash, size_t offset, size_t size, + std::lock_guard& /* cache_lock */) { cache_size += size; auto iter = queue.insert(queue.end(), FileKeyAndOffset(hash, offset, size)); map.insert(std::make_pair(std::make_pair(hash, offset), iter)); return iter; } -template - requires IsXLock -void BlockFileCache::LRUQueue::remove(Iterator queue_it, T& /* cache_lock */) { - cache_size -= queue_it->size; - map.erase(std::make_pair(queue_it->hash, queue_it->offset)); - queue.erase(queue_it); -} - -void BlockFileCache::LRUQueue::remove_all(std::lock_guard& /* cache_lock */) { +void LRUQueue::remove_all(std::lock_guard& /* cache_lock */) { queue.clear(); map.clear(); cache_size = 0; } -void BlockFileCache::LRUQueue::move_to_end(Iterator queue_it, - std::lock_guard& /* cache_lock */) { +void LRUQueue::move_to_end(Iterator queue_it, std::lock_guard& /* cache_lock */) { queue.splice(queue.end(), queue, queue_it); } -bool BlockFileCache::LRUQueue::contains(const UInt128Wrapper& hash, size_t offset, - std::lock_guard& /* cache_lock */) const { +bool LRUQueue::contains(const UInt128Wrapper& hash, size_t offset, + std::lock_guard& /* cache_lock */) const { return map.find(std::make_pair(hash, offset)) != map.end(); } -BlockFileCache::LRUQueue::Iterator BlockFileCache::LRUQueue::get( - const UInt128Wrapper& hash, size_t offset, - std::lock_guard& /* cache_lock */) const { - return map.find(std::make_pair(hash, offset))->second; +LRUQueue::Iterator LRUQueue::get(const UInt128Wrapper& hash, size_t offset, + std::lock_guard& /* cache_lock */) const { + auto itr = map.find(std::make_pair(hash, offset)); + if (itr != map.end()) { + return itr->second; + } + return std::list::iterator(); } -std::string BlockFileCache::LRUQueue::to_string( - std::lock_guard& /* 
cache_lock */) const { +std::string LRUQueue::to_string(std::lock_guard& /* cache_lock */) const { std::string result; for (const auto& [hash, offset, size] : queue) { if (!result.empty()) { @@ -1532,6 +1559,48 @@ std::string BlockFileCache::LRUQueue::to_string( return result; } +size_t LRUQueue::levenshtein_distance_from(LRUQueue& base, + std::lock_guard& cache_lock) { + std::list target_queue = this->queue; + std::list base_queue = base.queue; + std::vector vec1(target_queue.begin(), target_queue.end()); + std::vector vec2(base_queue.begin(), base_queue.end()); + + size_t m = vec1.size(); + size_t n = vec2.size(); + + // Create a 2D vector (matrix) to store the Levenshtein distances + // dp[i][j] will hold the distance between the first i elements of vec1 and the first j elements of vec2 + std::vector> dp(m + 1, std::vector(n + 1, 0)); + + // Initialize the first row and column of the matrix + // The distance between an empty list and a list of length k is k (all insertions or deletions) + for (size_t i = 0; i <= m; ++i) { + dp[i][0] = i; + } + for (size_t j = 0; j <= n; ++j) { + dp[0][j] = j; + } + + // Fill the matrix using dynamic programming + for (size_t i = 1; i <= m; ++i) { + for (size_t j = 1; j <= n; ++j) { + // Check if the current elements of both vectors are equal + size_t cost = (vec1[i - 1].hash == vec2[j - 1].hash && + vec1[i - 1].offset == vec2[j - 1].offset) + ? 0 + : 1; + // Calculate the minimum cost of three possible operations: + // 1. Insertion: dp[i][j-1] + 1 + // 2. Deletion: dp[i-1][j] + 1 + // 3. 
Substitution: dp[i-1][j-1] + cost (0 if elements are equal, 1 if not) + dp[i][j] = std::min({dp[i - 1][j] + 1, dp[i][j - 1] + 1, dp[i - 1][j - 1] + cost}); + } + } + // The bottom-right cell of the matrix contains the Levenshtein distance + return dp[m][n]; +} + std::string BlockFileCache::dump_structure(const UInt128Wrapper& hash) { SCOPED_CACHE_LOCK(_mutex, this); return dump_structure_unlocked(hash, cache_lock); @@ -1583,9 +1652,15 @@ void BlockFileCache::change_cache_type(const UInt128Wrapper& hash, size_t offset auto& cur_queue = get_queue(cell.file_block->cache_type()); DCHECK(cell.queue_iterator.has_value()); cur_queue.remove(*cell.queue_iterator, cache_lock); + _lru_recorder->record_queue_event( + cell.file_block->cache_type(), CacheLRULogType::REMOVE, + cell.file_block->get_hash_value(), cell.file_block->offset(), cell.size()); auto& new_queue = get_queue(new_type); cell.queue_iterator = new_queue.add(hash, offset, cell.file_block->range().size(), cache_lock); + _lru_recorder->record_queue_event(new_type, CacheLRULogType::ADD, + cell.file_block->get_hash_value(), + cell.file_block->offset(), cell.size()); } } } @@ -1781,6 +1856,7 @@ void BlockFileCache::check_need_evict_cache_in_advance() { } void BlockFileCache::run_background_monitor() { + Thread::set_self_name("run_background_monitor"); while (!_close) { int64_t interval_ms = config::file_cache_background_monitor_interval_ms; TEST_SYNC_POINT_CALLBACK("BlockFileCache::set_sleep_time", &interval_ms); @@ -1839,6 +1915,7 @@ void BlockFileCache::run_background_monitor() { } void BlockFileCache::run_background_ttl_gc() { + Thread::set_self_name("run_background_ttl_gc"); while (!_close) { int64_t interval_ms = config::file_cache_background_ttl_gc_interval_ms; int64_t batch_size = config::file_cache_background_ttl_gc_batch; @@ -1870,6 +1947,7 @@ void BlockFileCache::run_background_ttl_gc() { } void BlockFileCache::run_background_gc() { + Thread::set_self_name("run_background_gc"); FileCacheKey key; size_t 
batch_count = 0; while (!_close) { @@ -1903,6 +1981,7 @@ void BlockFileCache::run_background_gc() { } void BlockFileCache::run_background_evict_in_advance() { + Thread::set_self_name("run_background_evict_in_advance"); LOG(INFO) << "Starting background evict in advance thread"; int64_t batch = 0; while (!_close) { @@ -1982,9 +2061,15 @@ void BlockFileCache::modify_expiration_time(const UInt128Wrapper& hash, if (st.ok()) { auto& queue = get_queue(origin_type); queue.remove(cell.queue_iterator.value(), cache_lock); + _lru_recorder->record_queue_event(origin_type, CacheLRULogType::REMOVE, + cell.file_block->get_hash_value(), + cell.file_block->offset(), cell.size()); auto& ttl_queue = get_queue(FileCacheType::TTL); cell.queue_iterator = ttl_queue.add(hash, cell.file_block->offset(), cell.file_block->range().size(), cache_lock); + _lru_recorder->record_queue_event(FileCacheType::TTL, CacheLRULogType::ADD, + cell.file_block->get_hash_value(), + cell.file_block->offset(), cell.size()); } if (!st.ok()) { LOG_WARNING("").error(st); @@ -2066,6 +2151,7 @@ bool BlockFileCache::try_reserve_during_async_load(size_t size, } std::string BlockFileCache::clear_file_cache_directly() { + _lru_dumper->remove_lru_dump_files(); using namespace std::chrono; std::stringstream ss; auto start = steady_clock::now(); @@ -2113,9 +2199,9 @@ std::string BlockFileCache::clear_file_cache_directly() { << " cache_size=" << cache_size << " index_queue_size=" << index_queue_size << " normal_queue_size=" << normal_queue_size << " disposible_queue_size=" << disposible_queue_size << "ttl_queue_size=" << ttl_queue_size; - auto msg = ss.str(); LOG(INFO) << msg; + _lru_dumper->remove_lru_dump_files(); return msg; } @@ -2144,6 +2230,68 @@ void BlockFileCache::update_ttl_atime(const UInt128Wrapper& hash) { }; } +void BlockFileCache::run_background_lru_log_replay() { + Thread::set_self_name("run_background_lru_log_replay"); + while (!_close) { + int64_t interval_ms = 
config::file_cache_background_lru_log_replay_interval_ms; + { + std::unique_lock close_lock(_close_mtx); + _close_cv.wait_for(close_lock, std::chrono::milliseconds(interval_ms)); + if (_close) { + break; + } + } + + _lru_recorder->replay_queue_event(FileCacheType::TTL); + _lru_recorder->replay_queue_event(FileCacheType::INDEX); + _lru_recorder->replay_queue_event(FileCacheType::NORMAL); + _lru_recorder->replay_queue_event(FileCacheType::DISPOSABLE); + + if (config::enable_evaluate_shadow_queue_diff) { + SCOPED_CACHE_LOCK(_mutex, this); + _lru_recorder->evaluate_queue_diff(_ttl_queue, "ttl", cache_lock); + _lru_recorder->evaluate_queue_diff(_index_queue, "index", cache_lock); + _lru_recorder->evaluate_queue_diff(_normal_queue, "normal", cache_lock); + _lru_recorder->evaluate_queue_diff(_disposable_queue, "disposable", cache_lock); + } + } +} + +void BlockFileCache::dump_lru_queues(bool force) { + std::unique_lock dump_lock(_dump_lru_queues_mtx); + if (config::file_cache_background_lru_dump_tail_record_num > 0 && + !ExecEnv::GetInstance()->get_is_upgrading()) { + _lru_dumper->dump_queue("disposable", force); + _lru_dumper->dump_queue("normal", force); + _lru_dumper->dump_queue("index", force); + _lru_dumper->dump_queue("ttl", force); + _lru_dumper->set_first_dump_done(); + } +} + +void BlockFileCache::run_background_lru_dump() { + Thread::set_self_name("run_background_lru_dump"); + while (!_close) { + int64_t interval_ms = config::file_cache_background_lru_dump_interval_ms; + { + std::unique_lock close_lock(_close_mtx); + _close_cv.wait_for(close_lock, std::chrono::milliseconds(interval_ms)); + if (_close) { + break; + } + } + dump_lru_queues(false); + } +} + +void BlockFileCache::restore_lru_queues_from_disk(std::lock_guard& cache_lock) { + // keep this order because entries may be duplicated in different queues; we use the first appearance + _lru_dumper->restore_queue(_ttl_queue, "ttl", cache_lock); + _lru_dumper->restore_queue(_index_queue, "index", cache_lock); + 
_lru_dumper->restore_queue(_normal_queue, "normal", cache_lock); + _lru_dumper->restore_queue(_disposable_queue, "disposable", cache_lock); +} + std::map BlockFileCache::get_stats() { std::map stats; stats["hits_ratio"] = (double)_hit_ratio->get_value(); diff --git a/be/src/io/cache/block_file_cache.h b/be/src/io/cache/block_file_cache.h index 0e3d26ab60590f..53467da9c2dd10 100644 --- a/be/src/io/cache/block_file_cache.h +++ b/be/src/io/cache/block_file_cache.h @@ -27,9 +27,11 @@ #include #include +#include "io/cache/cache_lru_dumper.h" #include "io/cache/file_block.h" #include "io/cache/file_cache_common.h" #include "io/cache/file_cache_storage.h" +#include "io/cache/lru_queue_recorder.h" #include "util/runtime_profile.h" #include "util/threadpool.h" @@ -73,10 +75,6 @@ class LockScopedTimer { #define SCOPED_CACHE_LOCK(MUTEX, cache) std::lock_guard cache_lock(MUTEX); #endif -template -concept IsXLock = std::same_as> || - std::same_as>; - class FSFileCacheStorage; // The BlockFileCache is responsible for the management of the blocks @@ -86,16 +84,16 @@ class BlockFileCache { friend class MemFileCacheStorage; friend class FileBlock; friend struct FileBlocksHolder; + friend class CacheLRUDumper; + friend class LRUQueueRecorder; public: - static std::string cache_type_to_string(FileCacheType type); - static FileCacheType string_to_cache_type(const std::string& str); // hash the file_name to uint128 static UInt128Wrapper hash(const std::string& path); BlockFileCache(const std::string& cache_base_path, const FileCacheSettings& cache_settings); - ~BlockFileCache() { + virtual ~BlockFileCache() { { std::lock_guard lock(_close_mtx); _close = true; @@ -113,6 +111,12 @@ class BlockFileCache { if (_cache_background_evict_in_advance_thread.joinable()) { _cache_background_evict_in_advance_thread.join(); } + if (_cache_background_lru_dump_thread.joinable()) { + _cache_background_lru_dump_thread.join(); + } + if (_cache_background_lru_log_replay_thread.joinable()) { + 
_cache_background_lru_log_replay_thread.join(); + } } /// Restore cache from local filesystem. @@ -161,6 +165,8 @@ class BlockFileCache { std::string dump_structure(const UInt128Wrapper& hash); std::string dump_single_cache_type(const UInt128Wrapper& hash, size_t offset); + void dump_lru_queues(bool force); + [[nodiscard]] size_t get_used_cache_size(FileCacheType type) const; [[nodiscard]] size_t get_file_blocks_num(FileCacheType type) const; @@ -217,86 +223,6 @@ class BlockFileCache { // for be UTs std::map get_stats_unsafe(); - class LRUQueue { - public: - LRUQueue() = default; - LRUQueue(size_t max_size, size_t max_element_size, int64_t hot_data_interval) - : max_size(max_size), - max_element_size(max_element_size), - hot_data_interval(hot_data_interval) {} - - struct HashFileKeyAndOffset { - std::size_t operator()(const std::pair& pair) const { - return KeyHash()(pair.first) + pair.second; - } - }; - - struct FileKeyAndOffset { - UInt128Wrapper hash; - size_t offset; - size_t size; - - FileKeyAndOffset(const UInt128Wrapper& hash, size_t offset, size_t size) - : hash(hash), offset(offset), size(size) {} - }; - - using Iterator = typename std::list::iterator; - - size_t get_max_size() const { return max_size; } - size_t get_max_element_size() const { return max_element_size; } - - template - requires IsXLock - size_t get_capacity(T& /* cache_lock */) const { - return cache_size; - } - - size_t get_capacity_unsafe() const { return cache_size; } - - size_t get_elements_num_unsafe() const { return queue.size(); } - - size_t get_elements_num(std::lock_guard& /* cache_lock */) const { - return queue.size(); - } - - Iterator add(const UInt128Wrapper& hash, size_t offset, size_t size, - std::lock_guard& cache_lock); - template - requires IsXLock - void remove(Iterator queue_it, T& cache_lock); - - void move_to_end(Iterator queue_it, std::lock_guard& cache_lock); - - std::string to_string(std::lock_guard& cache_lock) const; - - bool contains(const UInt128Wrapper& hash, 
size_t offset, - std::lock_guard& cache_lock) const; - - Iterator begin() { return queue.begin(); } - - Iterator end() { return queue.end(); } - - void remove_all(std::lock_guard& cache_lock); - - Iterator get(const UInt128Wrapper& hash, size_t offset, - std::lock_guard& /* cache_lock */) const; - - int64_t get_hot_data_interval() const { return hot_data_interval; } - - void clear(std::lock_guard& cache_lock) { - queue.clear(); - map.clear(); - cache_size = 0; - } - - size_t max_size; - size_t max_element_size; - std::list queue; - std::unordered_map, Iterator, HashFileKeyAndOffset> map; - size_t cache_size = 0; - int64_t hot_data_interval {0}; - }; - using AccessRecord = std::unordered_map; @@ -395,8 +321,8 @@ class BlockFileCache { FileBlockCell(const FileBlockCell&) = delete; }; - BlockFileCache::LRUQueue& get_queue(FileCacheType type); - const BlockFileCache::LRUQueue& get_queue(FileCacheType type) const; + LRUQueue& get_queue(FileCacheType type); + const LRUQueue& get_queue(FileCacheType type) const; template requires IsXLock && IsXLock @@ -409,9 +335,9 @@ class BlockFileCache { requires IsXLock FileBlockCell* get_cell(const UInt128Wrapper& hash, size_t offset, T& cache_lock); - FileBlockCell* add_cell(const UInt128Wrapper& hash, const CacheContext& context, size_t offset, - size_t size, FileBlock::State state, - std::lock_guard& cache_lock); + virtual FileBlockCell* add_cell(const UInt128Wrapper& hash, const CacheContext& context, + size_t offset, size_t size, FileBlock::State state, + std::lock_guard& cache_lock); Status initialize_unlocked(std::lock_guard& cache_lock); @@ -469,6 +395,9 @@ class BlockFileCache { void run_background_monitor(); void run_background_ttl_gc(); void run_background_gc(); + void run_background_lru_log_replay(); + void run_background_lru_dump(); + void restore_lru_queues_from_disk(std::lock_guard& cache_lock); void run_background_evict_in_advance(); bool try_reserve_from_other_queue_by_time_interval(FileCacheType cur_type, @@ -495,6 
+424,17 @@ class BlockFileCache { std::lock_guard& cache_lock, size_t& cur_removed_size, bool evict_in_advance); + Status check_ofstream_status(std::ofstream& out, std::string& filename); + Status dump_one_lru_entry(std::ofstream& out, std::string& filename, const UInt128Wrapper& hash, + size_t offset, size_t size); + Status finalize_dump(std::ofstream& out, size_t entry_num, std::string& tmp_filename, + std::string& final_filename, size_t& file_size); + Status check_ifstream_status(std::ifstream& in, std::string& filename); + Status parse_dump_footer(std::ifstream& in, std::string& filename, size_t& entry_num); + Status parse_one_lru_entry(std::ifstream& in, std::string& filename, UInt128Wrapper& hash, + size_t& offset, size_t& size); + void remove_lru_dump_files(); + // info std::string _cache_base_path; size_t _capacity = 0; @@ -509,6 +449,8 @@ class BlockFileCache { std::thread _cache_background_ttl_gc_thread; std::thread _cache_background_gc_thread; std::thread _cache_background_evict_in_advance_thread; + std::thread _cache_background_lru_dump_thread; + std::thread _cache_background_lru_log_replay_thread; std::atomic_bool _async_open_done {false}; // disk space or inode is less than the specified value bool _disk_resource_limit_mode {false}; @@ -538,6 +480,9 @@ class BlockFileCache { // keys for async remove RecycleFileCacheKeys _recycle_keys; + std::unique_ptr _lru_recorder; + std::unique_ptr _lru_dumper; + // metrics std::shared_ptr> _cache_capacity_metrics; std::shared_ptr> _cur_cache_size_metrics; @@ -582,11 +527,15 @@ class BlockFileCache { std::shared_ptr _evict_in_advance_latency_us; std::shared_ptr _recycle_keys_length_recorder; std::shared_ptr _ttl_gc_latency_us; + + std::shared_ptr _shadow_queue_levenshtein_distance; // keep _storage last so it will deconstruct first // otherwise, load_cache_info_into_memory might crash // coz it will use other members of BlockFileCache // so join this async load thread first std::unique_ptr _storage; + 
std::shared_ptr _lru_dump_latency_us; + std::mutex _dump_lru_queues_mtx; }; } // namespace doris::io diff --git a/be/src/io/cache/block_file_cache_factory.cpp b/be/src/io/cache/block_file_cache_factory.cpp index d43e3acea14daf..50a05957074bba 100644 --- a/be/src/io/cache/block_file_cache_factory.cpp +++ b/be/src/io/cache/block_file_cache_factory.cpp @@ -163,6 +163,12 @@ std::string FileCacheFactory::clear_file_caches(bool sync) { return ss.str(); } +void FileCacheFactory::dump_all_caches() { + for (const auto& cache : _caches) { + cache->dump_lru_queues(true); + } +} + std::vector FileCacheFactory::get_base_paths() { std::vector paths; for (const auto& pair : _path_to_cache) { diff --git a/be/src/io/cache/block_file_cache_factory.h b/be/src/io/cache/block_file_cache_factory.h index b00bd7bdfcb315..d5598bf97cb01d 100644 --- a/be/src/io/cache/block_file_cache_factory.h +++ b/be/src/io/cache/block_file_cache_factory.h @@ -77,6 +77,11 @@ class FileCacheFactory { */ std::string clear_file_caches(bool sync); + /** + * dump lru queue info for all file cache instances + */ + void dump_all_caches(); + std::vector get_base_paths(); /** diff --git a/be/src/io/cache/cache_lru_dumper.cpp b/be/src/io/cache/cache_lru_dumper.cpp new file mode 100644 index 00000000000000..c4c8d3c804fc52 --- /dev/null +++ b/be/src/io/cache/cache_lru_dumper.cpp @@ -0,0 +1,511 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "io/cache/cache_lru_dumper.h" + +#include "io/cache/block_file_cache.h" +#include "io/cache/cache_lru_dumper.h" +#include "io/cache/lru_queue_recorder.h" +#include "util/coding.h" +#include "util/crc32c.h" + +namespace doris::io { + +std::string CacheLRUDumper::Footer::serialize_as_string() const { + std::string result; + result.reserve(sizeof(Footer)); + + // Serialize meta_offset (convert to little-endian) + uint64_t meta_offset_le; + encode_fixed64_le(reinterpret_cast(&meta_offset_le), meta_offset); + result.append(reinterpret_cast(&meta_offset_le), sizeof(meta_offset_le)); + + // Serialize checksum (convert to little-endian) + uint32_t checksum_le; + encode_fixed32_le(reinterpret_cast(&checksum_le), checksum); + + result.append(reinterpret_cast(&checksum_le), sizeof(checksum_le)); + + result.append(reinterpret_cast(&version), sizeof(version)); + + // Serialize magic + result.append(magic, sizeof(magic)); + + return result; +} + +bool CacheLRUDumper::Footer::deserialize_from_string(const std::string& data) { + DCHECK(data.size() == sizeof(Footer)); + + const char* ptr = data.data(); + + // Deserialize meta_offset (convert from little-endian) + uint64_t meta_offset_le; + std::memcpy(&meta_offset_le, ptr, sizeof(meta_offset_le)); + meta_offset = decode_fixed64_le(reinterpret_cast(&meta_offset_le)); + ptr += sizeof(meta_offset_le); + + // Deserialize checksum (convert from little-endian) + uint32_t checksum_le; + std::memcpy(&checksum_le, ptr, sizeof(checksum_le)); + checksum = decode_fixed32_le(reinterpret_cast(&checksum_le)); + ptr += 
sizeof(checksum_le); + + version = *((uint8_t*)ptr); + ptr += sizeof(version); + + // Deserialize magic + std::memcpy(magic, ptr, sizeof(magic)); + + return true; +} + +Status CacheLRUDumper::check_ofstream_status(std::ofstream& out, std::string& filename) { + if (!out.good()) { + std::ios::iostate state = out.rdstate(); + std::stringstream err_msg; + if (state & std::ios::eofbit) { + err_msg << "End of file reached."; + } + if (state & std::ios::failbit) { + err_msg << "Input/output operation failed, err_code: " << strerror(errno); + } + if (state & std::ios::badbit) { + err_msg << "Serious I/O error occurred, err_code: " << strerror(errno); + } + out.close(); + std::string warn_msg = fmt::format("dump lru writing failed, file={}, {}", filename, + err_msg.str().c_str()); + LOG(WARNING) << warn_msg; + return Status::InternalError(warn_msg); + } + + return Status::OK(); +} + +Status CacheLRUDumper::check_ifstream_status(std::ifstream& in, std::string& filename) { + if (!in.good()) { + std::ios::iostate state = in.rdstate(); + std::stringstream err_msg; + if (state & std::ios::eofbit) { + err_msg << "End of file reached."; + } + if (state & std::ios::failbit) { + err_msg << "Input/output operation failed, err_code: " << strerror(errno); + } + if (state & std::ios::badbit) { + err_msg << "Serious I/O error occurred, err_code: " << strerror(errno); + } + in.close(); + std::string warn_msg = std::string( + fmt::format("dump lru reading failed, file={}, {}", filename, err_msg.str())); + LOG(WARNING) << warn_msg; + return Status::InternalError(warn_msg); + } + + return Status::OK(); +} + +Status CacheLRUDumper::dump_one_lru_entry(std::ofstream& out, std::string& filename, + const UInt128Wrapper& hash, size_t offset, size_t size) { + // Dump file format description: + // +-----------------------------------------------+ + // | LRUDumpEntryGroupPb_1 | + // +-----------------------------------------------+ + // | LRUDumpEntryGroupPb_2 | + // 
+-----------------------------------------------+ + // | ... | + // +-----------------------------------------------+ + // | LRUDumpEntryGroupPb_n | + // +-----------------------------------------------+ + // | LRUDumpMetaPb (List) | + // +-----------------------------------------------+ + // | FOOTER_OFFSET (8Bytes) | + // +-----------------------------------------------+ + // | CHECKSUM (4Bytes)|VERSION (1Byte)|MAGIC (3B)| + // +-----------------------------------------------+ + // + // why we are not using protobuf as a whole? + // AFAIK, current protobuf version does not support streaming mode, + // so that we need to store all the messages in memory which will + // consume loads of RAM. + // Instead, we use protobuf serialize each of the single entry + // and provide the version field in the footer for upgrade + + ::doris::io::cache::LRUDumpEntryPb* entry = _current_dump_group.add_entries(); + ::doris::io::cache::UInt128WrapperPb* hash_pb = entry->mutable_hash(); + hash_pb->set_high(hash.high()); + hash_pb->set_low(hash.low()); + entry->set_offset(offset); + entry->set_size(size); + + _current_dump_group_count++; + if (_current_dump_group_count >= 10000) { + RETURN_IF_ERROR(flush_current_group(out, filename)); + } + return Status::OK(); +} + +Status CacheLRUDumper::flush_current_group(std::ofstream& out, std::string& filename) { + if (_current_dump_group_count == 0) { + return Status::OK(); + } + + // Record current position as group start offset + size_t group_start = out.tellp(); + + // Serialize and write the group + std::string serialized; + VLOG_DEBUG << "Serialized size: " << serialized.size() + << " Before serialization: " << _current_dump_group.DebugString(); + if (!_current_dump_group.SerializeToString(&serialized)) { + std::string warn_msg = fmt::format("Failed to serialize LRUDumpEntryGroupPb"); + LOG(WARNING) << warn_msg; + return Status::InternalError(warn_msg); 
+ } + + out.write(serialized.data(), serialized.size()); + RETURN_IF_ERROR(check_ofstream_status(out, filename)); + + // Record group metadata + ::doris::io::cache::EntryGroupOffsetSizePb* group_info = _dump_meta.add_group_offset_size(); + group_info->set_offset(group_start); + group_info->set_size(serialized.size()); + uint32_t checksum = crc32c::Value(serialized.data(), serialized.size()); + group_info->set_checksum(checksum); + + // Reset for next group + _current_dump_group.Clear(); + _current_dump_group_count = 0; + return Status::OK(); +} + +Status CacheLRUDumper::finalize_dump(std::ofstream& out, size_t entry_num, + std::string& tmp_filename, std::string& final_filename, + size_t& file_size) { + // Flush any remaining entries + if (_current_dump_group_count > 0) { + RETURN_IF_ERROR(flush_current_group(out, tmp_filename)); + } + + // Write meta information + _dump_meta.set_entry_num(entry_num); + size_t meta_offset = out.tellp(); + LOG(INFO) << "dump meta: " << _dump_meta.DebugString(); + std::string meta_serialized; + if (!_dump_meta.SerializeToString(&meta_serialized)) { + std::string warn_msg = + fmt::format("Failed to serialize LRUDumpMetaPb, file={}", tmp_filename); + LOG(WARNING) << warn_msg; + return Status::InternalError(warn_msg); + } + out.write(meta_serialized.data(), meta_serialized.size()); + RETURN_IF_ERROR(check_ofstream_status(out, tmp_filename)); + + // Write footer + Footer footer; + footer.meta_offset = meta_offset; + footer.checksum = 0; + footer.version = 1; + std::memcpy(footer.magic, "DOR", 3); + + std::string footer_str = footer.serialize_as_string(); + out.write(footer_str.data(), footer_str.size()); + RETURN_IF_ERROR(check_ofstream_status(out, tmp_filename)); + + out.close(); + + if (_is_first_dump) [[unlikely]] { + // we back up two dumps (one for last before be restart, one for first after be restart) + // for later debug the restore process + try { + if (std::filesystem::exists(final_filename)) { + std::string backup_filename = 
final_filename + "_" + _start_time + "_last"; + std::rename(final_filename.c_str(), backup_filename.c_str()); + } + std::string timestamped_filename = final_filename + "_" + _start_time; + std::filesystem::copy_file(tmp_filename, timestamped_filename); + + std::filesystem::path dir = std::filesystem::path(final_filename).parent_path(); + std::string prefix = std::filesystem::path(final_filename).filename().string(); + uint64_t total_size = 0; + std::vector> files; + for (const auto& entry : std::filesystem::directory_iterator(dir)) { + if (entry.path().filename().string().find(prefix) == 0) { + total_size += entry.file_size(); + files.emplace_back(entry.path(), entry.last_write_time()); + } + } + if (total_size > 5ULL * 1024 * 1024 * 1024) { + // delete oldest two files + std::sort(files.begin(), files.end(), + [](const auto& a, const auto& b) { return a.second < b.second; }); + if (!files.empty()) { + auto remove_file = [](const std::filesystem::path& file_path) { + std::error_code ec; + bool removed = std::filesystem::remove(file_path, ec); + LOG(INFO) << "Remove " << (removed ? "succeeded" : "failed") + << " for file: " << file_path + << (ec ? 
", error: " + ec.message() : ""); + return removed; + }; + + remove_file(files[0].first); + if (files.size() > 1) { + remove_file(files[1].first); + } + } + } + } catch (const std::filesystem::filesystem_error& e) { + LOG(WARNING) << "failed to handle first dump case: " << e.what(); + } + } + + // Rename tmp to formal file + try { + std::rename(tmp_filename.c_str(), final_filename.c_str()); + file_size = std::filesystem::file_size(final_filename); + } catch (const std::filesystem::filesystem_error& e) { + LOG(WARNING) << "failed to rename " << tmp_filename << " to " << final_filename + << " err: " << e.what(); + } + + _dump_meta.Clear(); + _current_dump_group.Clear(); + _current_dump_group_count = 0; + + return Status::OK(); +} + +void CacheLRUDumper::dump_queue(const std::string& queue_name, bool force) { + FileCacheType type = string_to_cache_type(queue_name); + if (force || _recorder->get_lru_queue_update_cnt_from_last_dump(type) > + config::file_cache_background_lru_dump_update_cnt_threshold) { + LRUQueue& queue = _recorder->get_shadow_queue(type); + do_dump_queue(queue, queue_name); + _recorder->reset_lru_queue_update_cnt_from_last_dump(type); + } +} + +void CacheLRUDumper::do_dump_queue(LRUQueue& queue, const std::string& queue_name) { + Status st; + std::vector> elements; + elements.reserve(config::file_cache_background_lru_dump_tail_record_num); + + { + std::lock_guard lru_log_lock(_recorder->_mutex_lru_log); + size_t count = 0; + for (const auto& [hash, offset, size] : queue) { + if (count++ >= config::file_cache_background_lru_dump_tail_record_num) break; + elements.emplace_back(hash, offset, size); + } + } + + // Write to disk + int64_t duration_ns = 0; + std::uintmax_t file_size = 0; + { + SCOPED_RAW_TIMER(&duration_ns); + std::string tmp_filename = + fmt::format("{}/lru_dump_{}.tail.tmp", _mgr->_cache_base_path, queue_name); + std::string final_filename = + fmt::format("{}/lru_dump_{}.tail", _mgr->_cache_base_path, queue_name); + std::ofstream 
out(tmp_filename, std::ios::binary); + if (out) { + LOG(INFO) << "begin dump " << queue_name << " with " << elements.size() << " elements"; + for (const auto& [hash, offset, size] : elements) { + RETURN_IF_STATUS_ERROR(st, + dump_one_lru_entry(out, tmp_filename, hash, offset, size)); + } + RETURN_IF_STATUS_ERROR(st, finalize_dump(out, elements.size(), tmp_filename, + final_filename, file_size)); + } else { + LOG(WARNING) << "open lru dump file failed, reason: " << tmp_filename + << " failed to create"; + } + } + *(_mgr->_lru_dump_latency_us) << (duration_ns / 1000); + LOG(INFO) << fmt::format("lru dump for {} size={} element={} time={}us", queue_name, file_size, + elements.size(), duration_ns / 1000); +}; + +Status CacheLRUDumper::parse_dump_footer(std::ifstream& in, std::string& filename, + size_t& entry_num) { + size_t file_size = std::filesystem::file_size(filename); + + // Read footer + Footer footer; + size_t footer_size = sizeof(footer); + if (file_size < footer_size) { + std::string warn_msg = std::string(fmt::format( + "LRU dump file too small to contain footer, file={}, skip restore", filename)); + LOG(WARNING) << warn_msg; + return Status::InternalError(warn_msg); + } + + in.seekg(-footer_size, std::ios::end); + std::string footer_str(footer_size, '\0'); + in.read(&footer_str[0], footer_size); + RETURN_IF_ERROR(check_ifstream_status(in, filename)); + + if (!footer.deserialize_from_string(footer_str)) { + std::string warn_msg = std::string( + fmt::format("Failed to deserialize footer, file={}, skip restore", filename)); + LOG(WARNING) << warn_msg; + return Status::InternalError(warn_msg); + } + + // Validate footer + if (footer.version != 1 || std::string(footer.magic, 3) != "DOR") { + std::string warn_msg = std::string(fmt::format( + "LRU dump file invalid footer format, file={}, skip restore", filename)); + LOG(WARNING) << warn_msg; + return Status::InternalError(warn_msg); + } + + // Read meta + in.seekg(footer.meta_offset, std::ios::beg); + size_t 
meta_size = file_size - footer.meta_offset - footer_size; + if (meta_size <= 0) { + std::string warn_msg = std::string( + fmt::format("LRU dump file invalid meta size, file={}, skip restore", filename)); + LOG(WARNING) << warn_msg; + return Status::InternalError(warn_msg); + } + std::string meta_serialized(meta_size, '\0'); + in.read(&meta_serialized[0], meta_serialized.size()); + RETURN_IF_ERROR(check_ifstream_status(in, filename)); + _parse_meta.Clear(); + _current_parse_group.Clear(); + if (!_parse_meta.ParseFromString(meta_serialized)) { + std::string warn_msg = std::string( + fmt::format("LRU dump file meta parse failed, file={}, skip restore", filename)); + LOG(WARNING) << warn_msg; + return Status::InternalError(warn_msg); + } + VLOG_DEBUG << "parse meta: " << _parse_meta.DebugString(); + + entry_num = _parse_meta.entry_num(); + return Status::OK(); +} + +Status CacheLRUDumper::parse_one_lru_entry(std::ifstream& in, std::string& filename, + UInt128Wrapper& hash, size_t& offset, size_t& size) { + // Read next group if current is empty + if (_current_parse_group.entries_size() == 0) { + if (_parse_meta.group_offset_size_size() == 0) { + return Status::EndOfFile("No more entries"); + } + + auto group_info = _parse_meta.group_offset_size(0); + in.seekg(group_info.offset(), std::ios::beg); + std::string group_serialized(group_info.size(), '\0'); + in.read(&group_serialized[0], group_serialized.size()); + RETURN_IF_ERROR(check_ifstream_status(in, filename)); + uint32_t checksum = crc32c::Value(group_serialized.data(), group_serialized.size()); + if (checksum != group_info.checksum()) { + std::string warn_msg = + fmt::format("restore lru failed as checksum not match, file={}", filename); + LOG(WARNING) << warn_msg; + return Status::InternalError(warn_msg); + } + if (!_current_parse_group.ParseFromString(group_serialized)) { + std::string warn_msg = + fmt::format("restore lru failed to parse group, file={}", filename); + LOG(WARNING) << warn_msg; + return 
Status::InternalError(warn_msg); + } + + // Remove processed group info + _parse_meta.mutable_group_offset_size()->erase(_parse_meta.group_offset_size().begin()); + } + + // Get next entry from current group + VLOG_DEBUG << "After deserialization: " << _current_parse_group.DebugString(); + auto entry = _current_parse_group.entries(0); + hash = UInt128Wrapper((static_cast(entry.hash().high()) << 64) | entry.hash().low()); + offset = entry.offset(); + size = entry.size(); + + // Remove processed entry + _current_parse_group.mutable_entries()->erase(_current_parse_group.entries().begin()); + return Status::OK(); +} + +void CacheLRUDumper::restore_queue(LRUQueue& queue, const std::string& queue_name, + std::lock_guard& cache_lock) { + Status st; + std::string filename = fmt::format("{}/lru_dump_{}.tail", _mgr->_cache_base_path, queue_name); + std::ifstream in(filename, std::ios::binary); + int64_t duration_ns = 0; + if (in) { + LOG(INFO) << "lru dump file is founded for " << queue_name << ". starting lru restore."; + + SCOPED_RAW_TIMER(&duration_ns); + size_t entry_num = 0; + RETURN_IF_STATUS_ERROR(st, parse_dump_footer(in, filename, entry_num)); + LOG(INFO) << "lru dump file for " << queue_name << " has " << entry_num << " entries."; + in.seekg(0, std::ios::beg); + UInt128Wrapper hash; + size_t offset, size; + for (int i = 0; i < entry_num; ++i) { + RETURN_IF_STATUS_ERROR(st, parse_one_lru_entry(in, filename, hash, offset, size)); + CacheContext ctx; + if (queue_name == "ttl") { + ctx.cache_type = FileCacheType::TTL; + // TODO(zhengyu): we haven't persisted expiration time yet, use 3h default + // There are multiple places we can correct this fake 3h ttl, e.g.: + // 1. during load_cache_info_into_memory (this will cause overwriting the ttl of async load) + // 2. 
after restoring, use sync_meta to modify the ttl + // However, I plan not to do this in this commit but to figure a more elegant way + // after ttl expiration time being changed from file name encoding to rocksdb persistency. + ctx.expiration_time = 10800; + } else if (queue_name == "index") { + ctx.cache_type = FileCacheType::INDEX; + } else if (queue_name == "normal") { + ctx.cache_type = FileCacheType::NORMAL; + } else if (queue_name == "disposable") { + ctx.cache_type = FileCacheType::DISPOSABLE; + } else { + LOG_WARNING("unknown queue type for lru restore, skip"); + DCHECK(false); + return; + } + // TODO(zhengyu): we don't use stats yet, see if this will cause any problem + _mgr->add_cell(hash, ctx, offset, size, FileBlock::State::DOWNLOADED, cache_lock); + } + in.close(); + } else { + LOG(INFO) << "no lru dump file is founded for " << queue_name; + } + LOG(INFO) << "lru restore time costs: " << (duration_ns / 1000) << "us."; +}; + +void CacheLRUDumper::remove_lru_dump_files() { + std::vector queue_names = {"disposable", "index", "normal", "ttl"}; + for (const auto& queue_name : queue_names) { + std::string filename = + fmt::format("{}/lru_dump_{}.tail", _mgr->_cache_base_path, queue_name); + if (std::filesystem::exists(filename)) { + std::filesystem::remove(filename); + } + } +} + +} // end of namespace doris::io diff --git a/be/src/io/cache/cache_lru_dumper.h b/be/src/io/cache/cache_lru_dumper.h new file mode 100644 index 00000000000000..d9addff614c685 --- /dev/null +++ b/be/src/io/cache/cache_lru_dumper.h @@ -0,0 +1,96 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "gen_cpp/file_cache.pb.h" +#include "io/cache/file_cache_common.h" + +namespace doris::io { +class LRUQueue; +class LRUQueueRecorder; + +class CacheLRUDumper { +public: + CacheLRUDumper(BlockFileCache* mgr, LRUQueueRecorder* recorder) + : _mgr(mgr), _recorder(recorder) { + auto now = std::chrono::system_clock::now(); + auto in_time_t = std::chrono::system_clock::to_time_t(now); + std::stringstream ss; + ss << std::put_time(std::localtime(&in_time_t), "%Y%m%d%H%M%S"); + _start_time = ss.str(); + }; + + void dump_queue(const std::string& queue_name, bool force); + void restore_queue(LRUQueue& queue, const std::string& queue_name, + std::lock_guard& cache_lock); + void remove_lru_dump_files(); + void set_first_dump_done() { _is_first_dump = false; } + +private: + void do_dump_queue(LRUQueue& queue, const std::string& queue_name); + Status check_ofstream_status(std::ofstream& out, std::string& filename); + Status check_ifstream_status(std::ifstream& in, std::string& filename); + Status dump_one_lru_entry(std::ofstream& out, std::string& filename, const UInt128Wrapper& hash, + size_t offset, size_t size); + Status finalize_dump(std::ofstream& out, size_t entry_num, std::string& tmp_filename, + std::string& final_filename, size_t& file_size); + Status parse_dump_footer(std::ifstream& in, std::string& filename, size_t& entry_num); + Status parse_one_lru_entry(std::ifstream& in, std::string& filename, 
UInt128Wrapper& hash, + size_t& offset, size_t& size); + Status flush_current_group(std::ofstream& out, std::string& filename); + + struct Footer { + size_t meta_offset; + uint32_t checksum; + uint8_t version; + char magic[3]; + + std::string serialize_as_string() const; + bool deserialize_from_string(const std::string& data); + } __attribute__((packed)); + +private: + // For dumping + doris::io::cache::LRUDumpEntryGroupPb _current_dump_group; + doris::io::cache::LRUDumpMetaPb _dump_meta; + size_t _current_dump_group_count = 0; + + // For parsing + doris::io::cache::LRUDumpEntryGroupPb _current_parse_group; + doris::io::cache::LRUDumpMetaPb _parse_meta; + + BlockFileCache* _mgr; + LRUQueueRecorder* _recorder; + + std::string _start_time; + bool _is_first_dump = true; +}; +} // namespace doris::io \ No newline at end of file diff --git a/be/src/io/cache/cached_remote_file_reader.cpp b/be/src/io/cache/cached_remote_file_reader.cpp index b89bdcf2f6de0a..ca3abeeba58e7b 100644 --- a/be/src/io/cache/cached_remote_file_reader.cpp +++ b/be/src/io/cache/cached_remote_file_reader.cpp @@ -194,6 +194,9 @@ Status CachedRemoteFileReader::read_at_impl(size_t offset, Slice result, size_t* for (auto& block : holder.file_blocks) { switch (block->state()) { case FileBlock::State::EMPTY: + VLOG_DEBUG << fmt::format("Block EMPTY path={} hash={}:{}:{} offset={} cache_path={}", + path().native(), _cache_hash.to_string(), _cache_hash.high(), + _cache_hash.low(), block->offset(), block->get_cache_file()); block->get_or_set_downloader(); if (block->is_downloader()) { empty_blocks.push_back(block); @@ -202,6 +205,10 @@ Status CachedRemoteFileReader::read_at_impl(size_t offset, Slice result, size_t* stats.hit_cache = false; break; case FileBlock::State::SKIP_CACHE: + VLOG_DEBUG << fmt::format( + "Block SKIP_CACHE path={} hash={}:{}:{} offset={} cache_path={}", + path().native(), _cache_hash.to_string(), _cache_hash.high(), _cache_hash.low(), + block->offset(), block->get_cache_file()); 
empty_blocks.push_back(block); stats.hit_cache = false; stats.skip_cache = true; diff --git a/be/src/io/cache/file_block.cpp b/be/src/io/cache/file_block.cpp index 06f58730296fed..731c1d311e6680 100644 --- a/be/src/io/cache/file_block.cpp +++ b/be/src/io/cache/file_block.cpp @@ -172,8 +172,8 @@ Status FileBlock::change_cache_type_between_ttl_and_others(FileCacheType new_typ if (!expr) { LOG(WARNING) << "none of the cache type is TTL" << ", hash: " << _key.hash.to_string() << ", offset: " << _key.offset - << ", new type: " << BlockFileCache::cache_type_to_string(new_type) - << ", old type: " << BlockFileCache::cache_type_to_string(_key.meta.type); + << ", new type: " << cache_type_to_string(new_type) + << ", old type: " << cache_type_to_string(_key.meta.type); } DCHECK(expr); @@ -189,8 +189,8 @@ Status FileBlock::change_cache_type_between_normal_and_index(FileCacheType new_t if (!expr) { LOG(WARNING) << "one of the cache type is TTL" << ", hash: " << _key.hash.to_string() << ", offset: " << _key.offset - << ", new type: " << BlockFileCache::cache_type_to_string(new_type) - << ", old type: " << BlockFileCache::cache_type_to_string(_key.meta.type); + << ", new type: " << cache_type_to_string(new_type) + << ", old type: " << cache_type_to_string(_key.meta.type); } DCHECK(expr); if (_key.meta.type == FileCacheType::TTL || new_type == _key.meta.type) { diff --git a/be/src/io/cache/file_cache_common.cpp b/be/src/io/cache/file_cache_common.cpp index 56525425f758d4..64faec3beb3b3e 100644 --- a/be/src/io/cache/file_cache_common.cpp +++ b/be/src/io/cache/file_cache_common.cpp @@ -26,6 +26,60 @@ namespace doris::io { +std::string cache_type_to_surfix(FileCacheType type) { + switch (type) { + case FileCacheType::INDEX: + return "_idx"; + case FileCacheType::DISPOSABLE: + return "_disposable"; + case FileCacheType::NORMAL: + return ""; + case FileCacheType::TTL: + return "_ttl"; + } + return ""; +} + +FileCacheType surfix_to_cache_type(const std::string& str) { + if (str == 
"idx") { + return FileCacheType::INDEX; + } else if (str == "disposable") { + return FileCacheType::DISPOSABLE; + } else if (str == "ttl") { + return FileCacheType::TTL; + } + DCHECK(false) << "The string is " << str; + return FileCacheType::DISPOSABLE; +} + +FileCacheType string_to_cache_type(const std::string& str) { + if (str == "normal") { + return FileCacheType::NORMAL; + } else if (str == "index") { + return FileCacheType::INDEX; + } else if (str == "disposable") { + return FileCacheType::DISPOSABLE; + } else if (str == "ttl") { + return FileCacheType::TTL; + } + DCHECK(false) << "The string is " << str; + return FileCacheType::NORMAL; +} +std::string cache_type_to_string(FileCacheType type) { + switch (type) { + case FileCacheType::INDEX: + return "index"; + case FileCacheType::DISPOSABLE: + return "disposable"; + case FileCacheType::NORMAL: + return "normal"; + case FileCacheType::TTL: + return "ttl"; + } + DCHECK(false) << "unknown type: " << type; + return "normal"; +} + std::string FileCacheSettings::to_string() const { std::stringstream ss; ss << "capacity: " << capacity << ", max_file_block_size: " << max_file_block_size @@ -89,4 +143,7 @@ FileBlocksHolderPtr FileCacheAllocatorBuilder::allocate_cache_holder(size_t offs return std::make_unique(std::move(holder)); } +template size_t LRUQueue::get_capacity(std::lock_guard& cache_lock) const; +template void LRUQueue::remove(Iterator queue_it, std::lock_guard& cache_lock); + } // namespace doris::io diff --git a/be/src/io/cache/file_cache_common.h b/be/src/io/cache/file_cache_common.h index 6e9396fb11acf8..f9ac525d0bef86 100644 --- a/be/src/io/cache/file_cache_common.h +++ b/be/src/io/cache/file_cache_common.h @@ -40,6 +40,12 @@ enum FileCacheType { TTL = 3, }; +std::string cache_type_to_surfix(FileCacheType type); +FileCacheType surfix_to_cache_type(const std::string& str); + +FileCacheType string_to_cache_type(const std::string& str); +std::string cache_type_to_string(FileCacheType type); + struct 
UInt128Wrapper { uint128_t value_; [[nodiscard]] std::string to_string() const; @@ -48,6 +54,9 @@ struct UInt128Wrapper { explicit UInt128Wrapper(const uint128_t& value) : value_(value) {} bool operator==(const UInt128Wrapper& other) const { return value_ == other.value_; } + + uint64_t high() const { return static_cast(value_ >> 64); } + uint64_t low() const { return static_cast(value_); } }; struct ReadStatistics { @@ -152,4 +161,94 @@ struct CacheContext { ReadStatistics* stats; }; +template +concept IsXLock = std::same_as> || + std::same_as>; + +class LRUQueue { +public: + LRUQueue() = default; + LRUQueue(size_t max_size, size_t max_element_size, int64_t hot_data_interval) + : max_size(max_size), + max_element_size(max_element_size), + hot_data_interval(hot_data_interval) {} + + struct HashFileKeyAndOffset { + std::size_t operator()(const std::pair& pair) const { + return KeyHash()(pair.first) + pair.second; + } + }; + + struct FileKeyAndOffset { + UInt128Wrapper hash; + size_t offset; + size_t size; + + FileKeyAndOffset(const UInt128Wrapper& hash, size_t offset, size_t size) + : hash(hash), offset(offset), size(size) {} + }; + + using Iterator = typename std::list::iterator; + + size_t get_max_size() const { return max_size; } + size_t get_max_element_size() const { return max_element_size; } + + template + requires IsXLock + size_t get_capacity(T& /* cache_lock */) const { + return cache_size; + } + + size_t get_capacity_unsafe() const { return cache_size; } + + size_t get_elements_num_unsafe() const { return queue.size(); } + + size_t get_elements_num(std::lock_guard& /* cache_lock */) const { + return queue.size(); + } + + Iterator add(const UInt128Wrapper& hash, size_t offset, size_t size, + std::lock_guard& cache_lock); + template + requires IsXLock + void remove(Iterator queue_it, T& /* cache_lock */) { + cache_size -= queue_it->size; + map.erase(std::make_pair(queue_it->hash, queue_it->offset)); + queue.erase(queue_it); + } + + void move_to_end(Iterator 
queue_it, std::lock_guard& cache_lock); + + std::string to_string(std::lock_guard& cache_lock) const; + + bool contains(const UInt128Wrapper& hash, size_t offset, + std::lock_guard& cache_lock) const; + + Iterator begin() { return queue.begin(); } + + Iterator end() { return queue.end(); } + + void remove_all(std::lock_guard& cache_lock); + + Iterator get(const UInt128Wrapper& hash, size_t offset, + std::lock_guard& /* cache_lock */) const; + + int64_t get_hot_data_interval() const { return hot_data_interval; } + + void clear(std::lock_guard& cache_lock) { + queue.clear(); + map.clear(); + cache_size = 0; + } + + size_t levenshtein_distance_from(LRUQueue& base, std::lock_guard& cache_lock); + + size_t max_size; + size_t max_element_size; + std::list queue; + std::unordered_map, Iterator, HashFileKeyAndOffset> map; + size_t cache_size = 0; + int64_t hot_data_interval {0}; +}; + } // namespace doris::io diff --git a/be/src/io/cache/file_cache_lru_tool.cpp b/be/src/io/cache/file_cache_lru_tool.cpp new file mode 100644 index 00000000000000..a6e133c7a56e35 --- /dev/null +++ b/be/src/io/cache/file_cache_lru_tool.cpp @@ -0,0 +1,233 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
// Builds the text shown by gflags for --help.
// @param progname  name the tool was invoked as (argv[0]); it is echoed in
//                  the description, the usage line and the example.
// @return the complete, newline-terminated usage message.
std::string get_usage(const std::string& progname) {
    std::string usage;
    usage += progname;
    usage += " is the Doris BE file cache lru tool for examining dumped content.\n";

    usage += "Usage:\n";
    usage += progname;
    usage += " --filename [filename]\n";
    usage += "\nExample:\n";
    usage += progname;
    usage += " --filename ./lru_dump_ttl.tail\n";
    return usage;
}
little-endian) + uint32_t checksum_le; + std::memcpy(&checksum_le, ptr, sizeof(checksum_le)); + checksum = decode_fixed32_le(reinterpret_cast(&checksum_le)); + ptr += sizeof(checksum_le); + + version = *((uint8_t*)ptr); + ptr += sizeof(version); + + // Deserialize magic + std::memcpy(magic, ptr, sizeof(magic)); + + return true; + } +} __attribute__((packed)); + +Status parse_dump_footer(std::ifstream& in, std::string& filename, size_t& entry_num, + doris::io::cache::LRUDumpMetaPb& parse_meta, + doris::io::cache::LRUDumpEntryGroupPb& current_parse_group) { + size_t file_size = std::filesystem::file_size(filename); + + // Read footer + Footer footer; + size_t footer_size = sizeof(footer); + if (file_size < footer_size) { + std::string warn_msg = std::string(fmt::format( + "LRU dump file too small to contain footer, file={}, skip restore", filename)); + std::cerr << warn_msg << std::endl; + return Status::InternalError(warn_msg); + } + + in.seekg(-footer_size, std::ios::end); + std::string footer_str(footer_size, '\0'); + in.read(&footer_str[0], footer_size); + RETURN_IF_ERROR(check_ifstream_status(in, filename)); + + if (!footer.deserialize_from_string(footer_str)) { + std::string warn_msg = std::string( + fmt::format("Failed to deserialize footer, file={}, skip restore", filename)); + std::cerr << warn_msg << std::endl; + return Status::InternalError(warn_msg); + } + + // Validate footer + if (footer.version != 1 || std::string(footer.magic, 3) != "DOR") { + std::string warn_msg = std::string(fmt::format( + "LRU dump file invalid footer format, file={}, skip restore", filename)); + std::cerr << warn_msg << std::endl; + return Status::InternalError(warn_msg); + } + + // Read meta + in.seekg(footer.meta_offset, std::ios::beg); + size_t meta_size = file_size - footer.meta_offset - footer_size; + if (meta_size <= 0) { + std::string warn_msg = std::string( + fmt::format("LRU dump file invalid meta size, file={}, skip restore", filename)); + std::cerr << warn_msg << 
std::endl; + return Status::InternalError(warn_msg); + } + std::string meta_serialized(meta_size, '\0'); + in.read(&meta_serialized[0], meta_serialized.size()); + RETURN_IF_ERROR(check_ifstream_status(in, filename)); + parse_meta.Clear(); + current_parse_group.Clear(); + if (!parse_meta.ParseFromString(meta_serialized)) { + std::string warn_msg = std::string( + fmt::format("LRU dump file meta parse failed, file={}, skip restore", filename)); + std::cerr << warn_msg << std::endl; + return Status::InternalError(warn_msg); + } + std::cout << "parse meta: " << parse_meta.DebugString() << std::endl; + + entry_num = parse_meta.entry_num(); + return Status::OK(); +} + +Status parse_one_lru_entry(std::ifstream& in, std::string& filename, io::UInt128Wrapper& hash, + size_t& offset, size_t& size, + doris::io::cache::LRUDumpMetaPb& parse_meta, + doris::io::cache::LRUDumpEntryGroupPb& current_parse_group) { + // Read next group if current is empty + if (current_parse_group.entries_size() == 0) { + if (parse_meta.group_offset_size_size() == 0) { + return Status::EndOfFile("No more entries"); + } + + auto group_info = parse_meta.group_offset_size(0); + in.seekg(group_info.offset(), std::ios::beg); + std::string group_serialized(group_info.size(), '\0'); + in.read(&group_serialized[0], group_serialized.size()); + RETURN_IF_ERROR(check_ifstream_status(in, filename)); + uint32_t checksum = crc32c::Value(group_serialized.data(), group_serialized.size()); + if (checksum != group_info.checksum()) { + std::string warn_msg = + fmt::format("restore lru failed as checksum not match, file={}", filename); + std::cerr << warn_msg << std::endl; + return Status::InternalError(warn_msg); + } + if (!current_parse_group.ParseFromString(group_serialized)) { + std::string warn_msg = + fmt::format("restore lru failed to parse group, file={}", filename); + std::cerr << warn_msg << std::endl; + return Status::InternalError(warn_msg); + } + + // Remove processed group info + 
parse_meta.mutable_group_offset_size()->erase(parse_meta.group_offset_size().begin()); + } + + // Get next entry from current group + std::cout << "After deserialization: " << current_parse_group.DebugString() << std::endl; + auto entry = current_parse_group.entries(0); + hash = io::UInt128Wrapper((static_cast(entry.hash().high()) << 64) | + entry.hash().low()); + offset = entry.offset(); + size = entry.size(); + + std::cout << hash.to_string() << " " << offset << " " << size << std::endl; + + // Remove processed entry + current_parse_group.mutable_entries()->erase(current_parse_group.entries().begin()); + return Status::OK(); +} + +int main(int argc, char** argv) { + std::string usage = get_usage(argv[0]); + gflags::SetUsageMessage(usage); + google::ParseCommandLineFlags(&argc, &argv, true); + + std::ifstream in(FLAGS_filename, std::ios::binary); + size_t entry_num; + doris::io::cache::LRUDumpMetaPb parse_meta; + doris::io::cache::LRUDumpEntryGroupPb current_parse_group; + auto s = parse_dump_footer(in, FLAGS_filename, entry_num, parse_meta, current_parse_group); + + in.seekg(0, std::ios::beg); + io::UInt128Wrapper hash; + size_t offset, size; + for (int i = 0; i < entry_num; ++i) { + EXIT_IF_ERROR(parse_one_lru_entry(in, FLAGS_filename, hash, offset, size, parse_meta, + current_parse_group)); + } + + return 0; +} diff --git a/be/src/io/cache/fs_file_cache_storage.cpp b/be/src/io/cache/fs_file_cache_storage.cpp index 6af81ce78b7864..3df56973af7149 100644 --- a/be/src/io/cache/fs_file_cache_storage.cpp +++ b/be/src/io/cache/fs_file_cache_storage.cpp @@ -254,8 +254,8 @@ Status FSFileCacheStorage::change_key_meta_type(const FileCacheKey& key, const F if (!expr) { LOG(WARNING) << "TTL type file dose not need to change the suffix" << " key=" << key.hash.to_string() << " offset=" << key.offset - << " old_type=" << BlockFileCache::cache_type_to_string(key.meta.type) - << " new_type=" << BlockFileCache::cache_type_to_string(type); + << " old_type=" << 
cache_type_to_string(key.meta.type) + << " new_type=" << cache_type_to_string(type); } DCHECK(expr); std::string dir = get_path_in_local_cache(key.hash, key.meta.expiration_time); @@ -288,7 +288,7 @@ std::string FSFileCacheStorage::get_path_in_local_cache(const std::string& dir, } else if (type == FileCacheType::TTL) { return Path(dir) / std::to_string(offset); } else { - return Path(dir) / (std::to_string(offset) + BlockFileCache::cache_type_to_string(type)); + return Path(dir) / (std::to_string(offset) + cache_type_to_surfix(type)); } } @@ -297,7 +297,7 @@ std::string FSFileCacheStorage::get_path_in_local_cache_old_ttl_format(const std FileCacheType type, bool is_tmp) { DCHECK(type == FileCacheType::TTL); - return Path(dir) / (std::to_string(offset) + BlockFileCache::cache_type_to_string(type)); + return Path(dir) / (std::to_string(offset) + cache_type_to_surfix(type)); } std::vector FSFileCacheStorage::get_path_in_local_cache_all_candidates( @@ -607,7 +607,7 @@ Status FSFileCacheStorage::parse_filename_suffix_to_cache_type( if (suffix == "tmp") [[unlikely]] { *is_tmp = true; } else { - *cache_type = BlockFileCache::string_to_cache_type(suffix); + *cache_type = surfix_to_cache_type(suffix); } } } catch (...) { @@ -663,6 +663,7 @@ void FSFileCacheStorage::load_cache_info_into_memory(BlockFileCache* _mgr) const auto f = [&](const BatchLoadArgs& args) { // in async load mode, a cell may be added twice. if (_mgr->_files.contains(args.hash) && _mgr->_files[args.hash].contains(args.offset)) { + // TODO(zhengyu): update type&expiration if need return; } // if the file is tmp, it means it is the old file and it should be removed diff --git a/be/src/io/cache/lru_queue_recorder.cpp b/be/src/io/cache/lru_queue_recorder.cpp new file mode 100644 index 00000000000000..8308a2a73ad6e3 --- /dev/null +++ b/be/src/io/cache/lru_queue_recorder.cpp @@ -0,0 +1,131 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "io/cache/lru_queue_recorder.h" + +#include "io/cache/block_file_cache.h" +#include "io/cache/file_cache_common.h" + +namespace doris::io { + +void LRUQueueRecorder::record_queue_event(FileCacheType type, CacheLRULogType log_type, + const UInt128Wrapper hash, const size_t offset, + const size_t size) { + CacheLRULogQueue& log_queue = get_lru_log_queue(type); + log_queue.enqueue(std::make_unique(log_type, hash, offset, size)); + ++(_lru_queue_update_cnt_from_last_dump[type]); +} + +void LRUQueueRecorder::replay_queue_event(FileCacheType type) { + // we don't need the real cache lock for the shadow queue, but we do need a lock to prevent read/write contension + CacheLRULogQueue& log_queue = get_lru_log_queue(type); + LRUQueue& shadow_queue = get_shadow_queue(type); + + std::lock_guard lru_log_lock(_mutex_lru_log); + std::unique_ptr log; + while (log_queue.try_dequeue(log)) { + try { + switch (log->type) { + case CacheLRULogType::ADD: { + shadow_queue.add(log->hash, log->offset, log->size, lru_log_lock); + break; + } + case CacheLRULogType::REMOVE: { + auto it = shadow_queue.get(log->hash, log->offset, lru_log_lock); + if (it != std::list::iterator()) { + shadow_queue.remove(it, lru_log_lock); + } else { + LOG(WARNING) << "REMOVE failed, doesn't exist in shadow 
queue"; + } + break; + } + case CacheLRULogType::MOVETOBACK: { + auto it = shadow_queue.get(log->hash, log->offset, lru_log_lock); + if (it != std::list::iterator()) { + shadow_queue.move_to_end(it, lru_log_lock); + } else { + LOG(WARNING) << "MOVETOBACK failed, doesn't exist in shadow queue"; + } + break; + } + default: + LOG(WARNING) << "Unknown CacheLRULogType: " << static_cast(log->type); + break; + } + } catch (const std::exception& e) { + LOG(WARNING) << "Failed to replay queue event: " << e.what(); + } + } +} + +// we evaluate the diff between two queue by calculate how many operation is +// needed for transfer one to another (Levenshtein Distance) +// NOTE: HEAVY calculation with cache lock, only for debugging +void LRUQueueRecorder::evaluate_queue_diff(LRUQueue& base, std::string name, + std::lock_guard& cache_lock) { + FileCacheType type = string_to_cache_type(name); + LRUQueue& target = get_shadow_queue(type); + size_t distance = target.levenshtein_distance_from(base, cache_lock); + *(_mgr->_shadow_queue_levenshtein_distance) << distance; + if (distance > 20) { + LOG(WARNING) << name << " shadow queue is different from real queue"; + } +} + +LRUQueue& LRUQueueRecorder::get_shadow_queue(FileCacheType type) { + switch (type) { + case FileCacheType::INDEX: + return _shadow_index_queue; + case FileCacheType::DISPOSABLE: + return _shadow_disposable_queue; + case FileCacheType::NORMAL: + return _shadow_normal_queue; + case FileCacheType::TTL: + return _shadow_ttl_queue; + default: + LOG(WARNING) << "invalid shadow queue type"; + DCHECK(false); + } + return _shadow_normal_queue; +} + +CacheLRULogQueue& LRUQueueRecorder::get_lru_log_queue(FileCacheType type) { + switch (type) { + case FileCacheType::INDEX: + return _index_lru_log_queue; + case FileCacheType::DISPOSABLE: + return _disposable_lru_log_queue; + case FileCacheType::NORMAL: + return _normal_lru_log_queue; + case FileCacheType::TTL: + return _ttl_lru_log_queue; + default: + LOG(WARNING) << "invalid lru 
// Returns the number of queue events recorded for `type` since the counter
// was last reset. record_queue_event() increments this counter once per call.
// NOTE(review): operator[] would insert a zero entry for a type that the
// constructor did not seed; the constructor seeds all four cache types.
size_t LRUQueueRecorder::get_lru_queue_update_cnt_from_last_dump(FileCacheType type) {
    return _lru_queue_update_cnt_from_last_dump[type];
}

// Sets the update counter for `type` back to zero.
void LRUQueueRecorder::reset_lru_queue_update_cnt_from_last_dump(FileCacheType type) {
    _lru_queue_update_cnt_from_last_dump[type] = 0;
}
// Kind of LRU queue mutation carried by a CacheLRULog record. Records are
// replayed by LRUQueueRecorder::replay_queue_event against a shadow queue.
enum class CacheLRULogType {
    ADD = 0,        // replay adds (hash, offset, size) to the shadow queue
    REMOVE = 1,     // replay removes the (hash, offset) entry if it is present
    MOVETOBACK = 2, // replay moves the (hash, offset) entry to the end if present
    INVALID = 3,    // default for CacheLRULog::type; replay logs it as unknown
};
86a1e4c6c73066..fac27382036612 100644 --- a/be/src/runtime/exec_env.h +++ b/be/src/runtime/exec_env.h @@ -154,6 +154,8 @@ class ExecEnv { static bool ready() { return _s_ready.load(std::memory_order_acquire); } static bool tracking_memory() { return _s_tracking_memory.load(std::memory_order_acquire); } + static bool get_is_upgrading() { return _s_upgrading.load(std::memory_order_acquire); } + static void set_is_upgrading() { _s_upgrading = true; } const std::string& token() const; ExternalScanContextMgr* external_scan_context_mgr() { return _external_scan_context_mgr; } vectorized::VDataStreamMgr* vstream_mgr() { return _vstream_mgr; } @@ -378,6 +380,7 @@ class ExecEnv { inline static std::atomic_bool _s_tracking_memory {false}; std::vector _store_paths; std::vector _spill_store_paths; + inline static std::atomic_bool _s_upgrading {false}; io::FileCacheFactory* _file_cache_factory = nullptr; UserFunctionCache* _user_function_cache = nullptr; diff --git a/be/test/io/cache/block_file_cache_test.cpp b/be/test/io/cache/block_file_cache_test.cpp index 1408919fe1784a..2679dcf91ad7d8 100644 --- a/be/test/io/cache/block_file_cache_test.cpp +++ b/be/test/io/cache/block_file_cache_test.cpp @@ -18,69 +18,14 @@ // https://github.com/ClickHouse/ClickHouse/blob/master/src/Interpreters/tests/gtest_lru_file_cache.cpp // and modified by Doris -#include -#include -#include -#include -#if defined(__APPLE__) -#include -#else -#include -#endif - -// IWYU pragma: no_include -#include - -#include // IWYU pragma: keep -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -#include "common/config.h" -#include "cpp/sync_point.h" -#include "gtest/gtest_pred_impl.h" -#include "io/cache/block_file_cache.h" -#include "io/cache/block_file_cache_factory.h" -#include "io/cache/block_file_cache_profile.h" -#include "io/cache/cached_remote_file_reader.h" -#include "io/cache/file_block.h" -#include 
"io/cache/file_cache_common.h" -#include "io/cache/fs_file_cache_storage.h" -#include "io/fs/path.h" -#include "olap/options.h" -#include "runtime/exec_env.h" -#include "util/slice.h" -#include "util/time.h" +#include "block_file_cache_test_common.h" namespace doris::io { -extern int disk_used_percentage(const std::string& path, std::pair* percent); - -namespace fs = std::filesystem; - fs::path caches_dir = fs::current_path() / "lru_cache_test"; std::string cache_base_path = caches_dir / "cache1" / ""; std::string tmp_file = caches_dir / "tmp_file"; -constexpr unsigned long long operator"" _mb(unsigned long long m) { - return m * 1024 * 1024; -} - -constexpr unsigned long long operator"" _kb(unsigned long long m) { - return m * 1024; -} - void assert_range([[maybe_unused]] size_t assert_n, io::FileBlockSPtr file_block, const io::FileBlock::Range& expected_range, io::FileBlock::State expected_state) { auto range = file_block->range(); @@ -94,7 +39,7 @@ std::vector fromHolder(const io::FileBlocksHolder& holder) { return std::vector(holder.file_blocks.begin(), holder.file_blocks.end()); } -void download(io::FileBlockSPtr file_block, size_t size = 0) { +void download(io::FileBlockSPtr file_block, size_t size) { const auto& hash = file_block->get_hash_value(); if (size == 0) { size = file_block->range().size(); @@ -113,7 +58,7 @@ void download(io::FileBlockSPtr file_block, size_t size = 0) { ASSERT_TRUE(fs::exists(subdir)); } -void download_into_memory(io::FileBlockSPtr file_block, size_t size = 0) { +void download_into_memory(io::FileBlockSPtr file_block, size_t size) { if (size == 0) { size = file_block->range().size(); } @@ -138,101 +83,6 @@ void complete_into_memory(const io::FileBlocksHolder& holder) { } } -class BlockFileCacheTest : public testing::Test { -public: - static void SetUpTestSuite() { - config::file_cache_enter_disk_resource_limit_mode_percent = 99; - config::enable_evict_file_cache_in_advance = false; // disable evict in - // advance for most - // 
cases for simple - // verification - bool exists {false}; - ASSERT_TRUE(global_local_filesystem()->exists(caches_dir, &exists).ok()); - if (!exists) { - ASSERT_TRUE(global_local_filesystem()->create_directory(caches_dir).ok()); - } - ASSERT_TRUE(global_local_filesystem()->exists(tmp_file, &exists).ok()); - if (!exists) { - FileWriterPtr writer; - ASSERT_TRUE(global_local_filesystem()->create_file(tmp_file, &writer).ok()); - for (int i = 0; i < 10; i++) { - std::string data(1_mb, '0' + i); - ASSERT_TRUE(writer->append(Slice(data.data(), data.size())).ok()); - } - std::string data(1, '0'); - ASSERT_TRUE(writer->append(Slice(data.data(), data.size())).ok()); - ASSERT_TRUE(writer->close().ok()); - } - ExecEnv::GetInstance()->_file_cache_factory = factory.get(); - ExecEnv::GetInstance()->_file_cache_open_fd_cache = std::make_unique(); - } - static void TearDownTestSuite() { - config::file_cache_enter_disk_resource_limit_mode_percent = 99; - ExecEnv::GetInstance()->_file_cache_open_fd_cache.reset(nullptr); - } - -private: - inline static std::unique_ptr factory = std::make_unique(); -}; - -TEST_F(BlockFileCacheTest, init) { - std::string string = std::string(R"( - [ - { - "path" : "/mnt/ssd01/clickbench/hot/be/file_cache", - "total_size" : 193273528320, - "query_limit" : 38654705664 - }, - { - "path" : "/mnt/ssd01/clickbench/hot/be/file_cache", - "total_size" : 193273528320, - "query_limit" : 38654705664 - } - ] - )"); - config::enable_file_cache_query_limit = true; - std::vector cache_paths; - EXPECT_TRUE(parse_conf_cache_paths(string, cache_paths)); - EXPECT_EQ(cache_paths.size(), 2); - for (const auto& cache_path : cache_paths) { - io::FileCacheSettings settings = cache_path.init_settings(); - EXPECT_EQ(settings.capacity, 193273528320); - EXPECT_EQ(settings.max_query_cache_size, 38654705664); - } - - // err normal - std::string err_string = std::string(R"( - [ - { - "path" : "/mnt/ssd01/clickbench/hot/be/file_cache", - "total_size" : "193273528320", - "query_limit" : 
-1 - } - ] - )"); - cache_paths.clear(); - EXPECT_FALSE(parse_conf_cache_paths(err_string, cache_paths)); - - // err query_limit - err_string = std::string(R"( - [ - { - "path" : "/mnt/ssd01/clickbench/hot/be/file_cache", - "total_size" : -1 - } - ] - )"); - cache_paths.clear(); - EXPECT_FALSE(parse_conf_cache_paths(err_string, cache_paths)); - - err_string = std::string(R"( - [ - ] - )"); - cache_paths.clear(); - EXPECT_FALSE(parse_conf_cache_paths(err_string, cache_paths)); -} - void test_file_cache(io::FileCacheType cache_type) { TUniqueId query_id; query_id.hi = 1; @@ -1060,6 +910,64 @@ void test_file_cache_memory_storage(io::FileCacheType cache_type) { } } +TEST_F(BlockFileCacheTest, init) { + std::string string = std::string(R"( + [ + { + "path" : "/mnt/ssd01/clickbench/hot/be/file_cache", + "total_size" : 193273528320, + "query_limit" : 38654705664 + }, + { + "path" : "/mnt/ssd01/clickbench/hot/be/file_cache", + "total_size" : 193273528320, + "query_limit" : 38654705664 + } + ] + )"); + config::enable_file_cache_query_limit = true; + std::vector cache_paths; + EXPECT_TRUE(parse_conf_cache_paths(string, cache_paths)); + EXPECT_EQ(cache_paths.size(), 2); + for (const auto& cache_path : cache_paths) { + io::FileCacheSettings settings = cache_path.init_settings(); + EXPECT_EQ(settings.capacity, 193273528320); + EXPECT_EQ(settings.max_query_cache_size, 38654705664); + } + + // err normal + std::string err_string = std::string(R"( + [ + { + "path" : "/mnt/ssd01/clickbench/hot/be/file_cache", + "total_size" : "193273528320", + "query_limit" : -1 + } + ] + )"); + cache_paths.clear(); + EXPECT_FALSE(parse_conf_cache_paths(err_string, cache_paths)); + + // err query_limit + err_string = std::string(R"( + [ + { + "path" : "/mnt/ssd01/clickbench/hot/be/file_cache", + "total_size" : -1 + } + ] + )"); + cache_paths.clear(); + EXPECT_FALSE(parse_conf_cache_paths(err_string, cache_paths)); + + err_string = std::string(R"( + [ + ] + )"); + cache_paths.clear(); + 
EXPECT_FALSE(parse_conf_cache_paths(err_string, cache_paths)); +} + TEST_F(BlockFileCacheTest, normal) { if (fs::exists(cache_base_path)) { fs::remove_all(cache_base_path); @@ -3489,10 +3397,10 @@ TEST_F(BlockFileCacheTest, state_to_string) { EXPECT_EQ(FileBlock::state_to_string(FileBlock::State::DOWNLOADED), "DOWNLOADED"); } -TEST_F(BlockFileCacheTest, string_to_cache_type) { - EXPECT_EQ(BlockFileCache::string_to_cache_type("idx"), FileCacheType::INDEX); - EXPECT_EQ(BlockFileCache::string_to_cache_type("disposable"), FileCacheType::DISPOSABLE); - EXPECT_EQ(BlockFileCache::string_to_cache_type("ttl"), FileCacheType::TTL); +TEST_F(BlockFileCacheTest, surfix_to_cache_type) { + EXPECT_EQ(surfix_to_cache_type("idx"), FileCacheType::INDEX); + EXPECT_EQ(surfix_to_cache_type("disposable"), FileCacheType::DISPOSABLE); + EXPECT_EQ(surfix_to_cache_type("ttl"), FileCacheType::TTL); } TEST_F(BlockFileCacheTest, append_many_time) { @@ -4002,6 +3910,7 @@ TEST_F(BlockFileCacheTest, cached_remote_file_reader_error_handle) { } EXPECT_TRUE(reader.close().ok()); EXPECT_TRUE(reader.closed()); + std::this_thread::sleep_for(std::chrono::seconds(1)); if (fs::exists(cache_base_path)) { fs::remove_all(cache_base_path); } @@ -4062,6 +3971,7 @@ TEST_F(BlockFileCacheTest, cached_remote_file_reader_init) { CachedRemoteFileReader reader(local_reader, opts); EXPECT_EQ(reader._cache->get_base_path(), cache_base_path); } + std::this_thread::sleep_for(std::chrono::seconds(1)); if (fs::exists(cache_base_path)) { fs::remove_all(cache_base_path); } @@ -5341,7 +5251,7 @@ TEST_F(BlockFileCacheTest, test_load) { { auto type = cache.dump_single_cache_type(key, 10086); - ASSERT_TRUE(type == "_ttl"); + ASSERT_TRUE(type == "ttl"); auto holder = cache.get_or_set(key, 10086, 3, context); auto blocks = fromHolder(holder); ASSERT_EQ(blocks.size(), 1); @@ -5362,7 +5272,7 @@ TEST_F(BlockFileCacheTest, test_load) { } { auto type = cache.dump_single_cache_type(key, 20086); - ASSERT_TRUE(type == "_ttl"); + 
ASSERT_TRUE(type == "ttl"); auto holder = cache.get_or_set(key, 20086, 3, context); auto blocks = fromHolder(holder); ASSERT_EQ(blocks.size(), 1); @@ -5987,9 +5897,8 @@ TEST_F(BlockFileCacheTest, seize_after_full) { }; for (auto& args : args_vec) { - std::cout << "filled with " << io::BlockFileCache::cache_type_to_string(args.first_type) - << " and seize with " - << io::BlockFileCache::cache_type_to_string(args.second_type) << std::endl; + std::cout << "filled with " << io::cache_type_to_string(args.first_type) + << " and seize with " << io::cache_type_to_string(args.second_type) << std::endl; if (fs::exists(cache_base_path)) { fs::remove_all(cache_base_path); } diff --git a/be/test/io/cache/block_file_cache_test_common.h b/be/test/io/cache/block_file_cache_test_common.h new file mode 100644 index 00000000000000..0bf4acf2466781 --- /dev/null +++ b/be/test/io/cache/block_file_cache_test_common.h @@ -0,0 +1,134 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
// Size literals for the tests: `5_mb` is 5 * 1024 * 1024 bytes and `4_kb` is
// 4 * 1024 bytes. They are declared without whitespace between `""` and the
// suffix because the spaced form is deprecated.
constexpr unsigned long long operator""_mb(unsigned long long m) {
    return m * 1024 * 1024;
}

constexpr unsigned long long operator""_kb(unsigned long long m) {
    return m * 1024;
}
io::FileBlocksHolder& holder); +extern void test_file_cache(io::FileCacheType cache_type); +extern void test_file_cache_memory_storage(io::FileCacheType cache_type); + +class BlockFileCacheTest : public testing::Test { +public: + static void SetUpTestSuite() { + config::file_cache_enter_disk_resource_limit_mode_percent = 99; + config::enable_evict_file_cache_in_advance = false; // disable evict in + // advance for most + // cases for simple + // verification + bool exists {false}; + ASSERT_TRUE(global_local_filesystem()->exists(caches_dir, &exists).ok()); + if (!exists) { + ASSERT_TRUE(global_local_filesystem()->create_directory(caches_dir).ok()); + } + ASSERT_TRUE(global_local_filesystem()->exists(tmp_file, &exists).ok()); + if (!exists) { + FileWriterPtr writer; + ASSERT_TRUE(global_local_filesystem()->create_file(tmp_file, &writer).ok()); + for (int i = 0; i < 10; i++) { + std::string data(1_mb, '0' + i); + ASSERT_TRUE(writer->append(Slice(data.data(), data.size())).ok()); + } + std::string data(1, '0'); + ASSERT_TRUE(writer->append(Slice(data.data(), data.size())).ok()); + ASSERT_TRUE(writer->close().ok()); + } + ExecEnv::GetInstance()->_file_cache_factory = factory.get(); + ExecEnv::GetInstance()->_file_cache_open_fd_cache = std::make_unique(); + } + static void TearDownTestSuite() { + config::file_cache_enter_disk_resource_limit_mode_percent = 99; + ExecEnv::GetInstance()->_file_cache_open_fd_cache.reset(nullptr); + } + +private: + inline static std::unique_ptr factory = std::make_unique(); +}; + +} // end of namespace doris::io \ No newline at end of file diff --git a/be/test/io/cache/block_file_cache_test_lru_dump.cpp b/be/test/io/cache/block_file_cache_test_lru_dump.cpp new file mode 100644 index 00000000000000..baa953ac7ff57c --- /dev/null +++ b/be/test/io/cache/block_file_cache_test_lru_dump.cpp @@ -0,0 +1,496 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// This file is copied from +// https://github.com/ClickHouse/ClickHouse/blob/master/src/Interpreters/tests/gtest_lru_file_cache.cpp +// and modified by Doris + +#include "block_file_cache_test_common.h" + +namespace doris::io { + +TEST_F(BlockFileCacheTest, test_lru_log_record_replay_dump_restore) { + config::enable_evict_file_cache_in_advance = false; + config::file_cache_enter_disk_resource_limit_mode_percent = 99; + config::file_cache_background_lru_dump_interval_ms = 3000; + config::file_cache_background_lru_dump_update_cnt_threshold = 0; + if (fs::exists(cache_base_path)) { + fs::remove_all(cache_base_path); + } + fs::create_directories(cache_base_path); + TUniqueId query_id; + query_id.hi = 1; + query_id.lo = 1; + io::FileCacheSettings settings; + + settings.ttl_queue_size = 5000000; + settings.ttl_queue_elements = 50000; + settings.query_queue_size = 5000000; + settings.query_queue_elements = 50000; + settings.index_queue_size = 5000000; + settings.index_queue_elements = 50000; + settings.disposable_queue_size = 5000000; + settings.disposable_queue_elements = 50000; + settings.capacity = 20000000; + settings.max_file_block_size = 100000; + settings.max_query_cache_size = 30; + + io::BlockFileCache cache(cache_base_path, settings); + ASSERT_TRUE(cache.initialize()); 
+ int i = 0; + for (; i < 100; i++) { + if (cache.get_async_open_success()) { + break; + } + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + } + ASSERT_TRUE(cache.get_async_open_success()); + + io::CacheContext context1; + ReadStatistics rstats; + context1.stats = &rstats; + context1.cache_type = io::FileCacheType::NORMAL; + context1.query_id = query_id; + auto key1 = io::BlockFileCache::hash("key1"); + + int64_t offset = 0; + + for (; offset < 500000; offset += 100000) { + auto holder = cache.get_or_set(key1, offset, 100000, context1); + auto blocks = fromHolder(holder); + ASSERT_EQ(blocks.size(), 1); + + assert_range(1, blocks[0], io::FileBlock::Range(offset, offset + 99999), + io::FileBlock::State::EMPTY); + ASSERT_TRUE(blocks[0]->get_or_set_downloader() == io::FileBlock::get_caller_id()); + download(blocks[0]); + assert_range(2, blocks[0], io::FileBlock::Range(offset, offset + 99999), + io::FileBlock::State::DOWNLOADED); + + blocks.clear(); + } + io::CacheContext context2; + context2.stats = &rstats; + context2.cache_type = io::FileCacheType::INDEX; + context2.query_id = query_id; + auto key2 = io::BlockFileCache::hash("key2"); + + offset = 0; + + for (; offset < 500000; offset += 100000) { + auto holder = cache.get_or_set(key2, offset, 100000, context2); + auto blocks = fromHolder(holder); + ASSERT_EQ(blocks.size(), 1); + + assert_range(1, blocks[0], io::FileBlock::Range(offset, offset + 99999), + io::FileBlock::State::EMPTY); + ASSERT_TRUE(blocks[0]->get_or_set_downloader() == io::FileBlock::get_caller_id()); + download(blocks[0]); + assert_range(2, blocks[0], io::FileBlock::Range(offset, offset + 99999), + io::FileBlock::State::DOWNLOADED); + + blocks.clear(); + } + io::CacheContext context3; + context3.stats = &rstats; + context3.cache_type = io::FileCacheType::TTL; + context3.query_id = query_id; + context3.expiration_time = UnixSeconds() + 120; + auto key3 = io::BlockFileCache::hash("key3"); + + offset = 0; + + for (; offset < 500000; offset 
+= 100000) { + auto holder = cache.get_or_set(key3, offset, 100000, context3); + auto blocks = fromHolder(holder); + ASSERT_EQ(blocks.size(), 1); + + assert_range(1, blocks[0], io::FileBlock::Range(offset, offset + 99999), + io::FileBlock::State::EMPTY); + ASSERT_TRUE(blocks[0]->get_or_set_downloader() == io::FileBlock::get_caller_id()); + download(blocks[0]); + assert_range(2, blocks[0], io::FileBlock::Range(offset, offset + 99999), + io::FileBlock::State::DOWNLOADED); + + blocks.clear(); + } + + io::CacheContext context4; + context4.stats = &rstats; + context4.cache_type = io::FileCacheType::DISPOSABLE; + context4.query_id = query_id; + auto key4 = io::BlockFileCache::hash("key4"); + + offset = 0; + + for (; offset < 500000; offset += 100000) { + auto holder = cache.get_or_set(key4, offset, 100000, context4); + auto blocks = fromHolder(holder); + ASSERT_EQ(blocks.size(), 1); + + assert_range(1, blocks[0], io::FileBlock::Range(offset, offset + 99999), + io::FileBlock::State::EMPTY); + ASSERT_TRUE(blocks[0]->get_or_set_downloader() == io::FileBlock::get_caller_id()); + download(blocks[0]); + assert_range(2, blocks[0], io::FileBlock::Range(offset, offset + 99999), + io::FileBlock::State::DOWNLOADED); + + blocks.clear(); + } + ASSERT_EQ(cache.get_stats_unsafe()["disposable_queue_curr_size"], 500000); + ASSERT_EQ(cache.get_stats_unsafe()["ttl_queue_curr_size"], 500000); + ASSERT_EQ(cache.get_stats_unsafe()["index_queue_curr_size"], 500000); + ASSERT_EQ(cache.get_stats_unsafe()["normal_queue_curr_size"], 500000); + + // all queue are filled, let's check the lru log records + ASSERT_EQ(cache._lru_recorder->_ttl_lru_log_queue.size_approx(), 5); + ASSERT_EQ(cache._lru_recorder->_index_lru_log_queue.size_approx(), 5); + ASSERT_EQ(cache._lru_recorder->_normal_lru_log_queue.size_approx(), 5); + ASSERT_EQ(cache._lru_recorder->_disposable_lru_log_queue.size_approx(), 5); + + // then check the log replay + std::this_thread::sleep_for(std::chrono::milliseconds( + 2 * 
config::file_cache_background_lru_log_replay_interval_ms)); + ASSERT_EQ(cache._lru_recorder->_shadow_ttl_queue.get_elements_num_unsafe(), 5); + ASSERT_EQ(cache._lru_recorder->_shadow_index_queue.get_elements_num_unsafe(), 5); + ASSERT_EQ(cache._lru_recorder->_shadow_normal_queue.get_elements_num_unsafe(), 5); + ASSERT_EQ(cache._lru_recorder->_shadow_disposable_queue.get_elements_num_unsafe(), 5); + + // ok, let do some MOVETOBACK & REMOVE + { + auto holder = cache.get_or_set(key2, 200000, 100000, + context2); // move index queue 3rd element to the end + cache.remove_if_cached(key3); // remove all element from ttl queue + } + ASSERT_EQ(cache._lru_recorder->_ttl_lru_log_queue.size_approx(), 5); + ASSERT_EQ(cache._lru_recorder->_index_lru_log_queue.size_approx(), 1); + ASSERT_EQ(cache._lru_recorder->_normal_lru_log_queue.size_approx(), 0); + ASSERT_EQ(cache._lru_recorder->_disposable_lru_log_queue.size_approx(), 0); + + std::this_thread::sleep_for(std::chrono::milliseconds( + 2 * config::file_cache_background_lru_log_replay_interval_ms)); + ASSERT_EQ(cache._lru_recorder->_shadow_ttl_queue.get_elements_num_unsafe(), 0); + ASSERT_EQ(cache._lru_recorder->_shadow_index_queue.get_elements_num_unsafe(), 5); + ASSERT_EQ(cache._lru_recorder->_shadow_normal_queue.get_elements_num_unsafe(), 5); + ASSERT_EQ(cache._lru_recorder->_shadow_disposable_queue.get_elements_num_unsafe(), 5); + + // check the order + std::vector offsets; + for (auto it = cache._lru_recorder->_shadow_index_queue.begin(); + it != cache._lru_recorder->_shadow_index_queue.end(); ++it) { + offsets.push_back(it->offset); + } + ASSERT_EQ(offsets.size(), 5); + ASSERT_EQ(offsets[0], 0); + ASSERT_EQ(offsets[1], 100000); + ASSERT_EQ(offsets[2], 300000); + ASSERT_EQ(offsets[3], 400000); + ASSERT_EQ(offsets[4], 200000); + + std::this_thread::sleep_for( + std::chrono::milliseconds(2 * config::file_cache_background_lru_dump_interval_ms)); + +#if 0 + // Verify all 4 dump files + // TODO(zhengyu): abstract those 
read/write into a function + { + std::string filename = fmt::format("{}/lru_dump_{}.tail", cache_base_path, "ttl"); + + struct stat file_stat; + EXPECT_EQ(stat(filename.c_str(), &file_stat), 0) << "File " << filename << " not found"; + + EXPECT_EQ(file_stat.st_size, 12) << "File " << filename << " has more data than footer"; + std::ifstream in(filename, std::ios::binary); + ASSERT_TRUE(in) << "Failed to open " << filename; + size_t entry_num = 0; + int8_t version = 0; + char magic_str[3]; + char target_str[3] = {'D', 'O', 'R'}; + in.read(reinterpret_cast(&entry_num), sizeof(entry_num)); + in.read(reinterpret_cast(&version), sizeof(version)); + in.read(magic_str, sizeof(magic_str)); + EXPECT_EQ(entry_num, 0); + EXPECT_EQ(version, 1); + EXPECT_TRUE(memcmp(magic_str, target_str, sizeof(magic_str)) == 0); + } + + { + std::string filename = fmt::format("{}/lru_dump_{}.tail", cache_base_path, "normal"); + + struct stat file_stat; + EXPECT_EQ(stat(filename.c_str(), &file_stat), 0) << "File " << filename << " not found"; + + EXPECT_GT(file_stat.st_size, 12) << "File " << filename << " is empty"; + + std::ifstream in(filename, std::ios::binary); + ASSERT_TRUE(in) << "Failed to open " << filename; + UInt128Wrapper hash; + size_t offset, size; + in.read(reinterpret_cast(&hash), sizeof(hash)); + in.read(reinterpret_cast(&offset), sizeof(offset)); + in.read(reinterpret_cast(&size), sizeof(size)); + + EXPECT_FALSE(in.fail()) << "Failed to read from " << filename; + EXPECT_EQ(hash, io::BlockFileCache::hash("key1")) << "wrong hash value in " << filename; + EXPECT_EQ(offset, 0) << "wrong offset value in " << filename; + EXPECT_EQ(size, 100000) << "wrong size value in " << filename; + + in.read(reinterpret_cast(&hash), sizeof(hash)); + in.read(reinterpret_cast(&offset), sizeof(offset)); + in.read(reinterpret_cast(&size), sizeof(size)); + + EXPECT_FALSE(in.fail()) << "Failed to read from " << filename; + EXPECT_EQ(hash, io::BlockFileCache::hash("key1")) << "wrong hash value in " << 
filename; + EXPECT_EQ(offset, 100000) << "wrong offset value in " << filename; + EXPECT_EQ(size, 100000) << "wrong size value in " << filename; + + in.read(reinterpret_cast(&hash), sizeof(hash)); + in.read(reinterpret_cast(&offset), sizeof(offset)); + in.read(reinterpret_cast(&size), sizeof(size)); + + EXPECT_FALSE(in.fail()) << "Failed to read from " << filename; + EXPECT_EQ(hash, io::BlockFileCache::hash("key1")) << "wrong hash value in " << filename; + EXPECT_EQ(offset, 200000) << "wrong offset value in " << filename; + EXPECT_EQ(size, 100000) << "wrong size value in " << filename; + + in.read(reinterpret_cast(&hash), sizeof(hash)); + in.read(reinterpret_cast(&offset), sizeof(offset)); + in.read(reinterpret_cast(&size), sizeof(size)); + + EXPECT_FALSE(in.fail()) << "Failed to read from " << filename; + EXPECT_EQ(hash, io::BlockFileCache::hash("key1")) << "wrong hash value in " << filename; + EXPECT_EQ(offset, 300000) << "wrong offset value in " << filename; + EXPECT_EQ(size, 100000) << "wrong size value in " << filename; + + in.read(reinterpret_cast(&hash), sizeof(hash)); + in.read(reinterpret_cast(&offset), sizeof(offset)); + in.read(reinterpret_cast(&size), sizeof(size)); + + EXPECT_FALSE(in.fail()) << "Failed to read from " << filename; + EXPECT_EQ(hash, io::BlockFileCache::hash("key1")) << "wrong hash value in " << filename; + EXPECT_EQ(offset, 400000) << "wrong offset value in " << filename; + EXPECT_EQ(size, 100000) << "wrong size value in " << filename; + + in.read(reinterpret_cast(&hash), sizeof(hash)); + EXPECT_TRUE(in.fail()) << "still read from " << filename << " which should be EOF"; + } + + { + std::string filename = fmt::format("{}/lru_dump_{}.tail", cache_base_path, "index"); + + struct stat file_stat; + EXPECT_EQ(stat(filename.c_str(), &file_stat), 0) << "File " << filename << " not found"; + + EXPECT_GT(file_stat.st_size, 12) << "File " << filename << " is empty"; + + std::ifstream in(filename, std::ios::binary); + ASSERT_TRUE(in) << "Failed to 
open " << filename; + UInt128Wrapper hash; + size_t offset, size; + in.read(reinterpret_cast(&hash), sizeof(hash)); + in.read(reinterpret_cast(&offset), sizeof(offset)); + in.read(reinterpret_cast(&size), sizeof(size)); + + EXPECT_FALSE(in.fail()) << "Failed to read from " << filename; + EXPECT_EQ(hash, io::BlockFileCache::hash("key2")) << "wrong hash value in " << filename; + EXPECT_EQ(offset, 0) << "wrong offset value in " << filename; + EXPECT_EQ(size, 100000) << "wrong size value in " << filename; + + in.read(reinterpret_cast(&hash), sizeof(hash)); + in.read(reinterpret_cast(&offset), sizeof(offset)); + in.read(reinterpret_cast(&size), sizeof(size)); + + EXPECT_FALSE(in.fail()) << "Failed to read from " << filename; + EXPECT_EQ(hash, io::BlockFileCache::hash("key2")) << "wrong hash value in " << filename; + EXPECT_EQ(offset, 100000) << "wrong offset value in " << filename; + EXPECT_EQ(size, 100000) << "wrong size value in " << filename; + + in.read(reinterpret_cast(&hash), sizeof(hash)); + in.read(reinterpret_cast(&offset), sizeof(offset)); + in.read(reinterpret_cast(&size), sizeof(size)); + + EXPECT_FALSE(in.fail()) << "Failed to read from " << filename; + EXPECT_EQ(hash, io::BlockFileCache::hash("key2")) << "wrong hash value in " << filename; + EXPECT_EQ(offset, 300000) << "wrong offset value in " << filename; + EXPECT_EQ(size, 100000) << "wrong size value in " << filename; + + in.read(reinterpret_cast(&hash), sizeof(hash)); + in.read(reinterpret_cast(&offset), sizeof(offset)); + in.read(reinterpret_cast(&size), sizeof(size)); + + EXPECT_FALSE(in.fail()) << "Failed to read from " << filename; + EXPECT_EQ(hash, io::BlockFileCache::hash("key2")) << "wrong hash value in " << filename; + EXPECT_EQ(offset, 400000) << "wrong offset value in " << filename; + EXPECT_EQ(size, 100000) << "wrong size value in " << filename; + + in.read(reinterpret_cast(&hash), sizeof(hash)); + in.read(reinterpret_cast(&offset), sizeof(offset)); + in.read(reinterpret_cast(&size), 
sizeof(size)); + + EXPECT_FALSE(in.fail()) << "Failed to read from " << filename; + EXPECT_EQ(hash, io::BlockFileCache::hash("key2")) << "wrong hash value in " << filename; + EXPECT_EQ(offset, 200000) << "wrong offset value in " << filename; + EXPECT_EQ(size, 100000) << "wrong size value in " << filename; + + in.read(reinterpret_cast(&hash), sizeof(hash)); + EXPECT_TRUE(in.fail()) << "still read from " << filename << " which should be EOF"; + } +#endif + + // dump looks good, let's try restore + io::BlockFileCache cache2(cache_base_path, settings); + ASSERT_TRUE(cache2.initialize()); + for (i = 0; i < 100; i++) { + if (cache2.get_async_open_success()) { + break; + } + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + } + ASSERT_TRUE(cache2.get_async_open_success()); + + // check the size of cache2 + ASSERT_EQ(cache2._ttl_queue.get_elements_num_unsafe(), 0); + ASSERT_EQ(cache2._index_queue.get_elements_num_unsafe(), 5); + ASSERT_EQ(cache2._normal_queue.get_elements_num_unsafe(), 5); + ASSERT_EQ(cache2._disposable_queue.get_elements_num_unsafe(), 5); + ASSERT_EQ(cache2._cur_cache_size, 1500000); + + // then check the order of restored cache2 + std::vector offsets2; + for (auto it = cache2._index_queue.begin(); it != cache2._index_queue.end(); ++it) { + offsets2.push_back(it->offset); + } + ASSERT_EQ(offsets2.size(), 5); + ASSERT_EQ(offsets2[0], 0); + ASSERT_EQ(offsets2[1], 100000); + ASSERT_EQ(offsets2[2], 300000); + ASSERT_EQ(offsets2[3], 400000); + ASSERT_EQ(offsets2[4], 200000); + + io::CacheContext context22; + context22.stats = &rstats; + context22.cache_type = io::FileCacheType::INDEX; + context22.query_id = query_id; + + offset = 0; + + for (; offset < 500000; offset += 100000) { + auto holder = cache2.get_or_set(key2, offset, 100000, context22); + auto blocks = fromHolder(holder); + ASSERT_EQ(blocks.size(), 1); + assert_range(2, blocks[0], io::FileBlock::Range(offset, offset + 99999), + io::FileBlock::State::DOWNLOADED); + blocks.clear(); + } + + 
if (fs::exists(cache_base_path)) { + fs::remove_all(cache_base_path); + } +} + +TEST_F(BlockFileCacheTest, test_lru_duplicate_queue_entry_restore) { + config::enable_evict_file_cache_in_advance = false; + config::file_cache_enter_disk_resource_limit_mode_percent = 99; + config::file_cache_background_lru_dump_interval_ms = 3000; + config::file_cache_background_lru_dump_update_cnt_threshold = 0; + if (fs::exists(cache_base_path)) { + fs::remove_all(cache_base_path); + } + fs::create_directories(cache_base_path); + TUniqueId query_id; + query_id.hi = 1; + query_id.lo = 1; + io::FileCacheSettings settings; + + settings.ttl_queue_size = 5000000; + settings.ttl_queue_elements = 50000; + settings.query_queue_size = 5000000; + settings.query_queue_elements = 50000; + settings.index_queue_size = 5000000; + settings.index_queue_elements = 50000; + settings.disposable_queue_size = 5000000; + settings.disposable_queue_elements = 50000; + settings.capacity = 20000000; + settings.max_file_block_size = 100000; + settings.max_query_cache_size = 30; + + io::BlockFileCache cache(cache_base_path, settings); + ASSERT_TRUE(cache.initialize()); + int i = 0; + for (; i < 100; i++) { + if (cache.get_async_open_success()) { + break; + } + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + } + ASSERT_TRUE(cache.get_async_open_success()); + + io::CacheContext context1; + ReadStatistics rstats; + context1.stats = &rstats; + context1.cache_type = io::FileCacheType::NORMAL; + context1.query_id = query_id; + auto key1 = io::BlockFileCache::hash("key1"); + + int64_t offset = 0; + + for (; offset < 500000; offset += 100000) { + auto holder = cache.get_or_set(key1, offset, 100000, context1); + auto blocks = fromHolder(holder); + ASSERT_EQ(blocks.size(), 1); + + assert_range(1, blocks[0], io::FileBlock::Range(offset, offset + 99999), + io::FileBlock::State::EMPTY); + ASSERT_TRUE(blocks[0]->get_or_set_downloader() == io::FileBlock::get_caller_id()); + download(blocks[0]); + assert_range(2, 
blocks[0], io::FileBlock::Range(offset, offset + 99999), + io::FileBlock::State::DOWNLOADED); + + blocks.clear(); + } + + std::this_thread::sleep_for( + std::chrono::milliseconds(2 * config::file_cache_background_lru_dump_interval_ms)); + + // now we have NORMAL queue dump, let's copy the dump and name it as TTL to create dup + std::filesystem::path src = cache_base_path / "lru_dump_normal.tail"; + std::filesystem::path dst = cache_base_path / "lru_dump_ttl.tail"; + std::filesystem::copy(src, dst); + + // let's try restore + io::BlockFileCache cache2(cache_base_path, settings); + ASSERT_TRUE(cache2.initialize()); + for (i = 0; i < 100; i++) { + if (cache2.get_async_open_success()) { + break; + } + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + } + ASSERT_TRUE(cache2.get_async_open_success()); + + // the dup part should be ttl because ttl has higner priority + ASSERT_EQ(cache2._ttl_queue.get_elements_num_unsafe(), 5); + ASSERT_EQ(cache2._index_queue.get_elements_num_unsafe(), 0); + ASSERT_EQ(cache2._normal_queue.get_elements_num_unsafe(), 0); + ASSERT_EQ(cache2._disposable_queue.get_elements_num_unsafe(), 0); + ASSERT_EQ(cache2._cur_cache_size, 500000); + + if (fs::exists(cache_base_path)) { + fs::remove_all(cache_base_path); + } +} + +} // namespace doris::io diff --git a/be/test/io/cache/cache_lru_dumper_test.cpp b/be/test/io/cache/cache_lru_dumper_test.cpp new file mode 100644 index 00000000000000..76647ba544fa05 --- /dev/null +++ b/be/test/io/cache/cache_lru_dumper_test.cpp @@ -0,0 +1,161 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "io/cache/cache_lru_dumper.h" + +#include + +#include "gmock/gmock.h" +#include "gtest/gtest.h" +#include "io/cache/block_file_cache.h" +#include "io/cache/file_block.h" +#include "io/cache/file_cache_common.h" + +using ::testing::_; +using ::testing::Return; +using ::testing::NiceMock; + +namespace doris::io { +std::mutex _mutex; + +static const std::string test_dir = "./cache_lru_dumper_test_dir/"; + +class MockBlockFileCache : public BlockFileCache { +public: + LRUQueue* dst_queue; // Pointer to the destination queue + + MockBlockFileCache(LRUQueue* queue) : BlockFileCache("", {}), dst_queue(queue) { + _cache_base_path = test_dir; + } + + FileBlockCell* add_cell(const UInt128Wrapper& hash, const CacheContext& ctx, size_t offset, + size_t size, FileBlock::State state, + std::lock_guard& lock) { + static std::unordered_set added_entries; + std::string key = hash.to_string() + ":" + std::to_string(offset); + + if (added_entries.find(key) != added_entries.end()) { + std::cerr << "Error: Duplicate entry detected for hash: " << key << std::endl; + EXPECT_TRUE(false); + return nullptr; + } + + added_entries.insert(key); + dst_queue->add(hash, offset, size, lock); + return nullptr; + } + + std::mutex& mutex() { return _mutex; } + +private: + std::mutex _mutex; + struct { + std::string _cache_base_path; + } _mgr; +}; + +class CacheLRUDumperTest : public ::testing::Test { +protected: + LRUQueue dst_queue; // Member variable for destination queue + + void SetUp() override { + std::filesystem::remove_all(test_dir); + 
std::filesystem::create_directory(test_dir); + + mock_cache = std::make_unique>(&dst_queue); + recorder = std::make_unique(mock_cache.get()); + + dumper = std::make_unique(mock_cache.get(), recorder.get()); + } + + void TearDown() override { + dumper.reset(); + mock_cache.reset(); + std::filesystem::remove_all(test_dir); + } + + std::unique_ptr> mock_cache; + std::unique_ptr dumper; + std::unique_ptr recorder; +}; + +TEST_F(CacheLRUDumperTest, test_finalize_dump_and_parse_dump_footer) { + std::string tmp_filename = test_dir + "test_finalize.bin.tmp"; + std::string final_filename = test_dir + "test_finalize.bin"; + std::ofstream out(tmp_filename, std::ios::binary); + size_t file_size = 0; + size_t entry_num = 10; + + // Test finalize dump + EXPECT_TRUE( + dumper->finalize_dump(out, entry_num, tmp_filename, final_filename, file_size).ok()); + + // Test parse footer + std::ifstream in(final_filename, std::ios::binary); + size_t parsed_entry_num = 0; + EXPECT_TRUE(dumper->parse_dump_footer(in, final_filename, parsed_entry_num).ok()); + EXPECT_EQ(entry_num, parsed_entry_num); + in.close(); +} + +TEST_F(CacheLRUDumperTest, test_remove_lru_dump_files) { + // Create test files + std::vector queue_names = {"disposable", "index", "normal", "ttl"}; + for (const auto& name : queue_names) { + std::ofstream(fmt::format("{}lru_dump_{}.tail", test_dir, name)); + } + + // Test remove + dumper->remove_lru_dump_files(); + + // Verify files are removed + for (const auto& name : queue_names) { + EXPECT_FALSE(std::filesystem::exists(fmt::format("{}lru_dump_{}.tail", test_dir, name))); + } +} + +TEST_F(CacheLRUDumperTest, test_dump_and_restore_queue) { + LRUQueue src_queue; + std::string queue_name = "normal"; + + // Add test data + UInt128Wrapper hash(123456789ULL); + size_t offset = 1024; + size_t size = 4096; + std::lock_guard lock(_mutex); + src_queue.add(hash, offset, size, lock); + + // Test dump + dumper->do_dump_queue(src_queue, queue_name); + + // Test restore + std::lock_guard 
cache_lock(mock_cache->mutex()); + dumper->restore_queue(dst_queue, queue_name, cache_lock); + + // Verify queue content and order + auto src_it = src_queue.begin(); + auto dst_it = dst_queue.begin(); + while (src_it != src_queue.end() && dst_it != dst_queue.end()) { + EXPECT_EQ(src_it->hash, dst_it->hash); + EXPECT_EQ(src_it->offset, dst_it->offset); + EXPECT_EQ(src_it->size, dst_it->size); + ++src_it; + ++dst_it; + } +} + +} // namespace doris::io \ No newline at end of file diff --git a/be/test/io/cache/lru_queue_test.cpp b/be/test/io/cache/lru_queue_test.cpp new file mode 100644 index 00000000000000..4a01fb27e3dcfe --- /dev/null +++ b/be/test/io/cache/lru_queue_test.cpp @@ -0,0 +1,117 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include +#include +#include +#include + +// IWYU pragma: no_include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "common/config.h" +#include "cpp/sync_point.h" +#include "gtest/gtest_pred_impl.h" +#include "io/cache/block_file_cache.h" +#include "util/time.h" + +using namespace doris::io; + +class LRUQueueTest : public ::testing::Test { +protected: + void SetUp() override { + queue1 = std::make_shared(); + queue2 = std::make_shared(); + } + + std::shared_ptr queue1; + std::shared_ptr queue2; +}; + +TEST_F(LRUQueueTest, SameQueueDistance) { + std::mutex mutex; + std::lock_guard lock(mutex); + + queue1->add(UInt128Wrapper(123), 0, 1024, lock); + queue1->add(UInt128Wrapper(456), 0, 1024, lock); + + EXPECT_EQ(queue1->levenshtein_distance_from(*queue1, lock), 0); +} + +TEST_F(LRUQueueTest, DifferentQueueDistance) { + std::mutex mutex; + std::lock_guard lock(mutex); + + queue1->add(UInt128Wrapper(123), 0, 1024, lock); + queue1->add(UInt128Wrapper(456), 0, 1024, lock); + + queue2->add(UInt128Wrapper(123), 0, 1024, lock); + queue2->add(UInt128Wrapper(789), 0, 1024, lock); + + EXPECT_EQ(queue1->levenshtein_distance_from(*queue2, lock), 1); +} + +TEST_F(LRUQueueTest, EmptyQueueDistance) { + std::mutex mutex; + std::lock_guard lock(mutex); + + queue1->add(UInt128Wrapper(123), 0, 1024, lock); + queue1->add(UInt128Wrapper(456), 0, 1024, lock); + + EXPECT_EQ(queue1->levenshtein_distance_from(*queue2, lock), 2); +} + +TEST_F(LRUQueueTest, PartialMatchDistance) { + std::mutex mutex; + std::lock_guard lock(mutex); + + queue1->add(UInt128Wrapper(123), 0, 1024, lock); + queue1->add(UInt128Wrapper(456), 0, 1024, lock); + queue1->add(UInt128Wrapper(789), 0, 1024, lock); + + queue2->add(UInt128Wrapper(123), 0, 1024, lock); + queue2->add(UInt128Wrapper(101), 0, 1024, lock); + queue2->add(UInt128Wrapper(789), 0, 1024, lock); + + EXPECT_EQ(queue1->levenshtein_distance_from(*queue2, lock), 1); +} + 
+TEST_F(LRUQueueTest, SameElementsDifferentOrder) { + std::mutex mutex; + std::lock_guard lock(mutex); + + queue1->add(UInt128Wrapper(123), 0, 1024, lock); + queue1->add(UInt128Wrapper(456), 0, 1024, lock); + queue1->add(UInt128Wrapper(789), 0, 1024, lock); + + queue2->add(UInt128Wrapper(789), 0, 1024, lock); + queue2->add(UInt128Wrapper(456), 0, 1024, lock); + queue2->add(UInt128Wrapper(123), 0, 1024, lock); + + EXPECT_EQ(queue1->levenshtein_distance_from(*queue2, lock), 2); +} diff --git a/build.sh b/build.sh index 2cf95d8732609a..c55f79788cebc7 100755 --- a/build.sh +++ b/build.sh @@ -583,10 +583,15 @@ if [[ "${BUILD_BE}" -eq 1 ]]; then BUILD_FS_BENCHMARK=OFF fi + if [[ -z "${BUILD_FILE_CACHE_LRU_TOOL}" ]]; then + BUILD_FILE_CACHE_LRU_TOOL=OFF + fi + echo "-- Make program: ${MAKE_PROGRAM}" echo "-- Use ccache: ${CMAKE_USE_CCACHE}" echo "-- Extra cxx flags: ${EXTRA_CXX_FLAGS:-}" echo "-- Build fs benchmark tool: ${BUILD_FS_BENCHMARK}" + echo "-- Build file cache lru tool: ${BUILD_FILE_CACHE_LRU_TOOL}" mkdir -p "${CMAKE_BUILD_DIR}" cd "${CMAKE_BUILD_DIR}" @@ -598,6 +603,7 @@ if [[ "${BUILD_BE}" -eq 1 ]]; then -DENABLE_CACHE_LOCK_DEBUG="${ENABLE_CACHE_LOCK_DEBUG}" \ -DMAKE_TEST=OFF \ -DBUILD_FS_BENCHMARK="${BUILD_FS_BENCHMARK}" \ + -DBUILD_FILE_CACHE_LRU_TOOL="${BUILD_FILE_CACHE_LRU_TOOL}" \ ${CMAKE_USE_CCACHE:+${CMAKE_USE_CCACHE}} \ -DWITH_MYSQL="${WITH_MYSQL}" \ -DUSE_LIBCPP="${USE_LIBCPP}" \ diff --git a/gensrc/proto/file_cache.proto b/gensrc/proto/file_cache.proto new file mode 100644 index 00000000000000..f11375586aa16c --- /dev/null +++ b/gensrc/proto/file_cache.proto @@ -0,0 +1,49 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// Define file format struct, like data header, index header. + +syntax="proto2"; + +package doris.io.cache; + +message UInt128WrapperPb { + optional uint64 high = 1; + optional uint64 low = 2; +} + +message LRUDumpEntryPb { + optional UInt128WrapperPb hash = 1; + optional uint64 offset = 2; + optional uint64 size = 3; +} + +message LRUDumpEntryGroupPb { + repeated LRUDumpEntryPb entries = 1; +} + +message EntryGroupOffsetSizePb { + optional uint64 offset = 1; + optional uint64 size = 2; + optional uint32 checksum = 3; +} + +message LRUDumpMetaPb{ + optional uint64 entry_num = 1; + optional string queue_name = 2; + repeated EntryGroupOffsetSizePb group_offset_size = 3; +} + diff --git a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteCluster.groovy b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteCluster.groovy index e18f6f8061c2f1..4f416eb148200d 100644 --- a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteCluster.groovy +++ b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteCluster.groovy @@ -169,6 +169,9 @@ class ServerNode { assert false : 'Unknown node type' } + String getBasePath() { + return path + } } class Frontend extends ServerNode { diff --git a/regression-test/suites/demo_p0/test_lru_persist.groovy 
b/regression-test/suites/demo_p0/test_lru_persist.groovy new file mode 100644 index 00000000000000..249faadeedad95 --- /dev/null +++ b/regression-test/suites/demo_p0/test_lru_persist.groovy @@ -0,0 +1,94 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import org.apache.doris.regression.suite.ClusterOptions + +// Run docker suite steps: +// 1. Read 'docker/runtime/doris-compose/Readme.md' and make sure you can set up a doris docker cluster; +// 2. Update regression-conf-custom.groovy with config: +// image = "xxxx" // your doris docker image +// excludeDockerTest = false // do run docker suite, default is true +// dockerEndDeleteFiles = false // after running the docker suite, whether to delete the containers' logs and data in directory '/tmp/doris/' + +// When running a docker suite, no external doris cluster is needed. +// But whether a docker suite actually runs depends on a few more checks. +// Firstly, get the pipeline's run mode (cloud or not_cloud): +// If there's an external doris cluster, then fetch pipeline's runMode from it. +// If there's no external doris cluster, then set pipeline's runMode with command args.
+// for example: sh run-regression-test.sh --run docker_action -runMode=cloud/not_cloud +// Secondly, compare ClusterOptions.cloudMode and pipeline's runMode +// If ClusterOptions.cloudMode = null then let ClusterOptions.cloudMode = pipeline's cloudMode, and run docker suite. +// If ClusterOptions.cloudMode = true or false, and cloudMode == pipeline's cloudMode or pipeline's cloudMode is unknown, +// then run docker suite, otherwise don't run docker suite. + +// NOTICE: +// 1. Need to add 'docker' to suite's group, and don't add 'nonConcurrent' to it; +// 2. In docker closure: +// a. Don't use 'Awaitility.await()...until(f)', but use 'dockerAwaitUntil(..., f)'; +// 3. No need to use code ` if (isCloudMode()) { return } ` in docker suites, +// using `ClusterOptions.cloudMode = true/false` instead is enough. +// Because when running a docker suite without an external doris cluster, a suite that uses `isCloudMode()` needs an explicit -runMode=cloud/not_cloud. +// By contrast, `ClusterOptions.cloudMode = true/false` needs no explicit -runMode=cloud/not_cloud when no external doris cluster exists.
+
+suite('test_lru_persist', 'docker') {
+    def options = new ClusterOptions()
+    // Cluster layout: one FE, one BE and one MS, in cloud mode.
+    options.feNum = 1
+    options.beNum = 1
+    options.msNum = 1
+    options.cloudMode = true
+    options.feConfigs += ['example_conf_k1=v1', 'example_conf_k2=v2']
+    options.beConfigs += ['enable_file_cache=true', 'enable_java_support=false', 'file_cache_enter_disk_resource_limit_mode_percent=99',
+                          'file_cache_background_lru_dump_interval_ms=2000', 'file_cache_background_lru_log_replay_interval_ms=500',
+                          'disable_auto_compaction=true', 'file_cache_enter_need_evict_cache_in_advance_percent=99',
+                          'file_cache_background_lru_dump_update_cnt_threshold=0'
+                          ]
+
+    // Check that lru_dump_normal.tail has the same md5sum before and after a BE restart.
+    docker(options) {
+        cluster.checkFeIsAlive(1, true)
+        cluster.checkBeIsAlive(1, true)
+        sql '''set global enable_auto_analyze=false'''
+
+        sql '''create table tb1 (k int) DISTRIBUTED BY HASH(k) BUCKETS 10 properties ("replication_num"="1")'''
+        sql '''insert into tb1 values (1),(2),(3)'''
+        sql '''insert into tb1 values (4),(5),(6)'''
+        sql '''insert into tb1 values (7),(8),(9)'''
+        sql '''insert into tb1 values (10),(11),(12)'''
+
+        def be = cluster.getBeByIndex(1)
+        def beBasePath = be.getBasePath()
+        def cachePath = beBasePath + "/storage/file_cache/"
+        // Presumably leaves time for the background LRU dump (interval set to 2000 ms above) before the BE is stopped.
+        sleep(15000);
+        cluster.stopBackends(1)
+        // Hash the dump file while the BE is stopped.
+        def normalBefore = "md5sum ${cachePath}/lru_dump_normal.tail".execute().text.trim().split()[0]
+        logger.info("normalBefore: ${normalBefore}")
+        // Restart the BE, wait, then stop it again before hashing a second time.
+        cluster.startBackends(1)
+        sleep(10000);
+
+        cluster.stopBackends(1)
+
+        // check md5sum again after be restart
+        def normalAfter = "md5sum ${cachePath}/lru_dump_normal.tail".execute().text.trim().split()[0]
+        logger.info("normalAfter: ${normalAfter}")
+
+        assert normalBefore == normalAfter
+    }
+}