diff --git a/Makefile b/Makefile index 54d1db1927..4d7d0a11b3 100644 --- a/Makefile +++ b/Makefile @@ -92,7 +92,6 @@ build: #$(_INFO) Building nnictl $(_END) cd tools && python3 setup.py build - # Standard installation target # Must be invoked after building .PHONY: install @@ -207,7 +206,6 @@ install-python-modules: #$(_INFO) Installing nnictl $(_END) cd tools && python3 setup.py install $(PIP_MODE) - .PHONY: install-node-modules install-node-modules: mkdir -p $(INSTALL_PREFIX)/nni @@ -227,7 +225,7 @@ install-dev-modules: #$(_INFO) Installing nnictl $(_END) cd tools && $(PIP_INSTALL) $(PIP_MODE) -e . - + mkdir -p $(INSTALL_PREFIX)/nni #$(_INFO) Installing NNI Manager $(_END) diff --git a/setup.py b/setup.py index 14c42828cb..bfdda8a283 100644 --- a/setup.py +++ b/setup.py @@ -35,7 +35,7 @@ def run(self): setup( name = 'NNI', - version = '0.1.0', + version = '0.2.0', author = 'Microsoft NNI Team', author_email = 'nni@microsoft.com', description = 'Neural Network Intelligence project', @@ -47,7 +47,8 @@ def run(self): package_dir = { 'nni_annotation': 'tools/nni_annotation', 'nni': 'src/sdk/pynni/nni', - 'nnicmd': 'tools/nnicmd' + 'nnicmd': 'tools/nnicmd', + 'trial_tool':'tools/trial_tool' }, python_requires = '>=3.5', install_requires = [ @@ -59,7 +60,8 @@ def run(self): 'pyyaml', 'requests', 'scipy', - 'schema' + 'schema', + 'pyhdfs' ], cmdclass={ diff --git a/src/nni_manager/common/utils.ts b/src/nni_manager/common/utils.ts index f272ea22b1..e83e40e919 100644 --- a/src/nni_manager/common/utils.ts +++ b/src/nni_manager/common/utils.ts @@ -225,5 +225,19 @@ function cleanupUnitTest(): void { Container.restore(ExperimentStartupInfo); } -export { getMsgDispatcherCommand, getLogDir, getExperimentRootDir, getDefaultDatabaseDir, mkDirP, delay, prepareUnitTest, - parseArg, cleanupUnitTest, uniqueString, randomSelect }; +/** + * Get IPv4 address of current machine + */ +function getIPV4Address(): string { + let ipv4Address : string = ''; + + for(const item of 
os.networkInterfaces().eth0) { + if(item.family === 'IPv4') { + ipv4Address = item.address; + } + } + return ipv4Address; +} + +export { getMsgDispatcherCommand, getLogDir, getExperimentRootDir, getDefaultDatabaseDir, getIPV4Address, + mkDirP, delay, prepareUnitTest, parseArg, cleanupUnitTest, uniqueString, randomSelect }; diff --git a/src/nni_manager/main.ts b/src/nni_manager/main.ts index 6d9c9fa64b..f3d386eccd 100644 --- a/src/nni_manager/main.ts +++ b/src/nni_manager/main.ts @@ -36,6 +36,7 @@ import { LocalTrainingServiceForGPU } from './training_service/local/localTraini import { RemoteMachineTrainingService } from './training_service/remote_machine/remoteMachineTrainingService'; +import { PAITrainingService } from './training_service/pai/paiTrainingService' function initStartupInfo(startExpMode: string, resumeExperimentId: string) { @@ -49,6 +50,8 @@ async function initContainer(platformMode: string): Promise { Container.bind(TrainingService).to(LocalTrainingServiceForGPU).scope(Scope.Singleton); } else if (platformMode === 'remote') { Container.bind(TrainingService).to(RemoteMachineTrainingService).scope(Scope.Singleton); + } else if (platformMode === 'pai'){ + Container.bind(TrainingService).to(PAITrainingService).scope(Scope.Singleton); } else { throw new Error(`Error: unsupported mode: ${mode}`); } @@ -61,7 +64,7 @@ async function initContainer(platformMode: string): Promise { } function usage(): void { - console.info('usage: node main.js --port --mode --start_mode --experiment_id '); + console.info('usage: node main.js --port --mode --start_mode --experiment_id '); } let port: number = NNIRestServer.DEFAULT_PORT; @@ -71,7 +74,7 @@ if (strPort && strPort.length > 0) { } const mode: string = parseArg(['--mode', '-m']); -if (!['local', 'remote'].includes(mode)) { +if (!['local', 'remote', 'pai'].includes(mode)) { usage(); process.exit(1); } diff --git a/src/nni_manager/package.json b/src/nni_manager/package.json index 46522044fd..04ee4df3c2 100644 --- 
a/src/nni_manager/package.json +++ b/src/nni_manager/package.json @@ -23,7 +23,8 @@ "tree-kill": "^1.2.0", "ts-deferred": "^1.0.4", "typescript-ioc": "^1.2.4", - "typescript-string-operations": "^1.3.1" + "typescript-string-operations": "^1.3.1", + "webhdfs":"^1.2.0" }, "devDependencies": { "@types/chai": "^4.1.4", @@ -40,6 +41,7 @@ "chai": "^4.1.2", "mocha": "^5.2.0", "request": "^2.87.0", + "rmdir": "^1.2.0", "tmp": "^0.0.33", "ts-node": "^7.0.0", "tslint": "^5.11.0", diff --git a/src/nni_manager/rest_server/restValidationSchemas.ts b/src/nni_manager/rest_server/restValidationSchemas.ts index 6c172349c6..24b07836a7 100644 --- a/src/nni_manager/rest_server/restValidationSchemas.ts +++ b/src/nni_manager/rest_server/restValidationSchemas.ts @@ -33,9 +33,19 @@ export namespace ValidationSchemas { passphrase: joi.string() })), trial_config: joi.object({ - gpuNum: joi.number().min(0).required(), + image: joi.string().min(1), codeDir: joi.string().min(1).required(), - command: joi.string().min(1).required() + dataDir: joi.string(), + outputDir: joi.string(), + cpuNum: joi.number().min(1), + memoryMB: joi.number().min(100), + gpuNum: joi.number().min(0).required(), + command: joi.string().min(1).required() + }), + pai_config: joi.object({ + userName: joi.string().min(1).required(), + passWord: joi.string().min(1).required(), + host: joi.string().min(1).required() }) } }; diff --git a/src/nni_manager/training_service/common/jobMetrics.ts b/src/nni_manager/training_service/common/jobMetrics.ts new file mode 100644 index 0000000000..a1abe64574 --- /dev/null +++ b/src/nni_manager/training_service/common/jobMetrics.ts @@ -0,0 +1,37 @@ +/** + * Copyright (c) Microsoft Corporation + * All rights reserved. 
+ * + * MIT License + * + * Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated + * documentation files (the "Software"), to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and + * to permit persons to whom the Software is furnished to do so, subject to the following conditions: + * The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING + * BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, + * DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. 
+ */ + +'use strict'; + +import { TrialJobStatus } from '../../common/trainingService'; + +// tslint:disable-next-line:max-classes-per-file +export class JobMetrics { + public readonly jobId: string; + public readonly metrics: string[]; + public readonly jobStatus: TrialJobStatus; + public readonly endTimestamp: number; + + constructor(jobId : string, metrics : string[], jobStatus : TrialJobStatus, endTimestamp : number) { + this.jobId = jobId; + this.metrics = metrics; + this.jobStatus = jobStatus; + this.endTimestamp = endTimestamp; + } +} diff --git a/src/nni_manager/training_service/common/trialConfigMetadataKey.ts b/src/nni_manager/training_service/common/trialConfigMetadataKey.ts index e9749e562e..12df449ee1 100644 --- a/src/nni_manager/training_service/common/trialConfigMetadataKey.ts +++ b/src/nni_manager/training_service/common/trialConfigMetadataKey.ts @@ -26,5 +26,6 @@ export enum TrialConfigMetadataKey { MACHINE_LIST = 'machine_list', TRIAL_CONFIG = 'trial_config', EXPERIMENT_ID = 'experimentId', - RANDOM_SCHEDULER = 'random_scheduler' + RANDOM_SCHEDULER = 'random_scheduler', + PAI_CLUSTER_CONFIG = 'pai_config' } diff --git a/src/nni_manager/training_service/pai/hdfsClientUtility.ts b/src/nni_manager/training_service/pai/hdfsClientUtility.ts new file mode 100644 index 0000000000..07dcc2a744 --- /dev/null +++ b/src/nni_manager/training_service/pai/hdfsClientUtility.ts @@ -0,0 +1,200 @@ +/** + * Copyright (c) Microsoft Corporation + * All rights reserved. 
+ * + * MIT License + * + * Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated + * documentation files (the "Software"), to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and + * to permit persons to whom the Software is furnished to do so, subject to the following conditions: + * The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING + * BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, + * DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. 
+ */ + +import * as path from 'path'; +import * as fs from 'fs'; +import { Deferred } from 'ts-deferred'; +import { getLogger } from '../../common/log'; + +/** + * HDFS client utility, including copy file/directory + */ +export namespace HDFSClientUtility { + /** + * Copy a local file to hdfs directory + * + * @param localFilePath local file path(source) + * @param hdfsFilePath hdfs file path(target) + * @param hdfsClient hdfs client + */ + export async function copyFileToHdfs(localFilePath : string, hdfsFilePath : string, hdfsClient : any) : Promise { + const deferred: Deferred = new Deferred(); + fs.exists(localFilePath, (exists : boolean) => { + // Detect if local file exist + if (exists) { + var localFileStream = fs.createReadStream(localFilePath); + var hdfsFileStream = hdfsClient.createWriteStream(hdfsFilePath); + localFileStream.pipe(hdfsFileStream); + hdfsFileStream.on('finish', function onFinish () { + deferred.resolve(); + }); + hdfsFileStream.on('error', (err : any) => { + getLogger().error(`HDFSCientUtility:copyFileToHdfs, copy file failed, err is ${err.message}`); + deferred.reject(err); + }); + } else { + getLogger().error(`HDFSCientUtility:copyFileToHdfs, ${localFilePath} doesn't exist locally`); + deferred.reject('file not exist!'); + } + }); + return deferred.promise; + } + + /** + * Recursively copy local directory to hdfs directory + * + * @param localDirectory local directory + * @param hdfsDirectory HDFS directory + * @param hdfsClient HDFS client + */ + export async function copyDirectoryToHdfs(localDirectory : string, hdfsDirectory : string, hdfsClient : any) : Promise{ + const deferred: Deferred = new Deferred(); + // TODO: fs.readdirSync doesn't support ~($HOME) + const fileNameArray: string[] = fs.readdirSync(localDirectory); + + for(var fileName of fileNameArray){ + const fullFilePath: string = path.join(localDirectory, fileName); + try { + if (fs.lstatSync(fullFilePath).isFile()) { + await copyFileToHdfs(fullFilePath, 
path.join(hdfsDirectory, fileName), hdfsClient); + } else { + // If filePath is a directory, recuisively copy it to remote directory + await copyDirectoryToHdfs(fullFilePath, path.join(hdfsDirectory, fileName), hdfsClient); + } + } catch(error) { + deferred.reject(error); + } + } + // All files/directories are copied successfully, resolve + deferred.resolve(); + + return deferred.promise; + } + + /** + * Read content from HDFS file + * + * @param hdfsPath HDFS file path + * @param hdfsClient HDFS client + */ + export async function readFileFromHDFS(hdfsPath : string, hdfsClient :any) : Promise { + const deferred: Deferred = new Deferred(); + let buffer : Buffer = Buffer.alloc(0); + + const exist : boolean = await pathExists(hdfsPath, hdfsClient); + if(!exist) { + deferred.reject(`${hdfsPath} doesn't exists`); + } + + const remoteFileStream = hdfsClient.createReadStream(hdfsPath); + remoteFileStream.on('error', (err : any) => { + // Reject with the error + deferred.reject(err); + }); + + remoteFileStream.on('data', (chunk : any) => { + // Concat the data chunk to buffer + buffer = Buffer.concat([buffer, chunk]); + }); + + remoteFileStream.on('finish', function onFinish () { + // Upload is done, resolve + deferred.resolve(buffer); + }); + + return deferred.promise; + } + + /** + * Check if an HDFS path already exists + * + * @param hdfsPath target path need to check in HDFS + * @param hdfsClient HDFS client + */ + export async function pathExists(hdfsPath : string, hdfsClient : any) : Promise { + const deferred : Deferred = new Deferred(); + hdfsClient.exists(hdfsPath, (exist : boolean ) => { + deferred.resolve(exist); + }) + + return deferred.promise; + } + + /** + * Mkdir in HDFS, use default permission 755 + * + * @param hdfsPath the path in HDFS. 
It could be either file or directory + * @param hdfsClient + */ + export function mkdir(hdfsPath : string, hdfsClient : any) : Promise { + const deferred : Deferred = new Deferred(); + + hdfsClient.mkdir(hdfsPath, (err : any)=> { + if(!err) { + deferred.resolve(true); + } else { + deferred.reject(err.message); + } + }); + + return deferred.promise; + } + + /** + * Read directory contents + * + * @param hdfsPath the path in HDFS. It could be either file or directory + * @param hdfsClient + */ + export async function readdir(hdfsPath : string, hdfsClient : any) : Promise { + const deferred : Deferred = new Deferred(); + const exist : boolean = await pathExists(hdfsPath, hdfsClient); + if(!exist) { + deferred.reject(`${hdfsPath} doesn't exists`); + } + + hdfsClient.readdir(hdfsPath, (err : any, files : any[] ) => { + if(err) { + deferred.reject(err); + } + + deferred.resolve(files); + }); + + return deferred.promise; + } + + /** + * Delete HDFS path + * @param hdfsPath the path in HDFS. It could be either file or directory + * @param hdfsClient + * @param recursive Mark if need to delete recursively + */ + export function deletePath(hdfsPath : string, hdfsClient : any, recursive : boolean = true) : Promise { + const deferred : Deferred = new Deferred(); + hdfsClient.unlink(hdfsPath, recursive, (err : any)=> { + if(!err) { + deferred.resolve(true); + } else { + deferred.reject(err.message); + } + }); + return deferred.promise; + } +} diff --git a/src/nni_manager/training_service/pai/paiConfig.ts b/src/nni_manager/training_service/pai/paiConfig.ts new file mode 100644 index 0000000000..aa84021ec4 --- /dev/null +++ b/src/nni_manager/training_service/pai/paiConfig.ts @@ -0,0 +1,123 @@ +/** + * Copyright (c) Microsoft Corporation + * All rights reserved. 
+ * + * MIT License + * + * Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated + * documentation files (the "Software"), to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and + * to permit persons to whom the Software is furnished to do so, subject to the following conditions: + * The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING + * BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, + * DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. 
+ */ + +'use strict'; + +import {TrialConfig} from '../common/trialConfig' + +export class PAITaskRole { + // Name for the task role + public readonly name: string; + // Number of tasks for the task role, no less than 1 + public readonly taskNumber: number; + // CPU number for one task in the task role, no less than 1 + public readonly cpuNumber: number; + // Memory for one task in the task role, no less than 100 + public readonly memoryMB: number; + // GPU number for one task in the task role, no less than 0 + public readonly gpuNumber: number; + // Executable command for tasks in the task role, can not be empty + public readonly command: string; + + /** + * Constructor + * @param name Name for the task role + * @param taskNumber Number of tasks for the task role, no less than 1 + * @param cpuNumber CPU number for one task in the task role, no less than 1 + * @param memoryMB Memory for one task in the task role, no less than 100 + * @param gpuNumber GPU number for one task in the task role, no less than 0 + * @param command Executable command for tasks in the task role, can not be empty + */ + constructor(name : string, taskNumber : number, cpuNumber : number, memoryMB : number, gpuNumber : number, command : string) { + this.name = name; + this.taskNumber = taskNumber; + this.cpuNumber = cpuNumber; + this.memoryMB = memoryMB; + this.gpuNumber = gpuNumber; + this.command = command; + } +} + +export class PAIJobConfig{ + // Name for the job, need to be unique + public readonly jobName: string; + // URL pointing to the Docker image for all tasks in the job + public readonly image: string; + // Data directory existing on HDFS + public readonly dataDir: string; + // Output directory on HDFS + public readonly outputDir: string; + // Code directory on HDFS + public readonly codeDir: string; + + // List of taskRole, one task role at least + public taskRoles: PAITaskRole[]; + + /** + * Constructor + * @param jobName Name for the job, need to be unique + * @param image URL 
pointing to the Docker image for all tasks in the job + * @param dataDir Data directory existing on HDFS + * @param outputDir Output directory on HDFS + * @param taskRoles List of taskRole, one task role at least + */ + constructor(jobName: string, image : string, dataDir : string, outputDir : string, codeDir : string, taskRoles : PAITaskRole[]){ + this.jobName = jobName; + this.image = image; + this.dataDir = dataDir; + this.outputDir = outputDir; + this.codeDir = codeDir; + this.taskRoles = taskRoles; + } +} + +export class PAIClusterConfig { + public readonly userName: string; + public readonly passWord: string; + public readonly host: string; + + /** + * Constructor + * @param userName User name of PAI Cluster + * @param passWord password of PAI Cluster + * @param host Host IP of PAI Cluster + */ + constructor(userName: string, passWord : string, host : string){ + this.userName = userName; + this.passWord = passWord; + this.host = host; + } +} + +export class NNIPAITrialConfig extends TrialConfig{ + public readonly cpuNum: number; + public readonly memoryMB: number; + public readonly image: string; + public readonly dataDir: string; + public outputDir: string; + + constructor(command : string, codeDir : string, gpuNum : number, cpuNum: number, memoryMB: number, image: string, dataDir: string, outputDir: string) { + super(command, codeDir, gpuNum); + this.cpuNum = cpuNum; + this.memoryMB = memoryMB; + this.image = image; + this.dataDir = dataDir; + this.outputDir = outputDir; + } +} \ No newline at end of file diff --git a/src/nni_manager/training_service/pai/paiData.ts b/src/nni_manager/training_service/pai/paiData.ts new file mode 100644 index 0000000000..4b954deb4e --- /dev/null +++ b/src/nni_manager/training_service/pai/paiData.ts @@ -0,0 +1,61 @@ +/** + * Copyright (c) Microsoft Corporation + * All rights reserved. 
+ * + * MIT License + * + * Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated + * documentation files (the "Software"), to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and + * to permit persons to whom the Software is furnished to do so, subject to the following conditions: + * The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING + * BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, + * DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. 
+ */ + +'use strict'; + +import { JobApplicationForm, TrialJobDetail, TrialJobStatus } from 'common/trainingService'; + +export class PAITrialJobDetail implements TrialJobDetail { + public id: string; + public status: TrialJobStatus; + public paiJobName: string; + public submitTime: number; + public startTime?: number; + public endTime?: number; + public tags?: string[]; + public url?: string; + public workingDirectory: string; + public form: JobApplicationForm; + public hdfsLogPath: string; + + constructor(id: string, status: TrialJobStatus, paiJobName : string, + submitTime: number, workingDirectory: string, form: JobApplicationForm, hdfsLogPath: string) { + this.id = id; + this.status = status; + this.paiJobName = paiJobName; + this.submitTime = submitTime; + this.workingDirectory = workingDirectory; + this.form = form; + this.tags = []; + this.hdfsLogPath = hdfsLogPath; + } +} + +export const PAI_TRIAL_COMMAND_FORMAT: string = +`pip3 install -v --user git+https://github.com/Microsoft/nni.git@master +&& export NNI_PLATFORM=pai NNI_SYS_DIR={0} NNI_OUTPUT_DIR={0} NNI_TRIAL_JOB_ID={1} NNI_EXP_ID={2} +&& cd $NNI_SYS_DIR && mkdir .nni +&& python3 -m trial_tool.trial_keeper --trial_command '{3}' --nnimanager_ip '{4}' --pai_hdfs_output_dir '{5}' +--pai_hdfs_host '{6}' --pai_user_name {7}`; + +export const PAI_OUTPUT_DIR_FORMAT: string = +`hdfs://{0}:9000/`; + +export const PAI_LOG_PATH_FORMAT: string = +`http://{0}:50070/explorer.html#{1}` diff --git a/src/nni_manager/training_service/pai/paiJobInfoCollector.ts b/src/nni_manager/training_service/pai/paiJobInfoCollector.ts new file mode 100644 index 0000000000..041151c47d --- /dev/null +++ b/src/nni_manager/training_service/pai/paiJobInfoCollector.ts @@ -0,0 +1,136 @@ +/** + * Copyright (c) Microsoft Corporation + * All rights reserved. 
+ * + * MIT License + * + * Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated + * documentation files (the "Software"), to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and + * to permit persons to whom the Software is furnished to do so, subject to the following conditions: + * The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING + * BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, + * DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. 
+ */ + +'use strict'; + +import * as request from 'request'; +import { EventEmitter } from 'events'; +import { Deferred } from 'ts-deferred'; +import { getLogger, Logger } from '../../common/log'; +import { NNIError, NNIErrorNames } from '../../common/errors'; +import { PAITrialJobDetail } from './paiData'; +import { PAIClusterConfig } from './paiConfig'; +import { TrialJobStatus } from '../../common/trainingService'; + +/** + * Collector PAI jobs info from PAI cluster, and update pai job status locally + */ +export class PAIJobInfoCollector { + private readonly trialJobsMap : Map; + private readonly log: Logger = getLogger(); + private readonly statusesNeedToCheck : TrialJobStatus[]; + private readonly finalStatuses : TrialJobStatus[]; + + constructor(jobMap: Map) { + this.trialJobsMap = jobMap; + this.statusesNeedToCheck = ['RUNNING', 'UNKNOWN', 'WAITING']; + this.finalStatuses = ['SUCCEEDED', 'FAILED', 'USER_CANCELED', 'SYS_CANCELED']; + } + + public async updateTrialStatusFromPAI(paiToken? 
: string, paiClusterConfig?: PAIClusterConfig) : Promise { + if (!paiClusterConfig || !paiToken) { + return Promise.resolve(); + } + + const updatePaiTrialJobs : Promise[] = []; + for(let [trialJobId, paiTrialJob] of this.trialJobsMap) { + if (!paiTrialJob) { + throw new NNIError(NNIErrorNames.NOT_FOUND, `trial job id ${trialJobId} not found`); + } + updatePaiTrialJobs.push(this.getSinglePAITrialJobInfo(paiTrialJob, paiToken, paiClusterConfig)) + } + + await Promise.all(updatePaiTrialJobs); + } + + private getSinglePAITrialJobInfo(paiTrialJob : PAITrialJobDetail, paiToken : string, paiClusterConfig: PAIClusterConfig) : Promise { + const deferred : Deferred = new Deferred(); + if (!this.statusesNeedToCheck.includes(paiTrialJob.status)) { + deferred.resolve(); + return deferred.promise; + } + + // Rest call to get PAI job info and update status + // Refer https://github.com/Microsoft/pai/blob/master/docs/rest-server/API.md for more detail about PAI Rest API + const getJobInfoRequest: request.Options = { + uri: `http://${paiClusterConfig.host}:9186/api/v1/jobs/${paiTrialJob.paiJobName}`, + method: 'GET', + json: true, + headers: { + "Content-Type": "application/json", + "Authorization": 'Bearer ' + paiToken + } + }; + //TODO : pass in request timeout param? 
+ request(getJobInfoRequest, (error: Error, response: request.Response, body: any) => { + if (error || response.statusCode >= 500) { + this.log.error(`PAI Training service: get job info for trial ${paiTrialJob.id} from PAI Cluster failed!`); + // Queried PAI job info failed, set job status to UNKNOWN + if(paiTrialJob.status === 'WAITING' || paiTrialJob.status === 'RUNNING') { + paiTrialJob.status = 'UNKNOWN'; + } + } else { + if(response.body.jobStatus && response.body.jobStatus.state) { + switch(response.body.jobStatus.state) { + case 'WAITING': + paiTrialJob.status = 'WAITING'; + break; + case 'RUNNING': + paiTrialJob.status = 'RUNNING'; + if(!paiTrialJob.startTime) { + paiTrialJob.startTime = response.body.jobStatus.appLaunchedTime; + } + if(!paiTrialJob.url) { + paiTrialJob.url = response.body.jobStatus.appTrackingUrl; + } + break; + case 'SUCCEEDED': + paiTrialJob.status = 'SUCCEEDED'; + break; + case 'STOPPED': + paiTrialJob.status = 'USER_CANCELED'; + break; + case 'FAILED': + paiTrialJob.status = 'FAILED'; + break; + default: + paiTrialJob.status = 'UNKNOWN'; + break; + } + // For final job statues, update startTime, endTime and url + if(this.finalStatuses.includes(paiTrialJob.status)) { + if(!paiTrialJob.startTime) { + paiTrialJob.startTime = response.body.jobStatus.appLaunchedTime; + } + if(!paiTrialJob.endTime) { + paiTrialJob.endTime = response.body.jobStatus.completedTime; + } + // Set pai trial job's url to WebHDFS output path + if(paiTrialJob.hdfsLogPath) { + paiTrialJob.url = paiTrialJob.hdfsLogPath; + } + } + } + } + deferred.resolve(); + }); + + return deferred.promise; + } +} \ No newline at end of file diff --git a/src/nni_manager/training_service/pai/paiJobRestServer.ts b/src/nni_manager/training_service/pai/paiJobRestServer.ts index 6375eee1c5..098ea74333 100644 --- a/src/nni_manager/training_service/pai/paiJobRestServer.ts +++ b/src/nni_manager/training_service/pai/paiJobRestServer.ts @@ -17,4 +17,82 @@ * OUT OF OR IN CONNECTION WITH THE 
SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. */ - \ No newline at end of file +'use strict'; + +import { Request, Response, Router } from 'express'; +import * as bodyParser from 'body-parser'; +import * as component from '../../common/component'; +import { getExperimentId } from '../../common/experimentStartupInfo'; +import { Inject } from 'typescript-ioc'; +import { PAITrainingService } from './paiTrainingService'; +import { RestServer } from '../../common/restServer' + +/** + * PAI Training service Rest server, provides rest API to support pai job metrics update + * + */ +@component.Singleton +export class PAIJobRestServer extends RestServer{ + /** NNI main rest service default port */ + private static readonly DEFAULT_PORT: number = 51189; + + private readonly API_ROOT_URL: string = '/api/v1/nni-pai'; + + private readonly expId: string = getExperimentId(); + + @Inject + private readonly paiTrainingService : PAITrainingService; + + /** + * constructor to provide NNIRestServer's own rest property, e.g. 
port + */ + constructor() { + super(); + this.port = PAIJobRestServer.DEFAULT_PORT; + this.paiTrainingService = component.get(PAITrainingService); + } + + /** + * NNIRestServer's own router registration + */ + protected registerRestHandler(): void { + this.app.use(bodyParser.json()); + this.app.use(this.API_ROOT_URL, this.createRestHandler()); + } + + private createRestHandler() : Router { + const router: Router = Router(); + + // tslint:disable-next-line:typedef + router.use((req: Request, res: Response, next) => { + this.log.info(`${req.method}: ${req.url}: body:\n${JSON.stringify(req.body, undefined, 4)}`); + res.setHeader('Content-Type', 'application/json'); + next(); + }); + + router.post(`/update-metrics/${this.expId}/:trialId`, (req: Request, res: Response) => { + try { + this.log.info(`Get update-metrics request, trial job id is ${req.params.trialId}`); + this.log.info(`update-metrics body is ${JSON.stringify(req.body)}`); + + // Split metrics array into single metric, then emit + // Warning: If not split metrics into single ones, the behavior will be UNKNOWN + for (const singleMetric of req.body.metrics) { + this.paiTrainingService.MetricsEmitter.emit('metric', { + id : req.body.jobId, + data : singleMetric + }); + } + + res.send(); + } + catch(err) { + this.log.error(`json parse metrics error: ${err}`); + res.status(500); + res.send(err.message); + } + }); + + return router; + } +} \ No newline at end of file diff --git a/src/nni_manager/training_service/pai/paiTrainingService.ts b/src/nni_manager/training_service/pai/paiTrainingService.ts new file mode 100644 index 0000000000..8ed0e306f7 --- /dev/null +++ b/src/nni_manager/training_service/pai/paiTrainingService.ts @@ -0,0 +1,401 @@ + +/** + * Copyright (c) Microsoft Corporation + * All rights reserved. 
+ * + * MIT License + * + * Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated + * documentation files (the "Software"), to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and + * to permit persons to whom the Software is furnished to do so, subject to the following conditions: + * The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING + * BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, + * DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. 
+ */ + +'use strict' + +import * as component from '../../common/component'; +import * as cpp from 'child-process-promise'; +import * as fs from 'fs'; +import * as path from 'path'; +import * as request from 'request'; + +import { Deferred } from 'ts-deferred'; +import { EventEmitter } from 'events'; +import { getExperimentId } from '../../common/experimentStartupInfo'; +import { HDFSClientUtility } from './hdfsClientUtility' +import { MethodNotImplementedError } from '../../common/errors'; +import { getLogger, Logger } from '../../common/log'; +import { TrialConfigMetadataKey } from '../common/trialConfigMetadataKey'; +import { + JobApplicationForm, TrainingService, TrialJobApplicationForm, + TrialJobDetail, TrialJobMetric +} from '../../common/trainingService'; +import { delay, getExperimentRootDir, getIPV4Address, uniqueString } from '../../common/utils'; +import { PAIJobRestServer } from './paiJobRestServer' +import { PAITrialJobDetail, PAI_TRIAL_COMMAND_FORMAT, PAI_OUTPUT_DIR_FORMAT, PAI_LOG_PATH_FORMAT } from './paiData'; +import { PAIJobInfoCollector } from './paiJobInfoCollector'; +import { String } from 'typescript-string-operations'; +import { NNIPAITrialConfig, PAIClusterConfig, PAIJobConfig, PAITaskRole } from './paiConfig'; + +var WebHDFS = require('webhdfs'); + +/** + * Training Service implementation for OpenPAI (Open Platform for AI) + * Refer https://github.com/Microsoft/pai for more info about OpenPAI + */ +@component.Singleton +class PAITrainingService implements TrainingService { + private readonly log!: Logger; + private readonly metricsEmitter: EventEmitter; + private readonly trialJobsMap: Map; + private readonly expRootDir: string; + private paiTrialConfig: NNIPAITrialConfig | undefined; + private paiClusterConfig?: PAIClusterConfig; + private stopping: boolean = false; + private hdfsClient: any; + private paiToken? : string; + private experimentId! 
: string; + private readonly paiJobCollector : PAIJobInfoCollector; + private readonly hdfsDirPattern: string; + + constructor() { + this.log = getLogger(); + this.metricsEmitter = new EventEmitter(); + this.trialJobsMap = new Map(); + // Root dir on HDFS + this.expRootDir = path.join('/nni', 'experiments', getExperimentId()); + this.experimentId = getExperimentId(); + this.paiJobCollector = new PAIJobInfoCollector(this.trialJobsMap); + this.hdfsDirPattern = 'hdfs://(?([0-9]{1,3}.){3}[0-9]{1,3})(:[0-9]{2,5})?(?/.*)?'; + } + + public async run(): Promise { + const restServer: PAIJobRestServer = component.get(PAIJobRestServer); + await restServer.start(); + this.log.info(`PAI Training service rest server listening on: ${restServer.endPoint}`); + while (!this.stopping) { + await this.paiJobCollector.updateTrialStatusFromPAI(this.paiToken, this.paiClusterConfig); + await delay(3000); + } + } + + public async listTrialJobs(): Promise { + const jobs: TrialJobDetail[] = []; + + this.trialJobsMap.forEach(async (value: PAITrialJobDetail, key: string) => { + if (value.form.jobType === 'TRIAL') { + jobs.push(await this.getTrialJob(key)); + } + }); + + return Promise.resolve(jobs); + } + + public getTrialJob(trialJobId: string): Promise { + if(!this.paiClusterConfig) { + throw new Error('PAI Cluster config is not initialized'); + } + + const paiTrialJob: PAITrialJobDetail | undefined = this.trialJobsMap.get(trialJobId); + + if (!paiTrialJob) { + return Promise.reject(`trial job ${trialJobId} not found`) + } + + return Promise.resolve(paiTrialJob); + } + + public addTrialJobMetricListener(listener: (metric: TrialJobMetric) => void) { + this.metricsEmitter.on('metric', listener); + } + + public removeTrialJobMetricListener(listener: (metric: TrialJobMetric) => void) { + this.metricsEmitter.off('metric', listener); + } + + public async submitTrialJob(form: JobApplicationForm): Promise { + const deferred : Deferred = new Deferred(); + if(!this.paiClusterConfig) { + throw new 
Error('PAI Cluster config is not initialized'); + } + if (!this.paiTrialConfig) { + throw new Error('trial config is not initialized'); + } + if (!this.paiToken) { + throw new Error('PAI token is not initialized'); + } + + this.log.info(`submitTrialJob: form: ${JSON.stringify(form)}`); + + const trialJobId: string = uniqueString(5); + //TODO: use HDFS working folder instead + const trialWorkingFolder: string = path.join(this.expRootDir, 'trials', trialJobId); + + const trialLocalTempFolder: string = path.join(getExperimentRootDir(), 'trials-local', trialJobId); + //create tmp trial working folder locally. + await cpp.exec(`mkdir -p ${path.dirname(trialLocalTempFolder)}`); + await cpp.exec(`cp -r ${this.paiTrialConfig.codeDir} ${trialLocalTempFolder}`); + + // Write file content ( parameter.cfg ) to local tmp folders + const trialForm : TrialJobApplicationForm = (form) + if(trialForm) { + await fs.promises.writeFile(path.join(trialLocalTempFolder, 'parameter.cfg'), trialForm.hyperParameters, { encoding: 'utf8' }); + } + + // Step 1. 
Prepare PAI job configuration + const paiJobName : string = `nni_exp_${this.experimentId}_trial_${trialJobId}`; + const hdfsCodeDir : string = path.join(this.expRootDir, trialJobId); + + const hdfsDirContent = this.paiTrialConfig.outputDir.match(this.hdfsDirPattern); + + if(hdfsDirContent === null) { + throw new Error('Trial outputDir format Error'); + } + const groups = hdfsDirContent.groups; + if(groups === undefined) { + throw new Error('Trial outputDir format Error'); + } + + const hdfsHost = groups['host']; + let hdfsBaseDirectory = groups['baseDir']; + if(hdfsBaseDirectory === undefined) { + hdfsBaseDirectory = "/"; + } + const hdfsOutputDir : string = path.join(hdfsBaseDirectory, this.experimentId, trialJobId); + const hdfsLogPath : string = String.Format( + PAI_LOG_PATH_FORMAT, + hdfsHost, + hdfsOutputDir); + + const trialJobDetail: PAITrialJobDetail = new PAITrialJobDetail( + trialJobId, + 'WAITING', + paiJobName, + Date.now(), + trialWorkingFolder, + form, + hdfsLogPath); + this.trialJobsMap.set(trialJobId, trialJobDetail); + + const nniPaiTrialCommand : string = String.Format( + PAI_TRIAL_COMMAND_FORMAT, + // PAI will copy job's codeDir into /root directory + `/root/${trialJobId}`, + trialJobId, + this.experimentId, + this.paiTrialConfig.command, + getIPV4Address(), + hdfsOutputDir, + hdfsHost, + this.paiClusterConfig.userName + ).replace(/\r\n|\n|\r/gm, ''); + + console.log(`nniPAItrial command is ${nniPaiTrialCommand.trim()}`); + const paiTaskRoles : PAITaskRole[] = [new PAITaskRole('nni_trail_' + trialJobId, + // Task role number + 1, + // Task CPU number + this.paiTrialConfig.cpuNum, + // Task memory + this.paiTrialConfig.memoryMB, + // Task GPU number + this.paiTrialConfig.gpuNum, + // Task command + nniPaiTrialCommand)]; + + const paiJobConfig : PAIJobConfig = new PAIJobConfig( + // Job name + paiJobName, + // Docker image + this.paiTrialConfig.image, + // dataDir + this.paiTrialConfig.dataDir, + // outputDir + this.paiTrialConfig.outputDir, + // 
codeDir + `$PAI_DEFAULT_FS_URI${hdfsCodeDir}`, + // TODO: Add Virtual Cluster + // PAI Task roles + paiTaskRoles); + + // Step 2. Upload code files in codeDir onto HDFS + try { + await HDFSClientUtility.copyDirectoryToHdfs(trialLocalTempFolder, hdfsCodeDir, this.hdfsClient); + } catch (error) { + this.log.error(`PAI Training service: copy ${this.paiTrialConfig.codeDir} to HDFS ${hdfsCodeDir} failed, error is ${error}`); + throw new Error(error.message); + } + + // Step 3. Submit PAI job via Rest call + // Refer https://github.com/Microsoft/pai/blob/master/docs/rest-server/API.md for more detail about PAI Rest API + const submitJobRequest: request.Options = { + uri: `http://${this.paiClusterConfig.host}:9186/api/v1/jobs`, + method: 'POST', + json: true, + body: paiJobConfig, + headers: { + "Content-Type": "application/json", + "Authorization": 'Bearer ' + this.paiToken + } + }; + request(submitJobRequest, (error: Error, response: request.Response, body: any) => { + if (error || response.statusCode >= 400) { + this.log.error(`PAI Training service: Submit trial ${trialJobId} to PAI Cluster failed!`); + trialJobDetail.status = 'FAILED'; + deferred.reject(error ? 
error.message : 'Submit trial failed, http code: ' + response.statusCode); + } else { + trialJobDetail.submitTime = Date.now(); + deferred.resolve(trialJobDetail); + } + }); + + return deferred.promise; + } + + public updateTrialJob(trialJobId: string, form: JobApplicationForm): Promise { + throw new MethodNotImplementedError(); + } + + public get isMultiPhaseJobSupported(): boolean { + return false; + } + + public cancelTrialJob(trialJobId: string): Promise { + const trialJobDetail : PAITrialJobDetail | undefined = this.trialJobsMap.get(trialJobId); + const deferred : Deferred = new Deferred(); + if(!trialJobDetail) { + this.log.error(`cancelTrialJob: trial job id ${trialJobId} not found`); + return Promise.reject(); + } + + if(!this.paiClusterConfig) { + throw new Error('PAI Cluster config is not initialized'); + } + if (!this.paiToken) { + throw new Error('PAI token is not initialized'); + } + + const stopJobRequest: request.Options = { + uri: `http://${this.paiClusterConfig.host}:9186/api/v1/jobs/${trialJobDetail.paiJobName}/executionType`, + method: 'PUT', + json: true, + body: {'value' : 'STOP'}, + headers: { + "Content-Type": "application/json", + "Authorization": 'Bearer ' + this.paiToken + } + }; + request(stopJobRequest, (error: Error, response: request.Response, body: any) => { + if (error || response.statusCode >= 400) { + this.log.error(`PAI Training service: stop trial ${trialJobId} to PAI Cluster failed!`); + deferred.reject(error ? 
error.message : 'Stop trial failed, http code: ' + response.statusCode); + } else { + deferred.resolve(); + } + }); + + return deferred.promise; + } + + public setClusterMetadata(key: string, value: string): Promise { + const deferred : Deferred = new Deferred(); + + switch (key) { + case TrialConfigMetadataKey.PAI_CLUSTER_CONFIG: + //TODO: try catch exception when setting up HDFS client and get PAI token + this.paiClusterConfig = JSON.parse(value); + + this.hdfsClient = WebHDFS.createClient({ + user: this.paiClusterConfig.userName, + port: 50070, + host: this.paiClusterConfig.host + }); + + // Get PAI authentication token + const authentication_req: request.Options = { + uri: `http://${this.paiClusterConfig.host}:9186/api/v1/token`, + method: 'POST', + json: true, + body: { + username: this.paiClusterConfig.userName, + password: this.paiClusterConfig.passWord + } + }; + + request(authentication_req, (error: Error, response: request.Response, body: any) => { + if (error) { + //TODO: should we make the setClusterMetadata's return type to Promise? 
+ this.log.error(`Get PAI token failed: ${error.message}`); + deferred.reject(); + } else { + if(response.statusCode !== 200){ + this.log.error(`Get PAI token failed: get PAI Rest return code ${response.statusCode}`); + deferred.reject(); + } + this.paiToken = body.token; + + deferred.resolve(); + } + }); + break; + case TrialConfigMetadataKey.TRIAL_CONFIG: + if (!this.paiClusterConfig){ + this.log.error('pai cluster config is not initialized'); + deferred.reject(); + break; + } + this.paiTrialConfig = JSON.parse(value); + //paiTrialConfig.outputDir could be null if it is not set in nnictl + if(this.paiTrialConfig.outputDir === undefined || this.paiTrialConfig.outputDir === null){ + this.paiTrialConfig.outputDir = String.Format( + PAI_OUTPUT_DIR_FORMAT, + this.paiClusterConfig.host + ).replace(/\r\n|\n|\r/gm, ''); + } + deferred.resolve(); + break; + default: + //Reject for unknown keys + throw new Error(`Uknown key: ${key}`); + } + + return deferred.promise; + } + + public getClusterMetadata(key: string): Promise { + const deferred : Deferred = new Deferred(); + + deferred.resolve(); + return deferred.promise; + } + + public async cleanUp(): Promise { + this.stopping = true; + + const deferred : Deferred = new Deferred(); + const restServer: PAIJobRestServer = component.get(PAIJobRestServer); + try { + await restServer.stop(); + deferred.resolve(); + this.log.info('PAI Training service rest server stopped successfully.'); + } catch (error) { + this.log.error(`PAI Training service rest server stopped failed, error: ${error.message}`); + deferred.reject(error); + } + + return deferred.promise; + } + + public get MetricsEmitter() : EventEmitter { + return this.metricsEmitter; + } +} + +export { PAITrainingService } \ No newline at end of file diff --git a/src/nni_manager/training_service/pai/paiTrialConfig.ts b/src/nni_manager/training_service/pai/paiTrialConfig.ts new file mode 100644 index 0000000000..583db9e725 --- /dev/null +++ 
b/src/nni_manager/training_service/pai/paiTrialConfig.ts @@ -0,0 +1,39 @@ +/** + * Copyright (c) Microsoft Corporation + * All rights reserved. + * + * MIT License + * + * Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated + * documentation files (the "Software"), to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and + * to permit persons to whom the Software is furnished to do so, subject to the following conditions: + * The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING + * BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, + * DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. 
+ */ + +'use strict'; + +import {TrialConfig} from '../common/trialConfig' + +export class PAITrialConfig extends TrialConfig{ + public readonly cpuNum: number; + public readonly memoryMB: number; + public readonly image: string; + public readonly dataDir: string; + public readonly outputDir: string; + + constructor(command : string, codeDir : string, gpuNum : number, cpuNum: number, memoryMB: number, image: string, dataDir: string, outputDir: string) { + super(command, codeDir, gpuNum); + this.cpuNum = cpuNum; + this.memoryMB = memoryMB; + this.image = image; + this.dataDir = dataDir; + this.outputDir = outputDir; + } +} \ No newline at end of file diff --git a/src/nni_manager/training_service/remote_machine/metricsCollector.ts b/src/nni_manager/training_service/remote_machine/metricsCollector.ts index 3e346e7000..eb59a51d99 100644 --- a/src/nni_manager/training_service/remote_machine/metricsCollector.ts +++ b/src/nni_manager/training_service/remote_machine/metricsCollector.ts @@ -25,7 +25,8 @@ import * as path from 'path'; import { Client } from 'ssh2'; import { getLogger, Logger } from '../../common/log'; import { TrialJobStatus, TrialJobDetail } from '../../common/trainingService'; -import { JobMetrics, RemoteCommandResult, RemoteMachineMeta, RemoteMachineTrialJobDetail } from './remoteMachineData'; +import { JobMetrics } from '../common/jobMetrics'; +import { RemoteCommandResult, RemoteMachineMeta, RemoteMachineTrialJobDetail } from './remoteMachineData'; import { SSHClientUtility } from './sshClientUtility'; export class MetricsCollector { diff --git a/src/nni_manager/training_service/remote_machine/remoteMachineData.ts b/src/nni_manager/training_service/remote_machine/remoteMachineData.ts index 1e52458790..0cd3a028dc 100644 --- a/src/nni_manager/training_service/remote_machine/remoteMachineData.ts +++ b/src/nni_manager/training_service/remote_machine/remoteMachineData.ts @@ -65,21 +65,6 @@ export class RemoteCommandResult { } } -// 
tslint:disable-next-line:max-classes-per-file -export class JobMetrics { - public readonly jobId: string; - public readonly metrics: string[]; - public readonly jobStatus: TrialJobStatus; - public readonly endTimestamp: number; - - constructor(jobId : string, metrics : string[], jobStatus : TrialJobStatus, endTimestamp : number) { - this.jobId = jobId; - this.metrics = metrics; - this.jobStatus = jobStatus; - this.endTimestamp = endTimestamp; - } -} - /** * RemoteMachineTrialJobDetail */ @@ -121,7 +106,7 @@ export enum ScheduleResultType { REQUIRE_EXCEED_TOTAL } -export const REMOTEMACHINERUNSHELLFORMAT: string = +export const REMOTEMACHINE_RUN_SHELL_FORMAT: string = `#!/bin/bash export NNI_PLATFORM=remote NNI_SYS_DIR={0} NNI_TRIAL_JOB_ID={1} NNI_OUTPUT_DIR={0} cd $NNI_SYS_DIR @@ -129,7 +114,7 @@ echo $$ >{2} eval {3}{4} 2>{5} echo $? \`date +%s%3N\` >{6}`; -export const HOSTJOBSHELLFORMAT: string = +export const HOST_JOB_SHELL_FORMAT: string = `#!/bin/bash cd {0} echo $$ >{1} diff --git a/src/nni_manager/training_service/remote_machine/remoteMachineTrainingService.ts b/src/nni_manager/training_service/remote_machine/remoteMachineTrainingService.ts index 772b93ff5d..e1cff16f22 100644 --- a/src/nni_manager/training_service/remote_machine/remoteMachineTrainingService.ts +++ b/src/nni_manager/training_service/remote_machine/remoteMachineTrainingService.ts @@ -43,8 +43,8 @@ import { TrialConfigMetadataKey } from '../common/trialConfigMetadataKey'; import { GPUScheduler } from './gpuScheduler'; import { MetricsCollector } from './metricsCollector'; import { - HOSTJOBSHELLFORMAT, RemoteCommandResult, RemoteMachineMeta, - REMOTEMACHINERUNSHELLFORMAT, RemoteMachineScheduleInfo, RemoteMachineScheduleResult, + HOST_JOB_SHELL_FORMAT, RemoteCommandResult, RemoteMachineMeta, + REMOTEMACHINE_RUN_SHELL_FORMAT, RemoteMachineScheduleInfo, RemoteMachineScheduleResult, RemoteMachineTrialJobDetail, ScheduleResultType } from './remoteMachineData'; import { SSHClientUtility } from 
'./sshClientUtility'; @@ -427,7 +427,7 @@ class RemoteMachineTrainingService implements TrainingService { // RemoteMachineRunShellFormat is the run shell format string, // See definition in remoteMachineData.ts const runScriptContent: string = String.Format( - REMOTEMACHINERUNSHELLFORMAT, + REMOTEMACHINE_RUN_SHELL_FORMAT, trialWorkingFolder, trialJobId, path.join(trialWorkingFolder, '.nni', 'jobpid'), @@ -470,7 +470,7 @@ class RemoteMachineTrainingService implements TrainingService { await cpp.exec(`mkdir -p ${localDir}`); await SSHClientUtility.remoteExeCommand(`mkdir -p ${remoteDir}`, sshClient); const runScriptContent: string = String.Format( - HOSTJOBSHELLFORMAT, remoteDir, path.join(remoteDir, 'jobpid'), form.cmd, path.join(remoteDir, 'code') + HOST_JOB_SHELL_FORMAT, remoteDir, path.join(remoteDir, 'jobpid'), form.cmd, path.join(remoteDir, 'code') ); await fs.promises.writeFile(path.join(localDir, 'run.sh'), runScriptContent, { encoding: 'utf8' }); await SSHClientUtility.copyFileToRemote( diff --git a/src/nni_manager/training_service/test/hdfsClientUtility.test.ts b/src/nni_manager/training_service/test/hdfsClientUtility.test.ts new file mode 100644 index 0000000000..b8cf30e83a --- /dev/null +++ b/src/nni_manager/training_service/test/hdfsClientUtility.test.ts @@ -0,0 +1,143 @@ +/** + * Copyright (c) Microsoft Corporation + * All rights reserved. + * + * MIT License + * + * Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated + * documentation files (the "Software"), to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and + * to permit persons to whom the Software is furnished to do so, subject to the following conditions: + * The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. 
+ * + * THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING + * BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, + * DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ + +'use strict'; +import * as chai from 'chai'; +import * as chaiAsPromised from 'chai-as-promised'; +import * as fs from 'fs'; +import * as os from 'os'; +import * as path from 'path'; +import * as tmp from 'tmp'; +import { cleanupUnitTest, prepareUnitTest, uniqueString } from '../../common/utils'; +import { HDFSClientUtility } from '../pai/hdfsClientUtility'; + +var WebHDFS = require('webhdfs'); +var rmdir = require('rmdir'); + +describe('WebHDFS', function () { + /* + To enable web HDFS client unit test, HDFS information needs to be configured in: + Default/.vscode/hdfsInfo.json, whose content looks like: + { + "user": "user1", + "port": 50070, + "host": "10.0.0.0" + } + */ + let skip: boolean = false; + let testHDFSInfo: any; + let hdfsClient: any; + try { + testHDFSInfo = JSON.parse(fs.readFileSync('../../.vscode/hdfsInfo.json', 'utf8')); + console.log(testHDFSInfo); + hdfsClient = WebHDFS.createClient({ + user: testHDFSInfo.user, + port: testHDFSInfo.port, + host: testHDFSInfo.host + }); + } catch (err) { + console.log('Please configure rminfo.json to enable remote machine unit test.'); + skip = true; + } + + before(() => { + chai.should(); + chai.use(chaiAsPromised); + tmp.setGracefulCleanup(); + prepareUnitTest(); + }); + + after(() => { + cleanupUnitTest(); + }); + + it('Test HDFS utility path functions', async () => { + if (skip) { + return; + } + const testPath : string = '/nni_unittest_' + uniqueString(6); + let exists : boolean = await 
HDFSClientUtility.pathExists(testPath, hdfsClient); + // The new random named path is expected to not exist + chai.expect(exists).to.be.equals(false); + + const mkdirResult : boolean = await HDFSClientUtility.mkdir(testPath, hdfsClient); + // Mkdir is expected to be successful + chai.expect(mkdirResult).to.be.equals(true); + + exists = await HDFSClientUtility.pathExists(testPath, hdfsClient); + // The newly created path is expected to exist + chai.expect(exists).to.be.equals(true); + + const deleteResult : boolean = await HDFSClientUtility.deletePath(testPath, hdfsClient); + // Delete path is expected to be successful + chai.expect(deleteResult).to.be.equals(true); + + exists = await HDFSClientUtility.pathExists(testPath, hdfsClient); + // The deleted path is not expected to exist + chai.expect(exists).to.be.equals(false); + }); + + it('Test HDFS utility copyFileToHdfs', async() => { + if (skip) { + return; + } + // Prepare local directory and files + const tmpLocalDirectoryPath : string = path.join(os.tmpdir(), 'nni_unittest_dir_' + uniqueString(6)); + const tmpDataFilePath : string = path.join(tmpLocalDirectoryPath, 'file_' + uniqueString(6)); + const testFileData : string = 'TestContent123'; + fs.mkdirSync(tmpLocalDirectoryPath); + fs.writeFileSync(tmpDataFilePath, testFileData); + + const testHDFSFilePath : string = '/nni_unittest_' + uniqueString(6); + let exists : boolean = await HDFSClientUtility.pathExists(testHDFSFilePath, hdfsClient); + // The new random named path is expected to not exist + chai.expect(exists).to.be.equals(false); + + await HDFSClientUtility.copyFileToHdfs(tmpDataFilePath, testHDFSFilePath, hdfsClient); + exists = await HDFSClientUtility.pathExists(testHDFSFilePath, hdfsClient); + // After copy local file to HDFS, the target file path in HDFS is expected to exist + chai.expect(exists).to.be.equals(true); + + const buffer : Buffer = await HDFSClientUtility.readFileFromHDFS(testHDFSFilePath, hdfsClient); + const actualFileData : string = 
buffer.toString('utf8'); + // The file content read from HDFS is expected to equal to the content of local file + chai.expect(actualFileData).to.be.equals(testFileData); + + const testHDFSDirPath : string = path.join('/nni_unittest_' + uniqueString(6) + '_dir'); + + await HDFSClientUtility.copyDirectoryToHdfs(tmpLocalDirectoryPath, testHDFSDirPath, hdfsClient); + + const files : any[] = await HDFSClientUtility.readdir(testHDFSDirPath, hdfsClient); + + // Expected file count under HDFS target directory is 1 + chai.expect(files.length).to.be.equals(1); + + // Expected file name under HDFS target directory is equal to local file name + chai.expect(files[0].pathSuffix).to.be.equals(path.parse(tmpDataFilePath).base); + + // Cleanup + rmdir(tmpLocalDirectoryPath); + + let deleteRestult : boolean = await HDFSClientUtility.deletePath(testHDFSFilePath, hdfsClient); + chai.expect(deleteRestult).to.be.equals(true); + + deleteRestult = await HDFSClientUtility.deletePath(testHDFSDirPath, hdfsClient); + chai.expect(deleteRestult).to.be.equals(true); + }); +}); \ No newline at end of file diff --git a/src/nni_manager/training_service/test/paiTrainingService.test.ts b/src/nni_manager/training_service/test/paiTrainingService.test.ts new file mode 100644 index 0000000000..4294e4ddc1 --- /dev/null +++ b/src/nni_manager/training_service/test/paiTrainingService.test.ts @@ -0,0 +1,95 @@ +/** + * Copyright (c) Microsoft Corporation + * All rights reserved. 
+ * + * MIT License + * + * Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated + * documentation files (the "Software"), to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and + * to permit persons to whom the Software is furnished to do so, subject to the following conditions: + * The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING + * BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, + * DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. 
+ */ + +'use strict'; + +import * as chai from 'chai'; +import * as chaiAsPromised from 'chai-as-promised'; +import * as fs from 'fs'; +import * as tmp from 'tmp'; +import * as component from '../../common/component'; +import { cleanupUnitTest, prepareUnitTest } from '../../common/utils'; +import { TrialConfigMetadataKey } from '../common/trialConfigMetadataKey'; +import { PAITrainingService } from '../pai/paiTrainingService'; + +// TODO: copy mockedTrail.py to local folder +const localCodeDir: string = tmp.dirSync().name +const mockedTrialPath: string = './training_service/test/mockedTrial.py' +fs.copyFileSync(mockedTrialPath, localCodeDir + '/mockedTrial.py') + +describe('Unit Test for PAITrainingService', () => { + let skip: boolean = false; + let testPaiClusterInfo: any; + let paiCluster: any; + let paiTrialConfig : any; + try { + testPaiClusterInfo = JSON.parse(fs.readFileSync('../../.vscode/paiCluster.json', 'utf8')); + paiCluster = `{\"userName\":\"${testPaiClusterInfo.userName}\",\"passWord\":\"${testPaiClusterInfo.passWord}\",\"host\":\"${testPaiClusterInfo.host}\"}`; + paiTrialConfig = `{\"command\":\"echo hello && ls\",\"codeDir\":\"/home/desy/nni/examples/trials/mnist",\"gpuNum\":\"1\", +\"cpuNum\":\"1\",\"memoryMB\":\"8196\",\"image\":\"openpai/pai.example.tensorflow\",\"dataDir\":\"\",\"outputDir\":\"\"}`; + } catch (err) { + console.log('Please configure rminfo.json to enable remote machine unit test.'); + skip = true; + } + + let paiTrainingService: PAITrainingService; + + console.log(tmp.dirSync().name); + + before(() => { + chai.should(); + chai.use(chaiAsPromised); + prepareUnitTest(); + }); + + after(() => { + cleanupUnitTest(); + }); + + beforeEach(() => { + if (skip) { + return; + } + paiTrainingService = component.get(PAITrainingService); + paiTrainingService.run(); + }); + + afterEach(() => { + if (skip) { + return; + } + paiTrainingService.cleanUp(); + }); + + it('Get PAI token', async () => { + if (skip) { + return; + } + 
console.log(`paiCluster is ${paiCluster}`) + await paiTrainingService.setClusterMetadata(TrialConfigMetadataKey.PAI_CLUSTER_CONFIG, paiCluster); + await paiTrainingService.setClusterMetadata(TrialConfigMetadataKey.TRIAL_CONFIG, paiTrialConfig); + try { + const trialDetail = await paiTrainingService.submitTrialJob({jobType : 'TRIAL'}); + chai.expect(trialDetail.status).to.be.equals('WAITING'); + } catch(error) { + console.log('Submit job failed:' + error); + chai.assert(error) + } + }); +}); \ No newline at end of file diff --git a/src/nni_manager/yarn.lock b/src/nni_manager/yarn.lock index 8611053414..b8ca788520 100644 --- a/src/nni_manager/yarn.lock +++ b/src/nni_manager/yarn.lock @@ -224,7 +224,7 @@ accepts@~1.3.5: mime-types "~2.1.18" negotiator "0.6.1" -ajv@^5.1.0: +ajv@^5.1.0, ajv@^5.3.0: version "5.5.2" resolved "https://registry.yarnpkg.com/ajv/-/ajv-5.5.2.tgz#73b5eeca3fab653e3d3f9422b341ad42205dc965" dependencies: @@ -310,6 +310,10 @@ aws4@^1.6.0: version "1.7.0" resolved "https://registry.yarnpkg.com/aws4/-/aws4-1.7.0.tgz#d4d0e9b9dbfca77bf08eeb0a8a471550fe39e289" +aws4@^1.8.0: + version "1.8.0" + resolved "https://registry.yarnpkg.com/aws4/-/aws4-1.8.0.tgz#f0e003d9ca9e7f59c7a508945d7b2ef9a04a542f" + babel-code-frame@^6.22.0: version "6.26.0" resolved "https://registry.yarnpkg.com/babel-code-frame/-/babel-code-frame-6.26.0.tgz#63fd43f7dc1e3bb7ce35947db8fe369a3f58c74b" @@ -364,6 +368,10 @@ buffer-from@^1.0.0, buffer-from@^1.1.0: version "1.1.1" resolved "https://registry.yarnpkg.com/buffer-from/-/buffer-from-1.1.1.tgz#32713bc028f75c02fdb710d7c7bcec1f2c6070ef" +buffer-stream-reader@^0.1.1: + version "0.1.1" + resolved "https://registry.yarnpkg.com/buffer-stream-reader/-/buffer-stream-reader-0.1.1.tgz#ca8bf93631deedd8b8f8c3bb44991cc30951e259" + builtin-modules@^1.1.1: version "1.1.1" resolved "https://registry.yarnpkg.com/builtin-modules/-/builtin-modules-1.1.1.tgz#270f076c5a72c02f5b65a47df94c5fe3a278892f" @@ -455,6 +463,12 @@ combined-stream@1.0.6, 
combined-stream@~1.0.5: dependencies: delayed-stream "~1.0.0" +combined-stream@~1.0.6: + version "1.0.7" + resolved "https://registry.yarnpkg.com/combined-stream/-/combined-stream-1.0.7.tgz#2d1d24317afb8abe95d6d2c0b07b57813539d828" + dependencies: + delayed-stream "~1.0.0" + commander@2.15.1: version "2.15.1" resolved "https://registry.yarnpkg.com/commander/-/commander-2.15.1.tgz#df46e867d0fc2aec66a34662b406a9ccafff5b0f" @@ -635,7 +649,7 @@ extend@2.0.x: version "2.0.2" resolved "https://registry.yarnpkg.com/extend/-/extend-2.0.2.tgz#1b74985400171b85554894459c978de6ef453ab7" -extend@~3.0.1: +extend@^3.0.0, extend@~3.0.1, extend@~3.0.2: version "3.0.2" resolved "https://registry.yarnpkg.com/extend/-/extend-3.0.2.tgz#f8b1136b4071fbd8eb140aff858b1019ec2915fa" @@ -671,7 +685,7 @@ forever-agent@~0.6.1: version "0.6.1" resolved "https://registry.yarnpkg.com/forever-agent/-/forever-agent-0.6.1.tgz#fbc71f0c41adeb37f96c577ad1ed42d8fdacca91" -form-data@~2.3.1: +form-data@~2.3.1, form-data@~2.3.2: version "2.3.2" resolved "https://registry.yarnpkg.com/form-data/-/form-data-2.3.2.tgz#4970498be604c20c005d4f5c23aecd21d6b49099" dependencies: @@ -763,6 +777,13 @@ har-validator@~5.0.3: ajv "^5.1.0" har-schema "^2.0.0" +har-validator@~5.1.0: + version "5.1.0" + resolved "https://registry.yarnpkg.com/har-validator/-/har-validator-5.1.0.tgz#44657f5688a22cfd4b72486e81b3a3fb11742c29" + dependencies: + ajv "^5.3.0" + har-schema "^2.0.0" + has-ansi@^2.0.0: version "2.0.0" resolved "https://registry.yarnpkg.com/has-ansi/-/has-ansi-2.0.0.tgz#34f5049ce1ecdf2b0649af3ef24e45ed35416d91" @@ -870,6 +891,10 @@ is-typedarray@~1.0.0: version "1.0.0" resolved "https://registry.yarnpkg.com/is-typedarray/-/is-typedarray-1.0.0.tgz#e479c80858df0c1b11ddda6940f96011fcda4a9a" +is@~0.2.6: + version "0.2.7" + resolved "http://registry.npmjs.org/is/-/is-0.2.7.tgz#3b34a2c48f359972f35042849193ae7264b63562" + isarray@~1.0.0: version "1.0.0" resolved 
"https://registry.yarnpkg.com/isarray/-/isarray-1.0.0.tgz#bb935d48582cba168c06834957a54a3e07124f11" @@ -958,12 +983,22 @@ mime-db@~1.35.0: version "1.35.0" resolved "https://registry.yarnpkg.com/mime-db/-/mime-db-1.35.0.tgz#0569d657466491283709663ad379a99b90d9ab47" +mime-db@~1.36.0: + version "1.36.0" + resolved "https://registry.yarnpkg.com/mime-db/-/mime-db-1.36.0.tgz#5020478db3c7fe93aad7bbcc4dcf869c43363397" + mime-types@^2.1.12, mime-types@~2.1.17, mime-types@~2.1.18: version "2.1.19" resolved "https://registry.yarnpkg.com/mime-types/-/mime-types-2.1.19.tgz#71e464537a7ef81c15f2db9d97e913fc0ff606f0" dependencies: mime-db "~1.35.0" +mime-types@~2.1.19: + version "2.1.20" + resolved "https://registry.yarnpkg.com/mime-types/-/mime-types-2.1.20.tgz#930cb719d571e903738520f8470911548ca2cc19" + dependencies: + mime-db "~1.36.0" + mime@1.4.1: version "1.4.1" resolved "https://registry.yarnpkg.com/mime/-/mime-1.4.1.tgz#121f9ebc49e3766f311a76e1fa1c8003c4b03aa6" @@ -1066,6 +1101,19 @@ node-version@^1.0.0: version "1.2.0" resolved "https://registry.yarnpkg.com/node-version/-/node-version-1.2.0.tgz#34fde3ffa8e1149bd323983479dda620e1b5060d" +node.extend@1.0.8: + version "1.0.8" + resolved "https://registry.yarnpkg.com/node.extend/-/node.extend-1.0.8.tgz#bab04379f7383f4587990c9df07b6a7f65db772b" + dependencies: + is "~0.2.6" + object-keys "~0.4.0" + +node.flow@1.2.3: + version "1.2.3" + resolved "https://registry.yarnpkg.com/node.flow/-/node.flow-1.2.3.tgz#e1c44a82aeca8d78b458a77fb3dc642f2eba2649" + dependencies: + node.extend "1.0.8" + nopt@^4.0.1: version "4.0.1" resolved "https://registry.yarnpkg.com/nopt/-/nopt-4.0.1.tgz#d0d4685afd5415193c8c7505602d0d17cd64474d" @@ -1101,10 +1149,18 @@ oauth-sign@~0.8.2: version "0.8.2" resolved "https://registry.yarnpkg.com/oauth-sign/-/oauth-sign-0.8.2.tgz#46a6ab7f0aead8deae9ec0565780b7d4efeb9d43" +oauth-sign@~0.9.0: + version "0.9.0" + resolved 
"https://registry.yarnpkg.com/oauth-sign/-/oauth-sign-0.9.0.tgz#47a7b016baa68b5fa0ecf3dee08a85c679ac6455" + object-assign@^4.0.1, object-assign@^4.1.0: version "4.1.1" resolved "https://registry.yarnpkg.com/object-assign/-/object-assign-4.1.1.tgz#2109adc7965887cfc05cbbd442cac8bfbb360863" +object-keys@~0.4.0: + version "0.4.0" + resolved "https://registry.yarnpkg.com/object-keys/-/object-keys-0.4.0.tgz#28a6aae7428dd2c3a92f3d95f21335dd204e0336" + on-finished@~2.3.0: version "2.3.0" resolved "https://registry.yarnpkg.com/on-finished/-/on-finished-2.3.0.tgz#20f1336481b083cd75337992a16971aa2d906947" @@ -1199,6 +1255,10 @@ pseudomap@^1.0.2: version "1.0.2" resolved "https://registry.yarnpkg.com/pseudomap/-/pseudomap-1.0.2.tgz#f052a28da70e618917ef0a8ac34c1ae5a68286b3" +psl@^1.1.24: + version "1.1.29" + resolved "https://registry.yarnpkg.com/psl/-/psl-1.1.29.tgz#60f580d360170bb722a797cc704411e6da850c67" + punycode@^1.4.1: version "1.4.1" resolved "https://registry.yarnpkg.com/punycode/-/punycode-1.4.1.tgz#c0d5a63b2718800ad8e1eb0fa5269c84dd41845e" @@ -1207,7 +1267,7 @@ qs@6.5.1: version "6.5.1" resolved "https://registry.yarnpkg.com/qs/-/qs-6.5.1.tgz#349cdf6eef89ec45c12d7d5eb3fc0c870343a6d8" -qs@~6.5.1: +qs@~6.5.1, qs@~6.5.2: version "6.5.2" resolved "https://registry.yarnpkg.com/qs/-/qs-6.5.2.tgz#cb3ae806e8740444584ef154ce8ee98d403f3e36" @@ -1249,6 +1309,31 @@ reflect-metadata@^0.1.10: version "0.1.12" resolved "https://registry.yarnpkg.com/reflect-metadata/-/reflect-metadata-0.1.12.tgz#311bf0c6b63cd782f228a81abe146a2bfa9c56f2" +request@^2.74.0: + version "2.88.0" + resolved "https://registry.yarnpkg.com/request/-/request-2.88.0.tgz#9c2fca4f7d35b592efe57c7f0a55e81052124fef" + dependencies: + aws-sign2 "~0.7.0" + aws4 "^1.8.0" + caseless "~0.12.0" + combined-stream "~1.0.6" + extend "~3.0.2" + forever-agent "~0.6.1" + form-data "~2.3.2" + har-validator "~5.1.0" + http-signature "~1.2.0" + is-typedarray "~1.0.0" + isstream "~0.1.2" + json-stringify-safe "~5.0.1" + mime-types 
"~2.1.19" + oauth-sign "~0.9.0" + performance-now "^2.1.0" + qs "~6.5.2" + safe-buffer "^5.1.2" + tough-cookie "~2.4.3" + tunnel-agent "^0.6.0" + uuid "^3.3.2" + request@^2.87.0: version "2.87.0" resolved "https://registry.yarnpkg.com/request/-/request-2.87.0.tgz#32f00235cd08d482b4d0d68db93a829c0ed5756e" @@ -1294,6 +1379,12 @@ rimraf@^2.6.1: dependencies: glob "^7.0.5" +rmdir@^1.2.0: + version "1.2.0" + resolved "https://registry.yarnpkg.com/rmdir/-/rmdir-1.2.0.tgz#4fe0357cb06168c258e73e968093dc4e8a0f3253" + dependencies: + node.flow "1.2.3" + rx@^4.1.0: version "4.1.0" resolved "https://registry.yarnpkg.com/rx/-/rx-4.1.0.tgz#a5f13ff79ef3b740fe30aa803fb09f98805d4782" @@ -1510,6 +1601,13 @@ tough-cookie@~2.3.3: dependencies: punycode "^1.4.1" +tough-cookie@~2.4.3: + version "2.4.3" + resolved "https://registry.yarnpkg.com/tough-cookie/-/tough-cookie-2.4.3.tgz#53f36da3f47783b0925afa06ff9f3b165280f781" + dependencies: + psl "^1.1.24" + punycode "^1.4.1" + tree-kill@^1.2.0: version "1.2.0" resolved "https://registry.yarnpkg.com/tree-kill/-/tree-kill-1.2.0.tgz#5846786237b4239014f05db156b643212d4c6f36" @@ -1612,7 +1710,7 @@ utils-merge@1.0.1: version "1.0.1" resolved "https://registry.yarnpkg.com/utils-merge/-/utils-merge-1.0.1.tgz#9f95710f50a267947b2ccc124741c1028427e713" -uuid@^3.1.0: +uuid@^3.1.0, uuid@^3.3.2: version "3.3.2" resolved "https://registry.yarnpkg.com/uuid/-/uuid-3.3.2.tgz#1b4af4955eb3077c501c23872fc6513811587131" @@ -1628,6 +1726,14 @@ verror@1.10.0: core-util-is "1.0.2" extsprintf "^1.2.0" +webhdfs@^1.2.0: + version "1.2.0" + resolved "https://registry.yarnpkg.com/webhdfs/-/webhdfs-1.2.0.tgz#c41b08ae33944a0220863bfd4b6719b9aaec1d37" + dependencies: + buffer-stream-reader "^0.1.1" + extend "^3.0.0" + request "^2.74.0" + which@^1.2.9: version "1.3.1" resolved "https://registry.yarnpkg.com/which/-/which-1.3.1.tgz#a45043d54f5805316da8d62f9f50918d3da70b0a" diff --git a/src/sdk/pynni/nni/platform/__init__.py b/src/sdk/pynni/nni/platform/__init__.py index 
e0b44e49cb..fed452fc47 100644 --- a/src/sdk/pynni/nni/platform/__init__.py +++ b/src/sdk/pynni/nni/platform/__init__.py @@ -27,7 +27,7 @@ from .standalone import * elif env_args.platform == 'unittest': from .test import * -elif env_args.platform in ('local', 'remote'): +elif env_args.platform in ('local', 'remote', 'pai'): from .local import * else: raise RuntimeError('Unknown platform %s' % env_args.platform) diff --git a/src/sdk/pynni/setup.py b/src/sdk/pynni/setup.py index fee463e371..fae0ceac41 100644 --- a/src/sdk/pynni/setup.py +++ b/src/sdk/pynni/setup.py @@ -27,7 +27,7 @@ def read(fname): setuptools.setup( name = 'nni', - version = '0.0.1', + version = '0.2.0', packages = setuptools.find_packages(exclude=['tests']), python_requires = '>=3.5', @@ -44,7 +44,7 @@ def read(fname): author_email = 'nni@microsoft.com', description = 'Python SDK for Neural Network Intelligence project', license = 'MIT', - url = 'https://msrasrg.visualstudio.com/NeuralNetworkIntelligence', + url = 'https://github.com/Microsoft/nni', long_description = read('README.md') ) diff --git a/tools/nnicmd/config_schema.py b/tools/nnicmd/config_schema.py index 1695377ca3..83b476d0c2 100644 --- a/tools/nnicmd/config_schema.py +++ b/tools/nnicmd/config_schema.py @@ -21,7 +21,7 @@ import os from schema import Schema, And, Use, Optional, Regex, Or -CONFIG_SCHEMA = Schema({ +common_schema = { 'authorName': str, 'experimentName': str, 'trialConcurrency': And(int, lambda n: 1 <=n <= 999999), @@ -44,11 +44,6 @@ Optional('classArgs'): dict, Optional('gpuNum'): And(int, lambda x: 0 <= x <= 99999), }), -'trial':{ - 'command': str, - 'codeDir': os.path.exists, - 'gpuNum': And(int, lambda x: 0 <= x <= 99999) - }, Optional('assessor'): Or({ 'builtinAssessorName': lambda x: x in ['Medianstop'], 'classArgs': { @@ -58,10 +53,41 @@ 'codeDir': os.path.exists, 'classFileName': str, 'className': str, - 'classArgs': { - 'optimize_mode': lambda x: x in ['maximize', 'minimize']}, - 'gpuNum': And(int, lambda x: 0 <= 
x <= 99999), + Optional('classArgs'): dict, + Optional('gpuNum'): And(int, lambda x: 0 <= x <= 99999), }), +} + +common_trial_schema = { +'trial':{ + 'command': str, + 'codeDir': os.path.exists, + 'gpuNum': And(int, lambda x: 0 <= x <= 99999) + } +} + +pai_trial_schema = { +'trial':{ + 'command': str, + 'codeDir': os.path.exists, + 'gpuNum': And(int, lambda x: 0 <= x <= 99999), + 'cpuNum': And(int, lambda x: 0 <= x <= 99999), + 'memoryMB': int, + 'image': str, + 'dataDir': Regex(r'hdfs://(([0-9]{1,3}.){3}[0-9]{1,3})(:[0-9]{2,5})?(/.*)?'), + 'outputDir': Regex(r'hdfs://(([0-9]{1,3}.){3}[0-9]{1,3})(:[0-9]{2,5})?(/.*)?') + } +} + +pai_config_schema = { +'paiConfig':{ + 'userName': str, + 'passWord': str, + 'host': str +} +} + +machine_list_schima = { Optional('machineList'):[Or({ 'ip': str, 'port': And(int, lambda x: 0 < x < 65535), @@ -73,37 +99,11 @@ 'username': str, 'sshKeyPath': os.path.exists, Optional('passphrase'): str -})], -Optional('pai'): -{ - 'jobName': str, - "image": str, - "authFile": os.path.exists, - "dataDir": os.path.exists, - "outputDir": os.path.exists, - "codeDir": os.path.exists, - "virtualCluster": str, - "taskRoles": [ - { - "name": str, - "taskNumber": And(int, lambda x: 0 <= x <= 99999), - "cpuNumber": And(int, lambda x: 0 <= x <= 99999), - "memoryMB": And(int, lambda x: 0 <= x <= 99999), - "shmMB": And(int, lambda x: 0 <= x <= 99999), - "gpuNumber": And(int, lambda x: 0 <= x <= 99999), - "portList": [ - { - "label": str, - "beginAt": str, - "portNumber": And(int, lambda x: 0 < x < 65535) - } - ], - "command": str, - "minFailedTaskCount": And(int, lambda x: 0 <= x <= 99999), - "minSucceededTaskCount": And(int, lambda x: 0 <= x <= 99999) - } - ], - "gpuType": str, - "retryCount": And(int, lambda x: 0 <= x <= 99999) +})] } -}) + +LOCAL_CONFIG_SCHEMA = Schema({**common_schema, **common_trial_schema}) + +REMOTE_CONFIG_SCHEMA = Schema({**common_schema, **common_trial_schema, **machine_list_schima}) + +PAI_CONFIG_SCHEMA = Schema({**common_schema, 
**pai_trial_schema, **pai_config_schema}) \ No newline at end of file diff --git a/tools/nnicmd/launcher.py b/tools/nnicmd/launcher.py index a0b382b1ec..dfafae87f9 100644 --- a/tools/nnicmd/launcher.py +++ b/tools/nnicmd/launcher.py @@ -65,6 +65,16 @@ def set_trial_config(experiment_config, port): value_dict['command'] = experiment_config['trial']['command'] value_dict['codeDir'] = experiment_config['trial']['codeDir'] value_dict['gpuNum'] = experiment_config['trial']['gpuNum'] + if experiment_config['trial'].get('cpuNum'): + value_dict['cpuNum'] = experiment_config['trial']['cpuNum'] + if experiment_config['trial'].get('memoryMB'): + value_dict['memoryMB'] = experiment_config['trial']['memoryMB'] + if experiment_config['trial'].get('image'): + value_dict['image'] = experiment_config['trial']['image'] + if experiment_config['trial'].get('dataDir'): + value_dict['dataDir'] = experiment_config['trial']['dataDir'] + if experiment_config['trial'].get('outputDir'): + value_dict['outputDir'] = experiment_config['trial']['outputDir'] request_data['trial_config'] = value_dict response = rest_put(cluster_metadata_url(port), json.dumps(request_data), 20) if check_response(response): @@ -95,6 +105,20 @@ def set_remote_config(experiment_config, port): #set trial_config return set_trial_config(experiment_config, port), err_message +def set_pai_config(experiment_config, port): + '''set pai configuration''' + pai_config_data = dict() + pai_config_data['pai_config'] = experiment_config['paiConfig'] + response = rest_put(cluster_metadata_url(port), json.dumps(pai_config_data), 20) + err_message = '' + if not response or not response.status_code == 200: + if response is not None: + err_message = response.text + return False, err_message + + #set trial_config + return set_trial_config(experiment_config, port), err_message + def set_experiment(experiment_config, mode, port): '''Call startExperiment (rest POST /experiment) with yaml file content''' request_data = dict() @@ -114,7 
+138,7 @@ def set_experiment(experiment_config, mode, port): {'key':'codeDir', 'value':experiment_config['trial']['codeDir']}) request_data['clusterMetaData'].append( {'key': 'command', 'value': experiment_config['trial']['command']}) - else: + elif experiment_config['trainingServicePlatform'] == 'remote': request_data['clusterMetaData'].append( {'key': 'machine_list', 'value': experiment_config['machineList']}) value_dict = dict() @@ -123,6 +147,25 @@ def set_experiment(experiment_config, mode, port): value_dict['gpuNum'] = experiment_config['trial']['gpuNum'] request_data['clusterMetaData'].append( {'key': 'trial_config', 'value': value_dict}) + elif experiment_config['trainingServicePlatform'] == 'pai': + request_data['clusterMetaData'].append( + {'key': 'pai_config', 'value': experiment_config['paiConfig']}) + value_dict = dict() + value_dict['command'] = experiment_config['trial']['command'] + value_dict['codeDir'] = experiment_config['trial']['codeDir'] + value_dict['gpuNum'] = experiment_config['trial']['gpuNum'] + if experiment_config['trial'].get('cpuNum'): + value_dict['cpuNum'] = experiment_config['trial']['cpuNum'] + if experiment_config['trial'].get('memoryMB'): + value_dict['memoryMB'] = experiment_config['trial']['memoryMB'] + if experiment_config['trial'].get('image'): + value_dict['image'] = experiment_config['trial']['image'] + if experiment_config['trial'].get('dataDir'): + value_dict['dataDir'] = experiment_config['trial']['dataDir'] + if experiment_config['trial'].get('outputDir'): + value_dict['outputDir'] = experiment_config['trial']['outputDir'] + request_data['clusterMetaData'].append( + {'key': 'trial_config', 'value': value_dict}) response = rest_post(experiment_url(port), json.dumps(request_data), 20) if check_response(response): @@ -203,6 +246,21 @@ def launch_experiment(args, experiment_config, mode, webuiport, experiment_id=No except Exception: raise Exception(ERROR_INFO % 'Rest server stopped!') exit(0) + + #set pai config + if 
experiment_config['trainingServicePlatform'] == 'pai': + print_normal('Setting pai config...') + config_result, err_msg = set_pai_config(experiment_config, REST_PORT) + if config_result: + print_normal('Success!') + else: + print_error('Failed! Error is: {}'.format(err_msg)) + try: + cmds = ['pkill', '-P', str(rest_process.pid)] + call(cmds) + except Exception: + raise Exception(ERROR_INFO % 'Rest server stopped!') + exit(0) # start a new experiment print_normal('Starting experiment...') @@ -228,9 +286,10 @@ def launch_experiment(args, experiment_config, mode, webuiport, experiment_id=No else: print_normal('Starting web ui...') webui_process = start_web_ui(webuiport) - nni_config.set_config('webuiPid', webui_process.pid) - print_normal('Starting web ui success!') - print_normal('{0} {1}'.format('Web UI url:', ' '.join(nni_config.get_config('webuiUrl')))) + if webui_process: + nni_config.set_config('webuiPid', webui_process.pid) + print_normal('Starting web ui success!') + print_normal('{0} {1}'.format('Web UI url:', ' '.join(nni_config.get_config('webuiUrl')))) print_normal(EXPERIMENT_SUCCESS_INFO % (experiment_id, REST_PORT)) diff --git a/tools/nnicmd/launcher_utils.py b/tools/nnicmd/launcher_utils.py index 384e3a6eb7..30c9cea13e 100644 --- a/tools/nnicmd/launcher_utils.py +++ b/tools/nnicmd/launcher_utils.py @@ -20,8 +20,8 @@ import os import json -from .config_schema import CONFIG_SCHEMA -from .common_utils import get_json_content +from .config_schema import LOCAL_CONFIG_SCHEMA, REMOTE_CONFIG_SCHEMA, PAI_CONFIG_SCHEMA +from .common_utils import get_json_content, print_error def expand_path(experiment_config, key): '''Change '~' to user home directory''' @@ -81,8 +81,17 @@ def validate_search_space_content(experiment_config): def validate_common_content(experiment_config): '''Validate whether the common values in experiment_config is valid''' + if not experiment_config.get('trainingServicePlatform') or \ + experiment_config.get('trainingServicePlatform') not in 
['local', 'remote', 'pai']: + print_error('Please set correct trainingServicePlatform!') + exit(0) + schema_dict = { + 'local': LOCAL_CONFIG_SCHEMA, + 'remote': REMOTE_CONFIG_SCHEMA, + 'pai': PAI_CONFIG_SCHEMA + } try: - CONFIG_SCHEMA.validate(experiment_config) + schema_dict.get(experiment_config['trainingServicePlatform']).validate(experiment_config) #set default value if experiment_config.get('maxExecDuration') is None: experiment_config['maxExecDuration'] = '999d' diff --git a/tools/setup.py b/tools/setup.py index d789c265e6..7b368f4267 100644 --- a/tools/setup.py +++ b/tools/setup.py @@ -2,8 +2,8 @@ setuptools.setup( name = 'nnictl', - version = '0.0.1', - packages = setuptools.find_packages(), + version = '0.2.0', + packages = setuptools.find_packages(exclude=['*test*']), python_requires = '>=3.5', install_requires = [ @@ -11,12 +11,13 @@ 'pyyaml', 'psutil', 'astor', - 'schema' + 'schema', + 'pyhdfs' ], author = 'Microsoft NNI Team', author_email = 'nni@microsoft.com', description = 'NNI control for Neural Network Intelligence project', license = 'MIT', - url = 'https://msrasrg.visualstudio.com/NeuralNetworkIntelligence', + url = 'https://github.com/Microsoft/nni', ) diff --git a/tools/trial_tool/__init__.py b/tools/trial_tool/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/tools/trial_tool/constants.py b/tools/trial_tool/constants.py new file mode 100644 index 0000000000..7ff3d7847f --- /dev/null +++ b/tools/trial_tool/constants.py @@ -0,0 +1,37 @@ +# Copyright (c) Microsoft Corporation +# All rights reserved. 
+# +# MIT License +# +# Permission is hereby granted, free of charge, +# to any person obtaining a copy of this software and associated +# documentation files (the "Software"), to deal in the Software without restriction, +# including without limitation the rights to use, copy, modify, merge, publish, +# distribute, sublicense, and/or sell copies of the Software, and +# to permit persons to whom the Software is furnished to do so, subject to the following conditions: +# The above copyright notice and this permission notice shall be included +# in all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING +# BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND +# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, +# DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + +import os + +API_ROOT_URL = '/api/v1/nni-pai' + +BASE_URL = 'http://{}' + +DEFAULT_REST_PORT = 51189 + +HOME_DIR = os.path.join(os.environ['HOME'], 'nni') + +LOG_DIR = os.path.join(HOME_DIR, 'trial-keeper', 'log') + +STDOUT_FULL_PATH = os.path.join(LOG_DIR, 'stdout') + +STDERR_FULL_PATH = os.path.join(LOG_DIR, 'stderr') + +UPDATE_METRICS_API = '/update-metrics' \ No newline at end of file diff --git a/tools/trial_tool/hdfsClientUtility.py b/tools/trial_tool/hdfsClientUtility.py new file mode 100644 index 0000000000..0b6daeb2c4 --- /dev/null +++ b/tools/trial_tool/hdfsClientUtility.py @@ -0,0 +1,64 @@ +# Copyright (c) Microsoft Corporation +# All rights reserved. 
+# +# MIT License +# +# Permission is hereby granted, free of charge, +# to any person obtaining a copy of this software and associated +# documentation files (the "Software"), to deal in the Software without restriction, +# including without limitation the rights to use, copy, modify, merge, publish, +# distribute, sublicense, and/or sell copies of the Software, and +# to permit persons to whom the Software is furnished to do so, subject to the following conditions: +# The above copyright notice and this permission notice shall be included +# in all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING +# BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND +# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, +# DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. 
+ +import os +from pyhdfs import HdfsClient + +def copyDirectoryToHdfs(localDirectory, hdfsDirectory, hdfsClient): + '''Copy directory from local to hdfs''' + if not os.path.exists(localDirectory): + raise Exception('Local Directory does not exist!') + hdfsClient.mkdirs(hdfsDirectory) + result = True + for file in os.listdir(localDirectory): + file_path = os.path.join(localDirectory, file) + if os.path.isdir(file_path): + hdfs_directory = os.path.join(hdfsDirectory, file) + try: + result = result and copyDirectoryToHdfs(file_path, hdfs_directory, hdfsClient) + except Exception as exception: + print(exception) + result = False + else: + hdfs_file_path = os.path.join(hdfsDirectory, file) + try: + result = result and copyFileToHdfs(file_path, hdfs_file_path, hdfsClient) + except Exception as exception: + print(exception) + result = False + return result + +def copyFileToHdfs(localFilePath, hdfsFilePath, hdfsClient, override=True): + '''Copy a local file to hdfs directory''' + if not os.path.exists(localFilePath): + raise Exception('Local file Path does not exist!') + if os.path.isdir(localFilePath): + raise Exception('localFile should not a directory!') + if hdfsClient.exists(hdfsFilePath): + if override: + hdfsClient.delete(hdfsFilePath) + else: + return False + try: + hdfsClient.copy_from_local(localFilePath, hdfsFilePath) + return True + except Exception as exception: + print(exception) + return False \ No newline at end of file diff --git a/tools/trial_tool/metrics_reader.py b/tools/trial_tool/metrics_reader.py new file mode 100644 index 0000000000..6178e657a3 --- /dev/null +++ b/tools/trial_tool/metrics_reader.py @@ -0,0 +1,124 @@ +# ============================================================================================================================== # +# Copyright (c) Microsoft Corporation +# All rights reserved. 
+# +# MIT License +# +# Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated +# documentation files (the "Software"), to deal in the Software without restriction, including without limitation +# the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and +# to permit persons to whom the Software is furnished to do so, subject to the following conditions: +# The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING +# BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND +# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, +# DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. 
+# ============================================================================================================================== # + +import argparse +import errno +import json +import os +import re +import requests + +from .constants import BASE_URL, DEFAULT_REST_PORT +from .rest_utils import rest_get, rest_post, rest_put, rest_delete +from .url_utils import gen_update_metrics_url + +NNI_SYS_DIR = os.environ['NNI_SYS_DIR'] +NNI_TRIAL_JOB_ID = os.environ['NNI_TRIAL_JOB_ID'] +NNI_EXP_ID = os.environ['NNI_EXP_ID'] +LEN_FIELD_SIZE = 6 +MAGIC = 'ME' + +print('In metrics_reader, NNI_SYS_DIR is {}'.format(NNI_SYS_DIR)) + +class TrialMetricsReader(): + ''' + Read metrics data from a trial job + ''' + def __init__(self, rest_port = DEFAULT_REST_PORT): + self.offset_filename = os.path.join(NNI_SYS_DIR, '.nni', 'metrics_offset') + self.metrics_filename = os.path.join(NNI_SYS_DIR, '.nni', 'metrics') + self.rest_port = rest_port + + def _metrics_file_is_empty(self): + if not os.path.isfile(self.metrics_filename): + return True + statinfo = os.stat(self.metrics_filename) + return statinfo.st_size == 0 + + def _get_offset(self): + offset = 0 + if os.path.isfile(self.offset_filename): + with open(self.offset_filename, 'r') as f: + offset = int(f.readline()) + return offset + + def _write_offset(self, offset): + statinfo = os.stat(self.metrics_filename) + if offset < 0 or offset > statinfo.st_size: + raise ValueError('offset value is invalid: {}'.format(offset)) + + with open(self.offset_filename, 'w') as f: + f.write(str(offset)+'\n') + + def _read_all_available_records(self, offset): + new_offset = offset + metrics = [] + with open(self.metrics_filename, 'r') as f: + print('offset is {}'.format(offset)) + f.seek(offset) + while True: + magic_string = f.read(len(MAGIC)) + # empty data means EOF + if not magic_string: + break + strdatalen = f.read(LEN_FIELD_SIZE) + # empty data means EOF + if not strdatalen: + raise ValueError("metric file {} format error after offset: 
{}.".format(self.metrics_filename, new_offset)) + datalen = int(strdatalen) + data = f.read(datalen) + + if datalen > 0 and len(data) == datalen: + print('data is \'{}\''.format(data)) + new_offset = f.tell() + metrics.append(data) + else: + raise ValueError("metric file {} format error after offset: {}.".format(self.metrics_filename, new_offset)) + self._write_offset(new_offset) + return metrics + + def read_trial_metrics(self): + ''' + Read available metrics data for a trial + ''' + if self._metrics_file_is_empty(): + print('metrics is empty') + return [] + + offset = self._get_offset() + return self._read_all_available_records(offset) + +def read_experiment_metrics(nnimanager_ip): + ''' + Read metrics data for specified trial jobs + ''' + result = {} + try: + reader = TrialMetricsReader() + result['jobId'] = NNI_TRIAL_JOB_ID + result['metrics'] = reader.read_trial_metrics() + print('Result metrics is {}'.format(json.dumps(result))) + if len(result['metrics']) > 0: + response = rest_post(gen_update_metrics_url(BASE_URL.format(nnimanager_ip), DEFAULT_REST_PORT, NNI_EXP_ID, NNI_TRIAL_JOB_ID), json.dumps(result), 10) + print('Response code is {}'.format(response.status_code)) + except Exception: + #TODO error logging to file + pass + + return json.dumps(result) \ No newline at end of file diff --git a/tools/trial_tool/rest_utils.py b/tools/trial_tool/rest_utils.py new file mode 100644 index 0000000000..f506653c4e --- /dev/null +++ b/tools/trial_tool/rest_utils.py @@ -0,0 +1,57 @@ +# Copyright (c) Microsoft Corporation +# All rights reserved. 
+# +# MIT License +# +# Permission is hereby granted, free of charge, +# to any person obtaining a copy of this software and associated +# documentation files (the "Software"), to deal in the Software without restriction, +# including without limitation the rights to use, copy, modify, merge, publish, +# distribute, sublicense, and/or sell copies of the Software, and +# to permit persons to whom the Software is furnished to do so, subject to the following conditions: +# The above copyright notice and this permission notice shall be included +# in all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING +# BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND +# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, +# DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. 
+ + +import time +import requests + +def rest_get(url, timeout): + '''Call rest get method''' + try: + response = requests.get(url, timeout=timeout) + return response + except Exception: + return None + +def rest_post(url, data, timeout): + '''Call rest post method''' + try: + response = requests.post(url, headers={'Accept': 'application/json', 'Content-Type': 'application/json'},\ + data=data, timeout=timeout) + return response + except Exception: + return None + +def rest_put(url, data, timeout): + '''Call rest put method''' + try: + response = requests.put(url, headers={'Accept': 'application/json', 'Content-Type': 'application/json'},\ + data=data, timeout=timeout) + return response + except Exception: + return None + +def rest_delete(url, timeout): + '''Call rest delete method''' + try: + response = requests.delete(url, timeout=timeout) + return response + except Exception: + return None diff --git a/tools/trial_tool/test/test_hdfsClientUtility.py b/tools/trial_tool/test/test_hdfsClientUtility.py new file mode 100644 index 0000000000..2d7cc6ed4d --- /dev/null +++ b/tools/trial_tool/test/test_hdfsClientUtility.py @@ -0,0 +1,101 @@ +# Copyright (c) Microsoft Corporation +# All rights reserved. +# +# MIT License +# +# Permission is hereby granted, free of charge, +# to any person obtaining a copy of this software and associated +# documentation files (the "Software"), to deal in the Software without restriction, +# including without limitation the rights to use, copy, modify, merge, publish, +# distribute, sublicense, and/or sell copies of the Software, and +# to permit persons to whom the Software is furnished to do so, subject to the following conditions: +# The above copyright notice and this permission notice shall be included +# in all copies or substantial portions of the Software. 
#
# THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING
# BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
# DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

import unittest
import json
import sys
from pyhdfs import HdfsClient
sys.path.append("..")
from trial.hdfsClientUtility import copyFileToHdfs, copyDirectoryToHdfs
import os
import shutil
import random
import string

class HDFSClientUtilityTest(unittest.TestCase):
    '''Unit test for hdfsClientUtility.py.

    NOTE(review): these are integration tests — they need a reachable HDFS
    namenode described by ../../.vscode/hdfsInfo.json ({"host": ..., "userName": ...}).
    '''
    def setUp(self):
        '''Load the HDFS connection config and build an HdfsClient.'''
        self.hdfs_file_path = '../../.vscode/hdfsInfo.json'
        self.hdfs_config = None
        try:
            with open(self.hdfs_file_path, 'r') as file:
                self.hdfs_config = json.load(file)
        except Exception as exception:
            print(exception)

        if not self.hdfs_config:
            # Fail fast with a clear message. The original fell through here and
            # crashed below with "'NoneType' object is not subscriptable".
            self.skipTest('cannot load HDFS config from {}'.format(self.hdfs_file_path))

        # 50070 is the default WebHDFS (namenode HTTP) port.
        self.hdfs_client = HdfsClient(hosts='{0}:{1}'.format(self.hdfs_config['host'], '50070'),
                                      user_name=self.hdfs_config['userName'])

    def get_random_name(self, length):
        '''Return a random alphanumeric string of the given length.'''
        return ''.join(random.sample(string.ascii_letters + string.digits, length))

    def test_copy_file_run(self):
        '''test copyFileToHdfs: upload a file, list it, download it back and compare.'''
        file_name = self.get_random_name(8)
        file_content = 'hello world!'

        with open('./{}'.format(file_name), 'w') as file:
            file.write(file_content)

        result = copyFileToHdfs('./{}'.format(file_name),
                                '/{0}/{1}'.format(self.hdfs_config['userName'], file_name),
                                self.hdfs_client)
        self.assertTrue(result)

        file_list = self.hdfs_client.listdir('/{0}'.format(self.hdfs_config['userName']))
        self.assertIn(file_name, file_list)

        hdfs_file_name = self.get_random_name(8)
        self.hdfs_client.copy_to_local('/{0}/{1}'.format(self.hdfs_config['userName'], file_name),
                                       './{}'.format(hdfs_file_name))
        self.assertTrue(os.path.exists('./{}'.format(hdfs_file_name)))

        with open('./{}'.format(hdfs_file_name), 'r') as file:
            content = file.readline()
            self.assertEqual(file_content, content)
        # clean up: both local copies plus the uploaded HDFS file
        os.remove('./{}'.format(file_name))
        os.remove('./{}'.format(hdfs_file_name))
        self.hdfs_client.delete('/{0}/{1}'.format(self.hdfs_config['userName'], file_name))

    def test_copy_directory_run(self):
        '''test copyDirectoryToHdfs: upload a directory and verify every file arrived.'''
        directory_name = self.get_random_name(8)
        file_name_list = [self.get_random_name(8), self.get_random_name(8)]
        file_content = 'hello world!'

        os.makedirs('./{}'.format(directory_name))
        for file_name in file_name_list:
            with open('./{0}/{1}'.format(directory_name, file_name), 'w') as file:
                file.write(file_content)

        result = copyDirectoryToHdfs('./{}'.format(directory_name),
                                     '/{0}/{1}'.format(self.hdfs_config['userName'], directory_name),
                                     self.hdfs_client)
        self.assertTrue(result)

        directory_list = self.hdfs_client.listdir('/{0}'.format(self.hdfs_config['userName']))
        self.assertIn(directory_name, directory_list)

        sub_file_list = self.hdfs_client.listdir('/{0}/{1}'.format(self.hdfs_config['userName'], directory_name))
        # clean up: delete EVERY uploaded file inside the loop (not just the last
        # loop variable value), then the now-empty HDFS directory
        for file_name in file_name_list:
            self.assertIn(file_name, sub_file_list)
            self.hdfs_client.delete('/{0}/{1}/{2}'.format(self.hdfs_config['userName'], directory_name, file_name))
        self.hdfs_client.delete('/{0}/{1}'.format(self.hdfs_config['userName'], directory_name))

        shutil.rmtree('./{}'.format(directory_name))

if __name__ == '__main__':
    unittest.main()

# ============================================================================================================================== #
# Copyright (c) Microsoft Corporation
# All rights reserved.
+# +# MIT License +# +# Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated +# documentation files (the "Software"), to deal in the Software without restriction, including without limitation +# the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and +# to permit persons to whom the Software is furnished to do so, subject to the following conditions: +# The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING +# BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND +# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, +# DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. 
# ============================================================================================================================== #

import argparse
import sys
import os
from subprocess import Popen, PIPE
import time
import logging
import shlex
import re
from pyhdfs import HdfsClient

from .hdfsClientUtility import copyDirectoryToHdfs
from .constants import HOME_DIR, LOG_DIR, STDOUT_FULL_PATH, STDERR_FULL_PATH
from .metrics_reader import read_experiment_metrics

logger = logging.getLogger('trial_keeper')

def main_loop(args):
    '''Main loop logic for trial keeper.

    Launches the trial command as a subprocess with stdout/stderr redirected to
    log files, polls it every 2 seconds while relaying experiment metrics to the
    NNI manager, and — once the subprocess exits — copies the trial's local
    output directory to HDFS.
    '''

    if not os.path.exists(LOG_DIR):
        os.makedirs(LOG_DIR)

    stdout_file = open(STDOUT_FULL_PATH, 'a+')
    stderr_file = open(STDERR_FULL_PATH, 'a+')
    try:
        print(shlex.split(args.trial_command))
        # Notice: We don't appoint env, which means subprocess wil inherit current environment and that is expected behavior
        process = Popen(args.trial_command, shell = True, stdout = stdout_file, stderr = stderr_file)
        print('Subprocess pid is {}'.format(process.pid))
        print('Current cwd is {}'.format(os.getcwd()))
        while True:
            retCode = process.poll()
            ## Read experiment metrics on every tick, to avoid missing metrics
            read_experiment_metrics(args.nnimanager_ip)

            if retCode is not None:
                print('subprocess terminated. Exit code is {}. Quit'.format(retCode))
                # copy local output directory to hdfs (50070 is the default WebHDFS port)
                nni_local_output_dir = os.environ['NNI_OUTPUT_DIR']
                hdfs_client = HdfsClient(hosts='{0}:{1}'.format(args.pai_hdfs_host, '50070'), user_name=args.pai_user_name)
                print(nni_local_output_dir, args.pai_hdfs_output_dir)
                try:
                    if copyDirectoryToHdfs(nni_local_output_dir, args.pai_hdfs_output_dir, hdfs_client):
                        print('copy directory success!')
                    else:
                        print('copy directory failed!')
                except Exception as exception:
                    # best effort: report the upload failure but still finish cleanly
                    print(exception)
                break
            else:
                print('subprocess pid: {} is still alive'.format(process.pid))

            time.sleep(2)
    finally:
        # Always release the redirected log files; the original leaked both handles.
        stdout_file.close()
        stderr_file.close()

def trial_keeper_help_info(*args):
    '''Fallback handler: point the user at --help.'''
    print('please run --help to see guidance')

if __name__ == '__main__':
    '''NNI Trial Keeper main function'''
    PARSER = argparse.ArgumentParser()
    PARSER.set_defaults(func=trial_keeper_help_info)
    PARSER.add_argument('--trial_command', type=str, help='Command to launch trial process')
    PARSER.add_argument('--nnimanager_ip', type=str, default='localhost', help='NNI manager IP')
    PARSER.add_argument('--pai_hdfs_output_dir', type=str, help='the output dir of hdfs')
    PARSER.add_argument('--pai_hdfs_host', type=str, help='the host of hdfs')
    PARSER.add_argument('--pai_user_name', type=str, help='the username of hdfs')
    args, unknown = PARSER.parse_known_args()
    if args.trial_command is None:
        sys.exit(1)

    try:
        main_loop(args)
    except KeyboardInterrupt:
        # Only an actual user interrupt is an "exit by request". The original
        # bare `except:` printed this message for every crash, hiding real errors.
        print('Exiting by user request')
        sys.exit(1)
    except Exception as exception:
        print(exception)
        sys.exit(1)

# Copyright (c) Microsoft Corporation
# All rights reserved.
#
# MIT License
#
# Permission is hereby granted, free of charge,
# to any person obtaining a copy of this software and associated
# documentation files (the "Software"), to deal in the Software without restriction,
# including without limitation the rights to use, copy, modify, merge, publish,
# distribute, sublicense, and/or sell copies of the Software, and
# to permit persons to whom the Software is furnished to do so, subject to the following conditions:
# The above copyright notice and this permission notice shall be included
# in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING
# BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
# DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

from .constants import API_ROOT_URL, UPDATE_METRICS_API

def gen_update_metrics_url(base_url, port, exp_id, trial_job_id):
    '''Generate the update-trial-metrics URL for the given experiment and trial job.'''
    # NOTE(review): the literal ':' kept before the trial job id looks like an
    # Express route template (':trial_job_id') leaking into the concrete URL —
    # confirm against the REST server's route definition before changing it.
    endpoint = '{0}:{1}{2}{3}'.format(base_url, port, API_ROOT_URL, UPDATE_METRICS_API)
    return '{0}/{1}/:{2}'.format(endpoint, exp_id, trial_job_id)