Skip to content

Commit

Permalink
chore: update sm readme
Browse files Browse the repository at this point in the history
  • Loading branch information
fraidev committed Dec 13, 2024
1 parent 4faf9e9 commit 49d2cf4
Showing 1 changed file with 51 additions and 25 deletions.
76 changes: 51 additions & 25 deletions crates/fluvio-smartmodule/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,25 +20,51 @@ For a quick setup using `cargo-generate`, see [the SmartModule template].
name = "fluvio-wasm-filter"
version = "0.1.0"
authors = ["Fluvio Contributors <team@fluvio.io>"]
edition = "2018"
edition = "2021"

[lib]
crate-type = ['cdylib']

[dependencies]
fluvio-smartmodule = "0.1.0"
fluvio-smartmodule = "0.8.0"
```

### Init

Init functions are optional but serve to configure any state the SmartModule requires at the beginning of its execution. Could be helpful for preparing the SmartModule's operational context. The example below demonstrates an init function that sets a key for the SmartModule to use:

```rust
use std::sync::OnceLock;

use fluvio_smartmodule::{
smartmodule, Result, eyre,
dataplane::smartmodule::{SmartModuleExtraParams, SmartModuleInitError},
};

static CRITERIA: OnceLock<String> = OnceLock::new();

#[smartmodule(init)]
fn init(params: SmartModuleExtraParams) -> Result<()> {
if let Some(key) = params.get("key") {
CRITERIA
.set(key.clone())
.map_err(|err| eyre!("failed setting key: {:#?}", err))
} else {
Err(SmartModuleInitError::MissingParam("key".to_string()).into())
}
}
```

### Filtering

For filtering, write your smartmodule using `#[smartmodule(filter)]` on your
top-level function. Consider this the "main" function of your SmartModule.

```text
use fluvio_smartmodule::{smartmodule, Record, Result};
```rust
use fluvio_smartmodule::{smartmodule, SmartModuleRecord, Result};

#[smartmodule(filter)]
pub fn filter(record: &Record) -> Result<bool> {
pub fn filter(record: &SmartModuleRecord) -> Result<bool> {
let string = std::str::from_utf8(record.value.as_ref())?;
Ok(string.contains('a'))
}
Expand All @@ -50,11 +76,11 @@ This filter will keep only records whose contents contain the letter `a`.

Mapping functions use `#[smartmodule(map)]`, and are also a top-level entrypoint.

```text
use fluvio_smartmodule::{smartmodule, Record, RecordData, Result};
```rust
use fluvio_smartmodule::{smartmodule, SmartModuleRecord, RecordData, Result};

#[smartmodule(map)]
pub fn map(record: &Record) -> Result<(Option<RecordData>, RecordData)> {
pub fn map(record: &SmartModuleRecord) -> Result<(Option<RecordData>, RecordData)> {
let key = record.key.clone();

let string = std::str::from_utf8(record.value.as_ref())?;
Expand All @@ -77,11 +103,11 @@ accumulator value will be passed to the next invocation of `aggregate` with
the next record value. The resulting stream of values is the output accumulator
from each step.

```text
use fluvio_smartmodule::{smartmodule, Result, Record, RecordData};
```rust
use fluvio_smartmodule::{smartmodule, Result, SmartModuleRecord, RecordData};

#[smartmodule(aggregate)]
pub fn aggregate(accumulator: RecordData, current: &Record) -> Result<RecordData> {
pub fn aggregate(accumulator: RecordData, current: &SmartModuleRecord) -> Result<RecordData> {
let mut acc = String::from_utf8(accumulator.as_ref().to_vec())?;
let next = std::str::from_utf8(current.value.as_ref())?;
acc.push_str(next);
Expand All @@ -98,26 +124,26 @@ This can be used to chop up input records that logically represent more than one
and turn them into independent records. Below is an example where we take JSON arrays and
convert them into a stream of the inner JSON objects.

```ignore
use fluvio_smartmodule::{smartmodule, Result, Record, RecordData};
```rust
use fluvio_smartmodule::{smartmodule, SmartModuleRecord, RecordData, Result};

#[smartmodule(array_map)]
pub fn array_map(record: &Record) -> Result<Vec<(Option<RecordData>, RecordData)>> {
// Read the input record as a JSON array
let array = serde_json::from_slice::<Vec<serde_json::Value>>(record.value.as_ref())?;
// Convert each individual value from the array into its own JSON string
let strings = array
pub fn array_map(record: &SmartModuleRecord) -> Result<Vec<(Option<RecordData>, RecordData)>> {
// Deserialize a JSON array with any kind of values inside
let array: Vec<serde_json::Value> = serde_json::from_slice(record.value.as_ref())?;

// Convert each JSON value from the array back into a JSON string
let strings: Vec<String> = array
.into_iter()
.map(|value| serde_json::to_string(&value))
.collect::<core::result::Result<Vec<String>, _>>()?;
// Return a list of records to be flattened into the output stream
let kvs = strings
.collect::<core::result::Result<_, _>>()?;

// Create one record from each JSON string to send
let records: Vec<(Option<RecordData>, RecordData)> = strings
.into_iter()
.map(|s| (None, RecordData::from(s)))
.collect::<Vec<_>>();
Ok(kvs)
.collect();
Ok(records)
}
```

Expand Down

0 comments on commit 49d2cf4

Please sign in to comment.