From 0c2e15cd3e01a2f5712d78557854d116b605a508 Mon Sep 17 00:00:00 2001
From: William Chargin
Date: Wed, 13 Jan 2021 11:36:20 -0800
Subject: [PATCH 1/3] rust: load graphs into memory as blob sequences
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Summary:
The `Commit` structure now has a `blob_sequences` store in addition to
its `scalars` store. For now, we only actually load run graphs into
memory (since it’s trivial, and everything else is merely easy).

Test Plan:
Unit tests included. There are no user-facing changes to the RPC layer.
Loading an MNIST dataset with scalars, graphs, and images no longer
prints warnings about `graph_def`s, and still serves scalars properly.

wchargin-branch: rust-load-graphs
wchargin-source: a16be4c845d05f74034e5a038439c210d8b3e937
---
 tensorboard/data/server/commit.rs      |  9 ++++
 tensorboard/data/server/data_compat.rs | 28 ++++++++++-
 tensorboard/data/server/run.rs         | 66 +++++++++++++++++++-------
 tensorboard/data/server/writer.rs      | 35 ++++++++++++++
 4 files changed, 119 insertions(+), 19 deletions(-)

diff --git a/tensorboard/data/server/commit.rs b/tensorboard/data/server/commit.rs
index fb8574fed9..24587c68d9 100644
--- a/tensorboard/data/server/commit.rs
+++ b/tensorboard/data/server/commit.rs
@@ -56,6 +56,9 @@ pub struct RunData {
 
     /// Scalar time series for this run.
     pub scalars: TagStore<ScalarValue>,
+
+    /// Blob sequence time series for this run.
+    pub blob_sequences: TagStore<BlobSequenceValue>,
 }
 
 pub type TagStore<V> = HashMap<Tag, TimeSeries<V>>;
@@ -105,6 +108,12 @@ pub struct DataLoss;
 #[derive(Debug, Copy, Clone, PartialEq)]
 pub struct ScalarValue(pub f32);
 
+/// The value of a blob sequence time series at a single point.
+///
+/// This value is a sequence of zero or more blobs, stored in memory.
+#[derive(Debug, Clone, PartialEq)]
+pub struct BlobSequenceValue(pub Vec<Vec<u8>>);
+
 #[cfg(test)]
 mod tests {
     use super::*;
diff --git a/tensorboard/data/server/data_compat.rs b/tensorboard/data/server/data_compat.rs
index 4adb93a4fc..f670711d60 100644
--- a/tensorboard/data/server/data_compat.rs
+++ b/tensorboard/data/server/data_compat.rs
@@ -18,7 +18,7 @@ limitations under the License.
 use std::convert::TryInto;
 use std::fmt::Debug;
 
-use crate::commit::{DataLoss, ScalarValue};
+use crate::commit::{BlobSequenceValue, DataLoss, ScalarValue};
 use crate::proto::tensorboard as pb;
 
 use pb::summary_metadata::PluginData;
@@ -67,6 +67,16 @@ impl EventValue {
             _ => Err(DataLoss),
         }
     }
+
+    /// Consumes this event value and enriches it into a blob sequence.
+    ///
+    /// For now, this succeeds only for graphs.
+    pub fn into_blob_sequence(self) -> Result<BlobSequenceValue, DataLoss> {
+        match self {
+            EventValue::Summary(_) => Err(DataLoss),
+            EventValue::GraphDef(GraphDefValue(blob)) => Ok(BlobSequenceValue(vec![blob])),
+        }
+    }
 }
 
 fn tensor_proto_to_scalar(tp: &pb::TensorProto) -> Option<f32> {
@@ -118,6 +128,11 @@ pub struct GraphDefValue(pub Vec<u8>);
 pub struct SummaryValue(pub Box<pb::summary::value::Value>);
 
 impl GraphDefValue {
+    /// Tag name used for run-level graphs.
+    ///
+    /// This must match `tensorboard.plugins.graph.metadata.RUN_GRAPH_NAME`.
+    pub const TAG_NAME: &'static str = "__run_graph__";
+
     /// Determines the metadata for a time series whose first event is a
     /// [`GraphDef`][`EventValue::GraphDef`].
     pub fn initial_metadata() -> Box<pb::SummaryMetadata> {
@@ -450,11 +465,20 @@ mod tests {
         use super::*;
 
         #[test]
-        fn test() {
+        fn test_metadata() {
             let md = GraphDefValue::initial_metadata();
             assert_eq!(&md.plugin_data.unwrap().plugin_name, GRAPHS_PLUGIN_NAME);
             assert_eq!(md.data_class, pb::DataClass::BlobSequence.into());
         }
+
+        #[test]
+        fn test_enrich_graph_def() {
+            let v = EventValue::GraphDef(GraphDefValue(vec![1, 2, 3, 4]));
+            assert_eq!(
+                v.into_blob_sequence(),
+                Ok(BlobSequenceValue(vec![vec![1, 2, 3, 4]]))
+            );
+        }
     }
 
     mod unknown {
diff --git a/tensorboard/data/server/run.rs b/tensorboard/data/server/run.rs
index 770c96fec8..9491c95950 100644
--- a/tensorboard/data/server/run.rs
+++ b/tensorboard/data/server/run.rs
@@ -23,7 +23,7 @@ use std::path::PathBuf;
 use std::sync::RwLock;
 
 use crate::commit;
-use crate::data_compat::{EventValue, SummaryValue};
+use crate::data_compat::{EventValue, GraphDefValue, SummaryValue};
 use crate::event_file::EventFileReader;
 use crate::proto::tensorboard as pb;
 use crate::reservoir::StageReservoir;
@@ -117,15 +117,7 @@ impl StageTimeSeries {
                 );
             }
             DataClass::BlobSequence => {
-                warn!(
-                    "Blob sequence time series not yet supported (tag: {}, plugin: {})",
-                    tag.0,
-                    self.metadata
-                        .plugin_data
-                        .as_ref()
-                        .map(|p| p.plugin_name.as_str())
-                        .unwrap_or("")
-                );
+                self.commit_to(tag, &mut run.blob_sequences, EventValue::into_blob_sequence)
             }
             _ => (),
         };
@@ -270,9 +262,19 @@ fn read_event(
         *start_time = Some(wall_time);
     }
     match e.what {
-        Some(pb::event::What::GraphDef(_)) => {
-            // TODO(@wchargin): Handle run graphs.
-            warn!("`graph_def` events not yet handled");
+        Some(pb::event::What::GraphDef(graph_bytes)) => {
+            let sv = StageValue {
+                wall_time,
+                payload: EventValue::GraphDef(GraphDefValue(graph_bytes)),
+            };
+            use std::collections::hash_map::Entry;
+            let ts = match time_series.entry(Tag(GraphDefValue::TAG_NAME.to_string())) {
+                Entry::Occupied(o) => o.into_mut(),
+                Entry::Vacant(v) => {
+                    v.insert(StageTimeSeries::new(GraphDefValue::initial_metadata()))
+                }
+            };
+            ts.rsv.offer(step, sv);
         }
         Some(pb::event::What::Summary(sum)) => {
             for mut summary_pb_value in sum.value {
@@ -338,6 +340,11 @@ mod test {
         // Write some data points across both files.
         let run = Run("train".to_string());
         let tag = Tag("accuracy".to_string());
+        f1.write_graph(
+            Step(0),
+            WallTime::new(1235.0).unwrap(),
+            b"".to_vec(),
+        )?;
         f1.write_scalar(&tag, Step(0), WallTime::new(1235.0).unwrap(), 0.25)?;
         f1.write_scalar(&tag, Step(1), WallTime::new(1236.0).unwrap(), 0.50)?;
         f1.write_scalar(&tag, Step(2), WallTime::new(1237.0).unwrap(), 0.75)?;
@@ -371,9 +378,9 @@ mod test {
             .expect("read-locking run data map");
 
         assert_eq!(run_data.scalars.keys().collect::<Vec<_>>(), vec![&tag]);
-        let ts = run_data.scalars.get(&tag).unwrap();
+        let scalar_ts = run_data.scalars.get(&tag).unwrap();
         assert_eq!(
-            *ts.metadata,
+            *scalar_ts.metadata,
             pb::SummaryMetadata {
                 plugin_data: Some(pb::summary_metadata::PluginData {
                     plugin_name: crate::data_compat::SCALARS_PLUGIN_NAME.to_string(),
@@ -383,11 +390,10 @@ mod test {
                 ..Default::default()
             }
         );
-        // Points should be as expected (no downsampling at these sizes).
         let scalar = commit::ScalarValue;
         assert_eq!(
-            ts.valid_values().collect::<Vec<_>>(),
+            scalar_ts.valid_values().collect::<Vec<_>>(),
             vec![
                 (Step(0), WallTime::new(1235.0).unwrap(), &scalar(0.25)),
                 (Step(1), WallTime::new(1236.0).unwrap(), &scalar(0.50)),
@@ -397,6 +403,32 @@ mod test {
             ]
         );
 
+        let run_graph_tag = Tag(GraphDefValue::TAG_NAME.to_string());
+        assert_eq!(
+            run_data.blob_sequences.keys().collect::<Vec<_>>(),
+            vec![&run_graph_tag]
+        );
+        let graph_ts = run_data.blob_sequences.get(&run_graph_tag).unwrap();
+        assert_eq!(
+            *graph_ts.metadata,
+            pb::SummaryMetadata {
+                plugin_data: Some(pb::summary_metadata::PluginData {
+                    plugin_name: crate::data_compat::GRAPHS_PLUGIN_NAME.to_string(),
+                    ..Default::default()
+                }),
+                data_class: pb::DataClass::BlobSequence.into(),
+                ..Default::default()
+            }
+        );
+        assert_eq!(
+            graph_ts.valid_values().collect::<Vec<_>>(),
+            vec![(
+                Step(0),
+                WallTime::new(1235.0).unwrap(),
+                &commit::BlobSequenceValue(vec![b"".to_vec()])
+            )]
+        );
+
         Ok(())
     }
 }
diff --git a/tensorboard/data/server/writer.rs b/tensorboard/data/server/writer.rs
index e0cb303b1d..3f2c5a3e89 100644
--- a/tensorboard/data/server/writer.rs
+++ b/tensorboard/data/server/writer.rs
@@ -54,6 +54,17 @@ pub trait SummaryWriteExt: Write {
         };
         self.write_event(&event)
     }
+
+    /// Writes a TFRecord containing a TF 1.x `graph_def` event.
+    fn write_graph(&mut self, step: Step, wt: WallTime, bytes: Vec<u8>) -> std::io::Result<()> {
+        let event = pb::Event {
+            step: step.0,
+            wall_time: wt.into(),
+            what: Some(pb::event::What::GraphDef(bytes)),
+            ..Default::default()
+        };
+        self.write_event(&event)
+    }
 }
 
 impl<W: Write> SummaryWriteExt for W {}
@@ -123,4 +134,28 @@ mod tests {
         };
         assert_eq!(event, &expected);
     }
+
+    #[test]
+    fn test_graph_roundtrip() {
+        let mut cursor = Cursor::new(Vec::<u8>::new());
+        cursor
+            .write_graph(
+                Step(777),
+                WallTime::new(1234.5).unwrap(),
+                b"my graph".to_vec(),
+            )
+            .unwrap();
+        cursor.set_position(0);
+        let events = read_all_events(cursor).unwrap();
+        assert_eq!(events.len(), 1);
+
+        let event = &events[0];
+        let expected = pb::Event {
+            step: 777,
+            wall_time: 1234.5,
+            what: Some(pb::event::What::GraphDef(b"my graph".to_vec())),
+            ..Default::default()
+        };
+        assert_eq!(event, &expected);
+    }
 }

From bb365a4ebce291220931b2f16e5a671d9d54c68f Mon Sep 17 00:00:00 2001
From: William Chargin
Date: Wed, 13 Jan 2021 11:55:03 -0800
Subject: [PATCH 2/3] rust: implement `ListPlugins` for blobs
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Summary:
The existing `ListPlugins` RPC now explores blob sequence time series
for plugin names, in addition to scalar time series.

This patch also expands the `commit::test_data` builders to support
blob sequences.

Test Plan:
Unit tests updated. As an end-to-end test, pointing TensorBoard at a
directory with scalars and graphs now causes the graphs dashboard to
appear active in addition to the scalars dashboard. The dashboard
doesn’t yet work, since `ListBlobSequences`, et al. aren’t implemented.
wchargin-branch: rust-listplugins-blobs
wchargin-source: fbab637da1881c51f9b5559abb97141b58fe0387
---
 tensorboard/data/server/commit.rs | 123 ++++++++++++++++++++++++++++++
 tensorboard/data/server/server.rs |  19 +++--
 2 files changed, 137 insertions(+), 5 deletions(-)

diff --git a/tensorboard/data/server/commit.rs b/tensorboard/data/server/commit.rs
index 24587c68d9..9a84ffaca1 100644
--- a/tensorboard/data/server/commit.rs
+++ b/tensorboard/data/server/commit.rs
@@ -211,6 +211,45 @@ pub mod test_data {
             self
         }
 
+        /// Adds a blob sequence time series, creating the run if it doesn't exist, and setting its
+        /// start time if unset.
+        ///
+        /// # Examples
+        ///
+        /// ```
+        /// use rustboard_core::commit::{test_data::CommitBuilder, BlobSequenceValue, Commit};
+        ///
+        /// let my_commit: Commit = CommitBuilder::new()
+        ///     .blob_sequences("train", "input_image", |mut b| {
+        ///         b.plugin_name("images")
+        ///             .values(vec![
+        ///                 BlobSequenceValue(vec![b"step0img0".to_vec()]),
+        ///                 BlobSequenceValue(vec![b"step1img0".to_vec(), b"step1img1".to_vec()]),
+        ///             ])
+        ///             .build()
+        ///     })
+        ///     .build();
+        /// ```
+        pub fn blob_sequences(
+            self,
+            run: &str,
+            tag: &str,
+            build: impl FnOnce(BlobSequenceTimeSeriesBuilder) -> TimeSeries<BlobSequenceValue>,
+        ) -> Self {
+            self.with_run_data(Run(run.to_string()), |run_data| {
+                let time_series = build(BlobSequenceTimeSeriesBuilder::default());
+                if let (None, Some((_step, wall_time, _value))) =
+                    (run_data.start_time, time_series.valid_values().next())
+                {
+                    run_data.start_time = Some(wall_time);
+                }
+                run_data
+                    .blob_sequences
+                    .insert(Tag(tag.to_string()), time_series);
+            });
+            self
+        }
+
         /// Ensures that a run is present and sets its start time.
         ///
         /// If you don't care about the start time and the run is going to have data, anyway, you
@@ -307,4 +346,88 @@ pub mod test_data {
             time_series
         }
     }
+
+    pub struct BlobSequenceTimeSeriesBuilder {
+        /// Initial step. Increments by `1` for each point.
+        step_start: Step,
+        /// Initial wall time. Increments by `1.0` for each point.
+        wall_time_start: WallTime,
+        /// Raw data for blob sequences in this time series. Defaults to
+        /// `vec![BlobSequenceValue(vec![])]`: i.e., a single point whose blob sequence is
+        /// empty.
+        values: Vec<BlobSequenceValue>,
+        /// Custom summary metadata. Leave `None` to use default.
+        metadata: Option<Box<pb::SummaryMetadata>>,
+    }
+
+    impl Default for BlobSequenceTimeSeriesBuilder {
+        fn default() -> Self {
+            BlobSequenceTimeSeriesBuilder {
+                step_start: Step(0),
+                wall_time_start: WallTime::new(0.0).unwrap(),
+                values: vec![BlobSequenceValue(vec![])],
+                metadata: None,
+            }
+        }
+    }
+
+    /// Creates a summary metadata value with plugin name and data class, but no other contents.
+    fn blank(plugin_name: &str, data_class: pb::DataClass) -> Box<pb::SummaryMetadata> {
+        Box::new(pb::SummaryMetadata {
+            plugin_data: Some(pb::summary_metadata::PluginData {
+                plugin_name: plugin_name.to_string(),
+                ..Default::default()
+            }),
+            data_class: data_class.into(),
+            ..Default::default()
+        })
+    }
+
+    impl BlobSequenceTimeSeriesBuilder {
+        pub fn step_start(&mut self, raw_step: i64) -> &mut Self {
+            self.step_start = Step(raw_step);
+            self
+        }
+        pub fn wall_time_start(&mut self, raw_wall_time: f64) -> &mut Self {
+            self.wall_time_start = WallTime::new(raw_wall_time).unwrap();
+            self
+        }
+        pub fn values(&mut self, values: Vec<BlobSequenceValue>) -> &mut Self {
+            self.values = values;
+            self
+        }
+        pub fn metadata(&mut self, metadata: Option<Box<pb::SummaryMetadata>>) -> &mut Self {
+            self.metadata = metadata;
+            self
+        }
+        /// Sets the metadata to a blank, blob-sequence-class metadata value with the given plugin
+        /// name. Overwrites any existing call to [`metadata`].
+        pub fn plugin_name(&mut self, plugin_name: &str) -> &mut Self {
+            self.metadata(Some(blank(plugin_name, pb::DataClass::BlobSequence)))
+        }
+
+        /// Constructs a blob sequence time series from the state of this builder.
+        ///
+        /// # Panics
+        ///
+        /// If the wall time of a point would overflow to be infinite.
+        pub fn build(&self) -> TimeSeries<BlobSequenceValue> {
+            let metadata = self
+                .metadata
+                .clone()
+                .unwrap_or_else(|| blank("blobs", pb::DataClass::BlobSequence));
+            let mut time_series = TimeSeries::new(metadata);
+
+            let mut rsv = StageReservoir::new(self.values.len());
+            for (i, value) in self.values.iter().enumerate() {
+                let step = Step(self.step_start.0 + i as i64);
+                let wall_time =
+                    WallTime::new(f64::from(self.wall_time_start) + (i as f64)).unwrap();
+                rsv.offer(step, (wall_time, Ok(value.clone())));
+            }
+            rsv.commit(&mut time_series.basin);
+
+            time_series
+        }
+    }
 }
diff --git a/tensorboard/data/server/server.rs b/tensorboard/data/server/server.rs
index baca572634..aa7c19a2a1 100644
--- a/tensorboard/data/server/server.rs
+++ b/tensorboard/data/server/server.rs
@@ -25,7 +25,6 @@ use tonic::{Request, Response, Status};
 
 use crate::commit::{self, Commit};
 use crate::downsample;
-use crate::proto::tensorboard as pb;
 use crate::proto::tensorboard::data;
 use crate::types::{Run, Tag, WallTime};
 use data::tensor_board_data_provider_server::TensorBoardDataProvider;
@@ -59,8 +58,9 @@ impl TensorBoardDataProvider for DataProviderHandler {
         let data = data
             .read()
             .map_err(|_| Status::internal(format!("failed to read run data for {:?}", run)))?;
-        for time_series in data.scalars.values() {
-            let metadata: &pb::SummaryMetadata = time_series.metadata.as_ref();
+        for metadata in (data.scalars.values().map(|ts| ts.metadata.as_ref()))
+            .chain(data.blob_sequences.values().map(|ts| ts.metadata.as_ref()))
+        {
             let plugin_name = match &metadata.plugin_data {
                 Some(d) => d.plugin_name.clone(),
                 None => String::new(),
             };
@@ -348,6 +348,7 @@ mod tests {
     use tonic::Code;
 
     use crate::commit::test_data::CommitBuilder;
+    use crate::proto::tensorboard as pb;
     use crate::types::{Run, Step, Tag};
 
     fn sample_handler(commit: Commit) -> DataProviderHandler {
@@ -361,6 +362,9 @@ mod tests {
     async fn test_list_plugins() {
         let commit = CommitBuilder::new()
             .scalars("train", "xent", |b| b.build())
+            .blob_sequences("train", "input_image", |mut b| {
+                b.plugin_name("images").build()
+            })
             .build();
         let handler = sample_handler(commit);
         let req = Request::new(data::ListPluginsRequest {
             ..Default::default()
         });
         let res = handler.list_plugins(req).await.unwrap().into_inner();
         assert_eq!(
-            res.plugins.into_iter().map(|p| p.name).collect::<Vec<_>>(),
-            vec!["scalars"]
+            res.plugins
+                .iter()
+                .map(|p| p.name.as_str())
+                .collect::<HashSet<_>>(),
+            vec!["scalars", "images"]
+                .into_iter()
+                .collect::<HashSet<_>>(),
         );
     }

From d2f8ef4e7c8a7eb1e79ca06dff1678b14c7b2c61 Mon Sep 17 00:00:00 2001
From: William Chargin
Date: Wed, 13 Jan 2021 12:58:28 -0800
Subject: [PATCH 3/3] [rust-listplugins-blobs: update patch]

wchargin-branch: rust-listplugins-blobs
wchargin-source: 366fe5ecc221b2101e0190146b1d9e4e33de305e
---
 tensorboard/data/server/commit.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/tensorboard/data/server/commit.rs b/tensorboard/data/server/commit.rs
index 9a84ffaca1..aa088dd069 100644
--- a/tensorboard/data/server/commit.rs
+++ b/tensorboard/data/server/commit.rs
@@ -401,7 +401,7 @@ pub mod test_data {
             self
         }
         /// Sets the metadata to a blank, blob-sequence-class metadata value with the given plugin
-        /// name. Overwrites any existing call to [`metadata`].
+        /// name. Overwrites any existing call to [`metadata`][Self::metadata].
        pub fn plugin_name(&mut self, plugin_name: &str) -> &mut Self {
             self.metadata(Some(blank(plugin_name, pb::DataClass::BlobSequence)))
         }
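
Reviewer note (not part of the patch series): a minimal sketch of how a test might exercise the new `write_graph` extension from PATCH 1, assuming the `rustboard_core` module paths implied by the diffs above (`types::{Step, WallTime}` and `writer::SummaryWriteExt`); only the `write_graph` signature itself is taken directly from the patch.

```rust
use std::io::Cursor;

use rustboard_core::types::{Step, WallTime}; // assumed module path
use rustboard_core::writer::SummaryWriteExt; // assumed module path; trait added in PATCH 1

// Write a single TF 1.x `graph_def` event into an in-memory event file.
// When the run loader from PATCH 1 reads this record, it stores the graph as a
// one-element blob sequence under the reserved tag `__run_graph__` in
// `RunData::blob_sequences`.
fn write_sample_graph() -> std::io::Result<Vec<u8>> {
    let mut cursor = Cursor::new(Vec::<u8>::new());
    cursor.write_graph(
        Step(0),
        WallTime::new(1235.0).unwrap(),
        b"serialized GraphDef bytes".to_vec(),
    )?;
    Ok(cursor.into_inner())
}
```

Note the mapping established by `EventValue::into_blob_sequence`: a run-level `GraphDef` becomes a blob sequence containing exactly one blob, which is why the loader test above asserts a single-element `BlobSequenceValue`.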