Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1114,6 +1114,12 @@ DEFINE_mBool(enable_reader_dryrun_when_download_file_cache, "true");
DEFINE_mInt64(file_cache_background_monitor_interval_ms, "5000");
DEFINE_mInt64(file_cache_background_ttl_gc_interval_ms, "3000");
DEFINE_mInt64(file_cache_background_ttl_gc_batch, "1000");
DEFINE_mInt64(file_cache_background_lru_dump_interval_ms, "60000");
// dump queue only if the queue update specific times through several dump intervals
DEFINE_mInt64(file_cache_background_lru_dump_update_cnt_threshold, "1000");
DEFINE_mInt64(file_cache_background_lru_dump_tail_record_num, "5000000");
DEFINE_mInt64(file_cache_background_lru_log_replay_interval_ms, "1000");
DEFINE_mBool(enable_evaluate_shadow_queue_diff, "false");

DEFINE_Int32(file_cache_downloader_thread_num_min, "32");
DEFINE_Int32(file_cache_downloader_thread_num_max, "32");
Expand Down
9 changes: 8 additions & 1 deletion be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -1153,9 +1153,16 @@ DECLARE_mBool(enable_reader_dryrun_when_download_file_cache);
DECLARE_mInt64(file_cache_background_monitor_interval_ms);
DECLARE_mInt64(file_cache_background_ttl_gc_interval_ms);
DECLARE_mInt64(file_cache_background_ttl_gc_batch);

DECLARE_Int32(file_cache_downloader_thread_num_min);
DECLARE_Int32(file_cache_downloader_thread_num_max);
// used to persist lru information before be reboot and load the info back
DECLARE_mInt64(file_cache_background_lru_dump_interval_ms);
// dump queue only if the queue update specific times through several dump intervals
DECLARE_mInt64(file_cache_background_lru_dump_update_cnt_threshold);
DECLARE_mInt64(file_cache_background_lru_dump_tail_record_num);
DECLARE_mInt64(file_cache_background_lru_log_replay_interval_ms);
DECLARE_mBool(enable_evaluate_shadow_queue_diff);

// inverted index searcher cache
// cache entry stay time after lookup
DECLARE_mInt32(index_cache_entry_stay_time_after_lookup_s);
Expand Down
2 changes: 2 additions & 0 deletions be/src/http/action/shrink_mem_action.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ void ShrinkMemAction::handle(HttpRequest* req) {
MemoryReclamation::revoke_process_memory("ShrinkMemAction");
LOG(INFO) << "shrink memory triggered, using Process GC Free Memory";
HttpChannel::send_reply(req, HttpStatus::OK, "shrinking");

ExecEnv::GetInstance()->set_is_upgrading();
}

} // namespace doris
241 changes: 189 additions & 52 deletions be/src/io/cache/block_file_cache.cpp

Large diffs are not rendered by default.

130 changes: 38 additions & 92 deletions be/src/io/cache/block_file_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,11 @@
#include <optional>
#include <thread>

#include "io/cache/cache_lru_dumper.h"
#include "io/cache/file_block.h"
#include "io/cache/file_cache_common.h"
#include "io/cache/file_cache_storage.h"
#include "io/cache/lru_queue_recorder.h"
#include "util/runtime_profile.h"
#include "util/threadpool.h"

Expand Down Expand Up @@ -72,10 +74,6 @@ class LockScopedTimer {
#define SCOPED_CACHE_LOCK(MUTEX, cache) std::lock_guard cache_lock(MUTEX);
#endif

template <class Lock>
concept IsXLock = std::same_as<Lock, std::lock_guard<std::mutex>> ||
std::same_as<Lock, std::unique_lock<std::mutex>>;

class FSFileCacheStorage;

// The BlockFileCache is responsible for the management of the blocks
Expand All @@ -85,16 +83,16 @@ class BlockFileCache {
friend class MemFileCacheStorage;
friend class FileBlock;
friend struct FileBlocksHolder;
friend class CacheLRUDumper;
friend class LRUQueueRecorder;

public:
static std::string cache_type_to_string(FileCacheType type);
static FileCacheType string_to_cache_type(const std::string& str);
// hash the file_name to uint128
static UInt128Wrapper hash(const std::string& path);

BlockFileCache(const std::string& cache_base_path, const FileCacheSettings& cache_settings);

~BlockFileCache() {
virtual ~BlockFileCache() {
{
std::lock_guard lock(_close_mtx);
_close = true;
Expand All @@ -112,6 +110,12 @@ class BlockFileCache {
if (_cache_background_evict_in_advance_thread.joinable()) {
_cache_background_evict_in_advance_thread.join();
}
if (_cache_background_lru_dump_thread.joinable()) {
_cache_background_lru_dump_thread.join();
}
if (_cache_background_lru_log_replay_thread.joinable()) {
_cache_background_lru_log_replay_thread.join();
}
}

/// Restore cache from local filesystem.
Expand Down Expand Up @@ -216,86 +220,6 @@ class BlockFileCache {
// for be UTs
std::map<std::string, double> get_stats_unsafe();

class LRUQueue {
public:
LRUQueue() = default;
LRUQueue(size_t max_size, size_t max_element_size, int64_t hot_data_interval)
: max_size(max_size),
max_element_size(max_element_size),
hot_data_interval(hot_data_interval) {}

struct HashFileKeyAndOffset {
std::size_t operator()(const std::pair<UInt128Wrapper, size_t>& pair) const {
return KeyHash()(pair.first) + pair.second;
}
};

struct FileKeyAndOffset {
UInt128Wrapper hash;
size_t offset;
size_t size;

FileKeyAndOffset(const UInt128Wrapper& hash, size_t offset, size_t size)
: hash(hash), offset(offset), size(size) {}
};

using Iterator = typename std::list<FileKeyAndOffset>::iterator;

size_t get_max_size() const { return max_size; }
size_t get_max_element_size() const { return max_element_size; }

template <class T>
requires IsXLock<T>
size_t get_capacity(T& /* cache_lock */) const {
return cache_size;
}

size_t get_capacity_unsafe() const { return cache_size; }

size_t get_elements_num_unsafe() const { return queue.size(); }

size_t get_elements_num(std::lock_guard<std::mutex>& /* cache_lock */) const {
return queue.size();
}

Iterator add(const UInt128Wrapper& hash, size_t offset, size_t size,
std::lock_guard<std::mutex>& cache_lock);
template <class T>
requires IsXLock<T>
void remove(Iterator queue_it, T& cache_lock);

void move_to_end(Iterator queue_it, std::lock_guard<std::mutex>& cache_lock);

std::string to_string(std::lock_guard<std::mutex>& cache_lock) const;

bool contains(const UInt128Wrapper& hash, size_t offset,
std::lock_guard<std::mutex>& cache_lock) const;

Iterator begin() { return queue.begin(); }

Iterator end() { return queue.end(); }

void remove_all(std::lock_guard<std::mutex>& cache_lock);

Iterator get(const UInt128Wrapper& hash, size_t offset,
std::lock_guard<std::mutex>& /* cache_lock */) const;

int64_t get_hot_data_interval() const { return hot_data_interval; }

void clear(std::lock_guard<std::mutex>& cache_lock) {
queue.clear();
map.clear();
cache_size = 0;
}

size_t max_size;
size_t max_element_size;
std::list<FileKeyAndOffset> queue;
std::unordered_map<std::pair<UInt128Wrapper, size_t>, Iterator, HashFileKeyAndOffset> map;
size_t cache_size = 0;
int64_t hot_data_interval {0};
};

using AccessRecord =
std::unordered_map<AccessKeyAndOffset, LRUQueue::Iterator, KeyAndOffsetHash>;

Expand Down Expand Up @@ -389,8 +313,8 @@ class BlockFileCache {
FileBlockCell(const FileBlockCell&) = delete;
};

BlockFileCache::LRUQueue& get_queue(FileCacheType type);
const BlockFileCache::LRUQueue& get_queue(FileCacheType type) const;
LRUQueue& get_queue(FileCacheType type);
const LRUQueue& get_queue(FileCacheType type) const;

template <class T, class U>
requires IsXLock<T> && IsXLock<U>
Expand All @@ -403,9 +327,9 @@ class BlockFileCache {
requires IsXLock<T>
FileBlockCell* get_cell(const UInt128Wrapper& hash, size_t offset, T& cache_lock);

FileBlockCell* add_cell(const UInt128Wrapper& hash, const CacheContext& context, size_t offset,
size_t size, FileBlock::State state,
std::lock_guard<std::mutex>& cache_lock);
virtual FileBlockCell* add_cell(const UInt128Wrapper& hash, const CacheContext& context,
size_t offset, size_t size, FileBlock::State state,
std::lock_guard<std::mutex>& cache_lock);

Status initialize_unlocked(std::lock_guard<std::mutex>& cache_lock);

Expand Down Expand Up @@ -463,6 +387,9 @@ class BlockFileCache {
void run_background_monitor();
void run_background_ttl_gc();
void run_background_gc();
void run_background_lru_log_replay();
void run_background_lru_dump();
void restore_lru_queues_from_disk(std::lock_guard<std::mutex>& cache_lock);
void run_background_evict_in_advance();

bool try_reserve_from_other_queue_by_time_interval(FileCacheType cur_type,
Expand All @@ -489,6 +416,17 @@ class BlockFileCache {
std::lock_guard<std::mutex>& cache_lock, size_t& cur_removed_size,
bool evict_in_advance);

Status check_ofstream_status(std::ofstream& out, std::string& filename);
Status dump_one_lru_entry(std::ofstream& out, std::string& filename, const UInt128Wrapper& hash,
size_t offset, size_t size);
Status finalize_dump(std::ofstream& out, size_t entry_num, std::string& tmp_filename,
std::string& final_filename, size_t& file_size);
Status check_ifstream_status(std::ifstream& in, std::string& filename);
Status parse_dump_footer(std::ifstream& in, std::string& filename, size_t& entry_num);
Status parse_one_lru_entry(std::ifstream& in, std::string& filename, UInt128Wrapper& hash,
size_t& offset, size_t& size);
void remove_lru_dump_files();

// info
std::string _cache_base_path;
size_t _capacity = 0;
Expand All @@ -503,6 +441,8 @@ class BlockFileCache {
std::thread _cache_background_ttl_gc_thread;
std::thread _cache_background_gc_thread;
std::thread _cache_background_evict_in_advance_thread;
std::thread _cache_background_lru_dump_thread;
std::thread _cache_background_lru_log_replay_thread;
std::atomic_bool _async_open_done {false};
// disk space or inode is less than the specified value
bool _disk_resource_limit_mode {false};
Expand Down Expand Up @@ -532,6 +472,9 @@ class BlockFileCache {
// keys for async remove
RecycleFileCacheKeys _recycle_keys;

std::unique_ptr<LRUQueueRecorder> _lru_recorder;
std::unique_ptr<CacheLRUDumper> _lru_dumper;

// metrics
std::shared_ptr<bvar::Status<size_t>> _cache_capacity_metrics;
std::shared_ptr<bvar::Status<size_t>> _cur_cache_size_metrics;
Expand Down Expand Up @@ -576,11 +519,14 @@ class BlockFileCache {
std::shared_ptr<bvar::LatencyRecorder> _evict_in_advance_latency_us;
std::shared_ptr<bvar::LatencyRecorder> _recycle_keys_length_recorder;
std::shared_ptr<bvar::LatencyRecorder> _ttl_gc_latency_us;

std::shared_ptr<bvar::LatencyRecorder> _shadow_queue_levenshtein_distance;
// keep _storage last so it will deconstruct first
// otherwise, load_cache_info_into_memory might crash
// coz it will use other members of BlockFileCache
// so join this async load thread first
std::unique_ptr<FileCacheStorage> _storage;
std::shared_ptr<bvar::LatencyRecorder> _lru_dump_latency_us;
};

} // namespace doris::io
Loading
Loading