Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[feat][client] PIP-374: Visibility of messages in receiverQueue for the consumers #23303

Merged
merged 2 commits into from
Sep 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions pip/pip-374.md
Original file line number Diff line number Diff line change
Expand Up @@ -67,5 +67,5 @@ Since we added a default method onArrival() in interface, one who has provided t
<!--
Updated afterwards
-->
* Mailing List discussion thread:
* Mailing List voting thread:
* Mailing List discussion thread: https://lists.apache.org/thread/hcfpm4j6hpwxb2olfrro8g4dls35q8rx
* Mailing List voting thread: https://lists.apache.org/thread/wrr02s4cdzqmo1vonp92w6229qo0rv0z
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import lombok.Cleanup;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.RandomUtils;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.client.impl.TopicMessageImpl;
import org.apache.pulsar.common.api.proto.KeyValue;
Expand Down Expand Up @@ -870,6 +871,101 @@ public void onPartitionsChange(String topicName, int partitions) {
Assert.assertNull(reader.readNext(3, TimeUnit.SECONDS));
}

@Test(dataProvider = "topicPartition")
public void testConsumerInterceptorForOnArrive(int topicPartition) throws PulsarClientException,
InterruptedException, PulsarAdminException {
String topicName = "persistent://my-property/my-ns/on-arrive";
if (topicPartition > 0) {
admin.topics().createPartitionedTopic(topicName, topicPartition);
}

final int receiveQueueSize = 100;
final int totalNumOfMessages = receiveQueueSize * 2;

// The onArrival method is called for half of the receiveQueueSize messages before beforeConsume is called for all messages.
CountDownLatch latch = new CountDownLatch(receiveQueueSize / 2);
final AtomicInteger onArrivalCount = new AtomicInteger(0);
ConsumerInterceptor<String> interceptor = new ConsumerInterceptor<String>() {
@Override
public void close() {}

@Override
public Message<String> onArrival(Consumer<String> consumer, Message<String> message) {
MessageImpl<String> msg = (MessageImpl<String>) message;
msg.getMessageBuilder().addProperty().setKey("onArrival").setValue("1");
latch.countDown();
onArrivalCount.incrementAndGet();
return msg;
}

@Override
public Message<String> beforeConsume(Consumer<String> consumer, Message<String> message) {
return message;
}

@Override
public void onAcknowledge(Consumer<String> consumer, MessageId messageId, Throwable cause) {

}

@Override
public void onAcknowledgeCumulative(Consumer<String> consumer, MessageId messageId, Throwable cause) {

}

@Override
public void onNegativeAcksSend(Consumer<String> consumer, Set<MessageId> messageIds) {
}

@Override
public void onAckTimeoutSend(Consumer<String> consumer, Set<MessageId> messageIds) {

}
};

Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
.topic(topicName)
.create();

Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
.topic(topicName)
.subscriptionName("test-arrive")
.intercept(interceptor)
.receiverQueueSize(receiveQueueSize)
.subscribe();

for (int i = 0; i < totalNumOfMessages; i++) {
producer.send("Mock message");
}

// Not call receive message, just wait for onArrival interceptor.
latch.await();
Assert.assertEquals(latch.getCount(), 0);

for (int i = 0; i < totalNumOfMessages; i++) {
Message<String> message = consumer.receive();
MessageImpl<String> msgImpl;
if (message instanceof MessageImpl<String>) {
msgImpl = (MessageImpl<String>) message;
} else if (message instanceof TopicMessageImpl<String>) {
msgImpl = (MessageImpl<String>) ((TopicMessageImpl<String>) message).getMessage();
} else {
throw new ClassCastException("Message type is not expected");
}
boolean haveKey = false;
for (KeyValue keyValue : msgImpl.getMessageBuilder().getPropertiesList()) {
if ("onArrival".equals(keyValue.getKey())) {
haveKey = true;
}
}
Assert.assertTrue(haveKey);
}
Assert.assertEquals(totalNumOfMessages, onArrivalCount.get());

producer.close();
consumer.close();
}

private void produceAndConsume(int msgCount, Producer<byte[]> producer, Reader<byte[]> reader)
throws PulsarClientException {
for (int i = 0; i < msgCount; i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,44 @@ public interface ConsumerInterceptor<T> extends AutoCloseable {
*/
void close();

/**
* This method is called when a message arrives in the consumer.
*
* <p>This method provides visibility into the messages that have been received
* by the consumer but have not yet been processed. This can be useful for
* monitoring the state of the consumer's receiver queue and understanding
* the consumer's processing rate.
*
* <p>The method is allowed to modify the message, in which case the modified
* message will be returned.
*
* <p>Any exception thrown by this method will be caught by the caller, logged,
* but not propagated to the client.
*
* <p>Since the consumer may run multiple interceptors, a particular
* interceptor's <tt>onArrival</tt> callback will be called in the order
* specified by {@link ConsumerBuilder#intercept(ConsumerInterceptor[])}. The
* first interceptor in the list gets the consumed message, the following
* interceptor will be passed the message returned by the previous interceptor,
* and so on. Since interceptors are allowed to modify the message, interceptors
* may potentially get the messages already modified by other interceptors.
* However, building a pipeline of mutable interceptors that depend on the output
* of the previous interceptor is discouraged, because of potential side-effects
* caused by interceptors potentially failing to modify the message and throwing
* an exception. If one of the interceptors in the list throws an exception from
* <tt>onArrival</tt>, the exception is caught, logged, and the next interceptor
* is called with the message returned by the last successful interceptor in the
* list, or otherwise the original consumed message.
*
* @param consumer the consumer which contains the interceptor
* @param message the message that has arrived in the receiver queue
* @return the message that is either modified by the interceptor or the same
* message passed into the method
*/
default Message<T> onArrival(Consumer<T> consumer, Message<T> message) {
return message;
}

/**
* This is called just before the message is returned by
* {@link Consumer#receive()}, {@link MessageListener#received(Consumer,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -852,6 +852,14 @@ public String toString() {
+ '}';
}

protected Message<T> onArrival(Message<T> message) {
if (interceptors != null) {
return interceptors.onArrival(this, message);
} else {
return message;
}
}

protected Message<T> beforeConsume(Message<T> message) {
if (interceptors != null) {
return interceptors.beforeConsume(this, message);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1301,9 +1301,10 @@ private void executeNotifyCallback(final MessageImpl<T> message) {
increaseAvailablePermits(cnx());
return;
}
Message<T> interceptMsg = onArrival(message);
if (hasNextPendingReceive()) {
notifyPendingReceivedCallback(message, null);
} else if (enqueueMessageAndCheckBatchReceive(message) && hasPendingBatchReceive()) {
notifyPendingReceivedCallback(interceptMsg, null);
} else if (enqueueMessageAndCheckBatchReceive(interceptMsg) && hasPendingBatchReceive()) {
notifyPendingBatchReceivedCallBack();
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,38 @@ public ConsumerInterceptors(List<ConsumerInterceptor<T>> interceptors) {
this.interceptors = interceptors;
}


/**
* This method is called when a message arrives in the consumer.
* <p>
* This method calls {@link ConsumerInterceptor#onArrival(Consumer, Message) method for each
* interceptor.
* <p>
* This method does not throw exceptions. If any of the interceptors in the chain throws an exception, it gets
* caught and logged, and next interceptor in int the chain is called with 'messages' returned by the previous
* successful interceptor beforeConsume call.
*
* @param consumer the consumer which contains the interceptors
* @param message message to be consume by the client.
* @return messages that are either modified by interceptors or same as messages passed to this method.
*/
public Message<T> onArrival(Consumer<T> consumer, Message<T> message) {
Message<T> interceptorMessage = message;
for (int i = 0, interceptorsSize = interceptors.size(); i < interceptorsSize; i++) {
try {
interceptorMessage = interceptors.get(i).onArrival(consumer, interceptorMessage);
} catch (Throwable e) {
if (consumer != null) {
log.warn("Error executing interceptor beforeConsume callback topic: {} consumerName: {}",
consumer.getTopic(), consumer.getConsumerName(), e);
} else {
log.warn("Error executing interceptor beforeConsume callback", e);
}
}
}
return interceptorMessage;
}

/**
* This is called just before the message is returned by {@link Consumer#receive()},
* {@link MessageListener#received(Consumer, Message)} or the {@link java.util.concurrent.CompletableFuture}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1608,6 +1608,11 @@ private CompletableFuture<List<Integer>> getExistsPartitions(String topic) {
private ConsumerInterceptors<T> getInternalConsumerInterceptors(ConsumerInterceptors<T> multiTopicInterceptors) {
return new ConsumerInterceptors<T>(new ArrayList<>()) {

@Override
public Message<T> onArrival(Consumer<T> consumer, Message<T> message) {
return multiTopicInterceptors.onArrival(consumer, message);
}

@Override
public Message<T> beforeConsume(Consumer<T> consumer, Message<T> message) {
return message;
Expand Down
Loading