diff --git a/NOTICE b/NOTICE index 7c3ca52840..2171bef1f0 100644 --- a/NOTICE +++ b/NOTICE @@ -3,3 +3,230 @@ All Rights Reserved. Licensed under the BSD 2-Clause License (the "License"). See LICENSE in the project root for license information. + +================================================================================ + +This product automatically loads third party code from an external repository +using the Gradle build system. Such third party code is subject to other license +terms than as set forth above. Please review the complete list of dependencies for +applicable license terms. + +In addition, such third party code may also depend on and load multiple tiers of +dependencies. Please review the applicable licenses of the additional dependencies. + +================================================================================ + +This product bundles a modified version of the +org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset class from +Apache Hadoop (http://hadoop.apache.org/) 2.7.4, which has the following notice: + +This product includes software developed by The Apache Software +Foundation (http://www.apache.org/). + +License: Apache 2.0 + +================================================================================ + +See below for the Apache 2.0 license in its entirety. + + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. 
+ + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. 
You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. 
In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/README.md b/README.md index 6855f9f8fa..fc43db67f9 100644 --- a/README.md +++ b/README.md @@ -163,10 +163,8 @@ First launch the infrastructure application to begin the startup of the internal -conf_path my-hadoop-conf -fs_image_dir hdfs:///fsimage -block_list_path hdfs:///dyno/blocks - -datanode_layout_version -56 ``` -This demonstrates the required arguments. `-datanode_layout_version` is necessary to determine how the block -files should be laid out on the DataNodes that are launched. +This demonstrates the required arguments. You can run this with the `-help` flag to see further usage information. The client will track the Dyno-NN's startup progress and how many Dyno-DNs it considers live. It will notify via logging when the Dyno-NN has exited safemode and is ready for use. 
@@ -200,11 +198,10 @@ launch an integrated application with the same parameters as were used above, th -conf_path my-hadoop-conf -fs_image_dir hdfs:///fsimage -block_list_path hdfs:///dyno/blocks - -datanode_layout_version -56 -workload_replay_enable -workload_input_path hdfs:///dyno/audit_logs/ -workload_threads_per_mapper 50 -workload_start_delay 5m ``` When run in this way, the client will automatically handle tearing down the Dyno-HDFS cluster once the -workload has completed. +workload has completed. To see the full list of supported parameters, run this with the `-help` flag. diff --git a/build.gradle b/build.gradle index 60663f10dc..296ef141c2 100644 --- a/build.gradle +++ b/build.gradle @@ -23,6 +23,7 @@ ext.deps = [ 'yarn-common': "org.apache.hadoop:hadoop-yarn-common:${hadoopVersion}", 'mapreduce-client-core': "org.apache.hadoop:hadoop-mapreduce-client-core:${hadoopVersion}", minicluster: "org.apache.hadoop:hadoop-minicluster:${hadoopVersion}", + 'hdfs-test': "org.apache.hadoop:hadoop-hdfs:${hadoopVersion}:tests", ] ] diff --git a/dynamometer-infra/build.gradle b/dynamometer-infra/build.gradle index 7c0555e157..e9eb624020 100644 --- a/dynamometer-infra/build.gradle +++ b/dynamometer-infra/build.gradle @@ -9,6 +9,7 @@ dependencies { compile deps.hadoop.'yarn-api' compile deps.hadoop.'yarn-client' compile deps.hadoop.'yarn-common' + compile deps.hadoop.'hdfs-test' compile project(':dynamometer-workload') testCompile project(path: ':dynamometer-workload', configuration: 'testArtifacts') diff --git a/dynamometer-infra/src/main/bash/create-slim-hadoop-tar.sh b/dynamometer-infra/src/main/bash/create-slim-hadoop-tar.sh index c089234f24..07bc0d8179 100755 --- a/dynamometer-infra/src/main/bash/create-slim-hadoop-tar.sh +++ b/dynamometer-infra/src/main/bash/create-slim-hadoop-tar.sh @@ -27,7 +27,7 @@ hadoopShare="$baseDir/share/hadoop" # Remove unnecessary files rm -rf ${baseDir}/share/doc ${hadoopShare}/mapreduce ${hadoopShare}/yarn \ ${hadoopShare}/kms ${hadoopShare}/tools ${hadoopShare}/httpfs \ - ${hadoopShare}/*/sources ${hadoopShare}/*/jdiff ${hadoopShare}/*/*-tests.jar + ${hadoopShare}/*/sources ${hadoopShare}/*/jdiff tar czf "$hadoopTarTmp.tar.gz" -C "$hadoopTarTmp" . 
rm -rf "$hadoopTarTmp" diff --git a/dynamometer-infra/src/main/java/com/linkedin/dynamometer/AMOptions.java b/dynamometer-infra/src/main/java/com/linkedin/dynamometer/AMOptions.java index dc94b5d4c4..60545bee2e 100644 --- a/dynamometer-infra/src/main/java/com/linkedin/dynamometer/AMOptions.java +++ b/dynamometer-infra/src/main/java/com/linkedin/dynamometer/AMOptions.java @@ -8,8 +8,10 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.TimeUnit; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.Options; +import org.apache.hadoop.conf.Configuration; /** @@ -31,10 +33,16 @@ class AMOptions { public static final String NAMENODE_METRICS_PERIOD_ARG = "namenode_metrics_period"; public static final String NAMENODE_METRICS_PERIOD_DEFAULT = "60"; public static final String SHELL_ENV_ARG = "shell_env"; + public static final String DATANODES_PER_CLUSTER_ARG = "datanodes_per_cluster"; + public static final String DATANODES_PER_CLUSTER_DEFAULT = "1"; + public static final String DATANODE_LAUNCH_DELAY_ARG = "datanode_launch_delay"; + public static final String DATANODE_LAUNCH_DELAY_DEFAULT = "0s"; private final int datanodeMemoryMB; private final int datanodeVirtualCores; private final String datanodeArgs; + private final int datanodesPerCluster; + private final String datanodeLaunchDelay; private final int namenodeMemoryMB; private final int namenodeVirtualCores; private final String namenodeArgs; @@ -45,11 +53,13 @@ class AMOptions { private final Map shellEnv; AMOptions(int datanodeMemoryMB, int datanodeVirtualCores, String datanodeArgs, - int namenodeMemoryMB, int namenodeVirtualCores, String namenodeArgs, - int namenodeMetricsPeriod, Map shellEnv) { + int datanodesPerCluster, String datanodeLaunchDelay, int namenodeMemoryMB, int namenodeVirtualCores, + String namenodeArgs, int namenodeMetricsPeriod, Map shellEnv) { this.datanodeMemoryMB = datanodeMemoryMB; this.datanodeVirtualCores = datanodeVirtualCores; this.datanodeArgs = datanodeArgs; + this.datanodesPerCluster = datanodesPerCluster; + this.datanodeLaunchDelay = datanodeLaunchDelay; this.namenodeMemoryMB = namenodeMemoryMB; this.namenodeVirtualCores = namenodeVirtualCores; this.namenodeArgs = namenodeArgs; @@ -74,6 +84,7 @@ void verify(int maxMemory, int maxVcores) throws IllegalArgumentException { "namenodeMemoryMB (%s) must be between 0 and %s", namenodeMemoryMB, maxMemory); Preconditions.checkArgument(namenodeVirtualCores > 0 && namenodeVirtualCores <= maxVcores, "namenodeVirtualCores (%s) must be between 0 and %s", namenodeVirtualCores, maxVcores); + Preconditions.checkArgument(datanodesPerCluster > 0, "datanodesPerCluster (%s) must be > 0", datanodesPerCluster); } /** @@ -89,6 +100,8 @@ void addToVargs(List vargs) { if (!datanodeArgs.isEmpty()) { vargs.add("--" + DATANODE_ARGS_ARG + " \\\"" + datanodeArgs + "\\\""); } + vargs.add("--" + DATANODES_PER_CLUSTER_ARG + " " + String.valueOf(datanodesPerCluster)); + vargs.add("--" + DATANODE_LAUNCH_DELAY_ARG + " " + datanodeLaunchDelay); vargs.add("--" + NAMENODE_MEMORY_MB_ARG + " " + String.valueOf(namenodeMemoryMB)); vargs.add("--" + NAMENODE_VCORES_ARG + " " + String.valueOf(namenodeVirtualCores)); if (!namenodeArgs.isEmpty()) { @@ -108,6 +121,18 @@ int getDataNodeVirtualCores() { return datanodeVirtualCores; } + int getDataNodesPerCluster() { + return datanodesPerCluster; + } + + long getDataNodeLaunchDelaySec() { + // Leverage the human-readable time parsing capabilities of Configuration + String tmpConfKey = 
"___temp_config_property___"; + Configuration tmpConf = new Configuration(); + tmpConf.set(tmpConfKey, datanodeLaunchDelay); + return tmpConf.getTimeDuration(tmpConfKey, 0, TimeUnit.SECONDS); + } + int getNameNodeMemoryMB() { return namenodeMemoryMB; } @@ -144,6 +169,12 @@ static void setOptions(Options opts) { opts.addOption(DATANODE_VCORES_ARG, true, "Amount of virtual cores to be requested to run the DNs (default " + DATANODE_VCORES_DEFAULT + ")"); opts.addOption(DATANODE_ARGS_ARG, true, "Additional arguments to add when starting the DataNodes."); + opts.addOption(DATANODES_PER_CLUSTER_ARG, true, "How many simulated DataNodes to run within each YARN container " + + "(default " + DATANODES_PER_CLUSTER_DEFAULT + ")"); + opts.addOption(DATANODE_LAUNCH_DELAY_ARG, true, "The period over which to launch the DataNodes; this will " + + "be used as the maximum delay and each DataNode container will be launched with some random delay less than " + + "this value. Accepts human-readable time durations (e.g. 10s, 1m) (default " + + DATANODE_LAUNCH_DELAY_DEFAULT + ")"); opts.addOption("help", false, "Print usage"); } @@ -176,6 +207,8 @@ static AMOptions initFromParser(CommandLine cliParser) { Integer.parseInt(cliParser.getOptionValue(DATANODE_MEMORY_MB_ARG, DATANODE_MEMORY_MB_DEFAULT)), Integer.parseInt(cliParser.getOptionValue(DATANODE_VCORES_ARG, DATANODE_VCORES_DEFAULT)), cliParser.getOptionValue(DATANODE_ARGS_ARG, ""), + Integer.parseInt(cliParser.getOptionValue(DATANODES_PER_CLUSTER_ARG, DATANODES_PER_CLUSTER_DEFAULT)), + cliParser.getOptionValue(DATANODE_LAUNCH_DELAY_ARG, DATANODE_LAUNCH_DELAY_DEFAULT), Integer.parseInt(cliParser.getOptionValue(NAMENODE_MEMORY_MB_ARG, NAMENODE_MEMORY_MB_DEFAULT)), Integer.parseInt(cliParser.getOptionValue(NAMENODE_VCORES_ARG, NAMENODE_VCORES_DEFAULT)), cliParser.getOptionValue(NAMENODE_ARGS_ARG, ""), diff --git a/dynamometer-infra/src/main/java/com/linkedin/dynamometer/ApplicationMaster.java b/dynamometer-infra/src/main/java/com/linkedin/dynamometer/ApplicationMaster.java index 44183fb57f..7fde3b1afc 100644 --- a/dynamometer-infra/src/main/java/com/linkedin/dynamometer/ApplicationMaster.java +++ b/dynamometer-infra/src/main/java/com/linkedin/dynamometer/ApplicationMaster.java @@ -8,6 +8,7 @@ import com.google.common.base.Optional; import com.google.common.base.Supplier; import com.google.common.collect.Lists; +import com.google.common.primitives.Ints; import java.io.IOException; import java.net.URISyntaxException; import java.nio.ByteBuffer; @@ -19,6 +20,7 @@ import java.util.List; import java.util.Map; import java.util.Properties; +import java.util.Random; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicInteger; @@ -93,10 +95,8 @@ @InterfaceStability.Unstable public class ApplicationMaster { - // Location of the script used to start the DataNode/NameNode processes - private static final String START_SCRIPT_PATH = "scripts/start-component.sh"; - private static final Log LOG = LogFactory.getLog(ApplicationMaster.class); + private static final Random RAND = new Random(); // Configuration private Configuration conf; @@ -114,14 +114,15 @@ public class ApplicationMaster { private List blockListFiles; private int numTotalDataNodes; + private int numTotalDataNodeContainers; // Counter for completed datanodes (complete denotes successful or failed ) - private AtomicInteger numCompletedDataNodes = new AtomicInteger(); + private AtomicInteger numCompletedDataNodeContainers = new 
AtomicInteger(); // Allocated datanode count so that we know how many datanodes has the RM // allocated to us - private AtomicInteger numAllocatedDataNodes = new AtomicInteger(); + private AtomicInteger numAllocatedDataNodeContainers = new AtomicInteger(); // Count of failed datanodes - private AtomicInteger numFailedDataNodes = new AtomicInteger(); + private AtomicInteger numFailedDataNodeContainers = new AtomicInteger(); // True iff the application has completed and is ready for cleanup // Once true, will never be false. This variable should not be accessed @@ -319,12 +320,15 @@ public Boolean get() { markCompleted(); return false; } + numTotalDataNodeContainers = (int) Math.ceil( + ((double) numTotalDataNodes) / Math.max(1, amOptions.getDataNodesPerCluster())); - LOG.info("Requesting " + numTotalDataNodes + " DataNodes with " + amOptions.getDataNodeMemoryMB() + "MB memory, " + + LOG.info("Requesting " + numTotalDataNodeContainers + " DataNode containers with " + + amOptions.getDataNodeMemoryMB() + "MB memory, " + amOptions.getDataNodeVirtualCores() + " vcores, "); - for (int i = 0; i < numTotalDataNodes; ++i) { - ContainerRequest datanodeAsk = - setupContainerAskForRM(amOptions.getDataNodeMemoryMB(), amOptions.getDataNodeVirtualCores(), 1); + for (int i = 0; i < numTotalDataNodeContainers; ++i) { + ContainerRequest datanodeAsk = setupContainerAskForRM(amOptions.getDataNodeMemoryMB(), + amOptions.getDataNodeVirtualCores(), 1); amRMClient.addContainerRequest(datanodeAsk); LOG.debug("Requested datanode ask: " + datanodeAsk.toString()); } @@ -400,14 +404,14 @@ private boolean cleanup() { FinalApplicationStatus appStatus; String appMessage = null; boolean success; - if (numFailedDataNodes.get() == 0 && numCompletedDataNodes.get() == numTotalDataNodes) { + if (numFailedDataNodeContainers.get() == 0 && numCompletedDataNodeContainers.get() == numTotalDataNodes) { appStatus = FinalApplicationStatus.SUCCEEDED; success = true; } else { appStatus = FinalApplicationStatus.FAILED; - appMessage = "Diagnostics." + ", total=" + numTotalDataNodes - + ", completed=" + numCompletedDataNodes.get() + ", allocated=" - + numAllocatedDataNodes.get() + ", failed=" + numFailedDataNodes.get(); + appMessage = "Diagnostics." 
+ ", total=" + numTotalDataNodeContainers + + ", completed=" + numCompletedDataNodeContainers.get() + ", allocated=" + + numAllocatedDataNodeContainers.get() + ", failed=" + numFailedDataNodeContainers.get(); success = false; } try { @@ -454,16 +458,16 @@ public void onContainersCompleted(List completedContainers) { // increment counters for completed/failed containers int exitStatus = containerStatus.getExitStatus(); - int completed = numCompletedDataNodes.incrementAndGet(); + int completed = numCompletedDataNodeContainers.incrementAndGet(); if (0 != exitStatus) { - numFailedDataNodes.incrementAndGet(); + numFailedDataNodeContainers.incrementAndGet(); } else { LOG.info("DataNode " + completed + " completed successfully, containerId=" + containerStatus.getContainerId()); } } - if (numCompletedDataNodes.get() == numTotalDataNodes) { + if (numCompletedDataNodeContainers.get() == numTotalDataNodeContainers) { LOG.info("All datanode containers completed; marking application as done"); markCompleted(); } @@ -483,14 +487,14 @@ public void onContainersAllocated(List allocatedContainers) { containerLauncher = new LaunchContainerRunnable(container, true, containerListener); } else if (rsrc.getMemory() >= amOptions.getDataNodeMemoryMB() && rsrc.getVirtualCores() >= amOptions.getDataNodeVirtualCores() - && numAllocatedDataNodes.get() < numTotalDataNodes) { + && numAllocatedDataNodeContainers.get() < numTotalDataNodes) { if (launchNameNode && namenodeContainer == null) { LOG.error("Received a container with following resources suited " + "for a DataNode but no NameNode container exists: containerMem=" + rsrc.getMemory() + ", containerVcores=" + rsrc.getVirtualCores()); continue; } - numAllocatedDataNodes.getAndIncrement(); + numAllocatedDataNodeContainers.getAndIncrement(); datanodeContainers.put(container.getId(), container); componentType = "DATANODE"; containerLauncher = new LaunchContainerRunnable(container, false, containerListener); @@ -587,8 +591,8 @@ public void onStartContainerError(ContainerId containerId, Throwable t) { } else if (isDataNode(containerId)) { LOG.error("Failed to start DataNode Container " + containerId); datanodeContainers.remove(containerId); - numCompletedDataNodes.incrementAndGet(); - numFailedDataNodes.incrementAndGet(); + numCompletedDataNodeContainers.incrementAndGet(); + numFailedDataNodeContainers.incrementAndGet(); } else { LOG.error("onStartContainerError received unknown container ID: " + containerId); } @@ -676,15 +680,22 @@ private Map getLocalResources() throws IOException { Map envs = System.getenv(); addAsLocalResourceFromEnv(DynoConstants.CONF_ZIP, localResources, envs); - addAsLocalResourceFromEnv(DynoConstants.SCRIPTS_ZIP, localResources, envs); + addAsLocalResourceFromEnv(DynoConstants.START_SCRIPT, localResources, envs); addAsLocalResourceFromEnv(DynoConstants.HADOOP_BINARY, localResources, envs); addAsLocalResourceFromEnv(DynoConstants.VERSION, localResources, envs); + addAsLocalResourceFromEnv(DynoConstants.DYNO_JAR, localResources, envs); if (isNameNodeLauncher) { addAsLocalResourceFromEnv(DynoConstants.FS_IMAGE, localResources, envs); addAsLocalResourceFromEnv(DynoConstants.FS_IMAGE_MD5, localResources, envs); } else { - addAsLocalResourceFromEnv(DynoConstants.DYNO_JAR, localResources, envs); - localResources.put(DynoConstants.BLOCK_LIST_RESOURCE_PATH, blockListFiles.remove(0)); + int blockFilesToLocalize = Math.max(1, amOptions.getDataNodesPerCluster()); + for (int i = 0; i < blockFilesToLocalize; i++) { + try { + 
localResources.put(DynoConstants.BLOCK_LIST_RESOURCE_PATH_PREFIX + i, blockListFiles.remove(0)); + } catch (IndexOutOfBoundsException e) { + break; + } + } } return localResources; } @@ -697,13 +708,15 @@ private List getContainerStartCommand() throws IOException { List vargs = new ArrayList<>(); // Set executable command - vargs.add(START_SCRIPT_PATH); + vargs.add("./" + DynoConstants.START_SCRIPT.getResourcePath()); String component = isNameNodeLauncher ? "namenode" : "datanode"; vargs.add(component); if (isNameNodeLauncher) { vargs.add(remoteStoragePath.getFileSystem(conf).makeQualified(remoteStoragePath).toString()); } else { vargs.add(namenodeServiceRpcAddress); + vargs.add(String.valueOf(amOptions.getDataNodeLaunchDelaySec() < 1 ? 0 : + RAND.nextInt(Ints.checkedCast(amOptions.getDataNodeLaunchDelaySec())))); } // Add log redirect params diff --git a/dynamometer-infra/src/main/java/com/linkedin/dynamometer/BlockPlacementPolicyAlwaysSatisfied.java b/dynamometer-infra/src/main/java/com/linkedin/dynamometer/BlockPlacementPolicyAlwaysSatisfied.java new file mode 100644 index 0000000000..53b7b79b1f --- /dev/null +++ b/dynamometer-infra/src/main/java/com/linkedin/dynamometer/BlockPlacementPolicyAlwaysSatisfied.java @@ -0,0 +1,36 @@ +/** + * Copyright 2017 LinkedIn Corporation. All rights reserved. Licensed under the BSD-2 Clause license. + * See LICENSE in the project root for license information. + */ +package com.linkedin.dynamometer; + +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyDefault; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementStatus; + + +/** + * A BlockPlacementPolicy which always considered itself satisfied. This avoids the issue that the Dynamometer + * NameNode will complain about blocks being under-replicated because they're not being put on distinct racks. 
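+ *
+ * For example, assuming the placement policy is selected via the standard HDFS setting
+ * {@code dfs.block.replicator.classname} (how Dynamometer itself wires this class in may differ), a sketch of
+ * enabling it in the Dyno-NN configuration would be:
+ * {@code conf.set("dfs.block.replicator.classname", BlockPlacementPolicyAlwaysSatisfied.class.getName());}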
+ */ +public class BlockPlacementPolicyAlwaysSatisfied extends BlockPlacementPolicyDefault { + + private static final BlockPlacementStatusSatisfied SATISFIED = new BlockPlacementStatusSatisfied(); + + private static class BlockPlacementStatusSatisfied implements BlockPlacementStatus { + @Override + public boolean isPlacementPolicySatisfied() { + return true; + } + + public String getErrorDescription() { + return null; + } + } + + @Override + public BlockPlacementStatus verifyBlockPlacement(DatanodeInfo[] locs, int numberOfReplicas) { + return SATISFIED; + } + +} diff --git a/dynamometer-infra/src/main/java/com/linkedin/dynamometer/Client.java b/dynamometer-infra/src/main/java/com/linkedin/dynamometer/Client.java index 9adf535da1..3b6c54cbd2 100644 --- a/dynamometer-infra/src/main/java/com/linkedin/dynamometer/Client.java +++ b/dynamometer-infra/src/main/java/com/linkedin/dynamometer/Client.java @@ -132,6 +132,9 @@ public class Client extends Configured implements Tool { public static final String WORKLOAD_THREADS_PER_MAPPER_ARG = "workload_threads_per_mapper"; public static final String WORKLOAD_START_DELAY_ARG = "workload_start_delay"; + private static final String START_SCRIPT_LOCATION = + Client.class.getClassLoader().getResource(DynoConstants.START_SCRIPT.getResourcePath()).toString(); + private YarnClient yarnClient; // Application master specific info to register a new Application with RM/ASM private String appName = ""; @@ -148,8 +151,6 @@ public class Client extends Configured implements Tool { private String hadoopBinary = ""; // Location of DN conf zip private String confPath = ""; - // Location of scripts zip - private String scriptsPath = ""; // Location of root dir for DN block image zips private String blockListPath = ""; // Location of NN fs image @@ -346,7 +347,6 @@ public boolean accept(Path path) { this.amMemory = Integer.parseInt(cliParser.getOptionValue(MASTER_MEMORY_MB_ARG, MASTER_MEMORY_MB_DEFAULT)); this.amVCores = Integer.parseInt(cliParser.getOptionValue(MASTER_VCORES_ARG, MASTER_VCORES_DEFAULT)); this.confPath = cliParser.getOptionValue(CONF_PATH_ARG); - this.scriptsPath = Client.class.getClassLoader().getResource("scripts").getPath(); this.blockListPath = cliParser.getOptionValue(BLOCK_LIST_PATH_ARG); if (cliParser.hasOption(HADOOP_BINARY_PATH_ARG)) { this.hadoopBinary = cliParser.getOptionValue(HADOOP_BINARY_PATH_ARG); @@ -511,7 +511,7 @@ private Map setupRemoteResourcesGetEnv() throws IOException { } setupRemoteResource(versionFilePath, infraAppId, DynoConstants.VERSION, env); setupRemoteResource(confPath, infraAppId, DynoConstants.CONF_ZIP, env); - setupRemoteResource(scriptsPath, infraAppId, DynoConstants.SCRIPTS_ZIP, env); + setupRemoteResource(START_SCRIPT_LOCATION, infraAppId, DynoConstants.START_SCRIPT, env); setupRemoteResource(hadoopBinary, infraAppId, DynoConstants.HADOOP_BINARY, env); setupRemoteResource(appMasterJar, infraAppId, DynoConstants.DYNO_JAR, env); diff --git a/dynamometer-infra/src/main/java/com/linkedin/dynamometer/DynoConstants.java b/dynamometer-infra/src/main/java/com/linkedin/dynamometer/DynoConstants.java index d5e9a85ae7..e4d82073d7 100644 --- a/dynamometer-infra/src/main/java/com/linkedin/dynamometer/DynoConstants.java +++ b/dynamometer-infra/src/main/java/com/linkedin/dynamometer/DynoConstants.java @@ -31,8 +31,8 @@ public class DynoConstants { public static final DynoResource CONF_ZIP = new DynoResource("CONF_ZIP", ARCHIVE, "conf"); // Resource for the Hadoop binary archive (distribution tar) public static final DynoResource 
HADOOP_BINARY = new DynoResource("HADOOP_BINARY", ARCHIVE, "hadoopBinary"); - // Resource for the zip file for the scripts used by the DataNodes/NameNode - public static final DynoResource SCRIPTS_ZIP = new DynoResource("SCRIPTS_ZIP", ARCHIVE, "scripts"); + // Resource for the script used to start the DataNodes/NameNode + public static final DynoResource START_SCRIPT = new DynoResource("START_SCRIPT", FILE, "start-component.sh"); // Resource for the file system image file used by the NameNode public static final DynoResource FS_IMAGE = new DynoResource("FS_IMAGE", FILE, null); // Resource for the md5 file accompanying the file system image for the NameNode @@ -47,8 +47,8 @@ public class DynoConstants { public static final String BLOCK_LIST_PATH_ENV = "BLOCK_ZIP_PATH"; // The format of the name of a single block file public static final Pattern BLOCK_LIST_FILE_PATTERN = Pattern.compile("dn[0-9]+-a-[0-9]+-r-[0-9]+"); - // The file name to use when localizing the block file on a DataNode - public static final String BLOCK_LIST_RESOURCE_PATH = "block"; + // The file name to use when localizing the block file on a DataNode; will be suffixed with an integer + public static final String BLOCK_LIST_RESOURCE_PATH_PREFIX = "blocks/block"; public static final PathFilter BLOCK_LIST_FILE_FILTER = new PathFilter() { @Override public boolean accept(Path path) { diff --git a/dynamometer-infra/src/main/java/com/linkedin/dynamometer/SimulatedDataNodes.java b/dynamometer-infra/src/main/java/com/linkedin/dynamometer/SimulatedDataNodes.java new file mode 100644 index 0000000000..37a7b2ede6 --- /dev/null +++ b/dynamometer-infra/src/main/java/com/linkedin/dynamometer/SimulatedDataNodes.java @@ -0,0 +1,153 @@ +/** + * Copyright 2017 LinkedIn Corporation. All rights reserved. Licensed under the BSD-2 Clause license. + * See LICENSE in the project root for license information. + */ +package com.linkedin.dynamometer; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.net.URI; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Set; + +import org.apache.commons.lang.exception.ExceptionUtils; +import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.HdfsConfiguration; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.protocol.Block; +import org.apache.hadoop.hdfs.protocol.HdfsConstants; +import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption; +import org.apache.hadoop.hdfs.server.datanode.DataNode; +import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.util.Time; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; + + +/** + * Starts up a number of DataNodes within the same JVM. These DataNodes all use + * {@link org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset}, so they do not store any actual data, and do not + * persist anything to disk; they maintain all metadata in memory. This is useful for testing and simulation purposes. + *

+ * The DataNodes will attempt to connect to a NameNode defined by the default FileSystem. There will be one DataNode + * started for each block list file passed as an argument. Each of these files should contain a list of blocks that + * the corresponding DataNode should contain, specified as a triplet of block ID, generation stamp, and block size. + * Each line of the file describes one block, in the format: +

+ * {@code blockID,blockGenStamp,blockSize} + *
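+ *
+ * For example, a block list file describing two blocks (the numeric values here are purely illustrative
+ * placeholders, not taken from any real cluster) would contain:
+ * {@code 1073741825,1001,134217728}
+ * {@code 1073741826,1001,4096}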

+ * This class is loosely based off of {@link org.apache.hadoop.hdfs.DataNodeCluster}. + */ +public class SimulatedDataNodes extends Configured implements Tool { + + // Set this arbitrarily large (100TB) since we don't care about storage capacity + private static final long STORAGE_CAPACITY = 100 * 2L<<40; + private static final String USAGE = + "Usage: com.linkedin.dynamometer.SimulatedDataNodes bpid blockListFile1 [ blockListFileN ... ]\n" + + " bpid should be the ID of the block pool to which these DataNodes belong.\n" + + " Each blockListFile specified should contain a list of blocks to be served by one DataNode.\n" + + " See the Javadoc of this class for more detail."; + + static void printUsageExit(String err) { + System.out.println(err); + System.out.println(USAGE); + System.exit(1); + } + + public static void main(String[] args) throws Exception { + SimulatedDataNodes datanodes = new SimulatedDataNodes(); + ToolRunner.run(new HdfsConfiguration(), datanodes, args); + } + + public int run(String[] args) throws Exception { + if (args.length < 2) { + printUsageExit("Not enough arguments"); + } + String bpid = args[0]; + List blockListFiles = new ArrayList<>(); + for (int i = 1; i < args.length; i++) { + blockListFiles.add(new Path(args[i])); + } + + URI defaultFS = FileSystem.getDefaultUri(getConf()); + if (!HdfsConstants.HDFS_URI_SCHEME.equals(defaultFS.getScheme())) { + printUsageExit("Must specify an HDFS-based default FS! Got <" + defaultFS + ">"); + } + String nameNodeAdr = defaultFS.getAuthority(); + if (nameNodeAdr == null) { + printUsageExit("No NameNode address and port in config"); + } + System.out.println("DataNodes will connect to NameNode at " + nameNodeAdr); + + System.setProperty(MiniDFSCluster.PROP_TEST_BUILD_DATA, + DataNode.getStorageLocations(getConf()).get(0).getFile().getAbsolutePath()); + SimulatedMultiStorageFSDataset.setFactory(getConf()); + getConf().setLong(SimulatedMultiStorageFSDataset.CONFIG_PROPERTY_CAPACITY, STORAGE_CAPACITY); + + UserGroupInformation.setConfiguration(getConf()); + MiniDFSCluster mc = new MiniDFSCluster(); + try { + mc.formatDataNodeDirs(); + } catch (IOException e) { + System.out.println("Error formatting DataNode dirs: " + e); + System.exit(1); + } + + try { + System.out.println("Found " + blockListFiles.size() + " block listing files; launching DataNodes accordingly."); + mc.startDataNodes(getConf(), blockListFiles.size(), null, false, StartupOption.REGULAR, + null, null, null, null, false, true, true, null); + long startTime = Time.monotonicNow(); + System.out.println("Waiting for DataNodes to connect to NameNode and init storage directories."); + Set datanodesWithoutFSDataset = new HashSet<>(mc.getDataNodes()); + while (!datanodesWithoutFSDataset.isEmpty()) { + Iterator iter = datanodesWithoutFSDataset.iterator(); + while (iter.hasNext()) { + if (DataNodeTestUtils.getFSDataset(iter.next()) != null) { + iter.remove(); + } + } + Thread.sleep(100); + } + System.out.println("Waited " + (Time.monotonicNow() - startTime) + " ms for DataNode FSDatasets to be ready"); + + for (int dnIndex = 0; dnIndex < blockListFiles.size(); dnIndex++) { + Path blockListFile = blockListFiles.get(dnIndex); + try (FSDataInputStream fsdis = blockListFile.getFileSystem(getConf()).open(blockListFile)) { + BufferedReader reader = new BufferedReader(new InputStreamReader(fsdis)); + List blockList = new ArrayList<>(); + int cnt = 0; + for (String line = reader.readLine(); line != null; line = reader.readLine()) { + // Format of the listing files is 
blockID,blockGenStamp,blockSize + String[] blockInfo = line.split(","); + blockList.add(new Block(Long.parseLong(blockInfo[0]), + Long.parseLong(blockInfo[2]), Long.parseLong(blockInfo[1]))); + cnt++; + } + try { + mc.injectBlocks(dnIndex, blockList, bpid); + } catch (IOException ioe) { + System.out.printf("Error injecting blocks into DataNode %d for block pool %s: %s%n", dnIndex, bpid, + ExceptionUtils.getFullStackTrace(ioe)); + } + System.out.printf("Injected %d blocks into DataNode %d for block pool %s%n", cnt, dnIndex, bpid); + } + } + + } catch (IOException e) { + System.out.println("Error creating DataNodes: " + ExceptionUtils.getFullStackTrace(e)); + return 1; + } + return 0; + } + +} diff --git a/dynamometer-infra/src/main/java/com/linkedin/dynamometer/SimulatedMultiStorageFSDataset.java b/dynamometer-infra/src/main/java/com/linkedin/dynamometer/SimulatedMultiStorageFSDataset.java new file mode 100644 index 0000000000..54cc38bf86 --- /dev/null +++ b/dynamometer-infra/src/main/java/com/linkedin/dynamometer/SimulatedMultiStorageFSDataset.java @@ -0,0 +1,1407 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.linkedin.dynamometer; + +import org.apache.hadoop.hdfs.protocol.ExtendedBlock; + +import java.io.File; +import java.io.FileDescriptor; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.channels.ClosedChannelException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import javax.management.NotCompliantMBeanException; +import javax.management.ObjectName; +import javax.management.StandardMBean; + +import org.apache.commons.lang.ArrayUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.StorageType; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.protocol.Block; +import org.apache.hadoop.hdfs.protocol.BlockListAsLongs; +import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo; +import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata; +import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState; +import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader; +import org.apache.hadoop.hdfs.server.datanode.ChunkChecksum; +import org.apache.hadoop.hdfs.server.datanode.DataNode; +import org.apache.hadoop.hdfs.server.datanode.DataStorage; +import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica; +import org.apache.hadoop.hdfs.server.datanode.Replica; +import org.apache.hadoop.hdfs.server.datanode.ReplicaAlreadyExistsException; +import org.apache.hadoop.hdfs.server.datanode.ReplicaHandler; +import org.apache.hadoop.hdfs.server.datanode.ReplicaInPipelineInterface; +import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo; +import org.apache.hadoop.hdfs.server.datanode.ReplicaNotFoundException; +import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset; +import org.apache.hadoop.hdfs.server.datanode.StorageLocation; +import org.apache.hadoop.hdfs.server.datanode.UnexpectedReplicaStateException; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams; +import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean; +import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock; +import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; +import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; +import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo; +import org.apache.hadoop.hdfs.server.protocol.StorageReport; +import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary; +import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.metrics2.util.MBeans; +import org.apache.hadoop.util.DataChecksum; + + +/** + *

+ * This is a modified version of {@link SimulatedFSDataset} from Hadoop branch-2, because previous versions + * did not support having multiple storages per DataNode, which is very important for obtaining correct + * full block report performance (as there is one report per storage). It is essentially the branch-2 + * {@link SimulatedFSDataset} with HDFS-12818 additionally applied on top of it. + * + * This file is a modified version of the {@link SimulatedFSDataset} class, taken from + * Apache Hadoop 2.7.4. It was originally developed by + * The Apache Software Foundation. + */ +public class SimulatedMultiStorageFSDataset extends SimulatedFSDataset { + static class Factory extends FsDatasetSpi.Factory { + @Override + public SimulatedMultiStorageFSDataset newInstance(DataNode datanode, + DataStorage storage, Configuration conf) throws IOException { + return new SimulatedMultiStorageFSDataset(datanode, conf); + } + + @Override + public boolean isSimulated() { + return true; + } + } + + public static void setFactory(Configuration conf) { + conf.set(DFSConfigKeys.DFS_DATANODE_FSDATASET_FACTORY_KEY, + Factory.class.getName()); + } + + public static final String CONFIG_PROPERTY_CAPACITY = + "dfs.datanode.simulateddatastorage.capacity"; + + public static final long DEFAULT_CAPACITY = 2L<<40; // 1 terabyte + public static final byte DEFAULT_DATABYTE = 9; + + public static final String CONFIG_PROPERTY_STATE = + "dfs.datanode.simulateddatastorage.state"; + private static final DatanodeStorage.State DEFAULT_STATE = + DatanodeStorage.State.NORMAL; + + static final byte[] nullCrcFileData; + static { + DataChecksum checksum = DataChecksum.newDataChecksum( + DataChecksum.Type.NULL, 16*1024 ); + byte[] nullCrcHeader = checksum.getHeader(); + nullCrcFileData = new byte[2 + nullCrcHeader.length]; + nullCrcFileData[0] = (byte) ((BlockMetadataHeader.VERSION >>> 8) & 0xff); + nullCrcFileData[1] = (byte) (BlockMetadataHeader.VERSION & 0xff); + for (int i = 0; i < nullCrcHeader.length; i++) { + nullCrcFileData[i+2] = nullCrcHeader[i]; + } + } + + // information about a single block + private class BInfo implements ReplicaInPipelineInterface { + final Block theBlock; + private boolean finalized = false; // if not finalized => ongoing creation + SimulatedOutputStream oStream = null; + private long bytesAcked; + private long bytesRcvd; + private boolean pinned = false; + BInfo(String bpid, Block b, boolean forWriting) throws IOException { + theBlock = new Block(b); + if (theBlock.getNumBytes() < 0) { + theBlock.setNumBytes(0); + } + if (!getStorage(theBlock).alloc(bpid, theBlock.getNumBytes())) { + // expected length - actual length may + // be more - we find out at finalize + DataNode.LOG.warn("Lack of free storage on a block alloc"); + throw new IOException("Creating block, no free space available"); + } + + if (forWriting) { + finalized = false; + oStream = new SimulatedOutputStream(); + } else { + finalized = true; + oStream = null; + } + } + + @Override + public String getStorageUuid() { + return getStorage(theBlock).getStorageUuid(); + } + + @Override + synchronized public long getGenerationStamp() { + return theBlock.getGenerationStamp(); + } + + @Override + synchronized public long getNumBytes() { + if (!finalized) { + return bytesRcvd; + } else { + return theBlock.getNumBytes(); + } + } + + @Override + synchronized public void setNumBytes(long length) { + if (!finalized) { + bytesRcvd = length; + } else { + theBlock.setNumBytes(length); + } + } + + synchronized SimulatedInputStream getIStream() { + if 
(!finalized) { + // throw new IOException("Trying to read an unfinalized block"); + return new SimulatedInputStream(oStream.getLength(), DEFAULT_DATABYTE); + } else { + return new SimulatedInputStream(theBlock.getNumBytes(), DEFAULT_DATABYTE); + } + } + + synchronized void finalizeBlock(String bpid, long finalSize) + throws IOException { + if (finalized) { + throw new IOException( + "Finalizing a block that has already been finalized" + + theBlock.getBlockId()); + } + if (oStream == null) { + DataNode.LOG.error("Null oStream on unfinalized block - bug"); + throw new IOException("Unexpected error on finalize"); + } + + if (oStream.getLength() != finalSize) { + DataNode.LOG.warn("Size passed to finalize (" + finalSize + + ")does not match what was written:" + oStream.getLength()); + throw new IOException( + "Size passed to finalize does not match the amount of data written"); + } + // We had allocated the expected length when block was created; + // adjust if necessary + long extraLen = finalSize - theBlock.getNumBytes(); + if (extraLen > 0) { + if (!getStorage(theBlock).alloc(bpid,extraLen)) { + DataNode.LOG.warn("Lack of free storage on a block alloc"); + throw new IOException("Creating block, no free space available"); + } + } else { + getStorage(theBlock).free(bpid, -extraLen); + } + theBlock.setNumBytes(finalSize); + + finalized = true; + oStream = null; + return; + } + + synchronized void unfinalizeBlock() throws IOException { + if (!finalized) { + throw new IOException("Unfinalized a block that's not finalized " + + theBlock); + } + finalized = false; + oStream = new SimulatedOutputStream(); + long blockLen = theBlock.getNumBytes(); + oStream.setLength(blockLen); + bytesRcvd = blockLen; + bytesAcked = blockLen; + } + + SimulatedInputStream getMetaIStream() { + return new SimulatedInputStream(nullCrcFileData); + } + + synchronized boolean isFinalized() { + return finalized; + } + + @Override + synchronized public ReplicaOutputStreams createStreams(boolean isCreate, + DataChecksum requestedChecksum) throws IOException { + if (finalized) { + throw new IOException("Trying to write to a finalized replica " + + theBlock); + } else { + SimulatedOutputStream crcStream = new SimulatedOutputStream(); + return new ReplicaOutputStreams(oStream, crcStream, requestedChecksum, + getStorage(theBlock).getVolume().isTransientStorage()); + } + } + + @Override + synchronized public long getBlockId() { + return theBlock.getBlockId(); + } + + @Override + synchronized public long getVisibleLength() { + return getBytesAcked(); + } + + @Override + public ReplicaState getState() { + return finalized ? 
ReplicaState.FINALIZED : ReplicaState.RBW; + } + + @Override + synchronized public long getBytesAcked() { + if (finalized) { + return theBlock.getNumBytes(); + } else { + return bytesAcked; + } + } + + @Override + synchronized public void setBytesAcked(long bytesAcked) { + if (!finalized) { + this.bytesAcked = bytesAcked; + } + } + + @Override + public void releaseAllBytesReserved() { + } + + @Override + synchronized public long getBytesOnDisk() { + if (finalized) { + return theBlock.getNumBytes(); + } else { + return oStream.getLength(); + } + } + + @Override + public void setLastChecksumAndDataLen(long dataLength, byte[] lastChecksum) { + oStream.setLength(dataLength); + } + + @Override + public ChunkChecksum getLastChecksumAndDataLen() { + return new ChunkChecksum(oStream.getLength(), null); + } + + @Override + public boolean isOnTransientStorage() { + return false; + } + } + + /** + * Class is used for tracking block pool storage + */ + private static class SimulatedBPStorage { + private long used; // in bytes + private final Map blockMap = new HashMap<>(); + + long getUsed() { + return used; + } + + void alloc(long amount) { + used += amount; + } + + void free(long amount) { + used -= amount; + } + + Map getBlockMap() { + return blockMap; + } + + SimulatedBPStorage() { + used = 0; + } + } + + /** + * Class used for tracking datanode level storage + */ + private static class SimulatedStorage { + private final Map map = + new HashMap(); + + private final long capacity; // in bytes + private final DatanodeStorage dnStorage; + private final SimulatedVolume volume; + + synchronized long getFree() { + return capacity - getUsed(); + } + + long getCapacity() { + return capacity; + } + + synchronized long getUsed() { + long used = 0; + for (SimulatedBPStorage bpStorage : map.values()) { + used += bpStorage.getUsed(); + } + return used; + } + + synchronized long getBlockPoolUsed(String bpid) throws IOException { + return getBPStorage(bpid).getUsed(); + } + + int getNumFailedVolumes() { + return 0; + } + + synchronized boolean alloc(String bpid, long amount) throws IOException { + if (getFree() >= amount) { + getBPStorage(bpid).alloc(amount); + return true; + } + return false; + } + + synchronized void free(String bpid, long amount) throws IOException { + getBPStorage(bpid).free(amount); + } + + SimulatedStorage(long cap, DatanodeStorage.State state) { + capacity = cap; + dnStorage = new DatanodeStorage( + "SimulatedStorage-" + DatanodeStorage.generateUuid(), + state, StorageType.DEFAULT); + this.volume = new SimulatedVolume(this); + } + + synchronized void addBlockPool(String bpid) { + SimulatedBPStorage bpStorage = map.get(bpid); + if (bpStorage != null) { + return; + } + map.put(bpid, new SimulatedBPStorage()); + } + + synchronized void removeBlockPool(String bpid) { + map.remove(bpid); + } + + private SimulatedBPStorage getBPStorage(String bpid) throws IOException { + SimulatedBPStorage bpStorage = map.get(bpid); + if (bpStorage == null) { + throw new IOException("block pool " + bpid + " not found"); + } + return bpStorage; + } + + String getStorageUuid() { + return dnStorage.getStorageID(); + } + + DatanodeStorage getDnStorage() { + return dnStorage; + } + + synchronized StorageReport getStorageReport(String bpid) { + return new StorageReport(dnStorage, + false, getCapacity(), getUsed(), getFree(), + map.get(bpid).getUsed(), 0L); + } + + SimulatedVolume getVolume() { + return volume; + } + + Map getBlockMap(String bpid) throws IOException { + SimulatedBPStorage bpStorage = map.get(bpid); + 
if (bpStorage == null) { + throw new IOException("Nonexistent block pool: " + bpid); + } + return bpStorage.getBlockMap(); + } + } + + static class SimulatedVolume implements FsVolumeSpi { + private final SimulatedStorage storage; + + SimulatedVolume(final SimulatedStorage storage) { + this.storage = storage; + } + + @Override + public FsVolumeReference obtainReference() throws ClosedChannelException { + return null; + } + + @Override + public String getStorageID() { + return storage.getStorageUuid(); + } + + @Override + public String[] getBlockPoolList() { + return new String[0]; + } + + @Override + public long getAvailable() throws IOException { + return storage.getCapacity() - storage.getUsed(); + } + + @Override + public String getBasePath() { + return null; + } + + @Override + public String getPath(String bpid) throws IOException { + return null; + } + + @Override + public File getFinalizedDir(String bpid) throws IOException { + return null; + } + + @Override + public StorageType getStorageType() { + return null; + } + + @Override + public boolean isTransientStorage() { + return false; + } + + @Override + public void reserveSpaceForRbw(long bytesToReserve) { + } + + @Override + public void releaseReservedSpace(long bytesToRelease) { + } + + @Override + public BlockIterator newBlockIterator(String bpid, String name) { + throw new UnsupportedOperationException(); + } + + @Override + public BlockIterator loadBlockIterator(String bpid, String name) + throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public FsDatasetSpi getDataset() { + throw new UnsupportedOperationException(); + } + + @Override + public byte[] loadLastPartialChunkChecksum( + File blockFile, File metaFile) throws IOException { + return null; + } + } + + private final List storages; + private final String datanodeUuid; + private final DataNode datanode; + + public SimulatedMultiStorageFSDataset(DataNode datanode, Configuration conf) { + super(datanode, null, conf); + this.datanode = datanode; + int storageCount = DataNode.getStorageLocations(conf).size(); + this.datanodeUuid = "SimulatedDatanode-" + DataNode.generateUuid(); + + this.storages = new ArrayList<>(); + for (int i = 0; i < storageCount; i++) { + this.storages.add(new SimulatedStorage( + conf.getLong(CONFIG_PROPERTY_CAPACITY, DEFAULT_CAPACITY), + conf.getEnum(CONFIG_PROPERTY_STATE, DEFAULT_STATE))); + } + } + + public synchronized void injectBlocks(String bpid, + Iterable injectBlocks) throws IOException { + ExtendedBlock blk = new ExtendedBlock(); + if (injectBlocks != null) { + for (Block b: injectBlocks) { // if any blocks in list is bad, reject list + if (b == null) { + throw new NullPointerException("Null blocks in block list"); + } + blk.set(bpid, b); + if (isValidBlock(blk)) { + DataNode.LOG.error("Block already exists in block list; trying to add <" + + b + "> but already have <" + getBlockMap(blk).get(blk.getLocalBlock()).theBlock + ">; skipping"); + } + } + + List> blockMaps = new ArrayList<>(); + for (SimulatedStorage storage : storages) { + storage.addBlockPool(bpid); + blockMaps.add(storage.getBlockMap(bpid)); + } + + for (Block b: injectBlocks) { + BInfo binfo = new BInfo(bpid, b, false); + blockMaps.get((int) (b.getBlockId() % storages.size())).put(binfo.theBlock, binfo); + } + } + } + + /** Get the storage that a given block lives within. 
*/ + private SimulatedStorage getStorage(Block b) { + return storages.get((int) (b.getBlockId() % storages.size())); + } + + /** + * Get the block map that a given block lives within, assuming it is within + * block pool bpid. + */ + private Map getBlockMap(Block b, String bpid) + throws IOException { + return getStorage(b).getBlockMap(bpid); + } + + /** Get the block map that a given block lives within. */ + private Map getBlockMap(ExtendedBlock b) throws IOException { + return getBlockMap(b.getLocalBlock(), b.getBlockPoolId()); + } + + @Override // FsDatasetSpi + public synchronized void finalizeBlock(ExtendedBlock b, boolean fsyncDir) + throws IOException { + BInfo binfo = getBlockMap(b).get(b.getLocalBlock()); + if (binfo == null) { + throw new IOException("Finalizing a non existing block " + b); + } + binfo.finalizeBlock(b.getBlockPoolId(), b.getNumBytes()); + } + + @Override // FsDatasetSpi + public synchronized void unfinalizeBlock(ExtendedBlock b) throws IOException{ + if (isValidRbw(b)) { + getBlockMap(b).remove(b.getLocalBlock()); + } + } + + synchronized BlockListAsLongs getBlockReport(String bpid, + SimulatedStorage storage) { + BlockListAsLongs.Builder report = BlockListAsLongs.builder(); + try { + for (BInfo b : storage.getBlockMap(bpid).values()) { + if (b.isFinalized()) { + report.add(b); + } + } + } catch (IOException ioe) { + // Ignore + } + return report.build(); + } + + @Override + public synchronized Map getBlockReports( + String bpid) { + Map blockReports = new HashMap<>(); + for (SimulatedStorage storage : storages) { + blockReports.put(storage.getDnStorage(), getBlockReport(bpid, storage)); + } + return blockReports; + } + + @Override // FsDatasetSpi + public List getCacheReport(String bpid) { + return new LinkedList(); + } + + @Override // FSDatasetMBean + public long getCapacity() { + long total = 0; + for (SimulatedStorage storage : storages) { + total += storage.getCapacity(); + } + return total; + } + + @Override // FSDatasetMBean + public long getDfsUsed() { + long total = 0; + for (SimulatedStorage storage : storages) { + total += storage.getUsed(); + } + return total; + } + + @Override // FSDatasetMBean + public long getBlockPoolUsed(String bpid) throws IOException { + long total = 0; + for (SimulatedStorage storage : storages) { + total += storage.getBlockPoolUsed(bpid); + } + return total; + } + + @Override // FSDatasetMBean + public long getRemaining() { + + long total = 0; + for (SimulatedStorage storage : storages) { + total += storage.getFree(); + } + return total; + } + + @Override // FSDatasetMBean + public int getNumFailedVolumes() { + + int total = 0; + for (SimulatedStorage storage : storages) { + total += storage.getNumFailedVolumes(); + } + return total; + } + + @Override // FSDatasetMBean + public String[] getFailedStorageLocations() { + return null; + } + + @Override // FSDatasetMBean + public long getLastVolumeFailureDate() { + return 0; + } + + @Override // FSDatasetMBean + public long getEstimatedCapacityLostTotal() { + return 0; + } + + @Override // FsDatasetSpi + public VolumeFailureSummary getVolumeFailureSummary() { + return new VolumeFailureSummary(ArrayUtils.EMPTY_STRING_ARRAY, 0, 0); + } + + @Override // FSDatasetMBean + public long getCacheUsed() { + return 0l; + } + + @Override // FSDatasetMBean + public long getCacheCapacity() { + return 0l; + } + + @Override // FSDatasetMBean + public long getNumBlocksCached() { + return 0l; + } + + @Override + public long getNumBlocksFailedToCache() { + return 0l; + } + + @Override + public long 
getNumBlocksFailedToUncache() { + return 0l; + } + + @Override // FsDatasetSpi + public synchronized long getLength(ExtendedBlock b) throws IOException { + BInfo binfo = getBlockMap(b).get(b.getLocalBlock()); + if (binfo == null) { + throw new IOException("Finalizing a non existing block " + b); + } + return binfo.getNumBytes(); + } + + @Override + @Deprecated + public Replica getReplica(String bpid, long blockId) { + Block b = new Block(blockId); + try { + return getBlockMap(b, bpid).get(b); + } catch (IOException ioe) { + return null; + } + } + + @Override + public synchronized String getReplicaString(String bpid, long blockId) { + Replica r = null; + try { + Block b = new Block(blockId); + r = getBlockMap(b, bpid).get(b); + } catch (IOException ioe) { + // Ignore + } + return r == null? "null": r.toString(); + } + + @Override // FsDatasetSpi + public Block getStoredBlock(String bpid, long blkid) throws IOException { + Block b = new Block(blkid); + try { + BInfo binfo = getBlockMap(b, bpid).get(b); + if (binfo == null) { + return null; + } + return new Block(blkid, binfo.getGenerationStamp(), binfo.getNumBytes()); + } catch (IOException ioe) { + return null; + } + } + + @Override // FsDatasetSpi + public synchronized void invalidate(String bpid, Block[] invalidBlks) + throws IOException { + boolean error = false; + if (invalidBlks == null) { + return; + } + for (Block b: invalidBlks) { + if (b == null) { + continue; + } + Map map = getBlockMap(b, bpid); + BInfo binfo = map.get(b); + if (binfo == null) { + error = true; + DataNode.LOG.warn("Invalidate: Missing block"); + continue; + } + getStorage(b).free(bpid, binfo.getNumBytes()); + map.remove(b); + if (datanode != null) { + datanode.notifyNamenodeDeletedBlock(new ExtendedBlock(bpid, b), + binfo.getStorageUuid()); + } + } + if (error) { + throw new IOException("Invalidate: Missing blocks."); + } + } + + @Override // FSDatasetSpi + public void cache(String bpid, long[] cacheBlks) { + throw new UnsupportedOperationException( + "SimulatedMultiStorageFSDataset does not support cache operation!"); + } + + @Override // FSDatasetSpi + public void uncache(String bpid, long[] uncacheBlks) { + throw new UnsupportedOperationException( + "SimulatedMultiStorageFSDataset does not support uncache operation!"); + } + + @Override // FSDatasetSpi + public boolean isCached(String bpid, long blockId) { + return false; + } + + private BInfo getBInfo(final ExtendedBlock b) { + try { + return getBlockMap(b).get(b.getLocalBlock()); + } catch (IOException ioe) { + return null; + } + } + + @Override // {@link FsDatasetSpi} + public boolean contains(ExtendedBlock block) { + return getBInfo(block) != null; + } + + /** + * Check if a block is valid. + * + * @param b The block to check. + * @param minLength The minimum length that the block must have. May be 0. + * @param state If this is null, it is ignored. If it is non-null, we + * will check that the replica has this state. + * + * @throws ReplicaNotFoundException If the replica is not found + * + * @throws UnexpectedReplicaStateException If the replica is not in the + * expected state. 
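+   * Note: this simulated implementation only checks that the replica exists and
+   * whether it is FINALIZED or RBW; the minLength argument is not enforced.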
+ */ + @Override // {@link FsDatasetSpi} + public void checkBlock(ExtendedBlock b, long minLength, ReplicaState state) + throws ReplicaNotFoundException, UnexpectedReplicaStateException { + final BInfo binfo = getBInfo(b); + + if (binfo == null) { + throw new ReplicaNotFoundException(b); + } + if ((state == ReplicaState.FINALIZED && !binfo.isFinalized()) || + (state != ReplicaState.FINALIZED && binfo.isFinalized())) { + throw new UnexpectedReplicaStateException(b,state); + } + } + + @Override // FsDatasetSpi + public synchronized boolean isValidBlock(ExtendedBlock b) { + try { + checkBlock(b, 0, ReplicaState.FINALIZED); + } catch (IOException e) { + return false; + } + return true; + } + + /* check if a block is created but not finalized */ + @Override + public synchronized boolean isValidRbw(ExtendedBlock b) { + try { + checkBlock(b, 0, ReplicaState.RBW); + } catch (IOException e) { + return false; + } + return true; + } + + @Override + public String toString() { + return getStorageInfo(); + } + + @Override // FsDatasetSpi + public synchronized ReplicaHandler append( + ExtendedBlock b, long newGS, long expectedBlockLen) throws IOException { + BInfo binfo = getBlockMap(b).get(b.getLocalBlock()); + if (binfo == null || !binfo.isFinalized()) { + throw new ReplicaNotFoundException("Block " + b + + " is not valid, and cannot be appended to."); + } + binfo.unfinalizeBlock(); + return new ReplicaHandler(binfo, null); + } + + @Override // FsDatasetSpi + public synchronized ReplicaHandler recoverAppend( + ExtendedBlock b, long newGS, long expectedBlockLen) throws IOException { + final Map map = getBlockMap(b); + BInfo binfo = map.get(b.getLocalBlock()); + if (binfo == null) { + throw new ReplicaNotFoundException("Block " + b + + " is not valid, and cannot be appended to."); + } + if (binfo.isFinalized()) { + binfo.unfinalizeBlock(); + } + map.remove(b); + binfo.theBlock.setGenerationStamp(newGS); + map.put(binfo.theBlock, binfo); + return new ReplicaHandler(binfo, null); + } + + @Override // FsDatasetSpi + public Replica recoverClose(ExtendedBlock b, long newGS, long expectedBlockLen) + throws IOException { + final Map map = getBlockMap(b); + BInfo binfo = map.get(b.getLocalBlock()); + if (binfo == null) { + throw new ReplicaNotFoundException("Block " + b + + " is not valid, and cannot be appended to."); + } + if (!binfo.isFinalized()) { + binfo.finalizeBlock(b.getBlockPoolId(), binfo.getNumBytes()); + } + map.remove(b.getLocalBlock()); + binfo.theBlock.setGenerationStamp(newGS); + map.put(binfo.theBlock, binfo); + return binfo; + } + + @Override // FsDatasetSpi + public synchronized ReplicaHandler recoverRbw( + ExtendedBlock b, long newGS, long minBytesRcvd, long maxBytesRcvd) + throws IOException { + final Map map = getBlockMap(b); + BInfo binfo = map.get(b.getLocalBlock()); + if ( binfo == null) { + throw new ReplicaNotFoundException("Block " + b + + " does not exist, and cannot be appended to."); + } + if (binfo.isFinalized()) { + throw new ReplicaAlreadyExistsException("Block " + b + + " is valid, and cannot be written to."); + } + map.remove(b); + binfo.theBlock.setGenerationStamp(newGS); + map.put(binfo.theBlock, binfo); + return new ReplicaHandler(binfo, null); + } + + @Override // FsDatasetSpi + public synchronized ReplicaHandler createRbw( + StorageType storageType, ExtendedBlock b, + boolean allowLazyPersist) throws IOException { + return createTemporary(storageType, b, false); + } + + @Override // FsDatasetSpi + public synchronized ReplicaHandler createTemporary( + StorageType 
storageType, ExtendedBlock b, boolean isTransfer) throws IOException { + if (isValidBlock(b)) { + throw new ReplicaAlreadyExistsException("Block " + b + + " is valid, and cannot be written to."); + } + if (isValidRbw(b)) { + throw new ReplicaAlreadyExistsException("Block " + b + + " is being written, and cannot be written to."); + } + BInfo binfo = new BInfo(b.getBlockPoolId(), b.getLocalBlock(), true); + getBlockMap(b).put(binfo.theBlock, binfo); + return new ReplicaHandler(binfo, null); + } + + synchronized InputStream getBlockInputStream(ExtendedBlock b + ) throws IOException { + BInfo binfo = getBlockMap(b).get(b.getLocalBlock()); + if (binfo == null) { + throw new IOException("No such Block " + b ); + } + + return binfo.getIStream(); + } + + @Override // FsDatasetSpi + public synchronized InputStream getBlockInputStream(ExtendedBlock b, + long seekOffset) throws IOException { + InputStream result = getBlockInputStream(b); + IOUtils.skipFully(result, seekOffset); + return result; + } + + /** Not supported */ + @Override // FsDatasetSpi + public ReplicaInputStreams getTmpInputStreams(ExtendedBlock b, long blkoff, + long ckoff) throws IOException { + throw new IOException("Not supported"); + } + + @Override // FsDatasetSpi + public synchronized LengthInputStream getMetaDataInputStream(ExtendedBlock b + ) throws IOException { + BInfo binfo = getBlockMap(b).get(b.getLocalBlock()); + if (binfo == null) { + throw new IOException("No such Block " + b ); + } + if (!binfo.finalized) { + throw new IOException("Block " + b + + " is being written, its meta cannot be read"); + } + final SimulatedInputStream sin = binfo.getMetaIStream(); + return new LengthInputStream(sin, sin.getLength()); + } + + @Override + public Set checkDataDir() { + // nothing to check for simulated data set + return null; + } + + @Override // FsDatasetSpi + public synchronized void adjustCrcChannelPosition(ExtendedBlock b, + ReplicaOutputStreams stream, + int checksumSize) + throws IOException { + } + + /** + * Simulated input and output streams + * + */ + static private class SimulatedInputStream extends java.io.InputStream { + + + byte theRepeatedData = 7; + final long length; // bytes + int currentPos = 0; + byte[] data = null; + + /** + * An input stream of size l with repeated bytes + * @param l size of the stream + * @param iRepeatedData byte that is repeated in the stream + */ + SimulatedInputStream(long l, byte iRepeatedData) { + length = l; + theRepeatedData = iRepeatedData; + } + + /** + * An input stream of of the supplied data + * @param iData data to construct the stream + */ + SimulatedInputStream(byte[] iData) { + data = iData; + length = data.length; + } + + /** + * @return the lenght of the input stream + */ + long getLength() { + return length; + } + + @Override + public int read() throws IOException { + if (currentPos >= length) + return -1; + if (data !=null) { + return data[currentPos++]; + } else { + currentPos++; + return theRepeatedData; + } + } + + @Override + public int read(byte[] b) throws IOException { + + if (b == null) { + throw new NullPointerException(); + } + if (b.length == 0) { + return 0; + } + if (currentPos >= length) { // EOF + return -1; + } + int bytesRead = (int) Math.min(b.length, length-currentPos); + if (data != null) { + System.arraycopy(data, currentPos, b, 0, bytesRead); + } else { // all data is zero + for (int i : b) { + b[i] = theRepeatedData; + } + } + currentPos += bytesRead; + return bytesRead; + } + } + + /** + * This class implements an output stream that merely throws 
its data away, but records its + * length. + * + */ + static private class SimulatedOutputStream extends OutputStream { + long length = 0; + + /** + * constructor for Simulated Output Steram + */ + SimulatedOutputStream() { + } + + /** + * + * @return the length of the data created so far. + */ + long getLength() { + return length; + } + + /** + */ + void setLength(long length) { + this.length = length; + } + + @Override + public void write(int arg0) throws IOException { + length++; + } + + @Override + public void write(byte[] b) throws IOException { + length += b.length; + } + + @Override + public void write(byte[] b, + int off, + int len) throws IOException { + length += len; + } + } + + private ObjectName mbeanName; + + + + /** + * Register the FSDataset MBean using the name + * "hadoop:service=DataNode,name=FSDatasetState-" + * We use storage id for MBean name since a minicluster within a single + * Java VM may have multiple Simulated Datanodes. + */ + void registerMBean(final String storageId) { + // We wrap to bypass standard mbean naming convetion. + // This wraping can be removed in java 6 as it is more flexible in + // package naming for mbeans and their impl. + StandardMBean bean; + + try { + bean = new StandardMBean(this,FSDatasetMBean.class); + mbeanName = MBeans.register("DataNode", "FSDatasetState-"+ + storageId, bean); + } catch (NotCompliantMBeanException e) { + DataNode.LOG.warn("Error registering FSDatasetState MBean", e); + } + + DataNode.LOG.info("Registered FSDatasetState MBean"); + } + + @Override + public void shutdown() { + if (mbeanName != null) MBeans.unregister(mbeanName); + } + + @Override + public String getStorageInfo() { + return "Simulated FSDataset-" + datanodeUuid; + } + + @Override + public boolean hasEnoughResource() { + return true; + } + + @Override + public ReplicaRecoveryInfo initReplicaRecovery(RecoveringBlock rBlock) + throws IOException { + ExtendedBlock b = rBlock.getBlock(); + BInfo binfo = getBlockMap(b).get(b.getLocalBlock()); + if (binfo == null) { + throw new IOException("No such Block " + b ); + } + + return new ReplicaRecoveryInfo(binfo.getBlockId(), binfo.getBytesOnDisk(), + binfo.getGenerationStamp(), + binfo.isFinalized()?ReplicaState.FINALIZED : ReplicaState.RBW); + } + + @Override // FsDatasetSpi + public Replica updateReplicaUnderRecovery(ExtendedBlock oldBlock, + long recoveryId, + long newBlockId, + long newlength) throws IOException { + return getBInfo(oldBlock); + } + + @Override // FsDatasetSpi + public long getReplicaVisibleLength(ExtendedBlock block) { + return block.getNumBytes(); + } + + @Override // FsDatasetSpi + public void addBlockPool(String bpid, Configuration conf) { + for (SimulatedStorage storage : storages) { + storage.addBlockPool(bpid); + } + } + + @Override // FsDatasetSpi + public void shutdownBlockPool(String bpid) { + for (SimulatedStorage storage : storages) { + storage.removeBlockPool(bpid); + } + } + + @Override // FsDatasetSpi + public void deleteBlockPool(String bpid, boolean force) { + return; + } + + @Override + public ReplicaInPipelineInterface convertTemporaryToRbw(ExtendedBlock temporary) + throws IOException { + final BInfo r = getBlockMap(temporary).get(temporary.getLocalBlock()); + if (r == null) { + throw new IOException("Block not found, temporary=" + temporary); + } else if (r.isFinalized()) { + throw new IOException("Replica already finalized, temporary=" + + temporary + ", r=" + r); + } + return r; + } + + @Override + public BlockLocalPathInfo getBlockLocalPathInfo(ExtendedBlock b) { + throw 
new UnsupportedOperationException(); + } + + @Override + public HdfsBlocksMetadata getHdfsBlocksMetadata(String bpid, long[] blockIds) + throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public void enableTrash(String bpid) { + throw new UnsupportedOperationException(); + } + + @Override + public void clearTrash(String bpid) { + } + + @Override + public boolean trashEnabled(String bpid) { + return false; + } + + @Override + public void setRollingUpgradeMarker(String bpid) { + } + + @Override + public void clearRollingUpgradeMarker(String bpid) { + } + + @Override + public void checkAndUpdate(String bpid, long blockId, File diskFile, + File diskMetaFile, FsVolumeSpi vol) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public List getVolumes() { + throw new UnsupportedOperationException(); + } + + @Override + public void addVolume( + final StorageLocation location, + final List nsInfos) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public DatanodeStorage getStorage(final String storageUuid) { + for (SimulatedStorage storage : storages) { + if (storageUuid.equals(storage.getStorageUuid())) { + return storage.getDnStorage(); + } + } + return null; + } + + @Override + public StorageReport[] getStorageReports(String bpid) { + List reports = new ArrayList<>(); + for (SimulatedStorage storage : storages) { + reports.add(storage.getStorageReport(bpid)); + } + return reports.toArray(new StorageReport[0]); + } + + @Override + public List getFinalizedBlocks(String bpid) { + throw new UnsupportedOperationException(); + } + + @Override + public List getFinalizedBlocksOnPersistentStorage(String bpid) { + throw new UnsupportedOperationException(); + } + + @Override + public Map getVolumeInfoMap() { + throw new UnsupportedOperationException(); + } + + @Override + public FsVolumeSpi getVolume(ExtendedBlock b) { + return getStorage(b.getLocalBlock()).getVolume(); + } + + @Override + public synchronized void removeVolumes(Set volumes, boolean clearFailure) { + throw new UnsupportedOperationException(); + } + + @Override + public void submitBackgroundSyncFileRangeRequest(ExtendedBlock block, + FileDescriptor fd, long offset, long nbytes, int flags) { + throw new UnsupportedOperationException(); + } + + @Override + public void onCompleteLazyPersist(String bpId, long blockId, + long creationTime, File[] savedFiles, FsVolumeSpi targetVolume) { + throw new UnsupportedOperationException(); + } + + @Override + public void onFailLazyPersist(String bpId, long blockId) { + throw new UnsupportedOperationException(); + } + + @Override + public ReplicaInfo moveBlockAcrossStorage(ExtendedBlock block, + StorageType targetStorageType) throws IOException { + // TODO Auto-generated method stub + return null; + } + + @Override + public void setPinning(ExtendedBlock b) throws IOException { + getBlockMap(b).get(b.getLocalBlock()).pinned = true; + } + + @Override + public boolean getPinning(ExtendedBlock b) throws IOException { + return getBlockMap(b).get(b.getLocalBlock()).pinned; + } + + @Override + public boolean isDeletingBlock(String bpid, long blockId) { + throw new UnsupportedOperationException(); + } +} + + diff --git a/dynamometer-infra/src/main/resources/scripts/create-dn-dir.sh b/dynamometer-infra/src/main/resources/scripts/create-dn-dir.sh deleted file mode 100644 index 651e414c44..0000000000 --- a/dynamometer-infra/src/main/resources/scripts/create-dn-dir.sh +++ /dev/null @@ -1,141 +0,0 @@ -#!/usr/bin/env 
bash -# -# Copyright 2017 LinkedIn Corporation. All rights reserved. Licensed under the BSD-2 Clause license. -# See LICENSE in the project root for license information. -# - -# Script for creating Dynamometer DataNode directories within YARN container. -# Usage: ./create-dn-dir.sh data_dir_list -# data_dir_list is a comma-separated list of data directories to populate - -dataDirs="$1" -# Strip off file:// prefix if present; convert to array -dataDirs=${dataDirs//file:\/\//} -dataDirs=(${dataDirs//,/ }) - -layoutVersion=$(${HADOOP_HOME}/bin/hadoop jar dynamometer.jar \ - com.linkedin.dynamometer.DataNodeLayoutVersionFetcher) -if [ "$layoutVersion" = -56 ]; then - layoutDirectoryMax=255 -elif [ "$layoutVersion" = -57 ]; then - layoutDirectoryMax=31 -else - echo "Invalid/unsupported layout version: $layoutVersion" - exit 1 -fi - -# We call this periodically so that if the application is killed before this completes, -# the script will exit itself rather than continuing to create new blocks -parentPID=$PPID -function exitIfParentIsDead() { - if ! kill -0 ${parentPID} 2>/dev/null; then - echo "Parent process died; exiting from ${0##*/}" - exit 1 - fi -} - -set -m - -metafile=`pwd`/scripts/metafile -versionFile=`pwd`/VERSION - -bpId=`cat ${versionFile} | grep blockpoolID | awk -F\= '{print $2}'` -clusterId=`cat ${versionFile} | grep clusterID | awk -F\= '{print $2}'` -namespaceID=`cat ${versionFile} | grep namespaceID | awk -F\= '{print $2}'` - -chmod 644 ${metafile} - -dnUUID=$(uuidgen) - -for dataDir in ${dataDirs[@]}; do - mkdir -p "$dataDir/current" - cat > "$dataDir/current/VERSION" << EOF -clusterID=${clusterId} -namespaceID=${namespaceID} -cTime=0 -blockpoolID=${bpId} -datanodeUuid=${dnUUID} -storageID=DS-$(uuidgen) -storageType=DATA_NODE -layoutVersion=${layoutVersion} -EOF - bpDir="$dataDir/current/$bpId/current" - mkdir -p ${bpDir} - mkdir ${bpDir}/rbw - cp "$dataDir/current/VERSION" ${bpDir}/VERSION - finDir=${bpDir}/finalized - mkdir ${finDir} - - for p in `seq 0 ${layoutDirectoryMax}`; do - mkdir_args=() - for q in `seq 0 ${layoutDirectoryMax}`; do - mkdir_args[$((q+1))]="${finDir}/subdir${p}/subdir${q}" - done - mkdir -p "${mkdir_args[@]}" - exitIfParentIsDead - done -done - -exitIfParentIsDead - -while [ 1 ]; do fg 2> /dev/null; [ $? 
== 1 ] && break; done - -exitIfParentIsDead - -blkList=`pwd`/block -blkIdx=0 - -blkListSplitDir="`pwd`/blkListSplit" -mkdir -p "$blkListSplitDir" - -max_parallelism=100 -lines_per_split=5000 -blkCnt=`wc -l "$blkList" | awk '{ print $1 }'` -if [ ${blkCnt} -gt $((max_parallelism * lines_per_split)) ]; then - lines_per_split=$((blkCnt / max_parallelism)) -fi - -echo "Beginning to create $blkCnt blocks using $lines_per_split blocks per process" - -split -a 3 -l "$lines_per_split" "$blkList" "$blkListSplitDir/blkList" - -function read_split() { - splitFile="$1" - blkIdx=0 - - while read blkInfo; do - dataDirIdx=$((blkIdx % ${#dataDirs[@]})) - dataDir=${dataDirs[$dataDirIdx]}/current/${bpId}/current/finalized - - blkInfoArray=(${blkInfo//,/ }) - blkId=${blkInfoArray[0]} - blkGs=${blkInfoArray[1]} - blkSz=${blkInfoArray[2]} - - d1=$(( (blkId>>16) & layoutDirectoryMax )) - d2=$(( (blkId>>8) & layoutDirectoryMax )) - - blkDir=${dataDir}/subdir${d1}/subdir${d2} - - truncate -s ${blkSz} ${blkDir}/blk_${blkId} - ln -s ${metafile} ${blkDir}/blk_${blkId}_${blkGs}.meta - - blkIdx=$((blkIdx+1)) - - if [ $((blkIdx % 1000)) == 0 ]; then - echo "For split file '$splitFile', created $blkIdx of $lines_per_split blocks" - exitIfParentIsDead - fi - - done < "$splitFile" -} - -for splitFile in "$blkListSplitDir/blkList"*; do - read_split "$splitFile" & -done - -while [ 1 ]; do fg 2> /dev/null; [ $? == 1 ] && break; done - -echo "Finished creating $blkCnt blocks" - -rm -rf "$blkListSplitDir" \ No newline at end of file diff --git a/dynamometer-infra/src/main/resources/scripts/metafile b/dynamometer-infra/src/main/resources/scripts/metafile deleted file mode 100644 index 6f220b46b9..0000000000 Binary files a/dynamometer-infra/src/main/resources/scripts/metafile and /dev/null differ diff --git a/dynamometer-infra/src/main/resources/scripts/start-component.sh b/dynamometer-infra/src/main/resources/start-component.sh similarity index 80% rename from dynamometer-infra/src/main/resources/scripts/start-component.sh rename to dynamometer-infra/src/main/resources/start-component.sh index 1c596424fd..ebdb0aece5 100755 --- a/dynamometer-infra/src/main/resources/scripts/start-component.sh +++ b/dynamometer-infra/src/main/resources/start-component.sh @@ -8,14 +8,17 @@ # USAGE: # ./start-component.sh namenode hdfs_storage # OR -# ./start-component.sh datanode nn_servicerpc_address +# ./start-component.sh datanode nn_servicerpc_address sleep_time_sec # First parameter should be component being launched, either `datanode` or `namenode` # If component is namenode, hdfs_storage is expected to point to a location to # write out shared files such as the file containing the information about # which ports the NN started on (at nn_info.prop) and the namenode's metrics # (at namenode_metrics) -# If component is datanode, nn_servicerpc_address is expected to point to -# servicerpc address of the namenode +# If component is datanode, nn_servicerpc_address is expected to point to the +# servicerpc address of the namenode. sleep_time_sec is the amount of time that +# should be allowed to elapse before launching anything. The +# `com.linkedin.dynamometer.SimulatedDataNodes` class will be used to start multiple +# DataNodes within the same JVM, and they will store their block files in memory. 
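+#
+# Example invocation (hypothetical NameNode address and delay):
+#   ./start-component.sh datanode nn-host.example.com:8022 60
+# i.e. wait 60 seconds, then start the in-memory simulated DataNodes pointed at
+# the NameNode service RPC address nn-host.example.com:8022.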
component="$1" if [[ "$component" != "datanode" && "$component" != "namenode" ]]; then @@ -29,11 +32,12 @@ if [ "$component" = "namenode" ]; then fi hdfsStoragePath="$2" else - if [ $# -lt 2 ]; then + if [ $# -lt 3 ]; then echo "Not enough arguments for DataNode" exit 1 fi nnServiceRpcAddress="$2" + launchDelaySec="$3" fi containerID=${CONTAINER_ID##*_} @@ -142,33 +146,49 @@ if [ "$component" = "datanode" ]; then done dataDirs=${dataDirs:1} - ./scripts/create-dn-dir.sh "$dataDirs" - if [ $? -ne 0 ]; then - echo "Unable to create datanode directories" - exit 1 - fi - - ipcPort=`find_available_port "$baseIpcPort"` - httpPort=`find_available_port "$baseHttpPort"` - serverPort=`find_available_port "$baseServerPort"` - - read -r -d '' datanodeConfigs </dev/null; then + echo "Parent process ($PPID) exited while waiting; now exiting" + exit 0 + fi + done + + versionFile="`pwd`/VERSION" + bpId=`cat "${versionFile}" | grep blockpoolID | awk -F\= '{print $2}'` + listingFiles=() + blockDir="`pwd`/blocks" + for listingFile in ${blockDir}/*; do + listingFiles+=("file://${listingFile}") + done + + localHostname=`hostname` + + read -r -d '' datanodeClusterConfigs <