Skip to content

Commit

Permalink
feat: Expose concurrency parameter to the Lake structure (and builder) (
Browse files Browse the repository at this point in the history
khorolets authored Apr 27, 2023

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature. The key has expired.
1 parent 4bda12f commit d1accc6
Showing 2 changed files with 14 additions and 1 deletion.
5 changes: 4 additions & 1 deletion lake-framework/src/lib.rs
Original file line number Diff line number Diff line change
@@ -38,6 +38,9 @@ impl types::Lake {
let runtime = tokio::runtime::Runtime::new()?;

runtime.block_on(async {
// capture the concurrency value before it moves into the streamer
let concurrency = self.concurrency;

// instantiate the NEAR Lake Framework Stream
let (sender, stream) = streamer::streamer(self);

@@ -49,7 +52,7 @@ impl types::Lake {
let block: near_lake_primitives::block::Block = streamer_message.into();
f(block, context).await
})
.buffer_unordered(1usize);
.buffer_unordered(concurrency);

while let Some(_handle_message) = handlers.next().await {}
drop(handlers); // close the channel so the sender will stop
10 changes: 10 additions & 0 deletions lake-framework/src/types.rs
Original file line number Diff line number Diff line change
@@ -50,8 +50,18 @@ pub struct Lake {
/// ```
#[builder(setter(strip_option), default)]
pub(crate) s3_config: Option<aws_sdk_s3::config::Config>,
/// Defines how many *block heights* Lake Framework will try to preload into memory to avoid S3 `List` requests.
/// Default: 100
///
/// *Note*: This value is not the number of blocks to preload, but the number of block heights.
/// Also, this value doesn't affect your indexer much if it follows the tip of the network.
/// This parameter is useful for historical indexing.
#[builder(default = "100")]
pub(crate) blocks_preload_pool_size: usize,
/// Number of concurrent blocks to process. Default: 1
/// **WARNING**: Increase this value only if your block handling logic doesn't have to rely on previous blocks and can be processed in parallel
#[builder(default = "1")]
pub(crate) concurrency: usize,
}

impl LakeBuilder {

0 comments on commit d1accc6

Please sign in to comment.