Skip to content

Commit

Permalink
DataLoaders 5: add support for external binary DataLoaders (PATH) (
Browse files Browse the repository at this point in the history
…#4521)

Turn any executable in your `PATH` into a `DataLoader` by logging data
to stdout.

Includes examples for all supported languages.


![image](https://github.com/rerun-io/rerun/assets/2910679/82391961-9ac1-4bd8-bdbe-a7378ed7a3fa)


Checks:
- [x] `rerun-loader-rust-file` just prints help message
- [x] `rerun-loader-rust-file
examples/rust/external_data_loader/src/main.rs` successfully does
nothing
- [x] `rerun-loader-rust-file
examples/rust/external_data_loader/src/main.rs | rerun -` works
- [x] `rerun-loader-python-file` just prints help message
- [x] `rerun-loader-python-file
examples/python/external_data_loader/main.py` successfully does nothing
- [x] `rerun-loader-python-file
examples/python/external_data_loader/main.py | rerun -` works
- [x] `rerun-loader-cpp-file` just prints help message
- [x] `rerun-loader-cpp-file examples/cpp/external_data_loader/main.cpp`
successfully does nothing
- [x] `rerun-loader-cpp-file examples/cpp/external_data_loader/main.cpp
| rerun -` works
- [x] `cargo r -p rerun-cli --no-default-features --features
native_viewer -- examples/assets`
- [x] Native: `File > Open > examples/assets/`
- [x] Native: `Drag-n-drop > examples/assets/`


---

Part of a series of PRs to make it possible to load _any_ file from the
local filesystem, by any means, on web and native:
- #4516
- #4517 
- #4518 
- #4519 
- #4520 
- #4521 
- TODO: register custom loaders
- TODO: high level docs and guides for everything related to loading
files
  • Loading branch information
teh-cmc authored Dec 15, 2023
1 parent 5254c42 commit 8ebaf8d
Show file tree
Hide file tree
Showing 21 changed files with 557 additions and 10 deletions.
8 changes: 8 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

190 changes: 190 additions & 0 deletions crates/re_data_source/src/data_loader/loader_external.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,190 @@
use std::io::Read;

use once_cell::sync::Lazy;

/// To register a new external data loader, simply add an executable in your $PATH whose name
/// starts with this prefix.
pub const EXTERNAL_DATA_LOADER_PREFIX: &str = "rerun-loader-";

/// Keeps track of the paths all external executable [`crate::DataLoader`]s.
///
/// Lazy initialized the first time a file is opened by running a full scan of the `$PATH`.
///
/// External loaders are _not_ registered on a per-extension basis: we want users to be able to
/// filter data on a much more fine-grained basis that just file extensions (e.g. checking the file
/// itself for magic bytes).
pub static EXTERNAL_LOADER_PATHS: Lazy<Vec<std::path::PathBuf>> = Lazy::new(|| {
re_tracing::profile_function!();

use walkdir::WalkDir;

let dirpaths = std::env::var("PATH")
.ok()
.into_iter()
.flat_map(|paths| paths.split(':').map(ToOwned::to_owned).collect::<Vec<_>>())
.map(std::path::PathBuf::from);

let executables: ahash::HashSet<_> = dirpaths
.into_iter()
.flat_map(|dirpath| {
WalkDir::new(dirpath).into_iter().filter_map(|entry| {
let Ok(entry) = entry else {
return None;
};
let filepath = entry.path();
let is_rerun_loader = filepath.file_name().map_or(false, |filename| {
filename
.to_string_lossy()
.starts_with(EXTERNAL_DATA_LOADER_PREFIX)
});
(filepath.is_file() && is_rerun_loader).then(|| filepath.to_owned())
})
})
.collect();

// NOTE: We call all available loaders and do so in parallel: order is irrelevant here.
executables.into_iter().collect()
});

/// Iterator over all registered external [`crate::DataLoader`]s.
#[inline]
pub fn iter_external_loaders() -> impl ExactSizeIterator<Item = std::path::PathBuf> {
EXTERNAL_LOADER_PATHS.iter().cloned()
}

// ---

/// A [`crate::DataLoader`] that forwards the path to load to all executables present in
/// the user's `PATH` with a name that starts with `EXTERNAL_DATA_LOADER_PREFIX`.
///
/// The external loaders are expected to log rrd data to their standard output.
///
/// Refer to our `external_data_loader` example for more information.
pub struct ExternalLoader;

impl crate::DataLoader for ExternalLoader {
#[inline]
fn name(&self) -> String {
"rerun.data_loaders.External".into()
}

fn load_from_path(
&self,
store_id: re_log_types::StoreId,
filepath: std::path::PathBuf,
tx: std::sync::mpsc::Sender<crate::LoadedData>,
) -> Result<(), crate::DataLoaderError> {
use std::process::{Command, Stdio};

re_tracing::profile_function!(filepath.display().to_string());

for exe in EXTERNAL_LOADER_PATHS.iter() {
let store_id = store_id.clone();
let filepath = filepath.clone();
let tx = tx.clone();

// NOTE: spawn is fine, the entire loader is native-only.
rayon::spawn(move || {
re_tracing::profile_function!();

let child = Command::new(exe)
.arg(filepath.clone())
.args(["--recording-id".to_owned(), store_id.to_string()])
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.spawn();

let mut child = match child {
Ok(child) => child,
Err(err) => {
re_log::error!(?filepath, loader = ?exe, %err, "Failed to execute external loader");
return;
}
};

let Some(stdout) = child.stdout.take() else {
let reason = "stdout unreachable";
re_log::error!(?filepath, loader = ?exe, %reason, "Failed to execute external loader");
return;
};
let Some(stderr) = child.stderr.take() else {
let reason = "stderr unreachable";
re_log::error!(?filepath, loader = ?exe, %reason, "Failed to execute external loader");
return;
};

re_log::debug!(?filepath, loader = ?exe, "Loading data from filesystem using external loader…",);

let version_policy = re_log_encoding::decoder::VersionPolicy::Warn;
let stdout = std::io::BufReader::new(stdout);
match re_log_encoding::decoder::Decoder::new(version_policy, stdout) {
Ok(decoder) => {
decode_and_stream(&filepath, &tx, decoder);
}
Err(re_log_encoding::decoder::DecodeError::Read(_)) => {
// The child was not interested in that file and left without logging
// anything.
// That's fine, we just need to make sure to check its exit status further
// down, still.
return;
}
Err(err) => {
re_log::error!(?filepath, loader = ?exe, %err, "Failed to decode external loader's output");
return;
}
};

let status = match child.wait() {
Ok(output) => output,
Err(err) => {
re_log::error!(?filepath, loader = ?exe, %err, "Failed to execute external loader");
return;
}
};

if !status.success() {
let mut stderr = std::io::BufReader::new(stderr);
let mut reason = String::new();
stderr.read_to_string(&mut reason).ok();
re_log::error!(?filepath, loader = ?exe, %reason, "Failed to execute external loader");
}
});
}

Ok(())
}

#[inline]
fn load_from_file_contents(
&self,
_store_id: re_log_types::StoreId,
_path: std::path::PathBuf,
_contents: std::borrow::Cow<'_, [u8]>,
_tx: std::sync::mpsc::Sender<crate::LoadedData>,
) -> Result<(), crate::DataLoaderError> {
// TODO(cmc): You could imagine a world where plugins can be streamed rrd data via their
// standard input… but today is not world.
Ok(()) // simply not interested
}
}

fn decode_and_stream<R: std::io::Read>(
filepath: &std::path::Path,
tx: &std::sync::mpsc::Sender<crate::LoadedData>,
decoder: re_log_encoding::decoder::Decoder<R>,
) {
re_tracing::profile_function!(filepath.display().to_string());

for msg in decoder {
let msg = match msg {
Ok(msg) => msg,
Err(err) => {
re_log::warn_once!("Failed to decode message in {filepath:?}: {err}");
continue;
}
};
if tx.send(msg.into()).is_err() {
break; // The other end has decided to hang up, not our problem.
}
}
}
20 changes: 18 additions & 2 deletions crates/re_data_source/src/data_loader/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,14 @@ use re_log_types::{ArrowMsg, DataRow, LogMsg};
/// [There are plans to make this generic over any URI](https://github.com/rerun-io/rerun/issues/4525).
///
/// Rerun comes with a few [`DataLoader`]s by default:
/// - [`RrdLoader`] for [Rerun files],
/// - [`RrdLoader`] for [Rerun files].
/// - [`ArchetypeLoader`] for:
/// - [3D models]
/// - [Images]
/// - [Point clouds]
/// - [Text files]
/// - [`DirectoryLoader`] for recursively loading folders.
/// - [`ExternalLoader`], which looks for user-defined data loaders in $PATH.
///
/// ## Execution
///
Expand All @@ -36,9 +39,10 @@ use re_log_types::{ArrowMsg, DataRow, LogMsg};
///
/// On native, [`DataLoader`]s are executed in parallel.
///
/// [Rerun extensions]: crate::SUPPORTED_RERUN_EXTENSIONS
/// [Rerun files]: crate::SUPPORTED_RERUN_EXTENSIONS
/// [3D models]: crate::SUPPORTED_MESH_EXTENSIONS
/// [Images]: crate::SUPPORTED_IMAGE_EXTENSIONS
/// [Point clouds]: crate::SUPPORTED_POINT_CLOUD_EXTENSIONS
/// [Text files]: crate::SUPPORTED_TEXT_EXTENSIONS
//
// TODO(#4525): `DataLoader`s should support arbitrary URIs
Expand Down Expand Up @@ -206,6 +210,8 @@ static BUILTIN_LOADERS: Lazy<Vec<Arc<dyn DataLoader>>> = Lazy::new(|| {
Arc::new(RrdLoader) as Arc<dyn DataLoader>,
Arc::new(ArchetypeLoader),
Arc::new(DirectoryLoader),
#[cfg(not(target_arch = "wasm32"))]
Arc::new(ExternalLoader),
]
});

Expand All @@ -221,6 +227,16 @@ mod loader_archetype;
mod loader_directory;
mod loader_rrd;

#[cfg(not(target_arch = "wasm32"))]
mod loader_external;

pub use self::loader_archetype::ArchetypeLoader;
pub use self::loader_directory::DirectoryLoader;
pub use self::loader_rrd::RrdLoader;

#[cfg(not(target_arch = "wasm32"))]
pub(crate) use self::loader_external::EXTERNAL_LOADER_PATHS;
#[cfg(not(target_arch = "wasm32"))]
pub use self::loader_external::{
iter_external_loaders, ExternalLoader, EXTERNAL_DATA_LOADER_PREFIX,
};
6 changes: 5 additions & 1 deletion crates/re_data_source/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,16 @@ mod web_sockets;
mod load_stdin;

pub use self::data_loader::{
iter_loaders, ArchetypeLoader, DataLoader, DataLoaderError, LoadedData, RrdLoader,
iter_loaders, ArchetypeLoader, DataLoader, DataLoaderError, DirectoryLoader, LoadedData,
RrdLoader,
};
pub use self::data_source::DataSource;
pub use self::load_file::{extension, load_from_file_contents};
pub use self::web_sockets::connect_to_ws_url;

#[cfg(not(target_arch = "wasm32"))]
pub use self::data_loader::{iter_external_loaders, ExternalLoader};

#[cfg(not(target_arch = "wasm32"))]
pub use self::load_file::load_from_path;

Expand Down
9 changes: 8 additions & 1 deletion crates/re_data_source/src/load_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,10 +145,17 @@ pub(crate) fn load(
is_dir: bool,
contents: Option<std::borrow::Cow<'_, [u8]>>,
) -> Result<std::sync::mpsc::Receiver<LoadedData>, DataLoaderError> {
#[cfg(target_arch = "wasm32")]
let has_external_loaders = false;
#[cfg(not(target_arch = "wasm32"))]
let has_external_loaders = !crate::data_loader::EXTERNAL_LOADER_PATHS.is_empty();

let extension = extension(path);
let is_builtin = is_associated_with_builtin_loader(path, is_dir);

if !is_builtin {
// If there are no external loaders registered (which is always the case on wasm) and we don't
// have a builtin loader for it, then we know for a fact that we won't be able to load it.
if !is_builtin && !has_external_loaders {
return if extension.is_empty() {
Err(anyhow::anyhow!("files without extensions (file.XXX) are not supported").into())
} else {
Expand Down
2 changes: 1 addition & 1 deletion crates/re_ui/src/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ impl UICommand {
"Save data for the current loop selection to a Rerun data file (.rrd)",
),

UICommand::Open => ("Open…", "Open a Rerun Data File (.rrd)"),
UICommand::Open => ("Open…", "Open any supported files (.rrd, images, meshes, …)"),

UICommand::CloseCurrentRecording => (
"Close current Recording",
Expand Down
20 changes: 15 additions & 5 deletions crates/re_viewer/src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1297,11 +1297,21 @@ fn file_saver_progress_ui(egui_ctx: &egui::Context, background_tasks: &mut Backg
#[cfg(not(target_arch = "wasm32"))]
fn open_file_dialog_native() -> Vec<std::path::PathBuf> {
re_tracing::profile_function!();
let supported: Vec<_> = re_data_source::supported_extensions().collect();
rfd::FileDialog::new()
.add_filter("Supported files", &supported)
.pick_files()
.unwrap_or_default()

let supported: Vec<_> = if re_data_source::iter_external_loaders().len() == 0 {
re_data_source::supported_extensions().collect()
} else {
vec![]
};

let mut dialog = rfd::FileDialog::new();

// If there's at least one external loader registered, then literally anything goes!
if !supported.is_empty() {
dialog = dialog.add_filter("Supported files", &supported);
}

dialog.pick_files().unwrap_or_default()
}

#[cfg(target_arch = "wasm32")]
Expand Down
4 changes: 4 additions & 0 deletions examples/assets/example.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
int main() {
std::cout << "That will only work with the right plugin in your $PATH!" << std::endl;
std::cout << "Checkout the `external_data_loader` C++ example." << std::endl;
}
4 changes: 4 additions & 0 deletions examples/assets/example.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
from __future__ import annotations

print("That will only work with the right plugin in your $PATH!")
print("Checkout the `external_data_loader` Python example.")
4 changes: 4 additions & 0 deletions examples/assets/example.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
fn main() {
println!("That will only work with the right plugin in your $PATH!");
println!("Checkout the `external_data_loader` Rust example.");
}
2 changes: 2 additions & 0 deletions examples/cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
add_subdirectory(clock)
add_subdirectory(custom_collection_adapter)
add_subdirectory(dna)
add_subdirectory(external_data_loader)
add_subdirectory(minimal)
add_subdirectory(shared_recording)
add_subdirectory(spawn_viewer)
Expand All @@ -15,3 +16,4 @@ add_dependencies(examples example_minimal)
add_dependencies(examples example_shared_recording)
add_dependencies(examples example_spawn_viewer)
add_dependencies(examples example_stdio)
add_dependencies(examples rerun-loader-cpp-file)
32 changes: 32 additions & 0 deletions examples/cpp/external_data_loader/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
cmake_minimum_required(VERSION 3.16...3.27)

# If you use the example outside of the Rerun SDK you need to specify
# where the rerun_c build is to be found by setting the `RERUN_CPP_URL` variable.
# This can be done by passing `-DRERUN_CPP_URL=<path to rerun_sdk_cpp zip>` to cmake.
if(DEFINED RERUN_REPOSITORY)
add_executable(rerun-loader-cpp-file main.cpp)
rerun_strict_warning_settings(rerun-loader-cpp-file)
else()
project(rerun-loader-cpp-file LANGUAGES CXX)

add_executable(rerun-loader-cpp-file main.cpp)

# Set the path to the rerun_c build.
set(RERUN_CPP_URL "https://github.com/rerun-io/rerun/releases/latest/download/rerun_cpp_sdk.zip" CACHE STRING "URL to the rerun_cpp zip.")
option(RERUN_FIND_PACKAGE "Whether to use find_package to find a preinstalled rerun package (instead of using FetchContent)." OFF)

if(RERUN_FIND_PACKAGE)
find_package(rerun_sdk REQUIRED)
else()
# Download the rerun_sdk
include(FetchContent)
FetchContent_Declare(rerun_sdk URL ${RERUN_CPP_URL})
FetchContent_MakeAvailable(rerun_sdk)
endif()

# Rerun requires at least C++17, but it should be compatible with newer versions.
set_property(TARGET rerun-loader-cpp-file PROPERTY CXX_STANDARD 17)
endif()

# Link against rerun_sdk.
target_link_libraries(rerun-loader-cpp-file PRIVATE rerun_sdk)
Loading

0 comments on commit 8ebaf8d

Please sign in to comment.