7 changes: 4 additions & 3 deletions tensorboard/data/server/cli.rs
@@ -121,9 +121,10 @@ struct Opts {
///
/// A comma separated list of `plugin_name=num_samples` pairs to explicitly specify how many
/// samples to keep per tag for the specified plugin. For unspecified plugins, series are
/// randomly downsampled to reasonable values to prevent out-of-memory errors in long running
/// jobs. For instance, `--samples_per_plugin=scalars=500,images=0` keeps 500 events in each
/// scalar series and keeps none of the images.
/// randomly downsampled to reasonable values to prevent out-of-memory errors in long-running
/// jobs. Each `num_samples` may be the special token `all` to retain all data without
/// downsampling. For instance, `--samples_per_plugin=scalars=500,images=all,audio=0` keeps 500
/// events in each scalar series, all of the images, and none of the audio.
#[clap(long, default_value = "", setting(clap::ArgSettings::AllowEmptyValues))]
samples_per_plugin: PluginSamplingHint,
}
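For reference, a minimal sketch of how a `--samples_per_plugin` value maps to per-plugin capacities, using the `PluginSamplingHint` and `Capacity` types introduced later in this diff (the in-crate paths and the free function name are illustrative):

```rust
use crate::reservoir::Capacity;
use crate::types::PluginSamplingHint;

// Sketch only: mirrors the FromStr impl added in types.rs below.
fn sampling_hint_example() {
    let hint: PluginSamplingHint = "scalars=500,images=all,audio=0".parse().unwrap();
    assert_eq!(hint.0.get("scalars"), Some(&Capacity::Bounded(500)));
    assert_eq!(hint.0.get("images"), Some(&Capacity::Unbounded)); // `all`: no downsampling
    assert_eq!(hint.0.get("audio"), Some(&Capacity::Bounded(0))); // 0: keep nothing
}
```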
95 changes: 76 additions & 19 deletions tensorboard/data/server/reservoir.rs
@@ -81,8 +81,8 @@ pub struct StageReservoir<T, C = ChaCha20Rng> {
/// Total capacity of this reservoir.
///
/// The combined physical capacities of `committed_steps` and `staged_items` may exceed this,
/// but their combined lengths will not. Behavior is undefined if `capacity == 0`.
capacity: usize,
/// but their combined lengths will not.
capacity: Capacity,
/// Reservoir control, to determine whether and whither a given new record should be included.
ctl: C,
/// Estimate of the total number of non-preempted records passed in the stream so far,
@@ -100,6 +100,24 @@ pub struct StageReservoir<T, C = ChaCha20Rng> {
seen: usize,
}

/// Reservoir capacity, determining if and when items should start being evicted.
#[derive(PartialEq, Eq, Debug, Copy, Clone)]
pub enum Capacity {
/// The reservoir may have arbitrarily many records.
///
/// An unbounded reservoir still supports preemption, but otherwise behaves like a normal
/// vector.
Unbounded,
/// The reservoir may have at most a fixed number of records.
Bounded(usize),
}

impl From<usize> for Capacity {
fn from(n: usize) -> Self {
Capacity::Bounded(n)
}
}

/// A buffer of records that have been committed and not yet evicted from the reservoir.
///
/// This is a snapshot of the reservoir contents at some point in time that is periodically updated
@@ -154,20 +172,20 @@ impl<T> StageReservoir<T, ChaCha20Rng> {
/// All reservoirs created by this function will use the same sequence of random numbers.
///
/// This function does not allocate. Reservoir capacity is allocated as records are offered.
pub fn new(capacity: usize) -> Self {
Self::with_control(capacity, ChaCha20Rng::seed_from_u64(0))
pub fn new(capacity: impl Into<Capacity>) -> Self {
Self::with_control(capacity.into(), ChaCha20Rng::seed_from_u64(0))
}
}

impl<T, C: ReservoirControl> StageReservoir<T, C> {
/// Creates a new reservoir with the specified capacity and reservoir control.
///
/// This function does not allocate. Reservoir capacity is allocated as records are offered.
pub fn with_control(capacity: usize, ctl: C) -> Self {
pub fn with_control(capacity: impl Into<Capacity>, ctl: C) -> Self {
Self {
committed_steps: Vec::new(),
staged_items: Vec::new(),
capacity,
capacity: capacity.into(),
ctl,
seen: 0,
}
@@ -179,24 +197,26 @@ impl<T, C: ReservoirControl> StageReservoir<T, C> {
/// records kept form a simple random sample of the stream (or at least approximately so in the
/// case of preemptions).
pub fn offer(&mut self, step: Step, v: T) {
if self.capacity == 0 {
if self.capacity == Capacity::Bounded(0) {
return;
}
self.preempt(step);
self.seen += 1;

// If we can hold every record that we've seen, we can add this record unconditionally.
// Otherwise, we need to roll a destination---even if there's available space, to avoid
// bias right after a preemption.
if self.seen > self.capacity {
let dst = self.ctl.destination(self.seen);
if dst >= self.capacity {
// Didn't make the cut? Keep-last only.
self.pop();
} else if self.len() >= self.capacity {
// No room? Evict the destination.
// From `if`-guards, we know `dst < self.capacity <= self.len()`, so this is safe.
self.remove(dst);
if let Capacity::Bounded(capacity) = self.capacity {
// If we can hold every record that we've seen, we can add this record unconditionally.
// Otherwise, we need to roll a destination---even if there's available space, to avoid
// bias right after a preemption.
if self.seen > capacity {
let dst = self.ctl.destination(self.seen);
if dst >= capacity {
// Didn't make the cut? Keep-last only.
self.pop();
} else if self.len() >= capacity {
// No room? Evict the destination.
// From `if`-guards, we know `dst < capacity <= self.len()`, so this is safe.
self.remove(dst);
}
}
}
// In any case, add to end.
@@ -542,6 +562,43 @@ mod tests {
}
}

#[test]
fn test_unbounded() {
let mut rsv = StageReservoir::new(Capacity::Unbounded);
let mut head = Basin::new();

rsv.commit(&mut head);
assert_eq!(head.as_slice(), &[]);

rsv.offer(Step(0), "before");
rsv.offer(Step(1), "before");
rsv.offer(Step(2), "before");
rsv.offer(Step(4), "before");
rsv.commit(&mut head);
assert_eq!(
head.as_slice(),
&[
(Step(0), "before"),
(Step(1), "before"),
(Step(2), "before"),
(Step(4), "before")
]
);

rsv.offer(Step(2), "after");
rsv.offer(Step(5), "after");
rsv.commit(&mut head);
assert_eq!(
head.as_slice(),
&[
(Step(0), "before"),
(Step(1), "before"),
(Step(2), "after"),
(Step(5), "after")
]
);
}

#[test]
fn test_empty() {
let mut rsv = StageReservoir::new(0);
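A short usage sketch of the two construction paths, exercising the API the same way the new `test_unbounded` above does (the function name is illustrative):

```rust
use crate::reservoir::{Basin, Capacity, StageReservoir};
use crate::types::Step;

// Sketch only: a plain usize still works via `impl Into<Capacity>`, becoming Bounded.
fn capacity_example() {
    let mut bounded: StageReservoir<&str> = StageReservoir::new(10);
    // An unbounded reservoir never evicts; it only honors preemptions.
    let mut unbounded: StageReservoir<&str> = StageReservoir::new(Capacity::Unbounded);

    for i in 0..100 {
        bounded.offer(Step(i), "x");
        unbounded.offer(Step(i), "x");
    }
    let (mut b1, mut b2) = (Basin::new(), Basin::new());
    bounded.commit(&mut b1);
    unbounded.commit(&mut b2);
    assert!(b1.as_slice().len() <= 10); // downsampled to capacity
    assert_eq!(b2.as_slice().len(), 100); // everything retained
}
```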
14 changes: 6 additions & 8 deletions tensorboard/data/server/run.rs
@@ -26,7 +26,7 @@ use crate::data_compat::{EventValue, GraphDefValue, SummaryValue, TaggedRunMetad
use crate::event_file::EventFileReader;
use crate::logdir::{EventFileBuf, Logdir};
use crate::proto::tensorboard as pb;
use crate::reservoir::StageReservoir;
use crate::reservoir::{Capacity, StageReservoir};
use crate::types::{PluginSamplingHint, Run, Step, Tag, WallTime};

/// A loader to accumulate reservoir-sampled events in a single TensorBoard run.
@@ -119,23 +119,21 @@ impl StageTimeSeries {
fn capacity(
metadata: &pb::SummaryMetadata,
plugin_sampling_hint: Arc<PluginSamplingHint>,
) -> usize {
) -> Capacity {
let data_class =
pb::DataClass::from_i32(metadata.data_class).unwrap_or(pb::DataClass::Unknown);
let mut capacity = match data_class {
let mut capacity = Capacity::Bounded(match data_class {
pb::DataClass::Scalar => 1000,
pb::DataClass::Tensor => 100,
pb::DataClass::BlobSequence => 10,
_ => 0,
};
});

// Override the default capacity using the plugin-specific hint.
if data_class != pb::DataClass::Unknown {
if let Some(ref pd) = metadata.plugin_data {
if let Some(&num_samples) = plugin_sampling_hint.0.get(&pd.plugin_name) {
// TODO(psybuzz): if the hint prescribes 0 samples, the reservoir should ideally
// be unbounded. For now, it simply creates a reservoir with capacity 0.
capacity = num_samples;
if let Some(&hint) = plugin_sampling_hint.0.get(&pd.plugin_name) {
capacity = hint;
}
}
}
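The effect of the change above, as a simplified sketch of the default-versus-hint resolution (not the verbatim implementation: the real `capacity()` also guards on `DataClass::Unknown` and on `plugin_data` being present, and the function name here is illustrative):

```rust
use crate::proto::tensorboard as pb;
use crate::reservoir::Capacity;
use crate::types::PluginSamplingHint;

// Simplified sketch: per-data-class defaults, overridden by any plugin hint (including `all`).
fn resolve_capacity(
    data_class: pb::DataClass,
    plugin_name: &str,
    hint: &PluginSamplingHint,
) -> Capacity {
    let default = Capacity::Bounded(match data_class {
        pb::DataClass::Scalar => 1000,
        pb::DataClass::Tensor => 100,
        pb::DataClass::BlobSequence => 10,
        _ => 0,
    });
    hint.0.get(plugin_name).copied().unwrap_or(default)
}
```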
31 changes: 22 additions & 9 deletions tensorboard/data/server/types.rs
@@ -15,11 +15,12 @@ limitations under the License.

//! Core simple types.

use log::error;
use std::borrow::Borrow;
use std::collections::HashMap;
use std::str::FromStr;

use crate::reservoir::Capacity;

/// A step associated with a record, strictly increasing over time within a record stream.
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Copy, Clone)]
pub struct Step(pub i64);
@@ -102,7 +103,7 @@ pub enum ParsePluginSamplingHintError {

/// A map defining how many samples per plugin to keep.
#[derive(Debug, Default)]
pub struct PluginSamplingHint(pub HashMap<String, usize>);
pub struct PluginSamplingHint(pub HashMap<String, Capacity>);

impl FromStr for PluginSamplingHint {
type Err = ParsePluginSamplingHintError;
@@ -118,8 +119,12 @@ impl FromStr for PluginSamplingHint {
part: pair_str.to_string(),
});
}
let num_samples = pair[1].parse::<usize>()?;
let plugin_name: String = pair[0].to_string();
let num_samples = if pair[1] == "all" {
Capacity::Unbounded
} else {
Capacity::Bounded(pair[1].parse::<usize>()?)
};
result.insert(plugin_name, num_samples);
}
Ok(PluginSamplingHint(result))
@@ -176,17 +181,20 @@ mod tests {

#[test]
fn test_plugin_sampling_hint() {
use Capacity::{Bounded, Unbounded};

// Parse from a valid hint with arbitrary plugin names.
let hint1 = "scalars=500,images=0,unknown=10".parse::<PluginSamplingHint>();
let mut expected1: HashMap<String, usize> = HashMap::new();
expected1.insert("scalars".to_string(), 500);
expected1.insert("images".to_string(), 0);
expected1.insert("unknown".to_string(), 10);
let hint1 = "scalars=500,images=0,histograms=all,unknown=10".parse::<PluginSamplingHint>();
let mut expected1: HashMap<String, Capacity> = HashMap::new();
expected1.insert("scalars".to_string(), Bounded(500));
expected1.insert("images".to_string(), Bounded(0));
expected1.insert("histograms".to_string(), Unbounded);
expected1.insert("unknown".to_string(), Bounded(10));
assert_eq!(hint1.unwrap().0, expected1);

// Parse from an empty hint.
let hint2 = "".parse::<PluginSamplingHint>();
let expected2: HashMap<String, usize> = HashMap::new();
let expected2: HashMap<String, Capacity> = HashMap::new();
assert_eq!(hint2.unwrap().0, expected2);

// Parse from an invalid hint.
@@ -195,6 +203,11 @@
other => panic!("expected ParseIntError, got {:?}", other),
};

match "x=wat".parse::<PluginSamplingHint>().unwrap_err() {
ParsePluginSamplingHintError::ParseIntError(_) => (),
other => panic!("expected ParseIntError, got {:?}", other),
};

match "=1".parse::<PluginSamplingHint>().unwrap_err() {
ParsePluginSamplingHintError::SyntaxError { part: _ } => (),
other => panic!("expected SyntaxError, got {:?}", other),
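One detail worth noting: the comparison against `all` is exact and lowercase only; any other token (`ALL`, `All`, `wat`, etc.) falls through to `usize` parsing and is rejected, as the new `x=wat` test case shows. A small sketch of that behavior (function name illustrative):

```rust
use crate::reservoir::Capacity;
use crate::types::{ParsePluginSamplingHintError, PluginSamplingHint};

// Sketch only: lowercase `all` is Unbounded; anything else must parse as a usize.
fn all_token_example() {
    let ok: PluginSamplingHint = "images=all".parse().unwrap();
    assert_eq!(ok.0.get("images"), Some(&Capacity::Unbounded));

    let err = "images=ALL".parse::<PluginSamplingHint>().unwrap_err();
    assert!(matches!(err, ParsePluginSamplingHintError::ParseIntError(_)));
}
```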
3 changes: 2 additions & 1 deletion tensorboard/data/server_ingester.py
@@ -103,7 +103,8 @@ def start(self):
reload = str(int(self._reload_interval))

sample_hint_pairs = [
"%s=%s" % (k, v) for k, v in self._samples_per_plugin.items()
"%s=%s" % (k, "all" if v == 0 else v)
for k, v in self._samples_per_plugin.items()
]
samples_per_plugin = ",".join(sample_hint_pairs)

5 changes: 5 additions & 0 deletions tensorboard/data/server_ingester_test.py
@@ -86,6 +86,10 @@ def target():
logdir=logdir,
reload_interval=5,
channel_creds_type=grpc_util.ChannelCredsType.LOCAL,
samples_per_plugin={
"scalars": 500,
"images": 0,
},
)
ingester.start()
self.assertIsInstance(
Expand All @@ -99,6 +103,7 @@ def target():
"--port=0",
"--port-file=%s" % port_file,
"--die-after-stdin",
"--samples-per-plugin=scalars=500,images=all",
"--verbose", # logging is enabled in tests
]
popen.assert_called_once_with(expected_args, stdin=subprocess.PIPE)