tensorboard/backend/event_processing/BUILD (4 additions & 2 deletions)

@@ -137,6 +137,8 @@ py_library(
     srcs = ["event_file_loader.py"],
     srcs_version = "PY2AND3",
     deps = [
+        "//tensorboard:data_compat",
+        "//tensorboard:dataclass_compat",
         "//tensorboard/compat:tensorflow",
         "//tensorboard/compat/proto:protos_all_py_pb2",
         "//tensorboard/util:platform_util",
@@ -189,8 +191,6 @@ py_library(
         ":io_wrapper",
         ":plugin_asset_util",
         ":reservoir",
-        "//tensorboard:data_compat",
-        "//tensorboard:dataclass_compat",
         "//tensorboard/compat:tensorflow",
         "//tensorboard/compat/proto:protos_all_py_pb2",
         "//tensorboard/plugins/distribution:compressor",
@@ -205,6 +205,8 @@ py_test(
     srcs_version = "PY2AND3",
     deps = [
         ":event_accumulator",
+        "//tensorboard:data_compat",
+        "//tensorboard:dataclass_compat",
         "//tensorboard:expect_tensorflow_installed",
         "//tensorboard/compat/proto:protos_all_py_pb2",
         "//tensorboard/plugins/audio:metadata",
tensorboard/backend/event_processing/event_accumulator.py (2 additions & 2 deletions)

@@ -821,11 +821,11 @@ def _GeneratorFromPath(path):
     if not path:
         raise ValueError("path must be a valid string")
     if io_wrapper.IsSummaryEventsFile(path):
-        return event_file_loader.EventFileLoader(path)
+        return event_file_loader.LegacyEventFileLoader(path)
     else:
         return directory_watcher.DirectoryWatcher(
             path,
-            event_file_loader.EventFileLoader,
+            event_file_loader.LegacyEventFileLoader,
             io_wrapper.IsSummaryEventsFile,
         )
tensorboard/backend/event_processing/event_file_inspector.py (1 addition & 1 deletion)

@@ -347,7 +347,7 @@ def generators_from_logdir(logdir):

 def generator_from_event_file(event_file):
     """Returns a generator that yields events from an event file."""
-    return event_file_loader.EventFileLoader(event_file).Load()
+    return event_file_loader.LegacyEventFileLoader(event_file).Load()


 def get_inspection_units(logdir="", event_file="", tag=""):
tensorboard/backend/event_processing/event_file_loader.py (18 additions & 2 deletions)

@@ -20,6 +20,8 @@

 import contextlib

+from tensorboard import data_compat
+from tensorboard import dataclass_compat
 from tensorboard.compat import tf
 from tensorboard.compat.proto import event_pb2
 from tensorboard.util import platform_util
@@ -149,7 +151,7 @@ def Load(self):
         logger.debug("No more events in %s", self._file_path)


-class EventFileLoader(RawEventFileLoader):
+class LegacyEventFileLoader(RawEventFileLoader):
     """An iterator that yields parsed Event protos."""

     def Load(self):
@@ -161,10 +163,24 @@ def Load(self):
         Yields:
           All events in the file that have not been yielded yet.
         """
-        for record in super(EventFileLoader, self).Load():
+        for record in super(LegacyEventFileLoader, self).Load():
             yield event_pb2.Event.FromString(record)


+class EventFileLoader(LegacyEventFileLoader):
+    """An iterator that passes events through read-time compat layers.
+
+    Specifically, this includes `data_compat` and `dataclass_compat`.
+    """
+
+    def Load(self):
+        for event in super(EventFileLoader, self).Load():
+            event = data_compat.migrate_event(event)
+            events = dataclass_compat.migrate_event(event)
+            for event in events:
+                yield event
+
+
 class TimestampedEventFileLoader(EventFileLoader):
     """An iterator that yields (UNIX timestamp float, Event proto) pairs."""
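To make the new loader hierarchy concrete, here is a minimal usage sketch (not part of the diff; the event-file path is hypothetical). `RawEventFileLoader` yields serialized records, `LegacyEventFileLoader` parses them into `Event` protos, and the new `EventFileLoader` additionally runs each proto through the read-time compat layers:

    from tensorboard.backend.event_processing import event_file_loader

    path = "/tmp/logs/events.out.tfevents.1572300000.myhost"  # hypothetical

    # Raw serialized records (bytes), exactly as stored on disk.
    records = list(event_file_loader.RawEventFileLoader(path).Load())

    # Parsed Event protos, with no migration applied.
    legacy_events = list(event_file_loader.LegacyEventFileLoader(path).Load())

    # Parsed *and* migrated events; this list may be longer than
    # legacy_events, since dataclass_compat can emit multiple variants
    # of a single on-disk event.
    migrated_events = list(event_file_loader.EventFileLoader(path).Load())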
tensorboard/backend/event_processing/plugin_event_accumulator.py (9 deletions)

@@ -22,8 +22,6 @@

 import six

-from tensorboard import data_compat
-from tensorboard import dataclass_compat
 from tensorboard.backend.event_processing import directory_loader
 from tensorboard.backend.event_processing import directory_watcher
 from tensorboard.backend.event_processing import event_file_loader
@@ -294,13 +292,6 @@ def AllSummaryMetadata(self):

     def _ProcessEvent(self, event):
         """Called whenever an event is loaded."""
-        event = data_compat.migrate_event(event)
-        events = dataclass_compat.migrate_event(event)
-        for event in events:
-            self._ProcessMigratedEvent(event)
-
-    def _ProcessMigratedEvent(self, event):
-        """Helper for `_ProcessEvent`."""
         if self._first_event_timestamp is None:
             self._first_event_timestamp = event.wall_time
tensorboard/backend/event_processing/plugin_event_accumulator_test.py (7 additions & 1 deletion)

@@ -24,6 +24,8 @@
 from six.moves import xrange  # pylint: disable=redefined-builtin
 import tensorflow as tf

+from tensorboard import data_compat
+from tensorboard import dataclass_compat
 from tensorboard.backend.event_processing import plugin_event_accumulator as ea
 from tensorboard.compat.proto import config_pb2
 from tensorboard.compat.proto import event_pb2
@@ -60,7 +62,11 @@ def __init__(self, testcase, zero_out_timestamps=False):

     def Load(self):
         while self.items:
-            yield self.items.pop(0)
+            event = self.items.pop(0)
+            event = data_compat.migrate_event(event)
+            events = dataclass_compat.migrate_event(event)
+            for event in events:
+                yield event

     def AddScalarTensor(self, tag, wall_time=0, step=0, value=0):
         """Add a rank-0 tensor event.
tensorboard/dataclass_compat_test.py (2 additions & 2 deletions)

@@ -190,8 +190,8 @@ def test_graph_def(self):
         self.assertLen(files, 1)
         event_file = os.path.join(logdir, files[0])
         self.assertIn("tfevents", event_file)
-        loader = event_file_loader.EventFileLoader(event_file)
-        events = list(loader.Load())
+        loader = event_file_loader.RawEventFileLoader(event_file)
+        events = [event_pb2.Event.FromString(x) for x in loader.Load()]
         self.assertLen(events, 2)
         self.assertEqual(events[0].WhichOneof("what"), "file_version")
         self.assertEqual(events[1].WhichOneof("what"), "graph_def")
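The switch to `RawEventFileLoader` is what keeps this test meaningful: now that `EventFileLoader` applies the compat layers at read time, using it here would hand the test already-migrated events. A sketch of the distinction the test now relies on (illustrative only, reusing names from the test above):

    # Raw view: on-disk bytes parsed manually, with no compat migration,
    # preserving the exact two events the assertions expect.
    raw = [
        event_pb2.Event.FromString(r)
        for r in event_file_loader.RawEventFileLoader(event_file).Load()
    ]

    # Migrated view: the renamed-and-extended EventFileLoader may add or
    # rewrite events via dataclass_compat, which would break the
    # exact-count assertion above.
    migrated = list(event_file_loader.EventFileLoader(event_file).Load())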
tensorboard/uploader/BUILD (2 additions & 2 deletions)

@@ -96,8 +96,6 @@ py_library(
     deps = [
         ":logdir_loader",
         ":util",
-        "//tensorboard:data_compat",
-        "//tensorboard:dataclass_compat",
         "//tensorboard:expect_grpc_installed",
         "//tensorboard/backend:process_graph",
         "//tensorboard/backend/event_processing:directory_loader",
@@ -123,6+121,8 @@ py_test(
         ":test_util",
         ":uploader_lib",
         ":util",
+        "//tensorboard:data_compat",
+        "//tensorboard:dataclass_compat",
         "//tensorboard:expect_grpc_installed",
         "//tensorboard:expect_grpc_testing_installed",
         "//tensorboard:expect_tensorflow_installed",
tensorboard/uploader/uploader.py (17 additions & 33 deletions)

@@ -33,8 +33,6 @@
 from tensorboard.uploader.proto import experiment_pb2
 from tensorboard.uploader import logdir_loader
 from tensorboard.uploader import util
-from tensorboard import data_compat
-from tensorboard import dataclass_compat
 from tensorboard.backend import process_graph
 from tensorboard.backend.event_processing import directory_loader
 from tensorboard.backend.event_processing import event_file_loader
@@ -353,8 +351,7 @@ def send_requests(self, run_to_events):
         point is too large (say, due to a gigabyte-long tag name).
         """

-        for (run_name, event, orig_value) in self._run_values(run_to_events):
-            value = data_compat.migrate_value(orig_value)
+        for (run_name, event, value) in self._run_values(run_to_events):
             time_series_key = (run_name, value.tag)

             # The metadata for a time series is memorized on the first event.
@@ -408,10 +405,6 @@ def send_requests(self, run_to_events):
     def _run_values(self, run_to_events):
         """Helper generator to create a single stream of work items.

-        The events are passed through the `data_compat` and `dataclass_compat`
-        layers before being emitted, so downstream consumers may process them
-        uniformly.
-
         Note that `dataclass_compat` may emit multiple variants of
         the same event, for backwards compatibility. Thus this stream should
         be filtered to obtain the desired version of each event. Here, we
@@ -429,13 +422,9 @@ def _run_values(self, run_to_events):
         # such data from the request anyway.
         for (run_name, events) in six.iteritems(run_to_events):
             for event in events:
-                v2_event = data_compat.migrate_event(event)
-                events = dataclass_compat.migrate_event(v2_event)
-                events = _filter_graph_defs(events)
-                for event in events:
-                    if event.summary:
-                        for value in event.summary.value:
-                            yield (run_name, event, value)
+                _filter_graph_defs(event)
+                for value in event.summary.value:
+                    yield (run_name, event, value)


 class _ScalarBatchedRequestSender(object):
@@ -839,24 +828,19 @@ def _varint_cost(n):
     return result


-def _filter_graph_defs(events):
-    for e in events:
-        for v in e.summary.value:
-            if (
-                v.metadata.plugin_data.plugin_name
-                != graphs_metadata.PLUGIN_NAME
-            ):
-                continue
-            if v.tag == graphs_metadata.RUN_GRAPH_NAME:
-                data = list(v.tensor.string_val)
-                filtered_data = [_filtered_graph_bytes(x) for x in data]
-                filtered_data = [x for x in filtered_data if x is not None]
-                if filtered_data != data:
-                    new_tensor = tensor_util.make_tensor_proto(
-                        filtered_data, dtype=types_pb2.DT_STRING
-                    )
-                    v.tensor.CopyFrom(new_tensor)
-        yield e
+def _filter_graph_defs(event):
+    for v in event.summary.value:
+        if v.metadata.plugin_data.plugin_name != graphs_metadata.PLUGIN_NAME:
+            continue
+        if v.tag == graphs_metadata.RUN_GRAPH_NAME:
+            data = list(v.tensor.string_val)
+            filtered_data = [_filtered_graph_bytes(x) for x in data]
+            filtered_data = [x for x in filtered_data if x is not None]
+            if filtered_data != data:
+                new_tensor = tensor_util.make_tensor_proto(
+                    filtered_data, dtype=types_pb2.DT_STRING
+                )
+                v.tensor.CopyFrom(new_tensor)


 def _filtered_graph_bytes(graph_bytes):
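One subtlety in the rewritten `_run_values` is the dropped `if event.summary:` guard. Iterating an empty repeated `summary.value` field simply yields nothing, so the guard added no protection. A minimal check of that behavior (assuming only the `event_pb2` import already used in this file):

    from tensorboard.compat.proto import event_pb2

    # An Event with no summary values: iterating the empty repeated field
    # is a no-op, so per-value work is skipped without an explicit guard.
    event = event_pb2.Event()
    assert len(event.summary.value) == 0
    assert list(event.summary.value) == []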