From 7e9869e1d2ce951a55aebf96e01f669f9e89f7f1 Mon Sep 17 00:00:00 2001 From: Alexander Galibey Date: Thu, 20 Jul 2023 12:29:36 +0200 Subject: [PATCH] feat(spu): added smart engine memory limit --- Cargo.lock | 2 +- crates/fluvio-protocol/src/link/error_code.rs | 5 + crates/fluvio-smartengine/src/engine/error.rs | 6 + crates/fluvio-smartengine/src/engine/mod.rs | 1 + .../src/engine/wasmtime/engine.rs | 153 +++++++++++++++++- .../src/engine/wasmtime/instance.rs | 5 +- .../src/engine/wasmtime/limiter.rs | 48 ++++++ .../src/engine/wasmtime/mod.rs | 1 + .../src/engine/wasmtime/state.rs | 48 +++--- crates/fluvio-spu/src/config/cli.rs | 15 ++ crates/fluvio-spu/src/config/spu_config.rs | 17 ++ .../src/control_plane/dispatcher.rs | 4 +- .../src/services/public/produce_handler.rs | 27 ++-- .../src/services/public/tests/produce.rs | 144 ++++++++++++++++- .../src/services/public/tests/stream_fetch.rs | 2 +- crates/fluvio-spu/src/smartengine/chain.rs | 21 ++- crates/fluvio-spu/src/smartengine/context.rs | 17 +- crates/fluvio-spu/src/smartengine/mod.rs | 17 ++ crates/fluvio-types/Cargo.toml | 2 +- crates/fluvio-types/src/defaults.rs | 2 + 20 files changed, 487 insertions(+), 50 deletions(-) create mode 100644 crates/fluvio-smartengine/src/engine/wasmtime/limiter.rs diff --git a/Cargo.lock b/Cargo.lock index 36a058da77..5d88a560e7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3236,7 +3236,7 @@ dependencies = [ [[package]] name = "fluvio-types" -version = "0.4.2" +version = "0.4.3" dependencies = [ "event-listener", "fluvio-future", diff --git a/crates/fluvio-protocol/src/link/error_code.rs b/crates/fluvio-protocol/src/link/error_code.rs index f5180d939d..3e0cd92915 100644 --- a/crates/fluvio-protocol/src/link/error_code.rs +++ b/crates/fluvio-protocol/src/link/error_code.rs @@ -153,6 +153,11 @@ pub enum ErrorCode { #[fluvio(tag = 6007)] #[error("SmartModule look_back error: {0}")] SmartModuleLookBackError(String), + #[fluvio(tag = 6008)] + #[error( + "SmartModule memory limit 
exceeded: requested {requested} bytes, max allowed {max} bytes" + )] + SmartModuleMemoryLimitExceeded { requested: u64, max: u64 }, // TableFormat Errors #[fluvio(tag = 7000)] diff --git a/crates/fluvio-smartengine/src/engine/error.rs b/crates/fluvio-smartengine/src/engine/error.rs index f8d4a7ec1e..f28ef24753 100644 --- a/crates/fluvio-smartengine/src/engine/error.rs +++ b/crates/fluvio-smartengine/src/engine/error.rs @@ -4,4 +4,10 @@ pub enum EngineError { UnknownSmartModule, #[error("Failed to instantiate: {0}")] Instantiate(anyhow::Error), + #[error("Requested memory {requested}b exceeded max allowed {max}b")] + StoreMemoryExceeded { + current: usize, + requested: usize, + max: usize, + }, } diff --git a/crates/fluvio-smartengine/src/engine/mod.rs b/crates/fluvio-smartengine/src/engine/mod.rs index 3e6084bebd..1a1713fce1 100644 --- a/crates/fluvio-smartengine/src/engine/mod.rs +++ b/crates/fluvio-smartengine/src/engine/mod.rs @@ -6,6 +6,7 @@ pub use config::{ SmartModuleInitialData, Lookback, }; mod error; +pub use error::EngineError; #[cfg(test)] mod fixture; diff --git a/crates/fluvio-smartengine/src/engine/wasmtime/engine.rs b/crates/fluvio-smartengine/src/engine/wasmtime/engine.rs index 825a48dcae..e4783b2ef3 100644 --- a/crates/fluvio-smartengine/src/engine/wasmtime/engine.rs +++ b/crates/fluvio-smartengine/src/engine/wasmtime/engine.rs @@ -14,11 +14,15 @@ use crate::engine::config::Lookback; use super::init::SmartModuleInit; use super::instance::{SmartModuleInstance, SmartModuleInstanceContext}; +use super::limiter::StoreResourceLimiter; use super::look_back::SmartModuleLookBack; use super::metrics::SmartModuleChainMetrics; use super::state::WasmState; use super::transforms::create_transform; +// 1 GB +const DEFAULT_STORE_MEMORY_LIMIT: usize = 1_000_000_000; + #[derive(Clone)] pub struct SmartEngine(Engine); @@ -30,8 +34,8 @@ impl SmartEngine { Self(Engine::new(&config).expect("Config is static")) } - pub(crate) fn new_state(&self) -> WasmState { - 
WasmState::new(&self.0) + pub(crate) fn new_state(&self, store_limiter: StoreResourceLimiter) -> WasmState { + WasmState::new(&self.0, store_limiter) } } @@ -42,9 +46,9 @@ impl Debug for SmartEngine { } /// Building SmartModule -#[derive(Default)] pub struct SmartModuleChainBuilder { smart_modules: Vec<(SmartModuleConfig, Vec)>, + store_limiter: StoreResourceLimiter, } impl SmartModuleChainBuilder { @@ -53,10 +57,14 @@ impl SmartModuleChainBuilder { self.smart_modules.push((config, bytes)) } + pub fn set_store_memory_limit(&mut self, max_memory_bytes: usize) { + self.store_limiter.set_memory_size(max_memory_bytes); + } + /// stop adding smartmodule and return SmartModuleChain that can be executed pub fn initialize(self, engine: &SmartEngine) -> Result { let mut instances = Vec::with_capacity(self.smart_modules.len()); - let mut state = engine.new_state(); + let mut state = engine.new_state(self.store_limiter); for (config, bytes) in self.smart_modules { let module = Module::new(&engine.0, bytes)?; let version = config.version(); @@ -81,6 +89,17 @@ impl SmartModuleChainBuilder { } } +impl Default for SmartModuleChainBuilder { + fn default() -> Self { + let mut store_limiter = StoreResourceLimiter::default(); + store_limiter.set_memory_size(DEFAULT_STORE_MEMORY_LIMIT); + Self { + smart_modules: Default::default(), + store_limiter, + } + } +} + impl>> From<(SmartModuleConfig, T)> for SmartModuleChainBuilder { fn from(pair: (SmartModuleConfig, T)) -> Self { let mut result = Self::default(); @@ -210,6 +229,7 @@ mod chaining_test { use fluvio_smartmodule::{dataplane::smartmodule::SmartModuleInput, Record}; use crate::engine::config::Lookback; + use crate::engine::error::EngineError; use super::super::{ SmartEngine, SmartModuleChainBuilder, SmartModuleConfig, SmartModuleInitialData, @@ -438,4 +458,129 @@ mod chaining_test { assert_eq!(output.successes.len(), 1); assert_eq!(output.successes[0].value().to_string(), "input"); } + + #[ignore] + #[test] + fn 
test_unsufficient_memory_to_instantiate() { + //given + let engine = SmartEngine::new(); + let mut chain_builder = SmartModuleChainBuilder::default(); + let max_memory = 1_000; // 1 kb + + chain_builder.add_smart_module( + SmartModuleConfig::builder() + .lookback(Some(Lookback::Last(1))) + .build() + .unwrap(), + read_wasm_module(SM_FILTER_LOOK_BACK), + ); + chain_builder.set_store_memory_limit(max_memory); + + // when + let res = chain_builder.initialize(&engine); + + // then + assert!(res.is_err()); + let err = res + .unwrap_err() + .downcast::() + .expect("EngineError expected"); + assert!(matches!( + err, + EngineError::StoreMemoryExceeded { + current: _, + requested: _, + max + } + if max == max_memory + )) + } + + #[ignore] + #[test] + fn test_look_back_unsufficient_memory() { + //given + let engine = SmartEngine::new(); + let mut chain_builder = SmartModuleChainBuilder::default(); + let metrics = SmartModuleChainMetrics::default(); + let max_memory = 1_000_000 * 2; // 2mb + + chain_builder.add_smart_module( + SmartModuleConfig::builder() + .lookback(Some(Lookback::Last(1000))) + .build() + .unwrap(), + read_wasm_module(SM_FILTER_LOOK_BACK), + ); + chain_builder.set_store_memory_limit(max_memory); + + let mut chain = chain_builder + .initialize(&engine) + .expect("failed to build chain"); + + // when + let res = fluvio_future::task::run_block_on(chain.look_back( + |_| { + let res = (0..1000).map(|_| Record::new([0u8; 1_000])).collect(); + async { Ok(res) } + }, + &metrics, + )); + + // then + assert!(res.is_err()); + let err = res + .unwrap_err() + .downcast::() + .expect("EngineError expected"); + assert!(matches!( + err, + EngineError::StoreMemoryExceeded { + current: _, + requested: _, + max + } + if max == max_memory + )) + } + + #[ignore] + #[test] + fn test_process_unsufficient_memory() { + //given + let engine = SmartEngine::new(); + let mut chain_builder = SmartModuleChainBuilder::default(); + let metrics = SmartModuleChainMetrics::default(); + let 
max_memory = 1_000_000 * 2; // 2mb + + chain_builder.add_smart_module( + SmartModuleConfig::builder().build().unwrap(), + read_wasm_module(SM_FILTER_LOOK_BACK), + ); + chain_builder.set_store_memory_limit(max_memory); + + let mut chain = chain_builder + .initialize(&engine) + .expect("failed to build chain"); + + // when + let input: Vec = (0..1000).map(|_| Record::new([0u8; 1_000])).collect(); + let res = chain.process(SmartModuleInput::try_from(input).expect("input"), &metrics); + + // then + assert!(res.is_err()); + let err = res + .unwrap_err() + .downcast::() + .expect("EngineError expected"); + assert!(matches!( + err, + EngineError::StoreMemoryExceeded { + current: _, + requested: _, + max + } + if max == max_memory + )) + } } diff --git a/crates/fluvio-smartengine/src/engine/wasmtime/instance.rs b/crates/fluvio-smartengine/src/engine/wasmtime/instance.rs index 662ebeed1d..418341ca9e 100644 --- a/crates/fluvio-smartengine/src/engine/wasmtime/instance.rs +++ b/crates/fluvio-smartengine/src/engine/wasmtime/instance.rs @@ -135,7 +135,10 @@ impl SmartModuleInstanceContext { debug!("instantiating WASMtime"); let instance = state .instantiate(&module, copy_records_fn) - .map_err(EngineError::Instantiate)?; + .map_err(|e| match e.downcast::() { + Ok(e) => e, + Err(e) => EngineError::Instantiate(e), + })?; Ok(Self { instance, records_cb, diff --git a/crates/fluvio-smartengine/src/engine/wasmtime/limiter.rs b/crates/fluvio-smartengine/src/engine/wasmtime/limiter.rs new file mode 100644 index 0000000000..1d9b1c69db --- /dev/null +++ b/crates/fluvio-smartengine/src/engine/wasmtime/limiter.rs @@ -0,0 +1,48 @@ +use wasmtime::ResourceLimiter; + +use crate::engine::error::EngineError; + +#[derive(Debug, Default)] +pub(crate) struct StoreResourceLimiter { + pub memory_size: Option, +} + +impl StoreResourceLimiter { + pub(crate) fn set_memory_size(&mut self, memory_size: usize) -> &mut Self { + self.memory_size = Some(memory_size); + self + } +} + +impl ResourceLimiter for 
StoreResourceLimiter { + fn memory_growing( + &mut self, + current: usize, + desired: usize, + maximum: Option, + ) -> anyhow::Result { + let allow = match self.memory_size { + Some(limit) if desired > limit => false, + _ => !matches!(maximum, Some(max) if desired > max), + }; + if !allow { + Err(EngineError::StoreMemoryExceeded { + current, + requested: desired, + max: self.memory_size.or(maximum).unwrap_or_default(), + } + .into()) + } else { + Ok(allow) + } + } + + fn table_growing( + &mut self, + _current: u32, + _desired: u32, + _maximum: Option, + ) -> anyhow::Result { + Ok(true) + } +} diff --git a/crates/fluvio-smartengine/src/engine/wasmtime/mod.rs b/crates/fluvio-smartengine/src/engine/wasmtime/mod.rs index 4fee9a2344..a1e9f32772 100644 --- a/crates/fluvio-smartengine/src/engine/wasmtime/mod.rs +++ b/crates/fluvio-smartengine/src/engine/wasmtime/mod.rs @@ -5,6 +5,7 @@ pub(crate) mod state; pub(crate) mod engine; pub(crate) mod instance; pub(crate) mod look_back; +pub(crate) mod limiter; pub use engine::{SmartEngine, SmartModuleChainBuilder, SmartModuleChainInstance}; use super::*; diff --git a/crates/fluvio-smartengine/src/engine/wasmtime/state.rs b/crates/fluvio-smartengine/src/engine/wasmtime/state.rs index 2b6aa5eb08..6783694a6e 100644 --- a/crates/fluvio-smartengine/src/engine/wasmtime/state.rs +++ b/crates/fluvio-smartengine/src/engine/wasmtime/state.rs @@ -6,29 +6,31 @@ use wasmtime::{ StoreContextMut, }; +use super::limiter::StoreResourceLimiter; + // DO NOT INCREASE THIS VALUE HIGHER THAN i64::MAX / 2. 
// WASMTIME keeps fuel as i64 and has some strange behavior with `add_fuel` if trying to top fuel // up to a values close to i64:MAX const DEFAULT_FUEL: u64 = i64::MAX as u64 / 2; -#[cfg(not(feature = "wasi"))] -pub type WasmState = WasmStore<()>; - -#[cfg(feature = "wasi")] -pub type WasmState = WasmStore; - #[derive(Debug)] -pub struct WasmStore(Store); +pub struct WasmState(Store); + +pub struct Context { + limiter: StoreResourceLimiter, + #[cfg(feature = "wasi")] + wasi_ctx: wasmtime_wasi::WasiCtx, +} -impl AsContext for WasmStore { - type Data = T; +impl AsContext for WasmState { + type Data = Context; fn as_context(&self) -> StoreContext<'_, Self::Data> { self.0.as_context() } } -impl AsContextMut for WasmStore { +impl AsContextMut for WasmState { fn as_context_mut(&mut self) -> StoreContextMut<'_, Self::Data> { self.0.as_context_mut() } @@ -54,9 +56,10 @@ impl WasmState { } #[cfg(not(feature = "wasi"))] -impl WasmStore<()> { - pub(crate) fn new(engine: &Engine) -> Self { - let mut s = Self(Store::new(engine, ())); +impl WasmState { + pub(crate) fn new(engine: &Engine, limiter: StoreResourceLimiter) -> Self { + let mut s = Self(Store::new(engine, Context { limiter })); + s.0.limiter(|inner| &mut inner.limiter); s.top_up_fuel(); s } @@ -74,13 +77,14 @@ impl WasmStore<()> { } #[cfg(feature = "wasi")] -impl WasmStore { - pub(crate) fn new(engine: &Engine) -> Self { - let wasi = wasmtime_wasi::WasiCtxBuilder::new() +impl WasmState { + pub(crate) fn new(engine: &Engine, limiter: StoreResourceLimiter) -> Self { + let wasi_ctx = wasmtime_wasi::WasiCtxBuilder::new() .inherit_stderr() .inherit_stdout() .build(); - let mut s = Self(Store::new(engine, wasi)); + let mut s = Self(Store::new(engine, Context { limiter, wasi_ctx })); + s.0.limiter(|inner| &mut inner.limiter); s.top_up_fuel(); s } @@ -91,7 +95,7 @@ impl WasmStore { host_fn: impl IntoFunc<::Data, Params, Args>, ) -> Result { let mut linker = wasmtime::Linker::new(module.engine()); - 
wasmtime_wasi::add_to_linker(&mut linker, |c| c)?; + wasmtime_wasi::add_to_linker(&mut linker, |c: &mut Context| &mut c.wasi_ctx)?; let copy_records_fn_import = module .imports() .find(|import| import.name().eq("copy_records")) @@ -104,3 +108,11 @@ impl WasmStore { linker.instantiate(self, module) } } + +impl std::fmt::Debug for Context { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("Context") + .field("limiter", &self.limiter) + .finish() + } +} diff --git a/crates/fluvio-spu/src/config/cli.rs b/crates/fluvio-spu/src/config/cli.rs index 38734a100b..78bfc73b7d 100644 --- a/crates/fluvio-spu/src/config/cli.rs +++ b/crates/fluvio-spu/src/config/cli.rs @@ -59,6 +59,13 @@ pub struct SpuOpt { )] pub peer_max_bytes: u32, + #[arg( + long, + value_name = "integer", + env = "FLV_SMART_ENGINE_MAX_MEMORY_BYTES" + )] + pub smart_engine_max_memory: Option, + #[clap(flatten)] tls: TlsConfig, } @@ -138,6 +145,14 @@ impl SpuOpt { config.peer_max_bytes = self.peer_max_bytes; + if let Some(smart_engine_max_memory) = self.smart_engine_max_memory { + info!( + "overriding smart engine max memory: {}", + smart_engine_max_memory + ); + config.smart_engine.store_max_memory = smart_engine_max_memory; + } + Ok((config, tls_port)) } diff --git a/crates/fluvio-spu/src/config/spu_config.rs b/crates/fluvio-spu/src/config/spu_config.rs index 86c23747cf..e15f47161b 100644 --- a/crates/fluvio-spu/src/config/spu_config.rs +++ b/crates/fluvio-spu/src/config/spu_config.rs @@ -21,6 +21,7 @@ use fluvio_types::defaults::SPU_LOG_INDEX_MAX_BYTES; use fluvio_types::defaults::SPU_LOG_INDEX_MAX_INTERVAL_BYTES; use fluvio_types::defaults::SPU_LOG_SEGMENT_MAX_BYTES; use fluvio_types::defaults::SPU_RETRY_SC_TIMEOUT_MS; +use fluvio_types::defaults::SPU_SMARTENGINE_STORE_MAX_BYTES; // environment variables @@ -75,6 +76,19 @@ impl Default for Log { } } +#[derive(Debug, Eq, PartialEq, Clone)] +pub struct SmartEngineConfig { + pub store_max_memory: usize, +} + +impl Default 
for SmartEngineConfig { + fn default() -> Self { + Self { + store_max_memory: SPU_SMARTENGINE_STORE_MAX_BYTES, + } + } +} + /// streaming processing unit configuration file #[derive(Debug, Eq, PartialEq, Clone)] pub struct SpuConfig { @@ -95,6 +109,8 @@ pub struct SpuConfig { pub log: Log, pub peer_max_bytes: u32, + + pub smart_engine: SmartEngineConfig, } impl Default for SpuConfig { @@ -109,6 +125,7 @@ impl Default for SpuConfig { sc_retry_ms: SPU_RETRY_SC_TIMEOUT_MS, log: Log::default(), peer_max_bytes: fluvio_storage::FileReplica::PREFER_MAX_LEN, + smart_engine: SmartEngineConfig::default(), } } } diff --git a/crates/fluvio-spu/src/control_plane/dispatcher.rs b/crates/fluvio-spu/src/control_plane/dispatcher.rs index d4bf49752b..947eebf8c1 100644 --- a/crates/fluvio-spu/src/control_plane/dispatcher.rs +++ b/crates/fluvio-spu/src/control_plane/dispatcher.rs @@ -285,11 +285,11 @@ impl ScDispatcher { ReplicaChange::Remove(remove) => { let message = RequestMessage::new_request(remove); if let Err(err) = sc_sink.send_request(&message).await { - error!("error sending back to sc {}", err); + error!("error sending back to sc {err:#?}"); } } ReplicaChange::StorageError(err) => { - error!("error storage {}", err); + error!("error storage {err:#?}"); } } } diff --git a/crates/fluvio-spu/src/services/public/produce_handler.rs b/crates/fluvio-spu/src/services/public/produce_handler.rs index 1277575547..0d283658b5 100644 --- a/crates/fluvio-spu/src/services/public/produce_handler.rs +++ b/crates/fluvio-spu/src/services/public/produce_handler.rs @@ -7,7 +7,7 @@ use tracing::{debug, trace, error}; use tracing::instrument; use anyhow::{anyhow, Result}; -use fluvio_smartengine::SmartModuleChainInstance; +use fluvio_smartengine::{SmartModuleChainInstance, EngineError}; use fluvio_protocol::api::{RequestKind, RequestHeader}; use fluvio_spu_schema::Isolation; use fluvio_protocol::record::{BatchRecords, Offset, Batch, RawRecords}; @@ -29,6 +29,7 @@ use fluvio_future::timer::sleep; 
use crate::core::DefaultSharedGlobalContext; use crate::replication::leader::SharedFileLeaderState; use crate::smartengine::context::SmartModuleContext; +use crate::smartengine::map_engine_error; use crate::smartengine::produce_batch::ProduceBatchIterator; use crate::smartengine::batch::process_batch; use crate::traffic::TrafficType; @@ -190,16 +191,22 @@ async fn handle_produce_partition( PartitionWriteResult::ok(replica_id, base_offset, leo) } - Err(err) => match err.downcast_ref::() { - Some(StorageError::BatchTooBig(_)) => { - error!(%replica_id, "Batch is too big: {:#?}", err); - PartitionWriteResult::error(replica_id, ErrorCode::MessageTooLarge) - } - _ => { - error!(%replica_id, "Error writing to replica: {:#?}", err); - PartitionWriteResult::error(replica_id, ErrorCode::StorageError) + Err(err) => { + if let Some(engine_err) = err.downcast_ref::() { + error!(%replica_id, "Replica SmartEngine error: {:#?}", engine_err); + return PartitionWriteResult::error(replica_id, map_engine_error(engine_err)); + }; + match err.downcast_ref::() { + Some(StorageError::BatchTooBig(_)) => { + error!(%replica_id, "Batch is too big: {:#?}", err); + PartitionWriteResult::error(replica_id, ErrorCode::MessageTooLarge) + } + _ => { + error!(%replica_id, "Error writing to replica: {:#?}", err); + PartitionWriteResult::error(replica_id, ErrorCode::StorageError) + } } - }, + } } } diff --git a/crates/fluvio-spu/src/services/public/tests/produce.rs b/crates/fluvio-spu/src/services/public/tests/produce.rs index be079fc9eb..2139a0d97d 100644 --- a/crates/fluvio-spu/src/services/public/tests/produce.rs +++ b/crates/fluvio-spu/src/services/public/tests/produce.rs @@ -29,7 +29,9 @@ use crate::{ core::GlobalContext, services::public::{ create_public_server, - tests::{create_filter_records, vec_to_raw_batch, load_wasm_module}, + tests::{ + create_filter_records, vec_to_raw_batch, load_wasm_module, create_filter_raw_records, + }, }, replication::leader::LeaderReplicaState, 
smartengine::file_batch::FileBatchIterator, @@ -825,7 +827,7 @@ async fn test_produce_basic_with_smartmodule_with_lookback() { assert_eq!(produce_response.responses[0].partitions.len(), 1); assert_eq!( produce_response.responses[0].partitions[0].error_code, - ErrorCode::SmartModuleLookBackError("error in look_back chain: invalid digit found in string\n\nSmartModule Lookback Error: \n Offset: 0\n Key: NULL\n Value: wrong last record".to_string()) + ErrorCode::SmartModuleLookBackError("invalid digit found in string\n\nSmartModule Lookback Error: \n Offset: 0\n Key: NULL\n Value: wrong last record".to_string()) ); } @@ -1023,6 +1025,144 @@ async fn test_produce_with_deduplication() { debug!("terminated controller"); } +#[fluvio_future::test(ignore)] +async fn test_produce_smart_engine_memory_overfow() { + let test_path = temp_dir().join("test_produce_smart_engine_memory_overfow"); + ensure_clean_dir(&test_path); + let port = portpicker::pick_unused_port().expect("No free ports left"); + + let addr = format!("127.0.0.1:{port}"); + let mut spu_config = SpuConfig::default(); + let max_memory_size = 1179648; + spu_config.smart_engine.store_max_memory = max_memory_size; + spu_config.log.base_dir = test_path; + let ctx = GlobalContext::new_shared_context(spu_config); + load_wasm_module(&ctx, FLUVIO_WASM_DEDUPLICATION_FILTER); + + let server_end_event = create_public_server(addr.to_owned(), ctx.clone()).run(); + + // wait for stream controller async to start + sleep(Duration::from_millis(100)).await; + + let client_socket = + MultiplexerSocket::new(FluvioSocket::connect(&addr).await.expect("connect")); + + let deduplication = Deduplication { + bounds: Bounds { + count: 6, + age: None, + }, + filter: Filter { + transform: Transform { + uses: FLUVIO_WASM_DEDUPLICATION_FILTER.to_owned(), + with: Default::default(), + }, + }, + }; + let topic = "test_produce_with_deduplication"; + let mut test = Replica::new((topic, 0), 5001, vec![5001]); + test.deduplication = 
Some(deduplication); + let test_id = test.id.clone(); + ctx.replica_localstore().sync_all(vec![test.clone()]); + + let replica = LeaderReplicaState::create(test.clone(), ctx.config(), ctx.status_update_owned()) + .await + .expect("replica") + .init(&ctx) + .await + .expect("init succeeded"); + + ctx.leaders_state() + .insert(test_id.clone(), replica.clone()) + .await; + + { + // producing a large batch must exceed the smart engine memory limit + let records = create_filter_raw_records(1000); + + let mut produce_request: DefaultProduceRequest = Default::default(); + + let partition_produce = DefaultPartitionRequest { + partition_index: 0, + records, + }; + let topic_produce_request = TopicProduceData { + name: topic.to_owned(), + partitions: vec![partition_produce], + ..Default::default() + }; + + produce_request.topics.push(topic_produce_request); + + let produce_response = client_socket + .send_and_receive(RequestMessage::new_request(produce_request)) + .await + .expect("send offset"); + + // Check that the memory limit error is reported for the partition + assert_eq!(produce_response.responses.len(), 1); + assert_eq!(produce_response.responses[0].partitions.len(), 1); + assert!( + matches!(produce_response.responses[0].partitions[0].error_code, ErrorCode::SmartModuleMemoryLimitExceeded { requested: _, max } if max == max_memory_size as u64) + ); + } + + server_end_event.notify(); + debug!("terminated controller"); +} + +#[fluvio_future::test(ignore)] +async fn test_dedup_init_smart_engine_memory_overfow() { + let test_path = temp_dir().join("test_dedup_init_smart_engine_memory_overfow"); + ensure_clean_dir(&test_path); + let port = portpicker::pick_unused_port().expect("No free ports left"); + + let addr = format!("127.0.0.1:{port}"); + let mut spu_config = SpuConfig::default(); + let max_memory_size = 1024; + spu_config.smart_engine.store_max_memory = max_memory_size; + spu_config.log.base_dir = test_path; + let ctx = GlobalContext::new_shared_context(spu_config); + load_wasm_module(&ctx, FLUVIO_WASM_DEDUPLICATION_FILTER); + 
+ let server_end_event = create_public_server(addr.to_owned(), ctx.clone()).run(); + + // wait for stream controller async to start + sleep(Duration::from_millis(100)).await; + + let deduplication = Deduplication { + bounds: Bounds { + count: 6, + age: None, + }, + filter: Filter { + transform: Transform { + uses: FLUVIO_WASM_DEDUPLICATION_FILTER.to_owned(), + with: Default::default(), + }, + }, + }; + let topic = "test_produce_with_deduplication"; + let mut test = Replica::new((topic, 0), 5001, vec![5001]); + test.deduplication = Some(deduplication); + ctx.replica_localstore().sync_all(vec![test.clone()]); + + let init_res: Result, anyhow::Error> = + LeaderReplicaState::create(test.clone(), ctx.config(), ctx.status_update_owned()) + .await + .expect("replica") + .init(&ctx) + .await; + + assert!(init_res.is_err()); + assert!( + matches!(init_res.unwrap_err().downcast::(), Ok(ErrorCode::SmartModuleMemoryLimitExceeded { requested: _, max }) if max == max_memory_size as u64) + ); + + server_end_event.notify(); + debug!("terminated controller"); +} + async fn read_records(replica: &LeaderReplicaState) -> Vec { let slice = replica .read_records(0i64, u32::MAX, Isolation::ReadUncommitted) diff --git a/crates/fluvio-spu/src/services/public/tests/stream_fetch.rs b/crates/fluvio-spu/src/services/public/tests/stream_fetch.rs index ff8cd2b67e..f54234d820 100644 --- a/crates/fluvio-spu/src/services/public/tests/stream_fetch.rs +++ b/crates/fluvio-spu/src/services/public/tests/stream_fetch.rs @@ -2636,7 +2636,7 @@ async fn stream_fetch_filter_lookback( assert_eq!(partition.records.batches.len(), 0); assert_eq!( partition.error_code, - ErrorCode::SmartModuleLookBackError("error in look_back chain: invalid digit found in string\n\nSmartModule Lookback Error: \n Offset: 0\n Key: NULL\n Value: wrong record".to_string()) + ErrorCode::SmartModuleLookBackError("invalid digit found in string\n\nSmartModule Lookback Error: \n Offset: 0\n Key: NULL\n Value: wrong record".to_string()) ); 
} diff --git a/crates/fluvio-spu/src/smartengine/chain.rs b/crates/fluvio-spu/src/smartengine/chain.rs index cb5274086b..890b189291 100644 --- a/crates/fluvio-spu/src/smartengine/chain.rs +++ b/crates/fluvio-spu/src/smartengine/chain.rs @@ -2,18 +2,18 @@ use tracing::{debug, error}; use fluvio_protocol::link::ErrorCode; use fluvio_smartengine::{ SmartEngine, SmartModuleChainBuilder, SmartModuleChainInstance, SmartModuleConfig, - SmartModuleInitialData, + SmartModuleInitialData, EngineError, }; use fluvio_spu_schema::server::smartmodule::{ SmartModuleContextData, SmartModuleInvocation, SmartModuleKind, }; pub(crate) fn build_chain( + mut chain_builder: SmartModuleChainBuilder, invocations: Vec, version: i16, engine: SmartEngine, ) -> Result { - let mut chain_builder = SmartModuleChainBuilder::default(); for invocation in invocations { let raw = invocation .wasm @@ -54,11 +54,18 @@ pub(crate) fn build_chain( } let chain = chain_builder.initialize(&engine).map_err(|err| { - error!( - error = err.to_string().as_str(), - "Error Initializing SmartModule" - ); - ErrorCode::SmartModuleChainInitError(err.to_string()) + error!("Error Initializing SmartModule chain: {err:#?}"); + match err.downcast_ref() { + Some(EngineError::StoreMemoryExceeded { + current: _, + requested, + max, + }) => ErrorCode::SmartModuleMemoryLimitExceeded { + requested: *requested as u64, + max: *max as u64, + }, + _ => ErrorCode::SmartModuleChainInitError(err.to_string()), + } })?; Ok(chain) } diff --git a/crates/fluvio-spu/src/smartengine/context.rs b/crates/fluvio-spu/src/smartengine/context.rs index e7bc04d9ee..31c27265e6 100644 --- a/crates/fluvio-spu/src/smartengine/context.rs +++ b/crates/fluvio-spu/src/smartengine/context.rs @@ -3,13 +3,14 @@ use std::time::Duration; use async_rwlock::RwLock; use chrono::Utc; +use fluvio::SmartModuleChainBuilder; use fluvio_smartengine::{SmartModuleChainInstance, Version, Lookback}; use fluvio_protocol::link::ErrorCode; use fluvio_smartmodule::Record; use 
fluvio_spu_schema::server::smartmodule::{SmartModuleInvocation, SmartModuleInvocationWasm}; use fluvio_storage::ReplicaStorage; use fluvio_types::Timestamp; -use tracing::{debug, trace}; +use tracing::{debug, trace, error}; use crate::core::GlobalContext; use crate::core::metrics::SpuMetrics; @@ -54,7 +55,8 @@ impl SmartModuleContext { ) .await .map_err(|err| { - ErrorCode::SmartModuleLookBackError(format!("error in look_back chain: {err}")) + error!("look_back chain error: {err:#}"); + ErrorCode::SmartModuleLookBackError(err.root_cause().to_string()) }) } @@ -72,9 +74,18 @@ impl SmartModuleContext { for invocation in invocations { fetched_invocations.push(resolve_invocation(invocation, ctx)?) } + let mut chain_builder = SmartModuleChainBuilder::default(); + chain_builder.set_store_memory_limit(ctx.config().smart_engine.store_max_memory); + + let chain = chain::build_chain( + chain_builder, + fetched_invocations, + version, + ctx.smartengine_owned(), + )?; Ok(Some(Self { - chain: chain::build_chain(fetched_invocations, version, ctx.smartengine_owned())?, + chain, version, spu_metrics: ctx.metrics(), })) diff --git a/crates/fluvio-spu/src/smartengine/mod.rs b/crates/fluvio-spu/src/smartengine/mod.rs index f79c1233b8..b916144782 100644 --- a/crates/fluvio-spu/src/smartengine/mod.rs +++ b/crates/fluvio-spu/src/smartengine/mod.rs @@ -4,6 +4,8 @@ use fluvio::{ SmartModuleInvocation, SmartModuleInvocationWasm, SmartModuleKind, SmartModuleExtraParams, }; use fluvio_controlplane_metadata::topic::Deduplication; +use fluvio_protocol::link::ErrorCode; +use fluvio_smartengine::EngineError; use fluvio_smartmodule::dataplane::smartmodule::Lookback; pub(crate) mod batch; @@ -32,6 +34,21 @@ pub(crate) fn dedup_to_invocation(dedup: &Deduplication) -> SmartModuleInvocatio } } +pub(crate) fn map_engine_error(err: &EngineError) -> ErrorCode { + match err { + EngineError::UnknownSmartModule => ErrorCode::Other("Unknown SmartModule type".to_string()), + EngineError::Instantiate(err) => 
ErrorCode::Other(err.to_string()), + EngineError::StoreMemoryExceeded { + current: _, + requested, + max, + } => ErrorCode::SmartModuleMemoryLimitExceeded { + requested: *requested as u64, + max: *max as u64, + }, + } +} + #[cfg(test)] mod tests { use std::time::Duration; diff --git a/crates/fluvio-types/Cargo.toml b/crates/fluvio-types/Cargo.toml index 30ce7095aa..3887b1b8fb 100644 --- a/crates/fluvio-types/Cargo.toml +++ b/crates/fluvio-types/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "fluvio-types" -version = "0.4.2" +version = "0.4.3" authors = ["Fluvio Contributors "] edition = "2021" description = "Fluvio common types and objects" diff --git a/crates/fluvio-types/src/defaults.rs b/crates/fluvio-types/src/defaults.rs index 0c887b756e..944f7016b3 100644 --- a/crates/fluvio-types/src/defaults.rs +++ b/crates/fluvio-types/src/defaults.rs @@ -51,6 +51,8 @@ pub const STORAGE_FLUSH_WRITE_COUNT: u32 = 1; pub const STORAGE_FLUSH_IDLE_MSEC: u32 = 0; pub const STORAGE_MAX_BATCH_SIZE: u32 = 33_554_432; +pub const SPU_SMARTENGINE_STORE_MAX_BYTES: usize = 1_073_741_824; // 1 GiB + // CLI config pub const CLI_PROFILES_DIR: &str = "profiles"; pub const CLI_DEFAULT_PROFILE: &str = "default";