Skip to content

Commit

Permalink
[improve] [broker] Support create RawReader based on configuration (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
hangc0276 authored and Technoboy- committed Mar 18, 2024
1 parent 5fae440 commit f1bdb78
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.concurrent.CompletableFuture;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.RawReaderImpl;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;

/**
* Topic reader which receives raw messages (i.e. as they are stored in the managed ledger).
Expand All @@ -43,6 +44,16 @@ static CompletableFuture<RawReader> create(PulsarClient client, String topic, St
return future.thenApply(__ -> r);
}

static CompletableFuture<RawReader> create(PulsarClient client,
ConsumerConfigurationData<byte[]> consumerConfiguration,
boolean createTopicIfDoesNotExist) {
CompletableFuture<Consumer<byte[]>> future = new CompletableFuture<>();
RawReader r = new RawReaderImpl((PulsarClientImpl) client,
consumerConfiguration, future, createTopicIfDoesNotExist);
return future.thenApply(__ -> r);
}


/**
* Get the topic for the reader.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,14 @@ public RawReaderImpl(PulsarClientImpl client, String topic, String subscription,
consumer = new RawConsumerImpl(client, consumerConfiguration, consumerFuture, createTopicIfDoesNotExist);
}

public RawReaderImpl(PulsarClientImpl client, ConsumerConfigurationData<byte[]> consumerConfiguration,
CompletableFuture<Consumer<byte[]>> consumerFuture,
boolean createTopicIfDoesNotExist) {
this.consumerConfiguration = consumerConfiguration;
consumer = new RawConsumerImpl(client, consumerConfiguration, consumerFuture, createTopicIfDoesNotExist);
}


@Override
public String getTopic() {
return consumerConfiguration.getTopicNames().stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.RawMessage;
import org.apache.pulsar.client.api.RawReader;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.pulsar.common.api.proto.BrokerEntryMetadata;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.policies.data.ClusterData;
Expand All @@ -56,6 +59,8 @@
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

import static org.apache.pulsar.client.impl.RawReaderImpl.DEFAULT_RECEIVER_QUEUE_SIZE;

@Test(groups = "broker-impl")
@Slf4j
public class RawReaderTest extends MockedPulsarServiceBaseTest {
Expand Down Expand Up @@ -195,6 +200,36 @@ public void testRawReader() throws Exception {
reader.closeAsync().get(3, TimeUnit.SECONDS);
}

@Test
public void testRawReaderWithConfigurationCreation() throws Exception {
int numKeys = 10;

String topic = "persistent://my-property/my-ns/" + BrokerTestUtil.newUniqueName("reader");

Set<String> keys = publishMessages(topic, numKeys);
ConsumerConfigurationData<byte[]> consumerConfiguration = new ConsumerConfigurationData<>();
consumerConfiguration.getTopicNames().add(topic);
consumerConfiguration.setSubscriptionName(subscription);
consumerConfiguration.setSubscriptionType(SubscriptionType.Exclusive);
consumerConfiguration.setReceiverQueueSize(DEFAULT_RECEIVER_QUEUE_SIZE);
consumerConfiguration.setReadCompacted(true);
consumerConfiguration.setSubscriptionInitialPosition(SubscriptionInitialPosition.Earliest);
consumerConfiguration.setAckReceiptEnabled(true);
RawReader reader = RawReader.create(pulsarClient, consumerConfiguration, true).get();

MessageId lastMessageId = reader.getLastMessageIdAsync().get();
while (true) {
try (RawMessage m = reader.readNextAsync().get()) {
Assert.assertTrue(keys.remove(extractKey(m)));
if (lastMessageId.compareTo(m.getMessageId()) == 0) {
break;
}
}
}
Assert.assertTrue(keys.isEmpty());
reader.closeAsync().get(3, TimeUnit.SECONDS);
}

@Test
public void testSeekToStart() throws Exception {
int numKeys = 10;
Expand Down Expand Up @@ -279,7 +314,7 @@ public void testSeekToMiddle() throws Exception {
*/
@Test
public void testFlowControl() throws Exception {
int numMessages = RawReaderImpl.DEFAULT_RECEIVER_QUEUE_SIZE * 5;
int numMessages = DEFAULT_RECEIVER_QUEUE_SIZE * 5;
String topic = "persistent://my-property/my-ns/" + BrokerTestUtil.newUniqueName("reader");

publishMessages(topic, numMessages);
Expand Down Expand Up @@ -311,7 +346,7 @@ public void testFlowControl() throws Exception {

@Test
public void testFlowControlBatch() throws Exception {
int numMessages = RawReaderImpl.DEFAULT_RECEIVER_QUEUE_SIZE * 5;
int numMessages = DEFAULT_RECEIVER_QUEUE_SIZE * 5;
String topic = "persistent://my-property/my-ns/" + BrokerTestUtil.newUniqueName("reader");

publishMessages(topic, numMessages, true);
Expand Down

0 comments on commit f1bdb78

Please sign in to comment.