Skip to content
This repository has been archived by the owner on Sep 18, 2024. It is now read-only.

Commit

Permalink
PAI Training Service implementation (#128)
Browse files Browse the repository at this point in the history
* PAI Training service implementation
**1. Implement PAITrainingService
**2. Add trial-keeper python module, and modify setup.py to install the module
**3. Add PAItrainingService rest server to collect metrics from PAI container.
  • Loading branch information
yds05 authored Sep 27, 2018
1 parent cc5372e commit d3506e3
Show file tree
Hide file tree
Showing 35 changed files with 2,104 additions and 98 deletions.
4 changes: 1 addition & 3 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,6 @@ build:
#$(_INFO) Building nnictl $(_END)
cd tools && python3 setup.py build


# Standard installation target
# Must be invoked after building
.PHONY: install
Expand Down Expand Up @@ -207,7 +206,6 @@ install-python-modules:
#$(_INFO) Installing nnictl $(_END)
cd tools && python3 setup.py install $(PIP_MODE)


.PHONY: install-node-modules
install-node-modules:
mkdir -p $(INSTALL_PREFIX)/nni
Expand All @@ -227,7 +225,7 @@ install-dev-modules:

#$(_INFO) Installing nnictl $(_END)
cd tools && $(PIP_INSTALL) $(PIP_MODE) -e .

mkdir -p $(INSTALL_PREFIX)/nni

#$(_INFO) Installing NNI Manager $(_END)
Expand Down
8 changes: 5 additions & 3 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ def run(self):

setup(
name = 'NNI',
version = '0.1.0',
version = '0.2.0',
author = 'Microsoft NNI Team',
author_email = 'nni@microsoft.com',
description = 'Neural Network Intelligence project',
Expand All @@ -47,7 +47,8 @@ def run(self):
package_dir = {
'nni_annotation': 'tools/nni_annotation',
'nni': 'src/sdk/pynni/nni',
'nnicmd': 'tools/nnicmd'
'nnicmd': 'tools/nnicmd',
'trial_tool':'tools/trial_tool'
},
python_requires = '>=3.5',
install_requires = [
Expand All @@ -59,7 +60,8 @@ def run(self):
'pyyaml',
'requests',
'scipy',
'schema'
'schema',
'pyhdfs'
],

cmdclass={
Expand Down
18 changes: 16 additions & 2 deletions src/nni_manager/common/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -225,5 +225,19 @@ function cleanupUnitTest(): void {
Container.restore(ExperimentStartupInfo);
}

export { getMsgDispatcherCommand, getLogDir, getExperimentRootDir, getDefaultDatabaseDir, mkDirP, delay, prepareUnitTest,
parseArg, cleanupUnitTest, uniqueString, randomSelect };
/**
* Get IPv4 address of current machine
*/
function getIPV4Address(): string {
let ipv4Address : string = '';

for(const item of os.networkInterfaces().eth0) {
if(item.family === 'IPv4') {
ipv4Address = item.address;
}
}
return ipv4Address;
}

export { getMsgDispatcherCommand, getLogDir, getExperimentRootDir, getDefaultDatabaseDir, getIPV4Address,
mkDirP, delay, prepareUnitTest, parseArg, cleanupUnitTest, uniqueString, randomSelect };
7 changes: 5 additions & 2 deletions src/nni_manager/main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import { LocalTrainingServiceForGPU } from './training_service/local/localTraini
import {
RemoteMachineTrainingService
} from './training_service/remote_machine/remoteMachineTrainingService';
import { PAITrainingService } from './training_service/pai/paiTrainingService'


function initStartupInfo(startExpMode: string, resumeExperimentId: string) {
Expand All @@ -49,6 +50,8 @@ async function initContainer(platformMode: string): Promise<void> {
Container.bind(TrainingService).to(LocalTrainingServiceForGPU).scope(Scope.Singleton);
} else if (platformMode === 'remote') {
Container.bind(TrainingService).to(RemoteMachineTrainingService).scope(Scope.Singleton);
} else if (platformMode === 'pai'){
Container.bind(TrainingService).to(PAITrainingService).scope(Scope.Singleton);
} else {
throw new Error(`Error: unsupported mode: ${mode}`);
}
Expand All @@ -61,7 +64,7 @@ async function initContainer(platformMode: string): Promise<void> {
}

function usage(): void {
console.info('usage: node main.js --port <port> --mode <local/remote> --start_mode <new/resume> --experiment_id <id>');
console.info('usage: node main.js --port <port> --mode <local/remote/pai> --start_mode <new/resume> --experiment_id <id>');
}

let port: number = NNIRestServer.DEFAULT_PORT;
Expand All @@ -71,7 +74,7 @@ if (strPort && strPort.length > 0) {
}

const mode: string = parseArg(['--mode', '-m']);
if (!['local', 'remote'].includes(mode)) {
if (!['local', 'remote', 'pai'].includes(mode)) {
usage();
process.exit(1);
}
Expand Down
4 changes: 3 additions & 1 deletion src/nni_manager/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@
"tree-kill": "^1.2.0",
"ts-deferred": "^1.0.4",
"typescript-ioc": "^1.2.4",
"typescript-string-operations": "^1.3.1"
"typescript-string-operations": "^1.3.1",
"webhdfs":"^1.2.0"
},
"devDependencies": {
"@types/chai": "^4.1.4",
Expand All @@ -40,6 +41,7 @@
"chai": "^4.1.2",
"mocha": "^5.2.0",
"request": "^2.87.0",
"rmdir": "^1.2.0",
"tmp": "^0.0.33",
"ts-node": "^7.0.0",
"tslint": "^5.11.0",
Expand Down
14 changes: 12 additions & 2 deletions src/nni_manager/rest_server/restValidationSchemas.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,19 @@ export namespace ValidationSchemas {
passphrase: joi.string()
})),
trial_config: joi.object({
gpuNum: joi.number().min(0).required(),
image: joi.string().min(1),
codeDir: joi.string().min(1).required(),
command: joi.string().min(1).required()
dataDir: joi.string(),
outputDir: joi.string(),
cpuNum: joi.number().min(1),
memoryMB: joi.number().min(100),
gpuNum: joi.number().min(0).required(),
command: joi.string().min(1).required()
}),
pai_config: joi.object({
userName: joi.string().min(1).required(),
passWord: joi.string().min(1).required(),
host: joi.string().min(1).required()
})
}
};
Expand Down
37 changes: 37 additions & 0 deletions src/nni_manager/training_service/common/jobMetrics.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/**
* Copyright (c) Microsoft Corporation
* All rights reserved.
*
* MIT License
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated
* documentation files (the "Software"), to deal in the Software without restriction, including without limitation
* the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and
* to permit persons to whom the Software is furnished to do so, subject to the following conditions:
* The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING
* BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
* DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/

'use strict';

import { TrialJobStatus } from '../../common/trainingService';

// tslint:disable-next-line:max-classes-per-file
export class JobMetrics {
public readonly jobId: string;
public readonly metrics: string[];
public readonly jobStatus: TrialJobStatus;
public readonly endTimestamp: number;

constructor(jobId : string, metrics : string[], jobStatus : TrialJobStatus, endTimestamp : number) {
this.jobId = jobId;
this.metrics = metrics;
this.jobStatus = jobStatus;
this.endTimestamp = endTimestamp;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,5 +26,6 @@ export enum TrialConfigMetadataKey {
MACHINE_LIST = 'machine_list',
TRIAL_CONFIG = 'trial_config',
EXPERIMENT_ID = 'experimentId',
RANDOM_SCHEDULER = 'random_scheduler'
RANDOM_SCHEDULER = 'random_scheduler',
PAI_CLUSTER_CONFIG = 'pai_config'
}
200 changes: 200 additions & 0 deletions src/nni_manager/training_service/pai/hdfsClientUtility.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,200 @@
/**
* Copyright (c) Microsoft Corporation
* All rights reserved.
*
* MIT License
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated
* documentation files (the "Software"), to deal in the Software without restriction, including without limitation
* the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and
* to permit persons to whom the Software is furnished to do so, subject to the following conditions:
* The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING
* BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
* DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/

import * as path from 'path';
import * as fs from 'fs';
import { Deferred } from 'ts-deferred';
import { getLogger } from '../../common/log';

/**
* HDFS client utility, including copy file/directory
*/
export namespace HDFSClientUtility {
/**
* Copy a local file to hdfs directory
*
* @param localFilePath local file path(source)
* @param hdfsFilePath hdfs file path(target)
* @param hdfsClient hdfs client
*/
export async function copyFileToHdfs(localFilePath : string, hdfsFilePath : string, hdfsClient : any) : Promise<void> {
const deferred: Deferred<void> = new Deferred<void>();
fs.exists(localFilePath, (exists : boolean) => {
// Detect if local file exist
if (exists) {
var localFileStream = fs.createReadStream(localFilePath);
var hdfsFileStream = hdfsClient.createWriteStream(hdfsFilePath);
localFileStream.pipe(hdfsFileStream);
hdfsFileStream.on('finish', function onFinish () {
deferred.resolve();
});
hdfsFileStream.on('error', (err : any) => {
getLogger().error(`HDFSCientUtility:copyFileToHdfs, copy file failed, err is ${err.message}`);
deferred.reject(err);
});
} else {
getLogger().error(`HDFSCientUtility:copyFileToHdfs, ${localFilePath} doesn't exist locally`);
deferred.reject('file not exist!');
}
});
return deferred.promise;
}

/**
* Recursively copy local directory to hdfs directory
*
* @param localDirectory local directory
* @param hdfsDirectory HDFS directory
* @param hdfsClient HDFS client
*/
export async function copyDirectoryToHdfs(localDirectory : string, hdfsDirectory : string, hdfsClient : any) : Promise<void>{
const deferred: Deferred<void> = new Deferred<void>();
// TODO: fs.readdirSync doesn't support ~($HOME)
const fileNameArray: string[] = fs.readdirSync(localDirectory);

for(var fileName of fileNameArray){
const fullFilePath: string = path.join(localDirectory, fileName);
try {
if (fs.lstatSync(fullFilePath).isFile()) {
await copyFileToHdfs(fullFilePath, path.join(hdfsDirectory, fileName), hdfsClient);
} else {
// If filePath is a directory, recuisively copy it to remote directory
await copyDirectoryToHdfs(fullFilePath, path.join(hdfsDirectory, fileName), hdfsClient);
}
} catch(error) {
deferred.reject(error);
}
}
// All files/directories are copied successfully, resolve
deferred.resolve();

return deferred.promise;
}

/**
* Read content from HDFS file
*
* @param hdfsPath HDFS file path
* @param hdfsClient HDFS client
*/
export async function readFileFromHDFS(hdfsPath : string, hdfsClient :any) : Promise<Buffer> {
const deferred: Deferred<Buffer> = new Deferred<Buffer>();
let buffer : Buffer = Buffer.alloc(0);

const exist : boolean = await pathExists(hdfsPath, hdfsClient);
if(!exist) {
deferred.reject(`${hdfsPath} doesn't exists`);
}

const remoteFileStream = hdfsClient.createReadStream(hdfsPath);
remoteFileStream.on('error', (err : any) => {
// Reject with the error
deferred.reject(err);
});

remoteFileStream.on('data', (chunk : any) => {
// Concat the data chunk to buffer
buffer = Buffer.concat([buffer, chunk]);
});

remoteFileStream.on('finish', function onFinish () {
// Upload is done, resolve
deferred.resolve(buffer);
});

return deferred.promise;
}

/**
* Check if an HDFS path already exists
*
* @param hdfsPath target path need to check in HDFS
* @param hdfsClient HDFS client
*/
export async function pathExists(hdfsPath : string, hdfsClient : any) : Promise<boolean> {
const deferred : Deferred<boolean> = new Deferred<boolean>();
hdfsClient.exists(hdfsPath, (exist : boolean ) => {
deferred.resolve(exist);
})

return deferred.promise;
}

/**
* Mkdir in HDFS, use default permission 755
*
* @param hdfsPath the path in HDFS. It could be either file or directory
* @param hdfsClient
*/
export function mkdir(hdfsPath : string, hdfsClient : any) : Promise<boolean> {
const deferred : Deferred<boolean> = new Deferred<boolean>();

hdfsClient.mkdir(hdfsPath, (err : any)=> {
if(!err) {
deferred.resolve(true);
} else {
deferred.reject(err.message);
}
});

return deferred.promise;
}

/**
* Read directory contents
*
* @param hdfsPath the path in HDFS. It could be either file or directory
* @param hdfsClient
*/
export async function readdir(hdfsPath : string, hdfsClient : any) : Promise<string[]> {
const deferred : Deferred<string[]> = new Deferred<string[]>();
const exist : boolean = await pathExists(hdfsPath, hdfsClient);
if(!exist) {
deferred.reject(`${hdfsPath} doesn't exists`);
}

hdfsClient.readdir(hdfsPath, (err : any, files : any[] ) => {
if(err) {
deferred.reject(err);
}

deferred.resolve(files);
});

return deferred.promise;
}

/**
* Delete HDFS path
* @param hdfsPath the path in HDFS. It could be either file or directory
* @param hdfsClient
* @param recursive Mark if need to delete recursively
*/
export function deletePath(hdfsPath : string, hdfsClient : any, recursive : boolean = true) : Promise<boolean> {
const deferred : Deferred<boolean> = new Deferred<boolean>();
hdfsClient.unlink(hdfsPath, recursive, (err : any)=> {
if(!err) {
deferred.resolve(true);
} else {
deferred.reject(err.message);
}
});
return deferred.promise;
}
}
Loading

0 comments on commit d3506e3

Please sign in to comment.