diff --git a/pip/pip-374.md b/pip/pip-374.md
index 4264617647433..49fe337159628 100644
--- a/pip/pip-374.md
+++ b/pip/pip-374.md
@@ -67,5 +67,5 @@ Since we added a default method onArrival() in interface, one who has provided t
 <!--
 Updated afterwards
 -->
-* Mailing List discussion thread:
-* Mailing List voting thread:
+* Mailing List discussion thread: https://lists.apache.org/thread/hcfpm4j6hpwxb2olfrro8g4dls35q8rx
+* Mailing List voting thread: https://lists.apache.org/thread/wrr02s4cdzqmo1vonp92w6229qo0rv0z
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/InterceptorsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/InterceptorsTest.java
index afb17a186477c..8115f34121d3c 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/InterceptorsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/InterceptorsTest.java
@@ -33,6 +33,7 @@
 import lombok.Cleanup;
 import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.commons.lang3.RandomUtils;
+import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.impl.MessageImpl;
 import org.apache.pulsar.client.impl.TopicMessageImpl;
 import org.apache.pulsar.common.api.proto.KeyValue;
@@ -870,6 +871,101 @@ public void onPartitionsChange(String topicName, int partitions) {
         Assert.assertNull(reader.readNext(3, TimeUnit.SECONDS));
     }
 
+    @Test(dataProvider = "topicPartition")
+    public void testConsumerInterceptorForOnArrive(int topicPartition) throws PulsarClientException,
+            InterruptedException, PulsarAdminException {
+        String topicName = "persistent://my-property/my-ns/on-arrive";
+        if (topicPartition > 0) {
+            admin.topics().createPartitionedTopic(topicName, topicPartition);
+        }
+
+        final int receiveQueueSize = 100;
+        final int totalNumOfMessages = receiveQueueSize * 2;
+
+        // The onArrival method is called for half of the receiveQueueSize messages before beforeConsume is called for all messages.
+        CountDownLatch latch = new CountDownLatch(receiveQueueSize / 2);
+        final AtomicInteger onArrivalCount = new AtomicInteger(0);
+        ConsumerInterceptor<String> interceptor = new ConsumerInterceptor<String>() {
+            @Override
+            public void close() {}
+
+            @Override
+            public Message<String> onArrival(Consumer<String> consumer, Message<String> message) {
+                MessageImpl<String> msg = (MessageImpl<String>) message;
+                msg.getMessageBuilder().addProperty().setKey("onArrival").setValue("1");
+                latch.countDown();
+                onArrivalCount.incrementAndGet();
+                return msg;
+            }
+
+            @Override
+            public Message<String> beforeConsume(Consumer<String> consumer, Message<String> message) {
+                return message;
+            }
+
+            @Override
+            public void onAcknowledge(Consumer<String> consumer, MessageId messageId, Throwable cause) {
+
+            }
+
+            @Override
+            public void onAcknowledgeCumulative(Consumer<String> consumer, MessageId messageId, Throwable cause) {
+
+            }
+
+            @Override
+            public void onNegativeAcksSend(Consumer<String> consumer, Set<MessageId> messageIds) {
+            }
+
+            @Override
+            public void onAckTimeoutSend(Consumer<String> consumer, Set<MessageId> messageIds) {
+
+            }
+        };
+
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+                .topic(topicName)
+                .create();
+
+        Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
+                .topic(topicName)
+                .subscriptionName("test-arrive")
+                .intercept(interceptor)
+                .receiverQueueSize(receiveQueueSize)
+                .subscribe();
+
+        for (int i = 0; i < totalNumOfMessages; i++) {
+            producer.send("Mock message");
+        }
+
+        // Not call receive message, just wait for onArrival interceptor.
+        latch.await();
+        Assert.assertEquals(latch.getCount(), 0);
+
+        for (int i = 0; i < totalNumOfMessages; i++) {
+            Message<String> message = consumer.receive();
+            MessageImpl<String> msgImpl;
+            if (message instanceof MessageImpl<String>) {
+                msgImpl = (MessageImpl<String>) message;
+            } else if (message instanceof TopicMessageImpl<String>) {
+                msgImpl = (MessageImpl<String>) ((TopicMessageImpl<String>) message).getMessage();
+            } else {
+                throw new ClassCastException("Message type is not expected");
+            }
+            boolean haveKey = false;
+            for (KeyValue keyValue : msgImpl.getMessageBuilder().getPropertiesList()) {
+                if ("onArrival".equals(keyValue.getKey())) {
+                    haveKey = true;
+                }
+            }
+            Assert.assertTrue(haveKey);
+        }
+        Assert.assertEquals(totalNumOfMessages, onArrivalCount.get());
+
+        producer.close();
+        consumer.close();
+    }
+
     private void produceAndConsume(int msgCount, Producer<byte[]> producer, Reader<byte[]> reader)
             throws PulsarClientException {
         for (int i = 0; i < msgCount; i++) {
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerInterceptor.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerInterceptor.java
index be2f9b0f10826..1beea3adba239 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerInterceptor.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerInterceptor.java
@@ -41,6 +41,44 @@ public interface ConsumerInterceptor<T> extends AutoCloseable {
      */
     void close();
 
+    /**
+     * This method is called when a message arrives in the consumer.
+     *
+     * <p>This method provides visibility into the messages that have been received
+     * by the consumer but have not yet been processed. This can be useful for
+     * monitoring the state of the consumer's receiver queue and understanding
+     * the consumer's processing rate.
+     *
+     * <p>The method is allowed to modify the message, in which case the modified
+     * message will be returned.
+     *
+     * <p>Any exception thrown by this method will be caught by the caller, logged,
+     * but not propagated to the client.
+     *
+     * <p>Since the consumer may run multiple interceptors, a particular
+     * interceptor's <tt>onArrival</tt> callback will be called in the order
+     * specified by {@link ConsumerBuilder#intercept(ConsumerInterceptor[])}. The
+     * first interceptor in the list gets the consumed message, the following
+     * interceptor will be passed the message returned by the previous interceptor,
+     * and so on. Since interceptors are allowed to modify the message, interceptors
+     * may potentially get the messages already modified by other interceptors.
+     * However, building a pipeline of mutable interceptors that depend on the output
+     * of the previous interceptor is discouraged, because of potential side-effects
+     * caused by interceptors potentially failing to modify the message and throwing
+     * an exception. If one of the interceptors in the list throws an exception from
+     * <tt>onArrival</tt>, the exception is caught, logged, and the next interceptor
+     * is called with the message returned by the last successful interceptor in the
+     * list, or otherwise the original consumed message.
+     *
+     * @param consumer the consumer which contains the interceptor
+     * @param message the message that has arrived in the receiver queue
+     * @return the message that is either modified by the interceptor or the same
+     *         message passed into the method
+     */
+    default Message<T> onArrival(Consumer<T> consumer, Message<T> message) {
+        return message;
+    }
+
     /**
      * This is called just before the message is returned by
      * {@link Consumer#receive()}, {@link MessageListener#received(Consumer,
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
index 9748a42f0cb2b..03256a3e139b6 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
@@ -852,6 +852,14 @@ public String toString() {
                 + '}';
     }
 
+    protected Message<T> onArrival(Message<T> message) {
+        if (interceptors != null) {
+            return interceptors.onArrival(this, message);
+        } else {
+            return message;
+        }
+    }
+
     protected Message<T> beforeConsume(Message<T> message) {
         if (interceptors != null) {
             return interceptors.beforeConsume(this, message);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 996569704d712..60b9d145c4897 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -1301,9 +1301,10 @@ private void executeNotifyCallback(final MessageImpl<T> message) {
                 increaseAvailablePermits(cnx());
                 return;
             }
+            Message<T> interceptMsg = onArrival(message);
             if (hasNextPendingReceive()) {
-                notifyPendingReceivedCallback(message, null);
-            } else if (enqueueMessageAndCheckBatchReceive(message) && hasPendingBatchReceive()) {
+                notifyPendingReceivedCallback(interceptMsg, null);
+            } else if (enqueueMessageAndCheckBatchReceive(interceptMsg) && hasPendingBatchReceive()) {
                 notifyPendingBatchReceivedCallBack();
             }
         });
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerInterceptors.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerInterceptors.java
index 832dc0bacaee9..dd1e2cec3b3ef 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerInterceptors.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerInterceptors.java
@@ -44,6 +44,38 @@ public ConsumerInterceptors(List<ConsumerInterceptor<T>> interceptors) {
         this.interceptors = interceptors;
     }
 
+
+    /**
+     * This method is called when a message arrives in the consumer.
+     * <p>
+     * This method calls {@link ConsumerInterceptor#onArrival(Consumer, Message) method for each
+     * interceptor.
+     * <p>
+     * This method does not throw exceptions. If any of the interceptors in the chain throws an exception, it gets
+     * caught and logged, and next interceptor in int the chain is called with 'messages' returned by the previous
+     * successful interceptor beforeConsume call.
+     *
+     * @param consumer the consumer which contains the interceptors
+     * @param message message to be consume by the client.
+     * @return messages that are either modified by interceptors or same as messages passed to this method.
+     */
+    public Message<T> onArrival(Consumer<T> consumer, Message<T> message) {
+        Message<T> interceptorMessage = message;
+        for (int i = 0, interceptorsSize = interceptors.size(); i < interceptorsSize; i++) {
+            try {
+                interceptorMessage = interceptors.get(i).onArrival(consumer, interceptorMessage);
+            } catch (Throwable e) {
+                if (consumer != null) {
+                    log.warn("Error executing interceptor beforeConsume callback topic: {} consumerName: {}",
+                            consumer.getTopic(), consumer.getConsumerName(), e);
+                } else {
+                    log.warn("Error executing interceptor beforeConsume callback", e);
+                }
+            }
+        }
+        return interceptorMessage;
+    }
+
     /**
      * This is called just before the message is returned by {@link Consumer#receive()},
      * {@link MessageListener#received(Consumer, Message)} or the {@link java.util.concurrent.CompletableFuture}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index bf8bd6cc95117..513c0101ac6ac 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -1608,6 +1608,11 @@ private CompletableFuture<List<Integer>> getExistsPartitions(String topic) {
     private ConsumerInterceptors<T> getInternalConsumerInterceptors(ConsumerInterceptors<T> multiTopicInterceptors) {
         return new ConsumerInterceptors<T>(new ArrayList<>()) {
 
+            @Override
+            public Message<T> onArrival(Consumer<T> consumer, Message<T> message) {
+                return multiTopicInterceptors.onArrival(consumer, message);
+            }
+
             @Override
             public Message<T> beforeConsume(Consumer<T> consumer, Message<T> message) {
                 return message;