Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions tensorboard/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -216,11 +216,10 @@ py_library(
":version",
"//tensorboard:expect_absl_flags_installed",
"//tensorboard:expect_absl_logging_installed",
"//tensorboard:expect_grpc_installed",
"//tensorboard/backend:application",
"//tensorboard/backend/event_processing:data_ingester",
"//tensorboard/backend/event_processing:event_file_inspector",
"//tensorboard/data:grpc_provider",
"//tensorboard/data:server_ingester",
"//tensorboard/util:argparse_util",
"@org_pocoo_werkzeug",
"@org_pythonhosted_six",
Expand Down
1 change: 1 addition & 0 deletions tensorboard/backend/event_processing/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ py_library(
":data_provider",
":event_multiplexer",
":tag_types",
"//tensorboard/data:ingester",
"//tensorboard/plugins/audio:metadata",
"//tensorboard/plugins/histogram:metadata",
"//tensorboard/plugins/image:metadata",
Expand Down
3 changes: 2 additions & 1 deletion tensorboard/backend/event_processing/data_ingester.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
from tensorboard.backend.event_processing import data_provider
from tensorboard.backend.event_processing import plugin_event_multiplexer
from tensorboard.backend.event_processing import tag_types
from tensorboard.data import ingester
from tensorboard.plugins.audio import metadata as audio_metadata
from tensorboard.plugins.histogram import metadata as histogram_metadata
from tensorboard.plugins.image import metadata as image_metadata
Expand All @@ -48,7 +49,7 @@
logger = tb_logging.get_logger()


class LocalDataIngester(object):
class LocalDataIngester(ingester.DataIngester):
"""Data ingestion implementation to use when running locally."""

def __init__(self, flags):
Expand Down
53 changes: 53 additions & 0 deletions tensorboard/data/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,59 @@ py_test(
],
)

# Abstract `DataIngester` interface that links a data source to a
# `DataProvider`; depended on by both local and server-backed implementations.
py_library(
    name = "ingester",
    srcs = ["ingester.py"],
    srcs_version = "PY3",
)

# Smoke test: `ingester.py` is a pure abstract module, so the test only
# verifies that it imports and executes successfully.
py_test(
    name = "ingester_test",
    size = "small",
    srcs = ["ingester_test.py"],
    srcs_version = "PY3",
    tags = ["support_notf"],  # presumably "runs without TensorFlow" — confirm
    deps = [
        ":ingester",
    ],
)

# Matches builds invoked with `--define=link_data_server=true`; used below to
# decide whether the data server binary is bundled into runfiles.
config_setting(
    name = "link_data_server",
    define_values = {"link_data_server": "true"},
)

py_library(
    name = "server_ingester",
    srcs = ["server_ingester.py"],
    # Bundle the data server binary only when explicitly requested via
    # `--define=link_data_server=true`; by default no extra data deps.
    data = select({
        ":link_data_server": ["//tensorboard/data/server"],
        "//conditions:default": [],
    }),
    srcs_version = "PY3",
    deps = [
        ":grpc_provider",
        ":ingester",
        "//tensorboard:expect_grpc_installed",
        "//tensorboard/util:tb_logging",
    ],
)

# Test for the subprocess-spawning ingester; sized "medium" because the test
# sleeps, but with a "short" timeout since total runtime stays small.
py_test(
    name = "server_ingester_test",
    size = "medium",  # time.sleep
    timeout = "short",
    srcs = ["server_ingester_test.py"],
    srcs_version = "PY3",
    tags = ["support_notf"],  # presumably "runs without TensorFlow" — confirm
    deps = [
        ":grpc_provider",
        ":server_ingester",
        "//tensorboard:expect_grpc_installed",
        "//tensorboard:test",
    ],
)

py_library(
name = "grpc_provider",
srcs = ["grpc_provider.py"],
Expand Down
46 changes: 46 additions & 0 deletions tensorboard/data/ingester.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
# Copyright 2020 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Abstraction for data ingestion logic."""

import abc


class DataIngester(abc.ABC):
    """Link between a data source and a data provider.

    A data ingester kicks off a reload operation in the background and
    exposes a data provider as a view onto the ingested data.
    """

    @property
    @abc.abstractmethod
    def data_provider(self):
        """A `DataProvider` backed by this ingester's data.

        It may be an error to dereference this before `start` is called.
        """

    @abc.abstractmethod
    def start(self):
        """Starts ingesting data.

        This may start a background thread or process, and will return
        once communication with that task is established. It won't block
        forever as data is reloaded.

        Must only be called once.
        """
21 changes: 21 additions & 0 deletions tensorboard/data/ingester_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
# Copyright 2020 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Unit tests for `tensorboard.ingester`."""

from tensorboard.data import ingester

# This is a pure abstract class. There's really nothing to test other than that
# it executes successfully.
del ingester
43 changes: 41 additions & 2 deletions tensorboard/data/server/DEVELOPMENT.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,45 @@ Rust source files in your editor. For editor setup, consult

[ra-docs]: https://rust-analyzer.github.io/

## Running under TensorBoard

You can point TensorBoard at a data server in two ways: start the server
yourself and give TensorBoard an address, or tell TensorBoard to start the
server as a subprocess.

To connect to an existing server, pass `--grpc_data_provider ADDRESS`, where the
address is like `localhost:6806`. Thus:

```
bazel run -c opt //tensorboard -- --grpc_data_provider localhost:6806
```

You don’t have to pass a `--logdir` if you do this, but you do have to
concurrently run `//tensorboard/data/server` (say, in the background, or in a
separate shell). You can also swap out the data server whenever you want without
restarting TensorBoard; new RPCs will transparently reconnect. The server
doesn’t have to be running when TensorBoard starts.

To tell TensorBoard to start the server as a subprocess, build with
`--define=link_data_server=true` and pass `--load_fast` to TensorBoard along
with a normal `--logdir`. Thus:

```
bazel run -c opt --define=link_data_server=true //tensorboard -- \
--load_fast --logdir ~/tensorboard_data/mnist/ --bind_all --verbosity 0
```

This is an easier one-shot solution, but requires a `--define` flag, offers less
flexibility over the flags to the data server, and requires restarting
TensorBoard if you want to restart the data server (though that’s not usually a
big deal). The data server will automatically shut down when TensorBoard exits
for any reason.

As an alternative to `--define=link_data_server=true`, you can set the
`TENSORBOARD_DATA_SERVER_BINARY` environment variable to the path to a data
server binary, and pass `--load_fast`. If running with `bazel run`, this should
be an absolute path.

## Adding third-party dependencies

Rust dependencies are usually hosted on [crates.io]. We use [`cargo-raze`][raze]
Expand All @@ -51,8 +90,8 @@ dependency:
package on <https://crates.io/>.
3. Change into the `tensorboard/data/server/` directory.
4. Run `cargo fetch` to update `Cargo.lock`. Running this before `cargo raze`
ensures that the `http_archive` workspace rules in the generated build
files will have `sha256` checksums.
ensures that the `http_archive` workspace rules in the generated build files
will have `sha256` checksums.
5. Run `cargo raze` to update `third_party/rust/...`.

This will add a new target like `//third_party/rust:rand`. Manually build it
Expand Down
60 changes: 48 additions & 12 deletions tensorboard/data/server/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,11 @@ limitations under the License.
==============================================================================*/

use clap::Clap;
use log::{debug, info, LevelFilter};
use std::io::Read;
use log::{debug, error, info, LevelFilter};
use std::fs::File;
use std::io::{Read, Write};
use std::net::{IpAddr, SocketAddr};
use std::path::PathBuf;
use std::path::{Path, PathBuf};
use std::str::FromStr;
use std::thread;
use std::time::{Duration, Instant};
Expand Down Expand Up @@ -74,6 +75,15 @@ struct Opts {
/// sense) but not killed.
#[clap(long)]
die_after_stdin: bool,

/// Write bound port to this file
///
/// Once a server socket is opened, write the port on which it's listening to the file at this
/// path. Useful with `--port 0`. Port will be written as ASCII decimal followed by a newline
/// (e.g., "6806\n"). If the server fails to start, this file may not be written at all. If the
/// port file is specified but cannot be written, the server will die.
#[clap(long)]
port_file: Option<PathBuf>,
}

/// A duration in seconds.
Expand Down Expand Up @@ -113,21 +123,39 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let bound = listener.local_addr()?;
eprintln!("listening on {:?}", bound);

if let Some(port_file) = opts.port_file {
let port = bound.port();
if let Err(e) = write_port_file(&port_file, port) {
error!(
"Failed to write port \"{}\" to {}: {}",
port,
port_file.display(),
e
);
std::process::exit(1);
}
info!("Wrote port \"{}\" to {}", port, port_file.display());
}

// Leak the commit object, since the Tonic server must have only 'static references. This only
// leaks the outer commit structure (of constant size), not the pointers to the actual data.
let commit: &'static Commit = Box::leak(Box::new(Commit::new()));

thread::Builder::new()
.name("Reloader".to_string())
.spawn(move || {
let mut loader = LogdirLoader::new(commit, opts.logdir);
loop {
info!("Starting load cycle");
let start = Instant::now();
loader.reload();
let end = Instant::now();
info!("Finished load cycle ({:?})", end - start);
thread::sleep(opts.reload_interval.duration());
.spawn({
let logdir = opts.logdir;
let reload_interval = opts.reload_interval;
move || {
let mut loader = LogdirLoader::new(commit, logdir);
loop {
info!("Starting load cycle");
let start = Instant::now();
loader.reload();
let end = Instant::now();
info!("Finished load cycle ({:?})", end - start);
thread::sleep(reload_interval.duration());
}
}
})
.expect("failed to spawn reloader thread");
Expand Down Expand Up @@ -156,3 +184,11 @@ fn die_after_stdin() {
info!("Stdin closed; exiting");
std::process::exit(0);
}

/// Writes `port` to the file at `path` as an ASCII decimal followed by a
/// newline (e.g., `"6806\n"`).
///
/// The file is created (truncating any existing contents), and its contents
/// are flushed and fsynced to disk before this function returns.
fn write_port_file(path: &Path, port: u16) -> std::io::Result<()> {
    let mut file = File::create(path)?;
    write!(file, "{}\n", port)?;
    file.sync_all()
}
Loading