diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java index 2df9f38531f5d..27c9eed97093b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java @@ -134,11 +134,44 @@ public synchronized CompletableFuture addConsumer(Consumer consumer) { && consumerList.size() > 1 && cursor.getNumberOfEntriesSinceFirstNotAckedMessage() > 1) { recentlyJoinedConsumers.put(consumer, readPositionWhenJoining); + sortRecentlyJoinedConsumersIfNeeded(); } } }); } + private void sortRecentlyJoinedConsumersIfNeeded() { + if (recentlyJoinedConsumers.size() == 1) { + return; + } + boolean sortNeeded = false; + PositionImpl posPre = null; + PositionImpl posAfter = null; + for (Map.Entry entry : recentlyJoinedConsumers.entrySet()) { + if (posPre == null) { + posPre = entry.getValue(); + } else { + posAfter = entry.getValue(); + } + if (posPre != null && posAfter != null) { + if (posPre.compareTo(posAfter) > 0) { + sortNeeded = true; + break; + } + posPre = posAfter; + } + } + + if (sortNeeded) { + List> sortedList = new ArrayList<>(recentlyJoinedConsumers.entrySet()); + Collections.sort(sortedList, Map.Entry.comparingByValue()); + recentlyJoinedConsumers.clear(); + for (Map.Entry entry : sortedList) { + recentlyJoinedConsumers.put(entry.getKey(), entry.getValue()); + } + } + } + @Override public synchronized void removeConsumer(Consumer consumer) throws BrokerServiceException { // The consumer must be removed from the selector before calling the superclass removeConsumer method. @@ -548,8 +581,11 @@ public boolean hasSameKeySharedPolicy(KeySharedMeta ksm) { && ksm.isAllowOutOfOrderDelivery() == this.allowOutOfOrderDelivery); } - public LinkedHashMap getRecentlyJoinedConsumers() { - return recentlyJoinedConsumers; + public synchronized LinkedHashMap getRecentlyJoinedConsumers() { + if (recentlyJoinedConsumers == null) { + return null; + } + return new LinkedHashMap<>(recentlyJoinedConsumers); } public Map> getConsumerKeyHashRanges() { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java index f21efb277ba6e..bf3a79133b034 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java @@ -45,7 +45,9 @@ import java.lang.reflect.Field; import java.util.ArrayList; import java.util.Collections; +import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.Queue; import java.util.Set; import java.util.concurrent.ConcurrentLinkedQueue; @@ -171,6 +173,85 @@ public void testAddConsumerWhenClosed() throws Exception { assertTrue(persistentDispatcher.getSelector().getConsumerKeyHashRanges().isEmpty()); } + @Test + public void testSortRecentlyJoinedConsumersIfNeeded() throws Exception { + PersistentStickyKeyDispatcherMultipleConsumers persistentDispatcher = + new PersistentStickyKeyDispatcherMultipleConsumers( + topicMock, cursorMock, subscriptionMock, configMock, + new KeySharedMeta().setKeySharedMode(KeySharedMode.AUTO_SPLIT)); + + Consumer consumer0 = mock(Consumer.class); + when(consumer0.consumerName()).thenReturn("c0-1"); + Consumer consumer1 = mock(Consumer.class); + when(consumer1.consumerName()).thenReturn("c1"); + Consumer consumer2 = mock(Consumer.class); + when(consumer2.consumerName()).thenReturn("c2"); + Consumer consumer3 = mock(Consumer.class); + when(consumer3.consumerName()).thenReturn("c3"); + Consumer consumer4 = mock(Consumer.class); + when(consumer4.consumerName()).thenReturn("c4"); + Consumer consumer5 = mock(Consumer.class); + when(consumer5.consumerName()).thenReturn("c5"); + Consumer consumer6 = mock(Consumer.class); + when(consumer6.consumerName()).thenReturn("c6"); + + when(cursorMock.getNumberOfEntriesSinceFirstNotAckedMessage()).thenReturn(100L); + when(cursorMock.getMarkDeletedPosition()).thenReturn(PositionImpl.get(-1, -1)); + + when(cursorMock.getReadPosition()).thenReturn(PositionImpl.get(0, 0)); + persistentDispatcher.addConsumer(consumer0).join(); + + when(cursorMock.getReadPosition()).thenReturn(PositionImpl.get(4, 1)); + persistentDispatcher.addConsumer(consumer1).join(); + + when(cursorMock.getReadPosition()).thenReturn(PositionImpl.get(5, 2)); + persistentDispatcher.addConsumer(consumer2).join(); + + when(cursorMock.getReadPosition()).thenReturn(PositionImpl.get(5, 1)); + persistentDispatcher.addConsumer(consumer3).join(); + + when(cursorMock.getReadPosition()).thenReturn(PositionImpl.get(5, 3)); + persistentDispatcher.addConsumer(consumer4).join(); + + when(cursorMock.getReadPosition()).thenReturn(PositionImpl.get(4, 2)); + persistentDispatcher.addConsumer(consumer5).join(); + + when(cursorMock.getReadPosition()).thenReturn(PositionImpl.get(6, 1)); + persistentDispatcher.addConsumer(consumer6).join(); + + assertEquals(persistentDispatcher.getRecentlyJoinedConsumers().size(), 6); + + Iterator> itr + = persistentDispatcher.getRecentlyJoinedConsumers().entrySet().iterator(); + + Map.Entry entry1 = itr.next(); + assertEquals(entry1.getValue(), PositionImpl.get(4, 1)); + assertEquals(entry1.getKey(), consumer1); + + Map.Entry entry2 = itr.next(); + assertEquals(entry2.getValue(), PositionImpl.get(4, 2)); + assertEquals(entry2.getKey(), consumer5); + + Map.Entry entry3 = itr.next(); + assertEquals(entry3.getValue(), PositionImpl.get(5, 1)); + assertEquals(entry3.getKey(), consumer3); + + Map.Entry entry4 = itr.next(); + assertEquals(entry4.getValue(), PositionImpl.get(5, 2)); + assertEquals(entry4.getKey(), consumer2); + + Map.Entry entry5 = itr.next(); + assertEquals(entry5.getValue(), PositionImpl.get(5, 3)); + assertEquals(entry5.getKey(), consumer4); + + Map.Entry entry6 = itr.next(); + assertEquals(entry6.getValue(), PositionImpl.get(6, 1)); + assertEquals(entry6.getKey(), consumer6); + + // cleanup. + persistentDispatcher.close(); + } + @Test public void testSendMarkerMessage() { try {