Skip to content

Commit

Permalink
[fix][broker] Fixed error when delayed messages trackers state grows …
Browse files Browse the repository at this point in the history
…to >1.5GB (apache#16490)

* Fixed error when delayed messages trackers state grows to >1.5GB

* Fixed spotbugs issues

* Fixed javadocs

* In the constructor, ensure all segments after the first one are of max size

* Use poll to figure out where the test is stuck

* Added SegmentedLongArray specific unit test

* Removed unused imports
  • Loading branch information
merlimat authored and weimob-wuxuanqi committed Jul 14, 2022
1 parent 8c7d00a commit 5621dc7
Show file tree
Hide file tree
Showing 5 changed files with 369 additions and 116 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.AssertJUnit.assertEquals;
import static org.testng.AssertJUnit.assertSame;
Expand All @@ -47,6 +48,7 @@
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.function.Supplier;
import org.apache.bookkeeper.common.util.OrderedExecutor;
Expand Down Expand Up @@ -360,7 +362,8 @@ public void testAddRemoveConsumer() throws Exception {
// 4. Verify active consumer
assertSame(pdfc.getActiveConsumer().consumerName(), consumer1.consumerName());
// get the notified with who is the leader
change = consumerChanges.take();
change = consumerChanges.poll(10, TimeUnit.SECONDS);
assertNotNull(change);
verifyActiveConsumerChange(change, 1, true);
verify(consumer1, times(2)).notifyActiveConsumerChange(same(consumer1));

Expand All @@ -372,7 +375,8 @@ public void testAddRemoveConsumer() throws Exception {
assertSame(pdfc.getActiveConsumer().consumerName(), consumer1.consumerName());
assertEquals(3, consumers.size());
// get notified with who is the leader
change = consumerChanges.take();
change = consumerChanges.poll(10, TimeUnit.SECONDS);
assertNotNull(change);
verifyActiveConsumerChange(change, 2, false);
verify(consumer1, times(2)).notifyActiveConsumerChange(same(consumer1));
verify(consumer2, times(1)).notifyActiveConsumerChange(same(consumer1));
Expand All @@ -387,13 +391,17 @@ public void testAddRemoveConsumer() throws Exception {
assertEquals(4, consumers.size());

// all consumers will receive notifications
change = consumerChanges.take();
change = consumerChanges.poll(10, TimeUnit.SECONDS);
assertNotNull(change);
verifyActiveConsumerChange(change, 0, true);
change = consumerChanges.take();
change = consumerChanges.poll(10, TimeUnit.SECONDS);
assertNotNull(change);
verifyActiveConsumerChange(change, 1, false);
change = consumerChanges.take();
change = consumerChanges.poll(10, TimeUnit.SECONDS);
assertNotNull(change);
verifyActiveConsumerChange(change, 1, false);
change = consumerChanges.take();
change = consumerChanges.poll(10, TimeUnit.SECONDS);
assertNotNull(change);
verifyActiveConsumerChange(change, 2, false);
verify(consumer0, times(1)).notifyActiveConsumerChange(same(consumer0));
verify(consumer1, times(2)).notifyActiveConsumerChange(same(consumer1));
Expand All @@ -419,9 +427,11 @@ public void testAddRemoveConsumer() throws Exception {
assertEquals(2, consumers.size());

// the remaining consumers will receive notifications
change = consumerChanges.take();
change = consumerChanges.poll(10, TimeUnit.SECONDS);
assertNotNull(change);
verifyActiveConsumerChange(change, 1, true);
change = consumerChanges.take();
change = consumerChanges.poll(10, TimeUnit.SECONDS);
assertNotNull(change);
verifyActiveConsumerChange(change, 1, true);

// 10. Attempt to remove already removed consumer
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.common.util.collections;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import java.util.ArrayList;
import java.util.List;
import javax.annotation.concurrent.NotThreadSafe;
import lombok.Getter;

@NotThreadSafe
public class SegmentedLongArray implements AutoCloseable {

private static final int SIZE_OF_LONG = 8;

private static final int MAX_SEGMENT_SIZE = 2 * 1024 * 1024; // 2M longs -> 16 MB
private final List<ByteBuf> buffers = new ArrayList<>();

@Getter
private final long initialCapacity;

@Getter
private long capacity;

public SegmentedLongArray(long initialCapacity) {
long remainingToAdd = initialCapacity;

// Add first segment
int sizeToAdd = (int) Math.min(remainingToAdd, MAX_SEGMENT_SIZE);
ByteBuf buffer = PooledByteBufAllocator.DEFAULT.directBuffer(sizeToAdd * SIZE_OF_LONG);
buffer.writerIndex(sizeToAdd * SIZE_OF_LONG);
buffers.add(buffer);
remainingToAdd -= sizeToAdd;

// Add the remaining segments, all at full segment size, if necessary
while (remainingToAdd > 0) {
buffer = PooledByteBufAllocator.DEFAULT.directBuffer(MAX_SEGMENT_SIZE * SIZE_OF_LONG);
buffer.writerIndex(MAX_SEGMENT_SIZE * SIZE_OF_LONG);
buffers.add(buffer);
remainingToAdd -= MAX_SEGMENT_SIZE;
}

this.initialCapacity = initialCapacity;
this.capacity = this.initialCapacity;
}

public void writeLong(long offset, long value) {
int bufferIdx = (int) (offset / MAX_SEGMENT_SIZE);
int internalIdx = (int) (offset % MAX_SEGMENT_SIZE);
buffers.get(bufferIdx).setLong(internalIdx * SIZE_OF_LONG, value);
}

public long readLong(long offset) {
int bufferIdx = (int) (offset / MAX_SEGMENT_SIZE);
int internalIdx = (int) (offset % MAX_SEGMENT_SIZE);
return buffers.get(bufferIdx).getLong(internalIdx * SIZE_OF_LONG);
}

public void increaseCapacity() {
if (capacity < MAX_SEGMENT_SIZE) {
// Resize the current buffer to bigger capacity
capacity += (capacity <= 256 ? capacity : capacity / 2);
capacity = Math.min(capacity, MAX_SEGMENT_SIZE);
buffers.get(0).capacity((int) this.capacity * SIZE_OF_LONG);
buffers.get(0).writerIndex((int) this.capacity * SIZE_OF_LONG);
} else {
// Let's add 1 mode buffer to the list
int bufferSize = MAX_SEGMENT_SIZE * SIZE_OF_LONG;
ByteBuf buffer = PooledByteBufAllocator.DEFAULT.directBuffer(bufferSize, bufferSize);
buffer.writerIndex(bufferSize);
buffers.add(buffer);
capacity += MAX_SEGMENT_SIZE;
}
}

public void shrink(long newCapacity) {
if (newCapacity >= capacity || newCapacity < initialCapacity) {
return;
}

long sizeToReduce = capacity - newCapacity;
while (sizeToReduce >= MAX_SEGMENT_SIZE && buffers.size() > 1) {
ByteBuf b = buffers.remove(buffers.size() - 1);
b.release();
capacity -= MAX_SEGMENT_SIZE;
sizeToReduce -= MAX_SEGMENT_SIZE;
}

if (buffers.size() == 1 && sizeToReduce > 0) {
// We should also reduce the capacity of the first buffer
capacity -= sizeToReduce;
ByteBuf oldBuffer = buffers.get(0);
ByteBuf newBuffer = PooledByteBufAllocator.DEFAULT.directBuffer((int) capacity * SIZE_OF_LONG);
oldBuffer.getBytes(0, newBuffer, (int) capacity * SIZE_OF_LONG);
oldBuffer.release();
buffers.set(0, newBuffer);
}
}

@Override
public void close() {
buffers.forEach(ByteBuf::release);
}

/**
* The amount of memory used to back the array of longs.
*/
public long bytesCapacity() {
return capacity * SIZE_OF_LONG;
}
}
Loading

0 comments on commit 5621dc7

Please sign in to comment.