diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClient.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClient.java index a32625926e72c..306b6398b5c50 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClient.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClient.java @@ -18,13 +18,18 @@ */ package org.apache.pulsar.metadata.bookkeeper; +import static java.util.concurrent.CompletableFuture.completedFuture; +import static java.util.concurrent.CompletableFuture.failedFuture; import static org.apache.bookkeeper.util.BookKeeperConstants.AVAILABLE_NODE; import static org.apache.bookkeeper.util.BookKeeperConstants.COOKIE_NODE; import static org.apache.bookkeeper.util.BookKeeperConstants.READONLY; +import static org.apache.pulsar.common.util.FutureUtil.Sequencer; +import static org.apache.pulsar.common.util.FutureUtil.waitForAll; import io.netty.util.concurrent.DefaultThreadFactory; import java.util.ArrayList; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; @@ -42,10 +47,10 @@ import org.apache.bookkeeper.versioning.Version; import org.apache.bookkeeper.versioning.Versioned; import org.apache.pulsar.common.util.FutureUtil; +import org.apache.pulsar.metadata.api.CacheGetResult; import org.apache.pulsar.metadata.api.MetadataCache; import org.apache.pulsar.metadata.api.MetadataStore; import org.apache.pulsar.metadata.api.Notification; -import org.apache.pulsar.metadata.api.NotificationType; @Slf4j public class PulsarRegistrationClient implements RegistrationClient { @@ -56,20 +61,22 @@ public class PulsarRegistrationClient implements RegistrationClient { private final String bookieRegistrationPath; private final String bookieAllRegistrationPath; private final String bookieReadonlyRegistrationPath; - - private final ConcurrentHashMap> bookieServiceInfoCache = - new ConcurrentHashMap(); private final Set writableBookiesWatchers = new CopyOnWriteArraySet<>(); private final Set readOnlyBookiesWatchers = new CopyOnWriteArraySet<>(); private final MetadataCache bookieServiceInfoMetadataCache; private final ScheduledExecutorService executor; + private final Map> writableBookieInfo; + private final Map> readOnlyBookieInfo; + private final FutureUtil.Sequencer sequencer; public PulsarRegistrationClient(MetadataStore store, String ledgersRootPath) { this.store = store; this.ledgersRootPath = ledgersRootPath; this.bookieServiceInfoMetadataCache = store.getMetadataCache(BookieServiceInfoSerde.INSTANCE); - + this.sequencer = Sequencer.create(); + this.writableBookieInfo = new ConcurrentHashMap<>(); + this.readOnlyBookieInfo = new ConcurrentHashMap<>(); // Following Bookie Network Address Changes is an expensive operation // as it requires additional ZooKeeper watches // we can disable this feature, in case the BK cluster has only @@ -77,7 +84,6 @@ public PulsarRegistrationClient(MetadataStore store, this.bookieRegistrationPath = ledgersRootPath + "/" + AVAILABLE_NODE; this.bookieAllRegistrationPath = ledgersRootPath + "/" + COOKIE_NODE; this.bookieReadonlyRegistrationPath = this.bookieRegistrationPath + "/" + READONLY; - this.executor = Executors .newSingleThreadScheduledExecutor(new DefaultThreadFactory("pulsar-registration-client")); @@ -91,38 +97,62 @@ public void close() { @Override public CompletableFuture>> getWritableBookies() { - return getChildren(bookieRegistrationPath); + return getBookiesThenFreshCache(bookieRegistrationPath); } @Override public CompletableFuture>> getAllBookies() { // this method is meant to return all the known bookies, even the bookies // that are not in a running state - return getChildren(bookieAllRegistrationPath); + return getBookiesThenFreshCache(bookieAllRegistrationPath); } @Override public CompletableFuture>> getReadOnlyBookies() { - return getChildren(bookieReadonlyRegistrationPath); + return getBookiesThenFreshCache(bookieReadonlyRegistrationPath); } - private CompletableFuture>> getChildren(String path) { + /** + * @throws IllegalArgumentException if parameter path is null or empty. + */ + private CompletableFuture>> getBookiesThenFreshCache(String path) { + if (path == null || path.isEmpty()) { + return failedFuture( + new IllegalArgumentException("parameter [path] can not be null or empty.")); + } return store.getChildren(path) .thenComposeAsync(children -> { - Set bookieIds = PulsarRegistrationClient.convertToBookieAddresses(children); - List> bookieInfoUpdated = - new ArrayList<>(bookieIds.size()); + final Set bookieIds = PulsarRegistrationClient.convertToBookieAddresses(children); + final List> bookieInfoUpdated = new ArrayList<>(bookieIds.size()); for (BookieId id : bookieIds) { // update the cache for new bookies - if (!bookieServiceInfoCache.containsKey(id)) { - bookieInfoUpdated.add(readBookieServiceInfoAsync(id)); + if (path.equals(bookieReadonlyRegistrationPath) && readOnlyBookieInfo.get(id) == null) { + bookieInfoUpdated.add(readBookieInfoAsReadonlyBookie(id)); + continue; + } + if (path.equals(bookieRegistrationPath) && writableBookieInfo.get(id) == null) { + bookieInfoUpdated.add(readBookieInfoAsWritableBookie(id)); + continue; + } + if (path.equals(bookieAllRegistrationPath)) { + if (writableBookieInfo.get(id) != null || readOnlyBookieInfo.get(id) != null) { + // jump to next bookie id + continue; + } + // check writable first + final CompletableFuture revalidateAllBookiesFuture = readBookieInfoAsWritableBookie(id) + .thenCompose(writableBookieInfo -> writableBookieInfo + .>>>map( + bookieServiceInfo -> completedFuture(null)) + // check read-only then + .orElseGet(() -> readBookieInfoAsReadonlyBookie(id))); + bookieInfoUpdated.add(revalidateAllBookiesFuture); } } if (bookieInfoUpdated.isEmpty()) { - return CompletableFuture.completedFuture(bookieIds); + return completedFuture(bookieIds); } else { - return FutureUtil - .waitForAll(bookieInfoUpdated) + return waitForAll(bookieInfoUpdated) .thenApply(___ -> bookieIds); } }) @@ -153,42 +183,67 @@ public void unwatchReadOnlyBookies(RegistrationListener registrationListener) { readOnlyBookiesWatchers.remove(registrationListener); } - private void handleDeletedBookieNode(Notification n) { - if (n.getType() == NotificationType.Deleted) { - BookieId bookieId = stripBookieIdFromPath(n.getPath()); - if (bookieId != null) { - log.info("Bookie {} disappeared", bookieId); - bookieServiceInfoCache.remove(bookieId); - } + /** + * This method will receive metadata store notifications and then update the + * local cache in background sequentially. + */ + private void updatedBookies(Notification n) { + // make the notification callback run sequential in background. + final String path = n.getPath(); + if (!path.startsWith(bookieReadonlyRegistrationPath) && !path.startsWith(bookieRegistrationPath)) { + // ignore unknown path + return; } - } - - private void handleUpdatedBookieNode(Notification n) { - BookieId bookieId = stripBookieIdFromPath(n.getPath()); - if (bookieId != null) { - log.info("Bookie {} info updated", bookieId); - readBookieServiceInfoAsync(bookieId); + if (path.equals(bookieReadonlyRegistrationPath) || path.equals(bookieRegistrationPath)) { + // ignore root path + return; } - } - - private void updatedBookies(Notification n) { - if (n.getType() == NotificationType.Created || n.getType() == NotificationType.Deleted) { - if (n.getPath().startsWith(bookieReadonlyRegistrationPath)) { - getReadOnlyBookies().thenAccept(bookies -> { - readOnlyBookiesWatchers.forEach(w -> executor.execute(() -> w.onBookiesChanged(bookies))); - }); - handleDeletedBookieNode(n); - } else if (n.getPath().startsWith(bookieRegistrationPath)) { - getWritableBookies().thenAccept(bookies -> - writableBookiesWatchers.forEach(w -> executor.execute(() -> w.onBookiesChanged(bookies)))); - handleDeletedBookieNode(n); - } - } else if (n.getType() == NotificationType.Modified) { - if (n.getPath().startsWith(bookieReadonlyRegistrationPath) - || n.getPath().startsWith(bookieRegistrationPath)) { - handleUpdatedBookieNode(n); + final BookieId bookieId = stripBookieIdFromPath(n.getPath()); + sequencer.sequential(() -> { + switch (n.getType()) { + case Created: + log.info("Bookie {} created. path: {}", bookieId, n.getPath()); + if (path.startsWith(bookieReadonlyRegistrationPath)) { + return getReadOnlyBookies().thenAccept(bookies -> + readOnlyBookiesWatchers.forEach(w -> + executor.execute(() -> w.onBookiesChanged(bookies)))); + } + return getWritableBookies().thenAccept(bookies -> + writableBookiesWatchers.forEach(w -> + executor.execute(() -> w.onBookiesChanged(bookies)))); + case Modified: + if (bookieId == null) { + return completedFuture(null); + } + log.info("Bookie {} modified. path: {}", bookieId, n.getPath()); + if (path.startsWith(bookieReadonlyRegistrationPath)) { + return readBookieInfoAsReadonlyBookie(bookieId).thenApply(__ -> null); + } + return readBookieInfoAsWritableBookie(bookieId).thenApply(__ -> null); + case Deleted: + if (bookieId == null) { + return completedFuture(null); + } + log.info("Bookie {} deleted. path: {}", bookieId, n.getPath()); + if (path.startsWith(bookieReadonlyRegistrationPath)) { + readOnlyBookieInfo.remove(bookieId); + return getReadOnlyBookies().thenAccept(bookies -> { + readOnlyBookiesWatchers.forEach(w -> + executor.execute(() -> w.onBookiesChanged(bookies))); + }); + } + if (path.startsWith(bookieRegistrationPath)) { + writableBookieInfo.remove(bookieId); + return getWritableBookies().thenAccept(bookies -> { + writableBookiesWatchers.forEach(w -> + executor.execute(() -> w.onBookiesChanged(bookies))); + }); + } + return completedFuture(null); + default: + return completedFuture(null); } - } + }); } private static BookieId stripBookieIdFromPath(String path) { @@ -200,7 +255,7 @@ private static BookieId stripBookieIdFromPath(String path) { try { return BookieId.parse(path.substring(slash + 1)); } catch (IllegalArgumentException e) { - log.warn("Cannot decode bookieId from {}", path, e); + log.warn("Cannot decode bookieId from {}, error: {}", path, e.getMessage()); } } return null; @@ -227,46 +282,48 @@ public CompletableFuture> getBookieServiceInfo(Book // this is because there are a few cases in which some operations on the main thread // wait for the result. This is due to the fact that resolving the address of a bookie // is needed in many code paths. - Versioned resultFromCache = bookieServiceInfoCache.get(bookieId); + Versioned info; + if ((info = writableBookieInfo.get(bookieId)) == null) { + info = readOnlyBookieInfo.get(bookieId); + } if (log.isDebugEnabled()) { - log.debug("getBookieServiceInfo {} -> {}", bookieId, resultFromCache); + log.debug("getBookieServiceInfo {} -> {}", bookieId, info); } - if (resultFromCache != null) { - return CompletableFuture.completedFuture(resultFromCache); + if (info != null) { + return completedFuture(info); } else { return FutureUtils.exception(new BKException.BKBookieHandleNotAvailableException()); } } - public CompletableFuture readBookieServiceInfoAsync(BookieId bookieId) { - String asWritable = bookieRegistrationPath + "/" + bookieId; - return bookieServiceInfoMetadataCache.get(asWritable) - .thenCompose((Optional getResult) -> { - if (getResult.isPresent()) { - Versioned res = - new Versioned<>(getResult.get(), new LongVersion(-1)); - log.info("Update BookieInfoCache (writable bookie) {} -> {}", bookieId, getResult.get()); - bookieServiceInfoCache.put(bookieId, res); - return CompletableFuture.completedFuture(null); - } else { - return readBookieInfoAsReadonlyBookie(bookieId); - } - } - ); + public CompletableFuture>> readBookieInfoAsWritableBookie( + BookieId bookieId) { + final String asWritable = bookieRegistrationPath + "/" + bookieId; + return bookieServiceInfoMetadataCache.getWithStats(asWritable) + .thenApply((Optional> bkInfoWithStats) -> { + if (bkInfoWithStats.isPresent()) { + final CacheGetResult r = bkInfoWithStats.get(); + log.info("Update BookieInfoCache (writable bookie) {} -> {}", bookieId, r.getValue()); + writableBookieInfo.put(bookieId, + new Versioned<>(r.getValue(), new LongVersion(r.getStat().getVersion()))); + } + return bkInfoWithStats; + } + ); } - final CompletableFuture readBookieInfoAsReadonlyBookie(BookieId bookieId) { - String asReadonly = bookieReadonlyRegistrationPath + "/" + bookieId; - return bookieServiceInfoMetadataCache.get(asReadonly) - .thenApply((Optional getResultAsReadOnly) -> { - if (getResultAsReadOnly.isPresent()) { - Versioned res = - new Versioned<>(getResultAsReadOnly.get(), new LongVersion(-1)); - log.info("Update BookieInfoCache (readonly bookie) {} -> {}", bookieId, - getResultAsReadOnly.get()); - bookieServiceInfoCache.put(bookieId, res); + final CompletableFuture>> readBookieInfoAsReadonlyBookie( + BookieId bookieId) { + final String asReadonly = bookieReadonlyRegistrationPath + "/" + bookieId; + return bookieServiceInfoMetadataCache.getWithStats(asReadonly) + .thenApply((Optional> bkInfoWithStats) -> { + if (bkInfoWithStats.isPresent()) { + final CacheGetResult r = bkInfoWithStats.get(); + log.info("Update BookieInfoCache (readonly bookie) {} -> {}", bookieId, r.getValue()); + readOnlyBookieInfo.put(bookieId, + new Versioned<>(r.getValue(), new LongVersion(r.getStat().getVersion()))); } - return null; + return bkInfoWithStats; }); } } diff --git a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/bookkeeper/FaultInjectableZKRegistrationManager.java b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/bookkeeper/FaultInjectableZKRegistrationManager.java new file mode 100644 index 0000000000000..bcbf41addbae3 --- /dev/null +++ b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/bookkeeper/FaultInjectableZKRegistrationManager.java @@ -0,0 +1,630 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.pulsar.metadata.bookkeeper; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.apache.bookkeeper.util.BookKeeperConstants.AVAILABLE_NODE; +import static org.apache.bookkeeper.util.BookKeeperConstants.COOKIE_NODE; +import static org.apache.bookkeeper.util.BookKeeperConstants.EMPTY_BYTE_ARRAY; +import static org.apache.bookkeeper.util.BookKeeperConstants.INSTANCEID; +import static org.apache.bookkeeper.util.BookKeeperConstants.READONLY; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Lists; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; +import java.util.stream.Collectors; +import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.bookie.BookieException; +import org.apache.bookkeeper.bookie.BookieException.BookieIllegalOpException; +import org.apache.bookkeeper.bookie.BookieException.CookieExistException; +import org.apache.bookkeeper.bookie.BookieException.CookieNotFoundException; +import org.apache.bookkeeper.bookie.BookieException.MetadataStoreException; +import org.apache.bookkeeper.client.BKException; +import org.apache.bookkeeper.client.BKException.BKInterruptedException; +import org.apache.bookkeeper.client.BKException.MetaStoreException; +import org.apache.bookkeeper.common.concurrent.FutureUtils; +import org.apache.bookkeeper.conf.ServerConfiguration; +import org.apache.bookkeeper.discover.BookieServiceInfo; +import org.apache.bookkeeper.discover.RegistrationClient; +import org.apache.bookkeeper.discover.RegistrationManager; +import org.apache.bookkeeper.discover.ZKRegistrationClient; +import org.apache.bookkeeper.meta.AbstractZkLedgerManagerFactory; +import org.apache.bookkeeper.meta.LayoutManager; +import org.apache.bookkeeper.meta.LedgerManagerFactory; +import org.apache.bookkeeper.meta.ZkLayoutManager; +import org.apache.bookkeeper.meta.ZkLedgerUnderreplicationManager; +import org.apache.bookkeeper.meta.zk.ZKMetadataDriverBase; +import org.apache.bookkeeper.net.BookieId; +import org.apache.bookkeeper.proto.DataFormats.BookieServiceInfoFormat; +import org.apache.bookkeeper.util.BookKeeperConstants; +import org.apache.bookkeeper.util.ZkUtils; +import org.apache.bookkeeper.versioning.LongVersion; +import org.apache.bookkeeper.versioning.Version; +import org.apache.bookkeeper.versioning.Versioned; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.KeeperException.NoNodeException; +import org.apache.zookeeper.KeeperException.NodeExistsException; +import org.apache.zookeeper.Op; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.Watcher.Event.EventType; +import org.apache.zookeeper.Watcher.Event.KeeperState; +import org.apache.zookeeper.ZKUtil; +import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.data.ACL; +import org.apache.zookeeper.data.Stat; + +/** + * Fault injectable ZK registration manager. + * Copy from #{@link org.apache.bookkeeper.discover.ZKRegistrationManager}. + */ +@Slf4j +public class FaultInjectableZKRegistrationManager implements RegistrationManager { + + private static final Function EXCEPTION_FUNC = cause -> { + if (cause instanceof BKException) { + log.error("Failed to get bookie list : ", cause); + return (BKException) cause; + } else if (cause instanceof InterruptedException) { + log.error("Interrupted reading bookie list : ", cause); + return new BKInterruptedException(); + } else { + return new MetaStoreException(); + } + }; + + private final ServerConfiguration conf; + private final ZooKeeper zk; + private final List zkAcls; + private final LayoutManager layoutManager; + + private volatile boolean zkRegManagerInitialized = false; + + // ledgers root path + private final String ledgersRootPath; + // cookie path + private final String cookiePath; + // registration paths + protected final String bookieRegistrationPath; + protected final String bookieReadonlyRegistrationPath; + // session timeout in milliseconds + private final int zkTimeoutMs; + private final List listeners = new ArrayList<>(); + private Function hookOnRegisterReadOnly; + + public FaultInjectableZKRegistrationManager(ServerConfiguration conf, + ZooKeeper zk) { + this(conf, zk, ZKMetadataDriverBase.resolveZkLedgersRootPath(conf)); + } + + public FaultInjectableZKRegistrationManager(ServerConfiguration conf, + ZooKeeper zk, + String ledgersRootPath) { + this.conf = conf; + this.zk = zk; + this.zkAcls = ZkUtils.getACLs(conf); + this.ledgersRootPath = ledgersRootPath; + this.cookiePath = ledgersRootPath + "/" + COOKIE_NODE; + this.bookieRegistrationPath = ledgersRootPath + "/" + AVAILABLE_NODE; + this.bookieReadonlyRegistrationPath = this.bookieRegistrationPath + "/" + READONLY; + this.zkTimeoutMs = conf.getZkTimeout(); + + this.layoutManager = new ZkLayoutManager( + zk, + ledgersRootPath, + zkAcls); + + this.zk.register(event -> { + if (!zkRegManagerInitialized) { + // do nothing until first registration + return; + } + // Check for expired connection. + if (event.getType().equals(EventType.None) + && event.getState().equals(KeeperState.Expired)) { + listeners.forEach(RegistrationListener::onRegistrationExpired); + } + }); + } + + @Override + public void close() { + // no-op + } + + /** + * Returns the CookiePath of the bookie in the ZooKeeper. + * + * @param bookieId bookie id + * @return + */ + public String getCookiePath(BookieId bookieId) { + return this.cookiePath + "/" + bookieId; + } + + // + // Registration Management + // + + /** + * Check existence of regPath and wait it expired if possible. + * + * @param regPath reg node path. + * @return true if regPath exists, otherwise return false + * @throws IOException if can't create reg path + */ + protected boolean checkRegNodeAndWaitExpired(String regPath) throws IOException { + final CountDownLatch prevNodeLatch = new CountDownLatch(1); + Watcher zkPrevRegNodewatcher = new Watcher() { + @Override + public void process(WatchedEvent event) { + // Check for prev znode deletion. Connection expiration is + // not handling, since bookie has logic to shutdown. + if (EventType.NodeDeleted == event.getType()) { + prevNodeLatch.countDown(); + } + } + }; + try { + Stat stat = zk.exists(regPath, zkPrevRegNodewatcher); + if (null != stat) { + // if the ephemeral owner isn't current zookeeper client + // wait for it to be expired. + if (stat.getEphemeralOwner() != zk.getSessionId()) { + log.info("Previous bookie registration znode: {} exists, so waiting zk sessiontimeout:" + + " {} ms for znode deletion", regPath, zkTimeoutMs); + // waiting for the previous bookie reg znode deletion + if (!prevNodeLatch.await(zkTimeoutMs, TimeUnit.MILLISECONDS)) { + throw new NodeExistsException(regPath); + } else { + return false; + } + } + return true; + } else { + return false; + } + } catch (KeeperException ke) { + log.error("ZK exception checking and wait ephemeral znode {} expired : ", regPath, ke); + throw new IOException("ZK exception checking and wait ephemeral znode " + + regPath + " expired", ke); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + log.error("Interrupted checking and wait ephemeral znode {} expired : ", regPath, ie); + throw new IOException("Interrupted checking and wait ephemeral znode " + + regPath + " expired", ie); + } + } + + @Override + public void registerBookie(BookieId bookieId, boolean readOnly, + BookieServiceInfo bookieServiceInfo) throws BookieException { + if (!readOnly) { + String regPath = bookieRegistrationPath + "/" + bookieId; + doRegisterBookie(regPath, bookieServiceInfo); + } else { + doRegisterReadOnlyBookie(bookieId, bookieServiceInfo); + } + } + + @VisibleForTesting + static byte[] serializeBookieServiceInfo(BookieServiceInfo bookieServiceInfo) { + if (log.isDebugEnabled()) { + log.debug("serialize BookieServiceInfo {}", bookieServiceInfo); + } + try (ByteArrayOutputStream os = new ByteArrayOutputStream()) { + BookieServiceInfoFormat.Builder builder = BookieServiceInfoFormat.newBuilder(); + List bsiEndpoints = bookieServiceInfo.getEndpoints().stream() + .map(e -> { + return BookieServiceInfoFormat.Endpoint.newBuilder() + .setId(e.getId()) + .setPort(e.getPort()) + .setHost(e.getHost()) + .setProtocol(e.getProtocol()) + .addAllAuth(e.getAuth()) + .addAllExtensions(e.getExtensions()) + .build(); + }) + .collect(Collectors.toList()); + + builder.addAllEndpoints(bsiEndpoints); + builder.putAllProperties(bookieServiceInfo.getProperties()); + + builder.build().writeTo(os); + return os.toByteArray(); + } catch (IOException err) { + log.error("Cannot serialize bookieServiceInfo from " + bookieServiceInfo); + throw new RuntimeException(err); + } + } + + private void doRegisterBookie(String regPath, BookieServiceInfo bookieServiceInfo) throws BookieException { + // ZK ephemeral node for this Bookie. + try { + if (!checkRegNodeAndWaitExpired(regPath)) { + // Create the ZK ephemeral node for this Bookie. + zk.create(regPath, serializeBookieServiceInfo(bookieServiceInfo), zkAcls, CreateMode.EPHEMERAL); + zkRegManagerInitialized = true; + } + } catch (KeeperException ke) { + log.error("ZK exception registering ephemeral Znode for Bookie!", ke); + // Throw an IOException back up. This will cause the Bookie + // constructor to error out. Alternatively, we could do a System + // exit here as this is a fatal error. + throw new MetadataStoreException(ke); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + log.error("Interrupted exception registering ephemeral Znode for Bookie!", ie); + // Throw an IOException back up. This will cause the Bookie + // constructor to error out. Alternatively, we could do a System + // exit here as this is a fatal error. + throw new MetadataStoreException(ie); + } catch (IOException e) { + throw new MetadataStoreException(e); + } + } + + private void doRegisterReadOnlyBookie(BookieId bookieId, BookieServiceInfo bookieServiceInfo) + throws BookieException { + try { + if (null == zk.exists(this.bookieReadonlyRegistrationPath, false)) { + try { + zk.create(this.bookieReadonlyRegistrationPath, serializeBookieServiceInfo(bookieServiceInfo), + zkAcls, CreateMode.PERSISTENT); + } catch (NodeExistsException e) { + // this node is just now created by someone. + } + } + String regPath = bookieReadonlyRegistrationPath + "/" + bookieId; + doRegisterBookie(regPath, bookieServiceInfo); + // clear the write state + regPath = bookieRegistrationPath + "/" + bookieId; + try { + if (hookOnRegisterReadOnly != null) { + hookOnRegisterReadOnly.apply(null); + } + // Clear the current registered node + zk.delete(regPath, -1); + } catch (KeeperException.NoNodeException nne) { + log.warn("No writable bookie registered node {} when transitioning to readonly", + regPath, nne); + } + } catch (KeeperException | InterruptedException e) { + throw new MetadataStoreException(e); + } + } + + @Override + public void unregisterBookie(BookieId bookieId, boolean readOnly) throws BookieException { + String regPath; + if (!readOnly) { + regPath = bookieRegistrationPath + "/" + bookieId; + } else { + regPath = bookieReadonlyRegistrationPath + "/" + bookieId; + } + doUnregisterBookie(regPath); + } + + private void doUnregisterBookie(String regPath) throws BookieException { + try { + zk.delete(regPath, -1); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + throw new MetadataStoreException(ie); + } catch (KeeperException e) { + throw new MetadataStoreException(e); + } + } + + // + // Cookie Management + // + + @Override + public void writeCookie(BookieId bookieId, + Versioned cookieData) throws BookieException { + String zkPath = getCookiePath(bookieId); + try { + if (Version.NEW == cookieData.getVersion()) { + if (zk.exists(cookiePath, false) == null) { + try { + zk.create(cookiePath, new byte[0], zkAcls, CreateMode.PERSISTENT); + } catch (NodeExistsException nne) { + log.info("More than one bookie tried to create {} at once. Safe to ignore.", + cookiePath); + } + } + zk.create(zkPath, cookieData.getValue(), zkAcls, CreateMode.PERSISTENT); + } else { + if (!(cookieData.getVersion() instanceof LongVersion)) { + throw new BookieIllegalOpException("Invalid version type, expected it to be LongVersion"); + } + zk.setData( + zkPath, + cookieData.getValue(), + (int) ((LongVersion) cookieData.getVersion()).getLongVersion()); + } + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + throw new MetadataStoreException("Interrupted writing cookie for bookie " + bookieId, ie); + } catch (NoNodeException nne) { + throw new CookieNotFoundException(bookieId.toString()); + } catch (NodeExistsException nee) { + throw new CookieExistException(bookieId.toString()); + } catch (KeeperException e) { + throw new MetadataStoreException("Failed to write cookie for bookie " + bookieId); + } + } + + @Override + public Versioned readCookie(BookieId bookieId) throws BookieException { + String zkPath = getCookiePath(bookieId); + try { + Stat stat = zk.exists(zkPath, false); + byte[] data = zk.getData(zkPath, false, stat); + // sets stat version from ZooKeeper + LongVersion version = new LongVersion(stat.getVersion()); + return new Versioned<>(data, version); + } catch (NoNodeException nne) { + throw new CookieNotFoundException(bookieId.toString()); + } catch (KeeperException | InterruptedException e) { + throw new MetadataStoreException("Failed to read cookie for bookie " + bookieId); + } + } + + @Override + public void removeCookie(BookieId bookieId, Version version) throws BookieException { + String zkPath = getCookiePath(bookieId); + try { + zk.delete(zkPath, (int) ((LongVersion) version).getLongVersion()); + } catch (NoNodeException e) { + throw new CookieNotFoundException(bookieId.toString()); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new MetadataStoreException("Interrupted deleting cookie for bookie " + bookieId, e); + } catch (KeeperException e) { + throw new MetadataStoreException("Failed to delete cookie for bookie " + bookieId); + } + + log.info("Removed cookie from {} for bookie {}.", cookiePath, bookieId); + } + + + @Override + public String getClusterInstanceId() throws BookieException { + String instanceId = null; + try { + if (zk.exists(ledgersRootPath, null) == null) { + log.error("BookKeeper metadata doesn't exist in zookeeper. " + + "Has the cluster been initialized? " + + "Try running bin/bookkeeper shell metaformat"); + throw new KeeperException.NoNodeException("BookKeeper metadata"); + } + try { + byte[] data = zk.getData(ledgersRootPath + "/" + + INSTANCEID, false, null); + instanceId = new String(data, UTF_8); + } catch (KeeperException.NoNodeException e) { + log.info("INSTANCEID not exists in zookeeper. Not considering it for data verification"); + } + } catch (KeeperException | InterruptedException e) { + throw new MetadataStoreException("Failed to get cluster instance id", e); + } + return instanceId; + } + + @Override + public boolean prepareFormat() throws Exception { + boolean ledgerRootExists = null != zk.exists(ledgersRootPath, false); + boolean availableNodeExists = null != zk.exists(bookieRegistrationPath, false); + // Create ledgers root node if not exists + if (!ledgerRootExists) { + ZkUtils.createFullPathOptimistic(zk, ledgersRootPath, "".getBytes(StandardCharsets.UTF_8), zkAcls, + CreateMode.PERSISTENT); + } + // create available bookies node if not exists + if (!availableNodeExists) { + zk.create(bookieRegistrationPath, "".getBytes(StandardCharsets.UTF_8), zkAcls, CreateMode.PERSISTENT); + } + + // create readonly bookies node if not exists + if (null == zk.exists(bookieReadonlyRegistrationPath, false)) { + zk.create(bookieReadonlyRegistrationPath, new byte[0], zkAcls, CreateMode.PERSISTENT); + } + + return ledgerRootExists; + } + + @Override + public boolean initNewCluster() throws Exception { + String zkServers = ZKMetadataDriverBase.resolveZkServers(conf); + String instanceIdPath = ledgersRootPath + "/" + INSTANCEID; + log.info("Initializing ZooKeeper metadata for new cluster, ZKServers: {} ledger root path: {}", zkServers, + ledgersRootPath); + + boolean ledgerRootExists = null != zk.exists(ledgersRootPath, false); + + if (ledgerRootExists) { + log.error("Ledger root path: {} already exists", ledgersRootPath); + return false; + } + + List multiOps = Lists.newArrayListWithExpectedSize(4); + + // Create ledgers root node + multiOps.add(Op.create(ledgersRootPath, EMPTY_BYTE_ARRAY, zkAcls, CreateMode.PERSISTENT)); + + // create available bookies node + multiOps.add(Op.create(bookieRegistrationPath, EMPTY_BYTE_ARRAY, zkAcls, CreateMode.PERSISTENT)); + + // create readonly bookies node + multiOps.add(Op.create( + bookieReadonlyRegistrationPath, + EMPTY_BYTE_ARRAY, + zkAcls, + CreateMode.PERSISTENT)); + + // create INSTANCEID + String instanceId = UUID.randomUUID().toString(); + multiOps.add(Op.create(instanceIdPath, instanceId.getBytes(UTF_8), + zkAcls, CreateMode.PERSISTENT)); + + // execute the multi ops + zk.multi(multiOps); + + // creates the new layout and stores in zookeeper + AbstractZkLedgerManagerFactory.newLedgerManagerFactory(conf, layoutManager); + + log.info("Successfully initiated cluster. ZKServers: {} ledger root path: {} instanceId: {}", zkServers, + ledgersRootPath, instanceId); + return true; + } + + @Override + public boolean nukeExistingCluster() throws Exception { + String zkServers = ZKMetadataDriverBase.resolveZkServers(conf); + log.info("Nuking ZooKeeper metadata of existing cluster, ZKServers: {} ledger root path: {}", + zkServers, ledgersRootPath); + + boolean ledgerRootExists = null != zk.exists(ledgersRootPath, false); + if (!ledgerRootExists) { + log.info("There is no existing cluster with ledgersRootPath: {} in ZKServers: {}, " + + "so exiting nuke operation", ledgersRootPath, zkServers); + return true; + } + + boolean availableNodeExists = null != zk.exists(bookieRegistrationPath, false); + try (RegistrationClient regClient = new ZKRegistrationClient( + zk, + ledgersRootPath, + null, + false + )) { + if (availableNodeExists) { + Collection rwBookies = FutureUtils + .result(regClient.getWritableBookies(), EXCEPTION_FUNC).getValue(); + if (rwBookies != null && !rwBookies.isEmpty()) { + log.error("Bookies are still up and connected to this cluster, " + + "stop all bookies before nuking the cluster"); + return false; + } + + boolean readonlyNodeExists = null != zk.exists(bookieReadonlyRegistrationPath, false); + if (readonlyNodeExists) { + Collection roBookies = FutureUtils + .result(regClient.getReadOnlyBookies(), EXCEPTION_FUNC).getValue(); + if (roBookies != null && !roBookies.isEmpty()) { + log.error("Readonly Bookies are still up and connected to this cluster, " + + "stop all bookies before nuking the cluster"); + return false; + } + } + } + } + + LedgerManagerFactory ledgerManagerFactory = + AbstractZkLedgerManagerFactory.newLedgerManagerFactory(conf, layoutManager); + return ledgerManagerFactory.validateAndNukeExistingCluster(conf, layoutManager); + } + + @Override + public boolean format() throws Exception { + // Clear underreplicated ledgers + try { + ZKUtil.deleteRecursive(zk, ZkLedgerUnderreplicationManager.getBasePath(ledgersRootPath) + + BookKeeperConstants.DEFAULT_ZK_LEDGERS_ROOT_PATH); + } catch (KeeperException.NoNodeException e) { + if (log.isDebugEnabled()) { + log.debug("underreplicated ledgers root path node not exists in zookeeper to delete"); + } + } + + // Clear underreplicatedledger locks + try { + ZKUtil.deleteRecursive(zk, ZkLedgerUnderreplicationManager.getBasePath(ledgersRootPath) + '/' + + BookKeeperConstants.UNDER_REPLICATION_LOCK); + } catch (KeeperException.NoNodeException e) { + if (log.isDebugEnabled()) { + log.debug("underreplicatedledger locks node not exists in zookeeper to delete"); + } + } + + // Clear the cookies + try { + ZKUtil.deleteRecursive(zk, cookiePath); + } catch (KeeperException.NoNodeException e) { + if (log.isDebugEnabled()) { + log.debug("cookies node not exists in zookeeper to delete"); + } + } + + // Clear the INSTANCEID + try { + zk.delete(ledgersRootPath + "/" + BookKeeperConstants.INSTANCEID, -1); + } catch (KeeperException.NoNodeException e) { + if (log.isDebugEnabled()) { + log.debug("INSTANCEID not exists in zookeeper to delete"); + } + } + + // create INSTANCEID + String instanceId = UUID.randomUUID().toString(); + zk.create(ledgersRootPath + "/" + BookKeeperConstants.INSTANCEID, + instanceId.getBytes(StandardCharsets.UTF_8), zkAcls, CreateMode.PERSISTENT); + + log.info("Successfully formatted BookKeeper metadata"); + return true; + } + + @Override + public boolean isBookieRegistered(BookieId bookieId) throws BookieException { + String regPath = bookieRegistrationPath + "/" + bookieId; + String readonlyRegPath = bookieReadonlyRegistrationPath + "/" + bookieId; + try { + return ((null != zk.exists(regPath, false)) || (null != zk.exists(readonlyRegPath, false))); + } catch (KeeperException e) { + log.error("ZK exception while checking registration ephemeral znodes for BookieId: {}", bookieId, e); + throw new MetadataStoreException(e); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + log.error("InterruptedException while checking registration ephemeral znodes for BookieId: {}", bookieId, + e); + throw new MetadataStoreException(e); + } + } + + @Override + public void addRegistrationListener(RegistrationListener listener) { + listeners.add(listener); + } + + public void betweenRegisterReadOnlyBookie(Function fn) { + hookOnRegisterReadOnly = fn; + } +} diff --git a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClientTest.java b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClientTest.java index f599451c00710..4dcbcda3d9078 100644 --- a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClientTest.java +++ b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClientTest.java @@ -42,9 +42,8 @@ import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.client.BKException; import org.apache.bookkeeper.conf.AbstractConfiguration; -import org.apache.bookkeeper.discover.BookieServiceInfo; -import org.apache.bookkeeper.discover.RegistrationClient; -import org.apache.bookkeeper.discover.RegistrationManager; +import org.apache.bookkeeper.conf.ServerConfiguration; +import org.apache.bookkeeper.discover.*; import org.apache.bookkeeper.net.BookieId; import org.apache.bookkeeper.net.BookieSocketAddress; import org.apache.bookkeeper.versioning.Version; @@ -52,6 +51,7 @@ import org.apache.pulsar.metadata.BaseMetadataStoreTest; import org.apache.pulsar.metadata.api.MetadataStoreConfig; import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended; +import org.apache.zookeeper.ZooKeeper; import org.awaitility.Awaitility; import org.testng.annotations.Test; @@ -126,7 +126,7 @@ public void testGetReadonlyBookies(String provider, Supplier urlSupplier public void testGetBookieServiceInfo(String provider, Supplier urlSupplier) throws Exception { @Cleanup MetadataStoreExtended store = MetadataStoreExtended.create(urlSupplier.get(), - MetadataStoreConfig.builder().fsyncEnable(false).build()); + MetadataStoreConfig.builder().fsyncEnable(false).build()); String ledgersRoot = "/test/ledgers-" + UUID.randomUUID(); @@ -168,10 +168,10 @@ public void testGetBookieServiceInfo(String provider, Supplier urlSuppli getAndVerifyAllBookies(rc, addresses); Awaitility.await().untilAsserted(() -> { - for (BookieId address : addresses) { - BookieServiceInfo bookieServiceInfo = rc.getBookieServiceInfo(address).get().getValue(); - compareBookieServiceInfo(bookieServiceInfo, bookieServiceInfos.get(address)); - }}); + for (BookieId address : addresses) { + BookieServiceInfo bookieServiceInfo = rc.getBookieServiceInfo(address).get().getValue(); + compareBookieServiceInfo(bookieServiceInfo, bookieServiceInfos.get(address)); + }}); // shutdown the bookies (but keep the cookie) for (BookieId address : addresses) { @@ -184,12 +184,12 @@ public void testGetBookieServiceInfo(String provider, Supplier urlSuppli // getBookieServiceInfo should fail with BKBookieHandleNotAvailableException Awaitility.await().untilAsserted(() -> { - for (BookieId address : addresses) { - assertTrue( - expectThrows(ExecutionException.class, () -> { - rc.getBookieServiceInfo(address).get(); - }).getCause() instanceof BKException.BKBookieHandleNotAvailableException); - }}); + for (BookieId address : addresses) { + assertTrue( + expectThrows(ExecutionException.class, () -> { + rc.getBookieServiceInfo(address).get(); + }).getCause() instanceof BKException.BKBookieHandleNotAvailableException); + }}); // restart the bookies, all writable @@ -241,12 +241,12 @@ public void testGetBookieServiceInfo(String provider, Supplier urlSuppli .await() .ignoreExceptionsMatching(e -> e.getCause() instanceof BKException.BKBookieHandleNotAvailableException) .untilAsserted(() -> { - // verify that infos are updated - for (BookieId address : addresses) { - BookieServiceInfo bookieServiceInfo = rc.getBookieServiceInfo(address).get().getValue(); - compareBookieServiceInfo(bookieServiceInfo, bookieServiceInfos.get(address)); - } - }); + // verify that infos are updated + for (BookieId address : addresses) { + BookieServiceInfo bookieServiceInfo = rc.getBookieServiceInfo(address).get().getValue(); + compareBookieServiceInfo(bookieServiceInfo, bookieServiceInfos.get(address)); + } + }); } @@ -318,7 +318,7 @@ private void testWatchBookiesSuccess(String provider, Supplier urlSuppli @Cleanup MetadataStoreExtended store = MetadataStoreExtended.create(urlSupplier.get(), - MetadataStoreConfig.builder().fsyncEnable(false).build()); + MetadataStoreConfig.builder().fsyncEnable(false).build()); String ledgersRoot = "/test/ledgers-" + UUID.randomUUID(); @@ -357,4 +357,88 @@ private void testWatchBookiesSuccess(String provider, Supplier urlSuppli }); } + + @Test + public void testNetworkDelayWithBkZkManager() throws Throwable { + final String zksConnectionString = zks.getConnectionString(); + final String ledgersRoot = "/test/ledgers-" + UUID.randomUUID(); + // prepare registration manager + ZooKeeper zk = new ZooKeeper(zksConnectionString, 5000, null); + final ServerConfiguration serverConfiguration = new ServerConfiguration(); + serverConfiguration.setZkLedgersRootPath(ledgersRoot); + final FaultInjectableZKRegistrationManager rm = new FaultInjectableZKRegistrationManager(serverConfiguration, zk); + rm.prepareFormat(); + // prepare registration client + @Cleanup + MetadataStoreExtended store = MetadataStoreExtended.create(zksConnectionString, + MetadataStoreConfig.builder().fsyncEnable(false).build()); + @Cleanup + RegistrationClient rc1 = new PulsarRegistrationClient(store, ledgersRoot); + @Cleanup + RegistrationClient rc2 = new PulsarRegistrationClient(store, ledgersRoot); + + final List addresses = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + addresses.add(BookieId.parse("BOOKIE-" + i)); + } + final Map bookieServiceInfos = new HashMap<>(); + + int port = 223; + for (BookieId address : addresses) { + BookieServiceInfo info = new BookieServiceInfo(); + BookieServiceInfo.Endpoint endpoint = new BookieServiceInfo.Endpoint(); + endpoint.setAuth(Collections.emptyList()); + endpoint.setExtensions(Collections.emptyList()); + endpoint.setId("id"); + endpoint.setHost("localhost"); + endpoint.setPort(port++); + endpoint.setProtocol("bookie-rpc"); + info.setEndpoints(Arrays.asList(endpoint)); + bookieServiceInfos.put(address, info); + rm.registerBookie(address, false, info); + // write the cookie + rm.writeCookie(address, new Versioned<>(new byte[0], Version.NEW)); + } + + // trigger loading the BookieServiceInfo in the local cache + getAndVerifyAllBookies(rc1, addresses); + getAndVerifyAllBookies(rc2, addresses); + + Awaitility.await().untilAsserted(() -> { + for (BookieId address : addresses) { + compareBookieServiceInfo(rc1.getBookieServiceInfo(address).get().getValue(), + bookieServiceInfos.get(address)); + compareBookieServiceInfo(rc2.getBookieServiceInfo(address).get().getValue(), + bookieServiceInfos.get(address)); + } + }); + + // verified the init status. + + + // mock network delay + rm.betweenRegisterReadOnlyBookie(__ -> { + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + return null; + }); + + for (int i = 0; i < addresses.size() / 2; i++) { + final BookieId bkId = addresses.get(i); + // turn some bookies to be read only. + rm.registerBookie(bkId, true, bookieServiceInfos.get(bkId)); + } + + Awaitility.await().untilAsserted(() -> { + for (BookieId address : addresses) { + compareBookieServiceInfo(rc1.getBookieServiceInfo(address).get().getValue(), + bookieServiceInfos.get(address)); + compareBookieServiceInfo(rc2.getBookieServiceInfo(address).get().getValue(), + bookieServiceInfos.get(address)); + } + }); + } }