diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java index ae1a4dba17257..99aec9355b4c7 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java @@ -111,7 +111,7 @@ static class RawConsumerImpl extends ConsumerImpl { consumerFuture, SubscriptionMode.Durable, MessageId.earliest, - Schema.BYTES); + Schema.BYTES, null); incomingRawMessages = new GrowableArrayBlockingQueue<>(); pendingRawReceives = new ConcurrentLinkedQueue<>(); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java index f8ac40a945f80..8c175d7326e33 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java @@ -24,6 +24,7 @@ import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyObject; import static org.mockito.Matchers.anyString; +import static org.mockito.Matchers.eq; import static org.mockito.Matchers.matches; import static org.mockito.Mockito.atLeast; import static org.mockito.Mockito.doAnswer; @@ -36,7 +37,6 @@ import static org.mockito.Mockito.when; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; -import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertNull; import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; @@ -80,7 +80,6 @@ import org.apache.pulsar.broker.cache.ConfigurationCacheService; import org.apache.pulsar.broker.cache.LocalZooKeeperCacheService; import org.apache.pulsar.broker.namespace.NamespaceService; -import org.apache.pulsar.broker.service.Topic.PublishContext; import org.apache.pulsar.broker.service.nonpersistent.NonPersistentReplicator; import org.apache.pulsar.broker.service.persistent.CompactorSubscription; import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers; @@ -1224,7 +1223,7 @@ public void testClosingReplicationProducerTwice() throws Exception { verify(clientImpl) .createProducerAsync( any(ProducerConfigurationData.class), - any(Schema.class) + any(Schema.class), eq(null) ); replicator.disconnect(false); @@ -1235,7 +1234,7 @@ public void testClosingReplicationProducerTwice() throws Exception { verify(clientImpl, Mockito.times(2)) .createProducerAsync( any(ProducerConfigurationData.class), - any(Schema.class) + any(Schema.class), any(null) ); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java index de4992594d4c6..528ff64d7e806 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.broker.service; +import static org.mockito.Matchers.eq; import static org.mockito.Mockito.spy; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNull; @@ -242,7 +243,7 @@ public void testConcurrentReplicator() throws Exception { Thread.sleep(3000); Mockito.verify(pulsarClient, Mockito.times(1)).createProducerAsync(Mockito.any(ProducerConfigurationData.class), - Mockito.any(Schema.class)); + Mockito.any(Schema.class), eq(null)); client1.shutdown(); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/v1/V1_ReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/v1/V1_ReplicatorTest.java index 0d6d3036efbc5..92bb64175ad9a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/v1/V1_ReplicatorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/v1/V1_ReplicatorTest.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.broker.service.v1; +import static org.mockito.Matchers.eq; import static org.mockito.Mockito.spy; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNull; @@ -241,7 +242,7 @@ public void testConcurrentReplicator() throws Exception { Thread.sleep(3000); Mockito.verify(pulsarClient, Mockito.times(1)).createProducerAsync(Mockito.any(ProducerConfigurationData.class), - Mockito.any(Schema.class)); + Mockito.any(Schema.class), eq(null)); client1.shutdown(); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/InterceptorsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/InterceptorsTest.java new file mode 100644 index 0000000000000..d83384698e32d --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/InterceptorsTest.java @@ -0,0 +1,359 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.api; + +import org.apache.pulsar.client.impl.MessageImpl; +import org.apache.pulsar.client.impl.TopicMessageImpl; +import org.apache.pulsar.common.api.proto.PulsarApi; +import org.junit.Assert; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import java.util.ArrayList; +import java.util.List; + +public class InterceptorsTest extends ProducerConsumerBase { + + private static final Logger log = LoggerFactory.getLogger(InterceptorsTest.class); + + @BeforeMethod + @Override + protected void setup() throws Exception { + super.internalSetup(); + super.producerBaseSetup(); + } + + @AfterMethod + @Override + protected void cleanup() throws Exception { + super.internalCleanup(); + } + + @Test + public void testProducerInterceptor() throws PulsarClientException { + ProducerInterceptor interceptor1 = new ProducerInterceptor() { + @Override + public void close() { + + } + + @Override + public Message beforeSend(Producer producer, Message message) { + MessageImpl msg = (MessageImpl) message; + log.info("Before send message: {}", new String(msg.getData())); + java.util.List properties = msg.getMessageBuilder().getPropertiesList(); + for (int i = 0; i < properties.size(); i++) { + if ("key".equals(properties.get(i).getKey())) { + msg.getMessageBuilder().setProperties(i, PulsarApi.KeyValue.newBuilder().setKey("key").setValue("after").build()); + } + } + return msg; + } + + @Override + public void onSendAcknowledgement(Producer producer, Message message, MessageId msgId, Throwable cause) { + message.getProperties(); + Assert.assertEquals("complete", message.getProperty("key")); + log.info("Send acknowledgement message: {}, msgId: {}", new String(message.getData()), msgId, cause); + } + }; + + ProducerInterceptor interceptor2 = new ProducerInterceptor() { + @Override + public void close() { + + } + + @Override + public Message beforeSend(Producer producer, Message message) { + MessageImpl msg = (MessageImpl) message; + log.info("Before send message: {}", new String(msg.getData())); + java.util.List properties = msg.getMessageBuilder().getPropertiesList(); + for (int i = 0; i < properties.size(); i++) { + if ("key".equals(properties.get(i).getKey())) { + msg.getMessageBuilder().setProperties(i, PulsarApi.KeyValue.newBuilder().setKey("key").setValue("complete").build()); + } + } + return msg; + } + + @Override + public void onSendAcknowledgement(Producer producer, Message message, MessageId msgId, Throwable cause) { + message.getProperties(); + Assert.assertEquals("complete", message.getProperty("key")); + log.info("Send acknowledgement message: {}, msgId: {}", new String(message.getData()), msgId, cause); + } + }; + + Producer producer = pulsarClient.newProducer(Schema.STRING) + .topic("persistent://my-property/my-ns/my-topic") + .intercept(interceptor1, interceptor2) + .create(); + + MessageId messageId = producer.newMessage().property("key", "before").value("Hello Pulsar!").send(); + log.info("Send result messageId: {}", messageId); + producer.close(); + } + + @Test + public void testConsumerInterceptorWithSingleTopicSubscribe() throws PulsarClientException { + ConsumerInterceptor interceptor = new ConsumerInterceptor() { + @Override + public void close() { + + } + + @Override + public Message beforeConsume(Consumer consumer, Message message) { + MessageImpl msg = (MessageImpl) message; + msg.getMessageBuilder().addProperties(PulsarApi.KeyValue.newBuilder().setKey("beforeConsumer").setValue("1").build()); + return msg; + } + + @Override + public void onAcknowledge(Consumer consumer, MessageId messageId, Throwable cause) { + log.info("onAcknowledge messageId: {}", messageId, cause); + } + + @Override + public void onAcknowledgeCumulative(Consumer consumer, MessageId messageId, Throwable cause) { + log.info("onAcknowledgeCumulative messageIds: {}", messageId, cause); + } + }; + + Consumer consumer = pulsarClient.newConsumer(Schema.STRING) + .topic("persistent://my-property/my-ns/my-topic") + .subscriptionType(SubscriptionType.Shared) + .intercept(interceptor) + .subscriptionName("my-subscription") + .subscribe(); + + Producer producer = pulsarClient.newProducer(Schema.STRING) + .topic("persistent://my-property/my-ns/my-topic") + .create(); + + producer.newMessage().value("Hello Pulsar!").send(); + + Message received = consumer.receive(); + MessageImpl msg = (MessageImpl) received; + boolean haveKey = false; + for (PulsarApi.KeyValue keyValue : msg.getMessageBuilder().getPropertiesList()) { + if ("beforeConsumer".equals(keyValue.getKey())) { + haveKey = true; + } + } + Assert.assertTrue(haveKey); + consumer.acknowledge(received); + producer.close(); + consumer.close(); + } + + @Test + public void testConsumerInterceptorWithMultiTopicSubscribe() throws PulsarClientException { + + ConsumerInterceptor interceptor = new ConsumerInterceptor() { + @Override + public void close() { + + } + + @Override + public Message beforeConsume(Consumer consumer, Message message) { + MessageImpl msg = (MessageImpl) message; + msg.getMessageBuilder().addProperties(PulsarApi.KeyValue.newBuilder().setKey("beforeConsumer").setValue("1").build()); + return msg; + } + + @Override + public void onAcknowledge(Consumer consumer, MessageId messageId, Throwable cause) { + log.info("onAcknowledge messageId: {}", messageId, cause); + } + + @Override + public void onAcknowledgeCumulative(Consumer consumer, MessageId messageId, Throwable cause) { + log.info("onAcknowledgeCumulative messageIds: {}", messageId, cause); + } + }; + + Producer producer = pulsarClient.newProducer(Schema.STRING) + .topic("persistent://my-property/my-ns/my-topic") + .create(); + + Producer producer1 = pulsarClient.newProducer(Schema.STRING) + .topic("persistent://my-property/my-ns/my-topic1") + .create(); + + Consumer consumer = pulsarClient.newConsumer(Schema.STRING) + .topic("persistent://my-property/my-ns/my-topic", "persistent://my-property/my-ns/my-topic1") + .subscriptionType(SubscriptionType.Shared) + .intercept(interceptor) + .subscriptionName("my-subscription") + .subscribe(); + + producer.newMessage().value("Hello Pulsar!").send(); + producer1.newMessage().value("Hello Pulsar!").send(); + + int keyCount = 0; + for (int i = 0; i < 2; i++) { + Message received = consumer.receive(); + MessageImpl msg = (MessageImpl) ((TopicMessageImpl) received).getMessage(); + for (PulsarApi.KeyValue keyValue : msg.getMessageBuilder().getPropertiesList()) { + if ("beforeConsumer".equals(keyValue.getKey())) { + keyCount++; + } + } + consumer.acknowledge(received); + } + Assert.assertEquals(2, keyCount); + producer.close(); + producer1.close(); + consumer.close(); + } + + @Test + public void testConsumerInterceptorWithPatternTopicSubscribe() throws PulsarClientException { + + ConsumerInterceptor interceptor = new ConsumerInterceptor() { + @Override + public void close() { + + } + + @Override + public Message beforeConsume(Consumer consumer, Message message) { + MessageImpl msg = (MessageImpl) message; + msg.getMessageBuilder().addProperties(PulsarApi.KeyValue.newBuilder().setKey("beforeConsumer").setValue("1").build()); + return msg; + } + + @Override + public void onAcknowledge(Consumer consumer, MessageId messageId, Throwable cause) { + log.info("onAcknowledge messageId: {}", messageId, cause); + } + + @Override + public void onAcknowledgeCumulative(Consumer consumer, MessageId messageId, Throwable cause) { + log.info("onAcknowledgeCumulative messageIds: {}", messageId, cause); + } + }; + + Producer producer = pulsarClient.newProducer(Schema.STRING) + .topic("persistent://my-property/my-ns/my-topic") + .create(); + + Producer producer1 = pulsarClient.newProducer(Schema.STRING) + .topic("persistent://my-property/my-ns/my-topic1") + .create(); + + Consumer consumer = pulsarClient.newConsumer(Schema.STRING) + .topicsPattern("persistent://my-property/my-ns/my-.*") + .subscriptionType(SubscriptionType.Shared) + .intercept(interceptor) + .subscriptionName("my-subscription") + .subscribe(); + + producer.newMessage().value("Hello Pulsar!").send(); + producer1.newMessage().value("Hello Pulsar!").send(); + + int keyCount = 0; + for (int i = 0; i < 2; i++) { + Message received = consumer.receive(); + MessageImpl msg = (MessageImpl) ((TopicMessageImpl) received).getMessage(); + for (PulsarApi.KeyValue keyValue : msg.getMessageBuilder().getPropertiesList()) { + if ("beforeConsumer".equals(keyValue.getKey())) { + keyCount++; + } + } + consumer.acknowledge(received); + } + Assert.assertEquals(2, keyCount); + producer.close(); + producer1.close(); + consumer.close(); + } + + @Test + public void testConsumerInterceptorForAcknowledgeCumulative() throws PulsarClientException { + + List ackHolder = new ArrayList<>(); + + ConsumerInterceptor interceptor = new ConsumerInterceptor() { + @Override + public void close() { + + } + + @Override + public Message beforeConsume(Consumer consumer, Message message) { + MessageImpl msg = (MessageImpl) message; + msg.getMessageBuilder().addProperties(PulsarApi.KeyValue.newBuilder().setKey("beforeConsumer").setValue("1").build()); + return msg; + } + + @Override + public void onAcknowledge(Consumer consumer, MessageId messageId, Throwable cause) { + log.info("onAcknowledge messageId: {}", messageId, cause); + } + + @Override + public void onAcknowledgeCumulative(Consumer consumer, MessageId messageId, Throwable cause) { + long acknowledged = ackHolder.stream().filter(m -> (m.compareTo(messageId) <= 0)).count(); + Assert.assertEquals(acknowledged, 100); + ackHolder.clear(); + log.info("onAcknowledgeCumulative messageIds: {}", messageId, cause); + } + }; + + Consumer consumer = pulsarClient.newConsumer(Schema.STRING) + .topic("persistent://my-property/my-ns/my-topic") + .subscriptionType(SubscriptionType.Failover) + .intercept(interceptor) + .subscriptionName("my-subscription") + .subscribe(); + + Producer producer = pulsarClient.newProducer(Schema.STRING) + .topic("persistent://my-property/my-ns/my-topic") + .create(); + + for (int i = 0; i < 100; i++) { + producer.newMessage().value("Hello Pulsar!").send(); + } + + int keyCount = 0; + for (int i = 0; i < 100; i++) { + Message received = consumer.receive(); + MessageImpl msg = (MessageImpl) received; + for (PulsarApi.KeyValue keyValue : msg.getMessageBuilder().getPropertiesList()) { + if ("beforeConsumer".equals(keyValue.getKey())) { + keyCount++; + } + } + ackHolder.add(received.getMessageId()); + if (i == 99) { + consumer.acknowledgeCumulative(received); + } + } + Assert.assertEquals(100, keyCount); + producer.close(); + consumer.close(); + } +} diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/Consumer.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/Consumer.java index 69d885fb78e41..f5a926602df66 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/api/Consumer.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/Consumer.java @@ -288,4 +288,10 @@ public interface Consumer extends Closeable { * @return Whether the consumer is connected to the broker */ boolean isConnected(); + + /** + * Get the name of consumer. + * @return consumer name. + */ + String getConsumerName(); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java index 8657859103c51..da1cbb5d17b9c 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java @@ -330,4 +330,12 @@ public interface ConsumerBuilder extends Cloneable { * Set subscriptionInitialPosition for the consumer */ ConsumerBuilder subscriptionInitialPosition(SubscriptionInitialPosition subscriptionInitialPosition); + + /** + * Intercept {@link Consumer}. + * + * @param interceptors the list of interceptors to intercept the consumer created by this builder. + * @return consumer builder. + */ + ConsumerBuilder intercept(ConsumerInterceptor ...interceptors); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerInterceptor.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerInterceptor.java new file mode 100644 index 0000000000000..1134d8a2b4d42 --- /dev/null +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerInterceptor.java @@ -0,0 +1,99 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.api; + +import java.util.List; + +/** + * A plugin interface that allows you to intercept (and possibly mutate) + * messages received by the consumer. + *

+ * A primary use case is to hook into consumer applications for custom + * monitoring, logging, etc. + *

+ * Exceptions thrown by interceptor methods will be caught, logged, but + * not propagated further. + */ +public interface ConsumerInterceptor extends AutoCloseable { + + /** + * Close the interceptor. + */ + void close(); + + /** + * This is called just before the message is returned by + * {@link Consumer#receive()}, {@link MessageListener#received(Consumer, + * Message)} or the {@link java.util.concurrent.CompletableFuture} returned by + * {@link Consumer#receiveAsync()} completes. + *

+ * This method is allowed to modify message, in which case the new message + * will be returned. + *

+ * Any exception thrown by this method will be caught by the caller, logged, + * but not propagated to client. + *

+ * Since the consumer may run multiple interceptors, a particular + * interceptor's + * beforeConsume callback will be called in the order specified by + * {@link ConsumerBuilder#intercept(ConsumerInterceptor[])}. The first + * interceptor in the list gets the consumed message, the following + * interceptor will be passed + * the message returned by the previous interceptor, and so on. Since + * interceptors are allowed to modify message, interceptors may potentially + * get the messages already modified by other interceptors. However building a + * pipeline of mutable + * interceptors that depend on the output of the previous interceptor is + * discouraged, because of potential side-effects caused by interceptors + * potentially failing to modify the message and throwing an exception. + * if one of interceptors in the list throws an exception from + * beforeConsume, the exception is caught, logged, + * and the next interceptor is called with the message returned by the last + * successful interceptor in the list, or otherwise the original consumed + * message. + * + * @param consumer the consumer which contains the interceptor + * @param message the message to be consumed by the client. + * @return message that is either modified by the interceptor or same message + * passed into the method. + */ + Message beforeConsume(Consumer consumer, Message message); + + /** + * This is called consumer sends the acknowledgment to the broker. + * + *

Any exception thrown by this method will be ignored by the caller. + * + * @param consumer the consumer which contains the interceptor + * @param messageId message to ack, null if acknowledge fail. + * @param exception the exception on acknowledge. + */ + void onAcknowledge(Consumer consumer, MessageId messageId, Throwable exception); + + /** + * This is called consumer send the cumulative acknowledgment to the broker. + * + *

Any exception thrown by this method will be ignored by the caller. + * + * @param consumer the consumer which contains the interceptor + * @param messageId message to ack, null if acknowledge fail. + * @param exception the exception on acknowledge. + */ + void onAcknowledgeCumulative(Consumer consumer, MessageId messageId, Throwable exception); +} diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ProducerBuilder.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ProducerBuilder.java index 8256b4a2820d4..b3aa720902033 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ProducerBuilder.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ProducerBuilder.java @@ -318,4 +318,12 @@ public interface ProducerBuilder extends Cloneable { * @return */ ProducerBuilder properties(Map properties); + + /** + * Intercept {@link Producer}. + * + * @param interceptors the list of interceptors to intercept the producer created by this builder. + * @return producer builder. + */ + ProducerBuilder intercept(ProducerInterceptor ... interceptors); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ProducerInterceptor.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ProducerInterceptor.java new file mode 100644 index 0000000000000..b6a0d775585d0 --- /dev/null +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ProducerInterceptor.java @@ -0,0 +1,92 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.api; + +/** + * A plugin interface that allows you to intercept (and possibly mutate) the + * messages received by the producer before they are published to the Pulsar + * brokers. + *

+ * Exceptions thrown by ProducerInterceptor methods will be caught, logged, but + * not propagated further. + *

+ * ProducerInterceptor callbacks may be called from multiple threads. Interceptor + * implementation must ensure thread-safety, if needed. + */ +public interface ProducerInterceptor extends AutoCloseable { + + /** + * Close the interceptor. + */ + void close(); + + /** + * This is called from {@link Producer#send(Object)} and {@link + * Producer#sendAsync(Object)} methods, before + * send the message to the brokers. This method is allowed to modify the + * record, in which case, the new record + * will be returned. + *

+ * Any exception thrown by this method will be caught by the caller and + * logged, but not propagated further. + *

+ * Since the producer may run multiple interceptors, a particular + * interceptor's {@link #beforeSend(Producer, Message)} callback will be called in the + * order specified by + * {@link ProducerBuilder#intercept(ProducerInterceptor[])}. + *

+ * The first interceptor in the list gets the message passed from the client, + * the following interceptor will be passed the message returned by the + * previous interceptor, and so on. Since interceptors are allowed to modify + * messages, interceptors may potentially get the message already modified by + * other interceptors. However, building a pipeline of mutable interceptors + * that depend on the output of the previous interceptor is discouraged, + * because of potential side-effects caused by interceptors potentially + * failing to modify the message and throwing an exception. If one of the + * interceptors in the list throws an exception from + * {@link#beforeSend(Message)}, the exception is caught, logged, and the next + * interceptor is called with the message returned by the last successful + * interceptor in the list, or otherwise the client. + * + * @param producer the producer which contains the interceptor. + * @param message message to send + * @return the intercepted message + */ + Message beforeSend(Producer producer, Message message); + + /** + * This method is called when the message sent to the broker has been + * acknowledged, or when sending the message fails. + * This method is generally called just before the user callback is + * called, and in additional cases when an exception on the producer side. + *

+ * Any exception thrown by this method will be ignored by the caller. + *

+ * This method will generally execute in the background I/O thread, so the + * implementation should be reasonably fast. Otherwise, sending of messages + * from other threads could be delayed. + * + * @param producer the producer which contains the interceptor. + * @param message the message that application sends + * @param msgId the message id that assigned by the broker; null if send failed. + * @param exception the exception on sending messages, null indicates send has succeed. + */ + void onSendAcknowledgement(Producer producer, Message message, MessageId msgId, Throwable exception); + +} diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java index a6fda6f8b6632..d8a0de3b9a4be 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java @@ -60,10 +60,11 @@ enum ConsumerType { protected final ConcurrentLinkedQueue>> pendingReceives; protected int maxReceiverQueueSize; protected Schema schema; + protected final ConsumerInterceptors interceptors; protected ConsumerBase(PulsarClientImpl client, String topic, ConsumerConfigurationData conf, int receiverQueueSize, ExecutorService listenerExecutor, - CompletableFuture> subscribeFuture, Schema schema) { + CompletableFuture> subscribeFuture, Schema schema, ConsumerInterceptors interceptors) { super(client, topic); this.maxReceiverQueueSize = receiverQueueSize; this.subscription = conf.getSubscriptionName(); @@ -81,6 +82,7 @@ protected ConsumerBase(PulsarClientImpl client, String topic, ConsumerConfigurat this.listenerExecutor = listenerExecutor; this.pendingReceives = Queues.newConcurrentLinkedQueue(); this.schema = schema; + this.interceptors = interceptors; } @Override @@ -335,6 +337,7 @@ public String getSubscription() { return subscription; } + @Override public String getConsumerName() { return this.consumerName; } @@ -360,4 +363,24 @@ protected void setMaxReceiverQueueSize(int newSize) { this.maxReceiverQueueSize = newSize; } + protected Message beforeConsume(Message message) { + if (interceptors != null) { + return interceptors.beforeConsume(this, message); + } else { + return message; + } + } + + protected void onAcknowledge(MessageId messageId, Throwable exception) { + if (interceptors != null) { + interceptors.onAcknowledge(this, messageId, exception); + } + } + + protected void onAcknowledgeCumulative(MessageId messageId, Throwable exception) { + if (interceptors != null) { + interceptors.onAcknowledgeCumulative(this, messageId, exception); + } + } + } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java index f0067f7f10f2f..2095babbb1de8 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java @@ -20,6 +20,8 @@ import static com.google.common.base.Preconditions.checkArgument; +import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; @@ -32,6 +34,7 @@ import org.apache.pulsar.client.api.ConsumerBuilder; import org.apache.pulsar.client.api.ConsumerCryptoFailureAction; import org.apache.pulsar.client.api.ConsumerEventListener; +import org.apache.pulsar.client.api.ConsumerInterceptor; import org.apache.pulsar.client.api.CryptoKeyReader; import org.apache.pulsar.client.api.MessageListener; import org.apache.pulsar.client.api.PulsarClientException; @@ -52,6 +55,7 @@ public class ConsumerBuilderImpl implements ConsumerBuilder { private final PulsarClientImpl client; private ConsumerConfigurationData conf; private final Schema schema; + private List> interceptorList; private static long MIN_ACK_TIMEOUT_MILLIS = 1000; @@ -104,8 +108,9 @@ public CompletableFuture> subscribeAsync() { return FutureUtil.failedFuture( new InvalidConfigurationException("Subscription name must be set on the consumer builder")); } - - return client.subscribeAsync(conf, schema); + return interceptorList == null || interceptorList.size() == 0 ? + client.subscribeAsync(conf, schema, null) : + client.subscribeAsync(conf, schema, new ConsumerInterceptors<>(interceptorList)); } @Override @@ -242,7 +247,16 @@ public ConsumerBuilder subscriptionInitialPosition(SubscriptionInitialPositio return this; } - public ConsumerConfigurationData getConf() { + @Override + public ConsumerBuilder intercept(ConsumerInterceptor... interceptors) { + if (interceptorList == null) { + interceptorList = new ArrayList<>(); + } + interceptorList.addAll(Arrays.asList(interceptors)); + return this; + } + + public ConsumerConfigurationData getConf() { return conf; } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index fe37b69a50963..a0a231902fbac 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -143,14 +143,14 @@ enum SubscriptionMode { } ConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurationData conf, - ExecutorService listenerExecutor, int partitionIndex, CompletableFuture> subscribeFuture, Schema schema) { - this(client, topic, conf, listenerExecutor, partitionIndex, subscribeFuture, SubscriptionMode.Durable, null, schema); + ExecutorService listenerExecutor, int partitionIndex, CompletableFuture> subscribeFuture, Schema schema, ConsumerInterceptors interceptors) { + this(client, topic, conf, listenerExecutor, partitionIndex, subscribeFuture, SubscriptionMode.Durable, null, schema, interceptors); } ConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurationData conf, ExecutorService listenerExecutor, int partitionIndex, CompletableFuture> subscribeFuture, - SubscriptionMode subscriptionMode, MessageId startMessageId, Schema schema) { - super(client, topic, conf, conf.getReceiverQueueSize(), listenerExecutor, subscribeFuture, schema); + SubscriptionMode subscriptionMode, MessageId startMessageId, Schema schema, ConsumerInterceptors interceptors) { + super(client, topic, conf, conf.getReceiverQueueSize(), listenerExecutor, subscribeFuture, schema, interceptors); this.consumerId = client.newConsumerId(); this.subscriptionMode = subscriptionMode; this.startMessageId = startMessageId != null ? new BatchMessageIdImpl((MessageIdImpl) startMessageId) : null; @@ -263,8 +263,9 @@ protected Message internalReceive() throws PulsarClientException { Message message; try { message = incomingMessages.take(); - messageProcessed(message); - return message; + Message interceptMsg = beforeConsume(message); + messageProcessed(interceptMsg); + return interceptMsg; } catch (InterruptedException e) { Thread.currentThread().interrupt(); stats.incrementNumReceiveFailed(); @@ -293,8 +294,9 @@ protected CompletableFuture> internalReceiveAsync() { if (message == null && conf.getReceiverQueueSize() == 0) { sendFlowPermitsToBroker(cnx(), 1); } else if (message != null) { - messageProcessed(message); - result.complete(message); + Message interceptMsg = beforeConsume(message); + messageProcessed(interceptMsg); + result.complete(interceptMsg); } return result; @@ -352,10 +354,11 @@ protected Message internalReceive(int timeout, TimeUnit unit) throws PulsarCl Message message; try { message = incomingMessages.poll(timeout, unit); - if (message != null) { - messageProcessed(message); + Message interceptMsg = beforeConsume(message); + if (interceptMsg != null) { + messageProcessed(interceptMsg); } - return message; + return interceptMsg; } catch (InterruptedException e) { Thread.currentThread().interrupt(); @@ -410,7 +413,13 @@ protected CompletableFuture doAcknowledge(MessageId messageId, AckType ack checkArgument(messageId instanceof MessageIdImpl); if (getState() != State.Ready && getState() != State.Connecting) { stats.incrementNumAcksFailed(); - return FutureUtil.failedFuture(new PulsarClientException("Consumer not ready. State: " + getState())); + PulsarClientException exception = new PulsarClientException("Consumer not ready. State: " + getState()); + if (AckType.Individual.equals(ackType)) { + onAcknowledge(messageId, exception); + } else if (AckType.Cumulative.equals(ackType)) { + onAcknowledgeCumulative(messageId, exception); + } + return FutureUtil.failedFuture(exception); } if (messageId instanceof BatchMessageIdImpl) { @@ -444,7 +453,9 @@ private CompletableFuture sendAcknowledge(MessageId messageId, AckType ack unAckedMessageTracker.remove(msgId); stats.incrementNumAcksSent(1); } + onAcknowledge(messageId, null); } else if (ackType == AckType.Cumulative) { + onAcknowledgeCumulative(messageId, null); stats.incrementNumAcksSent(unAckedMessageTracker.removeMessagesTill(msgId)); } @@ -837,9 +848,10 @@ void notifyPendingReceivedCallback(final Message message, Exception exception receivedFuture.complete(message); } else { // increase permits for available message-queue - messageProcessed(message); + Message interceptMsg = beforeConsume(message); + messageProcessed(interceptMsg); // return message to receivedCallback - listenerExecutor.execute(() -> receivedFuture.complete(message)); + listenerExecutor.execute(() -> receivedFuture.complete(interceptMsg)); } } } else { diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerInterceptors.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerInterceptors.java new file mode 100644 index 0000000000000..a0d30fcb963e9 --- /dev/null +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerInterceptors.java @@ -0,0 +1,132 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl; + +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.ConsumerInterceptor; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.MessageListener; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Closeable; +import java.io.IOException; +import java.util.List; + +/** + * A container that hold the list {@link org.apache.pulsar.client.api.ConsumerInterceptor} and wraps calls to the chain + * of custom interceptors. + */ +public class ConsumerInterceptors implements Closeable { + + private static final Logger log = LoggerFactory.getLogger(ConsumerInterceptors.class); + + private final List> interceptors; + + public ConsumerInterceptors(List> interceptors) { + this.interceptors = interceptors; + } + + /** + * This is called just before the message is returned by {@link Consumer#receive()}, + * {@link MessageListener#received(Consumer, Message)} or the {@link java.util.concurrent.CompletableFuture} + * returned by {@link Consumer#receiveAsync()} completes. + *

+ * This method calls {@link ConsumerInterceptor#beforeConsume(Consumer, Message)} for each interceptor. Messages returned + * from each interceptor get passed to beforeConsume() of the next interceptor in the chain of interceptors. + *

+ * This method does not throw exceptions. If any of the interceptors in the chain throws an exception, it gets + * caught and logged, and next interceptor in int the chain is called with 'messages' returned by the previous + * successful interceptor beforeConsume call. + * + * @param consumer the consumer which contains the interceptors + * @param message message to be consume by the client. + * @return messages that are either modified by interceptors or same as messages passed to this method. + */ + public Message beforeConsume(Consumer consumer, Message message) { + Message interceptorMessage = message; + for (int i = 0; i < interceptors.size(); i++) { + try { + interceptorMessage = interceptors.get(i).beforeConsume(consumer, interceptorMessage); + } catch (Exception e) { + if (consumer != null) { + log.warn("Error executing interceptor beforeConsume callback topic: {} consumerName: {}", consumer.getTopic(), consumer.getConsumerName(), e); + } else { + log.warn("Error executing interceptor beforeConsume callback", e); + } + } + } + return interceptorMessage; + } + + /** + * This is called when acknowledge request return from the broker. + *

+ * This method calls {@link ConsumerInterceptor#onAcknowledge(Consumer, MessageId, Throwable)} method for each interceptor. + *

+ * This method does not throw exceptions. Exceptions thrown by any of interceptors in the chain are logged, but not propagated. + * + * @param consumer the consumer which contains the interceptors + * @param messageId message to acknowledge. + * @param exception exception returned by broker. + */ + public void onAcknowledge(Consumer consumer, MessageId messageId, Throwable exception) { + for (int i = 0; i < interceptors.size(); i++) { + try { + interceptors.get(i).onAcknowledge(consumer, messageId, exception); + } catch (Exception e) { + log.warn("Error executing interceptor onAcknowledge callback ", e); + } + } + } + + /** + * This is called when acknowledge cumulative request return from the broker. + *

+ * This method calls {@link ConsumerInterceptor#onAcknowledgeCumulative(Consumer, MessageId, Throwable)} (Message, Throwable)} method for each interceptor. + *

+ * This method does not throw exceptions. Exceptions thrown by any of interceptors in the chain are logged, but not propagated. + * + * @param consumer the consumer which contains the interceptors + * @param messageId messages to acknowledge. + * @param exception exception returned by broker. + */ + public void onAcknowledgeCumulative(Consumer consumer, MessageId messageId, Throwable exception) { + for (int i = 0; i < interceptors.size(); i++) { + try { + interceptors.get(i).onAcknowledgeCumulative(consumer, messageId, exception); + } catch (Exception e) { + log.warn("Error executing interceptor onAcknowledgeCumulative callback ", e); + } + } + } + + @Override + public void close() throws IOException { + for (int i = 0; i < interceptors.size(); i++) { + try { + interceptors.get(i).close(); + } catch (Exception e) { + log.error("Fail to close consumer interceptor ", e); + } + } + } + +} diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java index 96b68c1af4490..97e2247f8d291 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java @@ -241,7 +241,7 @@ public String getProducerName() { return null; } - ByteBuf getDataBuffer() { + public ByteBuf getDataBuffer() { return payload; } @@ -274,7 +274,7 @@ public String getProperty(String name) { return properties.get(name); } - MessageMetadata.Builder getMessageBuilder() { + public MessageMetadata.Builder getMessageBuilder() { return msgMetadataBuilder; } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java index f1cb9cf262eeb..75fdac681a343 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java @@ -85,9 +85,9 @@ public class MultiTopicsConsumerImpl extends ConsumerBase { private final ConsumerConfigurationData internalConfig; MultiTopicsConsumerImpl(PulsarClientImpl client, ConsumerConfigurationData conf, ExecutorService listenerExecutor, - CompletableFuture> subscribeFuture, Schema schema) { + CompletableFuture> subscribeFuture, Schema schema, ConsumerInterceptors interceptors) { super(client, "TopicsConsumerFakeTopicName" + ConsumerName.generateRandomName(), conf, - Math.max(2, conf.getReceiverQueueSize()), listenerExecutor, subscribeFuture, schema); + Math.max(2, conf.getReceiverQueueSize()), listenerExecutor, subscribeFuture, schema, interceptors); checkArgument(conf.getReceiverQueueSize() > 0, "Receiver queue size needs to be greater than 0 for Topics Consumer"); @@ -632,7 +632,7 @@ public static MultiTopicsConsumerImpl createPartitionedConsumer(PulsarCli ExecutorService listenerExecutor, CompletableFuture> subscribeFuture, int numPartitions, - Schema schema) { + Schema schema, ConsumerInterceptors interceptors) { checkArgument(conf.getTopicNames().size() == 1, "Should have only 1 topic for partitioned consumer"); // get topic name, then remove it from conf, so constructor will create a consumer with no topic. @@ -641,7 +641,7 @@ public static MultiTopicsConsumerImpl createPartitionedConsumer(PulsarCli cloneConf.getTopicNames().remove(topicName); CompletableFuture future = new CompletableFuture<>(); - MultiTopicsConsumerImpl consumer = new MultiTopicsConsumerImpl(client, cloneConf, listenerExecutor, future, schema); + MultiTopicsConsumerImpl consumer = new MultiTopicsConsumerImpl(client, cloneConf, listenerExecutor, future, schema, interceptors); future.thenCompose(c -> ((MultiTopicsConsumerImpl)c).subscribeAsync(topicName, numPartitions)) .thenRun(()-> subscribeFuture.complete(consumer)) @@ -695,7 +695,7 @@ private void subscribeTopicPartitions(CompletableFuture subscribeResult, S String partitionName = TopicName.get(topicName).getPartition(partitionIndex).toString(); CompletableFuture> subFuture = new CompletableFuture<>(); ConsumerImpl newConsumer = new ConsumerImpl<>(client, partitionName, configurationData, - client.externalExecutorProvider().getExecutor(), partitionIndex, subFuture, schema); + client.externalExecutorProvider().getExecutor(), partitionIndex, subFuture, schema, interceptors); consumers.putIfAbsent(newConsumer.getTopic(), newConsumer); return subFuture; }) @@ -706,7 +706,7 @@ private void subscribeTopicPartitions(CompletableFuture subscribeResult, S CompletableFuture> subFuture = new CompletableFuture<>(); ConsumerImpl newConsumer = new ConsumerImpl<>(client, topicName, internalConfig, - client.externalExecutorProvider().getExecutor(), 0, subFuture, schema); + client.externalExecutorProvider().getExecutor(), 0, subFuture, schema, interceptors); consumers.putIfAbsent(newConsumer.getTopic(), newConsumer); futureList = Collections.singletonList(subFuture); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java index 3e159e8f48f33..12ecf2b0f289a 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java @@ -53,8 +53,8 @@ public class PartitionedProducerImpl extends ProducerBase { private final TopicMetadata topicMetadata; public PartitionedProducerImpl(PulsarClientImpl client, String topic, ProducerConfigurationData conf, int numPartitions, - CompletableFuture> producerCreatedFuture, Schema schema) { - super(client, topic, conf, producerCreatedFuture, schema); + CompletableFuture> producerCreatedFuture, Schema schema, ProducerInterceptors interceptors) { + super(client, topic, conf, producerCreatedFuture, schema, interceptors); this.producers = Lists.newArrayListWithCapacity(numPartitions); this.topicMetadata = new TopicMetadataImpl(numPartitions); this.routerPolicy = getMessageRouter(); @@ -111,7 +111,7 @@ private void start() { for (int partitionIndex = 0; partitionIndex < topicMetadata.numPartitions(); partitionIndex++) { String partitionName = TopicName.get(topic).getPartition(partitionIndex).toString(); ProducerImpl producer = new ProducerImpl<>(client, partitionName, conf, new CompletableFuture<>(), - partitionIndex, schema); + partitionIndex, schema, interceptors); producers.add(producer); producer.producerCreatedFuture().handle((prod, createException) -> { if (createException != null) { diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java index d0b0c606c56e1..3f6dfecdb30d9 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java @@ -51,8 +51,8 @@ public PatternMultiTopicsConsumerImpl(Pattern topicsPattern, ConsumerConfigurationData conf, ExecutorService listenerExecutor, CompletableFuture> subscribeFuture, - Schema schema) { - super(client, conf, listenerExecutor, subscribeFuture, schema); + Schema schema, ConsumerInterceptors interceptors) { + super(client, conf, listenerExecutor, subscribeFuture, schema, interceptors); this.topicsPattern = topicsPattern; if (this.namespaceName == null) { diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBase.java index eb02b6bd8f3f4..39e632c97afea 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBase.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBase.java @@ -36,13 +36,15 @@ public abstract class ProducerBase extends HandlerState implements Producer> producerCreatedFuture; protected final ProducerConfigurationData conf; protected final Schema schema; + protected final ProducerInterceptors interceptors; protected ProducerBase(PulsarClientImpl client, String topic, ProducerConfigurationData conf, - CompletableFuture> producerCreatedFuture, Schema schema) { + CompletableFuture> producerCreatedFuture, Schema schema, ProducerInterceptors interceptors) { super(client, topic); this.producerCreatedFuture = producerCreatedFuture; this.conf = conf; this.schema = schema; + this.interceptors = interceptors; } @Override @@ -148,6 +150,20 @@ public CompletableFuture> producerCreatedFuture() { return producerCreatedFuture; } + protected Message beforeSend(Message message) { + if (interceptors != null) { + return interceptors.beforeSend(this, message); + } else { + return message; + } + } + + protected void onSendAcknowledgement(Message message, MessageId msgId, Throwable exception) { + if (interceptors != null) { + interceptors.onSendAcknowledgement(this, message, msgId, exception); + } + } + @Override public String toString() { return "ProducerBase{" + "topic='" + topic + '\'' + '}'; diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java index ff391be3eeb4d..834e0c082f221 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java @@ -20,6 +20,9 @@ import com.google.common.annotations.VisibleForTesting; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; @@ -33,6 +36,7 @@ import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.ProducerBuilder; import org.apache.pulsar.client.api.ProducerCryptoFailureAction; +import org.apache.pulsar.client.api.ProducerInterceptor; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.impl.conf.ConfigurationDataUtils; @@ -46,6 +50,7 @@ public class ProducerBuilderImpl implements ProducerBuilder { private final PulsarClientImpl client; private ProducerConfigurationData conf; private Schema schema; + private List> interceptorList; @VisibleForTesting public ProducerBuilderImpl(PulsarClientImpl client, Schema schema) { @@ -97,7 +102,9 @@ public CompletableFuture> createAsync() { .failedFuture(new IllegalArgumentException("Topic name must be set on the producer builder")); } - return client.createProducerAsync(conf, schema); + return interceptorList == null || interceptorList.size() == 0 ? + client.createProducerAsync(conf, schema, null) : + client.createProducerAsync(conf, schema, new ProducerInterceptors<>(interceptorList)); } @Override @@ -226,4 +233,13 @@ public ProducerBuilder properties(@NonNull Map properties) { conf.getProperties().putAll(properties); return this; } + + @Override + public ProducerBuilder intercept(ProducerInterceptor... interceptors) { + if (interceptorList == null) { + interceptorList = new ArrayList<>(); + } + interceptorList.addAll(Arrays.asList(interceptors)); + return this; + } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java index c3428e2644ea2..9401e7cd201a0 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java @@ -121,8 +121,9 @@ public class ProducerImpl extends ProducerBase implements TimerTask, Conne .newUpdater(ProducerImpl.class, "msgIdGenerator"); public ProducerImpl(PulsarClientImpl client, String topic, ProducerConfigurationData conf, - CompletableFuture> producerCreatedFuture, int partitionIndex, Schema schema) { - super(client, topic, conf, producerCreatedFuture, schema); + CompletableFuture> producerCreatedFuture, int partitionIndex, Schema schema, + ProducerInterceptors interceptors) { + super(client, topic, conf, producerCreatedFuture, schema, interceptors); this.producerId = client.newProducerId(); this.producerName = conf.getProducerName(); this.partitionIndex = partitionIndex; @@ -205,9 +206,16 @@ public long getLastSequenceId() { @Override CompletableFuture internalSendAsync(Message message) { + CompletableFuture future = new CompletableFuture<>(); - sendAsync(message, new SendCallback() { + MessageImpl interceptorMessage = (MessageImpl) beforeSend(message); + //Retain the buffer used by interceptors callback to get message. Buffer will release after complete interceptors. + interceptorMessage.getDataBuffer().retain(); + if (interceptors != null) { + interceptorMessage.getProperties(); + } + sendAsync(interceptorMessage, new SendCallback() { SendCallback nextCallback = null; MessageImpl nextMsg = null; long createdAt = System.nanoTime(); @@ -229,25 +237,40 @@ public MessageImpl getNextMessage() { @Override public void sendComplete(Exception e) { - if (e != null) { - stats.incrementSendFailed(); - future.completeExceptionally(e); - } else { - future.complete(message.getMessageId()); - stats.incrementNumAcksReceived(System.nanoTime() - createdAt); - } - while (nextCallback != null) { - SendCallback sendCallback = nextCallback; - MessageImpl msg = nextMsg; + try { if (e != null) { stats.incrementSendFailed(); - sendCallback.getFuture().completeExceptionally(e); + onSendAcknowledgement(interceptorMessage, null, e); + future.completeExceptionally(e); } else { - sendCallback.getFuture().complete(msg.getMessageId()); + onSendAcknowledgement(interceptorMessage, interceptorMessage.getMessageId(), null); + future.complete(interceptorMessage.getMessageId()); stats.incrementNumAcksReceived(System.nanoTime() - createdAt); } - nextMsg = nextCallback.getNextMessage(); - nextCallback = nextCallback.getNextSendCallback(); + } finally { + interceptorMessage.getDataBuffer().release(); + } + + while (nextCallback != null) { + SendCallback sendCallback = nextCallback; + MessageImpl msg = nextMsg; + //Retain the buffer used by interceptors callback to get message. Buffer will release after complete interceptors. + try { + msg.getDataBuffer().retain(); + if (e != null) { + stats.incrementSendFailed(); + onSendAcknowledgement((Message) msg, null, e); + sendCallback.getFuture().completeExceptionally(e); + } else { + onSendAcknowledgement((Message) msg, msg.getMessageId(), null); + sendCallback.getFuture().complete(msg.getMessageId()); + stats.incrementNumAcksReceived(System.nanoTime() - createdAt); + } + nextMsg = nextCallback.getNextMessage(); + nextCallback = nextCallback.getNextSendCallback(); + } finally { + msg.getDataBuffer().release(); + } } } @@ -292,14 +315,18 @@ public void sendAsync(Message message, SendCallback callback) { String compressedStr = (!isBatchMessagingEnabled() && conf.getCompressionType() != CompressionType.NONE) ? "Compressed" : ""; - callback.sendComplete(new PulsarClientException.InvalidMessageException( - format("%s Message payload size %d cannot exceed %d bytes", compressedStr, compressedSize, - PulsarDecoder.MaxMessageSize))); + PulsarClientException.InvalidMessageException invalidMessageException = + new PulsarClientException.InvalidMessageException( + format("%s Message payload size %d cannot exceed %d bytes", compressedStr, compressedSize, + PulsarDecoder.MaxMessageSize)); + callback.sendComplete(invalidMessageException); return; } if (!msg.isReplicated() && msgMetadataBuilder.hasProducerName()) { - callback.sendComplete(new PulsarClientException.InvalidMessageException("Cannot re-use the same message")); + PulsarClientException.InvalidMessageException invalidMessageException = + new PulsarClientException.InvalidMessageException("Cannot re-use the same message"); + callback.sendComplete(invalidMessageException); compressedPayload.release(); return; } @@ -463,8 +490,7 @@ private boolean canEnqueueRequest(SendCallback callback) { semaphore.acquire(); } else { if (!semaphore.tryAcquire()) { - callback.sendComplete( - new PulsarClientException.ProducerQueueIsFullError("Producer send queue is full")); + callback.sendComplete(new PulsarClientException.ProducerQueueIsFullError("Producer send queue is full")); return false; } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerInterceptors.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerInterceptors.java new file mode 100644 index 0000000000000..59d1ad3183a34 --- /dev/null +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerInterceptors.java @@ -0,0 +1,110 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl; + +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.ProducerInterceptor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Closeable; +import java.io.IOException; +import java.util.List; + +/** + * A container that holds the list{@link org.apache.pulsar.client.api.ProducerInterceptor} + * and wraps calls to the chain of custom interceptors. + */ +public class ProducerInterceptors implements Closeable { + + private static final Logger log = LoggerFactory.getLogger(ProducerInterceptors.class); + + private final List> interceptors; + + public ProducerInterceptors(List> interceptors) { + this.interceptors = interceptors; + } + + /** + * This is called when client sends message to pulsar broker, before key and value gets serialized. + * The method calls {@link ProducerInterceptor#beforeSend(Producer,Message)} method. Message returned from + * first interceptor's beforeSend() is passed to the second interceptor beforeSend(), and so on in the + * interceptor chain. The message returned from the last interceptor is returned from this method. + * + * This method does not throw exceptions. Exceptions thrown by any interceptor methods are caught and ignored. + * If a interceptor in the middle of the chain, that normally modifies the message, throws an exception, + * the next interceptor in the chain will be called with a message returned by the previous interceptor that did + * not throw an exception. + * + * @param producer the producer which contains the interceptor. + * @param message the message from client + * @return the message to send to topic/partition + */ + public Message beforeSend(Producer producer, Message message) { + Message interceptorMessage = message; + for (int i = 0; i < interceptors.size(); i++) { + try { + interceptorMessage = interceptors.get(i).beforeSend(producer, interceptorMessage); + } catch (Exception e) { + if (message != null && producer != null) { + log.warn("Error executing interceptor beforeSend callback for messageId: {}, topicName:{} ", message.getMessageId(), producer.getTopic(), e); + } else { + log.warn("Error Error executing interceptor beforeSend callback ", e); + } + } + } + return interceptorMessage; + } + + /** + * This method is called when the message send to the broker has been acknowledged, or when sending the record fails + * before it gets send to the broker. + * This method calls {@link ProducerInterceptor#onSendAcknowledgement(Producer, Message, MessageId, Throwable)} method for + * each interceptor. + * + * This method does not throw exceptions. Exceptions thrown by any of interceptor methods are caught and ignored. + * + * @param producer the producer which contains the interceptor. + * @param message The message returned from the last interceptor is returned from {@link ProducerInterceptor#beforeSend(Producer, Message)} + * @param msgId The message id that broker returned. Null if has error occurred. + * @param exception The exception thrown during processing of this message. Null if no error occurred. + */ + public void onSendAcknowledgement(Producer producer, Message message, MessageId msgId, Throwable exception) { + for (int i = 0; i < interceptors.size(); i++) { + try { + interceptors.get(i).onSendAcknowledgement(producer, message, msgId, exception); + } catch (Exception e) { + log.warn("Error executing interceptor onSendAcknowledgement callback ", e); + } + } + } + + @Override + public void close() throws IOException { + for (int i = 0; i < interceptors.size(); i++) { + try { + interceptors.get(i).close(); + } catch (Exception e) { + log.error("Fail to close producer interceptor ", e); + } + } + } +} diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java index 4013068246d95..b87b9bd913ca0 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java @@ -238,10 +238,15 @@ public CompletableFuture> createProducerAsync(final String topi } public CompletableFuture> createProducerAsync(ProducerConfigurationData conf) { - return createProducerAsync(conf, Schema.BYTES); + return createProducerAsync(conf, Schema.BYTES, null); } - public CompletableFuture> createProducerAsync(ProducerConfigurationData conf, Schema schema) { + public CompletableFuture> createProducerAsync(ProducerConfigurationData conf, Schema schema) { + return createProducerAsync(conf, schema, null); + } + + public CompletableFuture> createProducerAsync(ProducerConfigurationData conf, Schema schema, + ProducerInterceptors interceptors) { if (conf == null) { return FutureUtil.failedFuture( new PulsarClientException.InvalidConfigurationException("Producer configuration undefined")); @@ -273,9 +278,9 @@ public CompletableFuture> createProducerAsync(ProducerConfigurat ProducerBase producer; if (metadata.partitions > 1) { producer = new PartitionedProducerImpl<>(PulsarClientImpl.this, topic, conf, metadata.partitions, - producerCreatedFuture, schema); + producerCreatedFuture, schema, interceptors); } else { - producer = new ProducerImpl<>(PulsarClientImpl.this, topic, conf, producerCreatedFuture, -1, schema); + producer = new ProducerImpl<>(PulsarClientImpl.this, topic, conf, producerCreatedFuture, -1, schema, interceptors); } synchronized (producers) { @@ -336,10 +341,10 @@ public CompletableFuture> subscribeAsync(final String topic, fi } public CompletableFuture> subscribeAsync(ConsumerConfigurationData conf) { - return subscribeAsync(conf, Schema.BYTES); + return subscribeAsync(conf, Schema.BYTES, null); } - public CompletableFuture> subscribeAsync(ConsumerConfigurationData conf, Schema schema) { + public CompletableFuture> subscribeAsync(ConsumerConfigurationData conf, Schema schema, ConsumerInterceptors interceptors) { if (state.get() != State.Open) { return FutureUtil.failedFuture(new PulsarClientException.AlreadyClosedException("Client already closed")); } @@ -377,15 +382,15 @@ public CompletableFuture> subscribeAsync(ConsumerConfigurationDa return FutureUtil .failedFuture(new IllegalArgumentException("Topic names list must be null when use topicsPattern")); } - return patternTopicSubscribeAsync(conf, schema); + return patternTopicSubscribeAsync(conf, schema, interceptors); } else if (conf.getTopicNames().size() == 1) { - return singleTopicSubscribeAsync(conf, schema); + return singleTopicSubscribeAsync(conf, schema, interceptors); } else { - return multiTopicSubscribeAsync(conf, schema); + return multiTopicSubscribeAsync(conf, schema, interceptors); } } - private CompletableFuture> singleTopicSubscribeAsync(ConsumerConfigurationData conf, Schema schema) { + private CompletableFuture> singleTopicSubscribeAsync(ConsumerConfigurationData conf, Schema schema, ConsumerInterceptors interceptors) { if (schema instanceof AutoSchema) { AutoSchema autoSchema = (AutoSchema) schema; return lookup.getSchema(TopicName.get(conf.getSingleTopic())) @@ -395,20 +400,20 @@ private CompletableFuture> singleTopicSubscribeAsync(ConsumerCon log.info("Auto detected schema for topic {} : {}", conf.getSingleTopic(), new String(schemaInfoOptional.get().getSchema(), UTF_8)); autoSchema.setSchema(genericSchema); - return doSingleTopicSubscribeAsync(conf, schema); + return doSingleTopicSubscribeAsync(conf, schema, interceptors); } else { return FutureUtil.failedFuture( new PulsarClientException.LookupException("Currently schema detection only works for topics with avro schemas")); } }); } else { - return doSingleTopicSubscribeAsync(conf, schema); + return doSingleTopicSubscribeAsync(conf, schema, interceptors); } } - private CompletableFuture> doSingleTopicSubscribeAsync(ConsumerConfigurationData conf, Schema schema) { + private CompletableFuture> doSingleTopicSubscribeAsync(ConsumerConfigurationData conf, Schema schema, ConsumerInterceptors interceptors) { CompletableFuture> consumerSubscribedFuture = new CompletableFuture<>(); String topic = conf.getSingleTopic(); @@ -423,10 +428,10 @@ private CompletableFuture> doSingleTopicSubscribeAsync(ConsumerC ExecutorService listenerThread = externalExecutorProvider.getExecutor(); if (metadata.partitions > 1) { consumer = MultiTopicsConsumerImpl.createPartitionedConsumer(PulsarClientImpl.this, conf, - listenerThread, consumerSubscribedFuture, metadata.partitions, schema); + listenerThread, consumerSubscribedFuture, metadata.partitions, schema, interceptors); } else { consumer = new ConsumerImpl<>(PulsarClientImpl.this, topic, conf, listenerThread, -1, - consumerSubscribedFuture, schema); + consumerSubscribedFuture, schema, interceptors); } synchronized (consumers) { @@ -441,11 +446,11 @@ private CompletableFuture> doSingleTopicSubscribeAsync(ConsumerC return consumerSubscribedFuture; } - private CompletableFuture> multiTopicSubscribeAsync(ConsumerConfigurationData conf, Schema schema) { + private CompletableFuture> multiTopicSubscribeAsync(ConsumerConfigurationData conf, Schema schema, ConsumerInterceptors interceptors) { CompletableFuture> consumerSubscribedFuture = new CompletableFuture<>(); ConsumerBase consumer = new MultiTopicsConsumerImpl<>(PulsarClientImpl.this, conf, - externalExecutorProvider.getExecutor(), consumerSubscribedFuture, schema); + externalExecutorProvider.getExecutor(), consumerSubscribedFuture, schema, interceptors); synchronized (consumers) { consumers.put(consumer, Boolean.TRUE); @@ -455,10 +460,10 @@ private CompletableFuture> multiTopicSubscribeAsync(ConsumerConf } public CompletableFuture> patternTopicSubscribeAsync(ConsumerConfigurationData conf) { - return patternTopicSubscribeAsync(conf, Schema.BYTES); + return patternTopicSubscribeAsync(conf, Schema.BYTES, null); } - private CompletableFuture> patternTopicSubscribeAsync(ConsumerConfigurationData conf, Schema schema) { + private CompletableFuture> patternTopicSubscribeAsync(ConsumerConfigurationData conf, Schema schema, ConsumerInterceptors interceptors) { String regex = conf.getTopicsPattern().pattern(); TopicName destination = TopicName.get(regex); NamespaceName namespaceName = destination.getNamespaceObject(); @@ -479,7 +484,7 @@ private CompletableFuture> patternTopicSubscribeAsync(ConsumerCo conf, externalExecutorProvider.getExecutor(), consumerSubscribedFuture, - schema); + schema, interceptors); synchronized (consumers) { consumers.put(consumer, Boolean.TRUE); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java index aafd12549fb9e..388e5c487de2c 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java @@ -85,7 +85,7 @@ public void reachedEndOfTopic(Consumer consumer) { final int partitionIdx = TopicName.getPartitionIndex(readerConfiguration.getTopicName()); consumer = new ConsumerImpl<>(client, readerConfiguration.getTopicName(), consumerConfiguration, listenerExecutor, - partitionIdx, consumerFuture, SubscriptionMode.NonDurable, readerConfiguration.getStartMessageId(), schema); + partitionIdx, consumerFuture, SubscriptionMode.NonDurable, readerConfiguration.getStartMessageId(), schema, null); } @Override diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java index eae02b08c06b3..230a022c617b3 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java @@ -127,4 +127,8 @@ public T getValue() { public Optional getEncryptionCtx() { return msg.getEncryptionCtx(); } + + public Message getMessage() { + return msg; + } } diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java index 4380c63ff7c09..f5108fc6c154c 100644 --- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java +++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java @@ -64,7 +64,7 @@ public void setup() { logger = mock(Logger.class); client = mock(PulsarClientImpl.class); when(client.newProducer()).thenReturn(new ProducerBuilderImpl(client, Schema.BYTES)); - when(client.createProducerAsync(Matchers.any(ProducerConfigurationData.class), Matchers.any(Schema.class))) + when(client.createProducerAsync(Matchers.any(ProducerConfigurationData.class), Matchers.any(Schema.class), eq(null))) .thenReturn(CompletableFuture.completedFuture(producer)); when(client.getSchema(anyString())).thenReturn(CompletableFuture.completedFuture(Optional.empty())); when(producer.sendAsync(anyString())).thenReturn(CompletableFuture.completedFuture(null)); diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/producers/MultiConsumersOneOutputTopicProducersTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/producers/MultiConsumersOneOutputTopicProducersTest.java index 89efcf594de00..399a4c701b27a 100644 --- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/producers/MultiConsumersOneOutputTopicProducersTest.java +++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/producers/MultiConsumersOneOutputTopicProducersTest.java @@ -21,7 +21,8 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.*; +import static org.mockito.Mockito.when; +import static org.mockito.Mockito.any; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertSame; import static org.testng.Assert.assertTrue; @@ -40,6 +41,7 @@ import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.ProducerBuilder; import org.apache.pulsar.client.api.ProducerCryptoFailureAction; +import org.apache.pulsar.client.api.ProducerInterceptor; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Schema; @@ -190,6 +192,11 @@ public ProducerBuilder property(String key, String value) { public ProducerBuilder properties(Map properties) { return this; } + + @Override + public ProducerBuilder intercept(ProducerInterceptor... interceptors) { + return null; + } } @BeforeMethod @@ -197,7 +204,7 @@ public void setup() throws Exception { this.mockClient = mock(PulsarClient.class); when(mockClient.newProducer(any(Schema.class))) - .thenReturn(new MockProducerBuilder()); + .thenReturn(new MockProducerBuilder()); producers = new MultiConsumersOneOuputTopicProducers(mockClient, TEST_OUTPUT_TOPIC, Schema.BYTES, "test"); producers.initialize(); @@ -206,12 +213,12 @@ public void setup() throws Exception { private Producer createMockProducer(String topic) { Producer producer = mock(Producer.class); when(producer.closeAsync()) - .thenAnswer(invocationOnMock -> { - synchronized (mockProducers) { - mockProducers.remove(topic); - } - return FutureUtils.Void(); - }); + .thenAnswer(invocationOnMock -> { + synchronized (mockProducers) { + mockProducers.remove(topic); + } + return FutureUtils.Void(); + }); return producer; } @@ -224,13 +231,13 @@ public void testGetCloseProducer() throws Exception { assertSame(mockProducers.get(producerName), producer); verify(mockClient, times(1)) - .newProducer(Schema.BYTES); + .newProducer(Schema.BYTES); assertTrue(producers.getProducers().containsKey(producerName)); // second get will not create a new producer assertSame(mockProducers.get(producerName), producer); verify(mockClient, times(1)) - .newProducer(Schema.BYTES); + .newProducer(Schema.BYTES); assertTrue(producers.getProducers().containsKey(producerName)); // close