diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java index a237314331662..e4975d97cea0a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java @@ -21,6 +21,12 @@ import static java.nio.charset.StandardCharsets.UTF_8; import static org.apache.pulsar.transaction.coordinator.impl.MLTransactionLogImpl.TRANSACTION_LOG_PREFIX; import static org.apache.pulsar.broker.transaction.pendingack.impl.MLPendingAckStore.PENDING_ACK_STORE_SUFFIX; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertTrue; @@ -28,6 +34,8 @@ import io.netty.buffer.Unpooled; import java.lang.reflect.Field; +import java.util.HashMap; +import java.util.Map; import java.util.List; import java.util.Optional; import java.util.concurrent.CompletableFuture; @@ -37,6 +45,8 @@ import java.util.concurrent.TimeUnit; import lombok.Cleanup; import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.mledger.AsyncCallbacks; +import org.apache.bookkeeper.mledger.ManagedCursor; import org.apache.bookkeeper.mledger.ManagedLedgerException; import org.apache.bookkeeper.mledger.ManagedLedgerFactory; import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl; @@ -73,6 +83,12 @@ import org.apache.pulsar.common.policies.data.RetentionPolicies; import org.apache.pulsar.common.policies.data.TopicPolicies; import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap; +import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID; +import org.apache.pulsar.transaction.coordinator.TransactionRecoverTracker; +import org.apache.pulsar.transaction.coordinator.TransactionTimeoutTracker; +import org.apache.pulsar.transaction.coordinator.impl.MLTransactionLogImpl; +import org.apache.pulsar.transaction.coordinator.impl.MLTransactionLogInterceptor; +import org.apache.pulsar.transaction.coordinator.impl.MLTransactionMetadataStore; import org.awaitility.Awaitility; import org.testng.Assert; import org.testng.annotations.AfterMethod; @@ -411,11 +427,11 @@ public void completed(Exception e, long ledgerId, long entryId) { } @Test - public void testMaxReadPositionForNormalPublish() throws Exception{ + public void testMaxReadPositionForNormalPublish() throws Exception { String topic = "persistent://" + NAMESPACE1 + "/NormalPublish"; admin.topics().createNonPartitionedTopic(topic); PersistentTopic persistentTopic = (PersistentTopic) getPulsarServiceList().get(0).getBrokerService() - .getTopic(topic, false).get().get(); + .getTopic(topic, false).get().get(); TopicTransactionBuffer topicTransactionBuffer = (TopicTransactionBuffer) persistentTopic.getTransactionBuffer(); PulsarClient noTxnClient = PulsarClient.builder().enableTransaction(false) @@ -443,7 +459,7 @@ public void testMaxReadPositionForNormalPublish() throws Exception{ .sendTimeout(0, TimeUnit.SECONDS) .create(); - Awaitility.await().untilAsserted(() ->Assert.assertTrue(topicTransactionBuffer.checkIfReady())); + Awaitility.await().untilAsserted(() -> Assert.assertTrue(topicTransactionBuffer.checkIfReady())); //test publishing txn messages will not change maxReadPosition if don`t commit or abort. Transaction transaction = pulsarClient.newTransaction() .withTransactionTimeout(5, TimeUnit.SECONDS).build().get(); @@ -483,5 +499,62 @@ public void testMaxReadPositionForNormalPublish() throws Exception{ Assert.assertEquals(position5.getLedgerId(), messageId4.getLedgerId()); Assert.assertEquals(position5.getEntryId(), messageId4.getEntryId()); - } + } + + @Test + public void testEndTCRecoveringWhenManagerLedgerDisReadable() throws Exception{ + String topic = NAMESPACE1 + "/testEndTBRecoveringWhenManagerLedgerDisReadable"; + admin.topics().createNonPartitionedTopic(topic); + + PersistentTopic persistentTopic = (PersistentTopic) getPulsarServiceList().get(0).getBrokerService() + .getTopic(topic, false).get().get(); + persistentTopic.getManagedLedger().getConfig().setAutoSkipNonRecoverableData(true); + Map map = new HashMap<>(); + map.put(MLTransactionLogInterceptor.MAX_LOCAL_TXN_ID, "1"); + persistentTopic.getManagedLedger().setProperties(map); + + ManagedCursor managedCursor = mock(ManagedCursor.class); + doReturn(true).when(managedCursor).hasMoreEntries(); + doAnswer(invocation -> { + AsyncCallbacks.ReadEntriesCallback callback = invocation.getArgument(1); + callback.readEntriesFailed(new ManagedLedgerException.NonRecoverableLedgerException("No ledger exist"), + null); + return null; + }).when(managedCursor).asyncReadEntries(anyInt(), any(), any(), any()); + + MLTransactionLogImpl mlTransactionLog = + new MLTransactionLogImpl(new TransactionCoordinatorID(1), null, + persistentTopic.getManagedLedger().getConfig()); + Class mlTransactionLogClass = MLTransactionLogImpl.class; + Field field = mlTransactionLogClass.getDeclaredField("cursor"); + field.setAccessible(true); + field.set(mlTransactionLog, managedCursor); + field = mlTransactionLogClass.getDeclaredField("managedLedger"); + field.setAccessible(true); + field.set(mlTransactionLog, persistentTopic.getManagedLedger()); + + TransactionRecoverTracker transactionRecoverTracker = mock(TransactionRecoverTracker.class); + doNothing().when(transactionRecoverTracker).appendOpenTransactionToTimeoutTracker(); + doNothing().when(transactionRecoverTracker).handleCommittingAndAbortingTransaction(); + TransactionTimeoutTracker timeoutTracker = mock(TransactionTimeoutTracker.class); + doNothing().when(timeoutTracker).start(); + MLTransactionMetadataStore metadataStore1 = + new MLTransactionMetadataStore(new TransactionCoordinatorID(1), + mlTransactionLog, timeoutTracker, transactionRecoverTracker); + + Awaitility.await().untilAsserted(() -> + assertEquals(metadataStore1.getCoordinatorStats().state, "Ready")); + + doAnswer(invocation -> { + AsyncCallbacks.ReadEntriesCallback callback = invocation.getArgument(1); + callback.readEntriesFailed(new ManagedLedgerException.ManagedLedgerFencedException(), null); + return null; + }).when(managedCursor).asyncReadEntries(anyInt(), any(), any(), any()); + + MLTransactionMetadataStore metadataStore2 = + new MLTransactionMetadataStore(new TransactionCoordinatorID(1), + mlTransactionLog, timeoutTracker, transactionRecoverTracker); + Awaitility.await().untilAsserted(() -> + assertEquals(metadataStore2.getCoordinatorStats().state, "Ready")); + } } diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java index c0442753e7a7b..2d11d984b8655 100644 --- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java +++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java @@ -281,18 +281,19 @@ public void readEntryFailed(ManagedLedgerException exception, Object ctx) { class FillEntryQueueCallback implements AsyncCallbacks.ReadEntriesCallback { private final AtomicLong outstandingReadsRequests = new AtomicLong(0); + private boolean isReadable = true; boolean fillQueue() { if (entryQueue.size() < entryQueue.capacity() && outstandingReadsRequests.get() == 0) { if (cursor.hasMoreEntries()) { outstandingReadsRequests.incrementAndGet(); readAsync(100, this); - return true; + return isReadable; } else { return false; } } else { - return true; + return isReadable; } } @@ -313,6 +314,11 @@ public Entry get() { @Override public void readEntriesFailed(ManagedLedgerException exception, Object ctx) { + if (managedLedgerConfig.isAutoSkipNonRecoverableData() + && exception instanceof ManagedLedgerException.NonRecoverableLedgerException + || exception instanceof ManagedLedgerException.ManagedLedgerFencedException) { + isReadable = false; + } log.error("Transaction log init fail error!", exception); outstandingReadsRequests.decrementAndGet(); }