diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java index 4e2e032388499..ec4ed02e35d72 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java @@ -31,6 +31,7 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertNull; import static org.testng.AssertJUnit.assertEquals; import static org.testng.AssertJUnit.assertSame; @@ -47,6 +48,7 @@ import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.function.Supplier; import org.apache.bookkeeper.common.util.OrderedExecutor; @@ -360,7 +362,8 @@ public void testAddRemoveConsumer() throws Exception { // 4. Verify active consumer assertSame(pdfc.getActiveConsumer().consumerName(), consumer1.consumerName()); // get the notified with who is the leader - change = consumerChanges.take(); + change = consumerChanges.poll(10, TimeUnit.SECONDS); + assertNotNull(change); verifyActiveConsumerChange(change, 1, true); verify(consumer1, times(2)).notifyActiveConsumerChange(same(consumer1)); @@ -372,7 +375,8 @@ public void testAddRemoveConsumer() throws Exception { assertSame(pdfc.getActiveConsumer().consumerName(), consumer1.consumerName()); assertEquals(3, consumers.size()); // get notified with who is the leader - change = consumerChanges.take(); + change = consumerChanges.poll(10, TimeUnit.SECONDS); + assertNotNull(change); verifyActiveConsumerChange(change, 2, false); verify(consumer1, times(2)).notifyActiveConsumerChange(same(consumer1)); verify(consumer2, times(1)).notifyActiveConsumerChange(same(consumer1)); @@ -387,13 +391,17 @@ public void testAddRemoveConsumer() throws Exception { assertEquals(4, consumers.size()); // all consumers will receive notifications - change = consumerChanges.take(); + change = consumerChanges.poll(10, TimeUnit.SECONDS); + assertNotNull(change); verifyActiveConsumerChange(change, 0, true); - change = consumerChanges.take(); + change = consumerChanges.poll(10, TimeUnit.SECONDS); + assertNotNull(change); verifyActiveConsumerChange(change, 1, false); - change = consumerChanges.take(); + change = consumerChanges.poll(10, TimeUnit.SECONDS); + assertNotNull(change); verifyActiveConsumerChange(change, 1, false); - change = consumerChanges.take(); + change = consumerChanges.poll(10, TimeUnit.SECONDS); + assertNotNull(change); verifyActiveConsumerChange(change, 2, false); verify(consumer0, times(1)).notifyActiveConsumerChange(same(consumer0)); verify(consumer1, times(2)).notifyActiveConsumerChange(same(consumer1)); @@ -419,9 +427,11 @@ public void testAddRemoveConsumer() throws Exception { assertEquals(2, consumers.size()); // the remaining consumers will receive notifications - change = consumerChanges.take(); + change = consumerChanges.poll(10, TimeUnit.SECONDS); + assertNotNull(change); verifyActiveConsumerChange(change, 1, true); - change = consumerChanges.take(); + change = consumerChanges.poll(10, TimeUnit.SECONDS); + assertNotNull(change); verifyActiveConsumerChange(change, 1, true); // 10. Attempt to remove already removed consumer diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/SegmentedLongArray.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/SegmentedLongArray.java new file mode 100644 index 0000000000000..dc4d1c4908c48 --- /dev/null +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/SegmentedLongArray.java @@ -0,0 +1,128 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.common.util.collections; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.PooledByteBufAllocator; +import java.util.ArrayList; +import java.util.List; +import javax.annotation.concurrent.NotThreadSafe; +import lombok.Getter; + +@NotThreadSafe +public class SegmentedLongArray implements AutoCloseable { + + private static final int SIZE_OF_LONG = 8; + + private static final int MAX_SEGMENT_SIZE = 2 * 1024 * 1024; // 2M longs -> 16 MB + private final List buffers = new ArrayList<>(); + + @Getter + private final long initialCapacity; + + @Getter + private long capacity; + + public SegmentedLongArray(long initialCapacity) { + long remainingToAdd = initialCapacity; + + // Add first segment + int sizeToAdd = (int) Math.min(remainingToAdd, MAX_SEGMENT_SIZE); + ByteBuf buffer = PooledByteBufAllocator.DEFAULT.directBuffer(sizeToAdd * SIZE_OF_LONG); + buffer.writerIndex(sizeToAdd * SIZE_OF_LONG); + buffers.add(buffer); + remainingToAdd -= sizeToAdd; + + // Add the remaining segments, all at full segment size, if necessary + while (remainingToAdd > 0) { + buffer = PooledByteBufAllocator.DEFAULT.directBuffer(MAX_SEGMENT_SIZE * SIZE_OF_LONG); + buffer.writerIndex(MAX_SEGMENT_SIZE * SIZE_OF_LONG); + buffers.add(buffer); + remainingToAdd -= MAX_SEGMENT_SIZE; + } + + this.initialCapacity = initialCapacity; + this.capacity = this.initialCapacity; + } + + public void writeLong(long offset, long value) { + int bufferIdx = (int) (offset / MAX_SEGMENT_SIZE); + int internalIdx = (int) (offset % MAX_SEGMENT_SIZE); + buffers.get(bufferIdx).setLong(internalIdx * SIZE_OF_LONG, value); + } + + public long readLong(long offset) { + int bufferIdx = (int) (offset / MAX_SEGMENT_SIZE); + int internalIdx = (int) (offset % MAX_SEGMENT_SIZE); + return buffers.get(bufferIdx).getLong(internalIdx * SIZE_OF_LONG); + } + + public void increaseCapacity() { + if (capacity < MAX_SEGMENT_SIZE) { + // Resize the current buffer to bigger capacity + capacity += (capacity <= 256 ? capacity : capacity / 2); + capacity = Math.min(capacity, MAX_SEGMENT_SIZE); + buffers.get(0).capacity((int) this.capacity * SIZE_OF_LONG); + buffers.get(0).writerIndex((int) this.capacity * SIZE_OF_LONG); + } else { + // Let's add 1 mode buffer to the list + int bufferSize = MAX_SEGMENT_SIZE * SIZE_OF_LONG; + ByteBuf buffer = PooledByteBufAllocator.DEFAULT.directBuffer(bufferSize, bufferSize); + buffer.writerIndex(bufferSize); + buffers.add(buffer); + capacity += MAX_SEGMENT_SIZE; + } + } + + public void shrink(long newCapacity) { + if (newCapacity >= capacity || newCapacity < initialCapacity) { + return; + } + + long sizeToReduce = capacity - newCapacity; + while (sizeToReduce >= MAX_SEGMENT_SIZE && buffers.size() > 1) { + ByteBuf b = buffers.remove(buffers.size() - 1); + b.release(); + capacity -= MAX_SEGMENT_SIZE; + sizeToReduce -= MAX_SEGMENT_SIZE; + } + + if (buffers.size() == 1 && sizeToReduce > 0) { + // We should also reduce the capacity of the first buffer + capacity -= sizeToReduce; + ByteBuf oldBuffer = buffers.get(0); + ByteBuf newBuffer = PooledByteBufAllocator.DEFAULT.directBuffer((int) capacity * SIZE_OF_LONG); + oldBuffer.getBytes(0, newBuffer, (int) capacity * SIZE_OF_LONG); + oldBuffer.release(); + buffers.set(0, newBuffer); + } + } + + @Override + public void close() { + buffers.forEach(ByteBuf::release); + } + + /** + * The amount of memory used to back the array of longs. + */ + public long bytesCapacity() { + return capacity * SIZE_OF_LONG; + } +} diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/TripleLongPriorityQueue.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/TripleLongPriorityQueue.java index 487c2284cffdf..50288247c643c 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/TripleLongPriorityQueue.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/TripleLongPriorityQueue.java @@ -19,9 +19,6 @@ package org.apache.pulsar.common.util.collections; import static com.google.common.base.Preconditions.checkArgument; -import com.google.common.annotations.VisibleForTesting; -import io.netty.buffer.ByteBuf; -import io.netty.buffer.PooledByteBufAllocator; /** * Provides a priority-queue implementation specialized on items composed by 3 longs. @@ -29,33 +26,28 @@ *

This class is not thread safe and the items are stored in direct memory. */ public class TripleLongPriorityQueue implements AutoCloseable { - - private static final int SIZE_OF_LONG = 8; private static final int DEFAULT_INITIAL_CAPACITY = 16; private static final float DEFAULT_SHRINK_FACTOR = 0.5f; // Each item is composed of 3 longs private static final int ITEMS_COUNT = 3; - private static final int TUPLE_SIZE = ITEMS_COUNT * SIZE_OF_LONG; - /** * Reserve 10% of the capacity when shrinking to avoid frequent expansion and shrinkage. */ private static final float RESERVATION_FACTOR = 0.9f; - private ByteBuf buffer; + private final SegmentedLongArray array; - private final int initialCapacity; + // Count of how many (long,long,long) tuples are currently inserted + private long tuplesCount; - private int capacity; - private int size; /** * When size < capacity * shrinkFactor, may trigger shrinking. */ private final float shrinkFactor; - private float shrinkThreshold; + private long shrinkThreshold; /** * Create a new priority queue with default initial capacity. @@ -64,13 +56,12 @@ public TripleLongPriorityQueue() { this(DEFAULT_INITIAL_CAPACITY); } - public TripleLongPriorityQueue(int initialCapacity, float shrinkFactor) { + public TripleLongPriorityQueue(long initialCapacity, float shrinkFactor) { + checkArgument(initialCapacity > 0); checkArgument(shrinkFactor > 0); - this.initialCapacity = initialCapacity; - this.capacity = initialCapacity; - this.shrinkThreshold = this.capacity * shrinkFactor; - this.buffer = PooledByteBufAllocator.DEFAULT.directBuffer(initialCapacity * TUPLE_SIZE); - this.size = 0; + this.array = new SegmentedLongArray(initialCapacity * ITEMS_COUNT); + this.tuplesCount = 0; + this.shrinkThreshold = (long) (initialCapacity * shrinkFactor); this.shrinkFactor = shrinkFactor; } @@ -87,7 +78,7 @@ public TripleLongPriorityQueue(int initialCapacity) { */ @Override public void close() { - buffer.release(); + array.close(); } /** @@ -98,13 +89,14 @@ public void close() { * @param n3 */ public void add(long n1, long n2, long n3) { - if (size == capacity) { - increaseCapacity(); + long arrayIdx = tuplesCount * ITEMS_COUNT; + if ((arrayIdx + 2) >= array.getCapacity()) { + array.increaseCapacity(); } - put(size, n1, n2, n3); - siftUp(size); - ++size; + put(tuplesCount, n1, n2, n3); + siftUp(tuplesCount); + ++tuplesCount; } /** @@ -113,8 +105,8 @@ public void add(long n1, long n2, long n3) { *

The tuple will not be extracted */ public long peekN1() { - checkArgument(size != 0); - return buffer.getLong(0); + checkArgument(tuplesCount != 0); + return array.readLong(0); } /** @@ -123,8 +115,8 @@ public long peekN1() { *

The tuple will not be extracted */ public long peekN2() { - checkArgument(size != 0); - return buffer.getLong(0 + 1 * SIZE_OF_LONG); + checkArgument(tuplesCount != 0); + return array.readLong(1); } /** @@ -133,17 +125,17 @@ public long peekN2() { *

The tuple will not be extracted */ public long peekN3() { - checkArgument(size != 0); - return buffer.getLong(0 + 2 * SIZE_OF_LONG); + checkArgument(tuplesCount != 0); + return array.readLong(2); } /** * Removes the first item from the queue. */ public void pop() { - checkArgument(size != 0); - swap(0, size - 1); - size--; + checkArgument(tuplesCount != 0); + swap(0, tuplesCount - 1); + tuplesCount--; siftDown(0); shrinkCapacity(); } @@ -152,132 +144,125 @@ public void pop() { * Returns whether the priority queue is empty. */ public boolean isEmpty() { - return size == 0; + return tuplesCount == 0; } /** * Returns the number of tuples in the priority queue. */ - public int size() { - return size; + public long size() { + return tuplesCount; + } + + /** + * The amount of memory used to back the priority queue. + */ + public long bytesCapacity() { + return array.bytesCapacity(); } /** * Clear all items. */ public void clear() { - this.buffer.clear(); - this.size = 0; + this.tuplesCount = 0; shrinkCapacity(); } - private void increaseCapacity() { - // For bigger sizes, increase by 50% - this.capacity += (capacity <= 256 ? capacity : capacity / 2); - this.shrinkThreshold = this.capacity * shrinkFactor; - buffer.capacity(this.capacity * TUPLE_SIZE); - } - private void shrinkCapacity() { - if (capacity > initialCapacity && size < shrinkThreshold) { - int decreasingSize = (int) (capacity * shrinkFactor * RESERVATION_FACTOR); - if (decreasingSize <= 0) { + if (tuplesCount <= shrinkThreshold && array.getCapacity() > array.getInitialCapacity()) { + long sizeToShrink = (long) (array.getCapacity() * shrinkFactor * RESERVATION_FACTOR); + if (sizeToShrink == 0) { return; } - if (capacity - decreasingSize <= initialCapacity) { - this.capacity = initialCapacity; + + long newCapacity; + if (array.getCapacity() - sizeToShrink <= array.getInitialCapacity()) { + newCapacity = array.getInitialCapacity(); } else { - this.capacity = capacity - decreasingSize; + newCapacity = array.getCapacity() - sizeToShrink; } - this.shrinkThreshold = this.capacity * shrinkFactor; - ByteBuf newBuffer = PooledByteBufAllocator.DEFAULT.directBuffer(this.capacity * TUPLE_SIZE); - buffer.getBytes(0, newBuffer, size * TUPLE_SIZE); - buffer.release(); - this.buffer = newBuffer; + array.shrink(newCapacity); + this.shrinkThreshold = (long) (array.getCapacity() / (double) ITEMS_COUNT * shrinkFactor); } } - private void siftUp(int idx) { - while (idx > 0) { - int parentIdx = (idx - 1) / 2; - if (compare(idx, parentIdx) >= 0) { + private void siftUp(long tupleIdx) { + while (tupleIdx > 0) { + long parentIdx = (tupleIdx - 1) / 2; + if (compare(tupleIdx, parentIdx) >= 0) { break; } - swap(idx, parentIdx); - idx = parentIdx; + swap(tupleIdx, parentIdx); + tupleIdx = parentIdx; } } - private void siftDown(int idx) { - int half = size / 2; - while (idx < half) { - int left = 2 * idx + 1; - int right = 2 * idx + 2; + private void siftDown(long tupleIdx) { + long half = tuplesCount / 2; + while (tupleIdx < half) { + long left = 2 * tupleIdx + 1; + long right = 2 * tupleIdx + 2; - int swapIdx = idx; + long swapIdx = tupleIdx; - if (compare(idx, left) > 0) { + if (compare(tupleIdx, left) > 0) { swapIdx = left; } - if (right < size && compare(swapIdx, right) > 0) { + if (right < tuplesCount && compare(swapIdx, right) > 0) { swapIdx = right; } - if (swapIdx == idx) { + if (swapIdx == tupleIdx) { return; } - swap(idx, swapIdx); - idx = swapIdx; + swap(tupleIdx, swapIdx); + tupleIdx = swapIdx; } } - private void put(int idx, long n1, long n2, long n3) { - int i = idx * TUPLE_SIZE; - buffer.setLong(i, n1); - buffer.setLong(i + 1 * SIZE_OF_LONG, n2); - buffer.setLong(i + 2 * SIZE_OF_LONG, n3); + private void put(long tupleIdx, long n1, long n2, long n3) { + long idx = tupleIdx * ITEMS_COUNT; + array.writeLong(idx, n1); + array.writeLong(idx + 1, n2); + array.writeLong(idx + 2, n3); } - private int compare(int idx1, int idx2) { - int i1 = idx1 * TUPLE_SIZE; - int i2 = idx2 * TUPLE_SIZE; + private int compare(long tupleIdx1, long tupleIdx2) { + long idx1 = tupleIdx1 * ITEMS_COUNT; + long idx2 = tupleIdx2 * ITEMS_COUNT; - int c1 = Long.compare(buffer.getLong(i1), buffer.getLong(i2)); + int c1 = Long.compare(array.readLong(idx1), array.readLong(idx2)); if (c1 != 0) { return c1; } - int c2 = Long.compare(buffer.getLong(i1 + SIZE_OF_LONG), buffer.getLong(i2 + SIZE_OF_LONG)); + int c2 = Long.compare(array.readLong(idx1 + 1), array.readLong(idx2 + 1)); if (c2 != 0) { return c2; } - return Long.compare(buffer.getLong(i1 + 2 * SIZE_OF_LONG), buffer.getLong(i2 + 2 * SIZE_OF_LONG)); + return Long.compare(array.readLong(idx1 + 2), array.readLong(idx2 + 2)); } - private void swap(int idx1, int idx2) { - int i1 = idx1 * TUPLE_SIZE; - int i2 = idx2 * TUPLE_SIZE; + private void swap(long tupleIdx1, long tupleIdx2) { + long idx1 = tupleIdx1 * ITEMS_COUNT; + long idx2 = tupleIdx2 * ITEMS_COUNT; - long tmp1 = buffer.getLong(i1); - long tmp2 = buffer.getLong(i1 + 1 * SIZE_OF_LONG); - long tmp3 = buffer.getLong(i1 + 2 * SIZE_OF_LONG); + long tmp1 = array.readLong(idx1); + long tmp2 = array.readLong(idx1 + 1); + long tmp3 = array.readLong(idx1 + 2); - buffer.setLong(i1, buffer.getLong(i2)); - buffer.setLong(i1 + 1 * SIZE_OF_LONG, buffer.getLong(i2 + 1 * SIZE_OF_LONG)); - buffer.setLong(i1 + 2 * SIZE_OF_LONG, buffer.getLong(i2 + 2 * SIZE_OF_LONG)); - - buffer.setLong(i2, tmp1); - buffer.setLong(i2 + 1 * SIZE_OF_LONG, tmp2); - buffer.setLong(i2 + 2 * SIZE_OF_LONG, tmp3); - } + array.writeLong(idx1, array.readLong(idx2)); + array.writeLong(idx1 + 1, array.readLong(idx2 + 1)); + array.writeLong(idx1 + 2, array.readLong(idx2 + 2)); - @VisibleForTesting - ByteBuf getBuffer() { - return buffer; + array.writeLong(idx2, tmp1); + array.writeLong(idx2 + 1, tmp2); + array.writeLong(idx2 + 2, tmp3); } } diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/SegmentedLongArrayTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/SegmentedLongArrayTest.java new file mode 100644 index 0000000000000..efb86fd4f9d48 --- /dev/null +++ b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/SegmentedLongArrayTest.java @@ -0,0 +1,103 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.common.util.collections; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.fail; +import lombok.Cleanup; +import org.testng.annotations.Test; + +public class SegmentedLongArrayTest { + + @Test + public void testArray() { + @Cleanup + SegmentedLongArray a = new SegmentedLongArray(4); + assertEquals(a.getCapacity(), 4); + assertEquals(a.bytesCapacity(), 4 * 8); + assertEquals(a.getInitialCapacity(), 4); + + a.writeLong(0, 0); + a.writeLong(1, 1); + a.writeLong(2, 2); + a.writeLong(3, Long.MAX_VALUE); + + try { + a.writeLong(4, Long.MIN_VALUE); + fail("should have failed"); + } catch (IndexOutOfBoundsException e) { + // Expected + } + + a.increaseCapacity(); + + a.writeLong(4, Long.MIN_VALUE); + + assertEquals(a.getCapacity(), 8); + assertEquals(a.bytesCapacity(), 8 * 8); + assertEquals(a.getInitialCapacity(), 4); + + assertEquals(a.readLong(0), 0); + assertEquals(a.readLong(1), 1); + assertEquals(a.readLong(2), 2); + assertEquals(a.readLong(3), Long.MAX_VALUE); + assertEquals(a.readLong(4), Long.MIN_VALUE); + + a.shrink(5); + assertEquals(a.getCapacity(), 5); + assertEquals(a.bytesCapacity(), 5 * 8); + assertEquals(a.getInitialCapacity(), 4); + } + + @Test + public void testLargeArray() { + long initialCap = 3 * 1024 * 1024; + + @Cleanup + SegmentedLongArray a = new SegmentedLongArray(initialCap); + assertEquals(a.getCapacity(), initialCap); + assertEquals(a.bytesCapacity(), initialCap * 8); + assertEquals(a.getInitialCapacity(), initialCap); + + long baseOffset = initialCap - 100; + + a.writeLong(baseOffset, 0); + a.writeLong(baseOffset + 1, 1); + a.writeLong(baseOffset + 2, 2); + a.writeLong(baseOffset + 3, Long.MAX_VALUE); + a.writeLong(baseOffset + 4, Long.MIN_VALUE); + + a.increaseCapacity(); + + assertEquals(a.getCapacity(), 5 * 1024 * 1024); + assertEquals(a.bytesCapacity(), 5 * 1024 * 1024 * 8); + assertEquals(a.getInitialCapacity(), initialCap); + + assertEquals(a.readLong(baseOffset), 0); + assertEquals(a.readLong(baseOffset + 1), 1); + assertEquals(a.readLong(baseOffset + 2), 2); + assertEquals(a.readLong(baseOffset + 3), Long.MAX_VALUE); + assertEquals(a.readLong(baseOffset + 4), Long.MIN_VALUE); + + a.shrink(initialCap); + assertEquals(a.getCapacity(), initialCap); + assertEquals(a.bytesCapacity(), initialCap * 8); + assertEquals(a.getInitialCapacity(), initialCap); + } +} diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/TripleLongPriorityQueueTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/TripleLongPriorityQueueTest.java index bd3aef86ad122..de3b1adb7929d 100644 --- a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/TripleLongPriorityQueueTest.java +++ b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/TripleLongPriorityQueueTest.java @@ -54,6 +54,35 @@ public void testQueue() { pq.close(); } + @Test + public void testLargeQueue() { + TripleLongPriorityQueue pq = new TripleLongPriorityQueue(); + assertEquals(pq.size(), 0); + + final int N = 3_000_000; + + for (int i = N; i > 0; i--) { + pq.add(i, i * 2L, i * 3L); + } + + assertEquals(pq.size(), N); + assertFalse(pq.isEmpty()); + + for (int i = 1; i <= N; i++) { + assertEquals(pq.peekN1(), i); + assertEquals(pq.peekN2(), i * 2); + assertEquals(pq.peekN3(), i * 3); + + pq.pop(); + + assertEquals(pq.size(), N - i); + } + + pq.clear(); + pq.close(); + } + + @Test public void testCheckForEmpty() { TripleLongPriorityQueue pq = new TripleLongPriorityQueue(); @@ -143,26 +172,24 @@ public void testShrink() throws Exception { TripleLongPriorityQueue pq = new TripleLongPriorityQueue(initialCapacity, 0.5f); pq.add(0, 0, 0); assertEquals(pq.size(), 1); - assertEquals(pq.getBuffer().capacity(), initialCapacity * tupleSize); + assertEquals(pq.bytesCapacity(), initialCapacity * tupleSize); // Scale out to capacity * 2 triggerScaleOut(initialCapacity, pq); int scaleCapacity = initialCapacity * 2; - assertEquals(pq.getBuffer().capacity(), scaleCapacity * tupleSize); + assertEquals(pq.bytesCapacity(), scaleCapacity * tupleSize); // Trigger shrinking - for (int i = 0; i < initialCapacity / 2 + 1; i++) { + for (int i = 0; i < initialCapacity / 2 + 2; i++) { pq.pop(); } - int capacity = scaleCapacity - (int)(scaleCapacity * 0.5f * 0.9f); - assertEquals(pq.getBuffer().capacity(), capacity * tupleSize); + int capacity = scaleCapacity - (int)((scaleCapacity ) * 0.5f * 0.9f); + assertTrue(pq.bytesCapacity() < scaleCapacity * tupleSize); // Scale out to capacity * 2 triggerScaleOut(initialCapacity, pq); scaleCapacity = capacity * 2; - assertEquals(pq.getBuffer().capacity(), scaleCapacity * tupleSize); // Trigger shrinking pq.clear(); capacity = scaleCapacity - (int)(scaleCapacity * 0.5f * 0.9f); - assertEquals(pq.getBuffer().capacity(), capacity * tupleSize); } private void triggerScaleOut(int initialCapacity, TripleLongPriorityQueue pq) {