From f4ad3d00e3d479c8daa81ed8e659f4aee5771336 Mon Sep 17 00:00:00 2001 From: Song Guyang Date: Wed, 13 Jun 2018 13:41:34 +0800 Subject: [PATCH 1/2] add ray on yarn project --- devops/yarn/README.md | 47 + devops/yarn/checkstyle.xml | 241 ++++ devops/yarn/deploy/run.sh | 11 + devops/yarn/pom.xml | 84 ++ .../org/ray/on/yarn/ApplicationMaster.java | 1260 +++++++++++++++++ .../src/main/java/org/ray/on/yarn/Client.java | 831 +++++++++++ .../java/org/ray/on/yarn/DsConstants.java | 34 + .../org/ray/on/yarn/Log4jPropertyHelper.java | 37 + .../java/org/ray/on/yarn/RayNodeContext.java | 22 + .../yarn/src/test/resources/log4j.properties | 19 + 10 files changed, 2586 insertions(+) create mode 100644 devops/yarn/README.md create mode 100644 devops/yarn/checkstyle.xml create mode 100644 devops/yarn/deploy/run.sh create mode 100644 devops/yarn/pom.xml create mode 100644 devops/yarn/src/main/java/org/ray/on/yarn/ApplicationMaster.java create mode 100644 devops/yarn/src/main/java/org/ray/on/yarn/Client.java create mode 100644 devops/yarn/src/main/java/org/ray/on/yarn/DsConstants.java create mode 100644 devops/yarn/src/main/java/org/ray/on/yarn/Log4jPropertyHelper.java create mode 100644 devops/yarn/src/main/java/org/ray/on/yarn/RayNodeContext.java create mode 100644 devops/yarn/src/test/resources/log4j.properties diff --git a/devops/yarn/README.md b/devops/yarn/README.md new file mode 100644 index 000000000000..c836978f4533 --- /dev/null +++ b/devops/yarn/README.md @@ -0,0 +1,47 @@ +# Prerequisites + +1. The Yarn app of Ray runs under Hadoop version 2.8.0. You can get the Hadoop binary from [here](http://archive.apache.org/dist/hadoop/common/hadoop-2.8.0/hadoop-2.8.0.tar.gz). + +# Walkthrough + +#### Yarn environment and configuration + +First, you need a working Hadoop YARN environment and configuration. 
+ +#### Prepare ray on yarn jar file + +```shell +$ make clean package +``` + +#### Prepare deploy zip file + +```shell +$ cd deploy +$ zip -r deploy.zip . +``` +Please modify the script 'deploy/run.sh' on-demand. + +#### Run + +```shell +$ /path/to/hadoop-2.8.0/bin/yarn jar ./target/ray-on-yarn-1.0.jar org.ray.on.yarn.Client --jar ./target/ray-on-yarn-1.0.jar --rayArchive ./deploy/deploy.zip --containerVcores 2 --containerMemory 2048 --priority 10 --shellCmdPriority 10 --numRoles 1 1 --queue ray --headNodeStaticArgs "'--num-cpus 4 --num-gpus 4'" --workNodeStaticArgs "'--num-cpus 2 --num-gpus 2'" +``` + +Please modify the command line on-demand. Some detail about the input args is in the help doc. + +```shell +/path/to/hadoop-2.8.0/bin/yarn jar ./target/ray-on-yarn-1.0.jar org.ray.on.yarn.Client --help +``` + +#### Monitoring + +Please check the logs depend on your yarn platform. + +#### Stop + +```shell +$ /path/to/hadoop-2.8.0/bin/yarn application -kill {app_id} +``` + +`{app_id}` shall be replaced by the ID of the corresponding Yarn application, e.g. `application_1505745052163_0107`. diff --git a/devops/yarn/checkstyle.xml b/devops/yarn/checkstyle.xml new file mode 100644 index 000000000000..fc5904d1d710 --- /dev/null +++ b/devops/yarn/checkstyle.xml @@ -0,0 +1,241 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/devops/yarn/deploy/run.sh b/devops/yarn/deploy/run.sh new file mode 100644 index 000000000000..559126ae03cf --- /dev/null +++ b/devops/yarn/deploy/run.sh @@ -0,0 +1,11 @@ +#! /bin/sh +echo "######## The script running... 
########" +echo "input args are:" +for arg in "$@" +do + echo " " $arg +done +# Please add your shell +ray start "$@" +echo "######## End of sleep ... ########" +sleep 1h diff --git a/devops/yarn/pom.xml b/devops/yarn/pom.xml new file mode 100644 index 000000000000..8603940d05d9 --- /dev/null +++ b/devops/yarn/pom.xml @@ -0,0 +1,84 @@ + + + 4.0.0 + + org.ray + ray-on-yarn + 1.0 + + + 2.8.0 + 1.8 + 1.8 + UTF-8 + + + + + org.apache.hadoop + hadoop-yarn-common + ${hadoop.version} + + + + org.apache.hadoop + hadoop-common + ${hadoop.version} + + + + org.apache.hadoop + hadoop-yarn-client + ${hadoop.version} + + + org.apache.hadoop + hadoop-yarn-common + test-jar + ${hadoop.version} + test + + + + + + + + org.apache.maven.plugins + maven-checkstyle-plugin + 2.17 + + + com.puppycrawl.tools + checkstyle + 6.19 + + + + + validate + validate + + check + + + + + checkstyle.xml + UTF-8 + true + false + false + warning + xml + html + ${project.build.directory}/test/checkstyle-errors.xml + false + + + + + + \ No newline at end of file diff --git a/devops/yarn/src/main/java/org/ray/on/yarn/ApplicationMaster.java b/devops/yarn/src/main/java/org/ray/on/yarn/ApplicationMaster.java new file mode 100644 index 000000000000..b8bc88ba4da0 --- /dev/null +++ b/devops/yarn/src/main/java/org/ray/on/yarn/ApplicationMaster.java @@ -0,0 +1,1260 @@ +package org.ray.on.yarn; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Maps; +import com.sun.jersey.api.client.ClientHandlerException; +import java.io.BufferedReader; +import java.io.DataInputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.StringReader; +import java.lang.reflect.UndeclaredThrowableException; +import java.net.InetAddress; +import java.net.URI; +import java.net.URISyntaxException; +import java.net.UnknownHostException; +import java.nio.ByteBuffer; +import java.security.PrivilegedExceptionAction; +import java.util.ArrayList; 
+import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.Vector; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.GnuParser; +import org.apache.commons.cli.HelpFormatter; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.ParseException; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.DataOutputBuffer; +import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.util.ExitUtil; +import org.apache.hadoop.util.Shell; +import org.apache.hadoop.yarn.api.ApplicationConstants; +import org.apache.hadoop.yarn.api.ApplicationConstants.Environment; +import org.apache.hadoop.yarn.api.ContainerManagementProtocol; +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.api.records.ContainerState; +import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import 
org.apache.hadoop.yarn.api.records.LocalResource; +import org.apache.hadoop.yarn.api.records.LocalResourceType; +import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; +import org.apache.hadoop.yarn.api.records.NodeReport; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.URL; +import org.apache.hadoop.yarn.api.records.UpdatedContainer; +import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity; +import org.apache.hadoop.yarn.api.records.timeline.TimelineEntityGroupId; +import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent; +import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse; +import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest; +import org.apache.hadoop.yarn.client.api.TimelineClient; +import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync; +import org.apache.hadoop.yarn.client.api.async.NMClientAsync; +import org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; +import org.apache.hadoop.yarn.util.timeline.TimelineUtils; +import org.apache.log4j.LogManager; + +@InterfaceAudience.Public +@InterfaceStability.Unstable +public class ApplicationMaster { + + private static final Log LOG = LogFactory.getLog(ApplicationMaster.class); + + @VisibleForTesting + @Private + public static enum DsEvent { + DS_APP_ATTEMPT_START, DS_APP_ATTEMPT_END, DS_CONTAINER_START, DS_CONTAINER_END + } + + @VisibleForTesting + @Private + public static enum DsEntity { + DS_APP_ATTEMPT, DS_CONTAINER + } + + private static final String YARN_SHELL_ID = "YARN_SHELL_ID"; + // Configuration + private Configuration conf; + // Handle to communicate with the Resource Manager + @SuppressWarnings("rawtypes") + private AMRMClientAsync amRmClient; + // In 
both secure and non-secure modes, this points to the job-submitter. + @VisibleForTesting + UserGroupInformation appSubmitterUgi; + // Handle to communicate with the Node Manager + private NMClientAsync nmClientAsync; + // Listen to process the response from the Node Manager + private NmCallbackHandler containerListener; + // Application Attempt Id ( combination of attemptId and fail count ) + @VisibleForTesting + protected ApplicationAttemptId appAttemptId; + // Hostname of the container + private String appMasterHostname = ""; + // Port on which the app master listens for status updates from clients + private int appMasterRpcPort = -1; + // Tracking url to which app master publishes info for clients to monitor + private String appMasterTrackingUrl = ""; + // App Master configuration + // No. of containers to run shell command on + @VisibleForTesting + protected int numTotalContainers = 1; + // Memory to request for the container on which the shell command will run + private long containerMemory = 10; + // VirtualCores to request for the container on which the shell command will run + private int containerVirtualCores = 1; + // Priority of the request + private int requestPriority; + // No. 
of each of the Ray roles including head and work + private Map numRoles = Maps.newHashMapWithExpectedSize(2); + private RayNodeContext[] indexToNode = null; + private Map containerToNode = Maps.newHashMap(); + + // The default value should be consistent with Ray RunParameters + private int redisPort = 34222; + private String redisAddress; + + // Counter for completed containers ( complete denotes successful or failed ) + private AtomicInteger numCompletedContainers = new AtomicInteger(); + // Allocated container count so that we know how many containers has the RM + // allocated to us + @VisibleForTesting + protected AtomicInteger numAllocatedContainers = new AtomicInteger(); + // Count of failed containers + private AtomicInteger numFailedContainers = new AtomicInteger(); + // Count of containers already requested from the RM + // Needed as once requested, we should not request for containers again. + // Only request for more if the original requirement changes. + @VisibleForTesting + protected AtomicInteger numRequestedContainers = new AtomicInteger(); + + // Shell command to be executed + private String shellCommand = ""; + // Args to be passed to the shell command + private String shellArgs = ""; + // Env variables to be setup for the shell command + private Map shellEnv = new HashMap(); + + // Location of shell script ( obtained from info set in env ) + // Shell script path in fs + private String rayArchiveFile = ""; + // Timestamp needed for creating a local resource + private long rayArchiveFileTimestamp = 0; + // File length needed for local resource + private long rayArchiveFileLen = 0; + + // Timeline domain ID + private String domainId = null; + + // Hardcoded path to shell script in launch container's local env + private static final String rayShellStringPath = "run.sh"; + // Hardcoded path to custom log_properties + private static final String log4jPath = "log4j.properties"; + private static final String shellCommandPath = "shellCommands"; + private 
static final String shellArgsPath = "shellArgs"; + + private volatile boolean done; + + private ByteBuffer allTokens; + + // Launch threads + private List launchThreads = new ArrayList(); + + // Timeline Client + @VisibleForTesting + TimelineClient timelineClient; + static final String CONTAINER_ENTITY_GROUP_ID = "CONTAINERS"; + static final String APPID_TIMELINE_FILTER_NAME = "appId"; + static final String USER_TIMELINE_FILTER_NAME = "user"; + static final String LINUX_BASH_COMMEND = "bash"; + + private int rayInstanceCounter = 1; + + // the static args of head node + private String headNodeStaticArgs = null; + //the static args of head node + private String workNodeStaticArgs = null; + // supremeFo flag + private boolean supremeFo = false; + // disable process failover flag + private boolean disableProcessFo = true; + + @VisibleForTesting + protected final Set launchedContainers = + Collections.newSetFromMap(new ConcurrentHashMap()); + + /** + * The main entrance of appMaster. + * @param args Command line args + */ + public static void main(String[] args) { + boolean result = false; + try { + ApplicationMaster appMaster = new ApplicationMaster(); + LOG.info("Initializing ApplicationMaster"); + boolean doRun = appMaster.init(args); + if (!doRun) { + System.exit(0); + } + appMaster.run(); + result = appMaster.finish(); + } catch (Throwable t) { + LOG.fatal("Error running ApplicationMaster", t); + LogManager.shutdown(); + ExitUtil.terminate(1, t); + } + if (result) { + LOG.info("Application Master completed successfully. exiting"); + System.exit(0); + } else { + LOG.info("Application Master failed. exiting"); + System.exit(2); + } + } + + /** + * Dump out contents of $CWD and the environment to stdout for debugging. 
+ */ + private void dumpOutDebugInfo() { + + LOG.info("Dump debug output"); + Map envs = System.getenv(); + for (Map.Entry env : envs.entrySet()) { + LOG.info("System env: key=" + env.getKey() + ", val=" + env.getValue()); + System.out.println("System env: key=" + env.getKey() + ", val=" + env.getValue()); + } + + BufferedReader buf = null; + try { + String lines = Shell.execCommand("ls", "-al"); + buf = new BufferedReader(new StringReader(lines)); + String line = ""; + while ((line = buf.readLine()) != null) { + LOG.info("System CWD content: " + line); + System.out.println("System CWD content: " + line); + } + } catch (IOException e) { + e.printStackTrace(); + } finally { + IOUtils.cleanup(LOG, buf); + } + } + + public ApplicationMaster() { + // Set up the configuration + conf = new YarnConfiguration(); + } + + /** + * Parse command line options. + * + * @param args Command line args + * @return Whether init successful and run should be invoked + */ + public boolean init(String[] args) throws ParseException, IOException { + Options opts = new Options(); + opts.addOption("appAttemptId", true, + "App Attempt ID. Not to be used unless for testing purposes"); + opts.addOption("shellEnv", true, + "Environment for shell script. Specified as env_key=env_val pairs"); + opts.addOption("containerMemory", true, + "Amount of memory in MB to be requested to run the shell command"); + opts.addOption("containerVcores", true, + "Amount of virtual cores to be requested to run the shell command"); + opts.addOption("numContainers", true, + "No. of containers on which the shell command needs to be executed"); + opts.addOption("numRoles", true, + "No. of the Ray roles including head and work"); + opts.getOption("numRoles").setArgs(2); + opts.addOption("priority", true, "Application Priority. 
Default 0"); + opts.addOption("debug", false, "Dump out debug information"); + opts.addOption("headNodeStaticArgs", true, + "the static args which is needed when start head node"); + opts.addOption("workNodeStaticArgs", true, + "the static args which is needed when start work node"); + opts.addOption("supremeFo", false, "use supreme failover strategy"); + opts.addOption("disableProcessFo", false, "disable process failover"); + + opts.addOption("help", false, "Print usage"); + CommandLine cliParser = new GnuParser().parse(opts, args); + + if (args.length == 0) { + printUsage(opts); + throw new IllegalArgumentException("No args specified for application master to initialize"); + } + + // Check whether customer log4j.properties file exists + if (fileExist(log4jPath)) { + try { + Log4jPropertyHelper.updateLog4jConfiguration(ApplicationMaster.class, log4jPath); + } catch (Exception e) { + LOG.warn("Can not set up custom log4j properties. " + e); + } + } + + if (cliParser.hasOption("help")) { + printUsage(opts); + return false; + } + + if (cliParser.hasOption("debug")) { + dumpOutDebugInfo(); + } + + if (cliParser.hasOption("headNodeStaticArgs")) { + headNodeStaticArgs = cliParser.getOptionValue("headNodeStaticArgs"); + } + + if (cliParser.hasOption("workNodeStaticArgs")) { + workNodeStaticArgs = cliParser.getOptionValue("workNodeStaticArgs"); + } + + if (cliParser.hasOption("supremeFo")) { + supremeFo = true; + } + + if (cliParser.hasOption("disableProcessFo")) { + disableProcessFo = true; + } + + Map envs = System.getenv(); + + if (!envs.containsKey(Environment.CONTAINER_ID.name())) { + if (cliParser.hasOption("appAttemptId")) { + String appIdStr = cliParser.getOptionValue("appAttemptId", ""); + appAttemptId = ApplicationAttemptId.fromString(appIdStr); + } else { + throw new IllegalArgumentException("Application Attempt Id not set in the environment"); + } + } else { + ContainerId containerId = ContainerId.fromString(envs.get(Environment.CONTAINER_ID.name())); + 
appAttemptId = containerId.getApplicationAttemptId(); + } + + if (!envs.containsKey(ApplicationConstants.APP_SUBMIT_TIME_ENV)) { + throw new RuntimeException( + ApplicationConstants.APP_SUBMIT_TIME_ENV + " not set in the environment"); + } + if (!envs.containsKey(Environment.NM_HOST.name())) { + throw new RuntimeException(Environment.NM_HOST.name() + " not set in the environment"); + } + if (!envs.containsKey(Environment.NM_HTTP_PORT.name())) { + throw new RuntimeException(Environment.NM_HTTP_PORT + " not set in the environment"); + } + if (!envs.containsKey(Environment.NM_PORT.name())) { + throw new RuntimeException(Environment.NM_PORT.name() + " not set in the environment"); + } + + LOG.info("Application master for app" + ", appId=" + appAttemptId.getApplicationId().getId() + + ", clustertimestamp=" + appAttemptId.getApplicationId().getClusterTimestamp() + + ", attemptId=" + appAttemptId.getAttemptId()); + + if (!fileExist(shellCommandPath) && envs.get(DsConstants.RAY_ARCHIVE_LOCATION).isEmpty()) { + throw new IllegalArgumentException( + "No shell command or shell script specified to be executed by application master"); + } + + if (fileExist(shellCommandPath)) { + shellCommand = readContent(shellCommandPath); + } + + if (fileExist(shellArgsPath)) { + shellArgs = readContent(shellArgsPath); + } + + if (cliParser.hasOption("shellEnv")) { + String[] shellEnvs = cliParser.getOptionValues("shellEnv"); + for (String env : shellEnvs) { + env = env.trim(); + int index = env.indexOf('='); + if (index == -1) { + shellEnv.put(env, ""); + continue; + } + String key = env.substring(0, index); + String val = ""; + if (index < (env.length() - 1)) { + val = env.substring(index + 1); + } + shellEnv.put(key, val); + } + } + + if (envs.containsKey(DsConstants.RAY_ARCHIVE_LOCATION)) { + rayArchiveFile = envs.get(DsConstants.RAY_ARCHIVE_LOCATION); + + if (envs.containsKey(DsConstants.RAY_ARCHIVE_TIMESTAMP)) { + rayArchiveFileTimestamp = 
Long.parseLong(envs.get(DsConstants.RAY_ARCHIVE_TIMESTAMP)); + } + if (envs.containsKey(DsConstants.RAY_ARCHIVE_LEN)) { + rayArchiveFileLen = Long.parseLong(envs.get(DsConstants.RAY_ARCHIVE_LEN)); + } + if (!rayArchiveFile.isEmpty() && (rayArchiveFileTimestamp <= 0 || rayArchiveFileLen <= 0)) { + LOG.error("Illegal values in env for shell script path" + ", path=" + rayArchiveFile + + ", len=" + rayArchiveFileLen + ", timestamp=" + rayArchiveFileTimestamp); + throw new IllegalArgumentException("Illegal values in env for shell script path"); + } + } + + if (envs.containsKey(DsConstants.RAY_TIMELINE_DOMAIN)) { + domainId = envs.get(DsConstants.RAY_TIMELINE_DOMAIN); + } + + containerMemory = Integer.parseInt(cliParser.getOptionValue("containerMemory", "10")); + containerVirtualCores = Integer.parseInt(cliParser.getOptionValue("containerVcores", "1")); + numTotalContainers = Integer.parseInt(cliParser.getOptionValue("numContainers", "1")); + if (numTotalContainers == 0) { + throw new IllegalArgumentException("Cannot run distributed shell with no containers"); + } + requestPriority = Integer.parseInt(cliParser.getOptionValue("priority", "0")); + + if (cliParser.hasOption("numRoles")) { + String[] optNumRoles = cliParser.getOptionValues("numRoles"); + numRoles.put("head", Integer.parseInt(optNumRoles[0])); + numRoles.put("work", Integer.parseInt(optNumRoles[1])); + // Argument check has been done in Client + } else { + numRoles.put("head", 1); + numRoles.put("work", 0); + } + + numTotalContainers = numRoles.get("head") + numRoles.get("work"); + + indexToNode = new RayNodeContext[numTotalContainers]; + int i = 0; + if (numRoles.get("head") == 1) { + indexToNode[i] = new RayNodeContext("head"); + ++i; + } + for (int j = 0; j < numRoles.get("work"); ++j) { + indexToNode[i] = new RayNodeContext("work"); + ++i; + } + assert numTotalContainers == i; + return true; + } + + /** + * Helper function to print usage. 
+ * + * @param opts Parsed command line options + */ + private void printUsage(Options opts) { + new HelpFormatter().printHelp("ApplicationMaster", opts); + } + + @SuppressWarnings("unchecked") + private int setupContainerRequest() { + int requestCount = 0; + for (RayNodeContext nodeContext : indexToNode) { + if (nodeContext.isRunning == false && nodeContext.isAlocating == false) { + ContainerRequest containerAsk = setupContainerAskForRm(); + amRmClient.addContainerRequest(containerAsk); + requestCount++; + nodeContext.isAlocating = true; + LOG.info("Setup container request: " + containerAsk); + } + } + LOG.info("Setup container request, count is " + requestCount); + return requestCount; + } + + /** + * Main run function for the application master. + */ + public void run() throws YarnException, IOException, InterruptedException { + LOG.info("Starting ApplicationMaster"); + + // Note: Credentials, Token, UserGroupInformation, DataOutputBuffer class + // are marked as LimitedPrivate + Credentials credentials = UserGroupInformation.getCurrentUser().getCredentials(); + DataOutputBuffer dob = new DataOutputBuffer(); + credentials.writeTokenStorageToStream(dob); + // Now remove the AM->RM token so that containers cannot access it. 
+ Iterator> iter = credentials.getAllTokens().iterator(); + LOG.info("Executing with tokens:"); + while (iter.hasNext()) { + Token token = iter.next(); + LOG.info(token); + if (token.getKind().equals(AMRMTokenIdentifier.KIND_NAME)) { + iter.remove(); + } + } + allTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength()); + + // Create appSubmitterUgi and add original tokens to it + String appSubmitterUserName = System.getenv(ApplicationConstants.Environment.USER.name()); + appSubmitterUgi = UserGroupInformation.createRemoteUser(appSubmitterUserName); + appSubmitterUgi.addCredentials(credentials); + + AMRMClientAsync.AbstractCallbackHandler allocListener = new RmCallbackHandler(); + amRmClient = AMRMClientAsync.createAMRMClientAsync(1000, allocListener); + amRmClient.init(conf); + amRmClient.start(); + + containerListener = createNmCallbackHandler(); + nmClientAsync = new NMClientAsyncImpl(containerListener); + nmClientAsync.init(conf); + nmClientAsync.start(); + + // startTimelineClient(conf); + if (timelineClient != null) { + publishApplicationAttemptEvent(timelineClient, appAttemptId.toString(), + DsEvent.DS_APP_ATTEMPT_START, domainId, appSubmitterUgi); + } + + // Setup local RPC Server to accept status requests directly from clients + // TODO need to setup a protocol for client to be able to communicate to + // the RPC server + // TODO use the rpc port info to register with the RM for the client to + // send requests to this app master + + // Register self with ResourceManager + // This will start heartbeating to the RM + appMasterHostname = NetUtils.getHostname(); + RegisterApplicationMasterResponse response = amRmClient + .registerApplicationMaster(appMasterHostname, appMasterRpcPort, appMasterTrackingUrl); + // Dump out information about cluster capability as seen by the + // resource manager + long maxMem = response.getMaximumResourceCapability().getMemorySize(); + LOG.info("Max mem capability of resources in this cluster " + maxMem); + + int maxVCores = 
response.getMaximumResourceCapability().getVirtualCores(); + LOG.info("Max vcores capability of resources in this cluster " + maxVCores); + + // A resource ask cannot exceed the max. + if (containerMemory > maxMem) { + LOG.info("Container memory specified above max threshold of cluster." + " Using max value." + + ", specified=" + containerMemory + ", max=" + maxMem); + containerMemory = maxMem; + } + + if (containerVirtualCores > maxVCores) { + LOG.info("Container virtual cores specified above max threshold of cluster." + + " Using max value." + ", specified=" + containerVirtualCores + ", max=" + maxVCores); + containerVirtualCores = maxVCores; + } + + List previousAmRunningContainers = response.getContainersFromPreviousAttempts(); + LOG.info(appAttemptId + " received " + previousAmRunningContainers.size() + + " previous attempts' running containers on AM registration."); + for (Container container : previousAmRunningContainers) { + launchedContainers.add(container.getId()); + } + numAllocatedContainers.addAndGet(previousAmRunningContainers.size()); + + int numTotalContainersToRequest = numTotalContainers - previousAmRunningContainers.size(); + + if (previousAmRunningContainers.size() > 0) { + // TODO: support failover about recovery ray node context + LOG.warn("Some previous containers found."); + } + // Setup ask for containers from RM + // Send request for containers to RM + // Until we get our fully allocated quota, we keep on polling RM for + // containers + // Keep looping until all the containers are launched and shell script + // executed on them ( regardless of success/failure). 
+ int requestCount = setupContainerRequest(); + + assert requestCount == numTotalContainersToRequest : "The request count is inconsistent: " + + requestCount + " != " + numTotalContainersToRequest; + + // for (int i = 0; i < numTotalContainersToRequest; ++i) { + // ContainerRequest containerAsk = setupContainerAskForRM(null); + // amRMClient.addContainerRequest(containerAsk); + // } + numRequestedContainers.set(numTotalContainers); + } + + @VisibleForTesting + void startTimelineClient(final Configuration conf) + throws YarnException, IOException, InterruptedException { + try { + appSubmitterUgi.doAs(new PrivilegedExceptionAction() { + @Override + public Void run() throws Exception { + if (conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, + YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED)) { + // Creating the Timeline Client + timelineClient = TimelineClient.createTimelineClient(); + timelineClient.init(conf); + timelineClient.start(); + } else { + timelineClient = null; + LOG.warn("Timeline service is not enabled"); + } + return null; + } + }); + } catch (UndeclaredThrowableException e) { + throw new YarnException(e.getCause()); + } + } + + @VisibleForTesting + NmCallbackHandler createNmCallbackHandler() { + return new NmCallbackHandler(this); + } + + @VisibleForTesting + protected boolean finish() { + // wait for completion. 
+ while (!done && (numCompletedContainers.get() != numTotalContainers)) { + try { + Thread.sleep(200); + } catch (InterruptedException ex) { + LOG.warn("Catch InterruptedException when sleep."); + } + } + + if (timelineClient != null) { + publishApplicationAttemptEvent(timelineClient, appAttemptId.toString(), + DsEvent.DS_APP_ATTEMPT_END, domainId, appSubmitterUgi); + } + + // Join all launched threads + // needed for when we time out + // and we need to release containers + for (Thread launchThread : launchThreads) { + try { + launchThread.join(10000); + } catch (InterruptedException e) { + LOG.info("Exception thrown in thread join: " + e.getMessage()); + e.printStackTrace(); + } + } + + // When the application completes, it should stop all running containers + LOG.info("Application completed. Stopping running containers"); + nmClientAsync.stop(); + + // When the application completes, it should send a finish application + // signal to the RM + LOG.info("Application completed. Signalling finish to RM"); + + FinalApplicationStatus appStatus; + String appMessage = null; + boolean success = true; + if (numFailedContainers.get() == 0 && numCompletedContainers.get() == numTotalContainers) { + appStatus = FinalApplicationStatus.SUCCEEDED; + } else { + appStatus = FinalApplicationStatus.FAILED; + appMessage = "Diagnostics." 
+ ", total=" + numTotalContainers + ", completed=" + + numCompletedContainers.get() + ", allocated=" + numAllocatedContainers.get() + + ", failed=" + numFailedContainers.get(); + LOG.info(appMessage); + success = false; + } + try { + amRmClient.unregisterApplicationMaster(appStatus, appMessage, null); + } catch (YarnException ex) { + LOG.error("Failed to unregister application", ex); + } catch (IOException e) { + LOG.error("Failed to unregister application", e); + } + + amRmClient.stop(); + + // Stop Timeline Client + if (timelineClient != null) { + timelineClient.stop(); + } + + return success; + } + + @VisibleForTesting + class RmCallbackHandler extends AMRMClientAsync.AbstractCallbackHandler { + @Override + public void onContainersCompleted(List completedContainers) { + Boolean restartClasterFlag = false; + LOG.info( + "Got response from RM for container ask, completedCnt=" + completedContainers.size()); + for (ContainerStatus containerStatus : completedContainers) { + LOG.info(appAttemptId + " got container status for containerID=" + + containerStatus.getContainerId() + ", state=" + containerStatus.getState() + + ", exitStatus=" + containerStatus.getExitStatus() + ", diagnostics=" + + containerStatus.getDiagnostics()); + + // non complete containers should not be here + assert (containerStatus.getState() == ContainerState.COMPLETE); + // ignore containers we know nothing about - probably from a previous + // attempt + if (!launchedContainers.contains(containerStatus.getContainerId())) { + LOG.info("Ignoring completed status of " + containerStatus.getContainerId() + + "; unknown container(probably launched by previous attempt)"); + continue; + } + + // increment counters for completed/failed containers + int exitStatus = containerStatus.getExitStatus(); + if (0 != exitStatus) { + // container failed + LOG.info("container failed, exit status is " + exitStatus); + for (RayNodeContext node : indexToNode) { + if (node.container != null + && 
node.container.getId().equals(containerStatus.getContainerId())) { + LOG.info("ray node failed, the role is " + node.role); + if (-100 == exitStatus) { /* release container will return -100 */ + if (node.isRunning == false) { + LOG.info("release container will return -100, don't process it"); + break; + } else { + LOG.warn("the exit status is -100, but this node should be running"); + } + } + node.isRunning = false; + node.isAlocating = false; + node.instanceId = null; + node.container = null; + node.failCounter++; + + if (disableProcessFo) { + LOG.info("process failover is disable, ignore container failed"); + break; + } + + if (supremeFo) { + LOG.info("Start supreme failover"); + restartClasterFlag = true; + } + + if (node.role == "head") { + restartClasterFlag = true; + } + numAllocatedContainers.decrementAndGet(); + numRequestedContainers.decrementAndGet(); + break; + } + } + + if (restartClasterFlag) { + LOG.info("restart all the Container of ray node"); + for (RayNodeContext node : indexToNode) { + if (node.isRunning && node.container != null) { + amRmClient.releaseAssignedContainer(node.container.getId()); + node.isRunning = false; + node.isAlocating = false; + node.instanceId = null; + node.container = null; + node.failCounter++; + numAllocatedContainers.decrementAndGet(); + numRequestedContainers.decrementAndGet(); + } + } + } + } else { + // nothing to do + // container completed successfully + numCompletedContainers.incrementAndGet(); + LOG.info("Container completed successfully." 
+ ", containerId=" + + containerStatus.getContainerId()); + } + if (timelineClient != null) { + publishContainerEndEvent(timelineClient, containerStatus, domainId, appSubmitterUgi); + } + + if (restartClasterFlag) { + break; + } + } + + // ask for more containers if any failed + int askCount = numTotalContainers - numRequestedContainers.get(); + numRequestedContainers.addAndGet(askCount); + + int requestCount = setupContainerRequest(); + assert requestCount == askCount : "The request count is inconsistent(onContainersCompleted): " + + requestCount + " != " + askCount; + + if (numCompletedContainers.get() == numTotalContainers) { + done = true; + } + } + + @Override + public void onContainersAllocated(List allocatedContainers) { + LOG.info( + "Got response from RM for container ask, allocatedCnt=" + allocatedContainers.size()); + numAllocatedContainers.addAndGet(allocatedContainers.size()); + for (Container allocatedContainer : allocatedContainers) { + String rayInstanceId = Integer.toString(rayInstanceCounter); + rayInstanceCounter++; + + Thread launchThread = null; + boolean shouldSleep = false; + for (RayNodeContext node : indexToNode) { + if (node.isRunning) { + continue; + } + node.isRunning = true; + node.isAlocating = false; + node.instanceId = rayInstanceId; + node.container = allocatedContainer; + containerToNode.put(allocatedContainer.getId().toString(), node); + if (node.role == "head") { + try { + redisAddress = + InetAddress.getByName(allocatedContainer.getNodeHttpAddress().split(":")[0]) + .getHostAddress() + ":" + redisPort; + } catch (UnknownHostException e) { + redisAddress = ""; + } + } else { + shouldSleep = true; + } + launchThread = createLaunchContainerThread(allocatedContainer, rayInstanceId, node.role, + shouldSleep ? 20000 : 0); + break; + } + + if (launchThread == null) { + LOG.error("The container " + allocatedContainer + " unused!"); + break; + } + + LOG.info("Launching Ray instance on a new container." 
+ ", containerId=" + + allocatedContainer.getId() + ", rayInstanceId=" + rayInstanceId + ", containerNode=" + + allocatedContainer.getNodeId().getHost() + ":" + + allocatedContainer.getNodeId().getPort() + ", containerNodeURI=" + + allocatedContainer.getNodeHttpAddress() + ", containerResourceMemory=" + + allocatedContainer.getResource().getMemorySize() + ", containerResourceVirtualCores=" + + allocatedContainer.getResource().getVirtualCores()); + // + ", containerToken" + // +allocatedContainer.getContainerToken().getIdentifier().toString()); + + // launch and start the container on a separate thread to keep + // the main thread unblocked + // as all containers may not be allocated at one go. + launchThreads.add(launchThread); + launchedContainers.add(allocatedContainer.getId()); + launchThread.start(); + } + } + + @Override + public void onContainersUpdated(List containers) {} + + @Override + public void onShutdownRequest() { + done = true; + } + + @Override + public void onNodesUpdated(List updatedNodes) {} + + @Override + public float getProgress() { + // set progress to deliver to RM on next heartbeat + float progress = (float) numCompletedContainers.get() / numTotalContainers; + return progress; + } + + @Override + public void onError(Throwable e) { + LOG.error("Error in RMCallbackHandler: ", e); + done = true; + amRmClient.stop(); + } + } + + @VisibleForTesting + static class NmCallbackHandler extends NMClientAsync.AbstractCallbackHandler { + + private ConcurrentMap containers = + new ConcurrentHashMap(); + private final ApplicationMaster applicationMaster; + + public NmCallbackHandler(ApplicationMaster applicationMaster) { + this.applicationMaster = applicationMaster; + } + + public void addContainer(ContainerId containerId, Container container) { + containers.putIfAbsent(containerId, container); + } + + @Override + public void onContainerStopped(ContainerId containerId) { + if (LOG.isDebugEnabled()) { + LOG.debug("Succeeded to stop Container " + 
containerId); + } + containers.remove(containerId); + } + + @Override + public void onContainerStatusReceived(ContainerId containerId, + ContainerStatus containerStatus) { + if (LOG.isDebugEnabled()) { + LOG.debug("Container Status: id=" + containerId + ", status=" + containerStatus); + } + } + + @Override + public void onContainerStarted(ContainerId containerId, + Map allServiceResponse) { + if (LOG.isDebugEnabled()) { + LOG.debug("Succeeded to start Container " + containerId); + } + Container container = containers.get(containerId); + if (container != null) { + applicationMaster.nmClientAsync.getContainerStatusAsync(containerId, container.getNodeId()); + } + if (applicationMaster.timelineClient != null) { + applicationMaster.publishContainerStartEvent(applicationMaster.timelineClient, container, + applicationMaster.domainId, applicationMaster.appSubmitterUgi); + } + } + + @Override + public void onContainerResourceIncreased(ContainerId containerId, Resource resource) {} + + @Override + public void onStartContainerError(ContainerId containerId, Throwable t) { + LOG.error("Failed to start Container " + containerId); + containers.remove(containerId); + applicationMaster.numCompletedContainers.incrementAndGet(); + applicationMaster.numFailedContainers.incrementAndGet(); + } + + @Override + public void onGetContainerStatusError(ContainerId containerId, Throwable t) { + LOG.error("Failed to query the status of Container " + containerId); + } + + @Override + public void onStopContainerError(ContainerId containerId, Throwable t) { + LOG.error("Failed to stop Container " + containerId); + containers.remove(containerId); + } + + @Override + public void onIncreaseContainerResourceError(ContainerId containerId, Throwable t) {} + + } + + /** + * Thread to connect to the {@link ContainerManagementProtocol} and launch the container that will + * execute the shell command. 
+ */ + private class LaunchContainerRunnable implements Runnable { + + // Allocated container + private Container container; + private String rayInstanceId; + private String role; + private long sleepMillis = 0; + + NmCallbackHandler containerListener; + + public LaunchContainerRunnable(Container lcontainer, NmCallbackHandler containerListener, + String rayInstanceId, String role, long sleepMillis) { + this.container = lcontainer; + this.containerListener = containerListener; + this.rayInstanceId = rayInstanceId; + this.role = role; + this.sleepMillis = sleepMillis; + } + + @Override + /** + * Connects to CM, sets up container launch context for shell command and eventually dispatches + * the container start request to the CM. + */ + public void run() { + LOG.info("Setting up container launch container for containerid=" + container.getId() + + " with rayInstanceId=" + rayInstanceId + " ,sleep millis " + sleepMillis); + + if (sleepMillis != 0) { + try { + Thread.sleep(sleepMillis); + } catch (InterruptedException e) { + LOG.warn("Catch InterruptedException when sleep."); + } + + } + // Set the local resources + Map localResources = new HashMap(); + + // The container for the eventual shell commands needs its own local + // resources too. + // In this scenario, if a shell script is specified, we need to have it + // copied and made available to the container. + String rayArchiveFileLocalPath = "Ray-package"; + if (!rayArchiveFile.isEmpty()) { + Path rayArchivePath = new Path(rayArchiveFile); + + URL yarnUrl = null; + try { + yarnUrl = URL.fromURI(new URI(rayArchivePath.toString())); + } catch (URISyntaxException e) { + LOG.error("Error when trying to use Ray archive path specified" + " in env, path=" + + rayArchivePath, e); + // A failure scenario on bad input such as invalid shell script path + // We know we cannot continue launching the container + // so we should release it. 
+ // TODO + numCompletedContainers.incrementAndGet(); + numFailedContainers.incrementAndGet(); + return; + } + LocalResource rayRsrc = LocalResource.newInstance(yarnUrl, LocalResourceType.ARCHIVE, + LocalResourceVisibility.APPLICATION, rayArchiveFileLen, rayArchiveFileTimestamp); + localResources.put(rayArchiveFileLocalPath, rayRsrc); + shellCommand = LINUX_BASH_COMMEND; + } + + // Set the necessary command to execute on the allocated container + Vector vargs = new Vector(5); + + // Set executable command + vargs.add(shellCommand); + // Set shell script path + if (!rayArchiveFile.isEmpty()) { + vargs.add(rayArchiveFileLocalPath + "/" + rayShellStringPath); + } + + // Set args based on role + switch (role) { + case "head": + vargs.add("--head"); + vargs.add("--redis-address"); + vargs.add(redisAddress); + if (headNodeStaticArgs != null) { + vargs.add(headNodeStaticArgs); + } + break; + case "work": + //vargs.add("--work"); + vargs.add("--redis_port"); + vargs.add(String.valueOf(redisPort)); + if (workNodeStaticArgs != null) { + vargs.add(workNodeStaticArgs); + } + break; + default: + break; + } + + try { + String nodeIpAddress = + InetAddress.getByName(container.getNodeHttpAddress().split(":")[0]).getHostAddress(); + vargs.add("--node-ip-address"); + vargs.add(nodeIpAddress); + } catch (UnknownHostException e) { + e.printStackTrace(); + } + + // Set args for the shell command if any + vargs.add(shellArgs); + + // Add log redirect params + vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout"); + vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr"); + + // Get final commmand + StringBuilder command = new StringBuilder(); + for (CharSequence str : vargs) { + command.append(str).append(" "); + } + + List commands = new ArrayList(); + commands.add(command.toString()); + LOG.info("command: " + commands); + + // Set up ContainerLaunchContext, setting local resource, environment, + // command and token for constructor. 
+ + Map myShellEnv = new HashMap(shellEnv); + myShellEnv.put(YARN_SHELL_ID, rayInstanceId); + ContainerLaunchContext ctx = ContainerLaunchContext.newInstance(localResources, myShellEnv, + commands, null, allTokens.duplicate(), null); + containerListener.addContainer(container.getId(), container); + nmClientAsync.startContainerAsync(container, ctx); + } + } + + /** + * Setup the request that will be sent to the RM for the container ask. + * + * @return the setup ResourceRequest to be sent to RM + */ + private ContainerRequest setupContainerAskForRm() { + // setup requirements for hosts + // using * as any host will do for the distributed shell app + // set the priority for the request + // TODO - what is the range for priority? how to decide? + Priority pri = Priority.newInstance(requestPriority); + + // Set up resource type requirements + // For now, memory and CPU are supported so we set memory and cpu requirements + Resource capability = Resource.newInstance(containerMemory, containerVirtualCores); + + ContainerRequest request = new ContainerRequest(capability, null, null, pri); + LOG.info("Requested container ask: " + request.toString()); + return request; + } + + private boolean fileExist(String filePath) { + return new File(filePath).exists(); + } + + private String readContent(String filePath) throws IOException { + DataInputStream ds = null; + try { + ds = new DataInputStream(new FileInputStream(filePath)); + return ds.readUTF(); + } finally { + org.apache.commons.io.IOUtils.closeQuietly(ds); + } + } + + private void publishContainerStartEvent(final TimelineClient timelineClient, + final Container container, String domainId, UserGroupInformation ugi) { + final TimelineEntity entity = new TimelineEntity(); + entity.setEntityId(container.getId().toString()); + entity.setEntityType(DsEntity.DS_CONTAINER.toString()); + entity.setDomainId(domainId); + entity.addPrimaryFilter(USER_TIMELINE_FILTER_NAME, ugi.getShortUserName()); + 
entity.addPrimaryFilter(APPID_TIMELINE_FILTER_NAME, + container.getId().getApplicationAttemptId().getApplicationId().toString()); + TimelineEvent event = new TimelineEvent(); + event.setTimestamp(System.currentTimeMillis()); + event.setEventType(DsEvent.DS_CONTAINER_START.toString()); + event.addEventInfo("Node", container.getNodeId().toString()); + event.addEventInfo("Resources", container.getResource().toString()); + entity.addEvent(event); + + try { + processTimelineResponseErrors( + putContainerEntity(timelineClient, container.getId().getApplicationAttemptId(), entity)); + } catch (YarnException | IOException | ClientHandlerException e) { + LOG.error("Container start event could not be published for " + container.getId().toString(), + e); + } + } + + @VisibleForTesting + void publishContainerEndEvent(final TimelineClient timelineClient, ContainerStatus container, + String domainId, UserGroupInformation ugi) { + final TimelineEntity entity = new TimelineEntity(); + entity.setEntityId(container.getContainerId().toString()); + entity.setEntityType(DsEntity.DS_CONTAINER.toString()); + entity.setDomainId(domainId); + entity.addPrimaryFilter(USER_TIMELINE_FILTER_NAME, ugi.getShortUserName()); + entity.addPrimaryFilter(APPID_TIMELINE_FILTER_NAME, + container.getContainerId().getApplicationAttemptId().getApplicationId().toString()); + TimelineEvent event = new TimelineEvent(); + event.setTimestamp(System.currentTimeMillis()); + event.setEventType(DsEvent.DS_CONTAINER_END.toString()); + event.addEventInfo("State", container.getState().name()); + event.addEventInfo("Exit Status", container.getExitStatus()); + entity.addEvent(event); + try { + processTimelineResponseErrors(putContainerEntity(timelineClient, + container.getContainerId().getApplicationAttemptId(), entity)); + } catch (YarnException | IOException | ClientHandlerException e) { + LOG.error( + "Container end event could not be published for " + container.getContainerId().toString(), + e); + } + } + + private 
TimelinePutResponse putContainerEntity(TimelineClient timelineClient, + ApplicationAttemptId currAttemptId, TimelineEntity entity) throws YarnException, IOException { + if (TimelineUtils.timelineServiceV1_5Enabled(conf)) { + TimelineEntityGroupId groupId = TimelineEntityGroupId + .newInstance(currAttemptId.getApplicationId(), CONTAINER_ENTITY_GROUP_ID); + return timelineClient.putEntities(currAttemptId, groupId, entity); + } else { + return timelineClient.putEntities(entity); + } + } + + private void publishApplicationAttemptEvent(final TimelineClient timelineClient, + String appAttemptId, DsEvent appEvent, String domainId, UserGroupInformation ugi) { + final TimelineEntity entity = new TimelineEntity(); + entity.setEntityId(appAttemptId); + entity.setEntityType(DsEntity.DS_APP_ATTEMPT.toString()); + entity.setDomainId(domainId); + entity.addPrimaryFilter(USER_TIMELINE_FILTER_NAME, ugi.getShortUserName()); + TimelineEvent event = new TimelineEvent(); + event.setEventType(appEvent.toString()); + event.setTimestamp(System.currentTimeMillis()); + entity.addEvent(event); + try { + TimelinePutResponse response = timelineClient.putEntities(entity); + processTimelineResponseErrors(response); + } catch (YarnException | IOException | ClientHandlerException e) { + LOG.error("App Attempt " + (appEvent.equals(DsEvent.DS_APP_ATTEMPT_START) ? 
"start" : "end") + + " event could not be published for " + appAttemptId.toString(), e); + } + } + + private TimelinePutResponse processTimelineResponseErrors(TimelinePutResponse response) { + List errors = response.getErrors(); + if (errors.size() == 0) { + LOG.debug("Timeline entities are successfully put"); + } else { + for (TimelinePutResponse.TimelinePutError error : errors) { + LOG.error("Error when publishing entity [" + error.getEntityType() + "," + + error.getEntityId() + "], server side error code: " + error.getErrorCode()); + } + } + return response; + } + + RmCallbackHandler getRmCallbackHandler() { + return new RmCallbackHandler(); + } + + @SuppressWarnings("rawtypes") + @VisibleForTesting + void setAmRmClient(AMRMClientAsync client) { + this.amRmClient = client; + } + + @VisibleForTesting + int getNumCompletedContainers() { + return numCompletedContainers.get(); + } + + @VisibleForTesting + boolean getDone() { + return done; + } + + @VisibleForTesting + Thread createLaunchContainerThread(Container allocatedContainer, String shellId, String role, + long sleepMillis) { + LaunchContainerRunnable runnableLaunchContainer = new LaunchContainerRunnable( + allocatedContainer, containerListener, shellId, role, sleepMillis); + return new Thread(runnableLaunchContainer); + } +} diff --git a/devops/yarn/src/main/java/org/ray/on/yarn/Client.java b/devops/yarn/src/main/java/org/ray/on/yarn/Client.java new file mode 100644 index 000000000000..0b69a8b0e98b --- /dev/null +++ b/devops/yarn/src/main/java/org/ray/on/yarn/Client.java @@ -0,0 +1,831 @@ +package org.ray.on.yarn; + +import com.google.common.collect.Maps; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Vector; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.GnuParser; +import org.apache.commons.cli.HelpFormatter; +import org.apache.commons.cli.Option; 
+import org.apache.commons.cli.Options; +import org.apache.commons.cli.ParseException; +import org.apache.commons.io.IOUtils; +import org.apache.commons.lang.StringUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.io.DataOutputBuffer; +import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.yarn.api.ApplicationConstants; +import org.apache.hadoop.yarn.api.ApplicationConstants.Environment; +import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationReport; +import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.LocalResource; +import org.apache.hadoop.yarn.api.records.LocalResourceType; +import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; +import org.apache.hadoop.yarn.api.records.NodeReport; +import org.apache.hadoop.yarn.api.records.NodeState; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.QueueACL; +import org.apache.hadoop.yarn.api.records.QueueInfo; +import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.URL; 
+import org.apache.hadoop.yarn.api.records.YarnApplicationState; +import org.apache.hadoop.yarn.api.records.YarnClusterMetrics; +import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain; +import org.apache.hadoop.yarn.client.api.TimelineClient; +import org.apache.hadoop.yarn.client.api.YarnClient; +import org.apache.hadoop.yarn.client.api.YarnClientApplication; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.util.timeline.TimelineUtils; + + +@InterfaceAudience.Public +@InterfaceStability.Unstable +public class Client { + + private static final Log LOG = LogFactory.getLog(Client.class); + + // Configuration + private Configuration conf; + private YarnClient yarnClient; + // Application master specific info to register a new Application with RM/ASM + private String appName = ""; + // App master priority + private int amPriority = 0; + // Queue for App master + private String amQueue = ""; + // Amt. of memory resource to request for to run the App Master + private long amMemory = 100; + // Amt. of virtual core resource to request for to run the App Master + private int amVCores = 1; + // Application master jar file + private String appMasterJar = ""; + // Main class to invoke application master + private final String appMasterMainClass; + // Shell command to be executed + private String shellCommand = ""; + // Location of ray archive + private String rayArchiveFile = ""; + // Args to be passed to the shell command + private String[] shellArgs = new String[] {}; + // Env variables to be setup for the shell command + private Map shellEnv = new HashMap(); + // Shell Command Container priority + private int shellCmdPriority = 0; + // Amt of memory to request for container in which shell script will be executed + private int containerMemory = 10; + // Amt. 
of virtual cores to request for container in which shell script will be executed + private int containerVirtualCores = 1; + // No. of containers in which the shell script needs to be executed + private int numContainers = 1; + // No. of the Ray roles including head and work + private Map numRoles = Maps.newHashMapWithExpectedSize(2); + private String nodeLabelExpression = null; + // log4j.properties file + // if available, add to local resources and set into classpath + private String log4jPropFile = ""; + // Start time for client + private final long clientStartTime = System.currentTimeMillis(); + // Timeout threshold for client. Kill app after time interval expires. + private long clientTimeout = 600000; + // flag to indicate whether to keep containers across application attempts. + private boolean keepContainers = false; + // attempt failures validity interval + private long attemptFailuresValidityInterval = -1; + // Debug flag + private boolean debugFlag = false; + // the static args of head node + private String headNodeStaticArgs = null; + // the static args of work node + private String workNodeStaticArgs = null; + // supremeFo flag + private boolean supremeFo = false; + // disable process failover flag + private boolean disableProcessFo = false; + // Timeline domain ID + private String domainId = null; + // Flag to indicate whether to create the domain of the given ID + private boolean toCreateDomain = false; + // Timeline domain reader access control + private String viewAcls = null; + // Timeline domain writer access control + private String modifyAcls = null; + // Command line options + private Options opts; + // The max times to get the 'RUNNING' state of application in monitor + private final int maxRunningStateTimes = 10; + + private static final String shellCommandPath = "shellCommands"; + private static final String shellArgsPath = "shellArgs"; + private static final String appMasterJarPath = "AppMaster.jar"; + private static final String log4jPath 
= "log4j.properties"; + private static final String rayArchivePath = "ray-deploy.zip"; + + /** + * The main entrance of Client. + * + * @param args Command line arguments + */ + public static void main(String[] args) { + boolean result = false; + try { + Client client = new Client(); + LOG.info("Initializing Client"); + try { + boolean doRun = client.init(args); + if (!doRun) { + System.exit(0); + } + } catch (IllegalArgumentException e) { + System.err.println(e.getLocalizedMessage()); + client.printUsage(); + System.exit(-1); + } + result = client.run(); + } catch (Throwable t) { + LOG.fatal("Error running Client", t); + System.exit(1); + } + if (result) { + LOG.info("Application completed successfully"); + System.exit(0); + } + LOG.error("Application failed to complete successfully"); + System.exit(2); + } + + public Client(Configuration conf) throws Exception { + this("org.ray.on.yarn.ApplicationMaster", conf); + } + + Client(String appMasterMainClass, Configuration conf) { + this.conf = conf; + this.appMasterMainClass = appMasterMainClass; + yarnClient = YarnClient.createYarnClient(); + yarnClient.init(conf); + opts = new Options(); + opts.addOption("appname", true, "Application Name. Default value - Ray"); + opts.addOption("priority", true, "Application Priority. Default 0"); + opts.addOption("queue", true, "RM Queue in which this application is to be submitted"); + opts.addOption("timeout", true, "Application timeout in milliseconds"); + opts.addOption("amMemory", true, + "Amount of memory in MB to be requested to run the application master"); + opts.addOption("amVCores", true, + "Amount of virtual cores to be requested to run the application master"); + opts.addOption("jar", true, "Jar file containing the application master"); + opts.addOption("shellCommand", true, "Shell command to be executed by " + + "the Application Master. 
Can only specify either --shellCommand " + "or --rayArchive"); + opts.addOption("rayArchive", true, "Location of the shell script to be " + + "executed. Can only specify either --shellCommand or --rayArchive"); + opts.addOption("shellArgs", true, "Command line args for the shell script." + + "Multiple args can be separated by empty space."); + opts.getOption("shellArgs").setArgs(Option.UNLIMITED_VALUES); + opts.addOption("shellArgs", true, + "Environment for shell script. Specified as env_key=env_val pairs"); + opts.addOption("shellCmdPriority", true, "Priority for the shell command containers"); + opts.addOption("containerMemory", true, + "Amount of memory in MB to be requested to run the shell command"); + opts.addOption("containerVcores", true, + "Amount of virtual cores to be requested to run the shell command"); + opts.addOption("numRoles", true, "2-tuple. " + + "No. of the Ray roles including head and work"); + opts.getOption("numRoles").setArgs(2); + opts.addOption("numContainers", true, + "No. of containers on which the shell command needs to be executed"); + opts.addOption("logProperties", true, "log4j.properties file"); + opts.addOption("keepContainersAcrossApplication_attempts", false, + "Flag to indicate whether to keep containers across application attempts." + + " If the flag is true, running containers will not be killed when" + + " application attempt fails and these containers will be retrieved by" + + " the new application attempt "); + opts.addOption("attempt_failures_validity_interval", true, + "when attempt_failures_validity_interval in milliseconds is set to > 0," + + "the failure number will not take failures which happen out of " + + "the validityInterval into failure count. 
" + + "If failure count reaches to maxAppAttempts, " + "the application will be failed."); + opts.addOption("debug", false, "Dump out debug information"); + opts.addOption("headNodeStaticArgs", true, + "the static args which is needed when start head node"); + opts.addOption("workNodeStaticArgs", true, + "the static args which is needed when start work node"); + // TODO: improvment the failover implement to enable the supremeFo and disableProcessFo option + // opts.addOption("supremeFo", false, "use supreme failover strategy"); + // opts.addOption("disableProcessFo", false, "disable process failover"); + opts.addOption("domain", true, + "ID of the timeline domain where the " + "timeline entities will be put"); + opts.addOption("viewAcls", true, + "Users and groups that allowed to " + "view the timeline entities in the given domain"); + opts.addOption("modifyAcls", true, + "Users and groups that allowed to " + "modify the timeline entities in the given domain"); + opts.addOption("create", false, + "Flag to indicate whether to create the " + "domain specified with -domain."); + opts.addOption("help", false, "Print usage"); + opts.addOption("nodeLabelExpression", true, + "Node label expression to determine the nodes" + + " where all the containers of this application" + + " will be allocated, \"\" means containers" + + " can be allocated anywhere, if you don't specify the option," + + " default nodeLabelExpression of queue will be used."); + } + + public Client() throws Exception { + this(new YarnConfiguration()); + } + + /** + * Helper function to print out usage. + */ + private void printUsage() { + new HelpFormatter().printHelp("Client", opts); + } + + /** + * Parse command line options. 
+ * + * @param args Parsed command line options + * @return Whether the init was successful to run the client + */ + public boolean init(String[] args) throws ParseException { + + CommandLine cliParser = new GnuParser().parse(opts, args); + + if (args.length == 0) { + throw new IllegalArgumentException("No args specified for client to initialize"); + } + + if (cliParser.hasOption("logProperties")) { + String log4jPath = cliParser.getOptionValue("logProperties"); + try { + Log4jPropertyHelper.updateLog4jConfiguration(Client.class, log4jPath); + } catch (Exception e) { + LOG.warn("Can not set up custom log4j properties. " + e); + } + } + + if (cliParser.hasOption("help")) { + printUsage(); + return false; + } + + if (cliParser.hasOption("debug")) { + debugFlag = true; + } + + if (cliParser.hasOption("headNodeStaticArgs")) { + headNodeStaticArgs = cliParser.getOptionValue("headNodeStaticArgs"); + } + + if (cliParser.hasOption("workNodeStaticArgs")) { + workNodeStaticArgs = cliParser.getOptionValue("workNodeStaticArgs"); + } + + if (cliParser.hasOption("supremeFo")) { + supremeFo = true; + } + + if (cliParser.hasOption("disableProcessFo")) { + disableProcessFo = true; + } + + if (cliParser.hasOption("keepContainersAcrossApplication_attempts")) { + LOG.info("keepContainersAcrossApplication_attempts"); + keepContainers = true; + } + + appName = cliParser.getOptionValue("appname", "Ray"); + amPriority = Integer.parseInt(cliParser.getOptionValue("priority", "0")); + amQueue = cliParser.getOptionValue("queue", "default"); + amMemory = Integer.parseInt(cliParser.getOptionValue("amMemory", "100")); + amVCores = Integer.parseInt(cliParser.getOptionValue("amVCores", "1")); + + if (amMemory < 0) { + throw new IllegalArgumentException("Invalid memory specified for application master, exiting." + + " Specified memory=" + amMemory); + } + if (amVCores < 0) { + throw new IllegalArgumentException( + "Invalid virtual cores specified for application master, exiting." 
+ + " Specified virtual cores=" + amVCores); + } + + if (!cliParser.hasOption("jar")) { + throw new IllegalArgumentException("No jar file specified for application master"); + } + + appMasterJar = cliParser.getOptionValue("jar"); + + if (!cliParser.hasOption("shellCommand") && !cliParser.hasOption("rayArchive")) { + throw new IllegalArgumentException( + "No shell command or shell script specified to be executed by application master"); + } else if (cliParser.hasOption("shellCommand") && cliParser.hasOption("rayArchive")) { + throw new IllegalArgumentException( + "Can not specify shellCommand option " + "and rayArchive option at the same time"); + } else if (cliParser.hasOption("shellCommand")) { + shellCommand = cliParser.getOptionValue("shellCommand"); + } else { + rayArchiveFile = cliParser.getOptionValue("rayArchive"); + } + if (cliParser.hasOption("shellArgs")) { + shellArgs = cliParser.getOptionValues("shellArgs"); + } + if (cliParser.hasOption("shellArgs")) { + String[] envs = cliParser.getOptionValues("shellArgs"); + for (String env : envs) { + env = env.trim(); + int index = env.indexOf('='); + if (index == -1) { + shellEnv.put(env, ""); + continue; + } + String key = env.substring(0, index); + String val = ""; + if (index < (env.length() - 1)) { + val = env.substring(index + 1); + } + shellEnv.put(key, val); + } + } + shellCmdPriority = Integer.parseInt(cliParser.getOptionValue("shellCmdPriority", "0")); + containerMemory = Integer.parseInt(cliParser.getOptionValue("containerMemory", "10")); + containerVirtualCores = Integer.parseInt(cliParser.getOptionValue("containerVcores", "1")); + numContainers = Integer.parseInt(cliParser.getOptionValue("numContainers", "1")); + + if (cliParser.hasOption("numRoles")) { + String[] optNumRoles = cliParser.getOptionValues("numRoles"); + numRoles.put("head", Integer.parseInt(optNumRoles[0])); + numRoles.put("work", Integer.parseInt(optNumRoles[1])); + + if (numRoles.get("head") < 0 || numRoles.get("work") < 0) { + throw 
new IllegalArgumentException("Invalid no. of Ray roles" + ", numHeadNode = " + + numRoles.get("head") + ", numWorkNode = " + numRoles.get("work")); + } + if (numRoles.get("head") != 1) { + throw new IllegalArgumentException("There should be one (and only one) head role"); + } + } else { + numRoles.put("head", 1); + numRoles.put("work", 0); + } + + if (containerMemory < 0 || containerVirtualCores < 0 || numContainers < 1) { + throw new IllegalArgumentException( + "Invalid no. of containers or container memory/vcores specified," + " exiting." + + " Specified containerMemory=" + containerMemory + ", containerVirtualCores=" + + containerVirtualCores + ", numContainer=" + numContainers); + } + + nodeLabelExpression = cliParser.getOptionValue("nodeLabelExpression", null); + clientTimeout = Integer.parseInt(cliParser.getOptionValue("timeout", "600000")); + attemptFailuresValidityInterval = + Long.parseLong(cliParser.getOptionValue("attempt_failures_validity_interval", "-1")); + log4jPropFile = cliParser.getOptionValue("logProperties", ""); + + // Get timeline domain options + if (cliParser.hasOption("domain")) { + domainId = cliParser.getOptionValue("domain"); + toCreateDomain = cliParser.hasOption("create"); + if (cliParser.hasOption("viewAcls")) { + viewAcls = cliParser.getOptionValue("viewAcls"); + } + if (cliParser.hasOption("modifyAcls")) { + modifyAcls = cliParser.getOptionValue("modifyAcls"); + } + } + + return true; + } + + /** + * Main run function for the client. + * + * @return true if application completed successfully. 
+ */ + public boolean run() throws IOException, YarnException { + + LOG.info("Running Client"); + yarnClient.start(); + + YarnClusterMetrics clusterMetrics = yarnClient.getYarnClusterMetrics(); + LOG.info("Got Cluster metric info from ASM" + ", numNodeManagers=" + + clusterMetrics.getNumNodeManagers()); + + List clusterNodeReports = yarnClient.getNodeReports(NodeState.RUNNING); + LOG.info("Got Cluster node info from ASM"); + for (NodeReport node : clusterNodeReports) { + LOG.info("Got node report from ASM for" + ", nodeId=" + node.getNodeId() + ", nodeAddress=" + + node.getHttpAddress() + ", nodeRackName=" + node.getRackName() + ", nodeNumContainers=" + + node.getNumContainers()); + } + + QueueInfo queueInfo = yarnClient.getQueueInfo(this.amQueue); + if (queueInfo != null) { + LOG.info("Queue info" + ", queueName=" + queueInfo.getQueueName() + ", queueCurrentCapacity=" + + queueInfo.getCurrentCapacity() + ", queueMaxCapacity=" + queueInfo.getMaximumCapacity() + + ", queueApplicationCount=" + queueInfo.getApplications().size() + + ", queueChildQueueCount=" + queueInfo.getChildQueues().size()); + } + + List listAclInfo = yarnClient.getQueueAclsInfo(); + for (QueueUserACLInfo aclInfo : listAclInfo) { + for (QueueACL userAcl : aclInfo.getUserAcls()) { + LOG.info("User ACL Info for Queue" + ", queueName=" + aclInfo.getQueueName() + ", userAcl=" + + userAcl.name()); + } + } + + if (domainId != null && domainId.length() > 0 && toCreateDomain) { + prepareTimelineDomain(); + } + + // Get a new application id + YarnClientApplication app = yarnClient.createApplication(); + GetNewApplicationResponse appResponse = app.getNewApplicationResponse(); + + long maxMem = appResponse.getMaximumResourceCapability().getMemorySize(); + // TODO get min/max resource capabilities from RM and change memory ask if needed + // If we do not have min/max, we may not be able to correctly request + // the required resources from the RM for the app master + // Memory ask has to be a multiple of min 
and less than max. + // Dump out information about cluster capability as seen by the resource manager + LOG.info("Max mem capability of resources in this cluster " + maxMem); + // A resource ask cannot exceed the maxMem. + if (amMemory > maxMem) { + LOG.info("AM memory specified above max threshold of cluster. Using max value." + + ", specified=" + amMemory + ", max=" + maxMem); + amMemory = maxMem; + } + + // A resource ask cannot exceed the maxVCores. + int maxVCores = appResponse.getMaximumResourceCapability().getVirtualCores(); + LOG.info("Max virtual cores capability of resources in this cluster " + maxVCores); + if (amVCores > maxVCores) { + LOG.info("AM virtual cores specified above max threshold of cluster. " + "Using max value." + + ", specified=" + amVCores + ", max=" + maxVCores); + amVCores = maxVCores; + } + + // set the application name + ApplicationSubmissionContext appContext = app.getApplicationSubmissionContext(); + appContext.setKeepContainersAcrossApplicationAttempts(keepContainers); + appContext.setApplicationName(appName); + + if (attemptFailuresValidityInterval >= 0) { + appContext.setAttemptFailuresValidityInterval(attemptFailuresValidityInterval); + } + + // set local resources for the application master + // local files or archives as needed + // In this scenario, the jar file for the application master is part of the local resources + Map localResources = new HashMap(); + + LOG.info("Copy App Master jar from local filesystem and add to local environment"); + // Copy the application master jar to the filesystem + // Create a local resource to point to the destination jar path + FileSystem fs = FileSystem.get(conf); + ApplicationId appId = appContext.getApplicationId(); + addToLocalResources(fs, appMasterJar, appMasterJarPath, appId.toString(), localResources, null); + + // Set the log4j properties if needed + if (!log4jPropFile.isEmpty()) { + addToLocalResources(fs, log4jPropFile, log4jPath, appId.toString(), localResources, null); + } + + 
// The shell script has to be made available on the final container(s) + // where it will be executed. + // To do this, we need to first copy into the filesystem that is visible + // to the yarn framework. + // We do not need to set this as a local resource for the application + // master as the application master does not need it. + String hdfsRayArchiveLocation = ""; + long hdfsRayArchiveLen = 0; + long hdfsRayArchiveTimestamp = 0; + if (!rayArchiveFile.isEmpty()) { + Path raySrc = new Path(rayArchiveFile); + String rayPathSuffix = appName + "/" + appId.toString() + "/" + rayArchivePath; + Path rayDst = new Path(fs.getHomeDirectory(), rayPathSuffix); + fs.copyFromLocalFile(false, true, raySrc, rayDst); + hdfsRayArchiveLocation = fs.getHomeDirectory() + "/" + rayPathSuffix; + FileStatus rayArchiveFileStatus = fs.getFileStatus(rayDst); + hdfsRayArchiveLen = rayArchiveFileStatus.getLen(); + hdfsRayArchiveTimestamp = rayArchiveFileStatus.getModificationTime(); + } + if (!shellCommand.isEmpty()) { + addToLocalResources(fs, null, shellCommandPath, appId.toString(), localResources, + shellCommand); + } + if (shellArgs.length > 0) { + addToLocalResources(fs, null, shellArgsPath, appId.toString(), localResources, + StringUtils.join(shellArgs, " ")); + } + + // Set the necessary security tokens as needed + // amContainer.setContainerTokens(containerToken); + + // Set the env variables to be setup in the env where the application master will be run + LOG.info("Set the environment for the application master"); + Map env = new HashMap(); + // put location of shell script into env + // using the env info, the application master will create the correct local resource for the + // eventual containers that will be launched to execute the shell scripts + env.put(DsConstants.RAY_ARCHIVE_LOCATION, hdfsRayArchiveLocation); + env.put(DsConstants.RAY_ARCHIVE_TIMESTAMP, Long.toString(hdfsRayArchiveTimestamp)); + env.put(DsConstants.RAY_ARCHIVE_LEN, Long.toString(hdfsRayArchiveLen)); + 
if (domainId != null && domainId.length() > 0) { + env.put(DsConstants.RAY_TIMELINE_DOMAIN, domainId); + } + + // Add AppMaster.jar location to classpath + // At some point we should not be required to add + // the hadoop specific classpaths to the env. + // It should be provided out of the box. + // For now setting all required classpaths including + // the classpath to "." for the application jar + StringBuilder classPathEnv = new StringBuilder(Environment.CLASSPATH.$$()) + .append(ApplicationConstants.CLASS_PATH_SEPARATOR).append("./*"); + for (String c : conf.getStrings(YarnConfiguration.YARN_APPLICATION_CLASSPATH, + YarnConfiguration.DEFAULT_YARN_CROSS_PLATFORM_APPLICATION_CLASSPATH)) { + classPathEnv.append(ApplicationConstants.CLASS_PATH_SEPARATOR); + classPathEnv.append(c.trim()); + } + classPathEnv.append(ApplicationConstants.CLASS_PATH_SEPARATOR).append("./log4j.properties"); + // add the runtime classpath needed for tests to work + if (conf.getBoolean(YarnConfiguration.IS_MINI_YARN_CLUSTER, false)) { + classPathEnv.append(':'); + classPathEnv.append(System.getProperty("java.class.path")); + } + env.put("CLASSPATH", classPathEnv.toString()); + + // Set the necessary command to execute the application master + Vector vargs = new Vector(30); + + // Set java executable command + LOG.info("Setting up app master command"); + vargs.add(Environment.JAVA_HOME.$$() + "/bin/java"); + // Set Xmx based on am memory size + vargs.add("-Xmx" + amMemory + "m"); + // Set debug args + // vargs.add("-agentlib:jdwp=transport=dt_socket,address=8765,server=y,suspend=n"); + + // Set class name + vargs.add(appMasterMainClass); + // Set params for Application Master + vargs.add("--containerMemory " + String.valueOf(containerMemory)); + vargs.add("--containerVcores " + String.valueOf(containerVirtualCores)); + vargs.add("--numContainers " + String.valueOf(numContainers)); + vargs.add("--numRoles" + " " + numRoles.get("head") + " " + numRoles.get("work")); + if (null != 
nodeLabelExpression) { + appContext.setNodeLabelExpression(nodeLabelExpression); + } + vargs.add("--priority " + String.valueOf(shellCmdPriority)); + + for (Map.Entry entry : shellEnv.entrySet()) { + vargs.add("--shellArgs " + entry.getKey() + "=" + entry.getValue()); + } + if (debugFlag) { + vargs.add("--debug"); + } + + if (headNodeStaticArgs != null) { + vargs.add("--headNodeStaticArgs " + headNodeStaticArgs); + } + + if (workNodeStaticArgs != null) { + vargs.add("--workNodeStaticArgs " + workNodeStaticArgs); + } + + if (supremeFo) { + vargs.add("--supremeFo"); + } + + if (disableProcessFo) { + vargs.add("--disableProcessFo"); + } + + vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/AppMaster.stdout"); + vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/AppMaster.stderr"); + + // Get final command + StringBuilder command = new StringBuilder(); + for (CharSequence str : vargs) { + command.append(str).append(" "); + } + + LOG.info("Completed setting up app master command " + command.toString()); + List commands = new ArrayList(); + commands.add(command.toString()); + + // Set up the container launch context for the application master + ContainerLaunchContext amContainer = + ContainerLaunchContext.newInstance(localResources, env, commands, null, null, null); + + // Set up resource type requirements + // For now, both memory and vcores are supported, so we set memory and + // vcores requirements + Resource capability = Resource.newInstance(amMemory, amVCores); + appContext.setResource(capability); + + // Service data is a binary blob that can be passed to the application + // Not needed in this scenario + // amContainer.setServiceData(serviceData); + + // Setup security tokens + if (UserGroupInformation.isSecurityEnabled()) { + // Note: Credentials class is marked as LimitedPrivate for HDFS and MapReduce + Credentials credentials = new Credentials(); + String tokenRenewer = conf.get(YarnConfiguration.RM_PRINCIPAL); + if (tokenRenewer == 
null || tokenRenewer.length() == 0) { + throw new IOException("Can't get Master Kerberos principal for the RM to use as renewer"); + } + + // For now, only getting tokens for the default file-system. + final Token[] tokens = fs.addDelegationTokens(tokenRenewer, credentials); + if (tokens != null) { + for (Token token : tokens) { + LOG.info("Got dt for " + fs.getUri() + "; " + token); + } + } + DataOutputBuffer dob = new DataOutputBuffer(); + credentials.writeTokenStorageToStream(dob); + ByteBuffer fsTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength()); + amContainer.setTokens(fsTokens); + } + + appContext.setAMContainerSpec(amContainer); + + // Set the priority for the application master + // TODO - what is the range for priority? how to decide? + Priority pri = Priority.newInstance(amPriority); + appContext.setPriority(pri); + + // Set the queue to which this application is to be submitted in the RM + appContext.setQueue(amQueue); + + // Submit the application to the applications manager + // SubmitApplicationResponse submitResp = applicationsManager.submitApplication(appRequest); + // Ignore the response as either a valid response object is returned on success + // or an exception thrown to denote some form of a failure + LOG.info("Submitting application to ASM"); + + yarnClient.submitApplication(appContext); + + // TODO + // Try submitting the same request again + // app submission failure? + + // Monitor the application + return monitorApplication(appId); + } + + /** + * Monitor the submitted application for completion. Kill application if time expires. + * + * @param appId Application Id of application to be monitored + * @return true if application completed successfully + */ + private boolean monitorApplication(ApplicationId appId) throws YarnException, IOException { + + int runningStateTimes = 0; + while (true) { + + // Check app status every 1 second. 
+ try { + Thread.sleep(1000); + } catch (InterruptedException e) { + LOG.debug("Thread sleep in monitoring loop interrupted"); + } + + // Get application report for the appId we are interested in + ApplicationReport report = yarnClient.getApplicationReport(appId); + + LOG.info("Got application report from ASM for" + ", appId=" + appId.getId() + + ", clientToAMToken=" + report.getClientToAMToken() + ", appDiagnostics=" + + report.getDiagnostics() + ", appMasterHost=" + report.getHost() + ", appQueue=" + + report.getQueue() + ", appMasterRpcPort=" + report.getRpcPort() + ", appStartTime=" + + report.getStartTime() + ", yarnAppState=" + report.getYarnApplicationState().toString() + + ", distributedFinalState=" + report.getFinalApplicationStatus().toString() + + ", appTrackingUrl=" + report.getTrackingUrl() + ", appUser=" + report.getUser()); + + YarnApplicationState state = report.getYarnApplicationState(); + FinalApplicationStatus dsStatus = report.getFinalApplicationStatus(); + if (YarnApplicationState.FINISHED == state) { + if (FinalApplicationStatus.SUCCEEDED == dsStatus) { + LOG.info("Application has completed successfully. Breaking monitoring loop"); + return true; + } else { + LOG.info("Application did finished unsuccessfully." + " YarnState=" + state.toString() + + ", DSFinalStatus=" + dsStatus.toString() + ". Breaking monitoring loop"); + return false; + } + } else if (YarnApplicationState.KILLED == state || YarnApplicationState.FAILED == state) { + LOG.info("Application did not finish." + " YarnState=" + state.toString() + + ", DSFinalStatus=" + dsStatus.toString() + ". Breaking monitoring loop"); + return false; + } + + if (System.currentTimeMillis() > (clientStartTime + clientTimeout)) { + LOG.info("Reached client specified timeout for application. 
Killing application"); + forceKillApplication(appId); + return false; + } + + if (YarnApplicationState.RUNNING == state && maxRunningStateTimes > 0) { + runningStateTimes++; + if (runningStateTimes >= maxRunningStateTimes) { + LOG.info("App state has been 'RUNNING' for " + maxRunningStateTimes + + " times. Breaking monitoring loop"); + return true; + } + } else { + runningStateTimes = 0; + } + } + + } + + /** + * Kill a submitted application by sending a call to the ASM. + * + * @param appId Application Id to be killed. + */ + private void forceKillApplication(ApplicationId appId) throws YarnException, IOException { + // TODO clarify whether multiple jobs with the same app id can be submitted and be running at + // the same time. + // If yes, can we kill a particular attempt only? + + // Response can be ignored as it is non-null on success or + // throws an exception in case of failures + yarnClient.killApplication(appId); + } + + private void addToLocalResources(FileSystem fs, String fileSrcPath, String fileDstPath, + String appId, Map localResources, String resources) + throws IOException { + String suffix = appName + "/" + appId + "/" + fileDstPath; + Path dst = new Path(fs.getHomeDirectory(), suffix); + if (fileSrcPath == null) { + FSDataOutputStream ostream = null; + try { + ostream = FileSystem.create(fs, dst, new FsPermission((short) 0710)); + ostream.writeUTF(resources); + } finally { + IOUtils.closeQuietly(ostream); + } + } else { + fs.copyFromLocalFile(new Path(fileSrcPath), dst); + } + FileStatus scFileStatus = fs.getFileStatus(dst); + LocalResource scRsrc = LocalResource.newInstance(URL.fromURI(dst.toUri()), + LocalResourceType.FILE, LocalResourceVisibility.APPLICATION, scFileStatus.getLen(), + scFileStatus.getModificationTime()); + localResources.put(fileDstPath, scRsrc); + } + + private void prepareTimelineDomain() { + TimelineClient timelineClient = null; + if (conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, + 
YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED)) { + timelineClient = TimelineClient.createTimelineClient(); + timelineClient.init(conf); + timelineClient.start(); + } else { + LOG.warn( + "Cannot put the domain " + domainId + " because the timeline service is not enabled"); + return; + } + try { + // TODO: we need to check and combine the existing timeline domain ACLs, + // but let's do it once we have client java library to query domains. + TimelineDomain domain = new TimelineDomain(); + domain.setId(domainId); + domain.setReaders(viewAcls != null && viewAcls.length() > 0 ? viewAcls : " "); + domain.setWriters(modifyAcls != null && modifyAcls.length() > 0 ? modifyAcls : " "); + timelineClient.putDomain(domain); + LOG.info("Put the timeline domain: " + TimelineUtils.dumpTimelineRecordtoJSON(domain)); + } catch (Exception e) { + LOG.error("Error when putting the timeline domain", e); + } finally { + timelineClient.stop(); + } + } +} diff --git a/devops/yarn/src/main/java/org/ray/on/yarn/DsConstants.java b/devops/yarn/src/main/java/org/ray/on/yarn/DsConstants.java new file mode 100644 index 000000000000..29949f5e8e26 --- /dev/null +++ b/devops/yarn/src/main/java/org/ray/on/yarn/DsConstants.java @@ -0,0 +1,34 @@ +package org.ray.on.yarn; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +/** + * Constants used in both Client and Application Master. + */ +@InterfaceAudience.Public +@InterfaceStability.Unstable +public class DsConstants { + + /** + * Environment key name pointing to the Ray archive's location. + */ + public static final String RAY_ARCHIVE_LOCATION = "RAY_ARCHIVE_LOCATION"; + + /** + * Environment key name denoting the file timestamp for the Ray archive. Used to validate the + * local resource. + */ + public static final String RAY_ARCHIVE_TIMESTAMP = "RAY_ARCHIVE_TIMESTAMP"; + + /** + * Environment key name denoting the file content length for the Ray archive. &#13;
Used to validate + * the local resource. + */ + public static final String RAY_ARCHIVE_LEN = "RAY_ARCHIVE_LEN"; + + /** + * Environment key name denoting the timeline domain ID. + */ + public static final String RAY_TIMELINE_DOMAIN = "RAY_TIMELINE_DOMAIN"; +} diff --git a/devops/yarn/src/main/java/org/ray/on/yarn/Log4jPropertyHelper.java b/devops/yarn/src/main/java/org/ray/on/yarn/Log4jPropertyHelper.java new file mode 100644 index 000000000000..6e37adba59d1 --- /dev/null +++ b/devops/yarn/src/main/java/org/ray/on/yarn/Log4jPropertyHelper.java @@ -0,0 +1,37 @@ +package org.ray.on.yarn; + +import java.io.FileInputStream; +import java.io.InputStream; +import java.util.Map.Entry; +import java.util.Properties; +import org.apache.commons.io.IOUtils; +import org.apache.log4j.LogManager; +import org.apache.log4j.PropertyConfigurator; + +public class Log4jPropertyHelper { + + /** + * Update log4j configuration. + */ + public static void updateLog4jConfiguration(Class targetClass, String log4jPath) + throws Exception { + Properties customProperties = new Properties(); + FileInputStream fs = null; + InputStream is = null; + try { + fs = new FileInputStream(log4jPath); + is = targetClass.getResourceAsStream("/log4j.properties"); + customProperties.load(fs); + Properties originalProperties = new Properties(); + originalProperties.load(is); + for (Entry entry : customProperties.entrySet()) { + originalProperties.setProperty(entry.getKey().toString(), entry.getValue().toString()); + } + LogManager.resetConfiguration(); + PropertyConfigurator.configure(originalProperties); + } finally { + IOUtils.closeQuietly(is); + IOUtils.closeQuietly(fs); + } + } +} diff --git a/devops/yarn/src/main/java/org/ray/on/yarn/RayNodeContext.java b/devops/yarn/src/main/java/org/ray/on/yarn/RayNodeContext.java new file mode 100644 index 000000000000..ed9ece45bf61 --- /dev/null +++ b/devops/yarn/src/main/java/org/ray/on/yarn/RayNodeContext.java @@ -0,0 +1,22 @@ +package org.ray.on.yarn; + +import 
org.apache.hadoop.yarn.api.records.Container; + +public class RayNodeContext { + + RayNodeContext(String role) { + this.role = role; + } + + String role; + + boolean isRunning = false; + + boolean isAlocating = false; + + String instanceId = null; + + Container container = null; + + int failCounter = 0; +} diff --git a/devops/yarn/src/test/resources/log4j.properties b/devops/yarn/src/test/resources/log4j.properties new file mode 100644 index 000000000000..81a3f6ad5d24 --- /dev/null +++ b/devops/yarn/src/test/resources/log4j.properties @@ -0,0 +1,19 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +# log4j configuration used during build and unit tests + +log4j.rootLogger=info,stdout +log4j.threshold=ALL +log4j.appender.stdout=org.apache.log4j.ConsoleAppender +log4j.appender.stdout.layout=org.apache.log4j.PatternLayout +log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %-5p [%t] %c{2} (%F:%M(%L)) - %m%n From dbc42c41289652a70e3a26bda6866fe6846431a5 Mon Sep 17 00:00:00 2001 From: Song Guyang Date: Fri, 15 Jun 2018 15:08:07 +0800 Subject: [PATCH 2/2] some fix about ray on yarn project --- devops/yarn/README.md | 47 ---- devops/yarn/README.rst | 61 +++++ devops/yarn/checkstyle.xml | 241 ------------------ devops/yarn/pom.xml | 4 +- .../ray/{on => }/yarn/ApplicationMaster.java | 2 +- .../java/org/ray/{on => }/yarn/Client.java | 4 +- .../org/ray/{on => }/yarn/DsConstants.java | 2 +- .../{on => }/yarn/Log4jPropertyHelper.java | 2 +- .../org/ray/{on => }/yarn/RayNodeContext.java | 2 +- 9 files changed, 69 insertions(+), 296 deletions(-) delete mode 100644 devops/yarn/README.md create mode 100644 devops/yarn/README.rst delete mode 100644 devops/yarn/checkstyle.xml rename devops/yarn/src/main/java/org/ray/{on => }/yarn/ApplicationMaster.java (99%) rename devops/yarn/src/main/java/org/ray/{on => }/yarn/Client.java (99%) rename devops/yarn/src/main/java/org/ray/{on => }/yarn/DsConstants.java (97%) rename devops/yarn/src/main/java/org/ray/{on => }/yarn/Log4jPropertyHelper.java (97%) rename devops/yarn/src/main/java/org/ray/{on => }/yarn/RayNodeContext.java (92%) diff --git a/devops/yarn/README.md b/devops/yarn/README.md deleted file mode 100644 index c836978f4533..000000000000 --- a/devops/yarn/README.md +++ /dev/null @@ -1,47 +0,0 @@ -# Prerequisites - -1. Yarn app of Ray is run under the Hadoop environment version 2.8.0. You can get the Hadoop binary from [here](http://archive.apache.org/dist/hadoop/common/hadoop-2.8.0/hadoop-2.8.0.tar.gz). 
- -# Workthrough - -#### Yarn environment and configuration - -Firstly, you should have an available hadoop yarn environment and configuration. - -#### Prepare ray on yarn jar file - -```shell -$ make clean package -``` - -#### Prepare deploy zip file - -```shell -$ cd deploy -$ zip -r deploy.zip . -``` -Please modify the script 'deploy/run.sh' on-demand. - -#### Run - -```shell -$ /path/to/hadoop-2.8.0/bin/yarn jar ./target/ray-on-yarn-1.0.jar org.ray.on.yarn.Client --jar ./target/ray-on-yarn-1.0.jar --rayArchive ./deploy/deploy.zip --containerVcores 2 --containerMemory 2048 --priority 10 --shellCmdPriority 10 --numRoles 1 1 --queue ray --headNodeStaticArgs "'--num-cpus 4 --num-gpus 4'" --workNodeStaticArgs "'--num-cpus 2 --num-gpus 2'" -``` - -Please modify the command line on-demand. Some detail about the input args is in the help doc. - -```shell -/path/to/hadoop-2.8.0/bin/yarn jar ./target/ray-on-yarn-1.0.jar org.ray.on.yarn.Client --help -``` - -#### Monitoring - -Please check the logs depend on your yarn platform. - -#### Stop - -```shell -$ /path/to/hadoop-2.8.0/bin/yarn application -kill {app_id} -``` - -`{app_id}` shall be replaced by the ID of the corresponding Yarn application, e.g. `application_1505745052163_0107`. diff --git a/devops/yarn/README.rst b/devops/yarn/README.rst new file mode 100644 index 000000000000..a680df70747b --- /dev/null +++ b/devops/yarn/README.rst @@ -0,0 +1,61 @@ +Prerequisites +============= + +1. Yarn app of Ray is run under the Hadoop environment version 2.8.0. + You can get the Hadoop binary from + `here `__. + +Walkthrough +=========== + +Yarn environment and configuration +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +Firstly, you should have an available hadoop yarn environment and +configuration. + +Prepare ray on yarn jar file +^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +.. code:: shell + + $ make clean package + +Prepare deploy zip file +^^^^^^^^^^^^^^^^^^^^^^^ + +.. code:: shell + + $ cd deploy + $ zip -r deploy.zip . 
+ +Please modify the script 'deploy/run.sh' on-demand. + +Run +^^^ + +.. code:: shell + + $ /path/to/hadoop-2.8.0/bin/yarn jar ./target/ray-on-yarn-1.0.jar org.ray.on.yarn.Client --jar ./target/ray-on-yarn-1.0.jar --rayArchive ./deploy/deploy.zip --containerVcores 2 --containerMemory 2048 --priority 10 --shellCmdPriority 10 --numRoles 1 1 --queue ray --headNodeStaticArgs "'--num-cpus 4 --num-gpus 4'" --workNodeStaticArgs "'--num-cpus 2 --num-gpus 2'" + +Please modify the command line on-demand. Some detail about the input +args is in the help doc. + +.. code:: shell + + /path/to/hadoop-2.8.0/bin/yarn jar ./target/ray-on-yarn-1.0.jar org.ray.on.yarn.Client --help + +Monitoring +^^^^^^^^^^ + +Please check the logs depend on your yarn platform. + +Stop +^^^^ + +.. code:: shell + + $ /path/to/hadoop-2.8.0/bin/yarn application -kill {app_id} + +``{app_id}`` shall be replaced by the ID of the corresponding Yarn +application, e.g. ``application_1505745052163_0107``. diff --git a/devops/yarn/checkstyle.xml b/devops/yarn/checkstyle.xml deleted file mode 100644 index fc5904d1d710..000000000000 --- a/devops/yarn/checkstyle.xml +++ /dev/null @@ -1,241 +0,0 @@ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - diff --git a/devops/yarn/pom.xml b/devops/yarn/pom.xml index 8603940d05d9..ac3d89245bf4 100644 --- a/devops/yarn/pom.xml +++ b/devops/yarn/pom.xml @@ -66,7 +66,7 @@ - checkstyle.xml + ../../java/checkstyle.xml UTF-8 true false @@ -81,4 +81,4 @@ - \ No newline at end of file + diff --git a/devops/yarn/src/main/java/org/ray/on/yarn/ApplicationMaster.java b/devops/yarn/src/main/java/org/ray/yarn/ApplicationMaster.java similarity 
index 99% rename from devops/yarn/src/main/java/org/ray/on/yarn/ApplicationMaster.java rename to devops/yarn/src/main/java/org/ray/yarn/ApplicationMaster.java index b8bc88ba4da0..3438fc65de24 100644 --- a/devops/yarn/src/main/java/org/ray/on/yarn/ApplicationMaster.java +++ b/devops/yarn/src/main/java/org/ray/yarn/ApplicationMaster.java @@ -1,4 +1,4 @@ -package org.ray.on.yarn; +package org.ray.yarn; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Maps; diff --git a/devops/yarn/src/main/java/org/ray/on/yarn/Client.java b/devops/yarn/src/main/java/org/ray/yarn/Client.java similarity index 99% rename from devops/yarn/src/main/java/org/ray/on/yarn/Client.java rename to devops/yarn/src/main/java/org/ray/yarn/Client.java index 0b69a8b0e98b..190bd3bf3402 100644 --- a/devops/yarn/src/main/java/org/ray/on/yarn/Client.java +++ b/devops/yarn/src/main/java/org/ray/yarn/Client.java @@ -1,4 +1,4 @@ -package org.ray.on.yarn; +package org.ray.yarn; import com.google.common.collect.Maps; import java.io.IOException; @@ -176,7 +176,7 @@ public static void main(String[] args) { } public Client(Configuration conf) throws Exception { - this("org.ray.on.yarn.ApplicationMaster", conf); + this("org.ray.yarn.ApplicationMaster", conf); } Client(String appMasterMainClass, Configuration conf) { diff --git a/devops/yarn/src/main/java/org/ray/on/yarn/DsConstants.java b/devops/yarn/src/main/java/org/ray/yarn/DsConstants.java similarity index 97% rename from devops/yarn/src/main/java/org/ray/on/yarn/DsConstants.java rename to devops/yarn/src/main/java/org/ray/yarn/DsConstants.java index 29949f5e8e26..e67ba26aac54 100644 --- a/devops/yarn/src/main/java/org/ray/on/yarn/DsConstants.java +++ b/devops/yarn/src/main/java/org/ray/yarn/DsConstants.java @@ -1,4 +1,4 @@ -package org.ray.on.yarn; +package org.ray.yarn; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; diff --git 
a/devops/yarn/src/main/java/org/ray/on/yarn/Log4jPropertyHelper.java b/devops/yarn/src/main/java/org/ray/yarn/Log4jPropertyHelper.java similarity index 97% rename from devops/yarn/src/main/java/org/ray/on/yarn/Log4jPropertyHelper.java rename to devops/yarn/src/main/java/org/ray/yarn/Log4jPropertyHelper.java index 6e37adba59d1..75149cb9f895 100644 --- a/devops/yarn/src/main/java/org/ray/on/yarn/Log4jPropertyHelper.java +++ b/devops/yarn/src/main/java/org/ray/yarn/Log4jPropertyHelper.java @@ -1,4 +1,4 @@ -package org.ray.on.yarn; +package org.ray.yarn; import java.io.FileInputStream; import java.io.InputStream; diff --git a/devops/yarn/src/main/java/org/ray/on/yarn/RayNodeContext.java b/devops/yarn/src/main/java/org/ray/yarn/RayNodeContext.java similarity index 92% rename from devops/yarn/src/main/java/org/ray/on/yarn/RayNodeContext.java rename to devops/yarn/src/main/java/org/ray/yarn/RayNodeContext.java index ed9ece45bf61..08ca6cf455fc 100644 --- a/devops/yarn/src/main/java/org/ray/on/yarn/RayNodeContext.java +++ b/devops/yarn/src/main/java/org/ray/yarn/RayNodeContext.java @@ -1,4 +1,4 @@ -package org.ray.on.yarn; +package org.ray.yarn; import org.apache.hadoop.yarn.api.records.Container;