diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ProducerCreationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ProducerCreationTest.java
index 3ed6ef764936c..cf7c61e1fcc1b 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ProducerCreationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ProducerCreationTest.java
@@ -18,15 +18,25 @@
*/
package org.apache.pulsar.client.api;
+
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
import org.apache.pulsar.client.impl.ProducerImpl;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.util.FutureUtil;
import org.testng.Assert;
+import static org.testng.Assert.fail;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
+
+import com.google.common.collect.Lists;
+
public class ProducerCreationTest extends ProducerConsumerBase {
@BeforeMethod
@@ -92,4 +102,47 @@ public void testGeneratedNameProducerReconnect(TopicDomain domain) throws Pulsar
Assert.assertEquals(producer.getConnectionHandler().getEpoch(), 1);
Assert.assertTrue(producer.isConnected());
}
+
+ /**
+ * It tests closing producer waits until it receives acks from the broker.
+ *
+ *
+ * a. Create producer successfully
+ * b. stop broker
+ * c. publish messages
+ * d. close producer with wait flag
+ * f. start broker
+ * g. producer will be closed after receiving ack of in flight published message
+ *
+ *
+ * @throws Exception
+ */
+ @Test(timeOut = 30000)
+ public void testCloseProducerWaitForInFlightMessages() throws Exception {
+ final String topicName = "persistent://public/default/closeProducerWait";
+ admin.topics().createPartitionedTopic(topicName, 4);
+ PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(lookupUrl.toString())
+ .startingBackoffInterval(10, TimeUnit.MILLISECONDS).maxBackoffInterval(10, TimeUnit.MILLISECONDS)
+ .build();
+ Producer producer = pulsarClient.newProducer().topic(topicName)
+ .enableBatching(false).sendTimeout(10, TimeUnit.MINUTES).create();
+ stopBroker();
+ int totalMessages = 10;
+ List> sendResults = Lists.newArrayList();
+ for (int i = 0; i < totalMessages; i++) {
+ sendResults.add(producer.sendAsync("test".getBytes()));
+ }
+ CompletableFuture closeFuture = producer.closeAsync();
+ startBroker();
+ Thread.sleep(1000);
+ try {
+ producer.send("test".getBytes());
+ fail("should have failed because producer is closing");
+ } catch (Exception e) {
+ // Ok.. expected
+ }
+ closeFuture.get();
+ // all messages must be published successfully before it's closed
+ FutureUtil.waitForAll(sendResults).get(10, TimeUnit.SECONDS);
+ }
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index 05ca1a5b6c877..ccb44e7847df5 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -52,7 +52,9 @@
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
+
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.api.BatcherBuilder;
import org.apache.pulsar.client.api.CompressionType;
@@ -124,6 +126,8 @@ public class ProducerImpl extends ProducerBase implements TimerTask, Conne
protected volatile long lastSequenceIdPushed;
private volatile boolean isLastSequenceIdPotentialDuplicated;
+ private volatile boolean closingInProgress = false;
+
private final MessageCrypto msgCrypto;
private ScheduledFuture> keyGeneratorTask = null;
@@ -716,6 +720,10 @@ private void doBatchSendAndAdd(MessageImpl> msg, SendCallback callback, ByteBu
}
private boolean isValidProducerState(SendCallback callback, long sequenceId) {
+ if (closingInProgress) {
+ callback.sendComplete(new PulsarClientException.AlreadyClosedException("Producer already closed"));
+ return false;
+ }
switch (getState()) {
case Ready:
// OK
@@ -825,6 +833,20 @@ protected WriteInEventLoopCallback newObject(Handle ha
@Override
public CompletableFuture closeAsync() {
+ CompletableFuture closeResult = new CompletableFuture<>();
+ closeAsync(closeResult);
+ return closeResult;
+ }
+
+ private void closeAsync(CompletableFuture closeFuture) {
+ if (!pendingMessages.isEmpty()) {
+ closingInProgress = true;
+ client.timer().newTimeout((timeout) -> {
+ closeAsync(closeFuture);
+ }, 10, TimeUnit.MILLISECONDS);
+ return;
+ }
+
final State currentState = getAndUpdateState(state -> {
if (state == State.Closed) {
return state;
@@ -833,7 +855,8 @@ public CompletableFuture closeAsync() {
});
if (currentState == State.Closed || currentState == State.Closing) {
- return CompletableFuture.completedFuture(null);
+ closeFuture.complete(null);
+ return;
}
Timeout timeout = sendTimeout;
@@ -871,13 +894,13 @@ public CompletableFuture closeAsync() {
pendingMessages.clear();
}
- return CompletableFuture.completedFuture(null);
+ closeFuture.complete(null);
+ return;
}
long requestId = client.newRequestId();
ByteBuf cmd = Commands.newCloseProducer(producerId, requestId);
- CompletableFuture closeFuture = new CompletableFuture<>();
cnx.sendRequestWithId(cmd, requestId).handle((v, exception) -> {
cnx.removeProducer(producerId);
if (exception == null || !cnx.ctx().channel().isActive()) {
@@ -901,8 +924,6 @@ public CompletableFuture closeAsync() {
return null;
});
-
- return closeFuture;
}
@Override