Skip to content

Commit

Permalink
[fix] Fix Reader can be stuck from transaction aborted messages. (apa…
Browse files Browse the repository at this point in the history
…che#22610)

(cherry picked from commit 7e88463)
(cherry picked from commit f516a85)
  • Loading branch information
dao-jun authored and nikhil-ctds committed May 15, 2024
1 parent ee7c61b commit d38cc54
Show file tree
Hide file tree
Showing 4 changed files with 111 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,18 +38,18 @@ public static CompletableFuture<Position> asyncGetLastValidPosition(final Manage
final Predicate<Entry> predicate,
final PositionImpl startPosition) {
CompletableFuture<Position> future = new CompletableFuture<>();
if (!ledger.isValidPosition(startPosition)) {
future.complete(startPosition);
} else {
internalAsyncReverseFindPositionOneByOne(ledger, predicate, startPosition, future);
}
internalAsyncReverseFindPositionOneByOne(ledger, predicate, startPosition, future);
return future;
}

private static void internalAsyncReverseFindPositionOneByOne(final ManagedLedgerImpl ledger,
final Predicate<Entry> predicate,
final PositionImpl position,
final CompletableFuture<Position> future) {
if (!ledger.isValidPosition(position)) {
future.complete(position);
return;
}
ledger.asyncReadEntry(position, new AsyncCallbacks.ReadEntryCallback() {
@Override
public void readEntryComplete(Entry entry, Object ctx) {
Expand All @@ -60,12 +60,7 @@ public void readEntryComplete(Entry entry, Object ctx) {
return;
}
PositionImpl previousPosition = ledger.getPreviousPosition((PositionImpl) position);
if (!ledger.isValidPosition(previousPosition)) {
future.complete(previousPosition);
} else {
internalAsyncReverseFindPositionOneByOne(ledger, predicate,
ledger.getPreviousPosition((PositionImpl) position), future);
}
internalAsyncReverseFindPositionOneByOne(ledger, predicate, previousPosition, future);
} catch (Exception e) {
future.completeExceptionally(e);
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3317,18 +3317,18 @@ public Position getLastPosition() {

@Override
public CompletableFuture<Position> getLastDispatchablePosition() {
PositionImpl maxReadPosition = getMaxReadPosition();
// If `maxReadPosition` is not equal to `LastPosition`. It means that there are uncommitted transactions.
// so return `maxRedPosition` directly.
if (maxReadPosition.compareTo((PositionImpl) getLastPosition()) != 0) {
return CompletableFuture.completedFuture(maxReadPosition);
} else {
return ManagedLedgerImplUtils.asyncGetLastValidPosition((ManagedLedgerImpl) ledger, entry -> {
MessageMetadata md = Commands.parseMessageMetadata(entry.getDataBuffer());
// If a messages has marker will filter by AbstractBaseDispatcher.filterEntriesForConsumer
return !Markers.isServerOnlyMarker(md);
}, maxReadPosition);
}
return ManagedLedgerImplUtils.asyncGetLastValidPosition((ManagedLedgerImpl) ledger, entry -> {
MessageMetadata md = Commands.parseMessageMetadata(entry.getDataBuffer());
// If a messages has marker will filter by AbstractBaseDispatcher.filterEntriesForConsumer
if (Markers.isServerOnlyMarker(md)) {
return false;
} else if (md.hasTxnidMostBits() && md.hasTxnidLeastBits()) {
// Filter-out transaction aborted messages.
TxnID txnID = new TxnID(md.getTxnidMostBits(), md.getTxnidLeastBits());
return !isTxnAborted(txnID, (PositionImpl) entry.getPosition());
}
return true;
}, getMaxReadPosition());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1898,4 +1898,73 @@ public void testReadCommittedWithCompaction() throws Exception{
Assert.assertEquals(result, List.of("V4", "V5", "V6"));
}


@Test
public void testPersistentTopicGetLastDispatchablePositionWithTxn() throws Exception {
String topic = "persistent://" + NAMESPACE1 + "/testPersistentTopicGetLastDispatchablePositionWithTxn";

@Cleanup
Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
.topic(topic)
.enableBatching(false)
.create();

BrokerService brokerService = pulsarTestContexts.get(0).getBrokerService();
PersistentTopic persistentTopic = (PersistentTopic) brokerService.getTopicReference(topic).get();


// send a normal message
String body = UUID.randomUUID().toString();
MessageIdImpl msgId = (MessageIdImpl) producer.send(body);

// send 3 txn messages
Transaction txn = pulsarClient.newTransaction().build().get();
producer.newMessage(txn).value(UUID.randomUUID().toString()).send();
producer.newMessage(txn).value(UUID.randomUUID().toString()).send();
producer.newMessage(txn).value(UUID.randomUUID().toString()).send();

// get last dispatchable position
PositionImpl lastDispatchablePosition = (PositionImpl) persistentTopic.getLastDispatchablePosition().get();
// the last dispatchable position should be the message id of the normal message
assertEquals(lastDispatchablePosition, PositionImpl.get(msgId.getLedgerId(), msgId.getEntryId()));

// abort the txn
txn.abort().get(5, TimeUnit.SECONDS);

// get last dispatchable position
lastDispatchablePosition = (PositionImpl) persistentTopic.getLastDispatchablePosition().get();
// the last dispatchable position should be the message id of the normal message
assertEquals(lastDispatchablePosition, PositionImpl.get(msgId.getLedgerId(), msgId.getEntryId()));


@Cleanup
Reader<String> reader = pulsarClient.newReader(Schema.STRING)
.topic(topic)
.startMessageId(MessageId.earliest)
.create();
Transaction txn1 = pulsarClient.newTransaction().build().get();
producer.newMessage(txn1).value(UUID.randomUUID().toString()).send();
producer.newMessage(txn1).value(UUID.randomUUID().toString()).send();
producer.newMessage(txn1).value(UUID.randomUUID().toString()).send();
List<Message<String>> messages = new ArrayList<>();
while (reader.hasMessageAvailable()) {
messages.add(reader.readNext());
}
assertEquals(messages.size(), 1);
assertEquals(messages.get(0).getValue(), body);

txn1.abort().get(5, TimeUnit.SECONDS);

@Cleanup
Reader<String> reader1 = pulsarClient.newReader(Schema.STRING)
.topic(topic)
.startMessageId(MessageId.earliest)
.create();
List<Message<String>> messages1 = new ArrayList<>();
while (reader1.hasMessageAvailable()) {
messages1.add(reader1.readNext());
}
assertEquals(messages1.size(), 1);
assertEquals(messages1.get(0).getValue(), body);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -280,9 +280,9 @@ public void testGetLastMessageIdsWithOngoingTransactions() throws Exception {
for (int i = 0; i < 3; i++) {
expectedLastMessageID = (MessageIdImpl) producer.newMessage().send();
}
assertMessageId(consumer, expectedLastMessageID);
assertGetLastMessageId(consumer, expectedLastMessageID);
// 2.2 Case2: send 2 ongoing transactional messages and 2 original messages.
// |1:0|1:1|1:2|txn1:start->1:3|1:4|txn2:start->1:5|1:6|.
// |1:0|1:1|1:2|txn1:start->1:3|1:4|txn2:start->1:5.
Transaction txn1 = pulsarClient.newTransaction()
.withTransactionTimeout(5, TimeUnit.HOURS)
.build()
Expand All @@ -291,25 +291,37 @@ public void testGetLastMessageIdsWithOngoingTransactions() throws Exception {
.withTransactionTimeout(5, TimeUnit.HOURS)
.build()
.get();

// |1:0|1:1|1:2|txn1:1:3|
producer.newMessage(txn1).send();
// expectedLastMessageID1 == 1:4

// |1:0|1:1|1:2|txn1:1:3|1:4|
MessageIdImpl expectedLastMessageID1 = (MessageIdImpl) producer.newMessage().send();

// |1:0|1:1|1:2|txn1:1:3|1:4|txn2:1:5|
producer.newMessage(txn2).send();
// expectedLastMessageID2 == 1:6
MessageIdImpl expectedLastMessageID2 = (MessageIdImpl) producer.newMessage().send();

// 2.2.1 Last message ID will not change when txn1 and txn2 do not end.
assertMessageId(consumer, expectedLastMessageID);
assertGetLastMessageId(consumer, expectedLastMessageID);

// 2.2.2 Last message ID will update to 1:4 when txn1 committed.
// |1:0|1:1|1:2|txn1:start->1:3|1:4|txn2:start->1:5|1:6|tx1:commit->1:7|
// |1:0|1:1|1:2|txn1:1:3|1:4|txn2:1:5|tx1:commit->1:6|
txn1.commit().get(5, TimeUnit.SECONDS);
assertMessageId(consumer, expectedLastMessageID1);
assertGetLastMessageId(consumer, expectedLastMessageID1);

// 2.2.3 Last message ID will update to 1:6 when txn2 aborted.
// |1:0|1:1|1:2|txn1:start->1:3|1:4|txn2:start->1:5|1:6|tx1:commit->1:7|tx2:abort->1:8|
// 2.2.3 Last message ID will still to 1:4 when txn2 aborted.
// |1:0|1:1|1:2|txn1:1:3|1:4|txn2:1:5|tx1:commit->1:6|tx2:abort->1:7|
txn2.abort().get(5, TimeUnit.SECONDS);
assertMessageId(consumer, expectedLastMessageID2);
assertGetLastMessageId(consumer, expectedLastMessageID1);

// Handle the case of the maxReadPosition < lastPosition, but it's an aborted transactional message.
Transaction txn3 = pulsarClient.newTransaction()
.build()
.get();
producer.newMessage(txn3).send();
assertGetLastMessageId(consumer, expectedLastMessageID1);
txn3.abort().get(5, TimeUnit.SECONDS);
assertGetLastMessageId(consumer, expectedLastMessageID1);
}

/**
Expand Down Expand Up @@ -368,7 +380,7 @@ private void triggerLedgerSwitch(String topicName) throws Exception{
});
}

private void assertMessageId(Consumer<?> consumer, MessageIdImpl expected) throws Exception {
private void assertGetLastMessageId(Consumer<?> consumer, MessageIdImpl expected) throws Exception {
TopicMessageIdImpl actual = (TopicMessageIdImpl) consumer.getLastMessageIds().get(0);
assertEquals(expected.getEntryId(), actual.getEntryId());
assertEquals(expected.getLedgerId(), actual.getLedgerId());
Expand Down

0 comments on commit d38cc54

Please sign in to comment.