Skip to content

Commit

Permalink
models/journals: region configuration for S3 storage mappings
Browse files Browse the repository at this point in the history
Allows setting the region for S3 storage mappings, supporting buckets that reside outside the default us-east-1 region.

See also: gazette/core#365
  • Loading branch information
williamhbaker committed Mar 11, 2024
1 parent 6d90044 commit 26effc7
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 10 deletions.
27 changes: 19 additions & 8 deletions crates/models/src/journals.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ impl GcsBucketAndPrefix {
}

#[derive(Serialize, Deserialize, Clone, Debug, JsonSchema, Validate)]
#[schemars(example = "S3BucketAndPrefix::example")]
pub struct S3BucketAndPrefix {
#[schemars(example = "S3StorageConfig::example")]
pub struct S3StorageConfig {
/// Bucket into which Flow will store data.
#[validate(regex = "S3_BUCKET_RE")]
pub bucket: String,
Expand All @@ -51,24 +51,35 @@ pub struct S3BucketAndPrefix {
#[validate]
#[serde(default)]
pub prefix: Option<Prefix>,

/// AWS region of the S3 bucket. Defaults to us-east-1 if unset, for backward compatibility with
/// pre-existing storage mappings that did not have this configuration and would only work if
/// the bucket was in us-east-1.
pub region: Option<String>,
}

impl S3BucketAndPrefix {
impl S3StorageConfig {
fn as_url(&self) -> url::Url {
// These are validated when we validate storage mappings
// to at least be legal characters in a URI
url::Url::parse(&format!(
let mut u = url::Url::parse(&format!(
"s3://{}/{}",
self.bucket,
self.prefix.as_deref().unwrap_or("")
))
.expect("parsing as URL should never fail")
.expect("parsing as URL should never fail");

u.query_pairs_mut()
.append_pair("region", self.region.as_deref().unwrap_or("us-east-1"));

u
}

pub fn example() -> Self {
Self {
bucket: "my-bucket".to_string(),
prefix: None,
region: None,
}
}
}
Expand All @@ -77,7 +88,7 @@ impl S3BucketAndPrefix {
#[schemars(example = "AzureStorageConfig::example")]
pub struct AzureStorageConfig {
/// The tenant ID that owns the storage account that we're writing into
/// NOTE: This is not the tenant ID that owns the servie principal
/// NOTE: This is not the tenant ID that owns the service principal
pub account_tenant_id: String,

/// Storage accounts in Azure are the equivalent to a "bucket" in S3
Expand Down Expand Up @@ -174,7 +185,7 @@ impl CustomStore {
#[serde(tag = "provider", rename_all = "SCREAMING_SNAKE_CASE")]
pub enum Store {
///# Amazon Simple Storage Service.
S3(S3BucketAndPrefix),
S3(S3StorageConfig),
///# Google Cloud Storage.
Gcs(GcsBucketAndPrefix),
///# Azure object storage service.
Expand All @@ -196,7 +207,7 @@ impl Validate for Store {

impl Store {
pub fn example() -> Self {
Self::S3(S3BucketAndPrefix::example())
Self::S3(S3StorageConfig::example())
}
pub fn to_url(&self, catalog_name: &str) -> url::Url {
match self {
Expand Down
4 changes: 2 additions & 2 deletions crates/models/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ pub use derivation::{Derivation, DeriveUsing, Shuffle, ShuffleType, TransformDef
pub use derive_sqlite::DeriveUsingSqlite;
pub use derive_typescript::DeriveUsingTypescript;
pub use journals::{
CompressionCodec, CustomStore, FragmentTemplate, JournalTemplate, S3BucketAndPrefix,
StorageDef, Store, AZURE_CONTAINER_RE, AZURE_STORAGE_ACCOUNT_RE, GCS_BUCKET_RE, S3_BUCKET_RE,
CompressionCodec, CustomStore, FragmentTemplate, JournalTemplate, S3StorageConfig, StorageDef,
Store, AZURE_CONTAINER_RE, AZURE_STORAGE_ACCOUNT_RE, GCS_BUCKET_RE, S3_BUCKET_RE,
};
pub use materializations::{
MaterializationBinding, MaterializationDef, MaterializationEndpoint, MaterializationFields,
Expand Down

0 comments on commit 26effc7

Please sign in to comment.