From 6b59c35e5fac43fa782a082026a7c56bcc453c38 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=81=93=E5=90=9B?= Date: Thu, 20 Jun 2024 15:18:03 +0800 Subject: [PATCH] [improve][broker] Optimize `ConcurrentOpenLongPairRangeSet` by RoaringBitmap (#22908) (cherry picked from commit 5b1f653e65ccd967fd9642e6d6959de4b1b01a63) --- .../server/src/assemble/LICENSE.bin.txt | 3 +- .../shell/src/assemble/LICENSE.bin.txt | 2 + pom.xml | 2 +- pulsar-common/pom.xml | 5 + .../ConcurrentOpenLongPairRangeSet.java | 12 +- .../collections/ConcurrentRoaringBitSet.java | 439 ++++++++++++++++++ 6 files changed, 453 insertions(+), 10 deletions(-) create mode 100644 pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentRoaringBitSet.java diff --git a/distribution/server/src/assemble/LICENSE.bin.txt b/distribution/server/src/assemble/LICENSE.bin.txt index 1a66ab6d70a2f..3b30a40ff83e9 100644 --- a/distribution/server/src/assemble/LICENSE.bin.txt +++ b/distribution/server/src/assemble/LICENSE.bin.txt @@ -514,8 +514,7 @@ The Apache Software License, Version 2.0 * RxJava - io.reactivex.rxjava3-rxjava-3.0.1.jar * RoaringBitmap - - org.roaringbitmap-RoaringBitmap-0.9.44.jar - - org.roaringbitmap-shims-0.9.44.jar + - org.roaringbitmap-RoaringBitmap-1.1.0.jar * OpenTelemetry - io.opentelemetry-opentelemetry-api-1.38.0.jar - io.opentelemetry-opentelemetry-api-incubator-1.38.0-alpha.jar diff --git a/distribution/shell/src/assemble/LICENSE.bin.txt b/distribution/shell/src/assemble/LICENSE.bin.txt index 758f2240df069..72a8e5863cf1d 100644 --- a/distribution/shell/src/assemble/LICENSE.bin.txt +++ b/distribution/shell/src/assemble/LICENSE.bin.txt @@ -383,6 +383,8 @@ The Apache Software License, Version 2.0 - simpleclient_tracer_common-0.16.0.jar - simpleclient_tracer_otel-0.16.0.jar - simpleclient_tracer_otel_agent-0.16.0.jar + * RoaringBitmap + - RoaringBitmap-1.1.0.jar * Log4J - log4j-api-2.23.1.jar - log4j-core-2.23.1.jar diff --git a/pom.xml b/pom.xml index fd5cd34bfd0e6..54ebfb087c9e2 100644 --- a/pom.xml +++ b/pom.xml @@ -313,7 +313,7 @@ flexible messaging model and an intuitive client API. 1.3 0.4 9.1.0 - 0.9.44 + 1.1.0 1.6.1 6.4.0 3.33.0 diff --git a/pulsar-common/pom.xml b/pulsar-common/pom.xml index df211d6ac87b1..95a61c1d187f5 100644 --- a/pulsar-common/pom.xml +++ b/pulsar-common/pom.xml @@ -244,6 +244,11 @@ awaitility test + + + org.roaringbitmap + RoaringBitmap + diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenLongPairRangeSet.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenLongPairRangeSet.java index 72215d7296cc3..b5ad89d1695d4 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenLongPairRangeSet.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenLongPairRangeSet.java @@ -29,6 +29,7 @@ import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.commons.lang.mutable.MutableInt; +import org.roaringbitmap.RoaringBitSet; /** * A Concurrent set comprising zero or more ranges of type {@link LongPair}. This can be alternative of @@ -44,7 +45,7 @@ public class ConcurrentOpenLongPairRangeSet> implements LongPairRangeSet { protected final NavigableMap rangeBitSetMap = new ConcurrentSkipListMap<>(); - private boolean threadSafe = true; + private final boolean threadSafe; private final int bitSetSize; private final LongPairConsumer consumer; @@ -95,9 +96,7 @@ public void addOpenClosed(long lowerKey, long lowerValueOpen, long upperKey, lon // (2) set 0th-index to upper-index in upperRange.getKey() if (isValid(upperKey, upperValue)) { BitSet rangeBitSet = rangeBitSetMap.computeIfAbsent(upperKey, (key) -> createNewBitSet()); - if (rangeBitSet != null) { - rangeBitSet.set(0, (int) upperValue + 1); - } + rangeBitSet.set(0, (int) upperValue + 1); } // No-op if values are not valid eg: if lower == LongPair.earliest or upper == LongPair.latest then nothing // to set @@ -414,7 +413,6 @@ private int getSafeEntry(long value) { } private BitSet createNewBitSet() { - return this.threadSafe ? new ConcurrentBitSet(bitSetSize) : new BitSet(bitSetSize); + return this.threadSafe ? new ConcurrentRoaringBitSet() : new RoaringBitSet(); } - -} +} \ No newline at end of file diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentRoaringBitSet.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentRoaringBitSet.java new file mode 100644 index 0000000000000..814e58400993b --- /dev/null +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentRoaringBitSet.java @@ -0,0 +1,439 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.common.util.collections; + +import java.util.BitSet; +import java.util.concurrent.locks.StampedLock; +import java.util.stream.IntStream; +import org.roaringbitmap.RoaringBitSet; + +public class ConcurrentRoaringBitSet extends RoaringBitSet { + private final StampedLock rwLock = new StampedLock(); + + public ConcurrentRoaringBitSet() { + super(); + } + + @Override + public boolean get(int bitIndex) { + long stamp = rwLock.tryOptimisticRead(); + boolean isSet = super.get(bitIndex); + if (!rwLock.validate(stamp)) { + stamp = rwLock.readLock(); + try { + isSet = super.get(bitIndex); + } finally { + rwLock.unlockRead(stamp); + } + } + return isSet; + } + + @Override + public void set(int bitIndex) { + long stamp = rwLock.writeLock(); + try { + super.set(bitIndex); + } finally { + rwLock.unlockWrite(stamp); + } + } + + @Override + public void clear(int bitIndex) { + long stamp = rwLock.writeLock(); + try { + super.clear(bitIndex); + } finally { + rwLock.unlockWrite(stamp); + } + } + + @Override + public void set(int fromIndex, int toIndex) { + long stamp = rwLock.writeLock(); + try { + super.set(fromIndex, toIndex); + } finally { + rwLock.unlockWrite(stamp); + } + } + + @Override + public void clear(int fromIndex, int toIndex) { + long stamp = rwLock.writeLock(); + try { + super.clear(fromIndex, toIndex); + } finally { + rwLock.unlockWrite(stamp); + } + } + + @Override + public void clear() { + long stamp = rwLock.writeLock(); + try { + super.clear(); + } finally { + rwLock.unlockWrite(stamp); + } + } + + @Override + public int nextSetBit(int fromIndex) { + long stamp = rwLock.tryOptimisticRead(); + int nextSetBit = super.nextSetBit(fromIndex); + if (!rwLock.validate(stamp)) { + // Fallback to read lock + stamp = rwLock.readLock(); + try { + nextSetBit = super.nextSetBit(fromIndex); + } finally { + rwLock.unlockRead(stamp); + } + } + return nextSetBit; + } + + @Override + public int nextClearBit(int fromIndex) { + long stamp = rwLock.tryOptimisticRead(); + int nextClearBit = super.nextClearBit(fromIndex); + if (!rwLock.validate(stamp)) { + // Fallback to read lock + stamp = rwLock.readLock(); + try { + nextClearBit = super.nextClearBit(fromIndex); + } finally { + rwLock.unlockRead(stamp); + } + } + return nextClearBit; + } + + @Override + public int previousSetBit(int fromIndex) { + long stamp = rwLock.tryOptimisticRead(); + int previousSetBit = super.previousSetBit(fromIndex); + if (!rwLock.validate(stamp)) { + // Fallback to read lock + stamp = rwLock.readLock(); + try { + previousSetBit = super.previousSetBit(fromIndex); + } finally { + rwLock.unlockRead(stamp); + } + } + return previousSetBit; + } + + @Override + public int previousClearBit(int fromIndex) { + long stamp = rwLock.tryOptimisticRead(); + int previousClearBit = super.previousClearBit(fromIndex); + if (!rwLock.validate(stamp)) { + // Fallback to read lock + stamp = rwLock.readLock(); + try { + previousClearBit = super.previousClearBit(fromIndex); + } finally { + rwLock.unlockRead(stamp); + } + } + return previousClearBit; + } + + @Override + public int length() { + long stamp = rwLock.tryOptimisticRead(); + int length = super.length(); + if (!rwLock.validate(stamp)) { + // Fallback to read lock + stamp = rwLock.readLock(); + try { + length = super.length(); + } finally { + rwLock.unlockRead(stamp); + } + } + return length; + } + + @Override + public boolean isEmpty() { + long stamp = rwLock.tryOptimisticRead(); + boolean isEmpty = super.isEmpty(); + if (!rwLock.validate(stamp)) { + // Fallback to read lock + stamp = rwLock.readLock(); + try { + isEmpty = super.isEmpty(); + } finally { + rwLock.unlockRead(stamp); + } + } + return isEmpty; + } + + @Override + public int cardinality() { + long stamp = rwLock.tryOptimisticRead(); + int cardinality = super.cardinality(); + if (!rwLock.validate(stamp)) { + // Fallback to read lock + stamp = rwLock.readLock(); + try { + cardinality = super.cardinality(); + } finally { + rwLock.unlockRead(stamp); + } + } + return cardinality; + } + + @Override + public int size() { + long stamp = rwLock.tryOptimisticRead(); + int size = super.size(); + if (!rwLock.validate(stamp)) { + // Fallback to read lock + stamp = rwLock.readLock(); + try { + size = super.size(); + } finally { + rwLock.unlockRead(stamp); + } + } + return size; + } + + @Override + public byte[] toByteArray() { + long stamp = rwLock.tryOptimisticRead(); + byte[] byteArray = super.toByteArray(); + if (!rwLock.validate(stamp)) { + // Fallback to read lock + stamp = rwLock.readLock(); + try { + byteArray = super.toByteArray(); + } finally { + rwLock.unlockRead(stamp); + } + } + return byteArray; + } + + @Override + public long[] toLongArray() { + long stamp = rwLock.tryOptimisticRead(); + long[] longArray = super.toLongArray(); + if (!rwLock.validate(stamp)) { + // Fallback to read lock + stamp = rwLock.readLock(); + try { + longArray = super.toLongArray(); + } finally { + rwLock.unlockRead(stamp); + } + } + return longArray; + } + + @Override + public void flip(int bitIndex) { + long stamp = rwLock.writeLock(); + try { + super.flip(bitIndex); + } finally { + rwLock.unlockWrite(stamp); + } + } + + @Override + public void flip(int fromIndex, int toIndex) { + long stamp = rwLock.writeLock(); + try { + super.flip(fromIndex, toIndex); + } finally { + rwLock.unlockWrite(stamp); + } + } + + @Override + public void set(int bitIndex, boolean value) { + long stamp = rwLock.writeLock(); + try { + super.set(bitIndex, value); + } finally { + rwLock.unlockWrite(stamp); + } + } + + @Override + public void set(int fromIndex, int toIndex, boolean value) { + long stamp = rwLock.writeLock(); + try { + super.set(fromIndex, toIndex, value); + } finally { + rwLock.unlockWrite(stamp); + } + } + + @Override + public BitSet get(int fromIndex, int toIndex) { + long stamp = rwLock.tryOptimisticRead(); + BitSet bitSet = super.get(fromIndex, toIndex); + if (!rwLock.validate(stamp)) { + // Fallback to read lock + stamp = rwLock.readLock(); + try { + bitSet = super.get(fromIndex, toIndex); + } finally { + rwLock.unlockRead(stamp); + } + } + return bitSet; + } + + @Override + public boolean intersects(BitSet set) { + long stamp = rwLock.writeLock(); + try { + return super.intersects(set); + } finally { + rwLock.unlockWrite(stamp); + } + } + + @Override + public void and(BitSet set) { + long stamp = rwLock.writeLock(); + try { + super.and(set); + } finally { + rwLock.unlockWrite(stamp); + } + } + + @Override + public void or(BitSet set) { + long stamp = rwLock.writeLock(); + try { + super.or(set); + } finally { + rwLock.unlockWrite(stamp); + } + } + + @Override + public void xor(BitSet set) { + long stamp = rwLock.writeLock(); + try { + super.xor(set); + } finally { + rwLock.unlockWrite(stamp); + } + } + + @Override + public void andNot(BitSet set) { + long stamp = rwLock.writeLock(); + try { + super.andNot(set); + } finally { + rwLock.unlockWrite(stamp); + } + } + + /** + * Returns the clone of the internal wrapped {@code BitSet}. + * This won't be a clone of the {@code ConcurrentBitSet} object. + * + * @return a clone of the internal wrapped {@code BitSet} + */ + @Override + public Object clone() { + long stamp = rwLock.tryOptimisticRead(); + RoaringBitSet clone = (RoaringBitSet) super.clone(); + if (!rwLock.validate(stamp)) { + // Fallback to read lock + stamp = rwLock.readLock(); + try { + clone = (RoaringBitSet) super.clone(); + } finally { + rwLock.unlockRead(stamp); + } + } + return clone; + } + + @Override + public String toString() { + long stamp = rwLock.tryOptimisticRead(); + String str = super.toString(); + if (!rwLock.validate(stamp)) { + // Fallback to read lock + stamp = rwLock.readLock(); + try { + str = super.toString(); + } finally { + rwLock.unlockRead(stamp); + } + } + return str; + } + + /** + * This operation is not supported on {@code ConcurrentBitSet}. + */ + @Override + public IntStream stream() { + throw new UnsupportedOperationException("stream is not supported"); + } + + public boolean equals(final Object o) { + long stamp = rwLock.tryOptimisticRead(); + boolean isEqual = super.equals(o); + if (!rwLock.validate(stamp)) { + // Fallback to read lock + stamp = rwLock.readLock(); + try { + isEqual = super.equals(o); + } finally { + rwLock.unlockRead(stamp); + } + } + return isEqual; + } + + public int hashCode() { + long stamp = rwLock.tryOptimisticRead(); + int hashCode = super.hashCode(); + if (!rwLock.validate(stamp)) { + // Fallback to read lock + stamp = rwLock.readLock(); + try { + hashCode = super.hashCode(); + } finally { + rwLock.unlockRead(stamp); + } + } + return hashCode; + } +}