Skip to content

Commit

Permalink
[Fix][Transaction] Fix transaction pendingAckStore asyncMarkDelete (#…
Browse files Browse the repository at this point in the history
…14974)

### Motivation
When we only use transaction to ack message, a message per transaction,
`cursor.asyncMarkDelete` will never be executed, because firstPosition and
 deletePosition both are init as metadataPositions.firstEntry().getKey();
 And if only the firstkey need to delete,  deletePosition also get metadataPositions.firstKey(); irstPosition == deletePosition, never call `cursor.asyncMarkDelete`
### Modification
init firstPosition and deletePosition as PositionImpl.EARLIEST

(cherry picked from commit 8a6ecd7)
  • Loading branch information
liangyepianzhou authored and codelipenghui committed Apr 19, 2022
1 parent 3594e42 commit ede1c6f
Show file tree
Hide file tree
Showing 3 changed files with 77 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -241,8 +241,7 @@ public void addComplete(Position position, ByteBuf entryData, Object ctx) {
completableFuture.complete(null);

if (!metadataPositions.isEmpty()) {
PositionImpl firstPosition = metadataPositions.firstEntry().getKey();
PositionImpl deletePosition = metadataPositions.firstEntry().getKey();
PositionImpl deletePosition = null;
while (!metadataPositions.isEmpty()
&& metadataPositions.firstKey() != null
&& subManagedCursor.getPersistentMarkDeletedPosition() != null
Expand All @@ -252,7 +251,7 @@ public void addComplete(Position position, ByteBuf entryData, Object ctx) {
metadataPositions.remove(metadataPositions.firstKey());
}

if (firstPosition != deletePosition) {
if (deletePosition != null) {
PositionImpl finalDeletePosition = deletePosition;
cursor.asyncMarkDelete(deletePosition,
new AsyncCallbacks.MarkDeleteCallback() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ManagedLedgerInternalStats;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.TopicPolicies;
import org.apache.pulsar.common.schema.SchemaInfo;
Expand Down Expand Up @@ -892,4 +893,77 @@ public void testAutoCreateSchemaForTransactionSnapshot() throws Exception {
pulsarServiceList.forEach((pulsarService ->
pulsarService.getConfiguration().setAllowAutoUpdateSchemaEnabled(true)));
}

@Test
public void testPendingAckMarkDeletePosition() throws Exception {
String topic = NAMESPACE1 + "/test1";

@Cleanup
Producer<byte[]> producer = pulsarClient
.newProducer(Schema.BYTES)
.topic(topic)
.sendTimeout(0, TimeUnit.SECONDS)
.create();

@Cleanup
Consumer<byte[]> consumer = pulsarClient
.newConsumer()
.topic(topic)
.subscriptionName("sub")
.subscribe();
consumer.getSubscription();

PersistentSubscription persistentSubscription = (PersistentSubscription) getPulsarServiceList()
.get(0)
.getBrokerService()
.getTopic(topic, false)
.get()
.get()
.getSubscription("sub");

ManagedCursor subscriptionCursor = persistentSubscription.getCursor();

subscriptionCursor.getMarkDeletedPosition();
//pendingAck add message1 and commit mark, metadata add message1
//PersistentMarkDeletedPosition have not updated
producer.newMessage()
.value("test".getBytes(UTF_8))
.send();
Transaction transaction = pulsarClient
.newTransaction()
.withTransactionTimeout(5, TimeUnit.MINUTES)
.build().get();

Message<byte[]> message1 = consumer.receive(10, TimeUnit.SECONDS);

consumer.acknowledgeAsync(message1.getMessageId(), transaction);
transaction.commit().get();
//PersistentMarkDeletedPosition of subscription have updated to message1,
//check whether delete markDeletedPosition of pendingAck after append entry to pendingAck
transaction = pulsarClient
.newTransaction()
.withTransactionTimeout(5, TimeUnit.MINUTES)
.build().get();

producer.newMessage()
.value("test".getBytes(UTF_8))
.send();
Message<byte[]> message2 = consumer.receive(10, TimeUnit.SECONDS);
consumer.acknowledgeAsync(message2.getMessageId(), transaction);

Awaitility.await().untilAsserted(() -> {
ManagedLedgerInternalStats managedLedgerInternalStats = admin
.transactions()
.getPendingAckInternalStats(topic, "sub", false)
.pendingAckLogStats
.managedLedgerInternalStats;
String [] markDeletePosition = managedLedgerInternalStats.cursors.get("__pending_ack_state")
.markDeletePosition.split(":");
String [] lastConfirmedEntry = managedLedgerInternalStats.lastConfirmedEntry.split(":");
Assert.assertEquals(markDeletePosition[0], lastConfirmedEntry[0]);
//don`t contain commit mark and unCommitted message2
Assert.assertEquals(Integer.parseInt(markDeletePosition[1]),
Integer.parseInt(lastConfirmedEntry[1]) - 2);
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ public void cumulativePendingAckReplayTest() throws Exception {
// in order to check out the pending ack cursor is clear whether or not.
Awaitility.await()
.until(() -> ((PositionImpl) managedCursor.getMarkDeletedPosition())
.compareTo((PositionImpl) managedCursor.getManagedLedger().getLastConfirmedEntry()) == -1);
.compareTo((PositionImpl) managedCursor.getManagedLedger().getLastConfirmedEntry()) == 0);
}

@Test
Expand Down

0 comments on commit ede1c6f

Please sign in to comment.