Skip to content

Commit

Permalink
HBASE-27567 Introduce ChaosMonkey Action to print HDFS Cluster status
Browse files Browse the repository at this point in the history
Signed-off-by: Reid Chan <reidchan@apache.org>
Signed-off-by: Duo Zhang <zhangduo@apache.org>
  • Loading branch information
ndimiduk committed Jan 16, 2023
1 parent ae1ec90 commit 6244891
Show file tree
Hide file tree
Showing 5 changed files with 202 additions and 39 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.chaos.actions;

import java.net.InetSocketAddress;
import java.net.URI;
import java.util.List;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HAUtil;
import org.apache.hadoop.hdfs.HAUtilClient;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DumpHdfsClusterStatusAction extends Action {
private static final Logger LOG = LoggerFactory.getLogger(DumpHdfsClusterStatusAction.class);
private static final String PREFIX = "\n ";

@Override
protected Logger getLogger() {
return LOG;
}

@Override
public void perform() throws Exception {
StringBuilder sb = new StringBuilder();
try (final DistributedFileSystem dfs = HdfsActionUtils.createDfs(getConf())) {
final Configuration dfsConf = dfs.getConf();
final URI dfsUri = dfs.getUri();
final boolean isHaAndLogicalUri = HAUtilClient.isLogicalUri(dfsConf, dfsUri);
sb.append("Cluster status").append('\n');
if (isHaAndLogicalUri) {
final String nsId = dfsUri.getHost();
final List<ClientProtocol> namenodes =
HAUtil.getProxiesForAllNameNodesInNameservice(dfsConf, nsId);
final boolean atLeastOneActive = HAUtil.isAtLeastOneActive(namenodes);
final InetSocketAddress activeAddress = HAUtil.getAddressOfActive(dfs);
sb.append("Active NameNode=").append(activeAddress).append(", isAtLeastOneActive=")
.append(atLeastOneActive).append('\n');
}
DatanodeInfo[] dns = dfs.getClient().datanodeReport(HdfsConstants.DatanodeReportType.LIVE);
sb.append("Number of live DataNodes: ").append(dns.length);
for (DatanodeInfo dni : dns) {
sb.append(PREFIX).append("name=").append(dni.getName()).append(", used%=")
.append(dni.getDfsUsedPercent()).append(", capacity=")
.append(FileUtils.byteCountToDisplaySize(dni.getCapacity()));
}
sb.append('\n');
dns = dfs.getClient().datanodeReport(HdfsConstants.DatanodeReportType.DEAD);
sb.append("Number of dead DataNodes: ").append(dns.length);
for (DatanodeInfo dni : dns) {
sb.append(PREFIX).append(dni.getName()).append("/").append(dni.getNetworkLocation());
}
}
// TODO: add more on NN, JNs, and ZK.
// TODO: Print how long process has been up.
getLogger().info(sb.toString());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.chaos.actions;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.security.PrivilegedExceptionAction;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.security.UserGroupInformation;

/**
* Configuration common across the HDFS Actions.
*/
public final class HdfsActionUtils {

private HdfsActionUtils() {
}

/**
* Specify a user as whom HDFS actions should be run. The chaos process must have permissions
* sufficient to assume the role of the specified user.
* @see <a href=
* "https://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-common/Superusers.html">Proxy
* user - Superusers Acting On Behalf Of Other Users</a>
*/
public static final String HDFS_USER_CONF_KEY = "org.apache.hadoop.hbase.chaos.actions.hdfs_user";

private static DistributedFileSystem createUnproxiedDfs(final Configuration conf)
throws IOException {
final Path rootDir = CommonFSUtils.getRootDir(conf);
final FileSystem fs = rootDir.getFileSystem(conf);
return (DistributedFileSystem) fs;
}

/**
* Create an instance of {@link DistributedFileSystem} that honors {@value HDFS_USER_CONF_KEY}.
*/
static DistributedFileSystem createDfs(final Configuration conf) throws IOException {
final String proxyUser = conf.get(HDFS_USER_CONF_KEY);
if (proxyUser == null) {
return createUnproxiedDfs(conf);
}
final UserGroupInformation proxyUgi =
UserGroupInformation.createProxyUser(proxyUser, UserGroupInformation.getLoginUser());
try {
return proxyUgi
.doAs((PrivilegedExceptionAction<DistributedFileSystem>) () -> createUnproxiedDfs(conf));
} catch (InterruptedException e) {
final InterruptedIOException iioe = new InterruptedIOException(e.getMessage());
iioe.setStackTrace(e.getStackTrace());
throw iioe;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,17 @@
*/
package org.apache.hadoop.hbase.chaos.actions;

import java.util.Collections;
import java.util.List;
import java.util.Optional;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.zookeeper.RecoverableZooKeeper;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HAUtil;
import org.apache.hadoop.hdfs.server.namenode.ha.proto.HAZKInfoProtos.ActiveNodeInfo;
import org.slf4j.Logger;
Expand Down Expand Up @@ -57,39 +59,51 @@ protected Logger getLogger() {
@Override
public void perform() throws Exception {
getLogger().info("Performing action: Restart active namenode");
Configuration conf = CommonFSUtils.getRootDir(getConf()).getFileSystem(getConf()).getConf();
String nameServiceID = DFSUtil.getNamenodeNameServiceId(conf);
if (!HAUtil.isHAEnabled(conf, nameServiceID)) {
throw new Exception("HA for namenode is not enabled");
}
ZKWatcher zkw = null;
RecoverableZooKeeper rzk = null;

final String hadoopHAZkNode;
String activeNamenode = null;
String hadoopHAZkNode = conf.get(ZK_PARENT_ZNODE_KEY, ZK_PARENT_ZNODE_DEFAULT);
try {
zkw = new ZKWatcher(conf, "get-active-namenode", null);
rzk = zkw.getRecoverableZooKeeper();
String hadoopHAZkNodePath = ZNodePaths.joinZNode(hadoopHAZkNode, nameServiceID);
List<String> subChildern = ZKUtil.listChildrenNoWatch(zkw, hadoopHAZkNodePath);
for (String eachEntry : subChildern) {
if (eachEntry.contains(ACTIVE_NN_LOCK_NAME)) {
int activeNamenodePort = -1;
try (final DistributedFileSystem dfs = HdfsActionUtils.createDfs(getConf())) {
final Configuration conf = dfs.getConf();
hadoopHAZkNode = conf.get(ZK_PARENT_ZNODE_KEY, ZK_PARENT_ZNODE_DEFAULT);
final String nameServiceID = DFSUtil.getNamenodeNameServiceId(conf);

if (!HAUtil.isHAEnabled(conf, nameServiceID)) {
getLogger().info("HA for HDFS is not enabled; skipping");
return;
}
try (final ZKWatcher zkw = new ZKWatcher(conf, "get-active-namenode", null)) {
final RecoverableZooKeeper rzk = zkw.getRecoverableZooKeeper();
// If hadoopHAZkNode == '/', pass '' instead because then joinZNode will return '//' as a
// prefix
// which zk doesn't like as a prefix on the path.
final String hadoopHAZkNodePath = ZNodePaths.joinZNode(
(hadoopHAZkNode != null && hadoopHAZkNode.equals("/")) ? "" : hadoopHAZkNode,
nameServiceID);
final List<String> subChildren =
Optional.ofNullable(ZKUtil.listChildrenNoWatch(zkw, hadoopHAZkNodePath))
.orElse(Collections.emptyList());
for (final String eachEntry : subChildren) {
if (!eachEntry.contains(ACTIVE_NN_LOCK_NAME)) {
continue;
}
byte[] data =
rzk.getData(ZNodePaths.joinZNode(hadoopHAZkNodePath, ACTIVE_NN_LOCK_NAME), false, null);
ActiveNodeInfo proto = ActiveNodeInfo.parseFrom(data);
activeNamenode = proto.getHostname();
activeNamenodePort = proto.getPort();
}
}
} finally {
if (zkw != null) {
zkw.close();
}
}

if (activeNamenode == null) {
throw new Exception("No active Name node found in zookeeper under " + hadoopHAZkNode);
getLogger().info("No active Name node found in zookeeper under '{}'", hadoopHAZkNode);
return;
}
getLogger().info("Found active namenode host:" + activeNamenode);
ServerName activeNNHost = ServerName.valueOf(activeNamenode, -1, -1);
getLogger().info("Restarting Active NameNode :" + activeNamenode);
restartNameNode(activeNNHost, sleepTime);

getLogger().info("Found Active NameNode host: {}", activeNamenode);
final ServerName activeNNHost = ServerName.valueOf(activeNamenode, activeNamenodePort, -1L);
getLogger().info("Restarting Active NameNode: {}", activeNamenode);
restartNameNode(activeNNHost, this.sleepTime);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,11 @@
package org.apache.hadoop.hbase.chaos.actions;

import java.io.IOException;
import java.util.LinkedList;
import java.util.List;
import java.util.Arrays;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.chaos.monkies.PolicyBasedChaosMonkey;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -48,18 +45,15 @@ protected Logger getLogger() {
@Override
public void perform() throws Exception {
getLogger().info("Performing action: Restart random data node");
ServerName server = PolicyBasedChaosMonkey.selectRandomItem(getDataNodes());
final ServerName server = PolicyBasedChaosMonkey.selectRandomItem(getDataNodes());
restartDataNode(server, sleepTime);
}

public ServerName[] getDataNodes() throws IOException {
DistributedFileSystem fs =
(DistributedFileSystem) CommonFSUtils.getRootDir(getConf()).getFileSystem(getConf());
DFSClient dfsClient = fs.getClient();
List<ServerName> hosts = new LinkedList<>();
for (DatanodeInfo dataNode : dfsClient.datanodeReport(HdfsConstants.DatanodeReportType.LIVE)) {
hosts.add(ServerName.valueOf(dataNode.getHostName(), -1, -1));
private ServerName[] getDataNodes() throws IOException {
try (final DistributedFileSystem dfs = HdfsActionUtils.createDfs(getConf())) {
final DFSClient dfsClient = dfs.getClient();
return Arrays.stream(dfsClient.datanodeReport(HdfsConstants.DatanodeReportType.LIVE))
.map(dn -> ServerName.valueOf(dn.getHostName(), -1, -1)).toArray(ServerName[]::new);
}
return hosts.toArray(new ServerName[0]);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@

import org.apache.hadoop.hbase.chaos.actions.Action;
import org.apache.hadoop.hbase.chaos.actions.DumpClusterStatusAction;
import org.apache.hadoop.hbase.chaos.actions.DumpHdfsClusterStatusAction;
import org.apache.hadoop.hbase.chaos.actions.ForceBalancerAction;
import org.apache.hadoop.hbase.chaos.actions.GracefulRollingRestartRsAction;
import org.apache.hadoop.hbase.chaos.actions.RestartActiveMasterAction;
import org.apache.hadoop.hbase.chaos.actions.RestartActiveNameNodeAction;
import org.apache.hadoop.hbase.chaos.actions.RestartRandomDataNodeAction;
import org.apache.hadoop.hbase.chaos.actions.RestartRandomRsExceptMetaAction;
import org.apache.hadoop.hbase.chaos.actions.RestartRandomZKNodeAction;
Expand Down Expand Up @@ -55,6 +57,7 @@ public ChaosMonkey build() {
// only allow 2 servers to be dead.
new RollingBatchRestartRsAction(5000, 1.0f, 2, true),
new ForceBalancerAction(),
new RestartActiveNameNodeAction(60000),
new RestartRandomDataNodeAction(60000),
new RestartRandomZKNodeAction(60000),
new GracefulRollingRestartRsAction(gracefulRollingRestartTSSLeepTime),
Expand All @@ -64,7 +67,8 @@ public ChaosMonkey build() {
// @formatter:on

// Action to log more info for debugging
Action[] actions2 = new Action[] { new DumpClusterStatusAction() };
Action[] actions2 =
new Action[] { new DumpClusterStatusAction(), new DumpHdfsClusterStatusAction() };

return new PolicyBasedChaosMonkey(properties, util,
new CompositeSequentialPolicy(new DoActionsOncePolicy(60 * 1000, actions1),
Expand Down

0 comments on commit 6244891

Please sign in to comment.