Skip to content

Commit

Permalink
[pulsar-client] add api to wait for inflight messages while closing p…
Browse files Browse the repository at this point in the history
…roducer
  • Loading branch information
rdhabalia committed Apr 2, 2020
1 parent c955ff9 commit 9c7ed7a
Show file tree
Hide file tree
Showing 4 changed files with 106 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,25 @@
*/
package org.apache.pulsar.client.api;


import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

import org.apache.pulsar.client.impl.ProducerImpl;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.util.FutureUtil;
import org.testng.Assert;
import static org.testng.Assert.fail;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;


import com.google.common.collect.Lists;

public class ProducerCreationTest extends ProducerConsumerBase {

@BeforeMethod
Expand Down Expand Up @@ -92,4 +102,47 @@ public void testGeneratedNameProducerReconnect(TopicDomain domain) throws Pulsar
Assert.assertEquals(producer.getConnectionHandler().getEpoch(), 1);
Assert.assertTrue(producer.isConnected());
}

/**
* It tests closing producer waits until it receives acks from the broker.
*
* <pre>
* a. Create producer successfully
* b. stop broker
* c. publish messages
* d. close producer with wait flag
* f. start broker
* g. producer will be closed after receiving ack of in flight published message
* </pre>
*
* @throws Exception
*/
@Test(timeOut = 30000)
public void testCloseProducerWaitForInFlightMessages() throws Exception {
final String topicName = "persistent://public/default/closeProducerWait";
admin.topics().createPartitionedTopic(topicName, 4);
PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(lookupUrl.toString())
.startingBackoffInterval(10, TimeUnit.MILLISECONDS).maxBackoffInterval(10, TimeUnit.MILLISECONDS)
.build();
Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
.enableBatching(false).sendTimeout(10, TimeUnit.MINUTES).create();
stopBroker();
int totalMessages = 10;
List<CompletableFuture<MessageId>> sendResults = Lists.newArrayList();
for (int i = 0; i < totalMessages; i++) {
sendResults.add(producer.sendAsync("test".getBytes()));
}
CompletableFuture<Void> closeFuture = producer.closeAsync(true);
startBroker();
Thread.sleep(1000);
try {
producer.send("test".getBytes());
fail("should have failed because producer is closing");
} catch (Exception e) {
// Ok.. expected
}
closeFuture.get();
// all messages must be published successfully before it's closed
FutureUtil.waitForAll(sendResults).get(10, TimeUnit.SECONDS);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ public interface Producer<T> extends Closeable {
void close() throws PulsarClientException;

/**
* Close the producer and releases resources allocated.
* Close the producer immediately and releases resources allocated.
*
* <p>No more writes will be accepted from this producer. Waits until all pending write request are persisted.
* In case of errors, pending writes will not be retried.
Expand All @@ -167,6 +167,16 @@ public interface Producer<T> extends Closeable {
*/
CompletableFuture<Void> closeAsync();

/**
* Close the producer and releases resources allocated. If waitForPendingMessage flag is true then producer waits in
* flight messages to be acknowledged before closing the producer resources.
*
* <p>No more writes will be accepted from this producer. Waits until all pending write request are persisted. In case
* of errors, pending writes will not be retried.
*
* @return a future that can used to track when the producer has been closed
*/
CompletableFuture<Void> closeAsync(boolean waitForPendingMessage);
/**
* @return Whether the producer is currently connected to the broker
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,11 @@ public boolean isConnected() {

@Override
public CompletableFuture<Void> closeAsync() {
return closeAsync(false);
}

@Override
public CompletableFuture<Void> closeAsync(boolean waitForPendingMessages) {
if (getState() == State.Closing || getState() == State.Closed) {
return CompletableFuture.completedFuture(null);
}
Expand All @@ -218,7 +223,7 @@ public CompletableFuture<Void> closeAsync() {
CompletableFuture<Void> closeFuture = new CompletableFuture<>();
for (Producer<T> producer : producers) {
if (producer != null) {
producer.closeAsync().handle((closed, ex) -> {
producer.closeAsync(waitForPendingMessages).handle((closed, ex) -> {
if (ex != null) {
closeFail.compareAndSet(null, ex);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,9 @@
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;

import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.api.BatcherBuilder;
import org.apache.pulsar.client.api.CompressionType;
Expand Down Expand Up @@ -119,6 +121,13 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
protected volatile long lastSequenceIdPushed;
private volatile boolean isLastSequenceIdPotentialDuplicated;

public static final int FALSE = 0;
public static final int TRUE = 1;
private static final AtomicIntegerFieldUpdater<ProducerImpl> CLOSING_IN_PROGRESS_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(ProducerImpl.class, "closingInProgress");
@SuppressWarnings("unused")
private volatile int closingInProgress = FALSE;

private final MessageCrypto msgCrypto;

private ScheduledFuture<?> keyGeneratorTask = null;
Expand Down Expand Up @@ -627,6 +636,10 @@ private void doBatchSendAndAdd(MessageImpl<?> msg, SendCallback callback, ByteBu
}

private boolean isValidProducerState(SendCallback callback) {
if (closingInProgress == TRUE) {
callback.sendComplete(new PulsarClientException.AlreadyClosedException("Producer already closed"));
return false;
}
switch (getState()) {
case Ready:
// OK
Expand Down Expand Up @@ -722,6 +735,25 @@ protected WriteInEventLoopCallback newObject(Handle<WriteInEventLoopCallback> ha

@Override
public CompletableFuture<Void> closeAsync() {
return closeAsync(false);
}

@Override
public CompletableFuture<Void> closeAsync(boolean waitForPendingMessage) {
CompletableFuture<Void> closeResult = new CompletableFuture<>();
closeAsync(waitForPendingMessage, closeResult);
return closeResult;
}

private void closeAsync(boolean waitForPendingMessage, CompletableFuture<Void> closeFuture) {
if (waitForPendingMessage && !pendingMessages.isEmpty()) {
CLOSING_IN_PROGRESS_UPDATER.set(this, TRUE);
client.timer().newTimeout((timeout) -> {
closeAsync(true, closeFuture);
}, 10, TimeUnit.MILLISECONDS);
return;
}

final State currentState = getAndUpdateState(state -> {
if (state == State.Closed) {
return state;
Expand All @@ -730,7 +762,8 @@ public CompletableFuture<Void> closeAsync() {
});

if (currentState == State.Closed || currentState == State.Closing) {
return CompletableFuture.completedFuture(null);
closeFuture.complete(null);
return;
}

Timeout timeout = sendTimeout;
Expand Down Expand Up @@ -768,13 +801,13 @@ public CompletableFuture<Void> closeAsync() {
pendingMessages.clear();
}

return CompletableFuture.completedFuture(null);
closeFuture.complete(null);
return;
}

long requestId = client.newRequestId();
ByteBuf cmd = Commands.newCloseProducer(producerId, requestId);

CompletableFuture<Void> closeFuture = new CompletableFuture<>();
cnx.sendRequestWithId(cmd, requestId).handle((v, exception) -> {
cnx.removeProducer(producerId);
if (exception == null || !cnx.ctx().channel().isActive()) {
Expand All @@ -798,8 +831,6 @@ public CompletableFuture<Void> closeAsync() {

return null;
});

return closeFuture;
}

@Override
Expand Down

0 comments on commit 9c7ed7a

Please sign in to comment.