Skip to content

Commit 7e221be

Browse files
codelipenghuisrinath-ctds
authored andcommittedNov 21, 2024
[fix][client] fix the beforeConsume() method earlier hit with message listener (apache#23578)
(cherry picked from commit 137df29) (cherry picked from commit c303cc6)
1 parent 92e11ec commit 7e221be

File tree

4 files changed

+82
-2
lines changed

4 files changed

+82
-2
lines changed
 

‎pulsar-broker/src/test/java/org/apache/pulsar/client/api/InterceptorsTest.java

+78
Original file line numberDiff line numberDiff line change
@@ -475,6 +475,84 @@ public void onAckTimeoutSend(Consumer<String> consumer, Set<MessageId> messageId
475475
consumer.close();
476476
}
477477

478+
@Test(dataProvider = "topicPartition")
479+
public void testDoNotEarlierHitBeforeConsumerWithMessageListener(int partitions) throws Exception {
480+
481+
AtomicInteger beforeConsumeCount = new AtomicInteger(0);
482+
PulsarClient client = PulsarClient.builder()
483+
.serviceUrl(lookupUrl.toString())
484+
.listenerThreads(1)
485+
.build();
486+
487+
ConsumerInterceptor<String> interceptor = new ConsumerInterceptor<>() {
488+
@Override
489+
public void close() {
490+
}
491+
492+
@Override
493+
public Message<String> beforeConsume(Consumer<String> consumer, Message<String> message) {
494+
beforeConsumeCount.incrementAndGet();
495+
log.info("beforeConsume messageId: {}", message.getMessageId());
496+
return message;
497+
}
498+
499+
@Override
500+
public void onAcknowledge(Consumer<String> consumer, MessageId messageId, Throwable cause) {
501+
}
502+
503+
@Override
504+
public void onAcknowledgeCumulative(Consumer<String> consumer, MessageId messageId, Throwable cause) {
505+
}
506+
507+
@Override
508+
public void onNegativeAcksSend(Consumer<String> consumer, Set<MessageId> messageIds) {
509+
}
510+
511+
@Override
512+
public void onAckTimeoutSend(Consumer<String> consumer, Set<MessageId> messageIds) {
513+
}
514+
};
515+
516+
final String topicName = "persistent://my-property/my-ns/my-topic";
517+
518+
if (partitions > 0) {
519+
admin.topics().createPartitionedTopic(topicName, partitions);
520+
} else {
521+
admin.topics().createNonPartitionedTopic(topicName);
522+
}
523+
524+
Consumer<String> consumer = client.newConsumer(Schema.STRING)
525+
.topic(topicName)
526+
.subscriptionType(SubscriptionType.Shared)
527+
.intercept(interceptor)
528+
.subscriptionName("my-subscription")
529+
.messageListener((c, m) -> {
530+
// Simulate a long processing time
531+
try {
532+
Thread.sleep(60000);
533+
} catch (InterruptedException e) {
534+
throw new RuntimeException(e);
535+
}
536+
})
537+
.subscribe();
538+
539+
Producer<String> producer = client.newProducer(Schema.STRING)
540+
.topic("persistent://my-property/my-ns/my-topic")
541+
.create();
542+
543+
final int messages = 10;
544+
for (int i = 0; i < messages; i++) {
545+
producer.newMessage().value("Hello Pulsar!").send();
546+
}
547+
Awaitility.await().untilAsserted(() -> {
548+
// Ensure that the interceptor is not hit before the message listener
549+
Assert.assertEquals(beforeConsumeCount.get(), 1);
550+
});
551+
producer.close();
552+
consumer.close();
553+
client.close();
554+
}
555+
478556
@Test
479557
public void testConsumerInterceptorWithPatternTopicSubscribe() throws PulsarClientException {
480558

‎pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java

+1
Original file line numberDiff line numberDiff line change
@@ -1164,6 +1164,7 @@ protected void callMessageListener(Message<T> msg) {
11641164
id = msg.getMessageId();
11651165
}
11661166
unAckedMessageTracker.add(id, msg.getRedeliveryCount());
1167+
beforeConsume(msg);
11671168
listener.received(ConsumerBase.this, msg);
11681169
} catch (Throwable t) {
11691170
log.error("[{}][{}] Message listener error in processing message: {}", topic, subscription,

‎pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -505,7 +505,8 @@ protected Message<T> internalReceive(long timeout, TimeUnit unit) throws PulsarC
505505
return null;
506506
}
507507
messageProcessed(message);
508-
return beforeConsume(message);
508+
message = listener == null ? beforeConsume(message) : message;
509+
return message;
509510
} catch (InterruptedException e) {
510511
ExceptionHandler.handleInterruptedException(e);
511512
State state = getState();

‎pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -396,7 +396,7 @@ protected Message<T> internalReceive(long timeout, TimeUnit unit) throws PulsarC
396396
decreaseIncomingMessageSize(message);
397397
checkArgument(message instanceof TopicMessageImpl);
398398
trackUnAckedMsgIfNoListener(message.getMessageId(), message.getRedeliveryCount());
399-
message = beforeConsume(message);
399+
message = listener == null ? beforeConsume(message) : message;
400400
}
401401
resumeReceivingFromPausedConsumersIfNeeded();
402402
return message;

0 commit comments

Comments
 (0)