Skip to content

Commit

Permalink
Enable ZooKeeper client to establish connection in read-only mode (ap…
Browse files Browse the repository at this point in the history
…ache#4244)

### Motivation

If the system property `readonlymode.enabled` is set to true on a ZooKeeper server, read-only mode is enabled. Data can be read from the server in read-only mode even if that server is split from the quorum.
https://zookeeper.apache.org/doc/current/zookeeperAdmin.html#Experimental+Options%2FFeatures

To connect to the server in read-only mode, the client must also allow read-only mode. The `ZooKeeperClient` class in the bookkeeper repository also has an option called `allowReadOnlyMode`.
https://github.com/apache/bookkeeper/blob/15171e1904f7196d8e9f4116ab2aecdf582e0032/bookkeeper-server/src/main/java/org/apache/bookkeeper/zookeeper/ZooKeeperClient.java#L219-L222

However, even if this option is set to true, the connection to the server in read-only mode will actually fail. The cause is in the `ZooKeeperWatcherBase` class. When the `ZooKeeperWatcherBase` class receives the `SyncConnected` event, it releases `clientConnectLatch` and assumes that the connection is complete.
https://github.com/apache/bookkeeper/blob/15171e1904f7196d8e9f4116ab2aecdf582e0032/bookkeeper-server/src/main/java/org/apache/bookkeeper/zookeeper/ZooKeeperWatcherBase.java#L128-L144

However, if the server is in read-only mode, it will receive `ConnectedReadOnly` instead of `SyncConnected`. This causes the connection to time out without being completed.

### Changes

Modified the switch statement in the `ZooKeeperWatcherBase` class to release `clientConnectLatch` when `ConnectedReadOnly` is received if the `allowReadOnlyMode` option is true.

By the way, `allowReadOnlyMode` is never set to true in BookKeeper. So this change would be useless for BookKeeper. However, it is useful for Pulsar. Because Pulsar also uses `ZooKeeperWatcherBase` and needs to be able to connect to ZooKeeper in read-only mode.
https://github.com/apache/pulsar/blob/cba1600d0f6a82f1ea194f3214a80f283fe8dc27/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/PulsarZooKeeperClient.java#L242-L244
  • Loading branch information
massakam authored and Anup Ghatage committed Jul 12, 2024
1 parent 4e1a30c commit c30ab3c
Show file tree
Hide file tree
Showing 8 changed files with 59 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -241,9 +241,9 @@ public ZooKeeperClient build() throws IOException, KeeperException, InterruptedE

// Create a watcher manager
StatsLogger watcherStatsLogger = statsLogger.scope("watcher");
ZooKeeperWatcherBase watcherManager =
null == watchers ? new ZooKeeperWatcherBase(sessionTimeoutMs, watcherStatsLogger) :
new ZooKeeperWatcherBase(sessionTimeoutMs, watchers, watcherStatsLogger);
ZooKeeperWatcherBase watcherManager = (null == watchers)
? new ZooKeeperWatcherBase(sessionTimeoutMs, allowReadOnlyMode, watcherStatsLogger)
: new ZooKeeperWatcherBase(sessionTimeoutMs, allowReadOnlyMode, watchers, watcherStatsLogger);
ZooKeeperClient client = new ZooKeeperClient(
connectString,
sessionTimeoutMs,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ public class ZooKeeperWatcherBase implements Watcher {
.getLogger(ZooKeeperWatcherBase.class);

private final int zkSessionTimeOut;
private final boolean allowReadOnlyMode;
private volatile CountDownLatch clientConnectLatch = new CountDownLatch(1);
private final CopyOnWriteArraySet<Watcher> childWatchers =
new CopyOnWriteArraySet<Watcher>();
Expand All @@ -53,18 +54,20 @@ public class ZooKeeperWatcherBase implements Watcher {
private final ConcurrentHashMap<EventType, Counter> eventCounters =
new ConcurrentHashMap<EventType, Counter>();

public ZooKeeperWatcherBase(int zkSessionTimeOut) {
this(zkSessionTimeOut, NullStatsLogger.INSTANCE);
public ZooKeeperWatcherBase(int zkSessionTimeOut, boolean allowReadOnlyMode) {
this(zkSessionTimeOut, allowReadOnlyMode, NullStatsLogger.INSTANCE);
}

public ZooKeeperWatcherBase(int zkSessionTimeOut, StatsLogger statsLogger) {
this(zkSessionTimeOut, new HashSet<Watcher>(), statsLogger);
public ZooKeeperWatcherBase(int zkSessionTimeOut, boolean allowReadOnlyMode, StatsLogger statsLogger) {
this(zkSessionTimeOut, allowReadOnlyMode, new HashSet<Watcher>(), statsLogger);
}

public ZooKeeperWatcherBase(int zkSessionTimeOut,
boolean allowReadOnlyMode,
Set<Watcher> childWatchers,
StatsLogger statsLogger) {
this.zkSessionTimeOut = zkSessionTimeOut;
this.allowReadOnlyMode = allowReadOnlyMode;
this.childWatchers.addAll(childWatchers);
this.statsLogger = statsLogger;
}
Expand Down Expand Up @@ -130,6 +133,14 @@ public void process(WatchedEvent event) {
LOG.info("ZooKeeper client is connected now.");
clientConnectLatch.countDown();
break;
case ConnectedReadOnly:
if (allowReadOnlyMode) {
LOG.info("ZooKeeper client is connected in read-only mode now.");
clientConnectLatch.countDown();
} else {
LOG.warn("ZooKeeper client is connected in read-only mode, which is not allowed.");
}
break;
case Disconnected:
LOG.info("ZooKeeper client is disconnected from zookeeper now,"
+ " but it is OK unless we received EXPIRED event.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public void run() {
byte[] sessionPasswd = bkc.getZkHandle().getSessionPasswd();

try {
ZooKeeperWatcherBase watcher = new ZooKeeperWatcherBase(10000);
ZooKeeperWatcherBase watcher = new ZooKeeperWatcherBase(10000, false);
ZooKeeper zk = new ZooKeeper(zkUtil.getZooKeeperConnectString(), 10000,
watcher, sessionId, sessionPasswd);
zk.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1072,7 +1072,7 @@ protected ZooKeeper createZooKeeper() throws IOException {
public void testZKConnectionLossForLedgerCreation() throws Exception {
int zkSessionTimeOut = 10000;
AtomicLong ledgerIdToInjectFailure = new AtomicLong(INVALID_LEDGERID);
ZooKeeperWatcherBase zooKeeperWatcherBase = new ZooKeeperWatcherBase(zkSessionTimeOut,
ZooKeeperWatcherBase zooKeeperWatcherBase = new ZooKeeperWatcherBase(zkSessionTimeOut, false,
NullStatsLogger.INSTANCE);
MockZooKeeperClient zkFaultInjectionWrapper = new MockZooKeeperClient(zkUtil.getZooKeeperConnectString(),
zkSessionTimeOut, zooKeeperWatcherBase, ledgerIdToInjectFailure);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1103,7 +1103,7 @@ public void testRWShutDownInTheCaseOfZKOperationFailures() throws Exception {
* create MockZooKeeperClient instance and wait for it to be connected.
*/
int zkSessionTimeOut = 10000;
ZooKeeperWatcherBase zooKeeperWatcherBase = new ZooKeeperWatcherBase(zkSessionTimeOut,
ZooKeeperWatcherBase zooKeeperWatcherBase = new ZooKeeperWatcherBase(zkSessionTimeOut, false,
NullStatsLogger.INSTANCE);
MockZooKeeperClient zkFaultInjectionWrapper = new MockZooKeeperClient(zkUtil.getZooKeeperConnectString(),
zkSessionTimeOut, zooKeeperWatcherBase);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ void sleepCluster(int time, TimeUnit timeUnit, CountDownLatch l)
default void expireSession(ZooKeeper zk) throws Exception {
long id = zk.getSessionId();
byte[] password = zk.getSessionPasswd();
ZooKeeperWatcherBase w = new ZooKeeperWatcherBase(10000);
ZooKeeperWatcherBase w = new ZooKeeperWatcherBase(10000, false);
ZooKeeper zk2 = new ZooKeeper(getZooKeeperConnectString(), zk.getSessionTimeout(), w, id, password);
w.waitForConnection();
zk2.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,4 +139,12 @@ public void killCluster() throws Exception {
public void sleepCluster(int time, TimeUnit timeUnit, CountDownLatch l) throws InterruptedException, IOException {
throw new UnsupportedOperationException("sleepServer operation is not supported for ZooKeeperClusterUtil");
}

public void stopPeer(int id) throws Exception {
quorumUtil.shutdown(id);
}

public void enableLocalSession(boolean localSessionEnabled) {
quorumUtil.enableLocalSession(localSessionEnabled);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ public void process(WatchedEvent event) {
};
final int timeout = 2000;
ZooKeeperWatcherBase watcherManager =
new ZooKeeperWatcherBase(timeout).addChildWatcher(testWatcher);
new ZooKeeperWatcherBase(timeout, false).addChildWatcher(testWatcher);
List<Watcher> watchers = new ArrayList<Watcher>(1);
watchers.add(testWatcher);
ZooKeeperClient client = new ShutdownZkServerClient(
Expand Down Expand Up @@ -895,4 +895,32 @@ public void processResult(int rc, String path, Object ctx) {
logger.info("Delete children from znode " + path);
}

@Test
public void testAllowReadOnlyMode() throws Exception {
if (zkUtil instanceof ZooKeeperClusterUtil) {
System.setProperty("readonlymode.enabled", "true");
((ZooKeeperClusterUtil) zkUtil).enableLocalSession(true);
zkUtil.restartCluster();
Thread.sleep(2000);
((ZooKeeperClusterUtil) zkUtil).stopPeer(2);
((ZooKeeperClusterUtil) zkUtil).stopPeer(3);
}

try (ZooKeeperClient client = ZooKeeperClient.newBuilder()
.connectString(zkUtil.getZooKeeperConnectString())
.sessionTimeoutMs(30000)
.watchers(new HashSet<Watcher>())
.operationRetryPolicy(retryPolicy)
.allowReadOnlyMode(true)
.build()) {
Assert.assertTrue("Client failed to connect a ZooKeeper in read-only mode.",
client.getState().isConnected());
} finally {
if (zkUtil instanceof ZooKeeperClusterUtil) {
System.setProperty("readonlymode.enabled", "false");
((ZooKeeperClusterUtil) zkUtil).enableLocalSession(false);
}
}
}

}

0 comments on commit c30ab3c

Please sign in to comment.