Skip to content

Commit

Permalink
Merge branch 'master' into feature/kip320
Browse files Browse the repository at this point in the history
  • Loading branch information
emasab committed Mar 10, 2023
2 parents 5e50954 + 578589d commit b13bade
Show file tree
Hide file tree
Showing 6 changed files with 139 additions and 69 deletions.
130 changes: 69 additions & 61 deletions .semaphore/semaphore.yml
Original file line number Diff line number Diff line change
Expand Up @@ -259,66 +259,71 @@ blocks:
commands:
- C:\msys64\usr\bin\bash -lc './packaging/mingw-w64/semaphoreci-build.sh --static ./artifacts/librdkafka.tgz'


- name: 'Windows x64: Windows SDK 10.0 / MSVC v142 / VS 2019'
dependencies: []
task:
agent:
machine:
type: s1-prod-windows
env_vars:
# Disable vcpkg telemetry
- name: VCPKG_DISABLE_METRICS
value: 'yes'
prologue:
commands:
# install vcpkg in the parent directory.
- pwd
- cd ..
# Restore vcpkg caches, if any.
- cache restore vcpkg-archives-$Env:ARTIFACT_KEY
# Setup vcpkg
- "& .\\librdkafka\\win32\\setup-vcpkg.ps1"
- cd librdkafka
- ..\vcpkg\vcpkg integrate install
# Install required packages.
- ..\vcpkg\vcpkg --feature-flags=versions install --triplet $Env:triplet
- cd ..
- pwd
# Store vcpkg caches
- ls vcpkg/
- echo $Env:VCPKG_ROOT
- cache delete vcpkg-archives-$Env:ARTIFACT_KEY
- cache store vcpkg-archives-$Env:ARTIFACT_KEY C:/Users/semaphore/AppData/Local/vcpkg/archives
- pwd
- cd librdkafka
# coapp is needed for creating the intermediary nuget packages.
- "& .\\win32\\install-coapp.ps1"
epilogue:
commands:
- Get-ChildItem . -include *.dll -recurse
- Get-ChildItem . -include *.lib -recurse
- if ($env:SEMAPHORE_GIT_TAG_NAME -ne "") { artifact push workflow artifacts/ --destination artifacts/$Env:ARTIFACT_KEY/ }
jobs:
- name: 'Build: MSVC x64'
env_vars:
- name: triplet
value: x64-windows
- name: ARTIFACT_KEY
value: p-librdkafka__plat-windows__dist-msvc__arch-x64__lnk-std
commands:
- "& .\\win32\\msbuild.ps1 -config Release -platform x64"
- "& .\\win32\\package-nuget.ps1 -destdir .\\artifacts\\"

- name: 'Build: MSVC x86'
env_vars:
- name: triplet
value: x86-windows
- name: ARTIFACT_KEY
value: p-librdkafka__plat-windows__dist-msvc__arch-x86__lnk-std
commands:
- "& .\\win32\\msbuild.ps1 -config Release -platform Win32"
- "& .\\win32\\package-nuget.ps1 -destdir .\\artifacts\\"
# Disabled because of CoApp discontinuation, need to find an alternative
# before next release.
#
# - name: 'Windows x64: Windows SDK 10.0 / MSVC v142 / VS 2019'
# dependencies: []
# task:
# agent:
# machine:
# type: s1-prod-windows
# env_vars:
# # Disable vcpkg telemetry
# - name: VCPKG_DISABLE_METRICS
# value: 'yes'
# prologue:
# commands:
# # install vcpkg in the parent directory.
# - pwd
# - cd ..
# # Restore vcpkg caches, if any.
# - cache restore vcpkg-archives-$Env:ARTIFACT_KEY
# # Setup vcpkg
# - "& .\\librdkafka\\win32\\setup-vcpkg.ps1"
# - cd librdkafka
# - ..\vcpkg\vcpkg integrate install
# # Install required packages.
# - ..\vcpkg\vcpkg --feature-flags=versions install --triplet $Env:triplet
# - cd ..
# - pwd
# # Store vcpkg caches
# - ls vcpkg/
# - echo $Env:VCPKG_ROOT
# - cache delete vcpkg-archives-$Env:ARTIFACT_KEY
# - cache store vcpkg-archives-$Env:ARTIFACT_KEY C:/Users/semaphore/AppData/Local/vcpkg/archives
# - pwd
# - cd librdkafka
# # coapp is needed for creating the intermediary nuget packages.
# #- "& .\\win32\\install-coapp.ps1"
# epilogue:
# commands:
# - Get-ChildItem . -include *.dll -recurse
# - Get-ChildItem . -include *.lib -recurse
# - if ($env:SEMAPHORE_GIT_TAG_NAME -ne "") { artifact push workflow artifacts/ --destination artifacts/$Env:ARTIFACT_KEY/ }
# jobs:
# - name: 'Build: MSVC x64'
# env_vars:
# - name: triplet
# value: x64-windows
# - name: ARTIFACT_KEY
# value: p-librdkafka__plat-windows__dist-msvc__arch-x64__lnk-std
# commands:
# - "& .\\win32\\msbuild.ps1 -config Release -platform x64"
# # Disabled because of CoApp discontinuation, need to find an alternative
# # before next release.
# #- "& .\\win32\\package-nuget.ps1 -destdir .\\artifacts\\"
# - name: 'Build: MSVC x86'
# env_vars:
# - name: triplet
# value: x86-windows
# - name: ARTIFACT_KEY
# value: p-librdkafka__plat-windows__dist-msvc__arch-x86__lnk-std
# commands:
# - "& .\\win32\\msbuild.ps1 -config Release -platform Win32"
# # Disabled because of CoApp discontinuation, need to find an alternative
# # before next release.
# #- "& .\\win32\\package-nuget.ps1 -destdir .\\artifacts\\"

- name: 'Packaging'
dependencies:
Expand All @@ -328,7 +333,10 @@ blocks:
- 'Linux x64: release artifact docker builds'
- 'Linux arm64: release artifact docker builds'
- 'Windows x64: MinGW-w64'
- 'Windows x64: Windows SDK 10.0 / MSVC v142 / VS 2019'
# Disabled because of CoApp discontinuation, need to find an alternative
# before next release.
#
#- 'Windows x64: Windows SDK 10.0 / MSVC v142 / VS 2019'
run:
when: "tag =~ '^v[0-9]\\.'"
task:
Expand Down
13 changes: 13 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@ librdkafka v2.1.0 is a feature release:

* [KIP-320](https://cwiki.apache.org/confluence/display/KAFKA/KIP-320%3A+Allow+fetchers+to+detect+and+handle+log+truncation)
Allow fetchers to detect and handle log truncation (#4122).
* Fix a reference count issue blocking the consumer from closing (#4187).
* Fix a protocol issue with ListGroups API, where an extra
field was appended for API Versions greater than or equal to 3 (#4207).


## Enhancements

Expand All @@ -18,6 +22,15 @@ librdkafka v2.1.0 is a feature release:
added to per-partition statistics.


## Fixes

### Consumer fixes

* A reference count issue was blocking the consumer from closing.
The problem would happen when a partition is lost, because forcibly
unassigned from the consumer or if the corresponding topic is deleted.



# librdkafka v2.0.2

Expand Down
2 changes: 1 addition & 1 deletion INTRODUCTION.md
Original file line number Diff line number Diff line change
Expand Up @@ -1856,7 +1856,7 @@ The [Apache Kafka Implementation Proposals (KIPs)](https://cwiki.apache.org/conf
| KIP-84 - SASL SCRAM | 0.10.2.0 | Supported |
| KIP-85 - SASL config properties | 0.10.2.0 | Supported |
| KIP-86 - Configurable SASL callbacks | 2.0.0 | Not supported |
| KIP-88 - AdminAPI: ListGroupOffsets | 0.10.2.0 | Not supported |
| KIP-88 - AdminAPI: ListGroupOffsets | 0.10.2.0 | Supported |
| KIP-91 - Intuitive timeouts in Producer | 2.1.0 | Supported |
| KIP-92 - Per-partition lag metrics in Consumer | 0.10.2.0 | Supported |
| KIP-97 - Backwards compatibility with older brokers | 0.10.2.0 | Supported |
Expand Down
54 changes: 54 additions & 0 deletions src/rdkafka_cgrp.c
Original file line number Diff line number Diff line change
Expand Up @@ -2732,6 +2732,10 @@ static void rd_kafka_cgrp_partition_add(rd_kafka_cgrp_t *rkcg,
*/
static void rd_kafka_cgrp_partition_del(rd_kafka_cgrp_t *rkcg,
rd_kafka_toppar_t *rktp) {
int cnt = 0, barrier_cnt = 0, message_cnt = 0, other_cnt = 0;
rd_kafka_op_t *rko;
rd_kafka_q_t *rkq;

rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "PARTDEL",
"Group \"%s\": delete %s [%" PRId32 "]",
rkcg->rkcg_group_id->str, rktp->rktp_rkt->rkt_topic->str,
Expand All @@ -2740,6 +2744,56 @@ static void rd_kafka_cgrp_partition_del(rd_kafka_cgrp_t *rkcg,
rd_kafka_toppar_lock(rktp);
rd_assert(rktp->rktp_flags & RD_KAFKA_TOPPAR_F_ON_CGRP);
rktp->rktp_flags &= ~RD_KAFKA_TOPPAR_F_ON_CGRP;

if (rktp->rktp_flags & RD_KAFKA_TOPPAR_F_REMOVE) {
/* Partition is being removed from the cluster and it's stopped,
* so rktp->rktp_fetchq->rkq_fwdq is NULL.
* Purge remaining operations in rktp->rktp_fetchq->rkq_q,
* while holding lock, to avoid circular references */
rkq = rktp->rktp_fetchq;
mtx_lock(&rkq->rkq_lock);
rd_assert(!rkq->rkq_fwdq);

rko = TAILQ_FIRST(&rkq->rkq_q);
while (rko) {
if (rko->rko_type != RD_KAFKA_OP_BARRIER &&
rko->rko_type != RD_KAFKA_OP_FETCH) {
rd_kafka_log(
rkcg->rkcg_rk, LOG_WARNING, "PARTDEL",
"Purging toppar fetch queue buffer op"
"with unexpected type: %s",
rd_kafka_op2str(rko->rko_type));
}

if (rko->rko_type == RD_KAFKA_OP_BARRIER)
barrier_cnt++;
else if (rko->rko_type == RD_KAFKA_OP_FETCH)
message_cnt++;
else
other_cnt++;

rko = TAILQ_NEXT(rko, rko_link);
cnt++;
}

mtx_unlock(&rkq->rkq_lock);

if (cnt) {
rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "PARTDEL",
"Purge toppar fetch queue buffer "
"containing %d op(s) "
"(%d barrier(s), %d message(s), %d other)"
" to avoid "
"circular references",
cnt, barrier_cnt, message_cnt, other_cnt);
rd_kafka_q_purge(rktp->rktp_fetchq);
} else {
rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "PARTDEL",
"Not purging toppar fetch queue buffer."
" No ops present in the buffer.");
}
}

rd_kafka_toppar_unlock(rktp);

rd_list_remove(&rkcg->rkcg_toppars, rktp);
Expand Down
7 changes: 1 addition & 6 deletions src/rdkafka_request.c
Original file line number Diff line number Diff line change
Expand Up @@ -1972,7 +1972,6 @@ rd_kafka_error_t *rd_kafka_ListGroupsRequest(rd_kafka_broker_t *rkb,
rd_kafka_buf_t *rkbuf;
int16_t ApiVersion = 0;
size_t i;
rd_bool_t is_flexver = rd_false;

if (max_ApiVersion < 0)
max_ApiVersion = 4;
Expand All @@ -1983,7 +1982,6 @@ rd_kafka_error_t *rd_kafka_ListGroupsRequest(rd_kafka_broker_t *rkb,
* in the application thread reliably . */
ApiVersion = rd_kafka_broker_ApiVersion_supported(
rkb, RD_KAFKAP_ListGroups, 0, max_ApiVersion, NULL);
is_flexver = ApiVersion >= 3;
}

if (ApiVersion == -1) {
Expand All @@ -1995,7 +1993,7 @@ rd_kafka_error_t *rd_kafka_ListGroupsRequest(rd_kafka_broker_t *rkb,
rkbuf = rd_kafka_buf_new_flexver_request(
rkb, RD_KAFKAP_ListGroups, 1,
/* rd_kafka_buf_write_arraycnt_pos + tags + StatesFilter */
4 + 1 + 32 * states_cnt, is_flexver);
4 + 1 + 32 * states_cnt, ApiVersion >= 3 /* is_flexver */);

if (ApiVersion >= 4) {
size_t of_GroupsArrayCnt =
Expand All @@ -2005,9 +2003,6 @@ rd_kafka_error_t *rd_kafka_ListGroupsRequest(rd_kafka_broker_t *rkb,
}
rd_kafka_buf_finalize_arraycnt(rkbuf, of_GroupsArrayCnt, i);
}
if (is_flexver) {
rd_kafka_buf_write_tags(rkbuf);
}

rd_kafka_buf_ApiVersion_set(rkbuf, ApiVersion, 0);
rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, opaque);
Expand Down
2 changes: 1 addition & 1 deletion win32/install-coapp.ps1
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# Download the CoApp tools.
$msiPath = "$($env:USERPROFILE)\\CoApp.Tools.Powershell.msi"

(New-Object Net.WebClient).DownloadFile('http://coapp.org/files/CoApp.Tools.Powershell.msi', $msiPath)
(New-Object Net.WebClient).DownloadFile('https://github.com/coapp/coapp.github.io/blob/master/files/Latest.CoApp.Tools.Powershell.msi', $msiPath)

# Install the CoApp tools from the downloaded .msi.
Start-Process -FilePath msiexec -ArgumentList /i, $msiPath, /quiet -Wait
Expand Down

0 comments on commit b13bade

Please sign in to comment.