Skip to content

Commit 2f831f0

Browse files
committed
Fixes apache#6555 Send Key hash range change events to consumers in Key_Shared subscription.
1 parent 4cf4b85 commit 2f831f0

23 files changed

+917
-12
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelector.java

+33
Original file line numberDiff line numberDiff line change
@@ -22,15 +22,21 @@
2222
import java.util.ArrayList;
2323
import java.util.Collections;
2424
import java.util.Comparator;
25+
import java.util.HashSet;
2526
import java.util.LinkedHashMap;
2627
import java.util.List;
2728
import java.util.Map;
2829
import java.util.NavigableMap;
30+
import java.util.Set;
2931
import java.util.TreeMap;
32+
import java.util.concurrent.ConcurrentSkipListMap;
3033
import java.util.concurrent.locks.ReadWriteLock;
3134
import java.util.concurrent.locks.ReentrantReadWriteLock;
35+
import java.util.stream.Collectors;
36+
import org.apache.commons.collections4.CollectionUtils;
3237
import org.apache.pulsar.broker.service.BrokerServiceException.ConsumerAssignException;
3338
import org.apache.pulsar.client.api.Range;
39+
import org.apache.pulsar.client.impl.StickyKeyConsumerPredicate;
3440
import org.apache.pulsar.common.util.Murmur3_32Hash;
3541

3642
/**
@@ -74,6 +80,7 @@ public void addConsumer(Consumer consumer) throws ConsumerAssignException {
7480
}
7581
});
7682
}
83+
notifyActiveConsumerChanged();
7784
} finally {
7885
rwLock.writeLock().unlock();
7986
}
@@ -99,6 +106,7 @@ public void removeConsumer(Consumer consumer) {
99106
}
100107
});
101108
}
109+
notifyActiveConsumerChanged();
102110
} finally {
103111
rwLock.writeLock().unlock();
104112
}
@@ -148,4 +156,29 @@ public Map<Consumer, List<Range>> getConsumerKeyHashRanges() {
148156
Map<Integer, List<Consumer>> getRangeConsumer() {
149157
return Collections.unmodifiableMap(hashRing);
150158
}
159+
160+
@Override
161+
public StickyKeyConsumerPredicate generateSpecialPredicate(final Consumer consumer){
162+
NavigableMap<Integer, List<String>> cpHashRing = new ConcurrentSkipListMap<>();
163+
for (Map.Entry<Integer, List<Consumer>> entry: hashRing.entrySet()) {
164+
if (CollectionUtils.isEmpty(entry.getValue())) {
165+
continue;
166+
}
167+
cpHashRing.put(entry.getKey(), entry.getValue()
168+
.stream()
169+
.map(v -> v == consumer ? StickyKeyConsumerPredicate.SPECIAL_CONSUMER_MARK
170+
: StickyKeyConsumerPredicate.OTHER_CONSUMER_MARK)
171+
.collect(Collectors.toList())
172+
);
173+
}
174+
return new StickyKeyConsumerPredicate.Predicate4ConsistentHashingStickyKeyConsumerSelector(cpHashRing);
175+
}
176+
177+
private void notifyActiveConsumerChanged() {
178+
// TODO add configuration for this events in future
179+
Set<Consumer> consumerSet = new HashSet<>(hashRing.values().stream()
180+
.flatMap(list -> list.stream()).collect(Collectors.toSet()));
181+
consumerSet.forEach(consumer ->
182+
consumer.notifyActiveConsumerChange(generateSpecialPredicate(consumer).encode()));
183+
}
151184
}

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java

+8
Original file line numberDiff line numberDiff line change
@@ -204,6 +204,14 @@ void notifyActiveConsumerChange(Consumer activeConsumer) {
204204
cnx.getCommandSender().sendActiveConsumerChange(consumerId, this == activeConsumer);
205205
}
206206

207+
void notifyActiveConsumerChange(String keySharedProps) {
208+
if (log.isDebugEnabled()) {
209+
log.debug("notify consumer {} - that [{}] for subscription {} has new keySharedProps: {}",
210+
consumerId, topicName, subscription.getName(), keySharedProps);
211+
}
212+
cnx.getCommandSender().sendActiveConsumerChange(consumerId, keySharedProps);
213+
}
214+
207215
public boolean readCompacted() {
208216
return readCompacted;
209217
}

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/HashRangeAutoSplitStickyKeyConsumerSelector.java

+34-1
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import java.util.concurrent.ConcurrentSkipListMap;
2828
import org.apache.pulsar.broker.service.BrokerServiceException.ConsumerAssignException;
2929
import org.apache.pulsar.client.api.Range;
30+
import org.apache.pulsar.client.impl.StickyKeyConsumerPredicate;
3031

3132
/**
3233
* This is a consumer selector based fixed hash range.
@@ -83,8 +84,15 @@ public synchronized void addConsumer(Consumer consumer) throws ConsumerAssignExc
8384
rangeMap.put(rangeSize, consumer);
8485
consumerRange.put(consumer, rangeSize);
8586
} else {
86-
splitRange(findBiggestRange(), consumer);
87+
// TODO "findBiggestEntry()" is better than "findBiggestRange()" for find effected consumer.
88+
int biggestRange = findBiggestRange();
89+
splitRange(biggestRange, consumer);
90+
// notify change for effected consumer.
91+
Consumer effectedConsumer = rangeMap.get(biggestRange);
92+
effectedConsumer.notifyActiveConsumerChange(generateSpecialPredicate(effectedConsumer).encode());
8793
}
94+
// notify new-active-consumer change
95+
consumer.notifyActiveConsumerChange(generateSpecialPredicate(consumer).encode());
8896
}
8997

9098
@Override
@@ -96,9 +104,20 @@ public synchronized void removeConsumer(Consumer consumer) {
96104
rangeMap.put(removeRange, lowerEntry.getValue());
97105
rangeMap.remove(lowerEntry.getKey());
98106
consumerRange.put(lowerEntry.getValue(), removeRange);
107+
// notify change for effected consumer
108+
lowerEntry.getValue().notifyActiveConsumerChange(
109+
generateSpecialPredicate(lowerEntry.getValue()).encode());
99110
} else {
100111
rangeMap.remove(removeRange);
112+
// notify change for effected consumer.
113+
Map.Entry<Integer, Consumer> lowerEntry = rangeMap.higherEntry(removeRange);
114+
if (lowerEntry != null) {
115+
lowerEntry.getValue().notifyActiveConsumerChange(
116+
generateSpecialPredicate(lowerEntry.getValue()).encode());
117+
}
101118
}
119+
// // notify removed consumer change
120+
// consumer.notifyActiveConsumerChange(StickyKeyConsumerPredicate.DENIED.encode());
102121
}
103122
}
104123

@@ -164,4 +183,18 @@ private boolean is2Power(int num) {
164183
Map<Integer, Consumer> getRangeConsumer() {
165184
return Collections.unmodifiableMap(rangeMap);
166185
}
186+
187+
@Override
188+
public StickyKeyConsumerPredicate generateSpecialPredicate(Consumer consumer){
189+
Integer highIndex = consumerRange.get(consumer);
190+
if (highIndex == null) {
191+
highIndex = -1;
192+
}
193+
Integer lowIndex = rangeMap.lowerKey(highIndex);
194+
if (lowIndex == null){
195+
lowIndex = -1;
196+
}
197+
return new StickyKeyConsumerPredicate
198+
.Predicate4HashRangeAutoSplitStickyKeyConsumerSelector(lowIndex, highIndex, rangeSize);
199+
}
167200
}

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/HashRangeExclusiveStickyKeyConsumerSelector.java

+26
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,13 @@
2121
import java.util.ArrayList;
2222
import java.util.Collections;
2323
import java.util.HashMap;
24+
import java.util.HashSet;
2425
import java.util.List;
2526
import java.util.Map;
27+
import java.util.Set;
2628
import java.util.concurrent.ConcurrentSkipListMap;
2729
import org.apache.pulsar.client.api.Range;
30+
import org.apache.pulsar.client.impl.StickyKeyConsumerPredicate;
2831
import org.apache.pulsar.common.api.proto.IntRange;
2932
import org.apache.pulsar.common.api.proto.KeySharedMeta;
3033

@@ -58,11 +61,13 @@ public void addConsumer(Consumer consumer) throws BrokerServiceException.Consume
5861
rangeMap.put(intRange.getStart(), consumer);
5962
rangeMap.put(intRange.getEnd(), consumer);
6063
}
64+
notifyActiveConsumerChanged();
6165
}
6266

6367
@Override
6468
public void removeConsumer(Consumer consumer) {
6569
rangeMap.entrySet().removeIf(entry -> entry.getValue().equals(consumer));
70+
notifyActiveConsumerChanged();
6671
}
6772

6873
@Override
@@ -146,4 +151,25 @@ Map<Integer, Consumer> getRangeConsumer() {
146151
return Collections.unmodifiableMap(rangeMap);
147152
}
148153

154+
@Override
155+
public StickyKeyConsumerPredicate generateSpecialPredicate(Consumer consumer){
156+
ConcurrentSkipListMap<Integer, String> cpRangeMap = new ConcurrentSkipListMap<>();
157+
for (Map.Entry<Integer, Consumer> entry: rangeMap.entrySet()) {
158+
String v = StickyKeyConsumerPredicate.OTHER_CONSUMER_MARK;
159+
if (consumer == entry.getValue()){
160+
v = StickyKeyConsumerPredicate.SPECIAL_CONSUMER_MARK;
161+
}
162+
cpRangeMap.put(entry.getKey(), v);
163+
}
164+
return new StickyKeyConsumerPredicate
165+
.Predicate4HashRangeExclusiveStickyKeyConsumerSelector(cpRangeMap, rangeSize);
166+
}
167+
168+
private void notifyActiveConsumerChanged() {
169+
// TODO add configuration for s events
170+
Set<Consumer> consumerSet = new HashSet<>(rangeMap.values());
171+
consumerSet.forEach(consumer ->
172+
consumer.notifyActiveConsumerChange(generateSpecialPredicate(consumer).encode()));
173+
}
174+
149175
}

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSender.java

+2
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,8 @@ void sendLookupResponse(String brokerServiceUrl, String brokerServiceUrlTls, boo
6969

7070
void sendActiveConsumerChange(long consumerId, boolean isActive);
7171

72+
void sendActiveConsumerChange(long consumerId, String keySharedProps);
73+
7274
void sendSuccess(long requestId);
7375

7476
void sendError(long requestId, ServerError error, String message);

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java

+12
Original file line numberDiff line numberDiff line change
@@ -195,6 +195,18 @@ public void sendActiveConsumerChange(long consumerId, boolean isActive) {
195195
cnx.ctx().voidPromise());
196196
}
197197

198+
@Override
199+
public void sendActiveConsumerChange(long consumerId, String keySharedProps) {
200+
// TODO implements
201+
if (!Commands.peerSupportsActiveConsumerListener(cnx.getRemoteEndpointProtocolVersion())) {
202+
// if the client is older than `v12`, we don't need to send consumer group changes.
203+
return;
204+
}
205+
cnx.ctx().writeAndFlush(
206+
Commands.newActiveConsumerChange(consumerId, keySharedProps),
207+
cnx.ctx().voidPromise());
208+
}
209+
198210
@Override
199211
public void sendSuccess(long requestId) {
200212
cnx.ctx().writeAndFlush(Commands.newSuccess(requestId));

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/StickyKeyConsumerSelector.java

+7
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import java.util.Map;
2323
import org.apache.pulsar.broker.service.BrokerServiceException.ConsumerAssignException;
2424
import org.apache.pulsar.client.api.Range;
25+
import org.apache.pulsar.client.impl.StickyKeyConsumerPredicate;
2526
import org.apache.pulsar.common.util.Murmur3_32Hash;
2627

2728
public interface StickyKeyConsumerSelector {
@@ -68,4 +69,10 @@ static int makeStickyKeyHash(byte[] stickyKey) {
6869
* @return A map where key is a consumer name and value is list of hash range it receiving message for.
6970
*/
7071
Map<Consumer, List<Range>> getConsumerKeyHashRanges();
72+
73+
/***
74+
* Generate specify key-predicate for {@param consumer}.
75+
* @return specify key-predicate.
76+
*/
77+
StickyKeyConsumerPredicate generateSpecialPredicate(Consumer consumer);
7178
}

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelectorTest.java

+116
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,18 @@
1818
*/
1919
package org.apache.pulsar.broker.service;
2020

21+
import static org.mockito.ArgumentMatchers.anyString;
22+
import static org.mockito.Mockito.doAnswer;
2123
import static org.mockito.Mockito.mock;
2224
import static org.mockito.Mockito.when;
2325

2426

27+
import java.nio.charset.StandardCharsets;
28+
import java.util.concurrent.atomic.AtomicInteger;
2529
import org.apache.pulsar.broker.service.BrokerServiceException.ConsumerAssignException;
2630
import org.apache.pulsar.client.api.Range;
31+
import org.apache.pulsar.client.impl.StickyKeyConsumerPredicate;
32+
import org.apache.pulsar.client.impl.StickyKeyConsumerPredicate.Predicate4ConsistentHashingStickyKeyConsumerSelector;
2733
import org.testng.Assert;
2834
import org.testng.annotations.Test;
2935

@@ -37,6 +43,116 @@
3743
@Test(groups = "broker")
3844
public class ConsistentHashingStickyKeyConsumerSelectorTest {
3945

46+
@Test
47+
public void testEventListener() throws Exception{
48+
final ConsistentHashingStickyKeyConsumerSelector selector =
49+
new ConsistentHashingStickyKeyConsumerSelector(100);
50+
// consumer count: 0 --> 1
51+
Consumer consumer1 = mock(Consumer.class);
52+
when(consumer1.consumerName()).thenReturn("c1");
53+
AtomicInteger eventCount1 = new AtomicInteger();
54+
doAnswer(invocation -> {
55+
String props = invocation.getArgument(0);
56+
StickyKeyConsumerPredicate predicate = StickyKeyConsumerPredicate.decode(props);
57+
Assert.assertTrue(predicate instanceof Predicate4ConsistentHashingStickyKeyConsumerSelector);
58+
eventCount1.incrementAndGet();
59+
return null;
60+
}).when(consumer1).notifyActiveConsumerChange(anyString());
61+
selector.addConsumer(consumer1);
62+
Assert.assertEquals(1, eventCount1.get());
63+
// consumer count: 1 --> 2
64+
Consumer consumer2 = mock(Consumer.class);
65+
when(consumer2.consumerName()).thenReturn("c2");
66+
AtomicInteger eventCount2 = new AtomicInteger();
67+
doAnswer(invocation -> {
68+
String props = invocation.getArgument(0);
69+
StickyKeyConsumerPredicate predicate = StickyKeyConsumerPredicate.decode(props);
70+
Assert.assertTrue(predicate instanceof Predicate4ConsistentHashingStickyKeyConsumerSelector);
71+
eventCount2.incrementAndGet();
72+
return null;
73+
}).when(consumer2).notifyActiveConsumerChange(anyString());
74+
selector.addConsumer(consumer2);
75+
Assert.assertEquals(1, eventCount2.get());
76+
Assert.assertEquals(2, eventCount1.get());
77+
// consumer count: 2 --> 3
78+
Consumer consumer3 = mock(Consumer.class);
79+
when(consumer3.consumerName()).thenReturn("c3");
80+
AtomicInteger eventCount3 = new AtomicInteger();
81+
doAnswer(invocation -> {
82+
String props = invocation.getArgument(0);
83+
StickyKeyConsumerPredicate predicate = StickyKeyConsumerPredicate.decode(props);
84+
Assert.assertTrue(predicate instanceof Predicate4ConsistentHashingStickyKeyConsumerSelector);
85+
eventCount3.incrementAndGet();
86+
return null;
87+
}).when(consumer3).notifyActiveConsumerChange(anyString());
88+
selector.addConsumer(consumer3);
89+
Assert.assertEquals(eventCount1.get(), 3);
90+
Assert.assertEquals(eventCount2.get(), 2);
91+
Assert.assertEquals(eventCount3.get(), 1);
92+
// consumer count: 3 --> 2
93+
selector.removeConsumer(consumer1);
94+
Assert.assertEquals(eventCount1.get(), 3);
95+
Assert.assertEquals(eventCount2.get(), 3);
96+
Assert.assertEquals(eventCount3.get(), 2);
97+
// consumer count: 2 --> 1
98+
selector.removeConsumer(consumer2);
99+
Assert.assertEquals(eventCount1.get(), 3);
100+
Assert.assertEquals(eventCount2.get(), 3);
101+
Assert.assertEquals(eventCount3.get(), 3);
102+
// consumer count: 1 --> 0
103+
selector.removeConsumer(consumer3);
104+
Assert.assertEquals(eventCount1.get(), 3);
105+
Assert.assertEquals(eventCount2.get(), 3);
106+
Assert.assertEquals(eventCount3.get(), 3);
107+
}
108+
109+
@Test(dependsOnMethods = {"testConsumerSelect"})
110+
public void testGenerateSpecialPredicate() throws Exception{
111+
final ConsistentHashingStickyKeyConsumerSelector selector = new ConsistentHashingStickyKeyConsumerSelector(100);
112+
String key1 = "anyKey";
113+
// one consumer
114+
Consumer consumer1 = mock(Consumer.class);
115+
when(consumer1.consumerName()).thenReturn("c1");
116+
selector.addConsumer(consumer1);
117+
Assert.assertTrue(selector.generateSpecialPredicate(consumer1).test(key1));
118+
// more consumer
119+
Consumer consumer2 = mock(Consumer.class);
120+
when(consumer2.consumerName()).thenReturn("c2");
121+
selector.addConsumer(consumer2);
122+
Consumer consumer3 = mock(Consumer.class);
123+
when(consumer3.consumerName()).thenReturn("c3");
124+
selector.addConsumer(consumer3);
125+
Consumer consumer4 = mock(Consumer.class);
126+
when(consumer4.consumerName()).thenReturn("c4");
127+
selector.addConsumer(consumer4);
128+
Consumer consumer5 = mock(Consumer.class);
129+
when(consumer5.consumerName()).thenReturn("c5");
130+
selector.addConsumer(consumer5);
131+
// do test
132+
final Map<Consumer, StickyKeyConsumerPredicate> predicateMapping = new HashMap<>();
133+
predicateMapping.put(consumer1,
134+
StickyKeyConsumerPredicate.decode(selector.generateSpecialPredicate(consumer1).encode()));
135+
predicateMapping.put(consumer2,
136+
StickyKeyConsumerPredicate.decode(selector.generateSpecialPredicate(consumer2).encode()));
137+
predicateMapping.put(consumer3,
138+
StickyKeyConsumerPredicate.decode(selector.generateSpecialPredicate(consumer3).encode()));
139+
predicateMapping.put(consumer4,
140+
StickyKeyConsumerPredicate.decode(selector.generateSpecialPredicate(consumer4).encode()));
141+
predicateMapping.put(consumer5,
142+
StickyKeyConsumerPredicate.decode(selector.generateSpecialPredicate(consumer5).encode()));
143+
for (int i = 0; i < 100; i++){
144+
String randomKey = UUID.randomUUID().toString();
145+
Consumer selectedConsumer = selector.select(randomKey.getBytes(StandardCharsets.UTF_8));
146+
for (Map.Entry<Consumer, StickyKeyConsumerPredicate> entry : predicateMapping.entrySet()){
147+
if (selectedConsumer == entry.getKey()){
148+
Assert.assertTrue(entry.getValue().test(randomKey));
149+
} else {
150+
Assert.assertFalse(entry.getValue().test(randomKey));
151+
}
152+
}
153+
}
154+
}
155+
40156
@Test
41157
public void testConsumerSelect() throws ConsumerAssignException {
42158

0 commit comments

Comments
 (0)