From 179dbf1bbc1b36a7afc90b1546ca7ad692c0a211 Mon Sep 17 00:00:00 2001 From: Philipp Oppermann Date: Thu, 2 May 2024 13:30:27 +0200 Subject: [PATCH 01/16] Simplify the `run.rs` script of the `rust-dataflow` example using `xshell` The `xshell` crate provides a more convenient way to build and run a `Command`, which is more similar to traditional bash scripts. Using this crate, we can simplify our `run.rs` script while still being platform-independent and not requiring any external dependencies. We can also still run the examples using `cargo run --example`. --- Cargo.lock | 1 + Cargo.toml | 1 + examples/rust-dataflow/run.rs | 77 +++++++++++++++++++---------------- 3 files changed, 44 insertions(+), 35 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e1dd9b003..4436bb645 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2368,6 +2368,7 @@ dependencies = [ "tokio-stream", "tracing", "uuid", + "xshell", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 09690ea9c..08956d42e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -97,6 +97,7 @@ uuid = { version = "1.7", features = ["v7", "serde"] } tracing = "0.1.36" futures = "0.3.25" tokio-stream = "0.1.11" +xshell = "0.2.6" [[example]] name = "c-dataflow" diff --git a/examples/rust-dataflow/run.rs b/examples/rust-dataflow/run.rs index f5e035a50..9cd668b64 100644 --- a/examples/rust-dataflow/run.rs +++ b/examples/rust-dataflow/run.rs @@ -1,46 +1,53 @@ -use dora_tracing::set_up_tracing; -use eyre::{bail, Context}; -use std::path::Path; +use eyre::ContextCompat; +use std::path::{Path, PathBuf}; +use xshell::{cmd, Shell}; -#[tokio::main] -async fn main() -> eyre::Result<()> { - set_up_tracing("rust-dataflow-runner").wrap_err("failed to set up tracing subscriber")?; +fn main() -> eyre::Result<()> { + // create a new shell in this folder + let sh = prepare_shell()?; + // build the `dora` binary (you can skip this if you use `cargo install dora-cli`) + let dora = prepare_dora(&sh)?; - let root = Path::new(env!("CARGO_MANIFEST_DIR")); - std::env::set_current_dir(root.join(file!()).parent().unwrap()) - .wrap_err("failed to set working dir")?; + // build the dataflow using `dora build` + cmd!(sh, "{dora} build dataflow.yml").run()?; + + // start up the dora daemon and coordinator + cmd!(sh, "{dora} up").run()?; - let dataflow = Path::new("dataflow.yml"); - build_dataflow(dataflow).await?; + // start running the dataflow.yml -> outputs the UUID assigned to the dataflow + let output = cmd!(sh, "{dora} start dataflow.yml --attach").read_stderr()?; + let uuid = output.lines().next().context("no output")?; - run_dataflow(dataflow).await?; + // stop the dora daemon and coordinator again + cmd!(sh, "{dora} destroy").run()?; + + // verify that the node output was written to `out` + sh.change_dir("out"); + sh.change_dir(uuid); + let sink_output = sh.read_file("log_rust-sink.txt")?; + if sink_output.lines().count() < 50 { + eyre::bail!("sink did not receive the expected number of messages") + } Ok(()) } -async fn build_dataflow(dataflow: &Path) -> eyre::Result<()> { - let cargo = std::env::var("CARGO").unwrap(); - let mut cmd = tokio::process::Command::new(&cargo); - cmd.arg("run"); - cmd.arg("--package").arg("dora-cli"); - cmd.arg("--").arg("build").arg(dataflow); - if !cmd.status().await?.success() { - bail!("failed to build dataflow"); - }; - Ok(()) +/// Prepares a shell and set the working directory to the parent folder of this file. +/// +/// You can use your system shell instead (e.g. `bash`); +fn prepare_shell() -> Result { + let sh = Shell::new()?; + let root = Path::new(env!("CARGO_MANIFEST_DIR")); + sh.change_dir(root.join(file!()).parent().unwrap()); + Ok(sh) } -async fn run_dataflow(dataflow: &Path) -> eyre::Result<()> { - let cargo = std::env::var("CARGO").unwrap(); - let mut cmd = tokio::process::Command::new(&cargo); - cmd.arg("run"); - cmd.arg("--package").arg("dora-cli"); - cmd.arg("--") - .arg("daemon") - .arg("--run-dataflow") - .arg(dataflow); - if !cmd.status().await?.success() { - bail!("failed to run dataflow"); - }; - Ok(()) +/// Build the `dora` command-line executable from this repo. +/// +/// You can skip this step and run `cargo install dora-cli --locked` instead. +fn prepare_dora(sh: &Shell) -> eyre::Result { + cmd!(sh, "cargo build --package dora-cli").run()?; + let root = Path::new(env!("CARGO_MANIFEST_DIR")); + let dora = root.join("target").join("debug").join("dora"); + Ok(dora) } From 6f85cfb4295265a7dc2b2b62d465d941380037de Mon Sep 17 00:00:00 2001 From: Philipp Oppermann Date: Thu, 2 May 2024 13:58:29 +0200 Subject: [PATCH 02/16] Use `xshell` for Python dataflow example to try venv activation --- examples/python-dataflow/run.rs | 168 +++++++++++++++++--------------- 1 file changed, 90 insertions(+), 78 deletions(-) diff --git a/examples/python-dataflow/run.rs b/examples/python-dataflow/run.rs index a14b553f0..c3136bde7 100644 --- a/examples/python-dataflow/run.rs +++ b/examples/python-dataflow/run.rs @@ -1,30 +1,77 @@ -use dora_core::{get_pip_path, get_python_path, run}; +use dora_core::{get_pip_path, get_python_path}; use dora_download::download_file; -use dora_tracing::set_up_tracing; -use eyre::{bail, ContextCompat, WrapErr}; -use std::path::Path; +use eyre::{ContextCompat, WrapErr}; +use std::path::{Path, PathBuf}; +use xshell::{cmd, Shell}; #[tokio::main] async fn main() -> eyre::Result<()> { - set_up_tracing("python-dataflow-runner")?; + let python = get_python_path().context("Could not get python binary")?; - let root = Path::new(env!("CARGO_MANIFEST_DIR")); - std::env::set_current_dir(root.join(file!()).parent().unwrap()) - .wrap_err("failed to set working dir")?; + // create a new shell in this folder + let sh = prepare_shell()?; + + // prepare Python virtual environment + prepare_venv(&sh, &python)?; + + // build the `dora` binary (you can skip this if you use `cargo install dora-cli`) + let dora = prepare_dora(&sh)?; + + // install/upgrade pip, then install requirements + cmd!(sh, "{python} -m pip install --upgrade pip").run()?; + let pip = get_pip_path().context("Could not get pip binary")?; + cmd!(sh, "{pip} install -r requirements.txt").run()?; + + // build the dora Python package (you can skip this if you installed the Python dora package) + { + let python_node_api_dir = Path::new(env!("CARGO_MANIFEST_DIR")) + .join("apis") + .join("python") + .join("node"); + let _dir = sh.push_dir(python_node_api_dir); + cmd!(sh, "maturin develop").run()?; + } - run( - get_python_path().context("Could not get python binary")?, - &["-m", "venv", "../.env"], - None, + download_file( + "https://github.com/ultralytics/assets/releases/download/v0.0.0/yolov8n.pt", + Path::new("yolov8n.pt"), ) .await - .context("failed to create venv")?; - let venv = &root.join("examples").join(".env"); - std::env::set_var( + .context("Could not download weights.")?; + + // start up the dora daemon and coordinator + cmd!(sh, "{dora} up").run()?; + + // start running the dataflow.yml -> outputs the UUID assigned to the dataflow + let output = cmd!(sh, "{dora} start dataflow.yml --attach").read_stderr()?; + let uuid = output.lines().next().context("no output")?; + + // stop the dora daemon and coordinator again + cmd!(sh, "{dora} destroy").run()?; + + // verify that the node output was written to `out` + sh.change_dir("out"); + sh.change_dir(uuid); + let sink_output = sh.read_file("log_object_detection.txt")?; + if sink_output.lines().count() < 50 { + eyre::bail!("object dectection node did not receive the expected number of messages") + } + + Ok(()) +} + +/// Prepares a Python virtual environment. +/// +/// You can use the normal `python3 -m venv .venv` + `source .venv/bin/activate` +/// if you're running bash. +fn prepare_venv(sh: &Shell, python: &Path) -> eyre::Result<()> { + cmd!(sh, "{python} -m venv ../.env").run()?; + let venv = sh.current_dir().join("..").join(".env"); + sh.set_var( "VIRTUAL_ENV", venv.to_str().context("venv path not valid unicode")?, ); - let orig_path = std::env::var("PATH")?; + // bin folder is named Scripts on windows. // 🤦‍♂️ See: https://github.com/pypa/virtualenv/commit/993ba1316a83b760370f5a3872b3f5ef4dd904c1 let venv_bin = if cfg!(windows) { @@ -32,71 +79,36 @@ async fn main() -> eyre::Result<()> { } else { venv.join("bin") }; + let path_separator = if cfg!(windows) { ';' } else { ':' }; - if cfg!(windows) { - std::env::set_var( - "PATH", - format!( - "{};{orig_path}", - venv_bin.to_str().context("venv path not valid unicode")? - ), - ); - } else { - std::env::set_var( - "PATH", - format!( - "{}:{orig_path}", - venv_bin.to_str().context("venv path not valid unicode")? - ), - ); - } - - run( - get_python_path().context("Could not get pip binary")?, - &["-m", "pip", "install", "--upgrade", "pip"], - None, - ) - .await - .context("failed to install pip")?; - run( - get_pip_path().context("Could not get pip binary")?, - &["install", "-r", "requirements.txt"], - None, - ) - .await - .context("pip install failed")?; - - run( - "maturin", - &["develop"], - Some(&root.join("apis").join("python").join("node")), - ) - .await - .context("maturin develop failed")?; - download_file( - "https://github.com/ultralytics/assets/releases/download/v0.0.0/yolov8n.pt", - Path::new("yolov8n.pt"), - ) - .await - .context("Could not download weights.")?; - - let dataflow = Path::new("dataflow.yml"); - run_dataflow(dataflow).await?; + sh.set_var( + "PATH", + format!( + "{}{path_separator}{}", + venv_bin.to_str().context("venv path not valid unicode")?, + std::env::var("PATH")? + ), + ); Ok(()) } -async fn run_dataflow(dataflow: &Path) -> eyre::Result<()> { - let cargo = std::env::var("CARGO").unwrap(); - let mut cmd = tokio::process::Command::new(&cargo); - cmd.arg("run"); - cmd.arg("--package").arg("dora-cli"); - cmd.arg("--") - .arg("daemon") - .arg("--run-dataflow") - .arg(dataflow); - if !cmd.status().await?.success() { - bail!("failed to run dataflow"); - }; - Ok(()) +/// Prepares a shell and set the working directory to the parent folder of this file. +/// +/// You can use your system shell instead (e.g. `bash`); +fn prepare_shell() -> Result { + let sh = Shell::new()?; + let root = Path::new(env!("CARGO_MANIFEST_DIR")); + sh.change_dir(root.join(file!()).parent().unwrap()); + Ok(sh) +} + +/// Build the `dora` command-line executable from this repo. +/// +/// You can skip this step and run `cargo install dora-cli --locked` instead. +fn prepare_dora(sh: &Shell) -> eyre::Result { + cmd!(sh, "cargo build --package dora-cli").run()?; + let root = Path::new(env!("CARGO_MANIFEST_DIR")); + let dora = root.join("target").join("debug").join("dora"); + Ok(dora) } From 1a5fa7fe45c69c1db26a6786dfdd999c9a9f206d Mon Sep 17 00:00:00 2001 From: Philipp Oppermann Date: Thu, 2 May 2024 16:18:51 +0200 Subject: [PATCH 03/16] Fix: Use correct python/pip binaries after setting up venv We don't modify the PATH for the current executable anymore, so we cannot use the `get_python_path`/`get_pip_path` functions anymore. --- examples/python-dataflow/run.rs | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/examples/python-dataflow/run.rs b/examples/python-dataflow/run.rs index c3136bde7..38ce105f5 100644 --- a/examples/python-dataflow/run.rs +++ b/examples/python-dataflow/run.rs @@ -6,21 +6,18 @@ use xshell::{cmd, Shell}; #[tokio::main] async fn main() -> eyre::Result<()> { - let python = get_python_path().context("Could not get python binary")?; - // create a new shell in this folder let sh = prepare_shell()?; // prepare Python virtual environment - prepare_venv(&sh, &python)?; + prepare_venv(&sh)?; // build the `dora` binary (you can skip this if you use `cargo install dora-cli`) let dora = prepare_dora(&sh)?; // install/upgrade pip, then install requirements - cmd!(sh, "{python} -m pip install --upgrade pip").run()?; - let pip = get_pip_path().context("Could not get pip binary")?; - cmd!(sh, "{pip} install -r requirements.txt").run()?; + cmd!(sh, "python -m pip install --upgrade pip").run()?; + cmd!(sh, "pip install -r requirements.txt").run()?; // build the dora Python package (you can skip this if you installed the Python dora package) { @@ -64,9 +61,10 @@ async fn main() -> eyre::Result<()> { /// /// You can use the normal `python3 -m venv .venv` + `source .venv/bin/activate` /// if you're running bash. -fn prepare_venv(sh: &Shell, python: &Path) -> eyre::Result<()> { +fn prepare_venv(sh: &Shell) -> eyre::Result<()> { + let python = get_python_path().context("Could not get python binary")?; cmd!(sh, "{python} -m venv ../.env").run()?; - let venv = sh.current_dir().join("..").join(".env"); + let venv = sh.current_dir().parent().unwrap().join(".env"); sh.set_var( "VIRTUAL_ENV", venv.to_str().context("venv path not valid unicode")?, From 20498f7e1eebd1fa091eafaf492aa27314f6cf39 Mon Sep 17 00:00:00 2001 From: Philipp Oppermann Date: Fri, 3 May 2024 11:31:46 +0200 Subject: [PATCH 04/16] Remove unused import --- examples/python-dataflow/run.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/python-dataflow/run.rs b/examples/python-dataflow/run.rs index 38ce105f5..c118943a3 100644 --- a/examples/python-dataflow/run.rs +++ b/examples/python-dataflow/run.rs @@ -1,4 +1,4 @@ -use dora_core::{get_pip_path, get_python_path}; +use dora_core::get_python_path; use dora_download::download_file; use eyre::{ContextCompat, WrapErr}; use std::path::{Path, PathBuf}; From 4117e6087e09d21bd9d351f3fbd39d89a4f28d50 Mon Sep 17 00:00:00 2001 From: Philipp Oppermann Date: Fri, 3 May 2024 11:40:06 +0200 Subject: [PATCH 05/16] Migrate run.rs script of rust-ros2-dataflow example to xshell --- examples/rust-ros2-dataflow/run.rs | 77 +++++++++++++++++------------- 1 file changed, 44 insertions(+), 33 deletions(-) diff --git a/examples/rust-ros2-dataflow/run.rs b/examples/rust-ros2-dataflow/run.rs index a14dce485..2cae73b38 100644 --- a/examples/rust-ros2-dataflow/run.rs +++ b/examples/rust-ros2-dataflow/run.rs @@ -1,46 +1,57 @@ -use dora_tracing::set_up_tracing; -use eyre::{bail, Context}; -use std::path::Path; +use eyre::ContextCompat; +use std::path::{Path, PathBuf}; +use xshell::{cmd, Shell}; #[tokio::main] async fn main() -> eyre::Result<()> { - set_up_tracing("rust-ros2-dataflow-runner").wrap_err("failed to set up tracing subscriber")?; + // create a new shell in this folder + let sh = prepare_shell()?; + // build the `dora` binary (you can skip this if you use `cargo install dora-cli`) + let dora = prepare_dora(&sh)?; - let root = Path::new(env!("CARGO_MANIFEST_DIR")); - std::env::set_current_dir(root.join(file!()).parent().unwrap()) - .wrap_err("failed to set working dir")?; + // build the dataflow using `dora build` + cmd!(sh, "{dora} build dataflow.yml").run()?; + + // start up the dora daemon and coordinator + cmd!(sh, "{dora} up").run()?; - let dataflow = Path::new("dataflow.yml"); - build_dataflow(dataflow).await?; + // start running the dataflow.yml -> outputs the UUID assigned to the dataflow + let output = cmd!(sh, "{dora} start dataflow.yml --attach").read_stderr()?; + let uuid = output.lines().next().context("no output")?; - run_dataflow(dataflow).await?; + // stop the dora daemon and coordinator again + cmd!(sh, "{dora} destroy").run()?; + + // verify that the node output was written to `out` + sh.change_dir("out"); + sh.change_dir(uuid); + let sink_output = sh.read_file("log_rust-node.txt")?; + if !sink_output + .lines() + .any(|l| l.starts_with("received pose event: Ok(")) + { + eyre::bail!("node did not receive any pose events") + } Ok(()) } -async fn build_dataflow(dataflow: &Path) -> eyre::Result<()> { - let cargo = std::env::var("CARGO").unwrap(); - let mut cmd = tokio::process::Command::new(&cargo); - cmd.arg("run"); - cmd.arg("--package").arg("dora-cli"); - cmd.arg("--").arg("build").arg(dataflow); - if !cmd.status().await?.success() { - bail!("failed to build dataflow"); - }; - Ok(()) +/// Prepares a shell and set the working directory to the parent folder of this file. +/// +/// You can use your system shell instead (e.g. `bash`); +fn prepare_shell() -> Result { + let sh = Shell::new()?; + let root = Path::new(env!("CARGO_MANIFEST_DIR")); + sh.change_dir(root.join(file!()).parent().unwrap()); + Ok(sh) } -async fn run_dataflow(dataflow: &Path) -> eyre::Result<()> { - let cargo = std::env::var("CARGO").unwrap(); - let mut cmd = tokio::process::Command::new(&cargo); - cmd.arg("run"); - cmd.arg("--package").arg("dora-cli"); - cmd.arg("--") - .arg("daemon") - .arg("--run-dataflow") - .arg(dataflow); - if !cmd.status().await?.success() { - bail!("failed to run dataflow"); - }; - Ok(()) +/// Build the `dora` command-line executable from this repo. +/// +/// You can skip this step and run `cargo install dora-cli --locked` instead. +fn prepare_dora(sh: &Shell) -> eyre::Result { + cmd!(sh, "cargo build --package dora-cli").run()?; + let root = Path::new(env!("CARGO_MANIFEST_DIR")); + let dora = root.join("target").join("debug").join("dora"); + Ok(dora) } From 1b665b0c442a41ed517051bf9492fd98ac591bed Mon Sep 17 00:00:00 2001 From: Philipp Oppermann Date: Fri, 3 May 2024 12:25:40 +0200 Subject: [PATCH 06/16] CLI: Add a hidden `connected-machines` command This command is useful for testing examples with multiple daemons. --- binaries/cli/src/main.rs | 41 +++++++++++++++++++++++++++++++++++++++- 1 file changed, 40 insertions(+), 1 deletion(-) diff --git a/binaries/cli/src/main.rs b/binaries/cli/src/main.rs index 8ac75485e..3e9ddad4c 100644 --- a/binaries/cli/src/main.rs +++ b/binaries/cli/src/main.rs @@ -14,7 +14,7 @@ use dora_daemon::Daemon; use dora_tracing::set_up_tracing; use duration_str::parse; use eyre::{bail, Context}; -use std::net::SocketAddr; +use std::{collections::BTreeSet, net::SocketAddr}; use std::{ net::{IpAddr, Ipv4Addr}, path::PathBuf, @@ -102,6 +102,8 @@ enum Command { dataflow: Option, node: String, }, + #[clap(hide = true)] + ConnectedMachines, // Metrics, // Stats, // Get, @@ -273,6 +275,12 @@ fn run() -> eyre::Result<()> { bail!("No dora coordinator seems to be running."); } }, + Command::ConnectedMachines => match connect_to_coordinator() { + Ok(mut session) => connected_machines(&mut *session)?, + Err(_) => { + bail!("No dora coordinator seems to be running."); + } + }, Command::Stop { uuid, name, @@ -466,6 +474,37 @@ fn query_running_dataflows( Ok(ids) } +fn connected_machines(session: &mut TcpRequestReplyConnection) -> Result<(), eyre::ErrReport> { + let machines = query_connected_machines(session)?; + + if machines.is_empty() { + eprintln!("No machines are connected"); + } else { + for id in machines { + println!("{id}"); + } + } + + Ok(()) +} + +fn query_connected_machines( + session: &mut TcpRequestReplyConnection, +) -> eyre::Result> { + let reply_raw = session + .request(&serde_json::to_vec(&ControlRequest::ConnectedMachines).unwrap()) + .wrap_err("failed to send connected machines message")?; + let reply: ControlRequestReply = + serde_json::from_slice(&reply_raw).wrap_err("failed to parse reply")?; + let ids = match reply { + ControlRequestReply::ConnectedMachines(ids) => ids, + ControlRequestReply::Error(err) => bail!("{err}"), + other => bail!("unexpected connected machines reply: {other:?}"), + }; + + Ok(ids) +} + fn connect_to_coordinator() -> std::io::Result> { TcpLayer::new().connect(control_socket_addr()) } From dcae010f8634fc1de6058288dd5c123e0954d5bf Mon Sep 17 00:00:00 2001 From: Philipp Oppermann Date: Fri, 3 May 2024 12:26:04 +0200 Subject: [PATCH 07/16] Simplify `multiple-daemons` examples using new `connected-machines` command and `xshell` crate --- examples/multiple-daemons/run.rs | 271 +++++++++---------------------- 1 file changed, 74 insertions(+), 197 deletions(-) diff --git a/examples/multiple-daemons/run.rs b/examples/multiple-daemons/run.rs index 6f9bcd4e5..83ac72dd0 100644 --- a/examples/multiple-daemons/run.rs +++ b/examples/multiple-daemons/run.rs @@ -1,219 +1,96 @@ -use dora_coordinator::{ControlEvent, Event}; -use dora_core::{ - descriptor::Descriptor, - topics::{ControlRequest, ControlRequestReply, DataflowId, DORA_COORDINATOR_PORT_DEFAULT}, -}; -use dora_tracing::set_up_tracing; -use eyre::{bail, Context}; - +use eyre::ContextCompat; use std::{ - collections::BTreeSet, - net::{IpAddr, Ipv4Addr, SocketAddr}, - path::Path, + path::{Path, PathBuf}, + process::Command, time::Duration, }; -use tokio::{ - sync::{ - mpsc::{self, Sender}, - oneshot, - }, - task::JoinSet, -}; -use tokio_stream::wrappers::ReceiverStream; -use uuid::Uuid; - -#[tokio::main] -async fn main() -> eyre::Result<()> { - set_up_tracing("multiple-daemon-runner").wrap_err("failed to set up tracing subscriber")?; - - let root = Path::new(env!("CARGO_MANIFEST_DIR")); - std::env::set_current_dir(root.join(file!()).parent().unwrap()) - .wrap_err("failed to set working dir")?; +use xshell::{cmd, Shell}; - let dataflow = Path::new("dataflow.yml"); - build_dataflow(dataflow).await?; +fn main() -> eyre::Result<()> { + // create a new shell in this folder + let sh = prepare_shell()?; + // build the `dora` binary (you can skip this if you use `cargo install dora-cli`) + let dora = prepare_dora(&sh)?; - let (coordinator_events_tx, coordinator_events_rx) = mpsc::channel(1); - let coordinator_bind = SocketAddr::new( - IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), - DORA_COORDINATOR_PORT_DEFAULT, - ); - let (coordinator_port, coordinator) = - dora_coordinator::start(coordinator_bind, ReceiverStream::new(coordinator_events_rx)) - .await?; - let coordinator_addr = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), coordinator_port); - let daemon_a = run_daemon(coordinator_addr.to_string(), "A"); - let daemon_b = run_daemon(coordinator_addr.to_string(), "B"); + // build the dataflow using `dora build` + cmd!(sh, "{dora} build dataflow.yml").run()?; - tracing::info!("Spawning coordinator and daemons"); - let mut tasks = JoinSet::new(); - tasks.spawn(coordinator); - tasks.spawn(daemon_a); - tasks.spawn(daemon_b); - - tracing::info!("waiting until daemons are connected to coordinator"); - let mut retries = 0; + // start the dora coordinator (in background) + Command::from(cmd!(sh, "{dora} coordinator")).spawn()?; + // wait until coordinator is ready loop { - let connected_machines = connected_machines(&coordinator_events_tx).await?; - if connected_machines.contains("A") && connected_machines.contains("B") { - break; - } else if retries > 20 { - bail!("daemon not connected after {retries} retries"); - } else { - std::thread::sleep(Duration::from_millis(500)); - retries += 1 + match cmd!(sh, "{dora} list").quiet().ignore_stderr().run() { + Ok(_) => { + println!("coordinator connected"); + break; + } + Err(_) => { + eprintln!("waiting for coordinator"); + std::thread::sleep(Duration::from_millis(100)) + } } } - tracing::info!("starting dataflow"); - let uuid = start_dataflow(dataflow, &coordinator_events_tx).await?; - tracing::info!("started dataflow under ID `{uuid}`"); - - let running = running_dataflows(&coordinator_events_tx).await?; - if !running.iter().map(|d| d.uuid).any(|id| id == uuid) { - bail!("dataflow `{uuid}` is not running"); - } - - tracing::info!("waiting for dataflow `{uuid}` to finish"); - let mut retries = 0; + // start two daemons (in background) + Command::from(cmd!(sh, "{dora} daemon --machine-id A")).spawn()?; + Command::from(cmd!(sh, "{dora} daemon --machine-id B")).spawn()?; + // wait until both daemons are connected loop { - let running = running_dataflows(&coordinator_events_tx).await?; - if running.is_empty() { - break; - } else if retries > 100 { - bail!("dataflow not finished after {retries} retries"); - } else { - tracing::debug!("not done yet"); - std::thread::sleep(Duration::from_millis(500)); - retries += 1 + let output = cmd!(sh, "{dora} connected-machines") + .quiet() + .ignore_stderr() + .read(); + match output { + Ok(output) => { + let connected: Vec<&str> = output.lines().collect(); + if connected == ["A", "B"] { + println!("both daemons connected"); + break; + } else { + eprintln!("not all daemons connected yet (connected: {connected:?})"); + } + } + Err(err) => eprintln!("failed to query connected-machines: {err:?}"), } + std::thread::sleep(Duration::from_millis(100)); } - tracing::info!("dataflow `{uuid}` finished, destroying coordinator"); - destroy(&coordinator_events_tx).await?; - tracing::info!("joining tasks"); - while let Some(res) = tasks.join_next().await { - res.unwrap()?; + // start running the dataflow.yml -> outputs the UUID assigned to the dataflow + println!("starting dataflow"); + let output = cmd!(sh, "{dora} start dataflow.yml --attach").read_stderr()?; + println!("dataflow finished successfully"); + let uuid = output.lines().next().context("no output")?; + + // stop the coordinator and both daemons again + cmd!(sh, "{dora} destroy").run()?; + + // verify that the node output was written to `out` + sh.change_dir("out"); + sh.change_dir(uuid); + let sink_output = sh.read_file("log_rust-sink.txt")?; + if sink_output.lines().count() < 50 { + eyre::bail!("sink did not receive the expected number of messages") } - tracing::info!("done"); Ok(()) } -async fn start_dataflow( - dataflow: &Path, - coordinator_events_tx: &Sender, -) -> eyre::Result { - let dataflow_descriptor = Descriptor::read(dataflow) - .await - .wrap_err("failed to read yaml dataflow")?; - let working_dir = dataflow - .canonicalize() - .context("failed to canonicalize dataflow path")? - .parent() - .ok_or_else(|| eyre::eyre!("dataflow path has no parent dir"))? - .to_owned(); - dataflow_descriptor - .check(&working_dir) - .wrap_err("could not validate yaml")?; - - let (reply_sender, reply) = oneshot::channel(); - coordinator_events_tx - .send(Event::Control(ControlEvent::IncomingRequest { - request: ControlRequest::Start { - dataflow: dataflow_descriptor, - local_working_dir: working_dir, - name: None, - }, - reply_sender, - })) - .await?; - let result = reply.await??; - let uuid = match result { - ControlRequestReply::DataflowStarted { uuid } => uuid, - ControlRequestReply::Error(err) => bail!("{err}"), - other => bail!("unexpected start dataflow reply: {other:?}"), - }; - Ok(uuid) -} - -async fn connected_machines( - coordinator_events_tx: &Sender, -) -> eyre::Result> { - let (reply_sender, reply) = oneshot::channel(); - coordinator_events_tx - .send(Event::Control(ControlEvent::IncomingRequest { - request: ControlRequest::ConnectedMachines, - reply_sender, - })) - .await?; - let result = reply.await??; - let machines = match result { - ControlRequestReply::ConnectedMachines(machines) => machines, - ControlRequestReply::Error(err) => bail!("{err}"), - other => bail!("unexpected start dataflow reply: {other:?}"), - }; - Ok(machines) -} - -async fn running_dataflows(coordinator_events_tx: &Sender) -> eyre::Result> { - let (reply_sender, reply) = oneshot::channel(); - coordinator_events_tx - .send(Event::Control(ControlEvent::IncomingRequest { - request: ControlRequest::List, - reply_sender, - })) - .await?; - let result = reply.await??; - let dataflows = match result { - ControlRequestReply::DataflowList { dataflows } => dataflows, - ControlRequestReply::Error(err) => bail!("{err}"), - other => bail!("unexpected start dataflow reply: {other:?}"), - }; - Ok(dataflows) -} - -async fn destroy(coordinator_events_tx: &Sender) -> eyre::Result<()> { - let (reply_sender, reply) = oneshot::channel(); - coordinator_events_tx - .send(Event::Control(ControlEvent::IncomingRequest { - request: ControlRequest::Destroy, - reply_sender, - })) - .await?; - let result = reply.await??; - match result { - ControlRequestReply::DestroyOk => Ok(()), - ControlRequestReply::Error(err) => bail!("{err}"), - other => bail!("unexpected start dataflow reply: {other:?}"), - } -} - -async fn build_dataflow(dataflow: &Path) -> eyre::Result<()> { - let cargo = std::env::var("CARGO").unwrap(); - let mut cmd = tokio::process::Command::new(&cargo); - cmd.arg("run"); - cmd.arg("--package").arg("dora-cli"); - cmd.arg("--").arg("build").arg(dataflow); - if !cmd.status().await?.success() { - bail!("failed to build dataflow"); - }; - Ok(()) +/// Prepares a shell and set the working directory to the parent folder of this file. +/// +/// You can use your system shell instead (e.g. `bash`); +fn prepare_shell() -> Result { + let sh = Shell::new()?; + let root = Path::new(env!("CARGO_MANIFEST_DIR")); + sh.change_dir(root.join(file!()).parent().unwrap()); + Ok(sh) } -async fn run_daemon(coordinator: String, machine_id: &str) -> eyre::Result<()> { - let cargo = std::env::var("CARGO").unwrap(); - let mut cmd = tokio::process::Command::new(&cargo); - cmd.arg("run"); - cmd.arg("--package").arg("dora-cli"); - cmd.arg("--") - .arg("daemon") - .arg("--machine-id") - .arg(machine_id) - .arg("--coordinator-addr") - .arg(coordinator); - if !cmd.status().await?.success() { - bail!("failed to run dataflow"); - }; - Ok(()) +/// Build the `dora` command-line executable from this repo. +/// +/// You can skip this step and run `cargo install dora-cli --locked` instead. +fn prepare_dora(sh: &Shell) -> eyre::Result { + cmd!(sh, "cargo build --package dora-cli").run()?; + let root = Path::new(env!("CARGO_MANIFEST_DIR")); + let dora = root.join("target").join("debug").join("dora"); + Ok(dora) } From 46433bb8c549751acfdbda3eb1861cfe40787f11 Mon Sep 17 00:00:00 2001 From: Philipp Oppermann Date: Fri, 3 May 2024 12:35:51 +0200 Subject: [PATCH 08/16] Migrate run.rs script of cmake dataflow example to xshell --- examples/cmake-dataflow/run.rs | 90 ++++++++++++++++------------------ 1 file changed, 43 insertions(+), 47 deletions(-) diff --git a/examples/cmake-dataflow/run.rs b/examples/cmake-dataflow/run.rs index 30e3c9d11..6509286ed 100644 --- a/examples/cmake-dataflow/run.rs +++ b/examples/cmake-dataflow/run.rs @@ -1,6 +1,7 @@ use dora_tracing::set_up_tracing; -use eyre::{bail, Context}; -use std::path::Path; +use eyre::{Context, ContextCompat}; +use std::path::{Path, PathBuf}; +use xshell::{cmd, Shell}; #[tokio::main] async fn main() -> eyre::Result<()> { @@ -13,60 +14,55 @@ async fn main() -> eyre::Result<()> { return Ok(()); } + // create a new shell in this folder + let sh = prepare_shell()?; + + // build C++ source code using cmake let root = Path::new(env!("CARGO_MANIFEST_DIR")); - std::env::set_current_dir(root.join(file!()).parent().unwrap()) - .wrap_err("failed to set working dir")?; + cmd!(sh, "cmake -DDORA_ROOT_DIR={root} -B build .").run()?; + cmd!(sh, "cmake --build build").run()?; + cmd!(sh, "cmake --install build").run()?; - tokio::fs::create_dir_all("build").await?; - let mut cmd = tokio::process::Command::new("cmake"); - cmd.arg(format!("-DDORA_ROOT_DIR={}", root.display())); - cmd.arg("-B").arg("build"); - cmd.arg("."); - if !cmd.status().await?.success() { - bail!("failed to generating make file"); - } + // build the `dora` binary (you can skip this if you use `cargo install dora-cli`) + let dora = prepare_dora(&sh)?; - let mut cmd = tokio::process::Command::new("cmake"); - cmd.arg("--build").arg("build"); - if !cmd.status().await?.success() { - bail!("failed to build a cmake-generated project binary tree"); - } + // start up the dora daemon and coordinator + cmd!(sh, "{dora} up").run()?; - let mut cmd = tokio::process::Command::new("cmake"); - cmd.arg("--install").arg("build"); - if !cmd.status().await?.success() { - bail!("failed to build a cmake-generated project binary tree"); - } + // start running the dataflow.yml -> outputs the UUID assigned to the dataflow + let output = cmd!(sh, "{dora} start dataflow.yml --attach").read_stderr()?; + let uuid = output.lines().next().context("no output")?; + + // stop the dora daemon and coordinator again + cmd!(sh, "{dora} destroy").run()?; - let dataflow = Path::new("dataflow.yml").to_owned(); - build_package("dora-runtime").await?; - run_dataflow(&dataflow).await?; + // verify that the node output was written to `out` + sh.change_dir("out"); + sh.change_dir(uuid); + let sink_output = sh.read_file("log_runtime-node-2.txt")?; + if sink_output.lines().count() < 20 { + eyre::bail!("sink did not receive the expected number of messages") + } Ok(()) } -async fn build_package(package: &str) -> eyre::Result<()> { - let cargo = std::env::var("CARGO").unwrap(); - let mut cmd = tokio::process::Command::new(&cargo); - cmd.arg("build"); - cmd.arg("--package").arg(package); - if !cmd.status().await?.success() { - bail!("failed to build {package}"); - } - Ok(()) +/// Prepares a shell and set the working directory to the parent folder of this file. +/// +/// You can use your system shell instead (e.g. `bash`); +fn prepare_shell() -> Result { + let sh = Shell::new()?; + let root = Path::new(env!("CARGO_MANIFEST_DIR")); + sh.change_dir(root.join(file!()).parent().unwrap()); + Ok(sh) } -async fn run_dataflow(dataflow: &Path) -> eyre::Result<()> { - let cargo = std::env::var("CARGO").unwrap(); - let mut cmd = tokio::process::Command::new(&cargo); - cmd.arg("run"); - cmd.arg("--package").arg("dora-cli"); - cmd.arg("--") - .arg("daemon") - .arg("--run-dataflow") - .arg(dataflow); - if !cmd.status().await?.success() { - bail!("failed to run dataflow"); - }; - Ok(()) +/// Build the `dora` command-line executable from this repo. +/// +/// You can skip this step and run `cargo install dora-cli --locked` instead. +fn prepare_dora(sh: &Shell) -> eyre::Result { + cmd!(sh, "cargo build --package dora-cli").run()?; + let root = Path::new(env!("CARGO_MANIFEST_DIR")); + let dora = root.join("target").join("debug").join("dora"); + Ok(dora) } From b5bd71c17fddc7bc2692c651f0c1b28d4183d3e9 Mon Sep 17 00:00:00 2001 From: Philipp Oppermann Date: Fri, 24 May 2024 15:01:07 +0200 Subject: [PATCH 09/16] Migrate benchmark example to xshell --- examples/benchmark/run.rs | 67 +++++++++++++++++++-------------------- 1 file changed, 32 insertions(+), 35 deletions(-) diff --git a/examples/benchmark/run.rs b/examples/benchmark/run.rs index 78e6d88b5..7587de6d1 100644 --- a/examples/benchmark/run.rs +++ b/examples/benchmark/run.rs @@ -1,46 +1,43 @@ -use dora_tracing::set_up_tracing; -use eyre::{bail, Context}; -use std::path::Path; +use std::path::{Path, PathBuf}; +use xshell::{cmd, Shell}; -#[tokio::main] -async fn main() -> eyre::Result<()> { - set_up_tracing("benchmark-runner").wrap_err("failed to set up tracing subscriber")?; +fn main() -> eyre::Result<()> { + // create a new shell in this folder + let sh = prepare_shell()?; + // build the `dora` binary (you can skip this if you use `cargo install dora-cli`) + let dora = prepare_dora_optimized(&sh)?; - let root = Path::new(env!("CARGO_MANIFEST_DIR")); - std::env::set_current_dir(root.join(file!()).parent().unwrap()) - .wrap_err("failed to set working dir")?; + // build the dataflow using `dora build` + cmd!(sh, "{dora} build dataflow.yml").run()?; + + // start up the dora daemon and coordinator + cmd!(sh, "{dora} up").run()?; - let dataflow = Path::new("dataflow.yml"); - build_dataflow(dataflow).await?; + // start running the dataflow.yml + cmd!(sh, "{dora} start dataflow.yml --attach").run()?; - run_dataflow(dataflow).await?; + // stop the dora daemon and coordinator again + cmd!(sh, "{dora} destroy").run()?; Ok(()) } -async fn build_dataflow(dataflow: &Path) -> eyre::Result<()> { - let cargo = std::env::var("CARGO").unwrap(); - let mut cmd = tokio::process::Command::new(&cargo); - cmd.arg("run"); - cmd.arg("--package").arg("dora-cli"); - cmd.arg("--").arg("build").arg(dataflow); - if !cmd.status().await?.success() { - bail!("failed to build dataflow"); - }; - Ok(()) +/// Prepares a shell and set the working directory to the parent folder of this file. +/// +/// You can use your system shell instead (e.g. `bash`); +fn prepare_shell() -> Result { + let sh = Shell::new()?; + let root = Path::new(env!("CARGO_MANIFEST_DIR")); + sh.change_dir(root.join(file!()).parent().unwrap()); + Ok(sh) } -async fn run_dataflow(dataflow: &Path) -> eyre::Result<()> { - let cargo = std::env::var("CARGO").unwrap(); - let mut cmd = tokio::process::Command::new(&cargo); - cmd.arg("run"); - cmd.arg("--package").arg("dora-cli"); - cmd.arg("--") - .arg("daemon") - .arg("--run-dataflow") - .arg(dataflow); - if !cmd.status().await?.success() { - bail!("failed to run dataflow"); - }; - Ok(()) +/// Build the `dora` command-line executable from this repo. +/// +/// You can skip this step and run `cargo install dora-cli --locked` instead. +fn prepare_dora_optimized(sh: &Shell) -> eyre::Result { + cmd!(sh, "cargo build --package dora-cli --release").run()?; + let root = Path::new(env!("CARGO_MANIFEST_DIR")); + let dora = root.join("target").join("release").join("dora"); + Ok(dora) } From 7ec839fc6fac8851a8f9379510e211de48d95eb2 Mon Sep 17 00:00:00 2001 From: Philipp Oppermann Date: Tue, 28 May 2024 14:38:59 +0200 Subject: [PATCH 10/16] Migrate C dataflow example to `xshell` --- examples/c-dataflow/run.rs | 338 ++++++++++++++++++------------------- 1 file changed, 165 insertions(+), 173 deletions(-) diff --git a/examples/c-dataflow/run.rs b/examples/c-dataflow/run.rs index ad484edfd..a532b52a1 100644 --- a/examples/c-dataflow/run.rs +++ b/examples/c-dataflow/run.rs @@ -1,186 +1,178 @@ -use dora_tracing::set_up_tracing; -use eyre::{bail, Context}; -use std::{ - env::consts::{DLL_PREFIX, DLL_SUFFIX, EXE_SUFFIX}, - path::Path, -}; - -#[tokio::main] -async fn main() -> eyre::Result<()> { - set_up_tracing("c-dataflow-runner").wrap_err("failed to set up tracing")?; +use std::env::consts::{DLL_PREFIX, DLL_SUFFIX, EXE_SUFFIX}; +use std::path::{Path, PathBuf}; +use xshell::{cmd, Shell}; +fn main() -> eyre::Result<()> { let root = Path::new(env!("CARGO_MANIFEST_DIR")); - std::env::set_current_dir(root.join(file!()).parent().unwrap()) - .wrap_err("failed to set working dir")?; - - tokio::fs::create_dir_all("build").await?; - - build_package("dora-node-api-c").await?; - build_c_node(root, "node.c", "c_node").await?; - build_c_node(root, "sink.c", "c_sink").await?; - - build_package("dora-operator-api-c").await?; - build_c_operator(root).await?; - - let dataflow = Path::new("dataflow.yml").to_owned(); - run_dataflow(&dataflow).await?; - - Ok(()) -} -async fn build_package(package: &str) -> eyre::Result<()> { - let cargo = std::env::var("CARGO").unwrap(); - let mut cmd = tokio::process::Command::new(&cargo); - cmd.arg("build"); - cmd.arg("--package").arg(package); - if !cmd.status().await?.success() { - bail!("failed to build {package}"); + // create a new shell in this folder + let sh = prepare_shell()?; + // build the `dora` binary (you can skip this if you use `cargo install dora-cli`) + let dora = prepare_dora(&sh)?; + + cmd!(sh, "cargo build --package dora-node-api-c").run()?; + cmd!(sh, "cargo build --package dora-operator-api-c").run()?; + + sh.create_dir("build")?; + let target_debug = root.join("target").join("debug"); + + // compile nodes + let args: &[&str] = if cfg!(target_os = "linux") { + &["-l", "m", "-l", "rt", "-l", "dl", "-pthread"] + } else if cfg!(target_os = "windows") { + &[ + "-ladvapi32", + "-luserenv", + "-lkernel32", + "-lws2_32", + "-lbcrypt", + "-lncrypt", + "-lschannel", + "-lntdll", + "-liphlpapi", + "-lcfgmgr32", + "-lcredui", + "-lcrypt32", + "-lcryptnet", + "-lfwpuclnt", + "-lgdi32", + "-lmsimg32", + "-lmswsock", + "-lole32", + "-loleaut32", + "-lopengl32", + "-lsecur32", + "-lshell32", + "-lsynchronization", + "-luser32", + "-lwinspool", + "-Wl,-nodefaultlib:libcmt", + "-D_DLL", + "-lmsvcrt", + ] + } else if cfg!(target_os = "macos") { + &[ + "-framework", + "CoreServices", + "-framework", + "Security", + "-l", + "System", + "-l", + "resolv", + "-l", + "pthread", + "-l", + "c", + "-l", + "m", + ] + } else { + panic!("unsupported target platform") }; - Ok(()) -} - -async fn run_dataflow(dataflow: &Path) -> eyre::Result<()> { - let cargo = std::env::var("CARGO").unwrap(); - let mut cmd = tokio::process::Command::new(&cargo); - cmd.arg("run"); - cmd.arg("--package").arg("dora-cli"); - cmd.arg("--") - .arg("daemon") - .arg("--run-dataflow") - .arg(dataflow); - if !cmd.status().await?.success() { - bail!("failed to run dataflow"); + cmd!( + sh, + "clang node.c -l dora_node_api_c {args...} -L {target_debug} --output build/c_node{EXE_SUFFIX}" + ) + .run()?; + cmd!( + sh, + "clang sink.c -l dora_node_api_c {args...} -L {target_debug} --output build/c_sink{EXE_SUFFIX}" + ) + .run()?; + + // compile operator + let operator_args: &[&str] = if cfg!(unix) { &["-fPIC"] } else { &[] }; + cmd!( + sh, + "clang -c operator.c -o build/operator.o -fdeclspec {operator_args...}" + ) + .run()?; + // link operator + let operator_link_args: &[&str] = if cfg!(target_os = "windows") { + &[ + "-ladvapi32", + "-luserenv", + "-lkernel32", + "-lws2_32", + "-lbcrypt", + "-lncrypt", + "-lschannel", + "-lntdll", + "-liphlpapi", + "-lcfgmgr32", + "-lcredui", + "-lcrypt32", + "-lcryptnet", + "-lfwpuclnt", + "-lgdi32", + "-lmsimg32", + "-lmswsock", + "-lole32", + "-loleaut32", + "-lopengl32", + "-lsecur32", + "-lshell32", + "-lsynchronization", + "-luser32", + "-lwinspool", + "-Wl,-nodefaultlib:libcmt", + "-D_DLL", + "-lmsvcrt", + ] + } else if cfg!(target_os = "macos") { + &[ + "-framework", + "CoreServices", + "-framework", + "Security", + "-l", + "System", + "-l", + "resolv", + "-l", + "pthread", + "-l", + "c", + "-l", + "m", + ] + } else { + &[] }; - Ok(()) -} + cmd!( + sh, + "clang -shared build/operator.o -L {target_debug} -l dora_operator_api_c {operator_link_args...} -o build/{DLL_PREFIX}operator{DLL_SUFFIX}" + ).run()?; -async fn build_c_node(root: &Path, name: &str, out_name: &str) -> eyre::Result<()> { - let mut clang = tokio::process::Command::new("clang"); - clang.arg(name); - clang.arg("-l").arg("dora_node_api_c"); - #[cfg(target_os = "linux")] - { - clang.arg("-l").arg("m"); - clang.arg("-l").arg("rt"); - clang.arg("-l").arg("dl"); - clang.arg("-pthread"); - } - #[cfg(target_os = "windows")] - { - clang.arg("-ladvapi32"); - clang.arg("-luserenv"); - clang.arg("-lkernel32"); - clang.arg("-lws2_32"); - clang.arg("-lbcrypt"); - clang.arg("-lncrypt"); - clang.arg("-lschannel"); - clang.arg("-lntdll"); - clang.arg("-liphlpapi"); + // start up the dora daemon and coordinator + cmd!(sh, "{dora} up").run()?; - clang.arg("-lcfgmgr32"); - clang.arg("-lcredui"); - clang.arg("-lcrypt32"); - clang.arg("-lcryptnet"); - clang.arg("-lfwpuclnt"); - clang.arg("-lgdi32"); - clang.arg("-lmsimg32"); - clang.arg("-lmswsock"); - clang.arg("-lole32"); - clang.arg("-loleaut32"); - clang.arg("-lopengl32"); - clang.arg("-lsecur32"); - clang.arg("-lshell32"); - clang.arg("-lsynchronization"); - clang.arg("-luser32"); - clang.arg("-lwinspool"); + // start running the dataflow.yml + cmd!(sh, "{dora} start dataflow.yml --attach").run()?; + + // stop the dora daemon and coordinator again + cmd!(sh, "{dora} destroy").run()?; - clang.arg("-Wl,-nodefaultlib:libcmt"); - clang.arg("-D_DLL"); - clang.arg("-lmsvcrt"); - } - #[cfg(target_os = "macos")] - { - clang.arg("-framework").arg("CoreServices"); - clang.arg("-framework").arg("Security"); - clang.arg("-l").arg("System"); - clang.arg("-l").arg("resolv"); - clang.arg("-l").arg("pthread"); - clang.arg("-l").arg("c"); - clang.arg("-l").arg("m"); - } - clang.arg("-L").arg(root.join("target").join("debug")); - clang - .arg("--output") - .arg(Path::new("build").join(format!("{out_name}{EXE_SUFFIX}"))); - if !clang.status().await?.success() { - bail!("failed to compile c node"); - }; Ok(()) } -async fn build_c_operator(root: &Path) -> eyre::Result<()> { - let mut compile = tokio::process::Command::new("clang"); - compile.arg("-c").arg("operator.c"); - compile.arg("-o").arg("build/operator.o"); - compile.arg("-fdeclspec"); - #[cfg(unix)] - compile.arg("-fPIC"); - if !compile.status().await?.success() { - bail!("failed to compile c operator"); - }; - - let mut link = tokio::process::Command::new("clang"); - link.arg("-shared").arg("build/operator.o"); - link.arg("-L").arg(root.join("target").join("debug")); - link.arg("-l").arg("dora_operator_api_c"); - #[cfg(target_os = "windows")] - { - link.arg("-ladvapi32"); - link.arg("-luserenv"); - link.arg("-lkernel32"); - link.arg("-lws2_32"); - link.arg("-lbcrypt"); - link.arg("-lncrypt"); - link.arg("-lschannel"); - link.arg("-lntdll"); - link.arg("-liphlpapi"); - - link.arg("-lcfgmgr32"); - link.arg("-lcredui"); - link.arg("-lcrypt32"); - link.arg("-lcryptnet"); - link.arg("-lfwpuclnt"); - link.arg("-lgdi32"); - link.arg("-lmsimg32"); - link.arg("-lmswsock"); - link.arg("-lole32"); - link.arg("-loleaut32"); - link.arg("-lopengl32"); - link.arg("-lsecur32"); - link.arg("-lshell32"); - link.arg("-lsynchronization"); - link.arg("-luser32"); - link.arg("-lwinspool"); - - link.arg("-Wl,-nodefaultlib:libcmt"); - link.arg("-D_DLL"); - link.arg("-lmsvcrt"); - } - #[cfg(target_os = "macos")] - { - link.arg("-framework").arg("CoreServices"); - link.arg("-framework").arg("Security"); - link.arg("-l").arg("System"); - link.arg("-l").arg("resolv"); - link.arg("-l").arg("pthread"); - link.arg("-l").arg("c"); - link.arg("-l").arg("m"); - } - link.arg("-o") - .arg(Path::new("build").join(format!("{DLL_PREFIX}operator{DLL_SUFFIX}"))); - if !link.status().await?.success() { - bail!("failed to link c operator"); - }; +/// Prepares a shell and set the working directory to the parent folder of this file. +/// +/// You can use your system shell instead (e.g. `bash`); +fn prepare_shell() -> Result { + let sh = Shell::new()?; + let root = Path::new(env!("CARGO_MANIFEST_DIR")); + sh.change_dir(root.join(file!()).parent().unwrap()); + Ok(sh) +} - Ok(()) +/// Build the `dora` command-line executable from this repo. +/// +/// You can skip this step and run `cargo install dora-cli --locked` instead. +fn prepare_dora(sh: &Shell) -> eyre::Result { + cmd!(sh, "cargo build --package dora-cli").run()?; + let root = Path::new(env!("CARGO_MANIFEST_DIR")); + let dora = root.join("target").join("debug").join("dora"); + Ok(dora) } From 61d3f16bb2026e3e54e54c93c23877913a368103 Mon Sep 17 00:00:00 2001 From: Philipp Oppermann Date: Tue, 28 May 2024 14:55:11 +0200 Subject: [PATCH 11/16] Remove tokio dependency from Python dataflow example --- examples/python-dataflow/run.rs | 8 +++----- libraries/extensions/download/src/lib.rs | 7 +++++++ 2 files changed, 10 insertions(+), 5 deletions(-) diff --git a/examples/python-dataflow/run.rs b/examples/python-dataflow/run.rs index c118943a3..aecac7507 100644 --- a/examples/python-dataflow/run.rs +++ b/examples/python-dataflow/run.rs @@ -1,11 +1,10 @@ use dora_core::get_python_path; -use dora_download::download_file; +use dora_download::download_file_sync; use eyre::{ContextCompat, WrapErr}; use std::path::{Path, PathBuf}; use xshell::{cmd, Shell}; -#[tokio::main] -async fn main() -> eyre::Result<()> { +fn main() -> eyre::Result<()> { // create a new shell in this folder let sh = prepare_shell()?; @@ -29,11 +28,10 @@ async fn main() -> eyre::Result<()> { cmd!(sh, "maturin develop").run()?; } - download_file( + download_file_sync( "https://github.com/ultralytics/assets/releases/download/v0.0.0/yolov8n.pt", Path::new("yolov8n.pt"), ) - .await .context("Could not download weights.")?; // start up the dora daemon and coordinator diff --git a/libraries/extensions/download/src/lib.rs b/libraries/extensions/download/src/lib.rs index b0843c83f..6be5ededf 100644 --- a/libraries/extensions/download/src/lib.rs +++ b/libraries/extensions/download/src/lib.rs @@ -5,6 +5,13 @@ use std::path::Path; use tokio::io::AsyncWriteExt; use tracing::info; +pub fn download_file_sync(url: T, target_path: &Path) -> Result<(), eyre::ErrReport> +where + T: reqwest::IntoUrl + std::fmt::Display + Copy, +{ + tokio::runtime::Runtime::new()?.block_on(download_file(url, target_path)) +} + pub async fn download_file(url: T, target_path: &Path) -> Result<(), eyre::ErrReport> where T: reqwest::IntoUrl + std::fmt::Display + Copy, From 4514d6e9d0536faa5f5d98d6d0a68253b1612a16 Mon Sep 17 00:00:00 2001 From: Philipp Oppermann Date: Tue, 28 May 2024 15:21:40 +0200 Subject: [PATCH 12/16] Migrate C++ dataflow example to `xshell` --- examples/c++-dataflow/run.rs | 395 ++++++++++++----------------------- 1 file changed, 138 insertions(+), 257 deletions(-) diff --git a/examples/c++-dataflow/run.rs b/examples/c++-dataflow/run.rs index 6f966e191..0bae972fb 100644 --- a/examples/c++-dataflow/run.rs +++ b/examples/c++-dataflow/run.rs @@ -1,12 +1,9 @@ use dora_tracing::set_up_tracing; -use eyre::{bail, Context}; -use std::{ - env::consts::{DLL_PREFIX, DLL_SUFFIX, EXE_SUFFIX}, - path::Path, -}; - -#[tokio::main] -async fn main() -> eyre::Result<()> { +use eyre::Context; +use std::env::consts::{DLL_PREFIX, DLL_SUFFIX, EXE_SUFFIX}; +use std::path::{Path, PathBuf}; +use xshell::{cmd, Shell}; +fn main() -> eyre::Result<()> { set_up_tracing("c++-dataflow-runner").wrap_err("failed to set up tracing")?; if cfg!(windows) { @@ -16,284 +13,168 @@ async fn main() -> eyre::Result<()> { return Ok(()); } + // create a new shell in this folder + let sh = prepare_shell()?; + // build the `dora` binary (you can skip this if you use `cargo install dora-cli`) + let dora = prepare_dora(&sh)?; + + cmd!(sh, "cargo build --package dora-node-api-cxx").run()?; + cmd!(sh, "cargo build --package dora-operator-api-cxx").run()?; + cmd!(sh, "cargo build --package dora-node-api-c").run()?; + cmd!(sh, "cargo build --package dora-operator-api-c").run()?; + + sh.create_dir("build")?; let root = Path::new(env!("CARGO_MANIFEST_DIR")); let target = root.join("target"); - std::env::set_current_dir(root.join(file!()).parent().unwrap()) - .wrap_err("failed to set working dir")?; - - tokio::fs::create_dir_all("build").await?; - let build_dir = Path::new("build"); + let target_debug = target.join("debug"); - build_package("dora-node-api-cxx").await?; let node_cxxbridge = target .join("cxxbridge") .join("dora-node-api-cxx") .join("src"); - tokio::fs::copy( - node_cxxbridge.join("lib.rs.cc"), - build_dir.join("node-bridge.cc"), - ) - .await?; - tokio::fs::copy( - node_cxxbridge.join("lib.rs.h"), - build_dir.join("dora-node-api.h"), - ) - .await?; - tokio::fs::write( - build_dir.join("operator.h"), + sh.copy_file(node_cxxbridge.join("lib.rs.cc"), "build/node-bridge.cc")?; + sh.copy_file(node_cxxbridge.join("lib.rs.h"), "build/dora-node-api.h")?; + sh.write_file( + "build/operator.h", r###"#include "../operator-rust-api/operator.h""###, - ) - .await?; + )?; - build_package("dora-operator-api-cxx").await?; let operator_cxxbridge = target .join("cxxbridge") .join("dora-operator-api-cxx") .join("src"); - tokio::fs::copy( + sh.copy_file( operator_cxxbridge.join("lib.rs.cc"), - build_dir.join("operator-bridge.cc"), - ) - .await?; - tokio::fs::copy( + "build/operator-bridge.cc", + )?; + sh.copy_file( operator_cxxbridge.join("lib.rs.h"), - build_dir.join("dora-operator-api.h"), - ) - .await?; + "build/dora-operator-api.h", + )?; - build_package("dora-node-api-c").await?; - build_package("dora-operator-api-c").await?; - build_cxx_node( - root, - &[ - &dunce::canonicalize(Path::new("node-rust-api").join("main.cc"))?, - &dunce::canonicalize(build_dir.join("node-bridge.cc"))?, - ], - "node_rust_api", - &["-l", "dora_node_api_cxx"], - ) - .await?; - build_cxx_node( - root, - &[&dunce::canonicalize( - Path::new("node-c-api").join("main.cc"), - )?], - "node_c_api", - &["-l", "dora_node_api_c"], - ) - .await?; - build_cxx_operator( + // compile nodes + let args: &[&str] = if cfg!(target_os = "linux") { + &["-l", "m", "-l", "rt", "-l", "dl", "-pthread"] + } else if cfg!(target_os = "windows") { &[ - &dunce::canonicalize(Path::new("operator-rust-api").join("operator.cc"))?, - &dunce::canonicalize(build_dir.join("operator-bridge.cc"))?, - ], - "operator_rust_api", + "-ladvapi32", + "-luserenv", + "-lkernel32", + "-lws2_32", + "-lbcrypt", + "-lncrypt", + "-lschannel", + "-lntdll", + "-liphlpapi", + "-lcfgmgr32", + "-lcredui", + "-lcrypt32", + "-lcryptnet", + "-lfwpuclnt", + "-lgdi32", + "-lmsimg32", + "-lmswsock", + "-lole32", + "-loleaut32", + "-lopengl32", + "-lsecur32", + "-lshell32", + "-lsynchronization", + "-luser32", + "-lwinspool", + "-Wl,-nodefaultlib:libcmt", + "-D_DLL", + "-lmsvcrt", + ] + } else if cfg!(target_os = "macos") { &[ + "-framework", + "CoreServices", + "-framework", + "Security", "-l", - "dora_operator_api_cxx", - "-L", - root.join("target").join("debug").to_str().unwrap(), - ], - ) - .await?; - build_cxx_operator( - &[&dunce::canonicalize( - Path::new("operator-c-api").join("operator.cc"), - )?], - "operator_c_api", - &[ + "System", + "-l", + "resolv", + "-l", + "pthread", + "-l", + "c", "-l", - "dora_operator_api_c", - "-L", - root.join("target").join("debug").to_str().unwrap(), - ], + "m", + ] + } else { + panic!("unsupported target platform") + }; + cmd!( + sh, + "clang++ node-rust-api/main.cc build/node-bridge.cc -std=c++17 -l dora_node_api_cxx {args...} -L {target_debug} --output build/node_rust_api{EXE_SUFFIX}" ) - .await?; - - let dataflow = Path::new("dataflow.yml").to_owned(); - build_package("dora-runtime").await?; - run_dataflow(&dataflow).await?; - - Ok(()) -} + .run()?; + cmd!( + sh, + "clang++ node-c-api/main.cc -std=c++17 -l dora_node_api_c {args...} -L {target_debug} --output build/node_c_api{EXE_SUFFIX}" + ) + .run()?; -async fn build_package(package: &str) -> eyre::Result<()> { - let cargo = std::env::var("CARGO").unwrap(); - let mut cmd = tokio::process::Command::new(&cargo); - cmd.arg("build"); - cmd.arg("--package").arg(package); - if !cmd.status().await?.success() { - bail!("failed to build {package}"); - }; - Ok(()) -} + // compile operators + let operator_args: &[&str] = if cfg!(unix) { &["-fPIC"] } else { &[] }; + cmd!( + sh, + "clang++ -c build/operator-bridge.cc -std=c++17 -o build/operator-bridge.o {operator_args...}" + ) + .run()?; + cmd!( + sh, + "clang++ -c operator-rust-api/operator.cc -o operator-rust-api/operator.o -std=c++17 {operator_args...}" + ) + .run()?; + cmd!( + sh, + "clang++ -c operator-c-api/operator.cc -o operator-c-api/operator.o -std=c++17 {operator_args...}" + ) + .run()?; -async fn run_dataflow(dataflow: &Path) -> eyre::Result<()> { - let cargo = std::env::var("CARGO").unwrap(); - let mut cmd = tokio::process::Command::new(&cargo); - cmd.arg("run"); - cmd.arg("--package").arg("dora-cli"); - cmd.arg("--") - .arg("daemon") - .arg("--run-dataflow") - .arg(dataflow); - if !cmd.status().await?.success() { - bail!("failed to run dataflow"); - }; - Ok(()) -} + // link operators + cmd!( + sh, + "clang++ -shared operator-rust-api/operator.o build/operator-bridge.o -l dora_operator_api_cxx {args...} -L {target_debug} --output build/{DLL_PREFIX}operator_rust_api{DLL_SUFFIX}" + ) + .run()?; + cmd!( + sh, + "clang++ -shared operator-c-api/operator.o -l dora_operator_api_c {args...} -L {target_debug} --output build/{DLL_PREFIX}operator_c_api{DLL_SUFFIX}" + ) + .run()?; -async fn build_cxx_node( - root: &Path, - paths: &[&Path], - out_name: &str, - args: &[&str], -) -> eyre::Result<()> { - let mut clang = tokio::process::Command::new("clang++"); - clang.args(paths); - clang.arg("-std=c++17"); - #[cfg(target_os = "linux")] - { - clang.arg("-l").arg("m"); - clang.arg("-l").arg("rt"); - clang.arg("-l").arg("dl"); - clang.arg("-pthread"); - } - #[cfg(target_os = "windows")] - { - clang.arg("-ladvapi32"); - clang.arg("-luserenv"); - clang.arg("-lkernel32"); - clang.arg("-lws2_32"); - clang.arg("-lbcrypt"); - clang.arg("-lncrypt"); - clang.arg("-lschannel"); - clang.arg("-lntdll"); - clang.arg("-liphlpapi"); + // start up the dora daemon and coordinator + cmd!(sh, "{dora} up").run()?; - clang.arg("-lcfgmgr32"); - clang.arg("-lcredui"); - clang.arg("-lcrypt32"); - clang.arg("-lcryptnet"); - clang.arg("-lfwpuclnt"); - clang.arg("-lgdi32"); - clang.arg("-lmsimg32"); - clang.arg("-lmswsock"); - clang.arg("-lole32"); - clang.arg("-lopengl32"); - clang.arg("-lsecur32"); - clang.arg("-lshell32"); - clang.arg("-lsynchronization"); - clang.arg("-luser32"); - clang.arg("-lwinspool"); + // start running the dataflow.yml + cmd!(sh, "{dora} start dataflow.yml --attach").run()?; - clang.arg("-Wl,-nodefaultlib:libcmt"); - clang.arg("-D_DLL"); - clang.arg("-lmsvcrt"); - } - #[cfg(target_os = "macos")] - { - clang.arg("-framework").arg("CoreServices"); - clang.arg("-framework").arg("Security"); - clang.arg("-l").arg("System"); - clang.arg("-l").arg("resolv"); - clang.arg("-l").arg("pthread"); - clang.arg("-l").arg("c"); - clang.arg("-l").arg("m"); - } - clang.args(args); - clang.arg("-L").arg(root.join("target").join("debug")); - clang - .arg("--output") - .arg(Path::new("../build").join(format!("{out_name}{EXE_SUFFIX}"))); - if let Some(parent) = paths[0].parent() { - clang.current_dir(parent); - } + // stop the dora daemon and coordinator again + cmd!(sh, "{dora} destroy").run()?; - if !clang.status().await?.success() { - bail!("failed to compile c++ node"); - }; Ok(()) } -async fn build_cxx_operator( - paths: &[&Path], - out_name: &str, - link_args: &[&str], -) -> eyre::Result<()> { - let mut object_file_paths = Vec::new(); - - for path in paths { - let mut compile = tokio::process::Command::new("clang++"); - compile.arg("-c").arg(path); - compile.arg("-std=c++17"); - let object_file_path = path.with_extension("o"); - compile.arg("-o").arg(&object_file_path); - #[cfg(unix)] - compile.arg("-fPIC"); - if let Some(parent) = path.parent() { - compile.current_dir(parent); - } - if !compile.status().await?.success() { - bail!("failed to compile cxx operator"); - }; - object_file_paths.push(object_file_path); - } - - let mut link = tokio::process::Command::new("clang++"); - link.arg("-shared").args(&object_file_paths); - link.args(link_args); - #[cfg(target_os = "windows")] - { - link.arg("-ladvapi32"); - link.arg("-luserenv"); - link.arg("-lkernel32"); - link.arg("-lws2_32"); - link.arg("-lbcrypt"); - link.arg("-lncrypt"); - link.arg("-lschannel"); - link.arg("-lntdll"); - link.arg("-liphlpapi"); - - link.arg("-lcfgmgr32"); - link.arg("-lcredui"); - link.arg("-lcrypt32"); - link.arg("-lcryptnet"); - link.arg("-lfwpuclnt"); - link.arg("-lgdi32"); - link.arg("-lmsimg32"); - link.arg("-lmswsock"); - link.arg("-lole32"); - link.arg("-lopengl32"); - link.arg("-lsecur32"); - link.arg("-lshell32"); - link.arg("-lsynchronization"); - link.arg("-luser32"); - link.arg("-lwinspool"); - - link.arg("-Wl,-nodefaultlib:libcmt"); - link.arg("-D_DLL"); - link.arg("-lmsvcrt"); - link.arg("-fms-runtime-lib=static"); - } - #[cfg(target_os = "macos")] - { - link.arg("-framework").arg("CoreServices"); - link.arg("-framework").arg("Security"); - link.arg("-l").arg("System"); - link.arg("-l").arg("resolv"); - link.arg("-l").arg("pthread"); - link.arg("-l").arg("c"); - link.arg("-l").arg("m"); - } - link.arg("-o") - .arg(Path::new("../build").join(format!("{DLL_PREFIX}{out_name}{DLL_SUFFIX}"))); - if let Some(parent) = paths[0].parent() { - link.current_dir(parent); - } - if !link.status().await?.success() { - bail!("failed to create shared library from cxx operator (c api)"); - }; +/// Prepares a shell and set the working directory to the parent folder of this file. +/// +/// You can use your system shell instead (e.g. `bash`); +fn prepare_shell() -> Result { + let sh = Shell::new()?; + let root = Path::new(env!("CARGO_MANIFEST_DIR")); + sh.change_dir(root.join(file!()).parent().unwrap()); + Ok(sh) +} - Ok(()) +/// Build the `dora` command-line executable from this repo. +/// +/// You can skip this step and run `cargo install dora-cli --locked` instead. +fn prepare_dora(sh: &Shell) -> eyre::Result { + cmd!(sh, "cargo build --package dora-cli").run()?; + let root = Path::new(env!("CARGO_MANIFEST_DIR")); + let dora = root.join("target").join("debug").join("dora"); + Ok(dora) } From 6fba72165a330272f2fcd1b4f0a23f3e2efda419 Mon Sep 17 00:00:00 2001 From: Philipp Oppermann Date: Tue, 28 May 2024 15:22:55 +0200 Subject: [PATCH 13/16] Remove tokio dependency from cmake example --- examples/cmake-dataflow/run.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/examples/cmake-dataflow/run.rs b/examples/cmake-dataflow/run.rs index 6509286ed..eeb4013f4 100644 --- a/examples/cmake-dataflow/run.rs +++ b/examples/cmake-dataflow/run.rs @@ -3,8 +3,7 @@ use eyre::{Context, ContextCompat}; use std::path::{Path, PathBuf}; use xshell::{cmd, Shell}; -#[tokio::main] -async fn main() -> eyre::Result<()> { +fn main() -> eyre::Result<()> { set_up_tracing("cmake-dataflow-runner").wrap_err("failed to set up tracing")?; if cfg!(windows) { From 1c3488d977a45f69d88e613f07f0a92edb49e7e7 Mon Sep 17 00:00:00 2001 From: Philipp Oppermann Date: Tue, 28 May 2024 15:34:45 +0200 Subject: [PATCH 14/16] Migrate Python operator example to xshell --- examples/python-operator-dataflow/run.rs | 173 ++++++++++++----------- 1 file changed, 91 insertions(+), 82 deletions(-) diff --git a/examples/python-operator-dataflow/run.rs b/examples/python-operator-dataflow/run.rs index d6534cf6b..f283f1c97 100644 --- a/examples/python-operator-dataflow/run.rs +++ b/examples/python-operator-dataflow/run.rs @@ -1,29 +1,72 @@ -use dora_core::{get_pip_path, get_python_path, run}; -use dora_tracing::set_up_tracing; -use eyre::{bail, ContextCompat, WrapErr}; -use std::path::Path; +use dora_core::get_python_path; +use eyre::{ContextCompat, WrapErr}; +use std::path::{Path, PathBuf}; +use xshell::{cmd, Shell}; -#[tokio::main] -async fn main() -> eyre::Result<()> { - set_up_tracing("python-operator-dataflow-runner")?; +fn main() -> eyre::Result<()> { + // create a new shell in this folder + let sh = prepare_shell()?; - let root = Path::new(env!("CARGO_MANIFEST_DIR")); - std::env::set_current_dir(root.join(file!()).parent().unwrap()) - .wrap_err("failed to set working dir")?; - - run( - get_python_path().context("Could not get python binary")?, - &["-m", "venv", "../.env"], - None, - ) - .await - .context("failed to create venv")?; - let venv = &root.join("examples").join(".env"); - std::env::set_var( + // prepare Python virtual environment + prepare_venv(&sh)?; + + // build the `dora` binary (you can skip this if you use `cargo install dora-cli`) + let dora = prepare_dora(&sh)?; + + // install/upgrade pip, then install requirements + cmd!(sh, "python -m pip install --upgrade pip").run()?; + cmd!(sh, "pip install -r requirements.txt").run()?; + + // build the dora Python package (you can skip this if you installed the Python dora package) + { + let python_node_api_dir = Path::new(env!("CARGO_MANIFEST_DIR")) + .join("apis") + .join("python") + .join("node"); + let _dir = sh.push_dir(python_node_api_dir); + cmd!(sh, "maturin develop").run()?; + } + + let dataflow = if std::env::var("CONDA_EXE").is_ok() { + Path::new("dataflow.yml") + } else { + Path::new("dataflow_conda.yml") + }; + + // start up the dora daemon and coordinator + cmd!(sh, "{dora} up").run()?; + + // start running the dataflow.yml -> outputs the UUID assigned to the dataflow + let output = cmd!(sh, "{dora} start {dataflow} --attach").read_stderr()?; + let uuid = output.lines().next().context("no output")?; + + // stop the dora daemon and coordinator again + cmd!(sh, "{dora} destroy").run()?; + + // verify that the node output was written to `out` + sh.change_dir("out"); + sh.change_dir(uuid); + let sink_output = sh.read_file("log_object_detection.txt")?; + if sink_output.lines().count() < 50 { + eyre::bail!("object dectection node did not receive the expected number of messages") + } + + Ok(()) +} + +/// Prepares a Python virtual environment. +/// +/// You can use the normal `python3 -m venv .venv` + `source .venv/bin/activate` +/// if you're running bash. +fn prepare_venv(sh: &Shell) -> eyre::Result<()> { + let python = get_python_path().context("Could not get python binary")?; + cmd!(sh, "{python} -m venv ../.env").run()?; + let venv = sh.current_dir().parent().unwrap().join(".env"); + sh.set_var( "VIRTUAL_ENV", venv.to_str().context("venv path not valid unicode")?, ); - let orig_path = std::env::var("PATH")?; + // bin folder is named Scripts on windows. // 🤦‍♂️ See: https://github.com/pypa/virtualenv/commit/993ba1316a83b760370f5a3872b3f5ef4dd904c1 let venv_bin = if cfg!(windows) { @@ -31,70 +74,36 @@ async fn main() -> eyre::Result<()> { } else { venv.join("bin") }; + let path_separator = if cfg!(windows) { ';' } else { ':' }; - if cfg!(windows) { - std::env::set_var( - "PATH", - format!( - "{};{orig_path}", - venv_bin.to_str().context("venv path not valid unicode")? - ), - ); - } else { - std::env::set_var( - "PATH", - format!( - "{}:{orig_path}", - venv_bin.to_str().context("venv path not valid unicode")? - ), - ); - } - - run( - get_python_path().context("Could not get pip binary")?, - &["-m", "pip", "install", "--upgrade", "pip"], - None, - ) - .await - .context("failed to install pip")?; - run( - get_pip_path().context("Could not get pip binary")?, - &["install", "-r", "requirements.txt"], - None, - ) - .await - .context("pip install failed")?; - - run( - "maturin", - &["develop"], - Some(&root.join("apis").join("python").join("node")), - ) - .await - .context("maturin develop failed")?; - - if std::env::var("CONDA_EXE").is_ok() { - let dataflow = Path::new("dataflow.yml"); - run_dataflow(dataflow).await?; - } else { - let dataflow = Path::new("dataflow_conda.yml"); - run_dataflow(dataflow).await?; - } + sh.set_var( + "PATH", + format!( + "{}{path_separator}{}", + venv_bin.to_str().context("venv path not valid unicode")?, + std::env::var("PATH")? + ), + ); Ok(()) } -async fn run_dataflow(dataflow: &Path) -> eyre::Result<()> { - let cargo = std::env::var("CARGO").unwrap(); - let mut cmd = tokio::process::Command::new(&cargo); - cmd.arg("run"); - cmd.arg("--package").arg("dora-cli"); - cmd.arg("--") - .arg("daemon") - .arg("--run-dataflow") - .arg(dataflow); - if !cmd.status().await?.success() { - bail!("failed to run dataflow"); - }; - Ok(()) +/// Prepares a shell and set the working directory to the parent folder of this file. +/// +/// You can use your system shell instead (e.g. `bash`); +fn prepare_shell() -> Result { + let sh = Shell::new()?; + let root = Path::new(env!("CARGO_MANIFEST_DIR")); + sh.change_dir(root.join(file!()).parent().unwrap()); + Ok(sh) +} + +/// Build the `dora` command-line executable from this repo. +/// +/// You can skip this step and run `cargo install dora-cli --locked` instead. +fn prepare_dora(sh: &Shell) -> eyre::Result { + cmd!(sh, "cargo build --package dora-cli").run()?; + let root = Path::new(env!("CARGO_MANIFEST_DIR")); + let dora = root.join("target").join("debug").join("dora"); + Ok(dora) } From fcef458aa35abd58347237e738b69c5602d1eb25 Mon Sep 17 00:00:00 2001 From: Philipp Oppermann Date: Tue, 28 May 2024 15:49:08 +0200 Subject: [PATCH 15/16] Migrate rust-dataflow-url example to `xshell` --- examples/rust-dataflow-url/run.rs | 63 ++++++++++++++----------------- 1 file changed, 29 insertions(+), 34 deletions(-) diff --git a/examples/rust-dataflow-url/run.rs b/examples/rust-dataflow-url/run.rs index 6f511970a..64efaa5c2 100644 --- a/examples/rust-dataflow-url/run.rs +++ b/examples/rust-dataflow-url/run.rs @@ -1,46 +1,41 @@ -use dora_tracing::set_up_tracing; -use eyre::{bail, Context}; use std::path::Path; +use std::path::PathBuf; +use xshell::{cmd, Shell}; -#[tokio::main] -async fn main() -> eyre::Result<()> { - set_up_tracing("rust-dataflow-url-runner").wrap_err("failed to set up tracing")?; +fn main() -> eyre::Result<()> { + // create a new shell in this folder + let sh = prepare_shell()?; + // build the `dora` binary (you can skip this if you use `cargo install dora-cli`) + let dora = prepare_dora(&sh)?; - let root = Path::new(env!("CARGO_MANIFEST_DIR")); - std::env::set_current_dir(root.join(file!()).parent().unwrap()) - .wrap_err("failed to set working dir")?; + // build the dataflow using `dora build` + cmd!(sh, "{dora} build dataflow.yml").run()?; - let dataflow = Path::new("dataflow.yml"); - build_dataflow(dataflow).await?; + // start up the dora daemon and coordinator + cmd!(sh, "{dora} up").run()?; - run_dataflow(dataflow).await?; + // start running the dataflow.yml + cmd!(sh, "{dora} start dataflow.yml --attach").run()?; Ok(()) } -async fn build_dataflow(dataflow: &Path) -> eyre::Result<()> { - let cargo = std::env::var("CARGO").unwrap(); - let mut cmd = tokio::process::Command::new(&cargo); - cmd.arg("run"); - cmd.arg("--package").arg("dora-cli"); - cmd.arg("--").arg("build").arg(dataflow); - if !cmd.status().await?.success() { - bail!("failed to build dataflow"); - }; - Ok(()) +/// Prepares a shell and set the working directory to the parent folder of this file. +/// +/// You can use your system shell instead (e.g. `bash`); +fn prepare_shell() -> Result { + let sh = Shell::new()?; + let root = Path::new(env!("CARGO_MANIFEST_DIR")); + sh.change_dir(root.join(file!()).parent().unwrap()); + Ok(sh) } -async fn run_dataflow(dataflow: &Path) -> eyre::Result<()> { - let cargo = std::env::var("CARGO").unwrap(); - let mut cmd = tokio::process::Command::new(&cargo); - cmd.arg("run"); - cmd.arg("--package").arg("dora-cli"); - cmd.arg("--") - .arg("daemon") - .arg("--run-dataflow") - .arg(dataflow); - if !cmd.status().await?.success() { - bail!("failed to run dataflow"); - }; - Ok(()) +/// Build the `dora` command-line executable from this repo. +/// +/// You can skip this step and run `cargo install dora-cli --locked` instead. +fn prepare_dora(sh: &Shell) -> eyre::Result { + cmd!(sh, "cargo build --package dora-cli").run()?; + let root = Path::new(env!("CARGO_MANIFEST_DIR")); + let dora = root.join("target").join("debug").join("dora"); + Ok(dora) } From 22f2956b84aa6a11379b63662e821d2e3bbe1e51 Mon Sep 17 00:00:00 2001 From: Philipp Oppermann Date: Tue, 28 May 2024 15:49:25 +0200 Subject: [PATCH 16/16] Remove dev-dependency on `tokio` --- Cargo.lock | 2 -- Cargo.toml | 2 -- 2 files changed, 4 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index de50acd18..a13f77b5e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2352,8 +2352,6 @@ dependencies = [ "eyre", "futures", "serde_yaml 0.8.26", - "tokio", - "tokio-stream", "tracing", "uuid", "xshell", diff --git a/Cargo.toml b/Cargo.toml index 0154fb172..d187a0e74 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -86,7 +86,6 @@ ros2-examples = [] [dev-dependencies] eyre = "0.6.8" -tokio = "1.24.2" dora-coordinator = { workspace = true } dora-core = { workspace = true } dora-tracing = { workspace = true } @@ -96,7 +95,6 @@ serde_yaml = "0.8.23" uuid = { version = "1.7", features = ["v7", "serde"] } tracing = "0.1.36" futures = "0.3.25" -tokio-stream = "0.1.11" xshell = "0.2.6" [[example]]