Skip to content

Commit 6b8ebbd

Browse files
authoredNov 24, 2021
[PIP-105] Part-1 Support add subscription properties (#12869)
### Motivation See #12269 I divided this PIP into two parts, the first part supports setting properties for subscriptions The second part will support plug-in loading, filtering, etc. ### Modifications Support add subscription properties
1 parent c94c0aa commit 6b8ebbd

File tree

13 files changed

+336
-28
lines changed

13 files changed

+336
-28
lines changed
 

‎pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java

+13-11
Original file line numberDiff line numberDiff line change
@@ -1010,20 +1010,22 @@ protected void handleSubscribe(final CommandSubscribe subscribe) {
10101010
new SubscriptionNotFoundException(
10111011
"Subscription does not exist"));
10121012
}
1013-
1013+
SubscriptionOption option = SubscriptionOption.builder().cnx(ServerCnx.this)
1014+
.subscriptionName(subscriptionName)
1015+
.consumerId(consumerId).subType(subType).priorityLevel(priorityLevel)
1016+
.consumerName(consumerName).isDurable(isDurable).subType(subType)
1017+
.startMessageId(startMessageId).metadata(metadata).readCompacted(readCompacted)
1018+
.initialPosition(initialPosition)
1019+
.startMessageRollbackDurationSec(startMessageRollbackDurationSec)
1020+
.replicatedSubscriptionStateArg(isReplicated).keySharedMeta(keySharedMeta)
1021+
.subscriptionProperties(SubscriptionOption.getPropertiesMap(
1022+
subscribe.getSubscriptionPropertiesList()))
1023+
.build();
10141024
if (schema != null) {
10151025
return topic.addSchemaIfIdleOrCheckCompatible(schema)
1016-
.thenCompose(v -> topic.subscribe(
1017-
ServerCnx.this, subscriptionName, consumerId,
1018-
subType, priorityLevel, consumerName, isDurable,
1019-
startMessageId, metadata,
1020-
readCompacted, initialPosition, startMessageRollbackDurationSec,
1021-
isReplicated, keySharedMeta));
1026+
.thenCompose(v -> topic.subscribe(option));
10221027
} else {
1023-
return topic.subscribe(ServerCnx.this, subscriptionName, consumerId,
1024-
subType, priorityLevel, consumerName, isDurable,
1025-
startMessageId, metadata, readCompacted, initialPosition,
1026-
startMessageRollbackDurationSec, isReplicated, keySharedMeta);
1028+
return topic.subscribe(option);
10271029
}
10281030
})
10291031
.thenAccept(consumer -> {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.pulsar.broker.service;
20+
21+
import java.util.Collections;
22+
import java.util.List;
23+
import java.util.Map;
24+
import java.util.Optional;
25+
import java.util.stream.Collectors;
26+
import lombok.Builder;
27+
import lombok.Getter;
28+
import org.apache.pulsar.client.api.MessageId;
29+
import org.apache.pulsar.common.api.proto.CommandSubscribe;
30+
import org.apache.pulsar.common.api.proto.KeySharedMeta;
31+
import org.apache.pulsar.common.api.proto.KeyValue;
32+
33+
@Getter
34+
@Builder
35+
public class SubscriptionOption {
36+
private final TransportCnx cnx;
37+
private String subscriptionName;
38+
private long consumerId;
39+
private CommandSubscribe.SubType subType;
40+
private int priorityLevel;
41+
private String consumerName;
42+
private boolean isDurable;
43+
private MessageId startMessageId;
44+
private Map<String, String> metadata;
45+
private boolean readCompacted;
46+
private CommandSubscribe.InitialPosition initialPosition;
47+
private long startMessageRollbackDurationSec;
48+
private boolean replicatedSubscriptionStateArg;
49+
private KeySharedMeta keySharedMeta;
50+
private Optional<Map<String, String>> subscriptionProperties;
51+
52+
public static Optional<Map<String, String>> getPropertiesMap(List<KeyValue> list) {
53+
if (list == null) {
54+
return Optional.of(Collections.emptyMap());
55+
}
56+
return Optional.of(list.stream().collect(Collectors.toMap(
57+
KeyValue::getKey, KeyValue::getValue, (key1, key2) -> key1)));
58+
}
59+
}

‎pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java

+8
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,7 @@ default boolean isMarkerMessage() {
129129
*/
130130
void recordAddLatency(long latency, TimeUnit unit);
131131

132+
@Deprecated
132133
CompletableFuture<Consumer> subscribe(TransportCnx cnx, String subscriptionName, long consumerId, SubType subType,
133134
int priorityLevel, String consumerName, boolean isDurable,
134135
MessageId startMessageId,
@@ -137,6 +138,13 @@ CompletableFuture<Consumer> subscribe(TransportCnx cnx, String subscriptionName,
137138
long startMessageRollbackDurationSec, boolean replicateSubscriptionState,
138139
KeySharedMeta keySharedMeta);
139140

141+
/**
142+
* Subscribe a topic.
143+
* @param option
144+
* @return
145+
*/
146+
CompletableFuture<Consumer> subscribe(SubscriptionOption option);
147+
140148
CompletableFuture<Subscription> createSubscription(String subscriptionName, InitialPosition initialPosition,
141149
boolean replicateSubscriptionState);
142150

‎pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java

+24
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@
5555
import org.apache.pulsar.broker.service.Replicator;
5656
import org.apache.pulsar.broker.service.StreamingStats;
5757
import org.apache.pulsar.broker.service.Subscription;
58+
import org.apache.pulsar.broker.service.SubscriptionOption;
5859
import org.apache.pulsar.broker.service.Topic;
5960
import org.apache.pulsar.broker.service.TransportCnx;
6061
import org.apache.pulsar.broker.stats.ClusterReplicationMetrics;
@@ -224,6 +225,16 @@ public CompletableFuture<Void> checkIfTransactionBufferRecoverCompletely(boolean
224225
return CompletableFuture.completedFuture(null);
225226
}
226227

228+
@Override
229+
public CompletableFuture<Consumer> subscribe(SubscriptionOption option) {
230+
return internalSubscribe(option.getCnx(), option.getSubscriptionName(), option.getConsumerId(),
231+
option.getSubType(), option.getPriorityLevel(), option.getConsumerName(),
232+
option.isDurable(), option.getStartMessageId(), option.getMetadata(),
233+
option.isReadCompacted(), option.getInitialPosition(),
234+
option.getStartMessageRollbackDurationSec(), option.isReplicatedSubscriptionStateArg(),
235+
option.getKeySharedMeta());
236+
}
237+
227238
@Override
228239
public CompletableFuture<Consumer> subscribe(final TransportCnx cnx, String subscriptionName, long consumerId,
229240
SubType subType, int priorityLevel, String consumerName,
@@ -232,6 +243,19 @@ public CompletableFuture<Consumer> subscribe(final TransportCnx cnx, String subs
232243
InitialPosition initialPosition,
233244
long resetStartMessageBackInSec, boolean replicateSubscriptionState,
234245
KeySharedMeta keySharedMeta) {
246+
return internalSubscribe(cnx, subscriptionName, consumerId, subType, priorityLevel, consumerName,
247+
isDurable, startMessageId, metadata, readCompacted, initialPosition, resetStartMessageBackInSec,
248+
replicateSubscriptionState, keySharedMeta);
249+
}
250+
251+
private CompletableFuture<Consumer> internalSubscribe(final TransportCnx cnx, String subscriptionName,
252+
long consumerId, SubType subType, int priorityLevel,
253+
String consumerName, boolean isDurable,
254+
MessageId startMessageId, Map<String, String> metadata,
255+
boolean readCompacted, InitialPosition initialPosition,
256+
long resetStartMessageBackInSec,
257+
boolean replicateSubscriptionState,
258+
KeySharedMeta keySharedMeta) {
235259

236260
return brokerService.checkTopicNsOwnership(getName()).thenCompose(__ -> {
237261
final CompletableFuture<Consumer> future = new CompletableFuture<>();

‎pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java

+15-3
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import com.google.common.annotations.VisibleForTesting;
2323
import com.google.common.base.MoreObjects;
2424
import java.util.Collections;
25+
import java.util.HashMap;
2526
import java.util.LinkedHashMap;
2627
import java.util.List;
2728
import java.util.Map;
@@ -116,6 +117,7 @@ public class PersistentSubscription implements Subscription {
116117
private volatile ReplicatedSubscriptionSnapshotCache replicatedSubscriptionSnapshotCache;
117118
private volatile Position lastMarkDeleteForTransactionMarker;
118119
private final PendingAckHandle pendingAckHandle;
120+
private Map<String, String> subscriptionProperties;
119121

120122
private final LongAdder bytesOutFromRemovedConsumers = new LongAdder();
121123
private final LongAdder msgOutFromRemovedConsumer = new LongAdder();
@@ -143,14 +145,21 @@ static boolean isCursorFromReplicatedSubscription(ManagedCursor cursor) {
143145
}
144146

145147
public PersistentSubscription(PersistentTopic topic, String subscriptionName, ManagedCursor cursor,
146-
boolean replicated) {
148+
boolean replicated) {
149+
this(topic, subscriptionName, cursor, replicated, Collections.emptyMap());
150+
}
151+
152+
public PersistentSubscription(PersistentTopic topic, String subscriptionName, ManagedCursor cursor,
153+
boolean replicated, Map<String, String> subscriptionProperties) {
147154
this.topic = topic;
148155
this.cursor = cursor;
149156
this.topicName = topic.getName();
150157
this.subName = subscriptionName;
151158
this.fullName = MoreObjects.toStringHelper(this).add("topic", topicName).add("name", subName).toString();
152159
this.expiryMonitor = new PersistentMessageExpiryMonitor(topicName, subscriptionName, cursor, this);
153160
this.setReplicated(replicated);
161+
this.subscriptionProperties = subscriptionProperties == null
162+
? new HashMap<>() : Collections.unmodifiableMap(subscriptionProperties);
154163
if (topic.getBrokerService().getPulsar().getConfig().isTransactionCoordinatorEnabled()
155164
&& !checkTopicIsEventsNames(TopicName.get(topicName))) {
156165
this.pendingAckHandle = new PendingAckHandleImpl(this);
@@ -1141,13 +1150,16 @@ void topicTerminated() {
11411150
}
11421151
}
11431152

1153+
public Map<String, String> getSubscriptionProperties() {
1154+
return subscriptionProperties;
1155+
}
1156+
11441157
/**
11451158
* Return a merged map that contains the cursor properties specified by used
11461159
* (eg. when using compaction subscription) and the subscription properties.
11471160
*/
11481161
protected Map<String, Long> mergeCursorProperties(Map<String, Long> userProperties) {
1149-
Map<String, Long> baseProperties = isReplicated() ? REPLICATED_SUBSCRIPTION_CURSOR_PROPERTIES
1150-
: NON_REPLICATED_SUBSCRIPTION_CURSOR_PROPERTIES;
1162+
Map<String, Long> baseProperties = getBaseCursorProperties(isReplicated());
11511163

11521164
if (userProperties.isEmpty()) {
11531165
// Use only the static instance in the common case

0 commit comments

Comments
 (0)
Please sign in to comment.