diff --git a/CHANGELOG.md b/CHANGELOG.md
index 13f16aa..6332ff2 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -5,7 +5,25 @@ All notable changes to this project will be documented in this file.
 The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
 and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
 
-## [Unreleased](https://github.com/near/near-lake-framework/compare/v0.7.9...HEAD)
+## [Unreleased](https://github.com/near/near-lake-framework/compare/v0.7.10...HEAD)
+
+## [0.7.10](https://github.com/near/near-lake-framework/compare/v0.7.9...0.7.10)
+
+* Upgraded `near-indexer-primitives` to `0.27.0` (nearcore-2.3.0)
+* Added a new provider, `fastnear`: a separate service that delivers data from NEAR Protocol more efficiently. See [FastNear](https://fastnear.com/) and [Near Data Server](https://github.com/fastnear/neardata-server/) for more details about the provider.
+
+### Breaking Changes
+
+* `s3_fetchers` was renamed to `fetchers` and moved to the `providers` module. New usage example:
+  ```rust
+  use near_lake_framework::s3::fetchers::fetch_streamer_message;
+  use near_lake_framework::fastnear::fetchers::fetch_streamer_message;
+  ```
+* `s3_client` was renamed to `client` and moved to the `providers` module. New usage example:
+  ```rust
+  use near_lake_framework::S3Client;
+  use near_lake_framework::FastNearClient;
+  ```
 
 ## [0.7.9](https://github.com/near/near-lake-framework/compare/v0.7.7...0.7.9)
diff --git a/Cargo.toml b/Cargo.toml
index 2ebd049..f3504c9 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -8,11 +8,11 @@ categories = ["asynchronous", "api-bindings", "network-programming"]
 keywords = ["near", "near-lake", "near-indexer"]
 authors = ["Near Inc "]
 edition = "2021"
-rust-version = "1.75.0"
+rust-version = "1.81.0"
 
 # cargo-workspaces
 [workspace.metadata.workspaces]
-version = "0.7.9"
+version = "0.7.10"
 
 [dependencies]
 anyhow = "1.0.79"
@@ -25,6 +25,7 @@ async-stream = "0.3.5"
 async-trait = "0.1.77"
 derive_builder = "0.13.0"
 futures = "0.3.30"
+reqwest = { version = "0.12.7", features = ["json"] }
 serde = { version = "1.0.195", features = ["derive"] }
 serde_json = "1.0.111"
 thiserror = "1.0.56"
@@ -32,7 +33,7 @@ tokio = { version = "1.35.1", features = ["sync", "time", "rt", "macros"] }
 tokio-stream = { version = "0.1.14" }
 tracing = "0.1.40"
 
-near-indexer-primitives = "0.23.0"
+near-indexer-primitives = "0.27.0"
 
 [lib]
 doctest = false
diff --git a/README.md b/README.md
index 6ef9dde..3c18e64 100644
--- a/README.md
+++ b/README.md
@@ -148,6 +148,42 @@ $5.7 + $14.1 = $19.8
 
 The price depends on the number of shards
 
+## FastNear provider
+
+FastNear provides a service to access NEAR Protocol data:
+
+- [FastNear](https://fastnear.com/)
+- [Near Data Server](https://github.com/fastnear/neardata-server/)
+
+The FastNear provider is a new way to get data from NEAR Protocol: a separate service that delivers the data more efficiently.
+
+### How to use it:
+
+```rust
+use near_lake_framework::FastNearConfigBuilder;
+
+#[tokio::main]
+async fn main() {
+    let config = FastNearConfigBuilder::default()
+        .testnet()
+        .start_block_height(82422587)
+        .build()
+        .expect("Failed to build FastNearConfig");
+
+    let (_, mut stream) = near_lake_framework::streamer(config);
+
+    while let Some(streamer_message) = stream.recv().await {
+        eprintln!("{:#?}", streamer_message);
+    }
+}
+```
+
+### How to migrate from Lake to FastNear:
+
+- Replace `LakeConfigBuilder` with `FastNearConfigBuilder`
+- Replace `LakeConfig` with `FastNearConfig` (see the sketch below)
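+
+A minimal before/after sketch of the migration (the block height is the placeholder from the example above; the Lake half is commented out for comparison):
+
+```rust
+// Before: the Lake (S3) provider
+// let config = near_lake_framework::LakeConfigBuilder::default()
+//     .testnet()
+//     .start_block_height(82422587)
+//     .build()
+//     .expect("Failed to build LakeConfig");
+
+// After: the FastNear provider
+let config = near_lake_framework::FastNearConfigBuilder::default()
+    .testnet()
+    .start_block_height(82422587)
+    .build()
+    .expect("Failed to build FastNearConfig");
+
+// `streamer` accepts either config, so the rest of the code stays the same
+let (_, mut stream) = near_lake_framework::streamer(config);
+```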
+
 ## Future plans
 
 We use Milestones with clearly defined acceptance criteria:
diff --git a/src/lib.rs b/src/lib.rs
index 11f711d..bf246a4 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -233,6 +233,39 @@
 //!
 //! The price depends on the number of shards
 //!
+//! ### FastNear provider
+//!
+//! The FastNear provider is a new way to get data from NEAR Protocol: a separate service that delivers the data more efficiently.
+//!
+//! How to use it:
+//!
+//! ```rust
+//! use near_lake_framework::FastNearConfigBuilder;
+//!
+//! #[tokio::main]
+//! # async fn main() {
+//! let config = FastNearConfigBuilder::default()
+//!     .testnet()
+//!     .start_block_height(82422587)
+//!     .build()
+//!     .expect("Failed to build FastNearConfig");
+//!
+//! let (_, mut stream) = near_lake_framework::streamer(config);
+//!
+//! while let Some(streamer_message) = stream.recv().await {
+//!     eprintln!("{:#?}", streamer_message);
+//! }
+//! # }
+//! ```
+//!
+//! How to migrate from Lake to FastNear:
+//!
+//! - Replace `LakeConfigBuilder` with `FastNearConfigBuilder`
+//! - Replace `LakeConfig` with `FastNearConfig`
+//!
 //! ## Future plans
 //!
 //! We use Milestones with clearly defined acceptance criteria:
@@ -244,18 +277,20 @@
 #[macro_use]
 extern crate derive_builder;
 
-use futures::stream::StreamExt;
-use tokio::sync::mpsc;
-use tokio::sync::mpsc::error::SendError;
-
 pub use near_indexer_primitives;
 
 pub use aws_credential_types::Credentials;
 
-pub use types::{LakeConfig, LakeConfigBuilder};
-pub mod s3_client;
-pub mod s3_fetchers;
-pub(crate) mod types;
+mod providers;
+
+pub use providers::fastnear;
+pub use providers::s3;
+
+pub use providers::fastnear::client::FastNearClient;
+pub use providers::fastnear::types::{FastNearConfig, FastNearConfigBuilder};
+
+pub use providers::s3::client::LakeS3Client;
+pub use providers::s3::types::{LakeConfig, LakeConfigBuilder};
 
 pub(crate) const LAKE_FRAMEWORK: &str = "near_lake_framework";
 
@@ -278,277 +313,20 @@ pub(crate) const LAKE_FRAMEWORK: &str = "near_lake_framework";
 /// }
 /// # }
 /// ```
-pub fn streamer(
-    config: LakeConfig,
+pub fn streamer<T: Into<providers::NearLakeFrameworkConfig>>(
+    config: T,
 ) -> (
     tokio::task::JoinHandle<anyhow::Result<()>>,
-    mpsc::Receiver<near_indexer_primitives::StreamerMessage>,
+    tokio::sync::mpsc::Receiver<near_indexer_primitives::StreamerMessage>,
 ) {
-    let (sender, receiver) = mpsc::channel(config.blocks_preload_pool_size);
-    (tokio::spawn(start(sender, config)), receiver)
-}
-
-fn stream_block_heights<'a: 'b, 'b>(
-    lake_s3_client: &'a dyn s3_client::S3Client,
-    s3_bucket_name: &'a str,
-    mut start_from_block_height: crate::types::BlockHeight,
-) -> impl futures::Stream<Item = u64> + 'b {
-    async_stream::stream!
{ - loop { - tracing::debug!(target: LAKE_FRAMEWORK, "Fetching a list of blocks from S3..."); - match s3_fetchers::list_block_heights( - lake_s3_client, - s3_bucket_name, - start_from_block_height, - ) - .await { - Ok(block_heights) => { - if block_heights.is_empty() { - tracing::debug!( - target: LAKE_FRAMEWORK, - "There are no newer block heights than {} in bucket {}. Fetching again in 2s...", - start_from_block_height, - s3_bucket_name, - ); - tokio::time::sleep(std::time::Duration::from_secs(2)).await; - continue; - } - tracing::debug!( - target: LAKE_FRAMEWORK, - "Received {} newer block heights", - block_heights.len() - ); - - start_from_block_height = *block_heights.last().unwrap() + 1; - for block_height in block_heights { - tracing::debug!(target: LAKE_FRAMEWORK, "Yielding {} block height...", block_height); - yield block_height; - } - } - Err(err) => { - tracing::warn!( - target: LAKE_FRAMEWORK, - "Failed to get block heights from bucket {}: {}. Retrying in 1s...", - s3_bucket_name, - err, - ); - tokio::time::sleep(std::time::Duration::from_secs(1)).await; - } - } + let config: providers::NearLakeFrameworkConfig = config.into(); + let (sender, receiver) = tokio::sync::mpsc::channel(config.blocks_preload_pool_size()); + match config { + providers::NearLakeFrameworkConfig::Lake(config) => { + (tokio::spawn(s3::start(sender, config)), receiver) } - } -} - -// The only consumer of the BlockHeights Streamer -async fn prefetch_block_heights_into_pool( - pending_block_heights: &mut std::pin::Pin< - &mut impl tokio_stream::Stream, - >, - limit: usize, - await_for_at_least_one: bool, -) -> anyhow::Result> { - let mut block_heights = Vec::with_capacity(limit); - for remaining_limit in (0..limit).rev() { - tracing::debug!(target: LAKE_FRAMEWORK, "Polling for the next block height without awaiting... 
(up to {} block heights are going to be fetched)", remaining_limit); - match futures::poll!(pending_block_heights.next()) { - std::task::Poll::Ready(Some(block_height)) => { - block_heights.push(block_height); - } - std::task::Poll::Pending => { - if await_for_at_least_one && block_heights.is_empty() { - tracing::debug!(target: LAKE_FRAMEWORK, "There were no block heights available immediatelly, and the prefetching blocks queue is empty, so we need to await for at least a single block height to be available before proceeding..."); - match pending_block_heights.next().await { - Some(block_height) => { - block_heights.push(block_height); - } - None => { - return Err(anyhow::anyhow!("This state should be unreachable as the block heights stream should be infinite.")); - } - } - continue; - } - tracing::debug!(target: LAKE_FRAMEWORK, "There were no block heights available immediatelly, so we should not block here and keep processing the blocks."); - break; - } - std::task::Poll::Ready(None) => { - return Err(anyhow::anyhow!("This state should be unreachable as the block heights stream should be infinite.")); - } - } - } - Ok(block_heights) -} - -#[allow(unused_labels)] // we use loop labels for code-readability -async fn start( - streamer_message_sink: mpsc::Sender, - config: LakeConfig, -) -> anyhow::Result<()> { - let mut start_from_block_height = config.start_block_height; - - let lake_s3_client: Box = - if let Some(s3_client) = config.s3_client { - s3_client - } else if let Some(config) = config.s3_config { - Box::new(s3_fetchers::LakeS3Client::from_conf(config)) - } else { - let aws_config = aws_config::from_env().load().await; - let s3_config = aws_sdk_s3::config::Builder::from(&aws_config) - .region(aws_types::region::Region::new(config.s3_region_name)) - .build(); - - Box::new(s3_fetchers::LakeS3Client::from_conf(s3_config)) - }; - - let mut last_processed_block_hash: Option = None; - - 'main: loop { - // In the beginning of the 'main' loop we create a Block Heights stream - // and prefetch the initial data in that pool. - // Later the 'stream' loop might exit to this 'main' one to repeat the procedure. - // This happens because we assume Lake Indexer that writes to the S3 Bucket might - // in some cases, write N+1 block before it finishes writing the N block. - // We require to stream blocks consistently, so we need to try to load the block again. - - let pending_block_heights = stream_block_heights( - &*lake_s3_client, - &config.s3_bucket_name, - start_from_block_height, - ); - tokio::pin!(pending_block_heights); - - let mut streamer_messages_futures = futures::stream::FuturesOrdered::new(); - tracing::debug!( - target: LAKE_FRAMEWORK, - "Prefetching up to {} blocks...", - config.blocks_preload_pool_size - ); - - streamer_messages_futures.extend( - prefetch_block_heights_into_pool( - &mut pending_block_heights, - config.blocks_preload_pool_size, - true, - ) - .await? - .into_iter() - .map(|block_height| { - s3_fetchers::fetch_streamer_message( - &*lake_s3_client, - &config.s3_bucket_name, - block_height, - ) - }), - ); - - tracing::debug!( - target: LAKE_FRAMEWORK, - "Awaiting for the first prefetched block..." 
- ); - 'stream: while let Some(streamer_message_result) = streamer_messages_futures.next().await { - let streamer_message = streamer_message_result.map_err(|err| { - tracing::error!( - target: LAKE_FRAMEWORK, - "Failed to fetch StreamerMessage with error: \n{:#?}", - err, - ); - err - })?; - - tracing::debug!( - target: LAKE_FRAMEWORK, - "Received block #{} ({})", - streamer_message.block.header.height, - streamer_message.block.header.hash - ); - // check if we have `last_processed_block_hash` (might be None only on start) - if let Some(prev_block_hash) = last_processed_block_hash { - // compare last_processed_block_hash` with `block.header.prev_hash` of the current - // block (ensure we don't miss anything from S3) - // retrieve the data from S3 if prev_hashes don't match and repeat the main loop step - if prev_block_hash != streamer_message.block.header.prev_hash { - tracing::warn!( - target: LAKE_FRAMEWORK, - "`prev_hash` does not match, refetching the data from S3 in 200ms", - ); - tokio::time::sleep(std::time::Duration::from_millis(200)).await; - break 'stream; - } - } - - // store current block info as `last_processed_block_*` for next iteration - last_processed_block_hash = Some(streamer_message.block.header.hash); - start_from_block_height = streamer_message.block.header.height + 1; - - tracing::debug!( - target: LAKE_FRAMEWORK, - "Prefetching up to {} blocks... (there are {} blocks in the prefetching pool)", - config.blocks_preload_pool_size, - streamer_messages_futures.len(), - ); - tracing::debug!( - target: LAKE_FRAMEWORK, - "Streaming block #{} ({})", - streamer_message.block.header.height, - streamer_message.block.header.hash - ); - let blocks_preload_pool_current_len = streamer_messages_futures.len(); - - let prefetched_block_heights_future = prefetch_block_heights_into_pool( - &mut pending_block_heights, - config - .blocks_preload_pool_size - .saturating_sub(blocks_preload_pool_current_len), - blocks_preload_pool_current_len == 0, - ); - - let streamer_message_sink_send_future = streamer_message_sink.send(streamer_message); - - let (prefetch_res, send_res): ( - Result, anyhow::Error>, - Result<_, SendError>, - ) = futures::join!( - prefetched_block_heights_future, - streamer_message_sink_send_future, - ); - - if let Err(SendError(err)) = send_res { - tracing::debug!( - target: LAKE_FRAMEWORK, - "Failed to send StreamerMessage (#{:0>12}) to the channel. Channel is closed, exiting \n{:?}", - start_from_block_height - 1, - err, - ); - return Ok(()); - } - - streamer_messages_futures.extend( - prefetch_res - .map_err(|err| { - tracing::error!( - target: LAKE_FRAMEWORK, - "Failed to prefetch block heights to the prefetching pool with error: \n{:#?}", - err - ); - err - })? - .into_iter() - .map(|block_height| { - s3_fetchers::fetch_streamer_message( - &*lake_s3_client, - &config.s3_bucket_name, - block_height, - ) - } - )); + providers::NearLakeFrameworkConfig::FastNear(config) => { + (tokio::spawn(fastnear::start(sender, config)), receiver) } - - tracing::warn!( - target: LAKE_FRAMEWORK, - "Exited from the 'stream' loop. It may happen in two cases:\n - 1. Blocks has ended (impossible, might be an error on the Lake Buckets),\n - 2. 
Received a Block which prev_hash doesn't match the previously streamed block.\n
-            Will attempt to restart the stream from block #{}",
-            start_from_block_height,
-        );
-    }
-}
diff --git a/src/providers/fastnear/client.rs b/src/providers/fastnear/client.rs
new file mode 100644
index 0000000..daba3ef
--- /dev/null
+++ b/src/providers/fastnear/client.rs
@@ -0,0 +1,65 @@
+use super::types;
+
+/// FastNearClient is a client to interact with the FastNear API
+/// It is used to fetch the blocks from FastNear
+#[derive(Clone, Debug)]
+pub struct FastNearClient {
+    endpoint: String,
+    client: reqwest::Client,
+}
+
+impl FastNearClient {
+    pub fn new(endpoint: String) -> Self {
+        Self {
+            endpoint,
+            client: reqwest::Client::new(),
+        }
+    }
+
+    pub fn from_conf(config: &types::FastNearConfig) -> Self {
+        Self {
+            endpoint: config.endpoint.clone(),
+            client: reqwest::Client::new(),
+        }
+    }
+
+    /// Fetches the block from the FastNear API
+    /// Returns the result as `Option<near_indexer_primitives::StreamerMessage>`
+    /// If the block does not exist, returns `None`
+    /// If the request fails, returns an error
+    pub async fn fetch(
+        &self,
+        url_path: &str,
+    ) -> Result<Option<near_indexer_primitives::StreamerMessage>, types::FastNearError> {
+        let url = format!("{}{}", self.endpoint, url_path);
+        let response = self.client.get(&url).send().await?;
+        match response.status().as_u16() {
+            200 => Ok(response.json().await?),
+            404 => Err(response.json::<types::ErrorResponse>().await?.into()),
+            _ => Err(types::FastNearError::UnknownError(format!(
+                "Unexpected status code: {}, Response: {}",
+                response.status(),
+                response.text().await?
+            ))),
+        }
+    }
+
+    /// Fetches the block from the FastNear API, retrying until the request succeeds
+    /// Returns the result as `Option<near_indexer_primitives::StreamerMessage>`
+    /// If the block does not exist, returns `None`
+    pub async fn fetch_until_success(
+        &self,
+        url_path: &str,
+    ) -> Option<near_indexer_primitives::StreamerMessage> {
+        loop {
+            match self.fetch(url_path).await {
+                Ok(block) => return block,
+                Err(err) => {
+                    tracing::warn!(target: crate::LAKE_FRAMEWORK, "Failed to fetch block: {}", err);
+                    tokio::time::sleep(std::time::Duration::from_secs(1)).await;
+                }
+            }
+        }
+    }
+}
diff --git a/src/providers/fastnear/fetchers.rs b/src/providers/fastnear/fetchers.rs
new file mode 100644
index 0000000..62209f0
--- /dev/null
+++ b/src/providers/fastnear/fetchers.rs
@@ -0,0 +1,170 @@
+use super::client::FastNearClient;
+use super::types;
+
+/// Fetches the last block from FastNear
+/// Returns `near_indexer_primitives::StreamerMessage`
+pub async fn fetch_last_block(client: &FastNearClient) -> near_indexer_primitives::StreamerMessage {
+    client
+        .fetch_until_success("/v0/last_block/final")
+        .await
+        .expect("Failed to fetch last block")
+}
+
+/// Fetches the optimistic block from FastNear
+/// Returns `near_indexer_primitives::StreamerMessage`
+pub async fn fetch_optimistic_block(
+    client: &FastNearClient,
+) -> near_indexer_primitives::StreamerMessage {
+    client
+        .fetch_until_success("/v0/last_block/optimistic")
+        .await
+        .expect("Failed to fetch optimistic block")
+}
+
+/// Fetches the optimistic block by height from FastNear
+/// using the endpoint `/v0/block_opt/:block_height`,
+/// waiting until the optimistic block is available
+/// Returns `near_indexer_primitives::StreamerMessage` if the block is available
+/// Returns `None` if the block height is skipped
+pub async fn fetch_optimistic_block_by_height(
+    client: &FastNearClient,
+    block_height: types::BlockHeight,
+) -> Option<near_indexer_primitives::StreamerMessage> {
+    client
+        .fetch_until_success(&format!("/v0/block_opt/{}", block_height))
+        .await
+}
+
+/// Fetches the first available block from FastNear
+/// Returns `near_indexer_primitives::StreamerMessage`
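+///
+/// # Example
+///
+/// A minimal sketch added for illustration (the mainnet endpoint is an assumption; any neardata-compatible endpoint works):
+/// ```no_run
+/// # async fn example() {
+/// use near_lake_framework::FastNearClient;
+/// use near_lake_framework::fastnear::fetchers::fetch_first_block;
+///
+/// let client = FastNearClient::new("https://mainnet.neardata.xyz".to_string());
+/// let streamer_message = fetch_first_block(&client).await;
+/// println!("First available block: {}", streamer_message.block.header.height);
+/// # }
+/// ```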
client + .fetch_until_success(&format!("/v0/block_opt/{}", block_height)) + .await +} + +/// Fetches the genesis block from the fastenar +/// Returns `near_indexer_primitives::StreamerMessage` +pub async fn fetch_first_block( + client: &FastNearClient, +) -> near_indexer_primitives::StreamerMessage { + client + .fetch_until_success("/v0/first_block") + .await + .expect("Failed to fetch first block") +} + +/// Fetches the block data from the fastenar by block height +/// Returns the result in `Option` +/// If the block does not exist, returns `None` +pub async fn fetch_streamer_message( + client: &FastNearClient, + block_height: types::BlockHeight, +) -> Option { + client + .fetch_until_success(&format!("/v0/block/{}", block_height)) + .await +} + +/// Fetches the block from the fastenar by block height +/// Returns the result in `near_indexer_primitives::views::BlockView` +/// If the block does not exist, returns an error +pub async fn fetch_block( + client: &FastNearClient, + block_height: types::BlockHeight, +) -> Result { + let streamer_message = client.fetch(&format!("/v0/block/{}", block_height)).await?; + if let Some(msg) = streamer_message { + Ok(msg.block) + } else { + Err(types::FastNearError::BlockDoesNotExist(format!( + "Block {} does not exist", + block_height + ))) + } +} + +/// Fetches the block from the fastenar by block height +/// Returns the result in `near_indexer_primitives::views::BlockView` +/// If the block does not exist, retries fetching the block +pub async fn fetch_block_or_retry( + client: &FastNearClient, + block_height: types::BlockHeight, +) -> Result { + let streamer_message = client + .fetch_until_success(&format!("/v0/block/{}", block_height)) + .await; + if let Some(msg) = streamer_message { + Ok(msg.block) + } else { + Err(types::FastNearError::BlockDoesNotExist(format!( + "Block {} does not exist", + block_height + ))) + } +} + +/// Fetches the shard from the fastenar by block height and shard id +/// Returns the result in `near_indexer_primitives::IndexerShard` +/// If the block does not exist, returns an error +pub async fn fetch_shard( + client: &FastNearClient, + block_height: types::BlockHeight, + shard_id: u64, +) -> Result { + let streamer_message = client.fetch(&format!("/v0/block/{}", block_height)).await?; + if let Some(msg) = streamer_message { + Ok(msg + .shards + .iter() + .filter_map(|shard| { + if shard.shard_id == shard_id { + Some(shard.clone()) + } else { + None + } + }) + .next() + .ok_or_else(|| { + types::FastNearError::BlockDoesNotExist(format!( + "Block {} and shard {} does not exist", + block_height, shard_id + )) + })?) + } else { + Err(types::FastNearError::BlockDoesNotExist(format!( + "Block {} does not exist", + block_height + ))) + } +} + +/// Fetches the shard from the fastenar by block height and shard id +/// Returns the result in `near_indexer_primitives::IndexerShard` +/// If the block does not exist, retries fetching the block +pub async fn fetch_shard_or_retry( + client: &FastNearClient, + block_height: types::BlockHeight, + shard_id: u64, +) -> Result { + let streamer_message = client + .fetch_until_success(&format!("/v0/block/{}", block_height)) + .await; + if let Some(msg) = streamer_message { + Ok(msg + .shards + .iter() + .filter_map(|shard| { + if shard.shard_id == shard_id { + Some(shard.clone()) + } else { + None + } + }) + .next() + .ok_or_else(|| { + types::FastNearError::BlockDoesNotExist(format!( + "Block {} and shard {} does not exist", + block_height, shard_id + )) + })?) 
+pub async fn fetch_shard_or_retry(
+    client: &FastNearClient,
+    block_height: types::BlockHeight,
+    shard_id: u64,
+) -> Result<near_indexer_primitives::IndexerShard, types::FastNearError> {
+    let streamer_message = client
+        .fetch_until_success(&format!("/v0/block/{}", block_height))
+        .await;
+    if let Some(msg) = streamer_message {
+        msg.shards
+            .iter()
+            .find(|shard| shard.shard_id == shard_id)
+            .cloned()
+            .ok_or_else(|| {
+                types::FastNearError::BlockDoesNotExist(format!(
+                    "Shard {} of block {} does not exist",
+                    shard_id, block_height
+                ))
+            })
+    } else {
+        Err(types::FastNearError::BlockDoesNotExist(format!(
+            "Block {} does not exist",
+            block_height
+        )))
+    }
+}
diff --git a/src/providers/fastnear/mod.rs b/src/providers/fastnear/mod.rs
new file mode 100644
index 0000000..a201f92
--- /dev/null
+++ b/src/providers/fastnear/mod.rs
@@ -0,0 +1,82 @@
+use crate::FastNearClient;
+
+pub mod client;
+pub mod fetchers;
+pub mod types;
+
+/// Starts the FastNear provider
+/// Fetches blocks from FastNear and sends them to `blocks_sink`
+/// The fetching is done in parallel with multiple threads;
+/// the number of threads is defined in the `FastNearConfig`
+/// Fetching starts from `start_block_height` and continues until the last block
+#[allow(unused_labels)] // we use loop labels for code-readability
+pub async fn start(
+    blocks_sink: tokio::sync::mpsc::Sender<near_indexer_primitives::StreamerMessage>,
+    config: types::FastNearConfig,
+) -> anyhow::Result<()> {
+    let client = FastNearClient::from_conf(&config);
+    let max_num_threads = config.num_threads;
+    let next_sink_block =
+        std::sync::Arc::new(std::sync::atomic::AtomicU64::new(config.start_block_height));
+    'main: loop {
+        // At the beginning of the 'main' loop, we fetch the next block height to start fetching from
+        let start_block_height = next_sink_block.load(std::sync::atomic::Ordering::SeqCst);
+        let next_fetch_block =
+            std::sync::Arc::new(std::sync::atomic::AtomicU64::new(start_block_height));
+        let last_block_height = fetchers::fetch_last_block(&client)
+            .await
+            .block
+            .header
+            .height;
+        let is_backfill = last_block_height > start_block_height + max_num_threads;
+        let num_threads = if is_backfill { max_num_threads } else { 1 };
+        tracing::info!(
+            target: crate::LAKE_FRAMEWORK,
+            "Start fetching from block {} to block {} with {} threads. Backfill: {:?}",
+            start_block_height,
+            last_block_height,
+            num_threads,
+            is_backfill
+        );
+        // starting backfill with multiple threads
+        let handles = (0..num_threads)
+            .map(|thread_index| {
+                let client = client.clone();
+                let blocks_sink = blocks_sink.clone();
+                let next_fetch_block = next_fetch_block.clone();
+                let next_sink_block = next_sink_block.clone();
+                tokio::spawn(async move {
+                    'stream: loop {
+                        let block_height = next_fetch_block.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
+                        if is_backfill && block_height > last_block_height {
+                            break 'stream;
+                        }
+                        tracing::debug!(target: crate::LAKE_FRAMEWORK, "#{}: Fetching block: {}", thread_index, block_height);
+                        let block =
+                            fetchers::fetch_streamer_message(&client, block_height).await;
+                        'sender: loop {
+                            let expected_block_height = next_sink_block.load(std::sync::atomic::Ordering::SeqCst);
+                            if expected_block_height < block_height {
+                                tokio::time::sleep(std::time::Duration::from_millis(
+                                    block_height - expected_block_height,
+                                )).await;
+                            } else {
+                                tracing::debug!(target: crate::LAKE_FRAMEWORK, "#{}: Sending block: {}", thread_index, block_height);
+                                break 'sender;
+                            }
+                        }
+                        if let Some(block) = block {
+                            blocks_sink.send(block).await.expect("Failed to send block");
+                        } else {
+                            tracing::debug!(target: crate::LAKE_FRAMEWORK, "#{}: Skipped block: {}", thread_index, block_height);
+                        }
+                        next_sink_block.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
+                    }
+                })
+            })
+            .collect::<Vec<_>>();
+        for handle in handles {
+            handle.await?;
+        }
+    }
+}
diff --git a/src/providers/fastnear/types.rs b/src/providers/fastnear/types.rs
new file mode 100644
index 0000000..eafc7ab
--- /dev/null
+++ b/src/providers/fastnear/types.rs
@@ -0,0 +1,127 @@
+/// Type alias represents the block height
+pub type BlockHeight = u64;
+
+/// Configuration struct for the FastNear provider
+/// NB! Consider using [`FastNearConfigBuilder`]
+/// Building the `FastNearConfig` example:
+/// ```
+/// use near_lake_framework::FastNearConfigBuilder;
+///
+/// # async fn main() {
+/// let config = FastNearConfigBuilder::default()
+///    .testnet()
+///    .start_block_height(82422587)
+///    .build()
+///    .expect("Failed to build FastNearConfig");
+/// # }
+/// ```
+#[derive(Default, Builder)]
+#[builder(pattern = "owned")]
+pub struct FastNearConfig {
+    /// FastNear data endpoint
+    #[builder(setter(into))]
+    pub(crate) endpoint: String,
+    /// Defines the block height to start indexing from
+    pub(crate) start_block_height: u64,
+    /// Number of threads to use for fetching data
+    /// Default: 2 * available threads
+    #[builder(default = "num_threads_default()")]
+    pub(crate) num_threads: u64,
+    #[builder(default = "100")]
+    pub(crate) blocks_preload_pool_size: usize,
+}
+
+impl FastNearConfigBuilder {
+    /// Shortcut to set up [FastNearConfigBuilder] for mainnet
+    /// ```
+    /// use near_lake_framework::FastNearConfigBuilder;
+    ///
+    /// # async fn main() {
+    /// let config = FastNearConfigBuilder::default()
+    ///    .mainnet()
+    ///    .start_block_height(65231161)
+    ///    .build()
+    ///    .expect("Failed to build FastNearConfig");
+    /// # }
+    /// ```
+    pub fn mainnet(mut self) -> Self {
+        self.endpoint = Some("https://mainnet.neardata.xyz".to_string());
+        self
+    }
+
+    /// Shortcut to set up [FastNearConfigBuilder] for testnet
+    /// ```
+    /// use near_lake_framework::FastNearConfigBuilder;
+    ///
+    /// # async fn main() {
+    /// let config = FastNearConfigBuilder::default()
+    ///    .testnet()
+    ///    .start_block_height(82422587)
+    ///    .build()
+    ///    .expect("Failed to build FastNearConfig");
+    /// # }
+    /// ```
+    pub fn testnet(mut self) -> Self {
+        self.endpoint = Some("https://testnet.neardata.xyz".to_string());
+        self
+    }
+}
+
+/// Default value for `num_threads`; it can be overridden via [FastNearConfigBuilder]:
+/// ```
+/// use near_lake_framework::FastNearConfigBuilder;
+///
+/// # async fn main() {
+/// let config = FastNearConfigBuilder::default()
+///    .mainnet()
+///    .num_threads(8)
+///    .start_block_height(82422587)
+///    .build()
+///    .expect("Failed to build FastNearConfig");
+/// # }
+/// ```
+fn num_threads_default() -> u64 {
+    // Default to 2 threads if we can't get the number of available threads
+    let threads =
+        std::thread::available_parallelism().map_or(2, std::num::NonZeroUsize::get) as u64;
+    // Double the number of threads to fetch data and process it concurrently in the streamer
+    threads * 2
+}
+
+#[derive(Debug, thiserror::Error)]
+pub enum FastNearError {
+    #[error("Block height too high: {0}")]
+    BlockHeightTooHigh(String),
+    #[error("Block height too low: {0}")]
+    BlockHeightTooLow(String),
+    #[error("Block does not exist: {0}")]
+    BlockDoesNotExist(String),
+    #[error("Request error: {0}")]
+    RequestError(reqwest::Error),
+    #[error("An unknown error occurred: {0}")]
+    UnknownError(String),
+}
+
+impl From<reqwest::Error> for FastNearError {
+    fn from(error: reqwest::Error) -> Self {
+        FastNearError::RequestError(error)
+    }
+}
+
+#[derive(Debug, serde::Deserialize)]
+pub(crate) struct ErrorResponse {
+    error: String,
+    #[serde(rename = "type")]
+    error_type: String,
+}
+
+impl From<ErrorResponse> for FastNearError {
+    fn from(response: ErrorResponse) -> Self {
+        match response.error_type.as_str() {
+            "BLOCK_DOES_NOT_EXIST" => FastNearError::BlockDoesNotExist(response.error),
+            "BLOCK_HEIGHT_TOO_HIGH" => FastNearError::BlockHeightTooHigh(response.error),
+            "BLOCK_HEIGHT_TOO_LOW" => FastNearError::BlockHeightTooLow(response.error),
+            _ => FastNearError::UnknownError(response.error),
+        }
+    }
+}
diff --git a/src/providers/mod.rs b/src/providers/mod.rs
new file mode 100644
index 0000000..056ae8c
--- /dev/null
+++ b/src/providers/mod.rs
@@ -0,0 +1,28 @@
+pub mod fastnear;
+pub mod s3;
+
+pub enum NearLakeFrameworkConfig {
+    Lake(s3::types::LakeConfig),
+    FastNear(fastnear::types::FastNearConfig),
+}
+
+impl NearLakeFrameworkConfig {
+    pub fn blocks_preload_pool_size(&self) -> usize {
+        match self {
+            NearLakeFrameworkConfig::Lake(config) => config.blocks_preload_pool_size,
+            NearLakeFrameworkConfig::FastNear(config) => config.blocks_preload_pool_size,
+        }
+    }
+}
+
+impl From<s3::types::LakeConfig> for NearLakeFrameworkConfig {
+    fn from(config: s3::types::LakeConfig) -> Self {
+        NearLakeFrameworkConfig::Lake(config)
+    }
+}
+
+impl From<fastnear::types::FastNearConfig> for NearLakeFrameworkConfig {
+    fn from(config: fastnear::types::FastNearConfig) -> Self {
+        NearLakeFrameworkConfig::FastNear(config)
+    }
+}
diff --git a/src/providers/s3/client.rs b/src/providers/s3/client.rs
new file mode 100644
index 0000000..70cf91e
--- /dev/null
+++ b/src/providers/s3/client.rs
@@ -0,0 +1,230 @@
+use std::error::Error;
+use std::sync::Arc;
+
+use async_trait::async_trait;
+
+pub type S3ClientError = Arc<dyn Error + Send + Sync>;
+
+#[derive(Debug, thiserror::Error, Clone)]
+pub struct GetObjectBytesError(pub S3ClientError);
+
+impl std::ops::Deref for GetObjectBytesError {
+    type Target = S3ClientError;
+
+    fn deref(&self) -> &Self::Target {
+        &self.0
+    }
+}
+
+impl std::fmt::Display for GetObjectBytesError {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(f, "GetObjectBytesError: {}", self.0)
+    }
+}
+
+impl From<aws_sdk_s3::error::SdkError<aws_sdk_s3::operation::get_object::GetObjectError>>
+    for GetObjectBytesError
+{
+    fn from(
+        error: aws_sdk_s3::error::SdkError<aws_sdk_s3::operation::get_object::GetObjectError>,
+    ) -> Self {
+        Self(Arc::new(error))
+    }
+}
+
+impl From<aws_smithy_types::byte_stream::error::Error> for GetObjectBytesError {
+    fn from(error: aws_smithy_types::byte_stream::error::Error) -> Self {
+        Self(Arc::new(error))
+    }
+}
+
+#[derive(Debug, thiserror::Error, Clone)]
+pub struct ListCommonPrefixesError(pub S3ClientError);
+
+impl std::fmt::Display for ListCommonPrefixesError {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(f, "ListCommonPrefixesError: {}", self.0)
+    }
+}
+
+impl std::ops::Deref for ListCommonPrefixesError {
+    type Target = S3ClientError;
+    fn deref(&self) -> &Self::Target {
+        &self.0
+    }
+}
+
+impl From<aws_sdk_s3::error::SdkError<aws_sdk_s3::operation::list_objects_v2::ListObjectsV2Error>>
+    for ListCommonPrefixesError
+{
+    fn from(
+        error: aws_sdk_s3::error::SdkError<
+            aws_sdk_s3::operation::list_objects_v2::ListObjectsV2Error,
+        >,
+    ) -> Self {
+        Self(Arc::new(error))
+    }
+}
+
+#[async_trait]
+pub trait S3Client: Send + Sync {
+    async fn get_object_bytes(
+        &self,
+        bucket: &str,
+        prefix: &str,
+    ) -> Result<Vec<u8>, GetObjectBytesError>;
+
+    async fn list_common_prefixes(
+        &self,
+        bucket: &str,
+        start_after_prefix: &str,
+    ) -> Result<Vec<String>, ListCommonPrefixesError>;
+}
+
+#[derive(Clone, Debug)]
+pub struct LakeS3Client {
+    s3: aws_sdk_s3::Client,
+}
+
+impl LakeS3Client {
+    pub fn new(s3: aws_sdk_s3::Client) -> Self {
+        Self { s3 }
+    }
+
+    pub fn from_conf(config: aws_sdk_s3::config::Config) -> Self {
+        let s3_client = aws_sdk_s3::Client::from_conf(config);
+
+        Self { s3: s3_client }
+    }
+}
+
+#[async_trait]
+impl S3Client for LakeS3Client {
+    async fn get_object_bytes(
+        &self,
+        bucket: &str,
+        prefix: &str,
+    ) -> Result<Vec<u8>, GetObjectBytesError> {
+        let object = self
+            .s3
+            .get_object()
+            .bucket(bucket)
+            .key(prefix)
+            .request_payer(aws_sdk_s3::types::RequestPayer::Requester)
+            .send()
+            .await?;
+
+        let bytes = object.body.collect().await?.into_bytes().to_vec();
+
+        Ok(bytes)
+    }
+
+    async fn list_common_prefixes(
+        &self,
+        bucket: &str,
+        start_after_prefix: &str,
+    ) -> Result<Vec<String>, ListCommonPrefixesError> {
+        let response = self
+            .s3
+            .list_objects_v2()
+            .max_keys(1000) // 1000 is the default and max value for this parameter
+            .delimiter("/".to_string())
+            .start_after(start_after_prefix)
+            .request_payer(aws_sdk_s3::types::RequestPayer::Requester)
+            .bucket(bucket)
+            .send()
+            .await?;
+
+        let prefixes = match response.common_prefixes {
+            None => vec![],
+            Some(common_prefixes) => common_prefixes
+                .into_iter()
+                .filter_map(|common_prefix| common_prefix.prefix)
+                .collect::<Vec<String>>()
+                .into_iter()
+                .filter_map(|prefix_string| prefix_string.split('/').next().map(String::from))
+                .collect(),
+        };
+
+        Ok(prefixes)
+    }
+}
+
+#[cfg(test)]
+mod test {
+    use super::*;
+
+    use std::sync::Arc;
+
+    use crate::providers::s3::fetchers::fetch_streamer_message;
+    use async_trait::async_trait;
+
+    #[derive(Clone, Debug)]
+    pub struct LakeS3Client {}
+
+    #[async_trait]
+    impl S3Client for LakeS3Client {
+        async fn get_object_bytes(
+            &self,
+            _bucket: &str,
+            prefix: &str,
+        ) -> Result<Vec<u8>, GetObjectBytesError> {
+            let path = format!("{}/blocks/{}", env!("CARGO_MANIFEST_DIR"), prefix);
+            tokio::fs::read(path)
+                .await
+                .map_err(|e| GetObjectBytesError(Arc::new(e)))
+        }
+
+        async fn list_common_prefixes(
+            &self,
+            _bucket: &str,
+            _start_after: &str,
+        ) -> Result<Vec<String>, ListCommonPrefixesError> {
+            Ok(Vec::new())
+        }
+    }
+
+    #[tokio::test]
+    async fn deserializes_meta_transactions() {
+        let lake_client = LakeS3Client {};
+
+        let streamer_message =
+            fetch_streamer_message(&lake_client, "near-lake-data-mainnet", 879765)
+                .await
+                .unwrap();
+
+        let delegate_action = &streamer_message.shards[0]
+            .chunk
+            .as_ref()
+            .unwrap()
+            .transactions[0]
+            .transaction
+            .actions[0];
+
+        assert_eq!(
+            serde_json::to_value(delegate_action).unwrap(),
+            serde_json::json!({
+                "Delegate": {
+                    "delegate_action": {
+                        "sender_id": "test.near",
+                        "receiver_id": "test.near",
+                        "actions": [
+                            {
+                                "AddKey": {
+                                    "public_key": "ed25519:CnQMksXTTtn81WdDujsEMQgKUMkFvDJaAjDeDLTxVrsg",
+                                    "access_key": {
+                                        "nonce": 0,
+                                        "permission": "FullAccess"
+                                    }
+                                }
+                            }
+                        ],
+                        "nonce": 879546,
+                        "max_block_height": 100,
+                        "public_key": "ed25519:8Rn4FJeeRYcrLbcrAQNFVgvbZ2FCEQjgydbXwqBwF1ib"
+                    },
+                    "signature": "ed25519:25uGrsJNU3fVgUpPad3rGJRy2XQum8gJxLRjKFCbd7gymXwUxQ9r3tuyBCD6To7SX5oSJ2ScJZejwqK1ju8WdZfS"
+                }
+            })
+        );
+    }
+}
diff --git a/src/s3_fetchers.rs b/src/providers/s3/fetchers.rs
similarity index 51%
rename from src/s3_fetchers.rs
rename to src/providers/s3/fetchers.rs
index a29bb1e..0d360f9 100644
--- a/src/s3_fetchers.rs
+++ b/src/providers/s3/fetchers.rs
@@ -1,85 +1,14 @@
 use std::str::FromStr;
 
-use async_trait::async_trait;
-
-use crate::s3_client::{GetObjectBytesError, ListCommonPrefixesError, S3Client};
-
-#[derive(Clone, Debug)]
-pub struct LakeS3Client {
-    s3: aws_sdk_s3::Client,
-}
-
-impl LakeS3Client {
-    pub fn new(s3: aws_sdk_s3::Client) -> Self {
-        Self { s3 }
-    }
-
-    pub fn from_conf(config: aws_sdk_s3::config::Config) -> Self {
-        let s3_client = aws_sdk_s3::Client::from_conf(config);
-
-        Self { s3: s3_client }
-    }
-}
-
-#[async_trait]
-impl S3Client for LakeS3Client {
-    async fn get_object_bytes(
-        &self,
-        bucket: &str,
-        prefix: &str,
-    ) -> Result<Vec<u8>, GetObjectBytesError> {
-        let object = self
-            .s3
-            .get_object()
-            .bucket(bucket)
-            .key(prefix)
-            .request_payer(aws_sdk_s3::types::RequestPayer::Requester)
-            .send()
-            .await?;
-
-        let
bytes = object.body.collect().await?.into_bytes().to_vec(); - - Ok(bytes) - } - - async fn list_common_prefixes( - &self, - bucket: &str, - start_after_prefix: &str, - ) -> Result, ListCommonPrefixesError> { - let response = self - .s3 - .list_objects_v2() - .max_keys(1000) // 1000 is the default and max value for this parameter - .delimiter("/".to_string()) - .start_after(start_after_prefix) - .request_payer(aws_sdk_s3::types::RequestPayer::Requester) - .bucket(bucket) - .send() - .await?; - - let prefixes = match response.common_prefixes { - None => vec![], - Some(common_prefixes) => common_prefixes - .into_iter() - .filter_map(|common_prefix| common_prefix.prefix) - .collect::>() - .into_iter() - .filter_map(|prefix_string| prefix_string.split('/').next().map(String::from)) - .collect(), - }; - - Ok(prefixes) - } -} +use super::{client::S3Client, types}; /// Queries the list of the objects in the bucket, grouped by "/" delimiter. /// Returns the list of block heights that can be fetched pub async fn list_block_heights( lake_s3_client: &dyn S3Client, s3_bucket_name: &str, - start_from_block_height: crate::types::BlockHeight, -) -> Result, crate::types::LakeError> { + start_from_block_height: types::BlockHeight, +) -> Result, types::LakeError> { tracing::debug!( target: crate::LAKE_FRAMEWORK, "Fetching block heights from S3, after #{}...", @@ -100,13 +29,13 @@ pub async fn list_block_heights( /// By the given block height gets the objects: /// - block.json /// - shard_N.json -/// Reads the content of the objects and parses as a JSON. -/// Returns the result in `near_indexer_primitives::StreamerMessage` -pub(crate) async fn fetch_streamer_message( +/// Reads the content of the objects and parses as a JSON. +/// Returns the result in `near_indexer_primitives::StreamerMessage` +pub async fn fetch_streamer_message( lake_s3_client: &dyn S3Client, s3_bucket_name: &str, - block_height: crate::types::BlockHeight, -) -> Result { + block_height: types::BlockHeight, +) -> Result { let block_view = fetch_block_or_retry(lake_s3_client, s3_bucket_name, block_height).await?; let fetch_shards_futures = (0..block_view.chunks.len() as u64) @@ -128,8 +57,8 @@ pub(crate) async fn fetch_streamer_message( pub async fn fetch_block( lake_s3_client: &dyn S3Client, s3_bucket_name: &str, - block_height: crate::types::BlockHeight, -) -> Result { + block_height: types::BlockHeight, +) -> Result { let bytes = lake_s3_client .get_object_bytes(s3_bucket_name, &format!("{:0>12}/block.json", block_height)) .await?; @@ -143,13 +72,13 @@ pub async fn fetch_block( pub async fn fetch_block_or_retry( lake_s3_client: &dyn S3Client, s3_bucket_name: &str, - block_height: crate::types::BlockHeight, -) -> Result { + block_height: types::BlockHeight, +) -> Result { loop { match fetch_block(lake_s3_client, s3_bucket_name, block_height).await { Ok(block_view) => break Ok(block_view), Err(err) => { - if let crate::types::LakeError::S3GetError { ref error } = err { + if let types::LakeError::S3GetError { ref error } = err { if let Some(get_object_error) = error.downcast_ref::() { @@ -188,9 +117,9 @@ pub async fn fetch_block_or_retry( pub async fn fetch_shard( lake_s3_client: &dyn S3Client, s3_bucket_name: &str, - block_height: crate::types::BlockHeight, + block_height: types::BlockHeight, shard_id: u64, -) -> Result { +) -> Result { let bytes = lake_s3_client .get_object_bytes( s3_bucket_name, @@ -207,14 +136,14 @@ pub async fn fetch_shard( pub async fn fetch_shard_or_retry( lake_s3_client: &dyn S3Client, s3_bucket_name: &str, - 
block_height: crate::types::BlockHeight, + block_height: types::BlockHeight, shard_id: u64, -) -> Result { +) -> Result { loop { match fetch_shard(lake_s3_client, s3_bucket_name, block_height, shard_id).await { Ok(shard) => break Ok(shard), Err(err) => { - if let crate::types::LakeError::S3ListError { ref error } = err { + if let types::LakeError::S3ListError { ref error } = err { if let Some(list_objects_error) = error.downcast_ref::() { @@ -251,82 +180,3 @@ pub async fn fetch_shard_or_retry( } } } - -#[cfg(test)] -mod test { - use super::*; - - use std::sync::Arc; - - use async_trait::async_trait; - - #[derive(Clone, Debug)] - pub struct LakeS3Client {} - - #[async_trait] - impl S3Client for LakeS3Client { - async fn get_object_bytes( - &self, - _bucket: &str, - prefix: &str, - ) -> Result, GetObjectBytesError> { - let path = format!("{}/blocks/{}", env!("CARGO_MANIFEST_DIR"), prefix); - tokio::fs::read(path) - .await - .map_err(|e| GetObjectBytesError(Arc::new(e))) - } - - async fn list_common_prefixes( - &self, - _bucket: &str, - _start_after: &str, - ) -> Result, ListCommonPrefixesError> { - Ok(Vec::new()) - } - } - - #[tokio::test] - async fn deserializes_meta_transactions() { - let lake_client = LakeS3Client {}; - - let streamer_message = - fetch_streamer_message(&lake_client, "near-lake-data-mainnet", 879765) - .await - .unwrap(); - - let delegate_action = &streamer_message.shards[0] - .chunk - .as_ref() - .unwrap() - .transactions[0] - .transaction - .actions[0]; - - assert_eq!( - serde_json::to_value(delegate_action).unwrap(), - serde_json::json!({ - "Delegate": { - "delegate_action": { - "sender_id": "test.near", - "receiver_id": "test.near", - "actions": [ - { - "AddKey": { - "public_key": "ed25519:CnQMksXTTtn81WdDujsEMQgKUMkFvDJaAjDeDLTxVrsg", - "access_key": { - "nonce": 0, - "permission": "FullAccess" - } - } - } - ], - "nonce": 879546, - "max_block_height": 100, - "public_key": "ed25519:8Rn4FJeeRYcrLbcrAQNFVgvbZ2FCEQjgydbXwqBwF1ib" - }, - "signature": "ed25519:25uGrsJNU3fVgUpPad3rGJRy2XQum8gJxLRjKFCbd7gymXwUxQ9r3tuyBCD6To7SX5oSJ2ScJZejwqK1ju8WdZfS" - } - }) - ); - } -} diff --git a/src/providers/s3/mod.rs b/src/providers/s3/mod.rs new file mode 100644 index 0000000..f326d7c --- /dev/null +++ b/src/providers/s3/mod.rs @@ -0,0 +1,271 @@ +use futures::stream::StreamExt; +use tokio::sync::mpsc; +use tokio::sync::mpsc::error::SendError; + +pub mod client; +pub mod fetchers; +pub mod types; + +fn stream_block_heights<'a: 'b, 'b>( + lake_s3_client: &'a dyn client::S3Client, + s3_bucket_name: &'a str, + mut start_from_block_height: types::BlockHeight, +) -> impl futures::Stream + 'b { + async_stream::stream! { + loop { + tracing::debug!(target: crate::LAKE_FRAMEWORK, "Fetching a list of blocks from S3..."); + match fetchers::list_block_heights( + lake_s3_client, + s3_bucket_name, + start_from_block_height, + ) + .await { + Ok(block_heights) => { + if block_heights.is_empty() { + tracing::debug!( + target: crate::LAKE_FRAMEWORK, + "There are no newer block heights than {} in bucket {}. 
Fetching again in 2s...",
+                            start_from_block_height,
+                            s3_bucket_name,
+                        );
+                        tokio::time::sleep(std::time::Duration::from_secs(2)).await;
+                        continue;
+                    }
+                    tracing::debug!(
+                        target: crate::LAKE_FRAMEWORK,
+                        "Received {} newer block heights",
+                        block_heights.len()
+                    );
+
+                    start_from_block_height = *block_heights.last().unwrap() + 1;
+                    for block_height in block_heights {
+                        tracing::debug!(target: crate::LAKE_FRAMEWORK, "Yielding {} block height...", block_height);
+                        yield block_height;
+                    }
+                }
+                Err(err) => {
+                    tracing::warn!(
+                        target: crate::LAKE_FRAMEWORK,
+                        "Failed to get block heights from bucket {}: {}. Retrying in 1s...",
+                        s3_bucket_name,
+                        err,
+                    );
+                    tokio::time::sleep(std::time::Duration::from_secs(1)).await;
+                }
+            }
+        }
+    }
+}
+
+// The only consumer of the BlockHeights Streamer
+async fn prefetch_block_heights_into_pool(
+    pending_block_heights: &mut std::pin::Pin<
+        &mut impl tokio_stream::Stream<Item = u64>,
+    >,
+    limit: usize,
+    await_for_at_least_one: bool,
+) -> anyhow::Result<Vec<u64>> {
+    let mut block_heights = Vec::with_capacity(limit);
+    for remaining_limit in (0..limit).rev() {
+        tracing::debug!(target: crate::LAKE_FRAMEWORK, "Polling for the next block height without awaiting... (up to {} block heights are going to be fetched)", remaining_limit);
+        match futures::poll!(pending_block_heights.next()) {
+            std::task::Poll::Ready(Some(block_height)) => {
+                block_heights.push(block_height);
+            }
+            std::task::Poll::Pending => {
+                if await_for_at_least_one && block_heights.is_empty() {
+                    tracing::debug!(target: crate::LAKE_FRAMEWORK, "There were no block heights available immediately, and the prefetching blocks queue is empty, so we need to await at least a single block height before proceeding...");
+                    match pending_block_heights.next().await {
+                        Some(block_height) => {
+                            block_heights.push(block_height);
+                        }
+                        None => {
+                            return Err(anyhow::anyhow!("This state should be unreachable as the block heights stream should be infinite."));
+                        }
+                    }
+                    continue;
+                }
+                tracing::debug!(target: crate::LAKE_FRAMEWORK, "There were no block heights available immediately, so we should not block here and keep processing the blocks.");
+                break;
+            }
+            std::task::Poll::Ready(None) => {
+                return Err(anyhow::anyhow!("This state should be unreachable as the block heights stream should be infinite."));
+            }
+        }
+    }
+    Ok(block_heights)
+}
+
+#[allow(unused_labels)] // we use loop labels for code-readability
+pub(crate) async fn start(
+    streamer_message_sink: mpsc::Sender<near_indexer_primitives::StreamerMessage>,
+    config: types::LakeConfig,
+) -> anyhow::Result<()> {
+    let mut start_from_block_height = config.start_block_height;
+
+    let lake_s3_client: Box<dyn client::S3Client> = if let Some(s3_client) = config.s3_client {
+        s3_client
+    } else if let Some(config) = config.s3_config {
+        Box::new(client::LakeS3Client::from_conf(config))
+    } else {
+        let aws_config = aws_config::from_env().load().await;
+        let s3_config = aws_sdk_s3::config::Builder::from(&aws_config)
+            .region(aws_types::region::Region::new(config.s3_region_name))
+            .build();
+
+        Box::new(client::LakeS3Client::from_conf(s3_config))
+    };
+
+    let mut last_processed_block_hash: Option<near_indexer_primitives::CryptoHash> = None;
+
+    'main: loop {
+        // At the beginning of the 'main' loop we create a Block Heights stream
+        // and prefetch the initial data into that pool.
+        // Later the 'stream' loop might exit to this 'main' one to repeat the procedure.
+        // This happens because we assume the Lake Indexer that writes to the S3 bucket might,
+        // in some cases, write block N+1 before it finishes writing block N.
+        // We need to stream blocks consistently, so we have to try loading the block again.
+
+        let pending_block_heights = stream_block_heights(
+            &*lake_s3_client,
+            &config.s3_bucket_name,
+            start_from_block_height,
+        );
+        tokio::pin!(pending_block_heights);
+
+        let mut streamer_messages_futures = futures::stream::FuturesOrdered::new();
+        tracing::debug!(
+            target: crate::LAKE_FRAMEWORK,
+            "Prefetching up to {} blocks...",
+            config.blocks_preload_pool_size
+        );
+
+        streamer_messages_futures.extend(
+            prefetch_block_heights_into_pool(
+                &mut pending_block_heights,
+                config.blocks_preload_pool_size,
+                true,
+            )
+            .await?
+            .into_iter()
+            .map(|block_height| {
+                fetchers::fetch_streamer_message(
+                    &*lake_s3_client,
+                    &config.s3_bucket_name,
+                    block_height,
+                )
+            }),
+        );
+
+        tracing::debug!(
+            target: crate::LAKE_FRAMEWORK,
+            "Awaiting the first prefetched block..."
+        );
+        'stream: while let Some(streamer_message_result) = streamer_messages_futures.next().await {
+            let streamer_message = streamer_message_result.map_err(|err| {
+                tracing::error!(
+                    target: crate::LAKE_FRAMEWORK,
+                    "Failed to fetch StreamerMessage with error: \n{:#?}",
+                    err,
+                );
+                err
+            })?;
+
+            tracing::debug!(
+                target: crate::LAKE_FRAMEWORK,
+                "Received block #{} ({})",
+                streamer_message.block.header.height,
+                streamer_message.block.header.hash
+            );
+            // check if we have `last_processed_block_hash` (might be None only on start)
+            if let Some(prev_block_hash) = last_processed_block_hash {
+                // compare `last_processed_block_hash` with `block.header.prev_hash` of the current
+                // block (ensure we don't miss anything from S3)
+                // retrieve the data from S3 if prev_hashes don't match and repeat the main loop step
+                if prev_block_hash != streamer_message.block.header.prev_hash {
+                    tracing::warn!(
+                        target: crate::LAKE_FRAMEWORK,
+                        "`prev_hash` does not match, refetching the data from S3 in 200ms",
+                    );
+                    tokio::time::sleep(std::time::Duration::from_millis(200)).await;
+                    break 'stream;
+                }
+            }
+
+            // store current block info as `last_processed_block_*` for the next iteration
+            last_processed_block_hash = Some(streamer_message.block.header.hash);
+            start_from_block_height = streamer_message.block.header.height + 1;
+
+            tracing::debug!(
+                target: crate::LAKE_FRAMEWORK,
+                "Prefetching up to {} blocks... (there are {} blocks in the prefetching pool)",
+                config.blocks_preload_pool_size,
+                streamer_messages_futures.len(),
+            );
+            tracing::debug!(
+                target: crate::LAKE_FRAMEWORK,
+                "Streaming block #{} ({})",
+                streamer_message.block.header.height,
+                streamer_message.block.header.hash
+            );
+            let blocks_preload_pool_current_len = streamer_messages_futures.len();
+
+            let prefetched_block_heights_future = prefetch_block_heights_into_pool(
+                &mut pending_block_heights,
+                config
+                    .blocks_preload_pool_size
+                    .saturating_sub(blocks_preload_pool_current_len),
+                blocks_preload_pool_current_len == 0,
+            );
+
+            let streamer_message_sink_send_future = streamer_message_sink.send(streamer_message);
+
+            let (prefetch_res, send_res): (
+                Result<Vec<u64>, anyhow::Error>,
+                Result<_, SendError<near_indexer_primitives::StreamerMessage>>,
+            ) = futures::join!(
+                prefetched_block_heights_future,
+                streamer_message_sink_send_future,
+            );
+
+            if let Err(SendError(err)) = send_res {
+                tracing::debug!(
+                    target: crate::LAKE_FRAMEWORK,
+                    "Failed to send StreamerMessage (#{:0>12}) to the channel. Channel is closed, exiting \n{:?}",
+                    start_from_block_height - 1,
+                    err,
+                );
+                return Ok(());
+            }
+
+            streamer_messages_futures.extend(
+                prefetch_res
+                    .map_err(|err| {
+                        tracing::error!(
+                            target: crate::LAKE_FRAMEWORK,
+                            "Failed to prefetch block heights to the prefetching pool with error: \n{:#?}",
+                            err
+                        );
+                        err
+                    })?
+                    .into_iter()
+                    .map(|block_height| {
+                        fetchers::fetch_streamer_message(
+                            &*lake_s3_client,
+                            &config.s3_bucket_name,
+                            block_height,
+                        )
+                    }
+                ));
+        }
+
+        tracing::warn!(
+            target: crate::LAKE_FRAMEWORK,
+            "Exited from the 'stream' loop. It may happen in two cases:\n
+            1. Blocks have ended (impossible, might be an error on the Lake Buckets),\n
+            2. Received a Block whose prev_hash doesn't match the previously streamed block.\n
+            Will attempt to restart the stream from block #{}",
+            start_from_block_height,
+        );
+    }
+}
diff --git a/src/types.rs b/src/providers/s3/types.rs
similarity index 98%
rename from src/types.rs
rename to src/providers/s3/types.rs
index 5a0e5b3..da76a94 100644
--- a/src/types.rs
+++ b/src/providers/s3/types.rs
@@ -1,4 +1,4 @@
-use crate::s3_client::{GetObjectBytesError, ListCommonPrefixesError, S3Client};
+use super::client::{GetObjectBytesError, ListCommonPrefixesError, S3Client};
 
 /// Type alias represents the block height
 pub type BlockHeight = u64;
diff --git a/src/s3_client.rs b/src/s3_client.rs
deleted file mode 100644
index 6d718b4..0000000
--- a/src/s3_client.rs
+++ /dev/null
@@ -1,82 +0,0 @@
-use std::error::Error;
-use std::sync::Arc;
-
-use async_trait::async_trait;
-
-pub type S3ClientError = Arc<dyn Error + Send + Sync>;
-
-#[derive(Debug, thiserror::Error, Clone)]
-pub struct GetObjectBytesError(pub S3ClientError);
-
-impl std::ops::Deref for GetObjectBytesError {
-    type Target = S3ClientError;
-
-    fn deref(&self) -> &Self::Target {
-        &self.0
-    }
-}
-
-impl std::fmt::Display for GetObjectBytesError {
-    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-        write!(f, "GetObjectBytesError: {}", self.0)
-    }
-}
-
-impl From<aws_sdk_s3::error::SdkError<aws_sdk_s3::operation::get_object::GetObjectError>>
-    for GetObjectBytesError
-{
-    fn from(
-        error: aws_sdk_s3::error::SdkError<aws_sdk_s3::operation::get_object::GetObjectError>,
-    ) -> Self {
-        Self(Arc::new(error))
-    }
-}
-
-impl From<aws_smithy_types::byte_stream::error::Error> for GetObjectBytesError {
-    fn from(error: aws_smithy_types::byte_stream::error::Error) -> Self {
-        Self(Arc::new(error))
-    }
-}
-
-#[derive(Debug, thiserror::Error, Clone)]
-pub struct ListCommonPrefixesError(pub S3ClientError);
-
-impl std::fmt::Display for ListCommonPrefixesError {
-    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-        write!(f, "ListCommonPrefixesError: {}", self.0)
-    }
-}
-
-impl std::ops::Deref for ListCommonPrefixesError {
-    type Target = S3ClientError;
-    fn deref(&self) -> &Self::Target {
-        &self.0
-    }
-}
-
-impl From<aws_sdk_s3::error::SdkError<aws_sdk_s3::operation::list_objects_v2::ListObjectsV2Error>>
-    for ListCommonPrefixesError
-{
-    fn from(
-        error: aws_sdk_s3::error::SdkError<
-            aws_sdk_s3::operation::list_objects_v2::ListObjectsV2Error,
-        >,
-    ) -> Self {
-        Self(Arc::new(error))
-    }
-}
-
-#[async_trait]
-pub trait S3Client: Send + Sync {
-    async fn get_object_bytes(
-        &self,
-        bucket: &str,
-        prefix: &str,
-    ) -> Result<Vec<u8>, GetObjectBytesError>;
-
-    async fn list_common_prefixes(
-        &self,
-        bucket: &str,
-        start_after_prefix: &str,
-    ) -> Result<Vec<String>, ListCommonPrefixesError>;
-}