Skip to content

Commit

Permalink
[improve][ml] Avoid repetitive nested lock for isMessageDeleted in Ma…
Browse files Browse the repository at this point in the history
…nagedCursorImpl (apache#23609)
  • Loading branch information
Denovo1998 authored Nov 19, 2024
1 parent 387a96d commit 895e968
Showing 1 changed file with 10 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1569,7 +1569,7 @@ public Set<? extends Position> asyncReplayEntries(Set<? extends Position> positi
Set<Position> alreadyAcknowledgedPositions = new HashSet<>();
lock.readLock().lock();
try {
positions.stream().filter(this::isMessageDeleted).forEach(alreadyAcknowledgedPositions::add);
positions.stream().filter(this::internalIsMessageDeleted).forEach(alreadyAcknowledgedPositions::add);
} finally {
lock.readLock().unlock();
}
Expand Down Expand Up @@ -2345,7 +2345,7 @@ public void asyncDelete(Iterable<Position> positions, AsyncCallbacks.DeleteCallb
return;
}

if (isMessageDeleted(position)) {
if (internalIsMessageDeleted(position)) {
if (getConfig().isDeletionAtBatchIndexLevelEnabled()) {
BitSetRecyclable bitSetRecyclable = batchDeletedIndexes.remove(position);
if (bitSetRecyclable != null) {
Expand Down Expand Up @@ -3543,13 +3543,19 @@ public Position processIndividuallyDeletedMessagesAndGetMarkDeletedPosition(
public boolean isMessageDeleted(Position position) {
lock.readLock().lock();
try {
return position.compareTo(markDeletePosition) <= 0
|| individualDeletedMessages.contains(position.getLedgerId(), position.getEntryId());
return internalIsMessageDeleted(position);
} finally {
lock.readLock().unlock();
}
}

// When this method is called while the external has already acquired a write lock or a read lock,
// it avoids unnecessary lock nesting.
private boolean internalIsMessageDeleted(Position position) {
return position.compareTo(markDeletePosition) <= 0
|| individualDeletedMessages.contains(position.getLedgerId(), position.getEntryId());
}

//this method will return a copy of the position's ack set
@Override
public long[] getBatchPositionAckSet(Position position) {
Expand Down

0 comments on commit 895e968

Please sign in to comment.