From 9ac6534f645bebe4e5b044d258632cafa316821b Mon Sep 17 00:00:00 2001
From: Noah Watkins
Date: Wed, 17 May 2023 15:34:01 -0700
Subject: [PATCH 01/11] storage: split disk usage function into helper

The state computed and reported by the public disk_usage function is
going to grow, so split the function up to keep it from becoming too
unwieldy.

Signed-off-by: Noah Watkins
---
 src/v/storage/disk_log_impl.cc | 24 ++++++++++++++++++++----
 src/v/storage/disk_log_impl.h  |  3 +++
 2 files changed, 23 insertions(+), 4 deletions(-)

diff --git a/src/v/storage/disk_log_impl.cc b/src/v/storage/disk_log_impl.cc
index ef696253aa6a..0c7031e87df7 100644
--- a/src/v/storage/disk_log_impl.cc
+++ b/src/v/storage/disk_log_impl.cc
@@ -1808,10 +1808,13 @@ log make_disk_backed_log(
     return log(ptr);
 }
 
-ss::future<usage_report> disk_log_impl::disk_usage(gc_config cfg) {
-    // protect against concurrent log removal with housekeeping loop
-    auto gate = _compaction_housekeeping_gate.hold();
-
+/*
+ * disk usage and amount reclaimable are best computed together.
+ *
+ * assumes that the compaction gate is held.
+ */
+ss::future<std::pair<usage, reclaim_size_limits>>
+disk_log_impl::disk_usage_and_reclaim(gc_config cfg) {
     std::optional<model::offset> max_offset;
     if (config().is_collectable()) {
         cfg = apply_overrides(cfg);
@@ -1903,6 +1906,19 @@ ss::future<usage_report> disk_log_impl::disk_usage(gc_config cfg) {
       .available = retention.total() + available.total(),
     };
 
+    co_return std::make_pair(usage, reclaim);
+}
+
+ss::future<usage_report> disk_log_impl::disk_usage(gc_config cfg) {
+    // protect against concurrent log removal with housekeeping loop
+    auto gate = _compaction_housekeeping_gate.hold();
+
+    /*
+     * compute the amount of current disk usage as well as the amount available
+     * for being reclaimed.
+     */
+    auto [usage, reclaim] = co_await disk_usage_and_reclaim(cfg);
+
     co_return usage_report{
       .usage = usage,
       .reclaim = reclaim,
diff --git a/src/v/storage/disk_log_impl.h b/src/v/storage/disk_log_impl.h
index 7e7fe600448c..e10908e3d434 100644
--- a/src/v/storage/disk_log_impl.h
+++ b/src/v/storage/disk_log_impl.h
@@ -188,6 +188,9 @@ class disk_log_impl final : public log::impl {
 
     std::optional<model::offset> retention_offset(gc_config);
 
+    ss::future<std::pair<usage, reclaim_size_limits>>
+    disk_usage_and_reclaim(gc_config);
+
 private:
     size_t max_segment_size() const;
 
     // Computes the segment size based on the latest max_segment_size

From d189bf54ec64965e53765bc2b794d7ebf4940c3f Mon Sep 17 00:00:00 2001
From: Noah Watkins
Date: Wed, 10 May 2023 20:22:17 -0700
Subject: [PATCH 02/11] storage: report minimum capacity needed for logs

When collecting a storage report, we add a new section that describes
target sizes with different meanings. This commit adds a minimum
capacity target, which is the amount of capacity that log storage needs
in order to function at a bare minimum.

The minimum is computed as the max segment size (potentially different
for each partition) * the minimum reserved number of segments
(configurable) * the total number of partitions.
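
As a rough sketch (the helper below is illustrative, not the actual
implementation), the per-partition reservation reduces to:

    // illustrative only: reserve N whole segments' worth of space
    size_t reserve_for_partition(size_t max_segment_size, size_t min_segments) {
        return max_segment_size * min_segments;
    }

summed over every partition, e.g. 100 MB segments * 2 reserved segments
* 10 partitions => 2 GB.
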
Signed-off-by: Noah Watkins
---
 src/v/config/configuration.cc           | 14 ++++++++++++++
 src/v/config/configuration.h            |  1 +
 src/v/redpanda/admin/api-doc/debug.json |  4 ++++
 src/v/redpanda/admin_server.cc          |  1 +
 src/v/storage/disk_log_impl.cc          | 20 ++++++++++++++++++++
 src/v/storage/disk_log_impl.h           |  1 +
 src/v/storage/types.h                   | 24 ++++++++++++++++++++++++
 7 files changed, 65 insertions(+)

diff --git a/src/v/config/configuration.cc b/src/v/config/configuration.cc
index e5e4615bb3f1..4135333acea8 100644
--- a/src/v/config/configuration.cc
+++ b/src/v/config/configuration.cc
@@ -930,6 +930,20 @@ configuration::configuration()
       .example = "3600",
       .visibility = visibility::tunable},
      std::nullopt)
+  , storage_reserve_min_segments(
+      *this,
+      "storage_reserve_min_segments",
+      "The number of segments per partition that the system will attempt to "
+      "reserve disk capacity for. For example, if the maximum segment size is "
+      "configured to be 100 MB, and the value of this option is 2, then in a "
+      "system with 10 partitions Redpanda will attempt to reserve at least 2 "
+      "GB "
+      "of disk space.",
+      {.needs_restart = needs_restart::no,
+       .example = "4",
+       .visibility = visibility::tunable},
+      2,
+      {.min = 1})
   , id_allocator_log_capacity(
       *this,
       "id_allocator_log_capacity",
diff --git a/src/v/config/configuration.h b/src/v/config/configuration.h
index df60d30d3483..a52109c4ca5a 100644
--- a/src/v/config/configuration.h
+++ b/src/v/config/configuration.h
@@ -193,6 +193,7 @@ struct configuration final : public config_store {
     property<size_t> max_compacted_log_segment_size;
     property<std::optional<std::chrono::seconds>>
       storage_ignore_timestamps_in_future_sec;
+    bounded_property<size_t> storage_reserve_min_segments;
 
     property<int16_t> id_allocator_log_capacity;
     property<int16_t> id_allocator_batch_size;
diff --git a/src/v/redpanda/admin/api-doc/debug.json b/src/v/redpanda/admin/api-doc/debug.json
index 048b686ad16d..df062d0cc01f 100644
--- a/src/v/redpanda/admin/api-doc/debug.json
+++ b/src/v/redpanda/admin/api-doc/debug.json
@@ -800,6 +800,10 @@
                     "reclaimable_by_retention": {
                         "type": "long",
                         "description": "Number of bytes currently reclaimable by retention"
+                    },
+                    "target_min_capacity": {
+                        "type": "long",
+                        "description": "Target minimum capacity"
                     }
                 }
             }
diff --git a/src/v/redpanda/admin_server.cc b/src/v/redpanda/admin_server.cc
index 4ff6f1bd9162..6c5400708489 100644
--- a/src/v/redpanda/admin_server.cc
+++ b/src/v/redpanda/admin_server.cc
@@ -4177,6 +4177,7 @@ admin_server::get_local_storage_usage_handler(
     ret.index = disk.usage.index;
     ret.compaction = disk.usage.compaction;
     ret.reclaimable_by_retention = disk.reclaim.retention;
+    ret.target_min_capacity = disk.target.min_capacity;
 
     co_return ret;
 }
diff --git a/src/v/storage/disk_log_impl.cc b/src/v/storage/disk_log_impl.cc
index 0c7031e87df7..4c742d86e5f1 100644
--- a/src/v/storage/disk_log_impl.cc
+++ b/src/v/storage/disk_log_impl.cc
@@ -1909,6 +1909,19 @@ disk_log_impl::disk_usage_and_reclaim(gc_config cfg) {
     co_return std::make_pair(usage, reclaim);
 }
 
+/*
+ * assumes that the compaction gate is held.
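+ * (disk_usage() takes _compaction_housekeeping_gate before calling in.)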
+ */ +ss::future disk_log_impl::disk_usage_target(gc_config) { + usage_target target; + + target.min_capacity + = config::shard_local_cfg().storage_reserve_min_segments() + * max_segment_size(); + + co_return target; +} + ss::future disk_log_impl::disk_usage(gc_config cfg) { // protect against concurrent log removal with housekeeping loop auto gate = _compaction_housekeeping_gate.hold(); @@ -1919,9 +1932,16 @@ ss::future disk_log_impl::disk_usage(gc_config cfg) { */ auto [usage, reclaim] = co_await disk_usage_and_reclaim(cfg); + /* + * compute target capacities such as minimum required capacity as well as + * capacities needed to meet goals such as local retention. + */ + auto target = co_await disk_usage_target(cfg); + co_return usage_report{ .usage = usage, .reclaim = reclaim, + .target = target, }; } diff --git a/src/v/storage/disk_log_impl.h b/src/v/storage/disk_log_impl.h index e10908e3d434..6e2ce872c4be 100644 --- a/src/v/storage/disk_log_impl.h +++ b/src/v/storage/disk_log_impl.h @@ -190,6 +190,7 @@ class disk_log_impl final : public log::impl { ss::future> disk_usage_and_reclaim(gc_config); + ss::future disk_usage_target(gc_config); private: size_t max_segment_size() const; diff --git a/src/v/storage/types.h b/src/v/storage/types.h index 4eb3040e10a0..1d85e4e62338 100644 --- a/src/v/storage/types.h +++ b/src/v/storage/types.h @@ -468,19 +468,43 @@ struct usage { } }; +/* + * disk usage targets + * + * min_capacity: minimum amount of storage capacity needed. + * + * The minimum capacity is intended to capture the minimum amount of disk space + * needed for the basic functionality. At a high-level this is expressed as a + * minimum number of segments per partition. Formally it is the sum of S * + * log.max_segment_size() for each managed log, where S is the value of the + * configuration option storage_reserve_min_segments specifying the minimum + * number of segments for which space should be reserved. + */ +struct usage_target { + size_t min_capacity{0}; + + friend usage_target operator+(usage_target lhs, const usage_target& rhs) { + lhs.min_capacity += rhs.min_capacity; + return lhs; + } +}; + /* * disk usage report * * usage: disk usage summary for log. * reclaim: disk uage reclaim summary for log. + * targets: target disk usage statistics. */ struct usage_report { usage usage; reclaim_size_limits reclaim; + usage_target target; friend usage_report operator+(usage_report lhs, const usage_report& rhs) { lhs.usage = lhs.usage + rhs.usage; lhs.reclaim = lhs.reclaim + rhs.reclaim; + lhs.target = lhs.target + rhs.target; return lhs; } }; From abb52d55093556a01ae613a77005dd8d0f76f223 Mon Sep 17 00:00:00 2001 From: Noah Watkins Date: Wed, 10 May 2023 17:11:45 -0700 Subject: [PATCH 03/11] storage: add disk usage report constructor Useful for making sure we aren't missing any call sites--a possible scenario when using the {.x=, .y=} form of construction. 
Signed-off-by: Noah Watkins
---
 src/v/storage/disk_log_impl.cc | 6 +-----
 src/v/storage/types.h          | 8 ++++++++
 2 files changed, 9 insertions(+), 5 deletions(-)

diff --git a/src/v/storage/disk_log_impl.cc b/src/v/storage/disk_log_impl.cc
index 4c742d86e5f1..6264a012980e 100644
--- a/src/v/storage/disk_log_impl.cc
+++ b/src/v/storage/disk_log_impl.cc
@@ -1938,11 +1938,7 @@ ss::future<usage_report> disk_log_impl::disk_usage(gc_config cfg) {
      */
     auto target = co_await disk_usage_target(cfg);
 
-    co_return usage_report{
-      .usage = usage,
-      .reclaim = reclaim,
-      .target = target,
-    };
+    co_return usage_report(usage, reclaim, target);
 }
 
 } // namespace storage
diff --git a/src/v/storage/types.h b/src/v/storage/types.h
index 1d85e4e62338..ca479aa81a79 100644
--- a/src/v/storage/types.h
+++ b/src/v/storage/types.h
@@ -501,6 +501,14 @@ struct usage_report {
     reclaim_size_limits reclaim;
     usage_target target;
 
+    usage_report() = default;
+
+    usage_report(
+      struct usage usage, reclaim_size_limits reclaim, usage_target target)
+      : usage(usage)
+      , reclaim(reclaim)
+      , target(target) {}
+
     friend usage_report operator+(usage_report lhs, const usage_report& rhs) {
         lhs.usage = lhs.usage + rhs.usage;
         lhs.reclaim = lhs.reclaim + rhs.reclaim;

From 4288a532d19e0a21439d979bc1016163b9ab6b53 Mon Sep 17 00:00:00 2001
From: Noah Watkins
Date: Thu, 11 May 2023 13:18:27 -0700
Subject: [PATCH 04/11] storage: add test for minimum capacity target

Checks cases with multiple partitions and topics, and settings for both
the minimum reserved number of segments and the topic segment size.

Signed-off-by: Noah Watkins
---
 tests/rptest/tests/full_disk_test.py | 31 +++++++++++++++++++++++++++-
 1 file changed, 30 insertions(+), 1 deletion(-)

diff --git a/tests/rptest/tests/full_disk_test.py b/tests/rptest/tests/full_disk_test.py
index df9453df78f4..1f9283f67805 100644
--- a/tests/rptest/tests/full_disk_test.py
+++ b/tests/rptest/tests/full_disk_test.py
@@ -319,7 +319,7 @@ def observed_data_size(pred):
 
 
 class LocalDiskReportTest(RedpandaTest):
-    topics = (TopicSpec(), TopicSpec(partition_count=4))
+    topics = (TopicSpec(segment_bytes=2**30), TopicSpec(partition_count=4))
 
     def check(self, threshold):
         admin = Admin(self.redpanda)
@@ -335,6 +335,35 @@ def check(self, threshold):
                 return False
         return True
 
+    @cluster(num_nodes=3)
+    def test_target_min_capacity(self):
+        """
+        Test that the target min storage capacity reflects changes to the
+        reserved min segments configuration option.
+ """ + admin = Admin(self.redpanda) + default_segment_size = admin.get_cluster_config()["log_segment_size"] + node = self.redpanda.nodes[0] + + for min_segments in (1, 2, 3): + self.redpanda.set_cluster_config( + dict(storage_reserve_min_segments=min_segments)) + reported = admin.get_local_storage_usage( + node)["target_min_capacity"] + + # 1 partition with 1gb segment size + # 4 partition with default segment size + # controller partition has default segment size + # reservation will be for min_segments + expected = min_segments * ((1 * 2**30) + \ + (4 * default_segment_size) + \ + (1 * default_segment_size)) + + diff = abs(reported - expected) + assert diff <= ( + 0.001 * reported + ), f"diff {diff} expected {expected} reported {reported}" + @cluster(num_nodes=3) def test_basic_usage_report(self): # start-up From 43ca4f714ca9049431ec5d34f9a5f676ab5160ae Mon Sep 17 00:00:00 2001 From: Noah Watkins Date: Wed, 17 May 2023 16:10:29 -0700 Subject: [PATCH 05/11] storage: split out overrides from cloud overrides It is useful to examine retention configurations without having cloud storage related overrides in the case of estimating the amount of disk capacity needed. Used in later commits. Signed-off-by: Noah Watkins --- src/v/storage/disk_log_impl.cc | 12 ++++++++++-- src/v/storage/disk_log_impl.h | 1 + 2 files changed, 11 insertions(+), 2 deletions(-) diff --git a/src/v/storage/disk_log_impl.cc b/src/v/storage/disk_log_impl.cc index 6264a012980e..1e552322f6bd 100644 --- a/src/v/storage/disk_log_impl.cc +++ b/src/v/storage/disk_log_impl.cc @@ -704,9 +704,12 @@ bool disk_log_impl::is_cloud_retention_active() const { && (config().is_archival_enabled()); } -gc_config disk_log_impl::apply_overrides(gc_config defaults) const { +/* + * applies overrides for non-cloud storage settings + */ +gc_config disk_log_impl::apply_base_overrides(gc_config defaults) const { if (!config().has_overrides()) { - return override_retention_config(defaults); + return defaults; } auto ret = defaults; @@ -734,6 +737,11 @@ gc_config disk_log_impl::apply_overrides(gc_config defaults) const { model::timestamp::now().value() - retention_time.value().count()); } + return ret; +} + +gc_config disk_log_impl::apply_overrides(gc_config defaults) const { + auto ret = apply_base_overrides(defaults); return override_retention_config(ret); } diff --git a/src/v/storage/disk_log_impl.h b/src/v/storage/disk_log_impl.h index 6e2ce872c4be..9d13338f4bce 100644 --- a/src/v/storage/disk_log_impl.h +++ b/src/v/storage/disk_log_impl.h @@ -177,6 +177,7 @@ class disk_log_impl final : public log::impl { retention_adjust_timestamps(std::chrono::seconds ignore_in_future); gc_config apply_overrides(gc_config) const; + gc_config apply_base_overrides(gc_config) const; storage_resources& resources(); From fc8486fc6a46e07f3581572649b515d78658f448 Mon Sep 17 00:00:00 2001 From: Noah Watkins Date: Wed, 17 May 2023 17:02:08 -0700 Subject: [PATCH 06/11] storage: add size based retention wanted metric Reports how much capacity a partition (or all raft) would like to have. This commit only considers sized based retetion requirements. Time will be added in a later commit. 
Signed-off-by: Noah Watkins
---
 src/v/redpanda/admin/api-doc/debug.json |  4 ++
 src/v/redpanda/admin_server.cc          |  1 +
 src/v/storage/disk_log_impl.cc          | 72 ++++++++++++++++++++++++-
 src/v/storage/disk_log_impl.h           |  2 +-
 src/v/storage/types.h                   | 16 ++++++
 5 files changed, 92 insertions(+), 3 deletions(-)

diff --git a/src/v/redpanda/admin/api-doc/debug.json b/src/v/redpanda/admin/api-doc/debug.json
index df062d0cc01f..6500a1f5453c 100644
--- a/src/v/redpanda/admin/api-doc/debug.json
+++ b/src/v/redpanda/admin/api-doc/debug.json
@@ -804,6 +804,10 @@
                     "target_min_capacity": {
                         "type": "long",
                         "description": "Target minimum capacity"
+                    },
+                    "target_min_capacity_wanted": {
+                        "type": "long",
+                        "description": "Target minimum capacity wanted"
                     }
                 }
             }
diff --git a/src/v/redpanda/admin_server.cc b/src/v/redpanda/admin_server.cc
index 6c5400708489..f98172808527 100644
--- a/src/v/redpanda/admin_server.cc
+++ b/src/v/redpanda/admin_server.cc
@@ -4178,6 +4178,7 @@ admin_server::get_local_storage_usage_handler(
     ret.compaction = disk.usage.compaction;
     ret.reclaimable_by_retention = disk.reclaim.retention;
     ret.target_min_capacity = disk.target.min_capacity;
+    ret.target_min_capacity_wanted = disk.target.min_capacity_wanted;
 
     co_return ret;
 }
diff --git a/src/v/storage/disk_log_impl.cc b/src/v/storage/disk_log_impl.cc
index 1e552322f6bd..aed1e5162609 100644
--- a/src/v/storage/disk_log_impl.cc
+++ b/src/v/storage/disk_log_impl.cc
@@ -1920,13 +1920,74 @@ disk_log_impl::disk_usage_and_reclaim(gc_config cfg) {
 /*
  * assumes that the compaction gate is held.
  * (disk_usage() takes _compaction_housekeeping_gate before calling in.)
  */
-ss::future<usage_target> disk_log_impl::disk_usage_target(gc_config) {
+ss::future<usage_target>
+disk_log_impl::disk_usage_target(gc_config cfg, usage usage) {
     usage_target target;
 
     target.min_capacity
       = config::shard_local_cfg().storage_reserve_min_segments()
        * max_segment_size();
 
+    cfg = apply_base_overrides(cfg);
+
+    /*
+     * compacted topics are always stored whole on local storage such that
+     * local retention settings do not come into play.
+     */
+    if (config().is_compacted()) {
+        /*
+         * if there is no delete policy enabled for a compacted topic then its
+         * local capacity requirement is limited only by the compaction
+         * process and how much data is written. for this case we use a
+         * heuristic, reporting space wanted as `factor * current capacity` to
+         * reflect that we want space for continued growth.
+         */
+        if (!config().is_collectable() || deletion_exempt(config().ntp())) {
+            target.min_capacity_wanted = usage.total() * 2;
+            co_return target;
+        }
+
+        /*
+         * otherwise, we fall through and evaluate the space wanted metric
+         * using any configured retention policies _without_ overriding based
+         * on cloud retention settings. the assumption here is that retention,
+         * not compaction, is what will limit on-disk space. this heuristic
+         * should be updated if we decide to also make "nice to have"
+         * estimates based on expected compaction ratios.
+         */
+    } else {
+        // applies local retention overrides for cloud storage
+        cfg = override_retention_config(cfg);
+    }
+
+    /*
+     * estimate how much space is needed to meet the size-based retention
+     * policy.
+     *
+     * even though retention bytes has a built-in mechanism (ie -1 / nullopt)
+     * to represent infinite retention, it is conceivable that a user sets a
+     * large value to achieve the same goal. in this case we cannot know
+     * precisely what the intention is, and so we preserve the large value.
+     *
+     * TODO: it is likely that we will want to clamp this value at some point,
+     * either here or at a higher level.
+ */ + if (cfg.max_bytes.has_value()) { + target.min_capacity_wanted = cfg.max_bytes.value(); + /* + * since prefix truncation / garbage collection only removes whole + * segments we can make the estimate a bit more conservative by rounding + * up to roughly the nearest segment size. + */ + target.min_capacity_wanted + += max_segment_size() - (cfg.max_bytes.value() % max_segment_size()); + } else { + /* + * without any target max bytes, use `factor * current capacity` to + * reflect that we want space for continued growth. + */ + target.min_capacity_wanted = usage.total() * 2; + } + co_return target; } @@ -1944,7 +2005,14 @@ ss::future disk_log_impl::disk_usage(gc_config cfg) { * compute target capacities such as minimum required capacity as well as * capacities needed to meet goals such as local retention. */ - auto target = co_await disk_usage_target(cfg); + auto target = co_await disk_usage_target(cfg, usage); + + /* + * the intention here is to establish a needed <= wanted relationship which + * should generally provide a nicer set of numbers for consumers to use. + */ + target.min_capacity_wanted = std::max( + target.min_capacity_wanted, target.min_capacity); co_return usage_report(usage, reclaim, target); } diff --git a/src/v/storage/disk_log_impl.h b/src/v/storage/disk_log_impl.h index 9d13338f4bce..7d50815c8293 100644 --- a/src/v/storage/disk_log_impl.h +++ b/src/v/storage/disk_log_impl.h @@ -191,7 +191,7 @@ class disk_log_impl final : public log::impl { ss::future> disk_usage_and_reclaim(gc_config); - ss::future disk_usage_target(gc_config); + ss::future disk_usage_target(gc_config, usage); private: size_t max_segment_size() const; diff --git a/src/v/storage/types.h b/src/v/storage/types.h index ca479aa81a79..8e959dd8a61b 100644 --- a/src/v/storage/types.h +++ b/src/v/storage/types.h @@ -472,6 +472,7 @@ struct usage { * disk usage targets * * min_capacity: minimum amount of storage capacity needed. + * min_capacity_wanted: minimum amount needed to meet policy requirements. * * The minimum capacity is intended to capture the minimum amount of disk space * needed for the basic functionality. At a high-level this is expressed as a @@ -479,12 +480,27 @@ struct usage { * log.max_segment_size() for each managed log, where S is the value of the * configuration option storage_reserve_min_segments specifying the minimum * number of segments for which space should be reserved. + * + * The minimum capacity wanted is an estimate of the amount of storage capacity + * needed to meet various configurable targets. This value is derived from + * multiple sources and policies: + * + * * size-based retention: the amount of space needed to meet size-based + * local retention policy, rounded up to the nearest segment size. + * + * * compaction: compacted topics are kept whole on local storage (ie not + * subject to truncation due to local retention policies). for compact,delete + * topic the retention policy (not local retention) is used to express how + * much capacity is wanted. for a compact-only topic `2 * current size` is + * reported. 
 */
 struct usage_target {
     size_t min_capacity{0};
+    size_t min_capacity_wanted{0};
 
     friend usage_target operator+(usage_target lhs, const usage_target& rhs) {
         lhs.min_capacity += rhs.min_capacity;
+        lhs.min_capacity_wanted += rhs.min_capacity_wanted;
         return lhs;
     }
 };

From 46750b6c051b8b90ce043b8dcf28343bae05f621 Mon Sep 17 00:00:00 2001
From: Noah Watkins
Date: Thu, 11 May 2023 10:33:55 -0700
Subject: [PATCH 07/11] test: add basic test for minimum capacity wanted metric

Signed-off-by: Noah Watkins
---
 tests/rptest/tests/full_disk_test.py | 36 +++++++++++++++++++++++++++-
 1 file changed, 35 insertions(+), 1 deletion(-)

diff --git a/tests/rptest/tests/full_disk_test.py b/tests/rptest/tests/full_disk_test.py
index 1f9283f67805..c5eef7e2ac74 100644
--- a/tests/rptest/tests/full_disk_test.py
+++ b/tests/rptest/tests/full_disk_test.py
@@ -319,7 +319,19 @@ def observed_data_size(pred):
 
 
 class LocalDiskReportTest(RedpandaTest):
-    topics = (TopicSpec(segment_bytes=2**30), TopicSpec(partition_count=4))
+    topics = (
+        TopicSpec(segment_bytes=2**30,
+                  retention_bytes=2 * 2**30,
+                  cleanup_policy=TopicSpec.CLEANUP_COMPACT),
+        TopicSpec(
+            partition_count=4,
+            # retention should be larger than default segment size
+            retention_bytes=200 * 2**20,
+            cleanup_policy=TopicSpec.CLEANUP_DELETE))
+
+    def __init__(self, test_ctx):
+        extra_rp_conf = dict(log_compaction_interval_ms=3600 * 1000)
+        super().__init__(test_context=test_ctx, extra_rp_conf=extra_rp_conf)
 
     def check(self, threshold):
         admin = Admin(self.redpanda)
@@ -335,6 +347,28 @@ def check(self, threshold):
                 return False
         return True
 
+    @cluster(num_nodes=3)
+    def test_target_min_capacity_wanted(self):
+        """
+        Test minimum capacity wanted calculation.
+        """
+        admin = Admin(self.redpanda)
+        default_segment_size = admin.get_cluster_config()["log_segment_size"]
+        node = self.redpanda.nodes[0]
+
+        # minimum based on retention bytes policy
+        reported = admin.get_local_storage_usage(
+            node)["target_min_capacity_wanted"]
+        # the compacted topic clamps min wanted at min needed since it
+        # doesn't contain any data
+        expected = 4 * 200 * 2**20 + \
+            2 * 1 * 2**30
+
+        diff = abs(reported - expected)
+        # the difference should be less than one extra segment per partition
+        # since the reported size will try to round up to the nearest segment.
+        assert diff <= (
+            4 * default_segment_size
+        ), f"diff {diff} expected {expected} reported {reported}"
+
     @cluster(num_nodes=3)
     def test_target_min_capacity(self):
         """

From 072b9ae78f55da84918cc409ef46e0160ad0e39a Mon Sep 17 00:00:00 2001
From: Noah Watkins
Date: Thu, 18 May 2023 19:12:41 -0700
Subject: [PATCH 08/11] storage: add time-based retention capacity wanted metric

Examines data recently written to a partition and tries to extrapolate
how much capacity will be needed based on the apparent rate of data
production.

Signed-off-by: Noah Watkins
---
 src/v/storage/disk_log_impl.cc | 130 +++++++++++++++++++++++++++++++--
 src/v/storage/disk_log_impl.h  |   2 +
 src/v/storage/types.h          |   4 +
 3 files changed, 128 insertions(+), 8 deletions(-)

diff --git a/src/v/storage/disk_log_impl.cc b/src/v/storage/disk_log_impl.cc
index aed1e5162609..51fe333744a2 100644
--- a/src/v/storage/disk_log_impl.cc
+++ b/src/v/storage/disk_log_impl.cc
@@ -1971,26 +1971,140 @@ disk_log_impl::disk_usage_target(gc_config cfg, usage usage) {
      * TODO: it is likely that we will want to clamp this value at some point,
      * either here or at a higher level.
      */
+    std::optional<size_t> want_size;
     if (cfg.max_bytes.has_value()) {
-        target.min_capacity_wanted = cfg.max_bytes.value();
+        want_size = cfg.max_bytes.value();
         /*
          * since prefix truncation / garbage collection only removes whole
          * segments we can make the estimate a bit more conservative by
          * rounding up to roughly the nearest segment size.
          */
-        target.min_capacity_wanted
-          += max_segment_size() - (cfg.max_bytes.value() % max_segment_size());
-    } else {
-        /*
-         * without any target max bytes, use `factor * current capacity` to
-         * reflect that we want space for continued growth.
-         */
+        want_size.value() += max_segment_size()
+                             - (cfg.max_bytes.value() % max_segment_size());
+    }
+
+    /*
+     * grab the time-based retention estimate and combine. if only time or
+     * size is available, use that. if neither is available, use the `factor *
+     * current size` heuristic to express that we want to allow for growth.
+     * when both are available the minimum is taken. the reason for this is
+     * that when retention rules are eventually applied, garbage collection
+     * will select the rule that results in the most data being collected.
+     */
+    auto want_time = co_await disk_usage_target_time_retention(cfg);
+
+    if (!want_time.has_value() && !want_size.has_value()) {
         target.min_capacity_wanted = usage.total() * 2;
+    } else if (want_time.has_value() && want_size.has_value()) {
+        target.min_capacity_wanted = std::min(
+          want_time.value(), want_size.value());
+    } else if (want_time.has_value()) {
+        target.min_capacity_wanted = want_time.value();
+    } else {
+        target.min_capacity_wanted = want_size.value();
     }
 
     co_return target;
 }
 
+ss::future<std::optional<size_t>>
+disk_log_impl::disk_usage_target_time_retention(gc_config cfg) {
+    /*
+     * take the opportunity to maybe fixup janky weird timestamps. also done
+     * in the normal garbage collection path and should be idempotent.
+     */
+    if (auto ignore_timestamps_in_future
+        = config::shard_local_cfg().storage_ignore_timestamps_in_future_sec();
+        ignore_timestamps_in_future.has_value()) {
+        co_await retention_adjust_timestamps(
+          ignore_timestamps_in_future.value());
+    }
+
+    /*
+     * we are going to use whole segments for the data we'll use to
+     * extrapolate the time-based retention capacity needs to avoid the
+     * complexity of index lookups (for now); if there isn't an entire segment
+     * worth of data, we might not have a good sample size either. first we'll
+     * find the oldest segment whose base timestamp is greater than the
+     * eviction time. this and subsequent segments will be fully contained
+     * within the time range.
+     */
+    auto it = std::find_if(
+      std::cbegin(_segs),
+      std::cend(_segs),
+      [time = cfg.eviction_time](const ss::lw_shared_ptr<segment>& s) {
+          return s->index().base_timestamp() >= time;
+      });
+
+    // collect segments for reducing
+    fragmented_vector<segment_set::type> segments;
+    for (; it != std::cend(_segs); ++it) {
+        segments.push_back(*it);
+    }
+
+    /*
+     * not enough data. caller will substitute in a reasonable value.
+     */
+    if (segments.size() < 2) {
+        vlog(
+          stlog.trace,
+          "time-based-usage: skipping log with too few segments ({}) ntp {}",
+          segments.size(),
+          config().ntp());
+        co_return std::nullopt;
+    }
+
+    auto start_timestamp = segments.front()->index().base_timestamp();
+    auto end_timestamp = segments.back()->index().max_timestamp();
+    auto duration = end_timestamp - start_timestamp;
+    auto missing = start_timestamp - cfg.eviction_time;
+
+    /*
+     * timestamps are weird or there isn't a lot of data. be careful with
+     * "not a lot of data" when considering time because throughput may be
+     * large.
so we + * go with something like 10 seconds just as a sanity check. + */ + constexpr auto min_time_needed = model::timestamp(10'000); + if (missing <= model::timestamp(0) || duration <= min_time_needed) { + vlog( + stlog.trace, + "time-based-usage: skipping with time params start {} end {} etime " + "{} dur {} miss {} ntp {}", + start_timestamp, + end_timestamp, + cfg.eviction_time, + duration, + missing, + config().ntp()); + co_return std::nullopt; + } + + // roll up the amount of disk space taken by these segments + auto usage = co_await ss::map_reduce( + segments, + [](const segment_set::type& seg) { return seg->persistent_size(); }, + storage::usage{}, + [](storage::usage acc, storage::usage u) { return acc + u; }); + + // extrapolate out for the missing period of time in the retention period + auto missing_bytes = (usage.total() * missing.value()) / duration.value(); + auto total = usage.total() + missing_bytes; + + vlog( + stlog.trace, + "time-based-usage: reporting total {} usage {} start {} end {} etime {} " + "dur {} miss {} ntp {}", + total, + usage.total(), + start_timestamp, + end_timestamp, + cfg.eviction_time, + duration, + missing, + config().ntp()); + + co_return total; +} + ss::future disk_log_impl::disk_usage(gc_config cfg) { // protect against concurrent log removal with housekeeping loop auto gate = _compaction_housekeeping_gate.hold(); diff --git a/src/v/storage/disk_log_impl.h b/src/v/storage/disk_log_impl.h index 7d50815c8293..357fd9935e03 100644 --- a/src/v/storage/disk_log_impl.h +++ b/src/v/storage/disk_log_impl.h @@ -192,6 +192,8 @@ class disk_log_impl final : public log::impl { ss::future> disk_usage_and_reclaim(gc_config); ss::future disk_usage_target(gc_config, usage); + ss::future> + disk_usage_target_time_retention(gc_config); private: size_t max_segment_size() const; diff --git a/src/v/storage/types.h b/src/v/storage/types.h index 8e959dd8a61b..e1a21babb13a 100644 --- a/src/v/storage/types.h +++ b/src/v/storage/types.h @@ -488,6 +488,10 @@ struct usage { * * size-based retention: the amount of space needed to meet size-based * local retention policy, rounded up to the nearest segment size. * + * * time-based retention: attempts to extrapolate the capacity requirements + * by examining recently written data and the apparent rate at which it has + * been written. + * * * compaction: compacted topics are kept whole on local storage (ie not * subject to truncation due to local retention policies). 
 *   subject to truncation due to local retention policies). for a
 *   compact,delete topic the retention policy (not local retention) is used
 *   to express how much capacity is wanted.

From 982ca09ef02b3251f534ffe6c14d362af310acc6 Mon Sep 17 00:00:00 2001
From: Noah Watkins
Date: Thu, 18 May 2023 22:49:27 -0700
Subject: [PATCH 09/11] storage: add test for time-based retention capacity
 metric

Signed-off-by: Noah Watkins
---
 tests/rptest/tests/full_disk_test.py | 46 ++++++++++++++++++++++++++++
 1 file changed, 46 insertions(+)

diff --git a/tests/rptest/tests/full_disk_test.py b/tests/rptest/tests/full_disk_test.py
index c5eef7e2ac74..a02f5ba8118e 100644
--- a/tests/rptest/tests/full_disk_test.py
+++ b/tests/rptest/tests/full_disk_test.py
@@ -318,6 +318,52 @@ def observed_data_size(pred):
                    backoff_sec=2)
 
 
+class LocalDiskReportTimeTest(RedpandaTest):
+    topics = (TopicSpec(segment_bytes=2**20,
+                        partition_count=1,
+                        retention_bytes=-1,
+                        retention_ms=60 * 60 * 1000,
+                        cleanup_policy=TopicSpec.CLEANUP_DELETE), )
+
+    def __init__(self, test_ctx):
+        extra_rp_conf = dict(log_compaction_interval_ms=3600 * 1000)
+        super().__init__(test_context=test_ctx, extra_rp_conf=extra_rp_conf)
+
+    @cluster(num_nodes=3)
+    def test_target_min_capacity_wanted_time_based(self):
+        admin = Admin(self.redpanda)
+        default_segment_size = admin.get_cluster_config()["log_segment_size"]
+
+        # produce roughly 30mb at 0.5mb/sec
+        kafka_tools = KafkaCliTools(self.redpanda)
+        kafka_tools.produce(self.topic,
+                            30 * 1024,
+                            1024,
+                            throughput=500,
+                            acks=-1)
+
+        node = self.redpanda.nodes[0]
+        reported = admin.get_local_storage_usage(
+            node)["target_min_capacity_wanted"]
+
+        # params. the size is about 900k larger than what was written,
+        # attributable to per record overheads etc... and determined
+        # empirically by looking at trace log stats.
+        size = 32441102
+        time = 61
+        retention = 3600
+        expected = retention * (size / time)
+
+        # factor in the 2 segments worth of space for controller log
+        diff = abs(reported - expected - 2 * default_segment_size)
+
+        # there is definitely going to be some fuzz factor needed here and it
+        # may need to be updated, but after many runs 100mb was a good amount
+        # of slack.
+        assert diff <= (
+            100 * 2**20
+        ), f"diff {diff} reported {reported} expected {expected} default seg size {default_segment_size}"
+
+
 class LocalDiskReportTest(RedpandaTest):
     topics = (
         TopicSpec(segment_bytes=2**30,

From ca3ce060d9b2259a3fa66c363608cb250e90d121 Mon Sep 17 00:00:00 2001
From: Noah Watkins
Date: Tue, 23 May 2023 11:54:40 -0700
Subject: [PATCH 10/11] storage: update method name to reflect behavior

Signed-off-by: Noah Watkins
---
 src/v/storage/disk_log_impl.cc | 6 ++----
 src/v/storage/disk_log_impl.h  | 7 ++++++-
 2 files changed, 8 insertions(+), 5 deletions(-)

diff --git a/src/v/storage/disk_log_impl.cc b/src/v/storage/disk_log_impl.cc
index 51fe333744a2..059b34d58e4b 100644
--- a/src/v/storage/disk_log_impl.cc
+++ b/src/v/storage/disk_log_impl.cc
@@ -1817,12 +1817,10 @@ log make_disk_backed_log(
 }
 
 /*
- * disk usage and amount reclaimable are best computed together.
- *
  * assumes that the compaction gate is held.
  */
 ss::future<std::pair<usage, reclaim_size_limits>>
-disk_log_impl::disk_usage_and_reclaim(gc_config cfg) {
+disk_log_impl::disk_usage_and_reclaimable_space(gc_config cfg) {
     std::optional<model::offset> max_offset;
     if (config().is_collectable()) {
         cfg = apply_overrides(cfg);
@@ -2113,7 +2111,7 @@ ss::future<usage_report> disk_log_impl::disk_usage(gc_config cfg) {
      * compute the amount of current disk usage as well as the amount available
      * for being reclaimed.
      */
-    auto [usage, reclaim] = co_await disk_usage_and_reclaim(cfg);
+    auto [usage, reclaim] = co_await disk_usage_and_reclaimable_space(cfg);
 
     /*
      * compute target capacities such as minimum required capacity as well as
      * capacities needed to meet goals such as local retention.
diff --git a/src/v/storage/disk_log_impl.h b/src/v/storage/disk_log_impl.h
index 357fd9935e03..02eadac73d21 100644
--- a/src/v/storage/disk_log_impl.h
+++ b/src/v/storage/disk_log_impl.h
@@ -189,8 +189,13 @@ class disk_log_impl final : public log::impl {
 
     std::optional<model::offset> retention_offset(gc_config);
 
+    /*
+     * total disk usage and the amount of reclaimable space are most
+     * efficiently computed together given that callers often need both.
+     */
     ss::future<std::pair<usage, reclaim_size_limits>>
-    disk_usage_and_reclaim(gc_config);
+    disk_usage_and_reclaimable_space(gc_config);
+
     ss::future<usage_target> disk_usage_target(gc_config, usage);
     ss::future<std::optional<size_t>>
     disk_usage_target_time_retention(gc_config);

From b240a85ea2812bd86e23530ed6df282d36a6ae35 Mon Sep 17 00:00:00 2001
From: Noah Watkins
Date: Tue, 23 May 2023 12:03:39 -0700
Subject: [PATCH 11/11] test: make it clear that custom segment bytes is used

Signed-off-by: Noah Watkins
---
 tests/rptest/tests/full_disk_test.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/tests/rptest/tests/full_disk_test.py b/tests/rptest/tests/full_disk_test.py
index a02f5ba8118e..56e55837516d 100644
--- a/tests/rptest/tests/full_disk_test.py
+++ b/tests/rptest/tests/full_disk_test.py
@@ -435,7 +435,7 @@ def test_target_min_capacity(self):
             # 4 partitions with default segment size
             # controller partition has default segment size
             # reservation will be for min_segments
-            expected = min_segments * ((1 * 2**30) + \
+            expected = min_segments * ((1 * self.topics[0].segment_bytes) + \
                 (4 * default_segment_size) + \
                 (1 * default_segment_size))