Skip to content

Commit

Permalink
Fix deadlock caused by KafkaTopicConsumerManager (streamnative#620)
Browse files Browse the repository at this point in the history
Fixes streamnative#618 

### Motivation

See streamnative#618 (comment) for the deadlock analysis.

### Modifications
- Use `ConcurrentHashMap` instead of `ConcurrentLongHashMap`. Though this bug may already be fixed in apache/pulsar#9787, the `ConcurrentHashMap` from Java standard library is more reliable. The possible performance enhancement brought by `ConcurrentLongHashMap` still needs to be proved.
- Use `AtomicBoolean` as `KafkaTopicConsumerManager`'s state instead of read-write lock to avoid `close()` method that tries to acquire write lock blocking.
- Run a single cursor expire task instead of one task per channel. Since streamnative#404 changed `consumerTopicManagers` to a static field, there's no reason to run a task for each connection.
  • Loading branch information
BewareMyPower committed Jul 25, 2021
1 parent 00e9a40 commit 793ab75
Show file tree
Hide file tree
Showing 2 changed files with 89 additions and 106 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,13 @@

import io.streamnative.pulsar.handlers.kop.utils.OffsetSearchPredicate;
import java.io.Closeable;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.atomic.AtomicBoolean;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCursorCallback;
Expand All @@ -33,7 +35,6 @@
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap;

/**
* KafkaTopicConsumerManager manages a topic and its related offset cursor.
Expand All @@ -44,32 +45,27 @@ public class KafkaTopicConsumerManager implements Closeable {
private final PersistentTopic topic;
private final KafkaRequestHandler requestHandler;

// the lock for closed status change.
// once closed, should not add new cursor back, since consumers are cleared.
private final ReentrantReadWriteLock rwLock;
private boolean closed;
private final AtomicBoolean closed = new AtomicBoolean(false);

// key is the offset, value is the future of (cursor, offset), whose offset is the last offset in pair.
@Getter
private final ConcurrentLongHashMap<CompletableFuture<Pair<ManagedCursor, Long>>> cursors;
private final Map<Long, CompletableFuture<Pair<ManagedCursor, Long>>> cursors;

// used to track all created cursors, since the cursors above may be removed while still in flight;
// using this map ensures no cursor is leaked on close.
@Getter
private final ConcurrentMap<String, ManagedCursor> createdCursors;
private final Map<String, ManagedCursor> createdCursors;

// track last access time(millis) for offsets <offset, time>
@Getter
private final ConcurrentLongHashMap<Long> lastAccessTimes;
private final Map<Long, Long> lastAccessTimes;

KafkaTopicConsumerManager(KafkaRequestHandler requestHandler, PersistentTopic topic) {
this.topic = topic;
this.cursors = new ConcurrentLongHashMap<>();
this.cursors = new ConcurrentHashMap<>();
this.createdCursors = new ConcurrentHashMap<>();
this.lastAccessTimes = new ConcurrentLongHashMap<>();
this.lastAccessTimes = new ConcurrentHashMap<>();
this.requestHandler = requestHandler;
this.rwLock = new ReentrantReadWriteLock();
this.closed = false;
}

// delete expired cursors, so backlog can be cleared.
Expand All @@ -82,20 +78,13 @@ void deleteExpiredCursor(long current, long expirePeriodMillis) {
}

void deleteOneExpiredCursor(long offset) {
CompletableFuture<Pair<ManagedCursor, Long>> cursorFuture;

// need not do anything, since this tcm is already in the closing state, and close() will delete everything.
rwLock.readLock().lock();
try {
if (closed) {
return;
}
cursorFuture = cursors.remove(offset);
lastAccessTimes.remove(offset);
} finally {
rwLock.readLock().unlock();
if (closed.get()) {
return;
}

final CompletableFuture<Pair<ManagedCursor, Long>> cursorFuture = cursors.remove(offset);
lastAccessTimes.remove(offset);

if (cursorFuture != null) {
if (log.isDebugEnabled()) {
log.debug("[{}] Cursor timed out for offset: {}, cursors cache size: {}",
Expand All @@ -115,6 +104,9 @@ void deleteOneExpiredCursor(long offset) {

// delete passed in cursor.
void deleteOneCursorAsync(ManagedCursor cursor, String reason) {
if (closed.get()) {
return;
}
if (cursor != null) {
topic.getManagedLedger().asyncDeleteCursor(cursor.getName(), new DeleteCursorCallback() {
@Override
Expand All @@ -139,56 +131,42 @@ public void deleteCursorFailed(ManagedLedgerException exception, Object ctx) {
// remove from cache, so another same offset read could happen.
// each success remove should have a following add.
public CompletableFuture<Pair<ManagedCursor, Long>> removeCursorFuture(long offset) {
rwLock.readLock().lock();
try {
if (closed) {
return null;
}
lastAccessTimes.remove(offset);
final CompletableFuture<Pair<ManagedCursor, Long>> cursorFuture = cursors.remove(offset);
if (cursorFuture == null) {
return asyncCreateCursorIfNotExists(offset);
}
if (closed.get()) {
return null;
}

if (log.isDebugEnabled()) {
log.debug("[{}] Get cursor for offset: {} in cache. cache size: {}",
requestHandler.ctx.channel(), offset, cursors.size());
}
return cursorFuture;
} finally {
rwLock.readLock().unlock();
lastAccessTimes.remove(offset);
final CompletableFuture<Pair<ManagedCursor, Long>> cursorFuture = cursors.remove(offset);
if (cursorFuture == null) {
return asyncCreateCursorIfNotExists(offset);
}

if (log.isDebugEnabled()) {
log.debug("[{}] Get cursor for offset: {} in cache. cache size: {}",
requestHandler.ctx.channel(), offset, cursors.size());
}
return cursorFuture;
}

private CompletableFuture<Pair<ManagedCursor, Long>> asyncCreateCursorIfNotExists(long offset) {
rwLock.readLock().lock();
try {
if (closed) {
return null;
}
cursors.putIfAbsent(offset, asyncGetCursorByOffset(offset));

// notice: above would add a <offset, null-Pair>
lastAccessTimes.remove(offset);
return cursors.remove(offset);
} finally {
rwLock.readLock().unlock();
if (closed.get()) {
return null;
}
cursors.putIfAbsent(offset, asyncGetCursorByOffset(offset));

// notice: above would add a <offset, null-Pair>
lastAccessTimes.remove(offset);
return cursors.remove(offset);
}

public void add(long offset, Pair<ManagedCursor, Long> pair) {
checkArgument(offset == pair.getRight(),
"offset not equal. key: " + offset + " value: " + pair.getRight());

rwLock.readLock().lock();
try {
if (closed) {
ManagedCursor managedCursor = pair.getLeft();
deleteOneCursorAsync(managedCursor, "A race - add cursor back but tcm already closed");
return;
}
} finally {
rwLock.readLock().unlock();
if (closed.get()) {
ManagedCursor managedCursor = pair.getLeft();
deleteOneCursorAsync(managedCursor, "A race - add cursor back but tcm already closed");
return;
}

final CompletableFuture<Pair<ManagedCursor, Long>> cursorFuture = CompletableFuture.completedFuture(pair);
Expand All @@ -206,49 +184,43 @@ public void add(long offset, Pair<ManagedCursor, Long> pair) {
// called when channel closed.
@Override
public void close() {
final ConcurrentLongHashMap<CompletableFuture<Pair<ManagedCursor, Long>>> cursorFuturesToClose =
new ConcurrentLongHashMap<>();
ConcurrentMap<String, ManagedCursor> cursorsToClose;
rwLock.writeLock().lock();
try {
if (closed) {
return;
}
closed = true;
if (log.isDebugEnabled()) {
log.debug("[{}] Close TCM for topic {}.",
requestHandler.ctx.channel(), topic.getName());
}
cursors.forEach(cursorFuturesToClose::put);
cursors.clear();
lastAccessTimes.clear();
cursorsToClose = new ConcurrentHashMap<>();
createdCursors.forEach(cursorsToClose::put);
createdCursors.clear();
} finally {
rwLock.writeLock().unlock();
if (!closed.compareAndSet(false, true)) {
return;
}

cursorFuturesToClose.values().forEach(cursorFuture -> {
if (log.isDebugEnabled()) {
log.debug("[{}] Close TCM for topic {}.",
requestHandler.ctx.channel(), topic.getName());
}
final List<CompletableFuture<Pair<ManagedCursor, Long>>> cursorFuturesToClose = new ArrayList<>();
cursors.forEach((ignored, cursorFuture) -> cursorFuturesToClose.add(cursorFuture));
cursors.clear();
lastAccessTimes.clear();
final List<ManagedCursor> cursorsToClose = new ArrayList<>();
createdCursors.forEach((ignored, cursor) -> cursorsToClose.add(cursor));
createdCursors.clear();

cursorFuturesToClose.forEach(cursorFuture -> {
cursorFuture.whenComplete((pair, e) -> {
if (e != null || pair == null) {
return;
}
ManagedCursor cursor = pair.getLeft();
deleteOneCursorAsync(cursor, "TopicConsumerManager close");
if (cursor != null) {
cursorsToClose.remove(cursor.getName());
}
});
});
cursorFuturesToClose.clear();

// delete dangling createdCursors
cursorsToClose.values().forEach(cursor ->
cursorsToClose.forEach(cursor ->
deleteOneCursorAsync(cursor, "TopicConsumerManager close but cursor is still outstanding"));
cursorsToClose.clear();
}

private CompletableFuture<Pair<ManagedCursor, Long>> asyncGetCursorByOffset(long offset) {
if (closed.get()) {
// return a null completed future instead of null because the returned value will be put into a Map
return CompletableFuture.completedFuture(null);
}
final ManagedLedger ledger = topic.getManagedLedger();
return ledger.asyncFindPosition(new OffsetSearchPredicate(offset)).thenApply(position -> {
final String cursorName = "kop-consumer-cursor-" + topic.getName()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
Expand Down Expand Up @@ -67,7 +68,7 @@ public class KafkaTopicManager {
// remove expired cursors, so backlog can be cleared.
private static final long checkPeriodMillis = 1 * 60 * 1000;
private static final long expirePeriodMillis = 2 * 60 * 1000;
private final ScheduledFuture<?> cursorExpireTask;
private static volatile ScheduledFuture<?> cursorExpireTask = null;

private final AtomicBoolean closed = new AtomicBoolean(false);

Expand All @@ -83,19 +84,25 @@ public class KafkaTopicManager {
this.brokerService = pulsarService.getBrokerService();
this.internalServerCnx = new InternalServerCnx(requestHandler);

// check expired cursor every 1 min.
this.cursorExpireTask = brokerService.executor().scheduleWithFixedDelay(() -> {
long current = System.currentTimeMillis();
if (log.isDebugEnabled()) {
log.debug("[{}] Schedule a check of expired cursor",
requestHandler.ctx.channel());
}
consumerTopicManagers.values().forEach(future -> {
if (future != null && future.isDone() && !future.isCompletedExceptionally()) {
future.join().deleteExpiredCursor(current, expirePeriodMillis);
initializeCursorExpireTask(brokerService.executor());
}

private static void initializeCursorExpireTask(final ScheduledExecutorService executor) {
if (cursorExpireTask == null) {
synchronized (KafkaTopicManager.class) {
if (cursorExpireTask == null) {
// check expired cursor every 1 min.
cursorExpireTask = executor.scheduleWithFixedDelay(() -> {
long current = System.currentTimeMillis();
consumerTopicManagers.values().forEach(future -> {
if (future != null && future.isDone() && !future.isCompletedExceptionally()) {
future.join().deleteExpiredCursor(current, expirePeriodMillis);
}
});
}, checkPeriodMillis, checkPeriodMillis, TimeUnit.MILLISECONDS);
}
});
}, checkPeriodMillis, checkPeriodMillis, TimeUnit.MILLISECONDS);
}
}
}

// update Ctx information, since at internalServerCnx create time there is no ctx passed into kafkaRequestHandler.
Expand Down Expand Up @@ -296,8 +303,6 @@ public void close() {
}

try {
this.cursorExpireTask.cancel(true);

closeKafkaTopicConsumerManagers();

topics.keySet().forEach(topicName -> {
Expand Down Expand Up @@ -369,6 +374,12 @@ public static void removeKafkaTopicConsumerManager(String topicName) {
}

public static void closeKafkaTopicConsumerManagers() {
synchronized (KafkaTopicManager.class) {
if (cursorExpireTask != null) {
cursorExpireTask.cancel(true);
cursorExpireTask = null;
}
}
consumerTopicManagers.forEach((topic, tcmFuture) -> {
try {
Optional.ofNullable(tcmFuture.get(300, TimeUnit.SECONDS))
Expand Down

0 comments on commit 793ab75

Please sign in to comment.