diff --git a/tests/zero_code_change/tests/__init__.py b/tests/zero_code_change/__init__.py
similarity index 100%
rename from tests/zero_code_change/tests/__init__.py
rename to tests/zero_code_change/__init__.py
diff --git a/tests/zero_code_change/pytorch_integration_tests.py b/tests/zero_code_change/pytorch_integration_tests.py
index 90622ed89..e5e660c8e 100644
--- a/tests/zero_code_change/pytorch_integration_tests.py
+++ b/tests/zero_code_change/pytorch_integration_tests.py
@@ -21,7 +21,7 @@ from smdebug.core.utils import SagemakerSimulator, ScriptSimulator
 
 
-def test_pytorch(script_mode: bool, use_loss_module=False):
+def test_pytorch(script_mode: bool = False, use_loss_module=False):
     smd.del_hook()
 
     sim_class = ScriptSimulator if script_mode else SagemakerSimulator
@@ -82,6 +82,10 @@ def test_pytorch(script_mode: bool, use_loss_module=False):
     )
 
 
+def test_pytorch_loss_module(script_mode: bool = False):
+    test_pytorch(script_mode=script_mode, use_loss_module=True)
+
+
 if __name__ == "__main__":
     parser = argparse.ArgumentParser()
     parser.add_argument(
diff --git a/tests/zero_code_change/tensorflow_integration_tests.py b/tests/zero_code_change/tensorflow_integration_tests.py
index bd0c57abc..4a1638358 100644
--- a/tests/zero_code_change/tensorflow_integration_tests.py
+++ b/tests/zero_code_change/tensorflow_integration_tests.py
@@ -22,7 +22,13 @@ import tensorflow_datasets as tfds
 from tests.tensorflow.hooks.test_mirrored_strategy import test_basic
 from tests.tensorflow.keras.test_keras_mirrored import test_tf_keras
-from tf_utils import (
+
+# First Party
+import smdebug.tensorflow as smd
+from smdebug.core.utils import SagemakerSimulator
+
+# Local
+from .tf_utils import (
     get_data,
     get_estimator,
     get_input_fns,
@@ -31,12 +37,8 @@
     get_train_op_and_placeholders,
 )
 
-# First Party
-import smdebug.tensorflow as smd
-from smdebug.core.utils import SagemakerSimulator
-
 
-def test_estimator(script_mode: bool):
+def test_estimator(script_mode: bool = False):
     """ Works as intended. """
     smd.del_hook()
     tf.reset_default_graph()
@@ -134,7 +136,15 @@ def test_estimator_gradients_zcc(nested=False, mirrored=False):
     assert len(trial.modes()) == 2
 
 
-def test_linear_classifier(script_mode: bool):
+def test_estimator_gradients_zcc_nested():
+    test_estimator_gradients_zcc(nested=True)
+
+
+def test_estimator_gradients_zcc_mirrored():
+    test_estimator_gradients_zcc(nested=False, mirrored=True)
+
+
+def test_linear_classifier(script_mode: bool = False):
     """ Works as intended. """
     smd.del_hook()
     tf.reset_default_graph()
@@ -160,11 +170,20 @@ def test_linear_classifier(script_mode: bool):
     assert len(trial.tensor_names()) > 0, "Tensors were not saved."
 
 
-def test_monitored_session(script_mode: bool):
+def test_monitored_session(script_mode: bool = False):
     """ Works as intended. """
     smd.del_hook()
     tf.reset_default_graph()
-    with SagemakerSimulator() as sim:
+    json_file_contents = """
+    {
+        "S3OutputPath": "s3://sagemaker-test",
+        "LocalPath": "/opt/ml/output/tensors",
+        "HookParameters" : {
+            "save_interval": "100"
+        }
+    }
+    """
+    with SagemakerSimulator(json_file_contents=json_file_contents) as sim:
         train_op, X, Y = get_train_op_and_placeholders()
         init = tf.compat.v1.global_variables_initializer()
         mnist = get_data()
@@ -195,6 +214,9 @@ def test_monitored_session_gradients_zcc():
     {
         "S3OutputPath": "s3://sagemaker-test",
         "LocalPath": "/opt/ml/output/tensors",
+        "HookParameters" : {
+            "save_interval": "100"
+        },
         "CollectionConfigurations": [
             {
                 "CollectionName": "gradients"
             }
@@ -227,7 +249,7 @@ def test_monitored_session_gradients_zcc():
     assert len(trial.tensor_names(collection="gradients")) > 0
 
 
-def test_keras_v1(script_mode: bool):
+def test_keras_v1(script_mode: bool = False):
     """ Works as intended. """
     smd.del_hook()
     tf.reset_default_graph()
@@ -258,7 +280,7 @@ def test_keras_v1(script_mode: bool):
     assert len(trial.tensor_names()) > 0, "Tensors were not saved."
 
 
-def test_keras_gradients(script_mode: bool, tf_optimizer: bool = False):
+def test_keras_gradients(script_mode: bool = False, tf_optimizer: bool = False):
     """ Works as intended. """
     smd.del_hook()
     tf.reset_default_graph()
@@ -320,6 +342,10 @@ def test_keras_gradients(script_mode: bool, tf_optimizer: bool = False):
         assert len(trial.tensor_names(collection="optimizer_variables")) > 0
 
 
+def test_keras_gradients_tf_opt(script_mode: bool = False):
+    test_keras_gradients(script_mode=script_mode, tf_optimizer=True)
+
+
 def test_keras_gradients_mirrored(include_workers="one"):
     """ Works as intended. """
     smd.del_hook()
@@ -366,7 +392,11 @@ def test_keras_gradients_mirrored(include_workers="one"):
     test_tf_keras("/opt/ml/output/tensors", zcc=True, include_workers=include_workers)
 
 
-def test_keras_to_estimator(script_mode: bool):
+def test_keras_gradients_mirrored_all_workers():
+    test_keras_gradients_mirrored(include_workers="all")
+
+
+def test_keras_to_estimator(script_mode: bool = False):
     """ Works as intended. """
     import tensorflow.compat.v1.keras as keras
 
@@ -426,14 +456,14 @@ def input_fn():
         test_monitored_session_gradients_zcc()
     test_estimator(script_mode=script_mode)
     if not script_mode:
-        test_estimator_gradients_zcc(nested=True)
-        test_estimator_gradients_zcc(nested=False)
-        test_estimator_gradients_zcc(nested=False, mirrored=True)
+        test_estimator_gradients_zcc()
+        test_estimator_gradients_zcc_nested()
+        test_estimator_gradients_zcc_mirrored()
     test_linear_classifier(script_mode=script_mode)
     test_keras_v1(script_mode=script_mode)
     test_keras_gradients(script_mode=script_mode)
-    test_keras_gradients(script_mode=script_mode, tf_optimizer=True)
+    test_keras_gradients_tf_opt(script_mode=script_mode)
     test_keras_to_estimator(script_mode=script_mode)
     if not script_mode:
-        test_keras_gradients_mirrored(include_workers="all")
+        test_keras_gradients_mirrored_all_workers()
     test_keras_gradients_mirrored()
diff --git a/tests/zero_code_change/tests/tensorflow/hooks/test_mirrored_strategy.py b/tests/zero_code_change/tests/tensorflow/hooks/test_mirrored_strategy.py
deleted file mode 100644
index 25810d590..000000000
--- a/tests/zero_code_change/tests/tensorflow/hooks/test_mirrored_strategy.py
+++ /dev/null
@@ -1,493 +0,0 @@
-# Copyright 2016 The TensorFlow Authors. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-"""Convolutional Neural Network Estimator for MNIST, built with tf.layers."""
-
-# Future
-from __future__ import absolute_import, division, print_function
-
-# Third Party
-import numpy as np
-import pytest
-import tensorflow as tf
-from tensorflow.python.client import device_lib
-from tests.tensorflow.utils import create_trial_fast_refresh
-
-# First Party
-import smdebug.tensorflow as smd
-from smdebug.core.collection import CollectionKeys
-from smdebug.core.modes import ModeKeys
-from smdebug.exceptions import TensorUnavailableForStep
-from smdebug.tensorflow import get_hook
-
-
-def cnn_model_fn(features, labels, mode):
-    """Model function for CNN."""
-    # Input Layer
-    # Reshape X to 4-D tensor: [batch_size, width, height, channels]
-    # MNIST images are 28x28 pixels, and have one color channel
-    input_layer = tf.reshape(features["x"], [-1, 28, 28, 1])
-
-    # Convolutional Layer #1
-    # Computes 32 features using a 5x5 filter with ReLU activation.
-    # Padding is added to preserve width and height.
-    # Input Tensor Shape: [batch_size, 28, 28, 1]
-    # Output Tensor Shape: [batch_size, 28, 28, 32]
-    conv1 = tf.layers.conv2d(
-        inputs=input_layer, filters=32, kernel_size=[5, 5], padding="same", activation=tf.nn.relu
-    )
-
-    # Pooling Layer #1
-    # First max pooling layer with a 2x2 filter and stride of 2
-    # Input Tensor Shape: [batch_size, 28, 28, 32]
-    # Output Tensor Shape: [batch_size, 14, 14, 32]
-    pool1 = tf.layers.max_pooling2d(inputs=conv1, pool_size=[2, 2], strides=2)
-
-    # Convolutional Layer #2
-    # Computes 64 features using a 5x5 filter.
-    # Padding is added to preserve width and height.
-    # Input Tensor Shape: [batch_size, 14, 14, 32]
-    # Output Tensor Shape: [batch_size, 14, 14, 64]
-    conv2 = tf.layers.conv2d(
-        inputs=pool1, filters=64, kernel_size=[5, 5], padding="same", activation=tf.nn.relu
-    )
-
-    # Pooling Layer #2
-    # Second max pooling layer with a 2x2 filter and stride of 2
-    # Input Tensor Shape: [batch_size, 14, 14, 64]
-    # Output Tensor Shape: [batch_size, 7, 7, 64]
-    pool2 = tf.layers.max_pooling2d(inputs=conv2, pool_size=[2, 2], strides=2)
-
-    # Flatten tensor into a batch of vectors
-    # Input Tensor Shape: [batch_size, 7, 7, 64]
-    # Output Tensor Shape: [batch_size, 7 * 7 * 64]
-    pool2_flat = tf.reshape(pool2, [-1, 7 * 7 * 64])
-
-    # Dense Layer
-    # Densely connected layer with 1024 neurons
-    # Input Tensor Shape: [batch_size, 7 * 7 * 64]
-    # Output Tensor Shape: [batch_size, 1024]
-    dense = tf.layers.dense(inputs=pool2_flat, units=1024, activation=tf.nn.relu)
-
-    # Add dropout operation; 0.6 probability that element will be kept
-    dropout = tf.layers.dropout(
-        inputs=dense, rate=0.4, training=mode == tf.estimator.ModeKeys.TRAIN
-    )
-
-    # Logits layer
-    # Input Tensor Shape: [batch_size, 1024]
-    # Output Tensor Shape: [batch_size, 10]
-    logits = tf.layers.dense(inputs=dropout, units=10)
-
-    predictions = {
-        # Generate predictions (for PREDICT and EVAL mode)
-        "classes": tf.argmax(input=logits, axis=1),
-        # Add `softmax_tensor` to the graph. It is used for PREDICT and by the
-        # `logging_hook`.
- "probabilities": tf.nn.softmax(logits, name="softmax_tensor"), - } - if mode == tf.estimator.ModeKeys.PREDICT: - return tf.estimator.EstimatorSpec(mode=mode, predictions=predictions) - - # Calculate Loss (for both TRAIN and EVAL modes) - loss = tf.losses.sparse_softmax_cross_entropy(labels=labels, logits=logits) - - # Configure the Training Op (for TRAIN mode) - if mode == tf.estimator.ModeKeys.TRAIN: - optimizer = tf.train.GradientDescentOptimizer(learning_rate=0.001) - optimizer = smd.get_hook().wrap_optimizer(optimizer) - train_op = optimizer.minimize(loss=loss, global_step=tf.train.get_global_step()) - return tf.estimator.EstimatorSpec(mode=mode, loss=loss, train_op=train_op) - - # Add evaluation metrics (for EVAL mode) - eval_metric_ops = { - "accuracy": tf.metrics.accuracy(labels=labels, predictions=predictions["classes"]) - } - return tf.estimator.EstimatorSpec(mode=mode, loss=loss, eval_metric_ops=eval_metric_ops) - - -def per_device_batch_size(batch_size, num_gpus): - """For multi-gpu, batch-size must be a multiple of the number of GPUs. - Note that this should eventually be handled by DistributionStrategies - directly. Multi-GPU support is currently experimental, however, - so doing the work here until that feature is in place. - Args: - batch_size: Global batch size to be divided among devices. This should be - equal to num_gpus times the single-GPU batch_size for multi-gpu training. - num_gpus: How many GPUs are used with DistributionStrategies. - Returns: - Batch size per device. - Raises: - ValueError: if batch_size is not divisible by number of devices - """ - if num_gpus <= 1: - return batch_size - - remainder = batch_size % num_gpus - if remainder: - err = ( - "When running with multiple GPUs, batch size " - "must be a multiple of the number of available GPUs. Found {} " - "GPUs with a batch size of {}; try --batch_size={} instead." - ).format(num_gpus, batch_size, batch_size - remainder) - raise ValueError(err) - return int(batch_size / num_gpus) - - -class InputFnProvider: - def __init__(self, train_batch_size): - self.train_batch_size = train_batch_size - self.__load_data() - - def __load_data(self): - # Load training and eval data - mnist = tf.contrib.learn.datasets.load_dataset("mnist") - self.train_data = mnist.train.images # Returns np.array - self.train_labels = np.asarray(mnist.train.labels, dtype=np.int32) - self.eval_data = mnist.test.images # Returns np.array - self.eval_labels = np.asarray(mnist.test.labels, dtype=np.int32) - - def train_input_fn(self): - """An input function for training""" - # Shuffle, repeat, and batch the examples. 
-        dataset = tf.data.Dataset.from_tensor_slices(({"x": self.train_data}, self.train_labels))
-        dataset = dataset.shuffle(1000).repeat().batch(self.train_batch_size)
-        return dataset
-
-    def eval_input_fn(self):
-        """An input function for evaluation or prediction"""
-        dataset = tf.data.Dataset.from_tensor_slices(({"x": self.eval_data}, self.eval_labels))
-        dataset = dataset.batch(1).repeat()
-        return dataset
-
-
-def get_available_gpus():
-    local_device_protos = device_lib.list_local_devices()
-    return len([x.name for x in local_device_protos if x.device_type == "GPU"])
-
-
-def helper_mirrored(
-    trial_dir,
-    save_all=False,
-    num_steps=3,
-    save_config=None,
-    reduction_config=None,
-    include_collections=None,
-    steps=None,
-    zcc=False,
-    eval_distributed=False,
-    include_workers="all",
-):
-    num_gpus = get_available_gpus()
-    num_devices = num_gpus if num_gpus > 0 else 1
-    batch_size = 10 * num_devices
-
-    # input_fn which serves Dataset
-    input_fn_provider = InputFnProvider(per_device_batch_size(batch_size, num_devices))
-
-    # Use multiple GPUs by MirroredStragtegy.
-    # All avaiable GPUs will be used if `num_gpus` is omitted.
-    # if num_devices > 1:
-    distribution = tf.contrib.distribute.MirroredStrategy()
-    # print("### Doing Multi GPU Training")
-    # else:
-    #     distribution = None
-    # Pass to RunConfig
-    config = tf.estimator.RunConfig(
-        train_distribute=distribution,
-        eval_distribute=distribution if eval_distributed else None,
-        model_dir="/tmp/mnist_convnet_model",
-    )
-
-    if save_config is None:
-        save_config = smd.SaveConfig(save_interval=2)
-
-    if include_collections is None:
-        include_collections = [
-            CollectionKeys.WEIGHTS,
-            CollectionKeys.BIASES,
-            CollectionKeys.GRADIENTS,
-            CollectionKeys.LOSSES,
-        ]
-
-    if not zcc:
-        ts_hook = smd.SessionHook(
-            out_dir=trial_dir,
-            save_all=save_all,
-            include_collections=include_collections,
-            save_config=save_config,
-            reduction_config=reduction_config,
-            include_workers=include_workers,
-        )
-    else:
-        print("zcc is passed. ignoring include_collections and save_config")
-
-    mnist_classifier = tf.estimator.Estimator(model_fn=cnn_model_fn, config=config)
-    if steps is None:
-        steps = ["train"]
-
-    for s in steps:
-        if s == "train":
-            print("Starting train")
-            if not zcc:
-                ts_hook.set_mode(smd.modes.TRAIN)
-                # Train the model
-                mnist_classifier.train(
-                    input_fn=input_fn_provider.train_input_fn, steps=num_steps, hooks=[ts_hook]
-                )
-            else:
-                mnist_classifier.train(input_fn=input_fn_provider.train_input_fn, steps=num_steps)
-        elif s == "eval":
-            print("Starting eval")
-
-            if not zcc:
-                ts_hook.set_mode(smd.modes.EVAL)
-                # Evaluate the model and print results
-                mnist_classifier.evaluate(
-                    input_fn=input_fn_provider.eval_input_fn, steps=num_steps, hooks=[ts_hook]
-                )
-            else:
-                mnist_classifier.evaluate(input_fn=input_fn_provider.eval_input_fn, steps=num_steps)
-        elif s == "predict":
-            print("Starting predict")
-            if not zcc:
-                ts_hook.set_mode(smd.modes.PREDICT)
-                # Evaluate the model and print results
-                p = mnist_classifier.predict(
-                    input_fn=input_fn_provider.eval_input_fn, hooks=[ts_hook]
-                )
-            else:
-                p = mnist_classifier.predict(input_fn=input_fn_provider.eval_input_fn)
-            for i in range(num_steps):
-                next(p)
-    get_hook()._cleanup()
-    return distribution
-
-
-def skip_trial_check():
-    # Skip trial check as in this case SMDebug is disabled for mirrored strategy
-    # trial will not be loaded
-    import tensorflow as tf
-    from packaging import version
-
-    if version.parse(tf.__version__) < version.parse("1.14.0"):
-        return True
-    else:
-        return False
-
-
-@pytest.mark.slow
-def test_basic(out_dir, zcc=False):
-    strategy = helper_mirrored(
-        out_dir,
-        steps=["train", "eval", "predict", "train"],
-        include_collections=[
-            CollectionKeys.WEIGHTS,
-            CollectionKeys.BIASES,
-            CollectionKeys.GRADIENTS,
-            CollectionKeys.LOSSES,
-        ],
-        eval_distributed=False,
-        zcc=zcc,
-    )
-    if skip_trial_check():
-        return
-
-    tr = create_trial_fast_refresh(out_dir)
-    # wts, grads, losses
-    print(tr.tensor_names())
-    assert len(tr.tensor_names()) == 8 + 8 + (1 * strategy.num_replicas_in_sync) + 1
-    assert len(tr.steps()) == 7
-    assert len(tr.steps(ModeKeys.TRAIN)) == 3
-    assert len(tr.steps(ModeKeys.EVAL)) == 2
-    assert len(tr.steps(ModeKeys.PREDICT)) == 2
-
-    assert "dense_1/kernel:0" in tr.tensor_names(collection="weights")
-    for tname in tr.tensor_names(collection="weights"):
-        for s in tr.tensor(tname).steps(ModeKeys.TRAIN):
-            assert len(tr.tensor(tname).workers(s, ModeKeys.TRAIN)) == strategy.num_replicas_in_sync
-            for worker in tr.tensor(tname).workers(s, ModeKeys.TRAIN):
-                assert tr.tensor(tname).value(s, worker=worker, mode=ModeKeys.TRAIN) is not None
-        for s in tr.tensor(tname).steps(ModeKeys.EVAL):
-            assert len(tr.tensor(tname).workers(s, ModeKeys.EVAL)) == strategy.num_replicas_in_sync
-            assert tr.tensor(tname).value(s, mode=ModeKeys.EVAL) is not None
-
-    tensornames = tr.tensor_names(regex="Identity_\d+:0")
-    for s in tr.tensor(tensornames[0]).steps(ModeKeys.TRAIN):
-        for w in tr.tensor(tensornames[0]).workers(s, ModeKeys.TRAIN):
-            assert tr.tensor(tensornames[0]).value(s, worker=w, mode=ModeKeys.TRAIN) is not None
-        assert (
-            len(tr.tensor(tensornames[0]).workers(s, ModeKeys.TRAIN))
-            == strategy.num_replicas_in_sync
-        )
-
-    for tname in tr.tensor_names(collection="losses"):
-        if tname != tensornames[0]:
-            for s in tr.tensor(tname).steps(ModeKeys.TRAIN):
-                assert len(tr.tensor(tname).workers(s, ModeKeys.TRAIN)) == 1
-                assert tr.tensor(tname).value(s, mode=ModeKeys.TRAIN) is not None
-
-    tname = "sparse_softmax_cross_entropy_loss/value:0"
-    for s in tr.tensor(tname).steps(ModeKeys.EVAL):
-        assert len(tr.tensor(tname).workers(s, ModeKeys.EVAL)) == strategy.num_replicas_in_sync
-        assert tr.tensor(tname).value(s, mode=ModeKeys.EVAL) is not None
-
-
-@pytest.mark.slow
-def test_eval_distributed(out_dir):
-    strategy = helper_mirrored(
-        out_dir,
-        steps=["train", "eval"],
-        include_collections=[CollectionKeys.WEIGHTS, CollectionKeys.BIASES, CollectionKeys.LOSSES],
-        eval_distributed=True,
-    )
-    if skip_trial_check():
-        return
-    tr = create_trial_fast_refresh(out_dir)
-    assert len(tr.tensor_names()) == 8 + 1 * strategy.num_replicas_in_sync + 1
-    assert len(tr.steps()) == 4
-    assert len(tr.steps(ModeKeys.TRAIN)) == 2
-    assert len(tr.steps(ModeKeys.EVAL)) == 2
-
-    for tname in tr.tensor_names(collection="weights"):
-        for s in tr.tensor(tname).steps(ModeKeys.TRAIN):
-            assert len(tr.tensor(tname).workers(s, ModeKeys.TRAIN)) == strategy.num_replicas_in_sync
-            for worker in tr.tensor(tname).workers(s, ModeKeys.TRAIN):
-                assert tr.tensor(tname).value(s, worker=worker, mode=ModeKeys.TRAIN) is not None
-        for s in tr.tensor(tname).steps(ModeKeys.EVAL):
-            assert len(tr.tensor(tname).workers(s, ModeKeys.EVAL)) == strategy.num_replicas_in_sync
-            assert tr.tensor(tname).value(s, mode=ModeKeys.EVAL) is not None
-
-    tensornames = tr.tensor_names(regex="Identity_\d+:0")
-    for s in tr.tensor(tensornames[0]).steps(ModeKeys.TRAIN):
-        for w in tr.tensor(tensornames[0]).workers(s, ModeKeys.TRAIN):
-            assert tr.tensor(tensornames[0]).value(s, worker=w, mode=ModeKeys.TRAIN) is not None
-        assert (
-            len(tr.tensor(tensornames[0]).workers(s, ModeKeys.TRAIN))
-            == strategy.num_replicas_in_sync
-        )
-
-    for tname in tr.tensor_names(collection="losses"):
-        for s in tr.tensor(tname).steps(ModeKeys.EVAL):
-            assert len(tr.tensor(tname).workers(s, ModeKeys.EVAL)) == 1
-            assert tr.tensor(tname).value(s, mode=ModeKeys.EVAL) is not None
-        if tname != tensornames[0]:
-            for s in tr.tensor(tname).steps(ModeKeys.TRAIN):
-                assert len(tr.tensor(tname).workers(s, ModeKeys.EVAL)) == 1
-                assert tr.tensor(tname).value(s, mode=ModeKeys.EVAL) is not None
-
-
-@pytest.mark.slow
-def test_reductions(out_dir):
-    strategy = helper_mirrored(
-        out_dir,
-        steps=["train", "eval"],
-        reduction_config=smd.ReductionConfig(
-            reductions=["sum", "max"], abs_reductions=["sum", "max"], norms=["l1"]
-        ),
-        include_collections=[CollectionKeys.WEIGHTS, CollectionKeys.BIASES, CollectionKeys.LOSSES],
-        eval_distributed=True,
-    )
-    if skip_trial_check():
-        return
-
-    tr = create_trial_fast_refresh(out_dir)
-    assert len(tr.tensor_names()) == 8 + 1 * strategy.num_replicas_in_sync + 1
-    assert len(tr.steps()) == 4
-    assert len(tr.steps(ModeKeys.TRAIN)) == 2
-    assert len(tr.steps(ModeKeys.EVAL)) == 2
-
-    for tname in tr.tensor_names(collection="weights"):
-        for s in tr.tensor(tname).steps(ModeKeys.TRAIN):
-            try:
-                tr.tensor(tname).value(s, mode=ModeKeys.TRAIN)
-                assert False
-            except TensorUnavailableForStep:
-                # for some tensors l1 reduction can't be saved due to improper dimensions for the reduction
-                assert len(tr.tensor(tname).reduction_values(s, mode=ModeKeys.TRAIN)) >= 4
-
-        for s in tr.tensor(tname).steps(ModeKeys.EVAL):
-            try:
-                tr.tensor(tname).value(s, mode=ModeKeys.EVAL)
-                assert False
-            except TensorUnavailableForStep:
-                # for some tensors l1 reduction can't be saved due to improper dimensions for the reduction
-                assert len(tr.tensor(tname).reduction_values(s, mode=ModeKeys.EVAL)) >= 4
-
-    for tname in tr.tensor_names(collection="losses"):
-        for s in tr.tensor(tname).steps(ModeKeys.EVAL):
-            assert len(tr.tensor(tname).reduction_values(s, mode=ModeKeys.EVAL)) == 0
-            assert tr.tensor(tname).value(s, mode=ModeKeys.EVAL) is not None
-
-    for tname in tr.tensor_names(collection="losses"):
-        for s in tr.tensor(tname).steps(ModeKeys.TRAIN):
-            assert len(tr.tensor(tname).reduction_values(s, mode=ModeKeys.TRAIN)) == 0
-            assert tr.tensor(tname).value(s, mode=ModeKeys.TRAIN) is not None
-
-
-@pytest.mark.slow
-def test_save_all(out_dir):
-    strategy = helper_mirrored(
-        out_dir, steps=["train"], num_steps=1, save_all=True, eval_distributed=True
-    )
-    if skip_trial_check():
-        return
-    tr = create_trial_fast_refresh(out_dir)
-    assert len(tr.tensor_names()) > 100
-    assert len(tr.steps())
-    assert len(tr.tensor_names(collection="weights"))
-    assert len(tr.tensor_names(collection="biases"))
-    assert len(tr.tensor_names(collection="gradients"))
-
-
-@pytest.mark.slow
-def test_save_all_worker(out_dir):
-    # skip test if no gpus available
-    if get_available_gpus() == 0:
-        return
-    strategy = helper_mirrored(
-        out_dir,
-        steps=["train"],
-        num_steps=1,
-        save_all=True,
-        eval_distributed=True,
-        include_workers="all",
-    )
-    tr = create_trial_fast_refresh(out_dir)
-    assert len(tr.steps())
-    assert len(tr.workers()) == get_available_gpus()
-    assert len(tr.tensor_names(collection="weights"))
-    assert "conv2d/kernel:0" in tr.tensor_names(collection="weights")
-    assert len(tr.tensor("conv2d/kernel:0").workers(0)) == strategy.num_replicas_in_sync
-    assert len(tr.tensor_names(collection="biases"))
-    assert "conv2d/bias:0" in tr.tensor_names(collection="biases")
-    assert len(tr.tensor("conv2d/bias:0").workers(0)) == strategy.num_replicas_in_sync
-    assert len(tr.tensor_names(collection="gradients"))
-
-
-@pytest.mark.slow
-def test_save_one_worker(out_dir):
-    strategy = helper_mirrored(
-        out_dir,
-        steps=["train"],
-        num_steps=1,
-        save_all=True,
-        eval_distributed=True,
-        include_workers="one",
-    )
-    tr = create_trial_fast_refresh(out_dir)
-    assert len(tr.workers()) == 1
-    assert len(tr.steps())
-    assert len(tr.tensor_names(collection="weights"))
-    assert len(tr.tensor_names(collection="biases"))
-    assert len(tr.tensor_names(collection="gradients"))
diff --git a/tests/zero_code_change/tests/tensorflow/keras/test_keras_mirrored.py b/tests/zero_code_change/tests/tensorflow/keras/test_keras_mirrored.py
deleted file mode 100644
index 78178c364..000000000
--- a/tests/zero_code_change/tests/tensorflow/keras/test_keras_mirrored.py
+++ /dev/null
@@ -1,560 +0,0 @@
-# Future
-from __future__ import absolute_import, division, print_function, unicode_literals
-
-# Standard Library
-import os
-
-# Third Party
-import pytest
-import tensorflow as tf
-import tensorflow_datasets as tfds
-from tensorflow.python.client import device_lib
-from tests.tensorflow.utils import create_trial_fast_refresh
-
-# First Party
-import smdebug.tensorflow as smd
-from smdebug.core.access_layer import has_training_ended
-from smdebug.core.collection import CollectionKeys
-from smdebug.core.modes import ModeKeys
-from smdebug.core.reduction_config import ALLOWED_NORMS, ALLOWED_REDUCTIONS
-from smdebug.exceptions import TensorUnavailable, TensorUnavailableForStep
-from smdebug.tensorflow import ReductionConfig, SaveConfig
-from smdebug.tensorflow.keras import KerasHook
-
-tfds.disable_progress_bar()
-
-
-class FetchTensorCallback(tf.keras.callbacks.Callback):
-    def __init__(self, tensors):
-        self.tensors = tensors
-        self.fetches_added = False
-
-    def _callback_fn(self, tensor_val):
-        assert tensor_val is not None
-
-    def on_train_batch_begin(self, batch, logs):
-        try:
-            from tensorflow.python.keras.distribute.distributed_training_utils import (
-                get_distributed_model,
-            )
-            from tensorflow.python.keras.utils.mode_keys import ModeKeys as KerasModeKeys
-
-            for t in self.tensors:
-                x = get_distributed_model(self.model, KerasModeKeys.TRAIN)._distributed_function
-                x.fetches.append(t)
-                x.fetch_callbacks[t] = self._callback_fn
-            self.fetches_added = True
-        except ImportError:
-            pass
-
-    def on_train_batch_end(self, batch, logs):
-        if self.fetches_added:
-            # these should only be added if these were available above
-            from tensorflow.python.keras.distribute.distributed_training_utils import (
-                get_distributed_model,
-            )
-            from tensorflow.python.keras.utils.mode_keys import ModeKeys as KerasModeKeys
-
-            for t in self.tensors:
-                x = get_distributed_model(self.model, KerasModeKeys.TRAIN)._distributed_function
-                x.fetches.remove(t)
-                del x.fetch_callbacks[t]
-            self.fetches_added = False
-
-
-def get_available_gpus():
-    local_device_protos = device_lib.list_local_devices()
-    return len([x.name for x in local_device_protos if x.device_type == "GPU"])
-
-
-def train_model(
-    trial_dir,
-    save_all=False,
-    hook=None,
-    include_collections=None,
-    reduction_config=None,
-    save_config=None,
-    use_keras_optimizer=True,
-    eager=False,
-    create_relu_collection=False,
-    strategy=None,
-    steps=None,
-    add_callbacks=None,
-    zcc=False,
-    include_workers="all",
-):
-    print(tf.__version__)
-    tf.keras.backend.clear_session()
-
-    datasets, info = tfds.load(name="mnist", with_info=True, as_supervised=True)
-
-    mnist_train, mnist_test = datasets["train"], datasets["test"]
-
-    if strategy is None:
-        strategy = tf.distribute.MirroredStrategy()
-
-    # You can also do info.splits.total_num_examples to get the total
-    # number of examples in the dataset.
-
-    BUFFER_SIZE = 10000
-
-    BATCH_SIZE_PER_REPLICA = 64
-    BATCH_SIZE = BATCH_SIZE_PER_REPLICA * strategy.num_replicas_in_sync
-
-    def scale(image, label):
-        image = tf.cast(image, tf.float32)
-        image /= 255
-
-        return image, label
-
-    train_dataset = mnist_train.map(scale).cache().shuffle(BUFFER_SIZE).batch(BATCH_SIZE)
-    eval_dataset = mnist_test.map(scale).batch(BATCH_SIZE)
-
-    if hook is None and not zcc:
-        if save_config is None:
-            save_config = SaveConfig(save_interval=3)
-
-        hook = KerasHook(
-            out_dir=trial_dir,
-            save_config=save_config,
-            reduction_config=reduction_config,
-            include_collections=include_collections,
-            save_all=save_all,
-            include_workers=include_workers,
-        )
-
-        if not save_all and include_collections is not None:
-            for cname in hook.include_collections:
-                if cname not in include_collections:
-                    hook.get_collection(cname).save_config = SaveConfig(end_step=0)
-
-    if use_keras_optimizer:
-        opt = tf.keras.optimizers.Adam()
-    else:
-        opt = tf.train.AdamOptimizer(0.1)
-
-    if not zcc:
-        opt = hook.wrap_optimizer(opt)
-
-    with strategy.scope():
-        relu_layer = tf.keras.layers.Dense(64, activation="relu")
-        model = tf.keras.Sequential(
-            [
-                tf.keras.layers.Conv2D(32, 3, activation="relu", input_shape=(28, 28, 1)),
-                tf.keras.layers.MaxPooling2D(),
-                tf.keras.layers.Flatten(),
-                relu_layer,
-                tf.keras.layers.Dense(10, activation="softmax"),
-            ]
-        )
-        model.compile(
-            loss="sparse_categorical_crossentropy",
-            optimizer=opt,
-            run_eagerly=eager,
-            metrics=["accuracy"],
-        )
-
-    if create_relu_collection:
-        hook.get_collection("relu").add_keras_layer(relu_layer, inputs=True, outputs=True)
-
-    hooks = []
-    if add_callbacks:
-        if "tensorboard" in add_callbacks:
-            hooks.append(
-                # write_grads = True causes crash saying handle must be created in scope
-                # erorr like this https://stackoverflow.com/questions/56836895/custom-training-loop-using-tensorflow-gpu-1-14-and-tf-distribute-mirroredstrateg
-                # this crash is even if callback is off
-                tf.keras.callbacks.TensorBoard(
-                    log_dir="/tmp/logs", histogram_freq=4, write_images=True
-                )
-            )
-        if "fetch_tensor" in add_callbacks:
-            hooks.append(FetchTensorCallback(model.weights))
-    if not zcc:
-        hooks.append(hook)
-
-    if steps is None:
-        steps = ["train"]
-    for step in steps:
-        if step == "train":
-            model.fit(train_dataset, epochs=1, steps_per_epoch=10, callbacks=hooks, verbose=0)
-        elif step == "eval":
-            model.evaluate(eval_dataset, steps=10, callbacks=hooks, verbose=0)
-        elif step == "predict":
-            model.predict(train_dataset, steps=4, callbacks=hooks, verbose=0)
-
-    smd.get_hook()._cleanup()
-    return strategy
-
-
-@pytest.mark.skip(
-    "needs to be run individually as it complains that eager "
-    "needs to be set at startup, but pytest "
-    "does not allow controlling order of tests"
-)
-def test_tf_keras_eager(out_dir):
-    tf.enable_eager_execution()
-    train_model(out_dir, eager=True, steps=["train"])
-    tf.disable_eager_execution()
-
-
-@pytest.mark.skip(
-    "needs to be run individually as it complains that eager "
-    "needs to be set at startup, but pytest "
-    "does not allow controlling order of tests"
-)
-def test_tf_keras_eager_env(out_dir):
-    tf.enable_eager_execution()
-    train_model(out_dir, eager=False, steps=["train"])
-    tf.disable_eager_execution()
-
-
-def exhaustive_check(trial_dir, zcc=False, include_workers="one"):
-    include_collections = [
-        CollectionKeys.WEIGHTS,
-        CollectionKeys.BIASES,
-        CollectionKeys.GRADIENTS,
-        CollectionKeys.LOSSES,
-        CollectionKeys.OUTPUTS,
-        CollectionKeys.METRICS,
-        CollectionKeys.OPTIMIZER_VARIABLES,
-    ]
-    strategy = train_model(
-        trial_dir,
-        include_collections=include_collections,
-        steps=["train", "eval", "predict", "train"],
-        include_workers=include_workers,
-        zcc=zcc,
-    )
-
-    tr = create_trial_fast_refresh(trial_dir)
-    print(tr.tensor_names())
-
-    if include_workers == "all":
-        assert len(tr.workers()) == strategy.num_replicas_in_sync
-        assert len(tr.tensor_names()) == (6 + 6 + 1 + 3 + strategy.num_replicas_in_sync * 3 + 5)
-    else:
-        assert len(tr.workers()) == 1
-        assert len(tr.tensor_names()) == (6 + 6 + 1 + 3 + 1 * 3 + 5)
-
-    # 6 weights, 6 gradients, 1 loss, 3 metrics, 24 outputs (8 for each mode), 5 optimizer variables
-    assert len(tr.modes()) == 3
-    assert len(tr.steps()) == 14
-    assert len(tr.steps(ModeKeys.TRAIN)) == 8  # 0, 3, 6, 9, 12, 15, 18, 19(end of epoch)
-    assert len(tr.steps(ModeKeys.EVAL)) == 4
-    assert len(tr.steps(ModeKeys.PREDICT)) == 2  # ran 4 steps above
-
-    assert len(tr.tensor_names(collection=CollectionKeys.BIASES)) == 3
-    wtnames = tr.tensor_names(collection=CollectionKeys.WEIGHTS)
-    assert len(wtnames) == 3
-
-    for wtname in wtnames:
-        assert len(tr.tensor(wtname).steps()) == 13, wtname
-        assert len(tr.tensor(wtname).steps(ModeKeys.TRAIN)) == 7
-        for s in tr.tensor(wtname).steps(ModeKeys.TRAIN):
-            assert tr.tensor(wtname).value(s, mode=ModeKeys.TRAIN) is not None
-            for worker in tr.workers():
-                assert tr.tensor(wtname).value(s, mode=ModeKeys.TRAIN, worker=worker) is not None
-        assert len(tr.tensor(wtname).steps(ModeKeys.EVAL)) == 4
-        for s in tr.tensor(wtname).steps(ModeKeys.EVAL):
-            assert tr.tensor(wtname).value(s, mode=ModeKeys.EVAL) is not None
-            for worker in tr.workers():
-                assert tr.tensor(wtname).value(s, mode=ModeKeys.EVAL, worker=worker) is not None
-        assert len(tr.tensor(wtname).steps(ModeKeys.PREDICT)) == 2
-
-    gradnames = tr.tensor_names(collection=CollectionKeys.GRADIENTS)
-    assert len(gradnames) == 6
-    for gradname in gradnames:
-        assert len(tr.tensor(gradname).steps(ModeKeys.TRAIN)) == 7
-        for s in tr.tensor(gradname).steps(ModeKeys.TRAIN):
-            assert tr.tensor(gradname).value(s, mode=ModeKeys.TRAIN) is not None
-        assert len(tr.tensor(gradname).steps(ModeKeys.EVAL)) == 0
-        assert len(tr.tensor(gradname).steps(ModeKeys.PREDICT)) == 0
-
-    optvarnames = tr.tensor_names(collection=CollectionKeys.OPTIMIZER_VARIABLES)
-    assert len(optvarnames) == 5
-    for optvarname in optvarnames:
-        assert len(tr.tensor(optvarname).steps(ModeKeys.TRAIN)) == 7
-        for s in tr.tensor(optvarname).steps(ModeKeys.TRAIN):
-            assert tr.tensor(optvarname).value(s, mode=ModeKeys.TRAIN) is not None
-        assert len(tr.tensor(optvarname).steps(ModeKeys.EVAL)) == 0
-        assert len(tr.tensor(optvarname).steps(ModeKeys.PREDICT)) == 0
-
-    assert len(tr.tensor_names(collection=CollectionKeys.LOSSES)) == 1
-    loss_name = tr.tensor_names(collection=CollectionKeys.LOSSES)[0]
-    # loss is not in predict mode (so less 2)
-    # add one for end of epoch
-    assert len(tr.tensor(loss_name).steps(ModeKeys.TRAIN)) == 8
-    assert len(tr.tensor(loss_name).steps(ModeKeys.EVAL)) == 4
-    assert len(tr.tensor(loss_name).steps(ModeKeys.PREDICT)) == 0
-    assert len(tr.tensor(loss_name).steps()) == 12
-
-    metricnames = tr.tensor_names(collection=CollectionKeys.METRICS)
-    assert len(metricnames) == 3
-
-
-@pytest.mark.slow
-def test_tf_keras(out_dir, zcc=False, include_workers="all"):
-    exhaustive_check(out_dir, zcc=zcc, include_workers=include_workers)
-
-
-@pytest.mark.slow
-def test_tf_keras_non_keras_opt(out_dir):
-    include_collections = [
-        CollectionKeys.GRADIENTS,
-        CollectionKeys.OPTIMIZER_VARIABLES,
-        CollectionKeys.METRICS,
-    ]
-    train_model(
-        out_dir,
-        include_collections=include_collections,
-        use_keras_optimizer=False,
-        steps=["train", "eval"],
-    )
-    tr = create_trial_fast_refresh(out_dir)
-    assert len(tr.modes()) == 2
-    assert len(tr.steps(ModeKeys.TRAIN)) == 4  # 0, 3, 6, 9
-    assert len(tr.tensor_names(collection=CollectionKeys.GRADIENTS)) == 6
-    gradient_name = tr.tensor_names(collection=CollectionKeys.GRADIENTS)[0]
-    assert len(tr.tensor(gradient_name).steps(ModeKeys.TRAIN)) == 4
-    assert len(tr.tensor(gradient_name).steps(ModeKeys.EVAL)) == 0
-
-    # not supported for non keras optimizer with keras
-    assert len(tr.tensor_names(collection=CollectionKeys.OPTIMIZER_VARIABLES)) == 0
-
-
-@pytest.mark.slow
-def test_save_all(out_dir):
-    strategy = train_model(
-        out_dir,
-        include_collections=None,
-        save_all=True,
-        save_config=SaveConfig(save_steps=[5]),
-        steps=["train"],
-    )
-    tr = create_trial_fast_refresh(out_dir)
-    print(tr.tensor_names())
-    assert (
-        len(tr.tensor_names())
-        == 6 + 6 + 5 + 3 + 1 + 3 * strategy.num_replicas_in_sync + 2 * strategy.num_replicas_in_sync
-    )
-    # weights, grads, optimizer_variables, metrics, losses, outputs
-    assert len(tr.steps()) == 3
-
-
-@pytest.mark.slow
-def test_save_one_worker(out_dir):
-    strategy = train_model(
-        out_dir,
-        include_collections=None,
-        save_all=True,
-        save_config=SaveConfig(save_steps=[5]),
-        steps=["train"],
-        include_workers="one",
-    )
-    tr = create_trial_fast_refresh(out_dir)
-    assert len(tr.workers()) == 1
-    assert len(tr.steps())
-    assert len(tr.tensor_names(collection="weights"))
-    assert len(tr.tensor_names(collection="weights"))
-    assert len(tr.tensor(tr.tensor_names(collection="weights")[0]).workers(0)) == 1
-    assert len(tr.tensor_names(collection="biases"))
-    assert len(tr.tensor(tr.tensor_names(collection="biases")[0]).workers(0)) == 1
-    assert len(tr.tensor_names(collection="gradients"))
-
-
-@pytest.mark.slow
-def test_save_all_workers(out_dir, zcc=False):
-    # Skip if no GPUS
-    if get_available_gpus() == 0:
-        return
-    strategy = train_model(
-        out_dir,
-        include_collections=None,
-        save_all=True,
-        save_config=SaveConfig(save_steps=[5]),
-        steps=["train"],
-        include_workers="all",
-    )
-    tr = create_trial_fast_refresh(out_dir)
-    assert len(tr.workers()) == get_available_gpus()
-    assert len(tr.tensor_names(collection="weights"))
-    assert (
-        len(tr.tensor(tr.tensor_names(collection="weights")[0]).workers(0))
-        == strategy.num_replicas_in_sync
-    )
-
-    assert "conv2d/weights/conv2d/kernel:0" in tr.tensor_names(collection="weights")
-    assert (
-        len(tr.tensor("conv2d/weights/conv2d/kernel:0").workers(0)) == strategy.num_replicas_in_sync
-    )
-
-    assert len(tr.tensor_names(collection="biases"))
-    assert "conv2d/weights/conv2d/bias:0" in tr.tensor_names(collection="biases")
-    assert (
-        len(tr.tensor(tr.tensor_names(collection="biases")[0]).workers(0))
-        == strategy.num_replicas_in_sync
-    )
-    assert len(tr.tensor_names(collection="gradients"))
-
-
-@pytest.mark.slow
-def test_base_reductions(out_dir):
-    train_model(
-        out_dir,
-        include_collections=[
-            CollectionKeys.WEIGHTS,
-            CollectionKeys.BIASES,
-            CollectionKeys.METRICS,
-            CollectionKeys.LOSSES,
-        ],
-        reduction_config=ReductionConfig(norms=ALLOWED_NORMS, reductions=ALLOWED_REDUCTIONS),
-        steps=["train"],
-    )
-
-    tr = create_trial_fast_refresh(out_dir)
-    weight_name = tr.tensor_names(collection=CollectionKeys.WEIGHTS)[0]
-
-    try:
-        tr.tensor(weight_name).value(0)
-        assert False
-    except TensorUnavailableForStep:
-        assert tr.tensor(weight_name).reduction_values(0)
-
-    loss_name = tr.tensor_names(collection=CollectionKeys.LOSSES)[0]
-    assert tr.tensor(loss_name).value(0) is not None
-
-    metric_name = tr.tensor_names(collection=CollectionKeys.METRICS)[0]
-    assert tr.tensor(metric_name).value(0) is not None
-
-
-@pytest.mark.slow
-def test_collection_reductions(out_dir):
-    tf.reset_default_graph()
-    tf.keras.backend.clear_session()
-    hook = KerasHook(
-        out_dir=out_dir,
-        save_config=SaveConfig(save_interval=3),
-        include_collections=[
-            CollectionKeys.WEIGHTS,
-            CollectionKeys.BIASES,
-            CollectionKeys.GRADIENTS,
-        ],
-    )
-    hook.get_collection(CollectionKeys.GRADIENTS).reduction_config = ReductionConfig(norms=["l1"])
-    train_model(out_dir, hook=hook, steps=["train"])
-
-    tr = create_trial_fast_refresh(out_dir)
-    weight_name = tr.tensor_names(collection=CollectionKeys.WEIGHTS)[0]
-    grad_name = tr.tensor_names(collection=CollectionKeys.GRADIENTS)[0]
-
-    try:
-        tr.tensor(weight_name).value(0)
-        tr.tensor(grad_name).value(0)
-        assert False
-    except TensorUnavailableForStep:
-        try:
-            assert tr.tensor(weight_name).reduction_value(0, "l1") is not None
-        except ValueError:
-            # some tensors reduction can't be computed
-            pass
-    except TensorUnavailable:
-        # sometimes we might not have tensor saved if it was only being
-        # saved as reduction and the reduction computation failed
-        pass
-
-
-@pytest.mark.slow
-def test_training_end(out_dir):
-    train_model(out_dir, include_collections=[CollectionKeys.OUTPUTS], steps=["train"])
-    assert has_training_ended(out_dir) is True
-
-
-@pytest.mark.slow
-def test_collection_add(out_dir):
-    strategy = train_model(
-        out_dir,
-        include_collections=["relu"],
-        save_config=SaveConfig(save_interval=9),
-        create_relu_collection=True,
-        steps=["train"],
-    )
-
-    tr = create_trial_fast_refresh(out_dir)
-    relu_coll_tensor_names = tr.tensor_names(collection="relu")
-
-    assert len(relu_coll_tensor_names) == strategy.num_replicas_in_sync * 2
-    assert tr.tensor(relu_coll_tensor_names[0]).value(0) is not None
-    assert tr.tensor(relu_coll_tensor_names[1]).value(0) is not None
-
-
-@pytest.mark.slow
-def test_include_regex(out_dir):
-    hook = KerasHook(
-        out_dir=out_dir,
-        save_config=SaveConfig(save_interval=9),
-        include_collections=["custom_coll"],
-        include_workers="all",
-    )
-    hook.get_collection("custom_coll").include("dense")
-    strategy = train_model(out_dir, hook=hook, steps=["train"])
-
-    tr = create_trial_fast_refresh(out_dir)
-    tnames = tr.tensor_names(collection="custom_coll")
-
-    assert len(tnames) == 4 + 3 * strategy.num_replicas_in_sync
-    for tname in tnames:
-        assert tr.tensor(tname).value(0) is not None
-
-
-@pytest.mark.slow
-def test_clash_with_tb_callback(out_dir):
-    train_model(
-        out_dir,
-        save_config=SaveConfig(save_interval=9),
-        include_collections=[
-            CollectionKeys.WEIGHTS,
-            CollectionKeys.BIASES,
-            CollectionKeys.GRADIENTS,
-            CollectionKeys.LOSSES,
-            CollectionKeys.METRICS,
-        ],
-        steps=["train"],
-        add_callbacks=["tensorboard"],
-    )
-    tr = create_trial_fast_refresh(out_dir)
-    assert len(tr.tensor_names()) == 16
-
-
-@pytest.mark.slow
-def test_clash_with_custom_callback(out_dir):
-    strategy = train_model(
-        out_dir,
-        include_collections=[
-            CollectionKeys.WEIGHTS,
-            CollectionKeys.BIASES,
-            CollectionKeys.OUTPUTS,
-            CollectionKeys.GRADIENTS,
-        ],
-        save_config=SaveConfig(save_interval=9),
-        steps=["train"],
-        add_callbacks=["fetch_tensor"],
-    )
-    tr = create_trial_fast_refresh(out_dir)
-    assert len(tr.tensor_names()) == 6 + 6 + strategy.num_replicas_in_sync * 1 + 3
-
-
-def test_one_device(out_dir):
-    strategy = train_model(
-        out_dir,
-        include_collections=[
-            CollectionKeys.WEIGHTS,
-            CollectionKeys.BIASES,
-            CollectionKeys.OUTPUTS,
-            CollectionKeys.GRADIENTS,
-        ],
-        save_config=SaveConfig(save_interval=9),
-        strategy=tf.distribute.OneDeviceStrategy(device="/cpu:0"),
-        steps=["train"],
-    )
-    assert os.path.isdir(os.path.join(out_dir, "events")) is False
diff --git a/tests/zero_code_change/tests/tensorflow/utils.py b/tests/zero_code_change/tests/tensorflow/utils.py
deleted file mode 100644
index e8e3d8af6..000000000
--- a/tests/zero_code_change/tests/tensorflow/utils.py
+++ /dev/null
@@ -1,8 +0,0 @@
-# First Party
-from smdebug.trials import create_trial
-
-
-def create_trial_fast_refresh(path, **kwargs):
-    tr = create_trial(path, **kwargs)
-    tr.training_end_delay_refresh = 0.01
-    return tr