Skip to content

Commit

Permalink
Handle concurrent open cursor requests (#50)
Browse files Browse the repository at this point in the history
  • Loading branch information
sboobna authored and merlimat committed Oct 6, 2016
1 parent adb461c commit f526016
Show file tree
Hide file tree
Showing 2 changed files with 86 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Queue;
import java.util.Random;
Expand Down Expand Up @@ -70,6 +71,7 @@

import com.google.common.collect.BoundType;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Queues;
import com.google.common.collect.Range;
import com.google.common.util.concurrent.RateLimiter;
Expand Down Expand Up @@ -111,6 +113,10 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
// Cursors that are waiting to be notified when new entries are persisted
final ConcurrentLinkedQueue<ManagedCursorImpl> waitingCursors;

// This map is used for concurrent open cursor requests, where the 2nd request will attach a listener to the
// uninitialized cursor future from the 1st request
final Map<String, CompletableFuture<ManagedCursor>> uninitializedCursors;

final EntryCache entryCache;

/**
Expand Down Expand Up @@ -184,6 +190,7 @@ public ManagedLedgerImpl(ManagedLedgerFactoryImpl factory, BookKeeper bookKeeper
this.mbean = new ManagedLedgerMBeanImpl(this);
this.entryCache = factory.getEntryCacheManager().getEntryCache(this);
this.waitingCursors = Queues.newConcurrentLinkedQueue();
this.uninitializedCursors = Maps.newHashMap();
this.updateCursorRateLimit = RateLimiter.create(1);

// Get the next rollover time. Add a random value upto 5% to avoid rollover multiple ledgers at the same time
Expand Down Expand Up @@ -540,6 +547,15 @@ public synchronized void asyncOpenCursor(final String cursorName, final OpenCurs
return;
}

if (uninitializedCursors.containsKey(cursorName)) {
uninitializedCursors.get(cursorName).thenAccept(cursor -> {
callback.openCursorComplete(cursor, ctx);
}).exceptionally(ex -> {
callback.openCursorFailed((ManagedLedgerException) ex, ctx);
return null;
});
return;
}
ManagedCursor cachedCursor = cursors.get(cursorName);
if (cachedCursor != null) {
if (log.isDebugEnabled()) {
Expand All @@ -554,21 +570,30 @@ public synchronized void asyncOpenCursor(final String cursorName, final OpenCurs
log.debug("[{}] Creating new cursor: {}", name, cursorName);
}
final ManagedCursorImpl cursor = new ManagedCursorImpl(bookKeeper, config, this, cursorName);
CompletableFuture<ManagedCursor> cursorFuture = new CompletableFuture<>();
uninitializedCursors.put(cursorName, cursorFuture);
cursor.initialize(getLastPosition(), new VoidCallback() {
@Override
public void operationComplete() {
log.info("[{}] Opened new cursor: {}", name, cursor);
cursor.setActive();
cursors.add(cursor);

// Update the ack position (ignoring entries that were written while the cursor was being created)
cursor.initializeCursorPosition(getLastPositionAndCounter());

synchronized (this) {
cursors.add(cursor);
uninitializedCursors.remove(cursorName).complete(cursor);
}
callback.openCursorComplete(cursor, ctx);
}

@Override
public void operationFailed(ManagedLedgerException exception) {
log.warn("[{}] Failed to open cursor: {}", name, cursor);

synchronized (this) {
uninitializedCursors.remove(cursorName).completeExceptionally(exception);
}
callback.openCursorFailed(exception, ctx);
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,12 @@
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;

import org.apache.bookkeeper.client.BKException;
Expand Down Expand Up @@ -2059,6 +2061,63 @@ public void addFailed(ManagedLedgerException exception, Object ctx) {
ledger.close();
}

@Test
public void testConcurrentOpenCursor() throws Exception {
ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("testConcurrentOpenCursor");

final AtomicReference<ManagedCursor> cursor1 = new AtomicReference<>(null);
final AtomicReference<ManagedCursor> cursor2 = new AtomicReference<>(null);
final CyclicBarrier barrier = new CyclicBarrier(2);
final CountDownLatch latch = new CountDownLatch(2);

cachedExecutor.execute(() -> {
try {
barrier.await();
} catch (Exception e) {
}
ledger.asyncOpenCursor("c1", new OpenCursorCallback() {

@Override
public void openCursorFailed(ManagedLedgerException exception, Object ctx) {
latch.countDown();
}

@Override
public void openCursorComplete(ManagedCursor cursor, Object ctx) {
cursor1.set(cursor);
latch.countDown();
}
}, null);
});

cachedExecutor.execute(() -> {
try {
barrier.await();
} catch (Exception e) {
}
ledger.asyncOpenCursor("c1", new OpenCursorCallback() {

@Override
public void openCursorFailed(ManagedLedgerException exception, Object ctx) {
latch.countDown();
}

@Override
public void openCursorComplete(ManagedCursor cursor, Object ctx) {
cursor2.set(cursor);
latch.countDown();
}
}, null);
});

latch.await();
assertNotNull(cursor1.get());
assertNotNull(cursor2.get());
assertEquals(cursor1.get(), cursor2.get());

ledger.close();
}

public ByteBuf getMessageWithMetadata(byte[] data) throws IOException {
MessageMetadata messageData = MessageMetadata.newBuilder().setPublishTime(System.currentTimeMillis())
.setProducerName("prod-name").setSequenceId(0).build();
Expand Down

0 comments on commit f526016

Please sign in to comment.