Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix] Make operations on individualDeletedMessages in lock scope #22966

Merged
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
import org.apache.bookkeeper.mledger.impl.NullLedgerOffloader;
import org.apache.bookkeeper.mledger.intercept.ManagedLedgerInterceptor;
import org.apache.commons.collections4.MapUtils;
import org.apache.pulsar.common.util.collections.ConcurrentOpenLongPairRangeSet;
import org.apache.pulsar.common.util.collections.OpenLongPairRangeSet;

/**
* Configuration class for a ManagedLedger.
Expand Down Expand Up @@ -282,7 +282,7 @@ public ManagedLedgerConfig setPassword(String password) {
}

/**
* should use {@link ConcurrentOpenLongPairRangeSet} to store unacked ranges.
* should use {@link OpenLongPairRangeSet} to store unacked ranges.
* @return
*/
public boolean isUnackedRangesOpenCacheSetEnabled() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,12 @@ public Map<String, Long> getProperties() {

@Override
public boolean isCursorDataFullyPersistable() {
return individualDeletedMessages.size() <= getConfig().getMaxUnackedRangesToPersist();
lock.readLock().lock();
try {
return individualDeletedMessages.size() <= getConfig().getMaxUnackedRangesToPersist();
} finally {
lock.readLock().unlock();
}
}

@Override
Expand Down Expand Up @@ -1099,7 +1104,12 @@ public long getNumberOfEntriesSinceFirstNotAckedMessage() {

@Override
public int getTotalNonContiguousDeletedMessagesRange() {
return individualDeletedMessages.size();
lock.readLock().lock();
try {
return individualDeletedMessages.size();
} finally {
lock.readLock().unlock();
}
}

@Override
Expand Down Expand Up @@ -2383,8 +2393,9 @@ public void asyncDelete(Iterable<Position> positions, AsyncCallbacks.DeleteCallb
callback.deleteFailed(getManagedLedgerException(e), ctx);
return;
} finally {
boolean empty = individualDeletedMessages.isEmpty();
lock.writeLock().unlock();
if (individualDeletedMessages.isEmpty()) {
if (empty) {
callback.deleteComplete(ctx);
}
}
Expand Down Expand Up @@ -2661,10 +2672,15 @@ public void operationFailed(MetaStoreException e) {
}

private boolean shouldPersistUnackRangesToLedger() {
return cursorLedger != null
&& !isCursorLedgerReadOnly
&& getConfig().getMaxUnackedRangesToPersist() > 0
&& individualDeletedMessages.size() > getConfig().getMaxUnackedRangesToPersistInMetadataStore();
lock.readLock().lock();
try {
return cursorLedger != null
&& !isCursorLedgerReadOnly
&& getConfig().getMaxUnackedRangesToPersist() > 0
&& individualDeletedMessages.size() > getConfig().getMaxUnackedRangesToPersistInMetadataStore();
} finally {
lock.readLock().unlock();
}
}

private void persistPositionMetaStore(long cursorsLedgerId, Position position, Map<String, Long> properties,
Expand Down Expand Up @@ -3451,8 +3467,13 @@ public LongPairRangeSet<Position> getIndividuallyDeletedMessagesSet() {
}

public boolean isMessageDeleted(Position position) {
return position.compareTo(markDeletePosition) <= 0
|| individualDeletedMessages.contains(position.getLedgerId(), position.getEntryId());
lock.readLock().lock();
try {
return position.compareTo(markDeletePosition) <= 0
|| individualDeletedMessages.contains(position.getLedgerId(), position.getEntryId());
} finally {
lock.readLock().unlock();
}
}

//this method will return a copy of the position's ack set
Expand All @@ -3477,13 +3498,19 @@ public long[] getBatchPositionAckSet(Position position) {
* @return next available position
*/
public Position getNextAvailablePosition(Position position) {
Range<Position> range = individualDeletedMessages.rangeContaining(position.getLedgerId(),
position.getEntryId());
if (range != null) {
Position nextPosition = range.upperEndpoint().getNext();
return (nextPosition != null && nextPosition.compareTo(position) > 0) ? nextPosition : position.getNext();
lock.readLock().lock();
try {
Range<Position> range = individualDeletedMessages.rangeContaining(position.getLedgerId(),
position.getEntryId());
if (range != null) {
Position nextPosition = range.upperEndpoint().getNext();
return (nextPosition != null && nextPosition.compareTo(position) > 0)
? nextPosition : position.getNext();
}
return position.getNext();
} finally {
lock.readLock().unlock();
}
return position.getNext();
}

public Position getNextLedgerPosition(long currentLedgerId) {
Expand Down Expand Up @@ -3534,7 +3561,12 @@ public ManagedLedger getManagedLedger() {

@Override
public Range<Position> getLastIndividualDeletedRange() {
return individualDeletedMessages.lastRange();
lock.readLock().lock();
try {
return individualDeletedMessages.lastRange();
} finally {
lock.readLock().unlock();
}
}

@Override
Expand Down Expand Up @@ -3664,15 +3696,20 @@ public ManagedLedgerConfig getConfig() {
public ManagedCursor duplicateNonDurableCursor(String nonDurableCursorName) throws ManagedLedgerException {
NonDurableCursorImpl newNonDurableCursor =
(NonDurableCursorImpl) ledger.newNonDurableCursor(getMarkDeletedPosition(), nonDurableCursorName);
if (individualDeletedMessages != null) {
this.individualDeletedMessages.forEach(range -> {
newNonDurableCursor.individualDeletedMessages.addOpenClosed(
range.lowerEndpoint().getLedgerId(),
range.lowerEndpoint().getEntryId(),
range.upperEndpoint().getLedgerId(),
range.upperEndpoint().getEntryId());
return true;
});
lock.readLock().lock();
try {
if (individualDeletedMessages != null) {
this.individualDeletedMessages.forEach(range -> {
newNonDurableCursor.individualDeletedMessages.addOpenClosed(
range.lowerEndpoint().getLedgerId(),
range.lowerEndpoint().getEntryId(),
range.upperEndpoint().getLedgerId(),
range.upperEndpoint().getEntryId());
return true;
});
}
} finally {
lock.readLock().unlock();
}
if (batchDeletedIndexes != null) {
for (Map.Entry<Position, BitSetRecyclable> entry : this.batchDeletedIndexes.entrySet()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@
import java.util.Collection;
import java.util.List;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.pulsar.common.util.collections.ConcurrentOpenLongPairRangeSet;
import org.apache.pulsar.common.util.collections.LongPairRangeSet;
import org.apache.pulsar.common.util.collections.OpenLongPairRangeSet;

/**
* Wraps other Range classes, and adds LRU, marking dirty data and other features on this basis.
Expand Down Expand Up @@ -55,7 +55,7 @@ public RangeSetWrapper(LongPairConsumer<T> rangeConverter,
this.config = managedCursor.getManagedLedger().getConfig();
this.rangeConverter = rangeConverter;
this.rangeSet = config.isUnackedRangesOpenCacheSetEnabled()
? new ConcurrentOpenLongPairRangeSet<>(4096, rangeConverter)
? new OpenLongPairRangeSet<>(rangeConverter)
: new LongPairRangeSet.DefaultRangeSet<>(rangeConverter, rangeBoundConsumer);
this.enableMultiEntry = config.isPersistentUnackedRangesWithMultipleEntriesEnabled();
}
Expand Down Expand Up @@ -148,16 +148,16 @@ public int cardinality(long lowerKey, long lowerValue, long upperKey, long upper

@VisibleForTesting
void add(Range<LongPair> range) {
if (!(rangeSet instanceof ConcurrentOpenLongPairRangeSet)) {
if (!(rangeSet instanceof OpenLongPairRangeSet)) {
throw new UnsupportedOperationException("Only ConcurrentOpenLongPairRangeSet support this method");
}
((ConcurrentOpenLongPairRangeSet<T>) rangeSet).add(range);
((OpenLongPairRangeSet<T>) rangeSet).add(range);
}

@VisibleForTesting
void remove(Range<T> range) {
if (rangeSet instanceof ConcurrentOpenLongPairRangeSet) {
((ConcurrentOpenLongPairRangeSet<T>) rangeSet).remove((Range<LongPair>) range);
if (rangeSet instanceof OpenLongPairRangeSet) {
((OpenLongPairRangeSet<T>) rangeSet).remove((Range<LongPair>) range);
} else {
((DefaultRangeSet<T>) rangeSet).remove(range);
}
Expand Down
Loading