diff --git a/smdebug/core/access_layer/file.py b/smdebug/core/access_layer/file.py index 76ded6af1..9a6f6ef91 100644 --- a/smdebug/core/access_layer/file.py +++ b/smdebug/core/access_layer/file.py @@ -3,6 +3,7 @@ import shutil # First Party +from smdebug.core.logger import get_logger from smdebug.core.sagemaker_utils import is_sagemaker_job # Local @@ -40,6 +41,7 @@ def __init__(self, path, mode): super().__init__() self.path = path self.mode = mode + self.logger = get_logger() ensure_dir(path) if mode in WRITE_MODES: self.temp_path = get_temp_path(self.path) @@ -65,6 +67,9 @@ def close(self): self._accessor.close() if self.mode in WRITE_MODES: shutil.move(self.temp_path, self.path) + self.logger.debug( + f"Sagemaker-Debugger: Wrote {os.path.getsize(self.path)} bytes to file {self.path}" + ) def ingest_all(self): self._data = self._accessor.read() diff --git a/smdebug/core/access_layer/s3.py b/smdebug/core/access_layer/s3.py index 33b7aab1d..3e46fc407 100644 --- a/smdebug/core/access_layer/s3.py +++ b/smdebug/core/access_layer/s3.py @@ -1,4 +1,5 @@ # Standard Library +import os import re # Third Party @@ -6,6 +7,7 @@ # First Party from smdebug.core.access_layer.base import TSAccessBase +from smdebug.core.logger import get_logger from smdebug.core.utils import get_region @@ -20,6 +22,7 @@ def __init__( self.binary = binary self._init_data() self.flushed = False + self.logger = get_logger() self.current_len = 0 self.s3 = boto3.resource("s3", region_name=get_region()) @@ -56,6 +59,10 @@ def close(self): return key = self.s3.Object(self.bucket_name, self.key_name) key.put(Body=self.data) + self.logger.debug( + f"Sagemaker-Debugger: Wrote {len(self.data)} bytes to file " + f"s3://{os.path.join(self.bucket_name, self.key_name)}" + ) self._init_data() self.flushed = True diff --git a/smdebug/core/index_reader.py b/smdebug/core/index_reader.py index ac5014a06..d18ff7df7 100644 --- a/smdebug/core/index_reader.py +++ b/smdebug/core/index_reader.py @@ -27,8 +27,6 @@ ) from smdebug.exceptions import IndexReaderException, TensorUnavailableForStep -logger = get_logger() - class ReadIndexFilesCache: """ @@ -105,6 +103,7 @@ def __init__(self, path): "TORNASOLE_EVENT_FILE_RETRY_LIMIT", DEFAULT_EVENT_FILE_RETRY_LIMIT ) self.path = path + self.logger = get_logger() @abstractmethod def fetch_tensor_value(self, tensor_location: TensorLocation): @@ -136,7 +135,7 @@ def event_file_present_loop(self, tensor_location: TensorLocation): step=tensor_location.mode_step, ) elif has_training_ended(self.path) is True: - logger.warn( + self.logger.warn( f"IndexReader: Training Has Ended" f"\nIndexReader: {event_file_name} was written but not found." ) @@ -149,7 +148,7 @@ def event_file_present_loop(self, tensor_location: TensorLocation): num_retry += 1 time.sleep(2) if num_retry >= self.event_file_retry_limit: - logger.warn( + self.logger.warn( f"IndexReader: {event_file_name} was written but not found. After {num_retry} retries." ) raise TensorUnavailableForStep( @@ -168,7 +167,7 @@ def _has_event_file_been_skipped(self, missing_event_file_name: str) -> bool: :param missing_event_file_name: :return: """ - logger.info(f" Index Reader: Event File {missing_event_file_name} not found.") + self.logger.info(f" Index Reader: Event File {missing_event_file_name} not found.") missing_worker = parse_worker_name_from_file(missing_event_file_name) missing_step = IndexFileLocationUtils.parse_step_from_index_file_name( missing_event_file_name @@ -183,11 +182,11 @@ def _has_event_file_been_skipped(self, missing_event_file_name: str) -> bool: we perform the list operation. """ return False - logger.warn( + self.logger.warn( f" Index Reader: Event File {missing_event_file_name} was written but not found " f"\nHowever Event File {event_file} found." ) - logger.warn(f"IndexReader: Skipping {missing_event_file_name} ") + self.logger.warn(f"IndexReader: Skipping {missing_event_file_name} ") return True return False @@ -218,7 +217,10 @@ def _update_tensors_from_json( ... } """ - index_dict = json.loads(response) + try: + index_dict = json.loads(response) + except ValueError: + raise IndexReaderException("Empty/Corrupt Index File") IndexReader._validate(index_dict) index_meta = index_dict["meta"] mode = index_meta["mode"] @@ -307,7 +309,7 @@ def read_index_files( steps = [] workers = [] index_files, start_after_key = self.list_index_files(start_after_key) - logger.debug(f'Loaded Index Files: {",".join(index_files)}') + self.logger.debug(f'Loaded Index Files: {",".join(index_files)}') for index_file in index_files: if self.index_file_cache.has_not_read(index_file): step = IndexFileLocationUtils.parse_step_from_index_file_name(index_file) @@ -321,7 +323,7 @@ def read_index_files( ) self.index_file_cache.add(index_file, start_after_key) - responses = S3Handler().get_objects(object_requests) + responses = self.s3_handler.get_objects(object_requests) return responses, steps, start_after_key, workers def list_index_files(self, start_after_key=None): @@ -419,6 +421,9 @@ def read_index_files( ) or range_steps is None: steps.append(step) workers.append(parse_worker_name_from_file(index_file)) + self.logger.debug( + f"Sagemaker-Debugger: Read {os.path.getsize(index_file)} bytes from file {index_file}" + ) with open(index_file) as f: responses.append(f.read().encode()) self.index_file_cache.add(index_file, start_after_key) diff --git a/smdebug/exceptions.py b/smdebug/exceptions.py index 1ea17658c..bf05f680d 100644 --- a/smdebug/exceptions.py +++ b/smdebug/exceptions.py @@ -56,10 +56,7 @@ def __init__(self, tname): self.tname = tname def __str__(self): - return ( - "Tensor {} can not be satisfied. Tornasole does " - "not know about this tensor.".format(self.tname) - ) + return "Tensor {} was not saved.".format(self.tname) class InvalidWorker(Exception):