Skip to content

Commit 2373ca3

Browse files
codelipenghuijiazhai
authored andcommitted
PIP-34 Key_Shared subscription core implementation. (#4079)
## Motivation This is a core implementation for PIP-34 and there is a task tracker ISSUE-4077 for this PIP ## Modifications Add a new subscription type named Key_Shared Add PersistentStickyKeyDispatcherMultipleConsumers to handle the message dispatch Add a simple hash range based consumer selector Verifying this change Add new unit tests to verifying the hash range selector and Key_Shared mode message consume. * PIP-34 Key_Shared subscription core implementation. * PIP-34 Add more unit test. 1.test redelivery with Key_Shared subscription 2.test none key dispatch with Key_Shared subscription 3.test ordering key dispatch with Key_Shared subscription * PIP-34 Fix alignment issue of Pulsar.proto * PIP-34 Fix TODO: format * PIP-34 Fix hash and ordering key issues * PIP-34 documentation for Key_Shared subscription * PIP-34 Fix cpp test issue. * PIP-34 Fix cpp format issue.
1 parent 1026b07 commit 2373ca3

29 files changed

+1229
-90
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java

+9
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import org.apache.pulsar.broker.service.schema.IncompatibleSchemaException;
2222
import org.apache.pulsar.common.api.proto.PulsarApi;
23+
import org.apache.pulsar.common.api.proto.PulsarApi.ServerError;
2324

2425
/**
2526
* Base type of exception thrown by Pulsar Broker Service
@@ -146,6 +147,12 @@ public AlreadyRunningException(String msg) {
146147
}
147148
}
148149

150+
public static class ConsumerAssignException extends BrokerServiceException {
151+
public ConsumerAssignException(String msg) {
152+
super(msg);
153+
}
154+
}
155+
149156
public static PulsarApi.ServerError getClientErrorCode(Throwable t) {
150157
if (t instanceof ServerMetadataException) {
151158
return PulsarApi.ServerError.MetadataError;
@@ -166,6 +173,8 @@ public static PulsarApi.ServerError getClientErrorCode(Throwable t) {
166173
return PulsarApi.ServerError.ServiceNotReady;
167174
} else if (t instanceof IncompatibleSchemaException) {
168175
return PulsarApi.ServerError.IncompatibleSchema;
176+
} else if (t instanceof ConsumerAssignException) {
177+
return ServerError.ConsumerAssignError;
169178
} else {
170179
return PulsarApi.ServerError.UnknownError;
171180
}

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,7 @@ public Consumer(Subscription subscription, SubType subType, String topicName, lo
145145
stats.setClientVersion(cnx.getClientVersion());
146146
stats.metadata = this.metadata;
147147

148-
if (subType == SubType.Shared) {
148+
if (subType == SubType.Shared || subType == SubType.Key_Shared) {
149149
this.pendingAcks = new ConcurrentLongLongPairHashMap(256, 1);
150150
} else {
151151
// We don't need to keep track of pending acks if the subscription is not shared
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,149 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.pulsar.broker.service;
20+
21+
import org.apache.pulsar.broker.service.BrokerServiceException.ConsumerAssignException;
22+
import org.apache.pulsar.common.util.Murmur3_32Hash;
23+
24+
import java.util.HashMap;
25+
import java.util.Map;
26+
import java.util.Map.Entry;
27+
import java.util.concurrent.ConcurrentSkipListMap;
28+
29+
/**
30+
* This is a consumer selector based fixed hash range.
31+
*
32+
* 1.Each consumer serves a fixed range of hash value
33+
* 2.The whole range of hash value could be covered by all the consumers.
34+
* 3.Once a consumer is removed, the left consumers could still serve the whole range.
35+
*
36+
* Initializing with a fixed hash range, by default 2 << 5.
37+
* First consumer added, hash range looks like:
38+
*
39+
* 0 -> 65536(consumer-1)
40+
*
41+
* Second consumer added, will find a biggest range to split:
42+
*
43+
* 0 -> 32768(consumer-2) -> 65536(consumer-1)
44+
*
45+
* While a consumer removed, The range for this consumer will be taken over
46+
* by other consumer, consumer-2 removed:
47+
*
48+
* 0 -> 65536(consumer-1)
49+
*
50+
* In this approach use skip list map to maintain the hash range and consumers.
51+
*
52+
* Select consumer will return the ceiling key of message key hashcode % range size.
53+
*
54+
*/
55+
public class HashRangeStickyKeyConsumerSelector implements StickyKeyConsumerSelector {
56+
57+
public static final int DEFAULT_RANGE_SIZE = 2 << 15;
58+
59+
private final int rangeSize;
60+
61+
private final ConcurrentSkipListMap<Integer, Consumer> rangeMap;
62+
private final Map<Consumer, Integer> consumerRange;
63+
64+
public HashRangeStickyKeyConsumerSelector() {
65+
this(DEFAULT_RANGE_SIZE);
66+
}
67+
68+
public HashRangeStickyKeyConsumerSelector(int rangeSize) {
69+
if (rangeSize < 2) {
70+
throw new IllegalArgumentException("range size must greater than 2");
71+
}
72+
if (!is2Power(rangeSize)) {
73+
throw new IllegalArgumentException("range size must be nth power with 2");
74+
}
75+
this.rangeMap = new ConcurrentSkipListMap<>();
76+
this.consumerRange = new HashMap<>();
77+
this.rangeSize = rangeSize;
78+
}
79+
80+
@Override
81+
public synchronized void addConsumer(Consumer consumer) throws ConsumerAssignException {
82+
if (rangeMap.size() == 0) {
83+
rangeMap.put(rangeSize, consumer);
84+
consumerRange.put(consumer, rangeSize);
85+
} else {
86+
splitRange(findBiggestRange(), consumer);
87+
}
88+
}
89+
90+
@Override
91+
public synchronized void removeConsumer(Consumer consumer) {
92+
Integer removeRange = consumerRange.get(consumer);
93+
if (removeRange != null) {
94+
if (removeRange == rangeSize && rangeMap.size() > 1) {
95+
Consumer lowerConsumer = rangeMap.lowerEntry(removeRange).getValue();
96+
rangeMap.put(removeRange, lowerConsumer);
97+
consumerRange.put(lowerConsumer, removeRange);
98+
} else {
99+
rangeMap.remove(removeRange);
100+
consumerRange.remove(consumer);
101+
}
102+
}
103+
}
104+
105+
@Override
106+
public Consumer select(byte[] stickyKey) {
107+
if (rangeMap.size() > 0) {
108+
int slot = Murmur3_32Hash.getInstance().makeHash(stickyKey) % rangeSize;
109+
return rangeMap.ceilingEntry(slot).getValue();
110+
} else {
111+
return null;
112+
}
113+
}
114+
115+
private int findBiggestRange() {
116+
int slots = 0;
117+
int busiestRange = rangeSize;
118+
for (Entry<Integer, Consumer> entry : rangeMap.entrySet()) {
119+
Integer lowerKey = rangeMap.lowerKey(entry.getKey());
120+
if (lowerKey == null) {
121+
lowerKey = 0;
122+
}
123+
if (entry.getKey() - lowerKey > slots) {
124+
slots = entry.getKey() - lowerKey;
125+
busiestRange = entry.getKey();
126+
}
127+
}
128+
return busiestRange;
129+
}
130+
131+
private void splitRange(int range, Consumer targetConsumer) throws ConsumerAssignException {
132+
Integer lowerKey = rangeMap.lowerKey(range);
133+
if (lowerKey == null) {
134+
lowerKey = 0;
135+
}
136+
if (range - lowerKey <= 1) {
137+
throw new ConsumerAssignException("No more range can assigned to new consumer, assigned consumers "
138+
+ rangeMap.size());
139+
}
140+
int splitRange = range - ((range - lowerKey) >> 1);
141+
rangeMap.put(splitRange, targetConsumer);
142+
consumerRange.put(targetConsumer, splitRange);
143+
}
144+
145+
private boolean is2Power(int num) {
146+
if(num < 2) return false;
147+
return (num & num - 1) == 0;
148+
}
149+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.pulsar.broker.service;
20+
21+
import org.apache.pulsar.broker.service.BrokerServiceException.ConsumerAssignException;
22+
23+
public interface StickyKeyConsumerSelector {
24+
25+
/**
26+
* Add a new consumer
27+
* @param consumer new consumer
28+
*/
29+
void addConsumer(Consumer consumer) throws ConsumerAssignException;
30+
31+
/**
32+
* Remove the consumer
33+
* @param consumer consumer to be removed
34+
*/
35+
void removeConsumer(Consumer consumer);
36+
37+
/**
38+
* Select a consumer by sticky key
39+
*
40+
* @param stickyKey sticky key
41+
* @return consumer
42+
*/
43+
Consumer select(byte[] stickyKey);
44+
45+
}

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java

+18-14
Original file line numberDiff line numberDiff line change
@@ -65,8 +65,8 @@
6565
*/
6666
public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMultipleConsumers implements Dispatcher, ReadEntriesCallback {
6767

68-
private final PersistentTopic topic;
69-
private final ManagedCursor cursor;
68+
protected final PersistentTopic topic;
69+
protected final ManagedCursor cursor;
7070

7171
private CompletableFuture<Void> closeFuture = null;
7272
LongPairSet messagesToReplay = new ConcurrentSortedLongPairSet(128, 2);
@@ -75,9 +75,9 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMu
7575
private boolean havePendingRead = false;
7676
private boolean havePendingReplayRead = false;
7777
private boolean shouldRewindBeforeReadingOrReplaying = false;
78-
private final String name;
78+
protected final String name;
7979

80-
private int totalAvailablePermits = 0;
80+
protected int totalAvailablePermits = 0;
8181
private int readBatchSize;
8282
private final Backoff readFailureBackoff = new Backoff(15, TimeUnit.SECONDS, 1, TimeUnit.MINUTES, 0, TimeUnit.MILLISECONDS);
8383
private static final AtomicIntegerFieldUpdater<PersistentDispatcherMultipleConsumers> TOTAL_UNACKED_MESSAGES_UPDATER =
@@ -87,8 +87,8 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMu
8787
private volatile int blockedDispatcherOnUnackedMsgs = FALSE;
8888
private static final AtomicIntegerFieldUpdater<PersistentDispatcherMultipleConsumers> BLOCKED_DISPATCHER_ON_UNACKMSG_UPDATER =
8989
AtomicIntegerFieldUpdater.newUpdater(PersistentDispatcherMultipleConsumers.class, "blockedDispatcherOnUnackedMsgs");
90-
private final ServiceConfiguration serviceConfig;
91-
private Optional<DispatchRateLimiter> dispatchRateLimiter = Optional.empty();
90+
protected final ServiceConfiguration serviceConfig;
91+
protected Optional<DispatchRateLimiter> dispatchRateLimiter = Optional.empty();
9292

9393
enum ReadType {
9494
Normal, Replay
@@ -374,9 +374,6 @@ public SubType getType() {
374374
@Override
375375
public synchronized void readEntriesComplete(List<Entry> entries, Object ctx) {
376376
ReadType readType = (ReadType) ctx;
377-
int start = 0;
378-
int entriesToDispatch = entries.size();
379-
380377
if (readType == ReadType.Normal) {
381378
havePendingRead = false;
382379
} else {
@@ -407,8 +404,17 @@ public synchronized void readEntriesComplete(List<Entry> entries, Object ctx) {
407404
log.debug("[{}] Distributing {} messages to {} consumers", name, entries.size(), consumerList.size());
408405
}
409406

407+
sendMessagesToConsumers(readType, entries);
408+
409+
readMoreEntries();
410+
}
411+
412+
protected void sendMessagesToConsumers(ReadType readType, List<Entry> entries) {
413+
int start = 0;
414+
int entriesToDispatch = entries.size();
410415
long totalMessagesSent = 0;
411416
long totalBytesSent = 0;
417+
412418
while (entriesToDispatch > 0 && totalAvailablePermits > 0 && isAtleastOneConsumerAvailable()) {
413419
Consumer c = getNextConsumer();
414420
if (c == null) {
@@ -421,8 +427,8 @@ public synchronized void readEntriesComplete(List<Entry> entries, Object ctx) {
421427

422428
// round-robin dispatch batch size for this consumer
423429
int messagesForC = Math.min(
424-
Math.min(entriesToDispatch, c.getAvailablePermits()),
425-
serviceConfig.getDispatcherMaxRoundRobinBatchSize());
430+
Math.min(entriesToDispatch, c.getAvailablePermits()),
431+
serviceConfig.getDispatcherMaxRoundRobinBatchSize());
426432

427433
if (messagesForC > 0) {
428434

@@ -465,8 +471,6 @@ public synchronized void readEntriesComplete(List<Entry> entries, Object ctx) {
465471
entry.release();
466472
});
467473
}
468-
469-
readMoreEntries();
470474
}
471475

472476
@Override
@@ -531,7 +535,7 @@ public synchronized void readEntriesFailed(ManagedLedgerException exception, Obj
531535
*
532536
* @return
533537
*/
534-
private boolean isAtleastOneConsumerAvailable() {
538+
protected boolean isAtleastOneConsumerAvailable() {
535539
if (consumerList.isEmpty() || IS_CLOSED_UPDATER.get(this) == TRUE) {
536540
// abort read if no consumers are connected or if disconnect is initiated
537541
return false;

0 commit comments

Comments
 (0)