diff --git a/src/rdkafka_mock.c b/src/rdkafka_mock.c index 89f280f0e4..35429c0929 100644 --- a/src/rdkafka_mock.c +++ b/src/rdkafka_mock.c @@ -3036,7 +3036,8 @@ static int ut_cgrp_consumer_member_next_assignment0( fixtures[i].comment); if (fixtures[i].session_timed_out) { - rd_kafka_mock_cgrp_consumer_member_leave(mcgrp, member); + rd_kafka_mock_cgrp_consumer_member_leave(mcgrp, member, + rd_false); member = rd_kafka_mock_cgrp_consumer_member_add( mcgrp, conn, &MemberId, &InstanceId, &SubscribedTopic, 1); diff --git a/src/rdkafka_mock_cgrp.c b/src/rdkafka_mock_cgrp.c index c45344bc58..cac2f5c2ed 100644 --- a/src/rdkafka_mock_cgrp.c +++ b/src/rdkafka_mock_cgrp.c @@ -1462,18 +1462,26 @@ rd_kafka_mock_cgrp_consumer_member_add(rd_kafka_mock_cgrp_consumer_t *mcgrp, /* Find member */ member = rd_kafka_mock_cgrp_consumer_member_find(mcgrp, MemberId); if (!member) { + if (RD_KAFKAP_STR_LEN(MemberId) == 0) + /* KIP 1082: MemberId is generated by the client */ + return NULL; + member = rd_kafka_mock_cgrp_consumer_member_find_by_instance_id( mcgrp, InstanceId); - if (member && RD_KAFKAP_STR_LEN(MemberId) > 0 && + + if (member && rd_kafkap_str_cmp_str(MemberId, member->id) != 0) { - /* Either member is a new instance and is rejoining - * with same InstanceId, so MemberId is NULL, - * or it's rejoining after unsubscribing, - * then it must have the same MemberId as before, - * as it lasts for member lifetime. - * It both don't hold, we cannot add the member - * to the group. */ - return NULL; + /* Member is a new instance and is rejoining + * with a new MemberId. 
*/ + + if (!member->left_static_membership) + /* Old member still active, + * fence this one */ + return NULL; + + RD_IF_FREE(member->id, rd_free); + member->id = RD_KAFKAP_STR_DUP(MemberId); + member->left_static_membership = rd_false; } } @@ -1555,29 +1563,39 @@ static void rd_kafka_mock_cgrp_consumer_member_destroy( rd_free(member); } +static void rd_kafka_mock_cgrp_consumer_member_leave_static( + rd_kafka_mock_cgrp_consumer_member_t *member) { + member->left_static_membership = rd_true; + rd_kafka_mock_cgrp_consumer_member_returned_assignment_set(member, + NULL); +} + /** * @brief Called when a member must leave a consumer group. * * @param mcgrp Consumer group to leave. * @param member Member that leaves. - * @param is_static If true, the member is leaving with static group membership. + * @param leave_static If true, the member is leaving with static group + * membership. * * @locks mcluster->lock MUST be held. */ void rd_kafka_mock_cgrp_consumer_member_leave( rd_kafka_mock_cgrp_consumer_t *mcgrp, - rd_kafka_mock_cgrp_consumer_member_t *member) { + rd_kafka_mock_cgrp_consumer_member_t *member, + rd_bool_t leave_static) { rd_bool_t is_static = member->instance_id != NULL; rd_kafka_dbg(mcgrp->cluster->rk, MOCK, "MOCK", - "Member %s is leaving group %s, is static: %s", member->id, - mcgrp->id, RD_STR_ToF(is_static)); - if (!is_static) + "Member %s is leaving group %s, is static: %s, " + "static leave: %s", + member->id, mcgrp->id, RD_STR_ToF(is_static), + RD_STR_ToF(leave_static)); + if (!is_static || !leave_static) rd_kafka_mock_cgrp_consumer_member_destroy(mcgrp, member); else - rd_kafka_mock_cgrp_consumer_member_returned_assignment_set( - member, NULL); + rd_kafka_mock_cgrp_consumer_member_leave_static(member); } /** diff --git a/src/rdkafka_mock_handlers.c b/src/rdkafka_mock_handlers.c index 23431116e4..1eac17f278 100644 --- a/src/rdkafka_mock_handlers.c +++ b/src/rdkafka_mock_handlers.c @@ -2884,7 +2884,7 @@ 
rd_kafka_mock_handle_ConsumerGroupHeartbeat(rd_kafka_mock_connection_t *mconn, } } else { rd_kafka_mock_cgrp_consumer_member_leave( - mcgrp, member); + mcgrp, member, MemberEpoch == -2); member = NULL; } } else { diff --git a/src/rdkafka_mock_int.h b/src/rdkafka_mock_int.h index b1f82d2737..e81d2c791e 100644 --- a/src/rdkafka_mock_int.h +++ b/src/rdkafka_mock_int.h @@ -166,6 +166,9 @@ typedef struct rd_kafka_mock_cgrp_consumer_member_s { rd_list_t *subscribed_topics; /**< Subscribed topics */ + rd_bool_t left_static_membership; /**< Member has left the group + * with static membership. */ + struct rd_kafka_mock_connection_s *conn; /**< Connection, may be NULL * if there is no ongoing * request. */ @@ -671,7 +674,8 @@ rd_kafka_mock_cgrp_consumer_get(rd_kafka_mock_cluster_t *mcluster, void rd_kafka_mock_cgrp_consumer_member_leave( rd_kafka_mock_cgrp_consumer_t *mcgrp, - rd_kafka_mock_cgrp_consumer_member_t *member); + rd_kafka_mock_cgrp_consumer_member_t *member, + rd_bool_t static_leave); void rd_kafka_mock_cgrp_consumer_member_fenced( rd_kafka_mock_cgrp_consumer_t *mcgrp, diff --git a/tests/0102-static_group_rebalance.c b/tests/0102-static_group_rebalance.c index 3e84c865eb..3a922f2db9 100644 --- a/tests/0102-static_group_rebalance.c +++ b/tests/0102-static_group_rebalance.c @@ -93,6 +93,16 @@ static int static_member_wait_rebalance0(int line, rd_kafka_err2name((C)->expected_rb_event)); \ } while (0) +#define static_member_expect_no_rebalance(C, PREV_ASSIGNED, PREV_REVOKED) \ + do { \ + if ((C)->assigned_at != (PREV_ASSIGNED)) \ + TEST_FAIL("%s: unexpected assign event", \ + rd_kafka_name((C)->rk)); \ + if ((C)->revoked_at != (PREV_REVOKED)) \ + TEST_FAIL("%s: unexpected revoke event", \ + rd_kafka_name((C)->rk)); \ + } while (0) + #define static_member_wait_rebalance(C, START, TARGET, TIMEOUT_MS) \ static_member_wait_rebalance0(__LINE__, C, START, TARGET, TIMEOUT_MS) @@ -117,13 +127,19 @@ static void rebalance_cb(rd_kafka_t *rk, c->partition_cnt = parts->cnt; 
c->assigned_at = test_clock(); - rd_kafka_assign(rk, parts); + if (test_consumer_group_protocol_classic()) + rd_kafka_assign(rk, parts); + else + rd_kafka_incremental_assign(rk, parts); break; case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS: c->revoked_at = test_clock(); - rd_kafka_assign(rk, NULL); + if (test_consumer_group_protocol_classic()) + rd_kafka_assign(rk, NULL); + else + rd_kafka_incremental_unassign(rk, parts); TEST_SAY("line %d: %s revoked %d partitions\n", c->curr_line, rd_kafka_name(c->rk), parts->cnt); @@ -141,8 +157,13 @@ static void rebalance_cb(rd_kafka_t *rk, rd_kafka_yield(rk); } - -static void do_test_static_group_rebalance(void) { +/** + * @brief Test static group rebalance with classic group protocol. + * The behaviour is the librdkafka 1.x and 2.x one. + * Unsubscribe and max.poll.interval.ms cause a rebalance even + * with static group membership. + */ +static void do_test_static_group_rebalance_classic(void) { rd_kafka_conf_t *conf; test_msgver_t mv; int64_t rebalance_start; @@ -153,6 +174,7 @@ static void do_test_static_group_rebalance(void) { test_mk_topic_name("0102_static_group_rebalance", 1); char *topics = rd_strdup(tsprintf("^%s.*", topic)); test_timing_t t_close; + int session_timeout_ms = 6000; SUB_TEST(); @@ -247,7 +269,7 @@ static void do_test_static_group_rebalance(void) { TIMING_STOP(&t_close); /* Should complete before `session.timeout.ms` */ - TIMING_ASSERT(&t_close, 0, 6000); + TIMING_ASSERT(&t_close, 0, session_timeout_ms); TEST_SAY("== Testing subscription expansion ==\n"); @@ -399,6 +421,240 @@ static void do_test_static_group_rebalance(void) { SUB_TEST_PASS(); } +/** + * @brief Test static group rebalance with KIP-848. The behaviour is the same + * as the Java client, the member only leaves the group when the + * session times out, it doesn't leave on unsubscribe + * or max.poll.interval.ms reached. + * KIP-1092 will be implemented so a static member can + * leave the group permanently before reaching session.timeout.ms. 
+ */ +static void do_test_static_group_rebalance_consumer(void) { + rd_kafka_conf_t *conf; + test_msgver_t mv; + int64_t rebalance_start; + int i; + _consumer_t c[_CONSUMER_CNT] = RD_ZERO_INIT; + const int msgcnt = 100; + uint64_t testid = test_id_generate(); + const char *topic = + test_mk_topic_name("0102_static_group_rebalance", 1); + char *topics = rd_strdup(tsprintf("^%s.*", topic)); + test_timing_t t_close; + /* FIXME: change this group `group.consumer.session.timeout.ms` + * in order to match the classic group configuration + * when the IncrementalAlterConfigs changes for 848 are merged. */ + int session_timeout_ms = 45000; + rd_ts_t prev_assigned[_CONSUMER_CNT] = RD_ZERO_INIT; + rd_ts_t prev_revoked[_CONSUMER_CNT] = RD_ZERO_INIT; + + SUB_TEST(); + + test_conf_init(&conf, NULL, 70); + test_msgver_init(&mv, testid); + c[0].mv = &mv; + c[1].mv = &mv; + + test_create_topic_wait_exists(NULL, topic, 3, 1, 5000); + test_produce_msgs_easy(topic, testid, RD_KAFKA_PARTITION_UA, msgcnt); + + test_conf_set(conf, "max.poll.interval.ms", "9000"); + test_conf_set(conf, "session.timeout.ms", "6000"); + test_conf_set(conf, "auto.offset.reset", "earliest"); + test_conf_set(conf, "topic.metadata.refresh.interval.ms", "500"); + test_conf_set(conf, "metadata.max.age.ms", "5000"); + test_conf_set(conf, "enable.partition.eof", "true"); + test_conf_set(conf, "group.instance.id", "consumer1"); + test_conf_set(conf, "group.protocol", "consumer"); + test_conf_set(conf, "partition.assignment.strategy", + "cooperative-sticky"); + + rd_kafka_conf_set_opaque(conf, &c[0]); + c[0].rk = test_create_consumer(topic, rebalance_cb, + rd_kafka_conf_dup(conf), NULL); + + rd_kafka_conf_set_opaque(conf, &c[1]); + test_conf_set(conf, "group.instance.id", "consumer2"); + c[1].rk = test_create_consumer(topic, rebalance_cb, + rd_kafka_conf_dup(conf), NULL); + rd_kafka_conf_destroy(conf); + + test_wait_topic_exists(c[1].rk, topic, 5000); + + test_consumer_subscribe(c[0].rk, topics); + 
test_consumer_subscribe(c[1].rk, topics); + + rebalance_start = test_clock(); + c[0].expected_rb_event = RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS; + c[1].expected_rb_event = RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS; + while (!static_member_wait_rebalance(&c[0], rebalance_start, + &c[0].assigned_at, 1000)) { + /* keep consumer 2 alive while consumer 1 awaits + * its assignment + */ + c[1].curr_line = __LINE__; + test_consumer_poll_once(c[1].rk, &mv, 0); + } + + static_member_expect_rebalance(&c[1], rebalance_start, + &c[1].assigned_at, -1); + + /* + * Consume all the messages so we can watch for duplicates + * after rejoin/rebalance operations. + */ + c[0].curr_line = __LINE__; + test_consumer_poll("serve.queue", c[0].rk, testid, c[0].partition_cnt, + 0, -1, &mv); + c[1].curr_line = __LINE__; + test_consumer_poll("serve.queue", c[1].rk, testid, c[1].partition_cnt, + 0, -1, &mv); + + test_msgver_verify("first.verify", &mv, TEST_MSGVER_ALL, 0, msgcnt); + + TEST_SAY("== Testing consumer restart ==\n"); + conf = rd_kafka_conf_dup(rd_kafka_conf(c[1].rk)); + + /* Only c[1] should exhibit rebalance behavior */ + c[1].expected_rb_event = RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS; + TIMING_START(&t_close, "consumer restart"); + test_consumer_close(c[1].rk); + rd_kafka_destroy(c[1].rk); + + c[1].rk = test_create_handle(RD_KAFKA_CONSUMER, conf); + rd_kafka_poll_set_consumer(c[1].rk); + + test_consumer_subscribe(c[1].rk, topics); + + /* Await assignment */ + c[1].expected_rb_event = RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS; + rebalance_start = test_clock(); + while (!static_member_wait_rebalance(&c[1], rebalance_start, + &c[1].assigned_at, 1000)) { + c[0].curr_line = __LINE__; + test_consumer_poll_once(c[0].rk, &mv, 0); + } + TIMING_STOP(&t_close); + + /* Should complete before `session.timeout.ms` */ + TIMING_ASSERT(&t_close, 0, session_timeout_ms); + + + TEST_SAY("== Testing subscription expansion ==\n"); + + /* + * New topics matching the subscription pattern should cause + * group 
rebalance. Partition count >= 2 as with KIP 848 + * only the members receiving the new partitions will have + * rebalance callbacks triggered. + */ + test_create_topic_wait_exists(c->rk, tsprintf("%snew", topic), 4, 1, + 5000); + + + /* Await assignment */ + c[0].expected_rb_event = RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS; + c[1].expected_rb_event = RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS; + while (!static_member_wait_rebalance(&c[0], rebalance_start, + &c[0].assigned_at, 1000)) { + c[1].curr_line = __LINE__; + test_consumer_poll_once(c[1].rk, &mv, 0); + } + + static_member_expect_rebalance(&c[1], rebalance_start, + &c[1].assigned_at, -1); + + TEST_SAY("== Testing consumer unsubscribe ==\n"); + + /* Unsubscribe doesn't invoke a rebalance with static membership */ + + /* Send ConsumerGroupHeartbeat(-2), not leaving the group */ + rebalance_start = test_clock(); + prev_assigned[0] = c[0].assigned_at; + prev_revoked[0] = c[0].revoked_at; + rd_kafka_unsubscribe(c[1].rk); + + /* Await revocation */ + c[1].expected_rb_event = RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS; + static_member_expect_rebalance(&c[1], rebalance_start, &c[1].revoked_at, + -1); + + rd_usleep(3000 * 1000, 0); + static_member_expect_no_rebalance(&c[0], prev_assigned[0], + prev_revoked[0]); + + test_consumer_subscribe(c[1].rk, topics); + + /* Await assignment */ + c[1].expected_rb_event = RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS; + while (!static_member_wait_rebalance(&c[1], rebalance_start, + &c[1].assigned_at, 1000)) { + c[0].curr_line = __LINE__; + test_consumer_poll_once(c[0].rk, &mv, 0); + } + + TEST_SAY("== Testing max poll violation ==\n"); + + /* + * Stop polling consumer 2 until `max.poll.interval.ms` is exceeded: + * as a static member it must not be evicted from the group. 
+ */ + rebalance_start = test_clock(); + prev_assigned[0] = c[0].assigned_at; + prev_revoked[0] = c[0].revoked_at; + c[1].expected_rb_event = RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS; + + for (i = 0; i < 10; i++) { + c[0].curr_line = __LINE__; + test_consumer_poll_once(c[0].rk, &mv, 0); + rd_usleep(1000 * 1000, 0); + } + static_member_expect_no_rebalance(&c[0], prev_assigned[0], + prev_revoked[0]); + + /* consumer 2 restarts polling and re-joins the group */ + rebalance_start = test_clock(); + c[1].curr_line = __LINE__; + test_consumer_poll_expect_err(c[1].rk, testid, 1000, + RD_KAFKA_RESP_ERR__MAX_POLL_EXCEEDED); + + /* Await revocation */ + while (!static_member_wait_rebalance(&c[1], rebalance_start, + &c[1].revoked_at, 1000)) { + c[0].curr_line = __LINE__; + test_consumer_poll_once(c[0].rk, &mv, 1000); + } + + c[1].expected_rb_event = RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS; + static_member_expect_rebalance(&c[1], rebalance_start, + &c[1].assigned_at, -1); + + TEST_SAY("== Testing `session.timeout.ms` member eviction ==\n"); + + rebalance_start = test_clock(); + c[0].expected_rb_event = RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS; + TIMING_START(&t_close, "consumer close"); + test_consumer_close(c[0].rk); + rd_kafka_destroy(c[0].rk); + + c[1].expected_rb_event = RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS; + static_member_expect_rebalance(&c[1], rebalance_start, + &c[1].assigned_at, -1); + + TIMING_ASSERT(&t_close, session_timeout_ms - 4000, + session_timeout_ms + 4000); + + c[1].expected_rb_event = RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS; + test_consumer_close(c[1].rk); + rd_kafka_destroy(c[1].rk); + + test_msgver_verify("final.validation", &mv, TEST_MSGVER_ALL, 0, msgcnt); + test_msgver_clear(&mv); + rd_free(topics); + + SUB_TEST_PASS(); +} + /** * @brief Await a non-empty assignment for all consumers in \p c @@ -749,6 +1005,15 @@ static void do_test_static_membership_mock(int variation) { "Expected assignment for consumer 1 after the change"); TEST_ASSERT(next_assignment2 != NULL, 
"Expected assignment for consumer 2 after the change"); + + TEST_ASSERT(next_generation_id1 == prev_generation_id1, + "Expected same generation id for consumer 1, " + "got %d != %d", + prev_generation_id1, next_generation_id1); + TEST_ASSERT(next_generation_id2 == prev_generation_id2, + "Expected same generation id for consumer 2, " + "got %d != %d", + prev_generation_id2, next_generation_id2); TEST_ASSERT(!test_partition_list_and_offsets_cmp(prev_assignment1, next_assignment1), "Expected same assignment for consumer 1 after the change"); @@ -770,15 +1035,12 @@ static void do_test_static_membership_mock(int variation) { } int main_0102_static_group_rebalance(int argc, char **argv) { - /* TODO: check again when regexes - * will be supported by KIP-848 */ - if (test_consumer_group_protocol_classic()) { - do_test_static_group_rebalance(); - } if (test_consumer_group_protocol_classic()) { + do_test_static_group_rebalance_classic(); do_test_fenced_member_classic(); } else { + do_test_static_group_rebalance_consumer(); do_test_fenced_member_consumer(); }