Skip to content

Commit

Permalink
[Reader] Should set either start message id or start message from rol…
Browse files Browse the repository at this point in the history
…l back duration. (apache#6392)

Currently, when constructing a reader, users can set both start message id and start time. 

This is strange and the behavior should be forbidden.
  • Loading branch information
yjshen authored Feb 24, 2020
1 parent 336e971 commit f862961
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ public void testReaderWithTimeLong() throws Exception {

// (3) Create reader and set position 1 hour back so, it should only read messages which are 2 hours old which
// published on step 2
Reader<byte[]> reader = pulsarClient.newReader().topic(topic).startMessageId(MessageId.earliest)
Reader<byte[]> reader = pulsarClient.newReader().topic(topic)
.startMessageFromRollbackDuration(2, TimeUnit.HOURS).create();

List<MessageId> receivedMessageIds = Lists.newArrayList();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,15 @@ public CompletableFuture<Reader<T>> createAsync() {
.failedFuture(new IllegalArgumentException("Topic name must be set on the reader builder"));
}

if (conf.getStartMessageId() == null) {
if (conf.getStartMessageId() != null && conf.getStartMessageFromRollbackDurationInSec() > 0 ||
conf.getStartMessageId() == null && conf.getStartMessageFromRollbackDurationInSec() <= 0) {
return FutureUtil
.failedFuture(new IllegalArgumentException("Start message id must be set on the reader builder"));
.failedFuture(new IllegalArgumentException(
"Start message id or start message from roll back must be specified but they cannot be specified at the same time"));
}

if (conf.getStartMessageFromRollbackDurationInSec() > 0) {
conf.setStartMessageId(MessageId.earliest);
}

return client.createReaderAsync(conf, schema);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,12 @@
import java.lang.reflect.Field;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.impl.conf.ReaderConfigurationData;
import org.testng.annotations.Test;

Expand Down Expand Up @@ -96,5 +100,26 @@ public void readerBuilderLoadConfTest() throws Exception {
assertTrue(obj instanceof ReaderConfigurationData);
assertEquals(((ReaderConfigurationData) obj).getTopicName(), topicName);
assertEquals(((ReaderConfigurationData) obj).getStartMessageId(), messageId);
client.close();
}

@Test(expectedExceptions = {PulsarClientException.class}, expectedExceptionsMessageRegExp = ".* must be specified but they cannot be specified at the same time.*")
public void shouldNotSetTwoOptAtTheSameTime() throws Exception {
PulsarClient client = PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build();
try (Reader reader = client.newReader().topic("abc").startMessageId(MessageId.earliest).startMessageFromRollbackDuration(10, TimeUnit.HOURS).create()) {
// no-op
} finally {
client.close();
}
}

@Test(expectedExceptions = {PulsarClientException.class}, expectedExceptionsMessageRegExp = ".* must be specified but they cannot be specified at the same time.*")
public void shouldSetOneStartOpt() throws Exception {
PulsarClient client = PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build();
try (Reader reader = client.newReader().topic("abc").create()) {
// no-op
} finally {
client.close();
}
}
}

0 comments on commit f862961

Please sign in to comment.