16 changes: 16 additions & 0 deletions tensorboard/backend/event_processing/BUILD
@@ -156,6 +156,22 @@ py_test(
    ],
)

py_test(
    name = "event_file_loader_notf_test",
    size = "small",
    srcs = ["event_file_loader_test.py"],
    main = "event_file_loader_test.py",
    srcs_version = "PY2AND3",
    deps = [
        ":event_file_loader",
        "//tensorboard:expect_tensorflow_installed",
        "//tensorboard/compat:no_tensorflow",
        "//tensorboard/compat/proto:protos_all_py_pb2",
        "//tensorboard/summary/writer",
        "@org_pythonhosted_six",
    ],
)

py_library(
    name = "event_accumulator",
    srcs = [
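
Note on the new notf test variant above: the `//tensorboard/compat:no_tensorflow` dependency works by making a `notf` marker module importable, which makes the compat layer resolve `tf` to the bundled TensorFlow stub instead of real TensorFlow, so this test exercises the stub record reader. A rough sketch of that selection logic, paraphrased from the fallback pattern visible in tensorboard/compat/__init__.py further down; the accessor name `_resolve_tf` is illustrative, not the real one:

# Illustrative sketch: how the compat layer picks which "tf" to expose.
# The accessor name _resolve_tf is made up for this example; the real
# accessor in tensorboard/compat/__init__.py is a lazily loaded `tf`.
def _resolve_tf():
    try:
        # The //tensorboard/compat:no_tensorflow dep provides this marker
        # module; when it is importable, real TensorFlow is skipped.
        from tensorboard.compat import notf  # noqa: F401
    except ImportError:
        try:
            import tensorflow

            return tensorflow
        except ImportError:
            pass
    from tensorboard.compat import tensorflow_stub

    return tensorflow_stub
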
22 changes: 15 additions & 7 deletions tensorboard/backend/event_processing/event_file_loader.py
@@ -29,21 +29,29 @@

def _make_tf_record_iterator(file_path):
    """Returns an iterator over TF records for the given tfrecord file."""
    try:
        from tensorboard.compat import _pywrap_tensorflow

        py_record_reader_new = _pywrap_tensorflow.PyRecordReader_New
    except (ImportError, AttributeError):
        py_record_reader_new = None
    # If we don't have TF at all, use the stub implementation.
    if tf.__version__ == "stub":
        # TODO(#1711): Reshape stub implementation to fit tf_record_iterator API
        # rather than needlessly emulating the old PyRecordReader_New API.
        logger.debug("Opening a stub record reader pointing at %s", file_path)
        return _PyRecordReaderIterator(
            tf.pywrap_tensorflow.PyRecordReader_New, file_path
        )
    # If PyRecordReader exists, use it, otherwise use tf_record_iterator().
    # Check old first, then new, since tf_record_iterator existed previously but
    # only gained the semantics we need at the time PyRecordReader was removed.
    #
    # TODO(#1711): Eventually remove PyRecordReader fallback once we can drop
    # support for TF 2.1 and prior, and find a non-deprecated replacement for
    # tf.compat.v1.io.tf_record_iterator.
    try:
        from tensorflow.python import pywrap_tensorflow

        py_record_reader_new = pywrap_tensorflow.PyRecordReader_New
    except (ImportError, AttributeError):
        py_record_reader_new = None
    if py_record_reader_new:
        logger.debug("Opening a record reader pointing at %s", file_path)
        logger.debug("Opening a PyRecordReader pointing at %s", file_path)
        return _PyRecordReaderIterator(py_record_reader_new, file_path)
    else:
        logger.debug("Opening a tf_record_iterator pointing at %s", file_path)
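
Aside on the fallback path shown above: when neither the stub nor PyRecordReader applies, the loader relies on tf.compat.v1.io.tf_record_iterator, which yields each TFRecord in the file as a raw bytestring. A hedged sketch of how such records become Event protos; the helper name load_events is hypothetical, and the real EventFileLoader adds retry and reload handling on top of this:

# Illustrative sketch only, assuming real TensorFlow is installed.
import tensorflow as tf

from tensorboard.compat.proto import event_pb2


def load_events(event_file_path):
    # tf.compat.v1.io.tf_record_iterator yields one serialized record (bytes)
    # per iteration; each record in an event file is a serialized Event proto.
    for record in tf.compat.v1.io.tf_record_iterator(event_file_path):
        yield event_pb2.Event.FromString(record)
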
33 changes: 0 additions & 33 deletions tensorboard/compat/__init__.py
@@ -74,36 +74,3 @@ def tf2():
        # As a fallback, try `tensorflow.compat.v2` if it's defined.
        return tf.compat.v2
    raise ImportError("cannot import tensorflow 2.0 API")


# TODO(https://github.com/tensorflow/tensorboard/issues/1711): remove this
@_lazy.lazy_load("tensorboard.compat._pywrap_tensorflow")
def _pywrap_tensorflow():
    """Provide pywrap_tensorflow access in TensorBoard.

    pywrap_tensorflow cannot be accessed from tf.python.pywrap_tensorflow
    and needs to be imported using
    `from tensorflow.python import pywrap_tensorflow`. Therefore, we provide
    a separate accessor function for it here.

    NOTE: pywrap_tensorflow is not part of TensorFlow API and this
    dependency will go away soon.

    Returns:
      pywrap_tensorflow import, if available.

    Raises:
      ImportError: if we couldn't import pywrap_tensorflow.
    """
    try:
        from tensorboard.compat import notf
    except ImportError:
        try:
            from tensorflow.python import pywrap_tensorflow

            return pywrap_tensorflow
        except ImportError:
            pass
    from tensorboard.compat.tensorflow_stub import pywrap_tensorflow

    return pywrap_tensorflow
74 changes: 59 additions & 15 deletions tensorboard/compat/tensorflow_stub/pywrap_tensorflow.py
@@ -197,19 +197,29 @@ def __init__(
        self.status = status
        self.curr_event = None
        self.file_handle = gfile.GFile(self.filename, "rb")
        # Maintain a buffer of partially read records, so we can recover from
        # truncated records upon a retry.
        self._buffer = b""
        self._buffer_pos = 0

    def GetNext(self):
        # Each new read should start at the beginning of any partial record.
        self._buffer_pos = 0
        # Read the header
        self.curr_event = None
        header_str = self.file_handle.read(8)
        if len(header_str) != 8:
        header_str = self._read(8)
        if not header_str:
            # Hit EOF so raise and exit
            raise errors.OutOfRangeError(None, None, "No more events to read")
        if len(header_str) < 8:
            raise self._truncation_error("header")
Comment on lines +214 to +215

Contributor:

AFAIK read() is allowed to return fewer bytes than were requested, even if those are available in the stream. (For instance, this can occur when a signal handler is entered during the read.) From Python's read() docs, it's not clear to me whether a short result is guaranteed to imply that EOF is imminent. In practice, it looks like CPython 3.7 and 3.9 both do exhaust the full request*. Do you think it's worth retrying the read until it is done or returns b"", or is that not necessary?

* Test program:

Source (tested on CPython 3.7.5rc1 and `667b91a5e2` on Debian):
import os
import signal
import threading
import time


def handle_sigusr1(signalnum, frame):
    print("Got SIGUSR1 (%d)" % signalnum)


def send_delayed_sigusr1(delay):
    time.sleep(delay)
    os.kill(os.getpid(), signal.SIGUSR1)


signal.signal(signal.SIGUSR1, handle_sigusr1)
threading.Thread(target=send_delayed_sigusr1, args=(0.1,)).start()
with open("/dev/zero", "rb") as infile:
    zeros = infile.read(int(1e10))
print(len(zeros))  # seems to always give 1e10 in my testing

Contributor (Author):

Thanks for bringing this up and testing it; I hadn't thought about that. That said, it seems fine to me not to retry the read immediately, since (A) practically speaking, with this change the higher-level code will retry on the next reload cycle anyway, and (B) under the circumstances you describe, it's not obvious to me that read() returning zero bytes would actually be a stronger indication of EOF (unless the spec is that it always reads at least 1 byte if available but might return fewer than available if interrupted by a signal).

Contributor:

> unless the spec is that it always reads at least 1 byte if available, but might return fewer than available if interrupted by a signal

Yeah, this is basically the spec in C (which is why I ask); Python is not very clear about what its spec is.

Good point that the higher-level code will retry anyway, so the worst case is that there'll be a spurious "truncated record" message. Keeping it as is sounds good to me, then.
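
For reference, the retry-until-EOF alternative discussed above (and deliberately not adopted) could look roughly like the sketch below; read_fully is a hypothetical helper, not code from this PR:

# Hypothetical alternative (not adopted): keep calling read() until the
# requested byte count is reached or read() returns b"", to guard against
# short reads caused by, e.g., signal delivery during the read.
def read_fully(file_handle, n):
    chunks = []
    remaining = n
    while remaining > 0:
        chunk = file_handle.read(remaining)
        if not chunk:
            break  # b"" here is the strongest EOF signal read() gives us
        chunks.append(chunk)
        remaining -= len(chunk)
    return b"".join(chunks)
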

        header = struct.unpack("Q", header_str)

        # Read the crc32, which is 4 bytes, and check it against
        # the crc32 of the header
        crc_header_str = self.file_handle.read(4)
        crc_header_str = self._read(4)
        if len(crc_header_str) < 4:
            raise self._truncation_error("header crc")
        crc_header = struct.unpack("I", crc_header_str)
        header_crc_calc = masked_crc32c(header_str)
        if header_crc_calc != crc_header[0]:
@@ -220,25 +230,59 @@ def GetNext(self):
        # The length of the header tells us how many bytes the Event
        # string takes
        header_len = int(header[0])
        event_str = self.file_handle.read(header_len)
        event_str = self._read(header_len)
        if len(event_str) < header_len:
            raise self._truncation_error("data")

        event_crc_calc = masked_crc32c(event_str)

        # The next 4 bytes contain the crc32 of the Event string,
        # which we check for integrity. Sometimes, the last Event
        # has no crc32, in which case we skip.
        crc_event_str = self.file_handle.read(4)
        if crc_event_str:
            crc_event = struct.unpack("I", crc_event_str)
            if event_crc_calc != crc_event[0]:
                raise errors.DataLossError(
                    None,
                    None,
                    "{} failed event crc32 check".format(self.filename),
                )
        # which we check for integrity.
        crc_event_str = self._read(4)
        if len(crc_event_str) < 4:
            raise self._truncation_error("data crc")
        crc_event = struct.unpack("I", crc_event_str)
        if event_crc_calc != crc_event[0]:
            raise errors.DataLossError(
                None, None, "{} failed event crc32 check".format(self.filename),
            )
Contributor:

We're raising the same error for a CRC failure as for a truncated stream. How does upstream code know to retry on the latter but skip this record on the former? From a cursory read it looks to me like this would infinitely loop raising DataLossErrors once we find a corrupt record.

Contributor (Author):

For better or for worse, this is the same behavior that the TF C++ RecordReader has always had:
https://github.com/tensorflow/tensorflow/blob/a34091e538540aad57a7a941575538944f38db24/tensorflow/core/lib/io/record_reader.cc#L99
https://github.com/tensorflow/tensorflow/blob/a34091e538540aad57a7a941575538944f38db24/tensorflow/core/lib/io/record_reader.cc#L105

So I'm not too inclined to try to fix it here, given that we can't really fix it in the normal path other than by attempting to parse the string error message on the resulting status, which seems too brittle. I think corrupted checksums are infrequent enough in practice that looping on them, while not ideal, is acceptable behavior. It's the same "infinite loop" that we would have at EOF anyway, so not really a big deal IMO.

Contributor:

Wow, okay, confirmed: wrote an event file with five steps, corrupted the third one, and observed that only the first two steps are ever loaded. That's not great, but you're right that it's not a regression.
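
To make the failure mode discussed above concrete, here is a hedged sketch of the kind of consumer loop the discussion assumes; drain() and its error handling are illustrative, not the actual logic in event_file_loader.py:

# Illustrative consumer loop (hypothetical). DataLossError covers both a
# truncated tail and a bad CRC, and the reader does not advance past the
# failing record, so a genuinely corrupt record fails again on every retry
# and nothing after it is ever surfaced.
from tensorboard.compat.tensorflow_stub import errors  # import path assumed


def drain(reader):
    events = []
    while True:
        try:
            reader.GetNext()
        except (errors.OutOfRangeError, errors.DataLossError):
            # Stop for now; higher-level code retries on the next reload cycle.
            break
        events.append(reader.record())
    return events
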


        # Set the current event to be read later by record() call
        self.curr_event = event_str
        # Clear the buffered partial record since we're done reading it.
        self._buffer = b""

    def _read(self, n):
        """Read up to n bytes from the underlying file, with buffering.

        Reads are satisfied from a buffer of previous data read starting at
        `self._buffer_pos` until the buffer is exhausted, and then from the
        actual underlying file. Any new data is added to the buffer, and
        `self._buffer_pos` is advanced to the point in the buffer past all
        data returned as part of this read.

        Args:
          n: non-negative number of bytes to read

        Returns:
          bytestring of data read, up to n bytes
        """
        result = self._buffer[self._buffer_pos : self._buffer_pos + n]
        self._buffer_pos += len(result)
        n -= len(result)
        if n > 0:
            new_data = self.file_handle.read(n)
            result += new_data
            self._buffer += new_data
            self._buffer_pos += len(new_data)
        return result

    def _truncation_error(self, section):
        return errors.DataLossError(
            None,
            None,
            "{} has truncated record in {}".format(self.filename, section),
        )

    def record(self):
        return self.curr_event
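
Background for the parsing above: each record in the file is framed as an 8-byte record length, a 4-byte masked CRC32C of that length, the payload, and a 4-byte masked CRC32C of the payload, which is exactly the sequence GetNext() reads. A small sketch of the writer side of that framing (illustrative; masked_crc32c is passed in here to stand for the module-level helper used above):

import struct


def frame_record(payload, masked_crc32c):
    # Mirrors what GetNext() unpacks: length ("Q"), CRC of the length ("I"),
    # payload bytes, CRC of the payload ("I").
    length_bytes = struct.pack("Q", len(payload))
    return (
        length_bytes
        + struct.pack("I", masked_crc32c(length_bytes))
        + payload
        + struct.pack("I", masked_crc32c(payload))
    )
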
1 change: 0 additions & 1 deletion tensorboard/plugins/projector/projector_plugin.py
@@ -34,7 +34,6 @@

from tensorboard.backend.http_util import Respond
from tensorboard.compat import tf
from tensorboard.compat import _pywrap_tensorflow
from tensorboard.plugins import base_plugin
from tensorboard.plugins.projector.projector_config_pb2 import ProjectorConfig
from tensorboard.util import tb_logging