From 2c039a46ba216fa0024aa86d4bb27d761515dd7b Mon Sep 17 00:00:00 2001 From: Daniel Kec Date: Wed, 23 Nov 2022 15:45:17 +0100 Subject: [PATCH] Kafka producer nacking fix 5510 Signed-off-by: Daniel Kec --- .../connectors/kafka/KafkaSubscriber.java | 2 ++ .../connectors/kafka/AbstractSampleBean.java | 21 +++++++++++++++++++ .../connectors/kafka/KafkaMpTest.java | 19 +++++++++++++++++ 3 files changed, 42 insertions(+) diff --git a/messaging/connectors/kafka/src/main/java/io/helidon/messaging/connectors/kafka/KafkaSubscriber.java b/messaging/connectors/kafka/src/main/java/io/helidon/messaging/connectors/kafka/KafkaSubscriber.java index daa12e02083..d393bf639df 100644 --- a/messaging/connectors/kafka/src/main/java/io/helidon/messaging/connectors/kafka/KafkaSubscriber.java +++ b/messaging/connectors/kafka/src/main/java/io/helidon/messaging/connectors/kafka/KafkaSubscriber.java @@ -116,6 +116,8 @@ record = new ProducerRecord<>(topic, message.getPayload()); subscription.request(backpressure); } }); + } else { + message.nack(exception); } }); } diff --git a/tests/integration/kafka/src/test/java/io/helidon/messaging/connectors/kafka/AbstractSampleBean.java b/tests/integration/kafka/src/test/java/io/helidon/messaging/connectors/kafka/AbstractSampleBean.java index 8f57ca7fd22..ce6ce8d9bf2 100644 --- a/tests/integration/kafka/src/test/java/io/helidon/messaging/connectors/kafka/AbstractSampleBean.java +++ b/tests/integration/kafka/src/test/java/io/helidon/messaging/connectors/kafka/AbstractSampleBean.java @@ -24,12 +24,14 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.logging.Logger; import jakarta.enterprise.context.ApplicationScoped; +import io.helidon.common.reactive.Multi; import io.helidon.common.reactive.Single; import org.apache.kafka.clients.consumer.ConsumerRecord; @@ -305,4 +307,23 @@ public Message channel13ToChannel12(KafkaMessage msg) { return Message.of(msg.getPayload()); } } + + @ApplicationScoped + public static class Channel14 extends AbstractSampleBean { + + AtomicBoolean acked = new AtomicBoolean(); + CompletableFuture nacked = new CompletableFuture<>(); + + public CompletableFuture getNacked() { + return nacked; + } + + @Outgoing("test-channel-14") + public Multi> channel14() { + return Multi.just(Message.of("test", + () -> CompletableFuture.completedStage(null).thenRun(() -> acked.set(true)), + t -> CompletableFuture.completedFuture(null).thenAccept(v -> nacked.complete(t)) + )); + } + } } diff --git a/tests/integration/kafka/src/test/java/io/helidon/messaging/connectors/kafka/KafkaMpTest.java b/tests/integration/kafka/src/test/java/io/helidon/messaging/connectors/kafka/KafkaMpTest.java index 4660c1271a1..b91308dd660 100644 --- a/tests/integration/kafka/src/test/java/io/helidon/messaging/connectors/kafka/KafkaMpTest.java +++ b/tests/integration/kafka/src/test/java/io/helidon/messaging/connectors/kafka/KafkaMpTest.java @@ -28,6 +28,7 @@ import java.util.Map; import java.util.Set; import java.util.UUID; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.logging.Level; @@ -54,6 +55,7 @@ import org.apache.kafka.common.serialization.StringSerializer; import org.eclipse.microprofile.config.spi.ConfigProviderResolver; import org.eclipse.microprofile.reactive.messaging.spi.Connector; +import org.hamcrest.Matchers; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; @@ -209,6 +211,13 @@ private static Map cdiConfig() { "mp.messaging.incoming.test-channel-13.group.id", "sameGroup", "mp.messaging.incoming.test-channel-13.key.deserializer", LongDeserializer.class.getName(), "mp.messaging.incoming.test-channel-13.value.deserializer", StringDeserializer.class.getName())); + p.putAll(Map.of( + "mp.messaging.outgoing.test-channel-14.connector", KafkaConnector.CONNECTOR_NAME, + "mp.messaging.outgoing.test-channel-14.max.block.ms", "100", + "mp.messaging.outgoing.test-channel-14.bootstrap.servers", KAFKA_SERVER, + "mp.messaging.outgoing.test-channel-14.topic", UNEXISTING_TOPIC, + "mp.messaging.outgoing.test-channel-14.key.serializer", LongSerializer.class.getName(), + "mp.messaging.outgoing.test-channel-14.value.serializer", StringSerializer.class.getName())); return p; } @@ -253,6 +262,7 @@ private static void cdiContainerUp() { classes.add(AbstractSampleBean.Channel9.class); classes.add(AbstractSampleBean.Channel11.class); classes.add(AbstractSampleBean.Channel12.class); + classes.add(AbstractSampleBean.Channel14.class); classes.add(MessagingCdiExtension.class); Map p = new HashMap<>(cdiConfig()); @@ -398,6 +408,15 @@ void kafkaSubscriberConnectionError() throws InterruptedException { kafkaResource.getKafkaTestUtils().consumeAllRecordsFromTopic(TEST_TOPIC_10); } + @Test + void kafkaProduceWithNack() throws InterruptedException, ExecutionException, TimeoutException { + LOGGER.fine(() -> "==========> test kafkaProduceWithNack()"); + AbstractSampleBean.Channel14 kafkaProdBean = cdiContainer.select(AbstractSampleBean.Channel14.class).get(); + Throwable t = kafkaProdBean.getNacked().get(5, TimeUnit.SECONDS); + assertNotNull(t); + assertThat(t.getCause(), Matchers.instanceOf(org.apache.kafka.common.errors.TimeoutException.class)); + } + @Test void kafkaSubscriberSendError() throws InterruptedException { LOGGER.fine(() -> "==========> test kafkaSubscriberSendError()");