Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Report minimum storage threshold targets #10710

Merged
merged 11 commits into from
May 26, 2023
14 changes: 14 additions & 0 deletions src/v/config/configuration.cc
Original file line number Diff line number Diff line change
Expand Up @@ -930,6 +930,20 @@ configuration::configuration()
.example = "3600",
.visibility = visibility::tunable},
std::nullopt)
, storage_reserve_min_segments(
*this,
"storage_reserve_min_segments",
"The number of segments per partition that the system will attempt to "
"reserve disk capcity for. For example, if the maximum segment size is "
"configured to be 100 MB, and the value of this option is 2, then in a "
"system with 10 partitions Redpanda will attempt to reserve at least 2 "
"GB "
"of disk space.",
{.needs_restart = needs_restart::no,
.example = "4",
.visibility = visibility::tunable},
2,
{.min = 1})
, id_allocator_log_capacity(
*this,
"id_allocator_log_capacity",
Expand Down
1 change: 1 addition & 0 deletions src/v/config/configuration.h
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,7 @@ struct configuration final : public config_store {
property<size_t> max_compacted_log_segment_size;
property<std::optional<std::chrono::seconds>>
storage_ignore_timestamps_in_future_sec;
bounded_property<int16_t> storage_reserve_min_segments;

property<int16_t> id_allocator_log_capacity;
property<int16_t> id_allocator_batch_size;
Expand Down
8 changes: 8 additions & 0 deletions src/v/redpanda/admin/api-doc/debug.json
Original file line number Diff line number Diff line change
Expand Up @@ -800,6 +800,14 @@
"reclaimable_by_retention": {
"type": "long",
"description": "Number of bytes currently reclaimable by retention"
},
"target_min_capacity": {
"type": "long",
"description": "Target minimum capacity"
},
"target_min_capacity_wanted": {
"type": "long",
"description": "Target minimum capacity wanted"
}
}
}
Expand Down
2 changes: 2 additions & 0 deletions src/v/redpanda/admin_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4177,6 +4177,8 @@ admin_server::get_local_storage_usage_handler(
ret.index = disk.usage.index;
ret.compaction = disk.usage.compaction;
ret.reclaimable_by_retention = disk.reclaim.retention;
ret.target_min_capacity = disk.target.min_capacity;
ret.target_min_capacity_wanted = disk.target.min_capacity_wanted;

co_return ret;
}
Expand Down
240 changes: 230 additions & 10 deletions src/v/storage/disk_log_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -704,9 +704,12 @@ bool disk_log_impl::is_cloud_retention_active() const {
&& (config().is_archival_enabled());
}

gc_config disk_log_impl::apply_overrides(gc_config defaults) const {
/*
* applies overrides for non-cloud storage settings
*/
gc_config disk_log_impl::apply_base_overrides(gc_config defaults) const {
if (!config().has_overrides()) {
return override_retention_config(defaults);
return defaults;
}

auto ret = defaults;
Expand Down Expand Up @@ -734,6 +737,11 @@ gc_config disk_log_impl::apply_overrides(gc_config defaults) const {
model::timestamp::now().value() - retention_time.value().count());
}

return ret;
}

gc_config disk_log_impl::apply_overrides(gc_config defaults) const {
    // layer cloud/local retention overrides on top of the base (non-cloud)
    // topic overrides. equivalent to the two steps applied in sequence.
    return override_retention_config(apply_base_overrides(defaults));
}

Expand Down Expand Up @@ -1808,10 +1816,11 @@ log make_disk_backed_log(
return log(ptr);
}

ss::future<usage_report> disk_log_impl::disk_usage(gc_config cfg) {
// protect against concurrent log removal with housekeeping loop
auto gate = _compaction_housekeeping_gate.hold();

/*
* assumes that the compaction gate is held.
*/
ss::future<std::pair<usage, reclaim_size_limits>>
disk_log_impl::disk_usage_and_reclaimable_space(gc_config cfg) {
std::optional<model::offset> max_offset;
if (config().is_collectable()) {
cfg = apply_overrides(cfg);
Expand Down Expand Up @@ -1903,10 +1912,221 @@ ss::future<usage_report> disk_log_impl::disk_usage(gc_config cfg) {
.available = retention.total() + available.total(),
};

co_return usage_report{
.usage = usage,
.reclaim = reclaim,
};
co_return std::make_pair(usage, reclaim);
}

/*
* assumes that the compaction gate is held.
*/
dotnwat marked this conversation as resolved.
Show resolved Hide resolved
ss::future<usage_target>
disk_log_impl::disk_usage_target(gc_config cfg, usage usage) {
usage_target target;

target.min_capacity
= config::shard_local_cfg().storage_reserve_min_segments()
* max_segment_size();

cfg = apply_base_overrides(cfg);

/*
* compacted topics are always stored whole on local storage such that local
* retention settings do not come into play.
*/
if (config().is_compacted()) {
/*
* if there is no delete policy enabled for a compacted topic then its
* local capacity requirement is limited only by compaction process and
* how much data is written. for this case we use a heuristic of to
* report space wanted as `factor * current capacity` to reflect that we
dotnwat marked this conversation as resolved.
Show resolved Hide resolved
* want space for continued growth.
*/
if (!config().is_collectable() || deletion_exempt(config().ntp())) {
target.min_capacity_wanted = usage.total() * 2;
co_return target;
}

/*
* otherwise, we fall through and evaluate the space wanted metric using
* any configured retention policies _without_ overriding based on cloud
* retention settings. the assumption here is that retention and not
* compaction is what will limit on disk space. this heuristic should be
* updated if we decide to also make "nice to have" estimates based on
* expected compaction ratios.
*/
Comment on lines +1949 to +1955
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not following why we're not applying the cloud overrides here. Aren't they honored when applying retention, even on a compacted topic? (i could be wrong, would appreciate a pointer)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let me explain, and if it makes sense to you, i'll update this comment to clarify the case in the code:

Compacted topics aren't subject to local retention (they always remain whole on local storage), so the only way that retention comes into play for a compacted topic is if the policy is compat,delete. However, the override_retention_config helper doesn't take compaction into account. So if we were to apply the cloud overrides here for a compacted topic, then the retention policy we used in calculating "nice to have" wouldn't reflect what would actually happen when housekeeping ran.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I understand: it wasn't clicking that retention settings applied to a compact,delete topic followed the overarching retention settings rather than the local settings for tiered storage. If that's the case, I think this policy makes sense.

} else {
// applies local retention overrides for cloud storage
cfg = override_retention_config(cfg);
}

/*
* estimate how much space is needed to be meet size based retention policy.
*
* even though retention bytes has a built-in mechanism (ie -1 / nullopt) to
* represent infinite retention, it is concievable that a user sets a large
* value to achieve the same goal. in this case we cannot know precisely
* what the intention is, and so we preserve the large value.
*
* TODO: it is likely that we will want to clamp this value at some point,
* either here or at a higher level.
*/
std::optional<size_t> want_size;
if (cfg.max_bytes.has_value()) {
want_size = cfg.max_bytes.value();
/*
* since prefix truncation / garbage collection only removes whole
* segments we can make the estimate a bit more conservative by rounding
* up to roughly the nearest segment size.
*/
want_size.value() += max_segment_size()
- (cfg.max_bytes.value() % max_segment_size());
}

/*
* grab the time based retention estimate and combine. if only time or size
* is available, use that. if neither is available, use the `factor *
* current size` heuristic to express that we want to allow for growth.
* when both are available the minimum is taken. the reason for this is that
* when retention rules are eventually applied, garbage collection will
* select the rule that results in the most data being collected.
*/
auto want_time = co_await disk_usage_target_time_retention(cfg);

if (!want_time.has_value() && !want_size.has_value()) {
target.min_capacity_wanted = usage.total() * 2;
} else if (want_time.has_value() && want_size.has_value()) {
target.min_capacity_wanted = std::min(
want_time.value(), want_size.value());
} else if (want_time.has_value()) {
target.min_capacity_wanted = want_time.value();
} else {
target.min_capacity_wanted = want_size.value();
}

co_return target;
}

/*
 * Estimates the capacity needed to satisfy time-based retention by measuring
 * the disk footprint of segments fully inside the retention window and
 * extrapolating over the uncovered portion of the window.
 *
 * Returns std::nullopt when there is not enough data (or the timestamps are
 * not trustworthy) to make a useful estimate; the caller substitutes a
 * reasonable default.
 *
 * assumes that the compaction gate is held.
 */
ss::future<std::optional<size_t>>
disk_log_impl::disk_usage_target_time_retention(gc_config cfg) {
    /*
     * take the opportunity to maybe fixup janky weird timestamps. also done in
     * normal garbage collection path and should be idempotent.
     */
    if (auto ignore_timestamps_in_future
        = config::shard_local_cfg().storage_ignore_timestamps_in_future_sec();
        ignore_timestamps_in_future.has_value()) {
        co_await retention_adjust_timestamps(
          ignore_timestamps_in_future.value());
    }

    /*
     * we are going to use whole segments for the data we'll use to extrapolate
     * the time-based retention capacity needs to avoid complexity of index
     * lookups (for now); if there isn't an entire segment worth of data, we
     * might not have a good sample size either. first we'll find the oldest
     * segment whose starting offset is greater than the eviction time. this and
     * subsequent segments will be fully contained within the time range.
     */
    auto it = std::find_if(
      std::cbegin(_segs),
      std::cend(_segs),
      [time = cfg.eviction_time](const ss::lw_shared_ptr<segment>& s) {
          return s->index().base_timestamp() >= time;
      });

    // collect segments for reducing
    fragmented_vector<segment_set::type> segments;
    for (; it != std::cend(_segs); ++it) {
        segments.push_back(*it);
    }

    /*
     * not enough data. caller will substitute in a reasonable value.
     */
    if (segments.size() < 2) {
        vlog(
          stlog.trace,
          "time-based-usage: skipping log with too few segments ({}) ntp {}",
          segments.size(),
          config().ntp());
        co_return std::nullopt;
    }

    auto start_timestamp = segments.front()->index().base_timestamp();
    auto end_timestamp = segments.back()->index().max_timestamp();
    auto duration = end_timestamp - start_timestamp;
    auto missing = start_timestamp - cfg.eviction_time;

    /*
     * timestamps are weird or there isn't a lot of data. be careful with "not a
     * lot of data" when considering time because throughput may be large. so we
     * go with something like 10 seconds just as a sanity check.
     */
    constexpr auto min_time_needed = model::timestamp(10'000);
    if (missing <= model::timestamp(0) || duration <= min_time_needed) {
        vlog(
          stlog.trace,
          "time-based-usage: skipping with time params start {} end {} etime "
          "{} dur {} miss {} ntp {}",
          start_timestamp,
          end_timestamp,
          cfg.eviction_time,
          duration,
          missing,
          config().ntp());
        co_return std::nullopt;
    }

    // roll up the amount of disk space taken by these segments
    auto usage = co_await ss::map_reduce(
      segments,
      [](const segment_set::type& seg) { return seg->persistent_size(); },
      storage::usage{},
      [](storage::usage acc, storage::usage u) { return acc + u; });

    // extrapolate out for the missing period of time in the retention period,
    // assuming the throughput observed over the sampled segments continues
    // for the remainder of the window.
    auto missing_bytes = (usage.total() * missing.value()) / duration.value();
    auto total = usage.total() + missing_bytes;

    vlog(
      stlog.trace,
      "time-based-usage: reporting total {} usage {} start {} end {} etime {} "
      "dur {} miss {} ntp {}",
      total,
      usage.total(),
      start_timestamp,
      end_timestamp,
      cfg.eviction_time,
      duration,
      missing,
      config().ntp());

    co_return total;
}

/*
 * Produces a full usage report for this log: current usage, reclaimable
 * space, and target capacities. Entry point used by the admin local storage
 * usage handler.
 */
ss::future<usage_report> disk_log_impl::disk_usage(gc_config cfg) {
    // protect against concurrent log removal with housekeeping loop; the
    // helpers below assume this gate is held for the duration of the call.
    auto gate = _compaction_housekeeping_gate.hold();

    /*
     * compute the amount of current disk usage as well as the amount available
     * for being reclaimed.
     */
    auto [usage, reclaim] = co_await disk_usage_and_reclaimable_space(cfg);

    /*
     * compute target capacities such as minimum required capacity as well as
     * capacities needed to meet goals such as local retention.
     */
    auto target = co_await disk_usage_target(cfg, usage);

    /*
     * the intention here is to establish a needed <= wanted relationship which
     * should generally provide a nicer set of numbers for consumers to use.
     */
    target.min_capacity_wanted = std::max(
      target.min_capacity_wanted, target.min_capacity);

    co_return usage_report(usage, reclaim, target);
}

} // namespace storage
12 changes: 12 additions & 0 deletions src/v/storage/disk_log_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ class disk_log_impl final : public log::impl {
retention_adjust_timestamps(std::chrono::seconds ignore_in_future);

gc_config apply_overrides(gc_config) const;
gc_config apply_base_overrides(gc_config) const;

storage_resources& resources();

Expand All @@ -188,6 +189,17 @@ class disk_log_impl final : public log::impl {

std::optional<model::offset> retention_offset(gc_config);

/*
* total disk usage and the amount of reclaimable space are most efficiently
* computed together given that use cases often use both together.
*/
ss::future<std::pair<usage, reclaim_size_limits>>
disk_usage_and_reclaimable_space(gc_config);

ss::future<usage_target> disk_usage_target(gc_config, usage);
ss::future<std::optional<size_t>>
disk_usage_target_time_retention(gc_config);

private:
size_t max_segment_size() const;
// Computes the segment size based on the latest max_segment_size
Expand Down
Loading