Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve][ml] Filter out deleted entries before read entries from ledger. #21739

Merged
merged 19 commits into from
Jan 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -791,6 +791,8 @@ public void asyncReadEntriesWithSkip(int numberOfEntriesToRead, long maxSizeByte
int numOfEntriesToRead = applyMaxSizeCap(numberOfEntriesToRead, maxSizeBytes);

PENDING_READ_OPS_UPDATER.incrementAndGet(this);
// Skip deleted entries.
skipCondition = skipCondition == null ? this::isMessageDeleted : skipCondition.or(this::isMessageDeleted);
OpReadEntry op =
OpReadEntry.create(this, readPosition, numOfEntriesToRead, callback, ctx, maxPosition, skipCondition);
ledger.asyncReadEntries(op);
Expand Down Expand Up @@ -949,6 +951,8 @@ public void asyncReadEntriesWithSkipOrWait(int maxEntries, long maxSizeBytes, Re
asyncReadEntriesWithSkip(numberOfEntriesToRead, NO_MAX_SIZE_LIMIT, callback, ctx,
maxPosition, skipCondition);
} else {
// Skip deleted entries.
skipCondition = skipCondition == null ? this::isMessageDeleted : skipCondition.or(this::isMessageDeleted);
OpReadEntry op = OpReadEntry.create(this, readPosition, numberOfEntriesToRead, callback,
ctx, maxPosition, skipCondition);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4539,4 +4539,4 @@ public Position getTheSlowestNonDurationReadPosition() {
}
return theSlowestNonDurableReadPosition;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.ReadOnlyCursor;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.PositionBound;
import org.apache.bookkeeper.mledger.proto.MLDataFormats;
Expand Down Expand Up @@ -70,4 +71,9 @@ public MLDataFormats.ManagedLedgerInfo.LedgerInfo getCurrentLedgerInfo() {
public long getNumberOfEntries(Range<PositionImpl> range) {
return this.ledger.getNumberOfEntries(range);
}

@Override
public boolean isMessageDeleted(Position position) {
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import java.util.Arrays;
import java.util.BitSet;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
Expand All @@ -65,13 +66,15 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import lombok.Cleanup;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.BookKeeper.DigestType;
import org.apache.bookkeeper.client.LedgerEntry;
import org.apache.bookkeeper.client.api.ReadHandle;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCallback;
Expand Down Expand Up @@ -766,7 +769,7 @@ void testResetCursor() throws Exception {
@Test(timeOut = 20000)
void testResetCursor1() throws Exception {
ManagedLedger ledger = factory.open("my_test_move_cursor_ledger",
new ManagedLedgerConfig().setMaxEntriesPerLedger(2));
new ManagedLedgerConfig().setMaxEntriesPerLedger(2));
ManagedCursor cursor = ledger.openCursor("trc1");
PositionImpl actualEarliest = (PositionImpl) ledger.addEntry("dummy-entry-1".getBytes(Encoding));
ledger.addEntry("dummy-entry-2".getBytes(Encoding));
Expand Down Expand Up @@ -2286,7 +2289,7 @@ void testFindNewestMatchingEdgeCase1() throws Exception {

ManagedCursorImpl c1 = (ManagedCursorImpl) ledger.openCursor("c1");
assertNull(c1.findNewestMatching(
entry -> Arrays.equals(entry.getDataAndRelease(), "expired".getBytes(Encoding))));
entry -> Arrays.equals(entry.getDataAndRelease(), "expired".getBytes(Encoding))));
}

@Test(timeOut = 20000)
Expand Down Expand Up @@ -2595,7 +2598,7 @@ public void findEntryComplete(Position position, Object ctx) {

@Override
public void findEntryFailed(ManagedLedgerException exception, Optional<Position> failedReadPosition,
Object ctx) {
Object ctx) {
result.exception = exception;
counter.countDown();
}
Expand All @@ -2621,7 +2624,7 @@ public void findEntryFailed(ManagedLedgerException exception, Optional<Position>
}

void internalTestFindNewestMatchingAllEntries(final String name, final int entriesPerLedger,
final int expectedEntryId) throws Exception {
final int expectedEntryId) throws Exception {
final String ledgerAndCursorName = name;
ManagedLedgerConfig config = new ManagedLedgerConfig();
config.setRetentionSizeInMB(10);
Expand Down Expand Up @@ -2715,7 +2718,7 @@ void testReplayEntries() throws Exception {
assertTrue((Arrays.equals(entries.get(0).getData(), "entry1".getBytes(Encoding))
&& Arrays.equals(entries.get(1).getData(), "entry3".getBytes(Encoding)))
|| (Arrays.equals(entries.get(0).getData(), "entry3".getBytes(Encoding))
&& Arrays.equals(entries.get(1).getData(), "entry1".getBytes(Encoding))));
&& Arrays.equals(entries.get(1).getData(), "entry1".getBytes(Encoding))));
entries.forEach(Entry::release);

// 3. Fail on reading non-existing position
Expand Down Expand Up @@ -3142,7 +3145,7 @@ public void operationFailed(ManagedLedgerException exception) {

try {
bkc.openLedgerNoRecovery(ledgerId, DigestType.fromApiDigestType(mlConfig.getDigestType()),
mlConfig.getPassword());
mlConfig.getPassword());
fail("ledger should have deleted due to update-cursor failure");
} catch (BKException e) {
// ok
Expand Down Expand Up @@ -3761,17 +3764,17 @@ private void deleteBatchIndex(ManagedCursor cursor, Position position, int batch
pos.ackSet = bitSet.toLongArray();

cursor.asyncDelete(pos,
new DeleteCallback() {
@Override
public void deleteComplete(Object ctx) {
latch.countDown();
}
new DeleteCallback() {
@Override
public void deleteComplete(Object ctx) {
latch.countDown();
}

@Override
public void deleteFailed(ManagedLedgerException exception, Object ctx) {
latch.countDown();
}
}, null);
@Override
public void deleteFailed(ManagedLedgerException exception, Object ctx) {
latch.countDown();
}
}, null);
latch.await();
pos.ackSet = null;
}
Expand Down Expand Up @@ -4484,5 +4487,202 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
ledger.close();
}


@Test
public void testReadEntriesWithSkipDeletedEntries() throws Exception {
@Cleanup
ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("testReadEntriesWithSkipDeletedEntries");
ledger = Mockito.spy(ledger);
List<Long> actualReadEntryIds = new ArrayList<>();
Mockito.doAnswer(inv -> {
long start = inv.getArgument(1);
long end = inv.getArgument(2);
for (long i = start; i <= end; i++) {
actualReadEntryIds.add(i);
}
return inv.callRealMethod();
})
.when(ledger)
.asyncReadEntry(Mockito.any(ReadHandle.class), Mockito.anyLong(), Mockito.anyLong(), Mockito.any(), Mockito.any());
@Cleanup
ManagedCursor cursor = ledger.openCursor("c");

int entries = 20;
Position maxReadPosition = null;
Map<Integer, Position> map = new HashMap<>();
for (int i = 0; i < entries; i++) {
maxReadPosition = ledger.addEntry(new byte[1024]);
map.put(i, maxReadPosition);
}


Set<Position> deletedPositions = new HashSet<>();
deletedPositions.add(map.get(1));
deletedPositions.add(map.get(4));
deletedPositions.add(map.get(5));
deletedPositions.add(map.get(8));
deletedPositions.add(map.get(9));
deletedPositions.add(map.get(10));
deletedPositions.add(map.get(15));
deletedPositions.add(map.get(17));
deletedPositions.add(map.get(19));
cursor.delete(deletedPositions);

CompletableFuture<Void> f0 = new CompletableFuture<>();
List<Entry> readEntries = new ArrayList<>();
cursor.asyncReadEntries(5, -1L, new ReadEntriesCallback() {
@Override
public void readEntriesComplete(List<Entry> entries, Object ctx) {
readEntries.addAll(entries);
f0.complete(null);
}

@Override
public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
f0.completeExceptionally(exception);
}
}, null, PositionImpl.get(maxReadPosition.getLedgerId(), maxReadPosition.getEntryId()).getNext());

f0.get();

CompletableFuture<Void> f1 = new CompletableFuture<>();
cursor.asyncReadEntries(5, -1L, new ReadEntriesCallback() {
@Override
public void readEntriesComplete(List<Entry> entries, Object ctx) {
readEntries.addAll(entries);
f1.complete(null);
}

@Override
public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
f1.completeExceptionally(exception);
}
}, null, PositionImpl.get(maxReadPosition.getLedgerId(), maxReadPosition.getEntryId()).getNext());


f1.get();
CompletableFuture<Void> f2 = new CompletableFuture<>();
cursor.asyncReadEntries(100, -1L, new ReadEntriesCallback() {
@Override
public void readEntriesComplete(List<Entry> entries, Object ctx) {
readEntries.addAll(entries);
f2.complete(null);
}

@Override
public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
f2.completeExceptionally(exception);
}
}, null, PositionImpl.get(maxReadPosition.getLedgerId(), maxReadPosition.getEntryId()).getNext());

f2.get();

Position cursorReadPosition = cursor.getReadPosition();
Position expectReadPosition = maxReadPosition.getNext();
assertTrue(cursorReadPosition.getLedgerId() == expectReadPosition.getLedgerId()
&& cursorReadPosition.getEntryId() == expectReadPosition.getEntryId());

assertEquals(readEntries.size(), actualReadEntryIds.size());
assertEquals(entries - deletedPositions.size(), actualReadEntryIds.size());
for (Entry entry : readEntries) {
long entryId = entry.getEntryId();
assertTrue(actualReadEntryIds.contains(entryId));
}
}


@Test
public void testReadEntriesWithSkipDeletedEntriesAndWithSkipConditions() throws Exception {
@Cleanup
ManagedLedgerImpl ledger = (ManagedLedgerImpl)
factory.open("testReadEntriesWithSkipDeletedEntriesAndWithSkipConditions");
ledger = Mockito.spy(ledger);

List<Long> actualReadEntryIds = new ArrayList<>();
Mockito.doAnswer(inv -> {
long start = inv.getArgument(1);
long end = inv.getArgument(2);
for (long i = start; i <= end; i++) {
actualReadEntryIds.add(i);
}
return inv.callRealMethod();
})
.when(ledger)
.asyncReadEntry(Mockito.any(ReadHandle.class), Mockito.anyLong(), Mockito.anyLong(), Mockito.any(), Mockito.any());
@Cleanup
ManagedCursor cursor = ledger.openCursor("c");

int entries = 20;
Position maxReadPosition0 = null;
Map<Integer, Position> map = new HashMap<>();
for (int i = 0; i < entries; i++) {
maxReadPosition0 = ledger.addEntry(new byte[1024]);
map.put(i, maxReadPosition0);
}

PositionImpl maxReadPosition =
PositionImpl.get(maxReadPosition0.getLedgerId(), maxReadPosition0.getEntryId()).getNext();

Set<Position> deletedPositions = new HashSet<>();
deletedPositions.add(map.get(1));
deletedPositions.add(map.get(3));
deletedPositions.add(map.get(5));
cursor.delete(deletedPositions);

Set<Long> skippedPositions = new HashSet<>();
skippedPositions.add(map.get(6).getEntryId());
skippedPositions.add(map.get(7).getEntryId());
skippedPositions.add(map.get(8).getEntryId());
skippedPositions.add(map.get(11).getEntryId());
skippedPositions.add(map.get(15).getEntryId());
skippedPositions.add(map.get(16).getEntryId());

Predicate<PositionImpl> skipCondition = position -> skippedPositions.contains(position.getEntryId());
List<Entry> readEntries = new ArrayList<>();

CompletableFuture<Void> f0 = new CompletableFuture<>();
cursor.asyncReadEntriesWithSkip(10, -1L, new ReadEntriesCallback() {
@Override
public void readEntriesComplete(List<Entry> entries, Object ctx) {
readEntries.addAll(entries);
f0.complete(null);
}

@Override
public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
f0.completeExceptionally(exception);
}
}, null, maxReadPosition, skipCondition);

f0.get();
CompletableFuture<Void> f1 = new CompletableFuture<>();
cursor.asyncReadEntriesWithSkip(100, -1L, new ReadEntriesCallback() {
@Override
public void readEntriesComplete(List<Entry> entries, Object ctx) {
readEntries.addAll(entries);
f1.complete(null);
}

@Override
public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
f1.completeExceptionally(exception);
}
}, null, maxReadPosition, skipCondition);
f1.get();


assertEquals(actualReadEntryIds.size(), readEntries.size());
assertEquals(entries - deletedPositions.size() - skippedPositions.size(), actualReadEntryIds.size());
for (Entry entry : readEntries) {
long entryId = entry.getEntryId();
assertTrue(actualReadEntryIds.contains(entryId));
}

Position cursorReadPosition = cursor.getReadPosition();
Position expectReadPosition = maxReadPosition;
assertTrue(cursorReadPosition.getLedgerId() == expectReadPosition.getLedgerId()
&& cursorReadPosition.getEntryId() == expectReadPosition.getEntryId());
}

private static final Logger log = LoggerFactory.getLogger(ManagedCursorTest.class);
}
Loading