diff --git a/src/v/cloud_storage/access_time_tracker.cc b/src/v/cloud_storage/access_time_tracker.cc index 52829214f629d..358050375e5ba 100644 --- a/src/v/cloud_storage/access_time_tracker.cc +++ b/src/v/cloud_storage/access_time_tracker.cc @@ -16,11 +16,13 @@ #include #include +#include #include #include #include +#include #include namespace absl { @@ -51,20 +53,22 @@ namespace cloud_storage { // is defined by hand in the read/write methods so that it can be done // with streaming. struct table_header - : serde::envelope<table_header, serde::version<0>, serde::compat_version<0>> { + : serde::envelope<table_header, serde::version<1>, serde::compat_version<0>> { size_t table_size{0}; + tracker_version version{tracker_version::v1}; - auto serde_fields() { return std::tie(table_size); } + auto serde_fields() { return std::tie(table_size, version); } }; -ss::future<> access_time_tracker::write(ss::output_stream<char>& out) { +ss::future<> access_time_tracker::write( + ss::output_stream<char>& out, tracker_version version) { // This lock protects us from the _table being mutated while we // are iterating over it and yielding during the loop. auto lock_guard = co_await ss::get_units(_table_lock, 1); _dirty = false; - const table_header h{.table_size = _table.size()}; + const table_header h{.table_size = _table.size(), .version = version}; iobuf header_buf; serde::write(header_buf, h); co_await write_iobuf_to_output_stream(std::move(header_buf), out); @@ -74,11 +78,15 @@ ss::future<> access_time_tracker::write(ss::output_stream<char>& out) { size_t i = 0; iobuf serialize_buf; - for (auto it : _table) { - serde::write(serialize_buf, it.first); - serde::write(serialize_buf, it.second); + for (const auto& [path, metadata] : _table) { + serde::write(serialize_buf, path); + serde::write(serialize_buf, metadata.atime_sec); + serde::write(serialize_buf, metadata.size); ++i; if (i % chunk_count == 0 || i == _table.size()) { + iobuf chunk_size; + serde::write(chunk_size, serialize_buf.size_bytes()); + co_await write_iobuf_to_output_stream(std::move(chunk_size), out); for (const auto& f : serialize_buf) { co_await out.write(f.get(), f.size()); } @@ -91,7 +99,9 @@ ss::future<> access_time_tracker::write(ss::output_stream<char>& out) { } bool access_time_tracker::should_track(std::string_view key) const { - if (key.ends_with(".tx") || key.ends_with(".index")) { + if ( + key.ends_with(".tx") || key.ends_with(".index") + || key.ends_with(cache_tmp_file_extension)) { return false; } @@ -139,64 +149,81 @@ ss::future<> access_time_tracker::read(ss::input_stream<char>& in) { auto tmp = co_await in.read_exactly(header_size); header_buf.append(tmp.get(), tmp.size()); auto h_parser = iobuf_parser(std::move(header_buf)); - table_header h = serde::read_nested<table_header>(h_parser, 0); + auto h = serde::read_nested<table_header>(h_parser, 0); - // How many items to consume per stream read() - constexpr size_t chunk_count = 2048; + auto defer = ss::defer([&] { + lock_guard.return_all(); + // Drop writes accumulated while reading + _pending_upserts.clear(); + }); + + // Skip loading data for older version + if (h.version == tracker_version::v1) { + co_return; + } - for (size_t i = 0; i < h.table_size; i += chunk_count) { - auto item_count = std::min(chunk_count, h.table_size - i); - auto tmp_buf = co_await in.read_exactly(item_count * table_item_size); + while (!in.eof()) { + auto chunk_sz_buf = co_await in.read_exactly(sizeof(size_t)); + if (chunk_sz_buf.empty() && in.eof()) { + break; + } + + iobuf chunk_sz; + chunk_sz.append(std::move(chunk_sz_buf)); + auto chunk_sz_parser = iobuf_parser{std::move(chunk_sz)}; + auto chunk_size = 
serde::read<size_t>(chunk_sz_parser); + auto tmp_buf = co_await in.read_exactly(chunk_size); iobuf items_buf; items_buf.append(std::move(tmp_buf)); auto parser = iobuf_parser(std::move(items_buf)); - for (size_t j = 0; j < item_count; ++j) { - uint32_t hash = serde::read_nested<uint32_t>(parser, 0); - timestamp_t t = serde::read_nested<timestamp_t>(parser, 0); - _table.emplace(hash, t); + while (parser.bytes_left() > 0) { + auto path = serde::read_nested<ss::sstring>(parser, 0); + auto atime = serde::read_nested<uint32_t>(parser, 0); + auto size = serde::read_nested<uint64_t>(parser, 0); + _table.emplace( + path, file_metadata{.atime_sec = atime, .size = size}); } } - lock_guard.return_all(); - // Any writes while we were reading are dropped - _pending_upserts.clear(); + vassert( + _table.size() == h.table_size, + "unexpected tracker size, loaded {} items, expected {} items", + _table.size(), + h.table_size); } -void access_time_tracker::add_timestamp( - std::string_view key, std::chrono::system_clock::time_point ts) { - if (!should_track(key)) { +void access_time_tracker::add( + ss::sstring path, std::chrono::system_clock::time_point atime, size_t size) { + if (!should_track(path)) { return; } - uint32_t seconds = std::chrono::time_point_cast<std::chrono::seconds>(ts) + uint32_t seconds = std::chrono::time_point_cast<std::chrono::seconds>(atime) .time_since_epoch() .count(); - uint32_t hash = xxhash_32(key.data(), key.size()); - auto units = seastar::try_get_units(_table_lock, 1); if (units.has_value()) { // Got lock, update main table - _table[hash] = seconds; + _table[path] = {.atime_sec = seconds, .size = size}; _dirty = true; } else { // Locked during serialization, defer write - _pending_upserts[hash] = seconds; + _pending_upserts[path] = {.atime_sec = seconds, .size = size}; } } -void access_time_tracker::remove_timestamp(std::string_view key) noexcept { +void access_time_tracker::remove(std::string_view key) noexcept { try { - uint32_t hash = xxhash_32(key.data(), key.size()); - + ss::sstring k{key.data(), key.size()}; auto units = seastar::try_get_units(_table_lock, 1); if (units.has_value()) { // Unlocked, update main table - _table.erase(hash); + _table.erase(k); _dirty = true; } else { // Locked during serialization, defer write - _pending_upserts[hash] = std::nullopt; + _pending_upserts[k] = std::nullopt; } } catch (...) { vassert( @@ -207,22 +234,42 @@ void access_time_tracker::remove_timestamp(std::string_view key) noexcept { } } -ss::future<> -access_time_tracker::trim(const fragmented_vector<file_list_item>& existent) { - absl::btree_set<uint32_t> existent_hashes; +ss::future<> access_time_tracker::sync( + const fragmented_vector<file_list_item>& existent, + add_entries_t add_entries) { + absl::btree_set<ss::sstring> paths; for (const auto& i : existent) { - existent_hashes.insert(xxhash_32(i.path.data(), i.path.size())); + paths.insert(i.path); } auto lock_guard = co_await ss::get_units(_table_lock, 1); table_t tmp; - for (auto it : _table) { - if (existent_hashes.contains(it.first)) { + + for (const auto& it : _table) { + if (paths.contains(it.first)) { tmp.insert(it); } co_await ss::maybe_yield(); } + + if (add_entries) { + auto should_add = [this, &tmp](const auto& e) { + return should_track(e.path) && !tmp.contains(e.path); + }; + for (const auto& entry : existent | std::views::filter(should_add)) { + _dirty = true; + tmp.insert( + {entry.path, + {static_cast<uint32_t>( + std::chrono::time_point_cast<std::chrono::seconds>( + entry.access_time) + .time_since_epoch() + .count()), + entry.size}}); + } + } + if (_table.size() != tmp.size()) { // We dropped one or more entries, therefore mutated the table. 
_dirty = true; @@ -233,18 +280,30 @@ access_time_tracker::trim(const fragmented_vector<file_list_item>& existent) { on_released_table_lock(); } -std::optional<std::chrono::system_clock::time_point> -access_time_tracker::estimate_timestamp(std::string_view key) const { - uint32_t hash = xxhash_32(key.data(), key.size()); - auto it = _table.find(hash); - if (it == _table.end()) { - return std::nullopt; +std::optional<file_metadata> +access_time_tracker::get(const std::string& key) const { + if (auto it = _table.find(key); it != _table.end()) { + return it->second; } - auto seconds = std::chrono::seconds(it->second); - std::chrono::system_clock::time_point ts(seconds); - return ts; + return std::nullopt; } bool access_time_tracker::is_dirty() const { return _dirty; } +fragmented_vector<file_list_item> access_time_tracker::lru_entries() const { + fragmented_vector<file_list_item> items; + items.reserve(_table.size()); + for (const auto& [path, metadata] : _table) { + items.emplace_back(metadata.time_point(), path, metadata.size); + } + std::ranges::sort( + items, {}, [](const auto& item) { return item.access_time; }); + return items; +} + +std::chrono::system_clock::time_point file_metadata::time_point() const { + return std::chrono::system_clock::time_point{ + std::chrono::seconds{atime_sec}}; +} + } // namespace cloud_storage diff --git a/src/v/cloud_storage/access_time_tracker.h b/src/v/cloud_storage/access_time_tracker.h index 2680b6004d0e6..b402be04bf434 100644 --- a/src/v/cloud_storage/access_time_tracker.h +++ b/src/v/cloud_storage/access_time_tracker.h @@ -27,50 +27,53 @@ namespace cloud_storage { -/// Access time tracker maintains map from filename hash to -/// the timestamp that represents the time when the file was -/// accessed last. -/// -/// It is possible to have conflicts. In case of conflict -/// 'add_timestamp' method will overwrite another key. For that -/// key we will observe larger access time. When one of the -/// conflicted entries will be deleted another will be deleted -/// as well. This is OK because the code in the -/// 'cloud_storage/cache_service' is ready for that. +enum class tracker_version : uint8_t { v1, v2 }; + +struct file_metadata { + uint32_t atime_sec; + uint64_t size; + std::chrono::system_clock::time_point time_point() const; +}; + +/// Access time tracker maps cache entry file paths to their last accessed +/// timestamp and file size. class access_time_tracker { using timestamp_t = uint32_t; - using table_t = absl::btree_map<uint32_t, timestamp_t>; - - // Serialized size of each pair in table_t - static constexpr size_t table_item_size = 8; + using table_t = absl::btree_map<ss::sstring, file_metadata>; public: - /// Add access time to the container. - void add_timestamp( - std::string_view key, std::chrono::system_clock::time_point ts); + /// Add metadata to the container. + void add( + ss::sstring path, + std::chrono::system_clock::time_point atime, + size_t size); /// Remove key from the container. - void remove_timestamp(std::string_view) noexcept; + void remove(std::string_view) noexcept; - /// Return access time estimate (it can differ if there is a conflict - /// on file name hash). - std::optional<std::chrono::system_clock::time_point> - estimate_timestamp(std::string_view key) const; + /// Return file metadata for key. + std::optional<file_metadata> get(const std::string& key) const; - ss::future<> write(ss::output_stream<char>&); + ss::future<> write( + ss::output_stream<char>&, tracker_version version = tracker_version::v2); ss::future<> read(ss::input_stream<char>&); /// Returns true if tracker has new data which wasn't serialized /// to disk. 
bool is_dirty() const; - /// Remove every key which isn't present in list of existing files - ss::future<> trim(const fragmented_vector<file_list_item>&); + using add_entries_t = ss::bool_class<struct add_entries_tag>; + /// Remove every key that isn't present in the list of input files + ss::future<> sync( + const fragmented_vector<file_list_item>&, + add_entries_t add_entries = add_entries_t::no); size_t size() const { return _table.size(); } + fragmented_vector<file_list_item> lru_entries() const; + private: - /// Returns true if the key's access time should be tracked. + /// Returns true if the key's metadata should be tracked. /// We do not wish to track index files and transaction manifests /// as they are just an appendage to segment/chunk files and are /// purged along with them. @@ -79,17 +82,17 @@ class access_time_tracker { /// Drain _pending_upserts for any writes made while table lock was held void on_released_table_lock(); - absl::btree_map<uint32_t, timestamp_t> _table; + table_t _table; // Lock taken during async loops over the table (ser/de and trim()) // modifications may proceed without the lock if it is not taken. // When releasing lock, drain _pending_upserts. ss::semaphore _table_lock{1}; - // Calls into add_timestamp/remove_timestamp populate this - // if the _serialization_lock is unavailable. The serialization code is - // responsible for draining it upon releasing the lock. - absl::btree_map<uint32_t, std::optional<timestamp_t>> _pending_upserts; + // Calls into add/remove populate this if the _serialization_lock is + // unavailable. The serialization code is responsible for draining it upon + // releasing the lock. + absl::btree_map<ss::sstring, std::optional<file_metadata>> _pending_upserts; bool _dirty{false}; }; diff --git a/src/v/cloud_storage/cache_probe.cc b/src/v/cloud_storage/cache_probe.cc index 5bd06b72fb7c2..c5da6e1ad77d8 100644 --- a/src/v/cloud_storage/cache_probe.cc +++ b/src/v/cloud_storage/cache_probe.cc @@ -37,7 +37,6 @@ cache_probe::cache_probe() { [this] { return _num_cached_gets; }, sm::description( "Total number of get requests that are already in cache.")), - sm::make_gauge( "size_bytes", [this] { return _cur_size_bytes; }, @@ -84,6 +83,18 @@ cache_probe::cache_probe() { sm::description( "High watermark of number of objects in cache.")) .aggregate(aggregate_labels), + sm::make_counter( + "tracker_syncs", + [this] { return _tracker_syncs; }, + sm::description( + "Number of times the access tracker was updated " + "with data from the cache directory")) + .aggregate(aggregate_labels), + sm::make_gauge( + "tracker_size", + [this] { return _tracker_size; }, + sm::description("Number of entries in cache access tracker")) + .aggregate(aggregate_labels), }); _public_metrics.add_group( @@ -114,6 +125,12 @@ cache_probe::cache_probe() { "Number of times could not free the expected amount of " "space, indicating possible bug or configuration issue.")) .aggregate(aggregate_labels), + sm::make_counter( + "in_mem_trims", + [this] { return _in_mem_trims; }, + sm::description("Number of times we trimmed the cache using " + "the in-memory access tracker.")) + .aggregate(aggregate_labels), }); } diff --git a/src/v/cloud_storage/cache_probe.h b/src/v/cloud_storage/cache_probe.h index 791d211b5f031..dfbe4e0f047fb 100644 --- a/src/v/cloud_storage/cache_probe.h +++ b/src/v/cloud_storage/cache_probe.h @@ -37,10 +37,15 @@ class cache_probe { void put_started() { ++_cur_in_progress_files; } void put_ended() { --_cur_in_progress_files; } + void set_tracker_size(uint64_t size) { _tracker_size = size; } + void fast_trim() { ++_fast_trims; } void exhaustive_trim() { ++_exhaustive_trims; } void carryover_trim() { ++_carryover_trims; } void failed_trim() { 
++_failed_trims; } + void in_mem_trim() { ++_in_mem_trims; } + + void tracker_sync() { ++_tracker_syncs; } private: uint64_t _num_puts = 0; @@ -53,11 +58,15 @@ class cache_probe { int64_t _cur_num_files = 0; int64_t _hwm_num_files = 0; int64_t _cur_in_progress_files = 0; + uint64_t _tracker_size{0}; int64_t _fast_trims{0}; int64_t _exhaustive_trims{0}; int64_t _carryover_trims{0}; int64_t _failed_trims{0}; + int64_t _in_mem_trims{0}; + + uint64_t _tracker_syncs{0}; metrics::internal_metric_groups _metrics; metrics::public_metric_groups _public_metrics; diff --git a/src/v/cloud_storage/cache_service.cc b/src/v/cloud_storage/cache_service.cc index 9ade85b43b28b..66a2c9184b9b9 100644 --- a/src/v/cloud_storage/cache_service.cc +++ b/src/v/cloud_storage/cache_service.cc @@ -68,7 +68,7 @@ std::ostream& operator<<(std::ostream& o, cache_element_status s) { return o; } -static constexpr std::string_view tmp_extension{".part"}; +static constexpr auto tracker_sync_period = 3600s * 6; cache::cache( std::filesystem::path cache_dir, @@ -190,7 +190,8 @@ uint64_t cache::get_total_cleaned() { return _total_cleaned; } ss::future<> cache::clean_up_at_start() { auto guard = _gate.hold(); - auto [walked_size, filtered_out_files, candidates_for_deletion, empty_dirs] + auto + [walked_size, filtered_out_files, candidates_for_deletion, empty_dirs, _] = co_await _walker.walk( _cache_dir.native(), _access_time_tracker, _walk_concurrency()); @@ -201,7 +202,21 @@ ss::future<> cache::clean_up_at_start() { // The state of the _access_time_tracker and the actual content of the // cache directory might diverge over time (if the user removes segment // files manually). We need to take this into account. - co_await _access_time_tracker.trim(candidates_for_deletion); + + // On startup we perform a bi-directional sync, i.e. entries found during + // the directory walk that are not in the tracker are added to it. This + // covers the following scenarios: + // 1. Following an upgrade, the tracker was loaded as empty to discard + // previous serialized data. Now we need to rehydrate the tracker, and it + // is easier to do that now than to wait for get requests to do it. + // 2. In a previous run the tracker had entries that it was not able to + // write to disk due to a crash. A directory walk will bring the tracker + // back to an up-to-date state. 
+ co_await _access_time_tracker.sync( + candidates_for_deletion, access_time_tracker::add_entries_t::yes); + + probe.tracker_sync(); + probe.set_tracker_size(_access_time_tracker.size()); uint64_t deleted_bytes{0}; size_t deleted_count{0}; @@ -209,7 +224,8 @@ ss::future<> cache::clean_up_at_start() { auto filepath_to_remove = file_item.path; // delete only tmp files that are left over from a previous Redpanda run - if (std::string_view(filepath_to_remove).ends_with(tmp_extension)) { + if (std::string_view(filepath_to_remove) + .ends_with(cache_tmp_file_extension)) { try { co_await delete_file_and_empty_parents(filepath_to_remove); deleted_bytes += file_item.size; @@ -315,20 +331,6 @@ ss::future<> cache::trim( std::optional<size_t> object_limit_override) { vassert(ss::this_shard_id() == 0, "Method can only be invoked on shard 0"); auto guard = _gate.hold(); - auto [walked_cache_size, filtered_out_files, candidates_for_deletion, _] - = co_await _walker.walk( - _cache_dir.native(), - _access_time_tracker, - _walk_concurrency(), - [](std::string_view path) { - return !( - std::string_view(path).ends_with(".tx") - || std::string_view(path).ends_with(".index")); - }); - - // Updating the access time tracker in case if some files were removed - // from cache directory by the user manually. - co_await _access_time_tracker.trim(candidates_for_deletion); auto size_limit = size_limit_override.value_or(_max_bytes); auto object_limit = object_limit_override.value_or(_max_objects()); @@ -368,15 +370,98 @@ ss::future<> cache::trim( target_size); } - // Calculate total space used by tmp files: we will use this later - // when updating current_cache_size. - uint64_t tmp_files_size{0}; - for (const auto& i : candidates_for_deletion) { - if (std::string_view(i.path).ends_with(tmp_extension)) { - tmp_files_size += i.size; - } + if ( + _current_cache_size + _reserved_cache_size < target_size + && _current_cache_objects + _reserved_cache_objects < target_objects) { + // Exit early if we are already within the target + co_return; + } + + // Calculate how much to delete + auto size_to_delete + = (_current_cache_size + _reserved_cache_size) + - std::min(target_size, _current_cache_size + _reserved_cache_size); + auto objects_to_delete + = _current_cache_objects + _reserved_cache_objects + - std::min( + target_objects, _current_cache_objects + _reserved_cache_objects); + + auto tracker_lru_entries = _access_time_tracker.lru_entries(); + vlog( + cst_log.debug, + "in-memory trim: set target_size {}/{}, size {}/{}, reserved {}/{}, " + "pending {}/{}, candidates for deletion: {}, size to delete: {}, " + "objects to delete: {}", + target_size, + target_objects, + _current_cache_size, + _current_cache_objects, + _reserved_cache_size, + _reserved_cache_objects, + _reservations_pending, + _reservations_pending_objects, + tracker_lru_entries.size(), + size_to_delete, + objects_to_delete); + + auto trim_result = co_await do_trim( + tracker_lru_entries, size_to_delete, objects_to_delete); + + probe.in_mem_trim(); + vlog( + cst_log.debug, + "in-memory trim result: deleted size: {}, deleted count: {}", + trim_result.deleted_size, + trim_result.deleted_count); + + _total_cleaned += trim_result.deleted_size; + probe.set_size(_current_cache_size); + probe.set_num_files(_current_cache_objects); + + size_to_delete -= std::min(trim_result.deleted_size, size_to_delete); + objects_to_delete -= std::min(trim_result.deleted_count, objects_to_delete); + + // Subsequent calculations require knowledge of how much data cannot + // possibly be deleted 
(because all trims skip it) in order to decide + // whether the trim worked properly. + static constexpr size_t undeletable_objects = 1; + auto undeletable_bytes = (co_await access_time_tracker_size()).value_or(0); + + if ( + size_to_delete < undeletable_bytes + && objects_to_delete < undeletable_objects) { + _last_clean_up = ss::lowres_clock::now(); + _last_trim_failed = false; + co_return; } + // We are going to do a walk; rearm the periodic tracker sync if it is about + // to run soon. + _tracker_sync_timer.rearm(ss::lowres_clock::now() + tracker_sync_period); + + auto + [walked_cache_size, + filtered_out_files, + candidates_for_deletion, + _, + tmp_files_size] + = co_await _walker.walk( + _cache_dir.native(), + _access_time_tracker, + _walk_concurrency(), + [](std::string_view path) { + return !( + std::string_view(path).ends_with(".tx") + || std::string_view(path).ends_with(".index")); + }); + + // Update the access time tracker in case some files were removed + // from the cache directory by the user manually. + co_await _access_time_tracker.sync(candidates_for_deletion); + + probe.tracker_sync(); + probe.set_tracker_size(_access_time_tracker.size()); + vlog( cst_log.debug, "trim: set target_size {}/{}, size {}/{}, walked size {} (max {}/{}), " @@ -396,28 +481,12 @@ ss::future<> cache::trim( candidates_for_deletion.size(), filtered_out_files); - if ( - _current_cache_size + _reserved_cache_size < target_size - && _current_cache_objects + _reserved_cache_objects < target_objects) { - // Exit early if we are already within the target - co_return; - } - // Sort by atime for the subsequent LRU trimming loop std::sort( candidates_for_deletion.begin(), candidates_for_deletion.end(), [](auto& a, auto& b) { return a.access_time < b.access_time; }); - // Calculate how much to delete - auto size_to_delete - = (_current_cache_size + _reserved_cache_size) - - std::min(target_size, _current_cache_size + _reserved_cache_size); - auto objects_to_delete - = _current_cache_objects + _reserved_cache_objects - - std::min( - target_objects, _current_cache_objects + _reserved_cache_objects); - vlog( cst_log.debug, "trim: removing {}/{} bytes, {}/{} objects ({}% of cache) to reach " @@ -432,15 +501,9 @@ ss::future<> cache::trim( tmp_files_size); // Execute the ordinary trim, prioritize removing - auto fast_result = co_await trim_fast( + trim_result = co_await trim_fast( candidates_for_deletion, size_to_delete, objects_to_delete); - // Subsequent calculations require knowledge of how much data cannot - // possibly be deleted (because all trims skip it) in order to decide - // whether the trim worked properly. - static constexpr size_t undeletable_objects = 1; - auto undeletable_bytes = (co_await access_time_tracker_size()).value_or(0); - // We aim to keep current_cache_size continuously up to date, but // in case of housekeeping issues, correct it if it appears to have drifted too far from the result of our directory walk. // by the amount of data currently in tmp files, because they may be // updated while the walk is happening. 
uint64_t cache_size_lower_bound = walked_cache_size - - fast_result.deleted_size + - trim_result.deleted_size - tmp_files_size - undeletable_bytes; if (_current_cache_size < cache_size_lower_bound) { vlog( @@ -459,7 +522,7 @@ ss::future<> cache::trim( _current_cache_size = cache_size_lower_bound; _current_cache_objects = filtered_out_files + candidates_for_deletion.size() - - fast_result.deleted_count; + - trim_result.deleted_count; } const auto cache_entries_before_trim = candidates_for_deletion.size() @@ -468,29 +531,29 @@ ss::future<> cache::trim( vlog( cst_log.debug, "trim: deleted {}/{} files of total size {}. Undeletable size {}.", - fast_result.deleted_count, + trim_result.deleted_count, cache_entries_before_trim, - fast_result.deleted_size, + trim_result.deleted_size, undeletable_bytes); - _total_cleaned += fast_result.deleted_size; + _total_cleaned += trim_result.deleted_size; probe.set_size(_current_cache_size); - probe.set_num_files(cache_entries_before_trim - fast_result.deleted_count); + probe.set_num_files(cache_entries_before_trim - trim_result.deleted_count); - size_to_delete -= std::min(fast_result.deleted_size, size_to_delete); - objects_to_delete -= std::min(fast_result.deleted_count, objects_to_delete); + size_to_delete -= std::min(trim_result.deleted_size, size_to_delete); + objects_to_delete -= std::min(trim_result.deleted_count, objects_to_delete); // Before we (maybe) proceed to do an exhaustive trim, make sure we're not // trying to trim more data than was physically seen while walking the // cache. size_to_delete = std::min( - walked_cache_size - fast_result.deleted_size, size_to_delete); + walked_cache_size - trim_result.deleted_size, size_to_delete); // If we were not able to delete enough files and there are some filtered // out files, force an exhaustive trim. This ensures that if the cache is // dominated by filtered out files, we do not skip trimming them by reducing // the objects_to_delete counter next. - bool force_exhaustive_trim = fast_result.deleted_count < objects_to_delete + bool force_exhaustive_trim = trim_result.deleted_count < objects_to_delete && filtered_out_files > 0; // In the situation where all files in cache are filtered out, @@ -500,7 +563,7 @@ ss::future<> cache::trim( // force_exhaustive_trim avoids this. 
if (!force_exhaustive_trim) { objects_to_delete = std::min( - candidates_for_deletion.size() - fast_result.deleted_count, + candidates_for_deletion.size() - trim_result.deleted_count, objects_to_delete); } @@ -626,7 +689,7 @@ cache::remove_segment_full(const file_list_item& file_stat) { // Remove key if possible to make sure there is no resource // leak - _access_time_tracker.remove_timestamp(std::string_view(file_stat.path)); + _access_time_tracker.remove(file_stat.path); vlog( cst_log.trace, @@ -651,7 +714,13 @@ ss::future<cache::trim_result> cache::trim_fast( uint64_t size_to_delete, size_t objects_to_delete) { probe.fast_trim(); + co_return co_await do_trim(candidates, size_to_delete, objects_to_delete); +} +ss::future<cache::trim_result> cache::do_trim( + const fragmented_vector<file_list_item>& candidates, + uint64_t size_to_delete, + size_t objects_to_delete) { trim_result result; // Reset carryover list @@ -663,7 +732,8 @@ ss::future<cache::trim_result> cache::trim_fast( } // skip tmp files since someone may be writing to them - if (std::string_view(file_stat.path).ends_with(tmp_extension)) { + if (std::string_view(file_stat.path) + .ends_with(cache_tmp_file_extension)) { return true; } @@ -737,7 +807,7 @@ cache::trim_exhaustive(uint64_t size_to_delete, size_t objects_to_delete) { // Enumerate ALL files in the cache (as opposed to trim_fast that strips out // indices/tx/tmp files) - auto [walked_cache_size, _filtered_out, candidates, _] + auto [walked_cache_size, _filtered_out, candidates, _, tmp_files_size] = co_await _walker.walk( _cache_dir.native(), _access_time_tracker, _walk_concurrency()); @@ -772,8 +842,7 @@ cache::trim_exhaustive(uint64_t size_to_delete, size_t objects_to_delete) { // exhaustive trim because they are occupying too much space. try { co_await delete_file_and_empty_parents(file_stat.path); - _access_time_tracker.remove_timestamp( - std::string_view(file_stat.path)); + _access_time_tracker.remove(file_stat.path); _current_cache_size -= std::min( file_stat.size, _current_cache_size); @@ -792,7 +861,7 @@ cache::trim_exhaustive(uint64_t size_to_delete, size_t objects_to_delete) { // We are shutting down, stop iterating and propagate throw; } catch (const std::filesystem::filesystem_error& e) { - if (likely(file_stat.path.ends_with(tmp_extension))) { + if (likely(file_stat.path.ends_with(cache_tmp_file_extension))) { // In an exhaustive scan we might hit a .part file and get ENOENT, // this is expected behavior occasionally. 
result.trim_missed_tmp_files = true; @@ -942,15 +1011,30 @@ ss::future<> cache::start() { }); }); _tracker_timer.arm_periodic(access_timer_period); + + _tracker_sync_timer.set_callback([this] { + ssx::spawn_with_gate(_gate, [this]() -> ss::future<> { + return sync_access_time_tracker().handle_exception( + [](auto eptr) { + vlog( + cst_log.error, + "failed to sync access time tracker: {}", + eptr); + }); + }); + }); + _tracker_sync_timer.arm(tracker_sync_period); } } ss::future<> cache::stop() { vlog(cst_log.debug, "Stopping archival cache service"); _tracker_timer.cancel(); + _tracker_sync_timer.cancel(); _as.request_abort(); _block_puts_cond.broken(); _cleanup_sm.broken(); + _tracker_sync_timer_sem.broken(); if (ss::this_shard_id() == 0) { co_await save_access_time_tracker().handle_exception([](auto eptr) { // NOTE: see issue/11270 if the exception is "filesystem error: @@ -1052,19 +1136,22 @@ ss::future<std::optional<cache_item>> cache::_get(std::filesystem::path key) { vlog(cst_log.debug, "Trying to get {} from archival cache.", key.native()); probe.get(); ss::file cache_file; + + size_t data_size{0}; try { auto source = (_cache_dir / key).native(); cache_file = co_await ss::open_file_dma(source, ss::open_flags::ro); + data_size = co_await cache_file.size(); // Bump access time of the file if (ss::this_shard_id() == 0) { - _access_time_tracker.add_timestamp( - source, std::chrono::system_clock::now()); + _access_time_tracker.add( + source, std::chrono::system_clock::now(), data_size); } else { - ssx::spawn_with_gate(_gate, [this, source] { - return container().invoke_on(0, [source](cache& c) { - c._access_time_tracker.add_timestamp( - source, std::chrono::system_clock::now()); + ssx::spawn_with_gate(_gate, [this, source, data_size] { + return container().invoke_on(0, [source, data_size](cache& c) { + c._access_time_tracker.add( + source, std::chrono::system_clock::now(), data_size); }); }); } @@ -1077,7 +1164,6 @@ ss::future<std::optional<cache_item>> cache::_get(std::filesystem::path key) { } } - auto data_size = co_await cache_file.size(); probe.cached_get(); co_return std::optional(cache_item{std::move(cache_file), data_size}); } @@ -1120,11 +1206,12 @@ ss::future<> cache::put( probe.put_ended(); }); auto filename = normal_key_path.filename(); - if (std::string_view(filename.native()).ends_with(tmp_extension)) { + if (std::string_view(filename.native()) + .ends_with(cache_tmp_file_extension)) { throw std::invalid_argument(fmt::format( "Cache file key {} ends with tmp extension {}.", normal_key_path.native(), - tmp_extension)); + cache_tmp_file_extension)); } auto dir_path = normal_key_path.remove_filename(); @@ -1138,7 +1225,7 @@ ss::future<> cache::put( filename.native(), ss::this_shard_id(), (++_cnt), - tmp_extension)); + cache_tmp_file_extension)); ss::file tmp_cache_file; while (true) { @@ -1264,7 +1351,7 @@ ss::future<> cache::_invalidate(const std::filesystem::path& key) { try { auto path = (_cache_dir / key).native(); auto stat = co_await ss::file_stat(path); - _access_time_tracker.remove_timestamp(key.native()); + _access_time_tracker.remove(key.native()); co_await delete_file_and_empty_parents(path); _current_cache_size -= stat.size; _current_cache_objects -= 1; @@ -1458,7 +1545,8 @@ cache::trim_carryover(uint64_t delete_bytes, uint64_t delete_objects) { // Don't hit access time tracker file/tmp if ( is_trim_exempt(file_stat.path) - || std::string_view(file_stat.path).ends_with(tmp_extension)) { + || std::string_view(file_stat.path) + .ends_with(cache_tmp_file_extension)) { continue; } // Both tx and index files are 
handled as part of the segment @@ -1472,15 +1560,16 @@ cache::trim_carryover(uint64_t delete_bytes, uint64_t delete_objects) { auto rel_path = _cache_dir / std::filesystem::relative( std::filesystem::path(file_stat.path), _cache_dir); - auto estimate = _access_time_tracker.estimate_timestamp( - rel_path.native()); - if (estimate != file_stat.access_time) { + + if (auto estimate = _access_time_tracker.get(rel_path.native()); + estimate.has_value() + && estimate->time_point() != file_stat.access_time) { vlog( cst_log.trace, "carryover file {} was accessed ({}) since the last trim ({}), " "ignoring", rel_path.native(), - estimate->time_since_epoch().count(), + estimate->atime_sec, file_stat.access_time.time_since_epoch().count()); // The file was accessed since we got the stats continue; } @@ -1815,4 +1904,52 @@ ss::future<> cache::initialize(std::filesystem::path cache_dir) { co_await ss::recursive_touch_directory(cache_dir.string()); } } + +ss::future<> cache::sync_access_time_tracker( + access_time_tracker::add_entries_t add_entries) { + if (_cleanup_sm.available_units() <= 0) { + vlog( + cst_log.debug, + "syncing access time tracker postponed, trim is running"); + _tracker_sync_timer.rearm( + ss::lowres_clock::now() + tracker_sync_period); + co_return; + } + + if (_tracker_sync_timer_sem.try_wait()) { + vlog(cst_log.debug, "syncing access time tracker with disk"); + auto [cache_size, filtered_out, items, empty_dirs, tmp_files_size] + = co_await _walker.walk( + _cache_dir.native(), _access_time_tracker, _walk_concurrency()); + + co_await _access_time_tracker.sync(items, add_entries); + vlog( + cst_log.debug, + "syncing access time tracker with disk complete: cache size {}, " + "items: {}", + cache_size, + items.size()); + + const auto tracker_size + = (co_await access_time_tracker_size()).value_or(0); + + _current_cache_size = cache_size - tmp_files_size - tracker_size; + _current_cache_objects = items.size(); + + probe.set_size(_current_cache_size); + probe.set_num_files(_current_cache_objects); + + probe.tracker_sync(); + probe.set_tracker_size(_access_time_tracker.size()); + + _tracker_sync_timer.rearm( + ss::lowres_clock::now() + tracker_sync_period); + } else { + vlog( + cst_log.debug, + "syncing access time tracker with disk skipped, sync is already " + "running"); + } +} + } // namespace cloud_storage diff --git a/src/v/cloud_storage/cache_service.h b/src/v/cloud_storage/cache_service.h index aa12d45d8aad3..e88040310f505 100644 --- a/src/v/cloud_storage/cache_service.h +++ b/src/v/cloud_storage/cache_service.h @@ -235,6 +235,11 @@ class cache : public ss::peering_sharded_service<cache> { uint64_t delete_bytes, size_t delete_objects); + ss::future<trim_result> do_trim( + const fragmented_vector<file_list_item>& candidates, + uint64_t delete_bytes, + size_t delete_objects); + /// Exhaustive trim: walk all files including indices, remove whatever is /// least recently accessed. ss::future<trim_result> @@ -307,6 +312,9 @@ class cache : public ss::peering_sharded_service<cache> { ss::future<trim_result> remove_segment_full(const file_list_item& file_stat); + ss::future<> sync_access_time_tracker( + access_time_tracker::add_entries_t add_entries + = access_time_tracker::add_entries_t::no); std::filesystem::path _cache_dir; size_t _disk_size; config::binding<double> _disk_reservation; @@ -377,6 +385,10 @@ class cache : public ss::peering_sharded_service<cache> { // List of probable deletion candidates from the last trim. 
std::optional<fragmented_vector<file_list_item>> _last_trim_carryover; + + ss::timer<ss::lowres_clock> _tracker_sync_timer; + ssx::semaphore _tracker_sync_timer_sem{ 1, "cloud/cache/access_tracker_sync"}; }; } // namespace cloud_storage diff --git a/src/v/cloud_storage/recursive_directory_walker.cc b/src/v/cloud_storage/recursive_directory_walker.cc index e6c3cc5f57e14..a90ab34d103fe 100644 --- a/src/v/cloud_storage/recursive_directory_walker.cc +++ b/src/v/cloud_storage/recursive_directory_walker.cc @@ -43,24 +43,34 @@ struct walk_accumulator { ss::future<> visit(ss::sstring const& target, ss::directory_entry entry) { auto entry_path = fmt::format("{}/{}", target, entry.name); if (entry.type && entry.type == ss::directory_entry_type::regular) { - auto file_stats = co_await ss::file_stat(entry_path); + size_t file_size{0}; + std::chrono::system_clock::time_point atime; + if (const auto tracker_entry = tracker.get(entry_path); + tracker_entry.has_value()) { + file_size = tracker_entry->size; + atime = tracker_entry->time_point(); + } else { + auto file_stats = co_await ss::file_stat(entry_path); + file_size = file_stats.size; + atime = file_stats.time_accessed; + } vlog( cst_log.debug, "Regular file found {} ({})", entry_path, - file_stats.size); - - auto last_access_timepoint = tracker.estimate_timestamp(entry_path) - .value_or(file_stats.time_accessed); + file_size); - current_cache_size += static_cast<uint64_t>(file_stats.size); + current_cache_size += static_cast<uint64_t>(file_size); + if (entry_path.ends_with(cache_tmp_file_extension)) { + tmp_files_size += file_size; + } if (!filter || filter.value()(entry_path)) { files.push_back( - {last_access_timepoint, + {atime, (std::filesystem::path(target) / entry.name.data()).native(), - static_cast<uint64_t>(file_stats.size)}); + static_cast<uint64_t>(file_size)}); } else if (filter) { ++filtered_out_files; } @@ -85,6 +95,7 @@ struct walk_accumulator { fragmented_vector<file_list_item> files; uint64_t current_cache_size{0}; size_t filtered_out_files{0}; + size_t tmp_files_size{0}; }; } // namespace cloud_storage @@ -191,7 +202,8 @@ ss::future<walk_result> recursive_directory_walker::walk( .cache_size = state.current_cache_size, .filtered_out_files = state.filtered_out_files, .regular_files = std::move(state.files), - .empty_dirs = std::move(empty_dirs)}; + .empty_dirs = std::move(empty_dirs), + .tmp_files_size = state.tmp_files_size}; } ss::future<> recursive_directory_walker::stop() { diff --git a/src/v/cloud_storage/recursive_directory_walker.h b/src/v/cloud_storage/recursive_directory_walker.h index 080660cfd502a..1cabfcd20a4ff 100644 --- a/src/v/cloud_storage/recursive_directory_walker.h +++ b/src/v/cloud_storage/recursive_directory_walker.h @@ -20,6 +20,8 @@ namespace cloud_storage { +constexpr auto cache_tmp_file_extension{".part"}; + class access_time_tracker; struct file_list_item { @@ -33,6 +35,7 @@ struct walk_result { size_t filtered_out_files{0}; fragmented_vector<file_list_item> regular_files; fragmented_vector<ss::sstring> empty_dirs; + size_t tmp_files_size{0}; }; class recursive_directory_walker { diff --git a/src/v/cloud_storage/tests/cache_bench.cc b/src/v/cloud_storage/tests/cache_bench.cc index f16a48731470f..70e5c176b5199 100644 --- a/src/v/cloud_storage/tests/cache_bench.cc +++ b/src/v/cloud_storage/tests/cache_bench.cc @@ -28,7 +28,7 @@ static void run_test(int test_scale) { for (int i = 0; i < test_scale; i++) { perf_tests::start_measuring_time(); - tracker.add_timestamp(names[i % test_scale], make_ts(i)); + tracker.add(names[i % test_scale], make_ts(i), 0); perf_tests::stop_measuring_time(); } } diff --git a/src/v/cloud_storage/tests/cache_test.cc 
b/src/v/cloud_storage/tests/cache_test.cc index 8c2d5b21fbd64..e8208e2cec360 100644 --- a/src/v/cloud_storage/tests/cache_test.cc +++ b/src/v/cloud_storage/tests/cache_test.cc @@ -332,7 +332,7 @@ SEASTAR_THREAD_TEST_CASE(test_access_time_tracker) { make_ts(1653000009), }; - std::vector<std::string_view> names = { + std::vector<ss::sstring> names = { "key0", "key1", "key2", @@ -346,20 +346,23 @@ SEASTAR_THREAD_TEST_CASE(test_access_time_tracker) { }; for (int i = 0; i < 10; i++) { - cm.add_timestamp(names[i], timestamps[i]); + cm.add(names[i], timestamps[i], i); } for (int i = 0; i < 10; i++) { - auto ts = cm.estimate_timestamp(names[i]); - BOOST_REQUIRE(ts.value() >= timestamps[i]); + auto ts = cm.get(names[i]); + BOOST_REQUIRE(ts.has_value()); + BOOST_REQUIRE(ts->time_point() == timestamps[i]); + BOOST_REQUIRE(ts->size == i); } } -static access_time_tracker serde_roundtrip(access_time_tracker& t) { +static access_time_tracker serde_roundtrip( + access_time_tracker& t, tracker_version version = tracker_version::v2) { // Round trip iobuf serialized; auto out_stream = make_iobuf_ref_output_stream(serialized); - t.write(out_stream).get(); + t.write(out_stream, version).get(); out_stream.flush().get(); access_time_tracker out; @@ -393,7 +396,7 @@ SEASTAR_THREAD_TEST_CASE(test_access_time_tracker_serializer) { make_ts(0xbeefed09), }; - std::vector<std::string_view> names = { + std::vector<ss::sstring> names = { "key0", "key1", "key2", @@ -407,15 +410,16 @@ SEASTAR_THREAD_TEST_CASE(test_access_time_tracker_serializer) { }; for (int i = 0; i < 10; i++) { - in.add_timestamp(names[i], timestamps[i]); + in.add(names[i], timestamps[i], i); } auto out = serde_roundtrip(in); for (int i = 0; i < timestamps.size(); i++) { - auto ts = out.estimate_timestamp(names[i]); + auto ts = out.get(names[i]); BOOST_REQUIRE(ts.has_value()); - BOOST_REQUIRE(ts.value() >= timestamps[i]); + BOOST_REQUIRE(ts->time_point() == timestamps[i]); + BOOST_REQUIRE(ts->size == i); } } @@ -426,11 +430,24 @@ SEASTAR_THREAD_TEST_CASE(test_access_time_tracker_serializer_large) { // to verify the chunking code works properly; uint32_t item_count = 7777; for (uint32_t i = 0; i < item_count; i++) { - in.add_timestamp(fmt::format("key{:08x}", i), make_ts(i)); + in.add(fmt::format("key{:08x}", i), make_ts(i), i); } auto out = serde_roundtrip(in); BOOST_REQUIRE_EQUAL(out.size(), item_count); + for (size_t i = 0; i < item_count; ++i) { + const auto entry = in.get(fmt::format("key{:08x}", i)); + BOOST_REQUIRE(entry.has_value()); + BOOST_REQUIRE_EQUAL(entry->size, i); + BOOST_REQUIRE_EQUAL(entry->atime_sec, i); + } +} + +SEASTAR_THREAD_TEST_CASE(test_access_time_tracker_read_skipped_on_old_version) { + access_time_tracker in; + in.add("key", make_ts(0), 0); + auto out = serde_roundtrip(in, tracker_version::v1); + BOOST_REQUIRE_EQUAL(out.size(), 0); } /** @@ -702,3 +719,52 @@ FIXTURE_TEST(test_background_maybe_trim, cache_test_fixture) { // = 40%. +1 for the object we just added. 
BOOST_REQUIRE_EQUAL(get_object_count(), 41); } + +FIXTURE_TEST(test_tracker_sync_only_remove, cache_test_fixture) { + put_into_cache(create_data_string('a', 1_KiB), KEY); + auto& cache = sharded_cache.local(); + cache.get(KEY).get(); + + const auto full_key_path = CACHE_DIR / KEY; + + const auto& t = tracker(); + BOOST_REQUIRE_EQUAL(t.size(), 1); + + { + const auto entry = t.get(full_key_path.native()); + BOOST_REQUIRE(entry.has_value()); + BOOST_REQUIRE_EQUAL(entry->size, 1_KiB); + BOOST_REQUIRE_EQUAL(cache.get_usage_bytes(), 1_KiB); + BOOST_REQUIRE_EQUAL(cache.get_usage_objects(), 1); + } + + ss::remove_file(full_key_path.native()).get(); + + { + const auto entry = t.get(full_key_path.native()); + BOOST_REQUIRE(entry.has_value()); + BOOST_REQUIRE_EQUAL(entry->size, 1_KiB); + BOOST_REQUIRE_EQUAL(cache.get_usage_bytes(), 1_KiB); + BOOST_REQUIRE_EQUAL(cache.get_usage_objects(), 1); + } + + sync_tracker(); + + BOOST_REQUIRE_EQUAL(t.size(), 0); + BOOST_REQUIRE(!t.get(full_key_path.native()).has_value()); + BOOST_REQUIRE_EQUAL(cache.get_usage_bytes(), 0); + BOOST_REQUIRE_EQUAL(cache.get_usage_objects(), 0); +} + +FIXTURE_TEST(test_tracker_sync_add_remove, cache_test_fixture) { + put_into_cache(create_data_string('a', 1_KiB), KEY); + auto& cache = sharded_cache.local(); + const auto full_key_path = CACHE_DIR / KEY; + const auto& t = tracker(); + BOOST_REQUIRE_EQUAL(t.size(), 0); + sync_tracker(access_time_tracker::add_entries_t::yes); + BOOST_REQUIRE_EQUAL(t.size(), 1); + BOOST_REQUIRE(t.get(full_key_path.native()).has_value()); + BOOST_REQUIRE_EQUAL(cache.get_usage_bytes(), 1024); + BOOST_REQUIRE_EQUAL(cache.get_usage_objects(), 1); +} diff --git a/src/v/cloud_storage/tests/cache_test_fixture.h b/src/v/cloud_storage/tests/cache_test_fixture.h index 33a3e44dfed36..26bfbf5edfa4d 100644 --- a/src/v/cloud_storage/tests/cache_test_fixture.h +++ b/src/v/cloud_storage/tests/cache_test_fixture.h @@ -12,6 +12,7 @@ #include "base/seastarx.h" #include "base/units.h" #include "bytes/iobuf.h" +#include "bytes/iostream.h" #include "cloud_storage/cache_service.h" #include "config/property.h" #include "test_utils/scoped_config.h" @@ -168,6 +169,16 @@ class cache_test_fixture { } scoped_config cfg; + + void sync_tracker( + access_time_tracker::add_entries_t add_entries + = access_time_tracker::add_entries_t::no) { + sharded_cache.local().sync_access_time_tracker(add_entries).get(); + } + + const access_time_tracker& tracker() const { + return sharded_cache.local()._access_time_tracker; + } }; } // namespace cloud_storage diff --git a/tests/rptest/scale_tests/tiered_storage_cache_stress_test.py b/tests/rptest/scale_tests/tiered_storage_cache_stress_test.py index 514517c4ae2c4..267494a6ee681 100644 --- a/tests/rptest/scale_tests/tiered_storage_cache_stress_test.py +++ b/tests/rptest/scale_tests/tiered_storage_cache_stress_test.py @@ -6,7 +6,10 @@ # As of the Change Date specified in that file, in accordance with # the Business Source License, use of this software will be governed # by the Apache License, Version 2.0 +import sys +from dataclasses import dataclass +from rptest.services.metrics_check import MetricCheck from rptest.tests.redpanda_test import RedpandaTest from rptest.services.cluster import cluster from rptest.services.kgo_verifier_services import KgoVerifierProducer, KgoVerifierSeqConsumer, KgoVerifierRandomConsumer @@ -28,6 +31,18 @@ class LimitMode(str, Enum): both = 'both' +@dataclass +class CacheExpectations: + tracker_size: int = 0 + min_mem_trims: int = 0 + max_mem_trims: int = sys.maxsize + 
min_fast_trims: int = 0 + max_fast_trims: int = sys.maxsize + min_exhaustive_trims: int = 0 + max_exhaustive_trims: int = sys.maxsize + num_syncs: int = 0 + + class TieredStorageCacheStressTest(RedpandaTest): segment_upload_interval = 30 manifest_upload_interval = 10 @@ -48,6 +63,7 @@ def setUp(self): self.segment_upload_interval, 'cloud_storage_manifest_max_upload_interval_sec': self.manifest_upload_interval, + 'disable_public_metrics': False, } self.redpanda.set_extra_rp_conf(extra_rp_conf) @@ -93,8 +109,6 @@ def _validate_node_storage(self, node, limit_mode: LimitMode, hwm_size = int(sample.value) elif sample.name == "redpanda_cloud_storage_cache_space_hwm_files": hwm_objects = int(sample.value) - #else: - # self.logger.debug(sample.name) assert hwm_size is not None, "Cache HWM metric not found" any_cache_usage = (usage['cloud_storage_cache_bytes'] > 0 @@ -361,7 +375,9 @@ def tiny_cache_test(self): cache_size, max_objects=None) - def run_test_with_cache_prefilled(self, cache_prefill_command: str): + def run_test_with_cache_prefilled(self, cache_prefill_command: str, + prefill_count: int, + expectations: CacheExpectations): segment_size = 128 * 1024 * 1024 msg_size = 16384 data_size = segment_size * 10 @@ -374,7 +390,6 @@ def run_test_with_cache_prefilled(self, cache_prefill_command: str): self.redpanda.clean_node(n) # Pre-populate caches with files. - prefill_count = 100 for node in self.redpanda.nodes: node.account.ssh(cache_prefill_command.format(prefill_count)) @@ -391,6 +406,7 @@ def run_test_with_cache_prefilled(self, cache_prefill_command: str): # Bring up redpanda self.redpanda.set_si_settings(si_settings) + self.redpanda.start(clean_nodes=False) # Cache startup should have registered the garbage objects in stats @@ -398,10 +414,32 @@ def run_test_with_cache_prefilled(self, cache_prefill_command: str): for node in self.redpanda.nodes: usage = admin.get_local_storage_usage(node) assert usage[ - 'cloud_storage_cache_objects'] >= prefill_count, f"Node {node.name} has unexpectedly few objects {usage['cloud_storage_cache_objects']} < {prefill_count}" + 'cloud_storage_cache_objects'] >= prefill_count, \ + (f"Node {node.name} has unexpectedly few objects " + f"{usage['cloud_storage_cache_objects']} < {prefill_count}") # Inject data self._create_topic(topic_name, 1, segment_size) + + trim_metrics = [ + 'redpanda_cloud_storage_cache_trim_in_mem_trims_total', + 'redpanda_cloud_storage_cache_trim_fast_trims_total', + 'redpanda_cloud_storage_cache_trim_exhaustive_trims_total', + 'redpanda_cloud_storage_cache_space_tracker_syncs_total', + 'redpanda_cloud_storage_cache_space_tracker_size' + ] + m = MetricCheck(self.redpanda.logger, + self.redpanda, + self.redpanda.partitions(topic_name)[0].leader, + trim_metrics, + metrics_endpoint=MetricsEndpoint.PUBLIC_METRICS) + + m.expect([('redpanda_cloud_storage_cache_space_tracker_syncs_total', + lambda a, b: a == expectations.num_syncs == b)]) + + m.expect([('redpanda_cloud_storage_cache_space_tracker_size', + lambda a, b: a == expectations.tracker_size == b)]) + self._produce_and_quiesce(topic_name, msg_size, data_size, expect_bandwidth) @@ -424,7 +462,19 @@ def run_test_with_cache_prefilled(self, cache_prefill_command: str): # reduce in number to the point that the read is able to proceed usage = admin.get_local_storage_usage(leader_node) assert usage[ - 'cloud_storage_cache_objects'] <= cache_object_limit, f"Node {leader_node.name} has unexpectedly many objects {usage['cloud_storage_cache_objects']} > {cache_object_limit}" + 
'cloud_storage_cache_objects'] <= cache_object_limit, \ + (f"Node {leader_node.name} has unexpectedly many objects " + f"{usage['cloud_storage_cache_objects']} > {cache_object_limit}") + + m.expect([('redpanda_cloud_storage_cache_trim_in_mem_trims_total', + lambda a, b: a == 0 and expectations.max_mem_trims >= b >= + expectations.min_mem_trims)]) + m.expect([('redpanda_cloud_storage_cache_trim_fast_trims_total', + lambda a, b: a == 0 and expectations.max_fast_trims >= b >= + expectations.min_fast_trims)]) + m.expect([('redpanda_cloud_storage_cache_trim_exhaustive_trims_total', + lambda a, b: a == 0 and expectations.max_exhaustive_trims >= + b >= expectations.min_exhaustive_trims)]) @cluster(num_nodes=4, log_allow_list=S3_ERROR_LOGS) def garbage_objects_test(self): """ https://github.com/redpanda-data/redpanda/issues/11835 """ + prefill_count = 100 + expectations = CacheExpectations( + num_syncs=1, + # The tracker will add all non-index/tx/tmp files during startup sync + tracker_size=prefill_count, + # Tracker-based trim should be enough to free space + min_mem_trims=1, + min_fast_trims=0, + max_fast_trims=0, + min_exhaustive_trims=0, + max_exhaustive_trims=0) self.run_test_with_cache_prefilled( f"mkdir -p {self.redpanda.cache_dir} ; " "for n in `seq 1 {}`; do " - f"dd if=/dev/urandom bs=1k count=4 of={self.redpanda.cache_dir}/garbage_$n.bin ; done" - ) + f"dd if=/dev/urandom bs=1k count=4 of={self.redpanda.cache_dir}/garbage_$n.bin ; done", + prefill_count, expectations) @cluster(num_nodes=4, log_allow_list=S3_ERROR_LOGS) def test_indices_dominate_cache(self): """ Ensures that if the cache is filled with index and tx objects alone, trimming still works. """ + + prefill_count = 100 + expectations = CacheExpectations( + num_syncs=1, + # Neither index nor tx files will be added to the tracker during startup sync + tracker_size=0, + # Since orphan index and tx files are only cleaned up in exhaustive trim, + # all three trims have to run to free space + min_mem_trims=1, + min_fast_trims=1, + min_exhaustive_trims=1) self.run_test_with_cache_prefilled( f"mkdir -pv {self.redpanda.cache_dir}; " "for n in `seq 1 {}`; do " f"touch {self.redpanda.cache_dir}/garbage_$n.index && " f"touch {self.redpanda.cache_dir}/garbage_$n.tx; " - "done") + "done", prefill_count, expectations) diff --git a/tests/rptest/tests/cloud_storage_chunk_read_path_test.py b/tests/rptest/tests/cloud_storage_chunk_read_path_test.py index 0d2c5c1bd84f7..5baef70c46040 100644 --- a/tests/rptest/tests/cloud_storage_chunk_read_path_test.py +++ b/tests/rptest/tests/cloud_storage_chunk_read_path_test.py @@ -183,7 +183,10 @@ def _assert_in_cache(self, expr: str): num_nodes=4, log_allow_list=[ # Ignore trim related errors caused by deleting chunk files manually. - "failed to free sufficient space in exhaustive trim" + "failed to free sufficient space in exhaustive trim", + # With tracker-based trim and the manual deletes in this test, sometimes + # the cache tries to trim chunks that have already been removed externally. + "filesystem error: remove failed: No such file or directory" ]) def test_read_chunks(self): self.default_chunk_size = 1048576
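
For orientation, a minimal round-trip sketch of the v2 tracker snapshot introduced above, pieced together from the serde_roundtrip() helper and the new tests in this diff; it is not part of the patch. It assumes the same Redpanda helpers the tests rely on (make_iobuf_ref_output_stream, make_iobuf_input_stream, a seastar thread context for .get()); the "key0" path and the 4096 size are illustrative only:

    // On-disk v2 layout produced by access_time_tracker::write():
    //   table_header (serde envelope): { table_size, version }
    //   then repeated chunks, each:
    //     size_t: serialized byte size of the chunk (serde)
    //     entries until the chunk is exhausted:
    //       (ss::sstring path, uint32_t atime_sec, uint64_t size)
    access_time_tracker t;
    t.add("key0", std::chrono::system_clock::now(), 4096);

    iobuf serialized;
    auto out = make_iobuf_ref_output_stream(serialized);
    t.write(out, tracker_version::v2).get(); // header + size-prefixed chunks
    out.flush().get();

    access_time_tracker t2;
    auto in = make_iobuf_input_stream(std::move(serialized));
    t2.read(in).get(); // a v1 snapshot would be skipped at this point

    auto meta = t2.get("key0"); // std::optional<file_metadata>
    // meta->atime_sec and meta->size feed the LRU ordering returned by
    // t2.lru_entries(), which the new in-memory trim path consumes.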