diff --git a/Cargo.toml b/Cargo.toml
index c15e293..f043672 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -8,7 +8,7 @@ members = [
 
 # cargo-workspaces
 [workspace.package]
-version = "0.8.0"
+version = "0.8.0-beta.1"
 license = "MIT OR Apache-2.0"
 repository = "https://github.com/near/near-lake-framework"
 description = "Library to connect to the NEAR Lake S3 and stream the data"
diff --git a/blocks/000000879765/block.json b/blocks/000000879765/block.json
deleted file mode 100644
index 1581ebb..0000000
--- a/blocks/000000879765/block.json
+++ /dev/null
@@ -1,63 +0,0 @@
-{
-  "author": "test.near",
-  "header": {
-    "height": 879765,
-    "prev_height": 879764,
-    "epoch_id": "Hp4sw9ZGSceYadnvh7NpYJVVK7rcdir48jfrsxvwKQu9",
-    "next_epoch_id": "4h5mecoLYVFeZxAMAX3Mq3GQfEnuvSAPPo9kEpr4rGUL",
-    "hash": "95K8Je1iAVqieVU8ZuGgSdbvYs8T9rL6ER1XnRekMGbj",
-    "prev_hash": "9Da84RTsubZPcLxzK1K6JkCnDnMn4DxaSRzJPtnYJXUM",
-    "prev_state_root": "6zDM1UGLsZ7HnyUofDrTF73gv5vk2N614ViDkXBkq4ej",
-    "chunk_receipts_root": "9ETNjrt6MkwTgSVMMbpukfxRshSD1avBUUa4R4NuqwHv",
-    "chunk_headers_root": "4otZ2Zj1wANZweh33kWETr3VbF3HwW9zWET4YRYTo2pL",
-    "chunk_tx_root": "9rdfzfYzJMZyaj2yMvjget2ZsPNbZhKqY1qUXc1urDfu",
-    "outcome_root": "7tkzFg8RHBmMw1ncRJZCCZAizgq4rwCftTKYLce8RU8t",
-    "chunks_included": 1,
-    "challenges_root": "11111111111111111111111111111111",
-    "timestamp": 1676913656724153000,
-    "timestamp_nanosec": "1676913656724153000",
-    "random_value": "Au7bq9XzGAhDm2wb4PxbXQnTngzVTcWYa76Govx6n7NK",
-    "validator_proposals": [],
-    "chunk_mask": [
-      true
-    ],
-    "gas_price": "100000000",
-    "block_ordinal": 879714,
-    "rent_paid": "0",
-    "validator_reward": "0",
-    "total_supply": "2085303629225498163419972383984892",
-    "challenges_result": [],
-    "last_final_block": "BS9QJenf3N9pKy8PZ5xRuowZi9X9T4sSDDu4i3i5UJZe",
-    "last_ds_final_block": "9Da84RTsubZPcLxzK1K6JkCnDnMn4DxaSRzJPtnYJXUM",
-    "next_bp_hash": "EtsYQonaJ7n5nRt32XJC5dBxxBxh7a9UVApykmmt8fCQ",
-    "block_merkle_root": "CqRoDd8BR4su7Z8vSfvg45HrugZnwbMbnXHRTWYQkWfZ",
-    "epoch_sync_data_hash": null,
-    "approvals": [
-      "ed25519:3RBQ4PnfBbnDn8WnCScQJH9asjkicuhZZo36aa6FVa2Lbnj531NLiBkTmj8rhg5vfsarmYLgQmcMcXRuJ4jkzKns"
-    ],
-    "signature": "ed25519:2dWsY1QadJyNaVkyga5Wcj9DFRizAyFc9STjyN5Mtxc59ZzNYqML6qQTgtLeCYkpCy1h7kG34jcALTpEDQpkBoKQ",
-    "latest_protocol_version": 59
-  },
-  "chunks": [
-    {
-      "chunk_hash": "7Ewp1AnL6o29UXLW2up9miQBdSaKxCnfRyhMGt9G4epN",
-      "prev_block_hash": "9Da84RTsubZPcLxzK1K6JkCnDnMn4DxaSRzJPtnYJXUM",
-      "outcome_root": "11111111111111111111111111111111",
-      "prev_state_root": "2ViDp7rmam77VmhY5C9KW92a6mgUTCKQ3Scz8tFyH13z",
-      "encoded_merkle_root": "44MrDjQzt1jU5PGUYY69THZ4g3SsfQiNiKKorey3GVtq",
-      "encoded_length": 364,
-      "height_created": 879765,
-      "height_included": 879765,
-      "shard_id": 0,
-      "gas_used": 0,
-      "gas_limit": 1000000000000000,
-      "rent_paid": "0",
-      "validator_reward": "0",
-      "balance_burnt": "0",
-      "outgoing_receipts_root": "H4Rd6SGeEBTbxkitsCdzfu9xL9HtZ2eHoPCQXUeZ6bW4",
-      "tx_root": "GKd8Evs3JdahRpS8q14q6RzzkodzFiSQPcH4yJxs4ZjG",
-      "validator_proposals": [],
-      "signature": "ed25519:2qev3mWQdYLi9aPwCnFHt22GFxhuGTGfnaz3msGcduUdXeycTQDBkY4EyQzpph4frXCybuYHE6g4GFxD2HVmWbJY"
-    }
-  ]
-}
diff --git a/blocks/000000879765/shard_0.json b/blocks/000000879765/shard_0.json
deleted file mode 100644
index 2178ac2..0000000
--- a/blocks/000000879765/shard_0.json
+++ /dev/null
@@ -1,239 +0,0 @@
-{
-  "shard_id": 0,
-  "chunk": {
-    "author": "test.near",
-    "header": {
-      "chunk_hash": "7Ewp1AnL6o29UXLW2up9miQBdSaKxCnfRyhMGt9G4epN",
-      "prev_block_hash": "9Da84RTsubZPcLxzK1K6JkCnDnMn4DxaSRzJPtnYJXUM",
-      "outcome_root": "11111111111111111111111111111111",
-      "prev_state_root": "2ViDp7rmam77VmhY5C9KW92a6mgUTCKQ3Scz8tFyH13z",
-      "encoded_merkle_root": "44MrDjQzt1jU5PGUYY69THZ4g3SsfQiNiKKorey3GVtq",
-      "encoded_length": 364,
-      "height_created": 879765,
-      "height_included": 0,
-      "shard_id": 0,
-      "gas_used": 0,
-      "gas_limit": 1000000000000000,
-      "rent_paid": "0",
-      "validator_reward": "0",
-      "balance_burnt": "0",
-      "outgoing_receipts_root": "H4Rd6SGeEBTbxkitsCdzfu9xL9HtZ2eHoPCQXUeZ6bW4",
-      "tx_root": "GKd8Evs3JdahRpS8q14q6RzzkodzFiSQPcH4yJxs4ZjG",
-      "validator_proposals": [],
-      "signature": "ed25519:2qev3mWQdYLi9aPwCnFHt22GFxhuGTGfnaz3msGcduUdXeycTQDBkY4EyQzpph4frXCybuYHE6g4GFxD2HVmWbJY"
-    },
-    "transactions": [
-      {
-        "transaction": {
-          "signer_id": "test.near",
-          "public_key": "ed25519:8Rn4FJeeRYcrLbcrAQNFVgvbZ2FCEQjgydbXwqBwF1ib",
-          "nonce": 39,
-          "receiver_id": "test.near",
-          "actions": [
-            {
-              "Delegate": {
-                "delegate_action": {
-                  "sender_id": "test.near",
-                  "receiver_id": "test.near",
-                  "actions": [
-                    {
-                      "AddKey": {
-                        "public_key": "ed25519:CnQMksXTTtn81WdDujsEMQgKUMkFvDJaAjDeDLTxVrsg",
-                        "access_key": {
-                          "nonce": 0,
-                          "permission": "FullAccess"
-                        }
-                      }
-                    }
-                  ],
-                  "nonce": 879546,
-                  "max_block_height": 100,
-                  "public_key": "ed25519:8Rn4FJeeRYcrLbcrAQNFVgvbZ2FCEQjgydbXwqBwF1ib"
-                },
-                "signature": "ed25519:25uGrsJNU3fVgUpPad3rGJRy2XQum8gJxLRjKFCbd7gymXwUxQ9r3tuyBCD6To7SX5oSJ2ScJZejwqK1ju8WdZfS"
-              }
-            }
-          ],
-          "signature": "ed25519:3vKF31u2naSjow1uQEfkoWy834fu9xhk66oBfTAYL3XVtJVAf1FREt7owJzwyRrN5F4mtd1rkvv1iTPTL86Szb2j",
-          "hash": "EZnJpyJDnkwnadB1V8PqjVMx7oe2zLhUMtJ8v6EUh1NQ"
-        },
-        "outcome": {
-          "execution_outcome": {
-            "proof": [
-              {
-                "hash": "7kPZTTVYJHvUg4g3S7SFErkKs18Ex1kN4rESnZwtJb2U",
-                "direction": "Right"
-              }
-            ],
-            "block_hash": "95K8Je1iAVqieVU8ZuGgSdbvYs8T9rL6ER1XnRekMGbj",
-            "id": "EZnJpyJDnkwnadB1V8PqjVMx7oe2zLhUMtJ8v6EUh1NQ",
-            "outcome": {
-              "logs": [],
-              "receipt_ids": [
-                "AQDQ9G4QpK7x2inV3GieVEbqeoCGF9nmvrViQ2UgEXDQ"
-              ],
-              "gas_burnt": 409824625000,
-              "tokens_burnt": "40982462500000000000",
-              "executor_id": "test.near",
-              "status": {
-                "SuccessReceiptId": "AQDQ9G4QpK7x2inV3GieVEbqeoCGF9nmvrViQ2UgEXDQ"
-              },
-              "metadata": {
-                "version": 1,
-                "gas_profile": null
-              }
-            }
-          },
-          "receipt": null
-        }
-      }
-    ],
-    "receipts": [
-      {
-        "predecessor_id": "test.near",
-        "receiver_id": "test.near",
-        "receipt_id": "AQDQ9G4QpK7x2inV3GieVEbqeoCGF9nmvrViQ2UgEXDQ",
-        "receipt": {
-          "Action": {
-            "signer_id": "test.near",
-            "signer_public_key": "ed25519:8Rn4FJeeRYcrLbcrAQNFVgvbZ2FCEQjgydbXwqBwF1ib",
-            "gas_price": "100000000",
-            "output_data_receivers": [],
-            "input_data_ids": [],
-            "actions": [
-              {
-                "Delegate": {
-                  "delegate_action": {
-                    "sender_id": "test.near",
-                    "receiver_id": "test.near",
-                    "actions": [
-                      {
-                        "AddKey": {
-                          "public_key": "ed25519:CnQMksXTTtn81WdDujsEMQgKUMkFvDJaAjDeDLTxVrsg",
-                          "access_key": {
-                            "nonce": 0,
-                            "permission": "FullAccess"
-                          }
-                        }
-                      }
-                    ],
-                    "nonce": 879546,
-                    "max_block_height": 100,
-                    "public_key": "ed25519:8Rn4FJeeRYcrLbcrAQNFVgvbZ2FCEQjgydbXwqBwF1ib"
-                  },
-                  "signature": "ed25519:25uGrsJNU3fVgUpPad3rGJRy2XQum8gJxLRjKFCbd7gymXwUxQ9r3tuyBCD6To7SX5oSJ2ScJZejwqK1ju8WdZfS"
-                }
-              }
-            ]
-          }
-        }
-      }
-    ]
-  },
-  "receipt_execution_outcomes": [
-    {
-      "execution_outcome": {
-        "proof": [
-          {
-            "hash": "6vBgNYcwx6pcESfrw5YRBRamatBH8red3GEt3s3ntefm",
-            "direction": "Left"
-          }
-        ],
-        "block_hash": "95K8Je1iAVqieVU8ZuGgSdbvYs8T9rL6ER1XnRekMGbj",
-        "id": "AQDQ9G4QpK7x2inV3GieVEbqeoCGF9nmvrViQ2UgEXDQ",
-        "outcome": {
-          "logs": [],
-          "receipt_ids": [
-            "5rc8UEhD4hmNQ3pJJM5Xc3VHeLXpCQqkA3ep8ag4aaDA"
-          ],
-          "gas_burnt": 308059500000,
-          "tokens_burnt": "30805950000000000000",
-          "executor_id": "test.near",
-          "status": {
-            "Failure": {
-              "ActionError": {
-                "index": 0,
-                "kind": "DelegateActionExpired"
-              }
-            }
-          },
-          "metadata": {
-            "version": 3,
-            "gas_profile": []
-          }
-        }
-      },
-      "receipt": {
-        "predecessor_id": "test.near",
-        "receiver_id": "test.near",
-        "receipt_id": "AQDQ9G4QpK7x2inV3GieVEbqeoCGF9nmvrViQ2UgEXDQ",
-        "receipt": {
-          "Action": {
-            "signer_id": "test.near",
-            "signer_public_key": "ed25519:8Rn4FJeeRYcrLbcrAQNFVgvbZ2FCEQjgydbXwqBwF1ib",
-            "gas_price": "100000000",
-            "output_data_receivers": [],
-            "input_data_ids": [],
-            "actions": [
-              {
-                "Delegate": {
-                  "delegate_action": {
-                    "sender_id": "test.near",
-                    "receiver_id": "test.near",
-                    "actions": [
-                      {
-                        "AddKey": {
-                          "public_key": "ed25519:CnQMksXTTtn81WdDujsEMQgKUMkFvDJaAjDeDLTxVrsg",
-                          "access_key": {
-                            "nonce": 0,
-                            "permission": "FullAccess"
-                          }
-                        }
-                      }
-                    ],
-                    "nonce": 879546,
-                    "max_block_height": 100,
-                    "public_key": "ed25519:8Rn4FJeeRYcrLbcrAQNFVgvbZ2FCEQjgydbXwqBwF1ib"
-                  },
-                  "signature": "ed25519:25uGrsJNU3fVgUpPad3rGJRy2XQum8gJxLRjKFCbd7gymXwUxQ9r3tuyBCD6To7SX5oSJ2ScJZejwqK1ju8WdZfS"
-                }
-              }
-            ]
-          }
-        }
-      }
-    }
-  ],
-  "state_changes": [
-    {
-      "cause": {
-        "type": "transaction_processing",
-        "tx_hash": "EZnJpyJDnkwnadB1V8PqjVMx7oe2zLhUMtJ8v6EUh1NQ"
-      },
-      "type": "account_update",
-      "change": {
-        "account_id": "test.near",
-        "amount": "999999549946933447300000000000000",
-        "locked": "81773107345435833494396250588347",
-        "code_hash": "11111111111111111111111111111111",
-        "storage_usage": 182,
-        "storage_paid_at": 0
-      }
-    },
-    {
-      "cause": {
-        "type": "transaction_processing",
-        "tx_hash": "EZnJpyJDnkwnadB1V8PqjVMx7oe2zLhUMtJ8v6EUh1NQ"
-      },
-      "type": "access_key_update",
-      "change": {
-        "account_id": "test.near",
-        "public_key": "ed25519:8Rn4FJeeRYcrLbcrAQNFVgvbZ2FCEQjgydbXwqBwF1ib",
-        "access_key": {
-          "nonce": 39,
-          "permission": "FullAccess"
-        }
-      }
-    }
-  ]
-}
diff --git a/lake-context-derive/Cargo.toml b/lake-context-derive/Cargo.toml
index eda295d..d70454e 100644
--- a/lake-context-derive/Cargo.toml
+++ b/lake-context-derive/Cargo.toml
@@ -1,12 +1,14 @@
 [package]
 name = "lake-context-derive"
-version.workspace = true
-# version = "0.0.0" # managed by cargo-workspaces
+description = "Derive macro for LakeContext"
 edition = "2021"
+version.workspace = true
+license.workspace = true
+repository.workspace = true
 
 [lib]
 proc-macro = true
 
 [dependencies]
 syn = "2.0"
-quote = "1.0"
\ No newline at end of file
+quote = "1.0"
diff --git a/lake-framework/Cargo.toml b/lake-framework/Cargo.toml
index ff4a14e..912cdc3 100644
--- a/lake-framework/Cargo.toml
+++ b/lake-framework/Cargo.toml
@@ -1,7 +1,10 @@
 [package]
 name = "near-lake-framework"
-version.workspace = true
+description = "Library to connect to the NEAR Lake S3 and stream the data"
 edition = "2021"
+version.workspace = true
+license.workspace = true
+repository.workspace = true
 
 [dependencies]
 aws-config = "0.53.0"
@@ -32,7 +35,7 @@ regex = "1.5.4"
 once_cell = "1.8.0"
 
 # used in the doc examples
-diesel = { version = "2", features= ["postgres_backend", "postgres"] }
+diesel = { version = "2", features = ["postgres_backend", "postgres"] }
 
 # used by with_context_parent_tx_cache example
 lake-parent-transaction-cache = { path = "../lake-parent-transaction-cache" }
diff --git a/lake-parent-transaction-cache/Cargo.toml b/lake-parent-transaction-cache/Cargo.toml
index e7d2170..76f493d 100644
--- a/lake-parent-transaction-cache/Cargo.toml
+++ b/lake-parent-transaction-cache/Cargo.toml
@@ -1,7 +1,10 @@
 [package]
 name = "lake-parent-transaction-cache"
-version.workspace = true
+description = "Ready-to-use context for the Lake Framework in Rust. It provides a cache for keeping the relation between transactions and receipts in cache."
 edition = "2021"
+version.workspace = true
+license.workspace = true
+repository.workspace = true
 
 [dependencies]
 cached = "0.43.0"
@@ -10,4 +13,4 @@ derive_builder = "0.12.0"
 near-lake-framework = { path = "../lake-framework" }
 
 [dev-dependencies]
-anyhow = "1.0.44"
\ No newline at end of file
+anyhow = "1.0.44"
diff --git a/lake-primitives/Cargo.toml b/lake-primitives/Cargo.toml
index c5c776e..bbbb454 100644
--- a/lake-primitives/Cargo.toml
+++ b/lake-primitives/Cargo.toml
@@ -1,7 +1,11 @@
 [package]
 name = "near-lake-primitives"
-version.workspace = true
+description = "Primitives for NEAR Lake"
 edition = "2021"
+version.workspace = true
+license.workspace = true
+repository.workspace = true
+
 
 [dependencies]
 anyhow = "1.0.51"
diff --git a/near-lake-framework/src/s3_fetchers.rs b/near-lake-framework/src/s3_fetchers.rs
deleted file mode 100644
index 6bbac1e..0000000
--- a/near-lake-framework/src/s3_fetchers.rs
+++ /dev/null
@@ -1,357 +0,0 @@
-use async_trait::async_trait;
-use std::str::FromStr;
-
-use aws_sdk_s3::output::{GetObjectOutput, ListObjectsV2Output};
-
-#[async_trait]
-pub trait S3Client {
-    async fn get_object(
-        &self,
-        bucket: &str,
-        prefix: &str,
-    ) -> Result<GetObjectOutput, aws_sdk_s3::types::SdkError<aws_sdk_s3::error::GetObjectError>>;
-
-    async fn list_objects(
-        &self,
-        bucket: &str,
-        start_after: &str,
-    ) -> Result<
-        ListObjectsV2Output,
-        aws_sdk_s3::types::SdkError<aws_sdk_s3::error::ListObjectsV2Error>,
-    >;
-}
-
-#[derive(Clone, Debug)]
-pub struct LakeS3Client {
-    s3: aws_sdk_s3::Client,
-}
-
-impl LakeS3Client {
-    pub fn new(s3: aws_sdk_s3::Client) -> Self {
-        Self { s3 }
-    }
-}
-
-#[async_trait]
-impl S3Client for LakeS3Client {
-    async fn get_object(
-        &self,
-        bucket: &str,
-        prefix: &str,
-    ) -> Result<GetObjectOutput, aws_sdk_s3::types::SdkError<aws_sdk_s3::error::GetObjectError>>
-    {
-        Ok(self
-            .s3
-            .get_object()
-            .bucket(bucket)
-            .key(prefix)
-            .request_payer(aws_sdk_s3::model::RequestPayer::Requester)
-            .send()
-            .await?)
-    }
-
-    async fn list_objects(
-        &self,
-        bucket: &str,
-        start_after: &str,
-    ) -> Result<
-        ListObjectsV2Output,
-        aws_sdk_s3::types::SdkError<aws_sdk_s3::error::ListObjectsV2Error>,
-    > {
-        Ok(self
-            .s3
-            .list_objects_v2()
-            .max_keys(1000) // 1000 is the default and max value for this parameter
-            .delimiter("/".to_string())
-            .start_after(start_after)
-            .request_payer(aws_sdk_s3::model::RequestPayer::Requester)
-            .bucket(bucket)
-            .send()
-            .await?)
-    }
-}
-
-/// Queries the list of the objects in the bucket, grouped by "/" delimiter.
-/// Returns the list of block heights that can be fetched
-pub(crate) async fn list_block_heights(
-    lake_s3_client: &impl S3Client,
-    s3_bucket_name: &str,
-    start_from_block_height: crate::types::BlockHeight,
-) -> Result<
-    Vec<crate::types::BlockHeight>,
-    crate::types::LakeError<aws_sdk_s3::error::ListObjectsV2Error>,
-> {
-    tracing::debug!(
-        target: crate::LAKE_FRAMEWORK,
-        "Fetching block heights from S3, after #{}...",
-        start_from_block_height
-    );
-    let response = lake_s3_client
-        .list_objects(s3_bucket_name, &format!("{:0>12}", start_from_block_height))
-        .await?;
-
-    Ok(match response.common_prefixes {
-        None => vec![],
-        Some(common_prefixes) => common_prefixes
-            .into_iter()
-            .filter_map(|common_prefix| common_prefix.prefix)
-            .collect::<Vec<String>>()
-            .into_iter()
-            .filter_map(|prefix_string| {
-                prefix_string
-                    .split('/')
-                    .next()
-                    .map(u64::from_str)
-                    .and_then(|num| num.ok())
-            })
-            .collect(),
-    })
-}
-
-/// By the given block height gets the objects:
-/// - block.json
-/// - shard_N.json
-/// Reads the content of the objects and parses as a JSON.
-/// Returns the result in `near_indexer_primitives::StreamerMessage`
-pub(crate) async fn fetch_streamer_message(
-    lake_s3_client: &impl S3Client,
-    s3_bucket_name: &str,
-    block_height: crate::types::BlockHeight,
-<<<<<<< HEAD:src/s3_fetchers.rs
-) -> Result<
-    near_indexer_primitives::StreamerMessage,
-    crate::types::LakeError<aws_sdk_s3::error::GetObjectError>,
-> {
-=======
-) -> anyhow::Result<crate::near_indexer_primitives::StreamerMessage> {
->>>>>>> 8bcd5c5 (feat: NEAR Lake Helper (high-level Lake Framework) (#51)):near-lake-framework/src/s3_fetchers.rs
-    let block_view = {
-        let body_bytes = loop {
-            match lake_s3_client
-                .get_object(s3_bucket_name, &format!("{:0>12}/block.json", block_height))
-                .await
-            {
-                Ok(response) => {
-                    match response.body.collect().await {
-                        Ok(bytes_stream) => break bytes_stream.into_bytes(),
-                        Err(err) => {
-                            tracing::debug!(
-                                target: crate::LAKE_FRAMEWORK,
-                                "Failed to read bytes from the block #{:0>12} response. Retrying immediately.\n{:#?}",
-                                block_height,
-                                err,
-                            );
-                        }
-                    };
-                }
-                Err(err) => {
-                    tracing::debug!(
-                        target: crate::LAKE_FRAMEWORK,
-                        "Failed to get {:0>12}/block.json. Retrying immediately\n{:#?}",
-                        block_height,
-                        err
-                    );
-                }
-            };
-        };
-
-<<<<<<< HEAD:src/s3_fetchers.rs
-        serde_json::from_slice::<near_indexer_primitives::views::BlockView>(body_bytes.as_ref())?
-    };
-
-    let fetch_shards_futures = (0..block_view.chunks.len() as u64)
-=======
-        let body_bytes = response.body.collect().await?.into_bytes();
-
-        serde_json::from_slice::<crate::near_indexer_primitives::views::BlockView>(
-            body_bytes.as_ref(),
-        )?
-    };
-
-    let shards: Vec<crate::near_indexer_primitives::IndexerShard> = (0..block_view.chunks.len()
-        as u64)
->>>>>>> 8bcd5c5 (feat: NEAR Lake Helper (high-level Lake Framework) (#51)):near-lake-framework/src/s3_fetchers.rs
-        .collect::<Vec<u64>>()
-        .into_iter()
-        .map(|shard_id| {
-            fetch_shard_or_retry(lake_s3_client, s3_bucket_name, block_height, shard_id)
-        });
-
-    let shards = futures::future::try_join_all(fetch_shards_futures).await?;
-
-    Ok(crate::near_indexer_primitives::StreamerMessage {
-        block: block_view,
-        shards,
-    })
-}
-
-/// Fetches the shard data JSON from AWS S3 and returns the `IndexerShard`
-async fn fetch_shard_or_retry(
-    lake_s3_client: &impl S3Client,
-    s3_bucket_name: &str,
-    block_height: crate::types::BlockHeight,
-    shard_id: u64,
-<<<<<<< HEAD:src/s3_fetchers.rs
-) -> Result<
-    near_indexer_primitives::IndexerShard,
-    crate::types::LakeError<aws_sdk_s3::error::GetObjectError>,
-> {
-    let body_bytes = loop {
-        match lake_s3_client
-            .get_object(
-                s3_bucket_name,
-                &format!("{:0>12}/shard_{}.json", block_height, shard_id),
-            )
-=======
-) -> crate::near_indexer_primitives::IndexerShard {
-    loop {
-        match s3_client
-            .get_object()
-            .bucket(s3_bucket_name)
-            .key(format!("{:0>12}/shard_{}.json", block_height, shard_id))
-            .request_payer(aws_sdk_s3::model::RequestPayer::Requester)
-            .send()
->>>>>>> 8bcd5c5 (feat: NEAR Lake Helper (high-level Lake Framework) (#51)):near-lake-framework/src/s3_fetchers.rs
-            .await
-        {
-            Ok(response) => {
-                let body_bytes = match response.body.collect().await {
-                    Ok(body) => body.into_bytes(),
-                    Err(err) => {
-                        tracing::debug!(
-                            target: crate::LAKE_FRAMEWORK,
-                            "Failed to read the {:0>12}/shard_{}.json. Retrying in 1s...\n {:#?}",
-                            block_height,
-                            shard_id,
-                            err,
-                        );
-                        tokio::time::sleep(std::time::Duration::from_secs(1)).await;
-                        continue;
-                    }
-                };
-
-<<<<<<< HEAD:src/s3_fetchers.rs
-                break body_bytes;
-=======
-                let indexer_shard = match serde_json::from_slice::<
-                    crate::near_indexer_primitives::IndexerShard,
-                >(body_bytes.as_ref())
-                {
-                    Ok(indexer_shard) => indexer_shard,
-                    Err(err) => {
-                        tracing::debug!(
-                            target: crate::LAKE_FRAMEWORK,
-                            "Failed to parse the {:0>12}/shard_{}.json. Retrying in 1s...\n {:#?}",
-                            block_height,
-                            shard_id,
-                            err,
-                        );
-                        tokio::time::sleep(std::time::Duration::from_secs(1)).await;
-                        continue;
-                    }
-                };
-
-                break indexer_shard;
->>>>>>> 8bcd5c5 (feat: NEAR Lake Helper (high-level Lake Framework) (#51)):near-lake-framework/src/s3_fetchers.rs
-            }
-            Err(err) => {
-                tracing::debug!(
-                    target: crate::LAKE_FRAMEWORK,
-                    "Failed to fetch shard #{}, retrying immediately\n{:#?}",
-                    shard_id,
-                    err
-                );
-            }
-        }
-    };
-
-    Ok(serde_json::from_slice::<
-        near_indexer_primitives::IndexerShard,
-    >(body_bytes.as_ref())?)
-}
-
-#[cfg(test)]
-mod test {
-    use super::*;
-
-    use async_trait::async_trait;
-
-    use aws_sdk_s3::output::{get_object_output, list_objects_v2_output};
-    use aws_sdk_s3::types::ByteStream;
-
-    use aws_smithy_http::body::SdkBody;
-
-    #[derive(Clone, Debug)]
-    pub struct LakeS3Client {}
-
-    #[async_trait]
-    impl S3Client for LakeS3Client {
-        async fn get_object(
-            &self,
-            _bucket: &str,
-            prefix: &str,
-        ) -> Result<GetObjectOutput, aws_sdk_s3::types::SdkError<aws_sdk_s3::error::GetObjectError>>
-        {
-            let path = format!("{}/blocks/{}", env!("CARGO_MANIFEST_DIR"), prefix);
-            let file_bytes = tokio::fs::read(path).await.unwrap();
-            let stream = ByteStream::new(SdkBody::from(file_bytes));
-            Ok(get_object_output::Builder::default().body(stream).build())
-        }
-
-        async fn list_objects(
-            &self,
-            _bucket: &str,
-            _start_after: &str,
-        ) -> Result<
-            ListObjectsV2Output,
-            aws_sdk_s3::types::SdkError<aws_sdk_s3::error::ListObjectsV2Error>,
-        > {
-            Ok(list_objects_v2_output::Builder::default().build())
-        }
-    }
-
-    #[tokio::test]
-    async fn deserializes_meta_transactions() {
-        let lake_client = LakeS3Client {};
-
-        let streamer_message =
-            fetch_streamer_message(&lake_client, "near-lake-data-mainnet", 879765)
-                .await
-                .unwrap();
-
-        let delegate_action = &streamer_message.shards[0]
-            .chunk
-            .as_ref()
-            .unwrap()
-            .transactions[0]
-            .transaction
-            .actions[0];
-
-        assert_eq!(
-            serde_json::to_value(delegate_action).unwrap(),
-            serde_json::json!({
-                "Delegate": {
-                    "delegate_action": {
-                        "sender_id": "test.near",
-                        "receiver_id": "test.near",
-                        "actions": [
-                            {
-                                "AddKey": {
-                                    "public_key": "ed25519:CnQMksXTTtn81WdDujsEMQgKUMkFvDJaAjDeDLTxVrsg",
-                                    "access_key": {
-                                        "nonce": 0,
-                                        "permission": "FullAccess"
-                                    }
-                                }
-                            }
-                        ],
-                        "nonce": 879546,
-                        "max_block_height": 100,
-                        "public_key": "ed25519:8Rn4FJeeRYcrLbcrAQNFVgvbZ2FCEQjgydbXwqBwF1ib"
-                    },
-                    "signature": "ed25519:25uGrsJNU3fVgUpPad3rGJRy2XQum8gJxLRjKFCbd7gymXwUxQ9r3tuyBCD6To7SX5oSJ2ScJZejwqK1ju8WdZfS"
-                }
-            })
-        );
-    }
-}
diff --git a/src/lib.rs b/src/lib.rs
deleted file mode 100644
index 66d3ccc..0000000
--- a/src/lib.rs
+++ /dev/null
@@ -1,553 +0,0 @@
-//! # NEAR Lake Framework
-//!
-//! NEAR Lake Framework is a small library companion to [NEAR Lake](https://github.com/near/near-lake). It allows you to build
-//! your own indexer that subscribes to the stream of blocks from the NEAR Lake data source and create your own logic to process
-//! the NEAR Protocol data.
-
-//! ## Example
-
-//! ```rust
-//! use futures::StreamExt;
-//! use near_lake_framework::LakeConfigBuilder;
-//!
-//! #[tokio::main]
-//! async fn main() -> Result<(), tokio::io::Error> {
-//!     // create a NEAR Lake Framework config
-//!     let config = LakeConfigBuilder::default()
-//!         .testnet()
-//!         .start_block_height(82422587)
-//!         .build()
-//!         .expect("Failed to build LakeConfig");
-//!
-//!     // instantiate the NEAR Lake Framework Stream
-//!     let (sender, stream) = near_lake_framework::streamer(config);
-//!
-//!     // read the stream events and pass them to a handler function with
-//!     // concurrency 1
-//!     let mut handlers = tokio_stream::wrappers::ReceiverStream::new(stream)
-//!         .map(|streamer_message| handle_streamer_message(streamer_message))
-//!         .buffer_unordered(1usize);
-//!
-//!     while let Some(_handle_message) = handlers.next().await {}
-//!     drop(handlers); // close the channel so the sender will stop
-//!
-//!     // propagate errors from the sender
-//!     match sender.await {
-//!         Ok(Ok(())) => Ok(()),
-//!         Ok(Err(e)) => Err(e),
-//!         Err(e) => Err(anyhow::Error::from(e)), // JoinError
-//!     }
-//!}
-//!
-//! // The handler function to take the entire `StreamerMessage`
-//! // and print the block height and number of shards
-//! async fn handle_streamer_message(
-//!     streamer_message: near_lake_framework::near_indexer_primitives::StreamerMessage,
-//! ) {
-//!     eprintln!(
-//!         "{} / shards {}",
-//!         streamer_message.block.header.height,
-//!         streamer_message.shards.len()
-//!     );
-//!}
-//!```
-//!
-//! ## Tutorials:
-//!
-//! -
-//! - [Migrating to NEAR Lake Framework](https://near-indexers.io/tutorials/lake/migrating-to-near-lake-framework) from [NEAR Indexer Framework](https://near-indexers.io/docs/projects/near-indexer-framework)
-//!
-//! ### More examples
-//!
-//! - simple example of a data printer built on top of NEAR Lake Framework
-//! - another simple example of the indexer built on top of NEAR Lake Framework for a tutorial purpose
-//!
-//! - an example of the indexer built on top of NEAR Lake Framework that watches for transactions related to specified account(s)
-//! - a community-made project that uses NEAR Lake Framework
-//!
-//! ## How to use
-//!
-//! ### AWS S3 Credentials
-//!
-//! In order to be able to get objects from the AWS S3 bucket you need to provide the AWS credentials.
-//! #### Passing credentials to the config builder
-//!
-//! ```rust
-//! use near_lake_framework::LakeConfigBuilder;
-//!
-//! # async fn main() {
-//! let credentials = aws_credential_types::Credentials::new(
-//!     "AKIAIOSFODNN7EXAMPLE",
-//!     "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY",
-//!     None,
-//!     None,
-//!     "custom_credentials",
-//! );
-//! let s3_config = aws_sdk_s3::Config::builder()
-//!     .credentials_provider(credentials)
-//!     .build();
-//!
-//! let config = LakeConfigBuilder::default()
-//!     .s3_config(s3_config)
-//!     .s3_bucket_name("near-lake-data-custom")
-//!     .start_block_height(1)
-//!     .build()
-//!     .expect("Failed to build LakeConfig");
-//! # }
-//! ```
-//!
-//! **You should never hardcode your credentials, it is insecure. Use the described method to pass the credentials you read from CLI arguments**
-//!
-//! #### File-based AWS credentials
-//!AWS default profile configuration with aws configure looks similar to the following:
-//!
-//!`~/.aws/credentials`
-//!```
-//![default]
-//!aws_access_key_id=AKIAIOSFODNN7EXAMPLE
-//!aws_secret_access_key=wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY
-//!```
-//!
-//![AWS docs: Configuration and credential file settings](https://docs.aws.amazon.com/cli/latest/userguide/cli-configure-files.html)
-//!
-//! ### Environmental variables
-//!
-//! Alternatively, you can provide your AWS credentials via environment variables with constant names:
-//!
-//!```
-//!$ export AWS_ACCESS_KEY_ID=AKIAIOSFODNN7EXAMPLE
-//!$ AWS_SECRET_ACCESS_KEY=wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY
-//!$ AWS_DEFAULT_REGION=eu-central-1
-//!```
-//!
-//!### Dependencies
-//!
-//!Add the following dependencies to your `Cargo.toml`
-//!
-//!```toml
-//!...
-//![dependencies]
-//!futures = "0.3.5"
-//!itertools = "0.10.3"
-//!tokio = { version = "1.1", features = ["sync", "time", "macros", "rt-multi-thread"] }
-//!tokio-stream = { version = "0.1" }
-//!
-//!# NEAR Lake Framework
-//!near-lake-framework = "0.6.1"
-//!```
-//!
-//! ### Custom S3 storage
-//!
-//! In case you want to run your own [near-lake](https://github.com/near/near-lake) instance and store data in some S3 compatible storage ([Minio](https://min.io/) or [Localstack](https://localstack.cloud/) as example)
-//! You can owerride default S3 API endpoint by using `s3_endpoint` option
-//!
-//! - run minio
-//!
-//! ```bash
-//! $ mkdir -p /data/near-lake-custom && minio server /data
-//! ```
-//!
-//! - pass custom `aws_sdk_s3::config::Config` to the [LakeConfigBuilder]
-//!
-//! ```rust
-//! use aws_sdk_s3::Endpoint;
-//! use http::Uri;
-//! use near_lake_framework::LakeConfigBuilder;
-//!
-//! # async fn main() {
-//! let aws_config = aws_config::from_env().load().await;
-//! let mut s3_conf = aws_sdk_s3::config::Builder::from(&aws_config);
-//! s3_conf = s3_conf
-//!     .endpoint_resolver(
-//!         Endpoint::immutable("http://0.0.0.0:9000".parse::<Uri>().unwrap()))
-//!     .build();
-//!
-//! let config = LakeConfigBuilder::default()
-//!     .s3_config(s3_conf)
-//!     .s3_bucket_name("near-lake-data-custom")
-//!     .start_block_height(1)
-//!     .build()
-//!     .expect("Failed to build LakeConfig");
-//! # }
-//! ```
-//!
-//! ## Configuration
-//!
-//! Everything should be configured before the start of your indexer application via `LakeConfigBuilder` struct.
-//!
-//! Available parameters:
-//!
-//! * [`start_block_height(value: u64)`](LakeConfigBuilder::start_block_height) - block height to start the stream from
-//! * *optional* [`s3_bucket_name(value: impl Into<String>)`](LakeConfigBuilder::s3_bucket_name) - provide the AWS S3 bucket name (you need to provide it if you use custom S3-compatible service, otherwise you can use [LakeConfigBuilder::mainnet] and [LakeConfigBuilder::testnet])
-//! * *optional* [`LakeConfigBuilder::s3_region_name(value: impl Into<String>)`](LakeConfigBuilder::s3_region_name) - provide the AWS S3 region name (if you need to set a custom one)
-//! * *optional* [`LakeConfigBuilder::s3_config(value: aws_sdk_s3::config::Config`](LakeConfigBuilder::s3_config) - provide custom AWS SDK S3 Config
-//!
-//! ## Cost estimates (Updated Mar 10, 2022 with more precise calculations)
-//!
-//! **TL;DR** approximately $20 per month (for AWS S3 access, paid directly to AWS) for the reading of fresh blocks
-//!
-//! ### Historical indexing
-//!
-//! | Blocks | GET | LIST | Subtotal GET | Subtotal LIST | Total $ |
-//! |---|---|---|---|---|---|
-//! | 1000 | 5000 | 4 | 0.00215 | 0.0000216 | $0.00 |
-//! | 86,400 | 432000 | 345.6 | 0.18576 | 0.00186624 | $0.19 |
-//! | 2,592,000 | 12960000 | 10368 | 5.5728 | 0.0559872 | $5.63 |
-//! | 77,021,059 | 385105295 | 308084.236 | 165.5952769 | 1.663654874 | $167.26 |
-//!
-//! **Note:** ~77m of blocks is the number of blocks on the moment I was calculating.
-//!
-// !**84,400 blocks is approximate number of blocks per day** (1 block per second * 60 seconds * 60 minutes * 24 hours)
-//!
-//! **2,592,000 blocks is approximate number of blocks per months** (86,400 blocks per day * 30 days)
-//!
-//! ### Tip of the network indexing
-//!
-//! | Blocks | GET | LIST | Subtotal GET | Subtotal LIST | Total $ |
-//! |---|---|---|---|---|---|
-//! | 1000 | 5000 | 1000 | 0.00215 | 0.0054 | $0.01 |
-//! | 86,400 | 432000 | 86,400 | 0.18576 | 0.46656 | $0.65 |
-//! | 2,592,000 | 12960000 | 2,592,000 | 5.5728 | 13.9968 | $19.57 |
-//! | 77,021,059 | 385105295 | 77,021,059 | 165.5952769 | 415.9137186 | $581.51 |
-//!
-//! Explanation:
-//!
-//! Assuming NEAR Protocol produces accurately 1 block per second (which is really not, the average block production time is 1.3s). A full day consists of 86400 seconds, that's the max number of blocks that can be produced.
-//!
-//! According the [Amazon S3 prices](https://aws.amazon.com/s3/pricing/?nc1=h_ls) `list` requests are charged for $0.0054 per 1000 requests and `get` is charged for $0.00043 per 1000 requests.
-//!
-//! Calculations (assuming we are following the tip of the network all the time):
-//!
-//! ```text
-//! 86400 blocks per day * 5 requests for each block / 1000 requests * $0.0004 per 1k requests = $0.19 * 30 days = $5.7
-//! ```
-//! **Note:** 5 requests for each block means we have 4 shards (1 file for common block data and 4 separate files for each shard)
-//!
-//! And a number of `list` requests we need to perform for 30 days:
-//!
-//! ```text
-//! 86400 blocks per day / 1000 requests * $0.005 per 1k list requests = $0.47 * 30 days = $14.1
-//!
-//! $5.7 + $14.1 = $19.8
-//! ```
-//!
-//! The price depends on the number of shards
-//!
-//! ## Future plans
-//!
-//! We use Milestones with clearly defined acceptance criteria:
-//!
-//! * [x] [MVP](https://github.com/near/near-lake-framework/milestone/1)
-//! * [ ] [0.8 High-level update](https://github.com/near/near-lake-framework-rs/milestone/3)
-//! * [ ] [1.0](https://github.com/near/near-lake-framework/milestone/2)
-use aws_sdk_s3::Client;
-
-#[macro_use]
-extern crate derive_builder;
-
-use futures::stream::StreamExt;
-use tokio::sync::mpsc;
-use tokio::sync::mpsc::error::SendError;
-
-pub use near_indexer_primitives;
-
-pub use aws_credential_types::Credentials;
-pub use types::{LakeConfig, LakeConfigBuilder};
-
-use s3_fetchers::LakeS3Client;
-
-mod s3_fetchers;
-pub(crate) mod types;
-
-pub(crate) const LAKE_FRAMEWORK: &str = "near_lake_framework";
-
-/// Creates `mpsc::channel` and returns the `receiver` to read the stream of `StreamerMessage`
-/// ```
-/// use near_lake_framework::LakeConfigBuilder;
-/// use tokio::sync::mpsc;
-///
-/// # async fn main() {
-///     let config = LakeConfigBuilder::default()
-///         .testnet()
-///         .start_block_height(82422587)
-///         .build()
-///         .expect("Failed to build LakeConfig");
-///
-///     let (_, stream) = near_lake_framework::streamer(config);
-///
-///     while let Some(streamer_message) = stream.recv().await {
-///         eprintln!("{:#?}", streamer_message);
-///     }
-/// # }
-/// ```
-pub fn streamer(
-    config: LakeConfig,
-) -> (
-    tokio::task::JoinHandle<anyhow::Result<()>>,
-    mpsc::Receiver<near_indexer_primitives::StreamerMessage>,
-) {
-    let (sender, receiver) = mpsc::channel(config.blocks_preload_pool_size);
-    (tokio::spawn(start(sender, config)), receiver)
-}
-
-fn stream_block_heights<'a: 'b, 'b>(
-    lake_s3_client: &'a LakeS3Client,
-    s3_bucket_name: &'a str,
-    mut start_from_block_height: crate::types::BlockHeight,
-) -> impl futures::Stream<Item = u64> + 'b {
-    async_stream::stream! {
-        loop {
-            tracing::debug!(target: LAKE_FRAMEWORK, "Fetching a list of blocks from S3...");
-            match s3_fetchers::list_block_heights(
-                lake_s3_client,
-                s3_bucket_name,
-                start_from_block_height,
-            )
-            .await {
-                Ok(block_heights) => {
-                    if block_heights.is_empty() {
-                        tracing::debug!(
-                            target: LAKE_FRAMEWORK,
-                            "There are no newer block heights than {} in bucket {}. Fetching again in 2s...",
-                            start_from_block_height,
-                            s3_bucket_name,
-                        );
-                        tokio::time::sleep(std::time::Duration::from_secs(2)).await;
-                        continue;
-                    }
-                    tracing::debug!(
-                        target: LAKE_FRAMEWORK,
-                        "Received {} newer block heights",
-                        block_heights.len()
-                    );
-
-                    start_from_block_height = *block_heights.last().unwrap() + 1;
-                    for block_height in block_heights {
-                        tracing::debug!(target: LAKE_FRAMEWORK, "Yielding {} block height...", block_height);
-                        yield block_height;
-                    }
-                }
-                Err(err) => {
-                    tracing::warn!(
-                        target: LAKE_FRAMEWORK,
-                        "Failed to get block heights from bucket {}: {}. Retrying in 1s...",
-                        s3_bucket_name,
-                        err,
-                    );
-                    tokio::time::sleep(std::time::Duration::from_secs(1)).await;
-                }
-            }
-        }
-    }
-}
-
-// The only consumer of the BlockHeights Streamer
-async fn prefetch_block_heights_into_pool(
-    pending_block_heights: &mut std::pin::Pin<
-        &mut impl tokio_stream::Stream<Item = u64>,
-    >,
-    limit: usize,
-    await_for_at_least_one: bool,
-) -> anyhow::Result<Vec<u64>> {
-    let mut block_heights = Vec::with_capacity(limit);
-    for remaining_limit in (0..limit).rev() {
-        tracing::debug!(target: LAKE_FRAMEWORK, "Polling for the next block height without awaiting... (up to {} block heights are going to be fetched)", remaining_limit);
-        match futures::poll!(pending_block_heights.next()) {
-            std::task::Poll::Ready(Some(block_height)) => {
-                block_heights.push(block_height);
-            }
-            std::task::Poll::Pending => {
-                if await_for_at_least_one && block_heights.is_empty() {
-                    tracing::debug!(target: LAKE_FRAMEWORK, "There were no block heights available immediatelly, and the prefetching blocks queue is empty, so we need to await for at least a single block height to be available before proceeding...");
-                    match pending_block_heights.next().await {
-                        Some(block_height) => {
-                            block_heights.push(block_height);
-                        }
-                        None => {
-                            return Err(anyhow::anyhow!("This state should be unreachable as the block heights stream should be infinite."));
-                        }
-                    }
-                    continue;
-                }
-                tracing::debug!(target: LAKE_FRAMEWORK, "There were no block heights available immediatelly, so we should not block here and keep processing the blocks.");
-                break;
-            }
-            std::task::Poll::Ready(None) => {
-                return Err(anyhow::anyhow!("This state should be unreachable as the block heights stream should be infinite."));
-            }
-        }
-    }
-    Ok(block_heights)
-}
-
-#[allow(unused_labels)] // we use loop labels for code-readability
-async fn start(
-    streamer_message_sink: mpsc::Sender<near_indexer_primitives::StreamerMessage>,
-    config: LakeConfig,
-) -> anyhow::Result<()> {
-    let mut start_from_block_height = config.start_block_height;
-
-    let s3_client = if let Some(config) = config.s3_config {
-        Client::from_conf(config)
-    } else {
-        let aws_config = aws_config::from_env().load().await;
-        let s3_config = aws_sdk_s3::config::Builder::from(&aws_config)
-            .region(aws_types::region::Region::new(config.s3_region_name))
-            .build();
-        Client::from_conf(s3_config)
-    };
-    let lake_s3_client = s3_fetchers::LakeS3Client::new(s3_client.clone());
-
-    let mut last_processed_block_hash: Option<near_indexer_primitives::CryptoHash> = None;
-
-    'main: loop {
-        // In the beginning of the 'main' loop we create a Block Heights stream
-        // and prefetch the initial data in that pool.
-        // Later the 'stream' loop might exit to this 'main' one to repeat the procedure.
-        // This happens because we assume Lake Indexer that writes to the S3 Bucket might
-        // in some cases, write N+1 block before it finishes writing the N block.
-        // We require to stream blocks consistently, so we need to try to load the block again.
-
-        let pending_block_heights = stream_block_heights(
-            &lake_s3_client,
-            &config.s3_bucket_name,
-            start_from_block_height,
-        );
-        tokio::pin!(pending_block_heights);
-
-        let mut streamer_messages_futures = futures::stream::FuturesOrdered::new();
-        tracing::debug!(
-            target: LAKE_FRAMEWORK,
-            "Prefetching up to {} blocks...",
-            config.blocks_preload_pool_size
-        );
-
-        streamer_messages_futures.extend(
-            prefetch_block_heights_into_pool(
-                &mut pending_block_heights,
-                config.blocks_preload_pool_size,
-                true,
-            )
-            .await?
-            .into_iter()
-            .map(|block_height| {
-                s3_fetchers::fetch_streamer_message(
-                    &lake_s3_client,
-                    &config.s3_bucket_name,
-                    block_height,
-                )
-            }),
-        );
-
-        tracing::debug!(
-            target: LAKE_FRAMEWORK,
-            "Awaiting for the first prefetched block..."
-        );
-        'stream: while let Some(streamer_message_result) = streamer_messages_futures.next().await {
-            let streamer_message = streamer_message_result.map_err(|err| {
-                tracing::error!(
-                    target: LAKE_FRAMEWORK,
-                    "Failed to fetch StreamerMessage with error: \n{:#?}",
-                    err,
-                );
-                err
-            })?;
-
-            tracing::debug!(
-                target: LAKE_FRAMEWORK,
-                "Received block #{} ({})",
-                streamer_message.block.header.height,
-                streamer_message.block.header.hash
-            );
-            // check if we have `last_processed_block_hash` (might be None only on start)
-            if let Some(prev_block_hash) = last_processed_block_hash {
-                // compare last_processed_block_hash` with `block.header.prev_hash` of the current
-                // block (ensure we don't miss anything from S3)
-                // retrieve the data from S3 if prev_hashes don't match and repeat the main loop step
-                if prev_block_hash != streamer_message.block.header.prev_hash {
-                    tracing::warn!(
-                        target: LAKE_FRAMEWORK,
-                        "`prev_hash` does not match, refetching the data from S3 in 200ms",
-                    );
-                    tokio::time::sleep(std::time::Duration::from_millis(200)).await;
-                    break 'stream;
-                }
-            }
-
-            // store current block info as `last_processed_block_*` for next iteration
-            last_processed_block_hash = Some(streamer_message.block.header.hash);
-            start_from_block_height = streamer_message.block.header.height + 1;
-
-            tracing::debug!(
-                target: LAKE_FRAMEWORK,
-                "Prefetching up to {} blocks... (there are {} blocks in the prefetching pool)",
-                config.blocks_preload_pool_size,
-                streamer_messages_futures.len(),
-            );
-            tracing::debug!(
-                target: LAKE_FRAMEWORK,
-                "Streaming block #{} ({})",
-                streamer_message.block.header.height,
-                streamer_message.block.header.hash
-            );
-            let blocks_preload_pool_current_len = streamer_messages_futures.len();
-
-            let prefetched_block_heights_future = prefetch_block_heights_into_pool(
-                &mut pending_block_heights,
-                config
-                    .blocks_preload_pool_size
-                    .saturating_sub(blocks_preload_pool_current_len),
-                blocks_preload_pool_current_len == 0,
-            );
-
-            let streamer_message_sink_send_future = streamer_message_sink.send(streamer_message);
-
-            let (prefetch_res, send_res): (
-                Result<Vec<u64>, anyhow::Error>,
-                Result<_, SendError<near_indexer_primitives::StreamerMessage>>,
-            ) = futures::join!(
-                prefetched_block_heights_future,
-                streamer_message_sink_send_future,
-            );
-
-            if let Err(SendError(err)) = send_res {
-                tracing::debug!(
-                    target: LAKE_FRAMEWORK,
-                    "Failed to send StreamerMessage (#{:0>12}) to the channel. Channel is closed, exiting \n{:?}",
-                    start_from_block_height - 1,
-                    err,
-                );
-                return Ok(());
-            }
-
-            streamer_messages_futures.extend(
-                prefetch_res
-                    .map_err(|err| {
-                        tracing::error!(
-                            target: LAKE_FRAMEWORK,
-                            "Failed to prefetch block heights to the prefetching pool with error: \n{:#?}",
-                            err
-                        );
-                        err
-                    })?
-                    .into_iter()
-                    .map(|block_height| {
-                        s3_fetchers::fetch_streamer_message(
-                            &lake_s3_client,
-                            &config.s3_bucket_name,
-                            block_height,
-                        )
-                    }
-                ));
-        }
-
-        tracing::warn!(
-            target: LAKE_FRAMEWORK,
-            "Exited from the 'stream' loop. It may happen in two cases:\n
-            1. Blocks has ended (impossible, might be an error on the Lake Buckets),\n
-            2. Received a Block which prev_hash doesn't match the previously streamed block.\n
-            Will attempt to restart the stream from block #{}",
-            start_from_block_height,
-        );
-    }
-}