Skip to content

Commit

Permalink
HBASE-26235 We could start RegionServerTracker before becoming active…
Browse files Browse the repository at this point in the history
… master (#3645)

Signed-off-by: Yulin Niu <niuyulin@apache.org>
  • Loading branch information
Apache9 authored Aug 30, 2021
1 parent c4daabd commit 889049e
Show file tree
Hide file tree
Showing 5 changed files with 91 additions and 76 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import static org.apache.hadoop.hbase.HConstants.ZOOKEEPER_ZNODE_PARENT;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
import org.apache.yetus.audience.InterfaceAudience;
Expand Down Expand Up @@ -203,6 +204,10 @@ public boolean isClientReadable(String path) {
path.equals(tableZNode) || path.startsWith(tableZNode + "/");
}

/**
 * Build the znode path for the given region server by joining {@code rsZNode} with the server
 * name's string form.
 * @param sn the region server whose znode path is wanted
 * @return the value produced by {@code joinZNode(rsZNode, sn.toString())}
 */
public String getRsPath(ServerName sn) {
  String childName = sn.toString();
  return joinZNode(rsZNode, childName);
}

/**
* Join the prefix znode name with the suffix znode name to generate a proper full znode name.
* <p>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hbase.client;

import java.util.Collection;
import java.util.List;
import java.util.Optional;
import org.apache.hadoop.hbase.HRegionLocation;
Expand Down Expand Up @@ -47,7 +48,7 @@ public interface ConnectionRegistryEndpoint {
/**
* Get all the region servers address.
*/
List<ServerName> getRegionServers();
Collection<ServerName> getRegionServers();

/**
* Get the location of meta regions.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ public class HMaster extends HRegionServer implements MasterServices {
// Manager and zk listener for master election
private final ActiveMasterManager activeMasterManager;
// Region server tracker
private RegionServerTracker regionServerTracker;
private final RegionServerTracker regionServerTracker;
// Draining region server tracker
private DrainingServerTracker drainingServerTracker;
// Tracker for load balancer state
Expand Down Expand Up @@ -489,6 +489,8 @@ public HMaster(final Configuration conf) throws IOException {
this.activeMasterManager = createActiveMasterManager(zooKeeper, serverName, this);

cachedClusterId = new CachedClusterId(this, conf);

this.regionServerTracker = new RegionServerTracker(zooKeeper, this);
} catch (Throwable t) {
// Make sure we log the exception. HMaster is often started via reflection and the
// cause of failed startup is lost.
Expand Down Expand Up @@ -928,8 +930,7 @@ private void finishActiveMasterInitialization(MonitoredTask status) throws IOExc
// filesystem that COULD BE 'alive' (we'll schedule SCPs for each and let SCP figure it out).
// We also pass dirs that are already 'splitting'... so we can do some checks down in tracker.
// TODO: Generate the splitting and live Set in one pass instead of two as we currently do.
this.regionServerTracker = new RegionServerTracker(zooKeeper, this, this.serverManager);
this.regionServerTracker.start(
this.regionServerTracker.upgrade(
procsByType.getOrDefault(ServerCrashProcedure.class, Collections.emptyList()).stream()
.map(p -> (ServerCrashProcedure) p).map(p -> p.getServerName()).collect(Collectors.toSet()),
walManager.getLiveServersFromWALDir(), walManager.getSplittingServersFromWALDir());
Expand Down Expand Up @@ -2726,8 +2727,8 @@ public int getBackupMasterInfoPort(final ServerName sn) {
}

@Override
public List<ServerName> getRegionServers() {
return serverManager.getOnlineServersList();
public Collection<ServerName> getRegionServers() {
return regionServerTracker.getRegionServers();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutorService;
Expand All @@ -31,18 +29,17 @@
import org.apache.hadoop.hbase.ServerMetricsBuilder;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.VersionInfoUtil;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.zookeeper.ZKListener;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionServerInfo;
Expand All @@ -61,27 +58,27 @@
@InterfaceAudience.Private
public class RegionServerTracker extends ZKListener {
private static final Logger LOG = LoggerFactory.getLogger(RegionServerTracker.class);
private final Set<ServerName> regionServers = new HashSet<>();
private final ServerManager serverManager;
// indicate whether we are active master
private boolean active;
private volatile Set<ServerName> regionServers = Collections.emptySet();
private final MasterServices server;
// As we need to send request to zk when processing the nodeChildrenChanged event, we'd better
// move the operation to a single threaded thread pool in order to not block the zk event
// processing since all the zk listener across HMaster will be called in one thread sequentially.
private final ExecutorService executor;

public RegionServerTracker(ZKWatcher watcher, MasterServices server,
ServerManager serverManager) {
public RegionServerTracker(ZKWatcher watcher, MasterServices server) {
super(watcher);
this.server = server;
this.serverManager = serverManager;
this.executor = Executors.newSingleThreadExecutor(
new ThreadFactoryBuilder().setDaemon(true).setNameFormat("RegionServerTracker-%d").build());
watcher.registerListener(this);
refresh();
}

private Pair<ServerName, RegionServerInfo> getServerInfo(String name)
throws KeeperException, IOException {
ServerName serverName = ServerName.parseServerName(name);
String nodePath = ZNodePaths.joinZNode(watcher.getZNodePaths().rsZNode, name);
private RegionServerInfo getServerInfo(ServerName serverName)
throws KeeperException, IOException {
String nodePath = watcher.getZNodePaths().getRsPath(serverName);
byte[] data;
try {
data = ZKUtil.getData(watcher, nodePath);
Expand All @@ -91,24 +88,26 @@ private Pair<ServerName, RegionServerInfo> getServerInfo(String name)
if (data == null) {
// we should receive a children changed event later and then we will expire it, so we still
// need to add it to the region server set.
LOG.warn("Server node {} does not exist, already dead?", name);
return Pair.newPair(serverName, null);
LOG.warn("Server node {} does not exist, already dead?", serverName);
return null;
}
if (data.length == 0 || !ProtobufUtil.isPBMagicPrefix(data)) {
// this should not happen actually, unless we have bugs or someone has messed zk up.
LOG.warn("Invalid data for region server node {} on zookeeper, data length = {}", name,
LOG.warn("Invalid data for region server node {} on zookeeper, data length = {}", serverName,
data.length);
return Pair.newPair(serverName, null);
return null;
}
RegionServerInfo.Builder builder = RegionServerInfo.newBuilder();
int magicLen = ProtobufUtil.lengthOfPBMagic();
ProtobufUtil.mergeFrom(builder, data, magicLen, data.length - magicLen);
return Pair.newPair(serverName, builder.build());
return builder.build();
}

/**
* Starts the tracking of online RegionServers. All RSes will be tracked after this method is
* called.
* Upgrade to active master mode, where besides tracking the changes of the region server set, we
* will also start to add new region servers to ServerManager and schedule an SCP if a region
* server dies. Starts the tracking of online RegionServers. All RSes will be tracked after this
* method is called.
* <p/>
* In this method, we will also construct the region server sets in {@link ServerManager}. If a
* region server is dead between the crash of the previous master instance and the start of the
Expand All @@ -119,71 +118,57 @@ private Pair<ServerName, RegionServerInfo> getServerInfo(String name)
* @param liveServersFromWALDir the live region servers from wal directory.
* @param splittingServersFromWALDir Servers whose WALs are being actively 'split'.
*/
public void start(Set<ServerName> deadServersFromPE, Set<ServerName> liveServersFromWALDir,
Set<ServerName> splittingServersFromWALDir)
throws KeeperException, IOException {
LOG.info("Starting RegionServerTracker; {} have existing ServerCrashProcedures, {} " +
"possibly 'live' servers, and {} 'splitting'.", deadServersFromPE.size(),
liveServersFromWALDir.size(), splittingServersFromWALDir.size());
public void upgrade(Set<ServerName> deadServersFromPE, Set<ServerName> liveServersFromWALDir,
Set<ServerName> splittingServersFromWALDir) throws KeeperException, IOException {
// The message is split across two literals that are concatenated verbatim, so the first one
// must end with a space; otherwise the log reads "existingServerCrashProcedures".
LOG.info(
  "Upgrading RegionServerTracker to active master mode; {} have existing " +
    "ServerCrashProcedures, {} possibly 'live' servers, and {} 'splitting'.",
  deadServersFromPE.size(), liveServersFromWALDir.size(), splittingServersFromWALDir.size());
// deadServersFromPE is made from a list of outstanding ServerCrashProcedures.
// splittingServersFromWALDir are being actively split -- the directory in the FS ends in
// '-SPLITTING'. Each splitting server should have a corresponding SCP. Log if not.
splittingServersFromWALDir.stream().filter(s -> !deadServersFromPE.contains(s)).
forEach(s -> LOG.error("{} has no matching ServerCrashProcedure", s));
//create ServerNode for all possible live servers from wal directory
// create ServerNode for all possible live servers from wal directory
liveServersFromWALDir
.forEach(sn -> server.getAssignmentManager().getRegionStates().getOrCreateServer(sn));
watcher.registerListener(this);
ServerManager serverManager = server.getServerManager();
synchronized (this) {
List<String> servers =
ZKUtil.listChildrenAndWatchForNewChildren(watcher, watcher.getZNodePaths().rsZNode);
if (null != servers) {
for (String n : servers) {
Pair<ServerName, RegionServerInfo> pair = getServerInfo(n);
ServerName serverName = pair.getFirst();
RegionServerInfo info = pair.getSecond();
regionServers.add(serverName);
ServerMetrics serverMetrics = info != null ?
ServerMetricsBuilder.of(serverName, VersionInfoUtil.getVersionNumber(info.getVersionInfo()),
info.getVersionInfo().getVersion()) :
ServerMetricsBuilder.of(serverName);
serverManager.checkAndRecordNewServer(serverName, serverMetrics);
}
Set<ServerName> liveServers = regionServers;
for (ServerName serverName : liveServers) {
RegionServerInfo info = getServerInfo(serverName);
ServerMetrics serverMetrics = info != null ? ServerMetricsBuilder.of(serverName,
VersionInfoUtil.getVersionNumber(info.getVersionInfo()),
info.getVersionInfo().getVersion()) : ServerMetricsBuilder.of(serverName);
serverManager.checkAndRecordNewServer(serverName, serverMetrics);
}
serverManager.findDeadServersAndProcess(deadServersFromPE, liveServersFromWALDir);
active = true;
}
}

/**
 * Stop this tracker by calling {@code shutdownNow()} on the single threaded executor that
 * processes the zk children-changed events.
 */
public void stop() {
  this.executor.shutdownNow();
}

private synchronized void refresh() {
List<String> names;
try {
names = ZKUtil.listChildrenAndWatchForNewChildren(watcher, watcher.getZNodePaths().rsZNode);
} catch (KeeperException e) {
// here we need to abort as we failed to set watcher on the rs node which means that we can
// not track the node deleted evetnt any more.
server.abort("Unexpected zk exception getting RS nodes", e);
return;
}
Set<ServerName> servers = CollectionUtils.isEmpty(names) ? Collections.emptySet() :
names.stream().map(ServerName::parseServerName).collect(Collectors.toSet());
/**
 * @return the region server set currently held in the {@code regionServers} field. The
 *         reference is handed out as is, without copying.
 */
public Set<ServerName> getRegionServers() {
  return this.regionServers;
}

for (Iterator<ServerName> iter = regionServers.iterator(); iter.hasNext();) {
ServerName sn = iter.next();
if (!servers.contains(sn)) {
LOG.info("RegionServer ephemeral node deleted, processing expiration [{}]", sn);
serverManager.expireServer(sn);
iter.remove();
}
// execute the operations which are only needed for active masters, such as expire old servers,
// add new servers, etc.
private void processAsActiveMaster(Set<ServerName> newServers) {
Set<ServerName> oldServers = regionServers;
ServerManager serverManager = server.getServerManager();
// expire dead servers
for (ServerName crashedServer : Sets.difference(oldServers, newServers)) {
LOG.info("RegionServer ephemeral node deleted, processing expiration [{}]", crashedServer);
serverManager.expireServer(crashedServer);
}
// here we do not need to parse the region server info as it is useless now, we only need the
// server name.
// check whether there are new servers, log them
boolean newServerAdded = false;
for (ServerName sn : servers) {
if (regionServers.add(sn)) {
for (ServerName sn : newServers) {
if (!oldServers.contains(sn)) {
newServerAdded = true;
LOG.info("RegionServer ephemeral node created, adding [" + sn + "]");
}
Expand All @@ -195,6 +180,25 @@ private synchronized void refresh() {
}
}

/**
 * Re-list the children of the rs znode and replace {@code regionServers} with the parsed
 * result.
 * <p>
 * If listing fails with a {@link KeeperException} the server is aborted and the current set is
 * left untouched. When {@code active} is set, {@code processAsActiveMaster} is invoked with the
 * new set before the field is replaced, so it still sees the previous set in
 * {@code regionServers}.
 */
private synchronized void refresh() {
  String rsZNode = watcher.getZNodePaths().rsZNode;
  List<String> children;
  try {
    children = ZKUtil.listChildrenAndWatchForNewChildren(watcher, rsZNode);
  } catch (KeeperException e) {
    // here we need to abort as we failed to set watcher on the rs node which means that we can
    // not track the node deleted event any more.
    server.abort("Unexpected zk exception getting RS nodes", e);
    return;
  }
  Set<ServerName> latest;
  if (CollectionUtils.isEmpty(children)) {
    latest = Collections.emptySet();
  } else {
    // wrap the parsed names so the set handed out to readers can not be modified
    latest = Collections.unmodifiableSet(
      children.stream().map(ServerName::parseServerName).collect(Collectors.toSet()));
  }
  if (active) {
    processAsActiveMaster(latest);
  }
  this.regionServers = latest;
}

@Override
public void nodeChildrenChanged(String path) {
if (path.equals(watcher.getZNodePaths().rsZNode) && !server.isAborted() &&
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,6 @@
import org.apache.hadoop.hbase.zookeeper.ZKNodeTracker;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.yetus.audience.InterfaceAudience;
Expand Down Expand Up @@ -686,7 +685,12 @@ public HRegionServer(final Configuration conf) throws IOException {
}
this.rpcServices.start(zooKeeper);
this.metaRegionLocationCache = new MetaRegionLocationCache(zooKeeper);
this.regionServerAddressTracker = new RegionServerAddressTracker(zooKeeper, this);
if (!(this instanceof HMaster)) {
// do not create this field for HMaster, we have another region server tracker for HMaster.
this.regionServerAddressTracker = new RegionServerAddressTracker(zooKeeper, this);
} else {
this.regionServerAddressTracker = null;
}
// This violates 'no starting stuff in Constructor' but Master depends on the below chore
// and executor being created and takes a different startup route. Lots of overlap between HRS
// and M (An M IS A HRS now). Need to refactor so less duplication between M and its super
Expand Down Expand Up @@ -3608,7 +3612,7 @@ public int movedRegionCacheExpiredTime() {
}

private String getMyEphemeralNodePath() {
return ZNodePaths.joinZNode(this.zooKeeper.getZNodePaths().rsZNode, getServerName().toString());
return zooKeeper.getZNodePaths().getRsPath(serverName);
}

private boolean isHealthCheckerConfigured() {
Expand Down Expand Up @@ -3995,7 +3999,7 @@ public List<ServerName> getBackupMasters() {
}

@Override
public List<ServerName> getRegionServers() {
public Collection<ServerName> getRegionServers() {
return regionServerAddressTracker.getRegionServers();
}

Expand Down

0 comments on commit 889049e

Please sign in to comment.