
[backport] [v24.1.x] miscellaneous idempotency fixes #22687 #22781

Merged: 9 commits, Aug 15, 2024
1 change: 1 addition & 0 deletions src/v/cluster/CMakeLists.txt
@@ -88,6 +88,7 @@ v_cc_library(
HDRS
topic_recovery_validator.h
SRCS
errors.cc
metadata_cache.cc
partition_manager.cc
scheduling/partition_allocator.cc
3 changes: 3 additions & 0 deletions src/v/cluster/errc.h
@@ -90,6 +90,9 @@ enum class errc : int16_t {
producer_ids_vcluster_limit_exceeded,
validation_of_recovery_topic_failed,
};

std::ostream& operator<<(std::ostream& o, errc err);

struct errc_category final : public std::error_category {
const char* name() const noexcept final { return "cluster::errc"; }

171 changes: 171 additions & 0 deletions src/v/cluster/errors.cc
@@ -0,0 +1,171 @@
/**
* Copyright 2024 Redpanda Data, Inc.
*
* Use of this software is governed by the Business Source License
* included in the file licenses/BSL.md
*
* As of the Change Date specified in that file, in accordance with
* the Business Source License, use of this software will be governed
* by the Apache License, Version 2.0
*/

#include "cluster/errc.h"

#include <iostream>

namespace cluster {
std::ostream& operator<<(std::ostream& o, cluster::errc err) {
switch (err) {
case errc::success:
return o << "cluster::errc::success";
case errc::notification_wait_timeout:
return o << "cluster::errc::notification_wait_timeout";
case errc::topic_invalid_partitions:
return o << "cluster::errc::topic_invalid_partitions";
case errc::topic_invalid_replication_factor:
return o << "cluster::errc::topic_invalid_replication_factor";
case errc::topic_invalid_config:
return o << "cluster::errc::topic_invalid_config";
case errc::not_leader_controller:
return o << "cluster::errc::not_leader_controller";
case errc::topic_already_exists:
return o << "cluster::errc::topic_already_exists";
case errc::replication_error:
return o << "cluster::errc::replication_error";
case errc::shutting_down:
return o << "cluster::errc::shutting_down";
case errc::no_leader_controller:
return o << "cluster::errc::no_leader_controller";
case errc::join_request_dispatch_error:
return o << "cluster::errc::join_request_dispatch_error";
case errc::seed_servers_exhausted:
return o << "cluster::errc::seed_servers_exhausted";
case errc::auto_create_topics_exception:
return o << "cluster::errc::auto_create_topics_exception";
case errc::timeout:
return o << "cluster::errc::timeout";
case errc::topic_not_exists:
return o << "cluster::errc::topic_not_exists";
case errc::invalid_topic_name:
return o << "cluster::errc::invalid_topic_name";
case errc::partition_not_exists:
return o << "cluster::errc::partition_not_exists";
case errc::not_leader:
return o << "cluster::errc::not_leader";
case errc::partition_already_exists:
return o << "cluster::errc::partition_already_exists";
case errc::waiting_for_recovery:
return o << "cluster::errc::waiting_for_recovery";
case errc::waiting_for_reconfiguration_finish:
return o << "cluster::errc::waiting_for_reconfiguration_finish";
case errc::update_in_progress:
return o << "cluster::errc::update_in_progress";
case errc::user_exists:
return o << "cluster::errc::user_exists";
case errc::user_does_not_exist:
return o << "cluster::errc::user_does_not_exist";
case errc::invalid_producer_epoch:
return o << "cluster::errc::invalid_producer_epoch";
case errc::sequence_out_of_order:
return o << "cluster::errc::sequence_out_of_order";
case errc::generic_tx_error:
return o << "cluster::errc::generic_tx_error";
case errc::node_does_not_exists:
return o << "cluster::errc::node_does_not_exists";
case errc::invalid_node_operation:
return o << "cluster::errc::invalid_node_operation";
case errc::invalid_configuration_update:
return o << "cluster::errc::invalid_configuration_update";
case errc::topic_operation_error:
return o << "cluster::errc::topic_operation_error";
case errc::no_eligible_allocation_nodes:
return o << "cluster::errc::no_eligible_allocation_nodes";
case errc::allocation_error:
return o << "cluster::errc::allocation_error";
case errc::partition_configuration_revision_not_updated:
return o
<< "cluster::errc::partition_configuration_revision_not_updated";
case errc::partition_configuration_in_joint_mode:
return o << "cluster::errc::partition_configuration_in_joint_mode";
case errc::partition_configuration_leader_config_not_committed:
return o << "cluster::errc::partition_configuration_leader_config_not_"
"committed";
case errc::partition_configuration_differs:
return o << "cluster::errc::partition_configuration_differs";
case errc::data_policy_already_exists:
return o << "cluster::errc::data_policy_already_exists";
case errc::data_policy_not_exists:
return o << "cluster::errc::data_policy_not_exists";
case errc::source_topic_not_exists:
return o << "cluster::errc::source_topic_not_exists";
case errc::source_topic_still_in_use:
return o << "cluster::errc::source_topic_still_in_use";
case errc::waiting_for_partition_shutdown:
return o << "cluster::errc::waiting_for_partition_shutdown";
case errc::error_collecting_health_report:
return o << "cluster::errc::error_collecting_health_report";
case errc::leadership_changed:
return o << "cluster::errc::leadership_changed";
case errc::feature_disabled:
return o << "cluster::errc::feature_disabled";
case errc::invalid_request:
return o << "cluster::errc::invalid_request";
case errc::no_update_in_progress:
return o << "cluster::errc::no_update_in_progress";
case errc::unknown_update_interruption_error:
return o << "cluster::errc::unknown_update_interruption_error";
case errc::throttling_quota_exceeded:
return o << "cluster::errc::throttling_quota_exceeded";
case errc::cluster_already_exists:
return o << "cluster::errc::cluster_already_exists";
case errc::no_partition_assignments:
return o << "cluster::errc::no_partition_assignments";
case errc::failed_to_create_partition:
return o << "cluster::errc::failed_to_create_partition";
case errc::partition_operation_failed:
return o << "cluster::errc::partition_operation_failed";
case errc::transform_does_not_exist:
return o << "cluster::errc::transform_does_not_exist";
case errc::transform_invalid_update:
return o << "cluster::errc::transform_invalid_update";
case errc::transform_invalid_create:
return o << "cluster::errc::transform_invalid_create";
case errc::transform_invalid_source:
return o << "cluster::errc::transform_invalid_source";
case errc::transform_invalid_environment:
return o << "cluster::errc::transform_invalid_environment";
case errc::trackable_keys_limit_exceeded:
return o << "cluster::errc::trackable_keys_limit_exceeded";
case errc::topic_disabled:
return o << "cluster::errc::topic_disabled";
case errc::partition_disabled:
return o << "cluster::errc::partition_disabled";
case errc::invalid_partition_operation:
return o << "cluster::errc::invalid_partition_operation";
case errc::concurrent_modification_error:
return o << "cluster::errc::concurrent_modification_error";
case errc::transform_count_limit_exceeded:
return o << "cluster::errc::transform_count_limit_exceeded";
case errc::role_exists:
return o << "cluster::errc::role_exists";
case errc::role_does_not_exist:
return o << "cluster::errc::role_does_not_exist";
case errc::inconsistent_stm_update:
return o << "cluster::errc::inconsistent_stm_update";
case errc::waiting_for_shard_placement_update:
return o << "cluster::errc::waiting_for_shard_placement_update";
case errc::topic_invalid_partitions_core_limit:
return o << "cluster::errc::topic_invalid_partitions_core_limit";
case errc::topic_invalid_partitions_memory_limit:
return o << "cluster::errc::topic_invalid_partitions_memory_limit";
case errc::topic_invalid_partitions_fd_limit:
return o << "cluster::errc::topic_invalid_partitions_fd_limit";
case errc::topic_invalid_partitions_decreased:
return o << "cluster::errc::topic_invalid_partitions_decreased";
case errc::producer_ids_vcluster_limit_exceeded:
return o << "cluster::errc::producer_ids_vcluster_limit_exceeded";
case errc::validation_of_recovery_topic_failed:
return o << "cluster::errc::validation_of_recovery_topic_failed";
}
}
} // namespace cluster
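
As a quick illustration (not part of the diff), the operator<< overload defined above lets cluster::errc values print by name rather than as a raw int16_t, which is what makes these error codes readable in logs and test failures. A minimal, self-contained usage sketch, assuming only that cluster/errc.h is on the include path:

// Illustrative sketch, not from the PR: stream an errc to get its name.
#include <sstream>
#include <string>

#include "cluster/errc.h"

std::string errc_name(cluster::errc err) {
    std::ostringstream os;
    os << err; // resolves to the overload defined in errors.cc,
               // e.g. "cluster::errc::shutting_down"
    return os.str();
}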
42 changes: 26 additions & 16 deletions src/v/cluster/producer_state.cc
@@ -122,6 +122,16 @@ std::optional<request_ptr> requests::last_request() const {
return std::nullopt;
}

void requests::reset(request_result_t::error_type error) {
for (auto& request : _inflight_requests) {
if (!request->has_completed()) {
request->set_error(error);
}
}
_inflight_requests.clear();
_finished_requests.clear();
}

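The new reset() consolidates a fail-then-clear pattern that was previously duplicated in try_emplace() and shutdown() (see the hunks below): every request that has not yet completed is finished with the supplied error before both queues are dropped, so no waiter is left hanging on a destroyed request. A standalone sketch of that pattern, using simplified stand-in types rather than the real cluster:: classes:

#include <deque>
#include <memory>

// Simplified stand-in for the request type; has_completed()/set_error()
// mirror the calls made by requests::reset() above.
struct toy_request {
    bool done = false;
    int error = 0;
    bool has_completed() const { return done; }
    void set_error(int e) { error = e; done = true; }
};

void reset_all(
  std::deque<std::shared_ptr<toy_request>>& inflight,
  std::deque<std::shared_ptr<toy_request>>& finished,
  int err) {
    for (auto& r : inflight) {
        if (!r->has_completed()) {
            r->set_error(err); // complete pending waiters with the error
        }
    }
    inflight.clear(); // shared ownership keeps completed requests alive
    finished.clear(); // cached results are stale after a reset
}
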
bool requests::is_valid_sequence(seq_t incoming) const {
auto last_req = last_request();
return
@@ -138,13 +148,7 @@ result<request_ptr> requests::try_emplace(
if (reset_sequences) {
// reset all the sequence tracking state; this avoids any further
// sequence checks for this producer.
-        while (!_inflight_requests.empty()) {
-            if (!_inflight_requests.front()->has_completed()) {
-                _inflight_requests.front()->set_error(errc::timeout);
-            }
-            _inflight_requests.pop_front();
-        }
-        _finished_requests.clear();
+        reset(errc::timeout);
} else {
// gc and fail any inflight requests from old terms
// these are guaranteed to be failed because of sync() guarantees
@@ -232,15 +236,7 @@ bool requests::stm_apply(
return relink_producer;
}

-void requests::shutdown() {
-    for (auto& request : _inflight_requests) {
-        if (!request->has_completed()) {
-            request->set_error(errc::shutting_down);
-        }
-    }
-    _inflight_requests.clear();
-    _finished_requests.clear();
-}
+void requests::shutdown() { reset(cluster::errc::shutting_down); }

producer_state::producer_state(
ss::noncopyable_function<void()> post_eviction_hook,
@@ -315,6 +311,17 @@ bool producer_state::can_evict() {
return true;
}

void producer_state::reset_with_new_epoch(model::producer_epoch new_epoch) {
vassert(
new_epoch > _id.get_epoch(),
"Invalid epoch bump to {} for producer {}",
new_epoch,
*this);
vlog(clusterlog.info, "[{}] Resetting epoch to {}", *this, new_epoch);
_requests.reset(errc::timeout);
_id = model::producer_identity(_id.id, new_epoch);
}

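To make the bump rule concrete, here is a hedged sketch of the decision producer_state::update() applies in the next hunk, with simplified stand-ins for the model:: types: only a non-transactional batch carrying a strictly higher epoch triggers reset_with_new_epoch().

#include <cassert>
#include <cstdint>

// Simplified stand-in for model::producer_identity.
struct toy_pid {
    int64_t id;
    int16_t epoch;
};

// Mirrors the check added in producer_state::update(): transactional
// producers bump epochs via fencing instead, because in-progress
// transactions on the older epoch must be aborted first.
bool should_bump_epoch(bool is_transactional, toy_pid incoming, toy_pid current) {
    return !is_transactional && incoming.epoch > current.epoch;
}

// reset_with_new_epoch() then fails in-flight requests with errc::timeout
// and rewrites the identity, keeping the id and adopting the new epoch.
toy_pid bump(toy_pid current, int16_t new_epoch) {
    assert(new_epoch > current.epoch); // vassert in the real code
    return {current.id, new_epoch};
}
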
result<request_ptr> producer_state::try_emplace_request(
const model::batch_identity& bid, model::term_id current_term, bool reset) {
if (bid.first_seq > bid.last_seq) {
@@ -352,6 +359,9 @@ bool producer_state::update(
if (_evicted) {
return false;
}
if (!bid.is_transactional && bid.pid.epoch > _id.epoch) {
reset_with_new_epoch(model::producer_epoch{bid.pid.epoch});
}
bool relink_producer = _requests.stm_apply(bid, offset);
vlog(
clusterlog.trace,
12 changes: 12 additions & 0 deletions src/v/cluster/producer_state.h
@@ -130,6 +130,7 @@ class requests {
static_cast<unsigned long>(requests_cached_max));
bool is_valid_sequence(seq_t incoming) const;
std::optional<request_ptr> last_request() const;
void reset(request_result_t::error_type);
ss::chunked_fifo<request_ptr, chunk_size> _inflight_requests;
ss::chunked_fifo<request_ptr, chunk_size> _finished_requests;
friend producer_state;
@@ -208,6 +209,17 @@ class producer_state {
void update_current_txn_start_offset(std::optional<kafka::offset> offset) {
_current_txn_start_offset = offset;
}

model::producer_identity id() const { return _id; }
Review comment from a contributor on id() above: "Nit: I guess this better fits in a5bb2d2?"

// Resets the producer to use a new epoch. The new epoch must be strictly
// larger than the current epoch. This is only used by idempotent
// producers to bump the epoch of an existing producer when an incoming
// request carries a higher epoch. Transactions follow a separate,
// fencing-based approach to bump epochs, since that requires aborting
// any in-progress transactions with the older epoch.
void reset_with_new_epoch(model::producer_epoch new_epoch);

safe_intrusive_list_hook _hook;

private:
6 changes: 3 additions & 3 deletions src/v/cluster/producer_state_manager.cc
@@ -25,9 +25,9 @@ namespace cluster

producer_state_manager::producer_state_manager(
config::binding<uint64_t> max_producer_ids,
-  std::chrono::milliseconds producer_expiration_ms,
+  config::binding<std::chrono::milliseconds> producer_expiration_ms,
   config::binding<size_t> virtual_cluster_min_producer_ids)
-  : _producer_expiration_ms(producer_expiration_ms)
+  : _producer_expiration_ms(std::move(producer_expiration_ms))
, _max_ids(std::move(max_producer_ids))
, _virtual_cluster_min_producer_ids(
std::move(virtual_cluster_min_producer_ids))
@@ -95,7 +95,7 @@ void producer_state_manager::touch(
}
void producer_state_manager::evict_excess_producers() {
_cache.evict_older_than<ss::lowres_system_clock>(
-      ss::lowres_system_clock::now() - _producer_expiration_ms);
+      ss::lowres_system_clock::now() - _producer_expiration_ms());
if (!_gate.is_closed()) {
_reaper.arm(period);
}
4 changes: 2 additions & 2 deletions src/v/cluster/producer_state_manager.h
@@ -25,7 +25,7 @@ class producer_state_manager {
explicit producer_state_manager(
      config::binding<uint64_t> max_producer_ids,
-     std::chrono::milliseconds producer_expiration_ms,
+     config::binding<std::chrono::milliseconds> producer_expiration_ms,
      config::binding<size_t> virtual_cluster_min_producer_ids);

ss::future<> start();
@@ -75,7 +75,7 @@
void evict_excess_producers();
size_t _eviction_counter = 0;
// if a producer is inactive for this long, it will be gc-ed
-    std::chrono::milliseconds _producer_expiration_ms;
+    config::binding<std::chrono::milliseconds> _producer_expiration_ms;
// maximum number of active producers allowed on this shard across
// all partitions. When exceeded, producers are evicted on an
// LRU basis.
Expand Down
Loading