Skip to content

Commit c07b158

Browse files
authored
[fix][client] Fix for early hit beforeConsume for MultiTopicConsumer (apache#23141)
1 parent 3e461c0 commit c07b158

File tree

2 files changed

+83
-19
lines changed

2 files changed

+83
-19
lines changed

pulsar-broker/src/test/java/org/apache/pulsar/client/api/InterceptorsTest.java

Lines changed: 29 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
*/
1919
package org.apache.pulsar.client.api;
2020

21+
import com.google.common.collect.Sets;
2122
import java.util.ArrayList;
2223
import java.util.Arrays;
2324
import java.util.HashMap;
@@ -29,8 +30,6 @@
2930
import java.util.concurrent.TimeUnit;
3031
import java.util.concurrent.atomic.AtomicBoolean;
3132
import java.util.concurrent.atomic.AtomicInteger;
32-
33-
import com.google.common.collect.Sets;
3433
import lombok.Cleanup;
3534
import org.apache.commons.lang3.RandomStringUtils;
3635
import org.apache.commons.lang3.RandomUtils;
@@ -79,6 +78,12 @@ public Object[][] getTopicPartition() {
7978
return new Object[][] {{ 0 }, { 3 }};
8079
}
8180

81+
@DataProvider(name = "topics")
82+
public Object[][] getTopics() {
83+
return new Object[][] {{ List.of("persistent://my-property/my-ns/my-topic") },
84+
{ List.of("persistent://my-property/my-ns/my-topic", "persistent://my-property/my-ns/my-topic1") }};
85+
}
86+
8287
@Test
8388
public void testProducerInterceptor() throws Exception {
8489
Map<MessageId, List<String>> ackCallback = new HashMap<>();
@@ -403,9 +408,9 @@ public void close() {
403408

404409
@Override
405410
public Message<String> beforeConsume(Consumer<String> consumer, Message<String> message) {
406-
MessageImpl<String> msg = (MessageImpl<String>) message;
411+
MessageImpl<String> msg = ((MessageImpl<String>) ((TopicMessageImpl<String>) message).getMessage());
407412
msg.getMessageBuilder().addProperty().setKey("beforeConsumer").setValue("1");
408-
return msg;
413+
return message;
409414
}
410415

411416
@Override
@@ -449,13 +454,19 @@ public void onAckTimeoutSend(Consumer<String> consumer, Set<MessageId> messageId
449454

450455
int keyCount = 0;
451456
for (int i = 0; i < 2; i++) {
452-
Message<String> received = consumer.receive();
457+
Message<String> received;
458+
if (i % 2 == 0) {
459+
received = consumer.receive();
460+
} else {
461+
received = consumer.receiveAsync().join();
462+
}
453463
MessageImpl<String> msg = (MessageImpl<String>) ((TopicMessageImpl<String>) received).getMessage();
454464
for (KeyValue keyValue : msg.getMessageBuilder().getPropertiesList()) {
455465
if ("beforeConsumer".equals(keyValue.getKey())) {
456466
keyCount++;
457467
}
458468
}
469+
Assert.assertEquals(keyCount, i + 1);
459470
consumer.acknowledge(received);
460471
}
461472
Assert.assertEquals(2, keyCount);
@@ -475,9 +486,9 @@ public void close() {
475486

476487
@Override
477488
public Message<String> beforeConsume(Consumer<String> consumer, Message<String> message) {
478-
MessageImpl<String> msg = (MessageImpl<String>) message;
489+
MessageImpl<String> msg = ((MessageImpl<String>) ((TopicMessageImpl<String>) message).getMessage());
479490
msg.getMessageBuilder().addProperty().setKey("beforeConsumer").setValue("1");
480-
return msg;
491+
return message;
481492
}
482493

483494
@Override
@@ -612,8 +623,8 @@ public void onAckTimeoutSend(Consumer<String> consumer, Set<MessageId> messageId
612623
consumer.close();
613624
}
614625

615-
@Test
616-
public void testConsumerInterceptorForNegativeAcksSend() throws PulsarClientException, InterruptedException {
626+
@Test(dataProvider = "topics")
627+
public void testConsumerInterceptorForNegativeAcksSend(List<String> topics) throws PulsarClientException, InterruptedException {
617628
final int totalNumOfMessages = 100;
618629
CountDownLatch latch = new CountDownLatch(totalNumOfMessages / 2);
619630

@@ -640,6 +651,7 @@ public void onAcknowledgeCumulative(Consumer<String> consumer, MessageId message
640651

641652
@Override
642653
public void onNegativeAcksSend(Consumer<String> consumer, Set<MessageId> messageIds) {
654+
Assert.assertTrue(latch.getCount() > 0);
643655
messageIds.forEach(messageId -> latch.countDown());
644656
}
645657

@@ -650,15 +662,15 @@ public void onAckTimeoutSend(Consumer<String> consumer, Set<MessageId> messageId
650662
};
651663

652664
Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
653-
.topic("persistent://my-property/my-ns/my-topic")
665+
.topics(topics)
654666
.subscriptionType(SubscriptionType.Failover)
655667
.intercept(interceptor)
656668
.negativeAckRedeliveryDelay(100, TimeUnit.MILLISECONDS)
657669
.subscriptionName("my-subscription")
658670
.subscribe();
659671

660672
Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
661-
.topic("persistent://my-property/my-ns/my-topic")
673+
.topic(topics.get(0))
662674
.create();
663675

664676
for (int i = 0; i < totalNumOfMessages; i++) {
@@ -682,8 +694,9 @@ public void onAckTimeoutSend(Consumer<String> consumer, Set<MessageId> messageId
682694
consumer.close();
683695
}
684696

685-
@Test
686-
public void testConsumerInterceptorForAckTimeoutSend() throws PulsarClientException, InterruptedException {
697+
@Test(dataProvider = "topics")
698+
public void testConsumerInterceptorForAckTimeoutSend(List<String> topics) throws PulsarClientException,
699+
InterruptedException {
687700
final int totalNumOfMessages = 100;
688701
CountDownLatch latch = new CountDownLatch(totalNumOfMessages / 2);
689702

@@ -714,16 +727,17 @@ public void onNegativeAcksSend(Consumer<String> consumer, Set<MessageId> message
714727

715728
@Override
716729
public void onAckTimeoutSend(Consumer<String> consumer, Set<MessageId> messageIds) {
730+
Assert.assertTrue(latch.getCount() > 0);
717731
messageIds.forEach(messageId -> latch.countDown());
718732
}
719733
};
720734

721735
Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
722-
.topic("persistent://my-property/my-ns/my-topic")
736+
.topic(topics.get(0))
723737
.create();
724738

725739
Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
726-
.topic("persistent://my-property/my-ns/my-topic")
740+
.topics(topics)
727741
.subscriptionName("foo")
728742
.intercept(interceptor)
729743
.ackTimeout(2, TimeUnit.SECONDS)

pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java

Lines changed: 54 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import com.google.common.collect.Lists;
2727
import io.netty.util.Timeout;
2828
import io.netty.util.TimerTask;
29+
import java.io.IOException;
2930
import java.util.ArrayList;
3031
import java.util.Collection;
3132
import java.util.Collections;
@@ -108,6 +109,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
108109
private final MessageIdAdv startMessageId;
109110
private volatile boolean duringSeek = false;
110111
private final long startMessageRollbackDurationInSec;
112+
private final ConsumerInterceptors<T> internalConsumerInterceptors;
111113
MultiTopicsConsumerImpl(PulsarClientImpl client, ConsumerConfigurationData<T> conf,
112114
ExecutorProvider executorProvider, CompletableFuture<Consumer<T>> subscribeFuture, Schema<T> schema,
113115
ConsumerInterceptors<T> interceptors, boolean createTopicIfDoesNotExist) {
@@ -137,6 +139,11 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
137139
long startMessageRollbackDurationInSec) {
138140
super(client, singleTopic, conf, Math.max(2, conf.getReceiverQueueSize()), executorProvider, subscribeFuture,
139141
schema, interceptors);
142+
if (interceptors != null) {
143+
this.internalConsumerInterceptors = getInternalConsumerInterceptors(interceptors);
144+
} else {
145+
this.internalConsumerInterceptors = null;
146+
}
140147

141148
checkArgument(conf.getReceiverQueueSize() > 0,
142149
"Receiver queue size needs to be greater than 0 for Topics Consumer");
@@ -316,7 +323,8 @@ private void messageReceived(ConsumerImpl<T> consumer, Message<T> message) {
316323
CompletableFuture<Message<T>> receivedFuture = nextPendingReceive();
317324
if (receivedFuture != null) {
318325
unAckedMessageTracker.add(topicMessage.getMessageId(), topicMessage.getRedeliveryCount());
319-
completePendingReceive(receivedFuture, topicMessage);
326+
final Message<T> interceptMessage = beforeConsume(topicMessage);
327+
completePendingReceive(receivedFuture, interceptMessage);
320328
} else if (enqueueMessageAndCheckBatchReceive(topicMessage) && hasPendingBatchReceive()) {
321329
notifyPendingBatchReceivedCallBack();
322330
}
@@ -369,7 +377,7 @@ protected Message<T> internalReceive() throws PulsarClientException {
369377
checkState(message instanceof TopicMessageImpl);
370378
unAckedMessageTracker.add(message.getMessageId(), message.getRedeliveryCount());
371379
resumeReceivingFromPausedConsumersIfNeeded();
372-
return message;
380+
return beforeConsume(message);
373381
} catch (Exception e) {
374382
ExceptionHandler.handleInterruptedException(e);
375383
throw PulsarClientException.unwrap(e);
@@ -388,6 +396,7 @@ protected Message<T> internalReceive(long timeout, TimeUnit unit) throws PulsarC
388396
decreaseIncomingMessageSize(message);
389397
checkArgument(message instanceof TopicMessageImpl);
390398
trackUnAckedMsgIfNoListener(message.getMessageId(), message.getRedeliveryCount());
399+
message = beforeConsume(message);
391400
}
392401
resumeReceivingFromPausedConsumersIfNeeded();
393402
return message;
@@ -447,7 +456,7 @@ protected CompletableFuture<Message<T>> internalReceiveAsync() {
447456
checkState(message instanceof TopicMessageImpl);
448457
unAckedMessageTracker.add(message.getMessageId(), message.getRedeliveryCount());
449458
resumeReceivingFromPausedConsumersIfNeeded();
450-
result.complete(message);
459+
result.complete(beforeConsume(message));
451460
}
452461
});
453462
return result;
@@ -1185,7 +1194,7 @@ private ConsumerImpl<T> createInternalConsumer(ConsumerConfigurationData<T> conf
11851194
return ConsumerImpl.newConsumerImpl(client, partitionName,
11861195
configurationData, client.externalExecutorProvider(),
11871196
partitionIndex, true, listener != null, subFuture,
1188-
startMessageId, schema, interceptors,
1197+
startMessageId, schema, this.internalConsumerInterceptors,
11891198
createIfDoesNotExist, startMessageRollbackDurationInSec);
11901199
}
11911200

@@ -1595,4 +1604,45 @@ private CompletableFuture<List<Integer>> getExistsPartitions(String topic) {
15951604
return list;
15961605
});
15971606
}
1607+
1608+
private ConsumerInterceptors<T> getInternalConsumerInterceptors(ConsumerInterceptors<T> multiTopicInterceptors) {
1609+
return new ConsumerInterceptors<T>(new ArrayList<>()) {
1610+
1611+
@Override
1612+
public Message<T> beforeConsume(Consumer<T> consumer, Message<T> message) {
1613+
return message;
1614+
}
1615+
1616+
@Override
1617+
public void onAcknowledge(Consumer<T> consumer, MessageId messageId, Throwable exception) {
1618+
multiTopicInterceptors.onAcknowledge(consumer, messageId, exception);
1619+
}
1620+
1621+
@Override
1622+
public void onAcknowledgeCumulative(Consumer<T> consumer,
1623+
MessageId messageId, Throwable exception) {
1624+
multiTopicInterceptors.onAcknowledgeCumulative(consumer, messageId, exception);
1625+
}
1626+
1627+
@Override
1628+
public void onNegativeAcksSend(Consumer<T> consumer, Set<MessageId> set) {
1629+
multiTopicInterceptors.onNegativeAcksSend(consumer, set);
1630+
}
1631+
1632+
@Override
1633+
public void onAckTimeoutSend(Consumer<T> consumer, Set<MessageId> set) {
1634+
multiTopicInterceptors.onAckTimeoutSend(consumer, set);
1635+
}
1636+
1637+
@Override
1638+
public void onPartitionsChange(String topicName, int partitions) {
1639+
multiTopicInterceptors.onPartitionsChange(topicName, partitions);
1640+
}
1641+
1642+
@Override
1643+
public void close() throws IOException {
1644+
multiTopicInterceptors.close();
1645+
}
1646+
};
1647+
}
15981648
}

0 commit comments

Comments
 (0)