Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions smdebug/core/access_layer/file.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import shutil

# First Party
from smdebug.core.logger import get_logger
from smdebug.core.sagemaker_utils import is_sagemaker_job

# Local
Expand Down Expand Up @@ -40,6 +41,7 @@ def __init__(self, path, mode):
super().__init__()
self.path = path
self.mode = mode
self.logger = get_logger()
ensure_dir(path)
if mode in WRITE_MODES:
self.temp_path = get_temp_path(self.path)
Expand All @@ -65,6 +67,9 @@ def close(self):
self._accessor.close()
if self.mode in WRITE_MODES:
shutil.move(self.temp_path, self.path)
self.logger.debug(
f"Sagemaker-Debugger: Wrote {os.path.getsize(self.path)} bytes to file {self.path}"
)

def ingest_all(self):
self._data = self._accessor.read()
Expand Down
7 changes: 7 additions & 0 deletions smdebug/core/access_layer/s3.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
# Standard Library
import os
import re

# Third Party
import boto3

# First Party
from smdebug.core.access_layer.base import TSAccessBase
from smdebug.core.logger import get_logger
from smdebug.core.utils import get_region


Expand All @@ -20,6 +22,7 @@ def __init__(
self.binary = binary
self._init_data()
self.flushed = False
self.logger = get_logger()

self.current_len = 0
self.s3 = boto3.resource("s3", region_name=get_region())
Expand Down Expand Up @@ -56,6 +59,10 @@ def close(self):
return
key = self.s3.Object(self.bucket_name, self.key_name)
key.put(Body=self.data)
self.logger.debug(
f"Sagemaker-Debugger: Wrote {len(self.data)} bytes to file "
f"s3://{os.path.join(self.bucket_name, self.key_name)}"
)
self._init_data()
self.flushed = True

Expand Down
25 changes: 15 additions & 10 deletions smdebug/core/index_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,6 @@
)
from smdebug.exceptions import IndexReaderException, TensorUnavailableForStep

logger = get_logger()


class ReadIndexFilesCache:
"""
Expand Down Expand Up @@ -105,6 +103,7 @@ def __init__(self, path):
"TORNASOLE_EVENT_FILE_RETRY_LIMIT", DEFAULT_EVENT_FILE_RETRY_LIMIT
)
self.path = path
self.logger = get_logger()

@abstractmethod
def fetch_tensor_value(self, tensor_location: TensorLocation):
Expand Down Expand Up @@ -136,7 +135,7 @@ def event_file_present_loop(self, tensor_location: TensorLocation):
step=tensor_location.mode_step,
)
elif has_training_ended(self.path) is True:
logger.warn(
self.logger.warn(
f"IndexReader: Training Has Ended"
f"\nIndexReader: {event_file_name} was written but not found."
)
Expand All @@ -149,7 +148,7 @@ def event_file_present_loop(self, tensor_location: TensorLocation):
num_retry += 1
time.sleep(2)
if num_retry >= self.event_file_retry_limit:
logger.warn(
self.logger.warn(
f"IndexReader: {event_file_name} was written but not found. After {num_retry} retries."
)
raise TensorUnavailableForStep(
Expand All @@ -168,7 +167,7 @@ def _has_event_file_been_skipped(self, missing_event_file_name: str) -> bool:
:param missing_event_file_name:
:return:
"""
logger.info(f" Index Reader: Event File {missing_event_file_name} not found.")
self.logger.info(f" Index Reader: Event File {missing_event_file_name} not found.")
missing_worker = parse_worker_name_from_file(missing_event_file_name)
missing_step = IndexFileLocationUtils.parse_step_from_index_file_name(
missing_event_file_name
Expand All @@ -183,11 +182,11 @@ def _has_event_file_been_skipped(self, missing_event_file_name: str) -> bool:
we perform the list operation.
"""
return False
logger.warn(
self.logger.warn(
f" Index Reader: Event File {missing_event_file_name} was written but not found "
f"\nHowever Event File {event_file} found."
)
logger.warn(f"IndexReader: Skipping {missing_event_file_name} ")
self.logger.warn(f"IndexReader: Skipping {missing_event_file_name} ")
return True
return False

Expand Down Expand Up @@ -218,7 +217,10 @@ def _update_tensors_from_json(
...
}
"""
index_dict = json.loads(response)
try:
index_dict = json.loads(response)
except ValueError:
raise IndexReaderException("Empty/Corrupt Index File")
IndexReader._validate(index_dict)
index_meta = index_dict["meta"]
mode = index_meta["mode"]
Expand Down Expand Up @@ -307,7 +309,7 @@ def read_index_files(
steps = []
workers = []
index_files, start_after_key = self.list_index_files(start_after_key)
logger.debug(f'Loaded Index Files: {",".join(index_files)}')
self.logger.debug(f'Loaded Index Files: {",".join(index_files)}')
for index_file in index_files:
if self.index_file_cache.has_not_read(index_file):
step = IndexFileLocationUtils.parse_step_from_index_file_name(index_file)
Expand All @@ -321,7 +323,7 @@ def read_index_files(
)
self.index_file_cache.add(index_file, start_after_key)

responses = S3Handler().get_objects(object_requests)
responses = self.s3_handler.get_objects(object_requests)
return responses, steps, start_after_key, workers

def list_index_files(self, start_after_key=None):
Expand Down Expand Up @@ -419,6 +421,9 @@ def read_index_files(
) or range_steps is None:
steps.append(step)
workers.append(parse_worker_name_from_file(index_file))
self.logger.debug(
f"Sagemaker-Debugger: Read {os.path.getsize(index_file)} bytes from file {index_file}"
)
with open(index_file) as f:
responses.append(f.read().encode())
self.index_file_cache.add(index_file, start_after_key)
Expand Down
5 changes: 1 addition & 4 deletions smdebug/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,7 @@ def __init__(self, tname):
self.tname = tname

def __str__(self):
return (
"Tensor {} can not be satisfied. Tornasole does "
"not know about this tensor.".format(self.tname)
)
return "Tensor {} was not saved.".format(self.tname)


class InvalidWorker(Exception):
Expand Down