Skip to content

Commit bdcdace

Browse files
committed
[Fix Comment second time]Fixes apache#6555 Send Key hash range change events to consumers in Key_Shared subscription.
1 parent b35aea4 commit bdcdace

File tree

4 files changed

+417
-7
lines changed

4 files changed

+417
-7
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/HashRangeExclusiveStickyKeyConsumerSelector.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,9 @@
2424
import java.util.HashSet;
2525
import java.util.List;
2626
import java.util.Map;
27+
import java.util.NavigableMap;
2728
import java.util.Set;
29+
import java.util.TreeMap;
2830
import java.util.concurrent.ConcurrentSkipListMap;
2931
import org.apache.pulsar.client.api.Range;
3032
import org.apache.pulsar.client.impl.StickyKeyConsumerPredicate;
@@ -153,7 +155,7 @@ Map<Integer, Consumer> getRangeConsumer() {
153155

154156
public StickyKeyConsumerPredicate generateSpecialPredicate(Consumer consumer,
155157
final Map<Integer, Consumer> rangeMap){
156-
ConcurrentSkipListMap<Integer, String> cpRangeMap = new ConcurrentSkipListMap<>();
158+
NavigableMap<Integer, String> cpRangeMap = new TreeMap<>();
157159
for (Map.Entry<Integer, Consumer> entry: rangeMap.entrySet()) {
158160
String v = StickyKeyConsumerPredicate.OTHER_CONSUMER_MARK;
159161
if (consumer == entry.getValue()){
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,242 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.pulsar.broker.service;
20+
21+
import static org.testng.Assert.fail;
22+
import java.lang.reflect.Field;
23+
import java.util.NavigableMap;
24+
import java.util.concurrent.atomic.AtomicInteger;
25+
import java.util.function.Predicate;
26+
import org.apache.pulsar.client.api.Consumer;
27+
import org.apache.pulsar.client.api.ConsumerBuilder;
28+
import org.apache.pulsar.client.api.ConsumerEventListener;
29+
import org.apache.pulsar.client.api.KeySharedPolicy;
30+
import org.apache.pulsar.client.api.Range;
31+
import org.apache.pulsar.client.api.SubscriptionType;
32+
import org.apache.pulsar.client.impl.StickyKeyConsumerPredicate;
33+
import org.testng.Assert;
34+
import org.testng.annotations.AfterClass;
35+
import org.testng.annotations.BeforeClass;
36+
import org.testng.annotations.Test;
37+
import org.apache.pulsar.client.impl.StickyKeyConsumerPredicate.Predicate4HashRangeAutoSplitStickyKeyConsumerSelector;
38+
import org.apache.pulsar.client.impl.StickyKeyConsumerPredicate.Predicate4HashRangeExclusiveStickyKeyConsumerSelector;
39+
40+
@Test(groups = "broker")
41+
public class KeySharedE2ETest extends BrokerTestBase {
42+
43+
@BeforeClass
44+
@Override
45+
protected void setup() throws Exception {
46+
super.baseSetup();
47+
}
48+
49+
@AfterClass(alwaysRun = true)
50+
@Override
51+
protected void cleanup() throws Exception {
52+
super.internalCleanup();
53+
}
54+
55+
protected static final int CONSUMER_ADD_OR_REMOVE_WAIT_TIME = 100;
56+
57+
protected static class TestConsumerStateEventListener implements ConsumerEventListener {
58+
59+
String name = "";
60+
61+
Predicate<String> keyPredicate;
62+
63+
AtomicInteger trigCount = new AtomicInteger();
64+
65+
public TestConsumerStateEventListener(String name) {
66+
this.name = name;
67+
}
68+
69+
@Override
70+
public void becameActive(Consumer<?> consumer, int partitionId) {
71+
}
72+
73+
@Override
74+
public void becameInactive(Consumer<?> consumer, int partitionId) {
75+
76+
}
77+
78+
@Override
79+
public void keySharedRuleChanged(Consumer<?> consumer, Predicate<String> keyPredicate) {
80+
this.keyPredicate = keyPredicate;
81+
trigCount.incrementAndGet();
82+
}
83+
}
84+
85+
protected void clearEventListener(TestConsumerStateEventListener eventListener){
86+
eventListener.keyPredicate = null;
87+
eventListener.trigCount.set(0);
88+
}
89+
90+
@Test
91+
public void testConsumerEventsWithAutoHashRangeImpl() throws Exception {
92+
final String topicName = "persistent://prop/use/ns-abc/key-shared-topic_01-" + System.currentTimeMillis();
93+
final String subName = "sub_key_shared_01";
94+
ConsumerBuilder<byte[]> consumerBuilder = pulsarClient.newConsumer().topic(topicName).subscriptionName(subName)
95+
.subscriptionType(SubscriptionType.Key_Shared);
96+
int rangeSize = 0;
97+
// 1.One subscription.
98+
TestConsumerStateEventListener listener1 = new TestConsumerStateEventListener("key_shared_listener_01");
99+
ConsumerBuilder<byte[]> consumerBuilder1 = consumerBuilder.clone().consumerName("key_shared_consumer_01")
100+
.consumerEventListener(listener1);
101+
Consumer<byte[]> consumer1 = consumerBuilder1.subscribe();
102+
Thread.sleep(CONSUMER_ADD_OR_REMOVE_WAIT_TIME);
103+
Assert.assertEquals(listener1.trigCount.get(), 1);
104+
Assert.assertTrue(listener1.keyPredicate != null);
105+
Assert.assertTrue(listener1.keyPredicate instanceof Predicate4HashRangeAutoSplitStickyKeyConsumerSelector);
106+
Predicate4HashRangeAutoSplitStickyKeyConsumerSelector predicate4ConsistentHashingStickyKeyConsumerSelector1 =
107+
(Predicate4HashRangeAutoSplitStickyKeyConsumerSelector) listener1.keyPredicate;
108+
rangeSize = predicate4ConsistentHashingStickyKeyConsumerSelector1.getRangeSize();
109+
Assert.assertTrue(predicate4ConsistentHashingStickyKeyConsumerSelector1.getRangeSize()
110+
== predicate4ConsistentHashingStickyKeyConsumerSelector1.getHighHash());
111+
Assert.assertEquals(predicate4ConsistentHashingStickyKeyConsumerSelector1.getLowHash(), -1);
112+
// 2.Two subscription.
113+
TestConsumerStateEventListener listener2 = new TestConsumerStateEventListener("key_shared_listener_02");
114+
ConsumerBuilder<byte[]> consumerBuilder2 = consumerBuilder.clone().consumerName("key_shared_consumer_02")
115+
.consumerEventListener(listener2);
116+
Consumer<byte[]> consumer2 = consumerBuilder2.subscribe();
117+
Thread.sleep(CONSUMER_ADD_OR_REMOVE_WAIT_TIME);
118+
// assert listener2
119+
Assert.assertEquals(listener2.trigCount.get(), 1);
120+
Assert.assertTrue(listener2.keyPredicate != null);
121+
Assert.assertTrue(listener2.keyPredicate instanceof Predicate4HashRangeAutoSplitStickyKeyConsumerSelector);
122+
Predicate4HashRangeAutoSplitStickyKeyConsumerSelector predicate4ConsistentHashingStickyKeyConsumerSelector2 =
123+
(Predicate4HashRangeAutoSplitStickyKeyConsumerSelector) listener2.keyPredicate;
124+
Assert.assertEquals(predicate4ConsistentHashingStickyKeyConsumerSelector2.getRangeSize(), rangeSize);
125+
Assert.assertEquals(predicate4ConsistentHashingStickyKeyConsumerSelector2.getHighHash(), rangeSize >> 1);
126+
Assert.assertEquals(predicate4ConsistentHashingStickyKeyConsumerSelector2.getLowHash(), -1);
127+
// assert listener1
128+
Assert.assertEquals(listener1.trigCount.get(), 2);
129+
Assert.assertTrue(listener1.keyPredicate != null);
130+
Assert.assertTrue(listener1.keyPredicate instanceof Predicate4HashRangeAutoSplitStickyKeyConsumerSelector);
131+
predicate4ConsistentHashingStickyKeyConsumerSelector1 =
132+
(Predicate4HashRangeAutoSplitStickyKeyConsumerSelector) listener1.keyPredicate;
133+
Assert.assertTrue(predicate4ConsistentHashingStickyKeyConsumerSelector1.getRangeSize()
134+
== predicate4ConsistentHashingStickyKeyConsumerSelector1.getHighHash());
135+
Assert.assertEquals(predicate4ConsistentHashingStickyKeyConsumerSelector1.getLowHash(), rangeSize >> 1);
136+
// 3. close one consumer.
137+
consumer2.close();
138+
Thread.sleep(CONSUMER_ADD_OR_REMOVE_WAIT_TIME);
139+
// assert listener1
140+
Assert.assertEquals(listener1.trigCount.get(), 3);
141+
Assert.assertTrue(listener1.keyPredicate != null);
142+
Assert.assertTrue(listener1.keyPredicate instanceof Predicate4HashRangeAutoSplitStickyKeyConsumerSelector);
143+
predicate4ConsistentHashingStickyKeyConsumerSelector1 =
144+
(Predicate4HashRangeAutoSplitStickyKeyConsumerSelector) listener1.keyPredicate;
145+
Assert.assertTrue(predicate4ConsistentHashingStickyKeyConsumerSelector1.getRangeSize()
146+
== predicate4ConsistentHashingStickyKeyConsumerSelector1.getHighHash());
147+
Assert.assertEquals(predicate4ConsistentHashingStickyKeyConsumerSelector1.getLowHash(), -1);
148+
// close sources.
149+
consumer1.close();
150+
admin.topics().delete(topicName);
151+
}
152+
153+
@Test
154+
public void testConsumerEventsWithFixedHashRangeImpl() throws Exception {
155+
final String topicName = "persistent://prop/use/ns-abc/key-shared-topic_01-" + System.currentTimeMillis();
156+
final String subName = "sub_key_shared_01";
157+
ConsumerBuilder<byte[]> consumerBuilder = pulsarClient.newConsumer().topic(topicName).subscriptionName(subName)
158+
.subscriptionType(SubscriptionType.Key_Shared);
159+
int rangeSize = 0;
160+
// 1.One subscription.
161+
TestConsumerStateEventListener listener1 = new TestConsumerStateEventListener("key_shared_listener_01");
162+
ConsumerBuilder<byte[]> consumerBuilder1 = consumerBuilder.clone().consumerName("key_shared_consumer_01")
163+
.keySharedPolicy(KeySharedPolicy.stickyHashRange().ranges(Range.of(0, 10)))
164+
.consumerEventListener(listener1);
165+
Consumer<byte[]> consumer1 = consumerBuilder1.subscribe();
166+
Thread.sleep(CONSUMER_ADD_OR_REMOVE_WAIT_TIME);
167+
Assert.assertEquals(listener1.trigCount.get(), 1);
168+
Assert.assertTrue(listener1.keyPredicate != null);
169+
Assert.assertTrue(listener1.keyPredicate instanceof Predicate4HashRangeExclusiveStickyKeyConsumerSelector);
170+
Predicate4HashRangeExclusiveStickyKeyConsumerSelector predicate1 =
171+
(Predicate4HashRangeExclusiveStickyKeyConsumerSelector) listener1.keyPredicate;
172+
rangeSize = predicate1.getRangeSize();
173+
NavigableMap<Integer, String> rangeMap1 = resolveRangeMap(predicate1);
174+
Assert.assertEquals(rangeMap1.size(), 2);
175+
Assert.assertEquals(rangeMap1.ceilingEntry(9).getValue(), StickyKeyConsumerPredicate.SPECIAL_CONSUMER_MARK);
176+
Assert.assertEquals(rangeMap1.floorEntry(9).getValue(), StickyKeyConsumerPredicate.SPECIAL_CONSUMER_MARK);
177+
Assert.assertEquals(rangeMap1.ceilingEntry(9).getKey(), Integer.valueOf(10));
178+
Assert.assertEquals(rangeMap1.floorEntry(9).getKey(), Integer.valueOf(0));
179+
// 2.Two subscription.
180+
TestConsumerStateEventListener listener2 = new TestConsumerStateEventListener("key_shared_listener_01");
181+
ConsumerBuilder<byte[]> consumerBuilder2 = consumerBuilder.clone().consumerName("key_shared_consumer_01")
182+
.keySharedPolicy(KeySharedPolicy.stickyHashRange().ranges(Range.of(11, 20)))
183+
.consumerEventListener(listener2);
184+
Consumer<byte[]> consumer2 = consumerBuilder2.subscribe();
185+
Thread.sleep(CONSUMER_ADD_OR_REMOVE_WAIT_TIME);
186+
// assert listener2
187+
Assert.assertEquals(listener2.trigCount.get(), 1);
188+
Assert.assertTrue(listener2.keyPredicate != null);
189+
Assert.assertTrue(listener2.keyPredicate instanceof Predicate4HashRangeExclusiveStickyKeyConsumerSelector);
190+
Predicate4HashRangeExclusiveStickyKeyConsumerSelector predicate2 =
191+
(Predicate4HashRangeExclusiveStickyKeyConsumerSelector) listener2.keyPredicate;
192+
Assert.assertEquals(predicate2.getRangeSize(), rangeSize);
193+
NavigableMap<Integer, String> rangeMap2 = resolveRangeMap(predicate2);
194+
Assert.assertEquals(rangeMap2.size(), 4);
195+
Assert.assertEquals(rangeMap2.ceilingEntry(19).getValue(), StickyKeyConsumerPredicate.SPECIAL_CONSUMER_MARK);
196+
Assert.assertEquals(rangeMap2.floorEntry(19).getValue(), StickyKeyConsumerPredicate.SPECIAL_CONSUMER_MARK);
197+
Assert.assertEquals(rangeMap2.ceilingEntry(19).getKey(), Integer.valueOf(20));
198+
Assert.assertEquals(rangeMap2.floorEntry(19).getKey(), Integer.valueOf(11));
199+
// assert listener1
200+
Assert.assertEquals(listener1.trigCount.get(), 2);
201+
Assert.assertTrue(listener1.keyPredicate != null);
202+
Assert.assertTrue(listener1.keyPredicate instanceof Predicate4HashRangeExclusiveStickyKeyConsumerSelector);
203+
predicate1 = (Predicate4HashRangeExclusiveStickyKeyConsumerSelector) listener1.keyPredicate;
204+
Assert.assertEquals(predicate1.getRangeSize(), rangeSize);
205+
rangeMap1 = resolveRangeMap(predicate1);
206+
Assert.assertEquals(rangeMap1.size(), 4);
207+
Assert.assertEquals(rangeMap1.ceilingEntry(9).getValue(), StickyKeyConsumerPredicate.SPECIAL_CONSUMER_MARK);
208+
Assert.assertEquals(rangeMap1.floorEntry(9).getValue(), StickyKeyConsumerPredicate.SPECIAL_CONSUMER_MARK);
209+
Assert.assertEquals(rangeMap1.ceilingEntry(9).getKey(), Integer.valueOf(10));
210+
Assert.assertEquals(rangeMap1.floorEntry(9).getKey(), Integer.valueOf(0));
211+
// 3. close one consumer.
212+
consumer2.close();
213+
// assert listener1
214+
Thread.sleep(CONSUMER_ADD_OR_REMOVE_WAIT_TIME);
215+
Assert.assertEquals(listener1.trigCount.get(), 3);
216+
Assert.assertTrue(listener1.keyPredicate != null);
217+
Assert.assertTrue(listener1.keyPredicate instanceof Predicate4HashRangeExclusiveStickyKeyConsumerSelector);
218+
predicate1 = (Predicate4HashRangeExclusiveStickyKeyConsumerSelector) listener1.keyPredicate;
219+
Assert.assertEquals(predicate1.getRangeSize(), rangeSize);
220+
rangeMap1 = resolveRangeMap(predicate1);
221+
Assert.assertEquals(rangeMap1.size(), 2);
222+
Assert.assertEquals(rangeMap1.ceilingEntry(9).getValue(), StickyKeyConsumerPredicate.SPECIAL_CONSUMER_MARK);
223+
Assert.assertEquals(rangeMap1.floorEntry(9).getValue(), StickyKeyConsumerPredicate.SPECIAL_CONSUMER_MARK);
224+
Assert.assertEquals(rangeMap1.ceilingEntry(9).getKey(), Integer.valueOf(10));
225+
Assert.assertEquals(rangeMap1.floorEntry(9).getKey(), Integer.valueOf(0));
226+
// close sources.
227+
consumer1.close();
228+
admin.topics().delete(topicName);
229+
}
230+
231+
private NavigableMap<Integer, String> resolveRangeMap(
232+
Predicate4HashRangeExclusiveStickyKeyConsumerSelector predicate){
233+
try {
234+
Field field = Predicate4HashRangeExclusiveStickyKeyConsumerSelector.class.getDeclaredField("rangeMap");
235+
field.setAccessible(true);
236+
return (NavigableMap<Integer, String>) field.get(predicate);
237+
} catch (Exception e){
238+
fail("should fail");
239+
return null;
240+
}
241+
}
242+
}

0 commit comments

Comments
 (0)