Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add async run_with_context #107

Open
wants to merge 1 commit into
base: node/1.40
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 35 additions & 25 deletions lake-framework/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,39 +57,49 @@ impl types::Lake {
let runtime = tokio::runtime::Runtime::new()
.map_err(|err| LakeError::RuntimeStartError { error: err })?;

runtime.block_on(async move {
// capture the concurrency value before it moves into the streamer
let concurrency = self.concurrency;
runtime.block_on(async move { self.run_with_context_async(f, context).await })
}

// instantiate the NEAR Lake Framework Stream
let (sender, stream) = streamer::streamer(self);
pub async fn run_with_context_async<'context, C: LakeContextExt, E, Fut>(
self,
f: impl Fn(near_lake_primitives::block::Block, &'context C) -> Fut,
context: &'context C,
) -> Result<(), LakeError>
where
Fut: Future<Output = Result<(), E>>,
E: Into<Box<dyn std::error::Error>>,
{
// capture the concurrency value before it moves into the streamer
let concurrency = self.concurrency;

// read the stream events and pass them to a handler function with
// concurrency 1
let mut handlers = tokio_stream::wrappers::ReceiverStream::new(stream)
.map(|streamer_message| async {
let mut block: near_lake_primitives::block::Block = streamer_message.into();
// instantiate the NEAR Lake Framework Stream
let (sender, stream) = streamer::streamer(self);

context.execute_before_run(&mut block);
// read the stream events and pass them to a handler function with
// concurrency 1
let mut handlers = tokio_stream::wrappers::ReceiverStream::new(stream)
.map(|streamer_message| async {
let mut block: near_lake_primitives::block::Block = streamer_message.into();

let user_indexer_function_execution_result = f(block, context).await;
context.execute_before_run(&mut block);

context.execute_after_run();
let user_indexer_function_execution_result = f(block, context).await;

user_indexer_function_execution_result
})
.buffer_unordered(concurrency);
context.execute_after_run();

while let Some(_handle_message) = handlers.next().await {}
drop(handlers); // close the channel so the sender will stop
user_indexer_function_execution_result
})
.buffer_unordered(concurrency);

// propagate errors from the sender
match sender.await {
Ok(Ok(())) => Ok(()),
Ok(Err(err)) => Err(err),
Err(err) => Err(err.into()), // JoinError
}
})
while let Some(_handle_message) = handlers.next().await {}
drop(handlers); // close the channel so the sender will stop

// propagate errors from the sender
match sender.await {
Ok(Ok(())) => Ok(()),
Ok(Err(err)) => Err(err),
Err(err) => Err(err.into()), // JoinError
}
}

/// Creates `mpsc::channel` and returns the `receiver` to read the stream of `StreamerMessage`
Expand Down