Skip to content

Commit dcbdf56

Browse files
Dave MaughanGleiphir2769
Dave Maughan
authored andcommitted
[feature][client] PIP-184: Topic specific consumer priorityLevel (apache#16715)
1 parent 009e375 commit dcbdf56

13 files changed

+477
-6
lines changed

pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java

+34
Original file line numberDiff line numberDiff line change
@@ -813,4 +813,38 @@ public interface ConsumerBuilder<T> extends Cloneable {
813813
* @param enabled whether to enable AutoScaledReceiverQueueSize.
814814
*/
815815
ConsumerBuilder<T> autoScaledReceiverQueueSizeEnabled(boolean enabled);
816+
817+
/**
818+
* Configure topic specific options to override those set at the {@link ConsumerBuilder} level.
819+
*
820+
* @param topicName a topic name
821+
* @return a {@link TopicConsumerBuilder} instance
822+
*/
823+
TopicConsumerBuilder<T> topicConfiguration(String topicName);
824+
825+
/**
826+
* Configure topic specific options to override those set at the {@link ConsumerBuilder} level.
827+
*
828+
* @param topicName a topic name
829+
* @param builderConsumer a consumer to allow the configuration of the {@link TopicConsumerBuilder} instance
830+
*/
831+
ConsumerBuilder<T> topicConfiguration(String topicName,
832+
java.util.function.Consumer<TopicConsumerBuilder<T>> builderConsumer);
833+
834+
/**
835+
* Configure topic specific options to override those set at the {@link ConsumerBuilder} level.
836+
*
837+
* @param topicsPattern a regular expression to match a topic name
838+
* @return a {@link TopicConsumerBuilder} instance
839+
*/
840+
TopicConsumerBuilder<T> topicConfiguration(Pattern topicsPattern);
841+
842+
/**
843+
* Configure topic specific options to override those set at the {@link ConsumerBuilder} level.
844+
*
845+
* @param topicsPattern a regular expression to match a topic name
846+
* @param builderConsumer a consumer to allow the configuration of the {@link TopicConsumerBuilder} instance
847+
*/
848+
ConsumerBuilder<T> topicConfiguration(Pattern topicsPattern,
849+
java.util.function.Consumer<TopicConsumerBuilder<T>> builderConsumer);
816850
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.pulsar.client.api;
20+
21+
/**
22+
* {@link TopicConsumerBuilder} is used to configure topic specific options to override those set at the
23+
* {@link ConsumerBuilder} level.
24+
*
25+
* @see ConsumerBuilder#topicConfiguration(String)
26+
*
27+
* @param <T> the type of the value in the {@link ConsumerBuilder}
28+
*/
29+
public interface TopicConsumerBuilder<T> {
30+
/**
31+
* Configure the priority level of this topic.
32+
*
33+
* @see ConsumerBuilder#priorityLevel(int)
34+
*
35+
* @param priorityLevel the priority of this topic
36+
* @return the {@link TopicConsumerBuilder} instance
37+
*/
38+
TopicConsumerBuilder<T> priorityLevel(int priorityLevel);
39+
40+
/**
41+
* Complete the configuration of the topic specific options and return control back to the
42+
* {@link ConsumerBuilder} instance.
43+
*
44+
* @return the {@link ConsumerBuilder} instance
45+
*/
46+
ConsumerBuilder<T> build();
47+
}

pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java

+30
Original file line numberDiff line numberDiff line change
@@ -52,8 +52,10 @@
5252
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
5353
import org.apache.pulsar.client.api.SubscriptionMode;
5454
import org.apache.pulsar.client.api.SubscriptionType;
55+
import org.apache.pulsar.client.api.TopicConsumerBuilder;
5556
import org.apache.pulsar.client.impl.conf.ConfigurationDataUtils;
5657
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
58+
import org.apache.pulsar.client.impl.conf.TopicConsumerConfigurationData;
5759
import org.apache.pulsar.client.util.RetryMessageUtil;
5860
import org.apache.pulsar.common.naming.TopicName;
5961
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
@@ -537,4 +539,32 @@ public ConsumerBuilder<T> autoScaledReceiverQueueSizeEnabled(boolean enabled) {
537539
conf.setAutoScaledReceiverQueueSizeEnabled(enabled);
538540
return this;
539541
}
542+
543+
@Override
544+
public TopicConsumerBuilder<T> topicConfiguration(String topicName) {
545+
TopicConsumerConfigurationData topicConf = TopicConsumerConfigurationData.ofTopicName(topicName, conf);
546+
conf.getTopicConfigurations().add(topicConf);
547+
return new TopicConsumerBuilderImpl<>(this, topicConf);
548+
}
549+
550+
@Override
551+
public ConsumerBuilder<T> topicConfiguration(String topicName,
552+
java.util.function.Consumer<TopicConsumerBuilder<T>> builderConsumer) {
553+
builderConsumer.accept(topicConfiguration(topicName));
554+
return this;
555+
}
556+
557+
@Override
558+
public TopicConsumerBuilder<T> topicConfiguration(Pattern topicsPattern) {
559+
TopicConsumerConfigurationData topicConf = TopicConsumerConfigurationData.ofTopicsPattern(topicsPattern, conf);
560+
conf.getTopicConfigurations().add(topicConf);
561+
return new TopicConsumerBuilderImpl<>(this, topicConf);
562+
}
563+
564+
@Override
565+
public ConsumerBuilder<T> topicConfiguration(Pattern topicsPattern,
566+
java.util.function.Consumer<TopicConsumerBuilder<T>> builderConsumer) {
567+
builderConsumer.accept(topicConfiguration(topicsPattern));
568+
return this;
569+
}
540570
}

pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java

+4-1
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,8 @@
6161
import java.util.concurrent.locks.ReentrantReadWriteLock;
6262
import java.util.function.Function;
6363
import java.util.stream.Collectors;
64+
import lombok.AccessLevel;
65+
import lombok.Getter;
6466
import org.apache.commons.lang3.StringUtils;
6567
import org.apache.pulsar.client.api.Consumer;
6668
import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
@@ -146,6 +148,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
146148
private final NegativeAcksTracker negativeAcksTracker;
147149

148150
protected final ConsumerStatsRecorder stats;
151+
@Getter(AccessLevel.PACKAGE)
149152
private final int priorityLevel;
150153
private final SubscriptionMode subscriptionMode;
151154
private volatile BatchMessageIdImpl startMessageId;
@@ -266,7 +269,7 @@ protected ConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurat
266269
this.partitionIndex = partitionIndex;
267270
this.hasParentConsumer = hasParentConsumer;
268271
this.parentConsumerHasListener = parentConsumerHasListener;
269-
this.priorityLevel = conf.getPriorityLevel();
272+
this.priorityLevel = conf.getMatchingTopicConfiguration(topic).getPriorityLevel();
270273
this.readCompacted = conf.isReadCompacted();
271274
this.subscriptionInitialPosition = conf.getSubscriptionInitialPosition();
272275
this.negativeAcksTracker = new NegativeAcksTracker(this, conf);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.pulsar.client.impl;
20+
21+
import static com.google.common.base.Preconditions.checkArgument;
22+
import lombok.RequiredArgsConstructor;
23+
import org.apache.pulsar.client.api.ConsumerBuilder;
24+
import org.apache.pulsar.client.api.TopicConsumerBuilder;
25+
import org.apache.pulsar.client.impl.conf.TopicConsumerConfigurationData;
26+
27+
@RequiredArgsConstructor
28+
class TopicConsumerBuilderImpl<T> implements TopicConsumerBuilder<T> {
29+
private final ConsumerBuilder<T> consumerBuilder;
30+
private final TopicConsumerConfigurationData topicConf;
31+
32+
@Override
33+
public TopicConsumerBuilder<T> priorityLevel(int priorityLevel) {
34+
checkArgument(priorityLevel >= 0, "priorityLevel needs to be >= 0");
35+
topicConf.setPriorityLevel(priorityLevel);
36+
return this;
37+
}
38+
39+
@Override
40+
public ConsumerBuilder<T> build() {
41+
return consumerBuilder;
42+
}
43+
}

pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java

+16
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222
import com.fasterxml.jackson.annotation.JsonIgnore;
2323
import com.google.common.collect.Sets;
2424
import java.io.Serializable;
25+
import java.util.ArrayList;
26+
import java.util.List;
2527
import java.util.Map;
2628
import java.util.Set;
2729
import java.util.SortedMap;
@@ -164,6 +166,20 @@ public int getMaxPendingChuckedMessage() {
164166

165167
private boolean autoScaledReceiverQueueSizeEnabled = false;
166168

169+
private List<TopicConsumerConfigurationData> topicConfigurations = new ArrayList<>();
170+
171+
public TopicConsumerConfigurationData getMatchingTopicConfiguration(String topicName) {
172+
return topicConfigurations.stream()
173+
.filter(topicConf -> topicConf.getTopicNameMatcher().matches(topicName))
174+
.findFirst()
175+
.orElseGet(() -> TopicConsumerConfigurationData.ofTopicName(topicName, this));
176+
}
177+
178+
public void setTopicConfigurations(List<TopicConsumerConfigurationData> topicConfigurations) {
179+
checkArgument(topicConfigurations != null, "topicConfigurations should not be null.");
180+
this.topicConfigurations = topicConfigurations;
181+
}
182+
167183
public void setAutoUpdatePartitionsIntervalSeconds(int interval, TimeUnit timeUnit) {
168184
checkArgument(interval > 0, "interval needs to be > 0");
169185
this.autoUpdatePartitionsIntervalSeconds = timeUnit.toSeconds(interval);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.pulsar.client.impl.conf;
20+
21+
import java.io.Serializable;
22+
import java.util.regex.Pattern;
23+
import lombok.AllArgsConstructor;
24+
import lombok.Data;
25+
import lombok.NoArgsConstructor;
26+
import lombok.NonNull;
27+
import lombok.RequiredArgsConstructor;
28+
29+
@Data
30+
@NoArgsConstructor
31+
@AllArgsConstructor
32+
public class TopicConsumerConfigurationData implements Serializable {
33+
private static final long serialVersionUID = 1L;
34+
35+
private TopicNameMatcher topicNameMatcher;
36+
private int priorityLevel;
37+
38+
public static TopicConsumerConfigurationData ofTopicsPattern(@NonNull Pattern topicsPattern, int priorityLevel) {
39+
return of(new TopicNameMatcher.TopicsPattern(topicsPattern), priorityLevel);
40+
}
41+
42+
public static TopicConsumerConfigurationData ofTopicsPattern(@NonNull Pattern topicsPattern,
43+
ConsumerConfigurationData<?> conf) {
44+
return ofTopicsPattern(topicsPattern, conf.getPriorityLevel());
45+
}
46+
47+
public static TopicConsumerConfigurationData ofTopicName(@NonNull String topicName, int priorityLevel) {
48+
return of(new TopicNameMatcher.TopicName(topicName), priorityLevel);
49+
}
50+
51+
public static TopicConsumerConfigurationData ofTopicName(@NonNull String topicName,
52+
ConsumerConfigurationData<?> conf) {
53+
return ofTopicName(topicName, conf.getPriorityLevel());
54+
}
55+
56+
static TopicConsumerConfigurationData of(@NonNull TopicNameMatcher topicNameMatcher, int priorityLevel) {
57+
return new TopicConsumerConfigurationData(topicNameMatcher, priorityLevel);
58+
}
59+
60+
public interface TopicNameMatcher extends Serializable {
61+
boolean matches(String topicName);
62+
63+
@RequiredArgsConstructor
64+
class TopicsPattern implements TopicNameMatcher {
65+
@NonNull
66+
private final Pattern topicsPattern;
67+
68+
@Override
69+
public boolean matches(String topicName) {
70+
return topicsPattern.matcher(topicName).matches();
71+
}
72+
}
73+
74+
@RequiredArgsConstructor
75+
class TopicName implements TopicNameMatcher {
76+
@NonNull
77+
private final String topicName;
78+
79+
@Override
80+
public boolean matches(String topicName) {
81+
return this.topicName.equals(topicName);
82+
}
83+
}
84+
}
85+
}

pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerBuilderImplTest.java

+18
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,12 @@
1818
*/
1919
package org.apache.pulsar.client.impl;
2020

21+
import static org.assertj.core.api.Assertions.assertThat;
2122
import static org.mockito.Mockito.mock;
2223
import static org.mockito.Mockito.verify;
2324
import static org.mockito.Mockito.when;
2425
import static org.testng.Assert.assertNotNull;
26+
import java.util.ArrayList;
2527
import java.util.Arrays;
2628
import java.util.HashMap;
2729
import java.util.List;
@@ -31,12 +33,14 @@
3133
import java.util.regex.Pattern;
3234
import org.apache.pulsar.client.api.BatchReceivePolicy;
3335
import org.apache.pulsar.client.api.Consumer;
36+
import org.apache.pulsar.client.api.ConsumerBuilder;
3437
import org.apache.pulsar.client.api.DeadLetterPolicy;
3538
import org.apache.pulsar.client.api.PulsarClientException;
3639
import org.apache.pulsar.client.api.Schema;
3740
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
3841
import org.apache.pulsar.client.api.SubscriptionMode;
3942
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
43+
import org.apache.pulsar.client.impl.conf.TopicConsumerConfigurationData;
4044
import org.testng.annotations.BeforeMethod;
4145
import org.testng.annotations.Test;
4246

@@ -338,4 +342,18 @@ public void testStartPaused() {
338342
consumerBuilderImpl.startPaused(true);
339343
verify(consumerBuilderImpl.getConf()).setStartPaused(true);
340344
}
345+
346+
@Test
347+
public void testTopicConsumerBuilder() {
348+
List<TopicConsumerConfigurationData> topicConsumerConfigurationDataList = new ArrayList<>();
349+
when(consumerBuilderImpl.getConf().getTopicConfigurations()).thenReturn(topicConsumerConfigurationDataList);
350+
351+
ConsumerBuilder<?> consumerBuilder = consumerBuilderImpl.topicConfiguration(Pattern.compile("foo")).priorityLevel(1).build();
352+
353+
assertThat(consumerBuilder).isSameAs(consumerBuilderImpl);
354+
assertThat(topicConsumerConfigurationDataList).hasSize(1);
355+
TopicConsumerConfigurationData topicConsumerConfigurationData = topicConsumerConfigurationDataList.get(0);
356+
assertThat(topicConsumerConfigurationData.getTopicNameMatcher().matches("foo")).isTrue();
357+
assertThat(topicConsumerConfigurationData.getPriorityLevel()).isEqualTo(1);
358+
}
341359
}

0 commit comments

Comments
 (0)