From acc3610d87d486df2c100bd45e7b57ba52765aa8 Mon Sep 17 00:00:00 2001 From: Morgan Mccauley Date: Mon, 15 Apr 2024 11:22:50 +1200 Subject: [PATCH] feat: Use custom `S3Client` if configured --- src/lib.rs | 32 ++++++++++++++++++-------------- 1 file changed, 18 insertions(+), 14 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 314456a..11f711d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -253,6 +253,7 @@ pub use near_indexer_primitives; pub use aws_credential_types::Credentials; pub use types::{LakeConfig, LakeConfigBuilder}; +pub mod s3_client; pub mod s3_fetchers; pub(crate) mod types; @@ -288,7 +289,7 @@ pub fn streamer( } fn stream_block_heights<'a: 'b, 'b>( - lake_s3_client: &'a s3_fetchers::LakeS3Client, + lake_s3_client: &'a dyn s3_client::S3Client, s3_bucket_name: &'a str, mut start_from_block_height: crate::types::BlockHeight, ) -> impl futures::Stream + 'b { @@ -384,16 +385,19 @@ async fn start( ) -> anyhow::Result<()> { let mut start_from_block_height = config.start_block_height; - let s3_client = if let Some(config) = config.s3_config { - aws_sdk_s3::Client::from_conf(config) - } else { - let aws_config = aws_config::from_env().load().await; - let s3_config = aws_sdk_s3::config::Builder::from(&aws_config) - .region(aws_types::region::Region::new(config.s3_region_name)) - .build(); - aws_sdk_s3::Client::from_conf(s3_config) - }; - let lake_s3_client = s3_fetchers::LakeS3Client::new(s3_client.clone()); + let lake_s3_client: Box = + if let Some(s3_client) = config.s3_client { + s3_client + } else if let Some(config) = config.s3_config { + Box::new(s3_fetchers::LakeS3Client::from_conf(config)) + } else { + let aws_config = aws_config::from_env().load().await; + let s3_config = aws_sdk_s3::config::Builder::from(&aws_config) + .region(aws_types::region::Region::new(config.s3_region_name)) + .build(); + + Box::new(s3_fetchers::LakeS3Client::from_conf(s3_config)) + }; let mut last_processed_block_hash: Option = None; @@ -406,7 +410,7 @@ async fn start( // We require to stream blocks consistently, so we need to try to load the block again. let pending_block_heights = stream_block_heights( - &lake_s3_client, + &*lake_s3_client, &config.s3_bucket_name, start_from_block_height, ); @@ -429,7 +433,7 @@ async fn start( .into_iter() .map(|block_height| { s3_fetchers::fetch_streamer_message( - &lake_s3_client, + &*lake_s3_client, &config.s3_bucket_name, block_height, ) @@ -530,7 +534,7 @@ async fn start( .into_iter() .map(|block_height| { s3_fetchers::fetch_streamer_message( - &lake_s3_client, + &*lake_s3_client, &config.s3_bucket_name, block_height, )