Skip to content

Commit 03d9aca

Browse files
authored
HDFS-17361. DiskBalancer: Query command support with multiple nodes (#6508)
Signed-off-by: Takanobu Asanuma <tasanuma@apache.org>
1 parent 9751b6e commit 03d9aca

File tree

4 files changed: +94 additions, −38 deletions

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/command/QueryCommand.java

Lines changed: 59 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919

2020
package org.apache.hadoop.hdfs.server.diskbalancer.command;
2121

22+
import org.apache.commons.lang3.StringUtils;
23+
import org.apache.commons.text.TextStringBuilder;
2224
import org.apache.hadoop.util.Preconditions;
2325
import org.apache.commons.cli.CommandLine;
2426
import org.apache.commons.cli.HelpFormatter;
@@ -30,6 +32,11 @@
3032
import org.apache.hadoop.hdfs.tools.DiskBalancerCLI;
3133
import org.apache.hadoop.net.NetUtils;
3234

35+
import java.io.PrintStream;
36+
import java.util.Collections;
37+
import java.util.Set;
38+
import java.util.TreeSet;
39+
3340
/**
3441
* Gets the current status of disk balancer command.
3542
*/
@@ -41,9 +48,13 @@ public class QueryCommand extends Command {
4148
* @param conf - Configuration.
4249
*/
4350
public QueryCommand(Configuration conf) {
44-
super(conf);
51+
this(conf, System.out);
52+
}
53+
54+
public QueryCommand(Configuration conf, final PrintStream ps) {
55+
super(conf, ps);
4556
addValidCommandParameters(DiskBalancerCLI.QUERY,
46-
"Queries the status of disk plan running on a given datanode.");
57+
"Queries the status of disk plan running on given datanode(s).");
4758
addValidCommandParameters(DiskBalancerCLI.VERBOSE,
4859
"Prints verbose results.");
4960
}
@@ -56,52 +67,70 @@ public QueryCommand(Configuration conf) {
5667
@Override
5768
public void execute(CommandLine cmd) throws Exception {
5869
LOG.info("Executing \"query plan\" command.");
70+
TextStringBuilder result = new TextStringBuilder();
5971
Preconditions.checkState(cmd.hasOption(DiskBalancerCLI.QUERY));
6072
verifyCommandOptions(DiskBalancerCLI.QUERY, cmd);
61-
String nodeName = cmd.getOptionValue(DiskBalancerCLI.QUERY);
62-
Preconditions.checkNotNull(nodeName);
63-
nodeName = nodeName.trim();
64-
String nodeAddress = nodeName;
65-
66-
// if the string is not name:port format use the default port.
67-
if (!nodeName.matches("[^\\:]+:[0-9]{2,5}")) {
68-
int defaultIPC = NetUtils.createSocketAddr(
69-
getConf().getTrimmed(DFSConfigKeys.DFS_DATANODE_IPC_ADDRESS_KEY,
70-
DFSConfigKeys.DFS_DATANODE_IPC_ADDRESS_DEFAULT)).getPort();
71-
nodeAddress = nodeName + ":" + defaultIPC;
72-
LOG.debug("Using default data node port : {}", nodeAddress);
73+
String nodeVal = cmd.getOptionValue(DiskBalancerCLI.QUERY);
74+
if (StringUtils.isBlank(nodeVal)) {
75+
String warnMsg = "The number of input nodes is 0. "
76+
+ "Please input the valid nodes.";
77+
throw new DiskBalancerException(warnMsg,
78+
DiskBalancerException.Result.INVALID_NODE);
7379
}
80+
nodeVal = nodeVal.trim();
81+
Set<String> resultSet = new TreeSet<>();
82+
String[] nodes = nodeVal.split(",");
83+
Collections.addAll(resultSet, nodes);
84+
String outputLine = String.format(
85+
"Get current status of the diskbalancer for DataNode(s). "
86+
+ "These DataNode(s) are parsed from '%s'.", nodeVal);
87+
recordOutput(result, outputLine);
88+
for (String nodeName : resultSet) {
89+
// if the string is not name:port format use the default port.
90+
String nodeAddress = nodeName;
91+
if (!nodeName.matches("[^\\:]+:[0-9]{2,5}")) {
92+
int defaultIPC = NetUtils.createSocketAddr(
93+
getConf().getTrimmed(DFSConfigKeys.DFS_DATANODE_IPC_ADDRESS_KEY,
94+
DFSConfigKeys.DFS_DATANODE_IPC_ADDRESS_DEFAULT)).getPort();
95+
nodeAddress = nodeName + ":" + defaultIPC;
96+
LOG.debug("Using default data node port : {}", nodeAddress);
97+
}
7498

75-
ClientDatanodeProtocol dataNode = getDataNodeProxy(nodeAddress);
76-
try {
77-
DiskBalancerWorkStatus workStatus = dataNode.queryDiskBalancerPlan();
78-
System.out.printf("Plan File: %s%nPlan ID: %s%nResult: %s%n",
79-
workStatus.getPlanFile(),
80-
workStatus.getPlanID(),
81-
workStatus.getResult().toString());
82-
83-
if (cmd.hasOption(DiskBalancerCLI.VERBOSE)) {
84-
System.out.printf("%s", workStatus.currentStateString());
99+
ClientDatanodeProtocol dataNode = getDataNodeProxy(nodeAddress);
100+
try {
101+
DiskBalancerWorkStatus workStatus = dataNode.queryDiskBalancerPlan();
102+
outputLine = String.format("DataNode: %s%nPlan File: %s%nPlan ID: %s%nResult: %s%n",
103+
nodeAddress,
104+
workStatus.getPlanFile(),
105+
workStatus.getPlanID(),
106+
workStatus.getResult().toString());
107+
result.append(outputLine);
108+
if (cmd.hasOption(DiskBalancerCLI.VERBOSE)) {
109+
outputLine = String.format("%s", workStatus.currentStateString());
110+
result.append(outputLine);
111+
}
112+
result.append(System.lineSeparator());
113+
} catch (DiskBalancerException ex) {
114+
LOG.error("Query plan failed by {}", nodeAddress, ex);
115+
throw ex;
85116
}
86-
} catch (DiskBalancerException ex) {
87-
LOG.error("Query plan failed.", ex);
88-
throw ex;
89117
}
118+
getPrintStream().println(result);
90119
}
91120

92121
/**
93122
* Gets extended help for this command.
94123
*/
95124
@Override
96125
public void printHelp() {
97-
String header = "Query Plan queries a given data node about the " +
126+
String header = "Query Plan queries given datanode(s) about the " +
98127
"current state of disk balancer execution.\n\n";
99128

100129
String footer = "\nQuery command retrieves the plan ID and the current " +
101130
"running state. ";
102-
103131
HelpFormatter helpFormatter = new HelpFormatter();
104-
helpFormatter.printHelp("hdfs diskbalancer -query <hostname> [options]",
132+
helpFormatter.printHelp("hdfs diskbalancer -query <hostname,hostname,...> " +
133+
" [options]",
105134
header, DiskBalancerCLI.getQueryOptions(), footer);
106135
}
107136
}

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DiskBalancerCLI.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -378,7 +378,7 @@ private void addQueryCommands(Options opt) {
378378
Option query = Option.builder().longOpt(QUERY)
379379
.hasArg()
380380
.desc("Queries the disk balancer " +
381-
"status of a given datanode.")
381+
"status of given datanode(s).")
382382
.build();
383383
getQueryOptions().addOption(query);
384384
opt.addOption(query);
@@ -387,7 +387,7 @@ private void addQueryCommands(Options opt) {
387387
// added to global table.
388388
Option verbose = Option.builder().longOpt(VERBOSE)
389389
.desc("Prints details of the plan that is being executed " +
390-
"on the node.")
390+
"on the datanode(s).")
391391
.build();
392392
getQueryOptions().addOption(verbose);
393393
}
@@ -482,7 +482,7 @@ private int dispatch(CommandLine cmd)
482482
}
483483

484484
if (cmd.hasOption(DiskBalancerCLI.QUERY)) {
485-
dbCmd = new QueryCommand(getConf());
485+
dbCmd = new QueryCommand(getConf(), this.printStream);
486486
}
487487

488488
if (cmd.hasOption(DiskBalancerCLI.CANCEL)) {

hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSDiskbalancer.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -86,9 +86,9 @@ So, query command can help to get the current status of execute command.
8686

8787
### Query
8888

89-
Query command gets the current status of the diskbalancer from a datanode.
89+
Query command gets the current status of the diskbalancer from specified node(s).
9090

91-
`hdfs diskbalancer -query nodename.mycluster.com`
91+
`hdfs diskbalancer -query nodename1.mycluster.com,nodename2.mycluster.com,...`
9292

9393
| COMMAND\_OPTION | Description |
9494
|:---- |:---- |

hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/command/TestDiskBalancerCommand.java

Lines changed: 30 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -814,17 +814,44 @@ private List<String> runCommand(
814814
return runCommandInternal(cmdLine, clusterConf);
815815
}
816816

817+
/**
818+
* Making sure that we can query the multiple nodes without having done a submit.
819+
* @throws Exception
820+
*/
821+
@Test
822+
public void testDiskBalancerQueryWithoutSubmitAndMultipleNodes() throws Exception {
823+
Configuration hdfsConf = new HdfsConfiguration();
824+
hdfsConf.setBoolean(DFSConfigKeys.DFS_DISK_BALANCER_ENABLED, true);
825+
final int numDatanodes = 2;
826+
File basedir = new File(GenericTestUtils.getRandomizedTempPath());
827+
MiniDFSCluster miniDFSCluster = new MiniDFSCluster.Builder(hdfsConf, basedir)
828+
.numDataNodes(numDatanodes).build();
829+
try {
830+
miniDFSCluster.waitActive();
831+
DataNode dataNode1 = miniDFSCluster.getDataNodes().get(0);
832+
DataNode dataNode2 = miniDFSCluster.getDataNodes().get(1);
833+
final String queryArg = String.format("-query localhost:%d,localhost:%d", dataNode1
834+
.getIpcPort(), dataNode2.getIpcPort());
835+
final String cmdLine = String.format("hdfs diskbalancer %s", queryArg);
836+
List<String> outputs = runCommand(cmdLine);
837+
assertThat(outputs.get(1), containsString("localhost:" + dataNode1.getIpcPort()));
838+
assertThat(outputs.get(6), containsString("localhost:" + dataNode2.getIpcPort()));
839+
} finally {
840+
miniDFSCluster.shutdown();
841+
}
842+
}
843+
817844
/**
818845
* Making sure that we can query the node without having done a submit.
819846
* @throws Exception
820847
*/
821848
@Test
822849
public void testDiskBalancerQueryWithoutSubmit() throws Exception {
823-
Configuration conf = new HdfsConfiguration();
824-
conf.setBoolean(DFSConfigKeys.DFS_DISK_BALANCER_ENABLED, true);
850+
Configuration hdfsConf = new HdfsConfiguration();
851+
hdfsConf.setBoolean(DFSConfigKeys.DFS_DISK_BALANCER_ENABLED, true);
825852
final int numDatanodes = 2;
826853
File basedir = new File(GenericTestUtils.getRandomizedTempPath());
827-
MiniDFSCluster miniDFSCluster = new MiniDFSCluster.Builder(conf, basedir)
854+
MiniDFSCluster miniDFSCluster = new MiniDFSCluster.Builder(hdfsConf, basedir)
828855
.numDataNodes(numDatanodes).build();
829856
try {
830857
miniDFSCluster.waitActive();

0 commit comments

Comments (0)