Skip to content

Commit

Permalink
feat(smartmodule): added chaining support to fluvio-cli (#2812)
Browse files Browse the repository at this point in the history
  • Loading branch information
Alexander Galibey committed Nov 15, 2022
1 parent 27988aa commit 945c515
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 3 deletions.
4 changes: 2 additions & 2 deletions crates/fluvio-cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ k8s = [
"k8-config",
"fluvio-cluster",
]
smartengine = ["fluvio-smartengine"]
smartengine = ["fluvio-smartengine/default"]
producer-file-io = ["fluvio-cli-common/file-records"]

[dependencies]
Expand Down Expand Up @@ -89,7 +89,7 @@ fluvio-extension-common = { path = "../fluvio-extension-common", features = ["ta
fluvio-channel = { path = "../fluvio-channel" }
fluvio-cli-common = { path = "../fluvio-cli-common" }
fluvio-hub-util = { path = "../fluvio-hub-util" }
fluvio-smartengine = { path = "../fluvio-smartengine", optional = true }
fluvio-smartengine = { path = "../fluvio-smartengine", features = ["transformation"], default-features = false}
fluvio-protocol = { path = "../fluvio-protocol", features=["record","api"] }
fluvio-smartmodule = { path = "../fluvio-smartmodule", default-features = false }
fluvio-controlplane-metadata = { path = "../fluvio-controlplane-metadata", features = ["smartmodule"] }
Expand Down
47 changes: 47 additions & 0 deletions crates/fluvio-cli/src/client/consume/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ mod cmd {
};
use super::super::ClientCmd;
use super::table_format::{TableEventResponse, TableModel};
use fluvio_smartengine::transformation::TransformationConfig;

const USER_TEMPLATE: &str = "user_template";

Expand Down Expand Up @@ -194,6 +195,15 @@ mod cmd {
/// read_uncommitted (ReadUncommitted) - consume all records accepted by leader.
#[clap(long, value_parser=parse_isolation)]
pub isolation: Option<Isolation>,

/// (Optional) Path to a file with transformation specification.
#[clap(long, conflicts_with = "smartmodule_group")]
pub transforms_file: Option<PathBuf>,

/// (Optional) Transformation specification as JSON formatted string.
/// E.g. fluvio consume topic-name --transform='{"uses":"infinyon/jolt@0.1.0","with":{"spec":"[{\"operation\":\"default\",\"spec\":{\"source\":\"test\"}}]"}}'
#[clap(long, short, conflicts_with_all = &["smartmodule_group", "transforms_file"])]
pub transform: Vec<String>,
}

fn parse_key_val(s: &str) -> Result<(String, String)> {
Expand Down Expand Up @@ -316,6 +326,23 @@ mod cmd {
self.smart_module_ctx(),
initial_param,
)?]
} else if !self.transform.is_empty() {
let config =
TransformationConfig::try_from(self.transform.clone()).map_err(|err| {
CliError::InvalidArg(format!(
"unable to parse `transform` argument: {}",
err
))
})?;
create_smartmodule_list(config)?
} else if let Some(transforms_file) = &self.transforms_file {
let config = TransformationConfig::from_file(transforms_file).map_err(|err| {
CliError::InvalidArg(format!(
"unable to process `transforms_file` argument: {}",
err
))
})?;
create_smartmodule_list(config)?
} else {
Vec::new()
};
Expand Down Expand Up @@ -726,6 +753,24 @@ mod cmd {
})
}

/// create list of smartmodules from a list of transformations
fn create_smartmodule_list(config: TransformationConfig) -> Result<Vec<SmartModuleInvocation>> {
Ok(config
.transforms
.into_iter()
.map(|t| SmartModuleInvocation {
wasm: SmartModuleInvocationWasm::Predefined(t.uses),
kind: SmartModuleKind::Generic(Default::default()),
params: t
.with
.into_iter()
.map(|(k, v)| (k, v.into()))
.collect::<std::collections::BTreeMap<String, String>>()
.into(),
})
.collect())
}

// Uses clap::ArgEnum to choose possible variables

#[derive(ValueEnum, Debug, Clone, Eq, PartialEq)]
Expand Down Expand Up @@ -775,6 +820,8 @@ mod cmd {
params: Default::default(),
isolation: Default::default(),
beginning: Default::default(),
transforms_file: Default::default(),
transform: Default::default(),
}
}
#[test]
Expand Down
2 changes: 1 addition & 1 deletion crates/smartmodule-development-kit/src/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ pub struct TestOpt {
)]
params: Vec<(String, String)>,

/// (Optional) File path to transformation speciafication.
/// (Optional) File path to transformation specification.
#[clap(long, group = "TestSmartModule")]
transforms_file: Option<PathBuf>,

Expand Down

2 comments on commit 945c515

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Benchmark

Benchmark suite Current: 945c515 Previous: 27988aa Ratio
encode wasm file 461222 ns/iter (± 37142) 345203 ns/iter (± 46376) 1.34

This comment was automatically generated by workflow using github-action-benchmark.

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Benchmark

Benchmark suite Current: 945c515 Previous: 27988aa Ratio
vecu8 encoding 332441 ns/iter (± 682404) 451652 ns/iter (± 38039) 0.74
vecu8 decoding 623098 ns/iter (± 2241) 446847 ns/iter (± 381) 1.39
bytebuf encoding 7742 ns/iter (± 6) 7470 ns/iter (± 115) 1.04
bytebuf decoding 7785 ns/iter (± 13) 6351 ns/iter (± 140) 1.23

This comment was automatically generated by workflow using github-action-benchmark.

Please sign in to comment.