From 12db7df67da4385946678b2034aa762fa4c18cc3 Mon Sep 17 00:00:00 2001 From: Dave Maughan Date: Thu, 7 Jul 2022 13:07:17 +0100 Subject: [PATCH 1/2] [feature][client] PIP-184: Topic specific consumer priorityLevel Resolves #16481 --- .../pulsar/client/api/ConsumerBuilder.java | 34 ++++++++ .../client/api/TopicConsumerBuilder.java | 47 ++++++++++ .../client/impl/ConsumerBuilderImpl.java | 30 +++++++ .../pulsar/client/impl/ConsumerImpl.java | 5 +- .../client/impl/TopicConsumerBuilderImpl.java | 43 ++++++++++ .../impl/conf/ConsumerConfigurationData.java | 16 ++++ .../conf/TopicConsumerConfigurationData.java | 85 +++++++++++++++++++ .../client/impl/ConsumerBuilderImplTest.java | 20 +++++ .../pulsar/client/impl/ConsumerImplTest.java | 26 +++++- .../impl/TopicConsumerBuilderImplTest.java | 55 ++++++++++++ .../conf/ConsumerConfigurationDataTest.java | 48 +++++++++++ .../TopicConsumerConfigurationDataTest.java | 74 ++++++++++++++++ site2/docs/client-libraries-java.md | 2 +- 13 files changed, 479 insertions(+), 6 deletions(-) create mode 100644 pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TopicConsumerBuilder.java create mode 100644 pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicConsumerBuilderImpl.java create mode 100644 pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/TopicConsumerConfigurationData.java create mode 100644 pulsar-client/src/test/java/org/apache/pulsar/client/impl/TopicConsumerBuilderImplTest.java create mode 100644 pulsar-client/src/test/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationDataTest.java create mode 100644 pulsar-client/src/test/java/org/apache/pulsar/client/impl/conf/TopicConsumerConfigurationDataTest.java diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java index e303e55538ea5..804aa7097f373 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java @@ -813,4 +813,38 @@ public interface ConsumerBuilder extends Cloneable { * @param enabled whether to enable AutoScaledReceiverQueueSize. */ ConsumerBuilder autoScaledReceiverQueueSizeEnabled(boolean enabled); + + /** + * Configure topic specific options to override those set at the {@link ConsumerBuilder} level. + * + * @param topicName a topic name + * @return a {@link TopicConsumerBuilder} instance + */ + TopicConsumerBuilder topicConfiguration(String topicName); + + /** + * Configure topic specific options to override those set at the {@link ConsumerBuilder} level. + * + * @param topicName a topic name + * @param builderConsumer a consumer to allow the configuration of the {@link TopicConsumerBuilder} instance + */ + ConsumerBuilder topicConfiguration(String topicName, + java.util.function.Consumer> builderConsumer); + + /** + * Configure topic specific options to override those set at the {@link ConsumerBuilder} level. + * + * @param topicsPattern a regular expression to match a topic name + * @return a {@link TopicConsumerBuilder} instance + */ + TopicConsumerBuilder topicConfiguration(Pattern topicsPattern); + + /** + * Configure topic specific options to override those set at the {@link ConsumerBuilder} level. + * + * @param topicsPattern a regular expression to match a topic name + * @param builderConsumer a consumer to allow the configuration of the {@link TopicConsumerBuilder} instance + */ + ConsumerBuilder topicConfiguration(Pattern topicsPattern, + java.util.function.Consumer> builderConsumer); } diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TopicConsumerBuilder.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TopicConsumerBuilder.java new file mode 100644 index 0000000000000..5096f52577625 --- /dev/null +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TopicConsumerBuilder.java @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.api; + +/** + * {@link TopicConsumerBuilder} is used to configure topic specific options to override those set at the + * {@link ConsumerBuilder} level. + * + * @see ConsumerBuilder#topicConfiguration(String) + * + * @param the type of the value in the {@link ConsumerBuilder} + */ +public interface TopicConsumerBuilder { + /** + * Configure the priority level of this topic. + * + * @see ConsumerBuilder#priorityLevel(int) + * + * @param priorityLevel the priority of this topic + * @return the {@link TopicConsumerBuilder} instance + */ + TopicConsumerBuilder priorityLevel(int priorityLevel); + + /** + * Complete the configuration of the topic specific options and return control back to the + * {@link ConsumerBuilder} instance. + * + * @return the {@link ConsumerBuilder} instance + */ + ConsumerBuilder build(); +} diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java index 616ed86abb8a2..d30e72aa53c5a 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java @@ -52,8 +52,10 @@ import org.apache.pulsar.client.api.SubscriptionInitialPosition; import org.apache.pulsar.client.api.SubscriptionMode; import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.client.api.TopicConsumerBuilder; import org.apache.pulsar.client.impl.conf.ConfigurationDataUtils; import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData; +import org.apache.pulsar.client.impl.conf.TopicConsumerConfigurationData; import org.apache.pulsar.client.util.RetryMessageUtil; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.partition.PartitionedTopicMetadata; @@ -537,4 +539,32 @@ public ConsumerBuilder autoScaledReceiverQueueSizeEnabled(boolean enabled) { conf.setAutoScaledReceiverQueueSizeEnabled(enabled); return this; } + + @Override + public TopicConsumerBuilder topicConfiguration(String topicName) { + TopicConsumerConfigurationData topicConf = TopicConsumerConfigurationData.ofTopicName(topicName, conf); + conf.getTopicConfigurations().add(topicConf); + return new TopicConsumerBuilderImpl<>(this, topicConf); + } + + @Override + public ConsumerBuilder topicConfiguration(String topicName, + java.util.function.Consumer> builderConsumer) { + builderConsumer.accept(topicConfiguration(topicName)); + return this; + } + + @Override + public TopicConsumerBuilder topicConfiguration(Pattern topicsPattern) { + TopicConsumerConfigurationData topicConf = TopicConsumerConfigurationData.ofTopicsPattern(topicsPattern, conf); + conf.getTopicConfigurations().add(topicConf); + return new TopicConsumerBuilderImpl<>(this, topicConf); + } + + @Override + public ConsumerBuilder topicConfiguration(Pattern topicsPattern, + java.util.function.Consumer> builderConsumer) { + builderConsumer.accept(topicConfiguration(topicsPattern)); + return this; + } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index 2955065f31305..c0c830546e0dc 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -61,6 +61,8 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.Function; import java.util.stream.Collectors; +import lombok.AccessLevel; +import lombok.Getter; import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.ConsumerCryptoFailureAction; @@ -146,6 +148,7 @@ public class ConsumerImpl extends ConsumerBase implements ConnectionHandle private final NegativeAcksTracker negativeAcksTracker; protected final ConsumerStatsRecorder stats; + @Getter(AccessLevel.PACKAGE) private final int priorityLevel; private final SubscriptionMode subscriptionMode; private volatile BatchMessageIdImpl startMessageId; @@ -266,7 +269,7 @@ protected ConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurat this.partitionIndex = partitionIndex; this.hasParentConsumer = hasParentConsumer; this.parentConsumerHasListener = parentConsumerHasListener; - this.priorityLevel = conf.getPriorityLevel(); + this.priorityLevel = conf.getMatchingTopicConfiguration(topic).getPriorityLevel(); this.readCompacted = conf.isReadCompacted(); this.subscriptionInitialPosition = conf.getSubscriptionInitialPosition(); this.negativeAcksTracker = new NegativeAcksTracker(this, conf); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicConsumerBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicConsumerBuilderImpl.java new file mode 100644 index 0000000000000..33f91366584ad --- /dev/null +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicConsumerBuilderImpl.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl; + +import static com.google.common.base.Preconditions.checkArgument; +import lombok.RequiredArgsConstructor; +import org.apache.pulsar.client.api.ConsumerBuilder; +import org.apache.pulsar.client.api.TopicConsumerBuilder; +import org.apache.pulsar.client.impl.conf.TopicConsumerConfigurationData; + +@RequiredArgsConstructor +class TopicConsumerBuilderImpl implements TopicConsumerBuilder { + private final ConsumerBuilder consumerBuilder; + private final TopicConsumerConfigurationData topicConf; + + @Override + public TopicConsumerBuilder priorityLevel(int priorityLevel) { + checkArgument(priorityLevel >= 0, "priorityLevel needs to be >= 0"); + topicConf.setPriorityLevel(priorityLevel); + return this; + } + + @Override + public ConsumerBuilder build() { + return consumerBuilder; + } +} diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java index 6c22d143a6f06..dcde042f4e87c 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java @@ -22,6 +22,8 @@ import com.fasterxml.jackson.annotation.JsonIgnore; import com.google.common.collect.Sets; import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.SortedMap; @@ -164,6 +166,20 @@ public int getMaxPendingChuckedMessage() { private boolean autoScaledReceiverQueueSizeEnabled = false; + private List topicConfigurations = new ArrayList<>(); + + public TopicConsumerConfigurationData getMatchingTopicConfiguration(String topicName) { + return topicConfigurations.stream() + .filter(topicConf -> topicConf.getTopicNameMatcher().matches(topicName)) + .findFirst() + .orElseGet(() -> TopicConsumerConfigurationData.ofTopicName(topicName, this)); + } + + public void setTopicConfigurations(List topicConfigurations) { + checkArgument(topicConfigurations != null, "topicConfigurations should not be null."); + this.topicConfigurations = topicConfigurations; + } + public void setAutoUpdatePartitionsIntervalSeconds(int interval, TimeUnit timeUnit) { checkArgument(interval > 0, "interval needs to be > 0"); this.autoUpdatePartitionsIntervalSeconds = timeUnit.toSeconds(interval); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/TopicConsumerConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/TopicConsumerConfigurationData.java new file mode 100644 index 0000000000000..e6d7a9aa0d77e --- /dev/null +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/TopicConsumerConfigurationData.java @@ -0,0 +1,85 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl.conf; + +import java.io.Serializable; +import java.util.regex.Pattern; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; +import lombok.NonNull; +import lombok.RequiredArgsConstructor; + +@Data +@NoArgsConstructor +@AllArgsConstructor +public class TopicConsumerConfigurationData implements Serializable { + private static final long serialVersionUID = 1L; + + private TopicNameMatcher topicNameMatcher; + private int priorityLevel; + + public static TopicConsumerConfigurationData ofTopicsPattern(@NonNull Pattern topicsPattern, int priorityLevel) { + return of(new TopicNameMatcher.TopicsPattern(topicsPattern), priorityLevel); + } + + public static TopicConsumerConfigurationData ofTopicsPattern(@NonNull Pattern topicsPattern, + ConsumerConfigurationData conf) { + return ofTopicsPattern(topicsPattern, conf.getPriorityLevel()); + } + + public static TopicConsumerConfigurationData ofTopicName(@NonNull String topicName, int priorityLevel) { + return of(new TopicNameMatcher.TopicName(topicName), priorityLevel); + } + + public static TopicConsumerConfigurationData ofTopicName(@NonNull String topicName, + ConsumerConfigurationData conf) { + return ofTopicName(topicName, conf.getPriorityLevel()); + } + + static TopicConsumerConfigurationData of(@NonNull TopicNameMatcher topicNameMatcher, int priorityLevel) { + return new TopicConsumerConfigurationData(topicNameMatcher, priorityLevel); + } + + public interface TopicNameMatcher extends Serializable { + boolean matches(String topicName); + + @RequiredArgsConstructor + class TopicsPattern implements TopicNameMatcher { + @NonNull + private final Pattern topicsPattern; + + @Override + public boolean matches(String topicName) { + return topicsPattern.matcher(topicName).matches(); + } + } + + @RequiredArgsConstructor + class TopicName implements TopicNameMatcher { + @NonNull + private final String topicName; + + @Override + public boolean matches(String topicName) { + return this.topicName.equals(topicName); + } + } + } +} diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerBuilderImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerBuilderImplTest.java index ff60caca2c19a..03bcc6ff2388e 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerBuilderImplTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerBuilderImplTest.java @@ -18,10 +18,13 @@ */ package org.apache.pulsar.client.impl; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import static org.testng.Assert.assertNotNull; +import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; import java.util.List; @@ -31,13 +34,16 @@ import java.util.regex.Pattern; import org.apache.pulsar.client.api.BatchReceivePolicy; import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.ConsumerBuilder; import org.apache.pulsar.client.api.DeadLetterPolicy; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.SubscriptionInitialPosition; import org.apache.pulsar.client.api.SubscriptionMode; import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData; +import org.apache.pulsar.client.impl.conf.TopicConsumerConfigurationData; import org.testng.annotations.BeforeMethod; +import org.testng.annotations.DataProvider; import org.testng.annotations.Test; /** @@ -338,4 +344,18 @@ public void testStartPaused() { consumerBuilderImpl.startPaused(true); verify(consumerBuilderImpl.getConf()).setStartPaused(true); } + + @Test + public void testTopicConsumerBuilder() { + List topicConsumerConfigurationDataList = new ArrayList<>(); + when(consumerBuilderImpl.getConf().getTopicConfigurations()).thenReturn(topicConsumerConfigurationDataList); + + ConsumerBuilder consumerBuilder = consumerBuilderImpl.topicConfiguration(Pattern.compile("foo")).priorityLevel(1).build(); + + assertThat(consumerBuilder).isSameAs(consumerBuilderImpl); + assertThat(topicConsumerConfigurationDataList).hasSize(1); + TopicConsumerConfigurationData topicConsumerConfigurationData = topicConsumerConfigurationDataList.get(0); + assertThat(topicConsumerConfigurationData.getTopicNameMatcher().matches("foo")).isTrue(); + assertThat(topicConsumerConfigurationData.getPriorityLevel()).isEqualTo(1); + } } diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java index fcea9d490707d..756c18441b38b 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.client.impl; +import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.Mockito.any; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doReturn; @@ -39,6 +40,7 @@ import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.impl.conf.ClientConfigurationData; import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData; +import org.apache.pulsar.client.impl.conf.TopicConsumerConfigurationData; import org.apache.pulsar.client.util.ExecutorProvider; import org.awaitility.Awaitility; import org.testng.Assert; @@ -47,23 +49,28 @@ import org.testng.annotations.Test; public class ConsumerImplTest { + private final String topic = "non-persistent://tenant/ns1/my-topic"; private ExecutorProvider executorProvider; private ExecutorService internalExecutor; private ConsumerImpl consumer; - private ConsumerConfigurationData consumerConf; + private ConsumerConfigurationData consumerConf; @BeforeMethod(alwaysRun = true) public void setUp() { + consumerConf = new ConsumerConfigurationData<>(); + createConsumer(consumerConf); + } + + private void createConsumer(ConsumerConfigurationData consumerConf) { executorProvider = new ExecutorProvider(1, "ConsumerImplTest"); internalExecutor = Executors.newSingleThreadScheduledExecutor(); - consumerConf = new ConsumerConfigurationData<>(); + PulsarClientImpl client = ClientTestFixtures.createPulsarClientMock(executorProvider, internalExecutor); ClientConfigurationData clientConf = client.getConfiguration(); clientConf.setOperationTimeoutMs(100); clientConf.setStatsIntervalSeconds(0); - CompletableFuture> subscribeFuture = new CompletableFuture<>(); - String topic = "non-persistent://tenant/ns1/my-topic"; + CompletableFuture> subscribeFuture = new CompletableFuture<>(); consumerConf.setSubscriptionName("test-sub"); consumer = ConsumerImpl.newConsumerImpl(client, topic, consumerConf, @@ -239,4 +246,15 @@ public void testMaxReceiverQueueSize() { Assert.assertEquals(consumer.getCurrentReceiverQueueSize(), size + 100); Assert.assertEquals(consumer.getAvailablePermits(), permits + 100); } + + @Test + public void testTopicPriorityLevel() { + ConsumerConfigurationData consumerConf = new ConsumerConfigurationData<>(); + consumerConf.getTopicConfigurations().add( + TopicConsumerConfigurationData.ofTopicName(topic, 1)); + + createConsumer(consumerConf); + + assertThat(consumer.getPriorityLevel()).isEqualTo(1); + } } diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/TopicConsumerBuilderImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/TopicConsumerBuilderImplTest.java new file mode 100644 index 0000000000000..cfc2380cebe34 --- /dev/null +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/TopicConsumerBuilderImplTest.java @@ -0,0 +1,55 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl; + +import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import org.apache.pulsar.client.api.ConsumerBuilder; +import org.apache.pulsar.client.impl.conf.TopicConsumerConfigurationData; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +public class TopicConsumerBuilderImplTest { + private TopicConsumerConfigurationData topicConsumerConfigurationData; + private TopicConsumerBuilderImpl topicConsumerBuilderImpl; + + @SuppressWarnings("unchecked") + @BeforeMethod(alwaysRun = true) + public void setup() { + ConsumerBuilder consumerBuilder = mock(ConsumerBuilder.class); + topicConsumerConfigurationData = mock(TopicConsumerConfigurationData.class); + topicConsumerBuilderImpl = new TopicConsumerBuilderImpl<>(consumerBuilder, topicConsumerConfigurationData); + } + + @Test + public void testInvalidPriorityLevel() { + assertThatIllegalArgumentException() + .isThrownBy(() -> topicConsumerBuilderImpl.priorityLevel(-1)); + verify(topicConsumerConfigurationData, never()).setPriorityLevel(anyInt()); + } + + @Test + public void testValidPriorityLevel() { + topicConsumerBuilderImpl.priorityLevel(0); + verify(topicConsumerConfigurationData).setPriorityLevel(0); + } +} \ No newline at end of file diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationDataTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationDataTest.java new file mode 100644 index 0000000000000..0ec031d250561 --- /dev/null +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationDataTest.java @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl.conf; + +import static org.assertj.core.api.Assertions.assertThat; +import java.util.regex.Pattern; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +public class ConsumerConfigurationDataTest { + @DataProvider(name = "topicConf") + public Object[][] topicConf() { + return new Object[][] { + new Object[] {"foo", 2}, + new Object[] {"bar", 1} + }; + } + + @Test(dataProvider = "topicConf") + public void testTopicConsumerConfigurationData(String topicName, int expectedPriority) { + ConsumerConfigurationData consumerConfigurationData = new ConsumerConfigurationData<>(); + consumerConfigurationData.setPriorityLevel(1); + + consumerConfigurationData.getTopicConfigurations() + .add(TopicConsumerConfigurationData.ofTopicsPattern(Pattern.compile("^foo$"), 2)); + + TopicConsumerConfigurationData topicConsumerConfigurationData = + consumerConfigurationData.getMatchingTopicConfiguration(topicName); + + assertThat(topicConsumerConfigurationData.getPriorityLevel()).isEqualTo(expectedPriority); + } +} diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/conf/TopicConsumerConfigurationDataTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/conf/TopicConsumerConfigurationDataTest.java new file mode 100644 index 0000000000000..a2bea68d1ac23 --- /dev/null +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/conf/TopicConsumerConfigurationDataTest.java @@ -0,0 +1,74 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl.conf; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatNullPointerException; +import java.util.regex.Pattern; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +public class TopicConsumerConfigurationDataTest { + @Test + public void testOfFactoryMethod() { + TopicConsumerConfigurationData topicConsumerConfigurationData = TopicConsumerConfigurationData + .ofTopicName("foo", 1); + + assertThat(topicConsumerConfigurationData.getTopicNameMatcher().matches("foo")).isTrue(); + assertThat(topicConsumerConfigurationData.getPriorityLevel()).isEqualTo(1); + } + + @Test + public void testOfDefaultFactoryMethod() { + ConsumerConfigurationData consumerConfigurationData = new ConsumerConfigurationData<>(); + consumerConfigurationData.setPriorityLevel(1); + TopicConsumerConfigurationData topicConsumerConfigurationData = TopicConsumerConfigurationData + .ofTopicName("foo", consumerConfigurationData); + + assertThat(topicConsumerConfigurationData.getTopicNameMatcher().matches("foo")).isTrue(); + assertThat(topicConsumerConfigurationData.getPriorityLevel()).isEqualTo(1); + } + + @DataProvider(name = "topicNameMatch") + public Object[][] topicNameMatch() { + return new Object[][] { + new Object[] {"foo", true}, + new Object[] {"bar", false} + }; + } + + @Test(dataProvider = "topicNameMatch") + public void testTopicNameMatch(String topicName, boolean expectedMatch) { + TopicConsumerConfigurationData topicConsumerConfigurationData = TopicConsumerConfigurationData + .ofTopicsPattern(Pattern.compile("^foo$"), 1); + assertThat(topicConsumerConfigurationData.getTopicNameMatcher().matches(topicName)).isEqualTo(expectedMatch); + } + + @Test + public void testNullTopicsPattern() { + assertThatNullPointerException() + .isThrownBy(() -> TopicConsumerConfigurationData.ofTopicsPattern(null, 1)); + } + + @Test + public void testTopicNameMatchNullTopicName() { + assertThat(TopicConsumerConfigurationData + .ofTopicName("foo", 1).getTopicNameMatcher().matches(null)).isFalse(); + } +} diff --git a/site2/docs/client-libraries-java.md b/site2/docs/client-libraries-java.md index c1cad8a7fe1a3..da6f24a9060b5 100644 --- a/site2/docs/client-libraries-java.md +++ b/site2/docs/client-libraries-java.md @@ -729,7 +729,7 @@ When you create a consumer, you can use the `loadConf` configuration. The follow `consumerName`|String|Consumer name|null `ackTimeoutMillis`|long|Timeout of unacked messages|0 `tickDurationMillis`|long|Granularity of the ack-timeout redelivery.

Using an higher `tickDurationMillis` reduces the memory overhead to track messages when setting ack-timeout to a bigger value (for example, 1 hour).|1000 -`priorityLevel`|int|Priority level for a consumer to which a broker gives more priority while dispatching messages in Shared subscription type.

The broker follows descending priorities. For example, 0=max-priority, 1, 2,...

In Shared subscription type, the broker **first dispatches messages to the max priority level consumers if they have permits**. Otherwise, the broker considers next priority level consumers.

**Example 1**
If a subscription has consumerA with `priorityLevel` 0 and consumerB with `priorityLevel` 1, then the broker **only dispatches messages to consumerA until it runs out permits** and then starts dispatching messages to consumerB.

**Example 2**
Consumer Priority, Level, Permits
C1, 0, 2
C2, 0, 1
C3, 0, 1
C4, 1, 2
C5, 1, 1

Order in which a broker dispatches messages to consumers is: C1, C2, C3, C1, C4, C5, C4.|0 +`priorityLevel`|int|Priority level for a consumer to which a broker gives more priority while dispatching messages in Shared subscription type. It can be set at the consumer level so all topics being consumed will have the same priority level or each topic being consumed can be given a different priority level.

The broker follows descending priorities. For example, 0=max-priority, 1, 2,...

In Shared subscription type, the broker **first dispatches messages to the max priority level consumers if they have permits**. Otherwise, the broker considers next priority level consumers.

**Example 1**
If a subscription has consumerA with `priorityLevel` 0 and consumerB with `priorityLevel` 1, then the broker **only dispatches messages to consumerA until it runs out permits** and then starts dispatching messages to consumerB.

**Example 2**
Consumer Priority, Level, Permits
C1, 0, 2
C2, 0, 1
C3, 0, 1
C4, 1, 2
C5, 1, 1

Order in which a broker dispatches messages to consumers is: C1, C2, C3, C1, C4, C5, C4.|0 `cryptoFailureAction`|ConsumerCryptoFailureAction|Consumer should take action when it receives a message that can not be decrypted.
  • **FAIL**: this is the default option to fail messages until crypto succeeds.
  • **DISCARD**:silently acknowledge and not deliver message to an application.
  • **CONSUME**: deliver encrypted messages to applications. It is the application's responsibility to decrypt the message.

  • The decompression of message fails.

    If messages contain batch messages, a client is not be able to retrieve individual messages in batch.

    Delivered encrypted message contains {@link EncryptionContext} which contains encryption and compression information in it using which application can decrypt consumed message payload.|
  • ConsumerCryptoFailureAction.FAIL
  • `properties`|SortedMap|A name or value property of this consumer.

    `properties` is application defined metadata attached to a consumer.

    When getting a topic stats, associate this metadata with the consumer stats for easier identification.|new TreeMap() `readCompacted`|boolean|If enabling `readCompacted`, a consumer reads messages from a compacted topic rather than reading a full message backlog of a topic.

    A consumer only sees the latest value for each key in the compacted topic, up until reaching the point in the topic message when compacting backlog. Beyond that point, send messages as normal.

    Only enabling `readCompacted` on subscriptions to persistent topics, which have a single active consumer (like failure or exclusive subscriptions).

    Attempting to enable it on subscriptions to non-persistent topics or on shared subscriptions leads to a subscription call throwing a `PulsarClientException`.|false From d05a8f5fa4102af6c4a145cfb9baa3383bc4a881 Mon Sep 17 00:00:00 2001 From: Dave Maughan Date: Mon, 1 Aug 2022 10:08:37 +0100 Subject: [PATCH 2/2] Remove unused imports --- .../org/apache/pulsar/client/impl/ConsumerBuilderImplTest.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerBuilderImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerBuilderImplTest.java index 03bcc6ff2388e..32afe69c3d082 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerBuilderImplTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerBuilderImplTest.java @@ -19,7 +19,6 @@ package org.apache.pulsar.client.impl; import static org.assertj.core.api.Assertions.assertThat; -import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -43,7 +42,6 @@ import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData; import org.apache.pulsar.client.impl.conf.TopicConsumerConfigurationData; import org.testng.annotations.BeforeMethod; -import org.testng.annotations.DataProvider; import org.testng.annotations.Test; /**