Skip to content

Commit

Permalink
[fix][test] Flaky-test: testMessageExpiryWithTimestampNonRecoverableE…
Browse files Browse the repository at this point in the history
…xception and testIncorrectClientClock (apache#22489)

(cherry picked from commit d9a43dd)
(cherry picked from commit c590198)
  • Loading branch information
shibd authored and mukesh-ctds committed Apr 17, 2024
1 parent c2c261b commit 7170dd1
Showing 1 changed file with 22 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
*/
package org.apache.pulsar.broker.service;

import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.retryStrategically;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.clearInvocations;
import static org.mockito.Mockito.doAnswer;
Expand Down Expand Up @@ -378,7 +377,7 @@ public static Set<BrokerEntryMetadataInterceptor> getBrokerEntryMetadataIntercep
*
* @throws Exception
*/
@Test(groups = "flaky")
@Test
void testMessageExpiryWithTimestampNonRecoverableException() throws Exception {

final String ledgerAndCursorName = "testPersistentMessageExpiryWithNonRecoverableLedgers";
Expand All @@ -397,11 +396,15 @@ void testMessageExpiryWithTimestampNonRecoverableException() throws Exception {
for (int i = 0; i < totalEntries; i++) {
ledger.addEntry(createMessageWrittenToLedger("msg" + i));
}
Awaitility.await().untilAsserted(() ->
assertEquals(ledger.getState(), ManagedLedgerImpl.State.LedgerOpened));

List<LedgerInfo> ledgers = ledger.getLedgersInfoAsList();
LedgerInfo lastLedgerInfo = ledgers.get(ledgers.size() - 1);

assertEquals(ledgers.size(), totalEntries / entriesPerLedger);
// The `lastLedgerInfo` should be newly opened, and it does not contain any entries.
// Please refer to: https://github.com/apache/pulsar/pull/22034
assertEquals(lastLedgerInfo.getEntries(), 0);
assertEquals(ledgers.size(), totalEntries / entriesPerLedger + 1);

// this will make sure that all entries should be deleted
Thread.sleep(TimeUnit.SECONDS.toMillis(ttlSeconds));
Expand All @@ -411,40 +414,39 @@ void testMessageExpiryWithTimestampNonRecoverableException() throws Exception {
bkc.deleteLedger(ledgers.get(2).getLedgerId());

PersistentMessageExpiryMonitor monitor = new PersistentMessageExpiryMonitor("topicname", c1.getName(), c1, null);
Position previousMarkDelete = null;
for (int i = 0; i < totalEntries; i++) {
monitor.expireMessages(1);
Position previousPos = previousMarkDelete;
retryStrategically(
(test) -> c1.getMarkDeletedPosition() != null && !c1.getMarkDeletedPosition().equals(previousPos),
5, 100);
previousMarkDelete = c1.getMarkDeletedPosition();
}

PositionImpl markDeletePosition = (PositionImpl) c1.getMarkDeletedPosition();
assertEquals(lastLedgerInfo.getLedgerId(), markDeletePosition.getLedgerId());
assertEquals(lastLedgerInfo.getEntries() - 1, markDeletePosition.getEntryId());
assertTrue(monitor.expireMessages(ttlSeconds));
Awaitility.await().untilAsserted(() -> {
PositionImpl markDeletePosition = (PositionImpl) c1.getMarkDeletedPosition();
// The markDeletePosition points to the last entry of the previous ledger in lastLedgerInfo.
assertEquals(markDeletePosition.getLedgerId(), lastLedgerInfo.getLedgerId() - 1);
assertEquals(markDeletePosition.getEntryId(), entriesPerLedger - 1);
});

c1.close();
ledger.close();
factory.shutdown();

}

@Test(groups = "flaky")
@Test
public void testIncorrectClientClock() throws Exception {
final String ledgerAndCursorName = "testIncorrectClientClock";
int maxTTLSeconds = 1;
int entriesNum = 10;
ManagedLedgerConfig config = new ManagedLedgerConfig();
config.setMaxEntriesPerLedger(1);
ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open(ledgerAndCursorName, config);
ManagedCursorImpl c1 = (ManagedCursorImpl) ledger.openCursor(ledgerAndCursorName);
// set client clock to 10 days later
long incorrectPublishTimestamp = System.currentTimeMillis() + TimeUnit.DAYS.toMillis(10);
for (int i = 0; i < 10; i++) {
for (int i = 0; i < entriesNum; i++) {
ledger.addEntry(createMessageWrittenToLedger("msg" + i, incorrectPublishTimestamp));
}
assertEquals(ledger.getLedgersInfoAsList().size(), 10);
Awaitility.await().untilAsserted(() ->
assertEquals(ledger.getState(), ManagedLedgerImpl.State.LedgerOpened));
// The number of ledgers should be (entriesNum / MaxEntriesPerLedger) + 1
// Please refer to: https://github.com/apache/pulsar/pull/22034
assertEquals(ledger.getLedgersInfoAsList().size(), entriesNum + 1);
PersistentMessageExpiryMonitor monitor = new PersistentMessageExpiryMonitor("topicname", c1.getName(), c1, null);
Thread.sleep(TimeUnit.SECONDS.toMillis(maxTTLSeconds));
monitor.expireMessages(maxTTLSeconds);
Expand Down

0 comments on commit 7170dd1

Please sign in to comment.