Skip to content

Commit

Permalink
HBASE-23324 Deprecate clients that connect to Zookeeper
Browse files Browse the repository at this point in the history
Our objective is to remove ZooKeeper from our public interface, remaking it as an internal
concern. Connecting to a cluster via ZooKeeper quorum will be considered deprecated starting in
2.6. Our default connection mechanism will switch to via RPC in 3.0 And finally we intend to
remove the ZooKeeper connection mechanism from client-facing APIs in 4.0.
  • Loading branch information
ndimiduk committed Mar 6, 2024
1 parent 2306820 commit ec6004a
Show file tree
Hide file tree
Showing 3 changed files with 278 additions and 223 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,235 +17,38 @@
*/
package org.apache.hadoop.hbase.client;

import static org.apache.hadoop.hbase.client.RegionInfo.DEFAULT_REPLICA_ID;
import static org.apache.hadoop.hbase.client.RegionInfoBuilder.FIRST_META_REGIONINFO;
import static org.apache.hadoop.hbase.client.RegionReplicaUtil.getRegionInfoForDefaultReplica;
import static org.apache.hadoop.hbase.client.RegionReplicaUtil.getRegionInfoForReplica;
import static org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil.lengthOfPBMagic;
import static org.apache.hadoop.hbase.trace.TraceUtil.tracedFuture;
import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
import static org.apache.hadoop.hbase.zookeeper.ZKMetadata.removeMetaData;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import org.apache.commons.lang3.mutable.MutableInt;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ClusterId;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.master.RegionState;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.zookeeper.ReadOnlyZKClient;
import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ZooKeeperProtos;

/**
* Zookeeper based registry implementation.
* @deprecated As of 2.6.0, replaced by {@link RpcConnectionRegistry}, which is the default
* connection mechanism as of 3.0.0. Expected to be removed in 4.0.0.
* @see <a href="https://issues.apache.org/jira/browse/HBASE-23324">HBASE-23324</a> and its parent
* ticket for details.
*/
@InterfaceAudience.Private
class ZKConnectionRegistry implements ConnectionRegistry {

@Deprecated
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
class ZKConnectionRegistry extends ZKConnectionRegistryInternal {
private static final Logger LOG = LoggerFactory.getLogger(ZKConnectionRegistry.class);

private final ReadOnlyZKClient zk;

private final ZNodePaths znodePaths;

// User not used, but for rpc based registry we need it
ZKConnectionRegistry(Configuration conf, User user) {
this.znodePaths = new ZNodePaths(conf);
this.zk = new ReadOnlyZKClient(conf);
}

private interface Converter<T> {
T convert(byte[] data) throws Exception;
}

private <T> CompletableFuture<T> getAndConvert(String path, Converter<T> converter) {
CompletableFuture<T> future = new CompletableFuture<>();
addListener(zk.get(path), (data, error) -> {
if (error != null) {
future.completeExceptionally(error);
return;
private static final Object WARN_LOCK = new Object();
private static volatile boolean NEEDS_LOG_WARN = true;

ZKConnectionRegistry(Configuration conf, User ignored) {
super(conf, ignored);
if (NEEDS_LOG_WARN) {
synchronized (WARN_LOCK) {
if (NEEDS_LOG_WARN) {
LOG.warn(
"ZKConnectionRegistry is deprecated. See https://hbase.apache.org/book.html#client.rpcconnectionregistry");
NEEDS_LOG_WARN = false;
}
}
try {
future.complete(converter.convert(data));
} catch (Exception e) {
future.completeExceptionally(e);
}
});
return future;
}

private static String getClusterId(byte[] data) throws DeserializationException {
if (data == null || data.length == 0) {
return null;
}
data = removeMetaData(data);
return ClusterId.parseFrom(data).toString();
}

@Override
public CompletableFuture<String> getClusterId() {
return tracedFuture(
() -> getAndConvert(znodePaths.clusterIdZNode, ZKConnectionRegistry::getClusterId),
"ZKConnectionRegistry.getClusterId");
}

ReadOnlyZKClient getZKClient() {
return zk;
}

private static ZooKeeperProtos.MetaRegionServer getMetaProto(byte[] data) throws IOException {
if (data == null || data.length == 0) {
return null;
}
data = removeMetaData(data);
int prefixLen = lengthOfPBMagic();
return ZooKeeperProtos.MetaRegionServer.parser().parseFrom(data, prefixLen,
data.length - prefixLen);
}

private static void tryComplete(MutableInt remaining, HRegionLocation[] locs,
CompletableFuture<RegionLocations> future) {
remaining.decrement();
if (remaining.intValue() > 0) {
return;
}
future.complete(new RegionLocations(locs));
}

private Pair<RegionState.State, ServerName>
getStateAndServerName(ZooKeeperProtos.MetaRegionServer proto) {
RegionState.State state;
if (proto.hasState()) {
state = RegionState.State.convert(proto.getState());
} else {
state = RegionState.State.OPEN;
}
HBaseProtos.ServerName snProto = proto.getServer();
return Pair.newPair(state,
ServerName.valueOf(snProto.getHostName(), snProto.getPort(), snProto.getStartCode()));
}

private void getMetaRegionLocation(CompletableFuture<RegionLocations> future,
List<String> metaReplicaZNodes) {
if (metaReplicaZNodes.isEmpty()) {
future.completeExceptionally(new IOException("No meta znode available"));
}
HRegionLocation[] locs = new HRegionLocation[metaReplicaZNodes.size()];
MutableInt remaining = new MutableInt(locs.length);
for (String metaReplicaZNode : metaReplicaZNodes) {
int replicaId = znodePaths.getMetaReplicaIdFromZNode(metaReplicaZNode);
String path = ZNodePaths.joinZNode(znodePaths.baseZNode, metaReplicaZNode);
if (replicaId == DEFAULT_REPLICA_ID) {
addListener(getAndConvert(path, ZKConnectionRegistry::getMetaProto), (proto, error) -> {
if (error != null) {
future.completeExceptionally(error);
return;
}
if (proto == null) {
future.completeExceptionally(new IOException("Meta znode is null"));
return;
}
Pair<RegionState.State, ServerName> stateAndServerName = getStateAndServerName(proto);
if (stateAndServerName.getFirst() != RegionState.State.OPEN) {
LOG.warn("Meta region is in state " + stateAndServerName.getFirst());
}
locs[DEFAULT_REPLICA_ID] = new HRegionLocation(
getRegionInfoForDefaultReplica(FIRST_META_REGIONINFO), stateAndServerName.getSecond());
tryComplete(remaining, locs, future);
});
} else {
addListener(getAndConvert(path, ZKConnectionRegistry::getMetaProto), (proto, error) -> {
if (future.isDone()) {
return;
}
if (error != null) {
LOG.warn("Failed to fetch " + path, error);
locs[replicaId] = null;
} else if (proto == null) {
LOG.warn("Meta znode for replica " + replicaId + " is null");
locs[replicaId] = null;
} else {
Pair<RegionState.State, ServerName> stateAndServerName = getStateAndServerName(proto);
if (stateAndServerName.getFirst() != RegionState.State.OPEN) {
LOG.warn("Meta region for replica " + replicaId + " is in state "
+ stateAndServerName.getFirst());
locs[replicaId] = null;
} else {
locs[replicaId] =
new HRegionLocation(getRegionInfoForReplica(FIRST_META_REGIONINFO, replicaId),
stateAndServerName.getSecond());
}
}
tryComplete(remaining, locs, future);
});
}
}
}

@Override
public CompletableFuture<RegionLocations> getMetaRegionLocations() {
return tracedFuture(() -> {
CompletableFuture<RegionLocations> future = new CompletableFuture<>();
addListener(
zk.list(znodePaths.baseZNode).thenApply(children -> children.stream()
.filter(c -> this.znodePaths.isMetaZNodePrefix(c)).collect(Collectors.toList())),
(metaReplicaZNodes, error) -> {
if (error != null) {
future.completeExceptionally(error);
return;
}
getMetaRegionLocation(future, metaReplicaZNodes);
});
return future;
}, "ZKConnectionRegistry.getMetaRegionLocations");
}

private static ZooKeeperProtos.Master getMasterProto(byte[] data) throws IOException {
if (data == null || data.length == 0) {
return null;
}
data = removeMetaData(data);
int prefixLen = lengthOfPBMagic();
return ZooKeeperProtos.Master.parser().parseFrom(data, prefixLen, data.length - prefixLen);
}

@Override
public CompletableFuture<ServerName> getActiveMaster() {
return tracedFuture(
() -> getAndConvert(znodePaths.masterAddressZNode, ZKConnectionRegistry::getMasterProto)
.thenApply(proto -> {
if (proto == null) {
return null;
}
HBaseProtos.ServerName snProto = proto.getMaster();
return ServerName.valueOf(snProto.getHostName(), snProto.getPort(),
snProto.getStartCode());
}),
"ZKConnectionRegistry.getActiveMaster");
}

@Override
public String getConnectionString() {
final String serverList = zk.getConnectString();
final String baseZNode = znodePaths.baseZNode;
return serverList + ":" + baseZNode;
}

@Override
public void close() {
zk.close();
}
}
Loading

0 comments on commit ec6004a

Please sign in to comment.