diff --git a/CHANGELOG.md b/CHANGELOG.md index 857526c6eb..4616af95df 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,20 @@ +# librdkafka v2.1.1 + +librdkafka v2.1.1 is a bugfix release: + + * Avoid a duplicate message when a fetch response is received + in the middle of an offset validation request (#todo). + +## Fixes + +### Consumer fixes + + * A duplicate message can be emitted when a fetch response is received + in the middle of an offset validation request. Solved by discarding + the fetched message if the state is not `ACTIVE`. + + + # librdkafka v2.1.0 librdkafka v2.1.0 is a feature release: @@ -64,6 +81,11 @@ librdkafka v2.1.0 is a feature release: any of the **seek**, **pause**, **resume** or **rebalancing** operation, `on_consume` interceptors might be called incorrectly (maybe multiple times) for not consumed messages. +### Consume API + + * A duplicate message can be emitted when a fetch response is received + in the middle of an offset validation request. + # librdkafka v2.0.2 diff --git a/src/rdkafka_fetcher.c b/src/rdkafka_fetcher.c index 8ee67a4205..909ad3cb12 100644 --- a/src/rdkafka_fetcher.c +++ b/src/rdkafka_fetcher.c @@ -508,6 +508,21 @@ rd_kafka_fetch_reply_handle_partition(rd_kafka_broker_t *rkb, return RD_KAFKA_RESP_ERR_NO_ERROR; } + /* Make sure toppar is in ACTIVE state. */ + if (unlikely(rktp->rktp_fetch_state != RD_KAFKA_TOPPAR_FETCH_ACTIVE)) { + rd_kafka_toppar_unlock(rktp); + rd_rkb_dbg(rkb, MSG, "FETCH", + "%.*s [%" PRId32 + "]: partition not in state ACTIVE: " + "discarding fetch response", + RD_KAFKAP_STR_PR(topic), hdr.Partition); + rd_kafka_toppar_destroy(rktp); /* from get */ + rd_kafka_buf_skip(rkbuf, hdr.MessageSetSize); + if (aborted_txns) + rd_kafka_aborted_txns_destroy(aborted_txns); + return RD_KAFKA_RESP_ERR_NO_ERROR; + } + fetch_version = rktp->rktp_fetch_version; rd_kafka_toppar_unlock(rktp);