Skip to content
This repository has been archived by the owner on Sep 18, 2024. It is now read-only.

Commit

Permalink
[Kubeflow training service] Update kubeflow exp job config schema to …
Browse files Browse the repository at this point in the history
…support distributed training (#387)

* Support distributed training on tf-operator, for worker and ps

* Update validation rule for kubeflow config

* small code refactor adjustment for private methods

* Use different output folder for ps and worker
  • Loading branch information
yds05 authored Nov 22, 2018
1 parent a5d614d commit e341df8
Show file tree
Hide file tree
Showing 6 changed files with 213 additions and 112 deletions.
22 changes: 20 additions & 2 deletions src/nni_manager/rest_server/restValidationSchemas.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,26 @@ export namespace ValidationSchemas {
outputDir: joi.string(),
cpuNum: joi.number().min(1),
memoryMB: joi.number().min(100),
gpuNum: joi.number().min(0).required(),
command: joi.string().min(1).required()
gpuNum: joi.number().min(0),
command: joi.string().min(1),
worker: joi.object({
replicas: joi.number().min(1).required(),
image: joi.string().min(1),
outputDir: joi.string(),
cpuNum: joi.number().min(1),
memoryMB: joi.number().min(100),
gpuNum: joi.number().min(0).required(),
command: joi.string().min(1).required()
}),
ps: joi.object({
replicas: joi.number().min(1).required(),
image: joi.string().min(1),
outputDir: joi.string(),
cpuNum: joi.number().min(1),
memoryMB: joi.number().min(100),
gpuNum: joi.number().min(0).required(),
command: joi.string().min(1).required()
})
}),
pai_config: joi.object({
userName: joi.string().min(1).required(),
Expand Down
35 changes: 32 additions & 3 deletions src/nni_manager/training_service/kubeflow/kubeflowConfig.ts
Original file line number Diff line number Diff line change
Expand Up @@ -79,15 +79,44 @@ export class NFSConfig {
/**
* Trial job configuration for Kubeflow
*/
export class KubeflowTrialConfig extends TrialConfig {
export class KubeflowTrialConfigTemplate {
/** replication number of current role */
public readonly replicas: number;

/** CPU number */
public readonly cpuNum: number;

/** Memory */
public readonly memoryMB: number;

/** Docker image */
public readonly image: string;

/** Trail command */
public readonly command : string;

/** Required GPU number for trial job. The number should be in [0,100] */
public readonly gpuNum : number;

constructor(command : string, codeDir : string, gpuNum : number, cpuNum: number, memoryMB: number, image: string) {
super(command, codeDir, gpuNum);
constructor(replicas: number, command : string, gpuNum : number,
cpuNum: number, memoryMB: number, image: string) {
this.replicas = replicas;
this.command = command;
this.gpuNum = gpuNum;
this.cpuNum = cpuNum;
this.memoryMB = memoryMB;
this.image = image;
}
}

export class KubeflowTrialConfig {
public readonly codeDir: string;
public readonly ps?: KubeflowTrialConfigTemplate;
public readonly worker: KubeflowTrialConfigTemplate;

constructor(codeDir: string, worker: KubeflowTrialConfigTemplate, ps?: KubeflowTrialConfigTemplate) {
this.codeDir = codeDir;
this.worker = worker;
this.ps = ps;
}
}
2 changes: 1 addition & 1 deletion src/nni_manager/training_service/kubeflow/kubeflowData.ts
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ mkdir -p $NNI_OUTPUT_DIR
cp -rT $NNI_CODE_DIR $NNI_SYS_DIR
cd $NNI_SYS_DIR
sh install_nni.sh # Check and install NNI pkg
python3 -m nni_trial_tool.trial_keeper --trial_command '{6}' --nnimanager_ip '{7}' --nnimanager_port '{8}' 1>$NNI_OUTPUT_DIR/trialkeeper_stdout 2>$NNI_OUTPUT_DIR//trialkeeper_stderr
python3 -m nni_trial_tool.trial_keeper --trial_command '{6}' --nnimanager_ip '{7}' --nnimanager_port '{8}' 1>$NNI_OUTPUT_DIR/trialkeeper_stdout 2>$NNI_OUTPUT_DIR/trialkeeper_stderr
`

export type KubeflowTFJobType = 'Created' | 'Running' | 'Failed' | 'Succeeded';
198 changes: 134 additions & 64 deletions src/nni_manager/training_service/kubeflow/kubeflowTrainingService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,15 @@ import {
TrialJobDetail, TrialJobMetric
} from '../../common/trainingService';
import { delay, generateParamFileName, getExperimentRootDir, getIPV4Address, uniqueString } from '../../common/utils';
import { KubeflowClusterConfig, kubeflowOperatorMap, KubeflowTrialConfig, NFSConfig } from './kubeflowConfig';
import { KubeflowClusterConfig, kubeflowOperatorMap, KubeflowTrialConfig, KubeflowTrialConfigTemplate, NFSConfig } from './kubeflowConfig';
import { KubeflowTrialJobDetail, KUBEFLOW_RUN_SHELL_FORMAT } from './kubeflowData';
import { KubeflowJobRestServer } from './kubeflowJobRestServer';
import { KubeflowJobInfoCollector } from './kubeflowJobInfoCollector';

var yaml = require('node-yaml');

type DistTrainRole = 'worker' | 'ps';

/**
* Training Service implementation for Kubeflow
* Refer https://github.com/kubeflow/kubeflow for more info about Kubeflow
Expand All @@ -64,7 +66,7 @@ class KubeflowTrainingService implements TrainingService {
private kubeflowJobInfoCollector: KubeflowJobInfoCollector;
private kubeflowRestServerPort?: number;
private kubeflowJobPlural?: string;
private readonly CONTAINER_MOUNT_PATH: string;
private readonly CONTAINER_MOUNT_PATH: string;

constructor() {
this.log = getLogger();
Expand Down Expand Up @@ -93,8 +95,8 @@ class KubeflowTrainingService implements TrainingService {
throw new Error('Kubeflow Cluster config is not initialized');
}

if(!this.kubeflowTrialConfig) {
throw new Error('Kubeflow trial config is not initialized');
if(!this.kubeflowTrialConfig || !this.kubeflowTrialConfig.worker) {
throw new Error('Kubeflow trial config or worker config is not initialized');
}

if(!this.kubeflowJobPlural) {
Expand All @@ -119,47 +121,57 @@ class KubeflowTrainingService implements TrainingService {
// Write NNI installation file to local tmp files
await fs.promises.writeFile(path.join(trialLocalTempFolder, 'install_nni.sh'), runScriptContent, { encoding: 'utf8' });

const kubeflowRunScriptContent: string = String.Format(
KUBEFLOW_RUN_SHELL_FORMAT,
`$PWD/nni/${trialJobId}`,
path.join(trialWorkingFolder, 'output'),
trialJobId,
getExperimentId(),
trialWorkingFolder,
curTrialSequenceId,
this.kubeflowTrialConfig.command,
getIPV4Address(),
this.kubeflowRestServerPort
);

//create tmp trial working folder locally.
// Create tmp trial working folder locally.
await cpp.exec(`mkdir -p ${trialLocalTempFolder}`);

// Write file content ( run.sh and parameter.cfg ) to local tmp files
await fs.promises.writeFile(path.join(trialLocalTempFolder, 'run.sh'), kubeflowRunScriptContent, { encoding: 'utf8' });
// Write worker file content run_worker.sh to local tmp folders
if(this.kubeflowTrialConfig.worker) {
const workerRunScriptContent: string = this.genereateRunScript(trialJobId, trialWorkingFolder,
this.kubeflowTrialConfig.worker.command, curTrialSequenceId.toString(), 'worker');

await fs.promises.writeFile(path.join(trialLocalTempFolder, 'run_worker.sh'), workerRunScriptContent, { encoding: 'utf8' });
}

// Write parameter server file content run_ps.sh to local tmp folders
if(this.kubeflowTrialConfig.ps) {
const psRunScriptContent: string = this.genereateRunScript(trialJobId, trialWorkingFolder,
this.kubeflowTrialConfig.ps.command, curTrialSequenceId.toString(), 'ps');

await fs.promises.writeFile(path.join(trialLocalTempFolder, 'run_ps.sh'), psRunScriptContent, { encoding: 'utf8' });
}

// Write file content ( parameter.cfg ) to local tmp folders
const trialForm : TrialJobApplicationForm = (<TrialJobApplicationForm>form)
if(trialForm && trialForm.hyperParameters) {
await fs.promises.writeFile(path.join(trialLocalTempFolder, generateParamFileName(trialForm.hyperParameters)),
trialForm.hyperParameters.value, { encoding: 'utf8' });
}
}

const kubeflowJobYamlPath = path.join(trialLocalTempFolder, `kubeflow-job-${trialJobId}.yaml`);
const kubeflowJobName = `nni-exp-${this.experimentId}-trial-${trialJobId}`.toLowerCase();
const podResources : any = {};
podResources.requests = {
'memory': `${this.kubeflowTrialConfig.memoryMB}Mi`,
'cpu': `${this.kubeflowTrialConfig.cpuNum}`,
'nvidia.com/gpu': `${this.kubeflowTrialConfig.gpuNum}`
const workerPodResources : any = {};
workerPodResources.requests = {
'memory': `${this.kubeflowTrialConfig.worker.memoryMB}Mi`,
'cpu': `${this.kubeflowTrialConfig.worker.cpuNum}`,
'nvidia.com/gpu': `${this.kubeflowTrialConfig.worker.gpuNum}`
}

podResources.limits = Object.assign({}, podResources.requests);
workerPodResources.limits = Object.assign({}, workerPodResources.requests);

let psPodResources : any = undefined;
if(this.kubeflowTrialConfig.ps) {
psPodResources = {};
psPodResources.requests = {
'memory': `${this.kubeflowTrialConfig.ps.memoryMB}Mi`,
'cpu': `${this.kubeflowTrialConfig.ps.cpuNum}`,
'nvidia.com/gpu': `${this.kubeflowTrialConfig.ps.gpuNum}`
}
psPodResources.limits = Object.assign({}, psPodResources.requests);
}

// Generate kubeflow job resource yaml file for K8S
yaml.write(
kubeflowJobYamlPath,
this.generateKubeflowJobConfig(trialJobId, trialWorkingFolder, kubeflowJobName, podResources),
this.generateKubeflowJobConfig(trialJobId, trialWorkingFolder, kubeflowJobName, workerPodResources, psPodResources),
'utf-8'
);

Expand Down Expand Up @@ -281,6 +293,7 @@ class KubeflowTrainingService implements TrainingService {
}

this.kubeflowTrialConfig = <KubeflowTrialConfig>JSON.parse(value);
assert(this.kubeflowClusterConfig !== undefined && this.kubeflowTrialConfig.worker !== undefined);
break;
default:
break;
Expand Down Expand Up @@ -339,7 +352,15 @@ class KubeflowTrainingService implements TrainingService {
return this.metricsEmitter;
}

private generateKubeflowJobConfig(trialJobId: string, trialWorkingFolder: string, kubeflowJobName : string, podResources : any) : any {
/**
* Generate kubeflow resource config file
* @param trialJobId trial job id
* @param trialWorkingFolder working folder
* @param kubeflowJobName job name
* @param workerPodResources worker pod template
* @param psPodResources ps pod template
*/
private generateKubeflowJobConfig(trialJobId: string, trialWorkingFolder: string, kubeflowJobName : string, workerPodResources : any, psPodResources?: any) : any {
if(!this.kubeflowClusterConfig) {
throw new Error('Kubeflow Cluster config is not initialized');
}
Expand All @@ -348,6 +369,15 @@ class KubeflowTrainingService implements TrainingService {
throw new Error('Kubeflow trial config is not initialized');
}

const tfReplicaSpecsObj: any = {};
tfReplicaSpecsObj.Worker = this.generateReplicaConfig(trialWorkingFolder, this.kubeflowTrialConfig.worker.replicas,
this.kubeflowTrialConfig.worker.image, 'run_worker.sh', workerPodResources);

if(this.kubeflowTrialConfig.ps) {
tfReplicaSpecsObj.Ps = this.generateReplicaConfig(trialWorkingFolder, this.kubeflowTrialConfig.ps.replicas,
this.kubeflowTrialConfig.ps.image, 'run_ps.sh', psPodResources);
}

return {
apiVersion: 'kubeflow.org/v1alpha2',
kind: 'TFJob',
Expand All @@ -361,44 +391,84 @@ class KubeflowTrainingService implements TrainingService {
}
},
spec: {
tfReplicaSpecs: {
Worker: {
replicas: 1,
template: {
metadata: {
creationTimestamp: null
},
spec: {
containers: [
{
// Kubeflow tensorflow operator requires that containers' name must be tensorflow
// TODO: change the name based on operator's type
name: 'tensorflow',
image: this.kubeflowTrialConfig.image,
args: ["sh", `${path.join(trialWorkingFolder, 'run.sh')}`],
volumeMounts: [{
name: 'nni-nfs-vol',
mountPath: this.CONTAINER_MOUNT_PATH
}],
resources: podResources//,
//workingDir: '/tmp/nni/nuDEP'
}],
restartPolicy: 'ExitCode',
volumes: [{
name: 'nni-nfs-vol',
nfs: {
server: `${this.kubeflowClusterConfig.nfs.server}`,
path: `${this.kubeflowClusterConfig.nfs.path}`
}
}]
}
}
}
}
tfReplicaSpecs: tfReplicaSpecsObj
}
};
}

/**
* Generate tf-operator's tfjobs replica config section
* @param trialWorkingFolder trial working folder
* @param replicaNumber replica number
* @param replicaImage image
* @param runScriptFile script file name
* @param podResources pod resource config section
*/
private generateReplicaConfig(trialWorkingFolder: string, replicaNumber: number, replicaImage: string, runScriptFile: string, podResources: any): any {
if(!this.kubeflowClusterConfig) {
throw new Error('Kubeflow Cluster config is not initialized');
}

if(!this.kubeflowTrialConfig) {
throw new Error('Kubeflow trial config is not initialized');
}

return {
replicas: replicaNumber,
template: {
metadata: {
creationTimestamp: null
},
spec: {
containers: [
{
// Kubeflow tensorflow operator requires that containers' name must be tensorflow
// TODO: change the name based on operator's type
name: 'tensorflow',
image: replicaImage,
args: ["sh", `${path.join(trialWorkingFolder, runScriptFile)}`],
volumeMounts: [{
name: 'nni-nfs-vol',
mountPath: this.CONTAINER_MOUNT_PATH
}],
resources: podResources
}],
restartPolicy: 'ExitCode',
volumes: [{
name: 'nni-nfs-vol',
nfs: {
server: `${this.kubeflowClusterConfig.nfs.server}`,
path: `${this.kubeflowClusterConfig.nfs.path}`
}
}]
}
}
};
}

/**
* Genereate run script for different roles(like worker or ps)
* @param trialJobId trial job id
* @param trialWorkingFolder working folder
* @param command
* @param trialSequenceId sequence id
*/
private genereateRunScript(trialJobId: string, trialWorkingFolder: string,
command: string, trialSequenceId: string, roleType: DistTrainRole): string {
return String.Format(
KUBEFLOW_RUN_SHELL_FORMAT,
`$PWD/nni/${trialJobId}`,
path.join(trialWorkingFolder, `${roleType}_output`),
trialJobId,
getExperimentId(),
trialWorkingFolder,
trialSequenceId,
command,
getIPV4Address(),
this.kubeflowRestServerPort
);
}

private generateSequenceId(): number {
if (this.nextTrialSequenceId === -1) {
this.nextTrialSequenceId = getInitTrialSequenceId();
Expand Down
23 changes: 17 additions & 6 deletions tools/nni_cmd/config_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,12 +92,23 @@

kubeflow_trial_schema = {
'trial':{
'command': str,
'codeDir': os.path.exists,
'gpuNum': And(int, lambda x: 0 <= x <= 99999),
'cpuNum': And(int, lambda x: 0 <= x <= 99999),
'memoryMB': int,
'image': str
'codeDir': os.path.exists,
Optional('ps'): {
'replicas': int,
'command': str,
'gpuNum': And(int, lambda x: 0 <= x <= 99999),
'cpuNum': And(int, lambda x: 0 <= x <= 99999),
'memoryMB': int,
'image': str
},
'worker':{
'replicas': int,
'command': str,
'gpuNum': And(int, lambda x: 0 <= x <= 99999),
'cpuNum': And(int, lambda x: 0 <= x <= 99999),
'memoryMB': int,
'image': str
}
}
}

Expand Down
Loading

0 comments on commit e341df8

Please sign in to comment.