Skip to content

Commit

Permalink
feat: LEP-001 Introduce Lake Context (#68)
Browse files Browse the repository at this point in the history
* feat: Implement LEP-001 (Context)

* clean up Lake impl to reuse code

* Update docstrings

* refactor: Resolves #42 replace anyhow with LakeError enum (thiserror) (#70)

* fix: typo in README
  • Loading branch information
khorolets authored Apr 28, 2023
1 parent d1accc6 commit 78849f3
Show file tree
Hide file tree
Showing 11 changed files with 222 additions and 64 deletions.
3 changes: 2 additions & 1 deletion lake-framework/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ version = "0.0.0" # managed by cargo-workspaces
edition = "2021"

[dependencies]
anyhow = "1.0.51"
aws-config = "0.53.0"
aws-types = "0.53.0"
aws-credential-types = "0.53.0"
Expand All @@ -24,6 +23,8 @@ near-lake-primitives = { path = "../lake-primitives" }

[dev-dependencies]
aws-smithy-http = "0.53.0"
# use by examples
anyhow = "1.0.51"

# used by nft_indexer example
regex = "1.5.4"
Expand Down
45 changes: 43 additions & 2 deletions lake-framework/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,14 @@ fn main() -> anyhow::Result<()> {
.testnet()
.start_block_height(112205773)
.build()?
.run(handle_block)
.run(handle_block)?;
Ok(())
}
// The handler function to take the `Block`
// and print the block height
async fn handle_block(
block: near_lake_primitives::block::Block,
_context: near_lake_framework::LakeContext,
) -> anyhow::Result<()> {
eprintln!(
"Block #{}",
Expand All @@ -29,13 +29,54 @@ async fn handle_block(
}
```

### Pass the context to the function

```no_run
struct MyContext {
my_field: String
}
fn main() -> anyhow::Result<()> {
let context = MyContext {
my_field: "My value".to_string(),
};
near_lake_framework::LakeBuilder::default()
.testnet()
.start_block_height(112205773)
.build()?
.run_with_context(handle_block, &context)?;
Ok(())
}
// The handler function to take the `Block`
// and print the block height
async fn handle_block(
block: near_lake_primitives::block::Block,
context: &MyContext,
) -> anyhow::Result<()> {
eprintln!(
"Block #{} / {}",
block.block_height(),
context.my_field,
);
# Ok(())
}
```

## Tutorials:

- <https://youtu.be/GsF7I93K-EQ>
- [Migrating to NEAR Lake Framework](https://near-indexers.io/tutorials/lake/migrating-to-near-lake-framework) from [NEAR Indexer Framework](https://near-indexers.io/docs/projects/near-indexer-framework)

### More examples

You might want to have a look at the always up-to-date examples in [`examples`](https://github.com/near/near-lake-framework-rs/tree/main/lake-framework/examples) folder.

Other examples that we try to keep up-to-date but we might fail sometimes:

- <https://github.com/near-examples/near-lake-raw-printer> simple example of a data printer built on top of NEAR Lake Framework
- <https://github.com/near-examples/near-lake-accounts-watcher> another simple example of the indexer built on top of NEAR Lake Framework for a tutorial purpose

Expand Down
5 changes: 3 additions & 2 deletions lake-framework/examples/actions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,13 @@ fn main() -> anyhow::Result<()> {
.mainnet()
.start_block_height(88444526)
.build()?
.run(print_function_calls_to_my_account) // developer-defined async function that handles each block
// developer-defined async function that handles each block
.run(print_function_calls_to_my_account)?;
Ok(())
}

async fn print_function_calls_to_my_account(
mut block: near_lake_primitives::block::Block,
_ctx: near_lake_framework::LakeContext,
) -> anyhow::Result<()> {
let block_height = block.block_height();
let actions: Vec<&near_lake_primitives::actions::FunctionCall> = block
Expand Down
8 changes: 3 additions & 5 deletions lake-framework/examples/nft_indexer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,11 @@ fn main() -> anyhow::Result<()> {
.testnet()
.start_block_height(112205773)
.build()?
.run(handle_block) // developer-defined async function that handles each block
.run(handle_block)?; // developer-defined async function that handles each block
Ok(())
}

async fn handle_block(
mut block: near_lake_primitives::block::Block,
_ctx: near_lake_framework::LakeContext,
) -> anyhow::Result<()> {
async fn handle_block(mut block: near_lake_primitives::block::Block) -> anyhow::Result<()> {
// Indexing lines START
let nfts: Vec<NFTReceipt> = block
.events() // fetching all the events that occurred in the block
Expand Down
8 changes: 3 additions & 5 deletions lake-framework/examples/simple.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,11 @@ fn main() -> anyhow::Result<()> {
.testnet()
.start_block_height(112205773)
.build()?
.run(handle_block) // developer-defined async function that handles each block
.run(handle_block)?; // developer-defined async function that handles each block
Ok(())
}

async fn handle_block(
block: near_lake_primitives::block::Block,
_ctx: near_lake_framework::LakeContext,
) -> anyhow::Result<()> {
async fn handle_block(block: near_lake_primitives::block::Block) -> anyhow::Result<()> {
println!("Block {:?}", block.block_height());

Ok(())
Expand Down
73 changes: 73 additions & 0 deletions lake-framework/examples/with_context.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
//! This example show how to use a context with Lake Framework.
//! It is going to follow the NEAR Social contract and the block height along
//! with a number of calls to the contract.
use std::io::Write;

use near_lake_framework::near_lake_primitives;
// We need to import this trait to use the `as_function_call` method.
use near_lake_primitives::actions::ActionMetaDataExt;

const CONTRACT_ID: &str = "social.near";

#[derive(Clone)]
struct FileContext {
path: std::path::PathBuf,
}

impl FileContext {
fn new(path: impl Into<std::path::PathBuf>) -> Self {
Self { path: path.into() }
}

// append to the file
pub fn write(&self, value: &str) -> anyhow::Result<()> {
let mut file = std::fs::OpenOptions::new()
.create(true)
.append(true)
.open(&self.path)?;
file.write_all(value.as_bytes())?;
Ok(())
}
}

fn main() -> anyhow::Result<()> {
println!("Starting...");
// Create the context
let context = FileContext::new("./output.txt");
// Lake Framework start boilerplate
near_lake_framework::LakeBuilder::default()
.mainnet()
.start_block_height(88444526)
.build()?
// developer-defined async function that handles each block
.run_with_context(print_function_calls_to_my_account, &context)?;
Ok(())
}

async fn print_function_calls_to_my_account(
mut block: near_lake_primitives::block::Block,
ctx: &FileContext,
) -> anyhow::Result<()> {
let block_height = block.block_height();
let actions: Vec<&near_lake_primitives::actions::FunctionCall> = block
.actions()
.filter(|action| action.receiver_id().as_str() == CONTRACT_ID)
.filter_map(|action| action.as_function_call())
.collect();

if !actions.is_empty() {
// Here's the usage of the context.
ctx.write(
format!(
"Block #{} - {} calls to {}\n",
block_height,
actions.len(),
CONTRACT_ID
)
.as_str(),
)?;
println!("Block #{:?}\n{:#?}", block_height, actions);
}

Ok(())
}
86 changes: 59 additions & 27 deletions lake-framework/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,62 +7,94 @@ use futures::{Future, StreamExt};
pub use near_lake_primitives::{self, near_indexer_primitives, LakeContext};

pub use aws_credential_types::Credentials;
pub use types::{Lake, LakeBuilder};
pub use types::{Lake, LakeBuilder, LakeError};

mod s3_fetchers;
mod streamer;
pub(crate) mod types;

pub(crate) const LAKE_FRAMEWORK: &str = "near_lake_framework";

/// Creates `mpsc::channel` and returns the `receiver` to read the stream of `StreamerMessage`
///```no_run
///# fn main() -> anyhow::Result<()> {
/// near_lake_framework::LakeBuilder::default()
/// .testnet()
/// .start_block_height(112205773)
/// .build()?
/// .run(handle_block)
///# }
///
/// # async fn handle_block(_block: near_lake_primitives::block::Block, _context: near_lake_framework::LakeContext) -> anyhow::Result<()> { Ok(()) }
///```
impl types::Lake {
pub fn run<Fut>(
/// Creates `mpsc::channel` and returns the `receiver` to read the stream of `StreamerMessage`
///```no_run
/// struct MyContext {
/// my_field: String,
/// }
///# fn main() -> anyhow::Result<()> {
///
/// let context = MyContext {
/// my_field: "my_value".to_string(),
/// };
///
/// near_lake_framework::LakeBuilder::default()
/// .testnet()
/// .start_block_height(112205773)
/// .build()?
/// .run_with_context(handle_block, &context)?;
/// Ok(())
///# }
///
/// # async fn handle_block(_block: near_lake_primitives::block::Block, context: &MyContext) -> anyhow::Result<()> { Ok(()) }
///```
pub fn run_with_context<'context, C, E, Fut>(
self,
f: impl Fn(near_lake_primitives::block::Block, near_lake_primitives::LakeContext) -> Fut,
) -> anyhow::Result<()>
f: impl Fn(near_lake_primitives::block::Block, &'context C) -> Fut,
context: &'context C,
) -> Result<(), LakeError>
where
Fut: Future<Output = anyhow::Result<()>>,
Fut: Future<Output = Result<(), E>>,
E: Into<Box<dyn std::error::Error>>,
{
let runtime = tokio::runtime::Runtime::new()?;

runtime.block_on(async {
// capture the concurrency value before it moves into the streamer
let concurrency = self.concurrency;
let runtime = tokio::runtime::Runtime::new()
.map_err(|err| LakeError::RuntimeStartError { error: err })?;

runtime.block_on(async move {
// instantiate the NEAR Lake Framework Stream
let (sender, stream) = streamer::streamer(self);

// read the stream events and pass them to a handler function with
// concurrency 1
let mut handlers = tokio_stream::wrappers::ReceiverStream::new(stream)
.map(|streamer_message| async {
let context = LakeContext {};
let block: near_lake_primitives::block::Block = streamer_message.into();
f(block, context).await
f(block, &context).await
})
.buffer_unordered(concurrency);
.buffer_unordered(1usize);

while let Some(_handle_message) = handlers.next().await {}
drop(handlers); // close the channel so the sender will stop

// propagate errors from the sender
match sender.await {
Ok(Ok(())) => Ok(()),
Ok(Err(e)) => Err(e),
Err(e) => Err(anyhow::Error::from(e)), // JoinError
Ok(Err(err)) => Err(err),
Err(err) => Err(err.into()), // JoinError
}
})
}

/// Creates `mpsc::channel` and returns the `receiver` to read the stream of `StreamerMessage`
///```no_run
///# fn main() -> anyhow::Result<()> {
/// near_lake_framework::LakeBuilder::default()
/// .testnet()
/// .start_block_height(112205773)
/// .build()?
/// .run(handle_block)?;
/// Ok(())
///# }
///
/// # async fn handle_block(_block: near_lake_primitives::block::Block) -> anyhow::Result<()> { Ok(()) }
///```
pub fn run<Fut, E>(
self,
f: impl Fn(near_lake_primitives::block::Block) -> Fut,
) -> Result<(), LakeError>
where
Fut: Future<Output = Result<(), E>>,
E: Into<Box<dyn std::error::Error>>,
{
self.run_with_context(|block, _context| f(block), &())
}
}
15 changes: 3 additions & 12 deletions lake-framework/src/s3_fetchers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,10 +77,7 @@ pub(crate) async fn list_block_heights(
lake_s3_client: &impl S3Client,
s3_bucket_name: &str,
start_from_block_height: crate::types::BlockHeight,
) -> Result<
Vec<crate::types::BlockHeight>,
crate::types::LakeError<aws_sdk_s3::error::ListObjectsV2Error>,
> {
) -> Result<Vec<crate::types::BlockHeight>, crate::types::LakeError> {
tracing::debug!(
target: crate::LAKE_FRAMEWORK,
"Fetching block heights from S3, after #{}...",
Expand Down Expand Up @@ -117,10 +114,7 @@ pub(crate) async fn fetch_streamer_message(
lake_s3_client: &impl S3Client,
s3_bucket_name: &str,
block_height: crate::types::BlockHeight,
) -> Result<
near_lake_primitives::StreamerMessage,
crate::types::LakeError<aws_sdk_s3::error::GetObjectError>,
> {
) -> Result<near_lake_primitives::StreamerMessage, crate::types::LakeError> {
let block_view = {
let body_bytes = loop {
match lake_s3_client
Expand Down Expand Up @@ -177,10 +171,7 @@ async fn fetch_shard_or_retry(
s3_bucket_name: &str,
block_height: crate::types::BlockHeight,
shard_id: u64,
) -> Result<
near_lake_primitives::IndexerShard,
crate::types::LakeError<aws_sdk_s3::error::GetObjectError>,
> {
) -> Result<near_lake_primitives::IndexerShard, crate::types::LakeError> {
let body_bytes = loop {
match lake_s3_client
.get_object(
Expand Down
Loading

0 comments on commit 78849f3

Please sign in to comment.