Skip to content

Commit

Permalink
chore: Upgrade AWS SDK dependencies (#91)
Browse files Browse the repository at this point in the history
* chore: Upgrade AWS SDK dependencies

* Expose underlying error messages (from AWS SDK). Update README's custom endpoint example
  • Loading branch information
khorolets authored Nov 22, 2023
1 parent 049b0e3 commit 8c13c16
Show file tree
Hide file tree
Showing 6 changed files with 55 additions and 41 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased](https://github.com/near/near-lake-framework/compare/v0.7.2...HEAD)
- Simpler start boilerplate, simpler structures to deal with!
- Upgrade to the latest AWS SDK version (*since beta.3*)

### Breaking changes

Expand Down
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ members = [

# cargo-workspaces
[workspace.package]
version = "0.8.0-beta.2"
version = "0.8.0-beta.3"
license = "MIT OR Apache-2.0"
repository = "https://github.com/near/near-lake-framework-rs"
description = "Library to connect to the NEAR Lake S3 and stream the data"
Expand Down
11 changes: 6 additions & 5 deletions lake-framework/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@ license.workspace = true
repository.workspace = true

[dependencies]
aws-config = "0.53.0"
aws-types = "0.53.0"
aws-credential-types = "0.53.0"
aws-sdk-s3 = "0.23.0"
aws-config = { version = "1.0.0", features = ["behavior-version-latest"] }
aws-types = "1.0.0"
aws-credential-types = "1.0.0"
aws-sdk-s3 = "0.39.0"
async-stream = "0.3.3"
async-trait = "0.1.64"
derive_builder = "0.11.2"
Expand All @@ -26,7 +26,8 @@ near-lake-primitives = { path = "../lake-primitives", version = "0.8.0-beta.2" }
near-lake-context-derive = { path = "../lake-context-derive", version = "0.8.0-beta.2" }

[dev-dependencies]
aws-smithy-http = "0.53.0"
aws-smithy-http = "0.60.0"
aws-smithy-types = "1.0.0"
# used by examples
anyhow = "1.0.51"

Expand Down
26 changes: 14 additions & 12 deletions lake-framework/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -226,19 +226,21 @@ $ mkdir -p /data/near-lake-custom && minio server /data
use near_lake_framework::LakeBuilder;
# #[tokio::main]
# async fn main() {
let aws_config = aws_config::from_env().load().await;
let mut s3_conf = aws_sdk_s3::config::Builder::from(&aws_config)
.endpoint_url("http://0.0.0.0:9000")
.build();
# async fn main() -> anyhow::Result<()> {
let aws_config = aws_config::from_env().load().await;
let s3_config = aws_sdk_s3::config::Builder::from(&aws_types::SdkConfig::from(aws_config))
.endpoint_url("http://0.0.0.0:9000")
.build();
LakeBuilder::default()
.s3_bucket_name("near-lake-custom")
.s3_region_name("eu-central-1")
.start_block_height(0)
.s3_config(s3_config)
.build()
.expect("Failed to build Lake");
let lake = LakeBuilder::default()
.s3_config(s3_conf)
.s3_bucket_name("near-lake-data-custom")
.s3_region_name("eu-central-1")
.start_block_height(1)
.build()
.expect("Failed to build LakeConfig");
# Ok(())
# }
```

Expand Down
41 changes: 25 additions & 16 deletions lake-framework/src/s3_fetchers.rs
Original file line number Diff line number Diff line change
@@ -1,23 +1,27 @@
use async_trait::async_trait;
use std::str::FromStr;

use aws_sdk_s3::output::{GetObjectOutput, ListObjectsV2Output};
use aws_sdk_s3::operation::get_object::GetObjectOutput;
use aws_sdk_s3::operation::list_objects_v2::ListObjectsV2Output;

#[async_trait]
pub trait S3Client {
async fn get_object(
&self,
bucket: &str,
prefix: &str,
) -> Result<GetObjectOutput, aws_sdk_s3::types::SdkError<aws_sdk_s3::error::GetObjectError>>;
) -> Result<
GetObjectOutput,
aws_sdk_s3::error::SdkError<aws_sdk_s3::operation::get_object::GetObjectError>,
>;

async fn list_objects(
&self,
bucket: &str,
start_after: &str,
) -> Result<
ListObjectsV2Output,
aws_sdk_s3::types::SdkError<aws_sdk_s3::error::ListObjectsV2Error>,
aws_sdk_s3::error::SdkError<aws_sdk_s3::operation::list_objects_v2::ListObjectsV2Error>,
>;
}

Expand All @@ -38,14 +42,16 @@ impl S3Client for LakeS3Client {
&self,
bucket: &str,
prefix: &str,
) -> Result<GetObjectOutput, aws_sdk_s3::types::SdkError<aws_sdk_s3::error::GetObjectError>>
{
) -> Result<
GetObjectOutput,
aws_sdk_s3::error::SdkError<aws_sdk_s3::operation::get_object::GetObjectError>,
> {
Ok(self
.s3
.get_object()
.bucket(bucket)
.key(prefix)
.request_payer(aws_sdk_s3::model::RequestPayer::Requester)
.request_payer(aws_sdk_s3::types::RequestPayer::Requester)
.send()
.await?)
}
Expand All @@ -56,15 +62,15 @@ impl S3Client for LakeS3Client {
start_after: &str,
) -> Result<
ListObjectsV2Output,
aws_sdk_s3::types::SdkError<aws_sdk_s3::error::ListObjectsV2Error>,
aws_sdk_s3::error::SdkError<aws_sdk_s3::operation::list_objects_v2::ListObjectsV2Error>,
> {
Ok(self
.s3
.list_objects_v2()
.max_keys(1000) // 1000 is the default and max value for this parameter
.delimiter("/".to_string())
.start_after(start_after)
.request_payer(aws_sdk_s3::model::RequestPayer::Requester)
.request_payer(aws_sdk_s3::types::RequestPayer::Requester)
.bucket(bucket)
.send()
.await?)
Expand Down Expand Up @@ -218,10 +224,11 @@ mod test {

use async_trait::async_trait;

use aws_sdk_s3::output::{get_object_output, list_objects_v2_output};
use aws_sdk_s3::types::ByteStream;
use aws_sdk_s3::operation::get_object::builders::GetObjectOutputBuilder;
use aws_sdk_s3::operation::list_objects_v2::builders::ListObjectsV2OutputBuilder;
use aws_sdk_s3::primitives::ByteStream;

use aws_smithy_http::body::SdkBody;
use aws_smithy_types::body::SdkBody;

#[derive(Clone, Debug)]
pub struct LakeS3Client {}
Expand All @@ -232,12 +239,14 @@ mod test {
&self,
_bucket: &str,
prefix: &str,
) -> Result<GetObjectOutput, aws_sdk_s3::types::SdkError<aws_sdk_s3::error::GetObjectError>>
{
) -> Result<
GetObjectOutput,
aws_sdk_s3::error::SdkError<aws_sdk_s3::operation::get_object::GetObjectError>,
> {
let path = format!("{}/blocks/{}", env!("CARGO_MANIFEST_DIR"), prefix);
let file_bytes = tokio::fs::read(path).await.unwrap();
let stream = ByteStream::new(SdkBody::from(file_bytes));
Ok(get_object_output::Builder::default().body(stream).build())
Ok(GetObjectOutputBuilder::default().body(stream).build())
}

async fn list_objects(
Expand All @@ -246,9 +255,9 @@ mod test {
_start_after: &str,
) -> Result<
ListObjectsV2Output,
aws_sdk_s3::types::SdkError<aws_sdk_s3::error::ListObjectsV2Error>,
aws_sdk_s3::error::SdkError<aws_sdk_s3::operation::list_objects_v2::ListObjectsV2Error>,
> {
Ok(list_objects_v2_output::Builder::default().build())
Ok(ListObjectsV2OutputBuilder::default().build())
}
}

Expand Down
15 changes: 8 additions & 7 deletions lake-framework/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,27 +128,28 @@ pub enum LakeError {
#[from]
error_message: serde_json::Error,
},
#[error("AWS S3 error")]
#[error("AWS S3 error: {error}")]
AwsGetObjectError {
#[from]
error: aws_sdk_s3::types::SdkError<aws_sdk_s3::error::GetObjectError>,
error: aws_sdk_s3::error::SdkError<aws_sdk_s3::operation::get_object::GetObjectError>,
},
#[error("AWS S3 error")]
#[error("AWS S3 error: {error}")]
AwsLisObjectsV2Error {
#[from]
error: aws_sdk_s3::types::SdkError<aws_sdk_s3::error::ListObjectsV2Error>,
error:
aws_sdk_s3::error::SdkError<aws_sdk_s3::operation::list_objects_v2::ListObjectsV2Error>,
},
#[error("Failed to convert integer")]
#[error("Failed to convert integer: {error}")]
IntConversionError {
#[from]
error: std::num::TryFromIntError,
},
#[error("Join error")]
#[error("Join error: {error}")]
JoinError {
#[from]
error: tokio::task::JoinError,
},
#[error("Failed to start runtime")]
#[error("Failed to start runtime: {error}")]
RuntimeStartError {
#[from]
error: std::io::Error,
Expand Down

0 comments on commit 8c13c16

Please sign in to comment.