Merged
1 change: 1 addition & 0 deletions tensorboard/backend/event_processing/BUILD
@@ -235,6 +235,7 @@ py_test(
deps = [
":event_accumulator",
":event_multiplexer",
"//tensorboard:expect_numpy_installed",
"//tensorboard:expect_tensorflow_installed",
],
)
98 changes: 77 additions & 21 deletions tensorboard/backend/event_processing/data_provider.py
@@ -57,7 +57,7 @@ def _get_first_event_timestamp(self, run_name):
return None

def data_location(self, experiment_id):
del experiment_id # ignored
del experiment_id # ignored
return str(self._logdir)

def list_runs(self, experiment_id):
@@ -72,8 +72,69 @@ def list_runs(self, experiment_id):
]

def list_scalars(self, experiment_id, plugin_name, run_tag_filter=None):
del experiment_id # ignored for now
run_tag_content = self._multiplexer.PluginRunToTagToContent(plugin_name)
return self._list(
provider.ScalarTimeSeries, run_tag_content, run_tag_filter
)

def read_scalars(
self, experiment_id, plugin_name, downsample=None, run_tag_filter=None
):
# TODO(@wchargin): Downsampling not implemented, as the multiplexer
# is already downsampled. We could downsample on top of the existing
# sampling, which would be nice for testing.
del downsample # ignored for now
index = self.list_scalars(
experiment_id, plugin_name, run_tag_filter=run_tag_filter
)

def convert_scalar_event(event):
Contributor:
No real need to change this but FWIW I'd be kind of inclined not to inline these, even though they're only used within single methods, since they don't actually close over any state, so this somewhat needlessly redefines the helper each time we call the API. In general I'm more of a fan of nested functions than the style guide but they do have some downsides like worse stack traces, etc.

Contributor Author:
Fair enough; done in #2994.

return provider.ScalarDatum(
step=event.step,
wall_time=event.wall_time,
value=tensor_util.make_ndarray(event.tensor_proto).item(),
)

return self._read(convert_scalar_event, index)
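As a footnote to the thread above, here is a minimal sketch, assuming the module's existing `provider` and `tensor_util` imports, of what hoisting the nested converter to module level might look like; the actual refactor landed in #2994 and may differ:

```python
# Hypothetical sketch only; not necessarily what #2994 does.
def _convert_scalar_event(event):
    """Converts a `TensorEvent` into a `provider.ScalarDatum`."""
    return provider.ScalarDatum(
        step=event.step,
        wall_time=event.wall_time,
        value=tensor_util.make_ndarray(event.tensor_proto).item(),
    )

# ...and `read_scalars` would then end with:
#     return self._read(_convert_scalar_event, index)
```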

def list_tensors(self, experiment_id, plugin_name, run_tag_filter=None):
Contributor:
I generally don't think copy/paste is a bad thing but do we expect these implementations (`list_tensors` and `read_tensors`) to differ much beyond `TensorTimeSeries` and `convert_tensor_event`?

Contributor Author:
I expect `list_tensors` and `read_tensors` to remain similar to their scalar counterparts, though the scalar counterparts could gain additional features, like “list scalar time series with tag name `accuracy` whose moving average is above 0.999”, that don't make sense for general tensor time series. (This is why we chose to separate scalar time series out in the first place, even though the basic functionality is subsumed by tensor time series.)

In such a future, I'd be happy to inline `self._read` and `self._list` with appropriate changes. If you'd prefer that these be inlined today, that's also fine with me. I'm not exactly sure what you're asking, so let me know if you'd like any changes?

run_tag_content = self._multiplexer.PluginRunToTagToContent(plugin_name)
return self._list(
provider.TensorTimeSeries, run_tag_content, run_tag_filter
)
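To make the "scalar-only feature" point from the thread above concrete, a purely hypothetical sketch layered on top of `read_scalars`: the tag selection, ten-step window, and 0.999 threshold are invented for illustration, `mux_provider` is an assumed `MultiplexerDataProvider` instance, and `np` is NumPy.

```python
# Purely hypothetical: select scalar series whose trailing moving average
# exceeds a threshold, a query with no analogue for general tensors.
scalars = mux_provider.read_scalars(
    experiment_id, plugin_name, run_tag_filter=run_tag_filter
)
converged = {
    run: [
        tag
        for (tag, data) in tag_to_data.items()
        if np.mean([datum.value for datum in data[-10:]]) > 0.999
    ]
    for (run, tag_to_data) in scalars.items()
}
```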

def read_tensors(
self, experiment_id, plugin_name, downsample=None, run_tag_filter=None
):
# TODO(@wchargin): Downsampling not implemented, as the multiplexer
# is already downsampled. We could downsample on top of the existing
# sampling, which would be nice for testing.
del downsample # ignored for now
index = self.list_tensors(
Contributor:
I guess I had no objection in the original PR, but isn't it kind of inefficient to implement the read operation in terms of listing? The list operation already iterates over all the tensors once in order to determine max step and wall time, but then the read operation doesn't return that metadata at all (it's just discarded), so the computation is wasted, and we have to iterate over the tensors again a second time to get their actual values.

Performance-wise it's probably not the end of the world, but it seems suboptimal. (The same argument we had for the TB.dev backend might suggest also that we don't really need to return max step and wall time all the time; if we made those optional then listing could be made more efficient and the reuse wouldn't result in as much redundant iteration, although it's still a little duplicative even then.)

Contributor Author:
Yes, agreed. I went with this because it was convenient and because, as you say, it's not the end of the world (due to downsampling, these queries are all bounded). For a data provider implementation where this actually required an extra RPC, I of course would make a different call.

I'd be happy to accept a change that streamlined this, but it's not high enough on my priorities for me to do so myself, unless you feel strongly about it.

experiment_id, plugin_name, run_tag_filter=run_tag_filter
)

def convert_tensor_event(event):
return provider.TensorDatum(
step=event.step,
wall_time=event.wall_time,
numpy=tensor_util.make_ndarray(event.tensor_proto),
)

return self._read(convert_tensor_event, index)
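For the efficiency discussion in the thread above, a rough sketch, not the PR's implementation, of reading tensors directly from the multiplexer without the intermediate listing pass; the helper name is invented, and run/tag filtering is elided for brevity.

```python
def _read_tensors_direct(self, plugin_name):
    # Sketch only: skips the max-step/max-wall-time pass that listing does.
    result = {}
    run_tag_content = self._multiplexer.PluginRunToTagToContent(plugin_name)
    for (run, tag_to_content) in six.iteritems(run_tag_content):
        result_for_run = {}
        result[run] = result_for_run
        for tag in tag_to_content:
            events = self._multiplexer.Tensors(run, tag)
            result_for_run[tag] = [
                provider.TensorDatum(
                    step=event.step,
                    wall_time=event.wall_time,
                    numpy=tensor_util.make_ndarray(event.tensor_proto),
                )
                for event in events
            ]
    return result
```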

def _list(self, construct_time_series, run_tag_content, run_tag_filter):
"""Helper to list scalar or tensor time series.

Args:
construct_time_series: `ScalarTimeSeries` or `TensorTimeSeries`.
run_tag_content: Result of `_multiplexer.PluginRunToTagToContent(...)`.
run_tag_filter: As given by the client; may be `None`.

Returns:
A list of objects of type given by `construct_time_series`,
suitable to be returned from `list_scalars` or `list_tensors`.
"""
result = {}
if run_tag_filter is None:
run_tag_filter = provider.RunTagFilter(runs=None, tags=None)
@@ -91,7 +152,7 @@ def list_scalars(self, experiment_id, plugin_name, run_tag_filter=None):
if max_wall_time is None or max_wall_time < event.wall_time:
max_wall_time = event.wall_time
summary_metadata = self._multiplexer.SummaryMetadata(run, tag)
result_for_run[tag] = provider.ScalarTimeSeries(
result_for_run[tag] = construct_time_series(
max_step=max_step,
max_wall_time=max_wall_time,
plugin_content=summary_metadata.plugin_data.content,
@@ -100,28 +161,23 @@ def list_scalars(self, experiment_id, plugin_name, run_tag_filter=None):
)
return result

def read_scalars(
self, experiment_id, plugin_name, downsample=None, run_tag_filter=None
):
# TODO(@wchargin): Downsampling not implemented, as the multiplexer
# is already downsampled. We could downsample on top of the existing
# sampling, which would be nice for testing.
del downsample # ignored for now
index = self.list_scalars(
experiment_id, plugin_name, run_tag_filter=run_tag_filter
)
def _read(self, convert_event, index):
"""Helper to read scalar or tensor data from the multiplexer.

Args:
convert_event: Takes `plugin_event_accumulator.TensorEvent` to
either `provider.ScalarDatum` or `provider.TensorDatum`.
index: The result of `list_scalars` or `list_tensors`.

Returns:
A dict of dicts of values returned by `convert_event` calls,
suitable to be returned from `read_scalars` or `read_tensors`.
"""
result = {}
for (run, tags_for_run) in six.iteritems(index):
result_for_run = {}
result[run] = result_for_run
for (tag, metadata) in six.iteritems(tags_for_run):
events = self._multiplexer.Tensors(run, tag)
result_for_run[tag] = [self._convert_scalar_event(e) for e in events]
result_for_run[tag] = [convert_event(e) for e in events]
return result

def _convert_scalar_event(self, event):
return provider.ScalarDatum(
step=event.step,
wall_time=event.wall_time,
value=tensor_util.make_ndarray(event.tensor_proto).item(),
)
71 changes: 68 additions & 3 deletions tensorboard/backend/event_processing/data_provider_test.py
@@ -22,6 +22,7 @@

import six
from six.moves import xrange # pylint: disable=redefined-builtin
import numpy as np

from tensorboard.backend.event_processing import data_provider
from tensorboard.backend.event_processing import (
@@ -64,9 +65,15 @@ def setUp(self):

logdir = os.path.join(self.logdir, "pictures")
with tf.summary.create_file_writer(logdir).as_default():
purple = tf.constant([[[255, 0, 255]]], dtype=tf.uint8)
for i in xrange(1, 11):
image_summary.image("purple", [tf.tile(purple, [i, i, 1])], step=i)
colors = [
("`#F0F`", (255, 0, 255), "purple"),
("`#0F0`", (255, 0, 255), "green"),
]
for (description, rgb, name) in colors:
pixel = tf.constant([[list(rgb)]], dtype=tf.uint8)
for i in xrange(1, 11):
pixels = [tf.tile(pixel, [i, i, 1])]
image_summary.image(name, pixels, step=i, description=description)

def create_multiplexer(self):
multiplexer = event_multiplexer.EventMultiplexer()
@@ -211,6 +218,64 @@ def test_read_scalars_but_not_rank_0(self):
run_tag_filter=run_tag_filter,
)

def test_list_tensors_all(self):
provider = self.create_provider()
result = provider.list_tensors(
experiment_id="unused",
plugin_name=image_metadata.PLUGIN_NAME,
run_tag_filter=None,
)
self.assertItemsEqual(result.keys(), ["pictures"])
self.assertItemsEqual(result["pictures"].keys(), ["purple", "green"])
sample = result["pictures"]["purple"]
self.assertIsInstance(sample, base_provider.TensorTimeSeries)
self.assertEqual(sample.max_step, 10)
# nothing to test for wall time, as it can't be mocked out
self.assertEqual(sample.plugin_content, b"")
self.assertEqual(sample.display_name, "") # not written by V2 summary ops
self.assertEqual(sample.description, "`#F0F`")

def test_list_tensors_filters(self):
provider = self.create_provider()

# Quick check only, as scalars and tensors use the same underlying
# filtering implementation.
result = provider.list_tensors(
experiment_id="unused",
plugin_name=image_metadata.PLUGIN_NAME,
run_tag_filter=base_provider.RunTagFilter(["pictures"], ["green"]),
)
self.assertItemsEqual(result.keys(), ["pictures"])
self.assertItemsEqual(result["pictures"].keys(), ["green"])

def test_read_tensors(self):
multiplexer = self.create_multiplexer()
provider = data_provider.MultiplexerDataProvider(multiplexer, self.logdir)

run_tag_filter = base_provider.RunTagFilter(
runs=["pictures"],
tags=["purple", "green"],
)
result = provider.read_tensors(
experiment_id="unused",
plugin_name=image_metadata.PLUGIN_NAME,
run_tag_filter=run_tag_filter,
downsample=None, # not yet implemented
)

self.assertItemsEqual(result.keys(), ["pictures"])
self.assertItemsEqual(result["pictures"].keys(), ["purple", "green"])
for run in result:
Contributor:
optional: It may be easier to assert by forming an expected map of `TensorTimeSeries` and just do `assertEqual` since we implemented `__eq__`.

Contributor Author:
Indeed, but I wanted to use `np.testing.assert_equal` for better error messages than “giant_numpy_array_1 != giant_numpy_array_2”.

for tag in result[run]:
tensor_events = multiplexer.Tensors(run, tag)
self.assertLen(result[run][tag], len(tensor_events))
for (datum, event) in zip(result[run][tag], tensor_events):
self.assertEqual(datum.step, event.step)
self.assertEqual(datum.wall_time, event.wall_time)
np.testing.assert_equal(
datum.numpy, tensor_util.make_ndarray(event.tensor_proto)
)
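As a small standalone illustration of the error-message trade-off mentioned in the thread above (not part of the PR): `np.testing.assert_equal` reports which elements differ instead of printing two large arrays.

```python
import numpy as np

a = np.zeros((4, 4, 3), dtype=np.uint8)
b = a.copy()
b[2, 1, 0] = 255
# Raises AssertionError with a report of the mismatch
# ("Mismatched elements: 1 / 48"), rather than dumping both arrays.
np.testing.assert_equal(a, b)
```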


if __name__ == "__main__":
tf.test.main()