Skip to content

Commit

Permalink
[Transaction]stop TC replaying with exception (#12705)
Browse files Browse the repository at this point in the history
When MLTransactionLogImpl replaying, if any ledger was deleted from bookkeeper, or ManagerLedger was fenced, MLTransactionLogImpl will not stop recovering and continue to report the exception.

End replaying when there is no ledger to read or the managerLedger is fenced.

(cherry picked from commit 06f1a91)
  • Loading branch information
liangyepianzhou authored and codelipenghui committed Dec 21, 2021
1 parent 3d7fe33 commit f3bdaec
Show file tree
Hide file tree
Showing 2 changed files with 85 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,21 @@
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.pulsar.transaction.coordinator.impl.MLTransactionLogImpl.TRANSACTION_LOG_PREFIX;
import static org.apache.pulsar.broker.transaction.pendingack.impl.MLPendingAckStore.PENDING_ACK_STORE_SUFFIX;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;

import io.netty.buffer.Unpooled;
import java.lang.reflect.Field;
import java.util.HashMap;
import java.util.Map;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
Expand All @@ -37,6 +45,8 @@
import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
Expand Down Expand Up @@ -73,6 +83,12 @@
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.TopicPolicies;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
import org.apache.pulsar.transaction.coordinator.TransactionRecoverTracker;
import org.apache.pulsar.transaction.coordinator.TransactionTimeoutTracker;
import org.apache.pulsar.transaction.coordinator.impl.MLTransactionLogImpl;
import org.apache.pulsar.transaction.coordinator.impl.MLTransactionLogInterceptor;
import org.apache.pulsar.transaction.coordinator.impl.MLTransactionMetadataStore;
import org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
Expand Down Expand Up @@ -411,11 +427,11 @@ public void completed(Exception e, long ledgerId, long entryId) {
}

@Test
public void testMaxReadPositionForNormalPublish() throws Exception{
public void testMaxReadPositionForNormalPublish() throws Exception {
String topic = "persistent://" + NAMESPACE1 + "/NormalPublish";
admin.topics().createNonPartitionedTopic(topic);
PersistentTopic persistentTopic = (PersistentTopic) getPulsarServiceList().get(0).getBrokerService()
.getTopic(topic, false).get().get();
.getTopic(topic, false).get().get();

TopicTransactionBuffer topicTransactionBuffer = (TopicTransactionBuffer) persistentTopic.getTransactionBuffer();
PulsarClient noTxnClient = PulsarClient.builder().enableTransaction(false)
Expand Down Expand Up @@ -443,7 +459,7 @@ public void testMaxReadPositionForNormalPublish() throws Exception{
.sendTimeout(0, TimeUnit.SECONDS)
.create();

Awaitility.await().untilAsserted(() ->Assert.assertTrue(topicTransactionBuffer.checkIfReady()));
Awaitility.await().untilAsserted(() -> Assert.assertTrue(topicTransactionBuffer.checkIfReady()));
//test publishing txn messages will not change maxReadPosition if don`t commit or abort.
Transaction transaction = pulsarClient.newTransaction()
.withTransactionTimeout(5, TimeUnit.SECONDS).build().get();
Expand Down Expand Up @@ -483,5 +499,62 @@ public void testMaxReadPositionForNormalPublish() throws Exception{
Assert.assertEquals(position5.getLedgerId(), messageId4.getLedgerId());
Assert.assertEquals(position5.getEntryId(), messageId4.getEntryId());

}
}

@Test
public void testEndTCRecoveringWhenManagerLedgerDisReadable() throws Exception{
String topic = NAMESPACE1 + "/testEndTBRecoveringWhenManagerLedgerDisReadable";
admin.topics().createNonPartitionedTopic(topic);

PersistentTopic persistentTopic = (PersistentTopic) getPulsarServiceList().get(0).getBrokerService()
.getTopic(topic, false).get().get();
persistentTopic.getManagedLedger().getConfig().setAutoSkipNonRecoverableData(true);
Map<String, String> map = new HashMap<>();
map.put(MLTransactionLogInterceptor.MAX_LOCAL_TXN_ID, "1");
persistentTopic.getManagedLedger().setProperties(map);

ManagedCursor managedCursor = mock(ManagedCursor.class);
doReturn(true).when(managedCursor).hasMoreEntries();
doAnswer(invocation -> {
AsyncCallbacks.ReadEntriesCallback callback = invocation.getArgument(1);
callback.readEntriesFailed(new ManagedLedgerException.NonRecoverableLedgerException("No ledger exist"),
null);
return null;
}).when(managedCursor).asyncReadEntries(anyInt(), any(), any(), any());

MLTransactionLogImpl mlTransactionLog =
new MLTransactionLogImpl(new TransactionCoordinatorID(1), null,
persistentTopic.getManagedLedger().getConfig());
Class<MLTransactionLogImpl> mlTransactionLogClass = MLTransactionLogImpl.class;
Field field = mlTransactionLogClass.getDeclaredField("cursor");
field.setAccessible(true);
field.set(mlTransactionLog, managedCursor);
field = mlTransactionLogClass.getDeclaredField("managedLedger");
field.setAccessible(true);
field.set(mlTransactionLog, persistentTopic.getManagedLedger());

TransactionRecoverTracker transactionRecoverTracker = mock(TransactionRecoverTracker.class);
doNothing().when(transactionRecoverTracker).appendOpenTransactionToTimeoutTracker();
doNothing().when(transactionRecoverTracker).handleCommittingAndAbortingTransaction();
TransactionTimeoutTracker timeoutTracker = mock(TransactionTimeoutTracker.class);
doNothing().when(timeoutTracker).start();
MLTransactionMetadataStore metadataStore1 =
new MLTransactionMetadataStore(new TransactionCoordinatorID(1),
mlTransactionLog, timeoutTracker, transactionRecoverTracker);

Awaitility.await().untilAsserted(() ->
assertEquals(metadataStore1.getCoordinatorStats().state, "Ready"));

doAnswer(invocation -> {
AsyncCallbacks.ReadEntriesCallback callback = invocation.getArgument(1);
callback.readEntriesFailed(new ManagedLedgerException.ManagedLedgerFencedException(), null);
return null;
}).when(managedCursor).asyncReadEntries(anyInt(), any(), any(), any());

MLTransactionMetadataStore metadataStore2 =
new MLTransactionMetadataStore(new TransactionCoordinatorID(1),
mlTransactionLog, timeoutTracker, transactionRecoverTracker);
Awaitility.await().untilAsserted(() ->
assertEquals(metadataStore2.getCoordinatorStats().state, "Ready"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -281,18 +281,19 @@ public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
class FillEntryQueueCallback implements AsyncCallbacks.ReadEntriesCallback {

private final AtomicLong outstandingReadsRequests = new AtomicLong(0);
private boolean isReadable = true;

boolean fillQueue() {
if (entryQueue.size() < entryQueue.capacity() && outstandingReadsRequests.get() == 0) {
if (cursor.hasMoreEntries()) {
outstandingReadsRequests.incrementAndGet();
readAsync(100, this);
return true;
return isReadable;
} else {
return false;
}
} else {
return true;
return isReadable;
}
}

Expand All @@ -313,6 +314,11 @@ public Entry get() {

@Override
public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
if (managedLedgerConfig.isAutoSkipNonRecoverableData()
&& exception instanceof ManagedLedgerException.NonRecoverableLedgerException
|| exception instanceof ManagedLedgerException.ManagedLedgerFencedException) {
isReadable = false;
}
log.error("Transaction log init fail error!", exception);
outstandingReadsRequests.decrementAndGet();
}
Expand Down

0 comments on commit f3bdaec

Please sign in to comment.