Skip to content

Commit

Permalink
test: fix some related tests
Browse files Browse the repository at this point in the history
  • Loading branch information
liangyepianzhou committed Mar 12, 2024
1 parent 312bf5c commit 8446294
Show file tree
Hide file tree
Showing 10 changed files with 98 additions and 64 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2647,7 +2647,7 @@ void internalTrimLedgers(boolean isTruncate, CompletableFuture<?> promise) {
if (slowestReaderPosition != null) {
// The slowest reader position is the mark delete position.
// If the slowest reader position point the last entry in the ledger x,
// then the slowestReaderLedgerId should be x + 1 and the ledger x could be deleted.
// the slowestReaderLedgerId should be x + 1 and the ledger x could be deleted.
LedgerInfo ledgerInfo = ledgers.get(slowestReaderPosition.getLedgerId());
if (ledgerInfo != null && ledgerInfo.getLedgerId() != currentLedger.getId()
&& ledgerInfo.getEntries() == slowestReaderPosition.getEntryId() + 1) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -234,15 +234,16 @@ void readTwice() throws Exception {

@Test
void testPersistentMarkDeleteIfCreateCursorLedgerFailed() throws Exception {
final int entryCount = 10;
final int entryCount = 9;
final String cursorName = "c1";
final String mlName = "ml_test";
final ManagedLedgerConfig mlConfig = new ManagedLedgerConfig().setMaxEntriesPerLedger(1);
// Avoid creating new empty ledger after the last ledger is full and remove fail future.
final ManagedLedgerConfig mlConfig = new ManagedLedgerConfig().setMaxEntriesPerLedger(2);
ManagedLedgerImpl ml = (ManagedLedgerImpl) factory.open(mlName, mlConfig);

ManagedCursor cursor = ml.openCursor("c1");
Position lastEntry = null;
for (int i = 0; i < 10; i++) {
for (int i = 0; i < entryCount; i++) {
lastEntry = ml.addEntry(("entry-" + i).getBytes(Encoding));
}

Expand Down Expand Up @@ -809,7 +810,7 @@ void testResetCursor1() throws Exception {
assertEquals(firstInNext, cursor.getReadPosition());
moveStatus.set(false);

// reset to a non exist larger ledger should point to the first non-exist entry in the last ledger
// reset to a non exist larger ledger should point to the first non-exist entry in the next ledger
PositionImpl latest = new PositionImpl(last.getLedgerId() + 2, 0);
try {
cursor.resetCursor(latest);
Expand All @@ -818,11 +819,13 @@ void testResetCursor1() throws Exception {
log.warn("error in reset cursor", e.getCause());
}
assertTrue(moveStatus.get());
PositionImpl lastPos = new PositionImpl(last.getLedgerId(), last.getEntryId() + 1);
assertEquals(lastPos, cursor.getReadPosition());
PositionImpl lastPos = new PositionImpl(last.getLedgerId() + 1, 0);
Awaitility.await().untilAsserted(() -> {
assertEquals(lastPos, cursor.getReadPosition());
});
moveStatus.set(false);

// reset to latest should point to the first non-exist entry in the last ledger
// reset to latest should point to the first non-exist entry in the next ledger
PositionImpl anotherLast = PositionImpl.LATEST;
try {
cursor.resetCursor(anotherLast);
Expand Down Expand Up @@ -1701,7 +1704,7 @@ void testMarkDeleteTwice(boolean useOpenRangeSet) throws Exception {

@Test(timeOut = 20000, dataProvider = "useOpenRangeSet")
void testSkipEntries(boolean useOpenRangeSet) throws Exception {
ManagedLedger ledger = factory.open("my_test_ledger", new ManagedLedgerConfig()
ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("my_test_ledger", new ManagedLedgerConfig()
.setUnackedRangesOpenCacheSetEnabled(useOpenRangeSet).setMaxEntriesPerLedger(2));
Position pos;

Expand All @@ -1715,14 +1718,19 @@ void testSkipEntries(boolean useOpenRangeSet) throws Exception {
pos = ledger.addEntry("dummy-entry-1".getBytes(Encoding));
pos = ledger.addEntry("dummy-entry-2".getBytes(Encoding));

// Wait new empty ledger created completely.
Awaitility.await().untilAsserted(() -> {
assertEquals(ledger.ledgers.size(), 2);
});

// skip entries in same ledger
c1.skipEntries(1, IndividualDeletedEntries.Exclude);
assertEquals(c1.getNumberOfEntries(), 1);

// skip entries until end of ledger
c1.skipEntries(1, IndividualDeletedEntries.Exclude);
assertEquals(c1.getNumberOfEntries(), 0);
assertEquals(c1.getReadPosition(), pos.getNext());
assertEquals(c1.getReadPosition(), new PositionImpl(ledger.currentLedger.getId(), 0));
assertEquals(c1.getMarkDeletedPosition(), pos);

// skip entries across ledgers
Expand All @@ -1737,7 +1745,10 @@ void testSkipEntries(boolean useOpenRangeSet) throws Exception {
c1.skipEntries(10, IndividualDeletedEntries.Exclude);
assertEquals(c1.getNumberOfEntries(), 0);
assertFalse(c1.hasMoreEntries());
assertEquals(c1.getReadPosition(), pos.getNext());
// We can not check the ledger id because a cursor leger can be created.
Awaitility.await().untilAsserted(() -> {
assertEquals(c1.getReadPosition().getEntryId(), 0);
});
assertEquals(c1.getMarkDeletedPosition(), pos);
}

Expand All @@ -1759,7 +1770,7 @@ void testSkipEntriesWithIndividualDeletedMessages(boolean useOpenRangeSet) throw

c1.skipEntries(3, IndividualDeletedEntries.Exclude);
assertEquals(c1.getNumberOfEntries(), 0);
assertEquals(c1.getReadPosition(), pos5.getNext());
assertEquals(c1.getReadPosition(), new PositionImpl(pos5.getLedgerId() + 1, 0));
assertEquals(c1.getMarkDeletedPosition(), pos5);

pos1 = ledger.addEntry("dummy-entry-1".getBytes(Encoding));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public void testGetManagedLedgerInfoWithClose() throws Exception {

ManagedLedgerInfo info = factory.getManagedLedgerInfo("testGetManagedLedgerInfo");

assertEquals(info.ledgers.size(), 4);
assertEquals(info.ledgers.size(), 5);

assertEquals(info.ledgers.get(0).ledgerId, 3);
assertEquals(info.ledgers.get(1).ledgerId, 4);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1122,9 +1122,13 @@ public void testTrimmer() throws Exception {

cursor.markDelete(lastPosition);

while (ledger.getNumberOfEntries() != 2) {
Thread.sleep(10);
}
Awaitility.await().untilAsserted(() -> {
// The number of entries in the ledger should not contain the entry in the mark delete position.
// last position is the position of entry-3.
// cursor.markDelete(lastPosition);
// only entry-4 is left in the ledger.
assertEquals(ledger.getNumberOfEntries(), 1);
});
}

@Test(timeOut = 20000)
Expand Down Expand Up @@ -2438,7 +2442,7 @@ public void testRetentionSize() throws Exception {

Awaitility.await().untilAsserted(() -> {
assertTrue(ml.getTotalSize() <= retentionSizeInMB * 1024 * 1024);
assertEquals(ml.getLedgersInfoAsList().size(), 5);
assertEquals(ml.getLedgersInfoAsList().size(), 6);
});
}

Expand Down Expand Up @@ -2696,9 +2700,17 @@ public void testGetNextValidPosition() throws Exception {

assertEquals(ledger.getNextValidPosition((PositionImpl) c1.getMarkDeletedPosition()), p1);
assertEquals(ledger.getNextValidPosition(p1), p2);
assertEquals(ledger.getNextValidPosition(p3), PositionImpl.get(p3.getLedgerId(), p3.getEntryId() + 1));
assertEquals(ledger.getNextValidPosition(PositionImpl.get(p3.getLedgerId(), p3.getEntryId() + 1)), PositionImpl.get(p3.getLedgerId(), p3.getEntryId() + 1));
assertEquals(ledger.getNextValidPosition(PositionImpl.get(p3.getLedgerId() + 1, p3.getEntryId() + 1)), PositionImpl.get(p3.getLedgerId(), p3.getEntryId() + 1));
Awaitility.await().untilAsserted(() -> {
assertEquals(ledger.getNextValidPosition(p3), PositionImpl.get(p3.getLedgerId() + 1, 0));
});
Awaitility.await().untilAsserted(() -> {
assertEquals(ledger.getNextValidPosition(PositionImpl.get(p3.getLedgerId(), p3.getEntryId() + 1)),
PositionImpl.get(p3.getLedgerId() + 1, 0));
});
Awaitility.await().untilAsserted(() -> {
assertEquals(ledger.getNextValidPosition(PositionImpl.get(p3.getLedgerId() + 1, p3.getEntryId() + 1)),
PositionImpl.get(p3.getLedgerId() + 1, 0));
});
}

/**
Expand Down Expand Up @@ -3037,19 +3049,22 @@ public void testConsumerSubscriptionInitializePosition() throws Exception{
String content = "entry" + i; // 5 bytes
ledger.addEntry(content.getBytes());
}
Awaitility.await().untilAsserted(() -> {
assertEquals(ledger.currentLedgerSize, 0);
assertEquals(ledger.ledgers.size(), 1);
});
// Open Cursor also adds cursor into activeCursor-container
ManagedCursor latestCursor = ledger.openCursor("c1", InitialPosition.Latest);
ManagedCursor earliestCursor = ledger.openCursor("c2", InitialPosition.Earliest);

// Since getReadPosition returns the next position, we decrease the entryId by 1
PositionImpl p1 = (PositionImpl) latestCursor.getReadPosition();
PositionImpl p2 = (PositionImpl) earliestCursor.getReadPosition();

Pair<PositionImpl, Long> latestPositionAndCounter = ledger.getLastPositionAndCounter();
Pair<PositionImpl, Long> earliestPositionAndCounter = ledger.getFirstPositionAndCounter();

assertEquals(latestPositionAndCounter.getLeft().getNext(), p1);
assertEquals(earliestPositionAndCounter.getLeft().getNext(), p2);
// The read position is the valid next position of the last position instead of the next position.
assertEquals(ledger.getNextValidPosition(latestPositionAndCounter.getLeft()), latestCursor.getReadPosition());
assertEquals(ledger.getNextValidPosition(earliestPositionAndCounter.getLeft()), p2);

assertEquals(latestPositionAndCounter.getRight().longValue(), totalInsertedEntries);
assertEquals(earliestPositionAndCounter.getRight().longValue(), totalInsertedEntries - earliestCursor.getNumberOfEntriesInBacklog(false));
Expand Down Expand Up @@ -3472,7 +3487,8 @@ public void testManagedLedgerRollOverIfFull() throws Exception {
ledger.addEntry(new byte[1024 * 1024]);
}

Awaitility.await().untilAsserted(() -> Assert.assertEquals(ledger.getLedgersInfoAsList().size(), msgNum / 2));
Awaitility.await().untilAsserted(() -> Assert.assertEquals(ledger.getLedgersInfoAsList().size(),
msgNum / 2 + 1));
List<Entry> entries = cursor.readEntries(msgNum);
Assert.assertEquals(msgNum, entries.size());

Expand All @@ -3487,6 +3503,9 @@ public void testManagedLedgerRollOverIfFull() throws Exception {
stateUpdater.setAccessible(true);
stateUpdater.set(ledger, ManagedLedgerImpl.State.LedgerOpened);
ledger.rollCurrentLedgerIfFull();
CompletableFuture<Void> completableFuture = new CompletableFuture<>();
ledger.trimConsumedLedgersInBackground(completableFuture);
completableFuture.get();
Awaitility.await().untilAsserted(() -> Assert.assertEquals(ledger.getLedgersInfoAsList().size(), 1));
Awaitility.await().untilAsserted(() -> Assert.assertEquals(ledger.getTotalSize(), 0));
}
Expand Down Expand Up @@ -3652,8 +3671,12 @@ public void testInvalidateReadHandleWhenDeleteLedger() throws Exception {
}
List<Entry> entryList = cursor.readEntries(3);
assertEquals(entryList.size(), 3);
assertEquals(ledger.ledgers.size(), 3);
assertEquals(ledger.ledgerCache.size(), 2);
Awaitility.await().untilAsserted(() -> {
log.error("ledger.ledgerCache.size() : " + ledger.ledgerCache.size());
assertEquals(ledger.ledgerCache.size(), 3);
assertEquals(ledger.ledgers.size(), 4);
});

cursor.clearBacklog();
cursor2.clearBacklog();
ledger.trimConsumedLedgersInBackground(Futures.NULL_PROMISE);
Expand Down Expand Up @@ -3682,15 +3705,15 @@ public void testLockReleaseWhenTrimLedger() throws Exception {
}
List<Entry> entryList = cursor.readEntries(entries);
assertEquals(entryList.size(), entries);
assertEquals(ledger.ledgers.size(), entries);
assertEquals(ledger.ledgerCache.size(), entries - 1);
assertEquals(ledger.ledgers.size() - 1, entries);
assertEquals(ledger.ledgerCache.size() - 1, entries - 1);
cursor.clearBacklog();
ledger.trimConsumedLedgersInBackground(Futures.NULL_PROMISE);
ledger.trimConsumedLedgersInBackground(Futures.NULL_PROMISE);
// Cleanup fails because ManagedLedgerNotFoundException is thrown
Awaitility.await().untilAsserted(() -> {
assertEquals(ledger.ledgers.size(), entries);
assertEquals(ledger.ledgerCache.size(), entries - 1);
assertEquals(ledger.ledgers.size() - 1, entries);
assertEquals(ledger.ledgerCache.size() - 1, entries - 1);
});
// The lock is released even if an ManagedLedgerNotFoundException occurs, so it can be called repeatedly
Awaitility.await().untilAsserted(() ->
Expand All @@ -3716,25 +3739,25 @@ public void testInvalidateReadHandleWhenConsumed() throws Exception {
}
List<Entry> entryList = cursor.readEntries(3);
assertEquals(entryList.size(), 3);
assertEquals(ledger.ledgers.size(), 3);
assertEquals(ledger.ledgerCache.size(), 2);
assertEquals(ledger.ledgers.size(), 4);
assertEquals(ledger.ledgerCache.size(), 3);
cursor.clearBacklog();
cursor2.clearBacklog();
ledger.trimConsumedLedgersInBackground(Futures.NULL_PROMISE);
Awaitility.await().untilAsserted(() -> {
assertEquals(ledger.ledgers.size(), 3);
assertEquals(ledger.ledgers.size(), 4);
assertEquals(ledger.ledgerCache.size(), 0);
});

// Verify the ReadHandle can be reopened.
ManagedCursor cursor3 = ledger.openCursor("test-cursor3", InitialPosition.Earliest);
entryList = cursor3.readEntries(3);
assertEquals(entryList.size(), 3);
assertEquals(ledger.ledgerCache.size(), 2);
assertEquals(ledger.ledgerCache.size(), 3);
cursor3.clearBacklog();
ledger.trimConsumedLedgersInBackground(Futures.NULL_PROMISE);
Awaitility.await().untilAsserted(() -> {
assertEquals(ledger.ledgers.size(), 3);
assertEquals(ledger.ledgers.size(), 4);
assertEquals(ledger.ledgerCache.size(), 0);
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -589,12 +589,12 @@ void subscribeToEarliestPositionWithImmediateDeletion() throws Exception {

/* Position p1 = */ ledger.addEntry("entry-1".getBytes());
/* Position p2 = */ ledger.addEntry("entry-2".getBytes());
Position p3 = ledger.addEntry("entry-3".getBytes());
/* Position p3 = */ ledger.addEntry("entry-3".getBytes());

Thread.sleep(300);
ManagedCursor c1 = ledger.newNonDurableCursor(PositionImpl.EARLIEST);
assertEquals(c1.getReadPosition(), p3);
assertEquals(c1.getMarkDeletedPosition(), new PositionImpl(5, -1));
assertEquals(c1.getReadPosition(), new PositionImpl(6, 0));
assertEquals(c1.getMarkDeletedPosition(), new PositionImpl(6, -1));
}

@Test // (timeOut = 20000)
Expand Down Expand Up @@ -723,18 +723,19 @@ public void testBacklogStatsWhenDroppingData() throws Exception {
CompletableFuture<Void> promise = new CompletableFuture<>();
ledger.internalTrimConsumedLedgers(promise);
promise.join();

assertEquals(nonDurableCursor.getNumberOfEntries(), 6);
assertEquals(nonDurableCursor.getNumberOfEntriesInBacklog(true), 6);
// The mark delete position has moved to position 4:1, and the ledger 4 only has one entry,
// so the ledger 4 can be deleted. nonDurableCursor should has the same backlog with durable cursor.
assertEquals(nonDurableCursor.getNumberOfEntries(), 5);
assertEquals(nonDurableCursor.getNumberOfEntriesInBacklog(true), 5);

c1.close();
ledger.deleteCursor(c1.getName());
promise = new CompletableFuture<>();
ledger.internalTrimConsumedLedgers(promise);
promise.join();

assertEquals(nonDurableCursor.getNumberOfEntries(), 1);
assertEquals(nonDurableCursor.getNumberOfEntriesInBacklog(true), 1);
assertEquals(nonDurableCursor.getNumberOfEntries(), 0);
assertEquals(nonDurableCursor.getNumberOfEntriesInBacklog(true), 0);

ledger.close();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -316,10 +316,10 @@ public void testTriggerBacklogQuotaSizeWithReader() throws Exception {
TopicStats stats = getTopicStats(topic1);
// overall backlogSize should be zero because we only have readers
assertEquals(stats.getBacklogSize(), 0, "backlog size is [" + stats.getBacklogSize() + "]");
// non-durable mes should still
assertEquals(stats.getSubscriptions().size(), 1);
long nonDurableSubscriptionBacklog = stats.getSubscriptions().values().iterator().next().getMsgBacklog();
assertEquals(nonDurableSubscriptionBacklog, MAX_ENTRIES_PER_LEDGER,
// All the full ledgers should be deleted.
assertEquals(nonDurableSubscriptionBacklog, 0,
"non-durable subscription backlog is [" + nonDurableSubscriptionBacklog + "]");
MessageIdImpl messageId = null;
try {
Expand All @@ -341,7 +341,7 @@ public void testTriggerBacklogQuotaSizeWithReader() throws Exception {
assertEquals(internalStats.ledgers.size(), 1);

// check if it's the expected ledger id given MAX_ENTRIES_PER_LEDGER
assertEquals(internalStats.ledgers.get(0).ledgerId, finalMessageId.getLedgerId());
assertEquals(internalStats.ledgers.get(0).ledgerId, finalMessageId.getLedgerId() + 1);
});
// check reader can still read without error

Expand Down Expand Up @@ -501,6 +501,7 @@ private long getReadEntries(String topic1) {
@Test
public void backlogsStatsNotPrecise() throws PulsarAdminException, PulsarClientException, InterruptedException {
config.setPreciseTimeBasedBacklogQuotaCheck(false);
config.setManagedLedgerMaxEntriesPerLedger(6);
final String namespace = "prop/ns-quota";
assertEquals(admin.namespaces().getBacklogQuotaMap(namespace), new HashMap<>());
final int sizeLimitBytes = 15 * 1024 * 1024;
Expand Down Expand Up @@ -592,6 +593,7 @@ public void backlogsStatsNotPrecise() throws PulsarAdminException, PulsarClientE
assertThat(topicStats.getOldestBacklogMessageSubscriptionName()).isEqualTo(subName2);
expectedAge = MILLISECONDS.toSeconds(System.currentTimeMillis() - unloadTime);
assertThat(topicStats.getOldestBacklogMessageAgeSeconds()).isCloseTo(expectedAge, within(1L));
config.setManagedLedgerMaxEntriesPerLedger(MAX_ENTRIES_PER_LEDGER);
}
}

Expand Down
Loading

0 comments on commit 8446294

Please sign in to comment.