diff --git a/lake-framework/src/lib.rs b/lake-framework/src/lib.rs index 235d24d..61ce2cc 100644 --- a/lake-framework/src/lib.rs +++ b/lake-framework/src/lib.rs @@ -38,6 +38,9 @@ impl types::Lake { let runtime = tokio::runtime::Runtime::new()?; runtime.block_on(async { + // capture the concurrency value before it moves into the streamer + let concurrency = self.concurrency; + // instantiate the NEAR Lake Framework Stream let (sender, stream) = streamer::streamer(self); @@ -49,7 +52,7 @@ impl types::Lake { let block: near_lake_primitives::block::Block = streamer_message.into(); f(block, context).await }) - .buffer_unordered(1usize); + .buffer_unordered(concurrency); while let Some(_handle_message) = handlers.next().await {} drop(handlers); // close the channel so the sender will stop diff --git a/lake-framework/src/types.rs b/lake-framework/src/types.rs index 3ab147e..03d1ee9 100644 --- a/lake-framework/src/types.rs +++ b/lake-framework/src/types.rs @@ -50,8 +50,18 @@ pub struct Lake { /// ``` #[builder(setter(strip_option), default)] pub(crate) s3_config: Option, + /// Defines how many *block heights* Lake Framework will try to preload into memory to avoid S3 `List` requests. + /// Default: 100 + /// + /// *Note*: This value is not the number of blocks to preload, but the number of block heights. + /// Also, this value doesn't affect your indexer much if it follows the tip of the network. + /// This parameter is useful for historical indexing. #[builder(default = "100")] pub(crate) blocks_preload_pool_size: usize, + /// Number of concurrent blocks to process. Default: 1 + /// **WARNING**: Increase this value only if your block handling logic doesn't have to rely on previous blocks and can be processed in parallel + #[builder(default = "1")] + pub(crate) concurrency: usize, } impl LakeBuilder {