Skip to content
This repository has been archived by the owner on Jun 6, 2024. It is now read-only.

Commit

Permalink
[Rest Server] Convert protocol to framework launcher description (#2291)
Browse files Browse the repository at this point in the history
* Convert protocol to framework description

Convert protocol to framework description for api v2.

* Fix bugs

Fix bugs.

* Use class to replace arrow function

Fix bugs, use class to replace arrow function.
  • Loading branch information
abuccts authored Mar 20, 2019
1 parent 46355fa commit 7f9959b
Showing 1 changed file with 296 additions and 0 deletions.
296 changes: 296 additions & 0 deletions src/rest-server/src/models/v2/job.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,296 @@
// Copyright (c) Microsoft Corporation
// All rights reserved.
//
// MIT License
//
// Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated
// documentation files (the "Software"), to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and
// to permit persons to whom the Software is furnished to do so, subject to the following conditions:
// The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING
// BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
// NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.


// module dependencies
const axios = require('axios');
const status = require('statuses');
const keygen = require('ssh-keygen');
const mustache = require('mustache');
const userModel = require('../user');
const HDFS = require('../../util/hdfs');
const createError = require('../../util/error');
const logger = require('../../config/logger');
const azureEnv = require('../../config/azure');
const paiConfig = require('../../config/paiConfig');
const launcherConfig = require('../../config/launcher');
const yarnContainerScriptTemplate = require('../../templates/yarnContainerScript');
const dockerContainerScriptTemplate = require('../../templates/dockerContainerScript');


/**
 * Convert a job protocol config into a framework launcher description.
 *
 * @param {string} jobName - Job name, used to build the script source location.
 * @param {string} userName - Name of the submitting user.
 * @param {Object} config - Job config; reads jobRetryCount, defaults.virtualCluster and taskRoles.
 * @return {Object} Framework description holding one entry per task role in config.taskRoles.
 */
const generateFrameworkDescription = (jobName, userName, config) => {
  const retryPolicy = {
    maxRetryCount: config.jobRetryCount || 0,
    fancyRetryPolicy: (config.jobRetryCount !== -2),
  };

  // use the 'default' queue unless the config names a virtual cluster
  let queue = 'default';
  if ('defaults' in config && config.defaults.virtualCluster != null) {
    queue = config.defaults.virtualCluster;
  }

  const {cpuNumber, memoryMB, diskType, diskMB} = launcherConfig.amResource;
  const description = {
    version: 10,
    user: {name: userName},
    retryPolicy,
    taskRoles: {},
    platformSpecificParameters: {
      queue,
      taskNodeGpuType: null,
      gangAllocation: true,
      amResource: {cpuNumber, memoryMB, diskType, diskMB},
    },
  };

  for (const [roleName, role] of Object.entries(config.taskRoles)) {
    const requested = role.resourcePerInstance;

    // ssh and http are always defined; ports from the config are added on top
    const portDefinitions = {
      ssh: {start: 0, count: 1},
      http: {start: 0, count: 1},
    };
    if (requested.ports != null) {
      Object.keys(requested.ports).forEach((label) => {
        portDefinitions[label] = {start: 0, count: requested.ports[label]};
      });
    }

    // a missing completion section behaves like an empty one: defaults are 1 and null
    const completion = ('completion' in role) ? role.completion : {};
    const applicationCompletionPolicy = {
      minFailedTaskCount: ('minFailedInstances' in completion) ? completion.minFailedInstances : 1,
      minSucceededTaskCount: ('minSucceededInstances' in completion) ? completion.minSucceededInstances : null,
    };

    description.taskRoles[roleName] = {
      taskNumber: role.instances || 1,
      taskService: {
        version: 0,
        entryPoint: `source YarnContainerScripts/${roleName}.sh`,
        sourceLocations: [`/Container/${userName}/${jobName}/YarnContainerScripts`],
        resource: {
          cpuNumber: requested.cpu,
          memoryMB: requested.memoryMB,
          gpuNumber: requested.gpu,
          portDefinitions,
          diskType: 0,
          diskMB: 0,
        },
      },
      applicationCompletionPolicy,
    };
  }
  return description;
};

/**
 * Render the yarn container script of one task role (backward compatible with
 * the yarn container script template used by api v1).
 *
 * @param {string} jobName - Job name.
 * @param {string} userName - Name of the submitting user.
 * @param {Object} config - Job config; supplies the docker image uri of the task role.
 * @param {Object} frameworkDescription - Description produced by generateFrameworkDescription.
 * @param {string} taskRole - Name of the task role to render the script for.
 * @return {string} Result of rendering yarnContainerScriptTemplate with mustache.
 */
const generateYarnContainerScript = (jobName, userName, config, frameworkDescription, taskRole) => {
  const roleNames = Object.keys(frameworkDescription.taskRoles);
  // total number of tasks across every task role of the job
  const tasksNumber = roleNames.reduce(
    (total, name) => total + frameworkDescription.taskRoles[name].taskNumber, 0);
  const role = frameworkDescription.taskRoles[taskRole];
  const resource = role.taskService.resource;
  // The framework description stores the completion policy as minFailedTaskCount and
  // minSucceededTaskCount; reading the protocol names (min*Instances) here always
  // yielded undefined.
  const completionPolicy = role.applicationCompletionPolicy;
  const yarnContainerScript = mustache.render(yarnContainerScriptTemplate, {
    idx: taskRole,
    jobData: {
      jobName: jobName,
      userName: userName,
      image: config.prerequisites.dockerimage[config.taskRoles[taskRole].dockerImage].uri,
      authFile: null,
      virtualCluster: frameworkDescription.platformSpecificParameters.queue,
    },
    taskData: {
      name: taskRole,
      taskNumber: role.taskNumber,
      cpuNumber: resource.cpuNumber,
      memoryMB: resource.memoryMB,
      gpuNumber: resource.gpuNumber,
      shmMB: 512,
      minFailedTaskCount: completionPolicy.minFailedTaskCount,
      minSucceededTaskCount: completionPolicy.minSucceededTaskCount,
    },
    tasksNumber: tasksNumber,
    taskRoleList: roleNames.join(','),
    taskRolesNumber: roleNames.length,
    jobEnvs: null,
    hdfsUri: launcherConfig.hdfsUri,
    aggregatedStatusUri: launcherConfig.frameworkAggregatedStatusPath(jobName),
    frameworkInfoWebhdfsUri: launcherConfig.frameworkInfoWebhdfsPath(jobName),
    azRDMA: azureEnv.azRDMA === 'true',
    reqAzRDMA: false,
    // docker inspect format strings, handed to the template as plain values
    inspectPidFormat: '{{.State.Pid}}',
    inspectOOMKilledFormat: '{{.State.OOMKilled}}',
  });
  return yarnContainerScript;
};

/**
 * Render the docker container script of one task role (backward compatible
 * with the docker container script template used by api v1).
 *
 * @param {string} jobName - Job name.
 * @param {string} userName - Name of the submitting user.
 * @param {Object} config - Job config; supplies the entry point of the task role.
 * @param {string} taskRole - Name of the task role to render the script for.
 * @return {string} Result of rendering dockerContainerScriptTemplate with mustache.
 */
const generateDockerContainerScript = (jobName, userName, config, taskRole) => {
  const view = {
    idx: taskRole,
    jobData: {jobName, userName},
    // the task role's entry point is handed to the template as the command
    taskData: {command: config.taskRoles[taskRole].entryPoint},
    hdfsUri: launcherConfig.hdfsUri,
    webHdfsUri: launcherConfig.webhdfsUri,
    paiMachineList: paiConfig.machineList,
    azRDMA: azureEnv.azRDMA === 'true',
    reqAzRDMA: false,
  };
  return mustache.render(dockerContainerScriptTemplate, view);
};

/**
 * Generate an ssh key pair for a job through ssh-keygen.
 *
 * Key generation is best effort: on failure a warning is logged and the
 * promise resolves with null instead of rejecting.
 *
 * @param {string} jobName - Job name; used as key location, key comment and file name.
 * @return {Promise<Object|null>} Map of file name to content (private key under jobName,
 *   public key under `${jobName}.pub`), or null when generation failed.
 */
const generateSSHKeys = (jobName) => {
  const keygenOptions = {
    location: jobName,
    comment: `ssh key for ${jobName}`,
    read: true,
    destroy: true,
  };
  return new Promise((resolve) => {
    keygen(keygenOptions, (err, out) => {
      if (!err) {
        resolve({
          // private key
          [jobName]: out.key,
          // public key
          [`${jobName}.pub`]: out.pubKey,
        });
        return;
      }
      logger.warn('Generating ssh key files failed! Will skip generating ssh info.');
      resolve(null);
    });
  });
};

/**
 * Generate the framework description of a job and upload it, together with the
 * container scripts of every task role, to HDFS under /Container/<user>/<job>.
 * On Linux an ssh key pair is also generated and uploaded when generation succeeds.
 *
 * @param {string} jobName - Job name.
 * @param {string} userName - Name of the submitting user; used as HDFS user for the job files.
 * @param {Object} config - Job config.
 * @return {Promise<Object>} The generated framework description.
 * @throws Rejects with the HDFS error when creating a folder or a file fails.
 */
const prepareContainerScripts = async (jobName, userName, config) => {
  // hdfs operations
  const hdfs = new HDFS(launcherConfig.webhdfsUri);
  // Adapt a callback-style hdfs call to a promise. Errors must go through reject:
  // throwing inside the callback would escape the promise when the callback fires
  // asynchronously, leaving the awaiting caller hanging.
  const toPromise = (invoke) => new Promise((resolve, reject) => {
    invoke((err, result) => {
      if (err) {
        reject(err);
      } else {
        resolve(result);
      }
    });
  });
  // async mkdir on hdfs
  const mkdir = (path, user, mode) => {
    const options = {'user.name': user, 'permission': mode};
    return toPromise((callback) => hdfs.createFolder(path, options, callback));
  };
  // async upload file on hdfs
  const upload = (path, data, user, mode) => {
    const options = {'user.name': user, 'permission': mode, 'overwrite': 'true'};
    return toPromise((callback) => hdfs.createFile(path, data, options, callback));
  };

  // generate framework description
  const frameworkDescription = generateFrameworkDescription(jobName, userName, config);

  // prepare scripts on hdfs
  const pathPrefix = `/Container/${userName}/${jobName}`;
  await mkdir('/Container', 'root', '777');
  for (const path of ['log', 'tmp']) {
    await mkdir(`${pathPrefix}/${path}`, userName, '755');
  }
  for (const taskRole of Object.keys(config.taskRoles)) {
    await upload(`${pathPrefix}/YarnContainerScripts/${taskRole}.sh`,
      generateYarnContainerScript(jobName, userName, config, frameworkDescription, taskRole), userName, '644');
    await upload(`${pathPrefix}/DockerContainerScripts/${taskRole}.sh`,
      generateDockerContainerScript(jobName, userName, config, taskRole), userName, '644');
  }
  await upload(`${pathPrefix}/${launcherConfig.frameworkDescriptionFilename}`,
    JSON.stringify(frameworkDescription, null, 2), userName, '644');

  // generate ssh key
  if (process.platform.toUpperCase() === 'LINUX') {
    // generateSSHKeys resolves with null on failure, in which case ssh files are skipped
    const keys = await generateSSHKeys(jobName);
    if (keys) {
      for (const keyname of Object.keys(keys)) {
        await upload(`${pathPrefix}/ssh/keyFiles/${keyname}`, keys[keyname], userName, '755');
      }
    }
  }

  // return framework description
  return frameworkDescription;
};


/**
 * A job submitted through api v2.
 */
class Job {
  /**
   * @param {string} jobName - Job name.
   * @param {string} userName - Name of the submitting user.
   * @param {Object} config - Job config.
   */
  constructor(jobName, userName, config) {
    this.jobName = jobName;
    this.userName = userName;
    // framework name is `<user>~<job>`, or the bare job name when there is no user name
    this.frameworkName = this.userName ? `${this.userName}~${this.jobName}` : this.jobName;
    this.config = config;
  }
}

/**
 * Submit the job: check the user against the virtual cluster, prepare the
 * container scripts on hdfs and send the framework description to the
 * framework launcher.
 *
 * @param {Function} next - Called (and awaited) after the launcher accepted the request.
 * @throws Rejects with the error reported by userModel.checkUserVc, with any error of
 *   prepareContainerScripts or axios, or with an 'UnknownError' created through
 *   createError when the launcher response status is not 'Accepted'.
 */
Job.prototype.put = async function(next) {
  // check user vc
  const virtualCluster = ('defaults' in this.config && this.config.defaults.virtualCluster != null) ?
    this.config.defaults.virtualCluster : 'default';
  await new Promise((resolve, reject) => {
    userModel.checkUserVc(this.userName, virtualCluster, (err, result) => {
      if (err) {
        // Reject instead of throwing: a throw inside an asynchronously invoked
        // callback escapes the promise and the await above would never settle.
        reject(err);
      } else {
        resolve(result);
      }
    });
  });

  // generate framework description and prepare container scripts on hdfs
  const frameworkDescription = await prepareContainerScripts(this.jobName, this.userName, this.config);

  // send request to framework launcher
  const response = await axios({
    method: 'put',
    url: launcherConfig.frameworkPath(this.frameworkName),
    headers: launcherConfig.webserviceRequestHeaders(this.userName),
    data: frameworkDescription,
  });
  if (response.status !== status('Accepted')) {
    throw createError(response.status, 'UnknownError', response.data.raw_body);
  } else {
    await next();
  }
};

// module exports
module.exports = Job;

0 comments on commit 7f9959b

Please sign in to comment.