diff --git a/tensorboard/backend/event_processing/BUILD b/tensorboard/backend/event_processing/BUILD
index 9b7260155b..850a3679a3 100644
--- a/tensorboard/backend/event_processing/BUILD
+++ b/tensorboard/backend/event_processing/BUILD
@@ -137,6 +137,8 @@ py_library(
     srcs = ["event_file_loader.py"],
     srcs_version = "PY2AND3",
     deps = [
+        "//tensorboard:data_compat",
+        "//tensorboard:dataclass_compat",
         "//tensorboard/compat:tensorflow",
         "//tensorboard/compat/proto:protos_all_py_pb2",
         "//tensorboard/util:platform_util",
@@ -189,8 +191,6 @@ py_library(
         ":io_wrapper",
         ":plugin_asset_util",
         ":reservoir",
-        "//tensorboard:data_compat",
-        "//tensorboard:dataclass_compat",
         "//tensorboard/compat:tensorflow",
         "//tensorboard/compat/proto:protos_all_py_pb2",
         "//tensorboard/plugins/distribution:compressor",
@@ -205,6 +205,8 @@ py_test(
     srcs_version = "PY2AND3",
     deps = [
         ":event_accumulator",
+        "//tensorboard:data_compat",
+        "//tensorboard:dataclass_compat",
         "//tensorboard:expect_tensorflow_installed",
         "//tensorboard/compat/proto:protos_all_py_pb2",
         "//tensorboard/plugins/audio:metadata",
diff --git a/tensorboard/backend/event_processing/event_accumulator.py b/tensorboard/backend/event_processing/event_accumulator.py
index 2833b40ee4..6020485095 100644
--- a/tensorboard/backend/event_processing/event_accumulator.py
+++ b/tensorboard/backend/event_processing/event_accumulator.py
@@ -821,11 +821,11 @@ def _GeneratorFromPath(path):
     if not path:
         raise ValueError("path must be a valid string")
     if io_wrapper.IsSummaryEventsFile(path):
-        return event_file_loader.EventFileLoader(path)
+        return event_file_loader.LegacyEventFileLoader(path)
     else:
         return directory_watcher.DirectoryWatcher(
             path,
-            event_file_loader.EventFileLoader,
+            event_file_loader.LegacyEventFileLoader,
             io_wrapper.IsSummaryEventsFile,
         )

diff --git a/tensorboard/backend/event_processing/event_file_inspector.py b/tensorboard/backend/event_processing/event_file_inspector.py
index 284970944e..3f7d2c861a 100644
--- a/tensorboard/backend/event_processing/event_file_inspector.py
+++ b/tensorboard/backend/event_processing/event_file_inspector.py
@@ -347,7 +347,7 @@ def generators_from_logdir(logdir):

 def generator_from_event_file(event_file):
     """Returns a generator that yields events from an event file."""
-    return event_file_loader.EventFileLoader(event_file).Load()
+    return event_file_loader.LegacyEventFileLoader(event_file).Load()


 def get_inspection_units(logdir="", event_file="", tag=""):
diff --git a/tensorboard/backend/event_processing/event_file_loader.py b/tensorboard/backend/event_processing/event_file_loader.py
index 58193db127..49e01b25bc 100644
--- a/tensorboard/backend/event_processing/event_file_loader.py
+++ b/tensorboard/backend/event_processing/event_file_loader.py
@@ -20,6 +20,8 @@

 import contextlib

+from tensorboard import data_compat
+from tensorboard import dataclass_compat
 from tensorboard.compat import tf
 from tensorboard.compat.proto import event_pb2
 from tensorboard.util import platform_util
@@ -149,7 +151,7 @@ def Load(self):
         logger.debug("No more events in %s", self._file_path)


-class EventFileLoader(RawEventFileLoader):
+class LegacyEventFileLoader(RawEventFileLoader):
     """An iterator that yields parsed Event protos."""

     def Load(self):
@@ -161,10 +163,24 @@ def Load(self):
         Yields:
           All events in the file that have not been yielded yet.
         """
-        for record in super(EventFileLoader, self).Load():
+        for record in super(LegacyEventFileLoader, self).Load():
             yield event_pb2.Event.FromString(record)


+class EventFileLoader(LegacyEventFileLoader):
+    """An iterator that passes events through read-time compat layers.
+
+    Specifically, this includes `data_compat` and `dataclass_compat`.
+    """
+
+    def Load(self):
+        for event in super(EventFileLoader, self).Load():
+            event = data_compat.migrate_event(event)
+            events = dataclass_compat.migrate_event(event)
+            for event in events:
+                yield event
+
+
 class TimestampedEventFileLoader(EventFileLoader):
     """An iterator that yields (UNIX timestamp float, Event proto) pairs."""

diff --git a/tensorboard/backend/event_processing/plugin_event_accumulator.py b/tensorboard/backend/event_processing/plugin_event_accumulator.py
index 5455a1ddd1..f8f1490f03 100644
--- a/tensorboard/backend/event_processing/plugin_event_accumulator.py
+++ b/tensorboard/backend/event_processing/plugin_event_accumulator.py
@@ -22,8 +22,6 @@

 import six

-from tensorboard import data_compat
-from tensorboard import dataclass_compat
 from tensorboard.backend.event_processing import directory_loader
 from tensorboard.backend.event_processing import directory_watcher
 from tensorboard.backend.event_processing import event_file_loader
@@ -294,13 +292,6 @@ def AllSummaryMetadata(self):

     def _ProcessEvent(self, event):
         """Called whenever an event is loaded."""
-        event = data_compat.migrate_event(event)
-        events = dataclass_compat.migrate_event(event)
-        for event in events:
-            self._ProcessMigratedEvent(event)
-
-    def _ProcessMigratedEvent(self, event):
-        """Helper for `_ProcessEvent`."""
         if self._first_event_timestamp is None:
             self._first_event_timestamp = event.wall_time

diff --git a/tensorboard/backend/event_processing/plugin_event_accumulator_test.py b/tensorboard/backend/event_processing/plugin_event_accumulator_test.py
index abfda53fa2..08cdb67ae8 100644
--- a/tensorboard/backend/event_processing/plugin_event_accumulator_test.py
+++ b/tensorboard/backend/event_processing/plugin_event_accumulator_test.py
@@ -24,6 +24,8 @@
 from six.moves import xrange  # pylint: disable=redefined-builtin
 import tensorflow as tf

+from tensorboard import data_compat
+from tensorboard import dataclass_compat
 from tensorboard.backend.event_processing import plugin_event_accumulator as ea
 from tensorboard.compat.proto import config_pb2
 from tensorboard.compat.proto import event_pb2
@@ -60,7 +62,11 @@ def __init__(self, testcase, zero_out_timestamps=False):

     def Load(self):
         while self.items:
-            yield self.items.pop(0)
+            event = self.items.pop(0)
+            event = data_compat.migrate_event(event)
+            events = dataclass_compat.migrate_event(event)
+            for event in events:
+                yield event

     def AddScalarTensor(self, tag, wall_time=0, step=0, value=0):
         """Add a rank-0 tensor event.
diff --git a/tensorboard/dataclass_compat_test.py b/tensorboard/dataclass_compat_test.py
index d8f6bf3842..b704600e57 100644
--- a/tensorboard/dataclass_compat_test.py
+++ b/tensorboard/dataclass_compat_test.py
@@ -190,8 +190,8 @@ def test_graph_def(self):
         self.assertLen(files, 1)
         event_file = os.path.join(logdir, files[0])
         self.assertIn("tfevents", event_file)
-        loader = event_file_loader.EventFileLoader(event_file)
-        events = list(loader.Load())
+        loader = event_file_loader.RawEventFileLoader(event_file)
+        events = [event_pb2.Event.FromString(x) for x in loader.Load()]
         self.assertLen(events, 2)
         self.assertEqual(events[0].WhichOneof("what"), "file_version")
         self.assertEqual(events[1].WhichOneof("what"), "graph_def")
diff --git a/tensorboard/uploader/BUILD b/tensorboard/uploader/BUILD
index b87c30aa82..c71073ea47 100644
--- a/tensorboard/uploader/BUILD
+++ b/tensorboard/uploader/BUILD
@@ -96,8 +96,6 @@ py_library(
     deps = [
         ":logdir_loader",
         ":util",
-        "//tensorboard:data_compat",
-        "//tensorboard:dataclass_compat",
         "//tensorboard:expect_grpc_installed",
         "//tensorboard/backend:process_graph",
         "//tensorboard/backend/event_processing:directory_loader",
@@ -123,6 +121,8 @@ py_test(
         ":test_util",
         ":uploader_lib",
         ":util",
+        "//tensorboard:data_compat",
+        "//tensorboard:dataclass_compat",
         "//tensorboard:expect_grpc_installed",
         "//tensorboard:expect_grpc_testing_installed",
         "//tensorboard:expect_tensorflow_installed",
diff --git a/tensorboard/uploader/uploader.py b/tensorboard/uploader/uploader.py
index f413a1cf38..1a97a82d8a 100644
--- a/tensorboard/uploader/uploader.py
+++ b/tensorboard/uploader/uploader.py
@@ -33,8 +33,6 @@
 from tensorboard.uploader.proto import experiment_pb2
 from tensorboard.uploader import logdir_loader
 from tensorboard.uploader import util
-from tensorboard import data_compat
-from tensorboard import dataclass_compat
 from tensorboard.backend import process_graph
 from tensorboard.backend.event_processing import directory_loader
 from tensorboard.backend.event_processing import event_file_loader
@@ -353,8 +351,7 @@ def send_requests(self, run_to_events):
             point is too large (say, due to a gigabyte-long tag name).
         """

-        for (run_name, event, orig_value) in self._run_values(run_to_events):
-            value = data_compat.migrate_value(orig_value)
+        for (run_name, event, value) in self._run_values(run_to_events):
             time_series_key = (run_name, value.tag)

             # The metadata for a time series is memorized on the first event.
@@ -408,10 +405,6 @@ def send_requests(self, run_to_events):
     def _run_values(self, run_to_events):
         """Helper generator to create a single stream of work items.

-        The events are passed through the `data_compat` and `dataclass_compat`
-        layers before being emitted, so downstream consumers may process them
-        uniformly.
-
         Note that `dataclass_compat` may emit multiple variants of the same
         event, for backwards compatibility. Thus this stream should be
         filtered to obtain the desired version of each event. Here, we
@@ -429,13 +422,9 @@
         # such data from the request anyway.
         for (run_name, events) in six.iteritems(run_to_events):
             for event in events:
-                v2_event = data_compat.migrate_event(event)
-                events = dataclass_compat.migrate_event(v2_event)
-                events = _filter_graph_defs(events)
-                for event in events:
-                    if event.summary:
-                        for value in event.summary.value:
-                            yield (run_name, event, value)
+                _filter_graph_defs(event)
+                for value in event.summary.value:
+                    yield (run_name, event, value)


 class _ScalarBatchedRequestSender(object):
@@ -839,24 +828,19 @@ def _varint_cost(n):
     return result


-def _filter_graph_defs(events):
-    for e in events:
-        for v in e.summary.value:
-            if (
-                v.metadata.plugin_data.plugin_name
-                != graphs_metadata.PLUGIN_NAME
-            ):
-                continue
-            if v.tag == graphs_metadata.RUN_GRAPH_NAME:
-                data = list(v.tensor.string_val)
-                filtered_data = [_filtered_graph_bytes(x) for x in data]
-                filtered_data = [x for x in filtered_data if x is not None]
-                if filtered_data != data:
-                    new_tensor = tensor_util.make_tensor_proto(
-                        filtered_data, dtype=types_pb2.DT_STRING
-                    )
-                    v.tensor.CopyFrom(new_tensor)
-        yield e
+def _filter_graph_defs(event):
+    for v in event.summary.value:
+        if v.metadata.plugin_data.plugin_name != graphs_metadata.PLUGIN_NAME:
+            continue
+        if v.tag == graphs_metadata.RUN_GRAPH_NAME:
+            data = list(v.tensor.string_val)
+            filtered_data = [_filtered_graph_bytes(x) for x in data]
+            filtered_data = [x for x in filtered_data if x is not None]
+            if filtered_data != data:
+                new_tensor = tensor_util.make_tensor_proto(
+                    filtered_data, dtype=types_pb2.DT_STRING
+                )
+                v.tensor.CopyFrom(new_tensor)


 def _filtered_graph_bytes(graph_bytes):
diff --git a/tensorboard/uploader/uploader_test.py b/tensorboard/uploader/uploader_test.py
index 32da8024bb..047a25fec1 100644
--- a/tensorboard/uploader/uploader_test.py
+++ b/tensorboard/uploader/uploader_test.py
@@ -34,6 +34,8 @@
 import tensorflow as tf

 from google.protobuf import message
+from tensorboard import data_compat
+from tensorboard import dataclass_compat
 from tensorboard.uploader.proto import experiment_pb2
 from tensorboard.uploader.proto import scalar_pb2
 from tensorboard.uploader.proto import write_service_pb2
@@ -248,13 +250,23 @@ def scalar_event(tag, value):
         mock_logdir_loader = mock.create_autospec(logdir_loader.LogdirLoader)
         mock_logdir_loader.get_run_events.side_effect = [
             {
-                "run 1": [scalar_event("1.1", 5.0), scalar_event("1.2", 5.0)],
-                "run 2": [scalar_event("2.1", 5.0), scalar_event("2.2", 5.0)],
+                "run 1": _apply_compat(
+                    [scalar_event("1.1", 5.0), scalar_event("1.2", 5.0)]
+                ),
+                "run 2": _apply_compat(
+                    [scalar_event("2.1", 5.0), scalar_event("2.2", 5.0)]
+                ),
             },
             {
-                "run 3": [scalar_event("3.1", 5.0), scalar_event("3.2", 5.0)],
-                "run 4": [scalar_event("4.1", 5.0), scalar_event("4.2", 5.0)],
-                "run 5": [scalar_event("5.1", 5.0), scalar_event("5.2", 5.0)],
+                "run 3": _apply_compat(
+                    [scalar_event("3.1", 5.0), scalar_event("3.2", 5.0)]
+                ),
+                "run 4": _apply_compat(
+                    [scalar_event("4.1", 5.0), scalar_event("4.2", 5.0)]
+                ),
+                "run 5": _apply_compat(
+                    [scalar_event("5.1", 5.0), scalar_event("5.2", 5.0)]
+                ),
             },
             AbortUploadError,
         ]
@@ -295,13 +307,13 @@ def test_start_uploading_graphs(self):
         mock_logdir_loader = mock.create_autospec(logdir_loader.LogdirLoader)
         mock_logdir_loader.get_run_events.side_effect = [
             {
-                "run 1": [graph_event, graph_event],
-                "run 2": [graph_event, graph_event],
+                "run 1": _apply_compat([graph_event, graph_event]),
+                "run 2": _apply_compat([graph_event, graph_event]),
             },
             {
-                "run 3": [graph_event, graph_event],
-                "run 4": [graph_event, graph_event],
-                "run 5": [graph_event, graph_event],
+                "run 3": _apply_compat([graph_event, graph_event]),
+                "run 4": _apply_compat([graph_event, graph_event]),
+                "run 5": _apply_compat([graph_event, graph_event]),
             },
             AbortUploadError,
         ]
@@ -347,7 +359,7 @@ def test_upload_skip_large_blob(self):

         mock_logdir_loader = mock.create_autospec(logdir_loader.LogdirLoader)
         mock_logdir_loader.get_run_events.side_effect = [
-            {"run 1": [graph_event],},
+            {"run 1": _apply_compat([graph_event])},
             AbortUploadError,
         ]

@@ -445,8 +457,8 @@ def test_upload_server_error(self):

         mock_logdir_loader = mock.create_autospec(logdir_loader.LogdirLoader)
         mock_logdir_loader.get_run_events.side_effect = [
-            {"run 1": [graph_event],},
-            {"run 1": [graph_event],},
+            {"run 1": _apply_compat([graph_event])},
+            {"run 1": _apply_compat([graph_event])},
             AbortUploadError,
         ]

@@ -488,8 +500,8 @@ def test_upload_same_graph_twice(self):

         mock_logdir_loader = mock.create_autospec(logdir_loader.LogdirLoader)
         mock_logdir_loader.get_run_events.side_effect = [
-            {"run 1": [graph_event],},
-            {"run 1": [graph_event],},
+            {"run 1": _apply_compat([graph_event])},
+            {"run 1": _apply_compat([graph_event])},
             AbortUploadError,
         ]

@@ -709,7 +721,7 @@ def _populate_run_from_events(
             api=mock_client,
             allowed_plugins=allowed_plugins,
         )
-        builder.send_requests({"": events})
+        builder.send_requests({"": _apply_compat(events)})
         requests = [c[0][0] for c in mock_client.WriteScalar.call_args_list]
         if requests:
             self.assertLen(requests, 1)
@@ -920,7 +932,7 @@ def test_no_room_for_single_point(self):
         event = event_pb2.Event(step=1, wall_time=123.456)
         event.summary.value.add(tag="foo", simple_value=1.0)
         long_run_name = "A" * uploader_lib._MAX_REQUEST_LENGTH_BYTES
-        run_to_events = {long_run_name: [event]}
+        run_to_events = {long_run_name: _apply_compat([event])}
         with self.assertRaises(RuntimeError) as cm:
             builder = _create_request_sender("123", mock_client)
             builder.send_requests(run_to_events)
@@ -937,7 +949,10 @@ def test_break_at_run_boundary(self):
         event_2 = event_pb2.Event(step=2)
         event_2.summary.value.add(tag="bar", simple_value=-2.0)
         run_to_events = collections.OrderedDict(
-            [(long_run_1, [event_1]), (long_run_2, [event_2])]
+            [
+                (long_run_1, _apply_compat([event_1])),
+                (long_run_2, _apply_compat([event_2])),
+            ]
         )

         builder = _create_request_sender("123", mock_client)
@@ -975,7 +990,7 @@ def test_break_at_tag_boundary(self):
         event = event_pb2.Event(step=1)
         event.summary.value.add(tag=long_tag_1, simple_value=1.0)
         event.summary.value.add(tag=long_tag_2, simple_value=2.0)
-        run_to_events = {"train": [event]}
+        run_to_events = {"train": _apply_compat([event])}

         builder = _create_request_sender("123", mock_client)
         builder.send_requests(run_to_events)
@@ -1015,7 +1030,7 @@ def test_break_at_scalar_point_boundary(self):
             if step > 0:
                 summary.value[0].ClearField("metadata")
             events.append(event_pb2.Event(summary=summary, step=step))
-        run_to_events = {"train": events}
+        run_to_events = {"train": _apply_compat(events)}

         builder = _create_request_sender("123", mock_client)
         builder.send_requests(run_to_events)
@@ -1050,7 +1065,10 @@ def test_prunes_tags_and_runs(self):
         event_2 = event_pb2.Event(step=2)
         event_2.summary.value.add(tag="bar", simple_value=-2.0)
         run_to_events = collections.OrderedDict(
-            [("train", [event_1]), ("test", [event_2])]
+            [
+                ("train", _apply_compat([event_1])),
+                ("test", _apply_compat([event_2])),
+            ]
         )

         real_create_point = (
@@ -1259,5 +1277,13 @@ def _clear_wall_times(request):
             point.ClearField("wall_time")


+def _apply_compat(events):
+    for event in events:
+        event = data_compat.migrate_event(event)
+        events = dataclass_compat.migrate_event(event)
+        for event in events:
+            yield event
+
+
 if __name__ == "__main__":
     tf.test.main()
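Because `uploader.py` no longer migrates events itself, the tests now feed it pre-migrated streams through the `_apply_compat` helper added at the bottom of `uploader_test.py`; it is the same pipeline that `EventFileLoader.Load` applies at read time. Below is a standalone sketch of that pattern, assuming only the `migrate_event` contracts used throughout this diff. (The helper above rebinds the name `events` mid-loop; that is safe because the outer `for` statement has already captured its iterator, but the sketch avoids the shadowing.)

    from tensorboard import data_compat
    from tensorboard import dataclass_compat

    def apply_compat(events):
        """Yield the read-time-migrated variants of each event, in order."""
        for event in events:
            # data_compat.migrate_event: one event in, one event out.
            event = data_compat.migrate_event(event)
            # dataclass_compat.migrate_event: returns a sequence, since a
            # single legacy event may map to several migrated variants.
            for migrated in dataclass_compat.migrate_event(event):
                yield migrated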