Skip to content

Commit 11dc186

Browse files
committed
HDFS-17361. DiskBalancer: Query command support with multiple nodes
1 parent 4046751 commit 11dc186

File tree

4 files changed

+91
-34
lines changed

4 files changed

+91
-34
lines changed

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/command/QueryCommand.java

Lines changed: 59 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
package org.apache.hadoop.hdfs.server.diskbalancer.command;
2121

22+
import org.apache.commons.text.TextStringBuilder;
2223
import org.apache.hadoop.util.Preconditions;
2324
import org.apache.commons.cli.CommandLine;
2425
import org.apache.commons.cli.HelpFormatter;
@@ -30,6 +31,11 @@
3031
import org.apache.hadoop.hdfs.tools.DiskBalancerCLI;
3132
import org.apache.hadoop.net.NetUtils;
3233

34+
import java.io.PrintStream;
35+
import java.util.Collections;
36+
import java.util.Set;
37+
import java.util.TreeSet;
38+
3339
/**
3440
* Gets the current status of disk balancer command.
3541
*/
@@ -41,9 +47,13 @@ public class QueryCommand extends Command {
4147
* @param conf - Configuration.
4248
*/
4349
public QueryCommand(Configuration conf) {
44-
super(conf);
50+
this(conf, System.out);
51+
}
52+
53+
public QueryCommand(Configuration conf, final PrintStream ps) {
54+
super(conf, ps);
4555
addValidCommandParameters(DiskBalancerCLI.QUERY,
46-
"Queries the status of disk plan running on a given datanode.");
56+
"Queries the status of disk plan running on given datanode(s).");
4757
addValidCommandParameters(DiskBalancerCLI.VERBOSE,
4858
"Prints verbose results.");
4959
}
@@ -56,52 +66,72 @@ public QueryCommand(Configuration conf) {
5666
@Override
5767
public void execute(CommandLine cmd) throws Exception {
5868
LOG.info("Executing \"query plan\" command.");
69+
TextStringBuilder result = new TextStringBuilder();
5970
Preconditions.checkState(cmd.hasOption(DiskBalancerCLI.QUERY));
6071
verifyCommandOptions(DiskBalancerCLI.QUERY, cmd);
61-
String nodeName = cmd.getOptionValue(DiskBalancerCLI.QUERY);
62-
Preconditions.checkNotNull(nodeName);
63-
nodeName = nodeName.trim();
64-
String nodeAddress = nodeName;
65-
66-
// if the string is not name:port format use the default port.
67-
if (!nodeName.matches("[^\\:]+:[0-9]{2,5}")) {
68-
int defaultIPC = NetUtils.createSocketAddr(
69-
getConf().getTrimmed(DFSConfigKeys.DFS_DATANODE_IPC_ADDRESS_KEY,
70-
DFSConfigKeys.DFS_DATANODE_IPC_ADDRESS_DEFAULT)).getPort();
71-
nodeAddress = nodeName + ":" + defaultIPC;
72-
LOG.debug("Using default data node port : {}", nodeAddress);
72+
String nodeVal = cmd.getOptionValue(DiskBalancerCLI.QUERY);
73+
Preconditions.checkNotNull(nodeVal);
74+
nodeVal = nodeVal.trim();
75+
Set<String> resultSet = new TreeSet<>();
76+
String[] nodes = nodeVal.split(",");
77+
if (nodes.length == 0) {
78+
String warnMsg = "The number of input nodes is 0. "
79+
+ "Please input the valid nodes.";
80+
throw new DiskBalancerException(warnMsg,
81+
DiskBalancerException.Result.INVALID_NODE);
7382
}
7483

75-
ClientDatanodeProtocol dataNode = getDataNodeProxy(nodeAddress);
76-
try {
77-
DiskBalancerWorkStatus workStatus = dataNode.queryDiskBalancerPlan();
78-
System.out.printf("Plan File: %s%nPlan ID: %s%nResult: %s%n",
79-
workStatus.getPlanFile(),
80-
workStatus.getPlanID(),
81-
workStatus.getResult().toString());
84+
Collections.addAll(resultSet, nodes);
85+
String outputLine = String.format(
86+
"Get current status of the diskbalancer for DataNode(s). "
87+
+ "These DataNode(s) are parsed from '%s'.", nodeVal);
88+
recordOutput(result, outputLine);
89+
for (String nodeName : resultSet) {
90+
// if the string is not name:port format use the default port.
91+
String nodeAddress = nodeName;
92+
if (!nodeName.matches("[^\\:]+:[0-9]{2,5}")) {
93+
int defaultIPC = NetUtils.createSocketAddr(
94+
getConf().getTrimmed(DFSConfigKeys.DFS_DATANODE_IPC_ADDRESS_KEY,
95+
DFSConfigKeys.DFS_DATANODE_IPC_ADDRESS_DEFAULT)).getPort();
96+
nodeAddress = nodeName + ":" + defaultIPC;
97+
LOG.debug("Using default data node port : {}", nodeAddress);
98+
}
8299

83-
if (cmd.hasOption(DiskBalancerCLI.VERBOSE)) {
84-
System.out.printf("%s", workStatus.currentStateString());
100+
ClientDatanodeProtocol dataNode = getDataNodeProxy(nodeAddress);
101+
try {
102+
DiskBalancerWorkStatus workStatus = dataNode.queryDiskBalancerPlan();
103+
outputLine = String.format("DataNode: %s%nPlan File: %s%nPlan ID: %s%nResult: %s%n",
104+
nodeAddress,
105+
workStatus.getPlanFile(),
106+
workStatus.getPlanID(),
107+
workStatus.getResult().toString());
108+
result.append(outputLine);
109+
if (cmd.hasOption(DiskBalancerCLI.VERBOSE)) {
110+
outputLine = String.format("%s", workStatus.currentStateString());
111+
result.append(outputLine);
112+
}
113+
result.append(System.lineSeparator());
114+
} catch (DiskBalancerException ex) {
115+
LOG.error("Query plan failed by {}", nodeAddress, ex);
116+
throw ex;
85117
}
86-
} catch (DiskBalancerException ex) {
87-
LOG.error("Query plan failed.", ex);
88-
throw ex;
89118
}
119+
getPrintStream().println(result.toString());
90120
}
91121

92122
/**
93123
* Gets extended help for this command.
94124
*/
95125
@Override
96126
public void printHelp() {
97-
String header = "Query Plan queries a given data node about the " +
127+
String header = "Query Plan queries given datanode(s) about the " +
98128
"current state of disk balancer execution.\n\n";
99129

100130
String footer = "\nQuery command retrieves the plan ID and the current " +
101131
"running state. ";
102-
103132
HelpFormatter helpFormatter = new HelpFormatter();
104-
helpFormatter.printHelp("hdfs diskbalancer -query <hostname> [options]",
133+
helpFormatter.printHelp("hdfs diskbalancer -query <hostname,hostname,...> " +
134+
" [options]",
105135
header, DiskBalancerCLI.getQueryOptions(), footer);
106136
}
107137
}

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DiskBalancerCLI.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -378,7 +378,7 @@ private void addQueryCommands(Options opt) {
378378
Option query = Option.builder().longOpt(QUERY)
379379
.hasArg()
380380
.desc("Queries the disk balancer " +
381-
"status of a given datanode.")
381+
"status of given datanode(s).")
382382
.build();
383383
getQueryOptions().addOption(query);
384384
opt.addOption(query);
@@ -387,7 +387,7 @@ private void addQueryCommands(Options opt) {
387387
// added to global table.
388388
Option verbose = Option.builder().longOpt(VERBOSE)
389389
.desc("Prints details of the plan that is being executed " +
390-
"on the node.")
390+
"on the datanode(s).")
391391
.build();
392392
getQueryOptions().addOption(verbose);
393393
}
@@ -482,7 +482,7 @@ private int dispatch(CommandLine cmd)
482482
}
483483

484484
if (cmd.hasOption(DiskBalancerCLI.QUERY)) {
485-
dbCmd = new QueryCommand(getConf());
485+
dbCmd = new QueryCommand(getConf(), this.printStream);
486486
}
487487

488488
if (cmd.hasOption(DiskBalancerCLI.CANCEL)) {

hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSDiskbalancer.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -86,9 +86,9 @@ So, query command can help to get the current status of execute command.
8686

8787
### Query
8888

89-
Query command gets the current status of the diskbalancer from a datanode.
89+
Query command gets the current status of the diskbalancer from the specified node(s).
9090

91-
`hdfs diskbalancer -query nodename.mycluster.com`
91+
`hdfs diskbalancer -query nodename1.mycluster.com,nodename2.mycluster.com,...`
9292

9393
| COMMAND\_OPTION | Description |
9494
|:---- |:---- |

hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/command/TestDiskBalancerCommand.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -814,6 +814,33 @@ private List<String> runCommand(
814814
return runCommandInternal(cmdLine, clusterConf);
815815
}
816816

817+
/**
818+
* Making sure that we can query multiple nodes without having done a submit.
819+
* @throws Exception
820+
*/
821+
@Test
822+
public void testDiskBalancerQueryWithoutSubmitAndMultipleNodes() throws Exception {
823+
Configuration conf = new HdfsConfiguration();
824+
conf.setBoolean(DFSConfigKeys.DFS_DISK_BALANCER_ENABLED, true);
825+
final int numDatanodes = 2;
826+
File basedir = new File(GenericTestUtils.getRandomizedTempPath());
827+
MiniDFSCluster miniDFSCluster = new MiniDFSCluster.Builder(conf, basedir)
828+
.numDataNodes(numDatanodes).build();
829+
try {
830+
miniDFSCluster.waitActive();
831+
DataNode dataNode1 = miniDFSCluster.getDataNodes().get(0);
832+
DataNode dataNode2 = miniDFSCluster.getDataNodes().get(1);
833+
final String queryArg = String.format("-query localhost:%d,localhost:%d", dataNode1
834+
.getIpcPort(), dataNode2.getIpcPort());
835+
final String cmdLine = String.format("hdfs diskbalancer %s", queryArg);
836+
List<String> outputs = runCommand(cmdLine);
837+
assertTrue(outputs.get(1).contains("localhost:" + dataNode1.getIpcPort()));
838+
assertTrue(outputs.get(6).contains("localhost:" + dataNode2.getIpcPort()));
839+
} finally {
840+
miniDFSCluster.shutdown();
841+
}
842+
}
843+
817844
/**
818845
* Making sure that we can query the node without having done a submit.
819846
* @throws Exception

0 commit comments

Comments
 (0)