diff --git a/tensorboard/backend/event_processing/BUILD b/tensorboard/backend/event_processing/BUILD index cbe3330c89..f7c898f395 100644 --- a/tensorboard/backend/event_processing/BUILD +++ b/tensorboard/backend/event_processing/BUILD @@ -156,6 +156,22 @@ py_test( ], ) +py_test( + name = "event_file_loader_notf_test", + size = "small", + srcs = ["event_file_loader_test.py"], + main = "event_file_loader_test.py", + srcs_version = "PY2AND3", + deps = [ + ":event_file_loader", + "//tensorboard:expect_tensorflow_installed", + "//tensorboard/compat:no_tensorflow", + "//tensorboard/compat/proto:protos_all_py_pb2", + "//tensorboard/summary/writer", + "@org_pythonhosted_six", + ], +) + py_library( name = "event_accumulator", srcs = [ diff --git a/tensorboard/backend/event_processing/event_file_loader.py b/tensorboard/backend/event_processing/event_file_loader.py index 408dc31810..faee398bda 100644 --- a/tensorboard/backend/event_processing/event_file_loader.py +++ b/tensorboard/backend/event_processing/event_file_loader.py @@ -29,12 +29,14 @@ def _make_tf_record_iterator(file_path): """Returns an iterator over TF records for the given tfrecord file.""" - try: - from tensorboard.compat import _pywrap_tensorflow - - py_record_reader_new = _pywrap_tensorflow.PyRecordReader_New - except (ImportError, AttributeError): - py_record_reader_new = None + # If we don't have TF at all, use the stub implementation. + if tf.__version__ == "stub": + # TODO(#1711): Reshape stub implementation to fit tf_record_iterator API + # rather than needlessly emulating the old PyRecordReader_New API. + logger.debug("Opening a stub record reader pointing at %s", file_path) + return _PyRecordReaderIterator( + tf.pywrap_tensorflow.PyRecordReader_New, file_path + ) # If PyRecordReader exists, use it, otherwise use tf_record_iterator(). # Check old first, then new, since tf_record_iterator existed previously but # only gained the semantics we need at the time PyRecordReader was removed. @@ -42,8 +44,14 @@ def _make_tf_record_iterator(file_path): # TODO(#1711): Eventually remove PyRecordReader fallback once we can drop # support for TF 2.1 and prior, and find a non-deprecated replacement for # tf.compat.v1.io.tf_record_iterator. + try: + from tensorflow.python import pywrap_tensorflow + + py_record_reader_new = pywrap_tensorflow.PyRecordReader_New + except (ImportError, AttributeError): + py_record_reader_new = None if py_record_reader_new: - logger.debug("Opening a record reader pointing at %s", file_path) + logger.debug("Opening a PyRecordReader pointing at %s", file_path) return _PyRecordReaderIterator(py_record_reader_new, file_path) else: logger.debug("Opening a tf_record_iterator pointing at %s", file_path) diff --git a/tensorboard/compat/__init__.py b/tensorboard/compat/__init__.py index 78cd610e9c..9cadd3a4ed 100644 --- a/tensorboard/compat/__init__.py +++ b/tensorboard/compat/__init__.py @@ -74,36 +74,3 @@ def tf2(): # As a fallback, try `tensorflow.compat.v2` if it's defined. return tf.compat.v2 raise ImportError("cannot import tensorflow 2.0 API") - - -# TODO(https://github.com/tensorflow/tensorboard/issues/1711): remove this -@_lazy.lazy_load("tensorboard.compat._pywrap_tensorflow") -def _pywrap_tensorflow(): - """Provide pywrap_tensorflow access in TensorBoard. - - pywrap_tensorflow cannot be accessed from tf.python.pywrap_tensorflow - and needs to be imported using - `from tensorflow.python import pywrap_tensorflow`. Therefore, we provide - a separate accessor function for it here. - - NOTE: pywrap_tensorflow is not part of TensorFlow API and this - dependency will go away soon. - - Returns: - pywrap_tensorflow import, if available. - - Raises: - ImportError: if we couldn't import pywrap_tensorflow. - """ - try: - from tensorboard.compat import notf - except ImportError: - try: - from tensorflow.python import pywrap_tensorflow - - return pywrap_tensorflow - except ImportError: - pass - from tensorboard.compat.tensorflow_stub import pywrap_tensorflow - - return pywrap_tensorflow diff --git a/tensorboard/compat/tensorflow_stub/pywrap_tensorflow.py b/tensorboard/compat/tensorflow_stub/pywrap_tensorflow.py index 649ac598eb..31ee4f49ae 100644 --- a/tensorboard/compat/tensorflow_stub/pywrap_tensorflow.py +++ b/tensorboard/compat/tensorflow_stub/pywrap_tensorflow.py @@ -197,19 +197,29 @@ def __init__( self.status = status self.curr_event = None self.file_handle = gfile.GFile(self.filename, "rb") + # Maintain a buffer of partially read records, so we can recover from + # truncated records upon a retry. + self._buffer = b"" + self._buffer_pos = 0 def GetNext(self): + # Each new read should start at the beginning of any partial record. + self._buffer_pos = 0 # Read the header self.curr_event = None - header_str = self.file_handle.read(8) - if len(header_str) != 8: + header_str = self._read(8) + if not header_str: # Hit EOF so raise and exit raise errors.OutOfRangeError(None, None, "No more events to read") + if len(header_str) < 8: + raise self._truncation_error("header") header = struct.unpack("Q", header_str) # Read the crc32, which is 4 bytes, and check it against # the crc32 of the header - crc_header_str = self.file_handle.read(4) + crc_header_str = self._read(4) + if len(crc_header_str) < 4: + raise self._truncation_error("header crc") crc_header = struct.unpack("I", crc_header_str) header_crc_calc = masked_crc32c(header_str) if header_crc_calc != crc_header[0]: @@ -220,25 +230,59 @@ def GetNext(self): # The length of the header tells us how many bytes the Event # string takes header_len = int(header[0]) - event_str = self.file_handle.read(header_len) + event_str = self._read(header_len) + if len(event_str) < header_len: + raise self._truncation_error("data") event_crc_calc = masked_crc32c(event_str) # The next 4 bytes contain the crc32 of the Event string, - # which we check for integrity. Sometimes, the last Event - # has no crc32, in which case we skip. - crc_event_str = self.file_handle.read(4) - if crc_event_str: - crc_event = struct.unpack("I", crc_event_str) - if event_crc_calc != crc_event[0]: - raise errors.DataLossError( - None, - None, - "{} failed event crc32 check".format(self.filename), - ) + # which we check for integrity. + crc_event_str = self._read(4) + if len(crc_event_str) < 4: + raise self._truncation_error("data crc") + crc_event = struct.unpack("I", crc_event_str) + if event_crc_calc != crc_event[0]: + raise errors.DataLossError( + None, None, "{} failed event crc32 check".format(self.filename), + ) # Set the current event to be read later by record() call self.curr_event = event_str + # Clear the buffered partial record since we're done reading it. + self._buffer = b"" + + def _read(self, n): + """Read up to n bytes from the underlying file, with buffering. + + Reads are satisfied from a buffer of previous data read starting at + `self._buffer_pos` until the buffer is exhausted, and then from the + actual underlying file. Any new data is added to the buffer, and + `self._buffer_pos` is advanced to the point in the buffer past all + data returned as part of this read. + + Args: + n: non-negative number of bytes to read + + Returns: + bytestring of data read, up to n bytes + """ + result = self._buffer[self._buffer_pos : self._buffer_pos + n] + self._buffer_pos += len(result) + n -= len(result) + if n > 0: + new_data = self.file_handle.read(n) + result += new_data + self._buffer += new_data + self._buffer_pos += len(new_data) + return result + + def _truncation_error(self, section): + return errors.DataLossError( + None, + None, + "{} has truncated record in {}".format(self.filename, section), + ) def record(self): return self.curr_event diff --git a/tensorboard/plugins/projector/projector_plugin.py b/tensorboard/plugins/projector/projector_plugin.py index a53a538fa4..bbb5b265e7 100644 --- a/tensorboard/plugins/projector/projector_plugin.py +++ b/tensorboard/plugins/projector/projector_plugin.py @@ -34,7 +34,6 @@ from tensorboard.backend.http_util import Respond from tensorboard.compat import tf -from tensorboard.compat import _pywrap_tensorflow from tensorboard.plugins import base_plugin from tensorboard.plugins.projector.projector_config_pb2 import ProjectorConfig from tensorboard.util import tb_logging