Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions be/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,9 @@ message(STATUS "build fs benchmark tool: ${BUILD_FS_BENCHMARK}")
option(BUILD_TASK_EXECUTOR_SIMULATOR "ON for building task executor simulator or OFF for not" OFF)
message(STATUS "build task executor simulator: ${BUILD_TASK_EXECUTOR_SIMULATOR}")

# Opt-in switch for building the file cache LRU tool; defaults to OFF.
# The chosen value is echoed at configure time so it shows up in the build log.
option(BUILD_FILE_CACHE_LRU_TOOL "ON for building file cache lru tool or OFF for not" OFF)
message(STATUS "build file cache lru tool: ${BUILD_FILE_CACHE_LRU_TOOL}")

set(CMAKE_SKIP_RPATH TRUE)
set(Boost_USE_STATIC_LIBS ON)
set(Boost_USE_STATIC_RUNTIME ON)
Expand Down
3 changes: 3 additions & 0 deletions be/src/http/action/file_cache_action.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ constexpr static std::string_view CAPACITY = "capacity";
constexpr static std::string_view RELEASE = "release";
constexpr static std::string_view BASE_PATH = "base_path";
constexpr static std::string_view RELEASED_ELEMENTS = "released_elements";
constexpr static std::string_view DUMP = "dump";
constexpr static std::string_view VALUE = "value";

Status FileCacheAction::_handle_header(HttpRequest* req, std::string* json_metrics) {
Expand Down Expand Up @@ -127,6 +128,8 @@ Status FileCacheAction::_handle_header(HttpRequest* req, std::string* json_metri
*json_metrics = json.ToString();
}
}
} else if (operation == DUMP) {
io::FileCacheFactory::instance()->dump_all_caches();
} else {
st = Status::InternalError("invalid operation: {}", operation);
}
Expand Down
25 changes: 25 additions & 0 deletions be/src/io/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -70,3 +70,28 @@ if (${BUILD_FS_BENCHMARK} STREQUAL "ON")
)

endif()

# Optional target: the standalone file_cache_lru_tool executable.
# Only configured when BUILD_FILE_CACHE_LRU_TOOL is exactly "ON".
if (${BUILD_FILE_CACHE_LRU_TOOL} STREQUAL "ON")
# The tool is built from a single translation unit.
add_executable(file_cache_lru_tool
cache/file_cache_lru_tool.cpp
)

# NOTE(review): pch_reuse is a project helper defined elsewhere; presumably it
# lets this target reuse the shared precompiled header - confirm.
pch_reuse(file_cache_lru_tool)

# This permits libraries loaded by dlopen to link to the symbols in the program.
set_target_properties(file_cache_lru_tool PROPERTIES ENABLE_EXPORTS 1)

# Link against the link set collected in DORIS_LINK_LIBS.
target_link_libraries(file_cache_lru_tool
${DORIS_LINK_LIBS}
)

# install(DIRECTORY) without source directories only creates the destination,
# so ${OUTPUT_DIR}/lib/ exists before the binary is installed into it.
install(DIRECTORY DESTINATION ${OUTPUT_DIR}/lib/)
install(TARGETS file_cache_lru_tool DESTINATION ${OUTPUT_DIR}/lib/)

# After each build, split the debug info out of the binary:
#   1. copy the debug sections into <binary>.dbg,
#   2. strip debug and unneeded symbols from the binary itself,
#   3. record a .gnu_debuglink in the binary pointing at <binary>.dbg.
add_custom_command(TARGET file_cache_lru_tool POST_BUILD
COMMAND ${CMAKE_OBJCOPY} --only-keep-debug $<TARGET_FILE:file_cache_lru_tool> $<TARGET_FILE:file_cache_lru_tool>.dbg
COMMAND ${CMAKE_STRIP} --strip-debug --strip-unneeded $<TARGET_FILE:file_cache_lru_tool>
COMMAND ${CMAKE_OBJCOPY} --add-gnu-debuglink=$<TARGET_FILE:file_cache_lru_tool>.dbg $<TARGET_FILE:file_cache_lru_tool>
)

endif()
21 changes: 13 additions & 8 deletions be/src/io/cache/block_file_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2316,6 +2316,18 @@ void BlockFileCache::run_background_lru_log_replay() {
}
}

// Dumps the "disposable", "normal", "index" and "ttl" LRU queues, in that
// order, through _lru_dumper. The whole operation runs while holding
// _dump_lru_queues_mtx.
//
// Nothing is dumped (and the first-dump flag is left untouched) when
// config::file_cache_background_lru_dump_tail_record_num is not positive or
// while ExecEnv reports that an upgrade is in progress.
//
// `force` is forwarded unchanged to CacheLRUDumper::dump_queue for each queue.
void BlockFileCache::dump_lru_queues(bool force) {
    std::unique_lock dump_lock(_dump_lru_queues_mtx);

    // Guard clause: the upgrade state is only queried when dumping is enabled.
    const bool dumping_enabled = config::file_cache_background_lru_dump_tail_record_num > 0;
    if (!dumping_enabled || ExecEnv::GetInstance()->get_is_upgrading()) {
        return;
    }

    constexpr const char* queue_names[] = {"disposable", "normal", "index", "ttl"};
    for (const char* queue_name : queue_names) {
        _lru_dumper->dump_queue(queue_name, force);
    }

    // Reached only after all four queues have been handed to the dumper.
    _lru_dumper->set_first_dump_done();
}

void BlockFileCache::run_background_lru_dump() {
Thread::set_self_name("run_background_lru_dump");
while (!_close) {
Expand All @@ -2327,14 +2339,7 @@ void BlockFileCache::run_background_lru_dump() {
break;
}
}

if (config::file_cache_background_lru_dump_tail_record_num > 0 &&
!ExecEnv::GetInstance()->get_is_upgrading()) {
_lru_dumper->dump_queue("disposable");
_lru_dumper->dump_queue("normal");
_lru_dumper->dump_queue("index");
_lru_dumper->dump_queue("ttl");
}
dump_lru_queues(false);
}
}

Expand Down
4 changes: 3 additions & 1 deletion be/src/io/cache/block_file_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,8 @@ class BlockFileCache {
std::string dump_structure(const UInt128Wrapper& hash);
std::string dump_single_cache_type(const UInt128Wrapper& hash, size_t offset);

void dump_lru_queues(bool force);

[[nodiscard]] size_t get_used_cache_size(FileCacheType type) const;

[[nodiscard]] size_t get_file_blocks_num(FileCacheType type) const;
Expand Down Expand Up @@ -556,7 +558,7 @@ class BlockFileCache {
// so join this async load thread first
std::unique_ptr<FileCacheStorage> _storage;
std::shared_ptr<bvar::LatencyRecorder> _lru_dump_latency_us;

std::mutex _dump_lru_queues_mtx;
moodycamel::ConcurrentQueue<FileBlockSPtr> _need_update_lru_blocks;
};

Expand Down
6 changes: 6 additions & 0 deletions be/src/io/cache/block_file_cache_factory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,12 @@ std::string FileCacheFactory::clear_file_caches(bool sync) {
return ss.str();
}

void FileCacheFactory::dump_all_caches() {
for (const auto& cache : _caches) {
cache->dump_lru_queues(true);
}
}

std::vector<std::string> FileCacheFactory::get_base_paths() {
std::vector<std::string> paths;
for (const auto& pair : _path_to_cache) {
Expand Down
5 changes: 5 additions & 0 deletions be/src/io/cache/block_file_cache_factory.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,11 @@ class FileCacheFactory {
*/
std::string clear_file_caches(bool sync);

/**
* dump lru queue info for all file cache instances
*/
void dump_all_caches();

std::vector<std::string> get_base_paths();

/**
Expand Down
53 changes: 49 additions & 4 deletions be/src/io/cache/cache_lru_dumper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -230,10 +230,55 @@ Status CacheLRUDumper::finalize_dump(std::ofstream& out, size_t entry_num,

out.close();

if (_is_first_dump) [[unlikely]] {
// we back up two dumps (one for last before be restart, one for first after be restart)
// for later debug the restore process
try {
if (std::filesystem::exists(final_filename)) {
std::string backup_filename = final_filename + "_" + _start_time + "_last";
std::rename(final_filename.c_str(), backup_filename.c_str());
}
std::string timestamped_filename = final_filename + "_" + _start_time;
std::filesystem::copy_file(tmp_filename, timestamped_filename);

std::filesystem::path dir = std::filesystem::path(final_filename).parent_path();
std::string prefix = std::filesystem::path(final_filename).filename().string();
uint64_t total_size = 0;
std::vector<std::pair<std::filesystem::path, std::filesystem::file_time_type>> files;
for (const auto& entry : std::filesystem::directory_iterator(dir)) {
if (entry.path().filename().string().find(prefix) == 0) {
total_size += entry.file_size();
files.emplace_back(entry.path(), entry.last_write_time());
}
}
if (total_size > 5ULL * 1024 * 1024 * 1024) {
// delete oldest two files
std::sort(files.begin(), files.end(),
[](const auto& a, const auto& b) { return a.second < b.second; });
if (!files.empty()) {
auto remove_file = [](const std::filesystem::path& file_path) {
std::error_code ec;
bool removed = std::filesystem::remove(file_path, ec);
LOG(INFO) << "Remove " << (removed ? "succeeded" : "failed")
<< " for file: " << file_path
<< (ec ? ", error: " + ec.message() : "");
return removed;
};

remove_file(files[0].first);
if (files.size() > 1) {
remove_file(files[1].first);
}
}
}
} catch (const std::filesystem::filesystem_error& e) {
LOG(WARNING) << "failed to handle first dump case: " << e.what();
}
}

// Rename tmp to formal file
try {
std::rename(tmp_filename.c_str(), final_filename.c_str());
std::remove(tmp_filename.c_str());
file_size = std::filesystem::file_size(final_filename);
} catch (const std::filesystem::filesystem_error& e) {
LOG(WARNING) << "failed to rename " << tmp_filename << " to " << final_filename
Expand All @@ -247,10 +292,10 @@ Status CacheLRUDumper::finalize_dump(std::ofstream& out, size_t entry_num,
return Status::OK();
}

void CacheLRUDumper::dump_queue(const std::string& queue_name) {
void CacheLRUDumper::dump_queue(const std::string& queue_name, bool force) {
FileCacheType type = string_to_cache_type(queue_name);
if (_recorder->get_lru_queue_update_cnt_from_last_dump(type) >
config::file_cache_background_lru_dump_update_cnt_threshold) {
if (force || _recorder->get_lru_queue_update_cnt_from_last_dump(type) >
config::file_cache_background_lru_dump_update_cnt_threshold) {
LRUQueue& queue = _recorder->get_shadow_queue(type);
do_dump_queue(queue, queue_name);
_recorder->reset_lru_queue_update_cnt_from_last_dump(type);
Expand Down
17 changes: 15 additions & 2 deletions be/src/io/cache/cache_lru_dumper.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@

#pragma once

#include <chrono>
#include <cstdint>
#include <cstring>
#include <ctime>
#include <filesystem>
#include <fstream>
#include <iostream>
Expand All @@ -38,11 +40,19 @@ class LRUQueueRecorder;
class CacheLRUDumper {
public:
CacheLRUDumper(BlockFileCache* mgr, LRUQueueRecorder* recorder)
: _mgr(mgr), _recorder(recorder) {};
void dump_queue(const std::string& queue_name);
: _mgr(mgr), _recorder(recorder) {
auto now = std::chrono::system_clock::now();
auto in_time_t = std::chrono::system_clock::to_time_t(now);
std::stringstream ss;
ss << std::put_time(std::localtime(&in_time_t), "%Y%m%d%H%M%S");
_start_time = ss.str();
};

void dump_queue(const std::string& queue_name, bool force);
void restore_queue(LRUQueue& queue, const std::string& queue_name,
std::lock_guard<std::mutex>& cache_lock);
void remove_lru_dump_files();
// Marks the first dump as finished by clearing _is_first_dump.
void set_first_dump_done() { _is_first_dump = false; }

private:
void do_dump_queue(LRUQueue& queue, const std::string& queue_name);
Expand Down Expand Up @@ -79,5 +89,8 @@ class CacheLRUDumper {

BlockFileCache* _mgr;
LRUQueueRecorder* _recorder;

std::string _start_time;
bool _is_first_dump = true;
};
} // namespace doris::io
7 changes: 7 additions & 0 deletions be/src/io/cache/cached_remote_file_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,9 @@ Status CachedRemoteFileReader::read_at_impl(size_t offset, Slice result, size_t*
for (auto& block : holder.file_blocks) {
switch (block->state()) {
case FileBlock::State::EMPTY:
VLOG_DEBUG << fmt::format("Block EMPTY path={} hash={}:{}:{} offset={} cache_path={}",
path().native(), _cache_hash.to_string(), _cache_hash.high(),
_cache_hash.low(), block->offset(), block->get_cache_file());
block->get_or_set_downloader();
if (block->is_downloader()) {
empty_blocks.push_back(block);
Expand All @@ -234,6 +237,10 @@ Status CachedRemoteFileReader::read_at_impl(size_t offset, Slice result, size_t*
stats.hit_cache = false;
break;
case FileBlock::State::SKIP_CACHE:
VLOG_DEBUG << fmt::format(
"Block SKIP_CACHE path={} hash={}:{}:{} offset={} cache_path={}",
path().native(), _cache_hash.to_string(), _cache_hash.high(), _cache_hash.low(),
block->offset(), block->get_cache_file());
empty_blocks.push_back(block);
stats.hit_cache = false;
stats.skip_cache = true;
Expand Down
Loading
Loading