Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(iceberg): support rest authentication #19406

Merged
merged 5 commits into from
Nov 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
125 changes: 93 additions & 32 deletions src/connector/src/connector_common/iceberg/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,22 @@ pub struct IcebergCommon {
/// Full name of table, must include schema name.
#[serde(rename = "table.name")]
pub table_name: String,
/// Credential for accessing iceberg catalog, only applicable in rest catalog.
/// A credential to exchange for a token in the OAuth2 client credentials flow.
#[serde(rename = "catalog.credential")]
pub credential: Option<String>,
/// token for accessing iceberg catalog, only applicable in rest catalog.
/// A Bearer token which will be used for interaction with the server.
#[serde(rename = "catalog.token")]
pub token: Option<String>,
/// `oauth2-server-uri` for accessing iceberg catalog, only applicable in rest catalog.
/// Token endpoint URI to fetch token from if the Rest Catalog is not the authorization server.
#[serde(rename = "catalog.oauth2-server-uri")]
pub oauth2_server_uri: Option<String>,
/// scope for accessing iceberg catalog, only applicable in rest catalog.
/// Additional scope for OAuth2.
#[serde(rename = "catalog.scope")]
pub scope: Option<String>,

#[serde(
rename = "s3.path.style.access",
Expand Down Expand Up @@ -145,20 +161,32 @@ impl IcebergCommon {
match &self.warehouse_path {
Some(warehouse_path) => {
let (bucket, _) = {
let url = Url::parse(warehouse_path).with_context(|| {
format!("Invalid warehouse path: {}", warehouse_path)
})?;
let bucket = url
.host_str()
.with_context(|| {
format!("Invalid s3 path: {}, bucket is missing", warehouse_path)
})?
.to_string();
let root = url.path().trim_start_matches('/').to_string();
(bucket, root)
let url = Url::parse(warehouse_path);
if url.is_err() && catalog_type == "rest" {
// If the warehouse path is not a valid URL, it could be a warehouse name in rest catalog
// so we allow it to pass here.
(None, None)
} else {
let url = url.with_context(|| {
format!("Invalid warehouse path: {}", warehouse_path)
})?;
let bucket = url
.host_str()
.with_context(|| {
format!(
"Invalid s3 path: {}, bucket is missing",
warehouse_path
)
})?
.to_string();
let root = url.path().trim_start_matches('/').to_string();
(Some(bucket), Some(root))
}
};

iceberg_configs.insert("iceberg.table.io.bucket".to_string(), bucket);
if let Some(bucket) = bucket {
iceberg_configs.insert("iceberg.table.io.bucket".to_string(), bucket);
}
}
None => {
if catalog_type != "rest" {
Expand Down Expand Up @@ -219,29 +247,48 @@ impl IcebergCommon {
path_style_access.to_string(),
);
}
if matches!(self.catalog_type.as_deref(), Some("glue")) {
java_catalog_configs.insert(
"client.credentials-provider".to_string(),
"com.risingwave.connector.catalog.GlueCredentialProvider".to_string(),
);
// Use S3 ak/sk and region as glue ak/sk and region by default.
// TODO: use different ak/sk and region for s3 and glue.
java_catalog_configs.insert(
"client.credentials-provider.glue.access-key-id".to_string(),
self.access_key.clone().to_string(),
);
java_catalog_configs.insert(
"client.credentials-provider.glue.secret-access-key".to_string(),
self.secret_key.clone().to_string(),
);
if let Some(region) = &self.region {
java_catalog_configs
.insert("client.region".to_string(), region.clone().to_string());

match self.catalog_type.as_deref() {
Some("rest") => {
if let Some(credential) = &self.credential {
java_catalog_configs.insert("credential".to_string(), credential.clone());
}
if let Some(token) = &self.token {
java_catalog_configs.insert("token".to_string(), token.clone());
}
if let Some(oauth2_server_uri) = &self.oauth2_server_uri {
java_catalog_configs
.insert("oauth2-server-uri".to_string(), oauth2_server_uri.clone());
}
if let Some(scope) = &self.scope {
java_catalog_configs.insert("scope".to_string(), scope.clone());
}
}
Some("glue") => {
java_catalog_configs.insert(
"client.credentials-provider".to_string(),
"com.risingwave.connector.catalog.GlueCredentialProvider".to_string(),
);
// Use S3 ak/sk and region as glue ak/sk and region by default.
// TODO: use different ak/sk and region for s3 and glue.
java_catalog_configs.insert(
"client.credentials-provider.glue.access-key-id".to_string(),
self.access_key.clone().to_string(),
);
java_catalog_configs.insert(
"glue.endpoint".to_string(),
format!("https://glue.{}.amazonaws.com", region),
"client.credentials-provider.glue.secret-access-key".to_string(),
self.secret_key.clone().to_string(),
);
if let Some(region) = &self.region {
java_catalog_configs
.insert("client.region".to_string(), region.clone().to_string());
java_catalog_configs.insert(
"glue.endpoint".to_string(),
format!("https://glue.{}.amazonaws.com", region),
);
}
}
_ => {}
}
}

Expand Down Expand Up @@ -492,6 +539,20 @@ mod v2 {
S3_SECRET_ACCESS_KEY.to_string(),
self.secret_key.clone().to_string(),
);
if let Some(credential) = &self.credential {
iceberg_configs.insert("credential".to_string(), credential.clone());
}
if let Some(token) = &self.token {
iceberg_configs.insert("token".to_string(), token.clone());
}
if let Some(oauth2_server_uri) = &self.oauth2_server_uri {
iceberg_configs
.insert("oauth2-server-uri".to_string(), oauth2_server_uri.clone());
}
if let Some(scope) = &self.scope {
iceberg_configs.insert("scope".to_string(), scope.clone());
}

let config_builder = iceberg_catalog_rest::RestCatalogConfig::builder()
.uri(self.catalog_uri.clone().with_context(|| {
"`catalog.uri` must be set in rest catalog".to_string()
Expand Down
18 changes: 17 additions & 1 deletion src/connector/src/sink/iceberg/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ use risingwave_pb::connector_service::SinkMetadata;
use serde_derive::Deserialize;
use serde_with::{serde_as, DisplayFromStr};
use thiserror_ext::AsReport;
use url::Url;
use with_options::WithOptions;

use self::prometheus::monitored_base_file_writer::MonitoredBaseFileWriterBuilder;
Expand Down Expand Up @@ -283,7 +284,18 @@ impl IcebergSink {
names.push(self.config.common.table_name.to_string());
match &self.config.common.warehouse_path {
Some(warehouse_path) => {
if warehouse_path.ends_with('/') {
let url = Url::parse(warehouse_path);
if url.is_err() {
// For rest catalog, the warehouse_path could be a warehouse name.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shall we use another option for warehouse name?

The rustdoc of warehouse_path says "only applicable in storage catalog"

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, spark uses warehouse key to allow users to specify warehouse name or warehouse path. However, in our system, we use warehouse.path instead. Maybe we should use a warehouse alias.

// In this case, we should specify the location when creating a table.
if self.config.common.catalog_type() == "rest"
|| self.config.common.catalog_type() == "rest_rust"
{
None
} else {
bail!(format!("Invalid warehouse path: {}", warehouse_path))
}
} else if warehouse_path.ends_with('/') {
Some(format!("{}{}", warehouse_path, names.join("/")))
} else {
Some(format!("{}/{}", warehouse_path, names.join("/")))
Expand Down Expand Up @@ -1017,6 +1029,10 @@ mod test {
database_name: Some("demo_db".to_string()),
table_name: "demo_table".to_string(),
path_style_access: Some(true),
credential: None,
oauth2_server_uri: None,
scope: None,
token: None,
},
r#type: "upsert".to_string(),
force_append_only: false,
Expand Down
24 changes: 24 additions & 0 deletions src/connector/with_options_sink.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -412,6 +412,30 @@ IcebergConfig:
field_type: String
comments: Full name of table, must include schema name.
required: true
- name: catalog.credential
field_type: String
comments: |-
Credential for accessing iceberg catalog, only applicable in rest catalog.
A credential to exchange for a token in the OAuth2 client credentials flow.
required: false
- name: catalog.token
field_type: String
comments: |-
token for accessing iceberg catalog, only applicable in rest catalog.
A Bearer token which will be used for interaction with the server.
required: false
- name: catalog.oauth2-server-uri
field_type: String
comments: |-
`oauth2-server-uri` for accessing iceberg catalog, only applicable in rest catalog.
Token endpoint URI to fetch token from if the Rest Catalog is not the authorization server.
required: false
- name: catalog.scope
field_type: String
comments: |-
scope for accessing iceberg catalog, only applicable in rest catalog.
Additional scope for OAuth2.
required: false
- name: s3.path.style.access
field_type: bool
required: false
Expand Down
24 changes: 24 additions & 0 deletions src/connector/with_options_source.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,30 @@ IcebergProperties:
field_type: String
comments: Full name of table, must include schema name.
required: true
- name: catalog.credential
field_type: String
comments: |-
Credential for accessing iceberg catalog, only applicable in rest catalog.
A credential to exchange for a token in the OAuth2 client credentials flow.
required: false
- name: catalog.token
field_type: String
comments: |-
token for accessing iceberg catalog, only applicable in rest catalog.
A Bearer token which will be used for interaction with the server.
required: false
- name: catalog.oauth2-server-uri
field_type: String
comments: |-
`oauth2-server-uri` for accessing iceberg catalog, only applicable in rest catalog.
Token endpoint URI to fetch token from if the Rest Catalog is not the authorization server.
required: false
- name: catalog.scope
field_type: String
comments: |-
scope for accessing iceberg catalog, only applicable in rest catalog.
Additional scope for OAuth2.
required: false
- name: s3.path.style.access
field_type: bool
required: false
Expand Down
Loading