diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZNodePaths.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZNodePaths.java index 71936b9f36d0..4d3e7b3c50ba 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZNodePaths.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZNodePaths.java @@ -22,6 +22,7 @@ import static org.apache.hadoop.hbase.HConstants.ZOOKEEPER_ZNODE_PARENT; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.client.RegionReplicaUtil; import org.apache.yetus.audience.InterfaceAudience; @@ -203,6 +204,10 @@ public boolean isClientReadable(String path) { path.equals(tableZNode) || path.startsWith(tableZNode + "/"); } + public String getRsPath(ServerName sn) { + return joinZNode(rsZNode, sn.toString()); + } + /** * Join the prefix znode name with the suffix znode name to generate a proper full znode name. *

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ConnectionRegistryEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ConnectionRegistryEndpoint.java index 7e787d78994b..420c6d6b98e1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ConnectionRegistryEndpoint.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ConnectionRegistryEndpoint.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hbase.client; +import java.util.Collection; import java.util.List; import java.util.Optional; import org.apache.hadoop.hbase.HRegionLocation; @@ -47,7 +48,7 @@ public interface ConnectionRegistryEndpoint { /** * Get all the region servers address. */ - List getRegionServers(); + Collection getRegionServers(); /** * Get the location of meta regions. diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java index 3fc4cc302e56..0bf160bb8afb 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -275,7 +275,7 @@ public class HMaster extends HRegionServer implements MasterServices { // Manager and zk listener for master election private final ActiveMasterManager activeMasterManager; // Region server tracker - private RegionServerTracker regionServerTracker; + private final RegionServerTracker regionServerTracker; // Draining region server tracker private DrainingServerTracker drainingServerTracker; // Tracker for load balancer state @@ -489,6 +489,8 @@ public HMaster(final Configuration conf) throws IOException { this.activeMasterManager = createActiveMasterManager(zooKeeper, serverName, this); cachedClusterId = new CachedClusterId(this, conf); + + this.regionServerTracker = new RegionServerTracker(zooKeeper, this); } catch (Throwable t) { // Make sure we log the exception. 
HMaster is often started via reflection and the // cause of failed startup is lost. @@ -928,8 +930,7 @@ private void finishActiveMasterInitialization(MonitoredTask status) throws IOExc // filesystem that COULD BE 'alive' (we'll schedule SCPs for each and let SCP figure it out). // We also pass dirs that are already 'splitting'... so we can do some checks down in tracker. // TODO: Generate the splitting and live Set in one pass instead of two as we currently do. - this.regionServerTracker = new RegionServerTracker(zooKeeper, this, this.serverManager); - this.regionServerTracker.start( + this.regionServerTracker.upgrade( procsByType.getOrDefault(ServerCrashProcedure.class, Collections.emptyList()).stream() .map(p -> (ServerCrashProcedure) p).map(p -> p.getServerName()).collect(Collectors.toSet()), walManager.getLiveServersFromWALDir(), walManager.getSplittingServersFromWALDir()); @@ -2726,8 +2727,8 @@ public int getBackupMasterInfoPort(final ServerName sn) { } @Override - public List getRegionServers() { - return serverManager.getOnlineServersList(); + public Collection getRegionServers() { + return regionServerTracker.getRegionServers(); } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionServerTracker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionServerTracker.java index 336f9dc04f8e..65cc7ae57f13 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionServerTracker.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionServerTracker.java @@ -20,8 +20,6 @@ import java.io.IOException; import java.io.InterruptedIOException; import java.util.Collections; -import java.util.HashSet; -import java.util.Iterator; import java.util.List; import java.util.Set; import java.util.concurrent.ExecutorService; @@ -31,18 +29,17 @@ import org.apache.hadoop.hbase.ServerMetricsBuilder; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.client.VersionInfoUtil; -import 
org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.zookeeper.ZKListener; import org.apache.hadoop.hbase.zookeeper.ZKUtil; import org.apache.hadoop.hbase.zookeeper.ZKWatcher; -import org.apache.hadoop.hbase.zookeeper.ZNodePaths; -import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils; import org.apache.yetus.audience.InterfaceAudience; import org.apache.zookeeper.KeeperException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.hbase.thirdparty.com.google.common.collect.Sets; import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils; import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionServerInfo; @@ -61,27 +58,27 @@ @InterfaceAudience.Private public class RegionServerTracker extends ZKListener { private static final Logger LOG = LoggerFactory.getLogger(RegionServerTracker.class); - private final Set regionServers = new HashSet<>(); - private final ServerManager serverManager; + // indicate whether we are active master + private boolean active; + private volatile Set regionServers = Collections.emptySet(); private final MasterServices server; // As we need to send request to zk when processing the nodeChildrenChanged event, we'd better // move the operation to a single threaded thread pool in order to not block the zk event // processing since all the zk listener across HMaster will be called in one thread sequentially. 
private final ExecutorService executor; - public RegionServerTracker(ZKWatcher watcher, MasterServices server, - ServerManager serverManager) { + public RegionServerTracker(ZKWatcher watcher, MasterServices server) { super(watcher); this.server = server; - this.serverManager = serverManager; this.executor = Executors.newSingleThreadExecutor( new ThreadFactoryBuilder().setDaemon(true).setNameFormat("RegionServerTracker-%d").build()); + watcher.registerListener(this); + refresh(); } - private Pair getServerInfo(String name) - throws KeeperException, IOException { - ServerName serverName = ServerName.parseServerName(name); - String nodePath = ZNodePaths.joinZNode(watcher.getZNodePaths().rsZNode, name); + private RegionServerInfo getServerInfo(ServerName serverName) + throws KeeperException, IOException { + String nodePath = watcher.getZNodePaths().getRsPath(serverName); byte[] data; try { data = ZKUtil.getData(watcher, nodePath); @@ -91,24 +88,26 @@ private Pair getServerInfo(String name) if (data == null) { // we should receive a children changed event later and then we will expire it, so we still // need to add it to the region server set. - LOG.warn("Server node {} does not exist, already dead?", name); - return Pair.newPair(serverName, null); + LOG.warn("Server node {} does not exist, already dead?", serverName); + return null; } if (data.length == 0 || !ProtobufUtil.isPBMagicPrefix(data)) { // this should not happen actually, unless we have bugs or someone has messed zk up. 
- LOG.warn("Invalid data for region server node {} on zookeeper, data length = {}", name, + LOG.warn("Invalid data for region server node {} on zookeeper, data length = {}", serverName, data.length); - return Pair.newPair(serverName, null); + return null; } RegionServerInfo.Builder builder = RegionServerInfo.newBuilder(); int magicLen = ProtobufUtil.lengthOfPBMagic(); ProtobufUtil.mergeFrom(builder, data, magicLen, data.length - magicLen); - return Pair.newPair(serverName, builder.build()); + return builder.build(); } /** - * Starts the tracking of online RegionServers. All RSes will be tracked after this method is - * called. + * Upgrade to active master mode, where besides tracking the changes of region server set, we will + * also start to add new region servers to ServerManager and also schedule SCP if a region + * server dies. Starts the tracking of online RegionServers. All RSes will be tracked after this + * method is called. *

* In this method, we will also construct the region server sets in {@link ServerManager}. If a * region server is dead between the crash of the previous master instance and the start of the @@ -119,38 +118,32 @@ private Pair getServerInfo(String name) * @param liveServersFromWALDir the live region servers from wal directory. * @param splittingServersFromWALDir Servers whose WALs are being actively 'split'. */ - public void start(Set deadServersFromPE, Set liveServersFromWALDir, - Set splittingServersFromWALDir) - throws KeeperException, IOException { - LOG.info("Starting RegionServerTracker; {} have existing ServerCrashProcedures, {} " + - "possibly 'live' servers, and {} 'splitting'.", deadServersFromPE.size(), - liveServersFromWALDir.size(), splittingServersFromWALDir.size()); + public void upgrade(Set deadServersFromPE, Set liveServersFromWALDir, + Set splittingServersFromWALDir) throws KeeperException, IOException { + LOG.info( + "Upgrading RegionServerTracker to active master mode; {} have existing " + + "ServerCrashProcedures, {} possibly 'live' servers, and {} 'splitting'.", + deadServersFromPE.size(), liveServersFromWALDir.size(), splittingServersFromWALDir.size()); // deadServersFromPE is made from a list of outstanding ServerCrashProcedures. // splittingServersFromWALDir are being actively split -- the directory in the FS ends in // '-SPLITTING'. Each splitting server should have a corresponding SCP. Log if not. splittingServersFromWALDir.stream().filter(s -> !deadServersFromPE.contains(s)).
forEach(s -> LOG.error("{} has no matching ServerCrashProcedure", s)); - //create ServerNode for all possible live servers from wal directory + // create ServerNode for all possible live servers from wal directory liveServersFromWALDir .forEach(sn -> server.getAssignmentManager().getRegionStates().getOrCreateServer(sn)); - watcher.registerListener(this); + ServerManager serverManager = server.getServerManager(); synchronized (this) { - List servers = - ZKUtil.listChildrenAndWatchForNewChildren(watcher, watcher.getZNodePaths().rsZNode); - if (null != servers) { - for (String n : servers) { - Pair pair = getServerInfo(n); - ServerName serverName = pair.getFirst(); - RegionServerInfo info = pair.getSecond(); - regionServers.add(serverName); - ServerMetrics serverMetrics = info != null ? - ServerMetricsBuilder.of(serverName, VersionInfoUtil.getVersionNumber(info.getVersionInfo()), - info.getVersionInfo().getVersion()) : - ServerMetricsBuilder.of(serverName); - serverManager.checkAndRecordNewServer(serverName, serverMetrics); - } + Set liveServers = regionServers; + for (ServerName serverName : liveServers) { + RegionServerInfo info = getServerInfo(serverName); + ServerMetrics serverMetrics = info != null ? ServerMetricsBuilder.of(serverName, + VersionInfoUtil.getVersionNumber(info.getVersionInfo()), + info.getVersionInfo().getVersion()) : ServerMetricsBuilder.of(serverName); + serverManager.checkAndRecordNewServer(serverName, serverMetrics); } serverManager.findDeadServersAndProcess(deadServersFromPE, liveServersFromWALDir); + active = true; } } @@ -158,32 +151,24 @@ public void stop() { executor.shutdownNow(); } - private synchronized void refresh() { - List names; - try { - names = ZKUtil.listChildrenAndWatchForNewChildren(watcher, watcher.getZNodePaths().rsZNode); - } catch (KeeperException e) { - // here we need to abort as we failed to set watcher on the rs node which means that we can - // not track the node deleted evetnt any more. 
- server.abort("Unexpected zk exception getting RS nodes", e); - return; - } - Set servers = CollectionUtils.isEmpty(names) ? Collections.emptySet() : - names.stream().map(ServerName::parseServerName).collect(Collectors.toSet()); + public Set getRegionServers() { + return regionServers; + } - for (Iterator iter = regionServers.iterator(); iter.hasNext();) { - ServerName sn = iter.next(); - if (!servers.contains(sn)) { - LOG.info("RegionServer ephemeral node deleted, processing expiration [{}]", sn); - serverManager.expireServer(sn); - iter.remove(); - } + // execute the operations which are only needed for active masters, such as expire old servers, + // add new servers, etc. + private void processAsActiveMaster(Set newServers) { + Set oldServers = regionServers; + ServerManager serverManager = server.getServerManager(); + // expire dead servers + for (ServerName crashedServer : Sets.difference(oldServers, newServers)) { + LOG.info("RegionServer ephemeral node deleted, processing expiration [{}]", crashedServer); + serverManager.expireServer(crashedServer); } - // here we do not need to parse the region server info as it is useless now, we only need the - // server name. + // check whether there are new servers, log them boolean newServerAdded = false; - for (ServerName sn : servers) { - if (regionServers.add(sn)) { + for (ServerName sn : newServers) { + if (!oldServers.contains(sn)) { newServerAdded = true; LOG.info("RegionServer ephemeral node created, adding [" + sn + "]"); } @@ -195,6 +180,25 @@ private synchronized void refresh() { } } + private synchronized void refresh() { + List names; + try { + names = ZKUtil.listChildrenAndWatchForNewChildren(watcher, watcher.getZNodePaths().rsZNode); + } catch (KeeperException e) { + // here we need to abort as we failed to set watcher on the rs node which means that we can + // not track the node deleted event any more. 
+ server.abort("Unexpected zk exception getting RS nodes", e); + return; + } + Set newServers = CollectionUtils.isEmpty(names) ? Collections.emptySet() : + names.stream().map(ServerName::parseServerName) + .collect(Collectors.collectingAndThen(Collectors.toSet(), Collections::unmodifiableSet)); + if (active) { + processAsActiveMaster(newServers); + } + this.regionServers = newServers; + } + @Override public void nodeChildrenChanged(String path) { if (path.equals(watcher.getZNodePaths().rsZNode) && !server.isAborted() && diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 13379a0deed1..a7a3b7dd6db5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -188,7 +188,6 @@ import org.apache.hadoop.hbase.zookeeper.ZKNodeTracker; import org.apache.hadoop.hbase.zookeeper.ZKUtil; import org.apache.hadoop.hbase.zookeeper.ZKWatcher; -import org.apache.hadoop.hbase.zookeeper.ZNodePaths; import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.util.ReflectionUtils; import org.apache.yetus.audience.InterfaceAudience; @@ -686,7 +685,12 @@ public HRegionServer(final Configuration conf) throws IOException { } this.rpcServices.start(zooKeeper); this.metaRegionLocationCache = new MetaRegionLocationCache(zooKeeper); - this.regionServerAddressTracker = new RegionServerAddressTracker(zooKeeper, this); + if (!(this instanceof HMaster)) { + // do not create this field for HMaster, we have another region server tracker for HMaster. 
+ this.regionServerAddressTracker = new RegionServerAddressTracker(zooKeeper, this); + } else { + this.regionServerAddressTracker = null; + } // This violates 'no starting stuff in Constructor' but Master depends on the below chore // and executor being created and takes a different startup route. Lots of overlap between HRS // and M (An M IS A HRS now). Need to refactor so less duplication between M and its super @@ -3608,7 +3612,7 @@ public int movedRegionCacheExpiredTime() { } private String getMyEphemeralNodePath() { - return ZNodePaths.joinZNode(this.zooKeeper.getZNodePaths().rsZNode, getServerName().toString()); + return zooKeeper.getZNodePaths().getRsPath(serverName); } private boolean isHealthCheckerConfigured() { @@ -3995,7 +3999,7 @@ public List getBackupMasters() { } @Override - public List getRegionServers() { + public Collection getRegionServers() { return regionServerAddressTracker.getRegionServers(); }