diff --git a/CHANGELOG.md b/CHANGELOG.md
index c6f49a3863..3ed6665435 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -2,16 +2,20 @@
librdkafka v2.1.1 is a maintenance release:
+ * Avoid duplicate messages when a fetch response is received
+ in the middle of an offset validation request (#4261).
* Fix segmentation fault when subscribing to a non-existent topic and
calling `rd_kafka_message_leader_epoch()` on the polled `rkmessage` (#4245).
* Fix a segmentation fault when fetching from follower and the partition lease
expires while waiting for the result of a list offsets operation (#4254).
-
## Fixes
### Consumer fixes
+ * Duplicate messages can be emitted when a fetch response is received
+ in the middle of an offset validation request. Solved by discarding
+ the fetch if the state is not `ACTIVE`.
* When fetching from follower, if the partition lease expires after 5 minutes,
and a list offsets operation was requested to retrieve the earliest
or latest offset, it resulted in segmentation fault. This was fixed by
@@ -88,8 +92,11 @@ librdkafka v2.1.0 is a feature release:
interceptors might be called incorrectly (maybe multiple times) for not consumed messages.
### Consume API
- * Segmentation fault when subscribing to a non-existent topic and
- calling `rd_kafka_message_leader_epoch()` on the polled `rkmessage`.
+
+ * Duplicate messages can be emitted when a fetch response is received
+ in the middle of an offset validation request.
+ * Segmentation fault when subscribing to a non-existent topic and
+ calling `rd_kafka_message_leader_epoch()` on the polled `rkmessage`.
diff --git a/src/rdkafka_fetcher.c b/src/rdkafka_fetcher.c
index 8ee67a4205..909ad3cb12 100644
--- a/src/rdkafka_fetcher.c
+++ b/src/rdkafka_fetcher.c
@@ -508,6 +508,21 @@ rd_kafka_fetch_reply_handle_partition(rd_kafka_broker_t *rkb,
return RD_KAFKA_RESP_ERR_NO_ERROR;
}
+ /* Make sure toppar is in ACTIVE state. */
+ if (unlikely(rktp->rktp_fetch_state != RD_KAFKA_TOPPAR_FETCH_ACTIVE)) {
+ rd_kafka_toppar_unlock(rktp);
+ rd_rkb_dbg(rkb, MSG, "FETCH",
+ "%.*s [%" PRId32
+ "]: partition not in state ACTIVE: "
+ "discarding fetch response",
+ RD_KAFKAP_STR_PR(topic), hdr.Partition);
+ rd_kafka_toppar_destroy(rktp); /* from get */
+ rd_kafka_buf_skip(rkbuf, hdr.MessageSetSize);
+ if (aborted_txns)
+ rd_kafka_aborted_txns_destroy(aborted_txns);
+ return RD_KAFKA_RESP_ERR_NO_ERROR;
+ }
+
fetch_version = rktp->rktp_fetch_version;
rd_kafka_toppar_unlock(rktp);
diff --git a/tests/0139-offset_validation_mock.c b/tests/0139-offset_validation_mock.c
new file mode 100644
index 0000000000..e605d63704
--- /dev/null
+++ b/tests/0139-offset_validation_mock.c
@@ -0,0 +1,146 @@
+/*
+ * librdkafka - Apache Kafka C library
+ *
+ * Copyright (c) 2023, Confluent Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include "test.h"
+
+#include "../src/rdkafka_proto.h"
+
+
+struct _produce_args {
+ const char *topic;
+ int sleep;
+ rd_kafka_conf_t *conf;
+};
+
+static int produce_concurrent_thread(void *args) {
+ rd_kafka_t *p1;
+ test_curr->exp_dr_err = RD_KAFKA_RESP_ERR_NO_ERROR;
+ test_curr->exp_dr_status = RD_KAFKA_MSG_STATUS_PERSISTED;
+
+ struct _produce_args *produce_args = args;
+ rd_sleep(produce_args->sleep);
+
+ p1 = test_create_handle(RD_KAFKA_PRODUCER, produce_args->conf);
+ TEST_CALL_ERR__(
+ rd_kafka_producev(p1, RD_KAFKA_V_TOPIC(produce_args->topic),
+ RD_KAFKA_V_VALUE("hi", 2), RD_KAFKA_V_END));
+ rd_kafka_flush(p1, -1);
+ rd_kafka_destroy(p1);
+ return 0;
+}
+
+/**
+ * @brief Send a produce request in the middle of an offset validation
+ * and expect that the fetched message is discarded, don't producing
+ * a duplicate when state becomes active again. See #4249.
+ */
+static void do_test_no_duplicates_during_offset_validation(void) {
+ const char *topic = test_mk_topic_name(__FUNCTION__, 1);
+ const char *c1_groupid = topic;
+ rd_kafka_t *c1;
+ rd_kafka_conf_t *conf, *conf_producer;
+ const char *bootstraps;
+ rd_kafka_mock_cluster_t *mcluster;
+ int initial_msg_count = 5;
+ thrd_t thrd;
+ struct _produce_args args = RD_ZERO_INIT;
+ uint64_t testid = test_id_generate();
+
+ SUB_TEST_QUICK();
+
+ mcluster = test_mock_cluster_new(1, &bootstraps);
+ rd_kafka_mock_topic_create(mcluster, topic, 1, 1);
+
+ /* Slow down OffsetForLeaderEpoch so a produce and
+ * subsequent fetch can happen while it's in-flight */
+ rd_kafka_mock_broker_push_request_error_rtts(
+ mcluster, 1, RD_KAFKAP_OffsetForLeaderEpoch, 1,
+ RD_KAFKA_RESP_ERR_NO_ERROR, 5000);
+
+ test_conf_init(&conf_producer, NULL, 60);
+ test_conf_set(conf_producer, "bootstrap.servers", bootstraps);
+
+
+ /* Seed the topic with messages */
+ test_produce_msgs_easy_v(topic, testid, 0, 0, initial_msg_count, 10,
+ "bootstrap.servers", bootstraps,
+ "batch.num.messages", "1", NULL);
+
+ args.topic = topic;
+ /* Makes that the message is produced while an offset validation
+ * is ongoing */
+ args.sleep = 5;
+ args.conf = conf_producer;
+ /* Spin up concurrent thread */
+ if (thrd_create(&thrd, produce_concurrent_thread, (void *)&args) !=
+ thrd_success)
+ TEST_FAIL("Failed to create thread");
+
+ test_conf_init(&conf, NULL, 60);
+
+ test_conf_set(conf, "bootstrap.servers", bootstraps);
+ /* Makes that an offset validation happens at the same
+ * time a new message is being produced */
+ test_conf_set(conf, "topic.metadata.refresh.interval.ms", "5000");
+ test_conf_set(conf, "auto.offset.reset", "earliest");
+ test_conf_set(conf, "enable.auto.commit", "false");
+ test_conf_set(conf, "enable.auto.offset.store", "false");
+ test_conf_set(conf, "enable.partition.eof", "true");
+
+ c1 = test_create_consumer(c1_groupid, NULL, conf, NULL);
+ test_consumer_subscribe(c1, topic);
+
+ /* Consume initial messages */
+ test_consumer_poll("MSG_INIT", c1, testid, 0, 0, initial_msg_count,
+ NULL);
+ /* EOF after initial messages */
+ test_consumer_poll("MSG_EOF", c1, testid, 1, initial_msg_count, 0,
+ NULL);
+ /* Concurrent producer message and EOF */
+ test_consumer_poll("MSG_AND_EOF", c1, testid, 1, initial_msg_count, 1,
+ NULL);
+ /* Only an EOF, not a duplicate message */
+ test_consumer_poll("MSG_EOF2", c1, testid, 1, initial_msg_count, 0,
+ NULL);
+
+ thrd_join(thrd, NULL);
+
+ rd_kafka_destroy(c1);
+
+ test_mock_cluster_destroy(mcluster);
+
+ TEST_LATER_CHECK();
+ SUB_TEST_PASS();
+}
+
+int main_0139_offset_validation_mock(int argc, char **argv) {
+
+ do_test_no_duplicates_during_offset_validation();
+
+ return 0;
+}
diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt
index 75725704ff..f165b1adfb 100644
--- a/tests/CMakeLists.txt
+++ b/tests/CMakeLists.txt
@@ -129,6 +129,7 @@ set(
0136-resolve_cb.c
0137-barrier_batch_consume.c
0138-admin_mock.c
+ 0139-offset_validation_mock.c
8000-idle.cpp
8001-fetch_from_follower_mock_manual.c
test.c
diff --git a/tests/test.c b/tests/test.c
index 4c49f5d447..722955457c 100644
--- a/tests/test.c
+++ b/tests/test.c
@@ -246,6 +246,8 @@ _TEST_DECL(0135_sasl_credentials);
_TEST_DECL(0136_resolve_cb);
_TEST_DECL(0137_barrier_batch_consume);
_TEST_DECL(0138_admin_mock);
+_TEST_DECL(0139_offset_validation_mock);
+
/* Manual tests */
_TEST_DECL(8000_idle);
@@ -491,6 +493,7 @@ struct test tests[] = {
_TEST(0136_resolve_cb, TEST_F_LOCAL),
_TEST(0137_barrier_batch_consume, 0),
_TEST(0138_admin_mock, TEST_F_LOCAL, TEST_BRKVER(2, 4, 0, 0)),
+ _TEST(0139_offset_validation_mock, 0),
/* Manual tests */
_TEST(8000_idle, TEST_F_MANUAL),
diff --git a/win32/tests/tests.vcxproj b/win32/tests/tests.vcxproj
index 360ae32b7c..3296db168c 100644
--- a/win32/tests/tests.vcxproj
+++ b/win32/tests/tests.vcxproj
@@ -219,6 +219,7 @@
+