Skip to content

Commit

Permalink
[Transaction]stop TP replaying with Exception (apache#12700)
Browse files Browse the repository at this point in the history
### Motivation
When MLPendingAckStore replaying, if any ledger was deleted from bookkeeper, or ManagerLedger was fenced, MLPendingAckStore will not stop recovering and continue to report the exception.

### Modifications
End replaying when there is no ledger to read or the managerLedger is fenced.

### Verifying this change
Add a unit test.
  • Loading branch information
liangyepianzhou authored and fangxiaobing committed Dec 19, 2021
1 parent 585d8a4 commit c1ac814
Show file tree
Hide file tree
Showing 6 changed files with 89 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -710,4 +710,10 @@ Set<? extends Position> asyncReplayEntries(
* @return if read position changed
*/
boolean checkAndUpdateReadPositionChanged();

/**
* Checks if the cursor is closed.
* @return whether this cursor is closed.
*/
public boolean isClosed();
}
Original file line number Diff line number Diff line change
Expand Up @@ -837,6 +837,7 @@ private void checkForNewEntries(OpReadEntry op, ReadEntriesCallback callback, Ob
}
}

@Override
public boolean isClosed() {
return state == State.Closed || state == State.Closing;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -385,6 +385,11 @@ public List<Entry> readEntriesOrWait(int maxEntries, long maxSizeBytes)
public boolean checkAndUpdateReadPositionChanged() {
return false;
}

@Override
public boolean isClosed() {
return false;
}
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.commons.lang3.tuple.MutablePair;
import org.apache.pulsar.broker.service.BrokerServiceException.PersistenceException;
Expand Down Expand Up @@ -303,13 +302,12 @@ class PendingAckReplay implements Runnable {
@Override
public void run() {
try {
while (lastConfirmedEntry.compareTo(currentLoadPosition) > 0) {
if (((ManagedCursorImpl) cursor).isClosed()) {
log.warn("[{}] MLPendingAckStore cursor have been closed, close replay thread.",
cursor.getManagedLedger().getName());
return;
}
fillEntryQueueCallback.fillQueue();
if (cursor.isClosed()) {
log.warn("[{}] MLPendingAckStore cursor have been closed, close replay thread.",
cursor.getManagedLedger().getName());
return;
}
while (lastConfirmedEntry.compareTo(currentLoadPosition) > 0 && fillEntryQueueCallback.fillQueue()) {
Entry entry = entryQueue.poll();
if (entry != null) {
ByteBuf buffer = entry.getDataBuffer();
Expand Down Expand Up @@ -361,15 +359,17 @@ public void run() {

class FillEntryQueueCallback implements AsyncCallbacks.ReadEntriesCallback {

private volatile boolean isReadable = true;
private final AtomicLong outstandingReadsRequests = new AtomicLong(0);

void fillQueue() {
boolean fillQueue() {
if (entryQueue.size() < entryQueue.capacity() && outstandingReadsRequests.get() == 0) {
if (cursor.hasMoreEntries()) {
outstandingReadsRequests.incrementAndGet();
readAsync(100, this);
}
}
return isReadable;
}

@Override
Expand All @@ -389,7 +389,12 @@ public Entry get() {

@Override
public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
log.error("MLPendingAckStore stat reply fail!", exception);
if (managedLedger.getConfig().isAutoSkipNonRecoverableData()
&& exception instanceof ManagedLedgerException.NonRecoverableLedgerException
|| exception instanceof ManagedLedgerException.ManagedLedgerFencedException) {
isReadable = false;
}
log.error("MLPendingAckStore of topic [{}] stat reply fail!", managedLedger.getName(), exception);
outstandingReadsRequests.decrementAndGet();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
import org.apache.bookkeeper.mledger.impl.ManagedCursorContainer;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
Expand All @@ -62,8 +63,10 @@
import org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBufferState;
import org.apache.pulsar.broker.transaction.pendingack.PendingAckStore;
import org.apache.pulsar.broker.transaction.buffer.matadata.TransactionBufferSnapshot;
import org.apache.pulsar.broker.transaction.pendingack.TransactionPendingAckStoreProvider;
import org.apache.pulsar.broker.transaction.pendingack.impl.MLPendingAckStore;
import org.apache.pulsar.broker.transaction.pendingack.impl.MLPendingAckStoreProvider;
import org.apache.pulsar.broker.transaction.pendingack.impl.PendingAckHandleImpl;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
Expand All @@ -78,6 +81,7 @@
import org.apache.pulsar.client.api.transaction.Transaction;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.common.api.proto.CommandSubscribe;
import org.apache.pulsar.client.impl.transaction.TransactionImpl;
import org.apache.pulsar.common.events.EventsTopicNames;
import org.apache.pulsar.common.naming.NamespaceName;
Expand Down Expand Up @@ -551,6 +555,63 @@ public void testEndTBRecoveringWhenManagerLedgerDisReadable() throws Exception{
assertEquals(buffer2.getStats().state, "Ready"));
}

@Test
public void testEndTPRecoveringWhenManagerLedgerDisReadable() throws Exception{
String topic = NAMESPACE1 + "/testEndTPRecoveringWhenManagerLedgerDisReadable";
admin.topics().createNonPartitionedTopic(topic);
@Cleanup
Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
.producerName("test")
.enableBatching(false)
.sendTimeout(0, TimeUnit.SECONDS)
.topic(topic)
.create();
producer.newMessage().send();

PersistentTopic persistentTopic = (PersistentTopic) getPulsarServiceList().get(0).getBrokerService()
.getTopic(topic, false).get().get();
persistentTopic.getManagedLedger().getConfig().setAutoSkipNonRecoverableData(true);
PersistentSubscription persistentSubscription = (PersistentSubscription) persistentTopic
.createSubscription("test",
CommandSubscribe.InitialPosition.Earliest, false).get();

ManagedCursorImpl managedCursor = mock(ManagedCursorImpl.class);
doReturn(true).when(managedCursor).hasMoreEntries();
doReturn(false).when(managedCursor).isClosed();
doReturn(new PositionImpl(-1, -1)).when(managedCursor).getMarkDeletedPosition();
doAnswer(invocation -> {
AsyncCallbacks.ReadEntriesCallback callback = invocation.getArgument(1);
callback.readEntriesFailed(new ManagedLedgerException.NonRecoverableLedgerException("No ledger exist"),
null);
return null;
}).when(managedCursor).asyncReadEntries(anyInt(), any(), any(), any());

TransactionPendingAckStoreProvider pendingAckStoreProvider = mock(TransactionPendingAckStoreProvider.class);
doReturn(CompletableFuture.completedFuture(
new MLPendingAckStore(persistentTopic.getManagedLedger(), managedCursor, null)))
.when(pendingAckStoreProvider).newPendingAckStore(any());
doReturn(CompletableFuture.completedFuture(true)).when(pendingAckStoreProvider).checkInitializedBefore(any());

Class<PulsarService> pulsarServiceClass = PulsarService.class;
Field field = pulsarServiceClass.getDeclaredField("transactionPendingAckStoreProvider");
field.setAccessible(true);
field.set(getPulsarServiceList().get(0), pendingAckStoreProvider);

PendingAckHandleImpl pendingAckHandle1 = new PendingAckHandleImpl(persistentSubscription);
Awaitility.await().untilAsserted(() ->
assertEquals(pendingAckHandle1.getStats().state, "Ready"));

doAnswer(invocation -> {
AsyncCallbacks.ReadEntriesCallback callback = invocation.getArgument(1);
callback.readEntriesFailed(new ManagedLedgerException.ManagedLedgerFencedException(), null);
return null;
}).when(managedCursor).asyncReadEntries(anyInt(), any(), any(), any());

PendingAckHandleImpl pendingAckHandle2 = new PendingAckHandleImpl(persistentSubscription);
Awaitility.await().untilAsserted(() ->
assertEquals(pendingAckHandle2.getStats().state, "Ready"));
}

@Test
public void testEndTCRecoveringWhenManagerLedgerDisReadable() throws Exception{
String topic = NAMESPACE1 + "/testEndTBRecoveringWhenManagerLedgerDisReadable";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ public void start() {
class FillEntryQueueCallback implements AsyncCallbacks.ReadEntriesCallback {

private final AtomicLong outstandingReadsRequests = new AtomicLong(0);
private boolean isReadable = true;
private volatile boolean isReadable = true;

boolean fillQueue() {
if (entryQueue.size() < entryQueue.capacity() && outstandingReadsRequests.get() == 0) {
Expand Down

0 comments on commit c1ac814

Please sign in to comment.