From d0c8e5fb0bf63f13bf7f0390ebfd059c016f0b67 Mon Sep 17 00:00:00 2001 From: Thomas Segismont Date: Fri, 3 Jan 2025 15:28:21 +0100 Subject: [PATCH] Revert "Fix NPE when only batchHandler is specified (#281)" This reverts commit f02269aa9b02b02ecc8b1ea1ed21603fb3ffafca. --- .../kafka/client/consumer/impl/KafkaReadStreamImpl.java | 5 +---- .../io/vertx/kafka/client/tests/ConsumerTestBase.java | 9 +++------ 2 files changed, 4 insertions(+), 10 deletions(-) diff --git a/src/main/java/io/vertx/kafka/client/consumer/impl/KafkaReadStreamImpl.java b/src/main/java/io/vertx/kafka/client/consumer/impl/KafkaReadStreamImpl.java index 465ce470..e6b2cf89 100644 --- a/src/main/java/io/vertx/kafka/client/consumer/impl/KafkaReadStreamImpl.java +++ b/src/main/java/io/vertx/kafka/client/consumer/impl/KafkaReadStreamImpl.java @@ -209,10 +209,7 @@ private void run(Handler> handler, Handler { if (records != null && records.count() > 0) { - if (handler != null) { - // only set iterator if records are going to be consumed by individual record handler - this.current = records.iterator(); - } + this.current = records.iterator(); if (multiHandler != null) { multiHandler.handle(records); } diff --git a/src/test/java/io/vertx/kafka/client/tests/ConsumerTestBase.java b/src/test/java/io/vertx/kafka/client/tests/ConsumerTestBase.java index 34e95177..9e042156 100644 --- a/src/test/java/io/vertx/kafka/client/tests/ConsumerTestBase.java +++ b/src/test/java/io/vertx/kafka/client/tests/ConsumerTestBase.java @@ -1311,8 +1311,7 @@ public void testConsumerBatchHandler(TestContext ctx) throws Exception { String consumerId = topicName; Async batch1 = ctx.async(); AtomicInteger index = new AtomicInteger(); - int batchSize = 500; - int numMessages = 1000; + int numMessages = 500; kafkaCluster.useTo().produceStrings(numMessages, batch1::complete, () -> new ProducerRecord<>(topicName, 0, "key-" + index.get(), "value-" + index.getAndIncrement())); batch1.awaitSuccess(10000); @@ -1326,7 +1325,7 @@ public void testConsumerBatchHandler(TestContext ctx) throws Exception { Async batchHandler = ctx.async(); batchHandler.handler(ar -> wrappedConsumer.close()); wrappedConsumer.batchHandler(records -> { - ctx.assertEquals(batchSize, records.size()); + ctx.assertEquals(numMessages, records.size()); for (int i = 0; i < records.size(); i++) { KafkaConsumerRecord record = records.recordAt(i); int dec = count.decrementAndGet(); @@ -1335,10 +1334,8 @@ public void testConsumerBatchHandler(TestContext ctx) throws Exception { } else { ctx.assertEquals("key-" + (-1 - dec), record.key()); } - if (dec == 0) { - batchHandler.complete(); - } } + batchHandler.complete(); }); wrappedConsumer.subscribe(Collections.singleton(topicName)); }