Skip to content

Commit

Permalink
[fix][client] Fix ReaderBuilder doest not give illegalArgument on con…
Browse files Browse the repository at this point in the history
…nection failure retry (apache#22639)

(cherry picked from commit b56f238)
(cherry picked from commit 8b2d5e9)
rdhabalia authored and srinath-ctds committed May 16, 2024

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature. The key has expired.
1 parent 79f8e29 commit 0a02cd9
Showing 3 changed files with 31 additions and 3 deletions.
Original file line number Diff line number Diff line change
@@ -36,6 +36,8 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
@@ -48,6 +50,7 @@
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Range;
import org.apache.pulsar.client.api.Reader;
@@ -851,4 +854,28 @@ public void testHasMessageAvailableAfterSeekTimestamp(boolean initializeLastMess
assertTrue(reader.hasMessageAvailable());
}
}

@Test
public void testReaderBuilderStateOnRetryFailure() throws Exception {
String ns = "my-property/my-ns";
String topic = "persistent://" + ns + "/testRetryReader";
RetentionPolicies retention = new RetentionPolicies(-1, -1);
admin.namespaces().setRetention(ns, retention);
String badUrl = "pulsar://bad-host:8080";

PulsarClient client = PulsarClient.builder().serviceUrl(badUrl).build();

ReaderBuilder<byte[]> readerBuilder = client.newReader().topic(topic).startMessageFromRollbackDuration(100,
TimeUnit.SECONDS);

for (int i = 0; i < 3; i++) {
try {
readerBuilder.createAsync().get(1, TimeUnit.SECONDS);
} catch (TimeoutException e) {
log.info("It should time out due to invalid url");
} catch (IllegalArgumentException e) {
fail("It should not fail with corrupt reader state");
}
}
}
}
Original file line number Diff line number Diff line change
@@ -86,8 +86,9 @@ public CompletableFuture<Reader<T>> createAsync() {
.failedFuture(new IllegalArgumentException("Topic name must be set on the reader builder"));
}

if (conf.getStartMessageId() != null && conf.getStartMessageFromRollbackDurationInSec() > 0
|| conf.getStartMessageId() == null && conf.getStartMessageFromRollbackDurationInSec() <= 0) {
boolean isStartMsgIdExist = conf.getStartMessageId() != null && conf.getStartMessageId() != MessageId.earliest;
if ((isStartMsgIdExist && conf.getStartMessageFromRollbackDurationInSec() > 0)
|| (conf.getStartMessageId() == null && conf.getStartMessageFromRollbackDurationInSec() <= 0)) {
return FutureUtil
.failedFuture(new IllegalArgumentException(
"Start message id or start message from roll back must be specified but they cannot be"
Original file line number Diff line number Diff line change
@@ -106,7 +106,7 @@ public void readerBuilderLoadConfTest() throws Exception {
@Test(expectedExceptions = {PulsarClientException.class}, expectedExceptionsMessageRegExp = ".* must be specified but they cannot be specified at the same time.*")
public void shouldNotSetTwoOptAtTheSameTime() throws Exception {
PulsarClient client = PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build();
try (Reader reader = client.newReader().topic("abc").startMessageId(MessageId.earliest)
try (Reader reader = client.newReader().topic("abc").startMessageId(MessageId.latest)
.startMessageFromRollbackDuration(10, TimeUnit.HOURS).create()) {
// no-op
} finally {

0 comments on commit 0a02cd9

Please sign in to comment.