From eaaad6d710e9608d8c8b22d1283640705fc12c41 Mon Sep 17 00:00:00 2001 From: William Chargin Date: Wed, 25 Mar 2020 18:13:16 -0700 Subject: [PATCH 1/6] data: optimize `read_scalars` by skipping scans MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Summary: Prior to this change, `read_scalars` (resp. `read_tensors`) delegated to `list_scalars` (resp. `list_tensors`) to find the set of time series to read. This is slower than it might sound, because `list_scalars` itself needs to scan over all relevant `multiplexer.Tensors` to identify `max_step` and `max_wall_time`, which are thrown away by `read_scalars`. (That `list_scalars` needs this full scan at all is its own issue; ideally, these would be memoized onto the event multiplexer.) When a `RunTagFilter` specifying a single run and tag is given, we optimize further by requesting individual `SummaryMetadata` rather than paring down `AllSummaryMetadata`. Resolves a comment from @nfelt on #2980. Test Plan: When applied on top of #3419, `:list_session_groups_test` improves from taking 11.1 seconds to taking 6.6 seconds on my machine. This doesn’t seem to fully generalize; I see only ~13% speedups in a microbenchmark that hammers `read_scalars` on a logdir with all the demo data, but the improvement on that test is important. wchargin-branch: data-read-without-list wchargin-source: bc728c60dcb0039a6f802eaf154205b7161e4796 --- .../backend/event_processing/data_provider.py | 105 +++++++++++------- .../plugin_event_accumulator.py | 9 ++ .../plugin_event_multiplexer.py | 15 +++ 3 files changed, 89 insertions(+), 40 deletions(-) diff --git a/tensorboard/backend/event_processing/data_provider.py b/tensorboard/backend/event_processing/data_provider.py index 54c7f04038..355a140c1e 100644 --- a/tensorboard/backend/event_processing/data_provider.py +++ b/tensorboard/backend/event_processing/data_provider.py @@ -109,76 +109,101 @@ def list_runs(self, experiment_id): def list_scalars(self, experiment_id, plugin_name, run_tag_filter=None): self._validate_experiment_id(experiment_id) - run_tag_content = self._multiplexer.PluginRunToTagToContent(plugin_name) - return self._list( - provider.ScalarTimeSeries, - run_tag_content, - run_tag_filter, - summary_pb2.DATA_CLASS_SCALAR, + index = self._index( + plugin_name, run_tag_filter, summary_pb2.DATA_CLASS_SCALAR ) + return self._list(provider.ScalarTimeSeries, index) def read_scalars( self, experiment_id, plugin_name, downsample=None, run_tag_filter=None ): self._validate_downsample(downsample) - index = self.list_scalars( - experiment_id, plugin_name, run_tag_filter=run_tag_filter + index = self._index( + plugin_name, run_tag_filter, summary_pb2.DATA_CLASS_SCALAR ) return self._read(_convert_scalar_event, index, downsample) def list_tensors(self, experiment_id, plugin_name, run_tag_filter=None): self._validate_experiment_id(experiment_id) - run_tag_content = self._multiplexer.PluginRunToTagToContent(plugin_name) - return self._list( - provider.TensorTimeSeries, - run_tag_content, - run_tag_filter, - summary_pb2.DATA_CLASS_TENSOR, + index = self._index( + plugin_name, run_tag_filter, summary_pb2.DATA_CLASS_TENSOR ) + return self._list(provider.TensorTimeSeries, index) def read_tensors( self, experiment_id, plugin_name, downsample=None, run_tag_filter=None ): self._validate_downsample(downsample) - index = self.list_tensors( - experiment_id, plugin_name, run_tag_filter=run_tag_filter + index = self._index( + plugin_name, run_tag_filter, summary_pb2.DATA_CLASS_TENSOR ) return self._read(_convert_tensor_event, index, downsample)
- def _list( - self, - construct_time_series, - run_tag_content, - run_tag_filter, - data_class_filter, - ): - """Helper to list scalar or tensor time series. + def _index(self, plugin_name, run_tag_filter, data_class_filter): + """List time series and metadata matching the given filters. + + This is like `_list`, but doesn't traverse `Tensors(...)` to + compute metadata that's not always needed. Args: - construct_time_series: `ScalarTimeSeries` or `TensorTimeSeries`. - run_tag_content: Result of `_multiplexer.PluginRunToTagToContent(...)`. - run_tag_filter: As given by the client; may be `None`. - data_class_filter: A `summary_pb2.DataClass` value. Only time - series of this data class will be returned. + plugin_name: A string plugin name filter (required). + run_tag_filter: A `provider.RunTagFilter`, or `None`. + data_class_filter: A `summary_pb2.DataClass` filter (required). Returns: - A list of objects of type given by `construct_time_series`, - suitable to be returned from `list_scalars` or `list_tensors`. + A nested dict `d` such that `d[run][tag]` is a + `SummaryMetadata` proto. """ - result = {} if run_tag_filter is None: run_tag_filter = provider.RunTagFilter(runs=None, tags=None) - for (run, tag_to_content) in six.iteritems(run_tag_content): + runs = run_tag_filter.runs + tags = run_tag_filter.tags + + # Optimization for a common case, reading a single time series. + if runs and len(runs) == 1 and tags and len(tags) == 1: + (run,) = runs + (tag,) = tags + try: + metadata = self._multiplexer.SummaryMetadata(run, tag) + except KeyError: + return {} + all_metadata = {run: {tag: metadata}} + else: + all_metadata = self._multiplexer.AllSummaryMetadata() + + result = {} + for (run, tag_to_metadata) in all_metadata.items(): + if runs is not None and run not in runs: + continue result_for_run = {} - for tag in tag_to_content: - if not self._test_run_tag(run_tag_filter, run, tag): + for (tag, metadata) in tag_to_metadata.items(): + if tags is not None and tag not in tags: continue - if ( - self._multiplexer.SummaryMetadata(run, tag).data_class - != data_class_filter - ): + if metadata.data_class != data_class_filter: + continue + if metadata.plugin_data.plugin_name != plugin_name: continue result[run] = result_for_run + result_for_run[tag] = metadata + + return result + + def _list(self, construct_time_series, index): + """Helper to list scalar or tensor time series. + + Args: + construct_time_series: `ScalarTimeSeries` or `TensorTimeSeries`. + index: The result of `self._index(...)`. + + Returns: + A list of objects of type given by `construct_time_series`, + suitable to be returned from `list_scalars` or `list_tensors`. + """ + result = {} + for (run, tag_to_metadata) in index.items(): + result_for_run = {} + result[run] = result_for_run + for (tag, summary_metadata) in tag_to_metadata.items(): max_step = None max_wall_time = None for event in self._multiplexer.Tensors(run, tag): @@ -202,7 +227,7 @@ def _read(self, convert_event, index, downsample): Args: convert_event: Takes `plugin_event_accumulator.TensorEvent` to either `provider.ScalarDatum` or `provider.TensorDatum`. - index: The result of `list_scalars` or `list_tensors`. + index: The result of `self._index(...)`. downsample: Non-negative `int`; how many samples to return per time series.
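As an illustrative aside (not part of the patch itself), here is a minimal sketch of how a caller would hit the single-series fast path described above. The logdir path, run name, and tag name are hypothetical, and the setup assumes scalar summaries have already been written under that logdir:

    # Hypothetical sketch: a one-run, one-tag `RunTagFilter` lets `_index`
    # fetch one `SummaryMetadata` directly instead of paring down
    # `AllSummaryMetadata()`.
    from tensorboard.backend.event_processing import data_provider
    from tensorboard.backend.event_processing import plugin_event_multiplexer
    from tensorboard.data import provider

    multiplexer = plugin_event_multiplexer.EventMultiplexer()
    multiplexer.AddRunsFromDirectory("/tmp/logdir")  # hypothetical logdir
    multiplexer.Reload()
    dp = data_provider.MultiplexerDataProvider(multiplexer, "/tmp/logdir")

    scalars = dp.read_scalars(
        experiment_id="unused",  # as in the unit tests; one experiment per provider
        plugin_name="scalars",  # i.e., scalar_metadata.PLUGIN_NAME
        downsample=1000,
        run_tag_filter=provider.RunTagFilter(runs=["train"], tags=["loss"]),
    )
    # `scalars["train"]["loss"]` is then a list of `provider.ScalarDatum`s.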
diff --git a/tensorboard/backend/event_processing/plugin_event_accumulator.py b/tensorboard/backend/event_processing/plugin_event_accumulator.py index 54ed732e10..5455a1ddd1 100644 --- a/tensorboard/backend/event_processing/plugin_event_accumulator.py +++ b/tensorboard/backend/event_processing/plugin_event_accumulator.py @@ -283,6 +283,15 @@ def SummaryMetadata(self, tag): """ return self.summary_metadata[tag] + def AllSummaryMetadata(self): + """Return summary metadata for all tags. + + Returns: + A dict `d` such that `d[tag]` is a `SummaryMetadata` proto for + the keyed tag. + """ + return dict(self.summary_metadata) + def _ProcessEvent(self, event): """Called whenever an event is loaded.""" event = data_compat.migrate_event(event) diff --git a/tensorboard/backend/event_processing/plugin_event_multiplexer.py b/tensorboard/backend/event_processing/plugin_event_multiplexer.py index 1e2ab7bbc0..d2d52d9486 100644 --- a/tensorboard/backend/event_processing/plugin_event_multiplexer.py +++ b/tensorboard/backend/event_processing/plugin_event_multiplexer.py @@ -456,6 +456,21 @@ def SummaryMetadata(self, run, tag): accumulator = self.GetAccumulator(run) return accumulator.SummaryMetadata(tag) + def AllSummaryMetadata(self): + """Return summary metadata for all time series. + + Returns: + A nested dict `d` such that `d[run][tag]` is a + `SummaryMetadata` proto for the keyed time series. + """ + with self._accumulators_mutex: + # To avoid nested locks, we construct a copy of the run-accumulator map + items = list(six.iteritems(self._accumulators)) + return { + run_name: accumulator.AllSummaryMetadata() + for run_name, accumulator in items + } + def Runs(self): """Return all the run names in the `EventMultiplexer`. From 7195a4fb605bd4cce36da583d2cc6248d57194f5 Mon Sep 17 00:00:00 2001 From: William Chargin Date: Wed, 25 Mar 2020 20:01:20 -0700 Subject: [PATCH 2/6] data: add tests for blob sequence handling Summary: Follow-up to #2991. Fixes #3434. Test Plan: Tests pass as written. 
wchargin-branch: data-blob-sequence-tests wchargin-source: fbd3302933cb0c50609df970edf137202723c769 --- tensorboard/backend/event_processing/BUILD | 2 + .../event_processing/data_provider_test.py | 109 ++++++++++++++++++ 2 files changed, 111 insertions(+) diff --git a/tensorboard/backend/event_processing/BUILD b/tensorboard/backend/event_processing/BUILD index f7c898f395..2902d81e44 100644 --- a/tensorboard/backend/event_processing/BUILD +++ b/tensorboard/backend/event_processing/BUILD @@ -59,6 +59,8 @@ py_test( "//tensorboard/plugins/histogram:summary_v2", "//tensorboard/plugins/scalar:metadata", "//tensorboard/plugins/scalar:summary_v2", + "//tensorboard/plugins/image:metadata", + "//tensorboard/plugins/image:summary_v2", "//tensorboard/util:tensor_util", "@org_pythonhosted_six", ], diff --git a/tensorboard/backend/event_processing/data_provider_test.py b/tensorboard/backend/event_processing/data_provider_test.py index 46a10f8ffe..8e4adc2bfd 100644 --- a/tensorboard/backend/event_processing/data_provider_test.py +++ b/tensorboard/backend/event_processing/data_provider_test.py @@ -35,6 +35,8 @@ from tensorboard.plugins.histogram import summary_v2 as histogram_summary from tensorboard.plugins.scalar import metadata as scalar_metadata from tensorboard.plugins.scalar import summary_v2 as scalar_summary +from tensorboard.plugins.image import metadata as image_metadata +from tensorboard.plugins.image import summary_v2 as image_summary from tensorboard.util import tensor_util import tensorflow.compat.v1 as tf1 import tensorflow.compat.v2 as tf @@ -91,6 +93,27 @@ def setUp(self): name, tensor * i, step=i, description=description ) + logdir = os.path.join(self.logdir, "mondrian") + with tf.summary.create_file_writer(logdir).as_default(): + data = [ + ("red", (221, 28, 38), "top-right"), + ("blue", (1, 91, 158), "bottom-left"), + ("yellow", (239, 220, 111), "bottom-right"), + ] + for (name, color, description) in data: + image_1x1 = tf.constant([[[color]]], dtype=tf.uint8) + for i in xrange(1, 11): + k = 6 - abs(6 - i) # 1, .., 6, .., 2 + # a `k`-sample image summary of `i`-by-`i` images + image = tf.tile(image_1x1, [k, i, i, 1]) + image_summary.image( + name, + image, + step=i, + description=description, + max_outputs=99, + ) + def create_multiplexer(self): multiplexer = event_multiplexer.EventMultiplexer() multiplexer.AddRunsFromDirectory(self.logdir) @@ -115,6 +138,7 @@ def test_list_plugins_with_no_graph(self): "greetings", "marigraphs", histogram_metadata.PLUGIN_NAME, + image_metadata.PLUGIN_NAME, scalar_metadata.PLUGIN_NAME, ], ) @@ -134,6 +158,7 @@ def test_list_plugins_with_graph(self): "marigraphs", graph_metadata.PLUGIN_NAME, histogram_metadata.PLUGIN_NAME, + image_metadata.PLUGIN_NAME, scalar_metadata.PLUGIN_NAME, ], ) @@ -371,6 +396,90 @@ def test_read_tensors_downsamples(self): ) self.assertLen(result["lebesgue"]["uniform"], 3) + def test_list_blob_sequences(self): + provider = self.create_provider() + + with self.subTest("finds all time series for a plugin"): + result = provider.list_blob_sequences( + experiment_id="unused", plugin_name=image_metadata.PLUGIN_NAME + ) + self.assertItemsEqual(result.keys(), ["mondrian"]) + self.assertItemsEqual( + result["mondrian"].keys(), ["red", "blue", "yellow"] + ) + sample = result["mondrian"]["blue"] + self.assertIsInstance(sample, base_provider.BlobSequenceTimeSeries) + self.assertEqual(sample.max_step, 10) + # nothing to test for wall time, as it can't be mocked out + self.assertEqual(sample.plugin_content, b"") + 
self.assertEqual(sample.max_length, 6 + 2) + self.assertEqual(sample.description, "bottom-left") + self.assertEqual(sample.display_name, "") + + with self.subTest("filters by run/tag"): + result = provider.list_blob_sequences( + experiment_id="unused", + plugin_name=image_metadata.PLUGIN_NAME, + run_tag_filter=base_provider.RunTagFilter( + runs=["mondrian", "picasso"], tags=["yellow", "green't"] + ), + ) + self.assertItemsEqual(result.keys(), ["mondrian"]) + self.assertItemsEqual(result["mondrian"].keys(), ["yellow"]) + self.assertIsInstance( + result["mondrian"]["yellow"], + base_provider.BlobSequenceTimeSeries, + ) + + def test_read_blob_sequences_and_read_blob(self): + provider = self.create_provider() + + with self.subTest("reads all time series for a plugin"): + result = provider.read_blob_sequences( + experiment_id="unused", + plugin_name=image_metadata.PLUGIN_NAME, + downsample=4, + ) + self.assertItemsEqual(result.keys(), ["mondrian"]) + self.assertItemsEqual( + result["mondrian"].keys(), ["red", "blue", "yellow"] + ) + sample = result["mondrian"]["blue"] + self.assertLen(sample, 4) # downsampled from 10 + last = sample[-1] + self.assertIsInstance(last, base_provider.BlobSequenceDatum) + self.assertEqual(last.step, 10) + self.assertLen(last.values, 2 + 2) + blobs = [provider.read_blob(v.blob_key) for v in last.values] + self.assertEqual(blobs[0], b"10") + self.assertEqual(blobs[1], b"10") + self.assertStartsWith(blobs[2], b"\x89PNG") + self.assertStartsWith(blobs[3], b"\x89PNG") + + blue1 = blobs[2] + blue2 = blobs[3] + red1 = provider.read_blob( + result["mondrian"]["red"][-1].values[2].blob_key + ) + self.assertEqual(blue1, blue2) + self.assertNotEqual(blue1, red1) + + with self.subTest("filters by run/tag"): + result = provider.read_blob_sequences( + experiment_id="unused", + plugin_name=image_metadata.PLUGIN_NAME, + run_tag_filter=base_provider.RunTagFilter( + runs=["mondrian", "picasso"], tags=["yellow", "green't"] + ), + downsample=1, + ) + self.assertItemsEqual(result.keys(), ["mondrian"]) + self.assertItemsEqual(result["mondrian"].keys(), ["yellow"]) + self.assertIsInstance( + result["mondrian"]["yellow"][0], + base_provider.BlobSequenceDatum, + ) + class DownsampleTest(tf.test.TestCase): """Tests for the `_downsample` private helper function.""" From c4c8b13ecbec6ae79cb22b6a4925795af0cb0c9b Mon Sep 17 00:00:00 2001 From: William Chargin Date: Wed, 25 Mar 2020 20:02:12 -0700 Subject: [PATCH 3/6] [update patch] wchargin-branch: data-read-without-list wchargin-source: d768ced329672f2b307bd25681f111ebe1b929a8 --- .../backend/event_processing/data_provider.py | 117 +++++++++--------- 1 file changed, 58 insertions(+), 59 deletions(-) diff --git a/tensorboard/backend/event_processing/data_provider.py b/tensorboard/backend/event_processing/data_provider.py index 355a140c1e..d0865d5184 100644 --- a/tensorboard/backend/event_processing/data_provider.py +++ b/tensorboard/backend/event_processing/data_provider.py @@ -117,11 +117,14 @@ def list_scalars(self, experiment_id, plugin_name, run_tag_filter=None): def read_scalars( self, experiment_id, plugin_name, downsample=None, run_tag_filter=None ): - self._validate_downsample(downsample) - index = self._index( - plugin_name, run_tag_filter, summary_pb2.DATA_CLASS_SCALAR + return self._read( + _convert_scalar_event, + summary_pb2.DATA_CLASS_SCALAR, + experiment_id, + plugin_name, + downsample, + run_tag_filter, ) - return self._read(_convert_scalar_event, index, downsample) def list_tensors(self, experiment_id, plugin_name, 
run_tag_filter=None): self._validate_experiment_id(experiment_id) @@ -133,11 +136,14 @@ def list_tensors(self, experiment_id, plugin_name, run_tag_filter=None): def read_tensors( self, experiment_id, plugin_name, downsample=None, run_tag_filter=None ): - self._validate_downsample(downsample) - index = self._index( - plugin_name, run_tag_filter, summary_pb2.DATA_CLASS_TENSOR + return self._read( + _convert_tensor_event, + summary_pb2.DATA_CLASS_TENSOR, + experiment_id, + plugin_name, + downsample, + run_tag_filter, ) - return self._read(_convert_tensor_event, index, downsample) def _index(self, plugin_name, run_tag_filter, data_class_filter): """List time series and metadata matching the given filters. @@ -221,27 +227,42 @@ def _list(self, construct_time_series, index): ) return result - def _read(self, convert_event, index, downsample): - """Helper to read scalar or tensor data from the multiplexer. + def _read( + self, + convert_event, + data_class_filter, + experiment_id, + plugin_name, + downsample, + run_tag_filter, + ): + """Helper to read scalar, tensor, or blob sequence data. Args: - convert_event: Takes `plugin_event_accumulator.TensorEvent` to - either `provider.ScalarDatum` or `provider.TensorDatum`. - index: The result of `self._index(...)`. - downsample: Non-negative `int`; how many samples to return per - time series. + convert_event: One of the `_convert_*_event` helpers. + data_class_filter: A `summary_pb2.DataClass` filter (required). + experiment_id: As to `read_*`. + plugin_name: As to `read_*`. + run_tag_filter: As to `read_*`. + downsample: As to `read_*`. Returns: A dict of dicts of values returned by `convert_event` calls, - suitable to be returned from `read_scalars` or `read_tensors`. + suitable to be returned from `read_*`. 
""" + self._validate_experiment_id(experiment_id) + self._validate_downsample(downsample) + index = self._index(plugin_name, run_tag_filter, data_class_filter) result = {} - for (run, tags_for_run) in six.iteritems(index): + for (run, tag_to_metadata) in index.items(): result_for_run = {} result[run] = result_for_run - for (tag, metadata) in six.iteritems(tags_for_run): + for tag in tag_to_metadata: events = self._multiplexer.Tensors(run, tag) - data = [convert_event(e) for e in events] + data = [ + convert_event(experiment_id, plugin_name, run, tag, e) + for e in events + ] result_for_run[tag] = _downsample(data, downsample) return result @@ -249,23 +270,14 @@ def list_blob_sequences( self, experiment_id, plugin_name, run_tag_filter=None ): self._validate_experiment_id(experiment_id) - if run_tag_filter is None: - run_tag_filter = provider.RunTagFilter(runs=None, tags=None) - + index = self._index( + plugin_name, run_tag_filter, summary_pb2.DATA_CLASS_BLOB_SEQUENCE + ) result = {} - run_tag_content = self._multiplexer.PluginRunToTagToContent(plugin_name) - for (run, tag_to_content) in six.iteritems(run_tag_content): + for (run, tag_to_metadata) in index.items(): result_for_run = {} - for tag in tag_to_content: - if not self._test_run_tag(run_tag_filter, run, tag): - continue - summary_metadata = self._multiplexer.SummaryMetadata(run, tag) - if ( - summary_metadata.data_class - != summary_pb2.DATA_CLASS_BLOB_SEQUENCE - ): - continue - result[run] = result_for_run + result[run] = result_for_run + for (tag, metadata) in tag_to_metadata.items(): max_step = None max_wall_time = None max_length = None @@ -281,36 +293,23 @@ def list_blob_sequences( max_step=max_step, max_wall_time=max_wall_time, max_length=max_length, - plugin_content=summary_metadata.plugin_data.content, - description=summary_metadata.summary_description, - display_name=summary_metadata.display_name, + plugin_content=metadata.plugin_data.content, + description=metadata.summary_description, + display_name=metadata.display_name, ) return result def read_blob_sequences( self, experiment_id, plugin_name, downsample=None, run_tag_filter=None ): - self._validate_experiment_id(experiment_id) - self._validate_downsample(downsample) - index = self.list_blob_sequences( - experiment_id, plugin_name, run_tag_filter=run_tag_filter + return self._read( + _convert_blob_sequence_event, + summary_pb2.DATA_CLASS_BLOB_SEQUENCE, + experiment_id, + plugin_name, + downsample, + run_tag_filter, ) - result = {} - for (run, tags_for_run) in six.iteritems(index): - result_for_run = {} - result[run] = result_for_run - for (tag, metadata) in six.iteritems(tags_for_run): - events = self._multiplexer.Tensors(run, tag) - data_by_step = {} - for event in events: - if event.step in data_by_step: - continue - data_by_step[event.step] = _convert_blob_sequence_event( - experiment_id, plugin_name, run, tag, event - ) - data = [datum for (step, datum) in sorted(data_by_step.items())] - result_for_run[tag] = _downsample(data, downsample) - return result def read_blob(self, blob_key): ( @@ -394,7 +393,7 @@ def _decode_blob_key(key): return (experiment_id, plugin_name, run, tag, step, index) -def _convert_scalar_event(event): +def _convert_scalar_event(experiment_id, plugin_name, run, tag, event): """Helper for `read_scalars`.""" return provider.ScalarDatum( step=event.step, @@ -403,7 +402,7 @@ def _convert_scalar_event(event): ) -def _convert_tensor_event(event): +def _convert_tensor_event(experiment_id, plugin_name, run, tag, event): """Helper for `read_tensors`.""" 
return provider.TensorDatum( step=event.step, From 7335259825d1238ee949eb93dd64959b3b44344f Mon Sep 17 00:00:00 2001 From: William Chargin Date: Wed, 25 Mar 2020 20:03:44 -0700 Subject: [PATCH 4/6] [update patch] wchargin-branch: data-blob-sequence-tests wchargin-source: 664b9b53b60a76eacbd85ecca3335e62c172acf0 --- tensorboard/backend/event_processing/BUILD | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tensorboard/backend/event_processing/BUILD b/tensorboard/backend/event_processing/BUILD index 2902d81e44..9b7260155b 100644 --- a/tensorboard/backend/event_processing/BUILD +++ b/tensorboard/backend/event_processing/BUILD @@ -57,10 +57,10 @@ py_test( "//tensorboard/plugins/graph:metadata", "//tensorboard/plugins/histogram:metadata", "//tensorboard/plugins/histogram:summary_v2", - "//tensorboard/plugins/scalar:metadata", - "//tensorboard/plugins/scalar:summary_v2", "//tensorboard/plugins/image:metadata", "//tensorboard/plugins/image:summary_v2", + "//tensorboard/plugins/scalar:metadata", + "//tensorboard/plugins/scalar:summary_v2", "//tensorboard/util:tensor_util", "@org_pythonhosted_six", ], From bbd08414d8be4d9d99e70e8b6bcbdf414279b03b Mon Sep 17 00:00:00 2001 From: William Chargin Date: Thu, 26 Mar 2020 11:04:24 -0700 Subject: [PATCH 5/6] [update patch] wchargin-branch: data-blob-sequence-tests wchargin-source: 317d4fc9ae0fb952360f5aa7a2f8c235ffc6b177 --- tensorboard/backend/event_processing/data_provider_test.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tensorboard/backend/event_processing/data_provider_test.py b/tensorboard/backend/event_processing/data_provider_test.py index 8e4adc2bfd..1cb74f3f25 100644 --- a/tensorboard/backend/event_processing/data_provider_test.py +++ b/tensorboard/backend/event_processing/data_provider_test.py @@ -103,6 +103,8 @@ def setUp(self): for (name, color, description) in data: image_1x1 = tf.constant([[[color]]], dtype=tf.uint8) for i in xrange(1, 11): + # Use a non-monotonic sequence of sample sizes to + # test `max_length` calculation. 
k = 6 - abs(6 - i) # 1, .., 6, .., 2 # a `k`-sample image summary of `i`-by-`i` images image = tf.tile(image_1x1, [k, i, i, 1]) From aa30fe091e315b0e22033310339336f6c5756a32 Mon Sep 17 00:00:00 2001 From: William Chargin Date: Thu, 26 Mar 2020 11:53:06 -0700 Subject: [PATCH 6/6] [update patch] wchargin-branch: data-read-without-list wchargin-source: e80cdf315b1ae6ed31e5e60f43361a4f1d45a0ee --- .../backend/event_processing/data_provider.py | 92 +++++++++---------- 1 file changed, 43 insertions(+), 49 deletions(-) diff --git a/tensorboard/backend/event_processing/data_provider.py b/tensorboard/backend/event_processing/data_provider.py index d0865d5184..ae3f6ba49f 100644 --- a/tensorboard/backend/event_processing/data_provider.py +++ b/tensorboard/backend/event_processing/data_provider.py @@ -117,14 +117,12 @@ def list_scalars(self, experiment_id, plugin_name, run_tag_filter=None): def read_scalars( self, experiment_id, plugin_name, downsample=None, run_tag_filter=None ): - return self._read( - _convert_scalar_event, - summary_pb2.DATA_CLASS_SCALAR, - experiment_id, - plugin_name, - downsample, - run_tag_filter, + self._validate_experiment_id(experiment_id) + self._validate_downsample(downsample) + index = self._index( + plugin_name, run_tag_filter, summary_pb2.DATA_CLASS_SCALAR ) + return self._read(_convert_scalar_event, index, downsample) def list_tensors(self, experiment_id, plugin_name, run_tag_filter=None): self._validate_experiment_id(experiment_id) @@ -136,14 +134,12 @@ def list_tensors(self, experiment_id, plugin_name, run_tag_filter=None): def read_tensors( self, experiment_id, plugin_name, downsample=None, run_tag_filter=None ): - return self._read( - _convert_tensor_event, - summary_pb2.DATA_CLASS_TENSOR, - experiment_id, - plugin_name, - downsample, - run_tag_filter, + self._validate_experiment_id(experiment_id) + self._validate_downsample(downsample) + index = self._index( + plugin_name, run_tag_filter, summary_pb2.DATA_CLASS_TENSOR ) + return self._read(_convert_tensor_event, index, downsample) def _index(self, plugin_name, run_tag_filter, data_class_filter): """List time series and metadata matching the given filters. @@ -227,42 +223,27 @@ def _list(self, construct_time_series, index): ) return result - def _read( - self, - convert_event, - data_class_filter, - experiment_id, - plugin_name, - downsample, - run_tag_filter, - ): - """Helper to read scalar, tensor, or blob sequence data. + def _read(self, convert_event, index, downsample): + """Helper to read scalar or tensor data from the multiplexer. Args: - convert_event: One of the `_convert_*_event` helpers. - data_class_filter: A `summary_pb2.DataClass` filter (required). - experiment_id: As to `read_*`. - plugin_name: As to `read_*`. - run_tag_filter: As to `read_*`. - downsample: As to `read_*`. + convert_event: Takes `plugin_event_accumulator.TensorEvent` to + either `provider.ScalarDatum` or `provider.TensorDatum`. + index: The result of `self._index(...)`. + downsample: Non-negative `int`; how many samples to return per + time series. Returns: A dict of dicts of values returned by `convert_event` calls, - suitable to be returned from `read_*`. + suitable to be returned from `read_scalars` or `read_tensors`. 
""" - self._validate_experiment_id(experiment_id) - self._validate_downsample(downsample) - index = self._index(plugin_name, run_tag_filter, data_class_filter) result = {} - for (run, tag_to_metadata) in index.items(): + for (run, tags_for_run) in six.iteritems(index): result_for_run = {} result[run] = result_for_run - for tag in tag_to_metadata: + for (tag, metadata) in six.iteritems(tags_for_run): events = self._multiplexer.Tensors(run, tag) - data = [ - convert_event(experiment_id, plugin_name, run, tag, e) - for e in events - ] + data = [convert_event(e) for e in events] result_for_run[tag] = _downsample(data, downsample) return result @@ -302,14 +283,27 @@ def list_blob_sequences( def read_blob_sequences( self, experiment_id, plugin_name, downsample=None, run_tag_filter=None ): - return self._read( - _convert_blob_sequence_event, - summary_pb2.DATA_CLASS_BLOB_SEQUENCE, - experiment_id, - plugin_name, - downsample, - run_tag_filter, + self._validate_experiment_id(experiment_id) + self._validate_downsample(downsample) + index = self._index( + plugin_name, run_tag_filter, summary_pb2.DATA_CLASS_BLOB_SEQUENCE ) + result = {} + for (run, tags) in six.iteritems(index): + result_for_run = {} + result[run] = result_for_run + for tag in tags: + events = self._multiplexer.Tensors(run, tag) + data_by_step = {} + for event in events: + if event.step in data_by_step: + continue + data_by_step[event.step] = _convert_blob_sequence_event( + experiment_id, plugin_name, run, tag, event + ) + data = [datum for (step, datum) in sorted(data_by_step.items())] + result_for_run[tag] = _downsample(data, downsample) + return result def read_blob(self, blob_key): ( @@ -393,7 +387,7 @@ def _decode_blob_key(key): return (experiment_id, plugin_name, run, tag, step, index) -def _convert_scalar_event(experiment_id, plugin_name, run, tag, event): +def _convert_scalar_event(event): """Helper for `read_scalars`.""" return provider.ScalarDatum( step=event.step, @@ -402,7 +396,7 @@ def _convert_scalar_event(experiment_id, plugin_name, run, tag, event): ) -def _convert_tensor_event(experiment_id, plugin_name, run, tag, event): +def _convert_tensor_event(event): """Helper for `read_tensors`.""" return provider.TensorDatum( step=event.step,