Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable ZooKeeper client to establish connection in read-only mode #4244

Merged
merged 1 commit into from
Jul 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -241,9 +241,9 @@ public ZooKeeperClient build() throws IOException, KeeperException, InterruptedE

// Create a watcher manager
StatsLogger watcherStatsLogger = statsLogger.scope("watcher");
ZooKeeperWatcherBase watcherManager =
null == watchers ? new ZooKeeperWatcherBase(sessionTimeoutMs, watcherStatsLogger) :
new ZooKeeperWatcherBase(sessionTimeoutMs, watchers, watcherStatsLogger);
ZooKeeperWatcherBase watcherManager = (null == watchers)
? new ZooKeeperWatcherBase(sessionTimeoutMs, allowReadOnlyMode, watcherStatsLogger)
: new ZooKeeperWatcherBase(sessionTimeoutMs, allowReadOnlyMode, watchers, watcherStatsLogger);
ZooKeeperClient client = new ZooKeeperClient(
connectString,
sessionTimeoutMs,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ public class ZooKeeperWatcherBase implements Watcher {
.getLogger(ZooKeeperWatcherBase.class);

private final int zkSessionTimeOut;
private final boolean allowReadOnlyMode;
private volatile CountDownLatch clientConnectLatch = new CountDownLatch(1);
private final CopyOnWriteArraySet<Watcher> childWatchers =
new CopyOnWriteArraySet<Watcher>();
Expand All @@ -53,18 +54,20 @@ public class ZooKeeperWatcherBase implements Watcher {
private final ConcurrentHashMap<EventType, Counter> eventCounters =
new ConcurrentHashMap<EventType, Counter>();

public ZooKeeperWatcherBase(int zkSessionTimeOut) {
this(zkSessionTimeOut, NullStatsLogger.INSTANCE);
public ZooKeeperWatcherBase(int zkSessionTimeOut, boolean allowReadOnlyMode) {
this(zkSessionTimeOut, allowReadOnlyMode, NullStatsLogger.INSTANCE);
}

public ZooKeeperWatcherBase(int zkSessionTimeOut, StatsLogger statsLogger) {
this(zkSessionTimeOut, new HashSet<Watcher>(), statsLogger);
public ZooKeeperWatcherBase(int zkSessionTimeOut, boolean allowReadOnlyMode, StatsLogger statsLogger) {
this(zkSessionTimeOut, allowReadOnlyMode, new HashSet<Watcher>(), statsLogger);
}

public ZooKeeperWatcherBase(int zkSessionTimeOut,
boolean allowReadOnlyMode,
Set<Watcher> childWatchers,
StatsLogger statsLogger) {
this.zkSessionTimeOut = zkSessionTimeOut;
this.allowReadOnlyMode = allowReadOnlyMode;
this.childWatchers.addAll(childWatchers);
this.statsLogger = statsLogger;
}
Expand Down Expand Up @@ -130,6 +133,14 @@ public void process(WatchedEvent event) {
LOG.info("ZooKeeper client is connected now.");
clientConnectLatch.countDown();
break;
case ConnectedReadOnly:
if (allowReadOnlyMode) {
LOG.info("ZooKeeper client is connected in read-only mode now.");
clientConnectLatch.countDown();
} else {
LOG.warn("ZooKeeper client is connected in read-only mode, which is not allowed.");
}
break;
case Disconnected:
LOG.info("ZooKeeper client is disconnected from zookeeper now,"
+ " but it is OK unless we received EXPIRED event.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public void run() {
byte[] sessionPasswd = bkc.getZkHandle().getSessionPasswd();

try {
ZooKeeperWatcherBase watcher = new ZooKeeperWatcherBase(10000);
ZooKeeperWatcherBase watcher = new ZooKeeperWatcherBase(10000, false);
ZooKeeper zk = new ZooKeeper(zkUtil.getZooKeeperConnectString(), 10000,
watcher, sessionId, sessionPasswd);
zk.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1072,7 +1072,7 @@ protected ZooKeeper createZooKeeper() throws IOException {
public void testZKConnectionLossForLedgerCreation() throws Exception {
int zkSessionTimeOut = 10000;
AtomicLong ledgerIdToInjectFailure = new AtomicLong(INVALID_LEDGERID);
ZooKeeperWatcherBase zooKeeperWatcherBase = new ZooKeeperWatcherBase(zkSessionTimeOut,
ZooKeeperWatcherBase zooKeeperWatcherBase = new ZooKeeperWatcherBase(zkSessionTimeOut, false,
NullStatsLogger.INSTANCE);
MockZooKeeperClient zkFaultInjectionWrapper = new MockZooKeeperClient(zkUtil.getZooKeeperConnectString(),
zkSessionTimeOut, zooKeeperWatcherBase, ledgerIdToInjectFailure);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1103,7 +1103,7 @@ public void testRWShutDownInTheCaseOfZKOperationFailures() throws Exception {
* create MockZooKeeperClient instance and wait for it to be connected.
*/
int zkSessionTimeOut = 10000;
ZooKeeperWatcherBase zooKeeperWatcherBase = new ZooKeeperWatcherBase(zkSessionTimeOut,
ZooKeeperWatcherBase zooKeeperWatcherBase = new ZooKeeperWatcherBase(zkSessionTimeOut, false,
NullStatsLogger.INSTANCE);
MockZooKeeperClient zkFaultInjectionWrapper = new MockZooKeeperClient(zkUtil.getZooKeeperConnectString(),
zkSessionTimeOut, zooKeeperWatcherBase);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ void sleepCluster(int time, TimeUnit timeUnit, CountDownLatch l)
default void expireSession(ZooKeeper zk) throws Exception {
long id = zk.getSessionId();
byte[] password = zk.getSessionPasswd();
ZooKeeperWatcherBase w = new ZooKeeperWatcherBase(10000);
ZooKeeperWatcherBase w = new ZooKeeperWatcherBase(10000, false);
ZooKeeper zk2 = new ZooKeeper(getZooKeeperConnectString(), zk.getSessionTimeout(), w, id, password);
w.waitForConnection();
zk2.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,4 +139,12 @@ public void killCluster() throws Exception {
public void sleepCluster(int time, TimeUnit timeUnit, CountDownLatch l) throws InterruptedException, IOException {
throw new UnsupportedOperationException("sleepServer operation is not supported for ZooKeeperClusterUtil");
}

public void stopPeer(int id) throws Exception {
quorumUtil.shutdown(id);
}

public void enableLocalSession(boolean localSessionEnabled) {
quorumUtil.enableLocalSession(localSessionEnabled);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ public void process(WatchedEvent event) {
};
final int timeout = 2000;
ZooKeeperWatcherBase watcherManager =
new ZooKeeperWatcherBase(timeout).addChildWatcher(testWatcher);
new ZooKeeperWatcherBase(timeout, false).addChildWatcher(testWatcher);
List<Watcher> watchers = new ArrayList<Watcher>(1);
watchers.add(testWatcher);
ZooKeeperClient client = new ShutdownZkServerClient(
Expand Down Expand Up @@ -895,4 +895,32 @@ public void processResult(int rc, String path, Object ctx) {
logger.info("Delete children from znode " + path);
}

@Test
public void testAllowReadOnlyMode() throws Exception {
if (zkUtil instanceof ZooKeeperClusterUtil) {
System.setProperty("readonlymode.enabled", "true");
((ZooKeeperClusterUtil) zkUtil).enableLocalSession(true);
zkUtil.restartCluster();
Thread.sleep(2000);
((ZooKeeperClusterUtil) zkUtil).stopPeer(2);
((ZooKeeperClusterUtil) zkUtil).stopPeer(3);
}

try (ZooKeeperClient client = ZooKeeperClient.newBuilder()
.connectString(zkUtil.getZooKeeperConnectString())
.sessionTimeoutMs(30000)
.watchers(new HashSet<Watcher>())
.operationRetryPolicy(retryPolicy)
.allowReadOnlyMode(true)
.build()) {
Assert.assertTrue("Client failed to connect a ZooKeeper in read-only mode.",
client.getState().isConnected());
} finally {
if (zkUtil instanceof ZooKeeperClusterUtil) {
System.setProperty("readonlymode.enabled", "false");
((ZooKeeperClusterUtil) zkUtil).enableLocalSession(false);
}
}
}

}