Skip to content

Commit

Permalink
[fix] [ml] Add entry fail due to race condition about add entry faile…
Browse files Browse the repository at this point in the history
…d/timeout and switch ledger (apache#22221)

(cherry picked from commit b798e7f)
  • Loading branch information
poorbarcode authored and srinath-ctds committed Jun 7, 2024
1 parent 8027d5c commit 1baa094
Show file tree
Hide file tree
Showing 6 changed files with 113 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.AtomicReference;
Expand Down Expand Up @@ -242,6 +243,9 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
protected volatile long lastAddEntryTimeMs = 0;
private long inactiveLedgerRollOverTimeMs = 0;

/** A signal that, once set, causes all subsequent OpAddEntry operations on the current ledger to fail due to timeout. **/
protected volatile AtomicBoolean currentLedgerTimeoutTriggered;

protected static final int DEFAULT_LEDGER_DELETE_RETRIES = 3;
protected static final int DEFAULT_LEDGER_DELETE_BACKOFF_TIME_SEC = 60;
private static final String MIGRATION_STATE_PROPERTY = "migrated";
Expand Down Expand Up @@ -534,6 +538,7 @@ public void operationFailed(MetaStoreException e) {
STATE_UPDATER.set(this, State.LedgerOpened);
updateLastLedgerCreatedTimeAndScheduleRolloverTask();
currentLedger = lh;
currentLedgerTimeoutTriggered = new AtomicBoolean();

lastConfirmedEntry = new PositionImpl(lh.getId(), -1);
// bypass empty ledgers, find last ledger with Message if possible.
Expand Down Expand Up @@ -776,7 +781,8 @@ public void asyncAddEntry(ByteBuf buffer, AddEntryCallback callback, Object ctx)

// Jump to specific thread to avoid contention from writers writing from different threads
executor.execute(() -> {
OpAddEntry addOperation = OpAddEntry.createNoRetainBuffer(this, buffer, callback, ctx);
OpAddEntry addOperation = OpAddEntry.createNoRetainBuffer(this, buffer, callback, ctx,
currentLedgerTimeoutTriggered);
internalAsyncAddEntry(addOperation);
});
}
Expand All @@ -792,7 +798,8 @@ public void asyncAddEntry(ByteBuf buffer, int numberOfMessages, AddEntryCallback

// Jump to specific thread to avoid contention from writers writing from different threads
executor.execute(() -> {
OpAddEntry addOperation = OpAddEntry.createNoRetainBuffer(this, buffer, numberOfMessages, callback, ctx);
OpAddEntry addOperation = OpAddEntry.createNoRetainBuffer(this, buffer, numberOfMessages, callback, ctx,
currentLedgerTimeoutTriggered);
internalAsyncAddEntry(addOperation);
});
}
Expand Down Expand Up @@ -844,6 +851,7 @@ protected synchronized void internalAsyncAddEntry(OpAddEntry addOperation) {

// Write into lastLedger
addOperation.setLedger(currentLedger);
addOperation.setTimeoutTriggered(currentLedgerTimeoutTriggered);

++currentLedgerEntries;
currentLedgerSize += addOperation.data.readableBytes();
Expand Down Expand Up @@ -1587,6 +1595,7 @@ public void operationComplete(Void v, Stat stat) {
LedgerHandle originalCurrentLedger = currentLedger;
ledgers.put(lh.getId(), newLedger);
currentLedger = lh;
currentLedgerTimeoutTriggered = new AtomicBoolean();
currentLedgerEntries = 0;
currentLedgerSize = 0;
updateLedgersIdsComplete(originalCurrentLedger);
Expand Down Expand Up @@ -1670,9 +1679,11 @@ void createNewOpAddEntryForNewLedger() {
if (existsOp != null) {
// If op is used by another ledger handle, we need to close it and create a new one
if (existsOp.ledger != null) {
existsOp.close();
existsOp = OpAddEntry.createNoRetainBuffer(existsOp.ml, existsOp.data,
existsOp.getNumberOfMessages(), existsOp.callback, existsOp.ctx);
existsOp = existsOp.duplicateAndClose(currentLedgerTimeoutTriggered);
} else {
// This scenario should not happen.
log.warn("[{}] An OpAddEntry's ledger is empty.", name);
existsOp.setTimeoutTriggered(currentLedgerTimeoutTriggered);
}
existsOp.setLedger(currentLedger);
pendingAddEntries.add(existsOp);
Expand Down Expand Up @@ -4211,13 +4222,14 @@ private void checkAddTimeout() {
}
OpAddEntry opAddEntry = pendingAddEntries.peek();
if (opAddEntry != null) {
final long finalAddOpCount = opAddEntry.addOpCount;
boolean isTimedOut = opAddEntry.lastInitTime != -1
&& TimeUnit.NANOSECONDS.toSeconds(System.nanoTime() - opAddEntry.lastInitTime) >= timeoutSec;
if (isTimedOut) {
log.error("Failed to add entry for ledger {} in time-out {} sec",
(opAddEntry.ledger != null ? opAddEntry.ledger.getId() : -1), timeoutSec);
opAddEntry.handleAddTimeoutFailure(opAddEntry.ledger, finalAddOpCount);
log.warn("[{}] Failed to add entry {}:{} in time-out {} sec", this.name,
opAddEntry.ledger != null ? opAddEntry.ledger.getId() : -1,
opAddEntry.entryId, timeoutSec);
currentLedgerTimeoutTriggered.set(true);
opAddEntry.handleAddFailure(opAddEntry.ledger);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,10 @@
import io.netty.util.Recycler.Handle;
import io.netty.util.ReferenceCountUtil;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
import org.apache.bookkeeper.client.AsyncCallback.CloseCallback;
Expand All @@ -45,7 +47,7 @@
public class OpAddEntry implements AddCallback, CloseCallback, Runnable {
protected ManagedLedgerImpl ml;
LedgerHandle ledger;
private long entryId;
long entryId;
private int numberOfMessages;

@SuppressWarnings("unused")
Expand All @@ -68,6 +70,9 @@ public class OpAddEntry implements AddCallback, CloseCallback, Runnable {
AtomicReferenceFieldUpdater.newUpdater(OpAddEntry.class, OpAddEntry.State.class, "state");
volatile State state;

@Setter
private AtomicBoolean timeoutTriggered;

enum State {
OPEN,
INITIATED,
Expand All @@ -76,17 +81,18 @@ enum State {
}

public static OpAddEntry createNoRetainBuffer(ManagedLedgerImpl ml, ByteBuf data, AddEntryCallback callback,
Object ctx) {
OpAddEntry op = createOpAddEntryNoRetainBuffer(ml, data, callback, ctx);
Object ctx, AtomicBoolean timeoutTriggered) {
OpAddEntry op = createOpAddEntryNoRetainBuffer(ml, data, callback, ctx, timeoutTriggered);
if (log.isDebugEnabled()) {
log.debug("Created new OpAddEntry {}", op);
}
return op;
}

public static OpAddEntry createNoRetainBuffer(ManagedLedgerImpl ml, ByteBuf data, int numberOfMessages,
AddEntryCallback callback, Object ctx) {
OpAddEntry op = createOpAddEntryNoRetainBuffer(ml, data, callback, ctx);
AddEntryCallback callback, Object ctx,
AtomicBoolean timeoutTriggered) {
OpAddEntry op = createOpAddEntryNoRetainBuffer(ml, data, callback, ctx, timeoutTriggered);
op.numberOfMessages = numberOfMessages;
if (log.isDebugEnabled()) {
log.debug("Created new OpAddEntry {}", op);
Expand All @@ -95,7 +101,8 @@ public static OpAddEntry createNoRetainBuffer(ManagedLedgerImpl ml, ByteBuf data
}

private static OpAddEntry createOpAddEntryNoRetainBuffer(ManagedLedgerImpl ml, ByteBuf data,
AddEntryCallback callback, Object ctx) {
AddEntryCallback callback, Object ctx,
AtomicBoolean timeoutTriggered) {
OpAddEntry op = RECYCLER.get();
op.ml = ml;
op.ledger = null;
Expand All @@ -109,6 +116,7 @@ private static OpAddEntry createOpAddEntryNoRetainBuffer(ManagedLedgerImpl ml, B
op.startTime = System.nanoTime();
op.state = State.OPEN;
op.payloadProcessorHandle = null;
op.timeoutTriggered = timeoutTriggered;
ml.mbean.addAddEntrySample(op.dataLength);
return op;
}
Expand Down Expand Up @@ -176,7 +184,9 @@ public void addComplete(int rc, final LedgerHandle lh, long entryId, Object ctx)
if (!STATE_UPDATER.compareAndSet(OpAddEntry.this, State.INITIATED, State.COMPLETED)) {
log.warn("[{}] The add op is terminal legacy callback for entry {}-{} adding.", ml.getName(), lh.getId(),
entryId);
OpAddEntry.this.recycle();
// Another thread may be copying this object, so do not recycle it here, to avoid other problems.
// For example: if we recycled this object, the other thread could get a null "opAddEntry.{variable_name}".
// Recycling is not mandatory; the JVM GC will collect it.
return;
}

Expand All @@ -200,7 +210,7 @@ public void addComplete(int rc, final LedgerHandle lh, long entryId, Object ctx)
lh == null ? -1 : lh.getId(), entryId, dataLength, rc);
}

if (rc != BKException.Code.OK) {
if (rc != BKException.Code.OK || timeoutTriggered.get()) {
handleAddFailure(lh);
} else {
// Trigger addComplete callback in a thread hashed on the managed ledger name
Expand Down Expand Up @@ -307,13 +317,6 @@ private boolean checkAndCompleteOp(Object ctx) {
return false;
}

/**
 * Fails this add operation after an add-entry timeout.
 *
 * <p>The operation is closed and routed to {@link #handleAddFailure(LedgerHandle)} only when
 * {@code checkAndCompleteOp(ctx)} returns {@code true}; otherwise this call does nothing.
 *
 * @param ledger the ledger handle forwarded to the failure handler
 * @param ctx    the value handed to {@code checkAndCompleteOp}
 *               (NOTE(review): callers appear to pass the op's add-op counter; confirm)
 */
void handleAddTimeoutFailure(final LedgerHandle ledger, Object ctx) {
    final boolean shouldFail = checkAndCompleteOp(ctx);
    if (!shouldFail) {
        return;
    }
    close();
    handleAddFailure(ledger);
}

/**
* It handles add failure on the given ledger. It can be triggered when add-entry fails or times out.
*
Expand All @@ -333,8 +336,11 @@ void handleAddFailure(final LedgerHandle lh) {
});
}

void close() {
OpAddEntry duplicateAndClose(AtomicBoolean timeoutTriggered) {
STATE_UPDATER.set(OpAddEntry.this, State.CLOSED);
OpAddEntry duplicate =
OpAddEntry.createNoRetainBuffer(ml, data, getNumberOfMessages(), callback, ctx, timeoutTriggered);
return duplicate;
}

public State getState() {
Expand Down Expand Up @@ -389,6 +395,7 @@ public void recycle() {
startTime = -1;
lastInitTime = -1;
payloadProcessorHandle = null;
timeoutTriggered = null;
recyclerHandle.recycle(this);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.client.AsyncCallback;
Expand Down Expand Up @@ -54,6 +55,8 @@ public ShadowManagedLedgerImpl(ManagedLedgerFactoryImpl factory, BookKeeper book
String name, final Supplier<CompletableFuture<Boolean>> mlOwnershipChecker) {
super(factory, bookKeeper, store, config, scheduledExecutor, name, mlOwnershipChecker);
this.sourceMLName = config.getShadowSourceName();
// ShadowManagedLedgerImpl does not implement add entry timeout yet, so this variable will always be false.
this.currentLedgerTimeoutTriggered = new AtomicBoolean(false);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@
import org.apache.pulsar.metadata.impl.FaultInjectionMetadataStore;
import org.awaitility.Awaitility;
import org.awaitility.reflect.WhiteboxImpl;
import org.eclipse.jetty.util.BlockingArrayQueue;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.DataProvider;
Expand Down Expand Up @@ -3184,6 +3185,55 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
ledger.close();
}

/**
 * Verifies that two entries are both reported as added, and can be read back in order, when the
 * bookkeeper response for an add is delayed beyond the configured add-entry timeout.
 */
@Test
public void testAddEntryResponseTimeout() throws Exception {
    // Managed ledger with the add-entry timeout check enabled (2 seconds).
    final ManagedLedgerConfig mlConfig = new ManagedLedgerConfig().setAddEntryTimeoutSeconds(2);
    final ManagedLedgerImpl ml = (ManagedLedgerImpl) factory.open("ml1", mlConfig);
    final ManagedCursor reader = ml.openCursor("c1");
    final CollectCtxAddEntryCallback callback = new CollectCtxAddEntryCallback();

    // Queue one 8-second response delay, which is longer than the 2-second timeout above.
    bkc.addEntryResponseDelay(8, TimeUnit.SECONDS);

    // Publish two single-byte entries, each tagged with its own ctx.
    final byte[] firstPayload = new byte[]{1};
    final byte[] secondPayload = new byte[]{2};
    int firstCtx = 1;
    int secondCtx = 2;
    ml.asyncAddEntry(firstPayload, callback, firstCtx);
    ml.asyncAddEntry(secondPayload, callback, secondCtx);

    // Both adds must eventually be reported as complete, in publish order.
    Awaitility.await().untilAsserted(() -> {
        assertEquals(callback.addCompleteCtxList, Arrays.asList(1, 2));
    });

    // Read the entries back one at a time and compare them with what was written.
    for (byte[] expectedPayload : new byte[][]{firstPayload, secondPayload}) {
        Entry readBack = reader.readEntries(1).get(0);
        assertEquals(readBack.getData(), expectedPayload);
        readBack.release();
    }

    // cleanup.
    factory.delete(ml.name);
}

/**
 * {@link AddEntryCallback} that only records the {@code ctx} object of each callback invocation,
 * keeping completed and failed adds in separate lists.
 */
private static class CollectCtxAddEntryCallback implements AddEntryCallback {

    /** Ctx objects received through {@link #addFailed}, appended in call order. */
    public List<Object> addFailedCtxList = new BlockingArrayQueue<>();

    /** Ctx objects received through {@link #addComplete}, appended in call order. */
    public List<Object> addCompleteCtxList = new BlockingArrayQueue<>();

    /** Records the ctx of a failed add; the exception itself is not kept. */
    @Override
    public void addFailed(ManagedLedgerException exception, Object ctx) {
        this.addFailedCtxList.add(ctx);
    }

    /** Records the ctx of a completed add; position and payload are not kept. */
    @Override
    public void addComplete(Position position, ByteBuf entryData, Object ctx) {
        this.addCompleteCtxList.add(ctx);
    }
}

/**
* It verifies that if bk-client doesn't complete the add-entry in given time out then broker is resilient enough
* to create new ledger and add entry successfully.
Expand Down Expand Up @@ -3259,7 +3309,8 @@ public void avoidUseSameOpAddEntryBetweenDifferentLedger() throws Exception {

List<OpAddEntry> oldOps = new ArrayList<>();
for (int i = 0; i < 10; i++) {
OpAddEntry op = OpAddEntry.createNoRetainBuffer(ledger, ByteBufAllocator.DEFAULT.buffer(128).retain(), null, null);
OpAddEntry op = OpAddEntry.createNoRetainBuffer(ledger,
ByteBufAllocator.DEFAULT.buffer(128).retain(), null, null, new AtomicBoolean());
if (i > 4) {
op.setLedger(mock(LedgerHandle.class));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.bookkeeper.client;

import static com.google.common.base.Preconditions.checkArgument;
import com.google.common.collect.Lists;
import java.util.ArrayList;
import java.util.Arrays;
Expand Down Expand Up @@ -89,6 +90,7 @@ public static Collection<BookieId> getMockEnsemble() {
}

final Queue<Long> addEntryDelaysMillis = new ConcurrentLinkedQueue<>();
final Queue<Long> addEntryResponseDelaysMillis = new ConcurrentLinkedQueue<>();
final List<CompletableFuture<Void>> failures = new ArrayList<>();
final List<CompletableFuture<Void>> addEntryFailures = new ArrayList<>();

Expand Down Expand Up @@ -367,6 +369,11 @@ public synchronized void addEntryDelay(long delay, TimeUnit unit) {
addEntryDelaysMillis.add(unit.toMillis(delay));
}

/**
 * Queues an artificial delay for an add-entry response.
 *
 * <p>The delay is converted to milliseconds and appended to {@code addEntryResponseDelaysMillis}.
 *
 * @param delay the delay amount; must not be negative
 * @param unit  the time unit of {@code delay}
 * @throws IllegalArgumentException if {@code delay} is negative
 */
public synchronized void addEntryResponseDelay(long delay, TimeUnit unit) {
    final boolean nonNegative = delay >= 0;
    checkArgument(nonNegative, "The delay time must not be negative.");
    final long delayMillis = unit.toMillis(delay);
    addEntryResponseDelaysMillis.add(delayMillis);
}

static int getExceptionCode(Throwable t) {
if (t instanceof BKException) {
return ((BKException) t).getCode();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,13 @@ public void asyncAddEntry(final ByteBuf data, final AddCallback cb, final Object
cb.addComplete(PulsarMockBookKeeper.getExceptionCode(exception),
PulsarMockLedgerHandle.this, LedgerHandle.INVALID_ENTRY_ID, ctx);
} else {
Long responseDelayMillis = bk.addEntryResponseDelaysMillis.poll();
if (responseDelayMillis != null) {
try {
Thread.sleep(responseDelayMillis);
} catch (InterruptedException e) {
}
}
cb.addComplete(BKException.Code.OK, PulsarMockLedgerHandle.this, entryId, ctx);
}
}, bk.executor);
Expand Down

0 comments on commit 1baa094

Please sign in to comment.