diff --git a/tensorboard/data/BUILD b/tensorboard/data/BUILD
index 45e151cfa9..508c607789 100644
--- a/tensorboard/data/BUILD
+++ b/tensorboard/data/BUILD
@@ -124,6 +124,7 @@ py_library(
        "//tensorboard:expect_grpc_installed",
        "//tensorboard/data/proto:protos_all_py_pb2",
        "//tensorboard/data/proto:protos_all_py_pb2_grpc",
+        "//tensorboard/util:tensor_util",
        "//tensorboard/util:timing",
    ],
)
@@ -141,8 +142,10 @@ py_test(
        "//tensorboard:errors",
        "//tensorboard:expect_grpc_installed",
        "//tensorboard:expect_grpc_testing_installed",
+        "//tensorboard:expect_numpy_installed",
        "//tensorboard:test",
        "//tensorboard/data/proto:protos_all_py_pb2",
        "//tensorboard/data/proto:protos_all_py_pb2_grpc",
+        "//tensorboard/util:tensor_util",
    ],
)
diff --git a/tensorboard/data/grpc_provider.py b/tensorboard/data/grpc_provider.py
index 656123d894..19b2082b02 100644
--- a/tensorboard/data/grpc_provider.py
+++ b/tensorboard/data/grpc_provider.py
@@ -18,6 +18,7 @@
 import grpc
 
+from tensorboard.util import tensor_util
 from tensorboard.util import timing
 from tensorboard import errors
 from tensorboard.data import provider
@@ -134,6 +135,71 @@ def read_scalars(
             series.append(point)
         return result
 
+    @timing.log_latency
+    def list_tensors(
+        self, ctx, *, experiment_id, plugin_name, run_tag_filter=None
+    ):
+        with timing.log_latency("build request"):
+            req = data_provider_pb2.ListTensorsRequest()
+            req.experiment_id = experiment_id
+            req.plugin_filter.plugin_name = plugin_name
+            _populate_rtf(run_tag_filter, req.run_tag_filter)
+        with timing.log_latency("_stub.ListTensors"):
+            with _translate_grpc_error():
+                res = self._stub.ListTensors(req)
+        with timing.log_latency("build result"):
+            result = {}
+            for run_entry in res.runs:
+                tags = {}
+                result[run_entry.run_name] = tags
+                for tag_entry in run_entry.tags:
+                    time_series = tag_entry.metadata
+                    tags[tag_entry.tag_name] = provider.TensorTimeSeries(
+                        max_step=time_series.max_step,
+                        max_wall_time=time_series.max_wall_time,
+                        plugin_content=time_series.summary_metadata.plugin_data.content,
+                        description=time_series.summary_metadata.summary_description,
+                        display_name=time_series.summary_metadata.display_name,
+                    )
+            return result
+
+    @timing.log_latency
+    def read_tensors(
+        self,
+        ctx,
+        *,
+        experiment_id,
+        plugin_name,
+        downsample=None,
+        run_tag_filter=None,
+    ):
+        with timing.log_latency("build request"):
+            req = data_provider_pb2.ReadTensorsRequest()
+            req.experiment_id = experiment_id
+            req.plugin_filter.plugin_name = plugin_name
+            _populate_rtf(run_tag_filter, req.run_tag_filter)
+            req.downsample.num_points = downsample
+        with timing.log_latency("_stub.ReadTensors"):
+            with _translate_grpc_error():
+                res = self._stub.ReadTensors(req)
+        with timing.log_latency("build result"):
+            result = {}
+            for run_entry in res.runs:
+                tags = {}
+                result[run_entry.run_name] = tags
+                for tag_entry in run_entry.tags:
+                    series = []
+                    tags[tag_entry.tag_name] = series
+                    d = tag_entry.data
+                    for (step, wt, value) in zip(d.step, d.wall_time, d.value):
+                        point = provider.TensorDatum(
+                            step=step,
+                            wall_time=wt,
+                            numpy=tensor_util.make_ndarray(value),
+                        )
+                        series.append(point)
+            return result
+
     @timing.log_latency
     def list_blob_sequences(
         self, ctx, experiment_id, plugin_name, run_tag_filter=None
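Note on the new `tensor_util` dependency: `read_tensors` decodes each `TensorProto` from the wire into a numpy array with `tensor_util.make_ndarray`, the inverse of `tensor_util.make_tensor_proto` (which the tests below use to build fixtures). A minimal round-trip sketch, with illustrative values:

```python
import numpy as np

from tensorboard.util import tensor_util

# Encode a numpy array as a TensorProto, as a data server would store it...
proto = tensor_util.make_tensor_proto(np.array([0.0, 0.0, 42.0]))
# ...then decode it back client-side, as read_tensors() does above.
arr = tensor_util.make_ndarray(proto)
assert arr.tolist() == [0.0, 0.0, 42.0]
```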
diff --git a/tensorboard/data/grpc_provider_test.py b/tensorboard/data/grpc_provider_test.py
index dd7a66a10b..9f1c75f156 100644
--- a/tensorboard/data/grpc_provider_test.py
+++ b/tensorboard/data/grpc_provider_test.py
@@ -17,6 +17,7 @@
 import grpc
 import grpc_testing
+import numpy as np
 
 from tensorboard import errors
 from tensorboard import test as tb_test
@@ -25,6 +26,7 @@
 from tensorboard.data import provider
 from tensorboard.data.proto import data_provider_pb2
 from tensorboard.data.proto import data_provider_pb2_grpc
+from tensorboard.util import tensor_util
 
 
 def _create_mock_client():
@@ -54,10 +56,11 @@ def test_list_plugins(self):
         res = data_provider_pb2.ListPluginsResponse()
         res.plugins.add(name="scalars")
         res.plugins.add(name="images")
+        res.plugins.add(name="text")
         self.stub.ListPlugins.return_value = res
 
         actual = self.provider.list_plugins(self.ctx, experiment_id="123")
-        self.assertEqual(actual, ["scalars", "images"])
+        self.assertEqual(actual, ["scalars", "images", "text"])
 
         req = data_provider_pb2.ListPluginsRequest()
         req.experiment_id = "123"
@@ -174,6 +177,112 @@ def test_read_scalars(self):
         req.downsample.num_points = 4
         self.stub.ReadScalars.assert_called_once_with(req)
 
+    def test_list_tensors(self):
+        res = data_provider_pb2.ListTensorsResponse()
+        run1 = res.runs.add(run_name="val")
+        tag11 = run1.tags.add(tag_name="weights")
+        tag11.metadata.max_step = 7
+        tag11.metadata.max_wall_time = 7.77
+        tag11.metadata.summary_metadata.plugin_data.content = b"magic"
+        tag11.metadata.summary_metadata.summary_description = "hey"
+        tag12 = run1.tags.add(tag_name="other")
+        tag12.metadata.max_step = 8
+        tag12.metadata.max_wall_time = 8.88
+        run2 = res.runs.add(run_name="test")
+        tag21 = run2.tags.add(tag_name="weights")
+        tag21.metadata.max_step = 9
+        tag21.metadata.max_wall_time = 9.99
+        self.stub.ListTensors.return_value = res
+
+        actual = self.provider.list_tensors(
+            self.ctx,
+            experiment_id="123",
+            plugin_name="histograms",
+            run_tag_filter=provider.RunTagFilter(tags=["weights", "other"]),
+        )
+        expected = {
+            "val": {
+                "weights": provider.TensorTimeSeries(
+                    max_step=7,
+                    max_wall_time=7.77,
+                    plugin_content=b"magic",
+                    description="hey",
+                    display_name="",
+                ),
+                "other": provider.TensorTimeSeries(
+                    max_step=8,
+                    max_wall_time=8.88,
+                    plugin_content=b"",
+                    description="",
+                    display_name="",
+                ),
+            },
+            "test": {
+                "weights": provider.TensorTimeSeries(
+                    max_step=9,
+                    max_wall_time=9.99,
+                    plugin_content=b"",
+                    description="",
+                    display_name="",
+                ),
+            },
+        }
+        self.assertEqual(actual, expected)
+
+        req = data_provider_pb2.ListTensorsRequest()
+        req.experiment_id = "123"
+        req.plugin_filter.plugin_name = "histograms"
+        req.run_tag_filter.tags.names.extend(["other", "weights"])  # sorted
+        self.stub.ListTensors.assert_called_once_with(req)
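The `# sorted` comments in `test_list_tensors` above and `test_read_tensors` below pin down one behavior of the shared `_populate_rtf` helper (not shown in this diff): run and tag names from a `RunTagFilter` are written into the request proto in sorted order, so logically equal filters serialize identically. A rough sketch of that contract, assuming a helper of this shape rather than quoting the real one:

```python
def _populate_rtf_sketch(run_tag_filter, rtf_proto):
    """Copy a provider.RunTagFilter into a RunTagFilter proto (sketch).

    The sorting is inferred from the `# sorted` assertions in these
    tests; the real _populate_rtf may differ in details.
    """
    if run_tag_filter is None:
        return
    if run_tag_filter.runs is not None:
        rtf_proto.runs.names.extend(sorted(run_tag_filter.runs))
    if run_tag_filter.tags is not None:
        rtf_proto.tags.names.extend(sorted(run_tag_filter.tags))
```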
+    def test_read_tensors(self):
+        res = data_provider_pb2.ReadTensorsResponse()
+        run = res.runs.add(run_name="test")
+        tag = run.tags.add(tag_name="weights")
+        tag.data.step.extend([0, 1, 2])
+        tag.data.wall_time.extend([1234.0, 1235.0, 1236.0])
+        tag.data.value.append(tensor_util.make_tensor_proto([0.0, 0.0, 42.0]))
+        tag.data.value.append(tensor_util.make_tensor_proto([1.0, 1.0, 43.0]))
+        tag.data.value.append(tensor_util.make_tensor_proto([2.0, 2.0, 44.0]))
+        self.stub.ReadTensors.return_value = res
+
+        actual = self.provider.read_tensors(
+            self.ctx,
+            experiment_id="123",
+            plugin_name="histograms",
+            run_tag_filter=provider.RunTagFilter(runs=["test", "nope"]),
+            downsample=3,
+        )
+        expected = {
+            "test": {
+                "weights": [
+                    provider.TensorDatum(
+                        step=0,
+                        wall_time=1234.0,
+                        numpy=np.array([0.0, 0.0, 42.0]),
+                    ),
+                    provider.TensorDatum(
+                        step=1,
+                        wall_time=1235.0,
+                        numpy=np.array([1.0, 1.0, 43.0]),
+                    ),
+                    provider.TensorDatum(
+                        step=2,
+                        wall_time=1236.0,
+                        numpy=np.array([2.0, 2.0, 44.0]),
+                    ),
+                ],
+            },
+        }
+        self.assertEqual(actual, expected)
+
+        req = data_provider_pb2.ReadTensorsRequest()
+        req.experiment_id = "123"
+        req.plugin_filter.plugin_name = "histograms"
+        req.run_tag_filter.runs.names.extend(["nope", "test"])  # sorted
+        req.downsample.num_points = 3
+        self.stub.ReadTensors.assert_called_once_with(req)
+
     def test_list_blob_sequences(self):
         res = data_provider_pb2.ListBlobSequencesResponse()
         run1 = res.runs.add(run_name="train")
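End to end, the Python side of this change is exercised roughly as follows (a sketch: the address is illustrative, and the `GrpcDataProvider(addr, stub)` constructor and `context.RequestContext()` setup are assumed from the surrounding codebase rather than shown in this diff):

```python
import grpc

from tensorboard import context
from tensorboard.data import grpc_provider
from tensorboard.data import provider
from tensorboard.data.proto import data_provider_pb2_grpc

channel = grpc.insecure_channel("localhost:6806")  # illustrative address
stub = data_provider_pb2_grpc.TensorBoardDataProviderStub(channel)
dp = grpc_provider.GrpcDataProvider("localhost:6806", stub)
ctx = context.RequestContext()

# Metadata for every histogram time series, then the downsampled values.
listing = dp.list_tensors(ctx, experiment_id="123", plugin_name="histograms")
tensors = dp.read_tensors(
    ctx,
    experiment_id="123",
    plugin_name="histograms",
    run_tag_filter=provider.RunTagFilter(runs=["train"]),
    downsample=1000,
)
```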
diff --git a/tensorboard/data/server/server.rs b/tensorboard/data/server/server.rs
index cffb753f73..12ec18ab57 100644
--- a/tensorboard/data/server/server.rs
+++ b/tensorboard/data/server/server.rs
@@ -70,6 +70,7 @@ impl TensorBoardDataProvider for DataProviderHandler {
                .read()
                .map_err(|_| Status::internal(format!("failed to read run data for {:?}", run)))?;
            for metadata in (data.scalars.values().map(|ts| ts.metadata.as_ref()))
+                .chain(data.tensors.values().map(|ts| ts.metadata.as_ref()))
                .chain(data.blob_sequences.values().map(|ts| ts.metadata.as_ref()))
            {
                let plugin_name = match &metadata.plugin_data {
@@ -234,16 +235,114 @@
     async fn list_tensors(
         &self,
-        _request: Request<data::ListTensorsRequest>,
+        req: Request<data::ListTensorsRequest>,
     ) -> Result<Response<data::ListTensorsResponse>, Status> {
-        Err(Status::unimplemented("not yet implemented"))
+        let req = req.into_inner();
+        let want_plugin = parse_plugin_filter(req.plugin_filter)?;
+        let (run_filter, tag_filter) = parse_rtf(req.run_tag_filter);
+        let runs = self.read_runs()?;
+
+        let mut res: data::ListTensorsResponse = Default::default();
+        for (run, data) in runs.iter() {
+            if !run_filter.want(run) {
+                continue;
+            }
+            let data = data
+                .read()
+                .map_err(|_| Status::internal(format!("failed to read run data for {:?}", run)))?;
+            let mut run_res: data::list_tensors_response::RunEntry = Default::default();
+            for (tag, ts) in &data.tensors {
+                if !tag_filter.want(tag) {
+                    continue;
+                }
+                if plugin_name(&ts.metadata) != Some(&want_plugin) {
+                    continue;
+                }
+                let max_step = match ts.valid_values().last() {
+                    None => continue,
+                    Some((step, _, _)) => step,
+                };
+                // TODO(@wchargin): Consider tracking this on the time series itself?
+                let max_wall_time = ts
+                    .valid_values()
+                    .map(|(_, wt, _)| wt)
+                    .max()
+                    .expect("have valid values for step but not wall time");
+                run_res.tags.push(data::list_tensors_response::TagEntry {
+                    tag_name: tag.0.clone(),
+                    metadata: Some(data::TensorMetadata {
+                        max_step: max_step.into(),
+                        max_wall_time: max_wall_time.into(),
+                        summary_metadata: Some(*ts.metadata.clone()),
+                        ..Default::default()
+                    }),
+                });
+            }
+            if !run_res.tags.is_empty() {
+                run_res.run_name = run.0.clone();
+                res.runs.push(run_res);
+            }
+        }
+
+        Ok(Response::new(res))
     }
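`read_tensors` below leans on the existing `downsample::downsample` helper rather than sending every point. A hedged Python sketch of the contract the server code relies on (an order-preserving subsequence of at most `num_points` that keeps the most recent point; this is an assumption about the helper's contract, not its actual selection strategy):

```python
def downsample_sketch(points, num_points):
    """Keep at most num_points of `points`, preserving order (sketch)."""
    if num_points <= 0:
        return []
    if len(points) <= num_points:
        return list(points)
    if num_points == 1:
        return [points[-1]]  # always retain the latest point
    stride = (len(points) - 1) / (num_points - 1)
    # Evenly spaced indices plus the final index; a set dedupes collisions.
    idxs = sorted({round(i * stride) for i in range(num_points - 1)} | {len(points) - 1})
    return [points[i] for i in idxs]
```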
 
     async fn read_tensors(
         &self,
-        _request: Request<data::ReadTensorsRequest>,
+        req: Request<data::ReadTensorsRequest>,
     ) -> Result<Response<data::ReadTensorsResponse>, Status> {
-        Err(Status::unimplemented("not yet implemented"))
+        let req = req.into_inner();
+        let want_plugin = parse_plugin_filter(req.plugin_filter)?;
+        let (run_filter, tag_filter) = parse_rtf(req.run_tag_filter);
+        let num_points = parse_downsample(req.downsample)?;
+        let runs = self.read_runs()?;
+
+        let mut res: data::ReadTensorsResponse = Default::default();
+        for (run, data) in runs.iter() {
+            if !run_filter.want(run) {
+                continue;
+            }
+            let data = data
+                .read()
+                .map_err(|_| Status::internal(format!("failed to read run data for {:?}", run)))?;
+            let mut run_res: data::read_tensors_response::RunEntry = Default::default();
+            for (tag, ts) in &data.tensors {
+                if !tag_filter.want(tag) {
+                    continue;
+                }
+                if plugin_name(&ts.metadata) != Some(&want_plugin) {
+                    continue;
+                }
+
+                let mut points = ts.valid_values().collect::<Vec<_>>();
+                downsample::downsample(&mut points, num_points);
+                let n = points.len();
+                let mut steps = Vec::with_capacity(n);
+                let mut wall_times = Vec::with_capacity(n);
+                let mut values = Vec::with_capacity(n);
+                for (step, wall_time, value) in points {
+                    steps.push(step.into());
+                    wall_times.push(wall_time.into());
+                    // Clone the TensorProto to get a copy to send in the response.
+                    values.push(value.clone());
+                }
+
+                run_res.tags.push(data::read_tensors_response::TagEntry {
+                    tag_name: tag.0.clone(),
+                    data: Some(data::TensorData {
+                        step: steps,
+                        wall_time: wall_times,
+                        value: values,
+                    }),
+                });
+            }
+            if !run_res.tags.is_empty() {
+                run_res.run_name = run.0.clone();
+                res.runs.push(run_res);
+            }
+        }
+
+        Ok(Response::new(res))
     }
 
     async fn list_blob_sequences(
@@ -544,6 +643,9 @@ mod tests {
     async fn test_list_plugins() {
         let commit = CommitBuilder::new()
             .scalars("train", "xent", |b| b.build())
+            .tensors("train", "weights", |mut b| {
+                b.plugin_name("histograms").build()
+            })
             .blob_sequences("train", "input_image", |mut b| {
                 b.plugin_name("images").build()
             })
@@ -558,7 +660,7 @@
                 .iter()
                 .map(|p| p.name.as_str())
                 .collect::<HashSet<_>>(),
-            vec!["scalars", "images"]
+            vec!["scalars", "histograms", "images"]
                 .into_iter()
                 .collect::<HashSet<_>>(),
         );
@@ -820,6 +922,107 @@
         assert_eq!(xent_data.value, Vec::<f32>::new());
     }
 
+    #[tokio::test]
+    async fn test_list_tensors() {
+        let commit = CommitBuilder::new()
+            .run("run_with_no_data", None)
+            .scalars("train", "accuracy", |b| b.build())
+            .tensors("train", "weights", |b| b.build())
+            .tensors("test", "weights", |mut b| {
+                b.wall_time_start(1235.0).step_start(0).len(3).build()
+            })
+            .build();
+        let handler = sample_handler(commit);
+        let req = Request::new(data::ListTensorsRequest {
+            experiment_id: "123".to_string(),
+            plugin_filter: Some(data::PluginFilter {
+                plugin_name: "tensors".to_string(),
+            }),
+            ..Default::default()
+        });
+        let res = handler.list_tensors(req).await.unwrap().into_inner();
+
+        assert_eq!(res.runs.len(), 2);
+        let map = run_tag_map!(res.runs);
+
+        let test_run = &map[&Run("test".to_string())];
+        assert_eq!(test_run.len(), 1);
+        let weights_metadata = &test_run[&Tag("weights".to_string())]
+            .metadata
+            .as_ref()
+            .unwrap();
+        assert_eq!(weights_metadata.max_step, 2);
+        assert_eq!(weights_metadata.max_wall_time, 1237.0);
+        assert_eq!(
+            weights_metadata
+                .summary_metadata
+                .as_ref()
+                .unwrap()
+                .plugin_data
+                .as_ref()
+                .unwrap()
+                .plugin_name,
+            "tensors".to_string()
+        );
+    }
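`test_read_tensors` below hand-rolls rank-0 `DT_STRING` tensor protos. For reference, the Python analogue via the real `tensor_util` API produces the same shape of proto (values illustrative):

```python
from tensorboard.util import tensor_util

# A rank-0 (scalar) string tensor, analogous to the Rust
# make_string_tensor_proto helper in the test below.
proto = tensor_util.make_tensor_proto(b"Step 0")
assert proto.string_val == [b"Step 0"]
assert tensor_util.make_ndarray(proto) == b"Step 0"
```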
+    #[tokio::test]
+    async fn test_read_tensors() {
+        fn make_string_tensor_proto(value: impl Into<Bytes>) -> pb::TensorProto {
+            pb::TensorProto {
+                dtype: pb::DataType::DtString.into(),
+                tensor_shape: None, // Scalar shape
+                string_val: vec![value.into()],
+                ..Default::default()
+            }
+        }
+        let commit = CommitBuilder::new()
+            .tensors("train", "status", |mut b| {
+                b.plugin_name("text")
+                    .len(3)
+                    .wall_time_start(1235.0)
+                    .step_start(0)
+                    .eval(|Step(i)| make_string_tensor_proto(format!("Step {}", i)))
+                    .build()
+            })
+            .tensors("test", "weights", |b| b.build())
+            .build();
+        let handler = sample_handler(commit);
+        let req = Request::new(data::ReadTensorsRequest {
+            experiment_id: "123".to_string(),
+            plugin_filter: Some(data::PluginFilter {
+                plugin_name: "text".to_string(),
+            }),
+            run_tag_filter: Some(data::RunTagFilter {
+                runs: Some(data::RunFilter {
+                    names: vec!["train".to_string(), "nonexistent".to_string()],
+                }),
+                tags: None,
+            }),
+            downsample: Some(data::Downsample { num_points: 1000 }),
+            ..Default::default()
+        });
+
+        let res = handler.read_tensors(req).await.unwrap().into_inner();
+
+        assert_eq!(res.runs.len(), 1);
+        let map = run_tag_map!(res.runs);
+
+        let train_run = &map[&Run("train".to_string())];
+        assert_eq!(train_run.len(), 1);
+        let status_data = &train_run[&Tag("status".to_string())].data.as_ref().unwrap();
+        assert_eq!(status_data.step, vec![0, 1, 2]);
+        assert_eq!(status_data.wall_time, vec![1235.0, 1236.0, 1237.0]);
+        assert_eq!(
+            status_data.value,
+            vec![
+                make_string_tensor_proto("Step 0"),
+                make_string_tensor_proto("Step 1"),
+                make_string_tensor_proto("Step 2"),
+            ]
+        );
+    }
+
     #[tokio::test]
     async fn test_blob_sequences() {
         let commit = CommitBuilder::new()