diff --git a/src/v/cluster/scheduling/types.cc b/src/v/cluster/scheduling/types.cc
index 1becc229fea4d..a20ecdd03827c 100644
--- a/src/v/cluster/scheduling/types.cc
+++ b/src/v/cluster/scheduling/types.cc
@@ -13,6 +13,7 @@
 
 #include "cluster/logger.h"
 #include "cluster/scheduling/allocation_state.h"
+#include "utils/exceptions.h"
 #include "utils/to_string.h"
 
 #include
@@ -76,6 +77,9 @@ allocation_units::allocation_units(
 
 allocation_units::~allocation_units() {
     oncore_debug_verify(_oncore);
+    if (unlikely(!_state)) {
+        return;
+    }
     for (auto& pas : _assignments) {
         for (auto& replica : pas.replicas) {
             _state->remove_allocation(replica, _domain);
@@ -96,6 +100,11 @@ allocated_partition::allocated_partition(
 
 std::optional
 allocated_partition::prepare_move(model::node_id prev_node) {
+    if (unlikely(!_state)) {
+        throw concurrent_modification_error(
+          "allocation_state was concurrently replaced");
+    }
+
     previous_replica prev;
     auto it = std::find_if(
       _replicas.begin(), _replicas.end(), [prev_node](const auto& bs) {
@@ -149,6 +158,11 @@ allocated_partition::prepare_move(model::node_id prev_node) {
 
 model::broker_shard allocated_partition::add_replica(
   model::node_id node, const std::optional& prev) {
+    if (unlikely(!_state)) {
+        throw concurrent_modification_error(
+          "allocation_state was concurrently replaced");
+    }
+
     if (!_original_node2shard) {
         _original_node2shard.emplace();
         for (const auto& bs : _replicas) {
@@ -225,7 +239,12 @@ bool allocated_partition::is_original(model::node_id node) const {
 }
 
 errc allocated_partition::try_revert(const reallocation_step& step) {
-    if (!_original_node2shard || !_state) {
+    if (unlikely(!_state)) {
+        throw concurrent_modification_error(
+          "allocation_state was concurrently replaced");
+    }
+
+    if (!_original_node2shard) {
        return errc::no_update_in_progress;
     }
 
diff --git a/src/v/cluster/topic_table.cc b/src/v/cluster/topic_table.cc
index c677d9a771b75..0c80ff2fb089c 100644
--- a/src/v/cluster/topic_table.cc
+++ b/src/v/cluster/topic_table.cc
@@ -344,6 +344,8 @@ topic_table::apply(finish_moving_partition_replicas_cmd cmd, model::offset o) {
 
     _updates_in_progress.erase(it);
 
+    _topics_map_revision++;
+
     on_partition_move_finish(cmd.key, cmd.value);
 
     // notify backend about finished update
@@ -417,6 +419,8 @@ topic_table::apply(cancel_moving_partition_replicas_cmd cmd, model::offset o) {
     current_assignment_it->replicas
       = in_progress_it->second.get_previous_replicas();
 
+    _topics_map_revision++;
+
     _pending_deltas.emplace_back(
       std::move(cmd.key),
       model::revision_id(o),
@@ -459,6 +463,11 @@ topic_table::apply(revert_cancel_partition_move_cmd cmd, model::offset o) {
         co_return errc::no_update_in_progress;
     }
 
+    auto p_meta_it = tp->second.partitions.find(ntp.tp.partition);
+    if (p_meta_it == tp->second.partitions.end()) {
+        co_return errc::partition_not_exists;
+    }
+
     // revert replica set update
     current_assignment_it->replicas
       = in_progress_it->second.get_target_replicas();
@@ -469,11 +478,7 @@ topic_table::apply(revert_cancel_partition_move_cmd cmd, model::offset o) {
       current_assignment_it->replicas,
     };
 
-    // update partition_meta object
-    auto p_meta_it = tp->second.partitions.find(ntp.tp.partition);
-    if (p_meta_it == tp->second.partitions.end()) {
-        co_return errc::partition_not_exists;
-    }
+    // update partition_meta object:
     // the cancellation was reverted and update went through, we must
     // update replicas_revisions.
     p_meta_it->second.replicas_revisions = update_replicas_revisions(
@@ -485,6 +490,8 @@ topic_table::apply(revert_cancel_partition_move_cmd cmd, model::offset o) {
     /// Since the update is already finished we drop in_progress state
     _updates_in_progress.erase(in_progress_it);
 
+    _topics_map_revision++;
+
     // notify backend about finished update
     _pending_deltas.emplace_back(
       ntp, model::revision_id(o), topic_table_delta_type::replicas_updated);
@@ -664,6 +671,7 @@ topic_table::apply(set_topic_partitions_disabled_cmd cmd, model::offset o) {
         }
     }
 
+    _topics_map_revision++;
     notify_waiters();
 
     co_return errc::success;
@@ -988,6 +996,7 @@ class topic_table::snapshot_applier {
     disabled_partitions_t& _disabled_partitions;
     fragmented_vector& _pending_deltas;
     topic_table_probe& _probe;
+    model::revision_id& _topics_map_revision;
     model::revision_id _snap_revision;
 
 public:
@@ -996,6 +1005,7 @@
       , _disabled_partitions(parent._disabled_partitions)
      , _pending_deltas(parent._pending_deltas)
      , _probe(parent._probe)
+      , _topics_map_revision(parent._topics_map_revision)
      , _snap_revision(snap_revision) {}
 
     void delete_ntp(
@@ -1003,7 +1013,9 @@
         auto ntp = model::ntp(ns_tp.ns, ns_tp.tp, p_as.id);
         vlog(
           clusterlog.trace, "deleting ntp {} not in controller snapshot", ntp);
-        _updates_in_progress.erase(ntp);
+        if (_updates_in_progress.erase(ntp)) {
+            _topics_map_revision++;
+        };
 
         _pending_deltas.emplace_back(
           std::move(ntp), _snap_revision, topic_table_delta_type::removed);
@@ -1022,7 +1034,9 @@
             delete_ntp(ns_tp, p_as);
             co_await ss::coroutine::maybe_yield();
         }
-        _disabled_partitions.erase(ns_tp);
+        if (_disabled_partitions.erase(ns_tp)) {
+            _topics_map_revision++;
+        };
         _probe.handle_topic_deletion(ns_tp);
         // topic_metadata_item object is supposed to be removed from _topics by
         // the caller
@@ -1037,6 +1051,9 @@
         vlog(clusterlog.trace, "adding ntp {} from controller snapshot", ntp);
         size_t pending_deltas_start_idx = _pending_deltas.size();
 
+        // we are going to modify md_item so increment the revision right away.
+        _topics_map_revision++;
+
         const model::partition_id p_id = ntp.tp.partition;
 
         // 1. reconcile the _topics state (the md_item object) and generate
@@ -1169,7 +1186,9 @@
         topic_metadata_item ret{topic_metadata{topic.metadata, {}}};
         if (topic.disabled_set) {
             _disabled_partitions[ns_tp] = *topic.disabled_set;
+            _topics_map_revision++;
         }
+
         for (const auto& [p_id, partition] : topic.partitions) {
             auto ntp = model::ntp(ns_tp.ns, ns_tp.tp, p_id);
             add_ntp(ntp, topic, partition, ret, false);
@@ -1208,6 +1227,7 @@ ss::future<> topic_table::apply_snapshot(
             // The topic was re-created, delete and add it anew.
             co_await applier.delete_topic(ns_tp, md_item);
             md_item = co_await applier.create_topic(ns_tp, topic_snapshot);
+            _topics_map_revision++;
         } else {
             // The topic was present in the previous set, now we need to
             // reconcile individual partitions.
@@ -1225,10 +1245,12 @@ ss::future<> topic_table::apply_snapshot(
                 old_disabled_set = std::exchange(
                   _disabled_partitions[ns_tp], *topic_snapshot.disabled_set);
+                _topics_map_revision++;
             } else if (auto it = _disabled_partitions.find(ns_tp);
                        it != _disabled_partitions.end()) {
                 old_disabled_set = std::move(it->second);
                 _disabled_partitions.erase(it);
+                _topics_map_revision++;
             }
 
             // 2. For each partition in the new set, reconcile assignments
@@ -1265,6 +1287,7 @@ ss::future<> topic_table::apply_snapshot(
                 if (!topic_snapshot.partitions.contains(as_it_copy->id)) {
                     applier.delete_ntp(ns_tp, *as_it_copy);
                     md_item.get_assignments().erase(as_it_copy);
+                    _topics_map_revision++;
                 }
                 co_await ss::coroutine::maybe_yield();
             }
@@ -1642,6 +1665,7 @@ void topic_table::change_partition_replicas(
     auto previous_assignment = current_assignment.replicas;
     // replace partition replica set
     current_assignment.replicas = new_assignment;
+    _topics_map_revision++;
 
     // calculate delta for backend
diff --git a/src/v/cluster/topic_table.h b/src/v/cluster/topic_table.h
index 645db3216eed5..bcef38f7e26d0 100644
--- a/src/v/cluster/topic_table.h
+++ b/src/v/cluster/topic_table.h
@@ -98,21 +98,17 @@ class topic_table {
     // * partition::get_revision_id()
     // * raft::group_configuration::revision_id()
 
-    class concurrent_modification_error final : public std::exception {
+    class concurrent_modification_error final
+      : public ::concurrent_modification_error {
     public:
         concurrent_modification_error(
           model::revision_id initial_revision,
          model::revision_id current_revision)
-          : _msg(ssx::sformat(
-            "Topic table was modified by concurrent fiber. (initial_revision: "
-            "{}, current_revision: {}) ",
+          : ::concurrent_modification_error(ssx::sformat(
+            "Topic table was modified by concurrent fiber. "
+            "(initial_revision: {}, current_revision: {}) ",
            initial_revision,
            current_revision)) {}
-
-        const char* what() const noexcept final { return _msg.c_str(); }
-
-    private:
-        ss::sstring _msg;
     };
 
     class in_progress_update {
@@ -627,8 +623,13 @@ class topic_table {
     updates_t _updates_in_progress;
     model::revision_id _last_applied_revision_id;
 
-    // Monotonic counter that is bumped for every addition/deletion to topics
-    // map. Unlike other revisions this does not correspond to the command
+
+    // Monotonic counter that is bumped each time _topics, _disabled_partitions,
+    // or _updates_in_progress are modified in a way that makes iteration over
+    // them unsafe (i.e. invalidates iterators or references, including
+    // for nested collections like partition sets and replica sets).
+    //
+    // Unlike other revisions this does not correspond to the command
     // revision that updated the map.
     model::revision_id _topics_map_revision{0};
 
diff --git a/src/v/cluster/topics_frontend.cc b/src/v/cluster/topics_frontend.cc
index 58c1d2d84ad01..58b7262aa66e3 100644
--- a/src/v/cluster/topics_frontend.cc
+++ b/src/v/cluster/topics_frontend.cc
@@ -976,7 +976,7 @@ topics_frontend::partitions_with_lost_majority(
             co_return errc::concurrent_modification_error;
         }
         co_return result;
-    } catch (const topic_table::concurrent_modification_error& e) {
+    } catch (const concurrent_modification_error& e) {
         // state changed while generating the plan, force caller to retry;
         vlog(
           clusterlog.info,
diff --git a/src/v/utils/exceptions.h b/src/v/utils/exceptions.h
new file mode 100644
index 0000000000000..a70b8dd549bd5
--- /dev/null
+++ b/src/v/utils/exceptions.h
@@ -0,0 +1,33 @@
+/*
+ * Copyright 2024 Redpanda Data, Inc.
+ *
+ * Use of this software is governed by the Business Source License
+ * included in the file licenses/BSL.md
+ *
+ * As of the Change Date specified in that file, in accordance with
+ * the Business Source License, use of this software will be governed
+ * by the Apache License, Version 2.0
+ */
+#pragma once
+
+#include "seastarx.h"
+
+#include
+
+#include
+
+/// Some objects reference state that changes comparatively rarely (e.g.
+/// topic_table state) across yield points and expect these references to remain
+/// valid. In case these references are invalidated by a concurrent fiber, this
+/// exception is thrown. This is a signal for the caller to restart the
+/// computation with up-to-date state.
+class concurrent_modification_error : public std::exception {
+public:
+    explicit concurrent_modification_error(ss::sstring s)
+      : _msg(std::move(s)) {}
+
+    const char* what() const noexcept override { return _msg.c_str(); }
+
+private:
+    ss::sstring _msg;
+};
diff --git a/src/v/utils/stable_iterator_adaptor.h b/src/v/utils/stable_iterator_adaptor.h
index 424bcb82fd32b..fe39fbdbc8785 100644
--- a/src/v/utils/stable_iterator_adaptor.h
+++ b/src/v/utils/stable_iterator_adaptor.h
@@ -11,20 +11,20 @@
 #pragma once
 
 #include "seastarx.h"
+#include "utils/exceptions.h"
 
 #include
 
 #include
 #include
-#include
-#include
 
 #include
 
-class iterator_stability_violation : public std::runtime_error {
+class iterator_stability_violation final
+  : public concurrent_modification_error {
 public:
-    explicit iterator_stability_violation(const std::string& why)
-      : std::runtime_error(why){};
+    explicit iterator_stability_violation(ss::sstring why)
+      : concurrent_modification_error(std::move(why)){};
 };
 
 /*
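Taken together, the patch sets up a simple protocol: mutators bump `_topics_map_revision` (or reset `_state` in the allocator types) whenever they may invalidate iterators or references held across a yield point, long-running readers re-check the revision after each yield and throw `concurrent_modification_error` when it changed, and callers such as `topics_frontend::partitions_with_lost_majority` catch the exception and retry against fresh state. The sketch below is a self-contained illustration of that protocol, not Redpanda code: `toy_table`, `sum_items`, and the simulated yield are made-up stand-ins, and the exception class is simplified to use `std::string` instead of `ss::sstring`.

```cpp
#include <exception>
#include <functional>
#include <iostream>
#include <string>
#include <vector>

// Simplified stand-in for the exception introduced in utils/exceptions.h.
class concurrent_modification_error : public std::exception {
public:
    explicit concurrent_modification_error(std::string msg)
      : _msg(std::move(msg)) {}
    const char* what() const noexcept override { return _msg.c_str(); }

private:
    std::string _msg;
};

// Toy container with a monotonic revision, mimicking _topics_map_revision:
// every mutation that can invalidate iterators bumps the revision.
struct toy_table {
    std::vector<int> items;
    long revision = 0;

    void add(int v) {
        items.push_back(v);
        ++revision; // analogous to _topics_map_revision++
    }
};

// A "long" computation re-validates the revision it started from after every
// yield point; a mismatch means its iterators may have been invalidated.
long sum_items(const toy_table& t, const std::function<void()>& yield) {
    const long start_rev = t.revision;
    long sum = 0;
    for (int v : t.items) {
        yield(); // another fiber may run here and mutate the table
        if (t.revision != start_rev) {
            throw concurrent_modification_error(
              "table was modified by concurrent fiber");
        }
        sum += v;
    }
    return sum;
}

int main() {
    toy_table t;
    for (int i = 1; i <= 5; ++i) {
        t.add(i);
    }

    int attempt = 0;
    auto yield = [&] {
        // Simulate a concurrent mutation during the first attempt only.
        if (attempt == 0) {
            t.add(42);
        }
    };

    // Caller-side retry loop: restart the computation with fresh state,
    // the same pattern topics_frontend uses when it catches the exception.
    for (;;) {
        try {
            std::cout << "sum = " << sum_items(t, yield) << "\n";
            break;
        } catch (const concurrent_modification_error& e) {
            ++attempt;
            std::cout << "retrying: " << e.what() << "\n";
        }
    }
    return 0;
}
```

The important property is that the revision check happens before the possibly invalidated iterators are used again, so a concurrent mutation costs at most a retry rather than undefined behaviour.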