diff --git a/src/nni_manager/rest_server/restValidationSchemas.ts b/src/nni_manager/rest_server/restValidationSchemas.ts index 2a29777dd7..a298869d15 100644 --- a/src/nni_manager/rest_server/restValidationSchemas.ts +++ b/src/nni_manager/rest_server/restValidationSchemas.ts @@ -39,8 +39,26 @@ export namespace ValidationSchemas { outputDir: joi.string(), cpuNum: joi.number().min(1), memoryMB: joi.number().min(100), - gpuNum: joi.number().min(0).required(), - command: joi.string().min(1).required() + gpuNum: joi.number().min(0), + command: joi.string().min(1), + worker: joi.object({ + replicas: joi.number().min(1).required(), + image: joi.string().min(1), + outputDir: joi.string(), + cpuNum: joi.number().min(1), + memoryMB: joi.number().min(100), + gpuNum: joi.number().min(0).required(), + command: joi.string().min(1).required() + }), + ps: joi.object({ + replicas: joi.number().min(1).required(), + image: joi.string().min(1), + outputDir: joi.string(), + cpuNum: joi.number().min(1), + memoryMB: joi.number().min(100), + gpuNum: joi.number().min(0).required(), + command: joi.string().min(1).required() + }) }), pai_config: joi.object({ userName: joi.string().min(1).required(), diff --git a/src/nni_manager/training_service/kubeflow/kubeflowConfig.ts b/src/nni_manager/training_service/kubeflow/kubeflowConfig.ts index dca552ee11..d24c309b21 100644 --- a/src/nni_manager/training_service/kubeflow/kubeflowConfig.ts +++ b/src/nni_manager/training_service/kubeflow/kubeflowConfig.ts @@ -79,15 +79,44 @@ export class NFSConfig { /** * Trial job configuration for Kubeflow */ -export class KubeflowTrialConfig extends TrialConfig { +export class KubeflowTrialConfigTemplate { + /** replication number of current role */ + public readonly replicas: number; + + /** CPU number */ public readonly cpuNum: number; + + /** Memory */ public readonly memoryMB: number; + + /** Docker image */ public readonly image: string; + + /** Trail command */ + public readonly command : string; + + /** Required GPU number for trial job. 
The number should be in [0,100] */ + public readonly gpuNum : number; - constructor(command : string, codeDir : string, gpuNum : number, cpuNum: number, memoryMB: number, image: string) { - super(command, codeDir, gpuNum); + constructor(replicas: number, command : string, gpuNum : number, + cpuNum: number, memoryMB: number, image: string) { + this.replicas = replicas; + this.command = command; + this.gpuNum = gpuNum; this.cpuNum = cpuNum; this.memoryMB = memoryMB; this.image = image; } +} + +export class KubeflowTrialConfig { + public readonly codeDir: string; + public readonly ps?: KubeflowTrialConfigTemplate; + public readonly worker: KubeflowTrialConfigTemplate; + + constructor(codeDir: string, worker: KubeflowTrialConfigTemplate, ps?: KubeflowTrialConfigTemplate) { + this.codeDir = codeDir; + this.worker = worker; + this.ps = ps; + } } \ No newline at end of file diff --git a/src/nni_manager/training_service/kubeflow/kubeflowData.ts b/src/nni_manager/training_service/kubeflow/kubeflowData.ts index f65d0cb603..0dce48732e 100644 --- a/src/nni_manager/training_service/kubeflow/kubeflowData.ts +++ b/src/nni_manager/training_service/kubeflow/kubeflowData.ts @@ -72,7 +72,7 @@ mkdir -p $NNI_OUTPUT_DIR cp -rT $NNI_CODE_DIR $NNI_SYS_DIR cd $NNI_SYS_DIR sh install_nni.sh # Check and install NNI pkg -python3 -m nni_trial_tool.trial_keeper --trial_command '{6}' --nnimanager_ip '{7}' --nnimanager_port '{8}' 1>$NNI_OUTPUT_DIR/trialkeeper_stdout 2>$NNI_OUTPUT_DIR//trialkeeper_stderr +python3 -m nni_trial_tool.trial_keeper --trial_command '{6}' --nnimanager_ip '{7}' --nnimanager_port '{8}' 1>$NNI_OUTPUT_DIR/trialkeeper_stdout 2>$NNI_OUTPUT_DIR/trialkeeper_stderr ` export type KubeflowTFJobType = 'Created' | 'Running' | 'Failed' | 'Succeeded'; \ No newline at end of file diff --git a/src/nni_manager/training_service/kubeflow/kubeflowTrainingService.ts b/src/nni_manager/training_service/kubeflow/kubeflowTrainingService.ts index 1f3b30b404..96082fb984 100644 --- a/src/nni_manager/training_service/kubeflow/kubeflowTrainingService.ts +++ b/src/nni_manager/training_service/kubeflow/kubeflowTrainingService.ts @@ -37,13 +37,15 @@ import { TrialJobDetail, TrialJobMetric } from '../../common/trainingService'; import { delay, generateParamFileName, getExperimentRootDir, getIPV4Address, uniqueString } from '../../common/utils'; -import { KubeflowClusterConfig, kubeflowOperatorMap, KubeflowTrialConfig, NFSConfig } from './kubeflowConfig'; +import { KubeflowClusterConfig, kubeflowOperatorMap, KubeflowTrialConfig, KubeflowTrialConfigTemplate, NFSConfig } from './kubeflowConfig'; import { KubeflowTrialJobDetail, KUBEFLOW_RUN_SHELL_FORMAT } from './kubeflowData'; import { KubeflowJobRestServer } from './kubeflowJobRestServer'; import { KubeflowJobInfoCollector } from './kubeflowJobInfoCollector'; var yaml = require('node-yaml'); +type DistTrainRole = 'worker' | 'ps'; + /** * Training Service implementation for Kubeflow * Refer https://github.com/kubeflow/kubeflow for more info about Kubeflow @@ -64,7 +66,7 @@ class KubeflowTrainingService implements TrainingService { private kubeflowJobInfoCollector: KubeflowJobInfoCollector; private kubeflowRestServerPort?: number; private kubeflowJobPlural?: string; - private readonly CONTAINER_MOUNT_PATH: string; + private readonly CONTAINER_MOUNT_PATH: string; constructor() { this.log = getLogger(); @@ -93,8 +95,8 @@ class KubeflowTrainingService implements TrainingService { throw new Error('Kubeflow Cluster config is not initialized'); } - if(!this.kubeflowTrialConfig) { - throw 
new Error('Kubeflow trial config is not initialized'); + if(!this.kubeflowTrialConfig || !this.kubeflowTrialConfig.worker) { + throw new Error('Kubeflow trial config or worker config is not initialized'); } if(!this.kubeflowJobPlural) { @@ -119,47 +121,57 @@ class KubeflowTrainingService implements TrainingService { // Write NNI installation file to local tmp files await fs.promises.writeFile(path.join(trialLocalTempFolder, 'install_nni.sh'), runScriptContent, { encoding: 'utf8' }); - const kubeflowRunScriptContent: string = String.Format( - KUBEFLOW_RUN_SHELL_FORMAT, - `$PWD/nni/${trialJobId}`, - path.join(trialWorkingFolder, 'output'), - trialJobId, - getExperimentId(), - trialWorkingFolder, - curTrialSequenceId, - this.kubeflowTrialConfig.command, - getIPV4Address(), - this.kubeflowRestServerPort - ); - - //create tmp trial working folder locally. + // Create tmp trial working folder locally. await cpp.exec(`mkdir -p ${trialLocalTempFolder}`); - // Write file content ( run.sh and parameter.cfg ) to local tmp files - await fs.promises.writeFile(path.join(trialLocalTempFolder, 'run.sh'), kubeflowRunScriptContent, { encoding: 'utf8' }); + // Write worker file content run_worker.sh to local tmp folders + if(this.kubeflowTrialConfig.worker) { + const workerRunScriptContent: string = this.genereateRunScript(trialJobId, trialWorkingFolder, + this.kubeflowTrialConfig.worker.command, curTrialSequenceId.toString(), 'worker'); + + await fs.promises.writeFile(path.join(trialLocalTempFolder, 'run_worker.sh'), workerRunScriptContent, { encoding: 'utf8' }); + } + + // Write parameter server file content run_ps.sh to local tmp folders + if(this.kubeflowTrialConfig.ps) { + const psRunScriptContent: string = this.genereateRunScript(trialJobId, trialWorkingFolder, + this.kubeflowTrialConfig.ps.command, curTrialSequenceId.toString(), 'ps'); + + await fs.promises.writeFile(path.join(trialLocalTempFolder, 'run_ps.sh'), psRunScriptContent, { encoding: 'utf8' }); + } // Write file content ( parameter.cfg ) to local tmp folders const trialForm : TrialJobApplicationForm = (form) if(trialForm && trialForm.hyperParameters) { await fs.promises.writeFile(path.join(trialLocalTempFolder, generateParamFileName(trialForm.hyperParameters)), trialForm.hyperParameters.value, { encoding: 'utf8' }); - } + } const kubeflowJobYamlPath = path.join(trialLocalTempFolder, `kubeflow-job-${trialJobId}.yaml`); const kubeflowJobName = `nni-exp-${this.experimentId}-trial-${trialJobId}`.toLowerCase(); - const podResources : any = {}; - podResources.requests = { - 'memory': `${this.kubeflowTrialConfig.memoryMB}Mi`, - 'cpu': `${this.kubeflowTrialConfig.cpuNum}`, - 'nvidia.com/gpu': `${this.kubeflowTrialConfig.gpuNum}` + const workerPodResources : any = {}; + workerPodResources.requests = { + 'memory': `${this.kubeflowTrialConfig.worker.memoryMB}Mi`, + 'cpu': `${this.kubeflowTrialConfig.worker.cpuNum}`, + 'nvidia.com/gpu': `${this.kubeflowTrialConfig.worker.gpuNum}` } - - podResources.limits = Object.assign({}, podResources.requests); + workerPodResources.limits = Object.assign({}, workerPodResources.requests); + + let psPodResources : any = undefined; + if(this.kubeflowTrialConfig.ps) { + psPodResources = {}; + psPodResources.requests = { + 'memory': `${this.kubeflowTrialConfig.ps.memoryMB}Mi`, + 'cpu': `${this.kubeflowTrialConfig.ps.cpuNum}`, + 'nvidia.com/gpu': `${this.kubeflowTrialConfig.ps.gpuNum}` + } + psPodResources.limits = Object.assign({}, psPodResources.requests); + } // Generate kubeflow job resource yaml file for K8S 
yaml.write( kubeflowJobYamlPath, - this.generateKubeflowJobConfig(trialJobId, trialWorkingFolder, kubeflowJobName, podResources), + this.generateKubeflowJobConfig(trialJobId, trialWorkingFolder, kubeflowJobName, workerPodResources, psPodResources), 'utf-8' ); @@ -281,6 +293,7 @@ class KubeflowTrainingService implements TrainingService { } this.kubeflowTrialConfig = JSON.parse(value); + assert(this.kubeflowClusterConfig !== undefined && this.kubeflowTrialConfig.worker !== undefined); break; default: break; @@ -339,7 +352,15 @@ class KubeflowTrainingService implements TrainingService { return this.metricsEmitter; } - private generateKubeflowJobConfig(trialJobId: string, trialWorkingFolder: string, kubeflowJobName : string, podResources : any) : any { + /** + * Generate kubeflow resource config file + * @param trialJobId trial job id + * @param trialWorkingFolder working folder + * @param kubeflowJobName job name + * @param workerPodResources worker pod template + * @param psPodResources ps pod template + */ + private generateKubeflowJobConfig(trialJobId: string, trialWorkingFolder: string, kubeflowJobName : string, workerPodResources : any, psPodResources?: any) : any { if(!this.kubeflowClusterConfig) { throw new Error('Kubeflow Cluster config is not initialized'); } @@ -348,6 +369,15 @@ class KubeflowTrainingService implements TrainingService { throw new Error('Kubeflow trial config is not initialized'); } + const tfReplicaSpecsObj: any = {}; + tfReplicaSpecsObj.Worker = this.generateReplicaConfig(trialWorkingFolder, this.kubeflowTrialConfig.worker.replicas, + this.kubeflowTrialConfig.worker.image, 'run_worker.sh', workerPodResources); + + if(this.kubeflowTrialConfig.ps) { + tfReplicaSpecsObj.Ps = this.generateReplicaConfig(trialWorkingFolder, this.kubeflowTrialConfig.ps.replicas, + this.kubeflowTrialConfig.ps.image, 'run_ps.sh', psPodResources); + } + return { apiVersion: 'kubeflow.org/v1alpha2', kind: 'TFJob', @@ -361,44 +391,84 @@ class KubeflowTrainingService implements TrainingService { } }, spec: { - tfReplicaSpecs: { - Worker: { - replicas: 1, - template: { - metadata: { - creationTimestamp: null - }, - spec: { - containers: [ - { - // Kubeflow tensorflow operator requires that containers' name must be tensorflow - // TODO: change the name based on operator's type - name: 'tensorflow', - image: this.kubeflowTrialConfig.image, - args: ["sh", `${path.join(trialWorkingFolder, 'run.sh')}`], - volumeMounts: [{ - name: 'nni-nfs-vol', - mountPath: this.CONTAINER_MOUNT_PATH - }], - resources: podResources//, - //workingDir: '/tmp/nni/nuDEP' - }], - restartPolicy: 'ExitCode', - volumes: [{ - name: 'nni-nfs-vol', - nfs: { - server: `${this.kubeflowClusterConfig.nfs.server}`, - path: `${this.kubeflowClusterConfig.nfs.path}` - } - }] - } - } - } - } + tfReplicaSpecs: tfReplicaSpecsObj } }; } + /** + * Generate tf-operator's tfjobs replica config section + * @param trialWorkingFolder trial working folder + * @param replicaNumber replica number + * @param replicaImage image + * @param runScriptFile script file name + * @param podResources pod resource config section + */ + private generateReplicaConfig(trialWorkingFolder: string, replicaNumber: number, replicaImage: string, runScriptFile: string, podResources: any): any { + if(!this.kubeflowClusterConfig) { + throw new Error('Kubeflow Cluster config is not initialized'); + } + + if(!this.kubeflowTrialConfig) { + throw new Error('Kubeflow trial config is not initialized'); + } + + return { + replicas: replicaNumber, + template: { + metadata: { + 
creationTimestamp: null + }, + spec: { + containers: [ + { + // Kubeflow tensorflow operator requires that containers' name must be tensorflow + // TODO: change the name based on operator's type + name: 'tensorflow', + image: replicaImage, + args: ["sh", `${path.join(trialWorkingFolder, runScriptFile)}`], + volumeMounts: [{ + name: 'nni-nfs-vol', + mountPath: this.CONTAINER_MOUNT_PATH + }], + resources: podResources + }], + restartPolicy: 'ExitCode', + volumes: [{ + name: 'nni-nfs-vol', + nfs: { + server: `${this.kubeflowClusterConfig.nfs.server}`, + path: `${this.kubeflowClusterConfig.nfs.path}` + } + }] + } + } + }; + } + + /** + * Genereate run script for different roles(like worker or ps) + * @param trialJobId trial job id + * @param trialWorkingFolder working folder + * @param command + * @param trialSequenceId sequence id + */ + private genereateRunScript(trialJobId: string, trialWorkingFolder: string, + command: string, trialSequenceId: string, roleType: DistTrainRole): string { + return String.Format( + KUBEFLOW_RUN_SHELL_FORMAT, + `$PWD/nni/${trialJobId}`, + path.join(trialWorkingFolder, `${roleType}_output`), + trialJobId, + getExperimentId(), + trialWorkingFolder, + trialSequenceId, + command, + getIPV4Address(), + this.kubeflowRestServerPort + ); + } + private generateSequenceId(): number { if (this.nextTrialSequenceId === -1) { this.nextTrialSequenceId = getInitTrialSequenceId(); diff --git a/tools/nni_cmd/config_schema.py b/tools/nni_cmd/config_schema.py index c1cdfe688e..c8de08010d 100644 --- a/tools/nni_cmd/config_schema.py +++ b/tools/nni_cmd/config_schema.py @@ -92,12 +92,23 @@ kubeflow_trial_schema = { 'trial':{ - 'command': str, - 'codeDir': os.path.exists, - 'gpuNum': And(int, lambda x: 0 <= x <= 99999), - 'cpuNum': And(int, lambda x: 0 <= x <= 99999), - 'memoryMB': int, - 'image': str + 'codeDir': os.path.exists, + Optional('ps'): { + 'replicas': int, + 'command': str, + 'gpuNum': And(int, lambda x: 0 <= x <= 99999), + 'cpuNum': And(int, lambda x: 0 <= x <= 99999), + 'memoryMB': int, + 'image': str + }, + 'worker':{ + 'replicas': int, + 'command': str, + 'gpuNum': And(int, lambda x: 0 <= x <= 99999), + 'cpuNum': And(int, lambda x: 0 <= x <= 99999), + 'memoryMB': int, + 'image': str + } } } diff --git a/tools/nni_cmd/launcher.py b/tools/nni_cmd/launcher.py index 2b3382495d..5ea6c33d37 100644 --- a/tools/nni_cmd/launcher.py +++ b/tools/nni_cmd/launcher.py @@ -99,21 +99,7 @@ def start_rest_server(port, platform, mode, config_file_name, experiment_id=None def set_trial_config(experiment_config, port, config_file_name): '''set trial configuration''' request_data = dict() - value_dict = dict() - value_dict['command'] = experiment_config['trial']['command'] - value_dict['codeDir'] = experiment_config['trial']['codeDir'] - value_dict['gpuNum'] = experiment_config['trial']['gpuNum'] - if experiment_config['trial'].get('cpuNum'): - value_dict['cpuNum'] = experiment_config['trial']['cpuNum'] - if experiment_config['trial'].get('memoryMB'): - value_dict['memoryMB'] = experiment_config['trial']['memoryMB'] - if experiment_config['trial'].get('image'): - value_dict['image'] = experiment_config['trial']['image'] - if experiment_config['trial'].get('dataDir'): - value_dict['dataDir'] = experiment_config['trial']['dataDir'] - if experiment_config['trial'].get('outputDir'): - value_dict['outputDir'] = experiment_config['trial']['outputDir'] - request_data['trial_config'] = value_dict + request_data['trial_config'] = experiment_config['trial'] response = 
rest_put(cluster_metadata_url(port), json.dumps(request_data), 20) if check_response(response): return True @@ -211,31 +197,18 @@ def set_experiment(experiment_config, mode, port, config_file_name): elif experiment_config['trainingServicePlatform'] == 'remote': request_data['clusterMetaData'].append( {'key': 'machine_list', 'value': experiment_config['machineList']}) - value_dict = dict() - value_dict['command'] = experiment_config['trial']['command'] - value_dict['codeDir'] = experiment_config['trial']['codeDir'] - value_dict['gpuNum'] = experiment_config['trial']['gpuNum'] request_data['clusterMetaData'].append( - {'key': 'trial_config', 'value': value_dict}) + {'key': 'trial_config', 'value': experiment_config['trial']}) elif experiment_config['trainingServicePlatform'] == 'pai': request_data['clusterMetaData'].append( - {'key': 'pai_config', 'value': experiment_config['paiConfig']}) - value_dict = dict() - value_dict['command'] = experiment_config['trial']['command'] - value_dict['codeDir'] = experiment_config['trial']['codeDir'] - value_dict['gpuNum'] = experiment_config['trial']['gpuNum'] - if experiment_config['trial'].get('cpuNum'): - value_dict['cpuNum'] = experiment_config['trial']['cpuNum'] - if experiment_config['trial'].get('memoryMB'): - value_dict['memoryMB'] = experiment_config['trial']['memoryMB'] - if experiment_config['trial'].get('image'): - value_dict['image'] = experiment_config['trial']['image'] - if experiment_config['trial'].get('dataDir'): - value_dict['dataDir'] = experiment_config['trial']['dataDir'] - if experiment_config['trial'].get('outputDir'): - value_dict['outputDir'] = experiment_config['trial']['outputDir'] + {'key': 'pai_config', 'value': experiment_config['paiConfig']}) request_data['clusterMetaData'].append( - {'key': 'trial_config', 'value': value_dict}) + {'key': 'trial_config', 'value': experiment_config['trial']}) + elif experiment_config['trainingServicePlatform'] == 'kubeflow': + request_data['clusterMetaData'].append( + {'key': 'kubeflow_config', 'value': experiment_config['kubeflowConfig']}) + request_data['clusterMetaData'].append( + {'key': 'trial_config', 'value': experiment_config['trial']}) response = rest_post(experiment_url(port), json.dumps(request_data), 20) if check_response(response):
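
For context beyond the patch itself, the sketch below shows how the reworked kubeflow_trial_schema accepts the new nested worker/ps trial layout. It is a standalone illustration, not part of the change: the schema body is copied from the tools/nni_cmd/config_schema.py hunk above, while the sample values (image name, commands, replica counts, resource numbers) are made up for demonstration.

# --- Illustrative sketch (not part of this patch) ---------------------------
# Validate a distributed Kubeflow trial section against the reworked
# kubeflow_trial_schema from tools/nni_cmd/config_schema.py. The schema body
# mirrors the patch; the sample values below are invented.
import os

from schema import And, Optional, Schema

kubeflow_trial_schema = {
    'trial': {
        'codeDir': os.path.exists,
        Optional('ps'): {
            'replicas': int,
            'command': str,
            'gpuNum': And(int, lambda x: 0 <= x <= 99999),
            'cpuNum': And(int, lambda x: 0 <= x <= 99999),
            'memoryMB': int,
            'image': str
        },
        'worker': {
            'replicas': int,
            'command': str,
            'gpuNum': And(int, lambda x: 0 <= x <= 99999),
            'cpuNum': And(int, lambda x: 0 <= x <= 99999),
            'memoryMB': int,
            'image': str
        }
    }
}

sample_trial = {
    'codeDir': '.',                          # must be an existing path
    'worker': {
        'replicas': 2,                       # two worker replicas
        'command': 'python3 mnist_dist.py',  # hypothetical trial command
        'gpuNum': 1,
        'cpuNum': 2,
        'memoryMB': 8192,
        'image': 'msranni/nni:latest'        # sample image name
    },
    'ps': {                                  # optional parameter-server role
        'replicas': 1,
        'command': 'python3 mnist_dist.py',
        'gpuNum': 0,
        'cpuNum': 1,
        'memoryMB': 4096,
        'image': 'msranni/nni:latest'
    }
}

# Raises schema.SchemaError if the layout or value types do not match.
Schema(kubeflow_trial_schema).validate({'trial': sample_trial})

Because set_trial_config and set_experiment in launcher.py now forward experiment_config['trial'] unchanged as trial_config, this nested worker/ps shape is also what the joi rules in restValidationSchemas.ts and the new KubeflowTrialConfig / KubeflowTrialConfigTemplate classes receive on the manager side.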