Skip to content

Commit

Permalink
[fix][broker] Fix NPE causing dispatching to stop when using Key_Shar…
Browse files Browse the repository at this point in the history
…ed mode and allowOutOfOrderDelivery=true (#22533)
  • Loading branch information
poorbarcode authored Apr 19, 2024
1 parent 7aedb6b commit 2badcf6
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -457,6 +457,11 @@ private int getAvailablePermits(Consumer c) {

@Override
protected synchronized NavigableSet<PositionImpl> filterOutEntriesWillBeDiscarded(NavigableSet<PositionImpl> src) {
// The variable "hashesToBeBlocked" and "recentlyJoinedConsumers" will be null if "isAllowOutOfOrderDelivery()",
// So skip this filter out.
if (isAllowOutOfOrderDelivery()) {
return src;
}
if (src.isEmpty()) {
return src;
}
Expand Down Expand Up @@ -501,6 +506,11 @@ protected synchronized NavigableSet<PositionImpl> filterOutEntriesWillBeDiscarde
*/
@Override
protected boolean hasConsumersNeededNormalRead() {
// The variable "hashesToBeBlocked" and "recentlyJoinedConsumers" will be null if "isAllowOutOfOrderDelivery()",
// So the method "filterOutEntriesWillBeDiscarded" will filter out nothing, just return "true" here.
if (isAllowOutOfOrderDelivery()) {
return true;
}
for (Consumer consumer : consumerList) {
if (consumer == null || consumer.isBlocked()) {
continue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1741,6 +1741,14 @@ public void testNoRepeatedReadAndDiscard() throws Exception {
admin.topics().delete(topic, false);
}

@DataProvider(name = "allowKeySharedOutOfOrder")
public Object[][] allowKeySharedOutOfOrder() {
return new Object[][]{
{true},
{false}
};
}

/**
* This test is in order to guarantee the feature added by https://github.com/apache/pulsar/pull/7105.
* 1. Start 3 consumers:
Expand All @@ -1755,8 +1763,8 @@ public void testNoRepeatedReadAndDiscard() throws Exception {
* - no repeated Read-and-discard.
* - at last, all messages will be received.
*/
@Test(timeOut = 180 * 1000) // the test will be finished in 60s.
public void testRecentJoinedPosWillNotStuckOtherConsumer() throws Exception {
@Test(timeOut = 180 * 1000, dataProvider = "allowKeySharedOutOfOrder") // the test will be finished in 60s.
public void testRecentJoinedPosWillNotStuckOtherConsumer(boolean allowKeySharedOutOfOrder) throws Exception {
final int messagesSentPerTime = 100;
final Set<Integer> totalReceivedMessages = new TreeSet<>();
final String topic = BrokerTestUtil.newUniqueName("persistent://public/default/tp");
Expand All @@ -1775,6 +1783,8 @@ public void testRecentJoinedPosWillNotStuckOtherConsumer() throws Exception {
log.info("Published message :{}", messageId);
}

KeySharedPolicy keySharedPolicy = KeySharedPolicy.autoSplitHashRange()
.setAllowOutOfOrderDelivery(allowKeySharedOutOfOrder);
// 1. Start 3 consumers and make ack holes.
// - one consumer will be closed and trigger a messages redeliver.
// - one consumer will not ack any messages to make the new consumer joined late will be stuck due to the
Expand All @@ -1785,18 +1795,21 @@ public void testRecentJoinedPosWillNotStuckOtherConsumer() throws Exception {
.subscriptionName(subName)
.receiverQueueSize(10)
.subscriptionType(SubscriptionType.Key_Shared)
.keySharedPolicy(keySharedPolicy)
.subscribe();
Consumer<Integer> consumer2 = pulsarClient.newConsumer(Schema.INT32)
.topic(topic)
.subscriptionName(subName)
.receiverQueueSize(10)
.subscriptionType(SubscriptionType.Key_Shared)
.keySharedPolicy(keySharedPolicy)
.subscribe();
Consumer<Integer> consumer3 = pulsarClient.newConsumer(Schema.INT32)
.topic(topic)
.subscriptionName(subName)
.receiverQueueSize(10)
.subscriptionType(SubscriptionType.Key_Shared)
.keySharedPolicy(keySharedPolicy)
.subscribe();
List<Message> msgList1 = new ArrayList<>();
List<Message> msgList2 = new ArrayList<>();
Expand Down Expand Up @@ -1845,6 +1858,7 @@ public void testRecentJoinedPosWillNotStuckOtherConsumer() throws Exception {
.subscriptionName(subName)
.receiverQueueSize(1000)
.subscriptionType(SubscriptionType.Key_Shared)
.keySharedPolicy(keySharedPolicy)
.subscribe();
consumerWillBeClose.close();

Expand Down

0 comments on commit 2badcf6

Please sign in to comment.