diff --git a/docs/howto_2_CustomizedTuner.md b/docs/howto_2_CustomizedTuner.md
index 5b8c65a04b..db002e0cc3 100644
--- a/docs/howto_2_CustomizedTuner.md
+++ b/docs/howto_2_CustomizedTuner.md
@@ -6,7 +6,7 @@ So, if user want to implement a customized Tuner, she/he only need to:
 1) Inherit a tuner of a base Tuner class
 2) Implement receive_trial_result and generate_parameter function
-3) Write a script to run Tuner
+3) Configure your customized tuner in the experiment yaml config file
 Here ia an example:
@@ -93,3 +93,6 @@ More detail example you could see:
 > * [evolution-tuner](../src/sdk/pynni/nni/evolution_tuner)
 > * [hyperopt-tuner](../src/sdk/pynni/nni/hyperopt_tuner)
 > * [evolution-based-customized-tuner](../examples/tuners/ga_customer_tuner)
+
+## Write a more advanced automl algorithm
+The methods above are usually enough to write a general tuner. However, users may also want access to more information, such as intermediate results and trials' states (i.e., the methods of an assessor), in order to implement a more powerful automl algorithm. Therefore, we have another concept called `advisor`, which directly inherits from `MsgDispatcherBase` in [`src/sdk/pynni/nni/msg_dispatcher_base.py`](../src/sdk/pynni/nni/msg_dispatcher_base.py). Please refer to [here](howto_3_CustomizedAdvisor.md) for how to write a customized advisor.
diff --git a/docs/howto_3_CustomizedAdvisor.md b/docs/howto_3_CustomizedAdvisor.md
new file mode 100644
index 0000000000..400cc5e6e3
--- /dev/null
+++ b/docs/howto_3_CustomizedAdvisor.md
@@ -0,0 +1,39 @@
+# **How To** - Customize Your Own Advisor
+
+*Advisor targets the scenario where an automl algorithm needs the methods of both a tuner and an assessor. It is similar to a tuner in that it receives trial configuration requests and final results and generates trial configurations. It is also similar to an assessor in that it receives intermediate results and trials' end states and can send commands to kill trials. Note that if you use an Advisor, you are not allowed to use a tuner or an assessor at the same time.*
+
+So, if users want to implement a customized Advisor, they only need to:
+
+1) Define an Advisor inheriting from the MsgDispatcherBase class
+2) Implement the methods with prefix `handle_` except `handle_request`
+3) Configure your customized Advisor in the experiment yaml config file
+
+Here is an example:
+
+**1) Define an Advisor inheriting from the MsgDispatcherBase class**
+```python
+from nni.msg_dispatcher_base import MsgDispatcherBase
+
+class CustomizedAdvisor(MsgDispatcherBase):
+    def __init__(self, ...):
+        ...
+```
+
+**2) Implement the methods with prefix `handle_` except `handle_request`**
+
+Please refer to the implementation of Hyperband ([src/sdk/pynni/nni/hyperband_advisor/hyperband_advisor.py](../src/sdk/pynni/nni/hyperband_advisor/hyperband_advisor.py)) for how to implement the methods.
+
+**3) Configure your customized Advisor in the experiment yaml config file**
+
+This is similar to tuner and assessor: NNI needs to locate your customized Advisor class and instantiate it, so you need to specify the location of the customized Advisor class and pass literal values as parameters to its \_\_init__ constructor.
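Putting the three steps together, here is a minimal sketch of what such an Advisor might look like. It is only an illustration: the handler names and the `send`/`CommandType` usage mirror the Hyperband advisor added in this PR, the method bodies are placeholders, and `arg1` exists only to show how a value from `classArgs` in the yaml below reaches `__init__`.

```python
from nni.protocol import CommandType, send
from nni.msg_dispatcher_base import MsgDispatcherBase
import json_tricks

class CustomizedAdvisor(MsgDispatcherBase):
    def __init__(self, arg1=None):
        super().__init__()
        self.arg1 = arg1          # filled from classArgs in the yaml config below
        self.searchspace = None

    def handle_initialize(self, data):
        # data is the search space; acknowledge it so the experiment can start
        self.handle_update_search_space(data)
        send(CommandType.Initialized, '')
        return True

    def handle_update_search_space(self, data):
        self.searchspace = data
        return True

    def handle_request_trial_jobs(self, data):
        # data is the number of trial configurations requested; the ids below are
        # placeholders only, a real advisor must keep parameter_id unique
        for parameter_id in range(data):
            ret = {'parameter_id': parameter_id,
                   'parameter_source': 'algorithm',
                   'parameters': {}}   # put the generated hyperparameters here
            send(CommandType.NewTrialJob, json_tricks.dumps(ret))
        return True

    def handle_report_metric_data(self, data):
        # data['type'] is 'PERIODICAL' for intermediate results, 'FINAL' for final ones
        return True

    def handle_trial_end(self, data):
        # data carries trial_job_id, event (the trial's end state) and hyper_params
        return True

    def handle_add_customized_trial(self, data):
        pass
```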
+ +```yaml +advisor: + codeDir: /home/abc/myadvisor + classFileName: my_customized_advisor.py + className: CustomizedAdvisor + # Any parameter need to pass to your advisor class __init__ constructor + # can be specified in this optional classArgs field, for example + classArgs: + arg1: value1 +``` diff --git a/examples/trials/mnist-hyperband/config.yml b/examples/trials/mnist-hyperband/config.yml new file mode 100644 index 0000000000..f7e21202f1 --- /dev/null +++ b/examples/trials/mnist-hyperband/config.yml @@ -0,0 +1,25 @@ +authorName: default +experimentName: example_mnist +trialConcurrency: 2 +maxExecDuration: 100h +maxTrialNum: 10000 +#choice: local, remote, pai +trainingServicePlatform: local +searchSpacePath: search_space.json +#choice: true, false +useAnnotation: false +advisor: + #choice: Hyperband + builtinAdvisorName: Hyperband + classArgs: + #R: the maximum STEPS (could be the number of mini-batches or epochs) can be + # allocated to a trial. Each trial should use STEPS to control how long it runs. + R: 100 + #eta: proportion of discarded trials + eta: 3 + #choice: maximize, minimize + optimize_mode: maximize +trial: + command: python3 mnist.py + codeDir: . + gpuNum: 0 diff --git a/examples/trials/mnist-hyperband/config_pai.yml b/examples/trials/mnist-hyperband/config_pai.yml new file mode 100644 index 0000000000..f8032502a7 --- /dev/null +++ b/examples/trials/mnist-hyperband/config_pai.yml @@ -0,0 +1,39 @@ +authorName: default +experimentName: example_mnist_hyperband +maxExecDuration: 1h +maxTrialNum: 10000 +trialConcurrency: 10 +#choice: local, remote, pai +trainingServicePlatform: pai +searchSpacePath: search_space.json +#choice: true, false +useAnnotation: false +advisor: + #choice: Hyperband + builtinAdvisorName: Hyperband + classArgs: + #R: the maximum STEPS + R: 100 + #eta: proportion of discarded trials + eta: 3 + #choice: maximize, minimize + optimize_mode: maximize +trial: + command: python3 mnist.py + codeDir: . + gpuNum: 0 + cpuNum: 1 + memoryMB: 8196 + #The docker image to run nni job on pai + image: openpai/pai.example.tensorflow + #The hdfs directory to store data on pai, format 'hdfs://host:port/directory' + dataDir: hdfs://10.10.10.10:9000/username/nni + #The hdfs directory to store output data generated by nni, format 'hdfs://host:port/directory' + outputDir: hdfs://10.10.10.10:9000/username/nni +paiConfig: + #The username to login pai + userName: username + #The password to login pai + passWord: password + #The host of restful server of pai + host: 10.10.10.10 diff --git a/examples/trials/mnist-hyperband/mnist.py b/examples/trials/mnist-hyperband/mnist.py new file mode 100644 index 0000000000..46df38224a --- /dev/null +++ b/examples/trials/mnist-hyperband/mnist.py @@ -0,0 +1,236 @@ +"""A deep MNIST classifier using convolutional layers.""" + +import logging +import math +import tempfile +import tensorflow as tf + +from tensorflow.examples.tutorials.mnist import input_data + +import nni + +FLAGS = None + +logger = logging.getLogger('mnist_AutoML') + + +class MnistNetwork(object): + ''' + MnistNetwork is for initlizing and building basic network for mnist. 
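+    The constructor arguments (channel_1_num, channel_2_num, conv_size, hidden_size,
+    pool_size, learning_rate) are the model's tunable hyperparameters, filled in main()
+    from the default parameters and the tuner/advisor's choices; x_dim=784 and y_dim=10
+    match the flattened 28x28 MNIST images and the 10 digit classes.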
+ ''' + def __init__(self, + channel_1_num, + channel_2_num, + conv_size, + hidden_size, + pool_size, + learning_rate, + x_dim=784, + y_dim=10): + self.channel_1_num = channel_1_num + self.channel_2_num = channel_2_num + self.conv_size = conv_size + self.hidden_size = hidden_size + self.pool_size = pool_size + self.learning_rate = learning_rate + self.x_dim = x_dim + self.y_dim = y_dim + + self.images = tf.placeholder(tf.float32, [None, self.x_dim], name='input_x') + self.labels = tf.placeholder(tf.float32, [None, self.y_dim], name='input_y') + self.keep_prob = tf.placeholder(tf.float32, name='keep_prob') + + self.train_step = None + self.accuracy = None + + def build_network(self): + ''' + Building network for mnist + ''' + + # Reshape to use within a convolutional neural net. + # Last dimension is for "features" - there is only one here, since images are + # grayscale -- it would be 3 for an RGB image, 4 for RGBA, etc. + with tf.name_scope('reshape'): + try: + input_dim = int(math.sqrt(self.x_dim)) + except: + print( + 'input dim cannot be sqrt and reshape. input dim: ' + str(self.x_dim)) + logger.debug( + 'input dim cannot be sqrt and reshape. input dim: %s', str(self.x_dim)) + raise + x_image = tf.reshape(self.images, [-1, input_dim, input_dim, 1]) + + # First convolutional layer - maps one grayscale image to 32 feature maps. + with tf.name_scope('conv1'): + w_conv1 = weight_variable( + [self.conv_size, self.conv_size, 1, self.channel_1_num]) + b_conv1 = bias_variable([self.channel_1_num]) + h_conv1 = tf.nn.relu(conv2d(x_image, w_conv1) + b_conv1) + + # Pooling layer - downsamples by 2X. + with tf.name_scope('pool1'): + h_pool1 = max_pool(h_conv1, self.pool_size) + + # Second convolutional layer -- maps 32 feature maps to 64. + with tf.name_scope('conv2'): + w_conv2 = weight_variable([self.conv_size, self.conv_size, + self.channel_1_num, self.channel_2_num]) + b_conv2 = bias_variable([self.channel_2_num]) + h_conv2 = tf.nn.relu(conv2d(h_pool1, w_conv2) + b_conv2) + + # Second pooling layer. + with tf.name_scope('pool2'): + h_pool2 = max_pool(h_conv2, self.pool_size) + + # Fully connected layer 1 -- after 2 round of downsampling, our 28x28 image + # is down to 7x7x64 feature maps -- maps this to 1024 features. + last_dim = int(input_dim / (self.pool_size * self.pool_size)) + with tf.name_scope('fc1'): + w_fc1 = weight_variable( + [last_dim * last_dim * self.channel_2_num, self.hidden_size]) + b_fc1 = bias_variable([self.hidden_size]) + + h_pool2_flat = tf.reshape( + h_pool2, [-1, last_dim * last_dim * self.channel_2_num]) + h_fc1 = tf.nn.relu(tf.matmul(h_pool2_flat, w_fc1) + b_fc1) + + # Dropout - controls the complexity of the model, prevents co-adaptation of features. 
+ with tf.name_scope('dropout'): + h_fc1_drop = tf.nn.dropout(h_fc1, self.keep_prob) + + # Map the 1024 features to 10 classes, one for each digit + with tf.name_scope('fc2'): + w_fc2 = weight_variable([self.hidden_size, self.y_dim]) + b_fc2 = bias_variable([self.y_dim]) + y_conv = tf.matmul(h_fc1_drop, w_fc2) + b_fc2 + + with tf.name_scope('loss'): + cross_entropy = tf.reduce_mean( + tf.nn.softmax_cross_entropy_with_logits(labels=self.labels, logits=y_conv)) + with tf.name_scope('adam_optimizer'): + self.train_step = tf.train.AdamOptimizer( + self.learning_rate).minimize(cross_entropy) + + with tf.name_scope('accuracy'): + correct_prediction = tf.equal( + tf.argmax(y_conv, 1), tf.argmax(self.labels, 1)) + self.accuracy = tf.reduce_mean( + tf.cast(correct_prediction, tf.float32)) + + +def conv2d(x_input, w_matrix): + """conv2d returns a 2d convolution layer with full stride.""" + return tf.nn.conv2d(x_input, w_matrix, strides=[1, 1, 1, 1], padding='SAME') + + +def max_pool(x_input, pool_size): + """max_pool downsamples a feature map by 2X.""" + return tf.nn.max_pool(x_input, ksize=[1, pool_size, pool_size, 1], + strides=[1, pool_size, pool_size, 1], padding='SAME') + + +def weight_variable(shape): + """weight_variable generates a weight variable of a given shape.""" + initial = tf.truncated_normal(shape, stddev=0.1) + return tf.Variable(initial) + + +def bias_variable(shape): + """bias_variable generates a bias variable of a given shape.""" + initial = tf.constant(0.1, shape=shape) + return tf.Variable(initial) + + +def main(params): + ''' + Main function, build mnist network, run and send result to NNI. + ''' + # Import data + mnist = input_data.read_data_sets(params['data_dir'], one_hot=True) + print('Mnist download data down.') + logger.debug('Mnist download data down.') + + # Create the model + # Build the graph for the deep net + mnist_network = MnistNetwork(channel_1_num=params['channel_1_num'], + channel_2_num=params['channel_2_num'], + conv_size=params['conv_size'], + hidden_size=params['hidden_size'], + pool_size=params['pool_size'], + learning_rate=params['learning_rate']) + mnist_network.build_network() + logger.debug('Mnist build network done.') + + # Write log + graph_location = tempfile.mkdtemp() + logger.debug('Saving graph to: %s', graph_location) + train_writer = tf.summary.FileWriter(graph_location) + train_writer.add_graph(tf.get_default_graph()) + + test_acc = 0.0 + with tf.Session() as sess: + sess.run(tf.global_variables_initializer()) + for i in range(params['batch_num']): + batch = mnist.train.next_batch(params['batch_size']) + mnist_network.train_step.run(feed_dict={mnist_network.images: batch[0], + mnist_network.labels: batch[1], + mnist_network.keep_prob: 1 - params['dropout_rate']} + ) + + if i % 10 == 0: + test_acc = mnist_network.accuracy.eval( + feed_dict={mnist_network.images: mnist.test.images, + mnist_network.labels: mnist.test.labels, + mnist_network.keep_prob: 1.0}) + + nni.report_intermediate_result(test_acc) + logger.debug('test accuracy %g', test_acc) + logger.debug('Pipe send intermediate result done.') + + test_acc = mnist_network.accuracy.eval( + feed_dict={mnist_network.images: mnist.test.images, + mnist_network.labels: mnist.test.labels, + mnist_network.keep_prob: 1.0}) + + nni.report_final_result(test_acc) + logger.debug('Final result is %g', test_acc) + logger.debug('Send final result done.') + + +def generate_default_params(): + ''' + Generate default parameters for mnist network. 
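+    These defaults are overridden by the values returned from nni.get_next_parameter()
+    in the __main__ block below; with Hyperband the received parameters additionally
+    contain a 'STEPS' key, which the __main__ block uses to set 'batch_num'.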
+ ''' + params = { + 'data_dir': '/tmp/tensorflow/mnist/input_data', + 'dropout_rate': 0.5, + 'channel_1_num': 32, + 'channel_2_num': 64, + 'conv_size': 5, + 'pool_size': 2, + 'hidden_size': 1024, + 'learning_rate': 1e-4, + 'batch_size': 32} + return params + + +if __name__ == '__main__': + try: + # get parameters form tuner + RCV_PARAMS = nni.get_next_parameter() + logger.debug(RCV_PARAMS) + # run + params = generate_default_params() + params.update(RCV_PARAMS) + ''' + If you use Hyperband, among the hyperparameters (i.e., key-value pairs) received by a trial, + there is one more key called `STEPS` besides the hyperparameters defined by user. + By using this `STEPS`, the trial can control how long it runs. + ''' + params['batch_num'] = RCV_PARAMS['STEPS'] * 10 + main(params) + except Exception as exception: + logger.exception(exception) + raise diff --git a/examples/trials/mnist-hyperband/search_space.json b/examples/trials/mnist-hyperband/search_space.json new file mode 100644 index 0000000000..540f2708cb --- /dev/null +++ b/examples/trials/mnist-hyperband/search_space.json @@ -0,0 +1,7 @@ +{ + "dropout_rate":{"_type":"uniform","_value":[0.5,0.9]}, + "conv_size":{"_type":"choice","_value":[2,3,5,7]}, + "hidden_size":{"_type":"choice","_value":[124, 512, 1024]}, + "batch_size": {"_type":"choice","_value":[8, 16, 32, 64]}, + "learning_rate":{"_type":"choice","_value":[0.0001, 0.001, 0.01, 0.1]} +} diff --git a/pylintrc b/pylintrc index 57ec53011e..928973fee0 100644 --- a/pylintrc +++ b/pylintrc @@ -15,4 +15,4 @@ max-attributes=15 const-naming-style=any disable=duplicate-code, - super-init-not-called \ No newline at end of file + super-init-not-called diff --git a/src/nni_manager/common/manager.ts b/src/nni_manager/common/manager.ts index 0f970a682e..674b177731 100644 --- a/src/nni_manager/common/manager.ts +++ b/src/nni_manager/common/manager.ts @@ -35,7 +35,7 @@ interface ExperimentParams { trainingServicePlatform: string; multiPhase?: boolean; multiThread?: boolean; - tuner: { + tuner?: { className: string; builtinTunerName?: string; codeDir?: string; @@ -53,6 +53,15 @@ interface ExperimentParams { checkpointDir: string; gpuNum?: number; }; + advisor?: { + className: string; + builtinAdvisorName?: string; + codeDir?: string; + classArgs?: any; + classFileName?: string; + checkpointDir: string; + gpuNum?: number; + }; clusterMetaData?: { key: string; value: string; diff --git a/src/nni_manager/common/utils.ts b/src/nni_manager/common/utils.ts index 7784654930..3d4681c841 100644 --- a/src/nni_manager/common/utils.ts +++ b/src/nni_manager/common/utils.ts @@ -47,6 +47,10 @@ function getDefaultDatabaseDir(): string { return path.join(getExperimentRootDir(), 'db'); } +function getCheckpointDir(): string { + return path.join(getExperimentRootDir(), 'checkpoint'); +} + function mkDirP(dirPath: string): Promise { const deferred: Deferred = new Deferred(); fs.exists(dirPath, (exists: boolean) => { @@ -137,7 +141,8 @@ function parseArg(names: string[]): string { } /** - * Generate command line to start advisor process which runs tuner and assessor + * Generate command line to start automl algorithm(s), + * either start advisor or start a process which runs tuner and assessor * @param tuner : For builtin tuner: * { * className: 'EvolutionTuner' @@ -158,10 +163,18 @@ function parseArg(names: string[]): string { * } * * @param assessor: similiar as tuner + * @param advisor: similar as tuner * */ -function getMsgDispatcherCommand(tuner: any, assessor: any, multiPhase: boolean = false, multiThread: 
boolean = false): string { - let command: string = `python3 -m nni --tuner_class_name ${tuner.className}`; +function getMsgDispatcherCommand(tuner: any, assessor: any, advisor: any, multiPhase: boolean = false, multiThread: boolean = false): string { + if ((tuner || assessor) && advisor) { + throw new Error('Error: specify both tuner/assessor and advisor is not allowed'); + } + if (!tuner && !advisor) { + throw new Error('Error: specify neither tuner nor advisor is not allowed'); + } + + let command: string = `python3 -m nni`; if (multiPhase) { command += ' --multi_phase'; } @@ -170,30 +183,40 @@ function getMsgDispatcherCommand(tuner: any, assessor: any, multiPhase: boolean command += ' --multi_thread'; } - if (tuner.classArgs !== undefined) { - command += ` --tuner_args ${JSON.stringify(JSON.stringify(tuner.classArgs))}`; - } - - if (tuner.codeDir !== undefined && tuner.codeDir.length > 1) { - command += ` --tuner_directory ${tuner.codeDir}`; - } - - if (tuner.classFileName !== undefined && tuner.classFileName.length > 1) { - command += ` --tuner_class_filename ${tuner.classFileName}`; - } - - if (assessor !== undefined && assessor.className !== undefined) { - command += ` --assessor_class_name ${assessor.className}`; - if (assessor.classArgs !== undefined) { - command += ` --assessor_args ${JSON.stringify(JSON.stringify(assessor.classArgs))}`; + if (advisor) { + command += ` --advisor_class_name ${advisor.className}`; + if (advisor.classArgs !== undefined) { + command += ` --advisor_args ${JSON.stringify(JSON.stringify(advisor.classArgs))}`; } - - if (assessor.codeDir !== undefined && assessor.codeDir.length > 1) { - command += ` --assessor_directory ${assessor.codeDir}`; + if (advisor.codeDir !== undefined && advisor.codeDir.length > 1) { + command += ` --advisor_directory ${advisor.codeDir}`; + } + if (advisor.classFileName !== undefined && advisor.classFileName.length > 1) { + command += ` --advisor_class_filename ${advisor.classFileName}`; + } + } else { + command += ` --tuner_class_name ${tuner.className}`; + if (tuner.classArgs !== undefined) { + command += ` --tuner_args ${JSON.stringify(JSON.stringify(tuner.classArgs))}`; + } + if (tuner.codeDir !== undefined && tuner.codeDir.length > 1) { + command += ` --tuner_directory ${tuner.codeDir}`; + } + if (tuner.classFileName !== undefined && tuner.classFileName.length > 1) { + command += ` --tuner_class_filename ${tuner.classFileName}`; } - if (assessor.classFileName !== undefined && assessor.classFileName.length > 1) { - command += ` --assessor_class_filename ${assessor.classFileName}`; + if (assessor !== undefined && assessor.className !== undefined) { + command += ` --assessor_class_name ${assessor.className}`; + if (assessor.classArgs !== undefined) { + command += ` --assessor_args ${JSON.stringify(JSON.stringify(assessor.classArgs))}`; + } + if (assessor.codeDir !== undefined && assessor.codeDir.length > 1) { + command += ` --assessor_directory ${assessor.codeDir}`; + } + if (assessor.classFileName !== undefined && assessor.classFileName.length > 1) { + command += ` --assessor_class_filename ${assessor.classFileName}`; + } } } @@ -321,6 +344,6 @@ function countFilesRecursively(directory: string, timeoutMilliSeconds?: number): }); } -export {countFilesRecursively, getRemoteTmpDir, generateParamFileName, getMsgDispatcherCommand, +export {countFilesRecursively, getRemoteTmpDir, generateParamFileName, getMsgDispatcherCommand, getCheckpointDir, getLogDir, getExperimentRootDir, getJobCancelStatus, getDefaultDatabaseDir, 
getIPV4Address, mkDirP, delay, prepareUnitTest, parseArg, cleanupUnitTest, uniqueString, randomSelect }; diff --git a/src/nni_manager/core/nnimanager.ts b/src/nni_manager/core/nnimanager.ts index a09b418f6e..ef6906483b 100644 --- a/src/nni_manager/core/nnimanager.ts +++ b/src/nni_manager/core/nnimanager.ts @@ -35,7 +35,7 @@ import { import { TrainingService, TrialJobApplicationForm, TrialJobDetail, TrialJobMetric, TrialJobStatus } from '../common/trainingService'; -import { delay, getLogDir, getMsgDispatcherCommand } from '../common/utils'; +import { delay , getLogDir, getCheckpointDir, getMsgDispatcherCommand, mkDirP} from '../common/utils'; import { ADD_CUSTOMIZED_TRIAL_JOB, INITIALIZE, INITIALIZED, KILL_TRIAL_JOB, NEW_TRIAL_JOB, NO_MORE_TRIAL_JOBS, REPORT_METRIC_DATA, REQUEST_TRIAL_JOBS, SEND_TRIAL_JOB_PARAMETER, TERMINATE, TRIAL_END, UPDATE_SEARCH_SPACE @@ -127,15 +127,15 @@ class NNIManager implements Manager { this.trainingService.setClusterMetadata('multiPhase', expParams.multiPhase.toString()); } - const dispatcherCommand: string = getMsgDispatcherCommand( - expParams.tuner, expParams.assessor, expParams.multiPhase, expParams.multiThread); + const dispatcherCommand: string = getMsgDispatcherCommand(expParams.tuner, expParams.assessor, expParams.advisor, + expParams.multiPhase, expParams.multiThread); this.log.debug(`dispatcher command: ${dispatcherCommand}`); + const checkpointDir: string = await this.createCheckpointDir(); this.setupTuner( - //expParams.tuner.tunerCommand, dispatcherCommand, undefined, 'start', - expParams.tuner.checkpointDir); + checkpointDir); this.experimentProfile.startTime = Date.now(); this.status.status = 'EXPERIMENT_RUNNING'; @@ -160,14 +160,15 @@ class NNIManager implements Manager { this.trainingService.setClusterMetadata('multiPhase', expParams.multiPhase.toString()); } - const dispatcherCommand: string = getMsgDispatcherCommand( - expParams.tuner, expParams.assessor, expParams.multiPhase, expParams.multiThread); + const dispatcherCommand: string = getMsgDispatcherCommand(expParams.tuner, expParams.assessor, expParams.advisor, + expParams.multiPhase, expParams.multiThread); this.log.debug(`dispatcher command: ${dispatcherCommand}`); + const checkpointDir: string = await this.createCheckpointDir(); this.setupTuner( dispatcherCommand, undefined, 'resume', - expParams.tuner.checkpointDir); + checkpointDir); const allTrialJobs: TrialJobInfo[] = await this.dataStore.listTrialJobs(); @@ -371,13 +372,22 @@ class NNIManager implements Manager { this.trialJobs.set(trialJobId, Object.assign({}, trialJobDetail)); await this.dataStore.storeTrialJobEvent(trialJobDetail.status, trialJobDetail.id, undefined, trialJobDetail); } + let hyperParams: string | undefined = undefined; switch (trialJobDetail.status) { case 'SUCCEEDED': case 'USER_CANCELED': case 'EARLY_STOPPED': this.trialJobs.delete(trialJobId); finishedTrialJobNum++; - this.dispatcher.sendCommand(TRIAL_END, JSON.stringify({trial_job_id: trialJobDetail.id, event: trialJobDetail.status})); + if (trialJobDetail.form.jobType === 'TRIAL') { + hyperParams = (trialJobDetail.form).hyperParameters.value; + } else { + throw new Error('Error: jobType error, not TRIAL'); + } + this.dispatcher.sendCommand(TRIAL_END, JSON.stringify({ + trial_job_id: trialJobDetail.id, + event: trialJobDetail.status, + hyper_params: hyperParams })); break; case 'FAILED': case 'SYS_CANCELED': @@ -385,7 +395,15 @@ class NNIManager implements Manager { // TO DO: push this job to queue for retry this.trialJobs.delete(trialJobId); 
finishedTrialJobNum++; - this.dispatcher.sendCommand(TRIAL_END, JSON.stringify({trial_job_id: trialJobDetail.id, event: trialJobDetail.status})); + if (trialJobDetail.form.jobType === 'TRIAL') { + hyperParams = (trialJobDetail.form).hyperParameters.value; + } else { + throw new Error('Error: jobType error, not TRIAL'); + } + this.dispatcher.sendCommand(TRIAL_END, JSON.stringify({ + trial_job_id: trialJobDetail.id, + event: trialJobDetail.status, + hyper_params: hyperParams})); break; case 'WAITING': case 'RUNNING': @@ -642,16 +660,30 @@ class NNIManager implements Manager { maxExecDuration: 0, // unit: second maxTrialNum: 0, // maxTrialNum includes all the submitted trial jobs trainingServicePlatform: '', - searchSpace: '', - tuner: { - className: '', - classArgs: {}, - checkpointDir: '' - } + searchSpace: '' } }; } + private async createCheckpointDir(): Promise { + // TODO: test + const chkpDir: string = getCheckpointDir(); + // create checkpoint directory + await mkDirP(chkpDir); + // assign this directory to exp profile's checkpointDir + if (this.experimentProfile.params.advisor) { + this.experimentProfile.params.advisor.checkpointDir = chkpDir; + } + if (this.experimentProfile.params.tuner) { + this.experimentProfile.params.tuner.checkpointDir = chkpDir; + } + if (this.experimentProfile.params.assessor) { + this.experimentProfile.params.assessor.checkpointDir = chkpDir; + } + + return Promise.resolve(chkpDir); + } + private async storeMaxSequenceId(sequenceId: number): Promise { if (sequenceId > this.experimentProfile.maxSequenceId) { this.experimentProfile.maxSequenceId = sequenceId; diff --git a/src/nni_manager/core/test/ipcInterfaceTerminate.test.ts b/src/nni_manager/core/test/ipcInterfaceTerminate.test.ts index d823795528..5a6ed7bfa5 100644 --- a/src/nni_manager/core/test/ipcInterfaceTerminate.test.ts +++ b/src/nni_manager/core/test/ipcInterfaceTerminate.test.ts @@ -46,7 +46,9 @@ function startProcess(): void { className: 'DummyAssessor', codeDir: './', classFileName: 'dummy_assessor.py' - } + }, + // advisor + undefined ); const proc: ChildProcess = spawn(dispatcherCmd, [], { stdio, cwd: 'core/test', shell: true }); diff --git a/src/nni_manager/rest_server/restValidationSchemas.ts b/src/nni_manager/rest_server/restValidationSchemas.ts index f48829739e..31bd2a3013 100644 --- a/src/nni_manager/rest_server/restValidationSchemas.ts +++ b/src/nni_manager/rest_server/restValidationSchemas.ts @@ -99,6 +99,15 @@ export namespace ValidationSchemas { maxExecDuration: joi.number().min(0).required(), multiPhase: joi.boolean(), multiThread: joi.boolean(), + advisor: joi.object({ + builtinAdvisorName: joi.string().valid('Hyperband'), + codeDir: joi.string(), + classFileName: joi.string(), + className: joi.string(), + classArgs:joi.any(), + gpuNum: joi.number().min(0), + checkpointDir: joi.string() + }), tuner: joi.object({ builtinTunerName: joi.string().valid('TPE', 'Random', 'Anneal', 'Evolution', 'SMAC', 'BatchTuner', 'GridSearch'), codeDir: joi.string(), @@ -107,7 +116,7 @@ export namespace ValidationSchemas { classArgs: joi.any(), gpuNum: joi.number().min(0), checkpointDir: joi.string() - }).required(), + }), assessor: joi.object({ builtinAssessorName: joi.string().valid('Medianstop'), codeDir: joi.string(), diff --git a/src/sdk/pynni/nni/README.md b/src/sdk/pynni/nni/README.md index 259edf2f77..68d00d3e42 100644 --- a/src/sdk/pynni/nni/README.md +++ b/src/sdk/pynni/nni/README.md @@ -6,10 +6,10 @@ For now, NNI has supported the following tuner algorithms. 
Note that NNI install - Random Search - Anneal - Naive Evolution - - Grid Search - SMAC (to install through `nnictl`) - - ENAS (ongoing) - - Batch (ongoing) + - Batch + - Grid Search + - Hyperband ## 1. Tuner algorithm introduction @@ -57,6 +57,10 @@ Note that the only acceptable types of search space are 'quniform', 'qloguniform * Type 'quniform' will receive three values [low, high, q], where [low, high] specifies a range and 'q' specifies the number of values that will be sampled evenly. It will be sampled in a way that the first sampled value is 'low', and each of the following values is (high-low)/q larger that the value in front of it. * Type 'qloguniform' behaves like 'quniform' except that it will first change the range to [log10(low), log10(high)] and sample and then change the sampled value back. +**Hyperband** + +[Hyperband][6] tries to use limited resource to explore as many configurations as possible, and finds out the promising ones to get the final result. The basic idea is generating many configurations and to run them for small number of STEPs to find out promising one, then further training those promising ones to select several more promising one. More detail can be refered to [here](hyperband_advisor/README.md) + ## 2. How to use the tuner algorithm in NNI? User only need to do one thing: choose a Tuner```config.yaml```. @@ -86,3 +90,4 @@ There are two filed you need to set: [3]: https://arxiv.org/pdf/1703.01041.pdf [4]: https://www.cs.ubc.ca/~hutter/papers/10-TR-SMAC.pdf [5]: https://github.com/automl/SMAC3 + [6]: https://arxiv.org/pdf/1603.06560.pdf \ No newline at end of file diff --git a/src/sdk/pynni/nni/__main__.py b/src/sdk/pynni/nni/__main__.py index 27b1994a2f..d69baa195b 100644 --- a/src/sdk/pynni/nni/__main__.py +++ b/src/sdk/pynni/nni/__main__.py @@ -27,7 +27,7 @@ import json import importlib -from .constants import ModuleName, ClassName, ClassArgs +from .constants import ModuleName, ClassName, ClassArgs, AdvisorModuleName, AdvisorClassName from nni.common import enable_multi_thread from nni.msg_dispatcher import MsgDispatcher from nni.multi_phase.multi_phase_dispatcher import MultiPhaseMsgDispatcher @@ -41,12 +41,19 @@ def augment_classargs(input_class_args, classname): input_class_args[key] = value return input_class_args -def create_builtin_class_instance(classname, jsonstr_args): - if classname not in ModuleName or \ - importlib.util.find_spec(ModuleName[classname]) is None: - raise RuntimeError('Tuner module is not found: {}'.format(classname)) - class_module = importlib.import_module(ModuleName[classname]) - class_constructor = getattr(class_module, ClassName[classname]) +def create_builtin_class_instance(classname, jsonstr_args, is_advisor = False): + if is_advisor: + if classname not in AdvisorModuleName or \ + importlib.util.find_spec(AdvisorModuleName[classname]) is None: + raise RuntimeError('Advisor module is not found: {}'.format(classname)) + class_module = importlib.import_module(AdvisorModuleName[classname]) + class_constructor = getattr(class_module, AdvisorClassName[classname]) + else: + if classname not in ModuleName or \ + importlib.util.find_spec(ModuleName[classname]) is None: + raise RuntimeError('Tuner module is not found: {}'.format(classname)) + class_module = importlib.import_module(ModuleName[classname]) + class_constructor = getattr(class_module, ClassName[classname]) if jsonstr_args: class_args = augment_classargs(json.loads(jsonstr_args), classname) else: @@ -62,7 +69,7 @@ def create_customized_class_instance(class_dir, 
class_filename, classname, jsons raise ValueError('Class file not found: {}'.format( os.path.join(class_dir, class_filename))) sys.path.append(class_dir) - module_name = class_filename.split('.')[0] + module_name = os.path.splitext(class_filename)[0] class_module = importlib.import_module(module_name) class_constructor = getattr(class_module, classname) if jsonstr_args: @@ -74,7 +81,16 @@ def create_customized_class_instance(class_dir, class_filename, classname, jsons def parse_args(): parser = argparse.ArgumentParser(description='parse command line parameters.') - parser.add_argument('--tuner_class_name', type=str, required=True, + parser.add_argument('--advisor_class_name', type=str, required=False, + help='Advisor class name, the class must be a subclass of nni.MsgDispatcherBase') + parser.add_argument('--advisor_class_filename', type=str, required=False, + help='Advisor class file path') + parser.add_argument('--advisor_args', type=str, required=False, + help='Parameters pass to advisor __init__ constructor') + parser.add_argument('--advisor_directory', type=str, required=False, + help='Advisor directory') + + parser.add_argument('--tuner_class_name', type=str, required=False, help='Tuner class name, the class must be a subclass of nni.Tuner') parser.add_argument('--tuner_class_filename', type=str, required=False, help='Tuner class file path') @@ -91,6 +107,7 @@ def parse_args(): help='Assessor directory') parser.add_argument('--assessor_class_filename', type=str, required=False, help='Assessor class file path') + parser.add_argument('--multi_phase', action='store_true') parser.add_argument('--multi_thread', action='store_true') @@ -106,53 +123,75 @@ def main(): if args.multi_thread: enable_multi_thread() - tuner = None - assessor = None - - if args.tuner_class_name in ModuleName: - tuner = create_builtin_class_instance( - args.tuner_class_name, - args.tuner_args) - else: - tuner = create_customized_class_instance( - args.tuner_directory, - args.tuner_class_filename, - args.tuner_class_name, - args.tuner_args) - - if tuner is None: - raise AssertionError('Failed to create Tuner instance') - - if args.assessor_class_name: - if args.assessor_class_name in ModuleName: - assessor = create_builtin_class_instance( - args.assessor_class_name, - args.assessor_args) + if args.advisor_class_name: + # advisor is enabled and starts to run + if args.multi_phase: + raise AssertionError('multi_phase has not been supported in advisor') + if args.advisor_class_name in AdvisorModuleName: + dispatcher = create_builtin_class_instance( + args.advisor_class_name, + args.advisor_args, True) else: - assessor = create_customized_class_instance( - args.assessor_directory, - args.assessor_class_filename, - args.assessor_class_name, - args.assessor_args) - if assessor is None: - raise AssertionError('Failed to create Assessor instance') - - if args.multi_phase: - dispatcher = MultiPhaseMsgDispatcher(tuner, assessor) + dispatcher = create_customized_class_instance( + args.advisor_directory, + args.advisor_class_filename, + args.advisor_class_name, + args.advisor_args) + if dispatcher is None: + raise AssertionError('Failed to create Advisor instance') + try: + dispatcher.run() + except Exception as exception: + logger.exception(exception) + raise else: - dispatcher = MsgDispatcher(tuner, assessor) - - try: - dispatcher.run() - tuner._on_exit() - if assessor is not None: - assessor._on_exit() - except Exception as exception: - logger.exception(exception) - tuner._on_error() - if assessor is not None: - 
assessor._on_error() - raise + # tuner (and assessor) is enabled and starts to run + tuner = None + assessor = None + if args.tuner_class_name in ModuleName: + tuner = create_builtin_class_instance( + args.tuner_class_name, + args.tuner_args) + else: + tuner = create_customized_class_instance( + args.tuner_directory, + args.tuner_class_filename, + args.tuner_class_name, + args.tuner_args) + + if tuner is None: + raise AssertionError('Failed to create Tuner instance') + + if args.assessor_class_name: + if args.assessor_class_name in ModuleName: + assessor = create_builtin_class_instance( + args.assessor_class_name, + args.assessor_args) + else: + assessor = create_customized_class_instance( + args.assessor_directory, + args.assessor_class_filename, + args.assessor_class_name, + args.assessor_args) + if assessor is None: + raise AssertionError('Failed to create Assessor instance') + + if args.multi_phase: + dispatcher = MultiPhaseMsgDispatcher(tuner, assessor) + else: + dispatcher = MsgDispatcher(tuner, assessor) + + try: + dispatcher.run() + tuner._on_exit() + if assessor is not None: + assessor._on_exit() + except Exception as exception: + logger.exception(exception) + tuner._on_error() + if assessor is not None: + assessor._on_error() + raise if __name__ == '__main__': try: diff --git a/src/sdk/pynni/nni/constants.py b/src/sdk/pynni/nni/constants.py index 5eb27b762d..8d73108884 100644 --- a/src/sdk/pynni/nni/constants.py +++ b/src/sdk/pynni/nni/constants.py @@ -53,3 +53,11 @@ 'algorithm_name': 'anneal' } } + +AdvisorModuleName = { + 'Hyperband': 'nni.hyperband_advisor.hyperband_advisor' +} + +AdvisorClassName = { + 'Hyperband': 'Hyperband' +} \ No newline at end of file diff --git a/src/sdk/pynni/nni/hyperband_advisor/README.md b/src/sdk/pynni/nni/hyperband_advisor/README.md new file mode 100644 index 0000000000..2c5363134e --- /dev/null +++ b/src/sdk/pynni/nni/hyperband_advisor/README.md @@ -0,0 +1,54 @@ +Hyperband on nni +=== + +## 1. Introduction +[Hyperband][1] is a popular automl algorithm. The basic idea of Hyperband is that it creates several brackets, each bracket has `n` randomly generated hyperparameter configurations, each configuration uses `r` resource (e.g., epoch number, batch number). After the `n` configurations is finished, it chooses top `n/eta` configurations and runs them using increased `r*eta` resource. At last, it chooses the best configuration it has found so far. + +## 2. Implementation with fully parallelism +Frist, this is an example of how to write an automl algorithm based on MsgDispatcherBase, rather than Tuner and Assessor. Hyperband is implemented in this way because it integrates the functions of both Tuner and Assessor, thus, we call it advisor. + +Second, this implementation fully leverages Hyperband's internal parallelism. More specifically, the next bracket is not started strictly after the current bracket, instead, it starts when there is available resource. + +## 3. Usage +To use Hyperband, you should add the following spec in your experiment's yaml config file: + +``` +advisor: + #choice: Hyperband + builtinAdvisorName: Hyperband + classArgs: + #R: the maximum STEPS + R: 100 + #eta: proportion of discarded trials + eta: 3 + #choice: maximize, minimize + optimize_mode: maximize +``` + +Note that once you use advisor, it is not allowed to add tuner and assessor spec in the config file any more. 
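For a feel of what `R` and `eta` (both explained below) imply, the per-round `(n, r)` pairs within one bracket can be reproduced with the same arithmetic that `Bracket.get_n_r` uses in this implementation. A minimal sketch, taking each bracket's starting `n` and `r` from the example table below:

```python
import math

def bracket_rounds(n, r, eta, s):
    """Successive halving inside one bracket: same arithmetic as Bracket.get_n_r."""
    return [(math.floor(n / eta ** i), r * eta ** i) for i in range(s + 1)]

print(bracket_rounds(81, 1, 3, 4))  # [(81, 1), (27, 3), (9, 9), (3, 27), (1, 81)]
print(bracket_rounds(6, 27, 3, 1))  # [(6, 27), (2, 81)]
```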
+If you use Hyperband, among the hyperparameters (i.e., key-value pairs) received by a trial, there is one more key called `STEPS` besides the hyperparameters defined by user. By using this `STEPS`, the trial can control how long it runs. + +`R` and `eta` are the parameters of Hyperband that you can change. `R` means the maximum STEPS that can be allocated to a configuration. Here, STEPS could mean the number of epochs or mini-batches. This `STEPS` should be used by the trial to control how long it runs. Refer to the example under `examples/trials/mnist-hyperband/` for details. + +`eta` means `n/eta` configurations from `n` configurations will survive and rerun using more STEPS. + +Here is a concrete example of `R=81` and `eta=3`: +| | s=4 | s=3 | s=2 | s=1 | s=0 | +|------|-----|-----|-----|-----|-----| +|i | n r | n r | n r | n r | n r | +|0 |81 1 |27 3 |9 9 |6 27 |5 81 | +|1 |27 3 |9 9 |3 27 |2 81 | | +|2 |9 9 |3 27 |1 81 | | | +|3 |3 27 |1 81 | | | | +|4 |1 81 | | | | | + +`s` means bracket, `n` means the number of configurations that are generated, the corresponding `r` means how many STEPS these configurations run. `i` means round, for example, bracket 4 has 5 rounds, bracket 3 has 4 rounds. + +About how to write trial code, please refer to the instructions under `examples/trials/mnist-hyperband/`. + +## 4. To be improved +The current implementation of Hyperband can be further improved by supporting simple early stop algorithm, because it is possible that not all the configurations in the top `n/eta` perform good. The unpromising configurations can be stopped early. + +In the current implementation, configurations are generated randomly, which follows the design in the [paper][1]. To further improve, configurations could be generated more wisely by leveraging advanced algorithms. + +[1]: https://arxiv.org/pdf/1603.06560.pdf diff --git a/src/sdk/pynni/nni/hyperband_advisor/__init__.py b/src/sdk/pynni/nni/hyperband_advisor/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/src/sdk/pynni/nni/hyperband_advisor/hyperband_advisor.py b/src/sdk/pynni/nni/hyperband_advisor/hyperband_advisor.py new file mode 100644 index 0000000000..51aa7831c5 --- /dev/null +++ b/src/sdk/pynni/nni/hyperband_advisor/hyperband_advisor.py @@ -0,0 +1,360 @@ +# Copyright (c) Microsoft Corporation +# All rights reserved. +# +# MIT License +# +# Permission is hereby granted, free of charge, +# to any person obtaining a copy of this software and associated +# documentation files (the "Software"), to deal in the Software without restriction, +# including without limitation the rights to use, copy, modify, merge, publish, +# distribute, sublicense, and/or sell copies of the Software, and +# to permit persons to whom the Software is furnished to do so, subject to the following conditions: +# The above copyright notice and this permission notice shall be included +# in all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING +# BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND +# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, +# DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. 
+''' +hyperband_tuner.py +''' + +from enum import Enum, unique +import math +import copy +import logging +import numpy as np +import json_tricks + +from nni.protocol import CommandType, send +from nni.msg_dispatcher_base import MsgDispatcherBase +from nni.common import init_logger +from .. import parameter_expressions + +_logger = logging.getLogger(__name__) + +_next_parameter_id = 0 +_KEY = 'STEPS' + +@unique +class OptimizeMode(Enum): + ''' + Oprimize Mode class + ''' + Minimize = 'minimize' + Maximize = 'maximize' + +def create_parameter_id(): + ''' + Create an id + ''' + global _next_parameter_id # pylint: disable=global-statement + _next_parameter_id += 1 + return _next_parameter_id - 1 + +def create_bracket_parameter_id(brackets_id, brackets_curr_decay, increased_id=-1): + ''' + Create a full id for a specific bracket's hyperparameter configuration + ''' + if increased_id == -1: + increased_id = str(create_parameter_id()) + params_id = '_'.join([str(brackets_id), + str(brackets_curr_decay), + increased_id]) + return params_id + +def json2paramater(ss_spec, random_state): + ''' + Randomly generate values for hyperparameters from hyperparameter space i.e., x. + ss_spec: hyperparameter space + random_state: random operator to generate random values + ''' + if isinstance(ss_spec, dict): + if '_type' in ss_spec.keys(): + _type = ss_spec['_type'] + _value = ss_spec['_value'] + if _type == 'choice': + _index = random_state.randint(len(_value)) + chosen_params = json2paramater(ss_spec['_value'][_index], random_state) + else: + chosen_params = eval('parameter_expressions.' + # pylint: disable=eval-used + _type)(*(_value + [random_state])) + else: + chosen_params = dict() + for key in ss_spec.keys(): + chosen_params[key] = json2paramater(ss_spec[key], random_state) + elif isinstance(ss_spec, list): + chosen_params = list() + for _, subspec in enumerate(ss_spec): + chosen_params.append(json2paramater(subspec, random_state)) + else: + chosen_params = copy.deepcopy(ss_spec) + return chosen_params + +class Bracket(): + ''' + A bracket in Hyperband, all the information of a bracket is managed by an instance of this class + ''' + def __init__(self, s, s_max, eta, R, optimize_mode): + self.bracket_id = s + self.s_max = s_max + self.eta = eta + self.n = math.ceil((s_max + 1) * (eta**s) / (s + 1)) # pylint: disable=invalid-name + self.r = math.ceil(R / eta**s) # pylint: disable=invalid-name + self.i = 0 + self.hyper_configs = [] # [ {id: params}, {}, ... ] + self.configs_perf = [] # [ {id: [seq, acc]}, {}, ... ] + self.num_configs_to_run = [] # [ n, n, n, ... ] + self.num_finished_configs = [] # [ n, n, n, ... ] + self.optimize_mode = optimize_mode + self.no_more_trial = False + + def is_completed(self): + ''' + check whether this bracket has sent out all the hyperparameter configurations + ''' + return self.no_more_trial + + def get_n_r(self): + ''' + return the values of n and r for the next round + ''' + return math.floor(self.n / self.eta**self.i), self.r * self.eta**self.i + + def increase_i(self): + ''' + i means the ith round. 
Increase i by 1 + ''' + self.i += 1 + if self.i > self.bracket_id: + self.no_more_trial = True + + def set_config_perf(self, i, parameter_id, seq, value): + ''' + update trial's latest result with its sequence number, e.g., epoch number or batch number + i: the ith round + parameter_id: the id of the trial/parameter + seq: sequence number, e.g., epoch number or batch number + value: latest result with sequence number seq + ''' + if parameter_id in self.configs_perf[i]: + # this should always be true if there is no retry in training service + _logger.debug('assertion: %d %d, %s %s\n', + self.configs_perf[i][parameter_id][0], + seq, + str(type(self.configs_perf[i][parameter_id][0])), + str(type(seq))) + # assert self.configs_perf[i][parameter_id][0] < seq + if self.configs_perf[i][parameter_id][0] < seq: + self.configs_perf[i][parameter_id] = [seq, value] + else: + self.configs_perf[i][parameter_id] = [seq, value] + + + def inform_trial_end(self, i): + ''' + If the trial is finished and the corresponding round (i.e., i) has all its trials finished, + it will choose the top k trials for the next round (i.e., i+1) + ''' + global _KEY # pylint: disable=global-statement + self.num_finished_configs[i] += 1 + _logger.debug('bracket id: %d, round: %d %d, finished: %d, all: %d', self.bracket_id, self.i, i, self.num_finished_configs[i], self.num_configs_to_run[i]) + if self.num_finished_configs[i] >= self.num_configs_to_run[i] \ + and self.no_more_trial is False: + # choose candidate configs from finished configs to run in the next round + assert self.i == i + 1 + this_round_perf = self.configs_perf[i] + if self.optimize_mode is OptimizeMode.Maximize: + sorted_perf = sorted(this_round_perf.items(), key=lambda kv: kv[1][1], reverse=True) # reverse + else: + sorted_perf = sorted(this_round_perf.items(), key=lambda kv: kv[1][1]) + _logger.debug('bracket %s next round %s, sorted hyper configs: %s', self.bracket_id, self.i, sorted_perf) + next_n, next_r = self.get_n_r() + _logger.debug('bracket %s next round %s, next_n=%d, next_r=%d', self.bracket_id, self.i, next_n, next_r) + hyper_configs = dict() + for k in range(next_n): + params_id = sorted_perf[k][0] + params = self.hyper_configs[i][params_id] + params[_KEY] = next_r # modify r + # generate new id + increased_id = params_id.split('_')[-1] + new_id = create_bracket_parameter_id(self.bracket_id, self.i, increased_id) + hyper_configs[new_id] = params + self._record_hyper_configs(hyper_configs) + return [[key, value] for key, value in hyper_configs.items()] + return None + + def get_hyperparameter_configurations(self, num, r, searchspace_json, random_state): # pylint: disable=invalid-name + ''' + Randomly generate num hyperparameter configurations from search space + num: the number of hyperparameter configurations + ''' + global _KEY # pylint: disable=global-statement + assert self.i == 0 + hyperparameter_configs = dict() + for _ in range(num): + params_id = create_bracket_parameter_id(self.bracket_id, self.i) + params = json2paramater(searchspace_json, random_state) + params[_KEY] = r + hyperparameter_configs[params_id] = params + self._record_hyper_configs(hyperparameter_configs) + return [[key, value] for key, value in hyperparameter_configs.items()] + + def _record_hyper_configs(self, hyper_configs): + ''' + after generating one round of hyperconfigs, this function records the generated hyperconfigs, + creates a dict to record the performance when those hyperconifgs are running, set the number of finished configs + in this round to be 0, and 
increase the round number. + ''' + self.hyper_configs.append(hyper_configs) + self.configs_perf.append(dict()) + self.num_finished_configs.append(0) + self.num_configs_to_run.append(len(hyper_configs)) + self.increase_i() + + +class Hyperband(MsgDispatcherBase): + ''' + Hyperband inherit from MsgDispatcherBase rather than Tuner, + because it integrates both tuner's functions and assessor's functions. + This is an implementation that could fully leverage available resources, i.e., high parallelism. + A single execution of Hyperband takes a finite budget of (s_max + 1)B. + ''' + def __init__(self, R, eta=3, optimize_mode='maximize'): + ''' + R: the maximum amount of resource that can be allocated to a single configuration + eta: the variable that controls the proportion of configurations discarded in each round of SuccessiveHalving + B = (s_max + 1)R + ''' + super() + self.R = R # pylint: disable=invalid-name + self.eta = eta + self.brackets = dict() # dict of Bracket + self.generated_hyper_configs = [] # all the configs waiting for run + self.completed_hyper_configs = [] # all the completed configs + self.s_max = math.floor(math.log(self.R, self.eta)) + self.curr_s = self.s_max + + self.searchspace_json = None + self.random_state = None + self.optimize_mode = OptimizeMode(optimize_mode) + + # This is for the case that nnimanager requests trial config, but tuner cannot provide immediately. + # In this case, tuner increases self.credit to issue a trial config sometime later. + self.credit = 0 + + def load_checkpoint(self): + pass + + def save_checkpont(self): + pass + + def handle_initialize(self, data): + ''' + data is search space + ''' + self.handle_update_search_space(data) + send(CommandType.Initialized, '') + return True + + def handle_request_trial_jobs(self, data): + ''' + data: number of trial jobs + ''' + for _ in range(data): + self._request_one_trial_job() + + return True + + def _request_one_trial_job(self): + ''' + get one trial job, i.e., one hyperparameter configuration. 
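+        If no generated configurations are waiting, a new bracket is created for the
+        current value of curr_s (which is then decreased); once all brackets have been
+        tried (curr_s < 0), a NoMoreTrialJobs command is sent and self.credit is
+        increased so the request can be answered later from handle_trial_end.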
+ ''' + if not self.generated_hyper_configs: + if self.curr_s < 0: + # have tried all configurations + ret = { + 'parameter_id': '-1_0_0', + 'parameter_source': 'algorithm', + 'parameters': '' + } + send(CommandType.NoMoreTrialJobs, json_tricks.dumps(ret)) + self.credit += 1 + return True + _logger.debug('create a new bracket, self.curr_s=%d', self.curr_s) + self.brackets[self.curr_s] = Bracket(self.curr_s, self.s_max, self.eta, self.R, self.optimize_mode) + next_n, next_r = self.brackets[self.curr_s].get_n_r() + _logger.debug('new bracket, next_n=%d, next_r=%d', next_n, next_r) + assert self.searchspace_json is not None and self.random_state is not None + generated_hyper_configs = self.brackets[self.curr_s].get_hyperparameter_configurations(next_n, next_r, + self.searchspace_json, + self.random_state) + self.generated_hyper_configs = generated_hyper_configs.copy() + self.curr_s -= 1 + + assert self.generated_hyper_configs + params = self.generated_hyper_configs.pop() + ret = { + 'parameter_id': params[0], + 'parameter_source': 'algorithm', + 'parameters': params[1] + } + send(CommandType.NewTrialJob, json_tricks.dumps(ret)) + + return True + + def handle_update_search_space(self, data): + ''' + data: JSON object, which is search space + ''' + self.searchspace_json = data + self.random_state = np.random.RandomState() + + return True + + def handle_trial_end(self, data): + ''' + data: it has three keys: trial_job_id, event, hyper_params + trial_job_id: the id generated by training service + event: the job's state + hyper_params: the hyperparameters (a string) generated and returned by tuner + ''' + hyper_params = json_tricks.loads(data['hyper_params']) + bracket_id, i, _ = hyper_params['parameter_id'].split('_') + hyper_configs = self.brackets[int(bracket_id)].inform_trial_end(int(i)) + if hyper_configs is not None: + _logger.debug('bracket %s next round %s, hyper_configs: %s', bracket_id, i, hyper_configs) + self.generated_hyper_configs = self.generated_hyper_configs + hyper_configs + for _ in range(self.credit): + if not self.generated_hyper_configs: + break + params = self.generated_hyper_configs.pop() + ret = { + 'parameter_id': params[0], + 'parameter_source': 'algorithm', + 'parameters': params[1] + } + send(CommandType.NewTrialJob, json_tricks.dumps(ret)) + self.credit -= 1 + + return True + + def handle_report_metric_data(self, data): + ''' + data: it is an object which has keys 'parameter_id', 'value', 'trial_job_id', 'type', 'sequence'. 
+ ''' + if data['type'] == 'FINAL': + self.completed_hyper_configs.append(data) + elif data['type'] == 'PERIODICAL': + bracket_id, i, _ = data['parameter_id'].split('_') + bracket_id = int(bracket_id) + self.brackets[bracket_id].set_config_perf(int(i), data['parameter_id'], data['sequence'], data['value']) + else: + raise ValueError('Data type not supported: {}'.format(data['type'])) + + return True + + def handle_add_customized_trial(self, data): + pass diff --git a/src/sdk/pynni/nni/hyperband_advisor/requirements.txt b/src/sdk/pynni/nni/hyperband_advisor/requirements.txt new file mode 100644 index 0000000000..e69de29bb2 diff --git a/src/sdk/pynni/nni/msg_dispatcher.py b/src/sdk/pynni/nni/msg_dispatcher.py index 3f2ecc56dd..4275e58e7e 100644 --- a/src/sdk/pynni/nni/msg_dispatcher.py +++ b/src/sdk/pynni/nni/msg_dispatcher.py @@ -142,6 +142,12 @@ def handle_report_metric_data(self, data): return True def handle_trial_end(self, data): + """ + data: it has three keys: trial_job_id, event, hyper_params + trial_job_id: the id generated by training service + event: the job's state + hyper_params: the hyperparameters generated and returned by tuner + """ trial_job_id = data['trial_job_id'] _ended_trials.add(trial_job_id) if trial_job_id in _trial_history: diff --git a/tools/nni_cmd/config_schema.py b/tools/nni_cmd/config_schema.py index db93b722e3..ea0d767ef5 100644 --- a/tools/nni_cmd/config_schema.py +++ b/tools/nni_cmd/config_schema.py @@ -34,7 +34,22 @@ Optional('multiThread'): bool, Optional('nniManagerIp'): str, 'useAnnotation': bool, -'tuner': Or({ +Optional('advisor'): Or({ + 'builtinAdvisorName': Or('Hyperband'), + 'classArgs': { + 'optimize_mode': Or('maximize', 'minimize'), + Optional('R'): int, + Optional('eta'): int + }, + Optional('gpuNum'): And(int, lambda x: 0 <= x <= 99999), +},{ + 'codeDir': os.path.exists, + 'classFileName': str, + 'className': str, + Optional('classArgs'): dict, + Optional('gpuNum'): And(int, lambda x: 0 <= x <= 99999), +}), +Optional('tuner'): Or({ 'builtinTunerName': Or('TPE', 'Random', 'Anneal', 'SMAC', 'Evolution'), Optional('classArgs'): { 'optimize_mode': Or('maximize', 'minimize') diff --git a/tools/nni_cmd/launcher.py b/tools/nni_cmd/launcher.py index 468eff3fbf..7fa5f8b974 100644 --- a/tools/nni_cmd/launcher.py +++ b/tools/nni_cmd/launcher.py @@ -203,9 +203,12 @@ def set_experiment(experiment_config, mode, port, config_file_name): request_data['description'] = experiment_config['description'] if experiment_config.get('multiPhase'): request_data['multiPhase'] = experiment_config.get('multiPhase') - request_data['tuner'] = experiment_config['tuner'] - if 'assessor' in experiment_config: - request_data['assessor'] = experiment_config['assessor'] + if experiment_config.get('advisor'): + request_data['advisor'] = experiment_config['advisor'] + else: + request_data['tuner'] = experiment_config['tuner'] + if 'assessor' in experiment_config: + request_data['assessor'] = experiment_config['assessor'] request_data['clusterMetaData'] = [] if experiment_config['trainingServicePlatform'] == 'local': diff --git a/tools/nni_cmd/launcher_utils.py b/tools/nni_cmd/launcher_utils.py index a3765f623c..3872f01955 100644 --- a/tools/nni_cmd/launcher_utils.py +++ b/tools/nni_cmd/launcher_utils.py @@ -55,6 +55,8 @@ def parse_path(experiment_config, config_path): expand_path(experiment_config['tuner'], 'codeDir') if experiment_config.get('assessor'): expand_path(experiment_config['assessor'], 'codeDir') + if experiment_config.get('advisor'): + 
expand_path(experiment_config['advisor'], 'codeDir') #if users use relative path, convert it to absolute path root_path = os.path.dirname(config_path) @@ -66,6 +68,8 @@ def parse_path(experiment_config, config_path): parse_relative_path(root_path, experiment_config['tuner'], 'codeDir') if experiment_config.get('assessor'): parse_relative_path(root_path, experiment_config['assessor'], 'codeDir') + if experiment_config.get('advisor'): + parse_relative_path(root_path, experiment_config['advisor'], 'codeDir') def validate_search_space_content(experiment_config): '''Validate searchspace content, @@ -108,47 +112,58 @@ def validate_common_content(experiment_config): print_error('Your config file is not correct, please check your config file content!\n%s' % exception) exit(1) +def validate_customized_file(experiment_config, spec_key): + ''' + check whether the file of customized tuner/assessor/advisor exists + spec_key: 'tuner', 'assessor', 'advisor' + ''' + if experiment_config[spec_key].get('codeDir') and \ + experiment_config[spec_key].get('classFileName') and \ + experiment_config[spec_key].get('className'): + if not os.path.exists(os.path.join( + experiment_config[spec_key]['codeDir'], + experiment_config[spec_key]['classFileName'])): + print_error('%s file directory is not valid!'%(spec_key)) + exit(1) + else: + print_error('%s file directory is not valid!'%(spec_key)) + exit(1) + def parse_tuner_content(experiment_config): '''Validate whether tuner in experiment_config is valid''' if experiment_config['tuner'].get('builtinTunerName'): experiment_config['tuner']['className'] = experiment_config['tuner']['builtinTunerName'] - elif experiment_config['tuner'].get('codeDir') and \ - experiment_config['tuner'].get('classFileName') and \ - experiment_config['tuner'].get('className'): - if not os.path.exists(os.path.join( - experiment_config['tuner']['codeDir'], - experiment_config['tuner']['classFileName'])): - print_error('Tuner file directory is not valid!') - exit(1) else: - raise ValueError('Tuner format is not valid!') + validate_customized_file(experiment_config, 'tuner') def parse_assessor_content(experiment_config): '''Validate whether assessor in experiment_config is valid''' if experiment_config.get('assessor'): if experiment_config['assessor'].get('builtinAssessorName'): experiment_config['assessor']['className'] = experiment_config['assessor']['builtinAssessorName'] - elif experiment_config['assessor'].get('codeDir') and \ - experiment_config['assessor'].get('classFileName') and \ - experiment_config['assessor'].get('className'): - if not os.path.exists(os.path.join( - experiment_config['assessor']['codeDir'], - experiment_config['assessor']['classFileName'])): - print_error('Assessor file directory is not valid!') - exit(1) else: - print_error('Assessor format is not valid!') - exit(1) + validate_customized_file(experiment_config, 'assessor') -def validate_annotation_content(experiment_config): - '''Valid whether useAnnotation and searchSpacePath is coexist''' +def parse_advisor_content(experiment_config): + '''Validate whether advisor in experiment_config is valid''' + if experiment_config['advisor'].get('builtinAdvisorName'): + experiment_config['advisor']['className'] = experiment_config['advisor']['builtinAdvisorName'] + else: + validate_customized_file(experiment_config, 'advisor') + +def validate_annotation_content(experiment_config, spec_key, builtin_name): + ''' + Valid whether useAnnotation and searchSpacePath is coexist + spec_key: 'advisor' or 'tuner' + builtin_name: 
'builtinAdvisorName' or 'builtinTunerName' + ''' if experiment_config.get('useAnnotation'): if experiment_config.get('searchSpacePath'): print_error('If you set useAnnotation=true, please leave searchSpacePath empty') exit(1) else: # validate searchSpaceFile - if experiment_config['tuner'].get('tunerName') and experiment_config['tuner'].get('optimizationMode'): + if experiment_config[spec_key].get(builtin_name): if experiment_config.get('searchSpacePath') is None: print_error('Please set searchSpace!') exit(1) @@ -165,6 +180,12 @@ def validate_all_content(experiment_config, config_path): parse_path(experiment_config, config_path) validate_common_content(experiment_config) parse_time(experiment_config) - parse_tuner_content(experiment_config) - parse_assessor_content(experiment_config) - validate_annotation_content(experiment_config) + if experiment_config.get('advisor'): + parse_advisor_content(experiment_config) + validate_annotation_content(experiment_config, 'advisor', 'builtinAdvisorName') + else: + if not experiment_config.get('tuner'): + raise Exception('Please provide tuner spec!') + parse_tuner_content(experiment_config) + parse_assessor_content(experiment_config) + validate_annotation_content(experiment_config, 'tuner', 'builtinTunerName')