From d656a807b6b031039580ccae61f3d34fb386916d Mon Sep 17 00:00:00 2001 From: Ioannis Kavvadias Date: Wed, 9 Oct 2024 11:11:40 +0100 Subject: [PATCH 1/2] kafka: expand members updated by group::update_member Up to now, group::update_member function would only update the protocols of the group_member. The fields updated now include the client id and host fields and the session and rebalance timeout durations. (cherry picked from commit e1bc0df34cb274320dea6923ee8780bd8f39f588) --- src/v/kafka/server/group.cc | 73 ++++++++++++++++++++++---- src/v/kafka/server/group.h | 15 +++++- src/v/kafka/server/member.h | 28 ++++++++++ src/v/kafka/server/tests/group_test.cc | 71 +++++++++++++++++++++++++ 4 files changed, 176 insertions(+), 11 deletions(-) diff --git a/src/v/kafka/server/group.cc b/src/v/kafka/server/group.cc index 5abcd5952150a..5c80e57c29997 100644 --- a/src/v/kafka/server/group.cc +++ b/src/v/kafka/server/group.cc @@ -296,13 +296,20 @@ ss::future group::add_member(member_ptr member) { } void group::update_member_no_join( - member_ptr member, chunked_vector&& new_protocols) { + member_ptr member, + chunked_vector&& new_protocols, + const std::optional& new_client_id, + const kafka::client_host& new_client_host, + std::chrono::milliseconds new_session_timeout, + std::chrono::milliseconds new_rebalance_timeout) { vlog( _ctxlog.trace, - "Updating {}joining member {} with protocols {}", + "Updating {}joining member {} with protocols {} and timeouts {}/{}", member->is_joining() ? "" : "non-", member, - new_protocols); + new_protocols, + new_session_timeout, + new_rebalance_timeout); /* * before updating the member, subtract its existing protocols from @@ -322,11 +329,29 @@ void group::update_member_no_join( for (auto& p : member->protocols()) { _supported_protocols[p.name]++; } + + if (new_client_id) { + member->replace_client_id(*new_client_id); + } + member->replace_client_host(new_client_host); + member->replace_session_timeout(new_session_timeout); + member->replace_rebalance_timeout(new_rebalance_timeout); } ss::future group::update_member( - member_ptr member, chunked_vector&& new_protocols) { - update_member_no_join(member, std::move(new_protocols)); + member_ptr member, + chunked_vector&& new_protocols, + const std::optional& new_client_id, + const kafka::client_host& new_client_host, + std::chrono::milliseconds new_session_timeout, + std::chrono::milliseconds new_rebalance_timeout) { + update_member_no_join( + member, + std::move(new_protocols), + new_client_id, + new_client_host, + new_session_timeout, + new_rebalance_timeout); if (!member->is_joining()) { _num_members_joining++; @@ -632,7 +657,23 @@ group::join_group_stages group::update_static_member_and_rebalance( * with new member id. */ schedule_next_heartbeat_expiration(member); - auto f = update_member(member, r.native_member_protocols()); + + kafka::client_id old_client_id = member->client_id(); + kafka::client_host old_client_host = member->client_host(); + auto old_session_timeout + = std::chrono::duration_cast( + member->session_timeout()); + auto old_rebalance_timeout + = std::chrono::duration_cast( + member->rebalance_timeout()); + + auto f = update_member( + member, + r.native_member_protocols(), + r.client_id, + r.client_host, + r.data.session_timeout_ms, + r.data.rebalance_timeout_ms); auto old_protocols = _members.at(new_member_id)->protocols().copy(); switch (state()) { case group_state::stable: { @@ -651,7 +692,11 @@ group::join_group_stages group::update_static_member_and_rebalance( instance_id = *r.data.group_instance_id, new_member_id = std::move(new_member_id), old_member_id = std::move(old_member_id), - old_protocols = std::move(old_protocols)]( + old_protocols = std::move(old_protocols), + old_client_id = std::move(old_client_id), + old_client_host = std::move(old_client_host), + old_session_timeout = old_session_timeout, + old_rebalance_timeout = old_rebalance_timeout]( result result) mutable { if (!result) { vlog( @@ -664,7 +709,12 @@ group::join_group_stages group::update_static_member_and_rebalance( auto member = replace_static_member( instance_id, new_member_id, old_member_id); update_member_no_join( - member, std::move(old_protocols)); + member, + std::move(old_protocols), + old_client_id, + old_client_host, + old_session_timeout, + old_rebalance_timeout); schedule_next_heartbeat_expiration(member); try_finish_joining_member( member, @@ -972,7 +1022,12 @@ group::join_group_stages group::add_member_and_rebalance( group::join_group_stages group::update_member_and_rebalance(member_ptr member, join_group_request&& r) { auto response = update_member( - std::move(member), r.native_member_protocols()); + std::move(member), + r.native_member_protocols(), + r.client_id, + r.client_host, + r.data.session_timeout_ms, + r.data.rebalance_timeout_ms); try_prepare_rebalance(); return join_group_stages(std::move(response)); } diff --git a/src/v/kafka/server/group.h b/src/v/kafka/server/group.h index 031c7564cd0e4..e0b9a2e0036f8 100644 --- a/src/v/kafka/server/group.h +++ b/src/v/kafka/server/group.h @@ -351,13 +351,24 @@ class group final : public ss::enable_lw_shared_from_this { * \returns join response promise set at the end of the join phase. */ ss::future update_member( - member_ptr member, chunked_vector&& new_protocols); + member_ptr member, + chunked_vector&& new_protocols, + const std::optional& new_client_id, + const kafka::client_host& new_client_host, + std::chrono::milliseconds new_session_timeout, + std::chrono::milliseconds new_rebalance_timeout); + /** * Same as update_member but without returning the join promise. Used when * reverting member state after failed group checkpoint */ void update_member_no_join( - member_ptr member, chunked_vector&& new_protocols); + member_ptr member, + chunked_vector&& new_protocols, + const std::optional& new_client_id, + const kafka::client_host& new_client_host, + std::chrono::milliseconds new_session_timeout, + std::chrono::milliseconds new_rebalance_timeout); /** * \brief Get the timeout duration for rebalancing. diff --git a/src/v/kafka/server/member.h b/src/v/kafka/server/member.h index cf5a24f188547..d5783a887c39d 100644 --- a/src/v/kafka/server/member.h +++ b/src/v/kafka/server/member.h @@ -86,6 +86,22 @@ class group_member { void replace_id(member_id new_id) { _state.id = std::move(new_id); } + /// Get the member's client_id. + const kafka::client_id& client_id() const { return _state.client_id; } + + /// Replace the member's client_id. + void replace_client_id(kafka::client_id new_client_id) { + _state.client_id = std::move(new_client_id); + } + + /// Get the member's client_host. + const kafka::client_host& client_host() const { return _state.client_host; } + + /// Replace the member's client_host. + void replace_client_host(kafka::client_host new_client_host) { + _state.client_host = std::move(new_client_host); + } + /// Get the id of the member's group. const kafka::group_id& group_id() const { return _group_id; } @@ -97,9 +113,21 @@ class group_member { /// Get the member's session timeout. duration_type session_timeout() const { return _state.session_timeout; } + /// Replace the member's session timeout. + void + replace_session_timeout(std::chrono::milliseconds new_session_timeout) { + _state.session_timeout = new_session_timeout; + } + /// Get the member's rebalance timeout. duration_type rebalance_timeout() const { return _state.rebalance_timeout; } + /// Replace the member's rebalance timeout. + void + replace_rebalance_timeout(std::chrono::milliseconds new_rebalance_timeout) { + _state.rebalance_timeout = new_rebalance_timeout; + } + /// Get the member's protocol type. const kafka::protocol_type& protocol_type() const { return _protocol_type; } diff --git a/src/v/kafka/server/tests/group_test.cc b/src/v/kafka/server/tests/group_test.cc index 32d5b775e6eb7..7795cbcba4f87 100644 --- a/src/v/kafka/server/tests/group_test.cc +++ b/src/v/kafka/server/tests/group_test.cc @@ -10,6 +10,7 @@ #include "cluster/partition.h" #include "config/configuration.h" #include "container/fragmented_vector.h" +#include "kafka/protocol/types.h" #include "kafka/server/group.h" #include "kafka/server/group_metadata.h" #include "utils/to_string.h" @@ -21,6 +22,8 @@ #include #include +#include + using namespace std::chrono_literals; namespace kafka { @@ -495,4 +498,72 @@ SEASTAR_THREAD_TEST_CASE(group_state_output) { BOOST_TEST(s == "PreparingRebalance"); } +SEASTAR_THREAD_TEST_CASE(add_new_static_member) { + auto g = get(); + const kafka::group_id common_group_id = g.id(); + const kafka::group_instance_id common_instance_id{"0-0"}; + + const kafka::member_id m1_id{"m1"}; + const kafka::client_id m1_client_id{"client-id-1"}; + const kafka::client_host m1_client_host{"client-host-1"}; + const std::chrono::milliseconds m1_session_timeout{30001}; + const std::chrono::milliseconds m1_rebalance_timeout{45001}; + + // Create request for first member + join_group_request r1; + r1.client_id = m1_client_id; + r1.client_host = m1_client_host; + r1.data.group_id = common_group_id; + r1.data.group_instance_id = common_instance_id; + r1.data.session_timeout_ms = m1_session_timeout; + r1.data.rebalance_timeout_ms = m1_rebalance_timeout; + + // adding first static member will call "add_member_and_rebalance" + g.add_new_static_member(m1_id, std::move(r1)); + + // validate group state + BOOST_TEST(g.contains_member(m1_id)); + + // validate new member + const auto m1 = g.get_member(m1_id); + BOOST_TEST(m1->group_id() == common_group_id); + BOOST_TEST(m1->id() == m1_id); + BOOST_TEST(m1->client_id() == m1_client_id); + BOOST_TEST(m1->client_host() == m1_client_host); + BOOST_TEST(m1->session_timeout() == m1_session_timeout); + BOOST_TEST(m1->rebalance_timeout() == m1_rebalance_timeout); + + const kafka::member_id m2_id{"m2"}; + const kafka::client_id m2_client_id{"client-id-2"}; + const kafka::client_host m2_client_host{"client-host-2"}; + const std::chrono::milliseconds m2_session_timeout{30002}; + const std::chrono::milliseconds m2_rebalance_timeout{45002}; + + // Create request for second member to update m1 + join_group_request r2; + r2.client_id = m2_client_id; + r2.client_host = m2_client_host; + r2.data.group_id = common_group_id; + r2.data.group_instance_id = common_instance_id; + r2.data.session_timeout_ms = m2_session_timeout; + r2.data.rebalance_timeout_ms = m2_rebalance_timeout; + + // adding second static member will call + // "update_static_member_and_rebalance" + g.add_new_static_member(m2_id, std::move(r2)); + + // validate group state + BOOST_TEST(!g.contains_member(m1_id)); + BOOST_TEST(g.contains_member(m2_id)); + + // validate updated member + const auto m2 = g.get_member(m2_id); + BOOST_TEST(m2->group_id() == common_group_id); + BOOST_TEST(m2->id() == m2_id); + BOOST_TEST(m2->client_id() == m2_client_id); + BOOST_TEST(m2->client_host() == m2_client_host); + BOOST_TEST(m2->session_timeout() == m2_session_timeout); + BOOST_TEST(m2->rebalance_timeout() == m2_rebalance_timeout); +} + } // namespace kafka From 44513eeb5269b06e0be712522003c57d15b0aac5 Mon Sep 17 00:00:00 2001 From: Ioannis Kavvadias Date: Wed, 9 Oct 2024 11:26:55 +0100 Subject: [PATCH 2/2] kafka: Add ducktape test for updating a group static member (cherry picked from commit 72365bb767c910e22a4183657fb11f8a32fc2e64) --- tests/rptest/tests/consumer_group_test.py | 83 +++++++++++++++++++++++ 1 file changed, 83 insertions(+) diff --git a/tests/rptest/tests/consumer_group_test.py b/tests/rptest/tests/consumer_group_test.py index c3e3c93762052..dcd8ee1d08e61 100644 --- a/tests/rptest/tests/consumer_group_test.py +++ b/tests/rptest/tests/consumer_group_test.py @@ -564,6 +564,89 @@ async def create_groups(r): assert len(list) == groups_in_round * rounds + @cluster(num_nodes=5) + def test_consumer_static_member_update(self): + """ + Test validating that re-joining static member will update the client id + """ + self.create_topic(20) + + group = 'test-gr-1' + + rpk = RpkTool(self.redpanda) + + # create and start first consumer + consumer1 = self.create_consumer( + topic=self.topic_spec.name, + group=group, + instance_name="static-consumer", + instance_id="panda-instance", + consumer_properties={"client.id": "my-client-1"}) + + consumer1.start() + + self.wait_for_members(group, 1) + + # wait for some messages + self.start_producer() + wait_until( + lambda: ConsumerGroupTest.consumed_at_least([consumer1], 50), + timeout_sec=30, + backoff_sec=2, + err_msg="consumer1 did not consume messages") + + # validate initial state + rpk_group_1 = rpk.group_describe(group) + + assert rpk_group_1.state == "Stable", f"Describe: {rpk_group_1}" + assert rpk_group_1.members == 1, f"Describe: {rpk_group_1}" + for p in rpk_group_1.partitions: + assert p.client_id == 'my-client-1', f"Describe: {p}" + + # clean up + self.producer.wait() + self.producer.free() + + consumer1.stop() + consumer1.wait() + consumer1.free() + + # create and start consumer with same instance_id but different cliend_id + consumer2 = self.create_consumer( + topic=self.topic_spec.name, + group=group, + instance_name="static-consumer", + instance_id="panda-instance", + consumer_properties={"client.id": "my-client-2"}) + + consumer2.start() + + self.wait_for_members(group, 1) + + # wait for some messages + self.start_producer() + wait_until( + lambda: ConsumerGroupTest.consumed_at_least([consumer2], 50), + timeout_sec=30, + backoff_sec=2, + err_msg="consumer2 did not consume messages") + + # validate updated state + rpk_group_2 = rpk.group_describe(group) + + assert rpk_group_2.state == "Stable", f"Describe: {rpk_group_2}" + assert rpk_group_2.members == 1, f"Describe: {rpk_group_2}" + for p in rpk_group_2.partitions: + assert p.client_id == 'my-client-2', f"Describe: {p}" + + # clean up + consumer2.stop() + consumer2.wait() + consumer2.free() + + self.producer.wait() + self.producer.free() + @dataclass class OffsetAndMetadata():