From 9c7ed7a51366f2040150bd9621d0108dd42289e6 Mon Sep 17 00:00:00 2001 From: rdhabalia Date: Wed, 1 Apr 2020 20:15:08 -0700 Subject: [PATCH] [pulsar-client] add api to wait for inflight messages while closing producer --- .../client/api/ProducerCreationTest.java | 53 +++++++++++++++++++ .../apache/pulsar/client/api/Producer.java | 12 ++++- .../client/impl/PartitionedProducerImpl.java | 7 ++- .../pulsar/client/impl/ProducerImpl.java | 41 ++++++++++++-- 4 files changed, 106 insertions(+), 7 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ProducerCreationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ProducerCreationTest.java index 8aed4f978f4be..e04e1c71bd90c 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ProducerCreationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ProducerCreationTest.java @@ -18,15 +18,25 @@ */ package org.apache.pulsar.client.api; + +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; + import org.apache.pulsar.client.impl.ProducerImpl; import org.apache.pulsar.common.naming.TopicDomain; import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.util.FutureUtil; import org.testng.Assert; +import static org.testng.Assert.fail; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; + +import com.google.common.collect.Lists; + public class ProducerCreationTest extends ProducerConsumerBase { @BeforeMethod @@ -92,4 +102,47 @@ public void testGeneratedNameProducerReconnect(TopicDomain domain) throws Pulsar Assert.assertEquals(producer.getConnectionHandler().getEpoch(), 1); Assert.assertTrue(producer.isConnected()); } + + /** + * It tests closing producer waits until it receives acks from the broker. + * + *
+     *  a. Create producer successfully
+     *  b. stop broker
+     *  c. publish messages
+     *  d. close producer with wait flag
+     *  f. start broker
+     *  g. producer will be closed after receiving ack of in flight published message
+     * 
+ * + * @throws Exception + */ + @Test(timeOut = 30000) + public void testCloseProducerWaitForInFlightMessages() throws Exception { + final String topicName = "persistent://public/default/closeProducerWait"; + admin.topics().createPartitionedTopic(topicName, 4); + PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(lookupUrl.toString()) + .startingBackoffInterval(10, TimeUnit.MILLISECONDS).maxBackoffInterval(10, TimeUnit.MILLISECONDS) + .build(); + Producer producer = pulsarClient.newProducer().topic(topicName) + .enableBatching(false).sendTimeout(10, TimeUnit.MINUTES).create(); + stopBroker(); + int totalMessages = 10; + List> sendResults = Lists.newArrayList(); + for (int i = 0; i < totalMessages; i++) { + sendResults.add(producer.sendAsync("test".getBytes())); + } + CompletableFuture closeFuture = producer.closeAsync(true); + startBroker(); + Thread.sleep(1000); + try { + producer.send("test".getBytes()); + fail("should have failed because producer is closing"); + } catch (Exception e) { + // Ok.. expected + } + closeFuture.get(); + // all messages must be published successfully before it's closed + FutureUtil.waitForAll(sendResults).get(10, TimeUnit.SECONDS); + } } diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Producer.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Producer.java index 5b644ac21a467..13c4e594a837c 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Producer.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Producer.java @@ -158,7 +158,7 @@ public interface Producer extends Closeable { void close() throws PulsarClientException; /** - * Close the producer and releases resources allocated. + * Close the producer immediately and releases resources allocated. * *

No more writes will be accepted from this producer. Waits until all pending write request are persisted. * In case of errors, pending writes will not be retried. @@ -167,6 +167,16 @@ public interface Producer extends Closeable { */ CompletableFuture closeAsync(); + /** + * Close the producer and releases resources allocated. If waitForPendingMessage flag is true then producer waits in + * flight messages to be acknowledged before closing the producer resources. + * + *

No more writes will be accepted from this producer. Waits until all pending write request are persisted. In case + * of errors, pending writes will not be retried. + * + * @return a future that can used to track when the producer has been closed + */ + CompletableFuture closeAsync(boolean waitForPendingMessage); /** * @return Whether the producer is currently connected to the broker */ diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java index dd7b29bb8f923..01e81d5586c53 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java @@ -203,6 +203,11 @@ public boolean isConnected() { @Override public CompletableFuture closeAsync() { + return closeAsync(false); + } + + @Override + public CompletableFuture closeAsync(boolean waitForPendingMessages) { if (getState() == State.Closing || getState() == State.Closed) { return CompletableFuture.completedFuture(null); } @@ -218,7 +223,7 @@ public CompletableFuture closeAsync() { CompletableFuture closeFuture = new CompletableFuture<>(); for (Producer producer : producers) { if (producer != null) { - producer.closeAsync().handle((closed, ex) -> { + producer.closeAsync(waitForPendingMessages).handle((closed, ex) -> { if (ex != null) { closeFail.compareAndSet(null, ex); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java index 6585842718b5b..802cd7aafd35e 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java @@ -50,7 +50,9 @@ import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.concurrent.atomic.AtomicLongFieldUpdater; + import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.client.api.BatcherBuilder; import org.apache.pulsar.client.api.CompressionType; @@ -119,6 +121,13 @@ public class ProducerImpl extends ProducerBase implements TimerTask, Conne protected volatile long lastSequenceIdPushed; private volatile boolean isLastSequenceIdPotentialDuplicated; + public static final int FALSE = 0; + public static final int TRUE = 1; + private static final AtomicIntegerFieldUpdater CLOSING_IN_PROGRESS_UPDATER = + AtomicIntegerFieldUpdater.newUpdater(ProducerImpl.class, "closingInProgress"); + @SuppressWarnings("unused") + private volatile int closingInProgress = FALSE; + private final MessageCrypto msgCrypto; private ScheduledFuture keyGeneratorTask = null; @@ -627,6 +636,10 @@ private void doBatchSendAndAdd(MessageImpl msg, SendCallback callback, ByteBu } private boolean isValidProducerState(SendCallback callback) { + if (closingInProgress == TRUE) { + callback.sendComplete(new PulsarClientException.AlreadyClosedException("Producer already closed")); + return false; + } switch (getState()) { case Ready: // OK @@ -722,6 +735,25 @@ protected WriteInEventLoopCallback newObject(Handle ha @Override public CompletableFuture closeAsync() { + return closeAsync(false); + } + + @Override + public CompletableFuture closeAsync(boolean waitForPendingMessage) { + CompletableFuture closeResult = new CompletableFuture<>(); + closeAsync(waitForPendingMessage, closeResult); + return closeResult; + } + + private void closeAsync(boolean waitForPendingMessage, CompletableFuture closeFuture) { + if (waitForPendingMessage && !pendingMessages.isEmpty()) { + CLOSING_IN_PROGRESS_UPDATER.set(this, TRUE); + client.timer().newTimeout((timeout) -> { + closeAsync(true, closeFuture); + }, 10, TimeUnit.MILLISECONDS); + return; + } + final State currentState = getAndUpdateState(state -> { if (state == State.Closed) { return state; @@ -730,7 +762,8 @@ public CompletableFuture closeAsync() { }); if (currentState == State.Closed || currentState == State.Closing) { - return CompletableFuture.completedFuture(null); + closeFuture.complete(null); + return; } Timeout timeout = sendTimeout; @@ -768,13 +801,13 @@ public CompletableFuture closeAsync() { pendingMessages.clear(); } - return CompletableFuture.completedFuture(null); + closeFuture.complete(null); + return; } long requestId = client.newRequestId(); ByteBuf cmd = Commands.newCloseProducer(producerId, requestId); - CompletableFuture closeFuture = new CompletableFuture<>(); cnx.sendRequestWithId(cmd, requestId).handle((v, exception) -> { cnx.removeProducer(producerId); if (exception == null || !cnx.ctx().channel().isActive()) { @@ -798,8 +831,6 @@ public CompletableFuture closeAsync() { return null; }); - - return closeFuture; } @Override