From ff1aaf579512e79129b869c266b6add0a3d3a613 Mon Sep 17 00:00:00 2001 From: Pranav Rathi <4427674+pranavrth@users.noreply.github.com> Date: Mon, 14 Aug 2023 18:31:48 +0530 Subject: [PATCH] Fix ListConsumerGroupOffsets not fetching offsets for all the topics in a group with Apache Kafka version below 2.4.0. (#4346) --- CHANGELOG.md | 1 + src/rdkafka_request.c | 2 +- tests/0081-admin.c | 4 ++++ 3 files changed, 6 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 11b1be60ae..fbc67c8242 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,7 @@ librdkafka v2.2.1 is a maintenance release: * Added Topic id to the metadata response which is part of the [KIP-516](https://cwiki.apache.org/confluence/display/KAFKA/KIP-516%3A+Topic+Identifiers) + * Fixed ListConsumerGroupOffsets not fetching offsets for all the topics in a group with Apache Kafka version below 2.4.0. diff --git a/src/rdkafka_request.c b/src/rdkafka_request.c index e96b0f7c78..005833d204 100644 --- a/src/rdkafka_request.c +++ b/src/rdkafka_request.c @@ -1180,7 +1180,7 @@ void rd_kafka_OffsetFetchRequest(rd_kafka_broker_t *rkb, rkbuf, parts, rd_false /*include invalid offsets*/, rd_false /*skip valid offsets */, fields); } else { - rd_kafka_buf_write_arraycnt_pos(rkbuf); + rd_kafka_buf_write_arraycnt(rkbuf, PartCnt); } if (ApiVersion >= 7) { diff --git a/tests/0081-admin.c b/tests/0081-admin.c index e960342f17..ed39cfc9bb 100644 --- a/tests/0081-admin.c +++ b/tests/0081-admin.c @@ -4308,7 +4308,9 @@ static void do_test_apis(rd_kafka_type_t cltype) { do_test_DeleteConsumerGroupOffsets( "main queue", rk, mainq, 1500, rd_true /*with subscribing consumer*/); + } + if (test_broker_version >= TEST_BRKVER(2, 5, 0, 0)) { /* Alter committed offsets */ do_test_AlterConsumerGroupOffsets("temp queue", rk, NULL, -1, rd_false, rd_true); @@ -4321,7 +4323,9 @@ static void do_test_apis(rd_kafka_type_t cltype) { "main queue", rk, mainq, 1500, rd_true, /*with subscribing consumer*/ rd_true); + } + if (test_broker_version >= TEST_BRKVER(2, 0, 0, 0)) { /* List committed offsets */ do_test_ListConsumerGroupOffsets("temp queue", rk, NULL, -1, rd_false, rd_false);