diff --git a/.travis.yml b/.travis.yml index 7cc4d8c0ba..f7f0865f93 100644 --- a/.travis.yml +++ b/.travis.yml @@ -11,7 +11,7 @@ before_install: - sudo sh -c 'PATH=/usr/local/node/bin:$PATH yarn global add serve' install: - make - - make install + - make dev-install - export PATH=$HOME/.nni/bin:$PATH before_script: - cd test/naive diff --git a/src/nni_manager/common/datastore.ts b/src/nni_manager/common/datastore.ts index b86b0a95fe..7ed2328d7d 100644 --- a/src/nni_manager/common/datastore.ts +++ b/src/nni_manager/common/datastore.ts @@ -22,7 +22,7 @@ import { ExperimentProfile, TrialJobStatistics } from './manager'; import { TrialJobDetail, TrialJobStatus } from './trainingService'; -type TrialJobEvent = TrialJobStatus | 'USER_TO_CANCEL' | 'ADD_CUSTOMIZED'; +type TrialJobEvent = TrialJobStatus | 'USER_TO_CANCEL' | 'ADD_CUSTOMIZED' | 'ADD_HYPERPARAMETER'; type MetricType = 'PERIODICAL' | 'FINAL' | 'CUSTOM'; interface ExperimentProfileRecord { @@ -62,7 +62,7 @@ interface TrialJobInfo { status: TrialJobStatus; startTime?: number; endTime?: number; - hyperParameters?: string; + hyperParameters?: string[]; logPath?: string; finalMetricData?: string; stderrPath?: string; diff --git a/src/nni_manager/common/manager.ts b/src/nni_manager/common/manager.ts index 10fb9a4227..1d02a1775d 100644 --- a/src/nni_manager/common/manager.ts +++ b/src/nni_manager/common/manager.ts @@ -31,6 +31,7 @@ interface ExperimentParams { maxExecDuration: number; //seconds maxTrialNum: number; searchSpace: string; + multiPhase?: boolean; tuner: { className: string; builtinTunerName?: string; diff --git a/src/nni_manager/common/trainingService.ts b/src/nni_manager/common/trainingService.ts index 0b8708394c..7bcc575c34 100644 --- a/src/nni_manager/common/trainingService.ts +++ b/src/nni_manager/common/trainingService.ts @@ -37,11 +37,16 @@ interface JobApplicationForm { readonly jobType: JobType; } +interface HyperParameters { + readonly value: string; + readonly index: number; +} + /** * define 
TrialJobApplicationForm */ interface TrialJobApplicationForm extends JobApplicationForm { - readonly hyperParameters: string; + readonly hyperParameters: HyperParameters; } /** @@ -116,6 +121,6 @@ abstract class TrainingService { export { TrainingService, TrainingServiceError, TrialJobStatus, TrialJobApplicationForm, - TrainingServiceMetadata, TrialJobDetail, TrialJobMetric, + TrainingServiceMetadata, TrialJobDetail, TrialJobMetric, HyperParameters, HostJobApplicationForm, JobApplicationForm, JobType }; diff --git a/src/nni_manager/common/utils.ts b/src/nni_manager/common/utils.ts index ba0650ef28..1356d0347b 100644 --- a/src/nni_manager/common/utils.ts +++ b/src/nni_manager/common/utils.ts @@ -158,8 +158,11 @@ function parseArg(names: string[]): string { * @param assessor: similiar as tuner * */ -function getMsgDispatcherCommand(tuner: any, assessor: any): string { +function getMsgDispatcherCommand(tuner: any, assessor: any, multiPhase: boolean = false): string { let command: string = `python3 -m nni --tuner_class_name ${tuner.className}`; + if (multiPhase) { + command += ' --multi_phase'; + } if (process.env.VIRTUAL_ENV) { command = path.join(process.env.VIRTUAL_ENV, 'bin/') +command; diff --git a/src/nni_manager/core/commands.ts b/src/nni_manager/core/commands.ts index 37bba31d9e..19204b2f31 100644 --- a/src/nni_manager/core/commands.ts +++ b/src/nni_manager/core/commands.ts @@ -27,6 +27,7 @@ const TRIAL_END = 'EN'; const TERMINATE = 'TE'; const NEW_TRIAL_JOB = 'TR'; +const SEND_TRIAL_JOB_PARAMETER = 'SP'; const NO_MORE_TRIAL_JOBS = 'NO'; const KILL_TRIAL_JOB = 'KI'; @@ -39,6 +40,7 @@ const TUNER_COMMANDS: Set = new Set([ TERMINATE, NEW_TRIAL_JOB, + SEND_TRIAL_JOB_PARAMETER, NO_MORE_TRIAL_JOBS ]); @@ -63,5 +65,6 @@ export { NO_MORE_TRIAL_JOBS, KILL_TRIAL_JOB, TUNER_COMMANDS, - ASSESSOR_COMMANDS + ASSESSOR_COMMANDS, + SEND_TRIAL_JOB_PARAMETER }; diff --git a/src/nni_manager/core/nniDataStore.ts b/src/nni_manager/core/nniDataStore.ts index 47c2f01dc3..1beec632be 
100644 --- a/src/nni_manager/core/nniDataStore.ts +++ b/src/nni_manager/core/nniDataStore.ts @@ -118,6 +118,7 @@ class NNIDataStore implements DataStore { } public async storeMetricData(trialJobId: string, data: string): Promise { + this.log.debug(`storeMetricData: trialJobId: ${trialJobId}, data: ${data}`); const metrics = JSON.parse(data) as MetricData; assert(trialJobId === metrics.trial_job_id); await this.db.storeMetricData(trialJobId, JSON.stringify({ @@ -168,18 +169,34 @@ class NNIDataStore implements DataStore { } } - private getJobStatusByLatestEvent(event: TrialJobEvent): TrialJobStatus { + private getJobStatusByLatestEvent(oldStatus: TrialJobStatus, event: TrialJobEvent): TrialJobStatus { switch (event) { case 'USER_TO_CANCEL': return 'USER_CANCELED'; case 'ADD_CUSTOMIZED': return 'WAITING'; + case 'ADD_HYPERPARAMETER': + return oldStatus; default: } return event; } + private mergeHyperParameters(hyperParamList: string[], newParamStr: string): string[] { + const mergedHyperParams: any[] = []; + const newParam: any = JSON.parse(newParamStr); + for (const hyperParamStr of hyperParamList) { + const hyperParam: any = JSON.parse(hyperParamStr); + mergedHyperParams.push(hyperParam); + } + if (mergedHyperParams.filter((value: any) => { return value.parameter_index === newParam.parameter_index; }).length <= 0) { + mergedHyperParams.push(newParam); + } + + return mergedHyperParams.map((value: any) => { return JSON.stringify(value); }); + } + private getTrialJobsByReplayEvents(trialJobEvents: TrialJobEventRecord[]): Map { const map: Map = new Map(); // assume data is stored by time ASC order @@ -193,7 +210,8 @@ class NNIDataStore implements DataStore { } else { jobInfo = { id: record.trialJobId, - status: this.getJobStatusByLatestEvent(record.event) + status: this.getJobStatusByLatestEvent('UNKNOWN', record.event), + hyperParameters: [] }; } if (!jobInfo) { @@ -222,9 +240,13 @@ class NNIDataStore implements DataStore { } default: } - jobInfo.status = 
this.getJobStatusByLatestEvent(record.event); + jobInfo.status = this.getJobStatusByLatestEvent(jobInfo.status, record.event); if (record.data !== undefined && record.data.trim().length > 0) { - jobInfo.hyperParameters = record.data; + if (jobInfo.hyperParameters !== undefined) { + jobInfo.hyperParameters = this.mergeHyperParameters(jobInfo.hyperParameters, record.data); + } else { + assert(false, 'jobInfo.hyperParameters is undefined'); + } } map.set(record.trialJobId, jobInfo); } diff --git a/src/nni_manager/core/nnimanager.ts b/src/nni_manager/core/nnimanager.ts index 48d9fa3c83..0647e775a3 100644 --- a/src/nni_manager/core/nnimanager.ts +++ b/src/nni_manager/core/nnimanager.ts @@ -37,7 +37,7 @@ import { import { delay , getLogDir, getMsgDispatcherCommand} from '../common/utils'; import { ADD_CUSTOMIZED_TRIAL_JOB, KILL_TRIAL_JOB, NEW_TRIAL_JOB, NO_MORE_TRIAL_JOBS, REPORT_METRIC_DATA, - REQUEST_TRIAL_JOBS, TERMINATE, TRIAL_END, UPDATE_SEARCH_SPACE + REQUEST_TRIAL_JOBS, SEND_TRIAL_JOB_PARAMETER, TERMINATE, TRIAL_END, UPDATE_SEARCH_SPACE } from './commands'; import { createDispatcherInterface, IpcInterface } from './ipcInterface'; import { TrialJobMaintainerEvent, TrialJobs } from './trialJobs'; @@ -116,7 +116,7 @@ class NNIManager implements Manager { await this.storeExperimentProfile(); this.log.debug('Setup tuner...'); - const dispatcherCommand: string = getMsgDispatcherCommand(expParams.tuner, expParams.assessor); + const dispatcherCommand: string = getMsgDispatcherCommand(expParams.tuner, expParams.assessor, expParams.multiPhase); console.log(`dispatcher command: ${dispatcherCommand}`); this.setupTuner( //expParams.tuner.tunerCommand, @@ -140,7 +140,7 @@ class NNIManager implements Manager { this.experimentProfile = await this.dataStore.getExperimentProfile(experimentId); const expParams: ExperimentParams = this.experimentProfile.params; - const dispatcherCommand: string = getMsgDispatcherCommand(expParams.tuner, expParams.assessor); + const dispatcherCommand: 
string = getMsgDispatcherCommand(expParams.tuner, expParams.assessor, expParams.multiPhase); console.log(`dispatcher command: ${dispatcherCommand}`); this.setupTuner( dispatcherCommand, @@ -460,7 +460,10 @@ class NNIManager implements Manager { this.currSubmittedTrialNum++; const trialJobAppForm: TrialJobApplicationForm = { jobType: 'TRIAL', - hyperParameters: content + hyperParameters: { + value: content, + index: 0 + } }; const trialJobDetail: TrialJobDetail = await this.trainingService.submitTrialJob(trialJobAppForm); this.trialJobsMaintainer.setTrialJob(trialJobDetail.id, Object.assign({}, trialJobDetail)); @@ -472,6 +475,22 @@ class NNIManager implements Manager { } } break; + case SEND_TRIAL_JOB_PARAMETER: + const tunerCommand: any = JSON.parse(content); + assert(tunerCommand.parameter_index >= 0); + assert(tunerCommand.trial_job_id !== undefined); + + const trialJobForm: TrialJobApplicationForm = { + jobType: 'TRIAL', + hyperParameters: { + value: content, + index: tunerCommand.parameter_index + } + }; + await this.trainingService.updateTrialJob(tunerCommand.trial_job_id, trialJobForm); + await this.dataStore.storeTrialJobEvent( + 'ADD_HYPERPARAMETER', tunerCommand.trial_job_id, content, undefined); + break; case NO_MORE_TRIAL_JOBS: this.trialJobsMaintainer.setNoMoreTrials(); break; diff --git a/src/nni_manager/rest_server/restValidationSchemas.ts b/src/nni_manager/rest_server/restValidationSchemas.ts index 218a8c22c4..a1c32d396f 100644 --- a/src/nni_manager/rest_server/restValidationSchemas.ts +++ b/src/nni_manager/rest_server/restValidationSchemas.ts @@ -47,6 +47,7 @@ export namespace ValidationSchemas { trialConcurrency: joi.number().min(0).required(), searchSpace: joi.string().required(), maxExecDuration: joi.number().min(0).required(), + multiPhase: joi.boolean(), tuner: joi.object({ builtinTunerName: joi.string().valid('TPE', 'Random', 'Anneal', 'Evolution'), codeDir: joi.string(), diff --git 
a/src/nni_manager/training_service/local/localTrainingService.ts b/src/nni_manager/training_service/local/localTrainingService.ts index 50eb9b3846..b66dd8d68c 100644 --- a/src/nni_manager/training_service/local/localTrainingService.ts +++ b/src/nni_manager/training_service/local/localTrainingService.ts @@ -30,10 +30,11 @@ import { getLogger, Logger } from '../../common/log'; import { TrialConfig } from '../common/trialConfig'; import { TrialConfigMetadataKey } from '../common/trialConfigMetadataKey'; import { - HostJobApplicationForm, JobApplicationForm, TrainingService, TrialJobApplicationForm, + HostJobApplicationForm, JobApplicationForm, HyperParameters, TrainingService, TrialJobApplicationForm, TrialJobDetail, TrialJobMetric, TrialJobStatus } from '../../common/trainingService'; import { delay, getExperimentRootDir, uniqueString } from '../../common/utils'; +import { file } from 'tmp'; const tkill = require('tree-kill'); @@ -210,8 +211,18 @@ class LocalTrainingService implements TrainingService { * @param trialJobId trial job id * @param form job application form */ - public updateTrialJob(trialJobId: string, form: JobApplicationForm): Promise { - throw new MethodNotImplementedError(); + public async updateTrialJob(trialJobId: string, form: JobApplicationForm): Promise { + const trialJobDetail: undefined | TrialJobDetail = this.jobMap.get(trialJobId); + if (trialJobDetail === undefined) { + throw new Error(`updateTrialJob failed: ${trialJobId} not found`); + } + if (form.jobType === 'TRIAL') { + await this.writeParameterFile(trialJobDetail.workingDirectory, (form).hyperParameters); + } else { + throw new Error(`updateTrialJob failed: jobType ${form.jobType} not supported.`); + } + + return trialJobDetail; } /** @@ -332,10 +343,7 @@ class LocalTrainingService implements TrainingService { await cpp.exec(`mkdir -p ${path.join(trialJobDetail.workingDirectory, '.nni')}`); await cpp.exec(`touch ${path.join(trialJobDetail.workingDirectory, '.nni', 'metrics')}`); await 
fs.promises.writeFile(path.join(trialJobDetail.workingDirectory, 'run.sh'), runScriptLines.join('\n'), { encoding: 'utf8' }); - await fs.promises.writeFile( - path.join(trialJobDetail.workingDirectory, 'parameter.cfg'), - (trialJobDetail.form).hyperParameters, - { encoding: 'utf8' }); + await this.writeParameterFile(trialJobDetail.workingDirectory, (trialJobDetail.form).hyperParameters); const process: cp.ChildProcess = cp.exec(`bash ${path.join(trialJobDetail.workingDirectory, 'run.sh')}`); this.setTrialJobStatus(trialJobDetail, 'RUNNING'); @@ -402,6 +410,11 @@ class LocalTrainingService implements TrainingService { } } } + + private async writeParameterFile(directory: string, hyperParameters: HyperParameters): Promise { + const filepath: string = path.join(directory, `parameter_${hyperParameters.index}.cfg`); + await fs.promises.writeFile(filepath, hyperParameters.value, { encoding: 'utf8' }); + } } export { LocalTrainingService }; diff --git a/src/nni_manager/training_service/remote_machine/remoteMachineTrainingService.ts b/src/nni_manager/training_service/remote_machine/remoteMachineTrainingService.ts index 772b93ff5d..6ca4552f63 100644 --- a/src/nni_manager/training_service/remote_machine/remoteMachineTrainingService.ts +++ b/src/nni_manager/training_service/remote_machine/remoteMachineTrainingService.ts @@ -34,7 +34,7 @@ import { getExperimentId } from '../../common/experimentStartupInfo'; import { getLogger, Logger } from '../../common/log'; import { ObservableTimer } from '../../common/observableTimer'; import { - HostJobApplicationForm, JobApplicationForm, TrainingService, TrialJobApplicationForm, TrialJobDetail, TrialJobMetric + HostJobApplicationForm, HyperParameters, JobApplicationForm, TrainingService, TrialJobApplicationForm, TrialJobDetail, TrialJobMetric } from '../../common/trainingService'; import { delay, getExperimentRootDir, uniqueString } from '../../common/utils'; import { GPUSummary } from '../common/gpuData'; @@ -198,8 +198,24 @@ class 
RemoteMachineTrainingService implements TrainingService { * @param trialJobId trial job id * @param form job application form */ - public updateTrialJob(trialJobId: string, form: JobApplicationForm): Promise { - throw new MethodNotImplementedError(); + public async updateTrialJob(trialJobId: string, form: JobApplicationForm): Promise { + this.log.info(`updateTrialJob: form: ${JSON.stringify(form)}`); + const trialJobDetail: undefined | TrialJobDetail = this.trialJobsMap.get(trialJobId); + if (trialJobDetail === undefined) { + throw new Error(`updateTrialJob failed: ${trialJobId} not found`); + } + if (form.jobType === 'TRIAL') { + const rmMeta: RemoteMachineMeta | undefined = (trialJobDetail).rmMeta; + if (rmMeta !== undefined) { + await this.writeParameterFile(trialJobId, (form).hyperParameters, rmMeta); + } else { + throw new Error(`updateTrialJob failed: ${trialJobId} rmMeta not found`); + } + } else { + throw new Error(`updateTrialJob failed: jobType ${form.jobType} not supported.`); + } + + return trialJobDetail; } /** @@ -442,15 +458,13 @@ class RemoteMachineTrainingService implements TrainingService { //create tmp trial working folder locally. 
await cpp.exec(`mkdir -p ${trialLocalTempFolder}`); - // Write file content ( run.sh and parameter.cfg ) to local tmp files + // Write file content ( run.sh and parameter_0.cfg ) to local tmp files await fs.promises.writeFile(path.join(trialLocalTempFolder, 'run.sh'), runScriptContent, { encoding: 'utf8' }); - await fs.promises.writeFile(path.join(trialLocalTempFolder, 'parameter.cfg'), form.hyperParameters, { encoding: 'utf8' }); // Copy local tmp files to remote machine await SSHClientUtility.copyFileToRemote( path.join(trialLocalTempFolder, 'run.sh'), path.join(trialWorkingFolder, 'run.sh'), sshClient); - await SSHClientUtility.copyFileToRemote( - path.join(trialLocalTempFolder, 'parameter.cfg'), path.join(trialWorkingFolder, 'parameter.cfg'), sshClient); + await this.writeParameterFile(trialJobId, form.hyperParameters, rmScheduleInfo.rmMeta); // Copy files in codeDir to remote working directory await SSHClientUtility.copyDirectoryToRemote(this.trialConfig.codeDir, trialWorkingFolder, sshClient); @@ -562,6 +576,22 @@ class RemoteMachineTrainingService implements TrainingService { return jobpidPath; } + + private async writeParameterFile(trialJobId: string, hyperParameters: HyperParameters, rmMeta: RemoteMachineMeta): Promise { + const sshClient: Client | undefined = this.machineSSHClientMap.get(rmMeta); + if (sshClient === undefined) { + throw new Error('sshClient is undefined.'); + } + + const trialWorkingFolder: string = path.join(this.remoteExpRootDir, 'trials', trialJobId); + const trialLocalTempFolder: string = path.join(this.expRootDir, 'trials-local', trialJobId); + + const fileName: string = `parameter_${hyperParameters.index}.cfg`; + const localFilepath: string = path.join(trialLocalTempFolder, fileName); + await fs.promises.writeFile(localFilepath, hyperParameters.value, { encoding: 'utf8' }); + + await SSHClientUtility.copyFileToRemote(localFilepath, path.join(trialWorkingFolder, fileName), sshClient); + } } export { RemoteMachineTrainingService }; 
diff --git a/src/nni_manager/training_service/test/remoteMachineTrainingService.test.ts b/src/nni_manager/training_service/test/remoteMachineTrainingService.test.ts index b55e041e1d..7509ea2ade 100644 --- a/src/nni_manager/training_service/test/remoteMachineTrainingService.test.ts +++ b/src/nni_manager/training_service/test/remoteMachineTrainingService.test.ts @@ -100,7 +100,10 @@ describe('Unit Test for RemoteMachineTrainingService', () => { TrialConfigMetadataKey.TRIAL_CONFIG, `{"command":"sleep 1h && echo ","codeDir":"${localCodeDir}","gpuNum":1}`); const form: TrialJobApplicationForm = { jobType: 'TRIAL', - hyperParameters: 'mock hyperparameters' + hyperParameters: { + value: 'mock hyperparameters', + index: 0 + } }; const trialJob = await remoteMachineTrainingService.submitTrialJob(form); @@ -135,7 +138,10 @@ describe('Unit Test for RemoteMachineTrainingService', () => { // submit job const form: TrialJobApplicationForm = { jobType: 'TRIAL', - hyperParameters: 'mock hyperparameters' + hyperParameters: { + value: 'mock hyperparameters', + index: 0 + } }; const jobDetail: TrialJobDetail = await remoteMachineTrainingService.submitTrialJob(form); // Add metrics listeners diff --git a/src/sdk/pynni/nni/__main__.py b/src/sdk/pynni/nni/__main__.py index e3a39bac96..cf839de87d 100644 --- a/src/sdk/pynni/nni/__main__.py +++ b/src/sdk/pynni/nni/__main__.py @@ -28,6 +28,7 @@ import importlib from nni.msg_dispatcher import MsgDispatcher +from nni.multi_phase.multi_phase_dispatcher import MultiPhaseMsgDispatcher from nni.hyperopt_tuner.hyperopt_tuner import HyperoptTuner from nni.evolution_tuner.evolution_tuner import EvolutionTuner from nni.batch_tuner.batch_tuner import BatchTuner @@ -79,6 +80,7 @@ def parse_args(): help='Assessor directory') parser.add_argument('--assessor_class_filename', type=str, required=False, help='Assessor class file path') + parser.add_argument('--multi_phase', action='store_true') flags, _ = parser.parse_known_args() return flags @@ -110,7 
+112,10 @@ def main(): if tuner is None: raise AssertionError('Failed to create Tuner instance') - dispatcher = MsgDispatcher(tuner, assessor) + if args.multi_phase: + dispatcher = MultiPhaseMsgDispatcher(tuner, assessor) + else: + dispatcher = MsgDispatcher(tuner, assessor) try: dispatcher.run() diff --git a/src/sdk/pynni/nni/multi_phase/multi_phase_dispatcher.py b/src/sdk/pynni/nni/multi_phase/multi_phase_dispatcher.py new file mode 100644 index 0000000000..ec7d2be0f1 --- /dev/null +++ b/src/sdk/pynni/nni/multi_phase/multi_phase_dispatcher.py @@ -0,0 +1,178 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# +# MIT License +# +# Permission is hereby granted, free of charge, to any person obtaining a copy of this software and +# associated documentation files (the "Software"), to deal in the Software without restriction, +# including without limitation the rights to use, copy, modify, merge, publish, distribute, +# sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in all copies or +# substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT +# NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND +# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, +# DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT +# OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. 
+# ================================================================================================== + +import logging +from collections import defaultdict +import json_tricks + +from nni.protocol import CommandType, send +from nni.msg_dispatcher_base import MsgDispatcherBase +from nni.assessor import AssessResult + +_logger = logging.getLogger(__name__) + +# Assessor global variables +_trial_history = defaultdict(dict) +'''key: trial job ID; value: intermediate results, mapping from sequence number to data''' + +_ended_trials = set() +'''trial_job_id of all ended trials. +We need this because NNI manager may send metrics after reporting a trial ended. +TODO: move this logic to NNI manager +''' + +def _sort_history(history): + ret = [ ] + for i, _ in enumerate(history): + if i in history: + ret.append(history[i]) + else: + break + return ret + +# Tuner global variables +_next_parameter_id = 0 +_trial_params = {} +'''key: trial job ID; value: parameters''' +_customized_parameter_ids = set() + +def _create_parameter_id(): + global _next_parameter_id # pylint: disable=global-statement + _next_parameter_id += 1 + return _next_parameter_id - 1 + +def _pack_parameter(parameter_id, params, customized=False, trial_job_id=None, parameter_index=None): + _trial_params[parameter_id] = params + ret = { + 'parameter_id': parameter_id, + 'parameter_source': 'customized' if customized else 'algorithm', + 'parameters': params + } + if trial_job_id is not None: + ret['trial_job_id'] = trial_job_id + if parameter_index is not None: + ret['parameter_index'] = parameter_index + else: + ret['parameter_index'] = 0 + return json_tricks.dumps(ret) + +class MultiPhaseMsgDispatcher(MsgDispatcherBase): + def __init__(self, tuner, assessor=None): + super().__init__() + self.tuner = tuner + self.assessor = assessor + if assessor is None: + _logger.debug('Assessor is not configured') + + def load_checkpoint(self): + self.tuner.load_checkpoint() + if self.assessor is not None: + 
self.assessor.load_checkpoint() + + def save_checkpoint(self): + self.tuner.save_checkpoint() + if self.assessor is not None: + self.assessor.save_checkpoint() + + def handle_request_trial_jobs(self, data): + # data: number or trial jobs + ids = [_create_parameter_id() for _ in range(data)] + params_list = self.tuner.generate_multiple_parameters(ids) + assert len(ids) == len(params_list) + for i, _ in enumerate(ids): + send(CommandType.NewTrialJob, _pack_parameter(ids[i], params_list[i])) + return True + + def handle_update_search_space(self, data): + self.tuner.update_search_space(data) + return True + + def handle_add_customized_trial(self, data): + # data: parameters + id_ = _create_parameter_id() + _customized_parameter_ids.add(id_) + send(CommandType.NewTrialJob, _pack_parameter(id_, data, customized=True)) + return True + + def handle_report_metric_data(self, data): + trial_job_id = data['trial_job_id'] + if data['type'] == 'FINAL': + id_ = data['parameter_id'] + if id_ in _customized_parameter_ids: + self.tuner.receive_customized_trial_result(id_, _trial_params[id_], data['value'], trial_job_id) + else: + self.tuner.receive_trial_result(id_, _trial_params[id_], data['value'], trial_job_id) + elif data['type'] == 'PERIODICAL': + if self.assessor is not None: + self._handle_intermediate_metric_data(data) + else: + pass + elif data['type'] == 'REQUEST_PARAMETER': + assert data['trial_job_id'] is not None + assert data['parameter_index'] is not None + param_id = _create_parameter_id() + param = self.tuner.generate_parameters(param_id, trial_job_id) + send(CommandType.SendTrialJobParameter, _pack_parameter(param_id, param, trial_job_id=data['trial_job_id'], parameter_index=data['parameter_index'])) + else: + raise ValueError('Data type not supported: {}'.format(data['type'])) + + return True + + def handle_trial_end(self, data): + trial_job_id = data['trial_job_id'] + _ended_trials.add(trial_job_id) + if trial_job_id in _trial_history: + 
_trial_history.pop(trial_job_id) + if self.assessor is not None: + self.assessor.trial_end(trial_job_id, data['event'] == 'SUCCEEDED') + return True + + def _handle_intermediate_metric_data(self, data): + if data['type'] != 'PERIODICAL': + return True + if self.assessor is None: + return True + + trial_job_id = data['trial_job_id'] + if trial_job_id in _ended_trials: + return True + + history = _trial_history[trial_job_id] + history[data['sequence']] = data['value'] + ordered_history = _sort_history(history) + if len(ordered_history) < data['sequence']: # no user-visible update since last time + return True + + try: + result = self.assessor.assess_trial(trial_job_id, ordered_history) + except Exception as e: + _logger.exception('Assessor error') + + if isinstance(result, bool): + result = AssessResult.Good if result else AssessResult.Bad + elif not isinstance(result, AssessResult): + msg = 'Result of Assessor.assess_trial must be an object of AssessResult, not %s' + raise RuntimeError(msg % type(result)) + + if result is AssessResult.Bad: + _logger.debug('BAD, kill %s', trial_job_id) + send(CommandType.KillTrialJob, json_tricks.dumps(trial_job_id)) + else: + _logger.debug('GOOD') diff --git a/src/sdk/pynni/nni/multi_phase/multi_phase_tuner.py b/src/sdk/pynni/nni/multi_phase/multi_phase_tuner.py new file mode 100644 index 0000000000..1fb10ab676 --- /dev/null +++ b/src/sdk/pynni/nni/multi_phase/multi_phase_tuner.py @@ -0,0 +1,87 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. 
+# +# MIT License +# +# Permission is hereby granted, free of charge, to any person obtaining a copy of this software and +# associated documentation files (the "Software"), to deal in the Software without restriction, +# including without limitation the rights to use, copy, modify, merge, publish, distribute, +# sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in all copies or +# substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT +# NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND +# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, +# DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT +# OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. +# ================================================================================================== + + +import logging + +from nni.recoverable import Recoverable + +_logger = logging.getLogger(__name__) + + +class MultiPhaseTuner(Recoverable): + # pylint: disable=no-self-use,unused-argument + + def generate_parameters(self, parameter_id, trial_job_id=None): + """Returns a set of trial (hyper-)parameters, as a serializable object. + User code must override either this function or 'generate_multiple_parameters()'. + parameter_id: int + """ + raise NotImplementedError('Tuner: generate_parameters not implemented') + + def generate_multiple_parameters(self, parameter_id_list): + """Returns multiple sets of trial (hyper-)parameters, as iterable of serializable objects. + Call 'generate_parameters()' by 'count' times by default. + User code must override either this function or 'generate_parameters()'. 
+ parameter_id_list: list of int + """ + return [self.generate_parameters(parameter_id) for parameter_id in parameter_id_list] + + def receive_trial_result(self, parameter_id, parameters, reward, trial_job_id): + """Invoked when a trial reports its final result. Must override. + parameter_id: int + parameters: object created by 'generate_parameters()' + reward: object reported by trial + """ + raise NotImplementedError('Tuner: receive_trial_result not implemented') + + def receive_customized_trial_result(self, parameter_id, parameters, reward, trial_job_id): + """Invoked when a trial added by WebUI reports its final result. Do nothing by default. + parameter_id: int + parameters: object created by user + reward: object reported by trial + """ + _logger.info('Customized trial job %s ignored by tuner', parameter_id) + + def update_search_space(self, search_space): + """Update the search space of tuner. Must override. + search_space: JSON object + """ + raise NotImplementedError('Tuner: update_search_space not implemented') + + def load_checkpoint(self): + """Load the checkpoint of tuner. + path: checkpoint directory for tuner + """ + checkpoint_path = self.get_checkpoint_path() + _logger.info('Load checkpoint ignored by tuner, checkpoint path: %s' % checkpoint_path) + + def save_checkpoint(self): + """Save the checkpoint of tuner. + path: checkpoint directory for tuner + """ + checkpoint_path = self.get_checkpoint_path() + _logger.info('Save checkpoint ignored by tuner, checkpoint path: %s' % checkpoint_path) + + def _on_exit(self): + pass + + def _on_error(self): + pass diff --git a/src/sdk/pynni/nni/platform/local.py b/src/sdk/pynni/nni/platform/local.py index 3dda9c4c57..9a2f596c9d 100644 --- a/src/sdk/pynni/nni/platform/local.py +++ b/src/sdk/pynni/nni/platform/local.py @@ -18,23 +18,41 @@ # OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. 
# ================================================================================================== - +import json_tricks import json import os +import time -from ..common import init_logger +from ..common import init_logger, env_args _dir = os.environ['NNI_SYS_DIR'] _metric_file = open(os.path.join(_dir, '.nni', 'metrics'), 'wb') +_param_index = 0 _log_file_path = os.path.join(_dir, 'trial.log') init_logger(_log_file_path) +def _send_request_parameter_metric(): + metric = json_tricks.dumps({ + 'trial_job_id': env_args.trial_job_id, + 'type': 'REQUEST_PARAMETER', + 'sequence': 0, + 'parameter_index': _param_index + }) + send_metric(metric) def get_parameters(): - params_file = open(os.path.join(_dir, 'parameter.cfg'), 'r') - return json.load(params_file) + global _param_index + params_filepath = os.path.join(_dir, 'parameter_{}.cfg'.format(_param_index)) + if not os.path.isfile(params_filepath): + _send_request_parameter_metric() + while not os.path.isfile(params_filepath): + time.sleep(3) + params_file = open(params_filepath, 'r') + params = json.load(params_file) + _param_index += 1 + return params def send_metric(string): data = (string + '\n').encode('utf8') diff --git a/src/sdk/pynni/nni/protocol.py b/src/sdk/pynni/nni/protocol.py index 6c0e71506d..ada5527bfa 100644 --- a/src/sdk/pynni/nni/protocol.py +++ b/src/sdk/pynni/nni/protocol.py @@ -34,6 +34,7 @@ class CommandType(Enum): # out NewTrialJob = b'TR' + SendTrialJobParameter = b'SP' NoMoreTrialJobs = b'NO' KillTrialJob = b'KI' @@ -55,7 +56,7 @@ def send(command, data): data = data.encode('utf8') assert len(data) < 1000000, 'Command too long' msg = b'%b%06d%b' % (command.value, len(data), data) - logging.getLogger(__name__).debug('Sending command, data: [%s]' % data) + logging.getLogger(__name__).debug('Sending command, data: [%s]' % msg) _out_file.write(msg) _out_file.flush() diff --git a/src/sdk/pynni/nni/trial.py b/src/sdk/pynni/nni/trial.py index e5884f8dac..27dc864d23 100644 --- 
a/src/sdk/pynni/nni/trial.py +++ b/src/sdk/pynni/nni/trial.py @@ -32,11 +32,13 @@ ] -_params = platform.get_parameters() +_params = None def get_parameters(): """Returns a set of (hyper-)paremeters generated by Tuner.""" + global _params + _params = platform.get_parameters() return _params['parameters'] @@ -51,6 +53,7 @@ def report_intermediate_result(metric): metric: serializable object. """ global _intermediate_seq + assert _params is not None, 'nni.get_parameters() needs to be called before report_intermediate_result' metric = json_tricks.dumps({ 'parameter_id': _params['parameter_id'], 'trial_job_id': env_args.trial_job_id, @@ -66,6 +69,7 @@ def report_final_result(metric): """Reports final result to tuner. metric: serializable object. """ + assert _params is not None, 'nni.get_parameters() needs to be called before report_final_result' metric = json_tricks.dumps({ 'parameter_id': _params['parameter_id'], 'trial_job_id': env_args.trial_job_id, diff --git a/src/sdk/pynni/tests/test_multi_phase_tuner.py b/src/sdk/pynni/tests/test_multi_phase_tuner.py new file mode 100644 index 0000000000..72b477999e --- /dev/null +++ b/src/sdk/pynni/tests/test_multi_phase_tuner.py @@ -0,0 +1,89 @@ +import logging +import random +from io import BytesIO + +import nni +import nni.protocol +from nni.protocol import CommandType, send, receive +from nni.multi_phase.multi_phase_tuner import MultiPhaseTuner +from nni.multi_phase.multi_phase_dispatcher import MultiPhaseMsgDispatcher + +from unittest import TestCase, main + +class NaiveMultiPhaseTuner(MultiPhaseTuner): + ''' + supports only choices + ''' + def __init__(self): + self.search_space = None + + def generate_parameters(self, parameter_id, trial_job_id=None): + """Returns a set of trial (hyper-)parameters, as a serializable object. + User code must override either this function or 'generate_multiple_parameters()'. 
+ parameter_id: int + """ + generated_parameters = {} + if self.search_space is None: + raise AssertionError('Search space not specified') + for k in self.search_space: + param = self.search_space[k] + if not param['_type'] == 'choice': + raise ValueError('Only choice type is supported') + param_values = param['_value'] + generated_parameters[k] = param_values[random.randint(0, len(param_values)-1)] + logging.getLogger(__name__).debug(generated_parameters) + return generated_parameters + + + def receive_trial_result(self, parameter_id, parameters, reward, trial_job_id): + logging.getLogger(__name__).debug('receive_trial_result: {},{},{},{}'.format(parameter_id, parameters, reward, trial_job_id)) + + def receive_customized_trial_result(self, parameter_id, parameters, reward, trial_job_id): + pass + + def update_search_space(self, search_space): + self.search_space = search_space + + +_in_buf = BytesIO() +_out_buf = BytesIO() + +def _reverse_io(): + _in_buf.seek(0) + _out_buf.seek(0) + nni.protocol._out_file = _in_buf + nni.protocol._in_file = _out_buf + +def _restore_io(): + _in_buf.seek(0) + _out_buf.seek(0) + nni.protocol._in_file = _in_buf + nni.protocol._out_file = _out_buf + +def _test_tuner(): + _reverse_io() # now we are sending to Tuner's incoming stream + send(CommandType.UpdateSearchSpace, "{\"learning_rate\": {\"_value\": [0.0001, 0.001, 0.002, 0.005, 0.01], \"_type\": \"choice\"}, \"optimizer\": {\"_value\": [\"Adam\", \"SGD\"], \"_type\": \"choice\"}}") + send(CommandType.RequestTrialJobs, '2') + send(CommandType.ReportMetricData, '{"parameter_id":0,"type":"PERIODICAL","value":10,"trial_job_id":"abc"}') + send(CommandType.ReportMetricData, '{"parameter_id":1,"type":"FINAL","value":11,"trial_job_id":"abc"}') + send(CommandType.AddCustomizedTrialJob, '{"param":-1}') + send(CommandType.ReportMetricData, '{"parameter_id":2,"type":"FINAL","value":22,"trial_job_id":"abc"}') + send(CommandType.RequestTrialJobs, '1') + send(CommandType.TrialEnd, 
'{"trial_job_id":"abc"}') + _restore_io() + + tuner = NaiveMultiPhaseTuner() + dispatcher = MultiPhaseMsgDispatcher(tuner) + dispatcher.run() + + _reverse_io() # now we are receiving from Tuner's outgoing stream + + command, data = receive() # this one is customized + print(command, data) + +class MultiPhaseTestCase(TestCase): + def test_tuner(self): + _test_tuner() + +if __name__ == '__main__': + main() \ No newline at end of file diff --git a/test/naive/run.py b/test/naive/run.py index 239bedcd2c..f54fe7ab71 100644 --- a/test/naive/run.py +++ b/test/naive/run.py @@ -54,7 +54,7 @@ def run(): if trial > current_trial: current_trial = trial print('Trial #%d done' % trial) - + subprocess.run(['nnictl', 'log', 'stderr']) assert tuner_status == 'DONE' and assessor_status == 'DONE', 'Failed to finish in 1 min' ss1 = json.load(open('search_space.json'))