Skip to content

Commit

Permalink
[feature][client] PIP-184: Topic specific consumer priorityLevel
Browse files Browse the repository at this point in the history
Resolves apache#16481
  • Loading branch information
Dave Maughan committed Jul 26, 2022
1 parent 6f1f6aa commit cb82d6c
Show file tree
Hide file tree
Showing 13 changed files with 446 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -813,4 +813,21 @@ public interface ConsumerBuilder<T> extends Cloneable {
* @param enabled whether to enable AutoScaledReceiverQueueSize.
*/
ConsumerBuilder<T> autoScaledReceiverQueueSizeEnabled(boolean enabled);

/**
* Configure topic specific options to override those set at the {@link ConsumerBuilder} level.
*
* @param topicNameOrPattern a topic name or a regular expression to match a topic name
* @return a {@link TopicConsumerBuilder} instance
*/
TopicConsumerBuilder<T> topicConfiguration(String topicNameOrPattern);

/**
* Configure topic specific options to override those set at the {@link ConsumerBuilder} level.
*
* @param topicNameOrPattern a topic name or a regular expression to match a topic name
* @param builderConsumer a consumer to allow the configuration of the {@link TopicConsumerBuilder} instance
*/
ConsumerBuilder<T> topicConfiguration(String topicNameOrPattern,
java.util.function.Consumer<TopicConsumerBuilder<T>> builderConsumer);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.api;

/**
* {@link TopicConsumerBuilder} is used to configure topic specific options to override those set at the
* {@link ConsumerBuilder} level.
*
* @see ConsumerBuilder#topicConfiguration(String)
*
* @param <T> the type of the value in the {@link ConsumerBuilder}
*/
public interface TopicConsumerBuilder<T> {
/**
* Configure the priority level of this topic.
*
* @see ConsumerBuilder#priorityLevel(int)
*
* @param priorityLevel the priority of this topic
* @return the {@link TopicConsumerBuilder} instance
*/
TopicConsumerBuilder<T> priorityLevel(int priorityLevel);

/**
* Complete the configuration of the topic specific options and return control back to the
* {@link ConsumerBuilder} instance.
*
* @return the {@link ConsumerBuilder} instance
*/
ConsumerBuilder<T> build();
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,10 @@
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionMode;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.TopicConsumerBuilder;
import org.apache.pulsar.client.impl.conf.ConfigurationDataUtils;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.pulsar.client.impl.conf.TopicConsumerConfigurationData;
import org.apache.pulsar.client.util.RetryMessageUtil;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
Expand Down Expand Up @@ -537,4 +539,19 @@ public ConsumerBuilder<T> autoScaledReceiverQueueSizeEnabled(boolean enabled) {
conf.setAutoScaledReceiverQueueSizeEnabled(enabled);
return this;
}

@Override
public TopicConsumerBuilder<T> topicConfiguration(String topicNameOrPattern) {
checkArgument(StringUtils.isNotBlank(topicNameOrPattern), "topicNameOrPattern cannot be blank");
TopicConsumerConfigurationData topicConf = TopicConsumerConfigurationData.of(topicNameOrPattern, conf);
conf.getTopicConfigurations().add(topicConf);
return new TopicConsumerBuilderImpl<>(this, topicConf);
}

@Override
public ConsumerBuilder<T> topicConfiguration(String topicNameOrPattern,
java.util.function.Consumer<TopicConsumerBuilder<T>> builderConsumer) {
builderConsumer.accept(topicConfiguration(topicNameOrPattern));
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Function;
import java.util.stream.Collectors;
import lombok.AccessLevel;
import lombok.Getter;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
Expand Down Expand Up @@ -146,6 +148,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
private final NegativeAcksTracker negativeAcksTracker;

protected final ConsumerStatsRecorder stats;
@Getter(AccessLevel.PACKAGE)
private final int priorityLevel;
private final SubscriptionMode subscriptionMode;
private volatile BatchMessageIdImpl startMessageId;
Expand Down Expand Up @@ -266,7 +269,7 @@ protected ConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurat
this.partitionIndex = partitionIndex;
this.hasParentConsumer = hasParentConsumer;
this.parentConsumerHasListener = parentConsumerHasListener;
this.priorityLevel = conf.getPriorityLevel();
this.priorityLevel = conf.getMatchingTopicConfiguration(topic).getPriorityLevel();
this.readCompacted = conf.isReadCompacted();
this.subscriptionInitialPosition = conf.getSubscriptionInitialPosition();
this.negativeAcksTracker = new NegativeAcksTracker(this, conf);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.impl;

import static com.google.common.base.Preconditions.checkArgument;
import lombok.RequiredArgsConstructor;
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.TopicConsumerBuilder;
import org.apache.pulsar.client.impl.conf.TopicConsumerConfigurationData;

@RequiredArgsConstructor
class TopicConsumerBuilderImpl<T> implements TopicConsumerBuilder<T> {
private final ConsumerBuilder<T> consumerBuilder;
private final TopicConsumerConfigurationData topicConf;

@Override
public TopicConsumerBuilder<T> priorityLevel(int priorityLevel) {
checkArgument(priorityLevel >= 0, "priorityLevel needs to be >= 0");
topicConf.setPriorityLevel(priorityLevel);
return this;
}

@Override
public ConsumerBuilder<T> build() {
return consumerBuilder;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.google.common.collect.Sets;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
Expand Down Expand Up @@ -164,6 +166,20 @@ public int getMaxPendingChuckedMessage() {

private boolean autoScaledReceiverQueueSizeEnabled = false;

private List<TopicConsumerConfigurationData> topicConfigurations = new ArrayList<>();

public TopicConsumerConfigurationData getMatchingTopicConfiguration(String topicName) {
return topicConfigurations.stream()
.filter(topicConf -> topicConf.matchesTopicName(topicName))
.findFirst()
.orElseGet(() -> TopicConsumerConfigurationData.of(topicName, this));
}

public void setTopicConfigurations(List<TopicConsumerConfigurationData> topicConfigurations) {
checkArgument(topicConfigurations != null, "topicConfigurations should not be null.");
this.topicConfigurations = topicConfigurations;
}

public void setAutoUpdatePartitionsIntervalSeconds(int interval, TimeUnit timeUnit) {
checkArgument(interval > 0, "interval needs to be > 0");
this.autoUpdatePartitionsIntervalSeconds = timeUnit.toSeconds(interval);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.impl.conf;

import java.io.Serializable;
import java.util.regex.Pattern;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.NonNull;

@Data
@NoArgsConstructor
@AllArgsConstructor
public class TopicConsumerConfigurationData implements Serializable {
private static final long serialVersionUID = 1L;

@NonNull
private Pattern topicsPattern;
private int priorityLevel;

public boolean matchesTopicName(String topicName) {
if (topicsPattern == null || topicName == null) {
return false;
}
return topicsPattern.matcher(topicName).matches();
}

public static TopicConsumerConfigurationData of(@NonNull String topicNameOrPattern,
ConsumerConfigurationData<?> conf) {
return of(topicNameOrPattern, conf.getPriorityLevel());
}

public static TopicConsumerConfigurationData of(@NonNull String topicNameOrPattern, int priorityLevel) {
Pattern topicsPattern = Pattern.compile(topicNameOrPattern);
return new TopicConsumerConfigurationData(topicsPattern, priorityLevel);
}

public static void main(String[] args) {

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,13 @@
*/
package org.apache.pulsar.client.impl;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertNotNull;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
Expand All @@ -31,13 +34,16 @@
import java.util.regex.Pattern;
import org.apache.pulsar.client.api.BatchReceivePolicy;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.DeadLetterPolicy;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionMode;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.pulsar.client.impl.conf.TopicConsumerConfigurationData;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

/**
Expand Down Expand Up @@ -338,4 +344,29 @@ public void testStartPaused() {
consumerBuilderImpl.startPaused(true);
verify(consumerBuilderImpl.getConf()).setStartPaused(true);
}

@Test
public void testTopicConsumerBuilder() {
List<TopicConsumerConfigurationData> topicConsumerConfigurationDataList = new ArrayList<>();
when(consumerBuilderImpl.getConf().getTopicConfigurations()).thenReturn(topicConsumerConfigurationDataList);

ConsumerBuilder<?> consumerBuilder = consumerBuilderImpl.topicConfiguration("foo").priorityLevel(1).build();

assertThat(consumerBuilder).isSameAs(consumerBuilderImpl);
assertThat(topicConsumerConfigurationDataList).hasSize(1);
TopicConsumerConfigurationData topicConsumerConfigurationData = topicConsumerConfigurationDataList.get(0);
assertThat(topicConsumerConfigurationData.getTopicsPattern().pattern()).isEqualTo("foo");
assertThat(topicConsumerConfigurationData.getPriorityLevel()).isEqualTo(1);
}

@DataProvider(name = "nullOrBlankTopicPatterns")
public Object[] nullOrBlankTopicPatterns() {
return new Object[]{" ", "", null};
}

@Test(dataProvider = "nullOrBlankTopicPatterns")
public void testTopicConsumerBuilderBlankPattern(String topicNameOrPattern) {
assertThatIllegalArgumentException()
.isThrownBy(() -> consumerBuilderImpl.topicConfiguration(topicNameOrPattern));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pulsar.client.impl;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
Expand All @@ -26,6 +27,8 @@
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;

import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
Expand All @@ -39,6 +42,7 @@
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.pulsar.client.impl.conf.TopicConsumerConfigurationData;
import org.apache.pulsar.client.util.ExecutorProvider;
import org.awaitility.Awaitility;
import org.testng.Assert;
Expand All @@ -47,23 +51,28 @@
import org.testng.annotations.Test;

public class ConsumerImplTest {
private final String topic = "non-persistent://tenant/ns1/my-topic";

private ExecutorProvider executorProvider;
private ExecutorService internalExecutor;
private ConsumerImpl<byte[]> consumer;
private ConsumerConfigurationData consumerConf;
private ConsumerConfigurationData<byte[]> consumerConf;

@BeforeMethod(alwaysRun = true)
public void setUp() {
consumerConf = new ConsumerConfigurationData<>();
createConsumer(consumerConf);
}

private void createConsumer(ConsumerConfigurationData consumerConf) {
executorProvider = new ExecutorProvider(1, "ConsumerImplTest");
internalExecutor = Executors.newSingleThreadScheduledExecutor();
consumerConf = new ConsumerConfigurationData<>();

PulsarClientImpl client = ClientTestFixtures.createPulsarClientMock(executorProvider, internalExecutor);
ClientConfigurationData clientConf = client.getConfiguration();
clientConf.setOperationTimeoutMs(100);
clientConf.setStatsIntervalSeconds(0);
CompletableFuture<Consumer<ConsumerImpl>> subscribeFuture = new CompletableFuture<>();
String topic = "non-persistent://tenant/ns1/my-topic";
CompletableFuture<Consumer<byte[]>> subscribeFuture = new CompletableFuture<>();

consumerConf.setSubscriptionName("test-sub");
consumer = ConsumerImpl.newConsumerImpl(client, topic, consumerConf,
Expand Down Expand Up @@ -239,4 +248,15 @@ public void testMaxReceiverQueueSize() {
Assert.assertEquals(consumer.getCurrentReceiverQueueSize(), size + 100);
Assert.assertEquals(consumer.getAvailablePermits(), permits + 100);
}

@Test
public void testTopicPriorityLevel() {
ConsumerConfigurationData<Object> consumerConf = new ConsumerConfigurationData<>();
consumerConf.getTopicConfigurations().add(
TopicConsumerConfigurationData.of(topic, 1));

createConsumer(consumerConf);

assertThat(consumer.getPriorityLevel()).isEqualTo(1);
}
}
Loading

0 comments on commit cb82d6c

Please sign in to comment.