From 2a67470706a32124842bb6467c76c545ba24980d Mon Sep 17 00:00:00 2001
From: Emanuele Sabellico <esabellico@confluent.io>
Date: Wed, 20 Sep 2023 19:04:15 +0200
Subject: [PATCH 1/4] Failing test

---
 tests/0139-offset_validation_mock.c | 132 ++++++++++++++++++++++++++++
 1 file changed, 132 insertions(+)

diff --git a/tests/0139-offset_validation_mock.c b/tests/0139-offset_validation_mock.c
index d1619634b1..9ce01a599c 100644
--- a/tests/0139-offset_validation_mock.c
+++ b/tests/0139-offset_validation_mock.c
@@ -212,6 +212,136 @@ static void do_test_ssl_error_retried(void) {
 }
 
 
+/**
+ * @brief Storing an offset without leader epoch should still be allowed
+ *        and the greater than check should apply only to the offset.
+ *        See #4384.
+ */
+static void do_test_store_offset_without_leader_epoch(void) {
+        rd_kafka_mock_cluster_t *mcluster;
+        rd_kafka_conf_t *conf;
+        const char *bootstraps;
+        const char *topic      = test_mk_topic_name(__FUNCTION__, 1);
+        const char *c1_groupid = topic;
+        rd_kafka_t *c1;
+        rd_kafka_topic_t *rdk_topic;
+        uint64_t testid = test_id_generate();
+        rd_kafka_topic_partition_list_t *rktpars;
+        rd_kafka_topic_partition_t *rktpar;
+        int32_t leader_epoch;
+
+        SUB_TEST_QUICK();
+
+        mcluster = test_mock_cluster_new(3, &bootstraps);
+        rd_kafka_mock_topic_create(mcluster, topic, 1, 1);
+
+        test_conf_init(&conf, NULL, 60);
+        test_conf_set(conf, "bootstrap.servers", bootstraps);
+        test_conf_set(conf, "topic.metadata.refresh.interval.ms", "5000");
+        test_conf_set(conf, "auto.offset.reset", "earliest");
+        test_conf_set(conf, "enable.auto.commit", "false");
+        test_conf_set(conf, "enable.auto.offset.store", "false");
+        test_conf_set(conf, "enable.partition.eof", "true");
+
+        c1 = test_create_consumer(c1_groupid, NULL, conf, NULL);
+        test_consumer_subscribe(c1, topic);
+
+        /* Leader epoch becomes 1. */
+        rd_kafka_mock_partition_set_leader(mcluster, topic, 0, 2);
+
+        /* Read EOF. */
+        test_consumer_poll("MSG_ALL", c1, testid, 1, 0, 0, NULL);
+
+        TEST_SAY(
+            "Storing offset without leader epoch with rd_kafka_offset_store");
+        rdk_topic = rd_kafka_topic_new(c1, topic, NULL);
+        /* Legacy function stores offset + 1 */
+        rd_kafka_offset_store(rdk_topic, 0, 1);
+        rd_kafka_topic_destroy(rdk_topic);
+
+        rd_kafka_commit(c1, NULL, rd_false);
+
+        rktpars = rd_kafka_topic_partition_list_new(1);
+        rd_kafka_topic_partition_list_add(rktpars, topic, 0);
+        rd_kafka_committed(c1, rktpars, -1);
+
+        TEST_ASSERT(rktpars->elems[0].offset == 2, "expected %d, got %" PRId64,
+                    2, rktpars->elems[0].offset);
+        leader_epoch =
+            rd_kafka_topic_partition_get_leader_epoch(&rktpars->elems[0]);
+
+        /* OffsetFetch returns the leader epoch even if not set. */
+        TEST_ASSERT(leader_epoch == 1, "expected %d, got %" PRId32, 1,
+                    leader_epoch);
+        rd_kafka_topic_partition_list_destroy(rktpars);
+
+        TEST_SAY(
+            "Storing offset without leader epoch with rd_kafka_offsets_store");
+        rktpars = rd_kafka_topic_partition_list_new(1);
+        rd_kafka_topic_partition_list_add(rktpars, topic, 0)->offset = 5;
+        rd_kafka_offsets_store(c1, rktpars);
+        rd_kafka_topic_partition_list_destroy(rktpars);
+
+        rd_kafka_commit(c1, NULL, rd_false);
+
+        rktpars = rd_kafka_topic_partition_list_new(1);
+        rd_kafka_topic_partition_list_add(rktpars, topic, 0);
+        rd_kafka_committed(c1, rktpars, -1);
+
+        TEST_ASSERT(rktpars->elems[0].offset == 5, "expected %d, got %" PRId64,
+                    5, rktpars->elems[0].offset);
+        leader_epoch =
+            rd_kafka_topic_partition_get_leader_epoch(&rktpars->elems[0]);
+        /* OffsetFetch returns the leader epoch even if not set. */
+        TEST_ASSERT(leader_epoch == 1, "expected %d, got %" PRId32, 1,
+                    leader_epoch);
+        rd_kafka_topic_partition_list_destroy(rktpars);
+
+        TEST_SAY(
+            "While storing offset with leader epoch it should check that value "
+            "first");
+        /* Setting it to (6,1), as last one has epoch -1. */
+        rktpars        = rd_kafka_topic_partition_list_new(1);
+        rktpar         = rd_kafka_topic_partition_list_add(rktpars, topic, 0);
+        rktpar->offset = 6;
+        rd_kafka_topic_partition_set_leader_epoch(rktpar, 1);
+        rd_kafka_offsets_store(c1, rktpars);
+        rd_kafka_topic_partition_list_destroy(rktpars);
+
+        rd_kafka_commit(c1, NULL, rd_false);
+
+        /* Trying to store (7,0), it should skip the commit. */
+        rktpars        = rd_kafka_topic_partition_list_new(1);
+        rktpar         = rd_kafka_topic_partition_list_add(rktpars, topic, 0);
+        rktpar->offset = 7;
+        rd_kafka_topic_partition_set_leader_epoch(rktpar, 0);
+        rd_kafka_offsets_store(c1, rktpars);
+        rd_kafka_topic_partition_list_destroy(rktpars);
+
+        rd_kafka_commit(c1, NULL, rd_false);
+
+        /* Committed offset is (6,1). */
+        rktpars = rd_kafka_topic_partition_list_new(1);
+        rd_kafka_topic_partition_list_add(rktpars, topic, 0);
+        rd_kafka_committed(c1, rktpars, -1);
+
+        TEST_ASSERT(rktpars->elems[0].offset == 6, "expected %d, got %" PRId64,
+                    6, rktpars->elems[0].offset);
+        leader_epoch =
+            rd_kafka_topic_partition_get_leader_epoch(&rktpars->elems[0]);
+        TEST_ASSERT(leader_epoch == 1, "expected %d, got %" PRId32, 1,
+                    leader_epoch);
+        rd_kafka_topic_partition_list_destroy(rktpars);
+
+        rd_kafka_destroy(c1);
+
+        test_mock_cluster_destroy(mcluster);
+
+        TEST_LATER_CHECK();
+        SUB_TEST_PASS();
+}
+
+
 int main_0139_offset_validation_mock(int argc, char **argv) {
 
         if (test_needs_auth()) {
@@ -223,5 +353,7 @@ int main_0139_offset_validation_mock(int argc, char **argv) {
 
         do_test_ssl_error_retried();
 
+        do_test_store_offset_without_leader_epoch();
+
         return 0;
 }

From 508d46ea5b726182bdfd422a1c412a34b4ecafd2 Mon Sep 17 00:00:00 2001
From: Emanuele Sabellico <esabellico@confluent.io>
Date: Wed, 20 Sep 2023 19:05:22 +0200
Subject: [PATCH 2/4] PR number.

---
 CHANGELOG.md            | 12 ++++++++++++
 src/rdkafka_partition.h | 25 ++++++++++++++++++-------
 2 files changed, 30 insertions(+), 7 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index fbc67c8242..da16dea3fa 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -4,6 +4,18 @@ librdkafka v2.2.1 is a maintenance release:
 
  * Added Topic id to the metadata response which is part of the [KIP-516](https://cwiki.apache.org/confluence/display/KAFKA/KIP-516%3A+Topic+Identifiers)
  * Fixed ListConsumerGroupOffsets not fetching offsets for all the topics in a group with Apache Kafka version below 2.4.0.
+ * Fix for stored offsets not being committed if they lacked the leader epoch (#4442).
+
+
+## Fixes
+
+### Consumer fixes
+
+  * Stored offsets where excluded from the commit if the leader epoch was
+    less than committed epoch, as it's possible if leader epoch is the default -1.
+    This didn't happen in Python, Go and .NET bindings when stored position was
+    taken from the message. Solved by only checking that offset is greater
+    than committed one.
 
 
 
diff --git a/src/rdkafka_partition.h b/src/rdkafka_partition.h
index f9dd686423..be55d50a47 100644
--- a/src/rdkafka_partition.h
+++ b/src/rdkafka_partition.h
@@ -68,24 +68,35 @@ struct rd_kafka_toppar_err {
                                   *   last msg sequence */
 };
 
-
+/**
+ * @brief Fetchpos comparator, only offset is compared.
+ */
+static RD_UNUSED RD_INLINE int
+rd_kafka_fetch_pos_cmp_offset(const rd_kafka_fetch_pos_t *a,
+                              const rd_kafka_fetch_pos_t *b) {
+        if (a->offset < b->offset)
+                return -1;
+        else if (a->offset > b->offset)
+                return 1;
+        else
+                return 0;
+}
 
 /**
- * @brief Fetchpos comparator, leader epoch has precedence.
+ * @brief Fetchpos comparator, leader epoch has precedence
+ *        iff both values are not null.
  */
 static RD_UNUSED RD_INLINE int
 rd_kafka_fetch_pos_cmp(const rd_kafka_fetch_pos_t *a,
                        const rd_kafka_fetch_pos_t *b) {
+        if (a->leader_epoch == -1 || b->leader_epoch == -1)
+                return rd_kafka_fetch_pos_cmp_offset(a, b);
         if (a->leader_epoch < b->leader_epoch)
                 return -1;
         else if (a->leader_epoch > b->leader_epoch)
                 return 1;
-        else if (a->offset < b->offset)
-                return -1;
-        else if (a->offset > b->offset)
-                return 1;
         else
-                return 0;
+                return rd_kafka_fetch_pos_cmp_offset(a, b);
 }
 
 

From 7143457f63540bd96413547f459228ffa8cc11f1 Mon Sep 17 00:00:00 2001
From: Emanuele Sabellico <esabellico@confluent.io>
Date: Thu, 28 Sep 2023 17:04:35 +0200
Subject: [PATCH 3/4] Assert no error when committing

---
 tests/0139-offset_validation_mock.c | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/tests/0139-offset_validation_mock.c b/tests/0139-offset_validation_mock.c
index 9ce01a599c..48f5cc7e51 100644
--- a/tests/0139-offset_validation_mock.c
+++ b/tests/0139-offset_validation_mock.c
@@ -282,7 +282,7 @@ static void do_test_store_offset_without_leader_epoch(void) {
         rd_kafka_offsets_store(c1, rktpars);
         rd_kafka_topic_partition_list_destroy(rktpars);
 
-        rd_kafka_commit(c1, NULL, rd_false);
+        TEST_CALL_ERR__(rd_kafka_commit(c1, NULL, rd_false));
 
         rktpars = rd_kafka_topic_partition_list_new(1);
         rd_kafka_topic_partition_list_add(rktpars, topic, 0);

From dbbee3676668ad54ba17cdf13c432e77de576152 Mon Sep 17 00:00:00 2001
From: Emanuele Sabellico <esabellico@confluent.io>
Date: Thu, 28 Sep 2023 17:14:43 +0200
Subject: [PATCH 4/4] Address comments

---
 CHANGELOG.md            | 7 ++++---
 src/rdkafka_partition.h | 7 +------
 2 files changed, 5 insertions(+), 9 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index da16dea3fa..739ed5aab7 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -11,11 +11,12 @@ librdkafka v2.2.1 is a maintenance release:
 
 ### Consumer fixes
 
-  * Stored offsets where excluded from the commit if the leader epoch was
+  * Stored offsets were excluded from the commit if the leader epoch was
     less than committed epoch, as it's possible if leader epoch is the default -1.
     This didn't happen in Python, Go and .NET bindings when stored position was
-    taken from the message. Solved by only checking that offset is greater
-    than committed one.
+    taken from the message.
+    Solved by checking only that the stored offset is greater
+    than committed one, if either stored or committed leader epoch is -1 (#4442).
 
 
 
diff --git a/src/rdkafka_partition.h b/src/rdkafka_partition.h
index be55d50a47..6fbb86db04 100644
--- a/src/rdkafka_partition.h
+++ b/src/rdkafka_partition.h
@@ -74,12 +74,7 @@ struct rd_kafka_toppar_err {
 static RD_UNUSED RD_INLINE int
 rd_kafka_fetch_pos_cmp_offset(const rd_kafka_fetch_pos_t *a,
                               const rd_kafka_fetch_pos_t *b) {
-        if (a->offset < b->offset)
-                return -1;
-        else if (a->offset > b->offset)
-                return 1;
-        else
-                return 0;
+        return (RD_CMP(a->offset, b->offset));
 }
 
 /**