From b4836e4fc0ca11cda05c76217dd94f24eea01dfe Mon Sep 17 00:00:00 2001 From: Alexey Zatelepin Date: Wed, 8 May 2024 18:59:49 +0200 Subject: [PATCH 1/3] c/topic_table: bump _topics_map_revision in more places Partition balancer relies on _topics_map_revision checks to safely iterate over topic table collections with partition granularity (i.e. references to partition data and replica sets are stored and accessed across yield points). To make this safe, increment _topics_map_revision every time _topics, _updates_in_progress, _disabled_partitions or nested collections are modified in a way that invalidates references or iterators. (cherry picked from commit 30bbfb1f24f7e289c6a02956e9718bff4e2f6754) --- src/v/cluster/topic_table.cc | 38 +++++++++++++++++++++++++++++------- src/v/cluster/topic_table.h | 9 +++++++-- 2 files changed, 38 insertions(+), 9 deletions(-) diff --git a/src/v/cluster/topic_table.cc b/src/v/cluster/topic_table.cc index c677d9a771b75..0c80ff2fb089c 100644 --- a/src/v/cluster/topic_table.cc +++ b/src/v/cluster/topic_table.cc @@ -344,6 +344,8 @@ topic_table::apply(finish_moving_partition_replicas_cmd cmd, model::offset o) { _updates_in_progress.erase(it); + _topics_map_revision++; + on_partition_move_finish(cmd.key, cmd.value); // notify backend about finished update @@ -417,6 +419,8 @@ topic_table::apply(cancel_moving_partition_replicas_cmd cmd, model::offset o) { current_assignment_it->replicas = in_progress_it->second.get_previous_replicas(); + _topics_map_revision++; + _pending_deltas.emplace_back( std::move(cmd.key), model::revision_id(o), @@ -459,6 +463,11 @@ topic_table::apply(revert_cancel_partition_move_cmd cmd, model::offset o) { co_return errc::no_update_in_progress; } + auto p_meta_it = tp->second.partitions.find(ntp.tp.partition); + if (p_meta_it == tp->second.partitions.end()) { + co_return errc::partition_not_exists; + } + // revert replica set update current_assignment_it->replicas = in_progress_it->second.get_target_replicas(); @@ -469,11 +478,7 @@ topic_table::apply(revert_cancel_partition_move_cmd cmd, model::offset o) { current_assignment_it->replicas, }; - // update partition_meta object - auto p_meta_it = tp->second.partitions.find(ntp.tp.partition); - if (p_meta_it == tp->second.partitions.end()) { - co_return errc::partition_not_exists; - } + // update partition_meta object: // the cancellation was reverted and update went through, we must // update replicas_revisions. p_meta_it->second.replicas_revisions = update_replicas_revisions( @@ -485,6 +490,8 @@ topic_table::apply(revert_cancel_partition_move_cmd cmd, model::offset o) { /// Since the update is already finished we drop in_progress state _updates_in_progress.erase(in_progress_it); + _topics_map_revision++; + // notify backend about finished update _pending_deltas.emplace_back( ntp, model::revision_id(o), topic_table_delta_type::replicas_updated); @@ -664,6 +671,7 @@ topic_table::apply(set_topic_partitions_disabled_cmd cmd, model::offset o) { } } + _topics_map_revision++; notify_waiters(); co_return errc::success; @@ -988,6 +996,7 @@ class topic_table::snapshot_applier { disabled_partitions_t& _disabled_partitions; fragmented_vector& _pending_deltas; topic_table_probe& _probe; + model::revision_id& _topics_map_revision; model::revision_id _snap_revision; public: @@ -996,6 +1005,7 @@ class topic_table::snapshot_applier { , _disabled_partitions(parent._disabled_partitions) , _pending_deltas(parent._pending_deltas) , _probe(parent._probe) + , _topics_map_revision(parent._topics_map_revision) , _snap_revision(snap_revision) {} void delete_ntp( @@ -1003,7 +1013,9 @@ class topic_table::snapshot_applier { auto ntp = model::ntp(ns_tp.ns, ns_tp.tp, p_as.id); vlog( clusterlog.trace, "deleting ntp {} not in controller snapshot", ntp); - _updates_in_progress.erase(ntp); + if (_updates_in_progress.erase(ntp)) { + _topics_map_revision++; + }; _pending_deltas.emplace_back( std::move(ntp), _snap_revision, topic_table_delta_type::removed); @@ -1022,7 +1034,9 @@ class topic_table::snapshot_applier { delete_ntp(ns_tp, p_as); co_await ss::coroutine::maybe_yield(); } - _disabled_partitions.erase(ns_tp); + if (_disabled_partitions.erase(ns_tp)) { + _topics_map_revision++; + }; _probe.handle_topic_deletion(ns_tp); // topic_metadata_item object is supposed to be removed from _topics by // the caller @@ -1037,6 +1051,9 @@ class topic_table::snapshot_applier { vlog(clusterlog.trace, "adding ntp {} from controller snapshot", ntp); size_t pending_deltas_start_idx = _pending_deltas.size(); + // we are going to modify md_item so increment the revision right away. + _topics_map_revision++; + const model::partition_id p_id = ntp.tp.partition; // 1. reconcile the _topics state (the md_item object) and generate @@ -1169,7 +1186,9 @@ class topic_table::snapshot_applier { topic_metadata_item ret{topic_metadata{topic.metadata, {}}}; if (topic.disabled_set) { _disabled_partitions[ns_tp] = *topic.disabled_set; + _topics_map_revision++; } + for (const auto& [p_id, partition] : topic.partitions) { auto ntp = model::ntp(ns_tp.ns, ns_tp.tp, p_id); add_ntp(ntp, topic, partition, ret, false); @@ -1208,6 +1227,7 @@ ss::future<> topic_table::apply_snapshot( // The topic was re-created, delete and add it anew. co_await applier.delete_topic(ns_tp, md_item); md_item = co_await applier.create_topic(ns_tp, topic_snapshot); + _topics_map_revision++; } else { // The topic was present in the previous set, now we need to // reconcile individual partitions. @@ -1225,10 +1245,12 @@ ss::future<> topic_table::apply_snapshot( old_disabled_set = std::exchange( _disabled_partitions[ns_tp], *topic_snapshot.disabled_set); + _topics_map_revision++; } else if (auto it = _disabled_partitions.find(ns_tp); it != _disabled_partitions.end()) { old_disabled_set = std::move(it->second); _disabled_partitions.erase(it); + _topics_map_revision++; } // 2. For each partition in the new set, reconcile assignments @@ -1265,6 +1287,7 @@ ss::future<> topic_table::apply_snapshot( if (!topic_snapshot.partitions.contains(as_it_copy->id)) { applier.delete_ntp(ns_tp, *as_it_copy); md_item.get_assignments().erase(as_it_copy); + _topics_map_revision++; } co_await ss::coroutine::maybe_yield(); } @@ -1642,6 +1665,7 @@ void topic_table::change_partition_replicas( auto previous_assignment = current_assignment.replicas; // replace partition replica set current_assignment.replicas = new_assignment; + _topics_map_revision++; // calculate delta for backend diff --git a/src/v/cluster/topic_table.h b/src/v/cluster/topic_table.h index 645db3216eed5..0cc56fc4f09d6 100644 --- a/src/v/cluster/topic_table.h +++ b/src/v/cluster/topic_table.h @@ -627,8 +627,13 @@ class topic_table { updates_t _updates_in_progress; model::revision_id _last_applied_revision_id; - // Monotonic counter that is bumped for every addition/deletion to topics - // map. Unlike other revisions this does not correspond to the command + + // Monotonic counter that is bumped each time _topics, _disabled_partitions, + // or _updates_in_progress are modified in a way that makes iteration over + // them unsafe (i.e. invalidates iterators or references, including + // for nested collections like partition sets and replica sets). + // + // Unlike other revisions this does not correspond to the command // revision that updated the map. model::revision_id _topics_map_revision{0}; From 5904396cec5468a3463787778bf7e47426d323c2 Mon Sep 17 00:00:00 2001 From: Alexey Zatelepin Date: Fri, 10 May 2024 15:35:27 +0200 Subject: [PATCH 2/3] utils: introduce concurrent_modification_error exception It is handy to have a base class for all instances of concurrent modifications. (cherry picked from commit 627db4d751bc2e583f0800be0856a3cfb2aaf67b) Conflicts: src/v/cluster/topic_table.h src/v/utils/stable_iterator_adaptor.h --- src/v/cluster/topic_table.h | 14 ++++-------- src/v/cluster/topics_frontend.cc | 2 +- src/v/utils/exceptions.h | 33 +++++++++++++++++++++++++++ src/v/utils/stable_iterator_adaptor.h | 10 ++++---- 4 files changed, 44 insertions(+), 15 deletions(-) create mode 100644 src/v/utils/exceptions.h diff --git a/src/v/cluster/topic_table.h b/src/v/cluster/topic_table.h index 0cc56fc4f09d6..bcef38f7e26d0 100644 --- a/src/v/cluster/topic_table.h +++ b/src/v/cluster/topic_table.h @@ -98,21 +98,17 @@ class topic_table { // * partition::get_revision_id() // * raft::group_configuration::revision_id() - class concurrent_modification_error final : public std::exception { + class concurrent_modification_error final + : public ::concurrent_modification_error { public: concurrent_modification_error( model::revision_id initial_revision, model::revision_id current_revision) - : _msg(ssx::sformat( - "Topic table was modified by concurrent fiber. (initial_revision: " - "{}, current_revision: {}) ", + : ::concurrent_modification_error(ssx::sformat( + "Topic table was modified by concurrent fiber. " + "(initial_revision: {}, current_revision: {}) ", initial_revision, current_revision)) {} - - const char* what() const noexcept final { return _msg.c_str(); } - - private: - ss::sstring _msg; }; class in_progress_update { diff --git a/src/v/cluster/topics_frontend.cc b/src/v/cluster/topics_frontend.cc index 58c1d2d84ad01..58b7262aa66e3 100644 --- a/src/v/cluster/topics_frontend.cc +++ b/src/v/cluster/topics_frontend.cc @@ -976,7 +976,7 @@ topics_frontend::partitions_with_lost_majority( co_return errc::concurrent_modification_error; } co_return result; - } catch (const topic_table::concurrent_modification_error& e) { + } catch (const concurrent_modification_error& e) { // state changed while generating the plan, force caller to retry; vlog( clusterlog.info, diff --git a/src/v/utils/exceptions.h b/src/v/utils/exceptions.h new file mode 100644 index 0000000000000..a70b8dd549bd5 --- /dev/null +++ b/src/v/utils/exceptions.h @@ -0,0 +1,33 @@ +/* + * Copyright 2024 Redpanda Data, Inc. + * + * Use of this software is governed by the Business Source License + * included in the file licenses/BSL.md + * + * As of the Change Date specified in that file, in accordance with + * the Business Source License, use of this software will be governed + * by the Apache License, Version 2.0 + */ +#pragma once + +#include "seastarx.h" + +#include + +#include + +/// Some objects reference state that changes comparatively rarely (e.g. +/// topic_table state) across yield points and expect these references to remain +/// valid. In case these references are invalidated by a concurrent fiber, this +/// exception is thrown. This is a signal for the caller to restart the +/// computation with up-to-date state. +class concurrent_modification_error : public std::exception { +public: + explicit concurrent_modification_error(ss::sstring s) + : _msg(std::move(s)) {} + + const char* what() const noexcept override { return _msg.c_str(); } + +private: + ss::sstring _msg; +}; diff --git a/src/v/utils/stable_iterator_adaptor.h b/src/v/utils/stable_iterator_adaptor.h index 424bcb82fd32b..fe39fbdbc8785 100644 --- a/src/v/utils/stable_iterator_adaptor.h +++ b/src/v/utils/stable_iterator_adaptor.h @@ -11,20 +11,20 @@ #pragma once #include "seastarx.h" +#include "utils/exceptions.h" #include #include #include -#include -#include #include -class iterator_stability_violation : public std::runtime_error { +class iterator_stability_violation final + : public concurrent_modification_error { public: - explicit iterator_stability_violation(const std::string& why) - : std::runtime_error(why){}; + explicit iterator_stability_violation(ss::sstring why) + : concurrent_modification_error(std::move(why)){}; }; /* From 7f651c9e37bfc0ea73b6bb1c17759dbb18776e05 Mon Sep 17 00:00:00 2001 From: Alexey Zatelepin Date: Fri, 10 May 2024 15:52:07 +0200 Subject: [PATCH 3/3] c/scheduling: detect concurrent allocation_state replacements Shard-local allocation_state object is replaced when we are applying a controller snapshot. After this happens, all live allocated_partition objects become invalid. Detect this and throw concurrent_modification_error in case these objects are still used and make destructors no-op. (cherry picked from commit a57a5757e278ac6b7ca0c58de0f1b23075a93402) Conflicts: src/v/cluster/scheduling/types.cc --- src/v/cluster/scheduling/types.cc | 21 ++++++++++++++++++++- 1 file changed, 20 insertions(+), 1 deletion(-) diff --git a/src/v/cluster/scheduling/types.cc b/src/v/cluster/scheduling/types.cc index 1becc229fea4d..a20ecdd03827c 100644 --- a/src/v/cluster/scheduling/types.cc +++ b/src/v/cluster/scheduling/types.cc @@ -13,6 +13,7 @@ #include "cluster/logger.h" #include "cluster/scheduling/allocation_state.h" +#include "utils/exceptions.h" #include "utils/to_string.h" #include @@ -76,6 +77,9 @@ allocation_units::allocation_units( allocation_units::~allocation_units() { oncore_debug_verify(_oncore); + if (unlikely(!_state)) { + return; + } for (auto& pas : _assignments) { for (auto& replica : pas.replicas) { _state->remove_allocation(replica, _domain); @@ -96,6 +100,11 @@ allocated_partition::allocated_partition( std::optional allocated_partition::prepare_move(model::node_id prev_node) { + if (unlikely(!_state)) { + throw concurrent_modification_error( + "allocation_state was concurrently replaced"); + } + previous_replica prev; auto it = std::find_if( _replicas.begin(), _replicas.end(), [prev_node](const auto& bs) { @@ -149,6 +158,11 @@ allocated_partition::prepare_move(model::node_id prev_node) { model::broker_shard allocated_partition::add_replica( model::node_id node, const std::optional& prev) { + if (unlikely(!_state)) { + throw concurrent_modification_error( + "allocation_state was concurrently replaced"); + } + if (!_original_node2shard) { _original_node2shard.emplace(); for (const auto& bs : _replicas) { @@ -225,7 +239,12 @@ bool allocated_partition::is_original(model::node_id node) const { } errc allocated_partition::try_revert(const reallocation_step& step) { - if (!_original_node2shard || !_state) { + if (unlikely(!_state)) { + throw concurrent_modification_error( + "allocation_state was concurrently replaced"); + } + + if (!_original_node2shard) { return errc::no_update_in_progress; }