Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[pulsar-client] add api to wait for inflight messages while closing producer #6648

Closed
wants to merge 12 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,25 @@
*/
package org.apache.pulsar.client.api;


import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

import org.apache.pulsar.client.impl.ProducerImpl;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.util.FutureUtil;
import org.testng.Assert;
import static org.testng.Assert.fail;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;


import com.google.common.collect.Lists;

public class ProducerCreationTest extends ProducerConsumerBase {

@BeforeMethod
Expand Down Expand Up @@ -92,4 +102,47 @@ public void testGeneratedNameProducerReconnect(TopicDomain domain) throws Pulsar
Assert.assertEquals(producer.getConnectionHandler().getEpoch(), 1);
Assert.assertTrue(producer.isConnected());
}

/**
* It tests closing producer waits until it receives acks from the broker.
*
* <pre>
* a. Create producer successfully
* b. stop broker
* c. publish messages
* d. close producer with wait flag
* f. start broker
* g. producer will be closed after receiving ack of in flight published message
* </pre>
*
* @throws Exception
*/
@Test(timeOut = 30000)
public void testCloseProducerWaitForInFlightMessages() throws Exception {
final String topicName = "persistent://public/default/closeProducerWait";
admin.topics().createPartitionedTopic(topicName, 4);
PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(lookupUrl.toString())
.startingBackoffInterval(10, TimeUnit.MILLISECONDS).maxBackoffInterval(10, TimeUnit.MILLISECONDS)
.build();
Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
.enableBatching(false).sendTimeout(10, TimeUnit.MINUTES).create();
stopBroker();
int totalMessages = 10;
List<CompletableFuture<MessageId>> sendResults = Lists.newArrayList();
for (int i = 0; i < totalMessages; i++) {
sendResults.add(producer.sendAsync("test".getBytes()));
}
CompletableFuture<Void> closeFuture = producer.closeAsync();
startBroker();
Thread.sleep(1000);
try {
producer.send("test".getBytes());
fail("should have failed because producer is closing");
} catch (Exception e) {
// Ok.. expected
}
closeFuture.get();
// all messages must be published successfully before it's closed
FutureUtil.waitForAll(sendResults).get(10, TimeUnit.SECONDS);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,9 @@
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;

import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.api.BatcherBuilder;
import org.apache.pulsar.client.api.CompressionType;
Expand Down Expand Up @@ -124,6 +126,8 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
protected volatile long lastSequenceIdPushed;
private volatile boolean isLastSequenceIdPotentialDuplicated;

private volatile boolean closingInProgress = false;

private final MessageCrypto msgCrypto;

private ScheduledFuture<?> keyGeneratorTask = null;
Expand Down Expand Up @@ -716,6 +720,10 @@ private void doBatchSendAndAdd(MessageImpl<?> msg, SendCallback callback, ByteBu
}

private boolean isValidProducerState(SendCallback callback, long sequenceId) {
if (closingInProgress) {
callback.sendComplete(new PulsarClientException.AlreadyClosedException("Producer already closed"));
return false;
}
switch (getState()) {
case Ready:
// OK
Expand Down Expand Up @@ -825,6 +833,20 @@ protected WriteInEventLoopCallback newObject(Handle<WriteInEventLoopCallback> ha

@Override
public CompletableFuture<Void> closeAsync() {
CompletableFuture<Void> closeResult = new CompletableFuture<>();
closeAsync(closeResult);
return closeResult;
}

private void closeAsync(CompletableFuture<Void> closeFuture) {
if (!pendingMessages.isEmpty()) {
closingInProgress = true;
client.timer().newTimeout((timeout) -> {
closeAsync(closeFuture);
}, 10, TimeUnit.MILLISECONDS);
return;
}

final State currentState = getAndUpdateState(state -> {
if (state == State.Closed) {
return state;
Expand All @@ -833,7 +855,8 @@ public CompletableFuture<Void> closeAsync() {
});

if (currentState == State.Closed || currentState == State.Closing) {
return CompletableFuture.completedFuture(null);
closeFuture.complete(null);
return;
}

Timeout timeout = sendTimeout;
Expand Down Expand Up @@ -871,13 +894,13 @@ public CompletableFuture<Void> closeAsync() {
pendingMessages.clear();
}

return CompletableFuture.completedFuture(null);
closeFuture.complete(null);
return;
}

long requestId = client.newRequestId();
ByteBuf cmd = Commands.newCloseProducer(producerId, requestId);

CompletableFuture<Void> closeFuture = new CompletableFuture<>();
cnx.sendRequestWithId(cmd, requestId).handle((v, exception) -> {
cnx.removeProducer(producerId);
if (exception == null || !cnx.ctx().channel().isActive()) {
Expand All @@ -901,8 +924,6 @@ public CompletableFuture<Void> closeAsync() {

return null;
});

return closeFuture;
}

@Override
Expand Down