Optimize memory: Support shrinking in ConcurrentLongLongPairHashMap #3061

Merged

Changes from 8 commits (24 commits total)
@@ -77,7 +77,11 @@ public ReadCache(ByteBufAllocator allocator, long maxCacheSize, int maxSegmentSi

for (int i = 0; i < segmentsCount; i++) {
cacheSegments.add(Unpooled.directBuffer(segmentSize, segmentSize));
-        cacheIndexes.add(new ConcurrentLongLongPairHashMap(4096, 2 * Runtime.getRuntime().availableProcessors()));
+        ConcurrentLongLongPairHashMap concurrentLongLongPairHashMap = ConcurrentLongLongPairHashMap.newBuilder()
+                .expectedItems(4096)
+                .concurrencyLevel(2 * Runtime.getRuntime().availableProcessors())
+                .build();
+        cacheIndexes.add(concurrentLongLongPairHashMap);
}
}

@@ -60,8 +60,10 @@ public interface EntryConsumer {
void accept(long ledgerId, long entryId, ByteBuf entry);
}

-    private final ConcurrentLongLongPairHashMap index =
-            new ConcurrentLongLongPairHashMap(4096, 2 * Runtime.getRuntime().availableProcessors());
+    private final ConcurrentLongLongPairHashMap index = ConcurrentLongLongPairHashMap.newBuilder()
+            .expectedItems(4096)
+            .concurrencyLevel(2 * Runtime.getRuntime().availableProcessors())
+            .build();

private final ConcurrentLongLongHashMap lastEntryMap =
new ConcurrentLongLongHashMap(4096, 2 * Runtime.getRuntime().availableProcessors());
@@ -47,13 +47,69 @@ public class ConcurrentLongLongPairHashMap {

private static final long ValueNotFound = -1L;

private static final float MapFillFactor = 0.66f;

private static final int DefaultExpectedItems = 256;
private static final int DefaultConcurrencyLevel = 16;

private final Section[] sections;

+    public static Builder newBuilder() {
+        return new Builder();
+    }
+
+    /**
+     * Builder of ConcurrentLongLongPairHashMap.
+     */
+    public static class Builder {
+        int expectedItems = DefaultExpectedItems;
+        int concurrencyLevel = DefaultConcurrencyLevel;
+        float mapFillFactor = 0.66f;
+        float mapIdleFactor = 0.15f;
+        float expandFactor = 2;
+        float shrinkFactor = 2;
+        boolean autoShrink = false;
+
+        public Builder expectedItems(int expectedItems) {
+            this.expectedItems = expectedItems;
+            return this;
+        }
+
+        public Builder concurrencyLevel(int concurrencyLevel) {
+            this.concurrencyLevel = concurrencyLevel;
+            return this;
+        }
+
+        public Builder mapFillFactor(float mapFillFactor) {
+            this.mapFillFactor = mapFillFactor;
+            return this;
+        }
+
+        public Builder mapIdleFactor(float mapIdleFactor) {
+            this.mapIdleFactor = mapIdleFactor;
+            return this;
+        }
+
+        public Builder expandFactor(float expandFactor) {
+            this.expandFactor = expandFactor;
+            return this;
+        }
+
+        public Builder shrinkFactor(float shrinkFactor) {
+            this.shrinkFactor = shrinkFactor;
+            return this;
+        }
+
+        public Builder autoShrink(boolean autoShrink) {
+            this.autoShrink = autoShrink;
+            return this;
+        }
+
+        public ConcurrentLongLongPairHashMap build() {
+            return new ConcurrentLongLongPairHashMap(expectedItems, concurrencyLevel,
+                    mapFillFactor, mapIdleFactor, autoShrink, expandFactor, shrinkFactor);
+        }
+    }
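[Editor's note] For readers skimming the diff, a minimal usage sketch of the new builder; the values and the decision to enable shrinking here are illustrative, not defaults from the PR:

    ConcurrentLongLongPairHashMap map = ConcurrentLongLongPairHashMap.newBuilder()
            .expectedItems(4096)
            .concurrencyLevel(2 * Runtime.getRuntime().availableProcessors())
            .autoShrink(true)      // shrinking is opt-in; the default is false
            .mapIdleFactor(0.15f)  // shrink once size falls below 15% of capacity
            .build();

Callers that used the old two-argument constructor migrate as shown in the ReadCache hunk above.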

/**
* A BiConsumer Long pair.
*/
@@ -75,26 +131,20 @@ public interface LongLongPairPredicate {
boolean test(long key1, long key2, long value1, long value2);
}

-    public ConcurrentLongLongPairHashMap() {
-        this(DefaultExpectedItems);
-    }
-
-    public ConcurrentLongLongPairHashMap(int expectedItems) {
-        this(expectedItems, DefaultConcurrencyLevel);
-    }
-
-    public ConcurrentLongLongPairHashMap(int expectedItems, int concurrencyLevel) {
+    private ConcurrentLongLongPairHashMap(int expectedItems, int concurrencyLevel, float mapFillFactor, float mapIdleFactor,
+            boolean autoShrink, float expandFactor, float shrinkFactor) {
        checkArgument(expectedItems > 0);
        checkArgument(concurrencyLevel > 0);
        checkArgument(expectedItems >= concurrencyLevel);

        int numSections = concurrencyLevel;
        int perSectionExpectedItems = expectedItems / numSections;
-        int perSectionCapacity = (int) (perSectionExpectedItems / MapFillFactor);
+        int perSectionCapacity = (int) (perSectionExpectedItems / mapFillFactor);
        this.sections = new Section[numSections];

        for (int i = 0; i < numSections; i++) {
-            sections[i] = new Section(perSectionCapacity);
+            sections[i] = new Section(perSectionCapacity, mapFillFactor, mapIdleFactor,
+                    autoShrink, expandFactor, shrinkFactor);
        }
    }
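[Editor's note] To make the sectioning arithmetic concrete, a worked example with the values used at the ReadCache call site, assuming a machine where 2 * availableProcessors() == 16 (a sketch, not code from the PR):

    // expectedItems = 4096, concurrencyLevel = 16, mapFillFactor = 0.66f
    int numSections = 16;
    int perSectionExpectedItems = 4096 / 16;       // 256 items per section
    int perSectionCapacity = (int) (256 / 0.66f);  // 387 buckets requested
    // the Section constructor then aligns 387 to a power of two: 512

Each section is an independent open-addressed table guarded by its own StampedLock, so concurrencyLevel bounds how many writers can proceed in parallel.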

@@ -228,14 +278,27 @@ private static final class Section extends StampedLock {
private volatile int capacity;
private volatile int size;
private int usedBuckets;
-        private int resizeThreshold;
+        private int resizeThresholdUp;
+        private int resizeThresholdBelow;
+        private final float mapFillFactor;
+        private final float mapIdleFactor;
+        private final float expandFactor;
+        private final float shrinkFactor;
+        private boolean autoShrink;
Contributor: set autoShrink to final?

Contributor Author: OK

Contributor Author (lordcheng10, Feb 19, 2022): Fixed @hangc0276
-        Section(int capacity) {
+        Section(int capacity, float mapFillFactor, float mapIdleFactor, boolean autoShrink,
+                float expandFactor, float shrinkFactor) {
this.capacity = alignToPowerOfTwo(capacity);
this.table = new long[4 * this.capacity];
this.size = 0;
this.usedBuckets = 0;
-            this.resizeThreshold = (int) (this.capacity * MapFillFactor);
+            this.autoShrink = autoShrink;
+            this.mapFillFactor = mapFillFactor;
+            this.mapIdleFactor = mapIdleFactor;
+            this.expandFactor = expandFactor;
+            this.shrinkFactor = shrinkFactor;
+            this.resizeThresholdUp = (int) (this.capacity * mapFillFactor);
+            this.resizeThresholdBelow = (int) (this.capacity * mapIdleFactor);
Arrays.fill(table, EmptyKey);
}
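[Editor's note] Continuing the worked example above, the two resize thresholds for a 512-slot section with the default factors come out as follows (illustrative arithmetic only, not PR code):

    int resizeThresholdUp    = (int) (512 * 0.66f);  // 337: expand once usedBuckets exceeds this
    int resizeThresholdBelow = (int) (512 * 0.15f);  // 76: shrink once size falls below this
    // the shrink branch additionally requires autoShrink == true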

@@ -336,9 +399,10 @@ boolean put(long key1, long key2, long value1, long value2, int keyHash, boolean
bucket = (bucket + 4) & (table.length - 1);
}
} finally {
-            if (usedBuckets > resizeThreshold) {
+            if (usedBuckets > resizeThresholdUp) {
try {
-                    rehash();
+                    // Expand the hashmap
+                    rehash((int) (capacity * (1 + expandFactor)));
Contributor: If expandFactor == 2, I would expect the size to double each time. I think this should be:

Suggested change:
-                    rehash((int) (capacity * (1 + expandFactor)));
+                    rehash((int) (capacity * expandFactor));

Contributor Author: fixed
} finally {
unlockWrite(stamp);
}
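[Editor's note] To quantify the reviewer's point above: with expandFactor = 2, the expression as written grows the table by 3x per rehash, while the suggested form doubles it (illustrative arithmetic, not PR code):

    int capacity = 512;
    float expandFactor = 2;
    int asWritten   = (int) (capacity * (1 + expandFactor)); // 1536 -- triples
    int asSuggested = (int) (capacity * expandFactor);       // 1024 -- doubles

Since the probe loop masks bucket indices with table.length - 1, capacities are presumably expected to stay powers of two, which also favors the multiplicative form.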
@@ -376,7 +440,16 @@ private boolean remove(long key1, long key2, long value1, long value2, int keyHa
}

} finally {
-            unlockWrite(stamp);
+            if (autoShrink && size < resizeThresholdBelow) {
+                try {
+                    // shrink the hashmap
+                    rehash((int) (capacity * (1 - shrinkFactor)));
Contributor: Shouldn't this be like:

Suggested change:
-                    rehash((int) (capacity * (1 - shrinkFactor)));
+                    rehash((int) (capacity / shrinkFactor));

Contributor Author: I agree. In addition, should a parameter range check be added, like shrinkFactor > 1 and expandFactor > 1? @merlimat

Contributor: 👍

Contributor Author: fixed. PTAL, thanks! @merlimat
+                } finally {
+                    unlockWrite(stamp);
+                }
+            } else {
+                unlockWrite(stamp);
+            }
}
}
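[Editor's note] Again making the thread above concrete: with shrinkFactor = 2 the expression as written yields a negative capacity, while the suggested division halves the table, and the proposed range checks would rule out degenerate factors (illustrative arithmetic, not PR code):

    int capacity = 512;
    float shrinkFactor = 2;
    int asWritten   = (int) (capacity * (1 - shrinkFactor)); // -512: invalid
    int asSuggested = (int) (capacity / shrinkFactor);       // 256: halves the table
    // the proposed guards, e.g. in Builder#build():
    // checkArgument(expandFactor > 1);
    // checkArgument(shrinkFactor > 1);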

@@ -388,6 +461,18 @@ private void cleanBucket(int bucket) {
table[bucket + 2] = ValueNotFound;
table[bucket + 3] = ValueNotFound;
--usedBuckets;

+            // Cleanup all the buckets that were in `DeletedKey` state, so that we can reduce unnecessary expansions
+            bucket = (bucket - 4) & (table.length - 1);
+            while (table[bucket] == DeletedKey) {
+                table[bucket] = EmptyKey;
+                table[bucket + 1] = EmptyKey;
+                table[bucket + 2] = ValueNotFound;
+                table[bucket + 3] = ValueNotFound;
+                --usedBuckets;
+
+                bucket = (bucket - 4) & (table.length - 1);
+            }
} else {
table[bucket] = DeletedKey;
table[bucket + 1] = DeletedKey;
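[Editor's note] The backward sweep added to cleanBucket is what keeps usedBuckets honest: a tombstone only needs to survive while some probe chain passes over it, so once the slot to its right is empty, the whole trailing run of tombstones can be reclaimed. A toy model of the sweep, with one slot per bucket instead of four (a sketch, not PR code):

    // 'K' live key, 'D' tombstone, 'E' empty; slot 3 was just cleared by a remove
    char[] slots = {'K', 'D', 'D', 'E', 'E', 'E', 'E', 'E'};
    int bucket = (3 - 1) & (slots.length - 1);      // step left from the cleared slot
    while (slots[bucket] == 'D') {
        slots[bucket] = 'E';                        // no probe chain can end on this run
        bucket = (bucket - 1) & (slots.length - 1);
    }
    // slots is now all 'E' except slot 0; in the real map each reclaimed
    // bucket also decrements usedBuckets, deferring the next expansion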
@@ -453,9 +538,7 @@ public void forEach(BiConsumerLongPair processor) {
}
}

-    private void rehash() {
-        // Expand the hashmap
-        int newCapacity = capacity * 2;
+    private void rehash(int newCapacity) {
long[] newTable = new long[4 * newCapacity];
Arrays.fill(newTable, EmptyKey);

@@ -475,7 +558,8 @@ private void rehash() {
// Capacity needs to be updated after the values, so that we won't see
// a capacity value bigger than the actual array size
capacity = newCapacity;
-        resizeThreshold = (int) (capacity * MapFillFactor);
+        resizeThresholdUp = (int) (capacity * mapFillFactor);
+        resizeThresholdBelow = (int) (capacity * mapIdleFactor);
}
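[Editor's note] So after a shrink from 512 to 256 slots with the default factors, both thresholds are recomputed from the new capacity (continuing the earlier worked example; illustrative arithmetic only):

    int resizeThresholdUp    = (int) (256 * 0.66f);  // 168
    int resizeThresholdBelow = (int) (256 * 0.15f);  // 38
    // a later burst of inserts expands again once usedBuckets exceeds 168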

private static void insertKeyValueNoLock(long[] table, int capacity, long key1, long key2, long value1,