diff --git a/tensorboard/data/server/BUILD b/tensorboard/data/server/BUILD index a08c161e2b..03ecd17195 100644 --- a/tensorboard/data/server/BUILD +++ b/tensorboard/data/server/BUILD @@ -28,6 +28,7 @@ rust_library( "lib.rs", "blob_key.rs", "cli.rs", + "cli/dynamic_logdir.rs", "commit.rs", "data_compat.rs", "disk_logdir.rs", @@ -35,6 +36,7 @@ rust_library( "event_file.rs", "gcs.rs", "gcs/client.rs", + "gcs/logdir.rs", "logdir.rs", "masked_crc.rs", "reservoir.rs", diff --git a/tensorboard/data/server/bench.rs b/tensorboard/data/server/bench.rs index 7ad7ba57a4..b485a16986 100644 --- a/tensorboard/data/server/bench.rs +++ b/tensorboard/data/server/bench.rs @@ -20,8 +20,8 @@ use log::info; use std::path::PathBuf; use std::time::Instant; +use rustboard_core::cli::dynamic_logdir::DynLogdir; use rustboard_core::commit::Commit; -use rustboard_core::disk_logdir::DiskLogdir; use rustboard_core::logdir::LogdirLoader; #[derive(Clap)] @@ -45,11 +45,8 @@ fn main() { init_logging(&opts); let commit = Commit::new(); - let mut loader = LogdirLoader::new( - &commit, - DiskLogdir::new(opts.logdir), - opts.reload_threads.unwrap_or(0), - ); + let logdir = DynLogdir::new(opts.logdir).expect("DynLogdir::new"); + let mut loader = LogdirLoader::new(&commit, logdir, opts.reload_threads.unwrap_or(0)); loader.checksum(opts.checksum); // if neither `--[no-]checksum` given, defaults to false info!("Starting load cycle"); diff --git a/tensorboard/data/server/cli.rs b/tensorboard/data/server/cli.rs index 58870cb09c..b78c14e41e 100644 --- a/tensorboard/data/server/cli.rs +++ b/tensorboard/data/server/cli.rs @@ -29,13 +29,15 @@ use tokio_stream::wrappers::TcpListenerStream; use tonic::transport::Server; use crate::commit::Commit; -use crate::disk_logdir::DiskLogdir; use crate::logdir::LogdirLoader; use crate::proto::tensorboard::data; use crate::server::DataProviderHandler; use data::tensor_board_data_provider_server::TensorBoardDataProviderServer; +pub mod dynamic_logdir; +use dynamic_logdir::DynLogdir; + #[derive(Clap, Debug)] #[clap(name = "rustboard", version = crate::VERSION)] struct Opts { @@ -177,19 +179,26 @@ pub async fn main() -> Result<(), Box> { .name("Reloader".to_string()) .spawn({ let reload_strategy = opts.reload; - let mut loader = LogdirLoader::new(commit, DiskLogdir::new(opts.logdir), 0); - // Checksum only if `--checksum` given (i.e., off by default). - loader.checksum(opts.checksum); - move || loop { - info!("Starting load cycle"); - let start = Instant::now(); - loader.reload(); - let end = Instant::now(); - info!("Finished load cycle ({:?})", end - start); - match reload_strategy { - ReloadStrategy::Loop { delay } => thread::sleep(delay), - ReloadStrategy::Once => break, - }; + let raw_logdir = opts.logdir; + let checksum = opts.checksum; + move || { + // Create the logdir in the child thread, where no async runtime is active (see + // docs for `DynLogdir::new`). + let logdir = DynLogdir::new(raw_logdir).unwrap_or_else(|| std::process::exit(1)); + let mut loader = LogdirLoader::new(commit, logdir, 0); + // Checksum only if `--checksum` given (i.e., off by default). + loader.checksum(checksum); + loop { + info!("Starting load cycle"); + let start = Instant::now(); + loader.reload(); + let end = Instant::now(); + info!("Finished load cycle ({:?})", end - start); + match reload_strategy { + ReloadStrategy::Loop { delay } => thread::sleep(delay), + ReloadStrategy::Once => break, + }; + } } }) .expect("failed to spawn reloader thread"); diff --git a/tensorboard/data/server/cli/dynamic_logdir.rs b/tensorboard/data/server/cli/dynamic_logdir.rs new file mode 100644 index 0000000000..c275485b68 --- /dev/null +++ b/tensorboard/data/server/cli/dynamic_logdir.rs @@ -0,0 +1,98 @@ +/* Copyright 2021 The TensorFlow Authors. All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +==============================================================================*/ + +//! Log directory as specified by user arguments. + +use log::error; +use std::collections::HashMap; +use std::io::{self, Read}; +use std::path::PathBuf; + +use crate::disk_logdir::DiskLogdir; +use crate::gcs; +use crate::logdir::{EventFileBuf, Logdir}; +use crate::types::Run; + +/// A logdir dynamically dispatched over supported implementations. +pub enum DynLogdir { + Disk(DiskLogdir), + Gcs(gcs::Logdir), +} + +/// A file from any one of [`DynLogdir`]'s underlying implementations. +pub enum DynFile { + Disk(::File), + Gcs(::File), +} + +impl DynLogdir { + /// Parses a `DynLogdir` from a user-supplied path. + /// + /// This succeeds unless the path represents a GCS logdir and no HTTP client can be opened. In + /// case of failure, errors will be logged to the active logger. + /// + /// # Panics + /// + /// May panic in debug mode if called from a thread with an active Tokio runtime; see + /// [seanmonstar/reqwest#1017]. + /// + /// [seanmonstar/reqwest#1017]: https://github.com/seanmonstar/reqwest/issues/1017 + pub fn new(path: PathBuf) -> Option { + let path_str = path.to_string_lossy(); + let gcs_path = match path_str.strip_prefix("gs://") { + // Assume that anything not starting with `gs://` is a path on disk. + None => return Some(DynLogdir::Disk(DiskLogdir::new(path))), + Some(p) => p, + }; + let mut parts = gcs_path.splitn(2, '/'); + let bucket = parts.next().unwrap().to_string(); // splitn always yields at least one element + let prefix = parts.next().unwrap_or("").to_string(); + let client = match gcs::Client::new() { + Err(e) => { + error!("Could not open GCS connection: {}", e); + return None; + } + Ok(c) => c, + }; + Some(DynLogdir::Gcs(gcs::Logdir::new(client, bucket, prefix))) + } +} + +impl crate::logdir::Logdir for DynLogdir { + type File = DynFile; + + fn discover(&self) -> io::Result>> { + match self { + Self::Disk(x) => x.discover(), + Self::Gcs(x) => x.discover(), + } + } + + fn open(&self, path: &EventFileBuf) -> io::Result { + match self { + Self::Disk(x) => x.open(path).map(DynFile::Disk), + Self::Gcs(x) => x.open(path).map(DynFile::Gcs), + } + } +} + +impl Read for DynFile { + fn read(&mut self, buf: &mut [u8]) -> io::Result { + match self { + Self::Disk(x) => x.read(buf), + Self::Gcs(x) => x.read(buf), + } + } +} diff --git a/tensorboard/data/server/gcs.rs b/tensorboard/data/server/gcs.rs index bbf6727211..8c2b8c1ba7 100644 --- a/tensorboard/data/server/gcs.rs +++ b/tensorboard/data/server/gcs.rs @@ -16,5 +16,7 @@ limitations under the License. //! Google Cloud Storage interop. mod client; +mod logdir; pub use client::Client; +pub use logdir::Logdir; diff --git a/tensorboard/data/server/gcs/client.rs b/tensorboard/data/server/gcs/client.rs index 7c567df7fb..5269e885a8 100644 --- a/tensorboard/data/server/gcs/client.rs +++ b/tensorboard/data/server/gcs/client.rs @@ -25,6 +25,10 @@ const STORAGE_BASE: &str = "https://storage.googleapis.com"; const API_BASE: &str = "https://www.googleapis.com/storage/v1"; /// GCS client. +/// +/// Cloning a GCS client is cheap and shares the underlying connection pool, as with a +/// [`reqwest::Client`]. +#[derive(Clone)] pub struct Client { http: HttpClient, } diff --git a/tensorboard/data/server/gcs/logdir.rs b/tensorboard/data/server/gcs/logdir.rs new file mode 100644 index 0000000000..57e2406647 --- /dev/null +++ b/tensorboard/data/server/gcs/logdir.rs @@ -0,0 +1,145 @@ +/* Copyright 2021 The TensorFlow Authors. All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +==============================================================================*/ + +//! Adapter from GCS to TensorBoard logdirs. + +use log::warn; +use reqwest::StatusCode; +use std::collections::HashMap; +use std::io::{self, BufReader, Read}; +use std::path::{Path, PathBuf}; + +use super::Client; +use crate::logdir::{EventFileBuf, EVENT_FILE_BASENAME_INFIX}; +use crate::types::Run; + +/// A reference to a GCS object with a read offset. +pub struct File { + gcs: Client, + bucket: String, + object: String, + pos: u64, +} + +impl File { + fn new(gcs: Client, bucket: String, object: String) -> Self { + Self { + gcs, + bucket, + object, + pos: 0, + } + } +} + +fn reqwest_to_io_error(e: reqwest::Error) -> io::Error { + let kind = match e.status() { + Some(StatusCode::NOT_FOUND) => io::ErrorKind::NotFound, + Some(StatusCode::FORBIDDEN) => io::ErrorKind::PermissionDenied, + Some(StatusCode::UNAUTHORIZED) => io::ErrorKind::PermissionDenied, + Some(StatusCode::REQUEST_TIMEOUT) => io::ErrorKind::TimedOut, + _ if e.is_timeout() => io::ErrorKind::TimedOut, + _ if e.is_decode() => io::ErrorKind::InvalidData, + _ => io::ErrorKind::Other, + }; + io::Error::new(kind, e) +} + +impl Read for File { + fn read(&mut self, buf: &mut [u8]) -> io::Result { + if buf.is_empty() { + return Ok(0); + } + let range = self.pos..=self.pos + (buf.len() as u64 - 1); + let result = self + .gcs + .read(&self.bucket, &self.object, range) + .map_err(reqwest_to_io_error)?; + (&mut buf[0..result.len()]).copy_from_slice(&result); + self.pos += result.len() as u64; + Ok(result.len()) + } +} + +pub struct Logdir { + gcs: Client, + bucket: String, + /// Invariant: `prefix` either is empty or ends with `/`, and thus an event file name should be + /// joined onto `prefix` to form its full object name. + prefix: String, +} + +impl Logdir { + pub fn new(gcs: Client, bucket: String, mut prefix: String) -> Self { + if !prefix.is_empty() && !prefix.ends_with('/') { + prefix.push('/'); + } + Self { + gcs, + bucket, + prefix, + } + } +} + +/// Read large chunks from GCS to reduce network roundtrips. +const BUFFER_CAPACITY: usize = 1024 * 1024 * 16; + +impl crate::logdir::Logdir for Logdir { + type File = BufReader; + + fn discover(&self) -> io::Result>> { + let res = self.gcs.list(&self.bucket, &self.prefix); + let objects = res.map_err(reqwest_to_io_error)?; + let mut run_map: HashMap> = HashMap::new(); + for name in objects { + let name = match name.strip_prefix(&self.prefix) { + Some(x) => x, + None => { + warn!( + "Unexpected object name {:?} with putative prefix {:?}", + &name, &self.prefix + ); + continue; + } + }; + let path = PathBuf::from(name); + let is_event_file = path.file_name().map_or(false, |n| { + n.to_string_lossy().contains(EVENT_FILE_BASENAME_INFIX) + }); + if !is_event_file { + continue; + } + let mut run_relpath = path + .parent() + .map(Path::to_path_buf) + .unwrap_or_else(PathBuf::new); + if run_relpath == Path::new("") { + run_relpath.push("."); + } + let run = Run(run_relpath.display().to_string()); + run_map.entry(run).or_default().push(EventFileBuf(path)); + } + Ok(run_map) + } + + fn open(&self, path: &EventFileBuf) -> io::Result { + // Paths as returned by `discover` are always valid Unicode. + let mut object = self.prefix.clone(); + object.push_str(path.0.to_string_lossy().as_ref()); + let file = File::new(self.gcs.clone(), self.bucket.clone(), object); + Ok(BufReader::with_capacity(BUFFER_CAPACITY, file)) + } +}