Merged (21 commits)

2 changes: 2 additions & 0 deletions tensorboard/data/server/BUILD
@@ -28,13 +28,15 @@ rust_library(
"lib.rs",
"blob_key.rs",
"cli.rs",
"cli/dynamic_logdir.rs",
"commit.rs",
"data_compat.rs",
"disk_logdir.rs",
"downsample.rs",
"event_file.rs",
"gcs.rs",
"gcs/client.rs",
"gcs/logdir.rs",
"logdir.rs",
"masked_crc.rs",
"reservoir.rs",
9 changes: 3 additions & 6 deletions tensorboard/data/server/bench.rs
@@ -20,8 +20,8 @@ use log::info;
use std::path::PathBuf;
use std::time::Instant;

use rustboard_core::cli::dynamic_logdir::DynLogdir;
use rustboard_core::commit::Commit;
use rustboard_core::disk_logdir::DiskLogdir;
use rustboard_core::logdir::LogdirLoader;

#[derive(Clap)]
@@ -45,11 +45,8 @@ fn main() {
init_logging(&opts);

let commit = Commit::new();
let mut loader = LogdirLoader::new(
&commit,
DiskLogdir::new(opts.logdir),
opts.reload_threads.unwrap_or(0),
);
let logdir = DynLogdir::new(opts.logdir).expect("DynLogdir::new");
let mut loader = LogdirLoader::new(&commit, logdir, opts.reload_threads.unwrap_or(0));
loader.checksum(opts.checksum); // if neither `--[no-]checksum` given, defaults to false

info!("Starting load cycle");
37 changes: 23 additions & 14 deletions tensorboard/data/server/cli.rs
@@ -29,13 +29,15 @@ use tokio_stream::wrappers::TcpListenerStream;
use tonic::transport::Server;

use crate::commit::Commit;
use crate::disk_logdir::DiskLogdir;
use crate::logdir::LogdirLoader;
use crate::proto::tensorboard::data;
use crate::server::DataProviderHandler;

use data::tensor_board_data_provider_server::TensorBoardDataProviderServer;

pub mod dynamic_logdir;
use dynamic_logdir::DynLogdir;

#[derive(Clap, Debug)]
#[clap(name = "rustboard", version = crate::VERSION)]
struct Opts {
@@ -177,19 +179,26 @@ pub async fn main() -> Result<(), Box<dyn std::error::Error>> {
.name("Reloader".to_string())
.spawn({
let reload_strategy = opts.reload;
let mut loader = LogdirLoader::new(commit, DiskLogdir::new(opts.logdir), 0);
// Checksum only if `--checksum` given (i.e., off by default).
loader.checksum(opts.checksum);
move || loop {
info!("Starting load cycle");
let start = Instant::now();
loader.reload();
let end = Instant::now();
info!("Finished load cycle ({:?})", end - start);
match reload_strategy {
ReloadStrategy::Loop { delay } => thread::sleep(delay),
ReloadStrategy::Once => break,
};
let raw_logdir = opts.logdir;
let checksum = opts.checksum;
move || {
// Create the logdir in the child thread, where no async runtime is active (see
// docs for `DynLogdir::new`).
let logdir = DynLogdir::new(raw_logdir).unwrap_or_else(|| std::process::exit(1));
let mut loader = LogdirLoader::new(commit, logdir, 0);
// Checksum only if `--checksum` given (i.e., off by default).
loader.checksum(checksum);
loop {
info!("Starting load cycle");
let start = Instant::now();
loader.reload();
let end = Instant::now();
info!("Finished load cycle ({:?})", end - start);
match reload_strategy {
ReloadStrategy::Loop { delay } => thread::sleep(delay),
ReloadStrategy::Once => break,
};
}
}
})
.expect("failed to spawn reloader thread");
98 changes: 98 additions & 0 deletions tensorboard/data/server/cli/dynamic_logdir.rs
@@ -0,0 +1,98 @@
/* Copyright 2021 The TensorFlow Authors. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/

//! Log directory as specified by user arguments.

use log::error;
use std::collections::HashMap;
use std::io::{self, Read};
use std::path::PathBuf;

use crate::disk_logdir::DiskLogdir;
use crate::gcs;
use crate::logdir::{EventFileBuf, Logdir};
use crate::types::Run;

/// A logdir dynamically dispatched over supported implementations.
pub enum DynLogdir {
Disk(DiskLogdir),
Gcs(gcs::Logdir),
}
Comment on lines +28 to +32
Contributor Author

@nfelt, re: discussion: you can see from the structure of these types
and implementations how they could be easily macroed into existence—all
except DynLogdir::new, of course. It’s basically “the same thing with
more $ signs”:
https://gist.github.com/wchargin/f44c370bebb61210dbeb11037f0b2d99

I think that I have a weak preference for the explicit implementation,
but the macro’s actually not that bad. It does make new cases a one-line
change. Open to suggestions.

Contributor

Explicit seems clearer to me given especially that we only have 2 implementations now.

What I had in mind when talking about "it would be nice to have a more automatic way to do this" was really more about whether there was something that would generalize not just to additional implementations but also cover the full set of trait methods as well (i.e. the dispatching logic for each method would be autogenerated without even having to include the macro template for each method). So it'd be sort of a compromise position between enums and trait objects, where the downside is that you need both a closed set of types known to the compiler and the trait must be object safe, but the upside is that you have no vtable overhead and you don't have to hand-roll the dispatching code for each trait method.

I'm not even sure that would work for this case where we have the associated type as a complicating factor, but it seems like the sort of thing that would in principle be possible to autogenerate in the basic case.

But this is just hypothesizing; no changes required or expected.
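
For reference, a minimal sketch of the macro-based alternative discussed above (an illustration only; not the contents of the linked gist, and not code in this PR). It assumes the same imports as dynamic_logdir.rs and stamps out the two enums and the Logdir impl from a single variant list:

macro_rules! dyn_logdir {
    ($($variant:ident => $ty:ty),+ $(,)?) => {
        /// A logdir dynamically dispatched over supported implementations.
        pub enum DynLogdir {
            $($variant($ty),)+
        }
        /// A file from any one of `DynLogdir`'s underlying implementations.
        pub enum DynFile {
            $($variant(<$ty as Logdir>::File),)+
        }
        impl Logdir for DynLogdir {
            type File = DynFile;
            fn discover(&self) -> io::Result<HashMap<Run, Vec<EventFileBuf>>> {
                match self {
                    $(Self::$variant(x) => x.discover(),)+
                }
            }
            fn open(&self, path: &EventFileBuf) -> io::Result<Self::File> {
                match self {
                    $(Self::$variant(x) => x.open(path).map(DynFile::$variant),)+
                }
            }
        }
    };
}

// Adding a backend becomes a one-line change; `DynLogdir::new` (and the `Read`
// impl for `DynFile`, if desired) would still be written by hand, as noted above.
dyn_logdir! {
    Disk => DiskLogdir,
    Gcs => gcs::Logdir,
}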


/// A file from any one of [`DynLogdir`]'s underlying implementations.
pub enum DynFile {
Disk(<DiskLogdir as Logdir>::File),
Gcs(<gcs::Logdir as Logdir>::File),
}

impl DynLogdir {
/// Parses a `DynLogdir` from a user-supplied path.
///
/// This succeeds unless the path represents a GCS logdir and no HTTP client can be opened. In
/// case of failure, errors will be logged to the active logger.
///
/// # Panics
///
/// May panic in debug mode if called from a thread with an active Tokio runtime; see
/// [seanmonstar/reqwest#1017].
///
/// [seanmonstar/reqwest#1017]: https://github.com/seanmonstar/reqwest/issues/1017
pub fn new(path: PathBuf) -> Option<Self> {
let path_str = path.to_string_lossy();
let gcs_path = match path_str.strip_prefix("gs://") {
// Assume that anything not starting with `gs://` is a path on disk.
None => return Some(DynLogdir::Disk(DiskLogdir::new(path))),
Some(p) => p,
};
let mut parts = gcs_path.splitn(2, '/');
let bucket = parts.next().unwrap().to_string(); // splitn always yields at least one element
let prefix = parts.next().unwrap_or("").to_string();
let client = match gcs::Client::new() {
Err(e) => {
error!("Could not open GCS connection: {}", e);
return None;
}
Ok(c) => c,
};
Some(DynLogdir::Gcs(gcs::Logdir::new(client, bucket, prefix)))
}
}
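
As a usage sketch of the parsing above (the bucket and paths here are made up; the real call sites are in cli.rs and bench.rs):

// A `gs://` path becomes a GCS logdir; anything else is treated as a path on disk.
let gcs = DynLogdir::new(PathBuf::from("gs://some-bucket/experiments/mnist"));
// -> Some(DynLogdir::Gcs(..)) reading bucket "some-bucket" under prefix "experiments/mnist"
let disk = DynLogdir::new(PathBuf::from("/tmp/logs"));
// -> Some(DynLogdir::Disk(..))
// `None` is returned only when a GCS client cannot be constructed, per the docs above.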

impl crate::logdir::Logdir for DynLogdir {
type File = DynFile;

fn discover(&self) -> io::Result<HashMap<Run, Vec<EventFileBuf>>> {
match self {
Self::Disk(x) => x.discover(),
Self::Gcs(x) => x.discover(),
}
}

fn open(&self, path: &EventFileBuf) -> io::Result<Self::File> {
match self {
Self::Disk(x) => x.open(path).map(DynFile::Disk),
Self::Gcs(x) => x.open(path).map(DynFile::Gcs),
}
}
}

impl Read for DynFile {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
match self {
Self::Disk(x) => x.read(buf),
Self::Gcs(x) => x.read(buf),
}
}
}
2 changes: 2 additions & 0 deletions tensorboard/data/server/gcs.rs
@@ -16,5 +16,7 @@ limitations under the License.
//! Google Cloud Storage interop.

mod client;
mod logdir;

pub use client::Client;
pub use logdir::Logdir;
4 changes: 4 additions & 0 deletions tensorboard/data/server/gcs/client.rs
@@ -25,6 +25,10 @@ const STORAGE_BASE: &str = "https://storage.googleapis.com";
const API_BASE: &str = "https://www.googleapis.com/storage/v1";

/// GCS client.
///
/// Cloning a GCS client is cheap and shares the underlying connection pool, as with a
/// [`reqwest::Client`].
#[derive(Clone)]
pub struct Client {
http: HttpClient,
}
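
A brief illustration of the cheap-clone property documented above (a sketch; it mirrors how gcs::Logdir hands a clone to each file it opens):

let client = Client::new().expect("failed to build HTTP client"); // owns the connection pool
let for_file = client.clone(); // cheap: shares the same underlying pool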
145 changes: 145 additions & 0 deletions tensorboard/data/server/gcs/logdir.rs
@@ -0,0 +1,145 @@
/* Copyright 2021 The TensorFlow Authors. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/

//! Adapter from GCS to TensorBoard logdirs.

use log::warn;
use reqwest::StatusCode;
use std::collections::HashMap;
use std::io::{self, BufReader, Read};
use std::path::{Path, PathBuf};

use super::Client;
use crate::logdir::{EventFileBuf, EVENT_FILE_BASENAME_INFIX};
use crate::types::Run;

/// A reference to a GCS object with a read offset.
pub struct File {
gcs: Client,
bucket: String,
object: String,
pos: u64,
}

impl File {
fn new(gcs: Client, bucket: String, object: String) -> Self {
Self {
gcs,
bucket,
object,
pos: 0,
}
}
}

fn reqwest_to_io_error(e: reqwest::Error) -> io::Error {
let kind = match e.status() {
Some(StatusCode::NOT_FOUND) => io::ErrorKind::NotFound,
Some(StatusCode::FORBIDDEN) => io::ErrorKind::PermissionDenied,
Some(StatusCode::UNAUTHORIZED) => io::ErrorKind::PermissionDenied,
Some(StatusCode::REQUEST_TIMEOUT) => io::ErrorKind::TimedOut,
_ if e.is_timeout() => io::ErrorKind::TimedOut,
_ if e.is_decode() => io::ErrorKind::InvalidData,
_ => io::ErrorKind::Other,
};
io::Error::new(kind, e)
}

impl Read for File {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
if buf.is_empty() {
return Ok(0);
}
let range = self.pos..=self.pos + (buf.len() as u64 - 1);
let result = self
.gcs
.read(&self.bucket, &self.object, range)
.map_err(reqwest_to_io_error)?;
(&mut buf[0..result.len()]).copy_from_slice(&result);
Contributor

I may not be reading this right but it looks like we have 2 copies here - one inside gcs::Client where we do body.to_vec() to go from Bytes to Vec<u8> and then another one here where we go from Vec<u8> to our final [u8].

I doubt it's really a big deal performance-wise but if that is indeed happening, it seems slightly nicer if we could do only one copy? E.g. if we were to have our client directly return Bytes rather than Vec<u8> it seems like we could directly call copy_from_slice(Bytes) since Bytes is Borrow<[u8]>?

Contributor Author

Indeed! I was planning to do this as an optimization in a follow-up,
since using Bytes directly requires adding a dep on the bytes crate.
But it’s well spotted, and a perfectly appropriate use of Bytes.

I don’t see an easy way to zero-copy the bytes all the way from the HTTP
response into the io::Read output: e.g., neither reqwest nor hyper
seems to expose a response.bytes_into(&mut buf).

There is a similar large copy in the ReadBlob RPC handler, which in
previous drafts I had replaced with an Arc<[u8]>, but Bytes might be
better there, too. (After all, Bytes is “Arc<[u8]> on steroids”.)

Contributor

Gotcha, fine by me to do it in a follow-up.

I may be misunderstanding but isn't it impossible to do a zero-copy transfer of HTTP response bytes into an &mut [u8] which is what io::Read takes? Since the signature requires that we stuff the bytes into a mutable buffer we don't control? Unless what you're saying is that we can actually change the underlying buffer pointed to by buf to literally just point to the one occupied by Bytes, but my understanding was that we can only mutate the contents of buf, not repoint it entirely.

Contributor Author (@wchargin, Feb 9, 2021)

I agree that I don’t see a way to do it with the reqwest/et al. APIs,
but you could imagine that it could be possible if each of the layers of
the stack supported it. Something like:

// mod reqwest::blocking::response
impl Response {
    /// Streams some bytes from the response body directly into the
    /// given buffer. Returns the number of bytes written.
    fn bytes_into<T: BufMut>(&self, x: &mut T) -> crate::Result<usize>;
}

// mod gcs::client
impl Client {
    fn read(&self, bucket, object, buf: impl BufMut) -> self::Result<usize> {
        let res = self.http.get(...).send()?;
        res.bytes_into(&mut buf)
    }
}

// mod gcs::logdir
impl Read for File {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        let n = self.gcs.read(bucket, obj, buf)?;
        self.pos += n as u64;
        Ok(n)
    }
}

Or, from a lower-level perspective: at the end of the day, we receive
network data by calling recvfrom(2) and passing the kernel a mutable
pointer to a user-space buffer into which to write the received data, so
there’s no fundamental reason that that pointer can’t point into the
buffer given to io::Read::read (at least, as far as I can tell).

but my understanding was that we can only mutate the contents of buf, not repoint it entirely.

Correct, and that is indeed not what I am alluding to.

self.pos += result.len() as u64;
Ok(result.len())
}
}
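
Tying back to the single-copy suggestion in the thread above, a hedged sketch of what `read` could look like if `gcs::Client::read` returned `bytes::Bytes` instead of `Vec<u8>` (hypothetical: it would add a dependency on the bytes crate and is not part of this PR):

impl Read for File {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        if buf.is_empty() {
            return Ok(0);
        }
        let range = self.pos..=self.pos + (buf.len() as u64 - 1);
        // Hypothetical signature: the client hands back the HTTP body as `bytes::Bytes`.
        let result: bytes::Bytes = self
            .gcs
            .read(&self.bucket, &self.object, range)
            .map_err(reqwest_to_io_error)?;
        // `Bytes` derefs to `[u8]`, so this copy into `buf` is the only one in user code;
        // the `Bytes` -> `Vec<u8>` copy inside the client goes away.
        buf[..result.len()].copy_from_slice(&result);
        self.pos += result.len() as u64;
        Ok(result.len())
    }
}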

pub struct Logdir {
gcs: Client,
bucket: String,
/// Invariant: `prefix` either is empty or ends with `/`, and thus an event file name should be
/// joined onto `prefix` to form its full object name.
prefix: String,
}

impl Logdir {
pub fn new(gcs: Client, bucket: String, mut prefix: String) -> Self {
if !prefix.is_empty() && !prefix.ends_with('/') {
prefix.push('/');
}
Self {
gcs,
bucket,
prefix,
}
}
}
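
For example (hypothetical bucket name), both of the following store the prefix "logs/", so object names can later be formed by plain concatenation; an empty prefix stays empty:

let a = Logdir::new(client.clone(), "some-bucket".to_string(), "logs".to_string());
let b = Logdir::new(client, "some-bucket".to_string(), "logs/".to_string());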

/// Read large chunks from GCS to reduce network roundtrips.
const BUFFER_CAPACITY: usize = 1024 * 1024 * 16;

impl crate::logdir::Logdir for Logdir {
type File = BufReader<File>;

fn discover(&self) -> io::Result<HashMap<Run, Vec<EventFileBuf>>> {
let res = self.gcs.list(&self.bucket, &self.prefix);
let objects = res.map_err(reqwest_to_io_error)?;
let mut run_map: HashMap<Run, Vec<EventFileBuf>> = HashMap::new();
for name in objects {
let name = match name.strip_prefix(&self.prefix) {
Some(x) => x,
None => {
warn!(
"Unexpected object name {:?} with putative prefix {:?}",
&name, &self.prefix
);
continue;
}
};
let path = PathBuf::from(name);
let is_event_file = path.file_name().map_or(false, |n| {
n.to_string_lossy().contains(EVENT_FILE_BASENAME_INFIX)
});
if !is_event_file {
continue;
}
let mut run_relpath = path
.parent()
.map(Path::to_path_buf)
.unwrap_or_else(PathBuf::new);
if run_relpath == Path::new("") {
run_relpath.push(".");
}
let run = Run(run_relpath.display().to_string());
run_map.entry(run).or_default().push(EventFileBuf(path));
}
Ok(run_map)
}
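
To illustrate the mapping that `discover` implements (object names here are made up; assume the prefix is "logs/"):

// "logs/mnist/train/events.out.tfevents.123" -> run "mnist/train"
// "logs/events.out.tfevents.456"             -> run "." (logdir root)
// "logs/mnist/train/checkpoint.ckpt"         -> skipped (no event-file infix in basename)
// "other/events.out.tfevents.789"            -> warning logged (missing the expected prefix)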

fn open(&self, path: &EventFileBuf) -> io::Result<Self::File> {
// Paths as returned by `discover` are always valid Unicode.
let mut object = self.prefix.clone();
object.push_str(path.0.to_string_lossy().as_ref());
let file = File::new(self.gcs.clone(), self.bucket.clone(), object);
Ok(BufReader::with_capacity(BUFFER_CAPACITY, file))
}
}