From 07eecc6d98d5f743680065a5d0d3464c8a32228e Mon Sep 17 00:00:00 2001
From: Emanuele Sabellico <esabellico@confluent.io>
Date: Fri, 14 Mar 2025 09:55:09 +0100
Subject: [PATCH 1/6] [KIP-848] Fix static group membership the contract is
 changed to be the same as the Java client, the member only releases
 partitions on session.timeout.ms reached. KIP-1092 to be implemented in a
 separate PR to leave permanently the group with static membership before
 session.timeout.ms is reached.

When a new member joins the group and there's
already a member with same group instance id
that hasn't left statically the incoming member
is fenced
---
 tests/0102-static_group_rebalance.c | 285 +++++++++++++++++++++++++++-
 1 file changed, 275 insertions(+), 10 deletions(-)

diff --git a/tests/0102-static_group_rebalance.c b/tests/0102-static_group_rebalance.c
index 709ac77e33..0c41f35f2c 100644
--- a/tests/0102-static_group_rebalance.c
+++ b/tests/0102-static_group_rebalance.c
@@ -94,6 +94,16 @@ static int static_member_wait_rebalance0(int line,
                                   rd_kafka_err2name((C)->expected_rb_event));  \
         } while (0)
 
+#define static_member_expect_no_rebalance(C, PREV_ASSIGNED, PREV_REVOKED)      \
+        do {                                                                   \
+                if ((C)->assigned_at != (PREV_ASSIGNED))                       \
+                        TEST_FAIL("%s: unexpected assign event",               \
+                                  rd_kafka_name((C)->rk));                     \
+                if ((C)->revoked_at != (PREV_REVOKED))                         \
+                        TEST_FAIL("%s: unexpected revoke event",               \
+                                  rd_kafka_name((C)->rk));                     \
+        } while (0)
+
 #define static_member_wait_rebalance(C, START, TARGET, TIMEOUT_MS)             \
         static_member_wait_rebalance0(__LINE__, C, START, TARGET, TIMEOUT_MS)
 
@@ -118,13 +128,19 @@ static void rebalance_cb(rd_kafka_t *rk,
 
                 c->partition_cnt = parts->cnt;
                 c->assigned_at   = test_clock();
-                rd_kafka_assign(rk, parts);
+                if (test_consumer_group_protocol_classic())
+                        rd_kafka_assign(rk, parts);
+                else
+                        rd_kafka_incremental_assign(rk, parts);
 
                 break;
 
         case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
                 c->revoked_at = test_clock();
-                rd_kafka_assign(rk, NULL);
+                if (test_consumer_group_protocol_classic())
+                        rd_kafka_assign(rk, NULL);
+                else
+                        rd_kafka_incremental_unassign(rk, parts);
                 TEST_SAY("line %d: %s revoked %d partitions\n", c->curr_line,
                          rd_kafka_name(c->rk), parts->cnt);
 
@@ -142,8 +158,13 @@ static void rebalance_cb(rd_kafka_t *rk,
         rd_kafka_yield(rk);
 }
 
-
-static void do_test_static_group_rebalance(void) {
+/**
+ * @brief Test static group rebalance with classic group protocol.
+ *        The behaviour is the librdkafka 1.x and 2.x one.
+ *        Unsubscribe and max.poll.interval.ms cause a rebalance even
+ *        with static group membership.
+ */
+static void do_test_static_group_rebalance_classic(void) {
         rd_kafka_conf_t *conf;
         test_msgver_t mv;
         int64_t rebalance_start;
@@ -154,6 +175,7 @@ static void do_test_static_group_rebalance(void) {
             test_mk_topic_name("0102_static_group_rebalance", 1);
         char *topics = rd_strdup(tsprintf("^%s.*", topic));
         test_timing_t t_close;
+        int session_timeout_ms = 6000;
 
         SUB_TEST();
 
@@ -251,7 +273,7 @@ static void do_test_static_group_rebalance(void) {
         TIMING_STOP(&t_close);
 
         /* Should complete before `session.timeout.ms` */
-        TIMING_ASSERT(&t_close, 0, 6000);
+        TIMING_ASSERT(&t_close, 0, session_timeout_ms);
 
 
         TEST_SAY("== Testing subscription expansion ==\n");
@@ -412,6 +434,240 @@ static void do_test_static_group_rebalance(void) {
         SUB_TEST_PASS();
 }
 
+/**
+ * @brief Test static group rebalance with KIP-848. The behaviour is the same
+ *        as the Java client, the member only leaves the group when the
+ *        session times out, it doesn't leave on unsubscribe
+ *        or max.poll.interval.ms reached.
+ *        KIP-1092 will be implemented so a static member can
+ *        leave the group permanently before reaching session.timeout.ms.
+ */
+static void do_test_static_group_rebalance_consumer(void) {
+        rd_kafka_conf_t *conf;
+        test_msgver_t mv;
+        int64_t rebalance_start;
+        int i;
+        _consumer_t c[_CONSUMER_CNT] = RD_ZERO_INIT;
+        const int msgcnt             = 100;
+        uint64_t testid              = test_id_generate();
+        const char *topic =
+            test_mk_topic_name("0102_static_group_rebalance", 1);
+        char *topics = rd_strdup(tsprintf("^%s.*", topic));
+        test_timing_t t_close;
+        /* FIXME: change this group `group.consumer.session.timeout.ms`
+         * in order to match the classic group configuration
+         * when the IncrementalAlterConfigs changes for 848 are merged. */
+        int session_timeout_ms               = 45000;
+        rd_ts_t prev_assigned[_CONSUMER_CNT] = RD_ZERO_INIT;
+        rd_ts_t prev_revoked[_CONSUMER_CNT]  = RD_ZERO_INIT;
+
+        SUB_TEST();
+
+        test_conf_init(&conf, NULL, 70);
+        test_msgver_init(&mv, testid);
+        c[0].mv = &mv;
+        c[1].mv = &mv;
+
+        test_create_topic_wait_exists(NULL, topic, 3, 1, 5000);
+        test_produce_msgs_easy(topic, testid, RD_KAFKA_PARTITION_UA, msgcnt);
+
+        test_conf_set(conf, "max.poll.interval.ms", "9000");
+        test_conf_set(conf, "session.timeout.ms", "6000");
+        test_conf_set(conf, "auto.offset.reset", "earliest");
+        test_conf_set(conf, "topic.metadata.refresh.interval.ms", "500");
+        test_conf_set(conf, "metadata.max.age.ms", "5000");
+        test_conf_set(conf, "enable.partition.eof", "true");
+        test_conf_set(conf, "group.instance.id", "consumer1");
+        test_conf_set(conf, "group.protocol", "consumer");
+        test_conf_set(conf, "partition.assignment.strategy",
+                      "cooperative-sticky");
+
+        rd_kafka_conf_set_opaque(conf, &c[0]);
+        c[0].rk = test_create_consumer(topic, rebalance_cb,
+                                       rd_kafka_conf_dup(conf), NULL);
+
+        rd_kafka_conf_set_opaque(conf, &c[1]);
+        test_conf_set(conf, "group.instance.id", "consumer2");
+        c[1].rk = test_create_consumer(topic, rebalance_cb,
+                                       rd_kafka_conf_dup(conf), NULL);
+        rd_kafka_conf_destroy(conf);
+
+        test_wait_topic_exists(c[1].rk, topic, 5000);
+
+        test_consumer_subscribe(c[0].rk, topics);
+        test_consumer_subscribe(c[1].rk, topics);
+
+        rebalance_start        = test_clock();
+        c[0].expected_rb_event = RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS;
+        c[1].expected_rb_event = RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS;
+        while (!static_member_wait_rebalance(&c[0], rebalance_start,
+                                             &c[0].assigned_at, 1000)) {
+                /* keep consumer 2 alive while consumer 1 awaits
+                 * its assignment
+                 */
+                c[1].curr_line = __LINE__;
+                test_consumer_poll_once(c[1].rk, &mv, 0);
+        }
+
+        static_member_expect_rebalance(&c[1], rebalance_start,
+                                       &c[1].assigned_at, -1);
+
+        /*
+         * Consume all the messages so we can watch for duplicates
+         * after rejoin/rebalance operations.
+         */
+        c[0].curr_line = __LINE__;
+        test_consumer_poll("serve.queue", c[0].rk, testid, c[0].partition_cnt,
+                           0, -1, &mv);
+        c[1].curr_line = __LINE__;
+        test_consumer_poll("serve.queue", c[1].rk, testid, c[1].partition_cnt,
+                           0, -1, &mv);
+
+        test_msgver_verify("first.verify", &mv, TEST_MSGVER_ALL, 0, msgcnt);
+
+        TEST_SAY("== Testing consumer restart ==\n");
+        conf = rd_kafka_conf_dup(rd_kafka_conf(c[1].rk));
+
+        /* Only c[1] should exhibit rebalance behavior */
+        c[1].expected_rb_event = RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS;
+        TIMING_START(&t_close, "consumer restart");
+        test_consumer_close(c[1].rk);
+        rd_kafka_destroy(c[1].rk);
+
+        c[1].rk = test_create_handle(RD_KAFKA_CONSUMER, conf);
+        rd_kafka_poll_set_consumer(c[1].rk);
+
+        test_consumer_subscribe(c[1].rk, topics);
+
+        /* Await assignment */
+        c[1].expected_rb_event = RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS;
+        rebalance_start        = test_clock();
+        while (!static_member_wait_rebalance(&c[1], rebalance_start,
+                                             &c[1].assigned_at, 1000)) {
+                c[0].curr_line = __LINE__;
+                test_consumer_poll_once(c[0].rk, &mv, 0);
+        }
+        TIMING_STOP(&t_close);
+
+        /* Should complete before `session.timeout.ms` */
+        TIMING_ASSERT(&t_close, 0, session_timeout_ms);
+
+
+        TEST_SAY("== Testing subscription expansion ==\n");
+
+        /*
+         * New topics matching the subscription pattern should cause
+         * group rebalance. Partition count >= 2 as with KIP 848
+         * only the members receiving the new partitions will have
+         * rebalance callbacks triggered.
+         */
+        test_create_topic_wait_exists(c->rk, tsprintf("%snew", topic), 4, 1,
+                                      5000);
+
+
+        /* Await assignment */
+        c[0].expected_rb_event = RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS;
+        c[1].expected_rb_event = RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS;
+        while (!static_member_wait_rebalance(&c[0], rebalance_start,
+                                             &c[0].assigned_at, 1000)) {
+                c[1].curr_line = __LINE__;
+                test_consumer_poll_once(c[1].rk, &mv, 0);
+        }
+
+        static_member_expect_rebalance(&c[1], rebalance_start,
+                                       &c[1].assigned_at, -1);
+
+        TEST_SAY("== Testing consumer unsubscribe ==\n");
+
+        /* Unsubscribe doesn't invoke a rebalance with static membership */
+
+        /* Send ConsumerGroupHeartbeat(-2), not leaving the group */
+        rebalance_start  = test_clock();
+        prev_assigned[0] = c[0].assigned_at;
+        prev_revoked[0]  = c[0].revoked_at;
+        rd_kafka_unsubscribe(c[1].rk);
+
+        /* Await revocation */
+        c[1].expected_rb_event = RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS;
+        static_member_expect_rebalance(&c[1], rebalance_start, &c[1].revoked_at,
+                                       -1);
+
+        rd_usleep(3000 * 1000, 0);
+        static_member_expect_no_rebalance(&c[0], prev_assigned[0],
+                                          prev_revoked[0]);
+
+        test_consumer_subscribe(c[1].rk, topics);
+
+        /* Await assignment */
+        c[1].expected_rb_event = RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS;
+        while (!static_member_wait_rebalance(&c[1], rebalance_start,
+                                             &c[1].assigned_at, 1000)) {
+                c[0].curr_line = __LINE__;
+                test_consumer_poll_once(c[0].rk, &mv, 0);
+        }
+
+        TEST_SAY("== Testing max poll violation ==\n");
+
+        /*
+         * Stop polling consumer 2 until we reach
+         * `max.poll.interval.ms` and is evicted from the group.
+         */
+        rebalance_start        = test_clock();
+        prev_assigned[0]       = c[0].assigned_at;
+        prev_revoked[0]        = c[0].revoked_at;
+        c[1].expected_rb_event = RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS;
+
+        for (i = 0; i < 10; i++) {
+                c[0].curr_line = __LINE__;
+                test_consumer_poll_once(c[0].rk, &mv, 0);
+                rd_usleep(1000 * 1000, 0);
+        }
+        static_member_expect_no_rebalance(&c[0], prev_assigned[0],
+                                          prev_revoked[0]);
+
+        /* consumer 2 restarts polling and re-joins the group */
+        rebalance_start = test_clock();
+        c[1].curr_line  = __LINE__;
+        test_consumer_poll_expect_err(c[1].rk, testid, 1000,
+                                      RD_KAFKA_RESP_ERR__MAX_POLL_EXCEEDED);
+
+        /* Await revocation */
+        while (!static_member_wait_rebalance(&c[1], rebalance_start,
+                                             &c[1].revoked_at, 1000)) {
+                c[0].curr_line = __LINE__;
+                test_consumer_poll_once(c[0].rk, &mv, 1000);
+        }
+
+        c[1].expected_rb_event = RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS;
+        static_member_expect_rebalance(&c[1], rebalance_start,
+                                       &c[1].assigned_at, -1);
+
+        TEST_SAY("== Testing `session.timeout.ms` member eviction ==\n");
+
+        rebalance_start        = test_clock();
+        c[0].expected_rb_event = RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS;
+        TIMING_START(&t_close, "consumer close");
+        test_consumer_close(c[0].rk);
+        rd_kafka_destroy(c[0].rk);
+
+        c[1].expected_rb_event = RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS;
+        static_member_expect_rebalance(&c[1], rebalance_start,
+                                       &c[1].assigned_at, -1);
+
+        TIMING_ASSERT(&t_close, session_timeout_ms - 4000,
+                      session_timeout_ms + 4000);
+
+        c[1].expected_rb_event = RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS;
+        test_consumer_close(c[1].rk);
+        rd_kafka_destroy(c[1].rk);
+
+        test_msgver_verify("final.validation", &mv, TEST_MSGVER_ALL, 0, msgcnt);
+        test_msgver_clear(&mv);
+        rd_free(topics);
+
+        SUB_TEST_PASS();
+}
+
 
 /**
  * @brief Await a non-empty assignment for all consumers in \p c
@@ -645,6 +901,9 @@ static rd_kafka_t *create_consumer(const char *bootstraps,
         test_conf_init(&conf, NULL, 0);
         test_conf_set(conf, "bootstrap.servers", bootstraps);
         test_conf_set(conf, "group.instance.id", group_instance_id);
+        test_conf_set(conf, "partition.assignment.strategy",
+                      "cooperative-sticky");
+        test_conf_set(conf, "group.protocol", "consumer");
         test_conf_set(conf, "auto.offset.reset", "earliest");
         test_conf_set(conf, "enable.partition.eof", "true");
         return test_create_consumer(group_id, NULL, conf, NULL);
@@ -780,6 +1039,15 @@ static void do_test_static_membership_mock(
                     "Expected assignment for consumer 1 after the change");
         TEST_ASSERT(next_assignment2 != NULL,
                     "Expected assignment for consumer 2 after the change");
+
+        TEST_ASSERT(next_generation_id1 == prev_generation_id1,
+                    "Expected same generation id for consumer 1, "
+                    "got %d != %d",
+                    prev_generation_id1, next_generation_id1);
+        TEST_ASSERT(next_generation_id2 == prev_generation_id2,
+                    "Expected same generation id for consumer 2, "
+                    "got %d != %d",
+                    prev_generation_id2, next_generation_id2);
         TEST_ASSERT(!test_partition_list_and_offsets_cmp(prev_assignment1,
                                                          next_assignment1),
                     "Expected same assignment for consumer 1 after the change");
@@ -801,15 +1069,12 @@ static void do_test_static_membership_mock(
 }
 
 int main_0102_static_group_rebalance(int argc, char **argv) {
-        /* TODO: check again when regexes
-         * will be supported by KIP-848 */
-        if (test_consumer_group_protocol_classic()) {
-                do_test_static_group_rebalance();
-        }
 
         if (test_consumer_group_protocol_classic()) {
+                do_test_static_group_rebalance_classic();
                 do_test_fenced_member_classic();
         } else {
+                do_test_static_group_rebalance_consumer();
                 do_test_fenced_member_consumer();
         }
 

From 5ea56bec5f288f597c5b0a145e5a72360bcee05b Mon Sep 17 00:00:00 2001
From: Emanuele Sabellico <esabellico@confluent.io>
Date: Tue, 24 Jun 2025 19:00:54 +0200
Subject: [PATCH 2/6] Default broker consumer group properties that are good
 for testing

---
 .semaphore/semaphore.yml     | 1 +
 tests/scenarios/default.json | 4 ++++
 2 files changed, 5 insertions(+)

diff --git a/.semaphore/semaphore.yml b/.semaphore/semaphore.yml
index cd6c173944..4a36112964 100644
--- a/.semaphore/semaphore.yml
+++ b/.semaphore/semaphore.yml
@@ -167,6 +167,7 @@ blocks:
             - export TEST_CONSUMER_GROUP_PROTOCOL=consumer
             - (cd tests && python3 -m trivup.clusters.KafkaCluster --kraft --version '4.0.0'
               --cpversion 7.9.0
+              --conf '["group.consumer.min.session.timeout.ms=1", "group.consumer.min.heartbeat.interval.ms=1"]'
               --cmd 'make quick')
 
 
diff --git a/tests/scenarios/default.json b/tests/scenarios/default.json
index 92287a7632..4ad094baf2 100644
--- a/tests/scenarios/default.json
+++ b/tests/scenarios/default.json
@@ -2,4 +2,8 @@
     "auto_create_topics": "true",
     "num_partitions": 4,
     "replication_factor": 3,
+    "conf": [
+        "group.consumer.min.session.timeout.ms=1",
+        "group.consumer.min.heartbeat.interval.ms=1"
+    ]
 }

From 3abd3c0715294d6bea7dcbaae2493880048430e4 Mon Sep 17 00:00:00 2001
From: Emanuele Sabellico <esabellico@confluent.io>
Date: Tue, 24 Jun 2025 19:02:13 +0200
Subject: [PATCH 3/6] Test helper functions to set broker configuration
 properties and in particular `consumer.session.timeout.ms` and
 `consumer.heartbeat.interval.ms`

---
 tests/test.c | 47 +++++++++++++++++++++++++++++++++++++++++++++++
 tests/test.h | 13 +++++++++++++
 2 files changed, 60 insertions(+)

diff --git a/tests/test.c b/tests/test.c
index 4dbef9d16e..1822b886bc 100644
--- a/tests/test.c
+++ b/tests/test.c
@@ -4763,6 +4763,53 @@ void test_any_conf_set(rd_kafka_conf_t *conf,
                           val, errstr);
 }
 
+/**
+ * @brief Set broker configuration property \p name to the given value \p val
+ *        for the resource type \p restype and resource name \p resname.
+ */
+void test_broker_conf_set(rd_kafka_ResourceType_t restype,
+                          const char *resname,
+                          const char *name,
+                          const char *val) {
+        rd_kafka_t *rk;
+        const char *conf_set[] = {name, "SET", val};
+
+        rk = test_create_producer();
+        TEST_CALL_ERR__(test_IncrementalAlterConfigs_simple(
+            rk, restype, resname, conf_set, 1));
+        rd_kafka_destroy(rk);
+}
+
+/**
+ * @brief Set broker group configuration property 'consumer.session.timeout.ms'.
+ *        to the given value \p session_timeout_ms.
+ *
+ * @param group_id Group id to set the configuration for.
+ * @param session_timeout_ms Session timeout in milliseconds.
+ */
+void test_broker_conf_set_group_consumer_session_timeout_ms(
+    const char *group_id,
+    int session_timeout_ms) {
+        test_broker_conf_set(RD_KAFKA_RESOURCE_GROUP, group_id,
+                             "consumer.session.timeout.ms",
+                             tsprintf("%d", session_timeout_ms));
+}
+
+/**
+ * @brief Set broker group configuration property
+ * 'consumer.heartbeat.interval.ms'. to the given value \p
+ * heartbeat_interval_ms.
+ *
+ * @param group_id Group id to set the configuration for.
+ * @param heartbeat_interval_ms Heartbeat interval in milliseconds.
+ */
+void test_broker_conf_set_group_consumer_heartbeat_interval_ms(
+    const char *group_id,
+    int heartbeat_interval_ms) {
+        test_broker_conf_set(RD_KAFKA_RESOURCE_GROUP, group_id,
+                             "consumer.heartbeat.interval.ms",
+                             tsprintf("%d", heartbeat_interval_ms));
+}
 
 /**
  * @returns true if test clients need to be configured for authentication
diff --git a/tests/test.h b/tests/test.h
index a3d36db3c9..ab346f4d2f 100644
--- a/tests/test.h
+++ b/tests/test.h
@@ -721,6 +721,19 @@ void test_any_conf_set(rd_kafka_conf_t *conf,
                        const char *name,
                        const char *val);
 
+void test_broker_conf_set(rd_kafka_ResourceType_t restype,
+                          const char *resname,
+                          const char *name,
+                          const char *val);
+
+void test_broker_conf_set_group_consumer_session_timeout_ms(
+    const char *group_id,
+    int timeout_ms);
+
+void test_broker_conf_set_group_consumer_heartbeat_interval_ms(
+    const char *group_id,
+    int timeout_ms);
+
 rd_kafka_topic_partition_list_t *test_topic_partitions(int cnt, ...);
 void test_print_partition_list(
     const rd_kafka_topic_partition_list_t *partitions);

From 1306a93f0ae4210b64aab3d4de27ad1b359ac5c4 Mon Sep 17 00:00:00 2001
From: Emanuele Sabellico <esabellico@confluent.io>
Date: Tue, 24 Jun 2025 19:03:21 +0200
Subject: [PATCH 4/6] Complete test `do_test_static_group_rebalance_consumer`
 by setting a shorter session timeout on the broker

---
 tests/0102-static_group_rebalance.c | 18 ++++++++----------
 1 file changed, 8 insertions(+), 10 deletions(-)

diff --git a/tests/0102-static_group_rebalance.c b/tests/0102-static_group_rebalance.c
index 0c41f35f2c..315fbaecb7 100644
--- a/tests/0102-static_group_rebalance.c
+++ b/tests/0102-static_group_rebalance.c
@@ -452,12 +452,14 @@ static void do_test_static_group_rebalance_consumer(void) {
         uint64_t testid              = test_id_generate();
         const char *topic =
             test_mk_topic_name("0102_static_group_rebalance", 1);
-        char *topics = rd_strdup(tsprintf("^%s.*", topic));
+        char *topics         = rd_strdup(tsprintf("^%s.*", topic));
+        const char *group_id = topic;
         test_timing_t t_close;
-        /* FIXME: change this group `group.consumer.session.timeout.ms`
-         * in order to match the classic group configuration
-         * when the IncrementalAlterConfigs changes for 848 are merged. */
-        int session_timeout_ms               = 45000;
+
+        int session_timeout_ms = 6000;
+        test_broker_conf_set_group_consumer_session_timeout_ms(
+            group_id, session_timeout_ms);
+
         rd_ts_t prev_assigned[_CONSUMER_CNT] = RD_ZERO_INIT;
         rd_ts_t prev_revoked[_CONSUMER_CNT]  = RD_ZERO_INIT;
 
@@ -472,15 +474,11 @@ static void do_test_static_group_rebalance_consumer(void) {
         test_produce_msgs_easy(topic, testid, RD_KAFKA_PARTITION_UA, msgcnt);
 
         test_conf_set(conf, "max.poll.interval.ms", "9000");
-        test_conf_set(conf, "session.timeout.ms", "6000");
         test_conf_set(conf, "auto.offset.reset", "earliest");
         test_conf_set(conf, "topic.metadata.refresh.interval.ms", "500");
         test_conf_set(conf, "metadata.max.age.ms", "5000");
         test_conf_set(conf, "enable.partition.eof", "true");
         test_conf_set(conf, "group.instance.id", "consumer1");
-        test_conf_set(conf, "group.protocol", "consumer");
-        test_conf_set(conf, "partition.assignment.strategy",
-                      "cooperative-sticky");
 
         rd_kafka_conf_set_opaque(conf, &c[0]);
         c[0].rk = test_create_consumer(topic, rebalance_cb,
@@ -488,7 +486,7 @@ static void do_test_static_group_rebalance_consumer(void) {
 
         rd_kafka_conf_set_opaque(conf, &c[1]);
         test_conf_set(conf, "group.instance.id", "consumer2");
-        c[1].rk = test_create_consumer(topic, rebalance_cb,
+        c[1].rk = test_create_consumer(group_id, rebalance_cb,
                                        rd_kafka_conf_dup(conf), NULL);
         rd_kafka_conf_destroy(conf);
 

From ef7fd386264c91a955b9b87d47164454aa9d87f9 Mon Sep 17 00:00:00 2001
From: Emanuele Sabellico <esabellico@confluent.io>
Date: Wed, 25 Jun 2025 15:30:35 +0200
Subject: [PATCH 5/6] Change references to `session.timeout.ms`

---
 tests/0102-static_group_rebalance.c | 9 +++++----
 1 file changed, 5 insertions(+), 4 deletions(-)

diff --git a/tests/0102-static_group_rebalance.c b/tests/0102-static_group_rebalance.c
index 315fbaecb7..10ce4f0de6 100644
--- a/tests/0102-static_group_rebalance.c
+++ b/tests/0102-static_group_rebalance.c
@@ -440,7 +440,8 @@ static void do_test_static_group_rebalance_classic(void) {
  *        session times out, it doesn't leave on unsubscribe
  *        or max.poll.interval.ms reached.
  *        KIP-1092 will be implemented so a static member can
- *        leave the group permanently before reaching session.timeout.ms.
+ *        leave the group permanently before reaching
+ *        `consumer.session.timeout.ms`.
  */
 static void do_test_static_group_rebalance_consumer(void) {
         rd_kafka_conf_t *conf;
@@ -547,8 +548,8 @@ static void do_test_static_group_rebalance_consumer(void) {
         }
         TIMING_STOP(&t_close);
 
-        /* Should complete before `session.timeout.ms` */
-        TIMING_ASSERT(&t_close, 0, session_timeout_ms);
+        /* Should complete before 45s (default close timeout) */
+        TIMING_ASSERT(&t_close, 0, 45000);
 
 
         TEST_SAY("== Testing subscription expansion ==\n");
@@ -640,7 +641,7 @@ static void do_test_static_group_rebalance_consumer(void) {
         static_member_expect_rebalance(&c[1], rebalance_start,
                                        &c[1].assigned_at, -1);
 
-        TEST_SAY("== Testing `session.timeout.ms` member eviction ==\n");
+        TEST_SAY("== Testing `consumer.session.timeout.ms` member eviction ==\n");
 
         rebalance_start        = test_clock();
         c[0].expected_rb_event = RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS;

From 1091c2e2bd0ad02a2e5abb7c06d9611460b9b234 Mon Sep 17 00:00:00 2001
From: Emanuele Sabellico <esabellico@confluent.io>
Date: Wed, 25 Jun 2025 16:42:38 +0200
Subject: [PATCH 6/6] Add member epoch checks

---
 tests/0102-static_group_rebalance.c | 66 ++++++++++++++++++++++-------
 1 file changed, 51 insertions(+), 15 deletions(-)

diff --git a/tests/0102-static_group_rebalance.c b/tests/0102-static_group_rebalance.c
index 10ce4f0de6..1854d5c9ee 100644
--- a/tests/0102-static_group_rebalance.c
+++ b/tests/0102-static_group_rebalance.c
@@ -107,6 +107,19 @@ static int static_member_wait_rebalance0(int line,
 #define static_member_wait_rebalance(C, START, TARGET, TIMEOUT_MS)             \
         static_member_wait_rebalance0(__LINE__, C, START, TARGET, TIMEOUT_MS)
 
+/**
+ * @brief Get generation id of consumer \p consumer .
+ */
+static int32_t consumer_generation_id(rd_kafka_t *consumer) {
+        rd_kafka_consumer_group_metadata_t *group_metadata;
+        int32_t generation_id;
+
+        group_metadata = rd_kafka_consumer_group_metadata(consumer);
+        generation_id =
+            rd_kafka_consumer_group_metadata_generation_id(group_metadata);
+        rd_kafka_consumer_group_metadata_destroy(group_metadata);
+        return generation_id;
+}
 
 static void rebalance_cb(rd_kafka_t *rk,
                          rd_kafka_resp_err_t err,
@@ -456,10 +469,13 @@ static void do_test_static_group_rebalance_consumer(void) {
         char *topics         = rd_strdup(tsprintf("^%s.*", topic));
         const char *group_id = topic;
         test_timing_t t_close;
+        int32_t prev_member_epoch[_CONSUMER_CNT], member_epoch[_CONSUMER_CNT];
 
         int session_timeout_ms = 6000;
         test_broker_conf_set_group_consumer_session_timeout_ms(
             group_id, session_timeout_ms);
+        test_broker_conf_set_group_consumer_heartbeat_interval_ms(
+            group_id, 1000 /* 1s heartbeat interval */);
 
         rd_ts_t prev_assigned[_CONSUMER_CNT] = RD_ZERO_INIT;
         rd_ts_t prev_revoked[_CONSUMER_CNT]  = RD_ZERO_INIT;
@@ -560,6 +576,7 @@ static void do_test_static_group_rebalance_consumer(void) {
          * only the members receiving the new partitions will have
          * rebalance callbacks triggered.
          */
+        rebalance_start = test_clock();
         test_create_topic_wait_exists(c->rk, tsprintf("%snew", topic), 4, 1,
                                       5000);
 
@@ -584,6 +601,10 @@ static void do_test_static_group_rebalance_consumer(void) {
         rebalance_start  = test_clock();
         prev_assigned[0] = c[0].assigned_at;
         prev_revoked[0]  = c[0].revoked_at;
+
+        prev_member_epoch[0] = consumer_generation_id(c[0].rk);
+        prev_member_epoch[1] = consumer_generation_id(c[1].rk);
+
         rd_kafka_unsubscribe(c[1].rk);
 
         /* Await revocation */
@@ -605,6 +626,20 @@ static void do_test_static_group_rebalance_consumer(void) {
                 test_consumer_poll_once(c[0].rk, &mv, 0);
         }
 
+        member_epoch[0] = consumer_generation_id(c[0].rk);
+        member_epoch[1] = consumer_generation_id(c[1].rk);
+
+        TEST_ASSERT(prev_member_epoch[0] == member_epoch[0],
+                    "c[0] should have the same member epoch "
+                    "as before the unsubscribe. "
+                    "expected %" PRId32 ", got %" PRId32,
+                    prev_member_epoch[0], member_epoch[0]);
+        TEST_ASSERT(prev_member_epoch[1] == member_epoch[1],
+                    "c[1] should have the same member epoch "
+                    "as before the unsubscribe. "
+                    "expected %" PRId32 ", got %" PRId32,
+                    prev_member_epoch[1], member_epoch[1]);
+
         TEST_SAY("== Testing max poll violation ==\n");
 
         /*
@@ -641,7 +676,22 @@ static void do_test_static_group_rebalance_consumer(void) {
         static_member_expect_rebalance(&c[1], rebalance_start,
                                        &c[1].assigned_at, -1);
 
-        TEST_SAY("== Testing `consumer.session.timeout.ms` member eviction ==\n");
+        member_epoch[0] = consumer_generation_id(c[0].rk);
+        member_epoch[1] = consumer_generation_id(c[1].rk);
+
+        TEST_ASSERT(prev_member_epoch[0] == member_epoch[0],
+                    "c[0] should have the same member epoch "
+                    "as before reaching `max.poll.interval.ms`. "
+                    "expected %" PRId32 ", got %" PRId32,
+                    prev_member_epoch[0], member_epoch[0]);
+        TEST_ASSERT(prev_member_epoch[1] == member_epoch[1],
+                    "c[1] should have the same member epoch "
+                    "as before reaching `max.poll.interval.ms`. "
+                    "expected %" PRId32 ", got %" PRId32,
+                    prev_member_epoch[1], member_epoch[1]);
+
+        TEST_SAY(
+            "== Testing `consumer.session.timeout.ms` member eviction ==\n");
 
         rebalance_start        = test_clock();
         c[0].expected_rb_event = RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS;
@@ -908,20 +958,6 @@ static rd_kafka_t *create_consumer(const char *bootstraps,
         return test_create_consumer(group_id, NULL, conf, NULL);
 }
 
-/**
- * @brief Get generation id of consumer \p consumer .
- */
-static int32_t consumer_generation_id(rd_kafka_t *consumer) {
-        rd_kafka_consumer_group_metadata_t *group_metadata;
-        int32_t generation_id;
-
-        group_metadata = rd_kafka_consumer_group_metadata(consumer);
-        generation_id =
-            rd_kafka_consumer_group_metadata_generation_id(group_metadata);
-        rd_kafka_consumer_group_metadata_destroy(group_metadata);
-        return generation_id;
-}
-
 /**
  * @brief Check if the API key in \p request is the same as that
  *        pointed by \p opaque .