Function to test if the hook has been configured with the default hook config #332
Changes from all commits
In the TF base hook, a new constant is imported and a new public check is added after `export_collections`:

```diff
@@ -20,6 +20,7 @@
 # Local
 from .collection import CollectionKeys, CollectionManager
+from .constants import TF_DEFAULT_SAVED_COLLECTIONS
 from .singleton_utils import set_hook
 from .utils import (
     TFDistributionStrategy,
@@ -217,6 +218,14 @@ def export_collections(self):
         collection_file_name = f"{self.worker}_collections.json"
         self.collection_manager.export(self.out_dir, collection_file_name)

+    def has_default_hook_configuration(self):
+        # Used in AWS TF to determine if the hook
+        # is using the default hook configuration
+        collections_being_saved = [x.name for x in self._collections_to_save]
+        if set(collections_being_saved) == set(TF_DEFAULT_SAVED_COLLECTIONS):
+            return True
+        return False
+
     def _get_custom_and_default_collections(self) -> Tuple[Set["Collection"], Set["Collection"]]:
         if self._custom_collections is None:
             self._custom_collections = set()
```

**Review comment** (on `has_default_hook_configuration`): why is this function repeated again here?

**Reply:** I have overridden the fn in the core hook to utilize a constant defined for TF.
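For illustration, a minimal sketch of how a framework integration might consult this check. Treating `get_hook()` as the accessor (mirroring the `set_hook` import in the diff above) is an assumption, not part of this PR:

```python
import smdebug.tensorflow as smd

# Hypothetical caller: branch on whether the user customized the hook.
hook = smd.get_hook()
if hook is not None and hook.has_default_hook_configuration():
    # Only the default collections ("losses", "metrics", "sm_metrics")
    # are being saved, so the framework can keep its default code path.
    pass
```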
In the constants module (the source of the `.constants` import above), the default list is defined:

```diff
@@ -1,3 +1,5 @@
 SMDEBUG_GRADIENTS_KEY = "smdebug_gradients"
 SMDEBUG_LAYER_OUTPUTS_KEY = "smdebug_layer_outputs"
 SMDEBUG_PREFIX = "smdebug_"
+
+TF_DEFAULT_SAVED_COLLECTIONS = ["losses", "metrics", "sm_metrics"]
```
In the Keras hook module, a new version helper is imported:

```diff
@@ -30,6 +30,7 @@
     get_model_input_export_name,
     get_model_output_export_name,
     is_keras_optimizer,
+    is_tf_version_2_3_x,
     is_tf_version_2x,
 )
```
```diff
@@ -71,6 +72,14 @@ def __init__(
         )  # stores custom tensors saved by users every step
         self.saved_layers = dict()
         self.has_registered_model = False
+        # supports_tf_logs property was introduced in TF 2.3.0;
+        # it indicates to the framework that the callback is not
+        # limited to reading only numpy logs
+        self._supports_tf_logs = True
+
+        # TF 2.3.0 has a callback ordering bug;
+        # this flag indicates to the train_batch_begin callback
+        # that the step was already incremented in the on_train_begin callback
+        self.step_incremented_in_on_train_begin = False

     def _is_not_supported(self):
         if self.distribution_strategy is None:
```

**Review comment** (on `self._supports_tf_logs`): add a comment.

**Review comment** (on `self.step_incremented_in_on_train_begin`): comment about what this is.
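As background on the first flag, a minimal standalone sketch (not part of this PR) of what `_supports_tf_logs` opts a callback into: from TF 2.3.0 onward, Keras converts batch logs to numpy only for callbacks that do not set this attribute, so setting it avoids the conversion:

```python
import tensorflow as tf

class TensorLogsCallback(tf.keras.callbacks.Callback):
    def __init__(self):
        super().__init__()
        # Opt in to receiving raw tf.Tensor logs (TF >= 2.3.0).
        self._supports_tf_logs = True

    def on_train_batch_end(self, batch, logs=None):
        # Values may now be tf.Tensor objects rather than numpy arrays.
        print({key: type(value).__name__ for key, value in (logs or {}).items()})
```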
```diff
@@ -109,7 +118,8 @@ def register_model(self, model):
         # It attaches a hook to every layer of the model to capture
         # layer values
         self.model = model
-        self._wrap_model_with_input_output_saver()
+        if self.tape is not None:
+            self._wrap_model_with_input_output_saver()
         self.has_registered_model = True

     def _get_matching_collections(
```

**Review comment:** why this change?

**Reply:** The wrap model isn't needed with the new changes on TF to capture layer inputs and outputs when we use …
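`self.tape` is only non-None on the GradientTape path; a rough sketch of that path, assuming the documented `wrap_tape` entry point and using `model`, `loss_fn`, `x`, and `y` as placeholder names:

```python
import tensorflow as tf
import smdebug.tensorflow as smd

hook = smd.KerasHook(out_dir="/tmp/smdebug_run")  # out_dir is illustrative

def train_step(model, loss_fn, x, y):
    # Wrapping the tape is what sets the hook's tape attribute,
    # the condition register_model now checks before wrapping the model.
    with hook.wrap_tape(tf.GradientTape()) as tape:
        loss = loss_fn(y, model(x, training=True))
    return tape.gradient(loss, model.trainable_variables)
```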
```diff
@@ -348,7 +358,10 @@ def _prepare_tensors_available_post_step(self):

         # Add tensor to custom collections
         for custom_coll in custom_collections:
-            if match_inc(tensor_ref.name, custom_coll.include_regex):
+            if (
+                match_inc(tensor_ref.name, custom_coll.include_regex)
+                and tensor_ref.tf_obj is not None
+            ):
                 custom_coll.add_for_mode(tensor_ref.tf_obj, self.mode)
                 if custom_coll not in self.tensor_to_collections[tensor_ref.name]:
                     self.tensor_to_collections[tensor_ref.name].add(custom_coll)
```

**Review comment:** why this check? was there an issue without it?

**Reply:** tensor_ref.tf_obj is None for certain tensors (kernelGrad) and ends up throwing a warning.
```diff
@@ -390,6 +403,12 @@ def _save_custom_tensors_post_step(self):
             self._save_tensor_to_file(tensor_name, tensor_value, collection_names)
         self.custom_tensors_to_save.clear()

+    def should_save_layer(self, layer_name):
+        # Called in AWS TF to determine
+        # if a particular layer value
+        # should be saved
+        return self.should_save_tensor_or_collection(layer_name, CollectionKeys.LAYERS)
+
     def _save_tensor_to_file(self, tensor_name, tensor_value, collections):
         if isinstance(collections, set) is False:
             collections = {collections}
```
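A hedged sketch of the intended call site: the framework side can ask the hook whether a layer's value would be kept before paying the cost of capturing it ("dense_1" is a placeholder layer name, and the guard itself is an assumption about the AWS TF caller):

```python
import smdebug.tensorflow as smd

hook = smd.get_hook()
# Hypothetical framework-side guard around layer capture.
if hook is not None and hook.should_save_layer("dense_1"):
    pass  # capture and forward the layer's input/output values
```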
```diff
@@ -418,6 +437,31 @@ def _save_tensor_to_file(self, tensor_name, tensor_value, collections):
             collection.set_tensor_ref(tensor_ref)
         self._save_for_tensor(tensor_name, t, check_before_write=True)

+    def save_gradients_from_logs(self, gradients):
+        if gradients is not None:
+            gradient_collection = self.get_collection(CollectionKeys.GRADIENTS)
+            step_collections = self._get_collections_to_save_for_step()
+            collections_to_write = (
+                {gradient_collection} if gradient_collection in step_collections else set()
+            )
+            if gradients and isinstance(gradients[0], tuple) is False:
+                gradients = zip(self.model.trainable_variables, gradients)
+            for v, g in gradients:
+                if isinstance(v, tf.Tensor):
+                    # Tensor.name is meaningless with eager execution
+                    layer_name = str(v.numpy(), "utf-8")
+                elif isinstance(v, tf.Variable):
+                    layer_name = v.name
+                else:
+                    layer_name = v
+                layer_name = layer_name.split(":")[0]
+                export_name = "gradients/" + layer_name + "Grad"
+                if isinstance(g, IndexedSlices):
+                    # This class is a simple wrapper for a pair of Tensor objects
+                    # See: https://www.tensorflow.org/api_docs/python/tf/IndexedSlices
+                    g = g.values
+                self._save_tensor_to_file(export_name, g, collections_to_write)
+
     def save_smdebug_logs(self, logs):
         if logs is None:
             return
```
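As the body above shows, the function accepts either bare gradients, which it zips against `model.trainable_variables`, or explicit `(variable, gradient)` pairs. A sketch of both call shapes, with `hook`, `model`, `loss_fn`, `x`, and `y` as assumed placeholder names:

```python
import tensorflow as tf

def train_step(hook, model, loss_fn, x, y):
    with tf.GradientTape() as tape:
        loss = loss_fn(y, model(x, training=True))
    grads = tape.gradient(loss, model.trainable_variables)

    # Shape 1: bare list of gradients, zipped internally against
    # model.trainable_variables.
    hook.save_gradients_from_logs(grads)

    # Shape 2: explicit (variable, gradient) tuples.
    hook.save_gradients_from_logs(list(zip(model.trainable_variables, grads)))
```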
```diff
@@ -437,24 +481,10 @@ def save_smdebug_logs(self, logs):
                 )
             # Save Gradients
             elif key == SMDEBUG_GRADIENTS_KEY:
-                gradients = logs[key]
-                if gradients is not None:
-                    for g, v in zip(gradients, self.model.trainable_variables):
-                        layer_name = v.name
-                        if len(layer_name.split(":")) > 1:
-                            layer_name = layer_name.split(":")[0]
-                        export_name = "gradients/" + layer_name + "Grad"
-                        if isinstance(g, IndexedSlices):
-                            # This class is a simple wrapper for a pair of Tensor objects
-                            # See: https://www.tensorflow.org/api_docs/python/tf/IndexedSlices
-                            g = g.values
-                        tensors_to_save.append((export_name, g))
-                    collections_to_write = {self.get_collection(CollectionKeys.GRADIENTS)}
+                self.save_gradients_from_logs(logs[key])
             # Save Intermediate Layers
             elif key == SMDEBUG_LAYER_OUTPUTS_KEY:
-                layer_outputs = logs[key]
-                self.save_layer_outputs(layer_outputs)
-                self.save_layer_inputs(logs[ModelInput.INPUTS], layer_outputs)
+                self._save_layer_values(logs[key])
             # Save Model Inputs
             elif key in ModelInputs:
                 export_name = get_model_input_export_name()
```
```diff
@@ -489,10 +519,9 @@ def _save_metrics(self, batch, logs, force_save=False):
                 self._add_metric(metric_name=key)
                 self._save_for_tensor(key, logs[key], check_before_write=False)

-    def _save_layer_input_and_outputs(self, grad_tape=False):
-        # Iterates over all the saved layers for input and output values
-        if is_tf_version_2x() is False or (grad_tape is False and self.model.run_eagerly is False):
-            # This function only works when run_eagerly is True
+    def _save_layer_input_and_outputs(self):
+        # Run only for GradTape
+        if self.tape is None:
             return
         for layer_name in self.saved_layers:
             # Save Input
```
```diff
@@ -520,7 +549,6 @@ def _save_tensors_post_step(self, batch, logs):
         # weights, metrics
         self._save_metrics(batch, logs)
         self.save_smdebug_logs(logs)
-        self._save_layer_input_and_outputs()
         self._save_custom_tensors_post_step()

         if is_tf_version_2x() and tf.executing_eagerly():
```
```diff
@@ -615,6 +643,13 @@ def _on_any_mode_begin(self, mode):
             self.graph = tf.get_default_graph()
         self.set_mode(mode)

+        if self.prepared_collections is False and is_tf_version_2_3_x():
+            # Addresses ordering issues in TF 2.3.0;
+            # sets prepared_collections to True here
+            self._prepare_collections()
+            self._increment_step()
+            self.step_incremented_in_on_train_begin = True
+
         # have to clear callable cache if we are not caching per mode
         self.callable_cache.change_mode()
```
```diff
@@ -658,7 +693,12 @@ def _on_any_batch_begin(self, batch, mode, logs=None):
         # Write the gradients of the past step if the writer is still available.
         if self.writer is not None or len(self.writer_map):
             self._close_writers()
-        self._increment_step()
+
+        # Addresses callback ordering bug in TF 2.3.0
+        if self.step_incremented_in_on_train_begin is False:
+            self._increment_step()
+        else:
+            self.step_incremented_in_on_train_begin = False

         if self.prepared_collections is False:
             # sets prepared_collections to True here
```
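Taken together with the `_on_any_mode_begin` hunk above, the workaround is a one-shot flag; a minimal standalone sketch of the pattern, with names simplified and detached from the hook class:

```python
class StepCounter:
    """Skip the first batch-begin increment when the step was already
    advanced in on_train_begin (TF 2.3.0 callback ordering bug)."""

    def __init__(self):
        self.step = -1
        self.step_incremented_in_on_train_begin = False

    def on_train_begin(self):
        self.step += 1
        self.step_incremented_in_on_train_begin = True

    def on_train_batch_begin(self):
        if not self.step_incremented_in_on_train_begin:
            self.step += 1
        else:
            # Consume the flag so later batches increment normally.
            self.step_incremented_in_on_train_begin = False
```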
```diff
@@ -668,7 +708,6 @@ def _on_any_batch_begin(self, batch, mode, logs=None):
         if (is_tf_version_2x() and tf.executing_eagerly()) or self._validate_exec_function(
             self._get_exec_function(mode)
         ):
-            self._wrap_model_with_input_output_saver()
             self._prepare_layers(mode)
             self._prepare_tensors_available_post_step()
             self._prepared_tensors[mode] = True
```
```diff
@@ -698,33 +737,23 @@ def on_test_batch_begin(self, batch, logs=None):
     def on_predict_batch_begin(self, batch, logs=None):
         self._on_any_batch_begin(batch, ModeKeys.PREDICT, logs=logs)

-    def _save_layer_values(self, layer_outputs, collection, model=None, inputs=None):
-        if model is None:
-            if self.model:
-                model = self.model
-            else:
-                return
-        if layer_outputs is not None:
-            tensors_to_save = []
-            step_collections = self._get_collections_to_save_for_step()
-            collections_to_write = {collection} if collection in step_collections else set()
-            tensor_suffix = "output"
-            if inputs is not None:
-                layer_outputs = [inputs] + layer_outputs
-                tensor_suffix = "input"
-            for o, l in zip(layer_outputs, model.layers):
-                export_name = get_export_name_for_keras(l.name, tensor_suffix)
-                tensors_to_save.append((export_name, o))
-            for t_name, t_value in tensors_to_save:
-                self._save_tensor_to_file(t_name, t_value, collections_to_write)
-
-    def save_layer_outputs(self, layer_outputs, model=None):
-        self._save_layer_values(layer_outputs, self.get_collection(CollectionKeys.LAYERS), model)
-
-    def save_layer_inputs(self, x, layer_outputs, model=None):
-        self._save_layer_values(
-            layer_outputs, self.get_collection(CollectionKeys.LAYERS), model, inputs=x
-        )
+    def _save_layer_values(self, logs):
+        if logs is None:
+            return
+        step_collections = self._get_collections_to_save_for_step()
+        layer_collection = self.get_collection(CollectionKeys.LAYERS)
+        collections_to_write = {layer_collection} if layer_collection in step_collections else set()
+        for layer_name, layer_input, layer_output in logs:
+            # Cast layer_name to str since it can also be of type bytes
+            # when run with mirrored strategy
+            if len(layer_input) == 1:
+                # Layer inputs are flattened and passed as a list into
+                # the next layer. Unpacking it speeds up the _make_numpy fn.
+                layer_input = layer_input[0]
+            layer_input_tensor_name = get_export_name_for_keras(str(layer_name), "input")
+            self._save_tensor_to_file(layer_input_tensor_name, layer_input, collections_to_write)
+            layer_output_tensor_name = get_export_name_for_keras(str(layer_name), "output")
+            self._save_tensor_to_file(layer_output_tensor_name, layer_output, collections_to_write)

     def _write_optimizer_variables(self):
         optimizer_collections = self.collection_manager.get(CollectionKeys.OPTIMIZER_VARIABLES)
```

**Review comment:** will we be looping through logs even if collections_to_write = set()?

**Review comment:** is this "if" equal to checking …?

**Reply:** We need to loop through logs if collections_to_write = {} because: …

**Review comment:** which collection will _save_tensor_to_file save to if collections_to_write = {}?

**Reply:** We'll be saving to custom collections with a matching regex.
```diff
@@ -951,7 +980,7 @@ def run(*args, **kwargs):
             )

             self._write_optimizer_variables()
-            self._save_layer_input_and_outputs(grad_tape=True)
+            self._save_layer_input_and_outputs()
             if not ((isinstance(loss, tf.Tensor)) and hasattr(loss, "numpy")):
                 return grads
             self._add_metric(metric_name="loss", metric_value=loss)
```

**Review comment:** should_save_tensor_for_step?