Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

package org.apache.hadoop.hdfs.server.diskbalancer.command;

import org.apache.commons.lang3.StringUtils;
import org.apache.commons.text.TextStringBuilder;
import org.apache.hadoop.util.Preconditions;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.HelpFormatter;
Expand All @@ -30,6 +32,11 @@
import org.apache.hadoop.hdfs.tools.DiskBalancerCLI;
import org.apache.hadoop.net.NetUtils;

import java.io.PrintStream;
import java.util.Collections;
import java.util.Set;
import java.util.TreeSet;

/**
* Gets the current status of disk balancer command.
*/
Expand All @@ -41,9 +48,13 @@ public class QueryCommand extends Command {
* @param conf - Configuration.
*/
public QueryCommand(Configuration conf) {
super(conf);
this(conf, System.out);
}

public QueryCommand(Configuration conf, final PrintStream ps) {
super(conf, ps);
addValidCommandParameters(DiskBalancerCLI.QUERY,
"Queries the status of disk plan running on a given datanode.");
"Queries the status of disk plan running on given datanode(s).");
addValidCommandParameters(DiskBalancerCLI.VERBOSE,
"Prints verbose results.");
}
Expand All @@ -56,52 +67,70 @@ public QueryCommand(Configuration conf) {
@Override
public void execute(CommandLine cmd) throws Exception {
LOG.info("Executing \"query plan\" command.");
TextStringBuilder result = new TextStringBuilder();
Preconditions.checkState(cmd.hasOption(DiskBalancerCLI.QUERY));
verifyCommandOptions(DiskBalancerCLI.QUERY, cmd);
String nodeName = cmd.getOptionValue(DiskBalancerCLI.QUERY);
Preconditions.checkNotNull(nodeName);
nodeName = nodeName.trim();
String nodeAddress = nodeName;

// if the string is not name:port format use the default port.
if (!nodeName.matches("[^\\:]+:[0-9]{2,5}")) {
int defaultIPC = NetUtils.createSocketAddr(
getConf().getTrimmed(DFSConfigKeys.DFS_DATANODE_IPC_ADDRESS_KEY,
DFSConfigKeys.DFS_DATANODE_IPC_ADDRESS_DEFAULT)).getPort();
nodeAddress = nodeName + ":" + defaultIPC;
LOG.debug("Using default data node port : {}", nodeAddress);
String nodeVal = cmd.getOptionValue(DiskBalancerCLI.QUERY);
if (StringUtils.isBlank(nodeVal)) {
String warnMsg = "The number of input nodes is 0. "
+ "Please input the valid nodes.";
throw new DiskBalancerException(warnMsg,
DiskBalancerException.Result.INVALID_NODE);
}
nodeVal = nodeVal.trim();
Set<String> resultSet = new TreeSet<>();
String[] nodes = nodeVal.split(",");
Collections.addAll(resultSet, nodes);
String outputLine = String.format(
"Get current status of the diskbalancer for DataNode(s). "
+ "These DataNode(s) are parsed from '%s'.", nodeVal);
recordOutput(result, outputLine);
for (String nodeName : resultSet) {
// if the string is not name:port format use the default port.
String nodeAddress = nodeName;
if (!nodeName.matches("[^\\:]+:[0-9]{2,5}")) {
int defaultIPC = NetUtils.createSocketAddr(
getConf().getTrimmed(DFSConfigKeys.DFS_DATANODE_IPC_ADDRESS_KEY,
DFSConfigKeys.DFS_DATANODE_IPC_ADDRESS_DEFAULT)).getPort();
nodeAddress = nodeName + ":" + defaultIPC;
LOG.debug("Using default data node port : {}", nodeAddress);
}

ClientDatanodeProtocol dataNode = getDataNodeProxy(nodeAddress);
try {
DiskBalancerWorkStatus workStatus = dataNode.queryDiskBalancerPlan();
System.out.printf("Plan File: %s%nPlan ID: %s%nResult: %s%n",
workStatus.getPlanFile(),
workStatus.getPlanID(),
workStatus.getResult().toString());

if (cmd.hasOption(DiskBalancerCLI.VERBOSE)) {
System.out.printf("%s", workStatus.currentStateString());
ClientDatanodeProtocol dataNode = getDataNodeProxy(nodeAddress);
try {
DiskBalancerWorkStatus workStatus = dataNode.queryDiskBalancerPlan();
outputLine = String.format("DataNode: %s%nPlan File: %s%nPlan ID: %s%nResult: %s%n",
nodeAddress,
workStatus.getPlanFile(),
workStatus.getPlanID(),
workStatus.getResult().toString());
result.append(outputLine);
if (cmd.hasOption(DiskBalancerCLI.VERBOSE)) {
outputLine = String.format("%s", workStatus.currentStateString());
result.append(outputLine);
}
result.append(System.lineSeparator());
} catch (DiskBalancerException ex) {
LOG.error("Query plan failed by {}", nodeAddress, ex);
throw ex;
}
} catch (DiskBalancerException ex) {
LOG.error("Query plan failed.", ex);
throw ex;
}
getPrintStream().println(result);
}

/**
* Gets extended help for this command.
*/
@Override
public void printHelp() {
String header = "Query Plan queries a given data node about the " +
String header = "Query Plan queries given datanode(s) about the " +
"current state of disk balancer execution.\n\n";

String footer = "\nQuery command retrievs the plan ID and the current " +
"running state. ";

HelpFormatter helpFormatter = new HelpFormatter();
helpFormatter.printHelp("hdfs diskbalancer -query <hostname> [options]",
helpFormatter.printHelp("hdfs diskbalancer -query <hostname,hostname,...> " +
" [options]",
header, DiskBalancerCLI.getQueryOptions(), footer);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -378,7 +378,7 @@ private void addQueryCommands(Options opt) {
Option query = Option.builder().longOpt(QUERY)
.hasArg()
.desc("Queries the disk balancer " +
"status of a given datanode.")
"status of given datanode(s).")
.build();
getQueryOptions().addOption(query);
opt.addOption(query);
Expand All @@ -387,7 +387,7 @@ private void addQueryCommands(Options opt) {
// added to global table.
Option verbose = Option.builder().longOpt(VERBOSE)
.desc("Prints details of the plan that is being executed " +
"on the node.")
"on the datanode(s).")
.build();
getQueryOptions().addOption(verbose);
}
Expand Down Expand Up @@ -482,7 +482,7 @@ private int dispatch(CommandLine cmd)
}

if (cmd.hasOption(DiskBalancerCLI.QUERY)) {
dbCmd = new QueryCommand(getConf());
dbCmd = new QueryCommand(getConf(), this.printStream);
}

if (cmd.hasOption(DiskBalancerCLI.CANCEL)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,9 +86,9 @@ So, query command can help to get the current status of execute command.

### Query

Query command gets the current status of the diskbalancer from a datanode.
Query command gets the current status of the diskbalancer from specified node(s).

`hdfs diskbalancer -query nodename.mycluster.com`
`hdfs diskbalancer -query nodename1.mycluster.com,nodename2.mycluster.com,...`

| COMMAND\_OPTION | Description |
|:---- |:---- |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -814,17 +814,44 @@ private List<String> runCommand(
return runCommandInternal(cmdLine, clusterConf);
}

/**
* Making sure that we can query the multiple nodes without having done a submit.
* @throws Exception
*/
@Test
public void testDiskBalancerQueryWithoutSubmitAndMultipleNodes() throws Exception {
Configuration hdfsConf = new HdfsConfiguration();
hdfsConf.setBoolean(DFSConfigKeys.DFS_DISK_BALANCER_ENABLED, true);
final int numDatanodes = 2;
File basedir = new File(GenericTestUtils.getRandomizedTempPath());
MiniDFSCluster miniDFSCluster = new MiniDFSCluster.Builder(hdfsConf, basedir)
.numDataNodes(numDatanodes).build();
try {
miniDFSCluster.waitActive();
DataNode dataNode1 = miniDFSCluster.getDataNodes().get(0);
DataNode dataNode2 = miniDFSCluster.getDataNodes().get(1);
final String queryArg = String.format("-query localhost:%d,localhost:%d", dataNode1
.getIpcPort(), dataNode2.getIpcPort());
final String cmdLine = String.format("hdfs diskbalancer %s", queryArg);
List<String> outputs = runCommand(cmdLine);
assertThat(outputs.get(1), containsString("localhost:" + dataNode1.getIpcPort()));
assertThat(outputs.get(6), containsString("localhost:" + dataNode2.getIpcPort()));
} finally {
miniDFSCluster.shutdown();
}
}

/**
* Making sure that we can query the node without having done a submit.
* @throws Exception
*/
@Test
public void testDiskBalancerQueryWithoutSubmit() throws Exception {
Configuration conf = new HdfsConfiguration();
conf.setBoolean(DFSConfigKeys.DFS_DISK_BALANCER_ENABLED, true);
Configuration hdfsConf = new HdfsConfiguration();
hdfsConf.setBoolean(DFSConfigKeys.DFS_DISK_BALANCER_ENABLED, true);
final int numDatanodes = 2;
File basedir = new File(GenericTestUtils.getRandomizedTempPath());
MiniDFSCluster miniDFSCluster = new MiniDFSCluster.Builder(conf, basedir)
MiniDFSCluster miniDFSCluster = new MiniDFSCluster.Builder(hdfsConf, basedir)
.numDataNodes(numDatanodes).build();
try {
miniDFSCluster.waitActive();
Expand Down