From c75eae84846b1023422b75798c41d4b6b1f8b0b7 Mon Sep 17 00:00:00 2001 From: Emanuele Sabellico Date: Tue, 21 Feb 2023 14:33:51 +0100 Subject: [PATCH 1/4] Mark KIP-88 as supported (#4197) --- INTRODUCTION.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/INTRODUCTION.md b/INTRODUCTION.md index f3bfc7a78d..dd81ac6d69 100644 --- a/INTRODUCTION.md +++ b/INTRODUCTION.md @@ -1856,7 +1856,7 @@ The [Apache Kafka Implementation Proposals (KIPs)](https://cwiki.apache.org/conf | KIP-84 - SASL SCRAM | 0.10.2.0 | Supported | | KIP-85 - SASL config properties | 0.10.2.0 | Supported | | KIP-86 - Configurable SASL callbacks | 2.0.0 | Not supported | -| KIP-88 - AdminAPI: ListGroupOffsets | 0.10.2.0 | Not supported | +| KIP-88 - AdminAPI: ListGroupOffsets | 0.10.2.0 | Supported | | KIP-91 - Intuitive timeouts in Producer | 2.1.0 | Supported | | KIP-92 - Per-partition lag metrics in Consumer | 0.10.2.0 | Supported | | KIP-97 - Backwards compatibility with older brokers | 0.10.2.0 | Supported | From 42bd8621b270819d8a4f986a31a1d75b22ec5f99 Mon Sep 17 00:00:00 2001 From: Emanuele Sabellico Date: Fri, 10 Mar 2023 08:32:34 +0100 Subject: [PATCH 2/4] Temporary fix for CoApp discontinuation (#4211) Disabled Windows MSVC NuGet package job, Needs an alternative before next release. --- .semaphore/semaphore.yml | 130 +++++++++++++++++++++------------------ win32/install-coapp.ps1 | 2 +- 2 files changed, 70 insertions(+), 62 deletions(-) diff --git a/.semaphore/semaphore.yml b/.semaphore/semaphore.yml index ea97c6deb6..f2f801ea36 100644 --- a/.semaphore/semaphore.yml +++ b/.semaphore/semaphore.yml @@ -259,66 +259,71 @@ blocks: commands: - C:\msys64\usr\bin\bash -lc './packaging/mingw-w64/semaphoreci-build.sh --static ./artifacts/librdkafka.tgz' - - - name: 'Windows x64: Windows SDK 10.0 / MSVC v142 / VS 2019' - dependencies: [] - task: - agent: - machine: - type: s1-prod-windows - env_vars: - # Disable vcpkg telemetry - - name: VCPKG_DISABLE_METRICS - value: 'yes' - prologue: - commands: - # install vcpkg in the parent directory. - - pwd - - cd .. - # Restore vcpkg caches, if any. - - cache restore vcpkg-archives-$Env:ARTIFACT_KEY - # Setup vcpkg - - "& .\\librdkafka\\win32\\setup-vcpkg.ps1" - - cd librdkafka - - ..\vcpkg\vcpkg integrate install - # Install required packages. - - ..\vcpkg\vcpkg --feature-flags=versions install --triplet $Env:triplet - - cd .. - - pwd - # Store vcpkg caches - - ls vcpkg/ - - echo $Env:VCPKG_ROOT - - cache delete vcpkg-archives-$Env:ARTIFACT_KEY - - cache store vcpkg-archives-$Env:ARTIFACT_KEY C:/Users/semaphore/AppData/Local/vcpkg/archives - - pwd - - cd librdkafka - # coapp is needed for creating the intermediary nuget packages. - - "& .\\win32\\install-coapp.ps1" - epilogue: - commands: - - Get-ChildItem . -include *.dll -recurse - - Get-ChildItem . -include *.lib -recurse - - if ($env:SEMAPHORE_GIT_TAG_NAME -ne "") { artifact push workflow artifacts/ --destination artifacts/$Env:ARTIFACT_KEY/ } - jobs: - - name: 'Build: MSVC x64' - env_vars: - - name: triplet - value: x64-windows - - name: ARTIFACT_KEY - value: p-librdkafka__plat-windows__dist-msvc__arch-x64__lnk-std - commands: - - "& .\\win32\\msbuild.ps1 -config Release -platform x64" - - "& .\\win32\\package-nuget.ps1 -destdir .\\artifacts\\" - - - name: 'Build: MSVC x86' - env_vars: - - name: triplet - value: x86-windows - - name: ARTIFACT_KEY - value: p-librdkafka__plat-windows__dist-msvc__arch-x86__lnk-std - commands: - - "& .\\win32\\msbuild.ps1 -config Release -platform Win32" - - "& .\\win32\\package-nuget.ps1 -destdir .\\artifacts\\" + # Disabled because of CoApp discontinuation, need to find an alternative + # before next release. + # + # - name: 'Windows x64: Windows SDK 10.0 / MSVC v142 / VS 2019' + # dependencies: [] + # task: + # agent: + # machine: + # type: s1-prod-windows + # env_vars: + # # Disable vcpkg telemetry + # - name: VCPKG_DISABLE_METRICS + # value: 'yes' + # prologue: + # commands: + # # install vcpkg in the parent directory. + # - pwd + # - cd .. + # # Restore vcpkg caches, if any. + # - cache restore vcpkg-archives-$Env:ARTIFACT_KEY + # # Setup vcpkg + # - "& .\\librdkafka\\win32\\setup-vcpkg.ps1" + # - cd librdkafka + # - ..\vcpkg\vcpkg integrate install + # # Install required packages. + # - ..\vcpkg\vcpkg --feature-flags=versions install --triplet $Env:triplet + # - cd .. + # - pwd + # # Store vcpkg caches + # - ls vcpkg/ + # - echo $Env:VCPKG_ROOT + # - cache delete vcpkg-archives-$Env:ARTIFACT_KEY + # - cache store vcpkg-archives-$Env:ARTIFACT_KEY C:/Users/semaphore/AppData/Local/vcpkg/archives + # - pwd + # - cd librdkafka + # # coapp is needed for creating the intermediary nuget packages. + # #- "& .\\win32\\install-coapp.ps1" + # epilogue: + # commands: + # - Get-ChildItem . -include *.dll -recurse + # - Get-ChildItem . -include *.lib -recurse + # - if ($env:SEMAPHORE_GIT_TAG_NAME -ne "") { artifact push workflow artifacts/ --destination artifacts/$Env:ARTIFACT_KEY/ } + # jobs: + # - name: 'Build: MSVC x64' + # env_vars: + # - name: triplet + # value: x64-windows + # - name: ARTIFACT_KEY + # value: p-librdkafka__plat-windows__dist-msvc__arch-x64__lnk-std + # commands: + # - "& .\\win32\\msbuild.ps1 -config Release -platform x64" + # # Disabled because of CoApp discontinuation, need to find an alternative + # # before next release. + # #- "& .\\win32\\package-nuget.ps1 -destdir .\\artifacts\\" + # - name: 'Build: MSVC x86' + # env_vars: + # - name: triplet + # value: x86-windows + # - name: ARTIFACT_KEY + # value: p-librdkafka__plat-windows__dist-msvc__arch-x86__lnk-std + # commands: + # - "& .\\win32\\msbuild.ps1 -config Release -platform Win32" + # # Disabled because of CoApp discontinuation, need to find an alternative + # # before next release. + # #- "& .\\win32\\package-nuget.ps1 -destdir .\\artifacts\\" - name: 'Packaging' dependencies: @@ -328,7 +333,10 @@ blocks: - 'Linux x64: release artifact docker builds' - 'Linux arm64: release artifact docker builds' - 'Windows x64: MinGW-w64' - - 'Windows x64: Windows SDK 10.0 / MSVC v142 / VS 2019' + # Disabled because of CoApp discontinuation, need to find an alternative + # before next release. + # + #- 'Windows x64: Windows SDK 10.0 / MSVC v142 / VS 2019' run: when: "tag =~ '^v[0-9]\\.'" task: diff --git a/win32/install-coapp.ps1 b/win32/install-coapp.ps1 index 0bfb0d2919..d96f36ee57 100644 --- a/win32/install-coapp.ps1 +++ b/win32/install-coapp.ps1 @@ -1,7 +1,7 @@ # Download the CoApp tools. $msiPath = "$($env:USERPROFILE)\\CoApp.Tools.Powershell.msi" -(New-Object Net.WebClient).DownloadFile('http://coapp.org/files/CoApp.Tools.Powershell.msi', $msiPath) +(New-Object Net.WebClient).DownloadFile('https://github.com/coapp/coapp.github.io/blob/master/files/Latest.CoApp.Tools.Powershell.msi', $msiPath) # Install the CoApp tools from the downloaded .msi. Start-Process -FilePath msiexec -ArgumentList /i, $msiPath, /quiet -Wait From bd8f2a644c36337851014a381508041c949700ea Mon Sep 17 00:00:00 2001 From: Emanuele Sabellico Date: Fri, 10 Mar 2023 10:53:39 +0100 Subject: [PATCH 3/4] Fix close blocked by reference count in test 0113. (#4187) Breaks a circular dependency from rko to rktp and back that prevents the toppar from being destroyed --- CHANGELOG.md | 17 +++++++++++++++ src/rdkafka_cgrp.c | 54 ++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 71 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 6332265712..5021de10ea 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,20 @@ +# librdkafka v2.0.3 + +librdkafka v2.0.3 is a bugfix release: + +* Fix a reference count issue blocking the consumer from closing (#4187). + + +## Fixes + +### Consumer fixes + + * A reference count issue was blocking the consumer from closing. + The problem would happen when a partition is lost, because forcibly + unassigned from the consumer or if the corresponding topic is deleted. + + + # librdkafka v2.0.2 librdkafka v2.0.2 is a bugfix release: diff --git a/src/rdkafka_cgrp.c b/src/rdkafka_cgrp.c index dc7ed6c0e9..922ad2e2ba 100644 --- a/src/rdkafka_cgrp.c +++ b/src/rdkafka_cgrp.c @@ -2726,6 +2726,10 @@ static void rd_kafka_cgrp_partition_add(rd_kafka_cgrp_t *rkcg, */ static void rd_kafka_cgrp_partition_del(rd_kafka_cgrp_t *rkcg, rd_kafka_toppar_t *rktp) { + int cnt = 0, barrier_cnt = 0, message_cnt = 0, other_cnt = 0; + rd_kafka_op_t *rko; + rd_kafka_q_t *rkq; + rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "PARTDEL", "Group \"%s\": delete %s [%" PRId32 "]", rkcg->rkcg_group_id->str, rktp->rktp_rkt->rkt_topic->str, @@ -2734,6 +2738,56 @@ static void rd_kafka_cgrp_partition_del(rd_kafka_cgrp_t *rkcg, rd_kafka_toppar_lock(rktp); rd_assert(rktp->rktp_flags & RD_KAFKA_TOPPAR_F_ON_CGRP); rktp->rktp_flags &= ~RD_KAFKA_TOPPAR_F_ON_CGRP; + + if (rktp->rktp_flags & RD_KAFKA_TOPPAR_F_REMOVE) { + /* Partition is being removed from the cluster and it's stopped, + * so rktp->rktp_fetchq->rkq_fwdq is NULL. + * Purge remaining operations in rktp->rktp_fetchq->rkq_q, + * while holding lock, to avoid circular references */ + rkq = rktp->rktp_fetchq; + mtx_lock(&rkq->rkq_lock); + rd_assert(!rkq->rkq_fwdq); + + rko = TAILQ_FIRST(&rkq->rkq_q); + while (rko) { + if (rko->rko_type != RD_KAFKA_OP_BARRIER && + rko->rko_type != RD_KAFKA_OP_FETCH) { + rd_kafka_log( + rkcg->rkcg_rk, LOG_WARNING, "PARTDEL", + "Purging toppar fetch queue buffer op" + "with unexpected type: %s", + rd_kafka_op2str(rko->rko_type)); + } + + if (rko->rko_type == RD_KAFKA_OP_BARRIER) + barrier_cnt++; + else if (rko->rko_type == RD_KAFKA_OP_FETCH) + message_cnt++; + else + other_cnt++; + + rko = TAILQ_NEXT(rko, rko_link); + cnt++; + } + + mtx_unlock(&rkq->rkq_lock); + + if (cnt) { + rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "PARTDEL", + "Purge toppar fetch queue buffer " + "containing %d op(s) " + "(%d barrier(s), %d message(s), %d other)" + " to avoid " + "circular references", + cnt, barrier_cnt, message_cnt, other_cnt); + rd_kafka_q_purge(rktp->rktp_fetchq); + } else { + rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "PARTDEL", + "Not purging toppar fetch queue buffer." + " No ops present in the buffer."); + } + } + rd_kafka_toppar_unlock(rktp); rd_list_remove(&rkcg->rkcg_toppars, rktp); From 578589db0feb5961f4ca02a0ef2587b659bcab78 Mon Sep 17 00:00:00 2001 From: Milind L Date: Fri, 10 Mar 2023 15:54:42 +0530 Subject: [PATCH 4/4] Fix ListGroup protocol message for ApiVersion >= 3 (#4207) Co-authored-by: Emanuele Sabellico --- CHANGELOG.md | 2 ++ src/rdkafka_request.c | 7 +------ 2 files changed, 3 insertions(+), 6 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 5021de10ea..9711f9ba2f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,8 @@ librdkafka v2.0.3 is a bugfix release: * Fix a reference count issue blocking the consumer from closing (#4187). +* Fix a protocol issue with ListGroups API, where an extra + field was appended for API Versions greater than or equal to 3. ## Fixes diff --git a/src/rdkafka_request.c b/src/rdkafka_request.c index a20d9d632a..81bee936a4 100644 --- a/src/rdkafka_request.c +++ b/src/rdkafka_request.c @@ -1789,7 +1789,6 @@ rd_kafka_error_t *rd_kafka_ListGroupsRequest(rd_kafka_broker_t *rkb, rd_kafka_buf_t *rkbuf; int16_t ApiVersion = 0; size_t i; - rd_bool_t is_flexver = rd_false; if (max_ApiVersion < 0) max_ApiVersion = 4; @@ -1800,7 +1799,6 @@ rd_kafka_error_t *rd_kafka_ListGroupsRequest(rd_kafka_broker_t *rkb, * in the application thread reliably . */ ApiVersion = rd_kafka_broker_ApiVersion_supported( rkb, RD_KAFKAP_ListGroups, 0, max_ApiVersion, NULL); - is_flexver = ApiVersion >= 3; } if (ApiVersion == -1) { @@ -1812,7 +1810,7 @@ rd_kafka_error_t *rd_kafka_ListGroupsRequest(rd_kafka_broker_t *rkb, rkbuf = rd_kafka_buf_new_flexver_request( rkb, RD_KAFKAP_ListGroups, 1, /* rd_kafka_buf_write_arraycnt_pos + tags + StatesFilter */ - 4 + 1 + 32 * states_cnt, is_flexver); + 4 + 1 + 32 * states_cnt, ApiVersion >= 3 /* is_flexver */); if (ApiVersion >= 4) { size_t of_GroupsArrayCnt = @@ -1822,9 +1820,6 @@ rd_kafka_error_t *rd_kafka_ListGroupsRequest(rd_kafka_broker_t *rkb, } rd_kafka_buf_finalize_arraycnt(rkbuf, of_GroupsArrayCnt, i); } - if (is_flexver) { - rd_kafka_buf_write_tags(rkbuf); - } rd_kafka_buf_ApiVersion_set(rkbuf, ApiVersion, 0); rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, opaque);