diff --git a/crates/fluvio-smartmodule/README.md b/crates/fluvio-smartmodule/README.md index 3dbf6407152..cdf65ed5313 100644 --- a/crates/fluvio-smartmodule/README.md +++ b/crates/fluvio-smartmodule/README.md @@ -20,13 +20,39 @@ For a quick setup using `cargo-generate`, see [the SmartModule template]. name = "fluvio-wasm-filter" version = "0.1.0" authors = ["Fluvio Contributors "] -edition = "2018" +edition = "2021" [lib] crate-type = ['cdylib'] [dependencies] -fluvio-smartmodule = "0.1.0" +fluvio-smartmodule = "0.8.0" +``` + +### Init + +Init functions are optional but serve to configure any state the SmartModule requires at the beginning of its execution. Could be helpful for preparing the SmartModule's operational context. The example below demonstrates an init function that sets a key for the SmartModule to use: + +```rust +use std::sync::OnceLock; + +use fluvio_smartmodule::{ + smartmodule, Result, eyre, + dataplane::smartmodule::{SmartModuleExtraParams, SmartModuleInitError}, +}; + +static CRITERIA: OnceLock = OnceLock::new(); + +#[smartmodule(init)] +fn init(params: SmartModuleExtraParams) -> Result<()> { + if let Some(key) = params.get("key") { + CRITERIA + .set(key.clone()) + .map_err(|err| eyre!("failed setting key: {:#?}", err)) + } else { + Err(SmartModuleInitError::MissingParam("key".to_string()).into()) + } +} ``` ### Filtering @@ -34,11 +60,11 @@ fluvio-smartmodule = "0.1.0" For filtering, write your smartmodule using `#[smartmodule(filter)]` on your top-level function. Consider this the "main" function of your SmartModule. -```text -use fluvio_smartmodule::{smartmodule, Record, Result}; +```rust +use fluvio_smartmodule::{smartmodule, SmartModuleRecord, Result}; #[smartmodule(filter)] -pub fn filter(record: &Record) -> Result { +pub fn filter(record: &SmartModuleRecord) -> Result { let string = std::str::from_utf8(record.value.as_ref())?; Ok(string.contains('a')) } @@ -50,11 +76,11 @@ This filter will keep only records whose contents contain the letter `a`. Mapping functions use `#[smartmodule(map)]`, and are also a top-level entrypoint. -```text -use fluvio_smartmodule::{smartmodule, Record, RecordData, Result}; +```rust +use fluvio_smartmodule::{smartmodule, SmartModuleRecord, RecordData, Result}; #[smartmodule(map)] -pub fn map(record: &Record) -> Result<(Option, RecordData)> { +pub fn map(record: &SmartModuleRecord) -> Result<(Option, RecordData)> { let key = record.key.clone(); let string = std::str::from_utf8(record.value.as_ref())?; @@ -77,11 +103,11 @@ accumulator value will be passed to the next invocation of `aggregate` with the next record value. The resulting stream of values is the output accumulator from each step. -```text -use fluvio_smartmodule::{smartmodule, Result, Record, RecordData}; +```rust +use fluvio_smartmodule::{smartmodule, Result, SmartModuleRecord, RecordData}; #[smartmodule(aggregate)] -pub fn aggregate(accumulator: RecordData, current: &Record) -> Result { +pub fn aggregate(accumulator: RecordData, current: &SmartModuleRecord) -> Result { let mut acc = String::from_utf8(accumulator.as_ref().to_vec())?; let next = std::str::from_utf8(current.value.as_ref())?; acc.push_str(next); @@ -98,26 +124,26 @@ This can be used to chop up input records that logically represent more than one and turn them into independent records. Below is an example where we take JSON arrays and convert them into a stream of the inner JSON objects. -```ignore -use fluvio_smartmodule::{smartmodule, Result, Record, RecordData}; +```rust +use fluvio_smartmodule::{smartmodule, SmartModuleRecord, RecordData, Result}; #[smartmodule(array_map)] -pub fn array_map(record: &Record) -> Result, RecordData)>> { - // Read the input record as a JSON array - let array = serde_json::from_slice::>(record.value.as_ref())?; - - // Convert each individual value from the array into its own JSON string - let strings = array +pub fn array_map(record: &SmartModuleRecord) -> Result, RecordData)>> { + // Deserialize a JSON array with any kind of values inside + let array: Vec = serde_json::from_slice(record.value.as_ref())?; + + // Convert each JSON value from the array back into a JSON string + let strings: Vec = array .into_iter() .map(|value| serde_json::to_string(&value)) - .collect::, _>>()?; - - // Return a list of records to be flattened into the output stream - let kvs = strings + .collect::>()?; + + // Create one record from each JSON string to send + let records: Vec<(Option, RecordData)> = strings .into_iter() .map(|s| (None, RecordData::from(s))) - .collect::>(); - Ok(kvs) + .collect(); + Ok(records) } ```