Skip to content

Commit

Permalink
CURATOR-704. Add server compatibility check support (#497)
Browse files Browse the repository at this point in the history
Add new interface ZookeeperCompatibility to represent server compatibility and the existing Compatibility class (which represents client compatibility).

Enhance CuratorFramework to accept ZookeeperCompatibility instance, allowing users to specify which server version to target (default is LATEST).
  • Loading branch information
laurentgo authored May 2, 2024
1 parent 972fffa commit 82f2e53
Show file tree
Hide file tree
Showing 8 changed files with 119 additions and 8 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.curator.utils;

/**
* Describe feature supports based on server compatibility (as opposed to
* {@code Compatibility} which represents client compatibility.
*/
public class ZookeeperCompatibility {
/**
* Represent latest version with all features enabled
*/
public static final ZookeeperCompatibility LATEST =
builder().hasPersistentWatchers(true).build();

public static Builder builder() {
return new Builder();
}

public static class Builder {
// List of features introduced by Zookeeper over time.
// All values are set to false by default for backward compatibility
private boolean hasPersistentWatchers = false;

public Builder hasPersistentWatchers(boolean value) {
this.hasPersistentWatchers = value;
return this;
}

public boolean hasPersistentWatchers() {
return this.hasPersistentWatchers;
}

public ZookeeperCompatibility build() {
return new ZookeeperCompatibility(this);
}
}

private final boolean hasPersistentWatchers;

private ZookeeperCompatibility(Builder builder) {
this.hasPersistentWatchers = builder.hasPersistentWatchers;
}

/**
* Check if both client and server support persistent watchers
*
* @return {@code true} if both the client library and the server version
* support persistent watchers
*/
public boolean hasPersistentWatchers() {
return this.hasPersistentWatchers && Compatibility.hasPersistentWatchers();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.curator.framework.state.ConnectionStateErrorPolicy;
import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.curator.utils.EnsurePath;
import org.apache.curator.utils.ZookeeperCompatibility;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;

Expand Down Expand Up @@ -262,6 +263,13 @@ public interface CuratorFramework extends Closeable {
*/
public CuratorZookeeperClient getZookeeperClient();

/**
* Return zookeeper server compatibility
*
* @return compatibility
*/
public ZookeeperCompatibility getZookeeperCompatibility();

/**
* Allocates an ensure path instance that is namespace aware
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.apache.curator.framework.state.ConnectionStateListenerManagerFactory;
import org.apache.curator.framework.state.StandardConnectionStateErrorPolicy;
import org.apache.curator.utils.DefaultZookeeperFactory;
import org.apache.curator.utils.ZookeeperCompatibility;
import org.apache.curator.utils.ZookeeperFactory;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.Watcher;
Expand Down Expand Up @@ -174,6 +175,7 @@ public static class Builder {
ConnectionStateListenerManagerFactory.standard;
private int simulatedSessionExpirationPercent = 100;
private ZKClientConfig zkClientConfig;
private ZookeeperCompatibility zookeeperCompatibility = ZookeeperCompatibility.LATEST;

/**
* Apply the current values and build a new CuratorFramework
Expand Down Expand Up @@ -519,6 +521,11 @@ public Builder connectionStateListenerManagerFactory(
return this;
}

public Builder zookeeperCompatibility(ZookeeperCompatibility zookeeperCompatibility) {
this.zookeeperCompatibility = zookeeperCompatibility;
return this;
}

public Executor getRunSafeService() {
return runSafeService;
}
Expand Down Expand Up @@ -640,6 +647,10 @@ public ConnectionStateListenerManagerFactory getConnectionStateListenerManagerFa
return connectionStateListenerManagerFactory;
}

public ZookeeperCompatibility getZookeeperCompatibility() {
return zookeeperCompatibility;
}

private Builder() {}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,11 +77,11 @@
import org.apache.curator.framework.state.ConnectionStateErrorPolicy;
import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.curator.framework.state.ConnectionStateManager;
import org.apache.curator.utils.Compatibility;
import org.apache.curator.utils.DebugUtils;
import org.apache.curator.utils.EnsurePath;
import org.apache.curator.utils.ThreadUtils;
import org.apache.curator.utils.ZKPaths;
import org.apache.curator.utils.ZookeeperCompatibility;
import org.apache.curator.utils.ZookeeperFactory;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
Expand Down Expand Up @@ -117,6 +117,7 @@ public class CuratorFrameworkImpl implements CuratorFramework {
private final EnsembleTracker ensembleTracker;
private final SchemaSet schemaSet;
private final Executor runSafeService;
private final ZookeeperCompatibility zookeeperCompatibility;

private volatile ExecutorService executorService;
private final AtomicBoolean logAsErrorConnectionErrors = new AtomicBoolean(false);
Expand Down Expand Up @@ -204,6 +205,7 @@ public void process(WatchedEvent watchedEvent) {
builder.withEnsembleTracker() ? new EnsembleTracker(this, builder.getEnsembleProvider()) : null;

runSafeService = makeRunSafeService(builder);
zookeeperCompatibility = builder.getZookeeperCompatibility();
}

private Executor makeRunSafeService(CuratorFrameworkFactory.Builder builder) {
Expand Down Expand Up @@ -292,6 +294,7 @@ protected CuratorFrameworkImpl(CuratorFrameworkImpl parent) {
schemaSet = parent.schemaSet;
ensembleTracker = parent.ensembleTracker;
runSafeService = parent.runSafeService;
zookeeperCompatibility = parent.zookeeperCompatibility;
}

@Override
Expand Down Expand Up @@ -585,8 +588,8 @@ public RemoveWatchesBuilder watches() {
@Override
public WatchesBuilder watchers() {
Preconditions.checkState(
Compatibility.hasPersistentWatchers(),
"watchers() is not supported in the ZooKeeper library being used. Use watches() instead.");
zookeeperCompatibility.hasPersistentWatchers(),
"watchers() is not supported in the ZooKeeper library and/or server being used. Use watches() instead.");
return new WatchesBuilderImpl(this);
}

Expand All @@ -600,6 +603,11 @@ public CuratorZookeeperClient getZookeeperClient() {
return client;
}

@Override
public ZookeeperCompatibility getZookeeperCompatibility() {
return zookeeperCompatibility;
}

@Override
public EnsurePath newNamespaceAwareEnsurePath(String path) {
return namespace.newNamespaceAwareEnsurePath(path);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@

import java.util.concurrent.ExecutorService;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.utils.Compatibility;
import org.slf4j.LoggerFactory;

class CuratorCacheBridgeBuilderImpl implements CuratorCacheBridgeBuilder {
Expand Down Expand Up @@ -57,7 +56,7 @@ public CuratorCacheBridgeBuilder withExecutorService(ExecutorService executorSer

@Override
public CuratorCacheBridge build() {
if (!forceTreeCache && Compatibility.hasPersistentWatchers()) {
if (!forceTreeCache && client.getZookeeperCompatibility().hasPersistentWatchers()) {
if (executorService != null) {
LoggerFactory.getLogger(getClass()).warn("CuratorCache does not support custom ExecutorService");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import static org.junit.jupiter.api.Assertions.assertTrue;
import org.apache.curator.test.compatibility.CuratorTestBase;
import org.apache.curator.utils.Compatibility;
import org.apache.curator.utils.ZookeeperCompatibility;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;

Expand All @@ -33,6 +34,12 @@ public void testIsZk35() {
assertFalse(Compatibility.hasGetReachableOrOneMethod());
assertTrue(Compatibility.hasAddrField());
assertFalse(Compatibility.hasPersistentWatchers());
assertFalse(ZookeeperCompatibility.LATEST.hasPersistentWatchers());
assertFalse(ZookeeperCompatibility.builder().build().hasPersistentWatchers());
assertFalse(ZookeeperCompatibility.builder()
.hasPersistentWatchers(false)
.build()
.hasPersistentWatchers());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,12 @@

package org.apache.curator.zk36;

import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import org.apache.curator.test.compatibility.CuratorTestBase;
import org.apache.curator.utils.Compatibility;
import org.apache.curator.utils.ZookeeperCompatibility;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;

Expand All @@ -33,6 +35,12 @@ public void testIsZk36() {
assertTrue(Compatibility.hasGetReachableOrOneMethod());
assertTrue(Compatibility.hasAddrField());
assertTrue(Compatibility.hasPersistentWatchers());
assertTrue(ZookeeperCompatibility.LATEST.hasPersistentWatchers());
assertFalse(ZookeeperCompatibility.builder().build().hasPersistentWatchers());
assertFalse(ZookeeperCompatibility.builder()
.hasPersistentWatchers(false)
.build()
.hasPersistentWatchers());
try {
Class.forName("org.apache.zookeeper.proto.WhoAmIResponse");
fail("WhoAmIResponse is introduced after ZooKeeper 3.7");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import org.apache.curator.framework.imps.CuratorMultiTransactionImpl;
import org.apache.curator.framework.imps.GetACLBuilderImpl;
import org.apache.curator.framework.imps.SyncBuilderImpl;
import org.apache.curator.utils.Compatibility;
import org.apache.curator.x.async.AsyncCuratorFramework;
import org.apache.curator.x.async.AsyncStage;
import org.apache.curator.x.async.WatchMode;
Expand Down Expand Up @@ -140,8 +139,8 @@ public AsyncRemoveWatchesBuilder removeWatches() {
@Override
public AsyncWatchBuilder addWatch() {
Preconditions.checkState(
Compatibility.hasPersistentWatchers(),
"addWatch() is not supported in the ZooKeeper library being used.");
client.getZookeeperCompatibility().hasPersistentWatchers(),
"addWatch() is not supported in the ZooKeeper library and/or server being used.");
return new AsyncWatchBuilderImpl(client, filters);
}

Expand Down

0 comments on commit 82f2e53

Please sign in to comment.