Skip to content

Commit

Permalink
HBASE-28425 Allow specify cluster key without zookeeper in replication (
Browse files Browse the repository at this point in the history
#5865)

Signed-off-by: Nick Dimiduk <ndimiduk@apache.org>
Reviewed-by: Andor Molnár <andor@apache.org>
  • Loading branch information
Apache9 authored May 24, 2024
1 parent 65ff7a2 commit c5e6d82
Show file tree
Hide file tree
Showing 54 changed files with 352 additions and 178 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Locale;
import java.util.ServiceLoader;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
Expand All @@ -35,7 +37,7 @@
* The entry point for creating a {@link ConnectionRegistry}.
*/
@InterfaceAudience.Private
final class ConnectionRegistryFactory {
public final class ConnectionRegistryFactory {

private static final Logger LOG = LoggerFactory.getLogger(ConnectionRegistryFactory.class);

Expand Down Expand Up @@ -90,4 +92,46 @@ static ConnectionRegistry create(Configuration conf, User user) {
RpcConnectionRegistry.class, ConnectionRegistry.class);
return ReflectionUtils.newInstance(clazz, conf, user);
}

/**
* Check whether the given {@code uri} is valid.
* <p/>
* Notice that there is no fallback logic for this method, so passing an URI with null scheme can
* not pass.
* @throws IOException if this is not a valid connection registry URI
*/
public static void validate(URI uri) throws IOException {
if (StringUtils.isBlank(uri.getScheme())) {
throw new IOException("No schema for uri: " + uri);
}
ConnectionRegistryURIFactory factory = FACTORIES.get(uri.getScheme().toLowerCase(Locale.ROOT));
if (factory == null) {
throw new IOException(
"No factory registered for scheme " + uri.getScheme() + ", uri: " + uri);
}
factory.validate(uri);
}

/**
* If the given {@code clusterKey} can be parsed to a {@link URI}, and the scheme of the
* {@link URI} is supported by us, return the {@link URI}, otherwise return {@code null}.
* @param clusterKey the cluster key, typically from replication peer config
* @return a {@link URI} or {@code null}.
*/
public static URI tryParseAsConnectionURI(String clusterKey) {
// The old cluster key format may not be parsed as URI if we use ip address as the zookeeper
// address, so here we need to catch the URISyntaxException and return false
URI uri;
try {
uri = new URI(clusterKey);
} catch (URISyntaxException e) {
LOG.debug("failed to parse cluster key to URI: {}", clusterKey, e);
return null;
}
if (FACTORIES.containsKey(uri.getScheme())) {
return uri;
} else {
return null;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,4 +39,10 @@ public interface ConnectionRegistryURIFactory {
* {@link ConnectionRegistryFactory}.
*/
String getScheme();

/**
* Validate the given {@code uri}.
* @throws IOException if this is not a valid connection registry URI.
*/
void validate(URI uri) throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

import edu.umd.cs.findbugs.annotations.Nullable;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
Expand Down Expand Up @@ -3785,15 +3786,17 @@ private CompletableFuture<Void> checkAndSyncTableToPeerClusters(TableName tableN

private CompletableFuture<Void> trySyncTableToPeerCluster(TableName tableName, byte[][] splits,
ReplicationPeerDescription peer) {
Configuration peerConf = null;
Configuration peerConf;
try {
peerConf =
ReplicationPeerConfigUtil.getPeerClusterConfiguration(connection.getConfiguration(), peer);
peerConf = ReplicationPeerConfigUtil
.getPeerClusterConfiguration(connection.getConfiguration(), peer.getPeerConfig());
} catch (IOException e) {
return failedFuture(e);
}
URI connectionUri =
ConnectionRegistryFactory.tryParseAsConnectionURI(peer.getPeerConfig().getClusterKey());
CompletableFuture<Void> future = new CompletableFuture<>();
addListener(ConnectionFactory.createAsyncConnection(peerConf), (conn, err) -> {
addListener(ConnectionFactory.createAsyncConnection(connectionUri, peerConf), (conn, err) -> {
if (err != null) {
future.completeExceptionally(err);
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import java.io.IOException;
import java.net.URI;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.security.User;
import org.apache.yetus.audience.InterfaceAudience;
Expand Down Expand Up @@ -46,4 +47,12 @@ public ConnectionRegistry create(URI uri, Configuration conf, User user) throws
public String getScheme() {
return "hbase+rpc";
}

@Override
public void validate(URI uri) throws IOException {
if (StringUtils.isBlank(uri.getAuthority())) {
throw new IOException("no bootstrap nodes specified, uri: " + uri);
}
// TODO: add more check about the bootstrap nodes
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import java.io.IOException;
import java.net.URI;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.security.User;
Expand Down Expand Up @@ -49,4 +50,15 @@ public ConnectionRegistry create(URI uri, Configuration conf, User user) throws
public String getScheme() {
return "hbase+zk";
}

@Override
public void validate(URI uri) throws IOException {
if (StringUtils.isBlank(uri.getAuthority())) {
throw new IOException("no zookeeper quorum specified, uri: " + uri);
}
// TODO: add more check about the zookeeper quorum
if (StringUtils.isBlank(uri.getPath())) {
throw new IOException("no zookeeper parent path specified, uri: " + uri);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.hadoop.hbase.CompoundConfiguration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ConnectionRegistryFactory;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
Expand Down Expand Up @@ -629,19 +630,19 @@ public static ReplicationPeerConfig removeExcludeTableCFsFromReplicationPeerConf

/**
* Returns the configuration needed to talk to the remote slave cluster.
* @param conf the base configuration
* @param peer the description of replication peer
* @param conf the base configuration
* @param peerConfig the peer config of replication peer
* @return the configuration for the peer cluster, null if it was unable to get the configuration
* @throws IOException when create peer cluster configuration failed
*/
public static Configuration getPeerClusterConfiguration(Configuration conf,
ReplicationPeerDescription peer) throws IOException {
ReplicationPeerConfig peerConfig = peer.getPeerConfig();
ReplicationPeerConfig peerConfig) throws IOException {
Configuration otherConf;
try {
if (ConnectionRegistryFactory.tryParseAsConnectionURI(peerConfig.getClusterKey()) != null) {
otherConf = HBaseConfiguration.create(conf);
} else {
// only need to apply cluster key for old style cluster key
otherConf = HBaseConfiguration.createClusterConf(conf, peerConfig.getClusterKey());
} catch (IOException e) {
throw new IOException("Can't get peer configuration for peerId=" + peer.getPeerId(), e);
}

if (!peerConfig.getConfiguration().isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import java.io.IOException;
import java.util.Map;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.util.VersionInfo;
import org.apache.hadoop.hbase.zookeeper.ZKConfig;
Expand Down Expand Up @@ -226,11 +227,11 @@ public static Configuration createClusterConf(Configuration baseConf, String clu
public static Configuration createClusterConf(Configuration baseConf, String clusterKey,
String overridePrefix) throws IOException {
Configuration clusterConf = HBaseConfiguration.create(baseConf);
if (clusterKey != null && !clusterKey.isEmpty()) {
if (!StringUtils.isBlank(clusterKey)) {
applyClusterKeyToConf(clusterConf, clusterKey);
}

if (overridePrefix != null && !overridePrefix.isEmpty()) {
if (!StringUtils.isBlank(overridePrefix)) {
Configuration clusterSubset = HBaseConfiguration.subset(clusterConf, overridePrefix);
HBaseConfiguration.merge(clusterConf, clusterSubset);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableSnapshotScanner;
import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.PrefixFilter;
Expand All @@ -55,7 +56,6 @@
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeerStorage;
import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
import org.apache.hadoop.hbase.replication.ReplicationUtils;
import org.apache.hadoop.hbase.snapshot.RestoreSnapshotHelper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
Expand Down Expand Up @@ -397,7 +397,7 @@ public boolean isAborted() {
ReplicationStorageFactory.getReplicationPeerStorage(FileSystem.get(conf), localZKW, conf);
ReplicationPeerConfig peerConfig = storage.getPeerConfig(peerId);
return Pair.newPair(peerConfig,
ReplicationUtils.getPeerClusterConfiguration(peerConfig, conf));
ReplicationPeerConfigUtil.getPeerClusterConfiguration(conf, peerConfig));
} catch (ReplicationException e) {
throw new IOException("An error occurred while trying to connect to the remote peer cluster",
e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,14 @@ public class TestVerifyReplication extends TestReplicationBase {
@Rule
public TestName name = new TestName();

@Override
protected String getClusterKey(HBaseTestingUtil util) throws Exception {
// TODO: VerifyReplication does not support connection uri yet, so here we need to use cluster
// key, as in this test we will pass the cluster key config in peer config directly to
// VerifyReplication job.
return util.getClusterKey();
}

@Before
public void setUp() throws Exception {
cleanUp();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,14 @@ public class TestVerifyReplicationAdjunct extends TestReplicationBase {
@Rule
public TestName name = new TestName();

@Override
protected String getClusterKey(HBaseTestingUtil util) throws Exception {
// TODO: VerifyReplication does not support connection uri yet, so here we need to use cluster
// key, as in this test we will pass the cluster key config in peer config directly to
// VerifyReplication job.
return util.getClusterKey();
}

@Before
public void setUp() throws Exception {
cleanUp();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,15 @@
*/
package org.apache.hadoop.hbase.replication;

import java.io.IOException;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
import org.apache.hadoop.hbase.conf.ConfigurationObserver;
import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
Expand Down Expand Up @@ -155,8 +157,15 @@ private ReplicationPeerImpl createPeer(String peerId) throws ReplicationExceptio
SyncReplicationState syncReplicationState = peerStorage.getPeerSyncReplicationState(peerId);
SyncReplicationState newSyncReplicationState =
peerStorage.getPeerNewSyncReplicationState(peerId);
return new ReplicationPeerImpl(ReplicationUtils.getPeerClusterConfiguration(peerConfig, conf),
peerId, peerConfig, enabled, syncReplicationState, newSyncReplicationState);
Configuration peerClusterConf;
try {
peerClusterConf = ReplicationPeerConfigUtil.getPeerClusterConfiguration(conf, peerConfig);
} catch (IOException e) {
throw new ReplicationException(
"failed to apply cluster key to configuration for peer config " + peerConfig, e);
}
return new ReplicationPeerImpl(peerClusterConf, peerId, peerConfig, enabled,
syncReplicationState, newSyncReplicationState);
}

@Override
Expand All @@ -166,8 +175,8 @@ public void onConfigurationChange(Configuration conf) {
for (ReplicationPeerImpl peer : peerCache.values()) {
try {
peer.onConfigurationChange(
ReplicationUtils.getPeerClusterConfiguration(peer.getPeerConfig(), conf));
} catch (ReplicationException e) {
ReplicationPeerConfigUtil.getPeerClusterConfiguration(conf, peer.getPeerConfig()));
} catch (IOException e) {
LOG.warn("failed to reload configuration for peer {}", peer.getId(), e);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CompoundConfiguration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.yetus.audience.InterfaceAudience;
Expand Down Expand Up @@ -61,25 +59,6 @@ public final class ReplicationUtils {
private ReplicationUtils() {
}

public static Configuration getPeerClusterConfiguration(ReplicationPeerConfig peerConfig,
Configuration baseConf) throws ReplicationException {
Configuration otherConf;
try {
otherConf = HBaseConfiguration.createClusterConf(baseConf, peerConfig.getClusterKey());
} catch (IOException e) {
throw new ReplicationException("Can't get peer configuration for peer " + peerConfig, e);
}

if (!peerConfig.getConfiguration().isEmpty()) {
CompoundConfiguration compound = new CompoundConfiguration();
compound.add(otherConf);
compound.addStringMap(peerConfig.getConfiguration());
return compound;
}

return otherConf;
}

private static boolean isCollectionEqual(Collection<String> c1, Collection<String> c2) {
if (c1 == null) {
return c2 == null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import static org.junit.Assert.fail;

import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState;
import org.apache.hadoop.hbase.zookeeper.ZKConfig;
import org.junit.Test;
Expand Down Expand Up @@ -86,8 +87,8 @@ public void testReplicationPeers() throws Exception {
SyncReplicationState.NONE);
assertNumberOfPeers(2);

assertEquals(KEY_ONE, ZKConfig.getZooKeeperClusterKey(ReplicationUtils
.getPeerClusterConfiguration(rp.getPeerStorage().getPeerConfig(ID_ONE), rp.getConf())));
assertEquals(KEY_ONE, ZKConfig.getZooKeeperClusterKey(ReplicationPeerConfigUtil
.getPeerClusterConfiguration(rp.getConf(), rp.getPeerStorage().getPeerConfig(ID_ONE))));
rp.getPeerStorage().removePeer(ID_ONE);
rp.removePeer(ID_ONE);
assertNumberOfPeers(1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import java.io.IOException;
import java.net.SocketAddress;
import java.net.URI;
import java.security.PrivilegedExceptionAction;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.security.User;
Expand Down Expand Up @@ -68,6 +69,20 @@ public static AsyncClusterConnection createAsyncClusterConnection(Configuration
localAddress, user);
}

/**
* Create a new {@link AsyncClusterConnection} instance.
* <p/>
* This is usually used in replication, the given {@code uri} specifies the connection info of the
* remote cluster.
*/
public static AsyncClusterConnection createAsyncClusterConnection(URI uri, Configuration conf,
SocketAddress localAddress, User user) throws IOException {
ConnectionRegistry registry = uri != null
? ConnectionRegistryFactory.create(uri, conf, user)
: ConnectionRegistryFactory.create(conf, user);
return createAsyncClusterConnection(conf, registry, localAddress, user);
}

/**
* Create a new {@link AsyncClusterConnection} instance to be used at server side where we have a
* {@link ConnectionRegistryEndpoint}.
Expand Down
Loading

0 comments on commit c5e6d82

Please sign in to comment.