Skip to content

Source (v0.4.0)

J-Loudet edited this page Jan 2, 2023 · 7 revisions

In Zenoh-Flow, a Source fetches data from the outside world in order to process them. A prime example would be the temperature and the humidity captured by a sensor.

There are — for now — two ways to create a Source in Zenoh-Flow, depending on the programming language you favor:

  • A shared library (Rust).
  • A script (Python).

For Zenoh-Flow to be able to load our Source, it must be accompanied by a descriptor.

Descriptor

The content of the descriptor is relatively straight-forward, it must specify:

  1. an id (for display purposes),
  2. [optional] some configuration,
  3. [optional] some vars,
  4. its output(s) --- i.e. the data it will produce,
  5. an uri --- i.e. where to find its actual implementation.

Below is a descriptor fit for the code we are going to write next:

id: my-source

# This configuration is not used and serves as an example.
configuration:
  value: not-used

# This vars section is not used and serves as an example.
vars:
  FOO: not-used

outputs:
  - id: output
    type: String

# Linux:
uri: file:///absolute/path/to/the/implementation/libmy_source.so
# MacOS:
# uri: file:///absolute/path/to/the/implementation/libmy_source.dylib
# Windows:
# uri: file:///absolute/path/to/the/implementation/my_source.dll

Shared library

Assuming you want to create a Source called my-source, enter the following in a terminal:

cargo new --lib my-source

Modify the Cargo.toml to add these dependencies and tell rustc that you want a library that can be dynamically loaded:

[dependencies]
async-trait = "0.1.50"  # Zenoh-Flow’s nodes traits are asynchronous
zenoh-flow = { git = "https://github.com/ZettaScaleLabs/zenoh-flow.git", branch = "dev/v0.4.0" }

[lib]
crate-type=["cdylib"]

⚠️ Once we release 0.4.0 on crates.io, the Zenoh-Flow dependency will simply be zenoh-flow = "0.4".

Now modify lib.rs to (i) implement the Zenoh-Flow traits and (ii) include your logic.

Below you can find commented boilerplate code to do (i).

use async_trait::async_trait;
use zenoh_flow::prelude::*;

// MySource is where you implement your business' logic. `Output` is a structure provided by
// Zenoh-Flow through which you send `Data` to the next node in your data flow.
//
// The way to pass an `Output` is through its Constructor --- see below.
//
// That structure is the place where a state can be saved. For concurrency reasons, the state must
// implement `Send` and `Sync` (`Arc` and `Mutex` structures can be helpful, in particular their
// `async_std` variant).
//
// The `export_source` macro is required to properly expose the symbol and information about the
// version of the Rust compiler and Zenoh-Flow, to Zenoh-Flow.
//
// It allows Zenoh-Flow to detect, at runtime, a version mismatch between the Zenoh-Flow daemon and
// the shared library (be it on the version of the Rust compiler or of Zenoh-Flow itself).
#[export_source]
struct MySource {
    output: Output<String>,
}

#[async_trait]
impl Node for MySource {
    async fn iteration(&self) -> Result<()> {
        // Add your business logic here, the ultimate step is to produce a structure that implements
        // the `ZFData` trait.
        // Zenoh-Flow implements this trait on some basic types, meaning that in the case of a String
        // it can be sent as is.
        let data = "Hello, World!".to_string();
        self.output.send(data, None).await
    }
}


#[async_trait]
impl Source for MySource {
    async fn new(
        // The `context` provides information about the Zenoh-Flow daemon on which the generated
        // node MySource will be executed.
        context: Context,
        // The `configuration`(1) is a re-export of `serde_json::Value`(2). It behaves as a
        // dictionary and allows accessing configuration variables defined in the descriptor.
        configuration: Option<Configuration>,
        // The `Outputs` are encapsulated `flume::Senders` that were created by Zenoh-Flow. It is
        // a HashMap whose keys match what was defined in the descriptor file.
        mut outputs: Outputs,
    ) -> Result<Self> {
        let output = outputs.take("output").expect("No output named 'output' found");
        Ok(MySource { output })
    }
}

(1): Configuration (2): serde_json::Value

Python script

TODO: Add a reference to the auto-generated Python docs.

Clone this wiki locally