be/src/cloud/cloud_tablet.cpp (2 changes: 1 addition, 1 deletion)
@@ -456,7 +456,7 @@ void CloudTablet::recycle_cached_data(const std::vector<RowsetSharedPtr>& rowset

if (config::enable_file_cache) {
for (const auto& rs : rowsets) {
if (rs.use_count() >= 1) {
if (rs.use_count() > 1) {
LOG(WARNING) << "Rowset " << rs->rowset_id().to_string() << " has "
<< rs.use_count()
<< " references. File Cache won't be recycled when query is using it.";
be/src/cloud/cloud_tablet_mgr.cpp (4 changes: 3 additions, 1 deletion)
@@ -238,7 +238,9 @@ void CloudTabletMgr::vacuum_stale_rowsets(const CountDownLatch& stop_latch) {

num_vacuumed += t->delete_expired_stale_rowsets();
}
LOG_INFO("finish vacuum stale rowsets").tag("num_vacuumed", num_vacuumed);
LOG_INFO("finish vacuum stale rowsets")
.tag("num_vacuumed", num_vacuumed)
.tag("num_tablets", tablets_to_vacuum.size());
}

std::vector<std::weak_ptr<CloudTablet>> CloudTabletMgr::get_weak_tablets() {
be/src/common/config.cpp (1 change: 0 additions, 1 deletion)
@@ -1058,7 +1058,6 @@ DEFINE_mBool(enbale_dump_error_file, "true");
// limit the max size of error log on disk
DEFINE_mInt64(file_cache_error_log_limit_bytes, "209715200"); // 200MB
DEFINE_mInt64(cache_lock_long_tail_threshold, "1000");
DEFINE_Int64(file_cache_recycle_keys_size, "1000000");
DEFINE_mBool(enable_file_cache_keep_base_compaction_output, "false");

DEFINE_mInt32(index_cache_entry_stay_time_after_lookup_s, "1800");
be/src/common/config.h (1 change: 0 additions, 1 deletion)
@@ -1098,7 +1098,6 @@ DECLARE_mBool(enbale_dump_error_file);
// limit the max size of error log on disk
DECLARE_mInt64(file_cache_error_log_limit_bytes);
DECLARE_mInt64(cache_lock_long_tail_threshold);
DECLARE_Int64(file_cache_recycle_keys_size);
// Base compaction may retrieve and produce some less frequently accessed data,
// potentially affecting the file cache hit rate.
// This configuration determines whether to retain the output within the file cache.
be/src/io/cache/block_file_cache.cpp (194 changes: 75 additions, 119 deletions)
@@ -211,8 +211,6 @@ BlockFileCache::BlockFileCache(const std::string& cache_base_path,
_ttl_queue = LRUQueue(cache_settings.ttl_queue_size, cache_settings.ttl_queue_elements,
std::numeric_limits<int>::max());

_recycle_keys = std::make_shared<boost::lockfree::spsc_queue<FileCacheKey>>(
config::file_cache_recycle_keys_size);
if (cache_settings.storage == "memory") {
_storage = std::make_unique<MemFileCacheStorage>();
_cache_base_path = "memory";
@@ -328,7 +326,8 @@ Status BlockFileCache::initialize_unlocked(std::lock_guard<std::mutex>& cache_lo
DCHECK(!_is_initialized);
_is_initialized = true;
RETURN_IF_ERROR(_storage->init(this));
_cache_background_thread = std::thread(&BlockFileCache::run_background_operation, this);
_cache_background_gc_thread = std::thread(&BlockFileCache::run_background_gc, this);
_cache_background_monitor_thread = std::thread(&BlockFileCache::run_background_monitor, this);

return Status::OK();
}
@@ -562,7 +561,7 @@ std::string BlockFileCache::clear_file_cache_async() {

void BlockFileCache::recycle_deleted_blocks() {
Review comment (Contributor): UT should be added to make it fully tested.
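A minimal sketch of such a test, assuming a gtest fixture plus the std::any-based SyncPoint hooks implied by TEST_SYNC_POINT_CALLBACK; make_test_cache, mark_all_blocks_deleted, and cached_block_count are hypothetical helpers, not code from this PR:

// Illustrative sketch only; the helper functions and the exact SyncPoint
// callback signature are assumptions, not part of this change.
TEST_F(BlockFileCacheTest, RecycleDeletedBlocksDrainsMarkedCells) {
    auto* sp = doris::SyncPoint::get_instance();
    sp->enable_processing();
    // shrink the batch so the batching/throttling path is exercised
    sp->set_call_back("BlockFileCache::set_remove_batch",
                      [](auto&& args) { *std::any_cast<int*>(args[0]) = 2; });

    auto cache = make_test_cache(/*capacity_bytes=*/1 << 20); // hypothetical helper
    mark_all_blocks_deleted(*cache);                          // hypothetical helper
    cache->recycle_deleted_blocks();
    EXPECT_EQ(cached_block_count(*cache), 0);                 // hypothetical helper

    sp->clear_all_call_backs();
    sp->disable_processing();
}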

using namespace std::chrono;
static int remove_batch = 100;
static int remove_batch = 500;
Review comment (Contributor): Keep it at 100, and make the cond.wait_for(cache_lock, std::chrono::microseconds(100)) sleep configurable to throttle.
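One way to do that, keeping the batch at 100 and moving the pause into a config knob; the name file_cache_recycle_sleep_us is hypothetical and just follows the DEFINE_mInt64/DECLARE_mInt64 pattern of the other file-cache settings touched by this PR:

// be/src/common/config.h (hypothetical knob)
DECLARE_mInt64(file_cache_recycle_sleep_us);

// be/src/common/config.cpp (default keeps today's 100-microsecond pause)
DEFINE_mInt64(file_cache_recycle_sleep_us, "100");

// be/src/io/cache/block_file_cache.cpp, inside recycle_deleted_blocks()
static int remove_batch = 100; // keep the batch size at 100
TEST_SYNC_POINT_CALLBACK("BlockFileCache::set_remove_batch", &remove_batch);
// throttle between batches through the config instead of a hard-coded value
cond.wait_for(cache_lock, std::chrono::microseconds(config::file_cache_recycle_sleep_us));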

TEST_SYNC_POINT_CALLBACK("BlockFileCache::set_remove_batch", &remove_batch);
TEST_SYNC_POINT_CALLBACK("BlockFileCache::recycle_deleted_blocks");
std::unique_lock cache_lock(_mutex);
@@ -573,45 +572,34 @@ void BlockFileCache::recycle_deleted_blocks() {
int i = 0;
std::condition_variable cond;
auto start_time = steady_clock::time_point();
if (_async_clear_file_cache) {
LOG_INFO("Start clear file cache async").tag("path", _cache_base_path);
auto remove_file_block = [&cache_lock, this](FileBlockCell* cell) {
std::lock_guard segment_lock(cell->file_block->_mutex);
remove(cell->file_block, cache_lock, segment_lock);
};
static int remove_batch = 100;
TEST_SYNC_POINT_CALLBACK("BlockFileCache::set_remove_batch", &remove_batch);
int i = 0;
std::condition_variable cond;
auto iter_queue = [&](LRUQueue& queue) {
bool end = false;
while (queue.get_capacity(cache_lock) != 0 && !end) {
std::vector<FileBlockCell*> cells;
for (const auto& [entry_key, entry_offset, _] : queue) {
if (i == remove_batch) {
i = 0;
break;
}
auto* cell = get_cell(entry_key, entry_offset, cache_lock);
if (!cell) continue;
if (!cell->is_deleted) {
end = true;
break;
} else if (cell->releasable()) {
i++;
cells.push_back(cell);
}

LOG_INFO("Start clear file cache async").tag("path", _cache_base_path);
auto iter_queue = [&](LRUQueue& queue) {
bool end = false;
Review comment (Contributor): Where is end set to true?
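For reference, the branch deleted above is where end used to be set; if that early exit is still intended, the inner loop needs it back, roughly like this (a sketch reconstructed from the removed lines, not a change made in this PR):

auto* cell = get_cell(entry_key, entry_offset, cache_lock);
if (!cell) continue;
if (!cell->is_deleted) {
    // stop scanning this queue at the first cell that is not marked deleted,
    // mirroring the removed branch that set `end`
    end = true;
    break;
} else if (cell->releasable()) {
    i++;
    cells.push_back(cell);
}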

while (queue.get_capacity(cache_lock) != 0 && !end) {
std::vector<FileBlockCell*> cells;
for (const auto& [entry_key, entry_offset, _] : queue) {
Review comment (Contributor): It seems we should not iterate from the beginning again. There may be a performance penalty if there are many running queries while we are deleting elements that are far from the head. Can we iterate from rbegin() to get better performance?
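A sketch of the reverse scan being suggested, assuming LRUQueue exposes rbegin()/rend() reverse iterators (they are not shown in this diff):

// scan from the tail as suggested, so each batch does not re-walk
// entries near the head of the queue while queries are running
for (auto it = queue.rbegin(); it != queue.rend(); ++it) { // rbegin()/rend() assumed
    const auto& [entry_key, entry_offset, _] = *it;
    if (i == remove_batch) {
        i = 0;
        break;
    }
    auto* cell = get_cell(entry_key, entry_offset, cache_lock);
    if (!cell) continue;
    if (cell->releasable()) {
        i++;
        cells.push_back(cell);
    }
}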

if (i == remove_batch) {
i = 0;
break;
}
auto* cell = get_cell(entry_key, entry_offset, cache_lock);
if (!cell) continue;
if (cell->releasable()) {
i++;
cells.push_back(cell);
}
std::ranges::for_each(cells, remove_file_block);
// just for sleep
cond.wait_for(cache_lock, std::chrono::microseconds(100));
}
};
iter_queue(get_queue(FileCacheType::DISPOSABLE));
iter_queue(get_queue(FileCacheType::NORMAL));
iter_queue(get_queue(FileCacheType::INDEX));
}
if (_async_clear_file_cache || config::file_cache_ttl_valid_check_interval_second != 0) {
std::ranges::for_each(cells, remove_file_block);
// just for sleep
cond.wait_for(cache_lock, std::chrono::microseconds(100));
}
};
iter_queue(get_queue(FileCacheType::DISPOSABLE));
iter_queue(get_queue(FileCacheType::NORMAL));
iter_queue(get_queue(FileCacheType::INDEX));

if (config::file_cache_ttl_valid_check_interval_second != 0) {
std::vector<UInt128Wrapper> ttl_keys;
ttl_keys.reserve(_key_to_time.size());
for (auto& [key, _] : _key_to_time) {
@@ -630,14 +618,11 @@ void BlockFileCache::recycle_deleted_blocks() {
cell.is_deleted =
cell.is_deleted
? true
: (config::file_cache_ttl_valid_check_interval_second == 0
? false
: std::chrono::duration_cast<std::chrono::seconds>(
std::chrono::steady_clock::now()
.time_since_epoch())
.count() -
cell.atime >
config::file_cache_ttl_valid_check_interval_second);
: std::chrono::duration_cast<std::chrono::seconds>(
std::chrono::steady_clock::now().time_since_epoch())
.count() -
cell.atime >
config::file_cache_ttl_valid_check_interval_second;
if (!cell.is_deleted) {
continue;
} else if (cell.releasable()) {
@@ -648,14 +633,11 @@ void BlockFileCache::recycle_deleted_blocks() {
std::ranges::for_each(cells, remove_file_block);
}
}
if (_async_clear_file_cache) {
_async_clear_file_cache = false;
auto use_time = duration_cast<milliseconds>(steady_clock::time_point() - start_time);
LOG_INFO("End clear file cache async")
.tag("path", _cache_base_path)
.tag("use_time", static_cast<int64_t>(use_time.count()));
}
}
auto use_time = duration_cast<milliseconds>(steady_clock::time_point() - start_time);
LOG_INFO("End clear file cache async")
.tag("path", _cache_base_path)
.tag("use_time", static_cast<int64_t>(use_time.count()));
}

FileBlocks BlockFileCache::split_range_into_cells(const UInt128Wrapper& hash,
@@ -925,18 +907,6 @@ void BlockFileCache::remove_file_blocks(std::vector<FileBlockCell*>& to_evict,
std::for_each(to_evict.begin(), to_evict.end(), remove_file_block_if);
}

void BlockFileCache::remove_file_blocks_async(std::vector<FileBlockCell*>& to_evict,
std::lock_guard<std::mutex>& cache_lock) {
auto remove_file_block_if = [&](FileBlockCell* cell) {
FileBlockSPtr file_block = cell->file_block;
if (file_block) {
std::lock_guard block_lock(file_block->_mutex);
remove(file_block, cache_lock, block_lock, /*sync*/ false);
}
};
std::for_each(to_evict.begin(), to_evict.end(), remove_file_block_if);
}

void BlockFileCache::remove_file_blocks_and_clean_time_maps(
std::vector<FileBlockCell*>& to_evict, std::lock_guard<std::mutex>& cache_lock) {
auto remove_file_block_and_clean_time_maps_if = [&](FileBlockCell* cell) {
@@ -1183,15 +1153,11 @@ void BlockFileCache::remove_if_cached_async(const UInt128Wrapper& file_key) {
bool is_ttl_file = remove_if_ttl_file_unlock(file_key, true, cache_lock);
if (!is_ttl_file) {
auto iter = _files.find(file_key);
std::vector<FileBlockCell*> to_remove;
if (iter != _files.end()) {
for (auto& [_, cell] : iter->second) {
if (cell.releasable()) {
to_remove.push_back(&cell);
}
cell.is_deleted = true;
}
}
remove_file_blocks_async(to_remove, cache_lock);
}
}

@@ -1378,7 +1344,7 @@ bool BlockFileCache::try_reserve_for_lru(const UInt128Wrapper& hash,

template <class T, class U>
requires IsXLock<T> && IsXLock<U>
void BlockFileCache::remove(FileBlockSPtr file_block, T& cache_lock, U& block_lock, bool sync) {
void BlockFileCache::remove(FileBlockSPtr file_block, T& cache_lock, U& block_lock) {
auto hash = file_block->get_hash_value();
auto offset = file_block->offset();
auto type = file_block->cache_type();
@@ -1398,25 +1364,12 @@ void BlockFileCache::remove(FileBlockSPtr file_block, T& cache_lock, U& block_lo
key.offset = offset;
key.meta.type = type;
key.meta.expiration_time = expiration_time;
if (sync) {
Status st = _storage->remove(key);
if (!st.ok()) {
LOG_WARNING("").error(st);
}
} else {
// the file will be deleted in the bottom half
// so there will be a window that the file is not in the cache but still in the storage
// but it's ok, because the rowset is stale already
// in case something unexpected happen, set the _recycle_keys queue to zero to fallback
bool ret = _recycle_keys->push(key);
if (!ret) {
LOG_WARNING("Failed to push recycle key to queue, do it synchronously");
Status st = _storage->remove(key);
if (!st.ok()) {
LOG_WARNING("").error(st);
}
}
Status st = _storage->remove(key);
if (!st.ok()) {
LOG_WARNING("").error(st);
}
} else if (cell->file_block->state_unlock(block_lock) == FileBlock::State::DOWNLOADING) {
cell->is_deleted = true;
}
_cur_cache_size -= file_block->range().size();
if (FileCacheType::TTL == type) {
Expand All @@ -1430,16 +1383,6 @@ void BlockFileCache::remove(FileBlockSPtr file_block, T& cache_lock, U& block_lo
*_num_removed_blocks << 1;
}

void BlockFileCache::recycle_stale_rowset_async_bottom_half() {
FileCacheKey key;
while (_recycle_keys->pop(key)) {
Status st = _storage->remove(key);
if (!st.ok()) {
LOG_WARNING("").error(st);
}
}
}

size_t BlockFileCache::get_used_cache_size(FileCacheType cache_type) const {
SCOPED_CACHE_LOCK(_mutex);
return get_used_cache_size_unlocked(cache_type, cache_lock);
@@ -1729,7 +1672,35 @@ void BlockFileCache::check_disk_resource_limit() {
}
}

void BlockFileCache::run_background_operation() {
void BlockFileCache::run_background_gc() {
int64_t interval_time_seconds = 10;
while (!_close) {
TEST_SYNC_POINT_CALLBACK("BlockFileCache::set_sleep_time", &interval_time_seconds);
{
std::unique_lock close_lock(_close_mtx);
_close_cv.wait_for(close_lock, std::chrono::seconds(interval_time_seconds));
if (_close) {
break;
}
}

recycle_deleted_blocks();

{
int64_t cur_time = UnixSeconds();
SCOPED_CACHE_LOCK(_mutex);
while (!_time_to_key.empty()) {
auto begin = _time_to_key.begin();
if (cur_time < begin->first) {
break;
}
remove_if_ttl_file_unlock(begin->second, false, cache_lock);
}
}
}
}

void BlockFileCache::run_background_monitor() {
int64_t interval_time_seconds = 20;
while (!_close) {
TEST_SYNC_POINT_CALLBACK("BlockFileCache::set_sleep_time", &interval_time_seconds);
@@ -1777,21 +1748,6 @@ void BlockFileCache::run_background_operation() {
_num_read_blocks_1h->get_value());
}
}

recycle_stale_rowset_async_bottom_half();
recycle_deleted_blocks();
// gc
{
int64_t cur_time = UnixSeconds();
SCOPED_CACHE_LOCK(_mutex);
while (!_time_to_key.empty()) {
auto begin = _time_to_key.begin();
if (cur_time < begin->first) {
break;
}
remove_if_ttl_file_unlock(begin->second, false, cache_lock);
}
}
}
}

@@ -2053,5 +2009,5 @@ std::map<std::string, double> BlockFileCache::get_stats_unsafe() {

template void BlockFileCache::remove(FileBlockSPtr file_block,
std::lock_guard<std::mutex>& cache_lock,
std::lock_guard<std::mutex>& block_lock, bool sync);
std::lock_guard<std::mutex>& block_lock);
} // namespace doris::io