Skip to content

Commit

Permalink
Add testConsumersReconnect test
Browse files Browse the repository at this point in the history
  • Loading branch information
lhotari committed Oct 1, 2024
1 parent 1a11b4e commit 22b95a7
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
Expand Down Expand Up @@ -493,4 +494,56 @@ public void testShouldNotChangeMappingWhenConsumerLeavesAndRejoins() {

assertThat(selector.getConsumerKeyHashRanges()).as("ranges shouldn't change").containsExactlyInAnyOrderEntriesOf(expected);
}

@Test
public void testConsumersReconnect() {
final ConsistentHashingStickyKeyConsumerSelector selector = new ConsistentHashingStickyKeyConsumerSelector(100);
final String consumerName = "consumer";
final int numOfInitialConsumers = 50;
final int validationPointCount = 200;
final List<Integer> pointsToTest = pointsToTest(validationPointCount);
List<Consumer> consumers = new ArrayList<>();
for (int i = 0; i < numOfInitialConsumers; i++) {
final Consumer consumer = createMockConsumer(consumerName, "index " + i, i);
consumers.add(consumer);
selector.addConsumer(consumer);
}

// Mark original results.
List<Consumer> selectedConsumersBeforeRemove = new ArrayList<>();
for (int i = 0; i < validationPointCount; i++) {
int point = pointsToTest.get(i);
selectedConsumersBeforeRemove.add(selector.select(point));
}

// All consumers leave (in any order)
List<Consumer> randomOrderConsumers = new ArrayList<>(consumers);
Collections.shuffle(randomOrderConsumers);
for (Consumer c : randomOrderConsumers) {
selector.removeConsumer(c);
}

// All consumers reconnect in the same order as originally
for (Consumer c : consumers) {
selector.addConsumer(c);
}

// Check that the same consumers are selected as before
for (int j = 0; j < validationPointCount; j++) {
int point = pointsToTest.get(j);
Consumer selected = selector.select(point);
Consumer expected = selectedConsumersBeforeRemove.get(j);
assertThat(selected.consumerId()).as("validationPoint %d, hash %d", j, point).isEqualTo(expected.consumerId());
}
}

private List<Integer> pointsToTest(int validationPointCount) {
List<Integer> res = new ArrayList<>();
int hashRangeSize = Integer.MAX_VALUE;
final int increment = hashRangeSize / (validationPointCount + 1);
for (int i = 0; i < validationPointCount; i++) {
res.add(i * increment);
}
return res;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.pulsar.common.protocol.Commands.serializeMetadataAndPayload;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.anyBoolean;
import static org.mockito.Mockito.anyInt;
Expand Down Expand Up @@ -326,7 +327,7 @@ public void testSkipRedeliverTemporally() {
redeliverEntries.add(EntryImpl.create(1, 1, createMessage("message1", 1, "key1")));
final List<Entry> readEntries = new ArrayList<>();
readEntries.add(EntryImpl.create(1, 2, createMessage("message2", 2, "key1")));
readEntries.add(EntryImpl.create(1, 3, createMessage("message3", 3, "key22")));
readEntries.add(EntryImpl.create(1, 3, createMessage("message3", 3, "key2")));

try {
Field totalAvailablePermitsField = PersistentDispatcherMultipleConsumers.class.getDeclaredField("totalAvailablePermits");
Expand Down Expand Up @@ -417,7 +418,7 @@ public void testMessageRedelivery() throws Exception {

// Messages with key1 are routed to consumer1 and messages with key2 are routed to consumer2
final List<Entry> allEntries = new ArrayList<>();
allEntries.add(EntryImpl.create(1, 1, createMessage("message1", 1, "key22")));
allEntries.add(EntryImpl.create(1, 1, createMessage("message1", 1, "key2")));
allEntries.add(EntryImpl.create(1, 2, createMessage("message2", 2, "key1")));
allEntries.add(EntryImpl.create(1, 3, createMessage("message3", 3, "key1")));
allEntries.forEach(entry -> ((EntryImpl) entry).retain());
Expand Down Expand Up @@ -518,8 +519,8 @@ public void testMessageRedelivery() throws Exception {
persistentDispatcher.readMoreEntries();
}

assertEquals(actualEntriesToConsumer1, expectedEntriesToConsumer1);
assertEquals(actualEntriesToConsumer2, expectedEntriesToConsumer2);
assertThat(actualEntriesToConsumer1).containsExactlyElementsOf(expectedEntriesToConsumer1);
assertThat(actualEntriesToConsumer2).containsExactlyElementsOf(expectedEntriesToConsumer2);

allEntries.forEach(entry -> entry.release());
}
Expand Down

0 comments on commit 22b95a7

Please sign in to comment.