Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Merged by Bors] - feat(spu): added smart engine memory limit #3407

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions crates/fluvio-protocol/src/link/error_code.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,11 @@ pub enum ErrorCode {
#[fluvio(tag = 6007)]
#[error("SmartModule look_back error: {0}")]
SmartModuleLookBackError(String),
#[fluvio(tag = 6008)]
#[error(
"SmartModule memory limit exceeded: requested {requested} bytes, max allowed {max} bytes"
)]
SmartModuleMemoryLimitExceeded { requested: u64, max: u64 },

// TableFormat Errors
#[fluvio(tag = 7000)]
Expand Down
6 changes: 6 additions & 0 deletions crates/fluvio-smartengine/src/engine/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,10 @@ pub enum EngineError {
UnknownSmartModule,
#[error("Failed to instantiate: {0}")]
Instantiate(anyhow::Error),
#[error("Requested memory {requested}b exceeded max allowed {max}b")]
StoreMemoryExceeded {
current: usize,
requested: usize,
max: usize,
},
}
1 change: 1 addition & 0 deletions crates/fluvio-smartengine/src/engine/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ pub use config::{
SmartModuleInitialData, Lookback,
};
mod error;
pub use error::EngineError;

#[cfg(test)]
mod fixture;
Expand Down
153 changes: 149 additions & 4 deletions crates/fluvio-smartengine/src/engine/wasmtime/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,15 @@ use crate::engine::config::Lookback;
use super::init::SmartModuleInit;
use super::instance::{SmartModuleInstance, SmartModuleInstanceContext};

use super::limiter::StoreResourceLimiter;
use super::look_back::SmartModuleLookBack;
use super::metrics::SmartModuleChainMetrics;
use super::state::WasmState;
use super::transforms::create_transform;

// 1 GB
const DEFAULT_STORE_MEMORY_LIMIT: usize = 1_000_000_000;

#[derive(Clone)]
pub struct SmartEngine(Engine);

Expand All @@ -30,8 +34,8 @@ impl SmartEngine {
Self(Engine::new(&config).expect("Config is static"))
}

pub(crate) fn new_state(&self) -> WasmState {
WasmState::new(&self.0)
pub(crate) fn new_state(&self, store_limiter: StoreResourceLimiter) -> WasmState {
WasmState::new(&self.0, store_limiter)
}
}

Expand All @@ -42,9 +46,9 @@ impl Debug for SmartEngine {
}

/// Building SmartModule
#[derive(Default)]
pub struct SmartModuleChainBuilder {
smart_modules: Vec<(SmartModuleConfig, Vec<u8>)>,
store_limiter: StoreResourceLimiter,
}

impl SmartModuleChainBuilder {
Expand All @@ -53,10 +57,14 @@ impl SmartModuleChainBuilder {
self.smart_modules.push((config, bytes))
}

pub fn set_store_memory_limit(&mut self, max_memory_bytes: usize) {
self.store_limiter.set_memory_size(max_memory_bytes);
}

/// stop adding smartmodule and return SmartModuleChain that can be executed
pub fn initialize(self, engine: &SmartEngine) -> Result<SmartModuleChainInstance> {
let mut instances = Vec::with_capacity(self.smart_modules.len());
let mut state = engine.new_state();
let mut state = engine.new_state(self.store_limiter);
for (config, bytes) in self.smart_modules {
let module = Module::new(&engine.0, bytes)?;
let version = config.version();
Expand All @@ -81,6 +89,17 @@ impl SmartModuleChainBuilder {
}
}

impl Default for SmartModuleChainBuilder {
fn default() -> Self {
let mut store_limiter = StoreResourceLimiter::default();
store_limiter.set_memory_size(DEFAULT_STORE_MEMORY_LIMIT);
Self {
smart_modules: Default::default(),
store_limiter,
}
}
}

impl<T: Into<Vec<u8>>> From<(SmartModuleConfig, T)> for SmartModuleChainBuilder {
fn from(pair: (SmartModuleConfig, T)) -> Self {
let mut result = Self::default();
Expand Down Expand Up @@ -210,6 +229,7 @@ mod chaining_test {
use fluvio_smartmodule::{dataplane::smartmodule::SmartModuleInput, Record};

use crate::engine::config::Lookback;
use crate::engine::error::EngineError;

use super::super::{
SmartEngine, SmartModuleChainBuilder, SmartModuleConfig, SmartModuleInitialData,
Expand Down Expand Up @@ -438,4 +458,129 @@ mod chaining_test {
assert_eq!(output.successes.len(), 1);
assert_eq!(output.successes[0].value().to_string(), "input");
}

#[ignore]
#[test]
fn test_unsufficient_memory_to_instantiate() {
//given
let engine = SmartEngine::new();
let mut chain_builder = SmartModuleChainBuilder::default();
let max_memory = 1_000; // 1 kb

chain_builder.add_smart_module(
SmartModuleConfig::builder()
.lookback(Some(Lookback::Last(1)))
.build()
.unwrap(),
read_wasm_module(SM_FILTER_LOOK_BACK),
);
chain_builder.set_store_memory_limit(max_memory);

// when
let res = chain_builder.initialize(&engine);

// then
assert!(res.is_err());
let err = res
.unwrap_err()
.downcast::<EngineError>()
.expect("EngineError expected");
assert!(matches!(
err,
EngineError::StoreMemoryExceeded {
current: _,
requested: _,
max
}
if max == max_memory
))
}

#[ignore]
#[test]
fn test_look_back_unsufficient_memory() {
//given
let engine = SmartEngine::new();
let mut chain_builder = SmartModuleChainBuilder::default();
let metrics = SmartModuleChainMetrics::default();
let max_memory = 1_000_000 * 2; // 2mb

chain_builder.add_smart_module(
SmartModuleConfig::builder()
.lookback(Some(Lookback::Last(1000)))
.build()
.unwrap(),
read_wasm_module(SM_FILTER_LOOK_BACK),
);
chain_builder.set_store_memory_limit(max_memory);

let mut chain = chain_builder
.initialize(&engine)
.expect("failed to build chain");

// when
let res = fluvio_future::task::run_block_on(chain.look_back(
|_| {
let res = (0..1000).map(|_| Record::new([0u8; 1_000])).collect();
async { Ok(res) }
},
&metrics,
));

// then
assert!(res.is_err());
let err = res
.unwrap_err()
.downcast::<EngineError>()
.expect("EngineError expected");
assert!(matches!(
err,
EngineError::StoreMemoryExceeded {
current: _,
requested: _,
max
}
if max == max_memory
))
}

#[ignore]
#[test]
fn test_process_unsufficient_memory() {
//given
let engine = SmartEngine::new();
let mut chain_builder = SmartModuleChainBuilder::default();
let metrics = SmartModuleChainMetrics::default();
let max_memory = 1_000_000 * 2; // 2mb

chain_builder.add_smart_module(
SmartModuleConfig::builder().build().unwrap(),
read_wasm_module(SM_FILTER_LOOK_BACK),
);
chain_builder.set_store_memory_limit(max_memory);

let mut chain = chain_builder
.initialize(&engine)
.expect("failed to build chain");

// when
let input: Vec<Record> = (0..1000).map(|_| Record::new([0u8; 1_000])).collect();
let res = chain.process(SmartModuleInput::try_from(input).expect("input"), &metrics);

// then
assert!(res.is_err());
let err = res
.unwrap_err()
.downcast::<EngineError>()
.expect("EngineError expected");
assert!(matches!(
err,
EngineError::StoreMemoryExceeded {
current: _,
requested: _,
max
}
if max == max_memory
))
}
}
5 changes: 4 additions & 1 deletion crates/fluvio-smartengine/src/engine/wasmtime/instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,10 @@ impl SmartModuleInstanceContext {
debug!("instantiating WASMtime");
let instance = state
.instantiate(&module, copy_records_fn)
.map_err(EngineError::Instantiate)?;
.map_err(|e| match e.downcast::<EngineError>() {
Ok(e) => e,
Err(e) => EngineError::Instantiate(e),
})?;
Ok(Self {
instance,
records_cb,
Expand Down
48 changes: 48 additions & 0 deletions crates/fluvio-smartengine/src/engine/wasmtime/limiter.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
use wasmtime::ResourceLimiter;

use crate::engine::error::EngineError;

#[derive(Debug, Default)]
pub(crate) struct StoreResourceLimiter {
pub memory_size: Option<usize>,
}

impl StoreResourceLimiter {
pub(crate) fn set_memory_size(&mut self, memory_size: usize) -> &mut Self {
self.memory_size = Some(memory_size);
self
}
}

impl ResourceLimiter for StoreResourceLimiter {
fn memory_growing(
&mut self,
current: usize,
desired: usize,
maximum: Option<usize>,
) -> anyhow::Result<bool> {
let allow = match self.memory_size {
Some(limit) if desired > limit => false,
_ => !matches!(maximum, Some(max) if desired > max),
};
if !allow {
Err(EngineError::StoreMemoryExceeded {
current,
requested: desired,
max: self.memory_size.or(maximum).unwrap_or_default(),
}
.into())
} else {
Ok(allow)
}
}

fn table_growing(
&mut self,
_current: u32,
_desired: u32,
_maximum: Option<u32>,
) -> anyhow::Result<bool> {
Ok(true)
}
}
1 change: 1 addition & 0 deletions crates/fluvio-smartengine/src/engine/wasmtime/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ pub(crate) mod state;
pub(crate) mod engine;
pub(crate) mod instance;
pub(crate) mod look_back;
pub(crate) mod limiter;
pub use engine::{SmartEngine, SmartModuleChainBuilder, SmartModuleChainInstance};

use super::*;
Loading