@@ -203,6 +203,8 @@ public static class Builder {
203203 private int nameNodeHttpPort = 0 ;
204204 private final Configuration conf ;
205205 private int numDataNodes = 1 ;
206+ private int [] dnHttpPorts = null ;
207+ private int [] dnIpcPorts = null ;
206208 private StorageType [][] storageTypes = null ;
207209 private StorageType [] storageTypes1D = null ;
208210 private int storagesPerDatanode = DEFAULT_STORAGES_PER_DATANODE ;
@@ -277,6 +279,16 @@ public Builder numDataNodes(int val) {
277279 return this ;
278280 }
279281
282+ public Builder setDnHttpPorts (int ... ports ) {
283+ this .dnHttpPorts = ports ;
284+ return this ;
285+ }
286+
287+ public Builder setDnIpcPorts (int ... ports ) {
288+ this .dnIpcPorts = ports ;
289+ return this ;
290+ }
291+
280292 /**
281293 * Default: DEFAULT_STORAGES_PER_DATANODE
282294 */
@@ -599,7 +611,9 @@ protected MiniDFSCluster(Builder builder) throws IOException {
599611 builder .checkDataNodeHostConfig ,
600612 builder .dnConfOverlays ,
601613 builder .skipFsyncForTesting ,
602- builder .useConfiguredTopologyMappingClass );
614+ builder .useConfiguredTopologyMappingClass ,
615+ builder .dnHttpPorts ,
616+ builder .dnIpcPorts );
603617 }
604618
605619 public static class DataNodeProperties {
@@ -873,7 +887,7 @@ public MiniDFSCluster(int nameNodePort,
873887 operation , null , racks , hosts ,
874888 null , simulatedCapacities , null , true , false ,
875889 MiniDFSNNTopology .simpleSingleNN (nameNodePort , 0 ),
876- true , false , false , null , true , false );
890+ true , false , false , null , true , false , null , null );
877891 }
878892
879893 private void initMiniDFSCluster (
@@ -891,7 +905,9 @@ private void initMiniDFSCluster(
891905 boolean checkDataNodeHostConfig ,
892906 Configuration [] dnConfOverlays ,
893907 boolean skipFsyncForTesting ,
894- boolean useConfiguredTopologyMappingClass )
908+ boolean useConfiguredTopologyMappingClass ,
909+ int [] dnHttpPorts ,
910+ int [] dnIpcPorts )
895911 throws IOException {
896912 boolean success = false ;
897913 try {
@@ -974,9 +990,9 @@ private void initMiniDFSCluster(
974990
975991 // Start the DataNodes
976992 startDataNodes (conf , numDataNodes , storageTypes , manageDataDfsDirs ,
977- dnStartOpt != null ? dnStartOpt : startOpt ,
978- racks , hosts , storageCapacities , simulatedCapacities , setupHostsFile ,
979- checkDataNodeAddrConfig , checkDataNodeHostConfig , dnConfOverlays );
993+ dnStartOpt != null ? dnStartOpt : startOpt , racks , hosts , storageCapacities ,
994+ simulatedCapacities , setupHostsFile , checkDataNodeAddrConfig , checkDataNodeHostConfig ,
995+ dnConfOverlays , dnHttpPorts , dnIpcPorts );
980996 waitClusterUp ();
981997 //make sure ProxyUsers uses the latest conf
982998 ProxyUsers .refreshSuperUserGroupsConfiguration (conf );
@@ -1598,8 +1614,8 @@ public synchronized void startDataNodes(Configuration conf, int numDataNodes,
15981614 String [] racks , String [] hosts ,
15991615 long [] simulatedCapacities ,
16001616 boolean setupHostsFile ) throws IOException {
1601- startDataNodes (conf , numDataNodes , null , manageDfsDirs , operation , racks , hosts ,
1602- null , simulatedCapacities , setupHostsFile , false , false , null );
1617+ startDataNodes (conf , numDataNodes , null , manageDfsDirs , operation , racks , hosts , null ,
1618+ simulatedCapacities , setupHostsFile , false , false , null , null , null );
16031619 }
16041620
16051621 public synchronized void startDataNodes (Configuration conf , int numDataNodes ,
@@ -1608,14 +1624,14 @@ public synchronized void startDataNodes(Configuration conf, int numDataNodes,
16081624 long [] simulatedCapacities ,
16091625 boolean setupHostsFile ,
16101626 boolean checkDataNodeAddrConfig ) throws IOException {
1611- startDataNodes (conf , numDataNodes , null , manageDfsDirs , operation , racks , hosts ,
1612- null , simulatedCapacities , setupHostsFile , checkDataNodeAddrConfig , false , null );
1627+ startDataNodes (conf , numDataNodes , null , manageDfsDirs , operation , racks , hosts , null ,
1628+ simulatedCapacities , setupHostsFile , checkDataNodeAddrConfig , false , null , null , null );
16131629 }
16141630
16151631 /**
16161632 * Modify the config and start up additional DataNodes. The info port for
16171633 * DataNodes is guaranteed to use a free port.
1618- *
1634+ *
16191635 * Data nodes can run with the name node in the mini cluster or
16201636 * a real name node. For example, running with a real name node is useful
16211637 * when running simulated data nodes with a real name node.
@@ -1625,20 +1641,24 @@ public synchronized void startDataNodes(Configuration conf, int numDataNodes,
16251641 * @param conf the base configuration to use in starting the DataNodes. This
16261642 * will be modified as necessary.
16271643 * @param numDataNodes Number of DataNodes to start; may be zero
1644+ * @param storageTypes Storage Types for DataNodes.
16281645 * @param manageDfsDirs if true, the data directories for DataNodes will be
16291646 * created and {@link DFSConfigKeys#DFS_DATANODE_DATA_DIR_KEY} will be
16301647 * set in the conf
16311648 * @param operation the operation with which to start the DataNodes. If null
16321649 * or StartupOption.FORMAT, then StartupOption.REGULAR will be used.
16331650 * @param racks array of strings indicating the rack that each DataNode is on
16341651 * @param hosts array of strings indicating the hostnames for each DataNode
1652+ * @param storageCapacities array of Storage Capacities to be used while testing.
16351653 * @param simulatedCapacities array of capacities of the simulated data nodes
16361654 * @param setupHostsFile add new nodes to dfs hosts files
16371655 * @param checkDataNodeAddrConfig if true, only set DataNode port addresses if not already set in config
16381656 * @param checkDataNodeHostConfig if true, only set DataNode hostname key if not already set in config
16391657 * @param dnConfOverlays An array of {@link Configuration} objects that will overlay the
16401658 * global MiniDFSCluster Configuration for the corresponding DataNode.
1641- * @throws IllegalStateException if NameNode has been shutdown
1659+ * @param dnHttpPorts An array of Http ports if present, to be used for DataNodes.
1660+ * @param dnIpcPorts An array of Ipc ports if present, to be used for DataNodes.
1661+ * @throws IOException If the DFS daemons experience some issues.
16421662 */
16431663 public synchronized void startDataNodes (Configuration conf , int numDataNodes ,
16441664 StorageType [][] storageTypes , boolean manageDfsDirs , StartupOption operation ,
@@ -1648,14 +1668,29 @@ public synchronized void startDataNodes(Configuration conf, int numDataNodes,
16481668 boolean setupHostsFile ,
16491669 boolean checkDataNodeAddrConfig ,
16501670 boolean checkDataNodeHostConfig ,
1651- Configuration [] dnConfOverlays ) throws IOException {
1671+ Configuration [] dnConfOverlays ,
1672+ int [] dnHttpPorts ,
1673+ int [] dnIpcPorts ) throws IOException {
16521674 assert storageCapacities == null || simulatedCapacities == null ;
16531675 assert storageTypes == null || storageTypes .length == numDataNodes ;
16541676 assert storageCapacities == null || storageCapacities .length == numDataNodes ;
16551677
16561678 if (operation == StartupOption .RECOVER ) {
16571679 return ;
16581680 }
1681+
1682+ if (dnHttpPorts != null && dnHttpPorts .length != numDataNodes ) {
1683+ throw new IllegalArgumentException (
1684+ "Num of http ports (" + dnHttpPorts .length + ") should match num of DataNodes ("
1685+ + numDataNodes + ")" );
1686+ }
1687+
1688+ if (dnIpcPorts != null && dnIpcPorts .length != numDataNodes ) {
1689+ throw new IllegalArgumentException (
1690+ "Num of ipc ports (" + dnIpcPorts .length + ") should match num of DataNodes ("
1691+ + numDataNodes + ")" );
1692+ }
1693+
16591694 if (checkDataNodeHostConfig ) {
16601695 conf .setIfUnset (DFS_DATANODE_HOST_NAME_KEY , "127.0.0.1" );
16611696 } else {
@@ -1711,7 +1746,15 @@ public synchronized void startDataNodes(Configuration conf, int numDataNodes,
17111746 dnConf .addResource (dnConfOverlays [i ]);
17121747 }
17131748 // Set up datanode address
1714- setupDatanodeAddress (dnConf , setupHostsFile , checkDataNodeAddrConfig );
1749+ int httpPort = 0 ;
1750+ int ipcPort = 0 ;
1751+ if (dnHttpPorts != null ) {
1752+ httpPort = dnHttpPorts [i - curDatanodesNum ];
1753+ }
1754+ if (dnIpcPorts != null ) {
1755+ ipcPort = dnIpcPorts [i - curDatanodesNum ];
1756+ }
1757+ setupDatanodeAddress (dnConf , setupHostsFile , checkDataNodeAddrConfig , httpPort , ipcPort );
17151758 if (manageDfsDirs ) {
17161759 String dirs = makeDataNodeDirs (i , storageTypes == null ?
17171760 null : storageTypes [i - curDatanodesNum ]);
@@ -3363,9 +3406,9 @@ public void setBlockRecoveryTimeout(long timeout) {
33633406 timeout );
33643407 }
33653408 }
3366-
3409+
33673410 protected void setupDatanodeAddress (Configuration conf , boolean setupHostsFile ,
3368- boolean checkDataNodeAddrConfig ) throws IOException {
3411+ boolean checkDataNodeAddrConfig , int httpPort , int ipcPort ) throws IOException {
33693412 if (setupHostsFile ) {
33703413 String hostsFile = conf .get (DFS_HOSTS , "" ).trim ();
33713414 if (hostsFile .length () == 0 ) {
@@ -3388,11 +3431,11 @@ protected void setupDatanodeAddress(Configuration conf, boolean setupHostsFile,
33883431 }
33893432 }
33903433 if (checkDataNodeAddrConfig ) {
3391- conf .setIfUnset (DFS_DATANODE_HTTP_ADDRESS_KEY , "127.0.0.1:0" );
3392- conf .setIfUnset (DFS_DATANODE_IPC_ADDRESS_KEY , "127.0.0.1:0" );
3434+ conf .setIfUnset (DFS_DATANODE_HTTP_ADDRESS_KEY , "127.0.0.1:" + httpPort );
3435+ conf .setIfUnset (DFS_DATANODE_IPC_ADDRESS_KEY , "127.0.0.1:" + ipcPort );
33933436 } else {
3394- conf .set (DFS_DATANODE_HTTP_ADDRESS_KEY , "127.0.0.1:0" );
3395- conf .set (DFS_DATANODE_IPC_ADDRESS_KEY , "127.0.0.1:0" );
3437+ conf .set (DFS_DATANODE_HTTP_ADDRESS_KEY , "127.0.0.1:" + httpPort );
3438+ conf .set (DFS_DATANODE_IPC_ADDRESS_KEY , "127.0.0.1:" + ipcPort );
33963439 }
33973440 }
33983441
0 commit comments