diff --git a/be/src/io/cache/block_file_cache.cpp b/be/src/io/cache/block_file_cache.cpp index 4522db10601ab4..a4453b9d937522 100644 --- a/be/src/io/cache/block_file_cache.cpp +++ b/be/src/io/cache/block_file_cache.cpp @@ -376,10 +376,10 @@ void BlockFileCache::use_cell(const FileBlockCell& cell, FileBlocks* result, boo /// Move to the end of the queue. The iterator remains valid. if (cell.queue_iterator && move_iter_flag) { queue.move_to_end(*cell.queue_iterator, cache_lock); + _lru_recorder->record_queue_event(cell.file_block->cache_type(), + CacheLRULogType::MOVETOBACK, cell.file_block->_key.hash, + cell.file_block->_key.offset, cell.size()); } - _lru_recorder->record_queue_event(cell.file_block->cache_type(), CacheLRULogType::MOVETOBACK, - cell.file_block->_key.hash, cell.file_block->_key.offset, - cell.size()); cell.update_atime(); } @@ -1546,7 +1546,11 @@ bool LRUQueue::contains(const UInt128Wrapper& hash, size_t offset, LRUQueue::Iterator LRUQueue::get(const UInt128Wrapper& hash, size_t offset, std::lock_guard& /* cache_lock */) const { - return map.find(std::make_pair(hash, offset))->second; + auto itr = map.find(std::make_pair(hash, offset)); + if (itr != map.end()) { + return itr->second; + } + return std::list::iterator(); } std::string LRUQueue::to_string(std::lock_guard& /* cache_lock */) const { diff --git a/be/src/io/cache/lru_queue_recorder.cpp b/be/src/io/cache/lru_queue_recorder.cpp index 12da29d42b7f1a..c26c19ec371811 100644 --- a/be/src/io/cache/lru_queue_recorder.cpp +++ b/be/src/io/cache/lru_queue_recorder.cpp @@ -26,7 +26,7 @@ void LRUQueueRecorder::record_queue_event(FileCacheType type, CacheLRULogType lo const UInt128Wrapper hash, const size_t offset, const size_t size) { CacheLRULogQueue& log_queue = get_lru_log_queue(type); - log_queue.push_back(std::make_unique(log_type, hash, offset, size)); + log_queue.enqueue(std::make_unique(log_type, hash, offset, size)); ++(_lru_queue_update_cnt_from_last_dump[type]); } @@ -36,9 +36,8 @@ void LRUQueueRecorder::replay_queue_event(FileCacheType type) { LRUQueue& shadow_queue = get_shadow_queue(type); std::lock_guard lru_log_lock(_mutex_lru_log); - while (!log_queue.empty()) { - auto log = std::move(log_queue.front()); - log_queue.pop_front(); + std::unique_ptr log; + while (log_queue.try_dequeue(log)) { try { switch (log->type) { case CacheLRULogType::ADD: { @@ -47,7 +46,7 @@ void LRUQueueRecorder::replay_queue_event(FileCacheType type) { } case CacheLRULogType::REMOVE: { auto it = shadow_queue.get(log->hash, log->offset, lru_log_lock); - if (it != shadow_queue.end()) { + if (it != std::list::iterator()) { shadow_queue.remove(it, lru_log_lock); } else { LOG(WARNING) << "REMOVE failed, doesn't exist in shadow queue"; @@ -55,8 +54,9 @@ void LRUQueueRecorder::replay_queue_event(FileCacheType type) { break; } case CacheLRULogType::MOVETOBACK: { + LOG(INFO) << "MOVETOBACK" << log->hash.to_string() << " " << log->offset; auto it = shadow_queue.get(log->hash, log->offset, lru_log_lock); - if (it != shadow_queue.end()) { + if (it != std::list::iterator()) { shadow_queue.move_to_end(it, lru_log_lock); } else { LOG(WARNING) << "MOVETOBACK failed, doesn't exist in shadow queue"; diff --git a/be/src/io/cache/lru_queue_recorder.h b/be/src/io/cache/lru_queue_recorder.h index dceef7a493ce27..1f6d69493cf4a8 100644 --- a/be/src/io/cache/lru_queue_recorder.h +++ b/be/src/io/cache/lru_queue_recorder.h @@ -17,6 +17,10 @@ #pragma once +#include + +#include + #include "io/cache/file_cache_common.h" namespace doris::io { @@ -40,7 +44,7 @@ struct CacheLRULog { : type(t), hash(h), offset(o), size(s) {} }; -using CacheLRULogQueue = std::list>; +using CacheLRULogQueue = moodycamel::ConcurrentQueue>; class LRUQueueRecorder { public: diff --git a/be/test/io/cache/block_file_cache_test.cpp b/be/test/io/cache/block_file_cache_test.cpp index fbb5bdeae46336..743abeb89869f3 100644 --- a/be/test/io/cache/block_file_cache_test.cpp +++ b/be/test/io/cache/block_file_cache_test.cpp @@ -3909,6 +3909,7 @@ TEST_F(BlockFileCacheTest, cached_remote_file_reader_error_handle) { } EXPECT_TRUE(reader.close().ok()); EXPECT_TRUE(reader.closed()); + std::this_thread::sleep_for(std::chrono::seconds(1)); if (fs::exists(cache_base_path)) { fs::remove_all(cache_base_path); } @@ -3969,6 +3970,7 @@ TEST_F(BlockFileCacheTest, cached_remote_file_reader_init) { CachedRemoteFileReader reader(local_reader, opts); EXPECT_EQ(reader._cache->get_base_path(), cache_base_path); } + std::this_thread::sleep_for(std::chrono::seconds(1)); if (fs::exists(cache_base_path)) { fs::remove_all(cache_base_path); } diff --git a/be/test/io/cache/block_file_cache_test_lru_dump.cpp b/be/test/io/cache/block_file_cache_test_lru_dump.cpp index ea3cb63601e8ad..eabf4829f05199 100644 --- a/be/test/io/cache/block_file_cache_test_lru_dump.cpp +++ b/be/test/io/cache/block_file_cache_test_lru_dump.cpp @@ -156,10 +156,10 @@ TEST_F(BlockFileCacheTest, test_lru_log_record_replay_dump_restore) { ASSERT_EQ(cache.get_stats_unsafe()["normal_queue_curr_size"], 500000); // all queue are filled, let's check the lru log records - ASSERT_EQ(cache._lru_recorder->_ttl_lru_log_queue.size(), 5); - ASSERT_EQ(cache._lru_recorder->_index_lru_log_queue.size(), 5); - ASSERT_EQ(cache._lru_recorder->_normal_lru_log_queue.size(), 5); - ASSERT_EQ(cache._lru_recorder->_disposable_lru_log_queue.size(), 5); + ASSERT_EQ(cache._lru_recorder->_ttl_lru_log_queue.size_approx(), 5); + ASSERT_EQ(cache._lru_recorder->_index_lru_log_queue.size_approx(), 5); + ASSERT_EQ(cache._lru_recorder->_normal_lru_log_queue.size_approx(), 5); + ASSERT_EQ(cache._lru_recorder->_disposable_lru_log_queue.size_approx(), 5); // then check the log replay std::this_thread::sleep_for(std::chrono::milliseconds( @@ -175,10 +175,10 @@ TEST_F(BlockFileCacheTest, test_lru_log_record_replay_dump_restore) { context2); // move index queue 3rd element to the end cache.remove_if_cached(key3); // remove all element from ttl queue } - ASSERT_EQ(cache._lru_recorder->_ttl_lru_log_queue.size(), 5); - ASSERT_EQ(cache._lru_recorder->_index_lru_log_queue.size(), 1); - ASSERT_EQ(cache._lru_recorder->_normal_lru_log_queue.size(), 0); - ASSERT_EQ(cache._lru_recorder->_disposable_lru_log_queue.size(), 0); + ASSERT_EQ(cache._lru_recorder->_ttl_lru_log_queue.size_approx(), 5); + ASSERT_EQ(cache._lru_recorder->_index_lru_log_queue.size_approx(), 1); + ASSERT_EQ(cache._lru_recorder->_normal_lru_log_queue.size_approx(), 0); + ASSERT_EQ(cache._lru_recorder->_disposable_lru_log_queue.size_approx(), 0); std::this_thread::sleep_for(std::chrono::milliseconds( 2 * config::file_cache_background_lru_log_replay_interval_ms));