Skip to content

Commit 25f5e2a

Browse files
committed
add list_namespaces
1 parent 81b4a9b commit 25f5e2a

File tree

3 files changed

+75
-16
lines changed

3 files changed

+75
-16
lines changed

crates/catalog/s3tables/src/catalog.rs

Lines changed: 19 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -2,27 +2,35 @@ use std::collections::HashMap;
22

33
use anyhow::anyhow;
44
use async_trait::async_trait;
5-
use aws_config::BehaviorVersion;
65
use iceberg::table::Table;
76
use iceberg::{
87
Catalog, Error, ErrorKind, Namespace, NamespaceIdent, Result, TableCommit, TableCreation,
98
TableIdent,
109
};
1110

11+
use crate::utils::create_sdk_config;
12+
13+
#[derive(Debug)]
14+
pub struct S3TablesCatalogConfig {
15+
table_bucket_arn: String,
16+
properties: HashMap<String, String>,
17+
endpoint_url: Option<String>,
18+
}
19+
1220
/// S3Tables catalog implementation.
1321
#[derive(Debug)]
1422
pub struct S3TablesCatalog {
15-
table_bucket_arn: String,
16-
client: aws_sdk_s3tables::Client,
23+
config: S3TablesCatalogConfig,
24+
s3tables_client: aws_sdk_s3tables::Client,
1725
}
1826

1927
impl S3TablesCatalog {
20-
pub async fn new(table_bucket_arn: String) -> Self {
21-
let config = aws_config::load_defaults(BehaviorVersion::latest()).await;
22-
let client = aws_sdk_s3tables::Client::new(&config);
28+
pub async fn new(config: S3TablesCatalogConfig) -> Self {
29+
let aws_config = create_sdk_config(&config.properties, config.endpoint_url.clone()).await;
30+
let s3tables_client = aws_sdk_s3tables::Client::new(&aws_config);
2331
Self {
24-
table_bucket_arn,
25-
client,
32+
config,
33+
s3tables_client,
2634
}
2735
}
2836
}
@@ -34,21 +42,16 @@ impl Catalog for S3TablesCatalog {
3442
parent: Option<&NamespaceIdent>,
3543
) -> Result<Vec<NamespaceIdent>> {
3644
let mut req = self
37-
.client
45+
.s3tables_client
3846
.list_namespaces()
39-
.table_bucket_arn(self.table_bucket_arn.clone());
47+
.table_bucket_arn(self.config.table_bucket_arn.clone());
4048
if let Some(parent) = parent {
4149
req = req.prefix(parent.to_url_string());
4250
}
4351
let resp = req.send().await.map_err(from_aws_sdk_error)?;
4452
let mut result = Vec::new();
4553
for ns in resp.namespaces() {
46-
let ns_names = ns.namespace();
47-
result.extend(
48-
ns_names
49-
.into_iter()
50-
.map(|name| NamespaceIdent::new(name.to_string())),
51-
);
54+
result.push(NamespaceIdent::from_vec(ns.namespace().to_vec())?);
5255
}
5356
Ok(result)
5457
}

crates/catalog/s3tables/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,3 +20,4 @@
2020
#![deny(missing_docs)]
2121

2222
mod catalog;
23+
mod utils;
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
use std::collections::HashMap;
2+
3+
use aws_config::default_provider::endpoint_url;
4+
use aws_config::{BehaviorVersion, Region, SdkConfig};
5+
use aws_sdk_s3tables::config::Credentials;
6+
7+
/// Property aws profile name
8+
pub const AWS_PROFILE_NAME: &str = "profile_name";
9+
/// Property aws region
10+
pub const AWS_REGION_NAME: &str = "region_name";
11+
/// Property aws access key
12+
pub const AWS_ACCESS_KEY_ID: &str = "aws_access_key_id";
13+
/// Property aws secret access key
14+
pub const AWS_SECRET_ACCESS_KEY: &str = "aws_secret_access_key";
15+
/// Property aws session token
16+
pub const AWS_SESSION_TOKEN: &str = "aws_session_token";
17+
18+
/// Creates an aws sdk configuration based on
19+
/// provided properties and an optional endpoint URL.
20+
pub(crate) async fn create_sdk_config(
21+
properties: &HashMap<String, String>,
22+
endpoint_url: Option<String>,
23+
) -> SdkConfig {
24+
let mut config = aws_config::defaults(BehaviorVersion::latest());
25+
26+
if properties.is_empty() {
27+
return config.load().await;
28+
}
29+
30+
if let Some(endpoint_url) = endpoint_url {
31+
config = config.endpoint_url(endpoint_url);
32+
}
33+
34+
if let (Some(access_key), Some(secret_key)) = (
35+
properties.get(AWS_ACCESS_KEY_ID),
36+
properties.get(AWS_SECRET_ACCESS_KEY),
37+
) {
38+
let session_token = properties.get(AWS_SESSION_TOKEN).cloned();
39+
let credentials_provider =
40+
Credentials::new(access_key, secret_key, session_token, None, "properties");
41+
42+
config = config.credentials_provider(credentials_provider)
43+
};
44+
45+
if let Some(profile_name) = properties.get(AWS_PROFILE_NAME) {
46+
config = config.profile_name(profile_name);
47+
}
48+
49+
if let Some(region_name) = properties.get(AWS_REGION_NAME) {
50+
let region = Region::new(region_name.clone());
51+
config = config.region(region);
52+
}
53+
54+
config.load().await
55+
}

0 commit comments

Comments
 (0)