Skip to content

Commit

Permalink
[fix][test] Fix flaky SubscriptionSeekTest.testSeekIsByReceive (apach…
Browse files Browse the repository at this point in the history
…e#23170)

(cherry picked from commit a1f3322)
(cherry picked from commit 6434d57)
  • Loading branch information
lhotari authored and srinath-ctds committed Aug 23, 2024
1 parent e3b9cdc commit b463ba4
Showing 1 changed file with 30 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -85,9 +85,11 @@ protected void cleanup() throws Exception {
public void testSeek() throws Exception {
final String topicName = "persistent://prop/use/ns-abc/testSeek";

@Cleanup
Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create();

// Disable pre-fetch in consumer to track the messages received
@Cleanup
org.apache.pulsar.client.api.Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName)
.subscriptionName("my-subscription").receiverQueueSize(0).subscribe();

Expand Down Expand Up @@ -137,11 +139,13 @@ public void testSeek() throws Exception {

@Test
public void testSeekIsByReceive() throws PulsarClientException {
final String topicName = "persistent://prop/use/ns-abc/testSeek";
final String topicName = "persistent://prop/use/ns-abc/testSeekIsByReceive";

@Cleanup
Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create();

String subscriptionName = "my-subscription";
@Cleanup
org.apache.pulsar.client.api.Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName)
.subscriptionName(subscriptionName)
.subscribe();
Expand All @@ -163,6 +167,7 @@ public void testSeekForBatch() throws Exception {
final String topicName = "persistent://prop/use/ns-abcd/testSeekForBatch";
String subscriptionName = "my-subscription-batch";

@Cleanup
Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
.enableBatching(true)
.batchingMaxMessages(3)
Expand All @@ -189,6 +194,7 @@ public void testSeekForBatch() throws Exception {
producer.close();


@Cleanup
org.apache.pulsar.client.api.Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
.topic(topicName)
.subscriptionName(subscriptionName)
Expand Down Expand Up @@ -219,6 +225,7 @@ public void testSeekForBatchMessageAndSpecifiedBatchIndex() throws Exception {
final String topicName = "persistent://prop/use/ns-abcd/testSeekForBatchMessageAndSpecifiedBatchIndex";
String subscriptionName = "my-subscription-batch";

@Cleanup
Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
.enableBatching(true)
.batchingMaxMessages(3)
Expand Down Expand Up @@ -263,6 +270,7 @@ public void testSeekForBatchMessageAndSpecifiedBatchIndex() throws Exception {
.serviceUrl(lookupUrl.toString())
.build();

@Cleanup
org.apache.pulsar.client.api.Consumer<String> consumer = newPulsarClient.newConsumer(Schema.STRING)
.topic(topicName)
.subscriptionName(subscriptionName)
Expand Down Expand Up @@ -299,6 +307,7 @@ public void testSeekForBatchByAdmin() throws PulsarClientException, ExecutionExc
final String topicName = "persistent://prop/use/ns-abcd/testSeekForBatchByAdmin-" + UUID.randomUUID().toString();
String subscriptionName = "my-subscription-batch";

@Cleanup
Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
.enableBatching(true)
.batchingMaxMessages(3)
Expand All @@ -324,7 +333,7 @@ public void testSeekForBatchByAdmin() throws PulsarClientException, ExecutionExc

producer.close();


@Cleanup
org.apache.pulsar.client.api.Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
.topic(topicName)
.subscriptionName(subscriptionName)
Expand Down Expand Up @@ -380,6 +389,7 @@ public void testConcurrentResetCursor() throws Exception {
final String topicName = "persistent://prop/use/ns-abc/testConcurrentReset_" + System.currentTimeMillis();
final String subscriptionName = "test-sub-name";

@Cleanup
Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create();

admin.topics().createSubscription(topicName, subscriptionName, MessageId.earliest);
Expand Down Expand Up @@ -429,6 +439,7 @@ public void testSeekOnPartitionedTopic() throws Exception {
final String topicName = "persistent://prop/use/ns-abc/testSeekPartitions";

admin.topics().createPartitionedTopic(topicName, 2);
@Cleanup
org.apache.pulsar.client.api.Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName)
.subscriptionName("my-subscription").subscribe();

Expand All @@ -446,9 +457,11 @@ public void testSeekTime() throws Exception {
long resetTimeInMillis = TimeUnit.SECONDS
.toMillis(RelativeTimeUtil.parseRelativeTimeInSeconds(resetTimeStr));

@Cleanup
Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create();

// Disable pre-fetch in consumer to track the messages received
@Cleanup
org.apache.pulsar.client.api.Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName)
.subscriptionName("my-subscription").receiverQueueSize(0).subscribe();

Expand Down Expand Up @@ -482,6 +495,7 @@ public void testSeekTimeByFunction() throws Exception {
int msgNum = 20;
admin.topics().createPartitionedTopic(topicName, partitionNum);
creatProducerAndSendMsg(topicName, msgNum);
@Cleanup
org.apache.pulsar.client.api.Consumer<String> consumer = pulsarClient
.newConsumer(Schema.STRING).startMessageIdInclusive()
.topic(topicName).subscriptionName("my-sub").subscribe();
Expand Down Expand Up @@ -529,6 +543,7 @@ public void testSeekTimeOnPartitionedTopic() throws Exception {
long resetTimeInMillis = TimeUnit.SECONDS
.toMillis(RelativeTimeUtil.parseRelativeTimeInSeconds(resetTimeStr));
admin.topics().createPartitionedTopic(topicName, partitions);
@Cleanup
Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create();
// Disable pre-fetch in consumer to track the messages received
org.apache.pulsar.client.api.Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName)
Expand Down Expand Up @@ -582,12 +597,14 @@ public void testSeekTimeOnPartitionedTopic() throws Exception {
public void testShouldCloseAllConsumersForMultipleConsumerDispatcherWhenSeek() throws Exception {
final String topicName = "persistent://prop/use/ns-abc/testShouldCloseAllConsumersForMultipleConsumerDispatcherWhenSeek";
// Disable pre-fetch in consumer to track the messages received
@Cleanup
org.apache.pulsar.client.api.Consumer<byte[]> consumer1 = pulsarClient.newConsumer()
.topic(topicName)
.subscriptionType(SubscriptionType.Shared)
.subscriptionName("my-subscription")
.subscribe();

@Cleanup
org.apache.pulsar.client.api.Consumer<byte[]> consumer2 = pulsarClient.newConsumer()
.topic(topicName)
.subscriptionType(SubscriptionType.Shared)
Expand All @@ -614,20 +631,20 @@ public void testShouldCloseAllConsumersForMultipleConsumerDispatcherWhenSeek() t
for (Consumer consumer : consumers) {
assertFalse(connectedSinceSet.contains(consumer.getStats().getConnectedSince()));
}
consumer1.close();
consumer2.close();
}

@Test
public void testOnlyCloseActiveConsumerForSingleActiveConsumerDispatcherWhenSeek() throws Exception {
final String topicName = "persistent://prop/use/ns-abc/testOnlyCloseActiveConsumerForSingleActiveConsumerDispatcherWhenSeek";
// Disable pre-fetch in consumer to track the messages received
@Cleanup
org.apache.pulsar.client.api.Consumer<byte[]> consumer1 = pulsarClient.newConsumer()
.topic(topicName)
.subscriptionType(SubscriptionType.Failover)
.subscriptionName("my-subscription")
.subscribe();

@Cleanup
org.apache.pulsar.client.api.Consumer<byte[]> consumer2 = pulsarClient.newConsumer()
.topic(topicName)
.subscriptionType(SubscriptionType.Failover)
Expand Down Expand Up @@ -667,11 +684,13 @@ public void testSeekByFunction() throws Exception {
int msgNum = 160;
admin.topics().createPartitionedTopic(topicName, partitionNum);
creatProducerAndSendMsg(topicName, msgNum);
@Cleanup
org.apache.pulsar.client.api.Consumer<String> consumer = pulsarClient
.newConsumer(Schema.STRING).startMessageIdInclusive()
.topic(topicName).subscriptionName("my-sub").subscribe();

TopicName partitionedTopic = TopicName.get(topicName);
@Cleanup
Reader<String> reader = pulsarClient.newReader(Schema.STRING)
.startMessageId(MessageId.earliest)
.topic(partitionedTopic.getPartition(0).toString()).create();
Expand Down Expand Up @@ -720,12 +739,11 @@ public void testSeekByFunction() throws Exception {
for (MessageId messageId : msgNotIn) {
assertFalse(received.contains(messageId));
}
reader.close();
consumer.close();
}

private List<MessageId> creatProducerAndSendMsg(String topic, int msgNum) throws Exception {
List<MessageId> messageIds = new ArrayList<>();
@Cleanup
Producer<String> producer = pulsarClient
.newProducer(Schema.STRING)
.enableBatching(false)
Expand All @@ -734,7 +752,6 @@ private List<MessageId> creatProducerAndSendMsg(String topic, int msgNum) throws
for (int i = 0; i < msgNum; i++) {
messageIds.add(producer.send("msg" + i));
}
producer.close();
return messageIds;
}

Expand All @@ -755,6 +772,7 @@ public void testSeekByFunctionAndMultiTopic() throws Exception {
MessageId msgIdInTopic2Partition0 = admin.topics().getLastMessageId(topic2.getPartition(0).toString());
MessageId msgIdInTopic2Partition2 = admin.topics().getLastMessageId(topic2.getPartition(2).toString());

@Cleanup
org.apache.pulsar.client.api.Consumer<String> consumer = pulsarClient
.newConsumer(Schema.STRING).startMessageIdInclusive()
.topics(Arrays.asList(topicName, topicName2)).subscriptionName("my-sub").subscribe();
Expand Down Expand Up @@ -795,6 +813,7 @@ public void testSeekWillNotEncounteredFencedError() throws Exception {
// Create a pulsar client with a subscription fenced counter.
ClientBuilderImpl clientBuilder = (ClientBuilderImpl) PulsarClient.builder().serviceUrl(lookupUrl.toString());
AtomicInteger receivedFencedErrorCounter = new AtomicInteger();
@Cleanup
PulsarClient client = InjectedClientCnxClientBuilder.create(clientBuilder, (conf, eventLoopGroup) ->
new ClientCnx(conf, eventLoopGroup) {
protected void handleError(CommandError error) {
Expand All @@ -806,10 +825,13 @@ protected void handleError(CommandError error) {
});

// publish some messages.
@Cleanup
org.apache.pulsar.client.api.Consumer<String> consumer = client.newConsumer(Schema.STRING)
.topic(topicName)
.subscriptionName("s1")
.subscribe();

@Cleanup
Producer<String> producer = client.newProducer(Schema.STRING)
.topic(topicName).create();
MessageIdImpl msgId1 = (MessageIdImpl) producer.send("0");
Expand Down Expand Up @@ -849,6 +871,7 @@ protected void handleError(CommandError error) {
public void testExceptionBySeekFunction() throws Exception {
final String topicName = "persistent://prop/use/ns-abc/test" + UUID.randomUUID();
creatProducerAndSendMsg(topicName,10);
@Cleanup
org.apache.pulsar.client.api.Consumer consumer = pulsarClient
.newConsumer()
.topic(topicName).subscriptionName("my-sub").subscribe();
Expand Down

0 comments on commit b463ba4

Please sign in to comment.