diff --git a/src/v/cluster/CMakeLists.txt b/src/v/cluster/CMakeLists.txt index debdbc5db96d1..a01af7a631fb0 100644 --- a/src/v/cluster/CMakeLists.txt +++ b/src/v/cluster/CMakeLists.txt @@ -88,6 +88,7 @@ v_cc_library( HDRS topic_recovery_validator.h SRCS + errors.cc metadata_cache.cc partition_manager.cc scheduling/partition_allocator.cc diff --git a/src/v/cluster/errc.h b/src/v/cluster/errc.h index 6506a41cc384f..adff2d58b2abf 100644 --- a/src/v/cluster/errc.h +++ b/src/v/cluster/errc.h @@ -90,6 +90,9 @@ enum class errc : int16_t { producer_ids_vcluster_limit_exceeded, validation_of_recovery_topic_failed, }; + +std::ostream& operator<<(std::ostream& o, errc err); + struct errc_category final : public std::error_category { const char* name() const noexcept final { return "cluster::errc"; } diff --git a/src/v/cluster/errors.cc b/src/v/cluster/errors.cc new file mode 100644 index 0000000000000..5f4b47b83e76b --- /dev/null +++ b/src/v/cluster/errors.cc @@ -0,0 +1,171 @@ +/** + * Copyright 2024 Redpanda Data, Inc. + * + * Use of this software is governed by the Business Source License + * included in the file licenses/BSL.md + * + * As of the Change Date specified in that file, in accordance with + * the Business Source License, use of this software will be governed + * by the Apache License, Version 2.0 + */ + +#include "cluster/errc.h" + +#include + +namespace cluster { +std::ostream& operator<<(std::ostream& o, cluster::errc err) { + switch (err) { + case errc::success: + return o << "cluster::errc::success"; + case errc::notification_wait_timeout: + return o << "cluster::errc::notification_wait_timeout"; + case errc::topic_invalid_partitions: + return o << "cluster::errc::topic_invalid_partitions"; + case errc::topic_invalid_replication_factor: + return o << "cluster::errc::topic_invalid_replication_factor"; + case errc::topic_invalid_config: + return o << "cluster::errc::topic_invalid_config"; + case errc::not_leader_controller: + return o << "cluster::errc::not_leader_controller"; + case errc::topic_already_exists: + return o << "cluster::errc::topic_already_exists"; + case errc::replication_error: + return o << "cluster::errc::replication_error"; + case errc::shutting_down: + return o << "cluster::errc::shutting_down"; + case errc::no_leader_controller: + return o << "cluster::errc::no_leader_controller"; + case errc::join_request_dispatch_error: + return o << "cluster::errc::join_request_dispatch_error"; + case errc::seed_servers_exhausted: + return o << "cluster::errc::seed_servers_exhausted"; + case errc::auto_create_topics_exception: + return o << "cluster::errc::auto_create_topics_exception"; + case errc::timeout: + return o << "cluster::errc::timeout"; + case errc::topic_not_exists: + return o << "cluster::errc::topic_not_exists"; + case errc::invalid_topic_name: + return o << "cluster::errc::invalid_topic_name"; + case errc::partition_not_exists: + return o << "cluster::errc::partition_not_exists"; + case errc::not_leader: + return o << "cluster::errc::not_leader"; + case errc::partition_already_exists: + return o << "cluster::errc::partition_already_exists"; + case errc::waiting_for_recovery: + return o << "cluster::errc::waiting_for_recovery"; + case errc::waiting_for_reconfiguration_finish: + return o << "cluster::errc::waiting_for_reconfiguration_finish"; + case errc::update_in_progress: + return o << "cluster::errc::update_in_progress"; + case errc::user_exists: + return o << "cluster::errc::user_exists"; + case errc::user_does_not_exist: + return o << 
"cluster::errc::user_does_not_exist"; + case errc::invalid_producer_epoch: + return o << "cluster::errc::invalid_producer_epoch"; + case errc::sequence_out_of_order: + return o << "cluster::errc::sequence_out_of_order"; + case errc::generic_tx_error: + return o << "cluster::errc::generic_tx_error"; + case errc::node_does_not_exists: + return o << "cluster::errc::node_does_not_exists"; + case errc::invalid_node_operation: + return o << "cluster::errc::invalid_node_operation"; + case errc::invalid_configuration_update: + return o << "cluster::errc::invalid_configuration_update"; + case errc::topic_operation_error: + return o << "cluster::errc::topic_operation_error"; + case errc::no_eligible_allocation_nodes: + return o << "cluster::errc::no_eligible_allocation_nodes"; + case errc::allocation_error: + return o << "cluster::errc::allocation_error"; + case errc::partition_configuration_revision_not_updated: + return o + << "cluster::errc::partition_configuration_revision_not_updated"; + case errc::partition_configuration_in_joint_mode: + return o << "cluster::errc::partition_configuration_in_joint_mode"; + case errc::partition_configuration_leader_config_not_committed: + return o << "cluster::errc::partition_configuration_leader_config_not_" + "committed"; + case errc::partition_configuration_differs: + return o << "cluster::errc::partition_configuration_differs"; + case errc::data_policy_already_exists: + return o << "cluster::errc::data_policy_already_exists"; + case errc::data_policy_not_exists: + return o << "cluster::errc::data_policy_not_exists"; + case errc::source_topic_not_exists: + return o << "cluster::errc::source_topic_not_exists"; + case errc::source_topic_still_in_use: + return o << "cluster::errc::source_topic_still_in_use"; + case errc::waiting_for_partition_shutdown: + return o << "cluster::errc::waiting_for_partition_shutdown"; + case errc::error_collecting_health_report: + return o << "cluster::errc::error_collecting_health_report"; + case errc::leadership_changed: + return o << "cluster::errc::leadership_changed"; + case errc::feature_disabled: + return o << "cluster::errc::feature_disabled"; + case errc::invalid_request: + return o << "cluster::errc::invalid_request"; + case errc::no_update_in_progress: + return o << "cluster::errc::no_update_in_progress"; + case errc::unknown_update_interruption_error: + return o << "cluster::errc::unknown_update_interruption_error"; + case errc::throttling_quota_exceeded: + return o << "cluster::errc::throttling_quota_exceeded"; + case errc::cluster_already_exists: + return o << "cluster::errc::cluster_already_exists"; + case errc::no_partition_assignments: + return o << "cluster::errc::no_partition_assignments"; + case errc::failed_to_create_partition: + return o << "cluster::errc::failed_to_create_partition"; + case errc::partition_operation_failed: + return o << "cluster::errc::partition_operation_failed"; + case errc::transform_does_not_exist: + return o << "cluster::errc::transform_does_not_exist"; + case errc::transform_invalid_update: + return o << "cluster::errc::transform_invalid_update"; + case errc::transform_invalid_create: + return o << "cluster::errc::transform_invalid_create"; + case errc::transform_invalid_source: + return o << "cluster::errc::transform_invalid_source"; + case errc::transform_invalid_environment: + return o << "cluster::errc::transform_invalid_environment"; + case errc::trackable_keys_limit_exceeded: + return o << "cluster::errc::trackable_keys_limit_exceeded"; + case errc::topic_disabled: + return o << 
"cluster::errc::topic_disabled"; + case errc::partition_disabled: + return o << "cluster::errc::partition_disabled"; + case errc::invalid_partition_operation: + return o << "cluster::errc::invalid_partition_operation"; + case errc::concurrent_modification_error: + return o << "cluster::errc::concurrent_modification_error"; + case errc::transform_count_limit_exceeded: + return o << "cluster::errc::transform_count_limit_exceeded"; + case errc::role_exists: + return o << "cluster::errc::role_exists"; + case errc::role_does_not_exist: + return o << "cluster::errc::role_does_not_exist"; + case errc::inconsistent_stm_update: + return o << "cluster::errc::inconsistent_stm_update"; + case errc::waiting_for_shard_placement_update: + return o << "cluster::errc::waiting_for_shard_placement_update"; + case errc::topic_invalid_partitions_core_limit: + return o << "cluster::errc::topic_invalid_partitions_core_limit"; + case errc::topic_invalid_partitions_memory_limit: + return o << "cluster::errc::topic_invalid_partitions_memory_limit"; + case errc::topic_invalid_partitions_fd_limit: + return o << "cluster::errc::topic_invalid_partitions_fd_limit"; + case errc::topic_invalid_partitions_decreased: + return o << "cluster::errc::topic_invalid_partitions_decreased"; + case errc::producer_ids_vcluster_limit_exceeded: + return o << "cluster::errc::producer_ids_vcluster_limit_exceeded"; + case errc::validation_of_recovery_topic_failed: + return o << "cluster::errc::validation_of_recovery_topic_failed"; + } +} +} // namespace cluster diff --git a/src/v/cluster/producer_state.cc b/src/v/cluster/producer_state.cc index d261a23f93ded..0b2f32ab1c783 100644 --- a/src/v/cluster/producer_state.cc +++ b/src/v/cluster/producer_state.cc @@ -122,6 +122,16 @@ std::optional requests::last_request() const { return std::nullopt; } +void requests::reset(request_result_t::error_type error) { + for (auto& request : _inflight_requests) { + if (!request->has_completed()) { + request->set_error(error); + } + } + _inflight_requests.clear(); + _finished_requests.clear(); +} + bool requests::is_valid_sequence(seq_t incoming) const { auto last_req = last_request(); return @@ -138,13 +148,7 @@ result requests::try_emplace( if (reset_sequences) { // reset all the sequence tracking state, avoids any sequence // checks for sequence tracking. 
- while (!_inflight_requests.empty()) {
- if (!_inflight_requests.front()->has_completed()) {
- _inflight_requests.front()->set_error(errc::timeout);
- }
- _inflight_requests.pop_front();
- }
- _finished_requests.clear();
+ reset(errc::timeout);
} else {
// gc and fail any inflight requests from old terms
// these are guaranteed to be failed because of sync() guarantees
@@ -232,15 +236,7 @@ bool requests::stm_apply(
return relink_producer;
}
-void requests::shutdown() {
- for (auto& request : _inflight_requests) {
- if (!request->has_completed()) {
- request->set_error(errc::shutting_down);
- }
- }
- _inflight_requests.clear();
- _finished_requests.clear();
-}
+void requests::shutdown() { reset(cluster::errc::shutting_down); }
producer_state::producer_state(
ss::noncopyable_function post_eviction_hook,
@@ -315,6 +311,17 @@ bool producer_state::can_evict() { return true; }
+void producer_state::reset_with_new_epoch(model::producer_epoch new_epoch) {
+ vassert(
+ new_epoch > _id.get_epoch(),
+ "Invalid epoch bump to {} for producer {}",
+ new_epoch,
+ *this);
+ vlog(clusterlog.info, "[{}] Resetting epoch to {}", *this, new_epoch);
+ _requests.reset(errc::timeout);
+ _id = model::producer_identity(_id.id, new_epoch);
+}
+
result producer_state::try_emplace_request(
const model::batch_identity& bid, model::term_id current_term, bool reset) {
if (bid.first_seq > bid.last_seq) {
@@ -352,6 +359,9 @@ bool producer_state::update(
if (_evicted) {
return false;
}
+ if (!bid.is_transactional && bid.pid.epoch > _id.epoch) {
+ reset_with_new_epoch(model::producer_epoch{bid.pid.epoch});
+ }
bool relink_producer = _requests.stm_apply(bid, offset);
vlog(
clusterlog.trace,
diff --git a/src/v/cluster/producer_state.h b/src/v/cluster/producer_state.h
index d5f710c3e053e..53926499f4afa 100644
--- a/src/v/cluster/producer_state.h
+++ b/src/v/cluster/producer_state.h
@@ -130,6 +130,7 @@ class requests {
static_cast(requests_cached_max));
bool is_valid_sequence(seq_t incoming) const;
std::optional last_request() const;
+ void reset(request_result_t::error_type);
ss::chunked_fifo _inflight_requests;
ss::chunked_fifo _finished_requests;
friend producer_state;
@@ -208,6 +209,17 @@ class producer_state {
void update_current_txn_start_offset(std::optional offset) {
_current_txn_start_offset = offset;
}
+
+ model::producer_identity id() const { return _id; }
+
+ // Resets the producer to use a new epoch. The new epoch should be strictly
+ // larger than the current epoch. This is only used by the idempotent
+ // producers trying to bump epoch of the existing producer based on the
+ // incoming request with a higher epoch. Transactions follow a separate
+ // fencing based approach to bump epochs as it requires aborting any in
+ // progress transactions with older epoch.
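The header declaration this comment documents follows below. For reference, the checks added in producer_state::update() above and in rm_stm::do_idempotent_replicate() further down condense to roughly the following sketch; the free function and its name are illustrative, not part of the patch:

```cpp
#include "cluster/producer_state.h" // also brings in model::batch_identity

// Sketch only: condensed call pattern for the new epoch-bump path. The helper
// name is hypothetical; the real checks live in producer_state::update() and
// rm_stm::do_idempotent_replicate().
void maybe_bump_idempotent_epoch(
  cluster::producer_state& producer, const model::batch_identity& bid) {
    // Only non-transactional (idempotent) requests bump the epoch in place;
    // transactional producers go through fencing instead.
    if (!bid.is_transactional && bid.pid.epoch > producer.id().epoch) {
        // Fails any in-flight requests from the old epoch with errc::timeout
        // and restarts sequence tracking under the new epoch.
        producer.reset_with_new_epoch(model::producer_epoch{bid.pid.epoch});
    }
}
```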
+ void reset_with_new_epoch(model::producer_epoch new_epoch); + safe_intrusive_list_hook _hook; private: diff --git a/src/v/cluster/producer_state_manager.cc b/src/v/cluster/producer_state_manager.cc index 0dee65f81707d..4bc68c1290821 100644 --- a/src/v/cluster/producer_state_manager.cc +++ b/src/v/cluster/producer_state_manager.cc @@ -25,9 +25,9 @@ namespace cluster { producer_state_manager::producer_state_manager( config::binding max_producer_ids, - std::chrono::milliseconds producer_expiration_ms, + config::binding producer_expiration_ms, config::binding virtual_cluster_min_producer_ids) - : _producer_expiration_ms(producer_expiration_ms) + : _producer_expiration_ms(std::move(producer_expiration_ms)) , _max_ids(std::move(max_producer_ids)) , _virtual_cluster_min_producer_ids( std::move(virtual_cluster_min_producer_ids)) @@ -95,7 +95,7 @@ void producer_state_manager::touch( } void producer_state_manager::evict_excess_producers() { _cache.evict_older_than( - ss::lowres_system_clock::now() - _producer_expiration_ms); + ss::lowres_system_clock::now() - _producer_expiration_ms()); if (!_gate.is_closed()) { _reaper.arm(period); } diff --git a/src/v/cluster/producer_state_manager.h b/src/v/cluster/producer_state_manager.h index 3e582aa743875..486d1e34aa3d6 100644 --- a/src/v/cluster/producer_state_manager.h +++ b/src/v/cluster/producer_state_manager.h @@ -25,7 +25,7 @@ class producer_state_manager { explicit producer_state_manager( config::binding max_producer_ids, - std::chrono::milliseconds producer_expiration_ms, + config::binding producer_expiration_ms, config::binding virtual_cluster_min_producer_ids); ss::future<> start(); @@ -75,7 +75,7 @@ class producer_state_manager { void evict_excess_producers(); size_t _eviction_counter = 0; // if a producer is inactive for this long, it will be gc-ed - std::chrono::milliseconds _producer_expiration_ms; + config::binding _producer_expiration_ms; // maximum number of active producers allowed on this shard across // all partitions. When exceeded, producers are evicted on an // LRU basis. diff --git a/src/v/cluster/rm_stm.cc b/src/v/cluster/rm_stm.cc index 7a538389bcd72..5f3b1de117868 100644 --- a/src/v/cluster/rm_stm.cc +++ b/src/v/cluster/rm_stm.cc @@ -320,7 +320,8 @@ ss::future rm_stm::bootstrap_committed_offset() { .then([this] { return _raft->committed_offset(); }); } -producer_ptr rm_stm::maybe_create_producer(model::producer_identity pid) { +std::pair +rm_stm::maybe_create_producer(model::producer_identity pid) { // Double lookup because of two reasons // 1. we are forced to use a ptr as map value_type because producer_state is // not movable @@ -328,14 +329,14 @@ producer_ptr rm_stm::maybe_create_producer(model::producer_identity pid) { // as it is memory friendly. 
auto it = _producers.find(pid); if (it != _producers.end()) { - return it->second; + return std::make_pair(it->second, producer_previously_known::yes); } auto producer = ss::make_lw_shared( pid, _raft->group(), [pid, this] { cleanup_producer_state(pid); }); _producer_state_manager.local().register_producer(*producer, _vcluster_id); _producers.emplace(pid, producer); - return producer; + return std::make_pair(producer, producer_previously_known::no); } void rm_stm::cleanup_producer_state(model::producer_identity pid) { @@ -1104,7 +1105,7 @@ ss::future> rm_stm::transactional_replicate( if (!check_tx_permitted()) { co_return errc::generic_tx_error; } - auto producer = maybe_create_producer(bid.pid); + auto [producer, _] = maybe_create_producer(bid.pid); co_return co_await producer ->run_with_lock([&](ssx::semaphore_units units) { return do_sync_and_transactional_replicate( @@ -1121,7 +1122,8 @@ ss::future> rm_stm::do_sync_and_idempotent_replicate( model::record_batch_reader br, raft::replicate_options opts, ss::lw_shared_ptr> enqueued, - ssx::semaphore_units units) { + ssx::semaphore_units units, + producer_previously_known known_producer) { if (!co_await sync(_sync_timeout)) { // it's ok not to set enqueued on early return because // the safety check in replicate_in_stages sets it automatically @@ -1135,7 +1137,8 @@ ss::future> rm_stm::do_sync_and_idempotent_replicate( std::move(br), opts, std::move(enqueued), - units); + units, + known_producer); if (!result) { vlog( @@ -1178,8 +1181,31 @@ ss::future> rm_stm::do_idempotent_replicate( model::record_batch_reader br, raft::replicate_options opts, ss::lw_shared_ptr> enqueued, - ssx::semaphore_units& units) { - auto request = producer->try_emplace_request(bid, synced_term); + ssx::semaphore_units& units, + producer_previously_known known_producer) { + // Check if the producer bumped the epoch and reset accordingly. + if (bid.pid.epoch > producer->id().epoch) { + producer->reset_with_new_epoch(model::producer_epoch{bid.pid.epoch}); + } + // If the producer is unknown and is producing with a non zero + // sequence number, it is possible that the producer has been evicted + // from the broker memory. Instead of rejecting the request, we accept + // the current sequence and move on. This logic is similar to what + // Apache Kafka does. 
+ // Ref: + // https://github.com/apache/kafka/blob/704476885ffb40cd3bf9b8f5c368c01eaee0a737 + // storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerAppendInfo.java#L135 + auto skip_sequence_checks = !known_producer && bid.first_seq > 0; + if (unlikely(skip_sequence_checks)) { + vlog( + _ctx_log.warn, + "Accepting batch from unknown producer that likely got evicted: {}, " + "term: {}", + bid, + synced_term); + } + auto request = producer->try_emplace_request( + bid, synced_term, skip_sequence_checks); if (!request) { co_return request.error(); } @@ -1230,16 +1256,17 @@ ss::future> rm_stm::idempotent_replicate( raft::replicate_options opts, ss::lw_shared_ptr> enqueued) { try { - auto producer = maybe_create_producer(bid.pid); + auto [producer, known_producer] = maybe_create_producer(bid.pid); co_return co_await producer - ->run_with_lock([&](ssx::semaphore_units units) { + ->run_with_lock([&, known_producer](ssx::semaphore_units units) { return do_sync_and_idempotent_replicate( producer, bid, std::move(br), opts, std::move(enqueued), - std::move(units)); + std::move(units), + known_producer); }) .finally([this, producer] { _producer_state_manager.local().touch(*producer, _vcluster_id); @@ -1757,7 +1784,7 @@ void rm_stm::apply_control( // either epoch is the same as fencing or it's lesser in the latter // case we don't fence off aborts and commits because transactional // manager already decided a tx's outcome and acked it to the client - auto producer = maybe_create_producer(pid); + auto [producer, _] = maybe_create_producer(pid); if (likely( crt == model::control_record_type::tx_abort @@ -1821,7 +1848,7 @@ void rm_stm::apply_data( _highest_producer_id = std::max(_highest_producer_id, bid.pid.get_id()); const auto last_offset = header.last_offset(); const auto last_kafka_offset = from_log_offset(header.last_offset()); - auto producer = maybe_create_producer(bid.pid); + auto [producer, _] = maybe_create_producer(bid.pid); auto needs_touch = producer->update(bid, last_kafka_offset); if (needs_touch) { _producer_state_manager.local().touch(*producer, _vcluster_id); diff --git a/src/v/cluster/rm_stm.h b/src/v/cluster/rm_stm.h index eabec04245975..64ab3f0e21b73 100644 --- a/src/v/cluster/rm_stm.h +++ b/src/v/cluster/rm_stm.h @@ -273,7 +273,12 @@ class rm_stm final : public raft::persisted_stm<> { ss::future<> do_remove_persistent_state(); ss::future> do_aborted_transactions(model::offset, model::offset); - producer_ptr maybe_create_producer(model::producer_identity); + // Tells whether the producer is already known or is created + // for the first time from the incoming request. 
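The strongly typed flag declared just below carries this information into do_idempotent_replicate(). The eviction-tolerance rule described above reduces to roughly the following predicate; this is a sketch, and the ss::bool_class tag name is an assumption since the hunk elides it:

```cpp
#include <seastar/util/bool_class.hh>

#include <cstdint>

namespace ss = seastar; // Redpanda's usual seastar alias

// The tag struct name is an assumption; the declaration below elides it.
using producer_previously_known
  = ss::bool_class<struct producer_previously_known_tag>;

// Sketch: restates the check added to do_idempotent_replicate() above.
bool should_skip_sequence_checks(
  producer_previously_known known_producer, int32_t first_seq) {
    // A producer we have never seen that starts at a non-zero sequence most
    // likely had its state evicted; accept its current sequence instead of
    // returning sequence_out_of_order, mirroring Kafka's ProducerAppendInfo
    // handling for idempotent producers.
    return !known_producer && first_seq > 0;
}
```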
+ using producer_previously_known + = ss::bool_class; + std::pair + maybe_create_producer(model::producer_identity); void cleanup_producer_state(model::producer_identity); ss::future<> reset_producers(); model::record_batch make_fence_batch( @@ -333,7 +338,8 @@ class rm_stm final : public raft::persisted_stm<> { model::record_batch_reader, raft::replicate_options, ss::lw_shared_ptr>, - ssx::semaphore_units&); + ssx::semaphore_units&, + producer_previously_known); ss::future> do_sync_and_idempotent_replicate( producer_ptr, @@ -341,7 +347,8 @@ class rm_stm final : public raft::persisted_stm<> { model::record_batch_reader, raft::replicate_options, ss::lw_shared_ptr>, - ssx::semaphore_units); + ssx::semaphore_units, + producer_previously_known); ss::future> replicate_msg( model::record_batch_reader, diff --git a/src/v/cluster/tests/idempotency_tests.cc b/src/v/cluster/tests/idempotency_tests.cc index d089366bc3791..93f4295899314 100644 --- a/src/v/cluster/tests/idempotency_tests.cc +++ b/src/v/cluster/tests/idempotency_tests.cc @@ -317,41 +317,6 @@ FIXTURE_TEST(test_rm_stm_prevents_gaps, rm_stm_test_fixture) { r2 == failure_type(cluster::errc::sequence_out_of_order)); } -FIXTURE_TEST(test_rm_stm_prevents_odd_session_start_off, rm_stm_test_fixture) { - create_stm_and_start_raft(); - auto& stm = *_stm; - stm.testing_only_disable_auto_abort(); - - stm.start().get0(); - - wait_for_confirmed_leader(); - wait_for_meta_initialized(); - - auto count = 5; - auto rdr = random_batches_reader(model::test::record_batch_spec{ - .offset = model::offset(0), - .allow_compression = true, - .count = count, - .enable_idempotence = true, - .producer_id = 0, - .producer_epoch = 0, - .base_sequence = 1}); - - auto bid = model::batch_identity{ - .pid = model::producer_identity{0, 0}, - .first_seq = 1, - .last_seq = 1 + (count - 1)}; - - auto r = stm - .replicate( - bid, - std::move(rdr), - raft::replicate_options(raft::consistency_level::quorum_ack)) - .get0(); - BOOST_REQUIRE( - r == failure_type(cluster::errc::sequence_out_of_order)); -} - FIXTURE_TEST(test_rm_stm_passes_immediate_retry, rm_stm_test_fixture) { create_stm_and_start_raft(); auto& stm = *_stm; diff --git a/src/v/cluster/tests/producer_state_tests.cc b/src/v/cluster/tests/producer_state_tests.cc index ef3a9ab068432..d4bdc4613fbe0 100644 --- a/src/v/cluster/tests/producer_state_tests.cc +++ b/src/v/cluster/tests/producer_state_tests.cc @@ -52,7 +52,7 @@ struct test_fixture { size_t max_producers, size_t min_producers_per_vcluster) { _psm = std::make_unique( config::mock_binding(max_producers), - std::chrono::milliseconds::max(), + config::mock_binding(std::chrono::milliseconds::max()), config::mock_binding(min_producers_per_vcluster)); _psm->start().get(); validate_producer_count(0); diff --git a/src/v/cluster/tests/rm_stm_test_fixture.h b/src/v/cluster/tests/rm_stm_test_fixture.h index e10e5b45798bf..de80d1c181ce9 100644 --- a/src/v/cluster/tests/rm_stm_test_fixture.h +++ b/src/v/cluster/tests/rm_stm_test_fixture.h @@ -23,7 +23,7 @@ struct rm_stm_test_fixture : simple_raft_fixture { producer_state_manager .start( config::mock_binding(std::numeric_limits::max()), - std::chrono::milliseconds::max(), + config::mock_binding(std::chrono::milliseconds::max()), config::mock_binding(std::numeric_limits::max())) .get(); producer_state_manager diff --git a/src/v/cluster/tx_gateway_frontend.cc b/src/v/cluster/tx_gateway_frontend.cc index 2805c42fac291..f4ad87517f6bc 100644 --- a/src/v/cluster/tx_gateway_frontend.cc +++ b/src/v/cluster/tx_gateway_frontend.cc @@ 
-3135,7 +3135,7 @@ void tx_gateway_frontend::expire_old_txs() {
model::tx_manager_nt);
if (!ntp_meta) {
vlog(
- txlog.error,
+ txlog.debug,
"Topic {} doesn't exist in metadata cache,",
model::tx_manager_nt);
return ss::now();
diff --git a/src/v/redpanda/application.cc b/src/v/redpanda/application.cc
index 08a5861e689aa..e89e14f7bcfe2 100644
--- a/src/v/redpanda/application.cc
+++ b/src/v/redpanda/application.cc
@@ -1452,7 +1452,10 @@ void application::wire_up_redpanda_services(
ss::sharded_parameter([]() {
return config::shard_local_cfg().max_concurrent_producer_ids.bind();
}),
- config::shard_local_cfg().transactional_id_expiration_ms.value(),
+ ss::sharded_parameter([]() {
+ return config::shard_local_cfg()
+ .transactional_id_expiration_ms.bind();
+ }),
ss::sharded_parameter([]() {
return config::shard_local_cfg()
.virtual_cluster_min_producer_ids.bind();
diff --git a/tests/java/verifiers/pom.xml b/tests/java/verifiers/pom.xml
index f30ce4af35ce5..f678f19c4b2df 100644
--- a/tests/java/verifiers/pom.xml
+++ b/tests/java/verifiers/pom.xml
@@ -30,6 +30,11 @@
kafka-clients
3.0.2
+
+ org.slf4j
+ slf4j-log4j12
+ 1.7.12
+
${buildDir}
diff --git a/tests/java/verifiers/src/main/java/idempotency/App.java b/tests/java/verifiers/src/main/java/idempotency/App.java
new file mode 100644
index 0000000000000..dc5fe05786519
--- /dev/null
+++ b/tests/java/verifiers/src/main/java/idempotency/App.java
@@ -0,0 +1,189 @@
+package io.vectorized.idempotency;
+
+import static spark.Spark.*;
+
+import com.google.gson.Gson;
+import com.google.gson.JsonObject;
+import java.lang.String;
+import java.lang.Thread;
+import java.util.Properties;
+import java.util.Random;
+import java.util.concurrent.Semaphore;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import spark.*;
+
+// A simple pausable idempotent producer for sanity testing a Java-based
+// producer from ducktape. This is not a verifier but rather a simple pausable
+// load-generating utility that can start, pause and resume a single idempotency
+// session using a single producer on demand.
+// +// Supported REST APIs +// - /start-producer - start a new or resume existing idempotent producer +// session +// - /pause-producer - pauses the producer +// - /stop-producer - stops the producer +public class App { + + static Logger logger = Logger.getLogger(App.class); + + static void setupLogging() { + org.apache.log4j.BasicConfigurator.configure(); + Logger.getRootLogger().setLevel(Level.WARN); + Logger.getLogger("io.vectorized").setLevel(Level.DEBUG); + Logger.getLogger("org.apache.kafka").setLevel(Level.DEBUG); + } + + public static class Params { + public String brokers; + public String topic; + public long partitions; + } + + public static class Progress { + public long num_produced; + }; + + static Producer createIdempotentProducer(String brokers) { + Properties props = new Properties(); + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers); + props.put(ProducerConfig.ACKS_CONFIG, "all"); + props.put( + ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, + "org.apache.kafka.common.serialization.StringSerializer"); + props.put( + ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, + "org.apache.kafka.common.serialization.StringSerializer"); + props.put(ProducerConfig.LINGER_MS_CONFIG, 0); + props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5); + props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE); + props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); + return new KafkaProducer(props); + } + + public static class JsonTransformer implements ResponseTransformer { + private Gson gson = new Gson(); + + @Override + public String render(Object model) { + return gson.toJson(model); + } + } + + volatile Params params = null; + volatile Thread produceThread = null; + volatile Semaphore sem = new Semaphore(1, true); + volatile boolean started = false; + volatile boolean stopped = false; + volatile Exception ex = null; + volatile long counter = 0; + + void produceLoop() { + var random = new Random(); + Producer idempotentProducer + = createIdempotentProducer(this.params.brokers); + while (!stopped) { + try { + sem.acquire(); + long partition + = random.longs(0, this.params.partitions).findFirst().getAsLong(); + String kv = Long.toString(counter); + ProducerRecord record + = new ProducerRecord<>(this.params.topic, kv, kv); + idempotentProducer.send(record).get(); + } catch (Exception e) { + ex = e; + logger.error("Exception in produce loop: ", e); + } finally { + sem.release(); + } + counter++; + } + idempotentProducer.close(); + } + + void run() throws Exception { + + port(8080); + + get("/ping", (req, res) -> { + res.status(200); + return ""; + }); + + get("/progress", (req, res) -> { + Progress progress = new Progress(); + progress.num_produced = counter; + return progress; + }, new JsonTransformer()); + + post("/start-producer", (req, res) -> { + if (this.started && !this.stopped) { + logger.info("Producer already started. 
unpausing it."); + if (this.sem.availablePermits() == 0) { + this.sem.release(); + } + res.status(200); + return ""; + } + logger.info("Starting producer"); + try { + this.params = (new Gson()).fromJson(req.body(), Params.class); + this.produceThread = new Thread(() -> { this.produceLoop(); }); + this.produceThread.start(); + + } catch (Exception e) { + logger.error("Exception starting produce thread ", e); + throw e; + } + this.started = true; + this.stopped = false; + res.status(200); + return ""; + }); + + post("/pause-producer", (req, res) -> { + if (!this.started) { + logger.info("Pause failed, not started."); + res.status(500); + return ""; + } + logger.info("Pausing producer"); + this.sem.acquire(); + res.status(200); + return ""; + }); + + post("/stop-producer", (req, res) -> { + logger.info("Stopping producer"); + this.stopped = true; + if (this.sem.availablePermits() == 0) { + this.sem.release(); + } + try { + if (produceThread != null) { + produceThread.join(); + } + } catch (Exception e) { + logger.error("Exception stopping producer", e); + throw e; + } + + if (ex != null) { + System.exit(1); + } + + res.status(200); + return ""; + }); + } + + public static void main(String[] args) throws Exception { + setupLogging(); + new App().run(); + } +} diff --git a/tests/rptest/services/idempotency_load_generator.py b/tests/rptest/services/idempotency_load_generator.py new file mode 100644 index 0000000000000..76e9b5e14708a --- /dev/null +++ b/tests/rptest/services/idempotency_load_generator.py @@ -0,0 +1,147 @@ +# Copyright 2024 Redpanda Data, Inc. +# +# Use of this software is governed by the Business Source License +# included in the file licenses/BSL.md +# +# As of the Change Date specified in that file, in accordance with +# the Business Source License, use of this software will be governed +# by the Apache License, Version 2.0 + +from ducktape.services.service import Service +from rptest.util import wait_until +import requests +import sys + +OUTPUT_LOG = "/opt/remote/var/pausable_idempotent_producer.log" + + +class IdempotencyClientFailure(Exception): + pass + + +class PausableIdempotentProducer(Service): + logs = { + "pausable_idempotent_producer_stdout_stderr": { + "path": OUTPUT_LOG, + "collect_default": True + } + } + + def __init__(self, context, redpanda): + super(PausableIdempotentProducer, self).__init__(context, num_nodes=1) + self._redpanda = redpanda + + def is_alive(self, node): + result = node.account.ssh_output( + "bash /opt/remote/control/alive.sh pausable_idempotent_producer") + result = result.decode("utf-8") + return "YES" in result + + def is_ready(self): + try: + self.remote_ping() + return True + except requests.exceptions.ConnectionError: + return False + + def raise_on_violation(self, node): + self.logger.info( + f"Scanning node {node.account.hostname} log for violations...") + + for line in node.account.ssh_capture( + f"grep -e Exception {OUTPUT_LOG} || true"): + raise IdempotencyClientFailure(line) + + def start_node(self, node, timeout_sec=10): + node.account.ssh( + f"bash /opt/remote/control/start.sh pausable_idempotent_producer \"java -cp /opt/verifiers/verifiers.jar io.vectorized.idempotency.App\"" + ) + wait_until( + lambda: self.is_alive(node), + timeout_sec=timeout_sec, + backoff_sec=1, + err_msg= + f"pausable_idempotent_producer service {node.account.hostname} failed to start within {timeout_sec} sec", + retry_on_exc=False) + self._node = node + wait_until( + lambda: self.is_ready(), + timeout_sec=timeout_sec, + backoff_sec=1, + err_msg= + 
f"pausable_idempotent_producer service {node.account.hostname} failed to become ready within {timeout_sec} sec", + retry_on_exc=False) + + def stop_node(self, node): + node.account.ssh( + "bash /opt/remote/control/stop.sh pausable_idempotent_producer") + self.raise_on_violation(node) + + def clean_node(self, node): + pass + + def wait_node(self, node, timeout_sec=sys.maxsize): + wait_until( + lambda: not (self.is_alive(node)), + timeout_sec=timeout_sec, + backoff_sec=1, + err_msg= + f"pausable_idempotent_producer service {node.account.hostname} failed to stop within {timeout_sec} sec", + retry_on_exc=False) + return True + + def remote_ping(self): + ip = self._node.account.hostname + r = requests.get(f"http://{ip}:8080/ping") + if r.status_code != 200: + raise Exception(f"unexpected status code: {r.status_code}") + + def start_producer(self, topic, partitions): + ip = self._node.account.hostname + r = requests.post(f"http://{ip}:8080/start-producer", + json={ + "brokers": self._redpanda.brokers(), + "topic": topic, + "partitions": partitions, + }) + if r.status_code != 200: + raise Exception( + f"unexpected status code: {r.status_code} content: {r.content}" + ) + + def get_progress(self): + ip = self._node.account.hostname + return requests.get(f"http://{ip}:8080/progress") + + def ensure_progress(self, expected=1000, timeout_sec=10): + def check_progress(): + r = self.get_progress() + if r.status_code != 200: + return False + output = r.json() + self._redpanda.logger.debug(f"progress response: {output}") + return output["num_produced"] >= expected + + wait_until( + check_progress, + timeout_sec=timeout_sec, + backoff_sec=1, + err_msg= + f"pausable_idempotent_producer service {self._node.account.hostname} failed to make progress in {timeout_sec} sec", + retry_on_exc=False) + + def pause_producer(self): + ip = self._node.account.hostname + r = requests.post(f"http://{ip}:8080/pause-producer") + if r.status_code != 200: + raise Exception( + f"unexpected status code: {r.status_code} content: {r.content}" + ) + + def stop_producer(self): + ip = self._node.account.hostname + r = requests.post(f"http://{ip}:8080/stop-producer") + if r.status_code != 200: + raise Exception( + f"unexpected status code: {r.status_code} content: {r.content}" + ) diff --git a/tests/rptest/tests/idempotency_recovery_test.py b/tests/rptest/tests/idempotency_recovery_test.py new file mode 100644 index 0000000000000..f43a7380811fe --- /dev/null +++ b/tests/rptest/tests/idempotency_recovery_test.py @@ -0,0 +1,146 @@ +# Copyright 2020 Redpanda Data, Inc. +# +# Use of this software is governed by the Business Source License +# included in the file licenses/BSL.md +# +# As of the Change Date specified in that file, in accordance with +# the Business Source License, use of this software will be governed +# by the Apache License, Version 2.0 +from collections import defaultdict +from rptest.clients.default import DefaultClient +from rptest.clients.rpk import RpkTool +from rptest.tests.redpanda_test import RedpandaTest +from rptest.clients.types import TopicSpec +from rptest.services.cluster import cluster +from rptest.services.idempotency_load_generator import PausableIdempotentProducer +from time import sleep +from rptest.util import wait_until +import confluent_kafka as ck + + +class IdempotentProducerRecoveryTest(RedpandaTest): + """This test ensures that various client implementations can recover + from scenario in which the producers get evicted on the brokers. 
+ When a producer is evicted it loses all old state including the + sequence numbers. So the client attempting to produce after that + should still recover and be able to make progress. There are subtle + variations among client implementations around how they deal + with this situation, so this test acts a regression test ensuring that we + do not break this behavior.""" + def __init__(self, test_context): + super(IdempotentProducerRecoveryTest, + self).__init__(test_context=test_context, num_brokers=1) + self.test_topic = TopicSpec(name="test", + partition_count=1, + replication_factor=1) + + def wait_for_eviction(self, active_producers_remaining, + expected_to_be_evicted): + def do_wait(): + samples = [ + "idempotency_pid_cache_size", + "producer_state_manager_evicted_producers" + ] + brokers = self.redpanda.started_nodes() + metrics = self.redpanda.metrics_samples(samples, brokers) + producers_per_node = defaultdict(int) + evicted_per_node = defaultdict(int) + for pattern, metric in metrics.items(): + for m in metric.samples: + id = self.redpanda.node_id(m.node) + if pattern == "idempotency_pid_cache_size": + producers_per_node[id] += int(m.value) + elif pattern == "producer_state_manager_evicted_producers": + evicted_per_node[id] += int(m.value) + + self.redpanda.logger.debug( + f"active producers: {producers_per_node}") + self.redpanda.logger.debug( + f"evicted producers: {evicted_per_node}") + + remaining_match = all([ + num == active_producers_remaining + for num in producers_per_node.values() + ]) + + evicted_match = all([ + val == expected_to_be_evicted + for val in evicted_per_node.values() + ]) + + return len(producers_per_node) == len( + brokers) and remaining_match and evicted_match + + wait_until(do_wait, + timeout_sec=20, + backoff_sec=1, + err_msg=f"Not all producers were evicted in 20secs.", + retry_on_exc=False) + + @cluster(num_nodes=2) + def test_java_client_recovery_on_producer_eviction(self): + rpk = RpkTool(self.redpanda) + rpk.create_topic(self.test_topic.name, self.test_topic.partition_count, + self.test_topic.replication_factor) + + workload_svc = PausableIdempotentProducer(self.test_context, + self.redpanda) + workload_svc.start() + + workload_svc.start_producer(self.test_topic.name, + self.test_topic.partition_count) + # Generate some load + workload_svc.ensure_progress(expected=1000, timeout_sec=20) + + # Pause the producer to evict the producer sessions + workload_svc.pause_producer() + + progress_so_far = workload_svc.get_progress().json()["num_produced"] + + # Evict all producers + rpk.cluster_config_set("transactional_id_expiration_ms", 0) + self.wait_for_eviction(active_producers_remaining=0, + expected_to_be_evicted=1) + rpk.cluster_config_set("transactional_id_expiration_ms", 3600000) + + # Resume the idempotency session again. 
+ workload_svc.start_producer(self.test_topic.name, + self.test_topic.partition_count) + + workload_svc.ensure_progress(expected=progress_so_far + 1000, + timeout_sec=20) + # Verify the producer can make progress without exceptions + workload_svc.stop_producer() + + @cluster(num_nodes=1) + def test_librdkafka_recovery_on_producer_eviction(self): + rpk = RpkTool(self.redpanda) + rpk.create_topic(self.test_topic.name, self.test_topic.partition_count, + self.test_topic.replication_factor) + + producer = ck.Producer({ + 'bootstrap.servers': self.redpanda.brokers(), + 'enable.idempotence': True, + }) + + def produce_some(): + def on_delivery(err, _): + assert err is None, err + + for i in range(1000): + producer.produce(self.test_topic.name, + str(i), + str(i), + on_delivery=on_delivery) + producer.flush() + + produce_some() + + # Evict all producers + rpk.cluster_config_set("transactional_id_expiration_ms", 0) + self.wait_for_eviction(active_producers_remaining=0, + expected_to_be_evicted=1) + rpk.cluster_config_set("transactional_id_expiration_ms", 3600000) + + # Ensure producer can recover. + produce_some() diff --git a/tests/rptest/tests/transactions_test.py b/tests/rptest/tests/transactions_test.py index 9778ddc76a2bf..52c1991df1ada 100644 --- a/tests/rptest/tests/transactions_test.py +++ b/tests/rptest/tests/transactions_test.py @@ -868,14 +868,6 @@ def _produce_one(producer, idx): backoff_sec=2, err_msg="Producers not evicted in time") - try: - _produce_one(producers[0], 0) - assert False, "We can not produce after cleaning in rm_stm" - except ck.cimpl.KafkaException as e: - kafka_error = e.args[0] - kafka_error.code( - ) == ck.cimpl.KafkaError.OUT_OF_ORDER_SEQUENCE_NUMBER - # validate that the producers are evicted with LRU policy, # starting from this producer there should be no sequence # number errors as those producer state should not be evicted
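A related change is spread across several hunks above: producer_state_manager now takes producer_expiration_ms as a config::binding instead of a copied std::chrono::milliseconds, and application.cc binds transactional_id_expiration_ms accordingly. Because a binding re-reads the live cluster config on every call, the ducktape tests above can drop transactional_id_expiration_ms to 0 at runtime and see evictions on the next reaper pass. A minimal sketch of that difference, assuming the binding is instantiated over std::chrono::milliseconds as the replaced parameter suggests (the include path is also an assumption):

```cpp
#include "config/property.h" // assumed to provide config::binding

#include <seastar/core/lowres_clock.hh>

#include <chrono>

namespace ss = seastar; // Redpanda's usual seastar alias

// Sketch only, simplified from producer_state_manager::evict_excess_producers().
void evict_older_than_window(
  const config::binding<std::chrono::milliseconds>& producer_expiration_ms) {
    // Calling the binding re-reads the current cluster config, so a runtime
    // change to transactional_id_expiration_ms is picked up here; the old
    // std::chrono::milliseconds member was fixed at construction time.
    auto cutoff = ss::lowres_system_clock::now() - producer_expiration_ms();
    (void)cutoff; // the real code hands this to _cache.evict_older_than(...)
}

// In the updated test fixtures the same parameter is satisfied with
// config::mock_binding(std::chrono::milliseconds::max()).
```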