diff --git a/buildtools/src/main/resources/pulsar/suppressions.xml b/buildtools/src/main/resources/pulsar/suppressions.xml index 7fd94d913f377..d6d38a2dfa0a6 100644 --- a/buildtools/src/main/resources/pulsar/suppressions.xml +++ b/buildtools/src/main/resources/pulsar/suppressions.xml @@ -40,4 +40,5 @@ + diff --git a/conf/broker.conf b/conf/broker.conf index e897f23c39e6a..80fef8519f503 100644 --- a/conf/broker.conf +++ b/conf/broker.conf @@ -339,6 +339,9 @@ delayedDeliveryEnabled=true # Default is 1 second. delayedDeliveryTickTimeMillis=1000 +# Whether to enable acknowledge of batch local index. +acknowledgmentAtBatchIndexLevelEnabled=false + # Enable tracking of replicated subscriptions state across clusters. enableReplicatedSubscriptions=true diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java index a48ed04bf8be0..354c791cf8825 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java @@ -643,4 +643,9 @@ Set asyncReplayEntries( * Trim delete entries for the given entries */ void trimDeletedEntries(List entries); + + /** + * Get deleted batch indexes list for a batch message. + */ + long[] getDeletedBatchIndexesAsLongArray(PositionImpl position); } diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java index bf23fe719a547..dde67c2632f4c 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java @@ -42,6 +42,8 @@ public class ManagedLedgerConfig { private boolean createIfMissing = true; private int maxUnackedRangesToPersist = 10000; + private int maxBatchDeletedIndexToPersist = 10000; + private boolean deletionAtBatchIndexLevelEnabled = true; private int maxUnackedRangesToPersistInZk = 1000; private int maxEntriesPerLedger = 50000; private int maxSizePerLedgerMb = 100; @@ -430,6 +432,13 @@ public int getMaxUnackedRangesToPersist() { return maxUnackedRangesToPersist; } + /** + * @return max batch deleted index that will be persisted and recoverd. + */ + public int getMaxBatchDeletedIndexToPersist() { + return maxBatchDeletedIndexToPersist; + } + /** * @param maxUnackedRangesToPersist * max unacked message ranges that will be persisted and receverd. @@ -585,4 +594,12 @@ public void setBookKeeperEnsemblePlacementPolicyProperties( Map bookKeeperEnsemblePlacementPolicyProperties) { this.bookKeeperEnsemblePlacementPolicyProperties = bookKeeperEnsemblePlacementPolicyProperties; } + + public boolean isDeletionAtBatchIndexLevelEnabled() { + return deletionAtBatchIndexLevelEnabled; + } + + public void setDeletionAtBatchIndexLevelEnabled(boolean deletionAtBatchIndexLevelEnabled) { + this.deletionAtBatchIndexLevelEnabled = deletionAtBatchIndexLevelEnabled; + } } diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index 43ae5df5fab57..8a3d31f76e302 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -43,11 +43,14 @@ import java.time.Clock; import java.util.ArrayDeque; +import java.util.ArrayList; import java.util.Collections; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -90,6 +93,7 @@ import org.apache.bookkeeper.mledger.proto.MLDataFormats.MessageRange; import org.apache.bookkeeper.mledger.proto.MLDataFormats.PositionInfo; import org.apache.commons.lang3.tuple.Pair; +import org.apache.pulsar.common.util.collections.BitSetRecyclable; import org.apache.pulsar.common.util.collections.ConcurrentOpenLongPairRangeSet; import org.apache.pulsar.common.util.collections.LongPairRangeSet; import org.apache.pulsar.common.util.collections.LongPairRangeSet.LongPairConsumer; @@ -156,6 +160,10 @@ public class ManagedCursorImpl implements ManagedCursor { return position; }; private final LongPairRangeSet individualDeletedMessages; + + // Maintain the deletion status for batch messages + // (ledgerId, entryId) -> deletion indexes + private final ConcurrentSkipListMap batchDeletedIndexes; private final ReadWriteLock lock = new ReentrantReadWriteLock(); private RateLimiter markDeleteLimiter; @@ -232,6 +240,11 @@ public interface VoidCallback { this.individualDeletedMessages = config.isUnackedRangesOpenCacheSetEnabled() ? new ConcurrentOpenLongPairRangeSet<>(4096, positionRangeConverter) : new LongPairRangeSet.DefaultRangeSet<>(positionRangeConverter); + if (config.isDeletionAtBatchIndexLevelEnabled()) { + this.batchDeletedIndexes = new ConcurrentSkipListMap<>(); + } else { + this.batchDeletedIndexes = null; + } this.digestType = BookKeeper.DigestType.fromApiDigestType(config.getDigestType()); STATE_UPDATER.set(this, State.Uninitialized); PENDING_MARK_DELETED_SUBMITTED_COUNT_UPDATER.set(this, 0); @@ -379,6 +392,10 @@ protected void recoverFromLedger(final ManagedCursorInfo info, final VoidCallbac if (positionInfo.getIndividualDeletedMessagesCount() > 0) { recoverIndividualDeletedMessages(positionInfo.getIndividualDeletedMessagesList()); } + if (config.isDeletionAtBatchIndexLevelEnabled() && batchDeletedIndexes != null + && positionInfo.getBatchedEntryDeletionIndexInfoCount() > 0) { + recoverBatchDeletedIndexes(positionInfo.getBatchedEntryDeletionIndexInfoList()); + } recoveredCursor(position, recoveredProperties, lh); callback.operationComplete(); }, null); @@ -398,6 +415,25 @@ private void recoverIndividualDeletedMessages(List i } } + private void recoverBatchDeletedIndexes (List batchDeletedIndexInfoList) { + lock.writeLock().lock(); + try { + this.batchDeletedIndexes.clear(); + batchDeletedIndexInfoList.forEach(batchDeletedIndexInfo -> { + if (batchDeletedIndexInfo.getDeleteSetCount() > 0) { + long[] array = new long[batchDeletedIndexInfo.getDeleteSetCount()]; + for (int i = 0; i < batchDeletedIndexInfo.getDeleteSetList().size(); i++) { + array[i] = batchDeletedIndexInfo.getDeleteSetList().get(i); + } + this.batchDeletedIndexes.put(PositionImpl.get(batchDeletedIndexInfo.getPosition().getLedgerId(), + batchDeletedIndexInfo.getPosition().getEntryId()), BitSetRecyclable.create().resetWords(array)); + } + }); + } finally { + lock.writeLock().unlock(); + } + } + private void recoveredCursor(PositionImpl position, Map properties, LedgerHandle recoveredFromCursorLedger) { // if the position was at a ledger that didn't exist (since it will be deleted if it was previously empty), @@ -920,6 +956,10 @@ public void operationComplete() { lastMarkDeleteEntry = new MarkDeleteEntry(newMarkDeletePosition, Collections.emptyMap(), null, null); individualDeletedMessages.clear(); + if (config.isDeletionAtBatchIndexLevelEnabled() && batchDeletedIndexes != null) { + batchDeletedIndexes.values().forEach(BitSetRecyclable::recycle); + batchDeletedIndexes.clear(); + } PositionImpl oldReadPosition = readPosition; if (oldReadPosition.compareTo(newPosition) >= 0) { @@ -1507,8 +1547,22 @@ public void asyncMarkDelete(final Position position, Map propertie if (log.isDebugEnabled()) { log.debug("[{}] Mark delete cursor {} up to position: {}", ledger.getName(), name, position); } + PositionImpl newPosition = (PositionImpl) position; + if (config.isDeletionAtBatchIndexLevelEnabled() && batchDeletedIndexes != null) { + if (newPosition.ackSet != null) { + batchDeletedIndexes.put(newPosition, BitSetRecyclable.create().resetWords(newPosition.ackSet)); + newPosition = ledger.getPreviousPosition(newPosition); + } + Map subMap = batchDeletedIndexes.subMap(PositionImpl.earliest, newPosition); + subMap.values().forEach(BitSetRecyclable::recycle); + subMap.clear(); + } else if (newPosition.ackSet != null) { + newPosition = ledger.getPreviousPosition(newPosition); + newPosition.ackSet = null; + } + if (((PositionImpl) ledger.getLastConfirmedEntry()).compareTo(newPosition) < 0) { if (log.isDebugEnabled()) { log.debug( @@ -1600,6 +1654,11 @@ public void operationComplete() { try { individualDeletedMessages.removeAtMost(mdEntry.newPosition.getLedgerId(), mdEntry.newPosition.getEntryId()); + if (config.isDeletionAtBatchIndexLevelEnabled() && batchDeletedIndexes != null) { + Map subMap = batchDeletedIndexes.subMap(PositionImpl.earliest, false, PositionImpl.get(mdEntry.newPosition.getLedgerId(), mdEntry.newPosition.getEntryId()), true); + subMap.values().forEach(BitSetRecyclable::recycle); + subMap.clear(); + } } finally { lock.writeLock().unlock(); } @@ -1722,35 +1781,62 @@ public void asyncDelete(Iterable positions, AsyncCallbacks.DeleteCallb for (Position pos : positions) { PositionImpl position = (PositionImpl) checkNotNull(pos); - if (((PositionImpl) ledger.getLastConfirmedEntry()).compareTo(position) < 0) { if (log.isDebugEnabled()) { log.debug( - "[{}] Failed mark delete due to invalid markDelete {} is ahead of last-confirmed-entry {} for cursor [{}]", - ledger.getName(), position, ledger.getLastConfirmedEntry(), name); + "[{}] Failed mark delete due to invalid markDelete {} is ahead of last-confirmed-entry {} for cursor [{}]", + ledger.getName(), position, ledger.getLastConfirmedEntry(), name); } callback.deleteFailed(new ManagedLedgerException("Invalid mark deleted position"), ctx); return; } if (individualDeletedMessages.contains(position.getLedgerId(), position.getEntryId()) - || position.compareTo(markDeletePosition) <= 0) { + || position.compareTo(markDeletePosition) <= 0) { + if (config.isDeletionAtBatchIndexLevelEnabled() && batchDeletedIndexes != null) { + BitSetRecyclable bitSetRecyclable = batchDeletedIndexes.remove(position); + if (bitSetRecyclable != null) { + bitSetRecyclable.recycle(); + } + } if (log.isDebugEnabled()) { log.debug("[{}] [{}] Position was already deleted {}", ledger.getName(), name, position); } continue; } - - // Add a range (prev, pos] to the set. Adding the previous entry as an open limit to the range will make - // the RangeSet recognize the "continuity" between adjacent Positions - PositionImpl previousPosition = ledger.getPreviousPosition(position); - individualDeletedMessages.addOpenClosed(previousPosition.getLedgerId(), previousPosition.getEntryId(), + if (position.ackSet == null) { + if (config.isDeletionAtBatchIndexLevelEnabled() && batchDeletedIndexes != null) { + BitSetRecyclable bitSetRecyclable = batchDeletedIndexes.remove(position); + if (bitSetRecyclable != null) { + bitSetRecyclable.recycle(); + } + } + // Add a range (prev, pos] to the set. Adding the previous entry as an open limit to the range will make + // the RangeSet recognize the "continuity" between adjacent Positions + PositionImpl previousPosition = ledger.getPreviousPosition(position); + individualDeletedMessages.addOpenClosed(previousPosition.getLedgerId(), previousPosition.getEntryId(), position.getLedgerId(), position.getEntryId()); - MSG_CONSUMED_COUNTER_UPDATER.incrementAndGet(this); + MSG_CONSUMED_COUNTER_UPDATER.incrementAndGet(this); - if (log.isDebugEnabled()) { - log.debug("[{}] [{}] Individually deleted messages: {}", ledger.getName(), name, + if (log.isDebugEnabled()) { + log.debug("[{}] [{}] Individually deleted messages: {}", ledger.getName(), name, individualDeletedMessages); + } + } else if (config.isDeletionAtBatchIndexLevelEnabled() && batchDeletedIndexes != null) { + BitSetRecyclable bitSet = batchDeletedIndexes.computeIfAbsent(position, (v) -> BitSetRecyclable.create().resetWords(position.ackSet)); + BitSetRecyclable givenBitSet = BitSetRecyclable.create().resetWords(position.ackSet); + bitSet.and(givenBitSet); + givenBitSet.recycle(); + if (bitSet.isEmpty()) { + PositionImpl previousPosition = ledger.getPreviousPosition(position); + individualDeletedMessages.addOpenClosed(previousPosition.getLedgerId(), previousPosition.getEntryId(), + position.getLedgerId(), position.getEntryId()); + ++messagesConsumedCounter; + BitSetRecyclable bitSetRecyclable = batchDeletedIndexes.remove(position); + if (bitSetRecyclable != null) { + bitSetRecyclable.recycle(); + } + } } } @@ -2062,6 +2148,9 @@ private void persistPositionMetaStore(long cursorsLedgerId, PositionImpl positio info.addAllProperties(buildPropertiesMap(properties)); if (persistIndividualDeletedMessageRanges) { info.addAllIndividualDeletedMessages(buildIndividualDeletedMessageRanges()); + if (config.isDeletionAtBatchIndexLevelEnabled()) { + info.addAllBatchedEntryDeletionIndexInfo(buildBatchEntryDeletionIndexInfoList()); + } } if (log.isDebugEnabled()) { @@ -2306,11 +2395,38 @@ private List buildIndividualDeletedMessageRanges() { } } + private List buildBatchEntryDeletionIndexInfoList() { + if (!config.isDeletionAtBatchIndexLevelEnabled() || batchDeletedIndexes == null || batchDeletedIndexes.isEmpty()) { + return Collections.emptyList(); + } + MLDataFormats.NestedPositionInfo.Builder nestedPositionBuilder = MLDataFormats.NestedPositionInfo + .newBuilder(); + MLDataFormats.BatchedEntryDeletionIndexInfo.Builder batchDeletedIndexInfoBuilder = MLDataFormats.BatchedEntryDeletionIndexInfo + .newBuilder(); + List result = Lists.newArrayList(); + Iterator> iterator = batchDeletedIndexes.entrySet().iterator(); + while (iterator.hasNext() && result.size() < config.getMaxBatchDeletedIndexToPersist()) { + Map.Entry entry = iterator.next(); + nestedPositionBuilder.setLedgerId(entry.getKey().getLedgerId()); + nestedPositionBuilder.setEntryId(entry.getKey().getEntryId()); + batchDeletedIndexInfoBuilder.setPosition(nestedPositionBuilder.build()); + long[] array = entry.getValue().toLongArray(); + List deleteSet = new ArrayList<>(array.length); + for (long l : array) { + deleteSet.add(l); + } + batchDeletedIndexInfoBuilder.addAllDeleteSet(deleteSet); + result.add(batchDeletedIndexInfoBuilder.build()); + } + return result; + } + void persistPositionToLedger(final LedgerHandle lh, MarkDeleteEntry mdEntry, final VoidCallback callback) { PositionImpl position = mdEntry.newPosition; PositionInfo pi = PositionInfo.newBuilder().setLedgerId(position.getLedgerId()) .setEntryId(position.getEntryId()) .addAllIndividualDeletedMessages(buildIndividualDeletedMessageRanges()) + .addAllBatchedEntryDeletionIndexInfo(buildBatchEntryDeletionIndexInfoList()) .addAllProperties(buildPropertiesMap(mdEntry.properties)).build(); @@ -2693,6 +2809,16 @@ private ManagedCursorImpl cursorImpl() { return this; } + @Override + public long[] getDeletedBatchIndexesAsLongArray(PositionImpl position) { + if (config.isDeletionAtBatchIndexLevelEnabled() && batchDeletedIndexes != null) { + BitSetRecyclable bitSet = batchDeletedIndexes.get(position); + return bitSet == null ? null : bitSet.toLongArray(); + } else { + return null; + } + } + void updateReadStats(int readEntriesCount, long readEntriesSize) { this.entriesReadCount += readEntriesCount; this.entriesReadSize += readEntriesSize; @@ -2723,7 +2849,5 @@ private int applyMaxSizeCap(int maxEntries, long maxSizeBytes) { return Math.min(maxEntriesBasedOnSize, maxEntries); } - - private static final Logger log = LoggerFactory.getLogger(ManagedCursorImpl.class); } diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/PositionImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/PositionImpl.java index f090b5268d233..1c35ef1f6325a 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/PositionImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/PositionImpl.java @@ -30,6 +30,7 @@ public class PositionImpl implements Position, Comparable { protected long ledgerId; protected long entryId; + protected long[] ackSet; public static final PositionImpl earliest = new PositionImpl(-1, -1); public static final PositionImpl latest = new PositionImpl(Long.MAX_VALUE, Long.MAX_VALUE); @@ -49,6 +50,12 @@ public PositionImpl(long ledgerId, long entryId) { this.entryId = entryId; } + public PositionImpl(long ledgerId, long entryId, long[] ackSet) { + this.ledgerId = ledgerId; + this.entryId = entryId; + this.ackSet = ackSet; + } + public PositionImpl(PositionImpl other) { this.ledgerId = other.ledgerId; this.entryId = other.entryId; @@ -58,6 +65,10 @@ public static PositionImpl get(long ledgerId, long entryId) { return new PositionImpl(ledgerId, entryId); } + public static PositionImpl get(long ledgerId, long entryId, long[] ackSet) { + return new PositionImpl(ledgerId, entryId, ackSet); + } + public static PositionImpl get(PositionImpl other) { return new PositionImpl(other); } diff --git a/managed-ledger/src/main/proto/MLDataFormats.proto b/managed-ledger/src/main/proto/MLDataFormats.proto index 151cd69c716ec..8b1ecbf852087 100644 --- a/managed-ledger/src/main/proto/MLDataFormats.proto +++ b/managed-ledger/src/main/proto/MLDataFormats.proto @@ -68,6 +68,9 @@ message PositionInfo { // Additional custom properties associated with // the current cursor position repeated LongProperty properties = 4; + + // Store which index in the batch message has been deleted + repeated BatchedEntryDeletionIndexInfo batchedEntryDeletionIndexInfo = 5; } message NestedPositionInfo { @@ -80,6 +83,11 @@ message MessageRange { required NestedPositionInfo upperEndpoint = 2; } +message BatchedEntryDeletionIndexInfo { + required NestedPositionInfo position = 1; + repeated int64 deleteSet = 2; +} + // Generic string and long tuple message LongProperty { required string name = 1; @@ -100,5 +108,8 @@ message ManagedCursorInfo { // the current cursor position repeated LongProperty properties = 5; - optional int64 lastActive = 6; + optional int64 lastActive = 6; + + // Store which index in the batch message has been deleted + repeated BatchedEntryDeletionIndexInfo batchedEntryDeletionIndexInfo = 7; } diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java index 0191057d7cfb5..ab87e6f3420fd 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java @@ -44,6 +44,7 @@ import org.apache.bookkeeper.mledger.ManagedLedger; import org.apache.bookkeeper.mledger.ManagedLedgerException; import org.apache.bookkeeper.mledger.Position; +import org.apache.pulsar.common.api.proto.PulsarApi.IntRange; import org.testng.annotations.Test; public class ManagedCursorContainerTest { @@ -326,6 +327,10 @@ public void trimDeletedEntries(List entries) { } @Override + public long[] getDeletedBatchIndexesAsLongArray(PositionImpl position) { + return new long[0]; + } + public void asyncReadEntriesOrWait(int maxEntries, long maxSizeBytes, ReadEntriesCallback callback, Object ctx) { } diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java index e3ba438d2aeba..e53bcaed5a46a 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java @@ -38,6 +38,7 @@ import java.nio.charset.Charset; import java.util.ArrayList; import java.util.Arrays; +import java.util.BitSet; import java.util.Collections; import java.util.List; import java.util.Optional; @@ -81,12 +82,14 @@ import org.apache.bookkeeper.mledger.proto.MLDataFormats.PositionInfo; import org.apache.bookkeeper.test.MockedBookKeeperTestCase; import org.apache.pulsar.metadata.api.Stat; +import org.apache.pulsar.common.api.proto.PulsarApi.IntRange; import org.apache.zookeeper.KeeperException.Code; import org.apache.zookeeper.MockZooKeeper; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.testng.Assert; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; @@ -3056,6 +3059,138 @@ public void deleteMessagesCheckhMarkDelete() throws Exception { } @Test + public void testBatchIndexDelete() throws ManagedLedgerException, InterruptedException { + ManagedLedger ledger = factory.open("test_batch_index_delete"); + ManagedCursor cursor = ledger.openCursor("c1"); + + final int totalEntries = 100; + final Position[] positions = new Position[totalEntries]; + for (int i = 0; i < totalEntries; i++) { + // add entry + positions[i] = ledger.addEntry(("entry-" + i).getBytes(Encoding)); + } + assertEquals(cursor.getNumberOfEntries(), totalEntries); + deleteBatchIndex(cursor, positions[0], 10, Lists.newArrayList(IntRange.newBuilder().setStart(2).setEnd(4).build())); + List deletedIndexes = getAckedIndexRange(cursor.getDeletedBatchIndexesAsLongArray((PositionImpl) positions[0]), 10); + Assert.assertEquals(1, deletedIndexes.size()); + Assert.assertEquals(2, deletedIndexes.get(0).getStart()); + Assert.assertEquals(4, deletedIndexes.get(0).getEnd()); + + deleteBatchIndex(cursor, positions[0], 10, Lists.newArrayList(IntRange.newBuilder().setStart(3).setEnd(8).build())); + deletedIndexes = getAckedIndexRange(cursor.getDeletedBatchIndexesAsLongArray((PositionImpl) positions[0]), 10); + Assert.assertEquals(1, deletedIndexes.size()); + Assert.assertEquals(2, deletedIndexes.get(0).getStart()); + Assert.assertEquals(8, deletedIndexes.get(0).getEnd()); + + deleteBatchIndex(cursor, positions[0], 10, Lists.newArrayList(IntRange.newBuilder().setStart(0).setEnd(0).build())); + deletedIndexes = getAckedIndexRange(cursor.getDeletedBatchIndexesAsLongArray((PositionImpl) positions[0]), 10); + Assert.assertEquals(2, deletedIndexes.size()); + Assert.assertEquals(0, deletedIndexes.get(0).getStart()); + Assert.assertEquals(0, deletedIndexes.get(0).getEnd()); + Assert.assertEquals(2, deletedIndexes.get(1).getStart()); + Assert.assertEquals(8, deletedIndexes.get(1).getEnd()); + + deleteBatchIndex(cursor, positions[0], 10, Lists.newArrayList(IntRange.newBuilder().setStart(1).setEnd(1).build())); + deleteBatchIndex(cursor, positions[0], 10, Lists.newArrayList(IntRange.newBuilder().setStart(9).setEnd(9).build())); + deletedIndexes = getAckedIndexRange(cursor.getDeletedBatchIndexesAsLongArray((PositionImpl) positions[0]), 10); + Assert.assertNull(deletedIndexes); + Assert.assertEquals(positions[0], cursor.getMarkDeletedPosition()); + + deleteBatchIndex(cursor, positions[1], 10, Lists.newArrayList(IntRange.newBuilder().setStart(0).setEnd(5).build())); + cursor.delete(positions[1]); + deleteBatchIndex(cursor, positions[1], 10, Lists.newArrayList(IntRange.newBuilder().setStart(6).setEnd(8).build())); + deletedIndexes = getAckedIndexRange(cursor.getDeletedBatchIndexesAsLongArray((PositionImpl) positions[1]), 10); + Assert.assertNull(deletedIndexes); + + deleteBatchIndex(cursor, positions[2], 10, Lists.newArrayList(IntRange.newBuilder().setStart(0).setEnd(5).build())); + cursor.markDelete(positions[3]); + deletedIndexes = getAckedIndexRange(cursor.getDeletedBatchIndexesAsLongArray((PositionImpl) positions[2]), 10); + Assert.assertNull(deletedIndexes); + + deleteBatchIndex(cursor, positions[3], 10, Lists.newArrayList(IntRange.newBuilder().setStart(0).setEnd(5).build())); + cursor.resetCursor(positions[0]); + deletedIndexes = getAckedIndexRange(cursor.getDeletedBatchIndexesAsLongArray((PositionImpl) positions[3]), 10); + Assert.assertNull(deletedIndexes); + } + + @Test + public void testBatchIndexesDeletionPersistAndRecover() throws ManagedLedgerException, InterruptedException { + ManagedLedger ledger = factory.open("test_batch_indexes_deletion_persistent"); + ManagedCursor cursor = ledger.openCursor("c1"); + + final int totalEntries = 100; + final Position[] positions = new Position[totalEntries]; + for (int i = 0; i < totalEntries; i++) { + // add entry + positions[i] = ledger.addEntry(("entry-" + i).getBytes(Encoding)); + } + assertEquals(cursor.getNumberOfEntries(), totalEntries); + deleteBatchIndex(cursor, positions[5], 10, Lists.newArrayList(IntRange.newBuilder().setStart(3).setEnd(6).build())); + deleteBatchIndex(cursor, positions[0], 10, Lists.newArrayList(IntRange.newBuilder().setStart(0).setEnd(9).build())); + deleteBatchIndex(cursor, positions[1], 10, Lists.newArrayList(IntRange.newBuilder().setStart(0).setEnd(9).build())); + deleteBatchIndex(cursor, positions[2], 10, Lists.newArrayList(IntRange.newBuilder().setStart(0).setEnd(9).build())); + deleteBatchIndex(cursor, positions[3], 10, Lists.newArrayList(IntRange.newBuilder().setStart(0).setEnd(9).build())); + deleteBatchIndex(cursor, positions[4], 10, Lists.newArrayList(IntRange.newBuilder().setStart(0).setEnd(9).build())); + + ledger = factory.open("test_batch_indexes_deletion_persistent"); + cursor = ledger.openCursor("c1"); + List deletedIndexes = getAckedIndexRange(cursor.getDeletedBatchIndexesAsLongArray((PositionImpl) positions[5]), 10); + Assert.assertEquals(deletedIndexes.size(), 1); + Assert.assertEquals(deletedIndexes.get(0).getStart(), 3); + Assert.assertEquals(deletedIndexes.get(0).getEnd(), 6); + Assert.assertEquals(cursor.getMarkDeletedPosition(), positions[4]); + deleteBatchIndex(cursor, positions[5], 10, Lists.newArrayList(IntRange.newBuilder().setStart(0).setEnd(9).build())); + deletedIndexes = getAckedIndexRange(cursor.getDeletedBatchIndexesAsLongArray((PositionImpl) positions[5]), 10); + Assert.assertNull(deletedIndexes); + Assert.assertEquals(cursor.getMarkDeletedPosition(), positions[5]); + } + + private void deleteBatchIndex(ManagedCursor cursor, Position position, int batchSize, + List deleteIndexes) throws InterruptedException { + CountDownLatch latch = new CountDownLatch(1); + PositionImpl pos = (PositionImpl) position; + BitSet bitSet = new BitSet(batchSize); + bitSet.set(0, batchSize); + deleteIndexes.forEach(intRange -> { + bitSet.clear(intRange.getStart(), intRange.getEnd() + 1); + }); + pos.ackSet = bitSet.toLongArray(); + + cursor.asyncDelete(pos, + new DeleteCallback() { + @Override + public void deleteComplete(Object ctx) { + latch.countDown(); + } + + @Override + public void deleteFailed(ManagedLedgerException exception, Object ctx) { + latch.countDown(); + } + }, null); + latch.await(); + pos.ackSet = null; + } + + private List getAckedIndexRange(long[] bitSetLongArray, int batchSize) { + if (bitSetLongArray == null) { + return null; + } + List result = new ArrayList<>(); + BitSet bitSet = BitSet.valueOf(bitSetLongArray); + int nextClearBit = bitSet.nextClearBit(0); + IntRange.Builder builder = IntRange.newBuilder(); + while (nextClearBit != -1 && nextClearBit <= batchSize) { + int nextSetBit = bitSet.nextSetBit(nextClearBit); + if (nextSetBit == -1) { + break; + } + result.add(builder.setStart(nextClearBit).setEnd(nextSetBit - 1).build()); + nextClearBit = bitSet.nextClearBit(nextSetBit); + } + return result; + } + void testReadEntriesOrWaitWithMaxSize() throws Exception { ManagedLedger ledger = factory.open("testReadEntriesOrWaitWithMaxSize"); ManagedCursor c = ledger.openCursor("c"); diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index a960dbc538b1d..c9689d1c79292 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -176,6 +176,9 @@ public class ServiceConfiguration implements PulsarConfiguration { + " affecting the accuracy of the delivery time compared to the scheduled time. Default is 1 second.") private long delayedDeliveryTickTimeMillis = 1000; + @FieldContext(category = CATEGORY_SERVER, doc = "Whether to enable the acknowledge of batch local index") + private boolean acknowledgmentAtBatchIndexLevelEnabled = false; + @FieldContext( category = CATEGORY_WEBSOCKET, doc = "Enable the WebSocket API service in broker" diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java index c8da4209033f5..849d626a441ef 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java @@ -27,8 +27,10 @@ import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.mledger.Entry; +import org.apache.bookkeeper.mledger.ManagedCursor; import org.apache.bookkeeper.mledger.impl.PositionImpl; import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.tuple.Pair; import org.apache.pulsar.common.api.proto.PulsarApi; import org.apache.pulsar.common.protocol.Commands; import org.apache.pulsar.common.protocol.Markers; @@ -66,7 +68,7 @@ protected AbstractBaseDispatcher(Subscription subscription) { * an object where the total size in messages and bytes will be returned back to the caller */ public void filterEntriesForConsumer(List entries, EntryBatchSizes batchSizes, - SendMessageInfo sendMessageInfo) { + SendMessageInfo sendMessageInfo, EntryBatchIndexesAcks indexesAcks, ManagedCursor cursor) { int totalMessages = 0; long totalBytes = 0; @@ -102,6 +104,14 @@ && trackDelayedDelivery(entry.getLedgerId(), entry.getEntryId(), msgMetadata)) { totalMessages += batchSize; totalBytes += metadataAndPayload.readableBytes(); batchSizes.setBatchSize(i, batchSize); + if (indexesAcks != null && cursor != null) { + long[] ackSet = cursor.getDeletedBatchIndexesAsLongArray(PositionImpl.get(entry.getLedgerId(), entry.getEntryId())); + if (ackSet != null) { + indexesAcks.setIndexesAcks(i, Pair.of(batchSize, ackSet)); + } else { + indexesAcks.setIndexesAcks(i,null); + } + } } finally { msgMetadata.recycle(); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 3b4f645214c3c..3abefc16a1b8d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -1065,6 +1065,8 @@ public CompletableFuture getManagedLedgerConfig(TopicName t OffloadPolicies offloadPolicies = policies.map(p -> p.offload_policies).orElse(null); managedLedgerConfig.setLedgerOffloader(pulsar.getManagedLedgerOffloader(namespace, offloadPolicies)); + managedLedgerConfig.setDeletionAtBatchIndexLevelEnabled(serviceConfig.isAcknowledgmentAtBatchIndexLevelEnabled()); + future.complete(managedLedgerConfig); }, (exception) -> future.completeExceptionally(exception))); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java index eaefa409daa56..d14ea5e6d0359 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java @@ -30,6 +30,7 @@ import io.netty.channel.ChannelPromise; import java.util.ArrayList; +import java.util.BitSet; import java.util.Collections; import java.util.List; import java.util.Map; @@ -50,6 +51,7 @@ import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType; import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition; import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType; +import org.apache.pulsar.common.api.proto.PulsarApi.IntRange; import org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData; import org.apache.pulsar.common.api.proto.PulsarApi.ProtocolVersion; import org.apache.pulsar.common.naming.TopicName; @@ -57,6 +59,7 @@ import org.apache.pulsar.common.protocol.Commands; import org.apache.pulsar.common.stats.Rate; import org.apache.pulsar.common.util.DateFormatter; +import org.apache.pulsar.common.util.SafeCollectionUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -209,8 +212,9 @@ public boolean readCompacted() { * * @return a SendMessageInfo object that contains the detail of what was sent to consumer */ - public ChannelPromise sendMessages(final List entries, EntryBatchSizes batchSizes, int totalMessages, - long totalBytes, RedeliveryTracker redeliveryTracker) { + + public ChannelPromise sendMessages(final List entries, EntryBatchSizes batchSizes, EntryBatchIndexesAcks batchIndexesAcks, + int totalMessages, long totalBytes, RedeliveryTracker redeliveryTracker) { this.lastConsumedTimestamp = System.currentTimeMillis(); final ChannelHandlerContext ctx = cnx.ctx(); final ChannelPromise writePromise = ctx.newPromise(); @@ -222,6 +226,9 @@ public ChannelPromise sendMessages(final List entries, EntryBatchSizes ba } writePromise.setSuccess(); batchSizes.recyle(); + if (batchIndexesAcks != null) { + batchIndexesAcks.recycle(); + } return writePromise; } @@ -245,7 +252,8 @@ public ChannelPromise sendMessages(final List entries, EntryBatchSizes ba AVG_MESSAGES_PER_ENTRY.set(this, tmpAvgMessagesPerEntry); // reduce permit and increment unackedMsg count with total number of messages in batch-msgs - MESSAGE_PERMITS_UPDATER.addAndGet(this, -totalMessages); + int ackedCount = batchIndexesAcks == null ? 0 : batchIndexesAcks.getTotalAckedIndexCount(); + MESSAGE_PERMITS_UPDATER.addAndGet(this, ackedCount - totalMessages); incrementUnackedMessages(totalMessages); msgOut.recordMultipleEvents(totalMessages, totalBytes); msgOutCounter.add(totalMessages); @@ -295,7 +303,8 @@ public ChannelPromise sendMessages(final List entries, EntryBatchSizes ba if (redeliveryTracker.contains(position)) { redeliveryCount = redeliveryTracker.incrementAndGetRedeliveryCount(position); } - ctx.write(Commands.newMessage(consumerId, messageId, redeliveryCount, metadataAndPayload), ctx.voidPromise()); + ctx.write(Commands.newMessage(consumerId, messageId, redeliveryCount, metadataAndPayload, + batchIndexesAcks == null ? null : batchIndexesAcks.getAckSet(i)), ctx.voidPromise()); messageId.recycle(); messageIdBuilder.recycle(); entry.release(); @@ -304,6 +313,9 @@ public ChannelPromise sendMessages(final List entries, EntryBatchSizes ba // Use an empty write here so that we can just tie the flush with the write promise for last entry ctx.writeAndFlush(Unpooled.EMPTY_BUFFER, writePromise); batchSizes.recyle(); + if (batchIndexesAcks != null) { + batchIndexesAcks.recycle(); + } }); return writePromise; @@ -387,19 +399,30 @@ void messageAcked(CommandAck ack) { log.warn("[{}] [{}] Received cumulative ack on shared subscription, ignoring", subscription, consumerId); return; } - - MessageIdData msgId = ack.getMessageId(0); - PositionImpl position = PositionImpl.get(msgId.getLedgerId(), msgId.getEntryId()); + PositionImpl position = PositionImpl.earliest; + if (ack.getMessageIdCount() == 1) { + MessageIdData msgId = ack.getMessageId(0); + if (msgId.getAckSetCount() > 0) { + position = PositionImpl.get(msgId.getLedgerId(), msgId.getEntryId(), SafeCollectionUtils.longListToArray(msgId.getAckSetList())); + } else { + position = PositionImpl.get(msgId.getLedgerId(), msgId.getEntryId()); + } + } subscription.acknowledgeMessage(Collections.singletonList(position), AckType.Cumulative, properties); } else { // Individual ack List positionsAcked = new ArrayList<>(); for (int i = 0; i < ack.getMessageIdCount(); i++) { MessageIdData msgId = ack.getMessageId(i); - PositionImpl position = PositionImpl.get(msgId.getLedgerId(), msgId.getEntryId()); + PositionImpl position; + if (msgId.getAckSetCount() > 0) { + position = PositionImpl.get(msgId.getLedgerId(), msgId.getEntryId(), SafeCollectionUtils.longListToArray(msgId.getAckSetList())); + } else { + position = PositionImpl.get(msgId.getLedgerId(), msgId.getEntryId()); + } positionsAcked.add(position); - if (Subscription.isIndividualAckMode(subType)) { + if (Subscription.isIndividualAckMode(subType) && msgId.getAckSetCount() == 0) { removePendingAcks(position); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/EntryBatchIndexesAcks.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/EntryBatchIndexesAcks.java new file mode 100644 index 0000000000000..e41a29092140f --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/EntryBatchIndexesAcks.java @@ -0,0 +1,71 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.service; + + +import io.netty.util.Recycler; +import org.apache.commons.lang3.tuple.Pair; + +import java.util.ArrayList; +import java.util.BitSet; +import java.util.List; + +public class EntryBatchIndexesAcks { + + Pair[] indexesAcks = new Pair[100]; + + public void setIndexesAcks(int entryIdx, Pair indexesAcks) { + this.indexesAcks[entryIdx] = indexesAcks; + } + + public long[] getAckSet(int entryIdx) { + Pair pair = indexesAcks[entryIdx]; + return pair == null ? null : pair.getRight(); + } + + public int getTotalAckedIndexCount() { + int count = 0; + for (Pair pair : indexesAcks) { + if (pair != null) { + count += pair.getLeft() - BitSet.valueOf(pair.getRight()).cardinality(); + } + } + return count; + } + + public void recycle() { + handle.recycle(this); + } + + public static EntryBatchIndexesAcks get() { + return RECYCLER.get(); + } + + private EntryBatchIndexesAcks(Recycler.Handle handle) { + this.handle = handle; + } + + private final Recycler.Handle handle; + private static final Recycler RECYCLER = new Recycler() { + @Override + protected EntryBatchIndexesAcks newObject(Handle handle) { + return new EntryBatchIndexesAcks(handle); + } + }; +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java index 08f5f4c54f0a4..d97e0db28a6b7 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java @@ -201,8 +201,8 @@ public void sendMessages(List entries) { if (consumer != null) { SendMessageInfo sendMessageInfo = SendMessageInfo.getThreadLocal(); EntryBatchSizes batchSizes = EntryBatchSizes.get(entries.size()); - filterEntriesForConsumer(entries, batchSizes, sendMessageInfo); - consumer.sendMessages(entries, batchSizes, sendMessageInfo.getTotalMessages(), + filterEntriesForConsumer(entries, batchSizes, sendMessageInfo, null, null); + consumer.sendMessages(entries, batchSizes, null, sendMessageInfo.getTotalMessages(), sendMessageInfo.getTotalBytes(), getRedeliveryTracker()); TOTAL_AVAILABLE_PERMITS_UPDATER.addAndGet(this, -sendMessageInfo.getTotalMessages()); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java index e1c2e58d4aa81..7db50cc8698e6 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java @@ -62,8 +62,8 @@ public void sendMessages(List entries) { if (currentConsumer != null && currentConsumer.getAvailablePermits() > 0 && currentConsumer.isWritable()) { SendMessageInfo sendMessageInfo = SendMessageInfo.getThreadLocal(); EntryBatchSizes batchSizes = EntryBatchSizes.get(entries.size()); - filterEntriesForConsumer(entries, batchSizes, sendMessageInfo); - currentConsumer.sendMessages(entries, batchSizes, sendMessageInfo.getTotalMessages(), + filterEntriesForConsumer(entries, batchSizes, sendMessageInfo, null, null); + currentConsumer.sendMessages(entries, batchSizes, null, sendMessageInfo.getTotalMessages(), sendMessageInfo.getTotalBytes(), getRedeliveryTracker()); } else { entries.forEach(entry -> { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java index c5183cd5f1b95..76ac57166c069 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java @@ -79,8 +79,8 @@ public void sendMessages(List entries) { if (consumer != null) { SendMessageInfo sendMessageInfo = SendMessageInfo.getThreadLocal(); EntryBatchSizes batchSizes = EntryBatchSizes.get(entriesWithSameKey.getValue().size()); - filterEntriesForConsumer(entriesWithSameKey.getValue(), batchSizes, sendMessageInfo); - consumer.sendMessages(entriesWithSameKey.getValue(), batchSizes, sendMessageInfo.getTotalMessages(), + filterEntriesForConsumer(entriesWithSameKey.getValue(), batchSizes, sendMessageInfo, null, null); + consumer.sendMessages(entriesWithSameKey.getValue(), batchSizes, null, sendMessageInfo.getTotalMessages(), sendMessageInfo.getTotalBytes(), getRedeliveryTracker()); TOTAL_AVAILABLE_PERMITS_UPDATER.addAndGet(this, -sendMessageInfo.getTotalMessages()); } else { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/CompactorSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/CompactorSubscription.java index 5d39c20a2441d..94af8063f5ad4 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/CompactorSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/CompactorSubscription.java @@ -28,7 +28,6 @@ import org.apache.bookkeeper.mledger.ManagedCursor; import org.apache.bookkeeper.mledger.ManagedLedgerException; import org.apache.bookkeeper.mledger.Position; -import org.apache.bookkeeper.mledger.impl.PositionImpl; import org.apache.pulsar.broker.service.Consumer; import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType; import org.apache.pulsar.compaction.CompactedTopic; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java index 621c6d2cbaa18..75141034555b6 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java @@ -51,6 +51,7 @@ import org.apache.pulsar.broker.service.BrokerServiceException.ConsumerBusyException; import org.apache.pulsar.broker.service.Consumer; import org.apache.pulsar.broker.service.Dispatcher; +import org.apache.pulsar.broker.service.EntryBatchIndexesAcks; import org.apache.pulsar.broker.service.EntryBatchSizes; import org.apache.pulsar.broker.service.InMemoryRedeliveryTracker; import org.apache.pulsar.broker.service.RedeliveryTracker; @@ -518,15 +519,16 @@ protected void sendMessagesToConsumers(ReadType readType, List entries) { List entriesForThisConsumer = entries.subList(start, start + messagesForC); EntryBatchSizes batchSizes = EntryBatchSizes.get(entriesForThisConsumer.size()); - filterEntriesForConsumer(entriesForThisConsumer, batchSizes, sendMessageInfo); + EntryBatchIndexesAcks batchIndexesAcks = EntryBatchIndexesAcks.get(); + filterEntriesForConsumer(entriesForThisConsumer, batchSizes, sendMessageInfo, batchIndexesAcks, cursor); - c.sendMessages(entriesForThisConsumer, batchSizes, sendMessageInfo.getTotalMessages(), + c.sendMessages(entriesForThisConsumer, batchSizes, batchIndexesAcks, sendMessageInfo.getTotalMessages(), sendMessageInfo.getTotalBytes(), redeliveryTracker); int msgSent = sendMessageInfo.getTotalMessages(); start += messagesForC; entriesToDispatch -= messagesForC; - TOTAL_AVAILABLE_PERMITS_UPDATER.addAndGet(this, -msgSent); + TOTAL_AVAILABLE_PERMITS_UPDATER.addAndGet(this, -(msgSent - batchIndexesAcks.getTotalAckedIndexCount())); totalMessagesSent += sendMessageInfo.getTotalMessages(); totalBytesSent += sendMessageInfo.getTotalBytes(); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java index 867c12b253516..4d856e55b755f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java @@ -42,6 +42,7 @@ import org.apache.pulsar.broker.service.AbstractDispatcherSingleActiveConsumer; import org.apache.pulsar.broker.service.Consumer; import org.apache.pulsar.broker.service.Dispatcher; +import org.apache.pulsar.broker.service.EntryBatchIndexesAcks; import org.apache.pulsar.broker.service.EntryBatchSizes; import org.apache.pulsar.broker.service.RedeliveryTracker; import org.apache.pulsar.broker.service.RedeliveryTrackerDisabled; @@ -237,13 +238,14 @@ public synchronized void internalReadEntriesComplete(final List entries, } else { EntryBatchSizes batchSizes = EntryBatchSizes.get(entries.size()); SendMessageInfo sendMessageInfo = SendMessageInfo.getThreadLocal(); - filterEntriesForConsumer(entries, batchSizes, sendMessageInfo); + EntryBatchIndexesAcks batchIndexesAcks = EntryBatchIndexesAcks.get(); + filterEntriesForConsumer(entries, batchSizes, sendMessageInfo, batchIndexesAcks, cursor); int totalMessages = sendMessageInfo.getTotalMessages(); long totalBytes = sendMessageInfo.getTotalBytes(); currentConsumer - .sendMessages(entries, batchSizes, sendMessageInfo.getTotalMessages(), + .sendMessages(entries, batchSizes, batchIndexesAcks, sendMessageInfo.getTotalMessages(), sendMessageInfo.getTotalBytes(), redeliveryTracker) .addListener(future -> { if (future.isSuccess()) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java index 987ecfcbc682e..22e45d508cf9d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java @@ -31,6 +31,7 @@ import org.apache.bookkeeper.mledger.Position; import org.apache.pulsar.broker.service.BrokerServiceException; import org.apache.pulsar.broker.service.Consumer; +import org.apache.pulsar.broker.service.EntryBatchIndexesAcks; import org.apache.pulsar.broker.service.EntryBatchSizes; import org.apache.pulsar.broker.service.SendMessageInfo; import org.apache.pulsar.broker.service.StickyKeyConsumerSelector; @@ -111,9 +112,10 @@ protected void sendMessagesToConsumers(ReadType readType, List entries) { SendMessageInfo sendMessageInfo = SendMessageInfo.getThreadLocal(); EntryBatchSizes batchSizes = EntryBatchSizes.get(subList.size()); - filterEntriesForConsumer(subList, batchSizes, sendMessageInfo); + EntryBatchIndexesAcks batchIndexesAcks = EntryBatchIndexesAcks.get(); + filterEntriesForConsumer(subList, batchSizes, sendMessageInfo, batchIndexesAcks, cursor); - consumer.sendMessages(subList, batchSizes, sendMessageInfo.getTotalMessages(), + consumer.sendMessages(subList, batchSizes, batchIndexesAcks, sendMessageInfo.getTotalMessages(), sendMessageInfo.getTotalBytes(), getRedeliveryTracker()).addListener(future -> { if (future.isSuccess() && keyNumbers.decrementAndGet() == 0) { readMoreEntries(); @@ -124,7 +126,7 @@ protected void sendMessagesToConsumers(ReadType readType, List entries) { entriesWithSameKey.getValue().remove(0); } - TOTAL_AVAILABLE_PERMITS_UPDATER.getAndAdd(this, -sendMessageInfo.getTotalMessages()); + TOTAL_AVAILABLE_PERMITS_UPDATER.getAndAdd(this, -(sendMessageInfo.getTotalMessages() - batchIndexesAcks.getTotalAckedIndexCount())); totalMessagesSent += sendMessageInfo.getTotalMessages(); totalBytesSent += sendMessageInfo.getTotalBytes(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java index 24f02d2e79b0d..041f511603dc3 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java @@ -349,6 +349,7 @@ public void acknowledgeMessage(List positions, AckType ackType, Map getNonDurableSubscription(Stri } catch (ManagedLedgerException e) { subscriptionFuture.completeExceptionally(e); } - - return new PersistentSubscription(this, subscriptionName, cursor, false); + return new PersistentSubscription(this, subscriptionName, cursor, false); }); if (!subscriptionFuture.isDone()) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java index 3ca072dda51aa..f7f35fa5c0907 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java @@ -35,6 +35,7 @@ import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData; import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata; import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType; +import org.apache.pulsar.common.api.proto.PulsarApi.IntRange; import org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.protocol.Commands; @@ -203,7 +204,7 @@ public CompletableFuture closeAsync() { } @Override - void messageReceived(MessageIdData messageId, int redeliveryCount, ByteBuf headersAndPayload, ClientCnx cnx) { + void messageReceived(MessageIdData messageId, int redeliveryCount, List ackSet, ByteBuf headersAndPayload, ClientCnx cnx) { if (log.isDebugEnabled()) { log.debug("[{}][{}] Received raw message: {}/{}/{}", topic, subscription, messageId.getEntryId(), messageId.getLedgerId(), messageId.getPartition()); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java index 110d04c6ee307..44a2966fe04b8 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java @@ -1247,7 +1247,7 @@ public void testAckCommand() throws Exception { PositionImpl pos = new PositionImpl(0, 0); - clientCommand = Commands.newAck(1 /* consumer id */, pos.getLedgerId(), pos.getEntryId(), AckType.Individual, + clientCommand = Commands.newAck(1 /* consumer id */, pos.getLedgerId(), pos.getEntryId(), null, AckType.Individual, null, Collections.emptyMap()); channel.writeInbound(clientCommand); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java index 598f39da6098d..7d1935a038521 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java @@ -100,6 +100,7 @@ public void setup() throws Exception { doReturn(channelMock).when(consumerMock).sendMessages( anyList(), any(EntryBatchSizes.class), + any(EntryBatchIndexesAcks.class), anyInt(), anyLong(), any(RedeliveryTracker.class) @@ -142,6 +143,7 @@ public void testSendMarkerMessage() { verify(consumerMock, times(2)).sendMessages( anyList(), any(EntryBatchSizes.class), + any(EntryBatchIndexesAcks.class), totalMessagesCaptor.capture(), anyLong(), any(RedeliveryTracker.class) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BatchMessageIndexAckDisableTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BatchMessageIndexAckDisableTest.java new file mode 100644 index 0000000000000..71305b300cc1e --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BatchMessageIndexAckDisableTest.java @@ -0,0 +1,148 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl; + +import lombok.Cleanup; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.ProducerConsumerBase; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.common.util.FutureUtil; +import org.testng.Assert; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; + +@Slf4j +public class BatchMessageIndexAckDisableTest extends ProducerConsumerBase { + + @BeforeMethod + @Override + protected void setup() throws Exception { + conf.setAcknowledgmentAtBatchIndexLevelEnabled(false); + super.internalSetup(); + super.producerBaseSetup(); + } + + @AfterMethod + @Override + protected void cleanup() throws Exception { + super.internalCleanup(); + } + + @Test + public void testBatchMessageIndexAckForSharedSubscription() throws PulsarClientException, ExecutionException, InterruptedException { + final String topic = "testBatchMessageIndexAckForSharedSubscription"; + + @Cleanup + Consumer consumer = pulsarClient.newConsumer(Schema.INT32) + .topic(topic) + .subscriptionName("sub") + .receiverQueueSize(100) + .subscriptionType(SubscriptionType.Shared) + .ackTimeout(1, TimeUnit.SECONDS) + .subscribe(); + + @Cleanup + Producer producer = pulsarClient.newProducer(Schema.INT32) + .topic(topic) + .batchingMaxPublishDelay(50, TimeUnit.MILLISECONDS) + .create(); + + final int messages = 100; + List> futures = new ArrayList<>(messages); + for (int i = 0; i < messages; i++) { + futures.add(producer.sendAsync(i)); + } + FutureUtil.waitForAll(futures).get(); + + for (int i = 0; i < messages; i++) { + if (i % 2 == 0) { + consumer.acknowledge(consumer.receive()); + } + } + + List> received = new ArrayList<>(messages); + for (int i = 0; i < messages; i++) { + received.add(consumer.receive()); + } + + Assert.assertEquals(received.size(), 100); + } + + @Test + public void testBatchMessageIndexAckForExclusiveSubscription() throws PulsarClientException, ExecutionException, InterruptedException { + final String topic = "testBatchMessageIndexAckForExclusiveSubscription"; + + @Cleanup + Consumer consumer = pulsarClient.newConsumer(Schema.INT32) + .topic(topic) + .subscriptionName("sub") + .receiverQueueSize(100) + .subscribe(); + + @Cleanup + Producer producer = pulsarClient.newProducer(Schema.INT32) + .topic(topic) + .batchingMaxPublishDelay(50, TimeUnit.MILLISECONDS) + .create(); + + final int messages = 100; + List> futures = new ArrayList<>(messages); + for (int i = 0; i < messages; i++) { + futures.add(producer.sendAsync(i)); + } + FutureUtil.waitForAll(futures).get(); + + for (int i = 0; i < messages; i++) { + if (i == 49) { + consumer.acknowledgeCumulative(consumer.receive()); + } else { + consumer.receive(); + } + } + + //Wait ack send. + Thread.sleep(1000); + consumer.close(); + consumer = pulsarClient.newConsumer(Schema.INT32) + .topic(topic) + .subscriptionName("sub") + .receiverQueueSize(100) + .subscribe(); + + List> received = new ArrayList<>(100); + for (int i = 0; i < messages; i++) { + received.add(consumer.receive()); + } + + Assert.assertEquals(received.size(), 100); + } +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BatchMessageIndexAckTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BatchMessageIndexAckTest.java new file mode 100644 index 0000000000000..114b9ae3fdc1e --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BatchMessageIndexAckTest.java @@ -0,0 +1,184 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl; + +import lombok.Cleanup; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.ProducerConsumerBase; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.common.util.FutureUtil; +import org.testng.Assert; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; + +@Slf4j +public class BatchMessageIndexAckTest extends ProducerConsumerBase { + + @BeforeMethod + @Override + protected void setup() throws Exception { + conf.setAcknowledgmentAtBatchIndexLevelEnabled(true); + super.internalSetup(); + super.producerBaseSetup(); + } + + @AfterMethod + @Override + protected void cleanup() throws Exception { + super.internalCleanup(); + } + + @Test + public void testBatchMessageIndexAckForSharedSubscription() throws PulsarClientException, ExecutionException, InterruptedException { + final String topic = "testBatchMessageIndexAckForSharedSubscription"; + + @Cleanup + Consumer consumer = pulsarClient.newConsumer(Schema.INT32) + .topic(topic) + .subscriptionName("sub") + .receiverQueueSize(100) + .subscriptionType(SubscriptionType.Shared) + .negativeAckRedeliveryDelay(2, TimeUnit.SECONDS) + .subscribe(); + + @Cleanup + Producer producer = pulsarClient.newProducer(Schema.INT32) + .topic(topic) + .batchingMaxPublishDelay(50, TimeUnit.MILLISECONDS) + .create(); + + final int messages = 100; + List> futures = new ArrayList<>(messages); + for (int i = 0; i < messages; i++) { + futures.add(producer.sendAsync(i)); + } + FutureUtil.waitForAll(futures).get(); + + for (int i = 0; i < messages; i++) { + if (i % 2 == 0) { + consumer.acknowledge(consumer.receive()); + } else { + consumer.negativeAcknowledge(consumer.receive()); + } + } + + List> received = new ArrayList<>(50); + for (int i = 0; i < 50; i++) { + received.add(consumer.receive()); + } + + Assert.assertEquals(received.size(), 50); + + Message moreMessage = consumer.receive(1, TimeUnit.SECONDS); + Assert.assertNull(moreMessage); + + futures.clear(); + for (int i = 0; i < 50; i++) { + futures.add(producer.sendAsync(i)); + } + FutureUtil.waitForAll(futures).get(); + + for (int i = 0; i < 50; i++) { + received.add(consumer.receive()); + } + + // Ensure the flow permit is work well since the client skip the acked batch index, + // broker also need to handle the available permits. + Assert.assertEquals(received.size(), 100); + } + + @Test + public void testBatchMessageIndexAckForExclusiveSubscription() throws PulsarClientException, ExecutionException, InterruptedException { + final String topic = "testBatchMessageIndexAckForExclusiveSubscription"; + + @Cleanup + Consumer consumer = pulsarClient.newConsumer(Schema.INT32) + .topic(topic) + .subscriptionName("sub") + .receiverQueueSize(100) + .subscribe(); + + @Cleanup + Producer producer = pulsarClient.newProducer(Schema.INT32) + .topic(topic) + .batchingMaxPublishDelay(50, TimeUnit.MILLISECONDS) + .create(); + + final int messages = 100; + List> futures = new ArrayList<>(messages); + for (int i = 0; i < messages; i++) { + futures.add(producer.sendAsync(i)); + } + FutureUtil.waitForAll(futures).get(); + + for (int i = 0; i < messages; i++) { + if (i == 49) { + consumer.acknowledgeCumulative(consumer.receive()); + } else { + consumer.receive(); + } + } + + //Wait ack send. + Thread.sleep(1000); + consumer.close(); + consumer = pulsarClient.newConsumer(Schema.INT32) + .topic(topic) + .subscriptionName("sub") + .receiverQueueSize(100) + .subscribe(); + + List> received = new ArrayList<>(50); + for (int i = 0; i < 50; i++) { + received.add(consumer.receive()); + } + + Assert.assertEquals(received.size(), 50); + + Message moreMessage = consumer.receive(1, TimeUnit.SECONDS); + Assert.assertNull(moreMessage); + + futures.clear(); + for (int i = 0; i < 50; i++) { + futures.add(producer.sendAsync(i)); + } + FutureUtil.waitForAll(futures).get(); + + for (int i = 0; i < 50; i++) { + received.add(consumer.receive()); + } + + // Ensure the flow permit is work well since the client skip the acked batch index, + // broker also need to handle the available permits. + Assert.assertEquals(received.size(), 100); + } +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/CompactedOutBatchMessageTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/CompactedOutBatchMessageTest.java index f7020378d5655..76b84dfca30c2 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/CompactedOutBatchMessageTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/CompactedOutBatchMessageTest.java @@ -73,7 +73,7 @@ public void testCompactedOutMessages() throws Exception { = (ConsumerImpl) pulsarClient.newConsumer().topic(topic1) .subscriptionName("my-subscriber-name").subscribe()) { // shove it in the sideways - consumer.receiveIndividualMessagesFromBatch(metadata, 0, batchBuffer, + consumer.receiveIndividualMessagesFromBatch(metadata, 0, null, batchBuffer, MessageIdData.newBuilder().setLedgerId(1234) .setEntryId(567).build(), consumer.cnx()); Message m = consumer.receive(); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AcknowledgmentsGroupingTracker.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AcknowledgmentsGroupingTracker.java index 93600ec176fca..1517f124bc599 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AcknowledgmentsGroupingTracker.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AcknowledgmentsGroupingTracker.java @@ -31,6 +31,8 @@ public interface AcknowledgmentsGroupingTracker extends AutoCloseable { void addAcknowledgment(MessageIdImpl msgId, AckType ackType, Map properties); + void addBatchIndexAcknowledgment(BatchMessageIdImpl msgId, int batchIndex, int batchSize, AckType ackType, Map properties); + void flush(); @Override diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageIdImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageIdImpl.java index f311936b90c95..f97467ff4ec05 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageIdImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageIdImpl.java @@ -26,6 +26,7 @@ public class BatchMessageIdImpl extends MessageIdImpl { private final static int NO_BATCH = -1; private final int batchIndex; + private final int batchSize; private final transient BatchMessageAcker acker; @@ -36,12 +37,13 @@ private BatchMessageIdImpl() { } public BatchMessageIdImpl(long ledgerId, long entryId, int partitionIndex, int batchIndex) { - this(ledgerId, entryId, partitionIndex, batchIndex, BatchMessageAckerDisabled.INSTANCE); + this(ledgerId, entryId, partitionIndex, batchIndex, 0, BatchMessageAckerDisabled.INSTANCE); } - public BatchMessageIdImpl(long ledgerId, long entryId, int partitionIndex, int batchIndex, BatchMessageAcker acker) { + public BatchMessageIdImpl(long ledgerId, long entryId, int partitionIndex, int batchIndex, int batchSize, BatchMessageAcker acker) { super(ledgerId, entryId, partitionIndex); this.batchIndex = batchIndex; + this.batchSize = batchSize; this.acker = acker; } @@ -50,9 +52,11 @@ public BatchMessageIdImpl(MessageIdImpl other) { if (other instanceof BatchMessageIdImpl) { BatchMessageIdImpl otherId = (BatchMessageIdImpl) other; this.batchIndex = otherId.batchIndex; + this.batchSize = otherId.batchSize; this.acker = otherId.acker; } else { this.batchIndex = NO_BATCH; + this.batchSize = 0; this.acker = BatchMessageAckerDisabled.INSTANCE; } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java index b08a97c06effb..6dde157ce8482 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java @@ -384,7 +384,7 @@ protected void handleMessage(CommandMessage cmdMessage, ByteBuf headersAndPayloa } ConsumerImpl consumer = consumers.get(cmdMessage.getConsumerId()); if (consumer != null) { - consumer.messageReceived(cmdMessage.getMessageId(), cmdMessage.getRedeliveryCount(), headersAndPayload, this); + consumer.messageReceived(cmdMessage.getMessageId(), cmdMessage.getRedeliveryCount(), cmdMessage.getAckSetList(), headersAndPayload, this); } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index 220bd76e52b4e..662222fd4af82 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -32,6 +32,7 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.BitSet; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -76,6 +77,7 @@ import org.apache.pulsar.client.impl.crypto.MessageCryptoBc; import org.apache.pulsar.client.impl.transaction.TransactionImpl; import org.apache.pulsar.client.util.RetryMessageUtil; +import org.apache.pulsar.common.api.proto.PulsarApi.IntRange; import org.apache.pulsar.common.protocol.Commands; import org.apache.pulsar.common.api.EncryptionContext; import org.apache.pulsar.common.api.EncryptionContext.EncryptionKey; @@ -95,6 +97,7 @@ import org.apache.pulsar.common.schema.SchemaInfo; import org.apache.pulsar.common.schema.SchemaType; import org.apache.pulsar.common.util.FutureUtil; +import org.apache.pulsar.common.util.SafeCollectionUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -520,6 +523,9 @@ protected CompletableFuture doAcknowledge(MessageId messageId, AckType ack ackType); } } else { + BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl) messageId; + acknowledgmentsGroupingTracker.addBatchIndexAcknowledgment(batchMessageId, batchMessageId.getBatchIndex(), + batchMessageId.getBatchSize(), ackType, properties); // other messages in batch are still pending ack. return CompletableFuture.completedFuture(null); } @@ -985,7 +991,7 @@ void activeConsumerChanged(boolean isActive) { }); } - void messageReceived(MessageIdData messageId, int redeliveryCount, ByteBuf headersAndPayload, ClientCnx cnx) { + void messageReceived(MessageIdData messageId, int redeliveryCount, List ackSet, ByteBuf headersAndPayload, ClientCnx cnx) { if (log.isDebugEnabled()) { log.debug("[{}][{}] Received message: {}/{}", topic, subscription, messageId.getLedgerId(), messageId.getEntryId()); @@ -1077,7 +1083,7 @@ void messageReceived(MessageIdData messageId, int redeliveryCount, ByteBuf heade } } else { // handle batch message enqueuing; uncompressed payload has all messages in batch - receiveIndividualMessagesFromBatch(msgMetadata, redeliveryCount, uncompressedPayload, messageId, cnx); + receiveIndividualMessagesFromBatch(msgMetadata, redeliveryCount, ackSet, uncompressedPayload, messageId, cnx); uncompressedPayload.release(); msgMetadata.recycle(); @@ -1168,7 +1174,7 @@ private void interceptAndComplete(final Message message, final CompletableFut listenerExecutor.execute(() -> receivedFuture.complete(interceptMessage)); } - void receiveIndividualMessagesFromBatch(MessageMetadata msgMetadata, int redeliveryCount, ByteBuf uncompressedPayload, + void receiveIndividualMessagesFromBatch(MessageMetadata msgMetadata, int redeliveryCount, List ackSet, ByteBuf uncompressedPayload, MessageIdData messageId, ClientCnx cnx) { int batchSize = msgMetadata.getNumMessagesInBatch(); @@ -1214,8 +1220,15 @@ void receiveIndividualMessagesFromBatch(MessageMetadata msgMetadata, int redeliv continue; } + if (ackSet != null && BitSet.valueOf(SafeCollectionUtils.longListToArray(ackSet)).get(i)) { + singleMessagePayload.release(); + singleMessageMetadataBuilder.recycle(); + ++skippedMessages; + continue; + } + BatchMessageIdImpl batchMessageIdImpl = new BatchMessageIdImpl(messageId.getLedgerId(), - messageId.getEntryId(), getPartitionIndex(), i, acker); + messageId.getEntryId(), getPartitionIndex(), i, batchSize, acker); final MessageImpl message = new MessageImpl<>(topicName.toString(), batchMessageIdImpl, msgMetadata, singleMessageMetadataBuilder.build(), singleMessagePayload, createEncryptionContext(msgMetadata), cnx, schema, redeliveryCount); @@ -1448,7 +1461,7 @@ private void discardCorruptedMessage(MessageIdData messageId, ClientCnx currentC } private void discardMessage(MessageIdData messageId, ClientCnx currentCnx, ValidationError validationError) { - ByteBuf cmd = Commands.newAck(consumerId, messageId.getLedgerId(), messageId.getEntryId(), AckType.Individual, + ByteBuf cmd = Commands.newAck(consumerId, messageId.getLedgerId(), messageId.getEntryId(), null, AckType.Individual, validationError, Collections.emptyMap()); currentCnx.ctx().writeAndFlush(cmd, currentCnx.ctx().voidPromise()); increaseAvailablePermits(currentCnx); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NonPersistentAcknowledgmentGroupingTracker.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NonPersistentAcknowledgmentGroupingTracker.java index 3c987d95cd74c..9061ac30bd61a 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NonPersistentAcknowledgmentGroupingTracker.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NonPersistentAcknowledgmentGroupingTracker.java @@ -45,6 +45,11 @@ public void addAcknowledgment(MessageIdImpl msgId, AckType ackType, Map properties) { + // no-op + } + @Override public void flush() { // no-op diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java index 95ca4cf91e0d3..927594c3ff69c 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java @@ -23,8 +23,10 @@ import java.util.ArrayList; import java.util.Collections; +import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentSkipListSet; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; @@ -32,11 +34,13 @@ import lombok.extern.slf4j.Slf4j; -import org.apache.commons.lang3.tuple.Pair; +import org.apache.commons.lang3.tuple.Triple; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData; import org.apache.pulsar.common.protocol.Commands; import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType; +import org.apache.pulsar.common.util.collections.BitSetRecyclable; +import org.apache.pulsar.common.util.collections.ConcurrentBitSetRecyclable; /** * Group the acknowledgements for a certain time and then sends them out in a single protobuf command. @@ -57,16 +61,21 @@ public class PersistentAcknowledgmentsGroupingTracker implements Acknowledgments * Latest cumulative ack sent to broker */ private volatile MessageIdImpl lastCumulativeAck = (MessageIdImpl) MessageId.earliest; + private volatile BitSetRecyclable lastCumulativeAckSet = null; private volatile boolean cumulativeAckFlushRequired = false; private static final AtomicReferenceFieldUpdater LAST_CUMULATIVE_ACK_UPDATER = AtomicReferenceFieldUpdater .newUpdater(PersistentAcknowledgmentsGroupingTracker.class, MessageIdImpl.class, "lastCumulativeAck"); + private static final AtomicReferenceFieldUpdater LAST_CUMULATIVE_ACK_SET_UPDATER = AtomicReferenceFieldUpdater + .newUpdater(PersistentAcknowledgmentsGroupingTracker.class, BitSetRecyclable.class, "lastCumulativeAckSet"); + /** * This is a set of all the individual acks that the application has issued and that were not already sent to * broker. */ private final ConcurrentSkipListSet pendingIndividualAcks; + private final ConcurrentHashMap pendingIndividualBatchIndexAcks; private final ScheduledFuture scheduledTask; @@ -74,6 +83,7 @@ public PersistentAcknowledgmentsGroupingTracker(ConsumerImpl consumer, Consum EventLoopGroup eventLoopGroup) { this.consumer = consumer; this.pendingIndividualAcks = new ConcurrentSkipListSet<>(); + this.pendingIndividualBatchIndexAcks = new ConcurrentHashMap<>(); this.acknowledgementGroupTimeMicros = conf.getAcknowledgementsGroupTimeMicros(); if (acknowledgementGroupTimeMicros > 0) { @@ -103,7 +113,7 @@ public void addAcknowledgment(MessageIdImpl msgId, AckType ackType, Map= MAX_ACK_GROUP_SIZE) { flush(); } } } - private void doCumulativeAck(MessageIdImpl msgId) { + @Override + public void addBatchIndexAcknowledgment(BatchMessageIdImpl msgId, int batchIndex, int batchSize, AckType ackType, Map properties) { + if (acknowledgementGroupTimeMicros == 0 || !properties.isEmpty()) { + doImmediateBatchIndexAck(msgId, batchIndex, batchSize, ackType, properties); + } else if (ackType == AckType.Cumulative) { + BitSetRecyclable bitSet = BitSetRecyclable.create(); + bitSet.set(0, batchSize); + bitSet.clear(0, batchIndex + 1); + doCumulativeAck(msgId, bitSet); + } else if (ackType == AckType.Individual) { + ConcurrentBitSetRecyclable bitSet = pendingIndividualBatchIndexAcks.computeIfAbsent( + new MessageIdImpl(msgId.getLedgerId(), msgId.getEntryId(), msgId.getPartitionIndex()), (v) -> { + ConcurrentBitSetRecyclable value = ConcurrentBitSetRecyclable.create(); + value.set(0, batchSize + 1); + value.clear(batchIndex); + return value; + }); + bitSet.clear(batchIndex); + if (pendingIndividualBatchIndexAcks.size() >= MAX_ACK_GROUP_SIZE) { + flush(); + } + } + } + + private void doCumulativeAck(MessageIdImpl msgId, BitSetRecyclable bitSet) { // Handle concurrent updates from different threads while (true) { MessageIdImpl lastCumlativeAck = this.lastCumulativeAck; + BitSetRecyclable lastBitSet = this.lastCumulativeAckSet; if (msgId.compareTo(lastCumlativeAck) > 0) { - if (LAST_CUMULATIVE_ACK_UPDATER.compareAndSet(this, lastCumlativeAck, msgId)) { + if (LAST_CUMULATIVE_ACK_UPDATER.compareAndSet(this, lastCumlativeAck, msgId) && LAST_CUMULATIVE_ACK_SET_UPDATER.compareAndSet(this, lastBitSet, bitSet)) { + if (lastBitSet != null) { + try { + lastBitSet.recycle(); + } catch (Exception ignore) { + // no-op + } + } // Successfully updated the last cumulative ack. Next flush iteration will send this to broker. cumulativeAckFlushRequired = true; return; @@ -142,13 +185,32 @@ private boolean doImmediateAck(MessageIdImpl msgId, AckType ackType, Map properties) { + ClientCnx cnx = consumer.getClientCnx(); + + if (cnx == null) { + return false; + } + BitSetRecyclable bitSet = BitSetRecyclable.create(); + bitSet.set(0, batchSize); + if (ackType == AckType.Cumulative) { + bitSet.clear(0, batchIndex + 1); + } else { + bitSet.clear(batchIndex); + } + + final ByteBuf cmd = Commands.newAck(consumer.consumerId, msgId.ledgerId, msgId.entryId, bitSet, ackType, null, properties); + cnx.ctx().writeAndFlush(cmd, cnx.ctx().voidPromise()); + return true; + } + /** * Flush all the pending acks and send them to the broker */ @@ -164,7 +226,7 @@ public void flush() { boolean shouldFlush = false; if (cumulativeAckFlushRequired) { - ByteBuf cmd = Commands.newAck(consumer.consumerId, lastCumulativeAck.ledgerId, lastCumulativeAck.entryId, + ByteBuf cmd = Commands.newAck(consumer.consumerId, lastCumulativeAck.ledgerId, lastCumulativeAck.entryId, lastCumulativeAckSet, AckType.Cumulative, null, Collections.emptyMap()); cnx.ctx().write(cmd, cnx.ctx().voidPromise()); shouldFlush=true; @@ -172,22 +234,18 @@ public void flush() { } // Flush all individual acks + List> entriesToAck = new ArrayList<>(pendingIndividualAcks.size() + pendingIndividualBatchIndexAcks.size()); if (!pendingIndividualAcks.isEmpty()) { if (Commands.peerSupportsMultiMessageAcknowledgment(cnx.getRemoteEndpointProtocolVersion())) { // We can send 1 single protobuf command with all individual acks - List> entriesToAck = new ArrayList<>(pendingIndividualAcks.size()); while (true) { MessageIdImpl msgId = pendingIndividualAcks.pollFirst(); if (msgId == null) { break; } - entriesToAck.add(Pair.of(msgId.getLedgerId(), msgId.getEntryId())); + entriesToAck.add(Triple.of(msgId.getLedgerId(), msgId.getEntryId(), null)); } - - cnx.ctx().write(Commands.newMultiMessageAck(consumer.consumerId, entriesToAck), - cnx.ctx().voidPromise()); - shouldFlush = true; } else { // When talking to older brokers, send the acknowledgements individually while (true) { @@ -196,17 +254,33 @@ public void flush() { break; } - cnx.ctx().write(Commands.newAck(consumer.consumerId, msgId.getLedgerId(), msgId.getEntryId(), + cnx.ctx().write(Commands.newAck(consumer.consumerId, msgId.getLedgerId(), msgId.getEntryId(), null, AckType.Individual, null, Collections.emptyMap()), cnx.ctx().voidPromise()); shouldFlush = true; } } } + if (!pendingIndividualBatchIndexAcks.isEmpty()) { + Iterator> iterator = pendingIndividualBatchIndexAcks.entrySet().iterator(); + + while (iterator.hasNext()) { + Map.Entry entry = iterator.next(); + entriesToAck.add(Triple.of(entry.getKey().ledgerId, entry.getKey().entryId, entry.getValue())); + iterator.remove(); + } + } + + if (entriesToAck.size() > 0) { + cnx.ctx().write(Commands.newMultiMessageAck(consumer.consumerId, entriesToAck), + cnx.ctx().voidPromise()); + shouldFlush = true; + } + if (shouldFlush) { if (log.isDebugEnabled()) { - log.debug("[{}] Flushing pending acks to broker: last-cumulative-ack: {} -- individual-acks: {}", - consumer, lastCumulativeAck, pendingIndividualAcks); + log.debug("[{}] Flushing pending acks to broker: last-cumulative-ack: {} -- individual-acks: {} -- individual-batch-index-acks: {}", + consumer, lastCumulativeAck, pendingIndividualAcks, pendingIndividualBatchIndexAcks); } cnx.ctx().flush(); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java index a0de0704c2d9a..79e368e7e2703 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java @@ -23,6 +23,7 @@ import io.netty.buffer.ByteBuf; +import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.locks.Lock; @@ -36,6 +37,7 @@ import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData; +import org.apache.pulsar.common.api.proto.PulsarApi.IntRange; import org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData; import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata; @@ -177,6 +179,7 @@ protected void triggerListener(int numMessages) { @Override void receiveIndividualMessagesFromBatch(MessageMetadata msgMetadata, int redeliveryCount, + List ackSet, ByteBuf uncompressedPayload, MessageIdData messageId, ClientCnx cnx) { log.warn( "Closing consumer [{}]-[{}] due to unsupported received batch-message with zero receiver queue size", diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BatchMessageIdImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BatchMessageIdImplTest.java index c32a59376f9df..29207378e6666 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BatchMessageIdImplTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BatchMessageIdImplTest.java @@ -76,7 +76,7 @@ public void equalsTest() { public void deserializationTest() { // initialize BitSet with null BatchMessageAcker ackerDisabled = new BatchMessageAcker(null, 0); - BatchMessageIdImpl batchMsgId = new BatchMessageIdImpl(0, 0, 0, 0, ackerDisabled); + BatchMessageIdImpl batchMsgId = new BatchMessageIdImpl(0, 0, 0, 0, 0, ackerDisabled); ObjectWriter writer = ObjectMapperFactory.create().writerWithDefaultPrettyPrinter(); diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java index 9e0bf860fe74f..d3b9f3cba802b 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java @@ -1133,6 +1133,11 @@ public interface MessageIdDataOrBuilder // optional int32 batch_index = 4 [default = -1]; boolean hasBatchIndex(); int getBatchIndex(); + + // repeated int64 ack_set = 5; + java.util.List getAckSetList(); + int getAckSetCount(); + long getAckSet(int index); } public static final class MessageIdData extends org.apache.pulsar.shaded.com.google.protobuf.v241.GeneratedMessageLite @@ -1209,11 +1214,26 @@ public int getBatchIndex() { return batchIndex_; } + // repeated int64 ack_set = 5; + public static final int ACK_SET_FIELD_NUMBER = 5; + private java.util.List ackSet_; + public java.util.List + getAckSetList() { + return ackSet_; + } + public int getAckSetCount() { + return ackSet_.size(); + } + public long getAckSet(int index) { + return ackSet_.get(index); + } + private void initFields() { ledgerId_ = 0L; entryId_ = 0L; partition_ = -1; batchIndex_ = -1; + ackSet_ = java.util.Collections.emptyList();; } private byte memoizedIsInitialized = -1; public final boolean isInitialized() { @@ -1252,6 +1272,9 @@ public void writeTo(org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStr if (((bitField0_ & 0x00000008) == 0x00000008)) { output.writeInt32(4, batchIndex_); } + for (int i = 0; i < ackSet_.size(); i++) { + output.writeInt64(5, ackSet_.get(i)); + } } private int memoizedSerializedSize = -1; @@ -1276,6 +1299,15 @@ public int getSerializedSize() { size += org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream .computeInt32Size(4, batchIndex_); } + { + int dataSize = 0; + for (int i = 0; i < ackSet_.size(); i++) { + dataSize += org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream + .computeInt64SizeNoTag(ackSet_.get(i)); + } + size += dataSize; + size += 1 * getAckSetList().size(); + } memoizedSerializedSize = size; return size; } @@ -1397,6 +1429,8 @@ public Builder clear() { bitField0_ = (bitField0_ & ~0x00000004); batchIndex_ = -1; bitField0_ = (bitField0_ & ~0x00000008); + ackSet_ = java.util.Collections.emptyList();; + bitField0_ = (bitField0_ & ~0x00000010); return this; } @@ -1446,6 +1480,11 @@ public org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData buildPartial() to_bitField0_ |= 0x00000008; } result.batchIndex_ = batchIndex_; + if (((bitField0_ & 0x00000010) == 0x00000010)) { + ackSet_ = java.util.Collections.unmodifiableList(ackSet_); + bitField0_ = (bitField0_ & ~0x00000010); + } + result.ackSet_ = ackSet_; result.bitField0_ = to_bitField0_; return result; } @@ -1464,6 +1503,16 @@ public Builder mergeFrom(org.apache.pulsar.common.api.proto.PulsarApi.MessageIdD if (other.hasBatchIndex()) { setBatchIndex(other.getBatchIndex()); } + if (!other.ackSet_.isEmpty()) { + if (ackSet_.isEmpty()) { + ackSet_ = other.ackSet_; + bitField0_ = (bitField0_ & ~0x00000010); + } else { + ensureAckSetIsMutable(); + ackSet_.addAll(other.ackSet_); + } + + } return this; } @@ -1521,6 +1570,11 @@ public Builder mergeFrom( batchIndex_ = input.readInt32(); break; } + case 40: { + ensureAckSetIsMutable(); + ackSet_.add(input.readInt64()); + break; + } } } } @@ -1611,6 +1665,51 @@ public Builder clearBatchIndex() { return this; } + // repeated int64 ack_set = 5; + private java.util.List ackSet_ = java.util.Collections.emptyList();; + private void ensureAckSetIsMutable() { + if (!((bitField0_ & 0x00000010) == 0x00000010)) { + ackSet_ = new java.util.ArrayList(ackSet_); + bitField0_ |= 0x00000010; + } + } + public java.util.List + getAckSetList() { + return java.util.Collections.unmodifiableList(ackSet_); + } + public int getAckSetCount() { + return ackSet_.size(); + } + public long getAckSet(int index) { + return ackSet_.get(index); + } + public Builder setAckSet( + int index, long value) { + ensureAckSetIsMutable(); + ackSet_.set(index, value); + + return this; + } + public Builder addAckSet(long value) { + ensureAckSetIsMutable(); + ackSet_.add(value); + + return this; + } + public Builder addAllAckSet( + java.lang.Iterable values) { + ensureAckSetIsMutable(); + super.addAll(values, ackSet_); + + return this; + } + public Builder clearAckSet() { + ackSet_ = java.util.Collections.emptyList();; + bitField0_ = (bitField0_ & ~0x00000010); + + return this; + } + // @@protoc_insertion_point(builder_scope:pulsar.proto.MessageIdData) } @@ -17748,6 +17847,11 @@ public interface CommandMessageOrBuilder // optional uint32 redelivery_count = 3 [default = 0]; boolean hasRedeliveryCount(); int getRedeliveryCount(); + + // repeated int64 ack_set = 4; + java.util.List getAckSetList(); + int getAckSetCount(); + long getAckSet(int index); } public static final class CommandMessage extends org.apache.pulsar.shaded.com.google.protobuf.v241.GeneratedMessageLite @@ -17814,10 +17918,25 @@ public int getRedeliveryCount() { return redeliveryCount_; } + // repeated int64 ack_set = 4; + public static final int ACK_SET_FIELD_NUMBER = 4; + private java.util.List ackSet_; + public java.util.List + getAckSetList() { + return ackSet_; + } + public int getAckSetCount() { + return ackSet_.size(); + } + public long getAckSet(int index) { + return ackSet_.get(index); + } + private void initFields() { consumerId_ = 0L; messageId_ = org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData.getDefaultInstance(); redeliveryCount_ = 0; + ackSet_ = java.util.Collections.emptyList();; } private byte memoizedIsInitialized = -1; public final boolean isInitialized() { @@ -17857,6 +17976,9 @@ public void writeTo(org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStr if (((bitField0_ & 0x00000004) == 0x00000004)) { output.writeUInt32(3, redeliveryCount_); } + for (int i = 0; i < ackSet_.size(); i++) { + output.writeInt64(4, ackSet_.get(i)); + } } private int memoizedSerializedSize = -1; @@ -17877,6 +17999,15 @@ public int getSerializedSize() { size += org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream .computeUInt32Size(3, redeliveryCount_); } + { + int dataSize = 0; + for (int i = 0; i < ackSet_.size(); i++) { + dataSize += org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream + .computeInt64SizeNoTag(ackSet_.get(i)); + } + size += dataSize; + size += 1 * getAckSetList().size(); + } memoizedSerializedSize = size; return size; } @@ -17996,6 +18127,8 @@ public Builder clear() { bitField0_ = (bitField0_ & ~0x00000002); redeliveryCount_ = 0; bitField0_ = (bitField0_ & ~0x00000004); + ackSet_ = java.util.Collections.emptyList();; + bitField0_ = (bitField0_ & ~0x00000008); return this; } @@ -18041,6 +18174,11 @@ public org.apache.pulsar.common.api.proto.PulsarApi.CommandMessage buildPartial( to_bitField0_ |= 0x00000004; } result.redeliveryCount_ = redeliveryCount_; + if (((bitField0_ & 0x00000008) == 0x00000008)) { + ackSet_ = java.util.Collections.unmodifiableList(ackSet_); + bitField0_ = (bitField0_ & ~0x00000008); + } + result.ackSet_ = ackSet_; result.bitField0_ = to_bitField0_; return result; } @@ -18056,6 +18194,16 @@ public Builder mergeFrom(org.apache.pulsar.common.api.proto.PulsarApi.CommandMes if (other.hasRedeliveryCount()) { setRedeliveryCount(other.getRedeliveryCount()); } + if (!other.ackSet_.isEmpty()) { + if (ackSet_.isEmpty()) { + ackSet_ = other.ackSet_; + bitField0_ = (bitField0_ & ~0x00000008); + } else { + ensureAckSetIsMutable(); + ackSet_.addAll(other.ackSet_); + } + + } return this; } @@ -18117,6 +18265,11 @@ public Builder mergeFrom( redeliveryCount_ = input.readUInt32(); break; } + case 32: { + ensureAckSetIsMutable(); + ackSet_.add(input.readInt64()); + break; + } } } } @@ -18208,6 +18361,51 @@ public Builder clearRedeliveryCount() { return this; } + // repeated int64 ack_set = 4; + private java.util.List ackSet_ = java.util.Collections.emptyList();; + private void ensureAckSetIsMutable() { + if (!((bitField0_ & 0x00000008) == 0x00000008)) { + ackSet_ = new java.util.ArrayList(ackSet_); + bitField0_ |= 0x00000008; + } + } + public java.util.List + getAckSetList() { + return java.util.Collections.unmodifiableList(ackSet_); + } + public int getAckSetCount() { + return ackSet_.size(); + } + public long getAckSet(int index) { + return ackSet_.get(index); + } + public Builder setAckSet( + int index, long value) { + ensureAckSetIsMutable(); + ackSet_.set(index, value); + + return this; + } + public Builder addAckSet(long value) { + ensureAckSetIsMutable(); + ackSet_.add(value); + + return this; + } + public Builder addAllAckSet( + java.lang.Iterable values) { + ensureAckSetIsMutable(); + super.addAll(values, ackSet_); + + return this; + } + public Builder clearAckSet() { + ackSet_ = java.util.Collections.emptyList();; + bitField0_ = (bitField0_ & ~0x00000008); + + return this; + } + // @@protoc_insertion_point(builder_scope:pulsar.proto.CommandMessage) } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java index 0e1a1f9e68426..cbc4a39293c79 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java @@ -39,7 +39,7 @@ import lombok.experimental.UtilityClass; import lombok.extern.slf4j.Slf4j; -import org.apache.commons.lang3.tuple.Pair; +import org.apache.commons.lang3.tuple.Triple; import org.apache.pulsar.client.api.KeySharedPolicy; import org.apache.pulsar.client.api.Range; import org.apache.pulsar.common.allocator.PulsarByteBufAllocator; @@ -117,6 +117,9 @@ import org.apache.pulsar.common.protocol.schema.SchemaVersion; import org.apache.pulsar.common.schema.SchemaInfo; import org.apache.pulsar.common.schema.SchemaType; +import org.apache.pulsar.common.util.SafeCollectionUtils; +import org.apache.pulsar.common.util.collections.BitSetRecyclable; +import org.apache.pulsar.common.util.collections.ConcurrentBitSetRecyclable; import org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream; import org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream; import org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString; @@ -437,13 +440,16 @@ public static void skipMessageMetadata(ByteBuf buffer) { } public static ByteBufPair newMessage(long consumerId, MessageIdData messageId, int redeliveryCount, - ByteBuf metadataAndPayload) { + ByteBuf metadataAndPayload, long[] ackSet) { CommandMessage.Builder msgBuilder = CommandMessage.newBuilder(); msgBuilder.setConsumerId(consumerId); msgBuilder.setMessageId(messageId); if (redeliveryCount > 0) { msgBuilder.setRedeliveryCount(redeliveryCount); } + if (ackSet != null) { + msgBuilder.addAllAckSet(SafeCollectionUtils.longArrayToList(ackSet)); + } CommandMessage msg = msgBuilder.build(); BaseCommand.Builder cmdBuilder = BaseCommand.newBuilder(); BaseCommand cmd = cmdBuilder.setType(Type.MESSAGE).setMessage(msg).build(); @@ -859,7 +865,8 @@ public static ByteBuf newLookupErrorResponse(ServerError error, String errorMsg, return res; } - public static ByteBuf newMultiMessageAck(long consumerId, List> entries) { + public static ByteBuf newMultiMessageAck(long consumerId, + List> entries) { CommandAck.Builder ackBuilder = CommandAck.newBuilder(); ackBuilder.setConsumerId(consumerId); ackBuilder.setAckType(AckType.Individual); @@ -867,14 +874,19 @@ public static ByteBuf newMultiMessageAck(long consumerId, List> int entriesCount = entries.size(); for (int i = 0; i < entriesCount; i++) { long ledgerId = entries.get(i).getLeft(); - long entryId = entries.get(i).getRight(); - + long entryId = entries.get(i).getMiddle(); + ConcurrentBitSetRecyclable bitSet = entries.get(i).getRight(); MessageIdData.Builder messageIdDataBuilder = MessageIdData.newBuilder(); messageIdDataBuilder.setLedgerId(ledgerId); messageIdDataBuilder.setEntryId(entryId); + if (bitSet != null) { + messageIdDataBuilder.addAllAckSet(SafeCollectionUtils.longArrayToList(bitSet.toLongArray())); + } MessageIdData messageIdData = messageIdDataBuilder.build(); ackBuilder.addMessageId(messageIdData); - + if (bitSet != null) { + bitSet.recycle(); + } messageIdDataBuilder.recycle(); } @@ -890,12 +902,12 @@ public static ByteBuf newMultiMessageAck(long consumerId, List> return res; } - public static ByteBuf newAck(long consumerId, long ledgerId, long entryId, AckType ackType, + public static ByteBuf newAck(long consumerId, long ledgerId, long entryId, BitSetRecyclable ackSet, AckType ackType, ValidationError validationError, Map properties) { - return newAck(consumerId, ledgerId, entryId, ackType, validationError, properties, 0, 0); + return newAck(consumerId, ledgerId, entryId, ackSet, ackType, validationError, properties, 0, 0); } - public static ByteBuf newAck(long consumerId, long ledgerId, long entryId, AckType ackType, + public static ByteBuf newAck(long consumerId, long ledgerId, long entryId, BitSetRecyclable ackSet, AckType ackType, ValidationError validationError, Map properties, long txnIdLeastBits, long txnIdMostBits) { CommandAck.Builder ackBuilder = CommandAck.newBuilder(); @@ -904,6 +916,9 @@ public static ByteBuf newAck(long consumerId, long ledgerId, long entryId, AckTy MessageIdData.Builder messageIdDataBuilder = MessageIdData.newBuilder(); messageIdDataBuilder.setLedgerId(ledgerId); messageIdDataBuilder.setEntryId(entryId); + if (ackSet != null) { + messageIdDataBuilder.addAllAckSet(SafeCollectionUtils.longArrayToList(ackSet.toLongArray())); + } MessageIdData messageIdData = messageIdDataBuilder.build(); ackBuilder.addMessageId(messageIdData); if (validationError != null) { @@ -923,6 +938,9 @@ public static ByteBuf newAck(long consumerId, long ledgerId, long entryId, AckTy ByteBuf res = serializeWithSize(BaseCommand.newBuilder().setType(Type.ACK).setAck(ack)); ack.recycle(); + if (ackSet != null) { + ackSet.recycle(); + } ackBuilder.recycle(); messageIdDataBuilder.recycle(); messageIdData.recycle(); diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/SafeCollectionUtils.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/SafeCollectionUtils.java new file mode 100644 index 0000000000000..21488fb3e5ac8 --- /dev/null +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/SafeCollectionUtils.java @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.common.util; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; + +/** + * Safe collection utils. + */ +public class SafeCollectionUtils { + + public static List longArrayToList(long[] array) { + return array == null || array.length == 0 ? Collections.emptyList() + : Arrays.stream(array).boxed().collect(Collectors.toList()); + } + + public static long[] longListToArray(List list) { + return list == null || list.size() == 0 ? new long[0] : list.stream().mapToLong(l->l).toArray(); + } +} diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/BitSetRecyclable.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/BitSetRecyclable.java new file mode 100644 index 0000000000000..285416ac606b2 --- /dev/null +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/BitSetRecyclable.java @@ -0,0 +1,1209 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.common.util.collections; + +import io.netty.util.Recycler; +import io.netty.util.Recycler.Handle; + +import java.io.*; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.nio.LongBuffer; +import java.util.Arrays; +import java.util.BitSet; + +/** + * This this copy of {@link BitSet}. + * Provides {@link BitSetRecyclable#resetWords(long[])} method and leverage with netty recycler. + */ +public class BitSetRecyclable implements Cloneable, java.io.Serializable { + /* + * BitSets are packed into arrays of "words." Currently a word is + * a long, which consists of 64 bits, requiring 6 address bits. + * The choice of word size is determined purely by performance concerns. + */ + private final static int ADDRESS_BITS_PER_WORD = 6; + private final static int BITS_PER_WORD = 1 << ADDRESS_BITS_PER_WORD; + private final static int BIT_INDEX_MASK = BITS_PER_WORD - 1; + + /* Used to shift left or right for a partial word mask */ + private static final long WORD_MASK = 0xffffffffffffffffL; + + /** + * @serialField bits long[] + * + * The bits in this BitSet. The ith bit is stored in bits[i/64] at + * bit position i % 64 (where bit position 0 refers to the least + * significant bit and 63 refers to the most significant bit). + */ + private static final ObjectStreamField[] serialPersistentFields = { + new ObjectStreamField("bits", long[].class), + }; + + /** + * The internal field corresponding to the serialField "bits". + */ + private long[] words; + + /** + * The number of words in the logical size of this BitSet. + */ + private transient int wordsInUse = 0; + + /** + * Whether the size of "words" is user-specified. If so, we assume + * the user knows what he's doing and try harder to preserve it. + */ + private transient boolean sizeIsSticky = false; + + /* use serialVersionUID from JDK 1.0.2 for interoperability */ + private static final long serialVersionUID = 7997698588986878753L; + + /** + * Given a bit index, return word index containing it. + */ + private static int wordIndex(int bitIndex) { + return bitIndex >> ADDRESS_BITS_PER_WORD; + } + + /** + * Every public method must preserve these invariants. + */ + private void checkInvariants() { + assert(wordsInUse == 0 || words[wordsInUse - 1] != 0); + assert(wordsInUse >= 0 && wordsInUse <= words.length); + assert(wordsInUse == words.length || words[wordsInUse] == 0); + } + + /** + * Sets the field wordsInUse to the logical size in words of the bit set. + * WARNING:This method assumes that the number of words actually in use is + * less than or equal to the current value of wordsInUse! + */ + private void recalculateWordsInUse() { + // Traverse the bitset until a used word is found + int i; + for (i = wordsInUse-1; i >= 0; i--) + if (words[i] != 0) + break; + + wordsInUse = i+1; // The new logical size + } + + /** + * Creates a new bit set. All bits are initially {@code false}. + */ + public BitSetRecyclable() { + initWords(BITS_PER_WORD); + sizeIsSticky = false; + } + + /** + * Creates a bit set whose initial size is large enough to explicitly + * represent bits with indices in the range {@code 0} through + * {@code nbits-1}. All bits are initially {@code false}. + * + * @param nbits the initial size of the bit set + * @throws NegativeArraySizeException if the specified initial size + * is negative + */ + public BitSetRecyclable(int nbits) { + // nbits can't be negative; size 0 is OK + if (nbits < 0) + throw new NegativeArraySizeException("nbits < 0: " + nbits); + + initWords(nbits); + sizeIsSticky = true; + } + + private void initWords(int nbits) { + words = new long[wordIndex(nbits-1) + 1]; + } + + /** + * Creates a bit set using words as the internal representation. + * The last word (if there is one) must be non-zero. + */ + private BitSetRecyclable(long[] words) { + this.words = words; + this.wordsInUse = words.length; + checkInvariants(); + } + + /** + * Returns a new bit set containing all the bits in the given long array. + * + *

More precisely, + *
{@code BitSet.valueOf(longs).get(n) == ((longs[n/64] & (1L<<(n%64))) != 0)} + *
for all {@code n < 64 * longs.length}. + * + *

This method is equivalent to + * {@code BitSet.valueOf(LongBuffer.wrap(longs))}. + * + * @param longs a long array containing a little-endian representation + * of a sequence of bits to be used as the initial bits of the + * new bit set + * @return a {@code BitSet} containing all the bits in the long array + * @since 1.7 + */ + public static BitSetRecyclable valueOf(long[] longs) { + int n; + for (n = longs.length; n > 0 && longs[n - 1] == 0; n--) + ; + return new BitSetRecyclable(Arrays.copyOf(longs, n)); + } + + /** + * Returns a new bit set containing all the bits in the given long + * buffer between its position and limit. + * + *

More precisely, + *
{@code BitSet.valueOf(lb).get(n) == ((lb.get(lb.position()+n/64) & (1L<<(n%64))) != 0)} + *
for all {@code n < 64 * lb.remaining()}. + * + *

The long buffer is not modified by this method, and no + * reference to the buffer is retained by the bit set. + * + * @param lb a long buffer containing a little-endian representation + * of a sequence of bits between its position and limit, to be + * used as the initial bits of the new bit set + * @return a {@code BitSet} containing all the bits in the buffer in the + * specified range + * @since 1.7 + */ + public static BitSetRecyclable valueOf(LongBuffer lb) { + lb = lb.slice(); + int n; + for (n = lb.remaining(); n > 0 && lb.get(n - 1) == 0; n--) + ; + long[] words = new long[n]; + lb.get(words); + return new BitSetRecyclable(words); + } + + /** + * Returns a new bit set containing all the bits in the given byte array. + * + *

More precisely, + *
{@code BitSet.valueOf(bytes).get(n) == ((bytes[n/8] & (1<<(n%8))) != 0)} + *
for all {@code n < 8 * bytes.length}. + * + *

This method is equivalent to + * {@code BitSet.valueOf(ByteBuffer.wrap(bytes))}. + * + * @param bytes a byte array containing a little-endian + * representation of a sequence of bits to be used as the + * initial bits of the new bit set + * @return a {@code BitSet} containing all the bits in the byte array + * @since 1.7 + */ + public static BitSetRecyclable valueOf(byte[] bytes) { + return BitSetRecyclable.valueOf(ByteBuffer.wrap(bytes)); + } + + /** + * Returns a new bit set containing all the bits in the given byte + * buffer between its position and limit. + * + *

More precisely, + *
{@code BitSet.valueOf(bb).get(n) == ((bb.get(bb.position()+n/8) & (1<<(n%8))) != 0)} + *
for all {@code n < 8 * bb.remaining()}. + * + *

The byte buffer is not modified by this method, and no + * reference to the buffer is retained by the bit set. + * + * @param bb a byte buffer containing a little-endian representation + * of a sequence of bits between its position and limit, to be + * used as the initial bits of the new bit set + * @return a {@code BitSet} containing all the bits in the buffer in the + * specified range + * @since 1.7 + */ + public static BitSetRecyclable valueOf(ByteBuffer bb) { + bb = bb.slice().order(ByteOrder.LITTLE_ENDIAN); + int n; + for (n = bb.remaining(); n > 0 && bb.get(n - 1) == 0; n--) + ; + long[] words = new long[(n + 7) / 8]; + bb.limit(n); + int i = 0; + while (bb.remaining() >= 8) + words[i++] = bb.getLong(); + for (int remaining = bb.remaining(), j = 0; j < remaining; j++) + words[i] |= (bb.get() & 0xffL) << (8 * j); + return new BitSetRecyclable(words); + } + + /** + * Returns a new byte array containing all the bits in this bit set. + * + *

More precisely, if + *
{@code byte[] bytes = s.toByteArray();} + *
then {@code bytes.length == (s.length()+7)/8} and + *
{@code s.get(n) == ((bytes[n/8] & (1<<(n%8))) != 0)} + *
for all {@code n < 8 * bytes.length}. + * + * @return a byte array containing a little-endian representation + * of all the bits in this bit set + * @since 1.7 + */ + public byte[] toByteArray() { + int n = wordsInUse; + if (n == 0) + return new byte[0]; + int len = 8 * (n-1); + for (long x = words[n - 1]; x != 0; x >>>= 8) + len++; + byte[] bytes = new byte[len]; + ByteBuffer bb = ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN); + for (int i = 0; i < n - 1; i++) + bb.putLong(words[i]); + for (long x = words[n - 1]; x != 0; x >>>= 8) + bb.put((byte) (x & 0xff)); + return bytes; + } + + /** + * Returns a new long array containing all the bits in this bit set. + * + *

More precisely, if + *
{@code long[] longs = s.toLongArray();} + *
then {@code longs.length == (s.length()+63)/64} and + *
{@code s.get(n) == ((longs[n/64] & (1L<<(n%64))) != 0)} + *
for all {@code n < 64 * longs.length}. + * + * @return a long array containing a little-endian representation + * of all the bits in this bit set + * @since 1.7 + */ + public long[] toLongArray() { + return Arrays.copyOf(words, wordsInUse); + } + + /** + * Ensures that the BitSet can hold enough words. + * @param wordsRequired the minimum acceptable number of words. + */ + private void ensureCapacity(int wordsRequired) { + if (words.length < wordsRequired) { + // Allocate larger of doubled size or required size + int request = Math.max(2 * words.length, wordsRequired); + words = Arrays.copyOf(words, request); + sizeIsSticky = false; + } + } + + /** + * Ensures that the BitSet can accommodate a given wordIndex, + * temporarily violating the invariants. The caller must + * restore the invariants before returning to the user, + * possibly using recalculateWordsInUse(). + * @param wordIndex the index to be accommodated. + */ + private void expandTo(int wordIndex) { + int wordsRequired = wordIndex+1; + if (wordsInUse < wordsRequired) { + ensureCapacity(wordsRequired); + wordsInUse = wordsRequired; + } + } + + /** + * Checks that fromIndex ... toIndex is a valid range of bit indices. + */ + private static void checkRange(int fromIndex, int toIndex) { + if (fromIndex < 0) + throw new IndexOutOfBoundsException("fromIndex < 0: " + fromIndex); + if (toIndex < 0) + throw new IndexOutOfBoundsException("toIndex < 0: " + toIndex); + if (fromIndex > toIndex) + throw new IndexOutOfBoundsException("fromIndex: " + fromIndex + + " > toIndex: " + toIndex); + } + + /** + * Sets the bit at the specified index to the complement of its + * current value. + * + * @param bitIndex the index of the bit to flip + * @throws IndexOutOfBoundsException if the specified index is negative + * @since 1.4 + */ + public void flip(int bitIndex) { + if (bitIndex < 0) + throw new IndexOutOfBoundsException("bitIndex < 0: " + bitIndex); + + int wordIndex = wordIndex(bitIndex); + expandTo(wordIndex); + + words[wordIndex] ^= (1L << bitIndex); + + recalculateWordsInUse(); + checkInvariants(); + } + + /** + * Sets each bit from the specified {@code fromIndex} (inclusive) to the + * specified {@code toIndex} (exclusive) to the complement of its current + * value. + * + * @param fromIndex index of the first bit to flip + * @param toIndex index after the last bit to flip + * @throws IndexOutOfBoundsException if {@code fromIndex} is negative, + * or {@code toIndex} is negative, or {@code fromIndex} is + * larger than {@code toIndex} + * @since 1.4 + */ + public void flip(int fromIndex, int toIndex) { + checkRange(fromIndex, toIndex); + + if (fromIndex == toIndex) + return; + + int startWordIndex = wordIndex(fromIndex); + int endWordIndex = wordIndex(toIndex - 1); + expandTo(endWordIndex); + + long firstWordMask = WORD_MASK << fromIndex; + long lastWordMask = WORD_MASK >>> -toIndex; + if (startWordIndex == endWordIndex) { + // Case 1: One word + words[startWordIndex] ^= (firstWordMask & lastWordMask); + } else { + // Case 2: Multiple words + // Handle first word + words[startWordIndex] ^= firstWordMask; + + // Handle intermediate words, if any + for (int i = startWordIndex+1; i < endWordIndex; i++) + words[i] ^= WORD_MASK; + + // Handle last word + words[endWordIndex] ^= lastWordMask; + } + + recalculateWordsInUse(); + checkInvariants(); + } + + /** + * Sets the bit at the specified index to {@code true}. + * + * @param bitIndex a bit index + * @throws IndexOutOfBoundsException if the specified index is negative + * @since JDK1.0 + */ + public void set(int bitIndex) { + if (bitIndex < 0) + throw new IndexOutOfBoundsException("bitIndex < 0: " + bitIndex); + + int wordIndex = wordIndex(bitIndex); + expandTo(wordIndex); + + words[wordIndex] |= (1L << bitIndex); // Restores invariants + + checkInvariants(); + } + + /** + * Sets the bit at the specified index to the specified value. + * + * @param bitIndex a bit index + * @param value a boolean value to set + * @throws IndexOutOfBoundsException if the specified index is negative + * @since 1.4 + */ + public void set(int bitIndex, boolean value) { + if (value) + set(bitIndex); + else + clear(bitIndex); + } + + /** + * Sets the bits from the specified {@code fromIndex} (inclusive) to the + * specified {@code toIndex} (exclusive) to {@code true}. + * + * @param fromIndex index of the first bit to be set + * @param toIndex index after the last bit to be set + * @throws IndexOutOfBoundsException if {@code fromIndex} is negative, + * or {@code toIndex} is negative, or {@code fromIndex} is + * larger than {@code toIndex} + * @since 1.4 + */ + public void set(int fromIndex, int toIndex) { + checkRange(fromIndex, toIndex); + + if (fromIndex == toIndex) + return; + + // Increase capacity if necessary + int startWordIndex = wordIndex(fromIndex); + int endWordIndex = wordIndex(toIndex - 1); + expandTo(endWordIndex); + + long firstWordMask = WORD_MASK << fromIndex; + long lastWordMask = WORD_MASK >>> -toIndex; + if (startWordIndex == endWordIndex) { + // Case 1: One word + words[startWordIndex] |= (firstWordMask & lastWordMask); + } else { + // Case 2: Multiple words + // Handle first word + words[startWordIndex] |= firstWordMask; + + // Handle intermediate words, if any + for (int i = startWordIndex+1; i < endWordIndex; i++) + words[i] = WORD_MASK; + + // Handle last word (restores invariants) + words[endWordIndex] |= lastWordMask; + } + + checkInvariants(); + } + + /** + * Sets the bits from the specified {@code fromIndex} (inclusive) to the + * specified {@code toIndex} (exclusive) to the specified value. + * + * @param fromIndex index of the first bit to be set + * @param toIndex index after the last bit to be set + * @param value value to set the selected bits to + * @throws IndexOutOfBoundsException if {@code fromIndex} is negative, + * or {@code toIndex} is negative, or {@code fromIndex} is + * larger than {@code toIndex} + * @since 1.4 + */ + public void set(int fromIndex, int toIndex, boolean value) { + if (value) + set(fromIndex, toIndex); + else + clear(fromIndex, toIndex); + } + + /** + * Sets the bit specified by the index to {@code false}. + * + * @param bitIndex the index of the bit to be cleared + * @throws IndexOutOfBoundsException if the specified index is negative + * @since JDK1.0 + */ + public void clear(int bitIndex) { + if (bitIndex < 0) + throw new IndexOutOfBoundsException("bitIndex < 0: " + bitIndex); + + int wordIndex = wordIndex(bitIndex); + if (wordIndex >= wordsInUse) + return; + + words[wordIndex] &= ~(1L << bitIndex); + + recalculateWordsInUse(); + checkInvariants(); + } + + /** + * Sets the bits from the specified {@code fromIndex} (inclusive) to the + * specified {@code toIndex} (exclusive) to {@code false}. + * + * @param fromIndex index of the first bit to be cleared + * @param toIndex index after the last bit to be cleared + * @throws IndexOutOfBoundsException if {@code fromIndex} is negative, + * or {@code toIndex} is negative, or {@code fromIndex} is + * larger than {@code toIndex} + * @since 1.4 + */ + public void clear(int fromIndex, int toIndex) { + checkRange(fromIndex, toIndex); + + if (fromIndex == toIndex) + return; + + int startWordIndex = wordIndex(fromIndex); + if (startWordIndex >= wordsInUse) + return; + + int endWordIndex = wordIndex(toIndex - 1); + if (endWordIndex >= wordsInUse) { + toIndex = length(); + endWordIndex = wordsInUse - 1; + } + + long firstWordMask = WORD_MASK << fromIndex; + long lastWordMask = WORD_MASK >>> -toIndex; + if (startWordIndex == endWordIndex) { + // Case 1: One word + words[startWordIndex] &= ~(firstWordMask & lastWordMask); + } else { + // Case 2: Multiple words + // Handle first word + words[startWordIndex] &= ~firstWordMask; + + // Handle intermediate words, if any + for (int i = startWordIndex+1; i < endWordIndex; i++) + words[i] = 0; + + // Handle last word + words[endWordIndex] &= ~lastWordMask; + } + + recalculateWordsInUse(); + checkInvariants(); + } + + /** + * Sets all of the bits in this BitSet to {@code false}. + * + * @since 1.4 + */ + public void clear() { + while (wordsInUse > 0) + words[--wordsInUse] = 0; + } + + /** + * Returns the value of the bit with the specified index. The value + * is {@code true} if the bit with the index {@code bitIndex} + * is currently set in this {@code BitSet}; otherwise, the result + * is {@code false}. + * + * @param bitIndex the bit index + * @return the value of the bit with the specified index + * @throws IndexOutOfBoundsException if the specified index is negative + */ + public boolean get(int bitIndex) { + if (bitIndex < 0) + throw new IndexOutOfBoundsException("bitIndex < 0: " + bitIndex); + + checkInvariants(); + + int wordIndex = wordIndex(bitIndex); + return (wordIndex < wordsInUse) + && ((words[wordIndex] & (1L << bitIndex)) != 0); + } + + /** + * Returns a new {@code BitSet} composed of bits from this {@code BitSet} + * from {@code fromIndex} (inclusive) to {@code toIndex} (exclusive). + * + * @param fromIndex index of the first bit to include + * @param toIndex index after the last bit to include + * @return a new {@code BitSet} from a range of this {@code BitSet} + * @throws IndexOutOfBoundsException if {@code fromIndex} is negative, + * or {@code toIndex} is negative, or {@code fromIndex} is + * larger than {@code toIndex} + * @since 1.4 + */ + public BitSetRecyclable get(int fromIndex, int toIndex) { + checkRange(fromIndex, toIndex); + + checkInvariants(); + + int len = length(); + + // If no set bits in range return empty bitset + if (len <= fromIndex || fromIndex == toIndex) + return new BitSetRecyclable(0); + + // An optimization + if (toIndex > len) + toIndex = len; + + BitSetRecyclable result = new BitSetRecyclable(toIndex - fromIndex); + int targetWords = wordIndex(toIndex - fromIndex - 1) + 1; + int sourceIndex = wordIndex(fromIndex); + boolean wordAligned = ((fromIndex & BIT_INDEX_MASK) == 0); + + // Process all words but the last word + for (int i = 0; i < targetWords - 1; i++, sourceIndex++) + result.words[i] = wordAligned ? words[sourceIndex] : + (words[sourceIndex] >>> fromIndex) | + (words[sourceIndex+1] << -fromIndex); + + // Process the last word + long lastWordMask = WORD_MASK >>> -toIndex; + result.words[targetWords - 1] = + ((toIndex-1) & BIT_INDEX_MASK) < (fromIndex & BIT_INDEX_MASK) + ? /* straddles source words */ + ((words[sourceIndex] >>> fromIndex) | + (words[sourceIndex+1] & lastWordMask) << -fromIndex) + : + ((words[sourceIndex] & lastWordMask) >>> fromIndex); + + // Set wordsInUse correctly + result.wordsInUse = targetWords; + result.recalculateWordsInUse(); + result.checkInvariants(); + + return result; + } + + /** + * Returns the index of the first bit that is set to {@code true} + * that occurs on or after the specified starting index. If no such + * bit exists then {@code -1} is returned. + * + *

To iterate over the {@code true} bits in a {@code BitSet}, + * use the following loop: + * + *

 {@code
+     * for (int i = bs.nextSetBit(0); i >= 0; i = bs.nextSetBit(i+1)) {
+     *     // operate on index i here
+     *     if (i == Integer.MAX_VALUE) {
+     *         break; // or (i+1) would overflow
+     *     }
+     * }}
+ * + * @param fromIndex the index to start checking from (inclusive) + * @return the index of the next set bit, or {@code -1} if there + * is no such bit + * @throws IndexOutOfBoundsException if the specified index is negative + * @since 1.4 + */ + public int nextSetBit(int fromIndex) { + if (fromIndex < 0) + throw new IndexOutOfBoundsException("fromIndex < 0: " + fromIndex); + + checkInvariants(); + + int u = wordIndex(fromIndex); + if (u >= wordsInUse) + return -1; + + long word = words[u] & (WORD_MASK << fromIndex); + + while (true) { + if (word != 0) + return (u * BITS_PER_WORD) + Long.numberOfTrailingZeros(word); + if (++u == wordsInUse) + return -1; + word = words[u]; + } + } + + /** + * Returns the index of the first bit that is set to {@code false} + * that occurs on or after the specified starting index. + * + * @param fromIndex the index to start checking from (inclusive) + * @return the index of the next clear bit + * @throws IndexOutOfBoundsException if the specified index is negative + * @since 1.4 + */ + public int nextClearBit(int fromIndex) { + // Neither spec nor implementation handle bitsets of maximal length. + // See 4816253. + if (fromIndex < 0) + throw new IndexOutOfBoundsException("fromIndex < 0: " + fromIndex); + + checkInvariants(); + + int u = wordIndex(fromIndex); + if (u >= wordsInUse) + return fromIndex; + + long word = ~words[u] & (WORD_MASK << fromIndex); + + while (true) { + if (word != 0) + return (u * BITS_PER_WORD) + Long.numberOfTrailingZeros(word); + if (++u == wordsInUse) + return wordsInUse * BITS_PER_WORD; + word = ~words[u]; + } + } + + /** + * Returns the index of the nearest bit that is set to {@code true} + * that occurs on or before the specified starting index. + * If no such bit exists, or if {@code -1} is given as the + * starting index, then {@code -1} is returned. + * + *

To iterate over the {@code true} bits in a {@code BitSet}, + * use the following loop: + * + *

 {@code
+     * for (int i = bs.length(); (i = bs.previousSetBit(i-1)) >= 0; ) {
+     *     // operate on index i here
+     * }}
+ * + * @param fromIndex the index to start checking from (inclusive) + * @return the index of the previous set bit, or {@code -1} if there + * is no such bit + * @throws IndexOutOfBoundsException if the specified index is less + * than {@code -1} + * @since 1.7 + */ + public int previousSetBit(int fromIndex) { + if (fromIndex < 0) { + if (fromIndex == -1) + return -1; + throw new IndexOutOfBoundsException( + "fromIndex < -1: " + fromIndex); + } + + checkInvariants(); + + int u = wordIndex(fromIndex); + if (u >= wordsInUse) + return length() - 1; + + long word = words[u] & (WORD_MASK >>> -(fromIndex+1)); + + while (true) { + if (word != 0) + return (u+1) * BITS_PER_WORD - 1 - Long.numberOfLeadingZeros(word); + if (u-- == 0) + return -1; + word = words[u]; + } + } + + /** + * Returns the index of the nearest bit that is set to {@code false} + * that occurs on or before the specified starting index. + * If no such bit exists, or if {@code -1} is given as the + * starting index, then {@code -1} is returned. + * + * @param fromIndex the index to start checking from (inclusive) + * @return the index of the previous clear bit, or {@code -1} if there + * is no such bit + * @throws IndexOutOfBoundsException if the specified index is less + * than {@code -1} + * @since 1.7 + */ + public int previousClearBit(int fromIndex) { + if (fromIndex < 0) { + if (fromIndex == -1) + return -1; + throw new IndexOutOfBoundsException( + "fromIndex < -1: " + fromIndex); + } + + checkInvariants(); + + int u = wordIndex(fromIndex); + if (u >= wordsInUse) + return fromIndex; + + long word = ~words[u] & (WORD_MASK >>> -(fromIndex+1)); + + while (true) { + if (word != 0) + return (u+1) * BITS_PER_WORD -1 - Long.numberOfLeadingZeros(word); + if (u-- == 0) + return -1; + word = ~words[u]; + } + } + + /** + * Returns the "logical size" of this {@code BitSet}: the index of + * the highest set bit in the {@code BitSet} plus one. Returns zero + * if the {@code BitSet} contains no set bits. + * + * @return the logical size of this {@code BitSet} + * @since 1.2 + */ + public int length() { + if (wordsInUse == 0) + return 0; + + return BITS_PER_WORD * (wordsInUse - 1) + + (BITS_PER_WORD - Long.numberOfLeadingZeros(words[wordsInUse - 1])); + } + + /** + * Returns true if this {@code BitSet} contains no bits that are set + * to {@code true}. + * + * @return boolean indicating whether this {@code BitSet} is empty + * @since 1.4 + */ + public boolean isEmpty() { + return wordsInUse == 0; + } + + /** + * Returns true if the specified {@code BitSet} has any bits set to + * {@code true} that are also set to {@code true} in this {@code BitSet}. + * + * @param set {@code BitSet} to intersect with + * @return boolean indicating whether this {@code BitSet} intersects + * the specified {@code BitSet} + * @since 1.4 + */ + public boolean intersects(BitSetRecyclable set) { + for (int i = Math.min(wordsInUse, set.wordsInUse) - 1; i >= 0; i--) + if ((words[i] & set.words[i]) != 0) + return true; + return false; + } + + /** + * Returns the number of bits set to {@code true} in this {@code BitSet}. + * + * @return the number of bits set to {@code true} in this {@code BitSet} + * @since 1.4 + */ + public int cardinality() { + int sum = 0; + for (int i = 0; i < wordsInUse; i++) + sum += Long.bitCount(words[i]); + return sum; + } + + /** + * Performs a logical AND of this target bit set with the + * argument bit set. This bit set is modified so that each bit in it + * has the value {@code true} if and only if it both initially + * had the value {@code true} and the corresponding bit in the + * bit set argument also had the value {@code true}. + * + * @param set a bit set + */ + public void and(BitSetRecyclable set) { + if (this == set) + return; + + while (wordsInUse > set.wordsInUse) + words[--wordsInUse] = 0; + + // Perform logical AND on words in common + for (int i = 0; i < wordsInUse; i++) + words[i] &= set.words[i]; + + recalculateWordsInUse(); + checkInvariants(); + } + + /** + * Performs a logical OR of this bit set with the bit set + * argument. This bit set is modified so that a bit in it has the + * value {@code true} if and only if it either already had the + * value {@code true} or the corresponding bit in the bit set + * argument has the value {@code true}. + * + * @param set a bit set + */ + public void or(BitSetRecyclable set) { + if (this == set) + return; + + int wordsInCommon = Math.min(wordsInUse, set.wordsInUse); + + if (wordsInUse < set.wordsInUse) { + ensureCapacity(set.wordsInUse); + wordsInUse = set.wordsInUse; + } + + // Perform logical OR on words in common + for (int i = 0; i < wordsInCommon; i++) + words[i] |= set.words[i]; + + // Copy any remaining words + if (wordsInCommon < set.wordsInUse) + System.arraycopy(set.words, wordsInCommon, + words, wordsInCommon, + wordsInUse - wordsInCommon); + + // recalculateWordsInUse() is unnecessary + checkInvariants(); + } + + /** + * Performs a logical XOR of this bit set with the bit set + * argument. This bit set is modified so that a bit in it has the + * value {@code true} if and only if one of the following + * statements holds: + *
    + *
  • The bit initially has the value {@code true}, and the + * corresponding bit in the argument has the value {@code false}. + *
  • The bit initially has the value {@code false}, and the + * corresponding bit in the argument has the value {@code true}. + *
+ * + * @param set a bit set + */ + public void xor(BitSetRecyclable set) { + int wordsInCommon = Math.min(wordsInUse, set.wordsInUse); + + if (wordsInUse < set.wordsInUse) { + ensureCapacity(set.wordsInUse); + wordsInUse = set.wordsInUse; + } + + // Perform logical XOR on words in common + for (int i = 0; i < wordsInCommon; i++) + words[i] ^= set.words[i]; + + // Copy any remaining words + if (wordsInCommon < set.wordsInUse) + System.arraycopy(set.words, wordsInCommon, + words, wordsInCommon, + set.wordsInUse - wordsInCommon); + + recalculateWordsInUse(); + checkInvariants(); + } + + /** + * Clears all of the bits in this {@code BitSet} whose corresponding + * bit is set in the specified {@code BitSet}. + * + * @param set the {@code BitSet} with which to mask this + * {@code BitSet} + * @since 1.2 + */ + public void andNot(BitSetRecyclable set) { + // Perform logical (a & !b) on words in common + for (int i = Math.min(wordsInUse, set.wordsInUse) - 1; i >= 0; i--) + words[i] &= ~set.words[i]; + + recalculateWordsInUse(); + checkInvariants(); + } + + /** + * Returns the hash code value for this bit set. The hash code depends + * only on which bits are set within this {@code BitSet}. + * + *

The hash code is defined to be the result of the following + * calculation: + *

 {@code
+     * public int hashCode() {
+     *     long h = 1234;
+     *     long[] words = toLongArray();
+     *     for (int i = words.length; --i >= 0; )
+     *         h ^= words[i] * (i + 1);
+     *     return (int)((h >> 32) ^ h);
+     * }}
+ * Note that the hash code changes if the set of bits is altered. + * + * @return the hash code value for this bit set + */ + public int hashCode() { + long h = 1234; + for (int i = wordsInUse; --i >= 0; ) + h ^= words[i] * (i + 1); + + return (int)((h >> 32) ^ h); + } + + /** + * Returns the number of bits of space actually in use by this + * {@code BitSet} to represent bit values. + * The maximum element in the set is the size - 1st element. + * + * @return the number of bits currently in this bit set + */ + public int size() { + return words.length * BITS_PER_WORD; + } + + /** + * Compares this object against the specified object. + * The result is {@code true} if and only if the argument is + * not {@code null} and is a {@code Bitset} object that has + * exactly the same set of bits set to {@code true} as this bit + * set. That is, for every nonnegative {@code int} index {@code k}, + *
((BitSet)obj).get(k) == this.get(k)
+ * must be true. The current sizes of the two bit sets are not compared. + * + * @param obj the object to compare with + * @return {@code true} if the objects are the same; + * {@code false} otherwise + * @see #size() + */ + public boolean equals(Object obj) { + if (!(obj instanceof BitSetRecyclable)) + return false; + if (this == obj) + return true; + + BitSetRecyclable set = (BitSetRecyclable) obj; + + checkInvariants(); + set.checkInvariants(); + + if (wordsInUse != set.wordsInUse) + return false; + + // Check words in use by both BitSets + for (int i = 0; i < wordsInUse; i++) + if (words[i] != set.words[i]) + return false; + + return true; + } + + /** + * Cloning this {@code BitSet} produces a new {@code BitSet} + * that is equal to it. + * The clone of the bit set is another bit set that has exactly the + * same bits set to {@code true} as this bit set. + * + * @return a clone of this bit set + * @see #size() + */ + public Object clone() { + if (! sizeIsSticky) + trimToSize(); + + try { + BitSetRecyclable result = (BitSetRecyclable) super.clone(); + result.words = words.clone(); + result.checkInvariants(); + return result; + } catch (CloneNotSupportedException e) { + throw new InternalError(e); + } + } + + /** + * Attempts to reduce internal storage used for the bits in this bit set. + * Calling this method may, but is not required to, affect the value + * returned by a subsequent call to the {@link #size()} method. + */ + private void trimToSize() { + if (wordsInUse != words.length) { + words = Arrays.copyOf(words, wordsInUse); + checkInvariants(); + } + } + + /** + * Save the state of the {@code BitSet} instance to a stream (i.e., + * serialize it). + */ + private void writeObject(ObjectOutputStream s) + throws IOException { + + checkInvariants(); + + if (! sizeIsSticky) + trimToSize(); + + ObjectOutputStream.PutField fields = s.putFields(); + fields.put("bits", words); + s.writeFields(); + } + + /** + * Reconstitute the {@code BitSet} instance from a stream (i.e., + * deserialize it). + */ + private void readObject(ObjectInputStream s) + throws IOException, ClassNotFoundException { + + ObjectInputStream.GetField fields = s.readFields(); + words = (long[]) fields.get("bits", null); + + // Assume maximum length then find real length + // because recalculateWordsInUse assumes maintenance + // or reduction in logical size + wordsInUse = words.length; + recalculateWordsInUse(); + sizeIsSticky = (words.length > 0 && words[words.length-1] == 0L); // heuristic + checkInvariants(); + } + + /** + * Returns a string representation of this bit set. For every index + * for which this {@code BitSet} contains a bit in the set + * state, the decimal representation of that index is included in + * the result. Such indices are listed in order from lowest to + * highest, separated by ", " (a comma and a space) and + * surrounded by braces, resulting in the usual mathematical + * notation for a set of integers. + * + *

Example: + *

+     * BitSet drPepper = new BitSet();
+ * Now {@code drPepper.toString()} returns "{@code {}}". + *
+     * drPepper.set(2);
+ * Now {@code drPepper.toString()} returns "{@code {2}}". + *
+     * drPepper.set(4);
+     * drPepper.set(10);
+ * Now {@code drPepper.toString()} returns "{@code {2, 4, 10}}". + * + * @return a string representation of this bit set + */ + public String toString() { + checkInvariants(); + + int numBits = (wordsInUse > 128) ? + cardinality() : wordsInUse * BITS_PER_WORD; + StringBuilder b = new StringBuilder(6*numBits + 2); + b.append('{'); + + int i = nextSetBit(0); + if (i != -1) { + b.append(i); + while (true) { + if (++i < 0) break; + if ((i = nextSetBit(i)) < 0) break; + int endOfRun = nextClearBit(i); + do { b.append(", ").append(i); } + while (++i != endOfRun); + } + } + + b.append('}'); + return b.toString(); + } + + public BitSetRecyclable resetWords(long[] words) { + int n; + for (n = words.length; n > 0 && words[n - 1] == 0; n--) + ; + long[] longs = Arrays.copyOf(words, n); + this.words = longs; + this.wordsInUse = longs.length; + checkInvariants(); + return this; + } + + private Handle recyclerHandle = null; + + private static final Recycler RECYCLER = new Recycler() { + protected BitSetRecyclable newObject(Recycler.Handle recyclerHandle) { + return new BitSetRecyclable(recyclerHandle); + } + }; + + private BitSetRecyclable(Handle recyclerHandle) { + this(); + this.recyclerHandle = recyclerHandle; + } + + public static BitSetRecyclable create() { + return RECYCLER.get(); + } + + public void recycle() { + if (recyclerHandle != null) { + this.clear(); + recyclerHandle.recycle(this); + } + } +} diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentBitSet.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentBitSet.java index 8f80d03a60eb4..24decbce6e309 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentBitSet.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentBitSet.java @@ -29,6 +29,10 @@ public class ConcurrentBitSet extends BitSet { private static final long serialVersionUID = 1L; private final StampedLock rwLock = new StampedLock(); + public ConcurrentBitSet() { + super(); + } + /** * Creates a bit set whose initial size is large enough to explicitly represent bits with indices in the range * {@code 0} through {@code nbits-1}. All bits are initially {@code false}. diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentBitSetRecyclable.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentBitSetRecyclable.java new file mode 100644 index 0000000000000..8e787c1ea6ebb --- /dev/null +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentBitSetRecyclable.java @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.common.util.collections; + +import io.netty.util.Recycler; +import io.netty.util.Recycler.Handle; + +/** + * Safe multithreaded version of {@code BitSet} and leverage netty recycler. + */ +public class ConcurrentBitSetRecyclable extends ConcurrentBitSet { + + private final Handle recyclerHandle; + + private static final Recycler RECYCLER = new Recycler() { + protected ConcurrentBitSetRecyclable newObject(Handle recyclerHandle) { + return new ConcurrentBitSetRecyclable(recyclerHandle); + } + }; + + private ConcurrentBitSetRecyclable(Handle recyclerHandle) { + super(); + this.recyclerHandle = recyclerHandle; + } + + public static ConcurrentBitSetRecyclable create() { + return RECYCLER.get(); + } + + public void recycle() { + this.clear(); + recyclerHandle.recycle(this); + } +} diff --git a/pulsar-common/src/main/proto/PulsarApi.proto b/pulsar-common/src/main/proto/PulsarApi.proto index b4b7bd97f3f60..a0aa8a73476e9 100644 --- a/pulsar-common/src/main/proto/PulsarApi.proto +++ b/pulsar-common/src/main/proto/PulsarApi.proto @@ -54,6 +54,7 @@ message MessageIdData { required uint64 entryId = 2; optional int32 partition = 3 [default = -1]; optional int32 batch_index = 4 [default = -1]; + repeated int64 ack_set = 5; } message KeyValue { @@ -458,6 +459,7 @@ message CommandMessage { required uint64 consumer_id = 1; required MessageIdData message_id = 2; optional uint32 redelivery_count = 3 [default = 0]; + repeated int64 ack_set = 4; } message CommandAck { diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/BitSetRecyclableRecyclableTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/BitSetRecyclableRecyclableTest.java new file mode 100644 index 0000000000000..0f42f35608a76 --- /dev/null +++ b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/BitSetRecyclableRecyclableTest.java @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.common.util.collections; + +import org.testng.Assert; +import org.testng.annotations.Test; + +public class BitSetRecyclableRecyclableTest { + + @Test + public void testRecycle() { + BitSetRecyclable bitset1 = BitSetRecyclable.create(); + bitset1.set(3); + bitset1.recycle(); + BitSetRecyclable bitset2 = BitSetRecyclable.create(); + BitSetRecyclable bitset3 = BitSetRecyclable.create(); + Assert.assertSame(bitset2, bitset1); + Assert.assertFalse(bitset2.get(3)); + Assert.assertNotSame(bitset3, bitset1); + } + + @Test + public void testResetWords() { + BitSetRecyclable bitset1 = BitSetRecyclable.create(); + BitSetRecyclable bitset2 = BitSetRecyclable.create(); + bitset1.set(256); + bitset2.set(128); + bitset1.resetWords(bitset2.toLongArray()); + Assert.assertTrue(bitset1.get(128)); + Assert.assertFalse(bitset1.get(256)); + } +} diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentBitSetRecyclableTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentBitSetRecyclableTest.java new file mode 100644 index 0000000000000..b037c705fae1e --- /dev/null +++ b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentBitSetRecyclableTest.java @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.common.util.collections; + +import org.testng.Assert; +import org.testng.annotations.Test; + +public class ConcurrentBitSetRecyclableTest { + + @Test + public void testRecycle() { + ConcurrentBitSetRecyclable bitset1 = ConcurrentBitSetRecyclable.create(); + bitset1.set(3); + bitset1.recycle(); + ConcurrentBitSetRecyclable bitset2 = ConcurrentBitSetRecyclable.create(); + ConcurrentBitSetRecyclable bitset3 = ConcurrentBitSetRecyclable.create(); + Assert.assertSame(bitset2, bitset1); + Assert.assertFalse(bitset2.get(3)); + Assert.assertNotSame(bitset3, bitset1); + } +}