diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/client/api/RawReader.java b/pulsar-broker/src/main/java/org/apache/pulsar/client/api/RawReader.java index b7805c36b3bf9..55483708fdf6a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/client/api/RawReader.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/client/api/RawReader.java @@ -22,6 +22,7 @@ import java.util.concurrent.CompletableFuture; import org.apache.pulsar.client.impl.PulsarClientImpl; import org.apache.pulsar.client.impl.RawReaderImpl; +import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData; /** * Topic reader which receives raw messages (i.e. as they are stored in the managed ledger). @@ -43,6 +44,16 @@ static CompletableFuture create(PulsarClient client, String topic, St return future.thenApply(__ -> r); } + static CompletableFuture create(PulsarClient client, + ConsumerConfigurationData consumerConfiguration, + boolean createTopicIfDoesNotExist) { + CompletableFuture> future = new CompletableFuture<>(); + RawReader r = new RawReaderImpl((PulsarClientImpl) client, + consumerConfiguration, future, createTopicIfDoesNotExist); + return future.thenApply(__ -> r); + } + + /** * Get the topic for the reader. * diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java index 3d7ad9f58657d..5ac051d227119 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java @@ -65,6 +65,14 @@ public RawReaderImpl(PulsarClientImpl client, String topic, String subscription, consumer = new RawConsumerImpl(client, consumerConfiguration, consumerFuture, createTopicIfDoesNotExist); } + public RawReaderImpl(PulsarClientImpl client, ConsumerConfigurationData consumerConfiguration, + CompletableFuture> consumerFuture, + boolean createTopicIfDoesNotExist) { + this.consumerConfiguration = consumerConfiguration; + consumer = new RawConsumerImpl(client, consumerConfiguration, consumerFuture, createTopicIfDoesNotExist); + } + + @Override public String getTopic() { return consumerConfiguration.getTopicNames().stream() diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawReaderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawReaderTest.java index d3fcc36a54653..d9ddc00b2e863 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawReaderTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawReaderTest.java @@ -44,6 +44,9 @@ import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.RawMessage; import org.apache.pulsar.client.api.RawReader; +import org.apache.pulsar.client.api.SubscriptionInitialPosition; +import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData; import org.apache.pulsar.common.api.proto.BrokerEntryMetadata; import org.apache.pulsar.common.api.proto.MessageMetadata; import org.apache.pulsar.common.policies.data.ClusterData; @@ -56,6 +59,8 @@ import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; +import static org.apache.pulsar.client.impl.RawReaderImpl.DEFAULT_RECEIVER_QUEUE_SIZE; + @Test(groups = "broker-impl") @Slf4j public class RawReaderTest extends MockedPulsarServiceBaseTest { @@ -195,6 +200,36 @@ public void testRawReader() throws Exception { reader.closeAsync().get(3, TimeUnit.SECONDS); } + @Test + public void testRawReaderWithConfigurationCreation() throws Exception { + int numKeys = 10; + + String topic = "persistent://my-property/my-ns/" + BrokerTestUtil.newUniqueName("reader"); + + Set keys = publishMessages(topic, numKeys); + ConsumerConfigurationData consumerConfiguration = new ConsumerConfigurationData<>(); + consumerConfiguration.getTopicNames().add(topic); + consumerConfiguration.setSubscriptionName(subscription); + consumerConfiguration.setSubscriptionType(SubscriptionType.Exclusive); + consumerConfiguration.setReceiverQueueSize(DEFAULT_RECEIVER_QUEUE_SIZE); + consumerConfiguration.setReadCompacted(true); + consumerConfiguration.setSubscriptionInitialPosition(SubscriptionInitialPosition.Earliest); + consumerConfiguration.setAckReceiptEnabled(true); + RawReader reader = RawReader.create(pulsarClient, consumerConfiguration, true).get(); + + MessageId lastMessageId = reader.getLastMessageIdAsync().get(); + while (true) { + try (RawMessage m = reader.readNextAsync().get()) { + Assert.assertTrue(keys.remove(extractKey(m))); + if (lastMessageId.compareTo(m.getMessageId()) == 0) { + break; + } + } + } + Assert.assertTrue(keys.isEmpty()); + reader.closeAsync().get(3, TimeUnit.SECONDS); + } + @Test public void testSeekToStart() throws Exception { int numKeys = 10; @@ -279,7 +314,7 @@ public void testSeekToMiddle() throws Exception { */ @Test public void testFlowControl() throws Exception { - int numMessages = RawReaderImpl.DEFAULT_RECEIVER_QUEUE_SIZE * 5; + int numMessages = DEFAULT_RECEIVER_QUEUE_SIZE * 5; String topic = "persistent://my-property/my-ns/" + BrokerTestUtil.newUniqueName("reader"); publishMessages(topic, numMessages); @@ -311,7 +346,7 @@ public void testFlowControl() throws Exception { @Test public void testFlowControlBatch() throws Exception { - int numMessages = RawReaderImpl.DEFAULT_RECEIVER_QUEUE_SIZE * 5; + int numMessages = DEFAULT_RECEIVER_QUEUE_SIZE * 5; String topic = "persistent://my-property/my-ns/" + BrokerTestUtil.newUniqueName("reader"); publishMessages(topic, numMessages, true);