From 39085789cb7dcdd75e3ac2dbb9daadab02909269 Mon Sep 17 00:00:00 2001 From: chicm-ms <38930155+chicm-ms@users.noreply.github.com> Date: Mon, 8 Oct 2018 19:21:51 +0800 Subject: [PATCH] Multi-phase training service (#148) * Dev enas - multi-phase hyper parameters support (#96) * Multi-phase support * Updates * Updates * updates * updates * updates * Merge master to dev-enas (#117) * Multi-phase support * update document (#92) * Edit readme.md * updated a word * Update GetStarted.md * Update GetStarted.md * refact readme, getstarted and write your trial md. * Update README.md * Update WriteYourTrial.md * Update WriteYourTrial.md * Update WriteYourTrial.md * Update WriteYourTrial.md * Fix nnictl bugs and add new feature (#75) * fix nnictl bug * fix nnictl create bug * add experiment status logic * add more information for nnictl * fix Evolution Tuner bug * refactor code * fix code in updater.py * fix nnictl --help * fix classArgs bug * update check response.status_code logic * Updates * remove Buffer warning (#100) * update readme in ga_squad * update readme * fix typo * Update README.md * Update README.md * Update README.md * Updates * updates * updates * updates * Add support for debugging mode * fix setup.py (#115) * Add DAG model configuration format for SQuAD example. * Explain config format for SQuAD QA model. * Add more detailed introduction about the evolution algorithm. * Merge master to dev-enas (#118) * update document (#92) * Edit readme.md * updated a word * Update GetStarted.md * Update GetStarted.md * refact readme, getstarted and write your trial md. 
* Update README.md * Update WriteYourTrial.md * Update WriteYourTrial.md * Update WriteYourTrial.md * Update WriteYourTrial.md * Fix nnictl bugs and add new feature (#75) * fix nnictl bug * fix nnictl create bug * add experiment status logic * add more information for nnictl * fix Evolution Tuner bug * refactor code * fix code in updater.py * fix nnictl --help * fix classArgs bug * update check response.status_code logic * remove Buffer warning (#100) * update readme in ga_squad * update readme * fix typo * Update README.md * Update README.md * Update README.md * Add support for debugging mode * fix setup.py (#115) * Add DAG model configuration format for SQuAD example. * Explain config format for SQuAD QA model. * Add more detailed introduction about the evolution algorithm. * Fix install.sh add add trial log path (#109) * fix nnictl bug * fix nnictl create bug * add experiment status logic * add more information for nnictl * fix Evolution Tuner bug * refactor code * fix code in updater.py * fix nnictl --help * fix classArgs bug * update check response.status_code logic * show trial log path * update document * fix install.sh * set default vallue for maxTrialNum and maxExecDuration * fix nnictl * support multiPhase (#127) * fix nnictl bug * support multiPhase * Fix multiphase datastore problem (#125) * Fix multiphase datastore problem * updates * updates * updates * updates * Pull latest code (#2) * webui logpath and document (#135) * Add webui document and logpath as a href * fix tslint * fix comments by Chengmin * Pai training service bug fix and enhancement (#136) * Add NNI installation scripts * Update pai script, update NNI_out_dir * Update NNI dir in nni sdk local.py * Create .nni folder in nni sdk local.py * Add check before creating .nni folder * Fix typo for PAI_INSTALL_NNI_SHELL_FORMAT * Improve annotation (#138) * Improve annotation * Minor bugfix * Selectively install through pip (#139) Selectively install through pip * update setup.py * fix 
paiTrainingService bugs (#137) * fix nnictl bug * add hdfs host validation * fix bugs * fix dockerfile * fix install.sh * update install.sh * fix dockerfile * Set timeout for HDFSUtility exists function * remove unused TODO * fix sdk * add optional for outputDir and dataDir * refactor dockerfile.base * Remove unused import in hdfsclientUtility * Add documentation for NNI PAI mode experiment (#141) * Add documentation for NNI PAI mode * Fix typo based on PR comments * Exit with subprocess return code of trial keeper * Remove additional exit code * Fix typo based on PR comments * update doc for smac tuner (#140) * Revert "Selectively install through pip (#139)" due to potential pip install issue (#142) * Revert "Selectively install through pip (#139)" This reverts commit 1d174836d3146a0363e9c9c88094bf9cff865faa. * Add exit code of subprocess for trial_keeper * Update README, add link to PAImode doc * fix bug (#147) * Refactor nnictl and add config_pai.yml (#144) * fix nnictl bug * add hdfs host validation * fix bugs * fix dockerfile * fix install.sh * update install.sh * fix dockerfile * Set timeout for HDFSUtility exists function * remove unused TODO * fix sdk * add optional for outputDir and dataDir * refactor dockerfile.base * Remove unused import in hdfsclientUtility * add config_pai.yml * refactor nnictl create logic and add colorful print * fix nnictl stop logic * add annotation for config_pai.yml * add document for start experiment * fix config.yml * fix document * Fix trial keeper wrongly exit issue (#152) * Fix trial keeper bug, use actual exitcode to exit rather than 1 * Fix bug of table sort (#145) * Update doc for PAIMode and v0.2 release notes (#153) * Update v0.2 documentation regards to release note and PAI training service * Update document to describe NNI docker image * Bug fix for SQuAD example tuner. 
(#134) * Update Makefile (#151) * test * update setup.py * update Makefile and install.sh * rever setup.py * change color * update doc * update doc * fix auto-completion's extra space * update Makefile * update webui * Update doc image (#163) * update doc * trivial * trivial * trivial * trivial * trivial * trivial * update image * update image size * Update ga squad (#104) * update readme in ga_squad * update readme * fix typo * Update README.md * Update README.md * Update README.md * update readme * sklearn examples (#169) * fix nnictl bug * fix install.sh * add sklearn-regression example * add sklearn classification * update sklearn * update example * remove additional code * Update batch tuner (#158) * update readme in ga_squad * update readme * fix typo * Update README.md * Update README.md * Update README.md * update readme * update batch tuner * Quickly fix cascading search space bug in tuner (#156) * update readme in ga_squad * update readme * fix typo * Update README.md * Update README.md * Update README.md * update readme * quickly fix cascading searchspace bug in tuner * Add iterative search space example (#119) * update readme in ga_squad * update readme * fix typo * Update README.md * Update README.md * Update README.md * update readme * add iterative search space example * update * update readme * change name * updates * updates * Updates CI * updates --- .travis.yml | 2 +- src/nni_manager/common/datastore.ts | 6 +- src/nni_manager/common/manager.ts | 1 + src/nni_manager/common/trainingService.ts | 9 +- src/nni_manager/common/utils.ts | 5 +- src/nni_manager/core/commands.ts | 5 +- src/nni_manager/core/nniDataStore.ts | 62 +++++- src/nni_manager/core/nnimanager.ts | 27 ++- .../rest_server/restValidationSchemas.ts | 1 + .../local/localTrainingService.ts | 27 ++- .../remoteMachineTrainingService.ts | 44 ++++- .../test/remoteMachineTrainingService.test.ts | 10 +- src/sdk/pynni/nni/__main__.py | 8 +- src/sdk/pynni/nni/multi_phase/__init__.py | 0 
.../nni/multi_phase/multi_phase_dispatcher.py | 178 ++++++++++++++++++ .../nni/multi_phase/multi_phase_tuner.py | 87 +++++++++ src/sdk/pynni/nni/platform/local.py | 29 ++- src/sdk/pynni/nni/protocol.py | 3 +- src/sdk/pynni/nni/trial.py | 6 +- src/sdk/pynni/tests/test_multi_phase_tuner.py | 89 +++++++++ test/naive/run.py | 2 +- tools/nnicmd/config_schema.py | 1 + tools/nnicmd/launcher.py | 4 + 23 files changed, 559 insertions(+), 47 deletions(-) create mode 100644 src/sdk/pynni/nni/multi_phase/__init__.py create mode 100644 src/sdk/pynni/nni/multi_phase/multi_phase_dispatcher.py create mode 100644 src/sdk/pynni/nni/multi_phase/multi_phase_tuner.py create mode 100644 src/sdk/pynni/tests/test_multi_phase_tuner.py diff --git a/.travis.yml b/.travis.yml index a9f4a1d734..04052b8b32 100644 --- a/.travis.yml +++ b/.travis.yml @@ -11,7 +11,7 @@ before_install: - sudo sh -c 'PATH=/usr/local/node/bin:$PATH yarn global add serve' install: - make - - make install + - make easy-install - export PATH=$HOME/.nni/bin:$PATH before_script: - cd test/naive diff --git a/src/nni_manager/common/datastore.ts b/src/nni_manager/common/datastore.ts index b86b0a95fe..d3cca0479b 100644 --- a/src/nni_manager/common/datastore.ts +++ b/src/nni_manager/common/datastore.ts @@ -22,8 +22,8 @@ import { ExperimentProfile, TrialJobStatistics } from './manager'; import { TrialJobDetail, TrialJobStatus } from './trainingService'; -type TrialJobEvent = TrialJobStatus | 'USER_TO_CANCEL' | 'ADD_CUSTOMIZED'; -type MetricType = 'PERIODICAL' | 'FINAL' | 'CUSTOM'; +type TrialJobEvent = TrialJobStatus | 'USER_TO_CANCEL' | 'ADD_CUSTOMIZED' | 'ADD_HYPERPARAMETER'; +type MetricType = 'PERIODICAL' | 'FINAL' | 'CUSTOM' | 'REQUEST_PARAMETER'; interface ExperimentProfileRecord { readonly timestamp: number; @@ -62,7 +62,7 @@ interface TrialJobInfo { status: TrialJobStatus; startTime?: number; endTime?: number; - hyperParameters?: string; + hyperParameters?: string[]; logPath?: string; finalMetricData?: string; 
stderrPath?: string; diff --git a/src/nni_manager/common/manager.ts b/src/nni_manager/common/manager.ts index 10fb9a4227..1d02a1775d 100644 --- a/src/nni_manager/common/manager.ts +++ b/src/nni_manager/common/manager.ts @@ -31,6 +31,7 @@ interface ExperimentParams { maxExecDuration: number; //seconds maxTrialNum: number; searchSpace: string; + multiPhase?: boolean; tuner: { className: string; builtinTunerName?: string; diff --git a/src/nni_manager/common/trainingService.ts b/src/nni_manager/common/trainingService.ts index 0b8708394c..7bcc575c34 100644 --- a/src/nni_manager/common/trainingService.ts +++ b/src/nni_manager/common/trainingService.ts @@ -37,11 +37,16 @@ interface JobApplicationForm { readonly jobType: JobType; } +interface HyperParameters { + readonly value: string; + readonly index: number; +} + /** * define TrialJobApplicationForm */ interface TrialJobApplicationForm extends JobApplicationForm { - readonly hyperParameters: string; + readonly hyperParameters: HyperParameters; } /** @@ -116,6 +121,6 @@ abstract class TrainingService { export { TrainingService, TrainingServiceError, TrialJobStatus, TrialJobApplicationForm, - TrainingServiceMetadata, TrialJobDetail, TrialJobMetric, + TrainingServiceMetadata, TrialJobDetail, TrialJobMetric, HyperParameters, HostJobApplicationForm, JobApplicationForm, JobType }; diff --git a/src/nni_manager/common/utils.ts b/src/nni_manager/common/utils.ts index e83e40e919..20609598a0 100644 --- a/src/nni_manager/common/utils.ts +++ b/src/nni_manager/common/utils.ts @@ -158,8 +158,11 @@ function parseArg(names: string[]): string { * @param assessor: similiar as tuner * */ -function getMsgDispatcherCommand(tuner: any, assessor: any): string { +function getMsgDispatcherCommand(tuner: any, assessor: any, multiPhase: boolean = false): string { let command: string = `python3 -m nni --tuner_class_name ${tuner.className}`; + if (multiPhase) { + command += ' --multi_phase'; + } if (tuner.classArgs !== undefined) { command += ` 
--tuner_args ${JSON.stringify(JSON.stringify(tuner.classArgs))}`; diff --git a/src/nni_manager/core/commands.ts b/src/nni_manager/core/commands.ts index 37bba31d9e..19204b2f31 100644 --- a/src/nni_manager/core/commands.ts +++ b/src/nni_manager/core/commands.ts @@ -27,6 +27,7 @@ const TRIAL_END = 'EN'; const TERMINATE = 'TE'; const NEW_TRIAL_JOB = 'TR'; +const SEND_TRIAL_JOB_PARAMETER = 'SP'; const NO_MORE_TRIAL_JOBS = 'NO'; const KILL_TRIAL_JOB = 'KI'; @@ -39,6 +40,7 @@ const TUNER_COMMANDS: Set = new Set([ TERMINATE, NEW_TRIAL_JOB, + SEND_TRIAL_JOB_PARAMETER, NO_MORE_TRIAL_JOBS ]); @@ -63,5 +65,6 @@ export { NO_MORE_TRIAL_JOBS, KILL_TRIAL_JOB, TUNER_COMMANDS, - ASSESSOR_COMMANDS + ASSESSOR_COMMANDS, + SEND_TRIAL_JOB_PARAMETER }; diff --git a/src/nni_manager/core/nniDataStore.ts b/src/nni_manager/core/nniDataStore.ts index 790a5680c0..81afeb3af3 100644 --- a/src/nni_manager/core/nniDataStore.ts +++ b/src/nni_manager/core/nniDataStore.ts @@ -26,6 +26,7 @@ import * as component from '../common/component'; import { Database, DataStore, MetricData, MetricDataRecord, MetricType, TrialJobEvent, TrialJobEventRecord, TrialJobInfo } from '../common/datastore'; import { isNewExperiment } from '../common/experimentStartupInfo'; +import { getExperimentId } from '../common/experimentStartupInfo'; import { getLogger, Logger } from '../common/log'; import { ExperimentProfile, TrialJobStatistics } from '../common/manager'; import { TrialJobStatus } from '../common/trainingService'; @@ -35,6 +36,7 @@ class NNIDataStore implements DataStore { private db: Database = component.get(Database); private log: Logger = getLogger(); private initTask!: Deferred; + private multiPhase: boolean | undefined; public init(): Promise { if (this.initTask !== undefined) { @@ -112,13 +114,19 @@ class NNIDataStore implements DataStore { } public async getTrialJob(trialJobId: string): Promise { - const trialJobs = await this.queryTrialJobs(undefined, trialJobId); + const trialJobs: TrialJobInfo[] = await 
this.queryTrialJobs(undefined, trialJobId); return trialJobs[0]; } public async storeMetricData(trialJobId: string, data: string): Promise { - const metrics = JSON.parse(data) as MetricData; + const metrics: MetricData = JSON.parse(data); + // REQUEST_PARAMETER is used to request new parameters for multiphase trial job, + // it is not metrics, so it is skipped here. + if (metrics.type === 'REQUEST_PARAMETER') { + + return; + } assert(trialJobId === metrics.trial_job_id); await this.db.storeMetricData(trialJobId, JSON.stringify({ trialJobId: metrics.trial_job_id, @@ -160,25 +168,56 @@ class NNIDataStore implements DataStore { private async getFinalMetricData(trialJobId: string): Promise { const metrics: MetricDataRecord[] = await this.getMetricData(trialJobId, 'FINAL'); - if (metrics.length > 1) { - this.log.error(`Found multiple final results for trial job: ${trialJobId}`); + + const multiPhase: boolean = await this.isMultiPhase(); + + if (metrics.length > 1 && !multiPhase) { + this.log.error(`Found multiple FINAL results for trial job ${trialJobId}`); + } + + return metrics[metrics.length - 1]; + } + + private async isMultiPhase(): Promise { + if (this.multiPhase === undefined) { + this.multiPhase = (await this.getExperimentProfile(getExperimentId())).params.multiPhase; } - return metrics[0]; + if (this.multiPhase !== undefined) { + return this.multiPhase; + } else { + return false; + } } - private getJobStatusByLatestEvent(event: TrialJobEvent): TrialJobStatus { + private getJobStatusByLatestEvent(oldStatus: TrialJobStatus, event: TrialJobEvent): TrialJobStatus { switch (event) { case 'USER_TO_CANCEL': return 'USER_CANCELED'; case 'ADD_CUSTOMIZED': return 'WAITING'; + case 'ADD_HYPERPARAMETER': + return oldStatus; default: } return event; } + private mergeHyperParameters(hyperParamList: string[], newParamStr: string): string[] { + const mergedHyperParams: any[] = []; + const newParam: any = JSON.parse(newParamStr); + for (const hyperParamStr of hyperParamList) { 
+ const hyperParam: any = JSON.parse(hyperParamStr); + mergedHyperParams.push(hyperParam); + } + if (mergedHyperParams.filter((value: any) => value.parameter_index === newParam.parameter_index).length <= 0) { + mergedHyperParams.push(newParam); + } + + return mergedHyperParams.map((value: any) => { return JSON.stringify(value); }); + } + private getTrialJobsByReplayEvents(trialJobEvents: TrialJobEventRecord[]): Map { const map: Map = new Map(); // assume data is stored by time ASC order @@ -192,7 +231,8 @@ class NNIDataStore implements DataStore { } else { jobInfo = { id: record.trialJobId, - status: this.getJobStatusByLatestEvent(record.event) + status: this.getJobStatusByLatestEvent('UNKNOWN', record.event), + hyperParameters: [] }; } if (!jobInfo) { @@ -221,9 +261,13 @@ class NNIDataStore implements DataStore { } default: } - jobInfo.status = this.getJobStatusByLatestEvent(record.event); + jobInfo.status = this.getJobStatusByLatestEvent(jobInfo.status, record.event); if (record.data !== undefined && record.data.trim().length > 0) { - jobInfo.hyperParameters = record.data; + if (jobInfo.hyperParameters !== undefined) { + jobInfo.hyperParameters = this.mergeHyperParameters(jobInfo.hyperParameters, record.data); + } else { + assert(false, 'jobInfo.hyperParameters is undefined'); + } } map.set(record.trialJobId, jobInfo); } diff --git a/src/nni_manager/core/nnimanager.ts b/src/nni_manager/core/nnimanager.ts index f562598b11..c84688c038 100644 --- a/src/nni_manager/core/nnimanager.ts +++ b/src/nni_manager/core/nnimanager.ts @@ -37,7 +37,7 @@ import { import { delay , getLogDir, getMsgDispatcherCommand} from '../common/utils'; import { ADD_CUSTOMIZED_TRIAL_JOB, KILL_TRIAL_JOB, NEW_TRIAL_JOB, NO_MORE_TRIAL_JOBS, REPORT_METRIC_DATA, - REQUEST_TRIAL_JOBS, TERMINATE, TRIAL_END, UPDATE_SEARCH_SPACE + REQUEST_TRIAL_JOBS, SEND_TRIAL_JOB_PARAMETER, TERMINATE, TRIAL_END, UPDATE_SEARCH_SPACE } from './commands'; import { createDispatcherInterface, IpcInterface } from 
'./ipcInterface'; import { TrialJobMaintainerEvent, TrialJobs } from './trialJobs'; @@ -116,7 +116,7 @@ class NNIManager implements Manager { await this.storeExperimentProfile(); this.log.debug('Setup tuner...'); - const dispatcherCommand: string = getMsgDispatcherCommand(expParams.tuner, expParams.assessor); + const dispatcherCommand: string = getMsgDispatcherCommand(expParams.tuner, expParams.assessor, expParams.multiPhase); console.log(`dispatcher command: ${dispatcherCommand}`); this.setupTuner( //expParams.tuner.tunerCommand, @@ -140,7 +140,7 @@ class NNIManager implements Manager { this.experimentProfile = await this.dataStore.getExperimentProfile(experimentId); const expParams: ExperimentParams = this.experimentProfile.params; - const dispatcherCommand: string = getMsgDispatcherCommand(expParams.tuner, expParams.assessor); + const dispatcherCommand: string = getMsgDispatcherCommand(expParams.tuner, expParams.assessor, expParams.multiPhase); console.log(`dispatcher command: ${dispatcherCommand}`); this.setupTuner( dispatcherCommand, @@ -462,7 +462,10 @@ class NNIManager implements Manager { this.currSubmittedTrialNum++; const trialJobAppForm: TrialJobApplicationForm = { jobType: 'TRIAL', - hyperParameters: content + hyperParameters: { + value: content, + index: 0 + } }; const trialJobDetail: TrialJobDetail = await this.trainingService.submitTrialJob(trialJobAppForm); this.trialJobsMaintainer.setTrialJob(trialJobDetail.id, Object.assign({}, trialJobDetail)); @@ -474,6 +477,22 @@ class NNIManager implements Manager { } } break; + case SEND_TRIAL_JOB_PARAMETER: + const tunerCommand: any = JSON.parse(content); + assert(tunerCommand.parameter_index >= 0); + assert(tunerCommand.trial_job_id !== undefined); + + const trialJobForm: TrialJobApplicationForm = { + jobType: 'TRIAL', + hyperParameters: { + value: content, + index: tunerCommand.parameter_index + } + }; + await this.trainingService.updateTrialJob(tunerCommand.trial_job_id, trialJobForm); + await 
this.dataStore.storeTrialJobEvent( + 'ADD_HYPERPARAMETER', tunerCommand.trial_job_id, content, undefined); + break; case NO_MORE_TRIAL_JOBS: this.trialJobsMaintainer.setNoMoreTrials(); break; diff --git a/src/nni_manager/rest_server/restValidationSchemas.ts b/src/nni_manager/rest_server/restValidationSchemas.ts index a94f9f11b8..01985cddb2 100644 --- a/src/nni_manager/rest_server/restValidationSchemas.ts +++ b/src/nni_manager/rest_server/restValidationSchemas.ts @@ -57,6 +57,7 @@ export namespace ValidationSchemas { trialConcurrency: joi.number().min(0).required(), searchSpace: joi.string().required(), maxExecDuration: joi.number().min(0).required(), + multiPhase: joi.boolean(), tuner: joi.object({ builtinTunerName: joi.string().valid('TPE', 'Random', 'Anneal', 'Evolution', 'SMAC', 'BatchTuner'), codeDir: joi.string(), diff --git a/src/nni_manager/training_service/local/localTrainingService.ts b/src/nni_manager/training_service/local/localTrainingService.ts index 50eb9b3846..b66dd8d68c 100644 --- a/src/nni_manager/training_service/local/localTrainingService.ts +++ b/src/nni_manager/training_service/local/localTrainingService.ts @@ -30,10 +30,11 @@ import { getLogger, Logger } from '../../common/log'; import { TrialConfig } from '../common/trialConfig'; import { TrialConfigMetadataKey } from '../common/trialConfigMetadataKey'; import { - HostJobApplicationForm, JobApplicationForm, TrainingService, TrialJobApplicationForm, + HostJobApplicationForm, JobApplicationForm, HyperParameters, TrainingService, TrialJobApplicationForm, TrialJobDetail, TrialJobMetric, TrialJobStatus } from '../../common/trainingService'; import { delay, getExperimentRootDir, uniqueString } from '../../common/utils'; +import { file } from 'tmp'; const tkill = require('tree-kill'); @@ -210,8 +211,18 @@ class LocalTrainingService implements TrainingService { * @param trialJobId trial job id * @param form job application form */ - public updateTrialJob(trialJobId: string, form: JobApplicationForm): 
Promise { - throw new MethodNotImplementedError(); + public async updateTrialJob(trialJobId: string, form: JobApplicationForm): Promise { + const trialJobDetail: undefined | TrialJobDetail = this.jobMap.get(trialJobId); + if (trialJobDetail === undefined) { + throw new Error(`updateTrialJob failed: ${trialJobId} not found`); + } + if (form.jobType === 'TRIAL') { + await this.writeParameterFile(trialJobDetail.workingDirectory, (form).hyperParameters); + } else { + throw new Error(`updateTrialJob failed: jobType ${form.jobType} not supported.`); + } + + return trialJobDetail; } /** @@ -332,10 +343,7 @@ class LocalTrainingService implements TrainingService { await cpp.exec(`mkdir -p ${path.join(trialJobDetail.workingDirectory, '.nni')}`); await cpp.exec(`touch ${path.join(trialJobDetail.workingDirectory, '.nni', 'metrics')}`); await fs.promises.writeFile(path.join(trialJobDetail.workingDirectory, 'run.sh'), runScriptLines.join('\n'), { encoding: 'utf8' }); - await fs.promises.writeFile( - path.join(trialJobDetail.workingDirectory, 'parameter.cfg'), - (trialJobDetail.form).hyperParameters, - { encoding: 'utf8' }); + await this.writeParameterFile(trialJobDetail.workingDirectory, (trialJobDetail.form).hyperParameters); const process: cp.ChildProcess = cp.exec(`bash ${path.join(trialJobDetail.workingDirectory, 'run.sh')}`); this.setTrialJobStatus(trialJobDetail, 'RUNNING'); @@ -402,6 +410,11 @@ class LocalTrainingService implements TrainingService { } } } + + private async writeParameterFile(directory: string, hyperParameters: HyperParameters): Promise { + const filepath: string = path.join(directory, `parameter_${hyperParameters.index}.cfg`); + await fs.promises.writeFile(filepath, hyperParameters.value, { encoding: 'utf8' }); + } } export { LocalTrainingService }; diff --git a/src/nni_manager/training_service/remote_machine/remoteMachineTrainingService.ts b/src/nni_manager/training_service/remote_machine/remoteMachineTrainingService.ts index e1cff16f22..a4be7a1b0d 
100644 --- a/src/nni_manager/training_service/remote_machine/remoteMachineTrainingService.ts +++ b/src/nni_manager/training_service/remote_machine/remoteMachineTrainingService.ts @@ -34,7 +34,7 @@ import { getExperimentId } from '../../common/experimentStartupInfo'; import { getLogger, Logger } from '../../common/log'; import { ObservableTimer } from '../../common/observableTimer'; import { - HostJobApplicationForm, JobApplicationForm, TrainingService, TrialJobApplicationForm, TrialJobDetail, TrialJobMetric + HostJobApplicationForm, HyperParameters, JobApplicationForm, TrainingService, TrialJobApplicationForm, TrialJobDetail, TrialJobMetric } from '../../common/trainingService'; import { delay, getExperimentRootDir, uniqueString } from '../../common/utils'; import { GPUSummary } from '../common/gpuData'; @@ -198,8 +198,24 @@ class RemoteMachineTrainingService implements TrainingService { * @param trialJobId trial job id * @param form job application form */ - public updateTrialJob(trialJobId: string, form: JobApplicationForm): Promise { - throw new MethodNotImplementedError(); + public async updateTrialJob(trialJobId: string, form: JobApplicationForm): Promise { + this.log.info(`updateTrialJob: form: ${JSON.stringify(form)}`); + const trialJobDetail: undefined | TrialJobDetail = this.trialJobsMap.get(trialJobId); + if (trialJobDetail === undefined) { + throw new Error(`updateTrialJob failed: ${trialJobId} not found`); + } + if (form.jobType === 'TRIAL') { + const rmMeta: RemoteMachineMeta | undefined = (trialJobDetail).rmMeta; + if (rmMeta !== undefined) { + await this.writeParameterFile(trialJobId, (form).hyperParameters, rmMeta); + } else { + throw new Error(`updateTrialJob failed: ${trialJobId} rmMeta not found`); + } + } else { + throw new Error(`updateTrialJob failed: jobType ${form.jobType} not supported.`); + } + + return trialJobDetail; } /** @@ -442,15 +458,13 @@ class RemoteMachineTrainingService implements TrainingService { //create tmp trial working 
folder locally. await cpp.exec(`mkdir -p ${trialLocalTempFolder}`); - // Write file content ( run.sh and parameter.cfg ) to local tmp files + // Write file content ( run.sh and parameter_0.cfg ) to local tmp files await fs.promises.writeFile(path.join(trialLocalTempFolder, 'run.sh'), runScriptContent, { encoding: 'utf8' }); - await fs.promises.writeFile(path.join(trialLocalTempFolder, 'parameter.cfg'), form.hyperParameters, { encoding: 'utf8' }); // Copy local tmp files to remote machine await SSHClientUtility.copyFileToRemote( path.join(trialLocalTempFolder, 'run.sh'), path.join(trialWorkingFolder, 'run.sh'), sshClient); - await SSHClientUtility.copyFileToRemote( - path.join(trialLocalTempFolder, 'parameter.cfg'), path.join(trialWorkingFolder, 'parameter.cfg'), sshClient); + await this.writeParameterFile(trialJobId, form.hyperParameters, rmScheduleInfo.rmMeta); // Copy files in codeDir to remote working directory await SSHClientUtility.copyDirectoryToRemote(this.trialConfig.codeDir, trialWorkingFolder, sshClient); @@ -562,6 +576,22 @@ class RemoteMachineTrainingService implements TrainingService { return jobpidPath; } + + private async writeParameterFile(trialJobId: string, hyperParameters: HyperParameters, rmMeta: RemoteMachineMeta): Promise { + const sshClient: Client | undefined = this.machineSSHClientMap.get(rmMeta); + if (sshClient === undefined) { + throw new Error('sshClient is undefined.'); + } + + const trialWorkingFolder: string = path.join(this.remoteExpRootDir, 'trials', trialJobId); + const trialLocalTempFolder: string = path.join(this.expRootDir, 'trials-local', trialJobId); + + const fileName: string = `parameter_${hyperParameters.index}.cfg`; + const localFilepath: string = path.join(trialLocalTempFolder, fileName); + await fs.promises.writeFile(localFilepath, hyperParameters.value, { encoding: 'utf8' }); + + await SSHClientUtility.copyFileToRemote(localFilepath, path.join(trialWorkingFolder, fileName), sshClient); + } } export { 
RemoteMachineTrainingService }; diff --git a/src/nni_manager/training_service/test/remoteMachineTrainingService.test.ts b/src/nni_manager/training_service/test/remoteMachineTrainingService.test.ts index b55e041e1d..7509ea2ade 100644 --- a/src/nni_manager/training_service/test/remoteMachineTrainingService.test.ts +++ b/src/nni_manager/training_service/test/remoteMachineTrainingService.test.ts @@ -100,7 +100,10 @@ describe('Unit Test for RemoteMachineTrainingService', () => { TrialConfigMetadataKey.TRIAL_CONFIG, `{"command":"sleep 1h && echo ","codeDir":"${localCodeDir}","gpuNum":1}`); const form: TrialJobApplicationForm = { jobType: 'TRIAL', - hyperParameters: 'mock hyperparameters' + hyperParameters: { + value: 'mock hyperparameters', + index: 0 + } }; const trialJob = await remoteMachineTrainingService.submitTrialJob(form); @@ -135,7 +138,10 @@ describe('Unit Test for RemoteMachineTrainingService', () => { // submit job const form: TrialJobApplicationForm = { jobType: 'TRIAL', - hyperParameters: 'mock hyperparameters' + hyperParameters: { + value: 'mock hyperparameters', + index: 0 + } }; const jobDetail: TrialJobDetail = await remoteMachineTrainingService.submitTrialJob(form); // Add metrics listeners diff --git a/src/sdk/pynni/nni/__main__.py b/src/sdk/pynni/nni/__main__.py index 206cd1f5c1..5454e98343 100644 --- a/src/sdk/pynni/nni/__main__.py +++ b/src/sdk/pynni/nni/__main__.py @@ -29,7 +29,7 @@ from .constants import ModuleName, ClassName, ClassArgs from nni.msg_dispatcher import MsgDispatcher - +from nni.multi_phase.multi_phase_dispatcher import MultiPhaseMsgDispatcher logger = logging.getLogger('nni.main') logger.debug('START') @@ -90,6 +90,7 @@ def parse_args(): help='Assessor directory') parser.add_argument('--assessor_class_filename', type=str, required=False, help='Assessor class file path') + parser.add_argument('--multi_phase', action='store_true') flags, _ = parser.parse_known_args() return flags @@ -132,7 +133,10 @@ def main(): if assessor is None: 
raise AssertionError('Failed to create Assessor instance') - dispatcher = MsgDispatcher(tuner, assessor) + if args.multi_phase: + dispatcher = MultiPhaseMsgDispatcher(tuner, assessor) + else: + dispatcher = MsgDispatcher(tuner, assessor) try: dispatcher.run() diff --git a/src/sdk/pynni/nni/multi_phase/__init__.py b/src/sdk/pynni/nni/multi_phase/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/src/sdk/pynni/nni/multi_phase/multi_phase_dispatcher.py b/src/sdk/pynni/nni/multi_phase/multi_phase_dispatcher.py new file mode 100644 index 0000000000..ec7d2be0f1 --- /dev/null +++ b/src/sdk/pynni/nni/multi_phase/multi_phase_dispatcher.py @@ -0,0 +1,178 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# +# MIT License +# +# Permission is hereby granted, free of charge, to any person obtaining a copy of this software and +# associated documentation files (the "Software"), to deal in the Software without restriction, +# including without limitation the rights to use, copy, modify, merge, publish, distribute, +# sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in all copies or +# substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT +# NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND +# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, +# DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT +# OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. 
+# ==================================================================================================
+
+import logging
+from collections import defaultdict
+import json_tricks
+
+from nni.protocol import CommandType, send
+from nni.msg_dispatcher_base import MsgDispatcherBase
+from nni.assessor import AssessResult
+
+_logger = logging.getLogger(__name__)
+
+# Assessor global variables
+# key: trial job ID; value: intermediate results, mapping from sequence number to data
+_trial_history = defaultdict(dict)
+
+# trial_job_id of all ended trials.
+# We need this because NNI manager may send metrics after reporting a trial ended.
+# TODO: move this logic to NNI manager
+_ended_trials = set()
+
+def _sort_history(history):
+    """Return the values stored under keys 0, 1, 2, ... stopping at the first missing key."""
+    ret = []
+    for i in range(len(history)):
+        if i not in history:
+            break
+        ret.append(history[i])
+    return ret
+
+# Tuner global variables
+_next_parameter_id = 0
+# key: parameter ID; value: parameters
+_trial_params = {}
+_customized_parameter_ids = set()
+
+def _create_parameter_id():
+    """Return a fresh parameter ID; IDs are consecutive integers starting at 0."""
+    global _next_parameter_id # pylint: disable=global-statement
+    _next_parameter_id += 1
+    return _next_parameter_id - 1
+
+def _pack_parameter(parameter_id, params, customized=False, trial_job_id=None, parameter_index=None):
+    """Record params under parameter_id and return them serialized as a JSON command payload."""
+    _trial_params[parameter_id] = params
+    ret = {
+        'parameter_id': parameter_id,
+        'parameter_source': 'customized' if customized else 'algorithm',
+        'parameters': params
+    }
+    if trial_job_id is not None:
+        ret['trial_job_id'] = trial_job_id
+    ret['parameter_index'] = 0 if parameter_index is None else parameter_index
+    return json_tricks.dumps(ret)
+
+class MultiPhaseMsgDispatcher(MsgDispatcherBase):
+    """Routes commands to a tuner and an optional assessor; a trial may ask for more parameters via a REQUEST_PARAMETER metric."""
+    def __init__(self, tuner, assessor=None):
+        super().__init__()
+        self.tuner = tuner
+        self.assessor = assessor
+        if assessor is None:
+            _logger.debug('Assessor is not configured')
+
+    def load_checkpoint(self):
+        self.tuner.load_checkpoint()
+        if self.assessor is not None:
+            self.assessor.load_checkpoint()
+
+    def save_checkpoint(self):
+        self.tuner.save_checkpoint()
+        if self.assessor is not None:
+            self.assessor.save_checkpoint()
+
+    def handle_request_trial_jobs(self, data):
+        # data: number of trial jobs
+        ids = [_create_parameter_id() for _ in range(data)]
+        params_list = self.tuner.generate_multiple_parameters(ids)
+        assert len(ids) == len(params_list)
+        for id_, params in zip(ids, params_list):
+            send(CommandType.NewTrialJob, _pack_parameter(id_, params))
+        return True
+
+    def handle_update_search_space(self, data):
+        self.tuner.update_search_space(data)
+        return True
+
+    def handle_add_customized_trial(self, data):
+        # data: parameters
+        id_ = _create_parameter_id()
+        _customized_parameter_ids.add(id_)
+        send(CommandType.NewTrialJob, _pack_parameter(id_, data, customized=True))
+        return True
+
+    def handle_report_metric_data(self, data):
+        """Handle a metric: FINAL goes to the tuner, PERIODICAL to the assessor, REQUEST_PARAMETER sends new parameters."""
+        trial_job_id = data['trial_job_id']
+        if data['type'] == 'FINAL':
+            id_ = data['parameter_id']
+            if id_ in _customized_parameter_ids:
+                self.tuner.receive_customized_trial_result(id_, _trial_params[id_], data['value'], trial_job_id)
+            else:
+                self.tuner.receive_trial_result(id_, _trial_params[id_], data['value'], trial_job_id)
+        elif data['type'] == 'PERIODICAL':
+            if self.assessor is not None:
+                self._handle_intermediate_metric_data(data)
+        elif data['type'] == 'REQUEST_PARAMETER':
+            assert data['trial_job_id'] is not None
+            assert data['parameter_index'] is not None
+            param_id = _create_parameter_id()
+            param = self.tuner.generate_parameters(param_id, trial_job_id)
+            send(CommandType.SendTrialJobParameter, _pack_parameter(param_id, param, trial_job_id=data['trial_job_id'], parameter_index=data['parameter_index']))
+        else:
+            raise ValueError('Data type not supported: {}'.format(data['type']))
+
+        return True
+
+    def handle_trial_end(self, data):
+        trial_job_id = data['trial_job_id']
+        _ended_trials.add(trial_job_id)
+        _trial_history.pop(trial_job_id, None)
+        if self.assessor is not None:
+            self.assessor.trial_end(trial_job_id, data['event'] == 'SUCCEEDED')
+        return True
+
+    def _handle_intermediate_metric_data(self, data):
+        """Store an intermediate result and send KillTrialJob if the assessor judges the trial bad."""
+        if data['type'] != 'PERIODICAL':
+            return True
+        if self.assessor is None:
+            return True
+
+        trial_job_id = data['trial_job_id']
+        if trial_job_id in _ended_trials:
+            return True
+
+        history = _trial_history[trial_job_id]
+        history[data['sequence']] = data['value']
+        ordered_history = _sort_history(history)
+        if len(ordered_history) < data['sequence']: # no user-visible update since last time
+            return True
+
+        try:
+            result = self.assessor.assess_trial(trial_job_id, ordered_history)
+        except Exception:
+            # No verdict is available; falling through would hit an unbound 'result'.
+            _logger.exception('Assessor error')
+            return True
+
+        if isinstance(result, bool):
+            result = AssessResult.Good if result else AssessResult.Bad
+        elif not isinstance(result, AssessResult):
+            msg = 'Result of Assessor.assess_trial must be an object of AssessResult, not %s'
+            raise RuntimeError(msg % type(result))
+
+        if result is AssessResult.Bad:
+            _logger.debug('BAD, kill %s', trial_job_id)
+            send(CommandType.KillTrialJob, json_tricks.dumps(trial_job_id))
+        else:
+            _logger.debug('GOOD')
diff --git a/src/sdk/pynni/nni/multi_phase/multi_phase_tuner.py b/src/sdk/pynni/nni/multi_phase/multi_phase_tuner.py new file mode 100644 index 0000000000..1fb10ab676 --- /dev/null +++ b/src/sdk/pynni/nni/multi_phase/multi_phase_tuner.py @@ -0,0 +1,87 @@ +# Copyright (c) Microsoft Corporation. All rights reserved.
+#
+# MIT License
+#
+# Permission is hereby granted, free of charge, to any person obtaining a copy of this software and
+# associated documentation files (the "Software"), to deal in the Software without restriction,
+# including without limitation the rights to use, copy, modify, merge, publish, distribute,
+# sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is
+# furnished to do so, subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be included in all copies or
+# substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT
+# NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
+# DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT
+# OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+# ==================================================================================================
+
+
+import logging
+
+from nni.recoverable import Recoverable
+
+_logger = logging.getLogger(__name__)
+
+
+class MultiPhaseTuner(Recoverable):
+    """Base class for tuners whose callbacks also receive the ID of the trial job involved."""
+    # pylint: disable=no-self-use,unused-argument
+
+    def generate_parameters(self, parameter_id, trial_job_id=None):
+        """Returns a set of trial (hyper-)parameters, as a serializable object.
+        User code must override either this function or 'generate_multiple_parameters()'.
+        parameter_id: int
+        trial_job_id: ID of the trial job requesting parameters; None when not supplied
+        """
+        raise NotImplementedError('Tuner: generate_parameters not implemented')
+
+    def generate_multiple_parameters(self, parameter_id_list):
+        """Returns multiple sets of trial (hyper-)parameters, as iterable of serializable objects.
+        Calls 'generate_parameters()' once per entry of 'parameter_id_list' by default.
+        User code must override either this function or 'generate_parameters()'.
+        parameter_id_list: list of int
+        """
+        return [self.generate_parameters(parameter_id) for parameter_id in parameter_id_list]
+
+    def receive_trial_result(self, parameter_id, parameters, reward, trial_job_id):
+        """Invoked when a trial reports its final result. Must override.
+        parameter_id: int
+        parameters: object created by 'generate_parameters()'
+        reward: object reported by trial
+        trial_job_id: ID of the trial job that reported the result
+        """
+        raise NotImplementedError('Tuner: receive_trial_result not implemented')
+
+    def receive_customized_trial_result(self, parameter_id, parameters, reward, trial_job_id):
+        """Invoked when a trial added by WebUI reports its final result. Do nothing by default.
+        parameter_id: int
+        parameters: object created by user
+        reward: object reported by trial
+        trial_job_id: ID of the trial job that reported the result
+        """
+        _logger.info('Customized trial job %s ignored by tuner', parameter_id)
+
+    def update_search_space(self, search_space):
+        """Update the search space of tuner. Must override.
+        search_space: JSON object
+        """
+        raise NotImplementedError('Tuner: update_search_space not implemented')
+
+    def load_checkpoint(self):
+        """Load the checkpoint of tuner. The default only logs the checkpoint path."""
+        checkpoint_path = self.get_checkpoint_path()
+        _logger.info('Load checkpoint ignored by tuner, checkpoint path: %s', checkpoint_path)
+
+    def save_checkpoint(self):
+        """Save the checkpoint of tuner. The default only logs the checkpoint path."""
+        checkpoint_path = self.get_checkpoint_path()
+        _logger.info('Save checkpoint ignored by tuner, checkpoint path: %s', checkpoint_path)
+
+    def _on_exit(self):
+        pass
+
+    def _on_error(self):
+        pass
diff --git a/src/sdk/pynni/nni/platform/local.py b/src/sdk/pynni/nni/platform/local.py index 7a9df82971..1c3b196bf4 100644 --- a/src/sdk/pynni/nni/platform/local.py +++ b/src/sdk/pynni/nni/platform/local.py @@ -18,11 +18,12 @@ # OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
# ================================================================================================== - -import json import os +import json +import time +import json_tricks -from ..common import init_logger +from ..common import init_logger, env_args _sysdir = os.environ['NNI_SYS_DIR'] if not os.path.exists(os.path.join(_sysdir, '.nni')):
@@ -35,10 +36,35 @@
 _log_file_path = os.path.join(_outputdir, 'trial.log')
 init_logger(_log_file_path)
 
+# Index of the 'parameter_<index>.cfg' file that the next get_parameters() call reads
+_param_index = 0
+
+def request_next_parameter():
+    """Send a REQUEST_PARAMETER metric carrying this trial's job ID and the current parameter index."""
+    metric = json_tricks.dumps({
+        'trial_job_id': env_args.trial_job_id,
+        'type': 'REQUEST_PARAMETER',
+        'sequence': 0,
+        'parameter_index': _param_index
+    })
+    send_metric(metric)
 
 def get_parameters():
-    params_file = open(os.path.join(_sysdir, 'parameter.cfg'), 'r')
-    return json.load(params_file)
+    """Load and return the next parameter set, requesting it first if its file is absent.
+
+    Blocks, polling every 3 seconds, until 'parameter_<index>.cfg' exists under NNI_SYS_DIR.
+    """
+    global _param_index
+    params_filepath = os.path.join(_sysdir, 'parameter_{}.cfg'.format(_param_index))
+    if not os.path.isfile(params_filepath):
+        request_next_parameter()
+    while not os.path.isfile(params_filepath):
+        time.sleep(3)
+    # 'with' closes the handle even if json.load() raises
+    with open(params_filepath, 'r') as params_file:
+        params = json.load(params_file)
+    _param_index += 1
+    return params
 
 def send_metric(string):
     data = (string + '\n').encode('utf8')
diff --git a/src/sdk/pynni/nni/protocol.py b/src/sdk/pynni/nni/protocol.py index 6c0e71506d..ada5527bfa 100644 --- a/src/sdk/pynni/nni/protocol.py +++ b/src/sdk/pynni/nni/protocol.py @@ -34,6 +34,7 @@ class CommandType(Enum): # out NewTrialJob = b'TR' + SendTrialJobParameter = b'SP' NoMoreTrialJobs = b'NO' KillTrialJob = b'KI' @@ -55,7 +56,7 @@ def send(command, data): data = data.encode('utf8') assert len(data) < 1000000, 'Command too long' msg = b'%b%06d%b' % (command.value, len(data), data) - logging.getLogger(__name__).debug('Sending command, data: [%s]' % data) + logging.getLogger(__name__).debug('Sending command, data: [%s]' % msg) _out_file.write(msg) _out_file.flush() diff --git a/src/sdk/pynni/nni/trial.py b/src/sdk/pynni/nni/trial.py index e5884f8dac..27dc864d23 100644 ---
a/src/sdk/pynni/nni/trial.py +++ b/src/sdk/pynni/nni/trial.py @@ -32,11 +32,13 @@ ] -_params = platform.get_parameters() +_params = None def get_parameters(): """Returns a set of (hyper-)paremeters generated by Tuner.""" + global _params + _params = platform.get_parameters() return _params['parameters'] @@ -51,6 +53,7 @@ def report_intermediate_result(metric): metric: serializable object. """ global _intermediate_seq + assert _params is not None, 'nni.get_parameters() needs to be called before report_intermediate_result' metric = json_tricks.dumps({ 'parameter_id': _params['parameter_id'], 'trial_job_id': env_args.trial_job_id, @@ -66,6 +69,7 @@ def report_final_result(metric): """Reports final result to tuner. metric: serializable object. """ + assert _params is not None, 'nni.get_parameters() needs to be called before report_final_result' metric = json_tricks.dumps({ 'parameter_id': _params['parameter_id'], 'trial_job_id': env_args.trial_job_id,
diff --git a/src/sdk/pynni/tests/test_multi_phase_tuner.py b/src/sdk/pynni/tests/test_multi_phase_tuner.py
new file mode 100644
index 0000000000..72b477999e
--- /dev/null
+++ b/src/sdk/pynni/tests/test_multi_phase_tuner.py
@@ -0,0 +1,89 @@
+import logging
+import random
+from io import BytesIO
+from unittest import TestCase, main
+
+import nni
+import nni.protocol
+from nni.protocol import CommandType, send, receive
+from nni.multi_phase.multi_phase_tuner import MultiPhaseTuner
+from nni.multi_phase.multi_phase_dispatcher import MultiPhaseMsgDispatcher
+
+_logger = logging.getLogger(__name__)
+
+class NaiveMultiPhaseTuner(MultiPhaseTuner):
+    '''
+    Picks a random value for every parameter; supports only 'choice' parameters.
+    '''
+    def __init__(self):
+        self.search_space = None
+
+    def generate_parameters(self, parameter_id, trial_job_id=None):
+        """Returns a dict mapping each search space key to one randomly chosen value.
+        Raises AssertionError if no search space has been set, ValueError on non-choice types.
+        """
+        if self.search_space is None:
+            raise AssertionError('Search space not specified')
+        generated_parameters = {}
+        for k, param in self.search_space.items():
+            if param['_type'] != 'choice':
+                raise ValueError('Only choice type is supported')
+            generated_parameters[k] = random.choice(param['_value'])
+        _logger.debug(generated_parameters)
+        return generated_parameters
+
+
+    def receive_trial_result(self, parameter_id, parameters, reward, trial_job_id):
+        _logger.debug('receive_trial_result: {},{},{},{}'.format(parameter_id, parameters, reward, trial_job_id))
+
+    def receive_customized_trial_result(self, parameter_id, parameters, reward, trial_job_id):
+        pass
+
+    def update_search_space(self, search_space):
+        self.search_space = search_space
+
+
+_in_buf = BytesIO()
+_out_buf = BytesIO()
+
+def _reverse_io():
+    # Rewind both buffers and swap the protocol streams: send() now writes into _in_buf.
+    _in_buf.seek(0)
+    _out_buf.seek(0)
+    nni.protocol._out_file = _in_buf
+    nni.protocol._in_file = _out_buf
+
+def _restore_io():
+    # Rewind both buffers and point the protocol streams back at their own buffers.
+    _in_buf.seek(0)
+    _out_buf.seek(0)
+    nni.protocol._in_file = _in_buf
+    nni.protocol._out_file = _out_buf
+
+def _test_tuner():
+    _reverse_io() # now we are sending to Tuner's incoming stream
+    send(CommandType.UpdateSearchSpace, "{\"learning_rate\": {\"_value\": [0.0001, 0.001, 0.002, 0.005, 0.01], \"_type\": \"choice\"}, \"optimizer\": {\"_value\": [\"Adam\", \"SGD\"], \"_type\": \"choice\"}}")
+    send(CommandType.RequestTrialJobs, '2')
+    send(CommandType.ReportMetricData, '{"parameter_id":0,"type":"PERIODICAL","value":10,"trial_job_id":"abc"}')
+    send(CommandType.ReportMetricData, '{"parameter_id":1,"type":"FINAL","value":11,"trial_job_id":"abc"}')
+    send(CommandType.AddCustomizedTrialJob, '{"param":-1}')
+    send(CommandType.ReportMetricData, '{"parameter_id":2,"type":"FINAL","value":22,"trial_job_id":"abc"}')
+    send(CommandType.RequestTrialJobs, '1')
+    send(CommandType.TrialEnd, '{"trial_job_id":"abc"}')
+    _restore_io()
+
+    tuner = NaiveMultiPhaseTuner()
+    dispatcher = MultiPhaseMsgDispatcher(tuner)
+    dispatcher.run()
+
+    _reverse_io() # now we are receiving from Tuner's outgoing stream
+
+    command, data = receive() # reads from the start of the dispatcher's output buffer
+    print(command, data)
+
+class MultiPhaseTestCase(TestCase):
+    def test_tuner(self):
+        _test_tuner()
+
+if __name__ == '__main__':
+    main()
diff --git a/test/naive/run.py b/test/naive/run.py index 239bedcd2c..f54fe7ab71 100644 --- a/test/naive/run.py +++ b/test/naive/run.py @@ -54,7 +54,7 @@ def run(): if trial > current_trial: current_trial = trial print('Trial #%d done' % trial) - + subprocess.run(['nnictl', 'log', 'stderr']) assert tuner_status == 'DONE' and assessor_status == 'DONE', 'Failed to finish in 1 min' ss1 = json.load(open('search_space.json')) diff --git a/tools/nnicmd/config_schema.py b/tools/nnicmd/config_schema.py index 9eff5bfa66..a4bb503291 100644 --- a/tools/nnicmd/config_schema.py +++ b/tools/nnicmd/config_schema.py @@ -29,6 +29,7 @@ Optional('maxTrialNum'): And(int, lambda x: 1 <= x <= 99999), 'trainingServicePlatform': And(str, lambda x: x in ['remote', 'local', 'pai']), Optional('searchSpacePath'): os.path.exists, +Optional('multiPhase'): bool, 'useAnnotation': bool, 'tuner': Or({ 'builtinTunerName': Or('TPE', 'Random', 'Anneal', 'Evolution', 'SMAC', 'BatchTuner'), diff --git a/tools/nnicmd/launcher.py b/tools/nnicmd/launcher.py index b223551bea..1e4bd25f88 100644 --- a/tools/nnicmd/launcher.py +++ b/tools/nnicmd/launcher.py @@ -114,6 +114,8 @@ def set_pai_config(experiment_config, port): if not response or not response.status_code == 200: if response is not None: err_message = response.text + with open(STDERR_FULL_PATH, 'a+') as fout: + fout.write(json.dumps(json.loads(err_message), indent=4, sort_keys=True, separators=(',', ':'))) return False, err_message #set trial_config @@ -128,6 +130,8 @@ def set_experiment(experiment_config, mode, port):
request_data['maxExecDuration'] = experiment_config['maxExecDuration'] request_data['maxTrialNum'] = experiment_config['maxTrialNum'] request_data['searchSpace'] = experiment_config.get('searchSpace') + if experiment_config.get('multiPhase'): + request_data['multiPhase'] = experiment_config.get('multiPhase') request_data['tuner'] = experiment_config['tuner'] if 'assessor' in experiment_config: request_data['assessor'] = experiment_config['assessor']