diff --git a/examples/tensorflow/local/horovod_keras_mnist.py b/examples/tensorflow/local/horovod_keras_mnist.py
new file mode 100644
index 000000000..24ad99d0f
--- /dev/null
+++ b/examples/tensorflow/local/horovod_keras_mnist.py
@@ -0,0 +1,158 @@
+"""
+This script is a simple MNIST training script which uses Horovod and TensorFlow's Keras interface.
+It has been instrumented with a SageMaker Debugger hook so that tensors can be saved during training.
+Here, the hook is created through its constructor so that you can run this script locally and experiment.
+When you run this script in SageMaker, it is recommended to create the hook from the JSON configuration file instead.
+Please see the scripts in the 'sagemaker_byoc' or 'sagemaker_official_container' folder, depending on your use case.
+
+This script has been adapted from an example in the Horovod repository: https://github.com/uber/horovod
+"""
+
+# Standard Library
+import argparse
+import math
+import os
+
+# Third Party
+import horovod.tensorflow.keras as hvd
+import tensorflow as tf
+from tensorflow import keras
+from tensorflow.keras import backend as K
+from tensorflow.keras.datasets import mnist
+from tensorflow.keras.layers import Conv2D, Dense, Dropout, Flatten, MaxPooling2D
+from tensorflow.keras.models import Sequential
+
+# First Party
+import smdebug.tensorflow as smd
+
+
+def str2bool(v):
+    if isinstance(v, bool):
+        return v
+    if v.lower() in ("yes", "true", "t", "y", "1"):
+        return True
+    elif v.lower() in ("no", "false", "f", "n", "0"):
+        return False
+    else:
+        raise argparse.ArgumentTypeError("Boolean value expected.")
+
+
+def main(args):
+    # Horovod: initialize Horovod.
+    hvd.init()
+
+    if not args.use_only_cpu:
+        # Horovod: pin GPU to be used to process local rank (one GPU per process)
+        config = tf.ConfigProto()
+        config.gpu_options.allow_growth = True
+        config.gpu_options.visible_device_list = str(hvd.local_rank())
+    else:
+        config = None
+
+    K.set_session(tf.Session(config=config))
+
+    batch_size = 128
+    num_classes = 10
+
+    # Horovod: adjust number of epochs based on number of GPUs.
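+    # ceil keeps at least one epoch per worker; e.g. with the default
+    # num_epochs=5 and hvd.size()=4 workers, each worker runs ceil(5/4) = 2 epochs.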
+ epochs = int(math.ceil(args.num_epochs / hvd.size())) + + # Input image dimensions + img_rows, img_cols = 28, 28 + + # The data, shuffled and split between train and test sets + (x_train, y_train), (x_test, y_test) = mnist.load_data() + + if K.image_data_format() == "channels_first": + x_train = x_train.reshape(x_train.shape[0], 1, img_rows, img_cols) + x_test = x_test.reshape(x_test.shape[0], 1, img_rows, img_cols) + input_shape = (1, img_rows, img_cols) + else: + x_train = x_train.reshape(x_train.shape[0], img_rows, img_cols, 1) + x_test = x_test.reshape(x_test.shape[0], img_rows, img_cols, 1) + input_shape = (img_rows, img_cols, 1) + + x_train = x_train.astype("float32") + x_test = x_test.astype("float32") + x_train /= 255 + x_test /= 255 + print("x_train shape:", x_train.shape) + print(x_train.shape[0], "train samples") + print(x_test.shape[0], "test samples") + + # Convert class vectors to binary class matrices + y_train = keras.utils.to_categorical(y_train, num_classes) + y_test = keras.utils.to_categorical(y_test, num_classes) + + model = Sequential() + model.add(Conv2D(32, kernel_size=(3, 3), activation="relu", input_shape=input_shape)) + model.add(Conv2D(64, (3, 3), activation="relu")) + model.add(MaxPooling2D(pool_size=(2, 2))) + model.add(Dropout(0.25)) + model.add(Flatten()) + model.add(Dense(128, activation="relu")) + model.add(Dropout(0.5)) + model.add(Dense(num_classes, activation="softmax")) + + # Horovod: adjust learning rate based on number of GPUs. + opt = keras.optimizers.Adadelta(1.0 * hvd.size()) + + # Horovod: add Horovod Distributed Optimizer. + opt = hvd.DistributedOptimizer(opt) + + ##### Enabling SageMaker Debugger ########### + # creating hook + smd_hook = smd.KerasHook( + out_dir=args.out_dir, + save_config=smd.SaveConfig(save_interval=args.save_interval), + include_collections=["weights", "gradients"], + include_workers=args.include_workers, + ) + + ##### Enabling SageMaker Debugger ########### + # wrapping optimizer so hook can identify gradients + opt = smd_hook.wrap_optimizer(opt) + + model.compile(loss=keras.losses.categorical_crossentropy, optimizer=opt, metrics=["accuracy"]) + + callbacks = [ + # Horovod: broadcast initial variable states from rank 0 to all other processes. + # This is necessary to ensure consistent initialization of all workers when + # training is started with random weights or restored from a checkpoint. + hvd.callbacks.BroadcastGlobalVariablesCallback(0), + ##### Enabling SageMaker Debugger ########### + # adding smd hook as a callback + smd_hook, + ] + + # Horovod: save checkpoints only on worker 0 to prevent other workers from corrupting them. 
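+    # Note: hvd.rank() is the global rank across all hosts, while hvd.local_rank()
+    # (used above for GPU pinning) is the rank within a single host.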
+ if hvd.rank() == 0: + callbacks.append( + keras.callbacks.ModelCheckpoint(os.path.join(args.model_dir, "checkpoint-{epoch}.h5")) + ) + + model.fit( + x_train, + y_train, + batch_size=batch_size, + callbacks=callbacks, + epochs=epochs, + verbose=1 if hvd.rank() == 0 else 0, + validation_data=(x_test, y_test), + ) + score = model.evaluate(x_test, y_test, verbose=0) + print("Test loss:", score[0]) + print("Test accuracy:", score[1]) + + +if __name__ == "__main__": + parser = argparse.ArgumentParser() + parser.add_argument("--use_only_cpu", type=str2bool, default=False) + parser.add_argument("--num_epochs", type=int, default=5, help="Number of epochs to train for") + parser.add_argument("--out_dir", type=str) + parser.add_argument("--save_interval", type=int, default=500) + parser.add_argument("--include_workers", type=str, default="one") + parser.add_argument("--model_dir", type=str, default="/tmp/mnist_model") + args = parser.parse_args() + + main(args) diff --git a/examples/tensorflow/local/mnist.py b/examples/tensorflow/local/mnist.py index 00bb26185..7d5cca380 100644 --- a/examples/tensorflow/local/mnist.py +++ b/examples/tensorflow/local/mnist.py @@ -31,7 +31,7 @@ def main(): parser.add_argument( "--num_steps", type=int, - help="Number of steps to train for. If this" "is passed, it overrides num_epochs", + help="Number of steps to train for. If this is passed, it overrides num_epochs", ) parser.add_argument( "--num_eval_steps", @@ -47,6 +47,8 @@ def main(): np.random.seed(2) random.seed(12) + ##### Enabling SageMaker Debugger ########### + # creating hook hook = smd.EstimatorHook( out_dir=args.out_dir, include_collections=["weights", "gradients"], @@ -104,7 +106,8 @@ def cnn_model_fn(features, labels, mode): if mode == tf.estimator.ModeKeys.TRAIN: optimizer = tf.train.GradientDescentOptimizer(learning_rate=args.lr) - # SMD: Wrap your optimizer as follows to help SageMaker Debugger identify gradients + ##### Enabling SageMaker Debugger ########### + # Wrap your optimizer as follows to help SageMaker Debugger identify gradients # This does not change your optimization logic, it returns back the same optimizer optimizer = hook.wrap_optimizer(optimizer) @@ -140,12 +143,20 @@ def cnn_model_fn(features, labels, mode): x={"x": eval_data}, y=eval_labels, num_epochs=1, shuffle=False ) + ##### Enabling SageMaker Debugger ########### # Set training mode so SMDebug can classify the steps into training mode hook.set_mode(smd.modes.TRAIN) + + ##### Enabling SageMaker Debugger ########### + # pass hook to hooks parameter of train method mnist_classifier.train(input_fn=train_input_fn, steps=args.num_steps, hooks=[hook]) + ##### Enabling SageMaker Debugger ########### # Set eval mode so SMDebug can classify the steps into eval mode hook.set_mode(smd.modes.EVAL) + + ##### Enabling SageMaker Debugger ########### + # pass hook to hooks parameter of evaluate method mnist_classifier.evaluate(input_fn=eval_input_fn, steps=args.num_eval_steps, hooks=[hook]) diff --git a/examples/tensorflow/local/tf_keras_resnet.py b/examples/tensorflow/local/tf_keras_resnet.py index 1a8b0f0d0..e2c409b47 100644 --- a/examples/tensorflow/local/tf_keras_resnet.py +++ b/examples/tensorflow/local/tf_keras_resnet.py @@ -42,6 +42,8 @@ def train(batch_size, epoch, model, hook): epochs=epoch, validation_data=(X_valid, Y_valid), shuffle=True, + ##### Enabling SageMaker Debugger ########### + # adding hook as a callback callbacks=[hook], ) @@ -57,6 +59,8 @@ def main(): model = ResNet50(weights=None, input_shape=(32, 32, 3), classes=10) 
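+    # The KerasHook below acts as a tf.keras callback; it is passed to model.fit()
+    # in train() above and writes the requested tensors under out_dir.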
+    ##### Enabling SageMaker Debugger ###########
+    # creating hook
     hook = smd.KerasHook(
         out_dir=opt.out_dir,
         include_collections=["weights", "gradients", "losses"],
@@ -64,6 +68,8 @@
     )
     optimizer = tf.keras.optimizers.Adam()
+
+    ##### Enabling SageMaker Debugger ###########
     # wrap the optimizer so the hook can identify the gradients
     optimizer = hook.wrap_optimizer(optimizer)
     model.compile(loss="categorical_crossentropy", optimizer=optimizer, metrics=["accuracy"])
diff --git a/examples/tensorflow/sagemaker_byoc/horovod_keras_mnist.py b/examples/tensorflow/sagemaker_byoc/horovod_keras_mnist.py
new file mode 100644
index 000000000..fcea19e4b
--- /dev/null
+++ b/examples/tensorflow/sagemaker_byoc/horovod_keras_mnist.py
@@ -0,0 +1,152 @@
+"""
+This script is a simple MNIST training script which uses Horovod and TensorFlow's Keras interface.
+It has been instrumented with a SageMaker Debugger hook to allow saving tensors during training.
+The hook reads its configuration from the JSON file that SageMaker places in the training container.
+Configuration provided to the SageMaker Python SDK when creating a job is passed on to the hook.
+This allows you to use the same script with different configurations across runs.
+If you use an official SageMaker Framework container (i.e. an AWS Deep Learning Container), then
+you do not have to instrument your script as shown below; hooks are added automatically in those environments.
+For more information, please refer to https://github.com/awslabs/sagemaker-debugger/blob/master/docs/sagemaker.md
+
+This script has been adapted from an example in the Horovod repository: https://github.com/uber/horovod
+"""
+# Standard Library
+import argparse
+import math
+import os
+
+# Third Party
+import horovod.tensorflow.keras as hvd
+import tensorflow as tf
+from tensorflow import keras
+from tensorflow.keras import backend as K
+from tensorflow.keras.datasets import mnist
+from tensorflow.keras.layers import Conv2D, Dense, Dropout, Flatten, MaxPooling2D
+from tensorflow.keras.models import Sequential
+
+# First Party
+import smdebug.tensorflow as smd
+
+
+def str2bool(v):
+    if isinstance(v, bool):
+        return v
+    if v.lower() in ("yes", "true", "t", "y", "1"):
+        return True
+    elif v.lower() in ("no", "false", "f", "n", "0"):
+        return False
+    else:
+        raise argparse.ArgumentTypeError("Boolean value expected.")
+
+
+def main(args):
+    # Horovod: initialize Horovod.
+    hvd.init()
+
+    if not args.use_only_cpu:
+        # Horovod: pin GPU to be used to process local rank (one GPU per process)
+        config = tf.ConfigProto()
+        config.gpu_options.allow_growth = True
+        config.gpu_options.visible_device_list = str(hvd.local_rank())
+    else:
+        config = None
+
+    K.set_session(tf.Session(config=config))
+
+    batch_size = 128
+    num_classes = 10
+
+    # Horovod: adjust number of epochs based on number of GPUs.
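+    # Dividing by hvd.size() keeps the total amount of training roughly constant
+    # as workers are added; ceil guarantees each worker runs at least one epoch.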
+ epochs = int(math.ceil(args.num_epochs / hvd.size())) + + # Input image dimensions + img_rows, img_cols = 28, 28 + + # The data, shuffled and split between train and test sets + (x_train, y_train), (x_test, y_test) = mnist.load_data() + + if K.image_data_format() == "channels_first": + x_train = x_train.reshape(x_train.shape[0], 1, img_rows, img_cols) + x_test = x_test.reshape(x_test.shape[0], 1, img_rows, img_cols) + input_shape = (1, img_rows, img_cols) + else: + x_train = x_train.reshape(x_train.shape[0], img_rows, img_cols, 1) + x_test = x_test.reshape(x_test.shape[0], img_rows, img_cols, 1) + input_shape = (img_rows, img_cols, 1) + + x_train = x_train.astype("float32") + x_test = x_test.astype("float32") + x_train /= 255 + x_test /= 255 + print("x_train shape:", x_train.shape) + print(x_train.shape[0], "train samples") + print(x_test.shape[0], "test samples") + + # Convert class vectors to binary class matrices + y_train = keras.utils.to_categorical(y_train, num_classes) + y_test = keras.utils.to_categorical(y_test, num_classes) + + model = Sequential() + model.add(Conv2D(32, kernel_size=(3, 3), activation="relu", input_shape=input_shape)) + model.add(Conv2D(64, (3, 3), activation="relu")) + model.add(MaxPooling2D(pool_size=(2, 2))) + model.add(Dropout(0.25)) + model.add(Flatten()) + model.add(Dense(128, activation="relu")) + model.add(Dropout(0.5)) + model.add(Dense(num_classes, activation="softmax")) + + # Horovod: adjust learning rate based on number of GPUs. + opt = keras.optimizers.Adadelta(1.0 * hvd.size()) + + # Horovod: add Horovod Distributed Optimizer. + opt = hvd.DistributedOptimizer(opt) + + ##### Enabling SageMaker Debugger ########### + # Create hook from the configuration provided through sagemaker python sdk + smd_hook = smd.KerasHook.create_from_json_file() + + ##### Enabling SageMaker Debugger ########### + # wrap the optimizer so the hook can identify the gradients + opt = smd_hook.wrap_optimizer(opt) + + model.compile(loss=keras.losses.categorical_crossentropy, optimizer=opt, metrics=["accuracy"]) + + callbacks = [ + # Horovod: broadcast initial variable states from rank 0 to all other processes. + # This is necessary to ensure consistent initialization of all workers when + # training is started with random weights or restored from a checkpoint. + hvd.callbacks.BroadcastGlobalVariablesCallback(0), + ##### Enabling SageMaker Debugger ########### + # pass smd_hook as a callback + smd_hook, + ] + + # Horovod: save checkpoints only on worker 0 to prevent other workers from corrupting them. 
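+    # Keras fills in {epoch} in the filename template below, producing
+    # checkpoint-1.h5, checkpoint-2.h5, ... under model_dir.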
+ if hvd.rank() == 0: + callbacks.append( + keras.callbacks.ModelCheckpoint(os.path.join(args.model_dir, "checkpoint-{epoch}.h5")) + ) + + model.fit( + x_train, + y_train, + batch_size=batch_size, + callbacks=callbacks, + epochs=epochs, + verbose=1 if hvd.rank() == 0 else 0, + validation_data=(x_test, y_test), + ) + score = model.evaluate(x_test, y_test, verbose=0) + print("Test loss:", score[0]) + print("Test accuracy:", score[1]) + + +if __name__ == "__main__": + parser = argparse.ArgumentParser() + parser.add_argument("--use_only_cpu", type=str2bool, default=False) + parser.add_argument("--num_epochs", type=int, default=5, help="Number of epochs to train for") + parser.add_argument("--model_dir", type=str, default="/tmp/mnist_model") + args = parser.parse_args() + + main(args) diff --git a/examples/tensorflow/sagemaker_byoc/mnist.py b/examples/tensorflow/sagemaker_byoc/mnist.py index bd354d24c..5bd9fdfd3 100644 --- a/examples/tensorflow/sagemaker_byoc/mnist.py +++ b/examples/tensorflow/sagemaker_byoc/mnist.py @@ -30,7 +30,7 @@ parser.add_argument( "--num_steps", type=int, - help="Number of steps to train for. If this" "is passed, it overrides num_epochs", + help="Number of steps to train for. If this is passed, it overrides num_epochs", ) parser.add_argument( "--num_eval_steps", @@ -46,6 +46,7 @@ np.random.seed(2) random.seed(12) +##### Enabling SageMaker Debugger ########### # This allows you to create the hook from the configuration you pass to the SageMaker pySDK hook = smd.SessionHook.create_from_json_file() @@ -97,7 +98,8 @@ def cnn_model_fn(features, labels, mode): if mode == tf.estimator.ModeKeys.TRAIN: optimizer = tf.train.GradientDescentOptimizer(learning_rate=args.lr) - # SMD: Wrap your optimizer as follows to help SageMaker Debugger identify gradients + ##### Enabling SageMaker Debugger ########### + # Wrap your optimizer as follows to help SageMaker Debugger identify gradients # This does not change your optimization logic, it returns back the same optimizer optimizer = hook.wrap_optimizer(optimizer) @@ -130,10 +132,18 @@ def cnn_model_fn(features, labels, mode): x={"x": eval_data}, y=eval_labels, num_epochs=1, shuffle=False ) +##### Enabling SageMaker Debugger ########### # Set training mode so SMDebug can classify the steps into training mode hook.set_mode(smd.modes.TRAIN) + +##### Enabling SageMaker Debugger ########### +# pass hook to hooks parameter of train method mnist_classifier.train(input_fn=train_input_fn, steps=args.num_steps, hooks=[hook]) +##### Enabling SageMaker Debugger ########### # Set eval mode so SMDebug can classify the steps into eval mode hook.set_mode(smd.modes.EVAL) + +##### Enabling SageMaker Debugger ########### +# pass hook to hooks parameter of evaluate method mnist_classifier.evaluate(input_fn=eval_input_fn, steps=args.num_eval_steps, hooks=[hook]) diff --git a/examples/tensorflow/sagemaker_byoc/simple.py b/examples/tensorflow/sagemaker_byoc/simple.py index 08433dd68..47e43063f 100644 --- a/examples/tensorflow/sagemaker_byoc/simple.py +++ b/examples/tensorflow/sagemaker_byoc/simple.py @@ -48,6 +48,8 @@ def str2bool(v): np.random.seed(2) random.seed(12) +##### Enabling SageMaker Debugger ########### +# Create hook from the configuration provided through sagemaker python sdk hook = smd.SessionHook.create_from_json_file() # Network definition @@ -60,6 +62,8 @@ def str2bool(v): y = tf.matmul(x, w0) loss = tf.reduce_mean((tf.matmul(x, w) - y) ** 2, name="loss") +##### Enabling SageMaker Debugger ########### +# Adding custom loss to losses collection 
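+# add_to_collection registers this tensor with the hook's "losses" collection,
+# so its values are written out on the hook's save steps.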
hook.add_to_collection("losses", loss) global_step = tf.Variable(17, name="global_step", trainable=False) @@ -67,22 +71,27 @@ def str2bool(v): optimizer = tf.train.AdamOptimizer(args.lr) +##### Enabling SageMaker Debugger ########### # Wrap the optimizer with wrap_optimizer so smdebug can find gradients to save optimizer = hook.wrap_optimizer(optimizer) # use this wrapped optimizer to minimize loss optimizer_op = optimizer.minimize(loss, global_step=increment_global_step_op) +##### Enabling SageMaker Debugger ########### # pass the hook to hooks parameter of monitored session sess = tf.train.MonitoredSession(hooks=[hook]) -# use this session for running the tensorflow model +##### Enabling SageMaker Debugger ########### +# setting the mode of job so analysis can differentiate between TRAIN, EVAL, PREDICT hook.set_mode(smd.modes.TRAIN) for i in range(args.steps): x_ = np.random.random((10, 2)) * args.scale _loss, opt, gstep = sess.run([loss, optimizer_op, increment_global_step_op], {x: x_}) print(f"Step={i}, Loss={_loss}") +##### Enabling SageMaker Debugger ########### +# setting the mode of job so analysis can differentiate between TRAIN, EVAL, PREDICT hook.set_mode(smd.modes.EVAL) for i in range(args.steps): x_ = np.random.random((10, 2)) * args.scale diff --git a/examples/tensorflow/sagemaker_byoc/tf_keras_resnet.py b/examples/tensorflow/sagemaker_byoc/tf_keras_resnet.py index 9ce9d2e74..cb08ad461 100644 --- a/examples/tensorflow/sagemaker_byoc/tf_keras_resnet.py +++ b/examples/tensorflow/sagemaker_byoc/tf_keras_resnet.py @@ -45,7 +45,7 @@ def train(batch_size, epoch, model, hook): epochs=epoch, validation_data=(X_valid, Y_valid), shuffle=True, - callbacks=[hook], + callbacks=[hook], ##### Enabling SageMaker Debugger ########### ) @@ -58,14 +58,16 @@ def main(): model = ResNet50(weights=None, input_shape=(32, 32, 3), classes=10) + ##### Enabling SageMaker Debugger ########### # Create hook from the configuration provided through sagemaker python sdk hook = smd.KerasHook.create_from_json_file() optimizer = tf.keras.optimizers.Adam() + + ##### Enabling SageMaker Debugger ########### # wrap the optimizer so the hook can identify the gradients optimizer = hook.wrap_optimizer(optimizer) - model.compile(loss="categorical_crossentropy", optimizer=optimizer, metrics=["accuracy"]) - # start the training. + model.compile(loss="categorical_crossentropy", optimizer=optimizer, metrics=["accuracy"]) train(opt.batch_size, opt.epoch, model, hook) diff --git a/examples/tensorflow/sagemaker_official_container/horovod_estimator_mnist.py b/examples/tensorflow/sagemaker_official_container/horovod_estimator_mnist.py new file mode 100644 index 000000000..b61aaf08e --- /dev/null +++ b/examples/tensorflow/sagemaker_official_container/horovod_estimator_mnist.py @@ -0,0 +1,225 @@ +# Copyright 2018 Uber Technologies, Inc. All Rights Reserved. +# Copyright 2016 The TensorFlow Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+"""Convolutional Neural Network Estimator for MNIST, built with tf.layers.""" + +""" +This script is a simple MNIST training script which uses Horovod and Tensorflow's Estimator interface. +It is designed to be used with SageMaker Debugger in an official SageMaker Framework container (i.e. AWS Deep Learning Container). +You will notice that this script looks exactly like a normal TensorFlow training script. +The hook needed by SageMaker Debugger to save tensors during training will be automatically added in those environments. +The hook will load configuration from json configuration that SageMaker will put in the training container from the configuration provided using the SageMaker python SDK when creating a job. +For more information, please refer to https://github.com/awslabs/sagemaker-debugger/blob/master/docs/sagemaker.md + +This script has been adapted from an example in Horovod repository https://github.com/uber/horovod +""" +# Standard Library +import argparse +import errno +import os + +# Third Party +import horovod.tensorflow as hvd +import numpy as np +import tensorflow as tf +from tensorflow import keras + +tf.logging.set_verbosity(tf.logging.INFO) + + +def str2bool(v): + if isinstance(v, bool): + return v + if v.lower() in ("yes", "true", "t", "y", "1"): + return True + elif v.lower() in ("no", "false", "f", "n", "0"): + return False + else: + raise argparse.ArgumentTypeError("Boolean value expected.") + + +def cnn_model_fn(features, labels, mode): + """Model function for CNN.""" + # Input Layer + # Reshape X to 4-D tensor: [batch_size, width, height, channels] + # MNIST images are 28x28 pixels, and have one color channel + input_layer = tf.reshape(features["x"], [-1, 28, 28, 1]) + + # Convolutional Layer #1 + # Computes 32 features using a 5x5 filter with ReLU activation. + # Padding is added to preserve width and height. + # Input Tensor Shape: [batch_size, 28, 28, 1] + # Output Tensor Shape: [batch_size, 28, 28, 32] + conv1 = tf.layers.conv2d( + inputs=input_layer, filters=32, kernel_size=[5, 5], padding="same", activation=tf.nn.relu + ) + + # Pooling Layer #1 + # First max pooling layer with a 2x2 filter and stride of 2 + # Input Tensor Shape: [batch_size, 28, 28, 32] + # Output Tensor Shape: [batch_size, 14, 14, 32] + pool1 = tf.layers.max_pooling2d(inputs=conv1, pool_size=[2, 2], strides=2) + + # Convolutional Layer #2 + # Computes 64 features using a 5x5 filter. + # Padding is added to preserve width and height. 
+ # Input Tensor Shape: [batch_size, 14, 14, 32] + # Output Tensor Shape: [batch_size, 14, 14, 64] + conv2 = tf.layers.conv2d( + inputs=pool1, filters=64, kernel_size=[5, 5], padding="same", activation=tf.nn.relu + ) + + # Pooling Layer #2 + # Second max pooling layer with a 2x2 filter and stride of 2 + # Input Tensor Shape: [batch_size, 14, 14, 64] + # Output Tensor Shape: [batch_size, 7, 7, 64] + pool2 = tf.layers.max_pooling2d(inputs=conv2, pool_size=[2, 2], strides=2) + + # Flatten tensor into a batch of vectors + # Input Tensor Shape: [batch_size, 7, 7, 64] + # Output Tensor Shape: [batch_size, 7 * 7 * 64] + pool2_flat = tf.reshape(pool2, [-1, 7 * 7 * 64]) + + # Dense Layer + # Densely connected layer with 1024 neurons + # Input Tensor Shape: [batch_size, 7 * 7 * 64] + # Output Tensor Shape: [batch_size, 1024] + dense = tf.layers.dense(inputs=pool2_flat, units=1024, activation=tf.nn.relu) + + # Add dropout operation; 0.6 probability that element will be kept + dropout = tf.layers.dropout( + inputs=dense, rate=0.4, training=mode == tf.estimator.ModeKeys.TRAIN + ) + + # Logits layer + # Input Tensor Shape: [batch_size, 1024] + # Output Tensor Shape: [batch_size, 10] + logits = tf.layers.dense(inputs=dropout, units=10) + + predictions = { + # Generate predictions (for PREDICT and EVAL mode) + "classes": tf.argmax(input=logits, axis=1), + # Add `softmax_tensor` to the graph. It is used for PREDICT and by the + # `logging_hook`. + "probabilities": tf.nn.softmax(logits, name="softmax_tensor"), + } + if mode == tf.estimator.ModeKeys.PREDICT: + return tf.estimator.EstimatorSpec(mode=mode, predictions=predictions) + + # Calculate Loss (for both TRAIN and EVAL modes) + onehot_labels = tf.one_hot(indices=tf.cast(labels, tf.int32), depth=10) + loss = tf.losses.softmax_cross_entropy(onehot_labels=onehot_labels, logits=logits) + + # Configure the Training Op (for TRAIN mode) + if mode == tf.estimator.ModeKeys.TRAIN: + # Horovod: scale learning rate by the number of workers. + optimizer = tf.train.MomentumOptimizer(learning_rate=0.001 * hvd.size(), momentum=0.9) + + # Horovod: add Horovod Distributed Optimizer. + optimizer = hvd.DistributedOptimizer(optimizer) + + train_op = optimizer.minimize(loss=loss, global_step=tf.train.get_global_step()) + return tf.estimator.EstimatorSpec(mode=mode, loss=loss, train_op=train_op) + + # Add evaluation metrics (for EVAL mode) + eval_metric_ops = { + "accuracy": tf.metrics.accuracy(labels=labels, predictions=predictions["classes"]) + } + return tf.estimator.EstimatorSpec(mode=mode, loss=loss, eval_metric_ops=eval_metric_ops) + + +def main(args): + # Horovod: initialize Horovod. + hvd.init() + + # Keras automatically creates a cache directory in ~/.keras/datasets for + # storing the downloaded MNIST data. This creates a race + # condition among the workers that share the same filesystem. If the + # directory already exists by the time this worker gets around to creating + # it, ignore the resulting exception and continue. + cache_dir = os.path.join(os.path.expanduser("~"), ".keras", "datasets") + if not os.path.exists(cache_dir): + try: + os.mkdir(cache_dir) + except OSError as e: + if e.errno == errno.EEXIST and os.path.isdir(cache_dir): + pass + else: + raise + + # Download and load MNIST dataset. + (train_data, train_labels), (eval_data, eval_labels) = keras.datasets.mnist.load_data( + "MNIST-data-%d" % hvd.rank() + ) + + # The shape of downloaded data is (-1, 28, 28), hence we need to reshape it + # into (-1, 784) to feed into our network. 
Also, we need to normalize the
+    # features to be between 0 and 1.
+    train_data = np.reshape(train_data, (-1, 784)) / 255.0
+    eval_data = np.reshape(eval_data, (-1, 784)) / 255.0
+
+    # Horovod: pin GPU to be used to process local rank (one GPU per process)
+    if not args.use_only_cpu:
+        config = tf.ConfigProto()
+        config.gpu_options.allow_growth = True
+        config.gpu_options.visible_device_list = str(hvd.local_rank())
+        estimator_config = tf.estimator.RunConfig(session_config=config)
+    else:
+        estimator_config = None
+
+    # Horovod: save checkpoints only on worker 0 to prevent other workers from
+    # corrupting them.
+    model_dir = args.model_dir if hvd.rank() == 0 else None
+
+    # Create the Estimator
+    mnist_classifier = tf.estimator.Estimator(
+        model_fn=cnn_model_fn, model_dir=model_dir, config=estimator_config
+    )
+
+    # Horovod: BroadcastGlobalVariablesHook broadcasts initial variable states from
+    # rank 0 to all other processes. This is necessary to ensure consistent
+    # initialization of all workers when training is started with random weights or
+    # restored from a checkpoint.
+    bcast_hook = hvd.BroadcastGlobalVariablesHook(0)
+
+    # Train the model
+    train_input_fn = tf.estimator.inputs.numpy_input_fn(
+        x={"x": train_data}, y=train_labels, batch_size=100, num_epochs=None, shuffle=True
+    )
+
+    # Horovod: adjust number of steps based on number of GPUs.
+    mnist_classifier.train(
+        input_fn=train_input_fn, steps=args.num_steps // hvd.size(), hooks=[bcast_hook]
+    )
+
+    # Evaluate the model and print results
+    eval_input_fn = tf.estimator.inputs.numpy_input_fn(
+        x={"x": eval_data}, y=eval_labels, num_epochs=1, shuffle=False
+    )
+    eval_results = mnist_classifier.evaluate(input_fn=eval_input_fn)
+    print(eval_results)
+
+
+if __name__ == "__main__":
+    parser = argparse.ArgumentParser()
+    parser.add_argument("--use_only_cpu", type=str2bool, default=False)
+    parser.add_argument("--model_dir", type=str, default="/tmp/mnist_model")
+    parser.add_argument(
+        "--num_steps",
+        type=int,
+        help="Number of steps to train for. If this is passed, it overrides num_epochs",
+    )
+    args = parser.parse_args()
+
+    main(args)
diff --git a/examples/tensorflow/sagemaker_official_container/horovod_keras_mnist.py b/examples/tensorflow/sagemaker_official_container/horovod_keras_mnist.py
new file mode 100644
index 000000000..925f80b30
--- /dev/null
+++ b/examples/tensorflow/sagemaker_official_container/horovod_keras_mnist.py
@@ -0,0 +1,136 @@
+"""
+This script is a simple MNIST training script which uses Horovod and TensorFlow's Keras interface.
+It is designed to be used with SageMaker Debugger in an official SageMaker Framework container (i.e. an AWS Deep Learning Container).
+You will notice that this script looks exactly like a normal TensorFlow training script.
+The hook needed by SageMaker Debugger to save tensors during training will be automatically added in those environments.
+The hook loads its configuration from the JSON file that SageMaker places in the training container, based on the configuration you provide to the SageMaker Python SDK when creating a job.
+For more information, please refer to https://github.com/awslabs/sagemaker-debugger/blob/master/docs/sagemaker.md + +This script has been adapted from an example in Horovod repository https://github.com/uber/horovod +""" +# Standard Library +import argparse +import math +import os + +# Third Party +import horovod.tensorflow.keras as hvd +import tensorflow as tf +from tensorflow import keras +from tensorflow.keras import backend as K +from tensorflow.keras.datasets import mnist +from tensorflow.keras.layers import Conv2D, Dense, Dropout, Flatten, MaxPooling2D +from tensorflow.keras.models import Sequential + + +def str2bool(v): + if isinstance(v, bool): + return v + if v.lower() in ("yes", "true", "t", "y", "1"): + return True + elif v.lower() in ("no", "false", "f", "n", "0"): + return False + else: + raise argparse.ArgumentTypeError("Boolean value expected.") + + +def main(args): + # Horovod: initialize Horovod. + hvd.init() + + if not args.use_only_cpu: + # Horovod: pin GPU to be used to process local rank (one GPU per process) + config = tf.ConfigProto() + config.gpu_options.allow_growth = True + config.gpu_options.visible_device_list = str(hvd.local_rank()) + else: + config = None + + K.set_session(tf.Session(config=config)) + + batch_size = 128 + num_classes = 10 + + # Horovod: adjust number of epochs based on number of GPUs. + epochs = int(math.ceil(args.num_epochs / hvd.size())) + + # Input image dimensions + img_rows, img_cols = 28, 28 + + # The data, shuffled and split between train and test sets + (x_train, y_train), (x_test, y_test) = mnist.load_data() + + if K.image_data_format() == "channels_first": + x_train = x_train.reshape(x_train.shape[0], 1, img_rows, img_cols) + x_test = x_test.reshape(x_test.shape[0], 1, img_rows, img_cols) + input_shape = (1, img_rows, img_cols) + else: + x_train = x_train.reshape(x_train.shape[0], img_rows, img_cols, 1) + x_test = x_test.reshape(x_test.shape[0], img_rows, img_cols, 1) + input_shape = (img_rows, img_cols, 1) + + x_train = x_train.astype("float32") + x_test = x_test.astype("float32") + x_train /= 255 + x_test /= 255 + print("x_train shape:", x_train.shape) + print(x_train.shape[0], "train samples") + print(x_test.shape[0], "test samples") + + # Convert class vectors to binary class matrices + y_train = keras.utils.to_categorical(y_train, num_classes) + y_test = keras.utils.to_categorical(y_test, num_classes) + + model = Sequential() + model.add(Conv2D(32, kernel_size=(3, 3), activation="relu", input_shape=input_shape)) + model.add(Conv2D(64, (3, 3), activation="relu")) + model.add(MaxPooling2D(pool_size=(2, 2))) + model.add(Dropout(0.25)) + model.add(Flatten()) + model.add(Dense(128, activation="relu")) + model.add(Dropout(0.5)) + model.add(Dense(num_classes, activation="softmax")) + + # Horovod: adjust learning rate based on number of GPUs. + opt = keras.optimizers.Adadelta(1.0 * hvd.size()) + + # Horovod: add Horovod Distributed Optimizer. + opt = hvd.DistributedOptimizer(opt) + + model.compile(loss=keras.losses.categorical_crossentropy, optimizer=opt, metrics=["accuracy"]) + + callbacks = [ + # Horovod: broadcast initial variable states from rank 0 to all other processes. + # This is necessary to ensure consistent initialization of all workers when + # training is started with random weights or restored from a checkpoint. + hvd.callbacks.BroadcastGlobalVariablesCallback(0) + ] + + # Horovod: save checkpoints only on worker 0 to prevent other workers from corrupting them. 
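+    # Rank 0 is also the only worker that logs training progress below
+    # (verbose=1); all other workers run fit() silently with verbose=0.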
+ if hvd.rank() == 0: + callbacks.append( + keras.callbacks.ModelCheckpoint(os.path.join(args.model_dir, "checkpoint-{epoch}.h5")) + ) + + model.fit( + x_train, + y_train, + batch_size=batch_size, + callbacks=callbacks, + epochs=epochs, + verbose=1 if hvd.rank() == 0 else 0, + validation_data=(x_test, y_test), + ) + score = model.evaluate(x_test, y_test, verbose=0) + print("Test loss:", score[0]) + print("Test accuracy:", score[1]) + + +if __name__ == "__main__": + parser = argparse.ArgumentParser() + parser.add_argument("--use_only_cpu", type=str2bool, default=False) + parser.add_argument("--num_epochs", type=int, default=5, help="Number of epochs to train for") + parser.add_argument("--model_dir", type=str, default="/tmp/mnist_model") + args = parser.parse_args() + + main(args) diff --git a/examples/tensorflow/sagemaker_official_container/mnist.py b/examples/tensorflow/sagemaker_official_container/mnist.py index 2143c5bc9..544b47ed8 100644 --- a/examples/tensorflow/sagemaker_official_container/mnist.py +++ b/examples/tensorflow/sagemaker_official_container/mnist.py @@ -25,7 +25,7 @@ parser.add_argument( "--num_steps", type=int, - help="Number of steps to train for. If this" "is passed, it overrides num_epochs", + help="Number of steps to train for. If this is passed, it overrides num_epochs", ) parser.add_argument( "--num_eval_steps", diff --git a/smdebug/core/config_constants.py b/smdebug/core/config_constants.py index d82e99e63..d47c127a8 100644 --- a/smdebug/core/config_constants.py +++ b/smdebug/core/config_constants.py @@ -1,5 +1,5 @@ CONFIG_FILE_PATH_ENV_STR = "SMDEBUG_CONFIG_FILE_PATH" -CONFIG_DEFAULT_WORKER_NAME = "worker_0" +DEFAULT_WORKER_NAME = "worker_0" DEFAULT_CONFIG_FILE_PATH = "/opt/ml/input/config/debughookconfig.json" EXPORT_TENSORBOARD_KEY = "export_tensorboard" TENSORBOARD_DIR_KEY = "tensorboard_dir" diff --git a/smdebug/core/hook.py b/smdebug/core/hook.py index a16f2ed48..5bee0642f 100644 --- a/smdebug/core/hook.py +++ b/smdebug/core/hook.py @@ -19,7 +19,7 @@ ) from smdebug.core.collection_manager import CollectionManager from smdebug.core.config_constants import ( - CONFIG_DEFAULT_WORKER_NAME, + DEFAULT_WORKER_NAME, LATEST_GLOBAL_STEP_SAVED, LATEST_GLOBAL_STEP_SEEN, LATEST_MODE_STEP, @@ -125,7 +125,7 @@ def __init__( self.dry_run = dry_run self.worker = None self.save_all_workers = True if include_workers == "all" else False - self.chief_worker = CONFIG_DEFAULT_WORKER_NAME + self.chief_worker = DEFAULT_WORKER_NAME if include_collections is None: include_collections = default_include_collections @@ -142,7 +142,6 @@ def __init__( self.reduction_config = reduction_config self.include_regex = include_regex self.collection_manager = collection_manager - self.collection_manager.set_num_workers(self._get_num_workers()) self.init_step = init_step self.logger = logger @@ -387,9 +386,8 @@ def _get_writers(self, tensor_name, tensor_ref=None) -> List[FileWriter]: :param tensor_ref: used by TF :return: List[FileWriter] """ - if self.save_all_workers is False: - if self.worker != self.chief_worker: - return [] + if self.save_all_workers is False and self.worker != self.chief_worker: + return [] return [self.writer] if self.writer else [] def _maybe_get_tb_writer(self) -> Optional[FileWriter]: diff --git a/smdebug/core/json_config.py b/smdebug/core/json_config.py index ffcbffc02..aa9b088d5 100644 --- a/smdebug/core/json_config.py +++ b/smdebug/core/json_config.py @@ -53,7 +53,6 @@ CONFIG_COLLECTION_CONFIG_KEY, CONFIG_COLLECTION_NAME_KEY, CONFIG_COLLECTION_PARAMS_KEY, - 
CONFIG_DEFAULT_WORKER_NAME, CONFIG_FILE_PATH_ENV_STR, CONFIG_HOOK_PARAMS_KEY, CONFIG_INCLUDE_REGEX_KEY, @@ -66,6 +65,7 @@ DEFAULT_CONFIG_FILE_PATH, DEFAULT_SAGEMAKER_OUTDIR, DEFAULT_SAGEMAKER_TENSORBOARD_PATH, + DEFAULT_WORKER_NAME, EXPORT_TENSORBOARD_KEY, TENSORBOARD_CONFIG_FILE_PATH_ENV_STR, TENSORBOARD_DIR_KEY, diff --git a/smdebug/mxnet/hook.py b/smdebug/mxnet/hook.py index debe5bd79..ff1678a35 100644 --- a/smdebug/mxnet/hook.py +++ b/smdebug/mxnet/hook.py @@ -4,7 +4,7 @@ # First Party from smdebug.core.collection import CollectionKeys from smdebug.core.hook import CallbackHook -from smdebug.core.json_config import CONFIG_DEFAULT_WORKER_NAME +from smdebug.core.json_config import DEFAULT_WORKER_NAME from smdebug.mxnet.collection import CollectionManager from smdebug.mxnet.graph import _net2pb from smdebug.mxnet.singleton_utils import set_hook @@ -67,7 +67,7 @@ def _get_worker_name(self): return f"worker_{hvd.rank()}" except (ModuleNotFoundError, ValueError, ImportError): pass - return CONFIG_DEFAULT_WORKER_NAME + return DEFAULT_WORKER_NAME def _get_num_workers(self): try: diff --git a/smdebug/pytorch/hook.py b/smdebug/pytorch/hook.py index 047d76b77..c1393d0e9 100644 --- a/smdebug/pytorch/hook.py +++ b/smdebug/pytorch/hook.py @@ -7,7 +7,7 @@ # First Party from smdebug.core.collection import CollectionKeys from smdebug.core.hook import CallbackHook -from smdebug.core.json_config import CONFIG_DEFAULT_WORKER_NAME +from smdebug.core.json_config import DEFAULT_WORKER_NAME from smdebug.pytorch.collection import CollectionManager from smdebug.pytorch.singleton_utils import set_hook from smdebug.pytorch.utils import get_reduction_of_data, make_numpy_array @@ -88,7 +88,7 @@ def _get_worker_name(self): except (ModuleNotFoundError, ValueError, ImportError): pass # Return default - return CONFIG_DEFAULT_WORKER_NAME + return DEFAULT_WORKER_NAME def _log_params(self, module): module_name = module._get_name() diff --git a/smdebug/tensorflow/base_hook.py b/smdebug/tensorflow/base_hook.py index 3bce5adcc..bedfb4405 100644 --- a/smdebug/tensorflow/base_hook.py +++ b/smdebug/tensorflow/base_hook.py @@ -7,7 +7,7 @@ from tensorflow.python.distribute.distribute_lib import _DefaultDistributionStrategy # First Party -from smdebug.core.config_constants import CONFIG_DEFAULT_WORKER_NAME +from smdebug.core.config_constants import DEFAULT_WORKER_NAME from smdebug.core.hook import BaseHook from smdebug.core.modes import ModeKeys from smdebug.core.reductions import get_numpy_reduction, get_reduction_tensor_name @@ -20,10 +20,12 @@ from .singleton_utils import set_hook from .utils import ( TFDistributionStrategy, + get_chief_worker_from_tf_config, get_num_workers_from_tf_config, get_worker_id_from_tf_config, is_mirrored_strategy, is_parameter_server_strategy, + load_tf_config_json, ) try: @@ -85,13 +87,38 @@ def __init__( Example -> /job:worker/replica:0/task:1/device:GPU:0 : _job-worker_replica-0_task-1_device-GPU-0""" self.device_map = {} self.writer_map = {} - self.distribution_strategy = None - self.tf_config = os.getenv( - "TF_CONFIG" - ) # caches the TF_CONFIG for the parameter server strategy + # This will be None if the var wasn't set, i.e. 
not param server + self.tf_config_json = load_tf_config_json(os.getenv("TF_CONFIG")) self._hook_supported = None + self._exported_collections = False + self._distribution_strategy = { + ModeKeys.TRAIN: None, + ModeKeys.EVAL: None, + ModeKeys.PREDICT: None, + ModeKeys.GLOBAL: None, + } + self._prepared_tensors = { + ModeKeys.TRAIN: False, + ModeKeys.EVAL: False, + ModeKeys.PREDICT: False, + ModeKeys.GLOBAL: False, + } + self._exported_model = { + ModeKeys.TRAIN: False, + ModeKeys.EVAL: False, + ModeKeys.PREDICT: False, + ModeKeys.GLOBAL: False, + } set_hook(self) + @property + def distribution_strategy(self): + return self._distribution_strategy[self.mode] + + @distribution_strategy.setter + def distribution_strategy(self, distribution_strategy): + self._distribution_strategy[self.mode] = distribution_strategy + def _get_distribution_strategy(self) -> TFDistributionStrategy: try: import horovod.tensorflow as hvd @@ -101,19 +128,30 @@ def _get_distribution_strategy(self) -> TFDistributionStrategy: except (ModuleNotFoundError, ValueError, ImportError): pass - if self.tf_config and is_parameter_server_strategy(self.tf_config): - return TFDistributionStrategy.PARAMETER_SERVER_STRATEGY - strat = tf.distribute.get_strategy() if is_mirrored_strategy(strat): - return TFDistributionStrategy.MIRRORED_STRATEGY + return TFDistributionStrategy.MIRRORED if isinstance(strat, _DefaultDistributionStrategy): # single device return TFDistributionStrategy.NONE + # Disable PS till we verify proper support of PS on SM + # if self.tf_config_json and is_parameter_server_strategy(self.tf_config): + # return TFDistributionStrategy.PARAMETER_SERVER + return TFDistributionStrategy.UNSUPPORTED + def _assert_distribution_strategy(self): + """ + The distribution strategy is initialized to None, + as it's not available during hook construction. + Later when the graph is ready, that's when correct distribution strategy is returned. + """ + assert ( + self.distribution_strategy is not None + ), "_get_distribution_strategy should be called before this method" + def _get_worker_name(self) -> str: """ This function returns the name of the worker based on @@ -126,67 +164,81 @@ def _get_worker_name(self) -> str: It is safe to return the CONFIG_DEFAULT_WORKER_NAME in this case. 
:return: str """ - try: + self._assert_distribution_strategy() + if self.distribution_strategy == TFDistributionStrategy.HOROVOD: import horovod.tensorflow as hvd - if hvd.size(): - return f"worker_{hvd.rank()}" - except (ModuleNotFoundError, ValueError, ImportError): - pass - - tf_config = os.getenv("TF_CONFIG") - if tf_config and is_parameter_server_strategy(tf_config): - return get_worker_id_from_tf_config(tf_config) - return CONFIG_DEFAULT_WORKER_NAME + return f"worker_{hvd.rank()}" + elif self.distribution_strategy == TFDistributionStrategy.MIRRORED: + # unused for this strategy + return DEFAULT_WORKER_NAME + elif self.distribution_strategy == TFDistributionStrategy.PARAMETER_SERVER: + return get_worker_id_from_tf_config(self.tf_config_json) + elif self.distribution_strategy == TFDistributionStrategy.NONE: + return DEFAULT_WORKER_NAME + elif self.distribution_strategy == TFDistributionStrategy.UNSUPPORTED: + raise NotImplementedError def export_collections(self): - num_workers = self._get_num_workers() + assert self._prepared_tensors[self.mode] + if self.save_all_workers is False: num_workers = 1 - if ( - self.distribution_strategy == TFDistributionStrategy.PARAMETER_SERVER_STRATEGY - or self.distribution_strategy == TFDistributionStrategy.HOROVOD - ): - if self.worker != self.chief_worker: - return + else: + num_workers = self._get_num_workers() self.collection_manager.set_num_workers(num_workers) - if len(self.device_map): + if self.distribution_strategy in [ + TFDistributionStrategy.PARAMETER_SERVER, + TFDistributionStrategy.HOROVOD, + ]: + if self.save_all_workers is False and self.worker != self.chief_worker: + return + elif self.distribution_strategy == TFDistributionStrategy.MIRRORED: + if len(self.device_map): + for device, serialized_device in self.device_map.items(): + if self.save_all_workers is True or device == self.chief_worker: + collection_file_name = f"{serialized_device}_collections.json" + self.collection_manager.export(self.out_dir, collection_file_name) + return - for device, serialized_device in self.device_map.items(): - if self.save_all_workers is False and device != self.chief_worker: - continue - collection_file_name = f"{serialized_device}_collections.json" - self.collection_manager.export(self.out_dir, collection_file_name) - else: - collection_file_name = f"{self.worker}_collections.json" - self.collection_manager.export(self.out_dir, collection_file_name) + # below is used in these cases + # if mirrored and device_map is empty (CPU training) + # if horovod/param server and worker == chief worker + collection_file_name = f"{self.worker}_collections.json" + self.collection_manager.export(self.out_dir, collection_file_name) def _get_num_workers(self): - try: + self._assert_distribution_strategy() + if self.distribution_strategy == TFDistributionStrategy.HOROVOD: import horovod.tensorflow as hvd - if hvd.size(): - return hvd.size() - except (ModuleNotFoundError, ValueError, ImportError): - pass - tf_config = os.getenv("TF_CONFIG") - if tf_config and is_parameter_server_strategy(tf_config): - return get_num_workers_from_tf_config(tf_config) - strategy = tf.distribute.get_strategy() - return strategy.num_replicas_in_sync - - def _export_model(self): - tb_writer = self._maybe_get_tb_writer() - if tb_writer: - self.logger.info("Writing graph") - tb_writer.write_graph(self.graph.as_graph_def(add_shapes=True)) - # don't close writer as it might be needed in the step that follows - # else we will have to open the file again - - def _add_to_device_map(self, tensor): 
- if tensor.device and "CPU" not in tensor.device and tensor.device not in self.device_map: - self.device_map[tensor.device] = serialize_tf_device(tensor.device) + return hvd.size() + elif self.distribution_strategy == TFDistributionStrategy.MIRRORED: + strategy = tf.distribute.get_strategy() + return strategy.num_replicas_in_sync + elif self.distribution_strategy == TFDistributionStrategy.PARAMETER_SERVER: + return get_num_workers_from_tf_config(self.tf_config_json) + elif self.distribution_strategy == TFDistributionStrategy.NONE: + return 1 + elif self.distribution_strategy == TFDistributionStrategy.UNSUPPORTED: + raise NotImplementedError + + def _set_chief_worker(self): + self._assert_distribution_strategy() + # this won't be used if save_all_workers is True + if self.distribution_strategy == TFDistributionStrategy.HOROVOD: + self.chief_worker = DEFAULT_WORKER_NAME + elif self.distribution_strategy == TFDistributionStrategy.MIRRORED: + assert self._prepared_tensors[self.mode] + if len(self.device_map): + self.chief_worker = sorted(self.device_map.keys())[0] + else: + self.chief_worker = DEFAULT_WORKER_NAME + elif self.distribution_strategy == TFDistributionStrategy.PARAMETER_SERVER: + self.chief_worker = get_chief_worker_from_tf_config(self.tf_config_json) + elif self.distribution_strategy == TFDistributionStrategy.UNSUPPORTED: + raise NotImplementedError def _get_writers(self, tensor_name, tensor_ref) -> List[FileWriter]: """ @@ -200,70 +252,70 @@ def _get_writers(self, tensor_name, tensor_ref) -> List[FileWriter]: :param tensor_name: :return: List[FileWriter] """ - if ( - len(self.device_map) - and self.distribution_strategy != TFDistributionStrategy.PARAMETER_SERVER_STRATEGY - ): - if tensor_ref.tf_obj is not None: - worker = tensor_ref.tf_obj.device - else: - # metrics in Keras - worker = "CPU" - - if not bool(worker) or "CPU" in worker: - return list(self.writer_map.values()) - if self.save_all_workers is False: - if worker == self.chief_worker: - worker = self.device_map[worker] - else: - return [] - else: - worker = self.device_map[worker] - return [self.writer_map[worker]] + if self.distribution_strategy in [ + TFDistributionStrategy.PARAMETER_SERVER, + TFDistributionStrategy.HOROVOD, + ]: + if (self.save_all_workers is True or self.worker == self.chief_worker) and self.writer: + return [self.writer] + elif self.distribution_strategy == TFDistributionStrategy.MIRRORED: + if len(self.device_map): + # else is for metrics in Keras + worker = tensor_ref.tf_obj.device if tensor_ref.tf_obj is not None else "CPU" + # if device str is empty or cpu in worker + if not bool(worker) or "CPU" in worker: + if self.save_all_workers: + return list(self.writer_map.values()) + else: + return [self.writer_map[self.device_map[self.chief_worker]]] + elif self.save_all_workers or worker == self.chief_worker: + return [self.writer_map[self.device_map[worker]]] + elif self.writer: + # training on CPU when all device strings have cpu + return [self.writer] + elif self.distribution_strategy == TFDistributionStrategy.NONE: + if self.writer: + return [self.writer] else: - return [self.writer] if self.writer else [] + raise NotImplementedError + # when self.writer is None, returns empty list + return [] def _initialize_writers(self, only_initialize_if_missing=False) -> None: # In keras, sometimes we are not sure if writer is initialized # (such as metrics at end of epoch), that's why it passes the flag only_init_if_missing - if self.dry_run: return - if ( - self.save_all_workers is False - and 
self.distribution_strategy != TFDistributionStrategy.MIRRORED_STRATEGY - ): - """ - If include_workers is False, we assign we check if the hook has been created by - the chief worker. If not we do not initialize a writer. - """ - if self.chief_worker != self.worker: - return - - if ( - len(self.device_map) - and self.distribution_strategy != TFDistributionStrategy.PARAMETER_SERVER_STRATEGY - ): - """ - Initialize one writer per device string - If save_all_workers is False, we only initialize a writer - for the chief worker - """ - for device, device_string in self.device_map.items(): - if device_string in self.writer_map and only_initialize_if_missing is True: - continue - if self.save_all_workers is True: - self.writer_map[device_string] = FileWriter( - trial_dir=self.out_dir, step=self.step, worker=device_string + if self.distribution_strategy in [ + TFDistributionStrategy.PARAMETER_SERVER, + TFDistributionStrategy.HOROVOD, + ]: + if self.save_all_workers is True or self.worker == self.chief_worker: + if self.writer is None or only_initialize_if_missing is False: + self.writer = FileWriter( + trial_dir=self.out_dir, step=self.step, worker=self.worker ) - elif self.save_all_workers is False and device == self.chief_worker: - self.writer_map[device_string] = FileWriter( - trial_dir=self.out_dir, step=self.step, worker=device_string + elif self.distribution_strategy == TFDistributionStrategy.MIRRORED: + if len(self.device_map): + for device, device_string in self.device_map.items(): + if device_string in self.writer_map and only_initialize_if_missing is True: + continue + if self.save_all_workers is True or device == self.chief_worker: + self.writer_map[device_string] = FileWriter( + trial_dir=self.out_dir, step=self.step, worker=device_string + ) + else: + # training on CPU when all device strings have cpu + if self.writer is None or only_initialize_if_missing is False: + self.writer = FileWriter( + trial_dir=self.out_dir, step=self.step, worker=self.worker ) - else: + elif self.distribution_strategy == TFDistributionStrategy.NONE: if self.writer is None or only_initialize_if_missing is False: self.writer = FileWriter(trial_dir=self.out_dir, step=self.step, worker=self.worker) + else: + raise NotImplementedError def _close_writers(self) -> None: if self.dry_run: @@ -288,6 +340,17 @@ def _close_writers(self) -> None: for device in to_delete_writers: del self.writer_map[device] + def _export_model(self): + tb_writer = self._maybe_get_tb_writer() + if tb_writer: + tb_writer.write_graph(self.graph.as_graph_def(add_shapes=True)) + # don't close writer as it might be needed in the step that follows + # else we will have to open the file again + + def _add_to_device_map(self, tensor): + if tensor.device and "CPU" not in tensor.device and tensor.device not in self.device_map: + self.device_map[tensor.device] = serialize_tf_device(tensor.device) + def _log_unsupported_optimizer(self, optimizer): self.logger.warning( f"Unsupported optimizer {optimizer} {optimizer.__class__}, cannot automatically find " @@ -368,14 +431,11 @@ def set_optimizer_variables(self, optimizer_variables): ) def save_scalar(self, name, value, sm_metric=False): - """ - save_scalar() not supported on Tensorflow - """ - self.logger.warning( - "save_scalar not supported on Tensorflow. " - "Add the scalar to scalars or sm_metrics collection instead. " + raise NotImplementedError( + "save_scalar not supported for Tensorflow. 
" + "Add the scalar to scalars or sm_metrics collection instead depending " + "on whether you want the scalar to show up as a SageMaker Metric. " ) - return @staticmethod def _make_numpy_array(tensor_value): diff --git a/smdebug/tensorflow/keras.py b/smdebug/tensorflow/keras.py index 077286046..dec69c054 100644 --- a/smdebug/tensorflow/keras.py +++ b/smdebug/tensorflow/keras.py @@ -57,22 +57,13 @@ def __init__( save_all=save_all, include_workers=include_workers, ) - self._exported_collections = False - self._exported_model = { - ModeKeys.TRAIN: False, - ModeKeys.EVAL: False, - ModeKeys.PREDICT: False, - } self.tensor_refs_to_save_this_step = set() self._fetches_added = set() - self._prepared_tensors = { - ModeKeys.TRAIN: False, - ModeKeys.EVAL: False, - ModeKeys.PREDICT: False, - } self.callable_cache = CallableCache() def _is_not_supported(self): + if self.distribution_strategy is None: + self.distribution_strategy = self._get_distribution_strategy() if self._hook_supported is None: self._hook_supported = True if tf.executing_eagerly() or ( @@ -80,7 +71,7 @@ def _is_not_supported(self): ): self.logger.info("Disabling SMDebug as it does not support eager mode") self._hook_supported = False - elif self._get_distribution_strategy() == TFDistributionStrategy.MIRRORED_STRATEGY: + elif self.distribution_strategy == TFDistributionStrategy.MIRRORED: try: from tensorflow.python.keras.distribute.distributed_training_utils import ( get_distributed_model, @@ -92,7 +83,7 @@ def _is_not_supported(self): "with TensorFlow version <1.14" ) self._hook_supported = False - elif self._get_distribution_strategy() == TFDistributionStrategy.UNSUPPORTED: + elif self.distribution_strategy == TFDistributionStrategy.UNSUPPORTED: self.logger.info( f"Disabling SMDebug as it does not support " f"{tf.distribute.get_strategy()}" ) @@ -134,10 +125,7 @@ def _get_matching_collections( def _check_and_add_layer_tensor( self, mode, layer, tensor_type, tensor, is_input_to_model=False, is_output_of_model=False ): - if ( - self.distribution_strategy == TFDistributionStrategy.MIRRORED_STRATEGY - and not tensor.device - ): + if self.distribution_strategy == TFDistributionStrategy.MIRRORED and not tensor.device: # these are extra tensors which show up # ignoring this still allows us to access all replica's tensors # self.logger.debug(f"Skipping {layer} {tensor_type} {tensor}") @@ -251,7 +239,7 @@ def _get_distributed_model(self, mode): def _is_input_layer(self, mode, layer_inputs): model_inputs = [] - if self.distribution_strategy == TFDistributionStrategy.MIRRORED_STRATEGY: + if self.distribution_strategy == TFDistributionStrategy.MIRRORED: model = self._get_distributed_model(mode) else: model = self.model @@ -265,7 +253,7 @@ def _is_input_layer(self, mode, layer_inputs): def _is_output_layer(self, mode, layer_outputs): model_outputs = [] - if self.distribution_strategy == TFDistributionStrategy.MIRRORED_STRATEGY: + if self.distribution_strategy == TFDistributionStrategy.MIRRORED: model = self._get_distributed_model(mode) else: model = self.model @@ -299,8 +287,6 @@ def _prepare_layers(self, mode): for w in weights: self._check_and_add_layer_tensor(mode, layer, "weight", w) - self._prepared_tensors[mode] = True - def _prepare_non_layer_tensors(self): # for gradients, optimizer_variables for coll in self.collection_manager.get_collections().values(): @@ -450,7 +436,6 @@ def on_epoch_end(self, batch, logs=None): def _on_any_mode_begin(self, mode): if self._is_not_supported(): return - self.distribution_strategy = 
self._get_distribution_strategy() self.worker = self._get_worker_name() self.graph = tf.get_default_graph() self.set_mode(mode) @@ -495,14 +480,10 @@ def _on_any_batch_begin(self, batch, mode, logs=None): if self._validate_exec_function(self._get_exec_function(mode)): self._prepare_layers(mode) self._prepare_non_layer_tensors() + self._prepared_tensors[mode] = True # below should be after tensors are processed, # so we know that device map is populated - if ( - len(self.device_map) - and self.distribution_strategy == TFDistributionStrategy.MIRRORED_STRATEGY - and self.save_all_workers is False - ): - self.chief_worker = sorted(self.device_map.keys())[0] + self._set_chief_worker() # else: # this will delay the preparation of tensors as the # full graph is not built. Gradients are not available diff --git a/smdebug/tensorflow/session.py b/smdebug/tensorflow/session.py index 2866397f8..4f556735a 100644 --- a/smdebug/tensorflow/session.py +++ b/smdebug/tensorflow/session.py @@ -5,7 +5,6 @@ # First Party from smdebug.core.collection import CollectionKeys -from smdebug.core.config_constants import CONFIG_DEFAULT_WORKER_NAME from smdebug.core.tfevent.proto.summary_pb2 import Summary from smdebug.core.tfevent.util import make_numpy_array from smdebug.core.utils import match_inc @@ -17,7 +16,6 @@ TFDistributionStrategy, build_fetches_tuple, extract_graph_summary, - get_chief_worker_parameter_server, tensor_can_be_saved, ) @@ -207,12 +205,18 @@ def _add_tensors(self): # so collections have save configs and reduction configs self._prepare_collections() + self._add_losses() + self._add_weights_and_biases() + self._add_summaries_tensors() + for op in self.graph.get_operations(): for tensor in op.outputs: if is_placeholder(tensor): self._placeholder_tensors.add(tensor) self._check_and_add_tensor(tensor) + self._prepared_tensors[self.mode] = True + def _add_summaries_tensors(self): if CollectionKeys.TENSORFLOW_SUMMARIES in self.include_collections: c = self.collection_manager.get(CollectionKeys.TENSORFLOW_SUMMARIES).add( @@ -232,10 +236,16 @@ def _add_weights_and_biases(self): # adds a tensor_ref with name `w1/read:0` and export_name `w1:0` self.collection_manager.get(CollectionKeys.WEIGHTS).add(w) + def _add_losses(self): + losses = tf.losses.get_losses() + self.collection_manager.get(CollectionKeys.LOSSES).add(losses) + def _is_not_supported(self): + if self.distribution_strategy is None: + self.distribution_strategy = self._get_distribution_strategy() if self._hook_supported is None: self._hook_supported = True - if self._get_distribution_strategy() == TFDistributionStrategy.MIRRORED_STRATEGY: + if self.distribution_strategy == TFDistributionStrategy.MIRRORED: from packaging import version if version.parse(tf.__version__) < version.parse("1.14.0"): @@ -246,18 +256,14 @@ def _is_not_supported(self): "Disabling SMDebug as it does not support mirrored strategy" "with TensorFlow version <1.14" ) - elif self._get_distribution_strategy() == TFDistributionStrategy.UNSUPPORTED: + elif self.distribution_strategy == TFDistributionStrategy.UNSUPPORTED: self.logger.info( f"Disabling SMDebug as it does not support " f"{tf.distribute.get_strategy()}" ) self._hook_supported = False return not self._hook_supported - def begin(self): - if self._is_not_supported(): - return - - # clear all caches so we don't interfere with other modes + def _clear_cached_state(self): self._subgraph_nodes = {} self._tensor_placeholder_dependence = {} self._placeholder_tensors = set() @@ -267,34 +273,28 @@ def begin(self): # setting this 
to False means that on next apply_gradients/get_grads gradients will be set again
         self._gradients_set = False
 
-        # todo: use global step from TF instead of internal steps
+    def begin(self):
+        if self._is_not_supported():
+            return
+        # clear all caches so we don't interfere with other modes
+        self._clear_cached_state()
+
+        # todo: use global step from TF instead of internal steps
         # todo: handle multiple graphs in the model
         self.worker = self._get_worker_name()
-        self.distribution_strategy = self._get_distribution_strategy()
         self.graph = tf.get_default_graph()
 
-        self._add_weights_and_biases()
-
-        losses = tf.losses.get_losses()
-        self.collection_manager.get(CollectionKeys.LOSSES).add(losses)
-
-        self._add_summaries_tensors()
         self._add_tensors()
 
+        self._set_chief_worker()
 
-        if self.save_all_workers is False:
-            if self.distribution_strategy == TFDistributionStrategy.PARAMETER_SERVER_STRATEGY:
-                self.chief_worker = get_chief_worker_parameter_server(self.tf_config)
-            elif self.distribution_strategy == TFDistributionStrategy.HOROVOD:
-                self.chief_worker = CONFIG_DEFAULT_WORKER_NAME
-            elif (
-                len(self.device_map)
-                and self.distribution_strategy == TFDistributionStrategy.MIRRORED_STRATEGY
-            ):
-                self.chief_worker = sorted(self.device_map.keys())[0]
+        if self._exported_model[self.mode] is False:
+            self._export_model()
+            self._exported_model[self.mode] = True
 
-        self._export_model()
-        self.export_collections()
+        if self._exported_collections is False:
+            self.export_collections()
+            self._exported_collections = True
 
     def _get_tensors_to_save_this_step(self) -> set:
         tensors_to_save = set()
diff --git a/smdebug/tensorflow/utils.py b/smdebug/tensorflow/utils.py
index 47aa42546..b1049a4f0 100644
--- a/smdebug/tensorflow/utils.py
+++ b/smdebug/tensorflow/utils.py
@@ -8,7 +8,6 @@
 from tensorflow.python.distribute import values
 
 # First Party
-from smdebug.core.config_constants import CONFIG_DEFAULT_WORKER_NAME
 from smdebug.core.modes import ModeKeys
 
 try:
@@ -21,8 +20,8 @@
 class TFDistributionStrategy(Enum):
     NONE = 0
     HOROVOD = 1
-    MIRRORED_STRATEGY = 2
-    PARAMETER_SERVER_STRATEGY = 3
+    MIRRORED = 2
+    PARAMETER_SERVER = 3
     UNSUPPORTED = 100
 
@@ -195,35 +194,49 @@ def get_original_fetch_ops(fetches):
     """
 
 
-def is_parameter_server_strategy(tf_config: str) -> bool:
+def load_tf_config_json(tf_config: str):
     try:
-        tf_config = json.loads(tf_config)
+        return json.loads(tf_config)
     except (json.JSONDecodeError, TypeError):
-        return False  # Do not break for incorrectly set tf_config
-    return "cluster" in tf_config and "ps" in tf_config["cluster"]
+        # json.loads raises TypeError when tf_config is None; treat a missing or invalid TF_CONFIG as None
+        return None
 
 
-def is_mirrored_strategy(strat):
-    return isinstance(strat, (tf.distribute.MirroredStrategy, ContribMirroredStrategy))
+def is_parameter_server_strategy(tf_config_json: dict) -> bool:
+    try:
+        return "cluster" in tf_config_json and "ps" in tf_config_json["cluster"]
+    except TypeError:
+        # tf_config_json is None when TF_CONFIG was absent or invalid
+        return False
 
 
-def get_worker_id_from_tf_config(tf_config: str) -> str:
+def get_worker_id_from_tf_config(tf_config_json: dict) -> str:
     """Valid roles in a cluster is "chief", "worker", "ps" and "evaluator"."""
-    tf_config = json.loads(tf_config)
-    task = tf_config["task"]
+    task = tf_config_json["task"]
     worker_type = task["type"]
     worker_index = task["index"]
     return f"{worker_type}_{worker_index}"
 
 
-def get_num_workers_from_tf_config(tf_config: str) -> int:
-    tf_config = json.loads(tf_config)
-    workers = tf_config["cluster"]["worker"]
-    if "chief" in tf_config["cluster"]:
-
workers.extend(tf_config["cluster"]["chief"]) +def get_num_workers_from_tf_config(tf_config_json: dict) -> int: + workers = tf_config_json["cluster"]["worker"] + if "chief" in tf_config_json["cluster"]: + workers.extend(tf_config_json["cluster"]["chief"]) return len(workers) +def get_chief_worker_from_tf_config(tf_config_json: dict): + if "chief" in tf_config_json["cluster"]: + return "chief_0" + else: + raise NotImplementedError + # todo + + +def is_mirrored_strategy(strat): + return isinstance(strat, (tf.distribute.MirroredStrategy, ContribMirroredStrategy)) + + def is_keras_optimizer(obj): for cls in obj.__class__.__mro__: if ".".join([cls.__module__, cls.__name__]) == "keras.optimizers.Optimizer": @@ -282,9 +295,3 @@ def get_keras_mode(mode): return KerasModeKeys.TEST elif mode == ModeKeys.PREDICT: return KerasModeKeys.PREDICT - - -def get_chief_worker_parameter_server(tf_config): - if "chief" in tf_config["cluster"]: - return "chief_0" - return CONFIG_DEFAULT_WORKER_NAME diff --git a/tests/mxnet/test_hook_all_zero.py b/tests/mxnet/test_hook_all_zero.py index 0de2b1e42..1d6c0b00a 100644 --- a/tests/mxnet/test_hook_all_zero.py +++ b/tests/mxnet/test_hook_all_zero.py @@ -37,10 +37,7 @@ def test_hook_all_zero(hook=None, out_dir=None): assert len(tr.steps()) == 4 tnames = tr.tensor_names(regex="conv._input") - print(tnames) tname = tr.tensor_names(regex="conv._input")[0] - print(tname) - print(tr.tensor(tname).steps()) conv_tensor_value = tr.tensor(tname).value(step_num=0) is_zero = np.all(conv_tensor_value == 0) assert is_zero == True diff --git a/tests/tensorflow/hooks/test_mirrored_strategy.py b/tests/tensorflow/hooks/test_mirrored_strategy.py index 25810d590..f76654f76 100644 --- a/tests/tensorflow/hooks/test_mirrored_strategy.py +++ b/tests/tensorflow/hooks/test_mirrored_strategy.py @@ -305,8 +305,9 @@ def test_basic(out_dir, zcc=False): tr = create_trial_fast_refresh(out_dir) # wts, grads, losses - print(tr.tensor_names()) - assert len(tr.tensor_names()) == 8 + 8 + (1 * strategy.num_replicas_in_sync) + 1 + assert ( + len(tr.tensor_names()) == 8 + 8 + (1 * strategy.num_replicas_in_sync) + 1 + ) # 1 main loss, and 1 from each worker assert len(tr.steps()) == 7 assert len(tr.steps(ModeKeys.TRAIN)) == 3 assert len(tr.steps(ModeKeys.EVAL)) == 2 @@ -319,7 +320,7 @@ def test_basic(out_dir, zcc=False): for worker in tr.tensor(tname).workers(s, ModeKeys.TRAIN): assert tr.tensor(tname).value(s, worker=worker, mode=ModeKeys.TRAIN) is not None for s in tr.tensor(tname).steps(ModeKeys.EVAL): - assert len(tr.tensor(tname).workers(s, ModeKeys.EVAL)) == strategy.num_replicas_in_sync + assert len(tr.tensor(tname).workers(s, ModeKeys.EVAL)) == 1 # as eval_dist = False assert tr.tensor(tname).value(s, mode=ModeKeys.EVAL) is not None tensornames = tr.tensor_names(regex="Identity_\d+:0") @@ -334,12 +335,12 @@ def test_basic(out_dir, zcc=False): for tname in tr.tensor_names(collection="losses"): if tname != tensornames[0]: for s in tr.tensor(tname).steps(ModeKeys.TRAIN): - assert len(tr.tensor(tname).workers(s, ModeKeys.TRAIN)) == 1 + assert len(tr.tensor(tname).workers(s, ModeKeys.TRAIN)) == 1, tname assert tr.tensor(tname).value(s, mode=ModeKeys.TRAIN) is not None tname = "sparse_softmax_cross_entropy_loss/value:0" for s in tr.tensor(tname).steps(ModeKeys.EVAL): - assert len(tr.tensor(tname).workers(s, ModeKeys.EVAL)) == strategy.num_replicas_in_sync + assert len(tr.tensor(tname).workers(s, ModeKeys.EVAL)) == 1 # eval_dist=False assert tr.tensor(tname).value(s, mode=ModeKeys.EVAL) is not None diff --git 
a/tests/tensorflow/keras/test_keras.py b/tests/tensorflow/keras/test_keras.py index 4befe3cb3..e3680549d 100644 --- a/tests/tensorflow/keras/test_keras.py +++ b/tests/tensorflow/keras/test_keras.py @@ -266,7 +266,6 @@ def test_base_reductions(out_dir): steps=["train"], ) tr = create_trial_fast_refresh(out_dir) - print(tr.tensor_names()) weight_name = tr.tensor_names(collection=CollectionKeys.WEIGHTS)[0] try: tr.tensor(weight_name).value(0) diff --git a/tests/tensorflow/test_utils.py b/tests/tensorflow/test_utils.py index f2ee8a5e2..2122f7672 100644 --- a/tests/tensorflow/test_utils.py +++ b/tests/tensorflow/test_utils.py @@ -5,6 +5,7 @@ # First Party from smdebug.tensorflow.utils import ( TFDistributionStrategy, + get_chief_worker_from_tf_config, get_num_workers_from_tf_config, get_worker_id_from_tf_config, is_parameter_server_strategy, @@ -12,122 +13,149 @@ ) -def test_read_tf_config(): +def test_read_tf_config(monkeypatch): # Case 1: No TF_CONFIG assert is_parameter_server_strategy(os.getenv("TF_CONFIG")) is False # Case 2: TF_CONFIG present but empty - os.environ["TF_CONFIG"] = json.dumps({}) + monkeypatch.setenv("TF_CONFIG", json.dumps({})) - assert is_parameter_server_strategy(os.getenv("TF_CONFIG")) is False + assert is_parameter_server_strategy(json.loads(os.getenv("TF_CONFIG"))) is False # Case 3: TF_CONFIG present but invalid because of missing ps field - os.environ["TF_CONFIG"] = json.dumps( - { - "cluster": {"worker": ["host1:port", "host2:port", "host3:port"]}, - "task": {"type": "worker", "index": 1}, - } + monkeypatch.setenv( + "TF_CONFIG", + json.dumps( + { + "cluster": {"worker": ["host1:port", "host2:port", "host3:port"]}, + "task": {"type": "worker", "index": 1}, + } + ), ) - - assert is_parameter_server_strategy(os.getenv("TF_CONFIG")) is False + assert is_parameter_server_strategy(json.loads(os.getenv("TF_CONFIG"))) is False # Case 4: TF_CONFIG present and valid - os.environ["TF_CONFIG"] = json.dumps( - { - "cluster": { - "worker": ["host1:port", "host2:port", "host3:port"], - "ps": ["host4:port", "host5:port"], - }, - "task": {"type": "worker", "index": 1}, - } + monkeypatch.setenv( + "TF_CONFIG", + json.dumps( + { + "cluster": { + "worker": ["host1:port", "host2:port", "host3:port"], + "ps": ["host4:port", "host5:port"], + }, + "task": {"type": "worker", "index": 1}, + } + ), ) - - assert is_parameter_server_strategy(os.getenv("TF_CONFIG")) is True - - del os.environ["TF_CONFIG"] - - -def test_get_worker_id_from_tf_config(): - os.environ["TF_CONFIG"] = json.dumps( - { - "cluster": { - "worker": ["host1:port", "host2:port", "host3:port"], - "ps": ["host4:port", "host5:port"], - }, - "task": {"type": "worker", "index": 1}, - } + assert is_parameter_server_strategy(json.loads(os.getenv("TF_CONFIG"))) is True + + +def test_get_worker_id_from_tf_config(monkeypatch): + monkeypatch.setenv( + "TF_CONFIG", + json.dumps( + { + "cluster": { + "worker": ["host1:port", "host2:port", "host3:port"], + "ps": ["host4:port", "host5:port"], + }, + "task": {"type": "worker", "index": 1}, + } + ), ) - worker_id = get_worker_id_from_tf_config(os.getenv("TF_CONFIG")) + worker_id = get_worker_id_from_tf_config(json.loads(os.getenv("TF_CONFIG"))) assert worker_id == "worker_1" - del os.environ["TF_CONFIG"] - - os.environ["TF_CONFIG"] = json.dumps( - { - "cluster": { - "worker": ["host1:port", "host2:port", "host3:port"], - "ps": ["host4:port", "host5:port"], - }, - "task": {"type": "ps", "index": 0}, - } + monkeypatch.setenv( + "TF_CONFIG", + json.dumps( + { + "cluster": { + "worker": 
["host1:port", "host2:port", "host3:port"], + "ps": ["host4:port", "host5:port"], + }, + "task": {"type": "ps", "index": 0}, + } + ), ) - worker_id = get_worker_id_from_tf_config(os.getenv("TF_CONFIG")) + worker_id = get_worker_id_from_tf_config(json.loads(os.getenv("TF_CONFIG"))) assert worker_id == "ps_0" - del os.environ["TF_CONFIG"] - - os.environ["TF_CONFIG"] = json.dumps( - { - "cluster": { - "chief": ["host0:port"], - "worker": ["host1:port", "host2:port", "host3:port"], - "ps": ["host4:port", "host5:port"], - }, - "task": {"type": "chief", "index": 0}, - } + monkeypatch.setenv( + "TF_CONFIG", + json.dumps( + { + "cluster": { + "chief": ["host0:port"], + "worker": ["host1:port", "host2:port", "host3:port"], + "ps": ["host4:port", "host5:port"], + }, + "task": {"type": "chief", "index": 0}, + } + ), ) - worker_id = get_worker_id_from_tf_config(os.getenv("TF_CONFIG")) + worker_id = get_worker_id_from_tf_config(json.loads(os.getenv("TF_CONFIG"))) assert worker_id == "chief_0" - del os.environ["TF_CONFIG"] - - os.environ["TF_CONFIG"] = json.dumps( - { - "cluster": { - "chief": ["host0:port"], - "worker": ["host1:port", "host2:port", "host3:port"], - "ps": ["host4:port", "host5:port"], - }, - "task": {"type": "evaluator", "index": 0}, - } + monkeypatch.setenv( + "TF_CONFIG", + json.dumps( + { + "cluster": { + "chief": ["host0:port"], + "worker": ["host1:port", "host2:port", "host3:port"], + "ps": ["host4:port", "host5:port"], + }, + "task": {"type": "evaluator", "index": 0}, + } + ), ) - worker_id = get_worker_id_from_tf_config(os.getenv("TF_CONFIG")) + worker_id = get_worker_id_from_tf_config(json.loads(os.getenv("TF_CONFIG"))) assert worker_id == "evaluator_0" - del os.environ["TF_CONFIG"] -def test_get_num_workers_from_tf_config(): - os.environ["TF_CONFIG"] = json.dumps( - { - "cluster": { - "worker": ["host1:port", "host2:port", "host3:port"], - "ps": ["host4:port", "host5:port"], - }, - "task": {"type": "worker", "index": 1}, - } +def test_get_num_workers_from_tf_config(monkeypatch): + monkeypatch.setenv( + "TF_CONFIG", + json.dumps( + { + "cluster": { + "worker": ["host1:port", "host2:port", "host3:port"], + "ps": ["host4:port", "host5:port"], + }, + "task": {"type": "worker", "index": 1}, + } + ), ) - num_workers = get_num_workers_from_tf_config(os.getenv("TF_CONFIG")) + num_workers = get_num_workers_from_tf_config(json.loads(os.getenv("TF_CONFIG"))) assert num_workers == 3 - del os.environ["TF_CONFIG"] + monkeypatch.setenv( + "TF_CONFIG", + json.dumps( + { + "cluster": { + "chief": ["host0:port"], + "worker": ["host1:port", "host2:port", "host3:port"], + "ps": ["host4:port", "host5:port"], + }, + "task": {"type": "worker", "index": 1}, + } + ), + ) + + num_workers = get_num_workers_from_tf_config(json.loads(os.getenv("TF_CONFIG"))) + assert num_workers == 4 - os.environ["TF_CONFIG"] = json.dumps( + +def test_get_chief(): + conf = json.dumps( { "cluster": { "chief": ["host0:port"], @@ -137,10 +165,7 @@ def test_get_num_workers_from_tf_config(): "task": {"type": "worker", "index": 1}, } ) - - num_workers = get_num_workers_from_tf_config(os.getenv("TF_CONFIG")) - assert num_workers == 4 - del os.environ["TF_CONFIG"] + assert get_chief_worker_from_tf_config(json.loads(conf)) == "chief_0" class TFOpMock: diff --git a/tests/zero_code_change/README.md b/tests/zero_code_change/README.md new file mode 100644 index 000000000..f484dff8b --- /dev/null +++ b/tests/zero_code_change/README.md @@ -0,0 +1,9 @@ +You can run the ZCC tests on DLAMI or DLC for supported containers as follows from the 
main folder of the repository. The first command runs the TensorFlow integration tests; the second runs the Horovod tests:
+
+```
+PYTHONPATH=. python tests/zero_code_change/tensorflow_integration_tests.py
+```
+
+```
+pytest tests/zero_code_change/horovod/
+```
diff --git a/tests/zero_code_change/horovod/__init__.py b/tests/zero_code_change/horovod/__init__.py
new file mode 100644
index 000000000..e69de29bb
diff --git a/tests/zero_code_change/horovod/tensorflow/__init__.py b/tests/zero_code_change/horovod/tensorflow/__init__.py
new file mode 100644
index 000000000..e69de29bb
diff --git a/tests/zero_code_change/horovod/tensorflow/test_estimator.py b/tests/zero_code_change/horovod/tensorflow/test_estimator.py
new file mode 100644
index 000000000..9f8c37e60
--- /dev/null
+++ b/tests/zero_code_change/horovod/tensorflow/test_estimator.py
@@ -0,0 +1,101 @@
+# Standard Library
+import os
+
+# First Party
+from smdebug.trials import create_trial
+
+# Local
+from .utils import build_json, get_available_gpus, launch_horovod_job
+
+"""
+Tested on the current DLAMI (p3.8xlarge) when run from the main directory of the repository
+"""
+
+
+HOROVOD_MNIST_SCRIPT_NAME = "horovod_estimator_mnist.py"
+HOROVOD_MNIST_ARGS = ["--num_steps", "1000"]
+
+
+def basic_test(out_dir, mode):
+    path = build_json(out_dir, include_workers="one", include_collections=["weights", "gradients"])
+    num_workers = len(get_available_gpus())
+    mode_args = list(HOROVOD_MNIST_ARGS) + ["--model_dir", os.path.join(out_dir, "checkpoint")]
+    if mode == "cpu":
+        mode_args += ["--use_only_cpu", "true"]
+    launch_horovod_job(
+        script_file_path=f"examples/tensorflow/sagemaker_official_container/{HOROVOD_MNIST_SCRIPT_NAME}",
+        script_args=mode_args,
+        num_workers=num_workers,
+        config_file_path=path,
+        mode=mode,
+    )
+
+    tr = create_trial(out_dir)
+    print(tr.tensor_names())
+    assert len(tr.workers()) == 1
+    assert len(tr.tensor_names()) == 13
+    assert len(tr.tensor(tr.tensor_names(collection="weights")[0]).workers(0)) == 1
+
+
+def test_cpu(out_dir):
+    basic_test(out_dir, "cpu")
+
+
+def test_gpu(out_dir):
+    basic_test(out_dir, "gpu")
+
+
+def mode_allworkers(out_dir, mode):
+    path = build_json(out_dir, include_workers="all", include_collections=["weights", "gradients"])
+    num_workers = len(get_available_gpus())
+    mode_args = list(HOROVOD_MNIST_ARGS) + ["--model_dir", os.path.join(out_dir, "checkpoint")]
+    if mode == "cpu":
+        mode_args += ["--use_only_cpu", "true"]
+    launch_horovod_job(
+        script_file_path=f"examples/tensorflow/sagemaker_official_container/{HOROVOD_MNIST_SCRIPT_NAME}",
+        script_args=mode_args,
+        num_workers=num_workers,
+        config_file_path=path,
+        mode=mode,
+    )
+    tr = create_trial(out_dir)
+    assert len(tr.workers()) == num_workers
+    assert len(tr.tensor_names()) == 13
+    assert len(tr.tensor(tr.tensor_names(collection="weights")[0]).workers(0)) == num_workers
+
+
+def test_cpu_allworkers(out_dir):
+    mode_allworkers(out_dir, "cpu")
+
+
+def test_gpu_allworkers(out_dir):
+    mode_allworkers(out_dir, "gpu")
+
+
+def mode_allworkers_saveall(out_dir, mode):
+    path = build_json(
+        out_dir, include_workers="all", save_all=True, include_collections=["weights", "gradients"]
+    )
+    num_workers = len(get_available_gpus())
+    mode_args = list(HOROVOD_MNIST_ARGS) + ["--model_dir", os.path.join(out_dir, "checkpoint")]
+    if mode == "cpu":
+        mode_args += ["--use_only_cpu", "true"]
+    launch_horovod_job(
+        script_file_path=f"examples/tensorflow/sagemaker_official_container/{HOROVOD_MNIST_SCRIPT_NAME}",
+        script_args=mode_args,
+        num_workers=num_workers,
+        config_file_path=path,
+        mode=mode,
+    )
+    tr = create_trial(out_dir)
+    assert len(tr.workers()) == num_workers
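+    # save_all=True makes the hook save many more tensors than just weights and
+    # gradients, so the assertions below only check a loose lower bound on the count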
+ assert len(tr.tensor_names()) > 99 + assert len(tr.tensor(tr.tensor_names(collection="weights")[0]).workers(0)) == num_workers + assert len(tr.tensor(tr.tensor_names(collection="losses")[0]).workers(0)) == num_workers + + +def test_gpu_allworkers_saveall(out_dir): + mode_allworkers_saveall(out_dir, "gpu") + + +def test_cpu_allworkers_saveall(out_dir): + mode_allworkers_saveall(out_dir, "cpu") diff --git a/tests/zero_code_change/horovod/tensorflow/test_keras.py b/tests/zero_code_change/horovod/tensorflow/test_keras.py new file mode 100644 index 000000000..395d39e6b --- /dev/null +++ b/tests/zero_code_change/horovod/tensorflow/test_keras.py @@ -0,0 +1,99 @@ +# First Party +from smdebug.trials import create_trial + +# Local +from .utils import build_json, get_available_gpus, launch_horovod_job + +""" +Tested on current DLAMI p3.8xlarge +""" + +HOROVOD_MNIST_SCRIPT_NAME = "horovod_keras_mnist.py" +HOROVOD_MNIST_ARGS = ["--num_epochs", "2"] + + +def basic_test(out_dir, mode): + path = build_json(out_dir, include_workers="one", include_collections=["weights", "gradients"]) + num_workers = len(get_available_gpus()) + mode_args = list(HOROVOD_MNIST_ARGS) + ["--model_dir", out_dir] + if mode == "cpu": + mode_args += ["--use_only_cpu", "true"] + launch_horovod_job( + script_file_path=f"examples/tensorflow/sagemaker_official_container/{HOROVOD_MNIST_SCRIPT_NAME}", + script_args=mode_args, + num_workers=num_workers, + config_file_path=path, + mode=mode, + ) + + tr = create_trial(out_dir) + print(tr.tensor_names()) + assert len(tr.workers()) == 1 + assert len(tr.tensor_names()) == 18 + assert len(tr.tensor(tr.tensor_names(collection="weights")[0]).workers(0)) == 1 + + +def test_cpu(out_dir): + basic_test(out_dir, "cpu") + + +def test_gpu(out_dir): + basic_test(out_dir, "gpu") + + +def mode_allworkers(out_dir, mode): + path = build_json(out_dir, include_workers="all", include_collections=["weights", "gradients"]) + num_workers = len(get_available_gpus()) + mode_args = list(HOROVOD_MNIST_ARGS) + ["--model_dir", out_dir] + if mode == "cpu": + mode_args += ["--use_only_cpu", "true"] + launch_horovod_job( + script_file_path=f"examples/tensorflow/sagemaker_official_container/{HOROVOD_MNIST_SCRIPT_NAME}", + script_args=mode_args, + num_workers=num_workers, + config_file_path=path, + mode=mode, + ) + tr = create_trial(out_dir) + assert len(tr.workers()) == num_workers + assert len(tr.tensor_names()) == 18 + assert len(tr.tensor(tr.tensor_names(collection="weights")[0]).workers(0)) == num_workers + + +def test_cpu_allworkers(out_dir): + mode_allworkers(out_dir, "cpu") + + +def test_gpu_allworkers(out_dir): + mode_allworkers(out_dir, "gpu") + + +def mode_allworkers_saveall(out_dir, mode): + path = build_json( + out_dir, include_workers="all", save_all=True, include_collections=["weights", "gradients"] + ) + num_workers = len(get_available_gpus()) + mode_args = list(HOROVOD_MNIST_ARGS) + ["--model_dir", out_dir] + if mode == "cpu": + mode_args += ["--use_only_cpu", "true"] + launch_horovod_job( + script_file_path=f"examples/tensorflow/sagemaker_official_container/{HOROVOD_MNIST_SCRIPT_NAME}", + script_args=mode_args, + num_workers=num_workers, + config_file_path=path, + mode=mode, + ) + tr = create_trial(out_dir) + print(tr.tensor_names()) + assert len(tr.workers()) == num_workers + assert len(tr.tensor_names()) > 20 + assert len(tr.tensor(tr.tensor_names(collection="weights")[0]).workers(0)) == num_workers + assert len(tr.tensor("loss").workers(0)) == num_workers + + +def test_gpu_allworkers_saveall(out_dir): 
+ mode_allworkers_saveall(out_dir, "gpu") + + +def test_cpu_allworkers_saveall(out_dir): + mode_allworkers_saveall(out_dir, "cpu") diff --git a/tests/zero_code_change/horovod/tensorflow/utils.py b/tests/zero_code_change/horovod/tensorflow/utils.py new file mode 100644 index 000000000..dc02ea0af --- /dev/null +++ b/tests/zero_code_change/horovod/tensorflow/utils.py @@ -0,0 +1,44 @@ +# Standard Library +import json +import os +import subprocess +import sys +from pathlib import Path + +# Third Party +from tensorflow.python.client import device_lib + + +def get_available_gpus(): + local_device_protos = device_lib.list_local_devices() + return [x.name for x in local_device_protos if x.device_type == "GPU"] + + +def build_json(out_dir, include_workers="all", include_collections=None, path=None, save_all=False): + if include_collections is None: + include_collections = ["weights", "gradients"] + if path is None: + path = Path(out_dir).joinpath("config.json") + + config_dict = {} + config_dict["LocalPath"] = out_dir + config_dict["HookParameters"] = {"include_workers": include_workers, "save_all": save_all} + config_dict["CollectionConfigurations"] = [] + for ic in include_collections: + config_dict["CollectionConfigurations"].append({"CollectionName": ic}) + os.makedirs(out_dir, exist_ok=True) + with open(path.absolute(), "w") as outfile: + json.dump(config_dict, outfile) + return path.absolute() + + +def launch_horovod_job(script_file_path, script_args, num_workers, config_file_path, mode): + command = ( + ["horovodrun", "-np", str(num_workers)] + [sys.executable, script_file_path] + script_args + ) + env_dict = os.environ.copy() + env_dict["SMDEBUG_CONFIG_FILE_PATH"] = f"{config_file_path}" + env_dict["PYTHONPATH"] = "/home/ubuntu/sagemaker-debugger/" + if mode == "cpu": + env_dict["CUDA_VISIBLE_DEVICES"] = "-1" + subprocess.check_call(command, env=env_dict)
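For reference, here is roughly what these helpers produce at runtime. A config file written by `build_json(out_dir, include_workers="all")` looks like the following (pretty-printed here; `/tmp/hvd_test` is a placeholder output directory for illustration):

```
{
    "LocalPath": "/tmp/hvd_test",
    "HookParameters": {"include_workers": "all", "save_all": false},
    "CollectionConfigurations": [
        {"CollectionName": "weights"},
        {"CollectionName": "gradients"}
    ]
}
```

With that file exported as `SMDEBUG_CONFIG_FILE_PATH`, `launch_horovod_job` is then equivalent to running something like (assuming 4 visible GPUs and the Keras script):

```
SMDEBUG_CONFIG_FILE_PATH=/tmp/hvd_test/config.json horovodrun -np 4 python examples/tensorflow/sagemaker_official_container/horovod_keras_mnist.py --num_epochs 2 --model_dir /tmp/hvd_test
```

Note that `launch_horovod_job` pins `PYTHONPATH` to `/home/ubuntu/sagemaker-debugger/` and sets `CUDA_VISIBLE_DEVICES=-1` for CPU runs; adjust the `PYTHONPATH` if the repository is checked out elsewhere.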