Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix][meta] Bookie Info lost by notification race condition. #20642

Merged
merged 28 commits into from
Jun 30, 2023
Merged
Changes from 20 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,18 @@
*/
package org.apache.pulsar.metadata.bookkeeper;

import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.concurrent.CompletableFuture.failedFuture;
import static org.apache.bookkeeper.util.BookKeeperConstants.AVAILABLE_NODE;
import static org.apache.bookkeeper.util.BookKeeperConstants.COOKIE_NODE;
import static org.apache.bookkeeper.util.BookKeeperConstants.READONLY;
import static org.apache.pulsar.common.util.FutureUtil.Sequencer;
import static org.apache.pulsar.common.util.FutureUtil.waitForAll;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
Expand All @@ -42,10 +47,10 @@
import org.apache.bookkeeper.versioning.Version;
import org.apache.bookkeeper.versioning.Versioned;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.metadata.api.CacheGetResult;
import org.apache.pulsar.metadata.api.MetadataCache;
import org.apache.pulsar.metadata.api.MetadataStore;
import org.apache.pulsar.metadata.api.Notification;
import org.apache.pulsar.metadata.api.NotificationType;

@Slf4j
public class PulsarRegistrationClient implements RegistrationClient {
Expand All @@ -56,28 +61,29 @@ public class PulsarRegistrationClient implements RegistrationClient {
private final String bookieRegistrationPath;
private final String bookieAllRegistrationPath;
private final String bookieReadonlyRegistrationPath;

private final ConcurrentHashMap<BookieId, Versioned<BookieServiceInfo>> bookieServiceInfoCache =
new ConcurrentHashMap();
private final Set<RegistrationListener> writableBookiesWatchers = new CopyOnWriteArraySet<>();
private final Set<RegistrationListener> readOnlyBookiesWatchers = new CopyOnWriteArraySet<>();
private final MetadataCache<BookieServiceInfo> bookieServiceInfoMetadataCache;
private final ScheduledExecutorService executor;
private final Map<BookieId, Versioned<BookieServiceInfo>> writableBookieInfo;
private final Map<BookieId, Versioned<BookieServiceInfo>> readOnlyBookieInfo;
private final FutureUtil.Sequencer<Void> sequencer;

public PulsarRegistrationClient(MetadataStore store,
String ledgersRootPath) {
this.store = store;
this.ledgersRootPath = ledgersRootPath;
this.bookieServiceInfoMetadataCache = store.getMetadataCache(BookieServiceInfoSerde.INSTANCE);

this.sequencer = Sequencer.create();
this.writableBookieInfo = new ConcurrentHashMap<>();
this.readOnlyBookieInfo = new ConcurrentHashMap<>();
// Following Bookie Network Address Changes is an expensive operation
// as it requires additional ZooKeeper watches
// we can disable this feature, in case the BK cluster has only
// static addresses
this.bookieRegistrationPath = ledgersRootPath + "/" + AVAILABLE_NODE;
this.bookieAllRegistrationPath = ledgersRootPath + "/" + COOKIE_NODE;
this.bookieReadonlyRegistrationPath = this.bookieRegistrationPath + "/" + READONLY;

this.executor = Executors
.newSingleThreadScheduledExecutor(new DefaultThreadFactory("pulsar-registration-client"));

Expand All @@ -91,38 +97,62 @@ public void close() {

@Override
public CompletableFuture<Versioned<Set<BookieId>>> getWritableBookies() {
return getChildren(bookieRegistrationPath);
return getBookiesThenFreshCache(bookieRegistrationPath);
}

@Override
public CompletableFuture<Versioned<Set<BookieId>>> getAllBookies() {
// this method is meant to return all the known bookies, even the bookies
// that are not in a running state
return getChildren(bookieAllRegistrationPath);
return getBookiesThenFreshCache(bookieAllRegistrationPath);
}

@Override
public CompletableFuture<Versioned<Set<BookieId>>> getReadOnlyBookies() {
return getChildren(bookieReadonlyRegistrationPath);
return getBookiesThenFreshCache(bookieReadonlyRegistrationPath);
}

private CompletableFuture<Versioned<Set<BookieId>>> getChildren(String path) {
/**
* @throws IllegalArgumentException if parameter path is null or empty.
*/
private CompletableFuture<Versioned<Set<BookieId>>> getBookiesThenFreshCache(String path) {
if (path == null || path.isEmpty()) {
return failedFuture(
new IllegalArgumentException(String.format("parameter [path] can not be null or empty.")));
}
return store.getChildren(path)
.thenComposeAsync(children -> {
Set<BookieId> bookieIds = PulsarRegistrationClient.convertToBookieAddresses(children);
List<CompletableFuture<?>> bookieInfoUpdated =
new ArrayList<>(bookieIds.size());
final Set<BookieId> bookieIds = PulsarRegistrationClient.convertToBookieAddresses(children);
final List<CompletableFuture<?>> bookieInfoUpdated = new ArrayList<>(bookieIds.size());
for (BookieId id : bookieIds) {
// update the cache for new bookies
if (!bookieServiceInfoCache.containsKey(id)) {
if (path.equals(bookieReadonlyRegistrationPath) && readOnlyBookieInfo.get(id) == null) {
bookieInfoUpdated.add(readBookieInfoAsReadonlyBookie(id));
continue;
}
if (path.equals(bookieRegistrationPath) && writableBookieInfo.get(id) == null) {
bookieInfoUpdated.add(readBookieServiceInfoAsync(id));
continue;
}
if (path.equals(bookieAllRegistrationPath)) {
if (writableBookieInfo.get(id) != null || readOnlyBookieInfo.get(id) != null) {
// jump to next bookie id
continue;
}
// check writable first
final CompletableFuture<?> revalidateAllBookiesFuture = readBookieServiceInfoAsync(id)
.thenCompose(writableBookieInfo -> writableBookieInfo
.<CompletableFuture<Optional<CacheGetResult<BookieServiceInfo>>>>map(
bookieServiceInfo -> completedFuture(null))
// check read-only then
.orElseGet(() -> readBookieInfoAsReadonlyBookie(id)));
bookieInfoUpdated.add(revalidateAllBookiesFuture);
}
}
if (bookieInfoUpdated.isEmpty()) {
return CompletableFuture.completedFuture(bookieIds);
return completedFuture(bookieIds);
} else {
return FutureUtil
.waitForAll(bookieInfoUpdated)
return waitForAll(bookieInfoUpdated)
.thenApply(___ -> bookieIds);
}
})
Expand Down Expand Up @@ -153,42 +183,61 @@ public void unwatchReadOnlyBookies(RegistrationListener registrationListener) {
readOnlyBookiesWatchers.remove(registrationListener);
}

private void handleDeletedBookieNode(Notification n) {
if (n.getType() == NotificationType.Deleted) {
BookieId bookieId = stripBookieIdFromPath(n.getPath());
if (bookieId != null) {
log.info("Bookie {} disappeared", bookieId);
bookieServiceInfoCache.remove(bookieId);
}
/**
* This method will receive metadata store notifications and then update the
* local cache in background sequentially.
*/
private void updatedBookies(Notification n) {
// make the notification callback run sequential in background.
final String path = n.getPath();
if (!path.startsWith(bookieReadonlyRegistrationPath) && !path.startsWith(bookieRegistrationPath)) {
// ignore unknown path
return;
mattisonchao marked this conversation as resolved.
Show resolved Hide resolved
}
}

private void handleUpdatedBookieNode(Notification n) {
BookieId bookieId = stripBookieIdFromPath(n.getPath());
if (bookieId != null) {
log.info("Bookie {} info updated", bookieId);
readBookieServiceInfoAsync(bookieId);
if (path.equals(bookieReadonlyRegistrationPath) || path.equals(bookieRegistrationPath)) {
// ignore root path
return;
}
}

private void updatedBookies(Notification n) {
if (n.getType() == NotificationType.Created || n.getType() == NotificationType.Deleted) {
if (n.getPath().startsWith(bookieReadonlyRegistrationPath)) {
getReadOnlyBookies().thenAccept(bookies -> {
readOnlyBookiesWatchers.forEach(w -> executor.execute(() -> w.onBookiesChanged(bookies)));
});
handleDeletedBookieNode(n);
} else if (n.getPath().startsWith(bookieRegistrationPath)) {
getWritableBookies().thenAccept(bookies ->
writableBookiesWatchers.forEach(w -> executor.execute(() -> w.onBookiesChanged(bookies))));
handleDeletedBookieNode(n);
}
} else if (n.getType() == NotificationType.Modified) {
if (n.getPath().startsWith(bookieReadonlyRegistrationPath)
|| n.getPath().startsWith(bookieRegistrationPath)) {
handleUpdatedBookieNode(n);
final BookieId bookieId = stripBookieIdFromPath(n.getPath());
sequencer.sequential(() -> {
switch (n.getType()) {
case Created:
log.info("Bookie {} created. path: {}", stripBookieIdFromPath(n.getPath()), n.getPath());
if (path.startsWith(bookieReadonlyRegistrationPath)) {
return getReadOnlyBookies().thenAccept(bookies ->
readOnlyBookiesWatchers.forEach(w ->
executor.execute(() -> w.onBookiesChanged(bookies))));
}
return getWritableBookies().thenAccept(bookies ->
writableBookiesWatchers.forEach(w ->
executor.execute(() -> w.onBookiesChanged(bookies))));
case Modified:
if (bookieId == null) {
return completedFuture(null);
}
log.info("Bookie {} modified. path: {}", bookieId, n.getPath());
if (path.startsWith(bookieReadonlyRegistrationPath)) {
return readBookieInfoAsReadonlyBookie(bookieId).thenApply(__ -> null);
}
return readBookieServiceInfoAsync(bookieId).thenApply(__ -> null);
case Deleted:
if (bookieId == null) {
return completedFuture(null);
}
log.info("Bookie {} deleted. path: {}", bookieId, n.getPath());
if (path.startsWith(bookieReadonlyRegistrationPath)) {
readOnlyBookieInfo.remove(bookieId);
return completedFuture(null);
}
if (path.startsWith(bookieRegistrationPath)) {
writableBookieInfo.remove(bookieId);
return completedFuture(null);
}
return completedFuture(null);
default:
return completedFuture(null);
}
}
});
}

private static BookieId stripBookieIdFromPath(String path) {
Expand All @@ -200,7 +249,7 @@ private static BookieId stripBookieIdFromPath(String path) {
try {
return BookieId.parse(path.substring(slash + 1));
} catch (IllegalArgumentException e) {
log.warn("Cannot decode bookieId from {}", path, e);
log.warn("Cannot decode bookieId from {}, error: {}", path, e.getMessage());
}
}
return null;
Expand All @@ -227,46 +276,48 @@ public CompletableFuture<Versioned<BookieServiceInfo>> getBookieServiceInfo(Book
// this is because there are a few cases in which some operations on the main thread
// wait for the result. This is due to the fact that resolving the address of a bookie
// is needed in many code paths.
Versioned<BookieServiceInfo> resultFromCache = bookieServiceInfoCache.get(bookieId);
Versioned<BookieServiceInfo> info;
if ((info = writableBookieInfo.get(bookieId)) == null) {
info = readOnlyBookieInfo.get(bookieId);
}
if (log.isDebugEnabled()) {
log.debug("getBookieServiceInfo {} -> {}", bookieId, resultFromCache);
log.debug("getBookieServiceInfo {} -> {}", bookieId, info);
}
if (resultFromCache != null) {
return CompletableFuture.completedFuture(resultFromCache);
if (info != null) {
return completedFuture(info);
} else {
return FutureUtils.exception(new BKException.BKBookieHandleNotAvailableException());
}
}

public CompletableFuture<Void> readBookieServiceInfoAsync(BookieId bookieId) {
String asWritable = bookieRegistrationPath + "/" + bookieId;
return bookieServiceInfoMetadataCache.get(asWritable)
.thenCompose((Optional<BookieServiceInfo> getResult) -> {
if (getResult.isPresent()) {
Versioned<BookieServiceInfo> res =
new Versioned<>(getResult.get(), new LongVersion(-1));
log.info("Update BookieInfoCache (writable bookie) {} -> {}", bookieId, getResult.get());
bookieServiceInfoCache.put(bookieId, res);
return CompletableFuture.completedFuture(null);
} else {
return readBookieInfoAsReadonlyBookie(bookieId);
}
}
);
public CompletableFuture<Optional<CacheGetResult<BookieServiceInfo>>> readBookieServiceInfoAsync(
mattisonchao marked this conversation as resolved.
Show resolved Hide resolved
BookieId bookieId) {
final String asWritable = bookieRegistrationPath + "/" + bookieId;
return bookieServiceInfoMetadataCache.getWithStats(asWritable)
.thenApply((Optional<CacheGetResult<BookieServiceInfo>> bkInfoWithStats) -> {
if (bkInfoWithStats.isPresent()) {
final CacheGetResult<BookieServiceInfo> r = bkInfoWithStats.get();
log.info("Update BookieInfoCache (writable bookie) {} -> {}", bookieId, r.getValue());
writableBookieInfo.put(bookieId,
new Versioned<>(r.getValue(), new LongVersion(r.getStat().getVersion())));
}
return bkInfoWithStats;
}
);
}

final CompletableFuture<Void> readBookieInfoAsReadonlyBookie(BookieId bookieId) {
String asReadonly = bookieReadonlyRegistrationPath + "/" + bookieId;
return bookieServiceInfoMetadataCache.get(asReadonly)
.thenApply((Optional<BookieServiceInfo> getResultAsReadOnly) -> {
if (getResultAsReadOnly.isPresent()) {
Versioned<BookieServiceInfo> res =
new Versioned<>(getResultAsReadOnly.get(), new LongVersion(-1));
log.info("Update BookieInfoCache (readonly bookie) {} -> {}", bookieId,
getResultAsReadOnly.get());
bookieServiceInfoCache.put(bookieId, res);
final CompletableFuture<Optional<CacheGetResult<BookieServiceInfo>>> readBookieInfoAsReadonlyBookie(
BookieId bookieId) {
final String asReadonly = bookieReadonlyRegistrationPath + "/" + bookieId;
return bookieServiceInfoMetadataCache.getWithStats(asReadonly)
.thenApply((Optional<CacheGetResult<BookieServiceInfo>> bkInfoWithStats) -> {
if (bkInfoWithStats.isPresent()) {
final CacheGetResult<BookieServiceInfo> r = bkInfoWithStats.get();
log.info("Update BookieInfoCache (readonly bookie) {} -> {}", bookieId, r.getValue());
readOnlyBookieInfo.put(bookieId,
new Versioned<>(r.getValue(), new LongVersion(r.getStat().getVersion())));
}
return null;
return bkInfoWithStats;
});
}
}