Add support for custom storage endpoints
Adds a new `CUSTOM` variant of storage mappings, which allows catalogs
to use a variety of S3-compatible storage services by specifying the
`endpoint` explicitly. This is not yet directly exposed to end-users,
since `storageMappings` are handled by the control plane. But it does
give us the ability to use custom storage endpoints by configuring the
storage mappings using something like:

```
{"stores":[{"provider":"CUSTOM","bucket":"the-bucket","endpoint":"some.storage.endpoint.com"}]}
```

Credentials are handled by using the tenant name of each task or
collection as a `profile` in the journal storage URI. This profile
is understood by the brokers and is looked up in `~/.aws/credentials` and
`~/.aws/config` to provide the credentials and region configuration.
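
For a concrete illustration, assume a hypothetical tenant named `acmeCo`. With the storage mapping above, the fragment store URL written into journal specs would look roughly like the following, and the brokers would resolve the `acmeCo` profile from their AWS config files (values shown are placeholders):

```
# Fragment store URL produced by Store::to_url for a task or collection under acmeCo/
s3://the-bucket/?profile=acmeCo&endpoint=some.storage.endpoint.com

# ~/.aws/credentials on the brokers
[acmeCo]
aws_access_key_id = <access key for the custom endpoint>
aws_secret_access_key = <secret key>

# ~/.aws/config on the brokers
[profile acmeCo]
region = <region expected by the custom endpoint>
```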

To prevent any `CUSTOM` storage endpoints from picking up the `default`
AWS config values, an additional validation was added to ensure that
tenant names cannot be `default`.
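
A minimal sketch of what such a check could look like (hypothetical; the actual validation added by this commit is in a file not shown in this diff, and its exact rule may differ):

```rust
// Hypothetical sketch, not the code from this commit: reject "default" as a
// tenant name so that no tenant can ever resolve to the brokers' default
// AWS profile.
fn validate_tenant_name(tenant: &str) -> Result<(), String> {
    if tenant == "default" {
        return Err("'default' is a reserved tenant name".to_string());
    }
    Ok(())
}
```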

One thing to point out is that the catalog JSON schema is no longer able
to mark the "provider" field as required, because schemars lacks
[support for internally tagged enums](GREsau/schemars#39).
I don't think this is a huge deal, since end users don't edit storage
mappings in catalog specs anyway, so I'm inclined to leave it as is for
now.
psFried committed Jan 30, 2023
1 parent 92141de commit 641cf97
Showing 8 changed files with 380 additions and 78 deletions.
7 changes: 5 additions & 2 deletions crates/assemble/src/lib.rs
@@ -141,7 +141,10 @@ pub fn partition_template(
path_postfix_template,
refresh_interval,
retention,
stores: stores.iter().map(|s| s.to_url().into()).collect(),
stores: stores
.iter()
.map(|s| s.to_url(&collection).into())
.collect(),
}),
flags,
labels: Some(broker::LabelSet { labels }),
@@ -225,7 +228,7 @@ pub fn recovery_log_template(
path_postfix_template,
refresh_interval,
retention,
stores: stores.iter().map(|s| s.to_url().into()).collect(),
stores: stores.iter().map(|s| s.to_url(task_name).into()).collect(),
}),
flags,
labels: Some(broker::LabelSet { labels }),
167 changes: 127 additions & 40 deletions crates/models/src/journals.rs
@@ -7,23 +7,59 @@ use serde::{Deserialize, Serialize};
use std::time::Duration;
use validator::Validate;

/// BucketType is a provider of object storage buckets,
/// which are used to durably storage journal fragments.
#[derive(Deserialize, Debug, Serialize, JsonSchema, Clone)]
#[serde(deny_unknown_fields, rename_all = "SCREAMING_SNAKE_CASE")]
#[schemars(example = "BucketType::example")]
pub enum BucketType {
///# Google Cloud Storage.
Gcs,
///# Amazon Simple Storage Service.
S3,
///# Azure object storage service.
Azure,
#[derive(Serialize, Deserialize, Clone, Debug, JsonSchema, Validate)]
#[schemars(example = "BucketAndPrefix::example")]
pub struct BucketAndPrefix {
/// Bucket into which Flow will store data.
#[validate(regex = "BUCKET_RE")]
bucket: String,

/// Optional prefix of keys written to the bucket.
#[validate]
#[serde(default)]
prefix: Option<Prefix>,
}

impl BucketType {
impl BucketAndPrefix {
fn bucket_and_prefix(&self) -> (&str, &str) {
(self.bucket.as_str(), self.prefix.as_deref().unwrap_or(""))
}

pub fn example() -> Self {
BucketType::S3
Self {
bucket: "my-bucket".to_string(),
prefix: None,
}
}
}

/// Details of an s3-compatible storage endpoint, such as Minio or R2.
#[derive(Serialize, Deserialize, Clone, Debug, JsonSchema, Validate)]
#[schemars(example = "CustomStore::example")]
pub struct CustomStore {
/// Bucket into which Flow will store data.
#[validate(regex = "BUCKET_RE")]
pub bucket: String,
/// endpoint is required when provider is "custom", and specifies the
/// address of an s3-compatible storage provider.
pub endpoint: StorageEndpoint,
/// Optional prefix of keys written to the bucket.
#[validate]
#[serde(default, skip_serializing_if = "Option::is_none")]
pub prefix: Option<Prefix>,
}

impl CustomStore {
pub fn example() -> Self {
Self {
bucket: "my-bucket".to_string(),
endpoint: StorageEndpoint::example(),
prefix: None,
}
}

fn bucket_and_prefix(&self) -> (&str, &str) {
(self.bucket.as_str(), self.prefix.as_deref().unwrap_or(""))
}
}

@@ -39,37 +75,53 @@ impl BucketType {
///
/// s3://my-bucket/a/prefix/example/events/region=EU/utc_date=2021-10-25/utc_hour=13/000123-000456-789abcdef.gzip
///
#[derive(Serialize, Deserialize, Clone, Debug, JsonSchema, Validate)]
#[derive(Serialize, Deserialize, Clone, Debug, JsonSchema)]
#[schemars(example = "Store::example")]
pub struct Store {
/// Cloud storage provider.
pub provider: BucketType,
/// Bucket into which Flow will store data.
#[validate(regex = "BUCKET_RE")]
pub bucket: String,
/// Optional prefix of keys written to the bucket.
#[validate]
#[serde(default)]
pub prefix: Option<Prefix>,
#[serde(tag = "provider", rename_all = "SCREAMING_SNAKE_CASE")]
pub enum Store {
///# Amazon Simple Storage Service.
S3(BucketAndPrefix),
///# Google Cloud Storage.
Gcs(BucketAndPrefix),
///# Azure object storage service.
Azure(BucketAndPrefix),
///# An S3-compatible endpoint
Custom(CustomStore),
}

impl Store {
pub fn to_url(&self) -> url::Url {
let scheme = match self.provider {
BucketType::Azure => "azure",
BucketType::Gcs => "gs",
BucketType::S3 => "s3",
};
let prefix = self.prefix.as_ref().map(Prefix::as_str).unwrap_or("");
url::Url::parse(&format!("{}://{}/{}", scheme, self.bucket, prefix))
.expect("parsing as URL should never fail")
impl Validate for Store {
fn validate(&self) -> Result<(), validator::ValidationErrors> {
match self {
Self::S3(s) | Self::Gcs(s) | Self::Azure(s) => s.validate(),
Self::Custom(s) => s.validate(),
}
}
}

impl Store {
pub fn example() -> Self {
Self {
provider: BucketType::S3,
bucket: "my-bucket".to_string(),
prefix: None,
Self::S3(BucketAndPrefix::example())
}
pub fn to_url(&self, catalog_name: &str) -> url::Url {
let (scheme, (bucket, prefix)) = match self {
Self::S3(cfg) => ("s3", cfg.bucket_and_prefix()),
Self::Gcs(cfg) => ("gs", cfg.bucket_and_prefix()),
Self::Azure(cfg) => ("azure", cfg.bucket_and_prefix()),
// Custom storage endpoints are expected to be s3-compatible, and thus use the s3 scheme
Self::Custom(cfg) => ("s3", cfg.bucket_and_prefix()),
};
let mut url = url::Url::parse(&format!("{}://{}/{}", scheme, bucket, prefix))
.expect("parsing as URL should never fail");
if let Store::Custom(cfg) = self {
let tenant = catalog_name
.split_once('/')
.expect("invalid catalog_name passed to Store::to_url")
.0;
url.query_pairs_mut()
.append_pair("profile", tenant)
.append_pair("endpoint", &cfg.endpoint);
}
url
}
}

@@ -214,7 +266,7 @@ lazy_static! {

#[cfg(test)]
mod test {
use super::BUCKET_RE;
use super::*;

#[test]
fn test_regexes() {
@@ -227,4 +279,39 @@ mod test {
assert!(BUCKET_RE.is_match(case) == expect);
}
}

// The representation of Store was changed from a struct to an enum, so this test is ensuring
// that existing Stores will deserialize properly with the new representation.
#[test]
fn old_store_json_still_deserializes_into_new_enum() {
let actual: Store =
serde_json::from_str(r#"{"provider":"GCS","prefix":"flow/","bucket":"test-bucket"}"#)
.expect("failed to deserialize");
let Store::Gcs(b_and_p) = actual else {
panic!("expected a gcs store, got: {:?}", actual);
};
assert_eq!("test-bucket", &b_and_p.bucket);
assert_eq!(Some("flow/"), b_and_p.prefix.as_deref());
}

#[test]
fn custom_storage_endpoint() {
let actual: Store = serde_json::from_str(
r#"{"provider":"CUSTOM","prefix":"test/","bucket":"test-bucket", "endpoint": "http://canary.test:1234"}"#,
).expect("failed to deserialize");
let Store::Custom(cfg) = &actual else {
panic!("expected a custom store, got: {:?}", actual);
};
assert_eq!("http://canary.test:1234", cfg.endpoint.as_str());
assert_eq!("test-bucket", &cfg.bucket);
assert_eq!(Some("test/"), cfg.prefix.as_deref());

actual.validate().expect("failed validation");

let actual_url = actual.to_url("testTenant/foo").to_string();
assert_eq!(
"s3://test-bucket/test/?profile=testTenant&endpoint=http%3A%2F%2Fcanary.test%3A1234",
&actual_url
);
}
}
5 changes: 3 additions & 2 deletions crates/models/src/lib.rs
@@ -26,15 +26,16 @@ pub use derivations::{
Derivation, Publish, Register, TransformDef, TransformSource, TypescriptModule, Update,
};
pub use journals::{
BucketType, CompressionCodec, FragmentTemplate, JournalTemplate, StorageDef, Store,
BucketAndPrefix, CompressionCodec, CustomStore, FragmentTemplate, JournalTemplate, StorageDef,
Store,
};
pub use materializations::{
MaterializationBinding, MaterializationDef, MaterializationEndpoint, MaterializationFields,
SqliteConfig,
};
pub use references::{
Capture, Collection, CompositeKey, Field, JsonPointer, Materialization, PartitionField, Prefix,
RelativeUrl, Test, Transform,
RelativeUrl, StorageEndpoint, Test, Transform,
};
pub use resources::{ContentType, Import, ResourceDef};
pub use schemas::Schema;
28 changes: 27 additions & 1 deletion crates/models/src/references.rs
@@ -38,6 +38,7 @@ lazy_static! {
static ref FIELD_RE: Regex = Regex::new(&[&JSON_POINTER_CHAR, "+(/", &JSON_POINTER_CHAR, "+)*"].concat()).unwrap();
// RELATIVE_URL_RE matches a relative or absolute URL. It's quite permissive, prohibiting only a space.
static ref RELATIVE_URL_RE: Regex = Regex::new(&["[^", &SPACE_CHAR, "]+"].concat()).unwrap();
static ref ENDPOINT_RE: Regex = Regex::new(r#"^(http://|https://)?[a-z0-9]+[a-z0-9\.:-]*[a-z0-9]+"#).unwrap();
}

macro_rules! string_reference_types {
Expand Down Expand Up @@ -201,6 +202,9 @@ string_reference_types! {
/// with respect to the current resource (i.e, ../path/to/flow.yaml),
/// or may be an external absolute URL (i.e., http://example/flow.yaml).
pub struct RelativeUrl("RelativeUrl::schema", pattern = RELATIVE_URL_RE, example = "https://example/resource");

/// An address for a custom storage endpoint
pub struct StorageEndpoint("StorageEndpoint::schema", pattern = ENDPOINT_RE, example = "storage.example.com");
}

impl RelativeUrl {
@@ -247,7 +251,9 @@ impl Validate for CompositeKey {

#[cfg(test)]
mod test {
use super::{Collection, Field, JsonPointer, Prefix, RelativeUrl, Transform, Validate};
use super::{
Collection, Field, JsonPointer, Prefix, RelativeUrl, StorageEndpoint, Transform, Validate,
};

#[test]
fn test_token_re() {
@@ -373,4 +379,24 @@ mod test {
}
}
}

#[test]
fn test_custom_storage_endpoint() {
for (case, expect) in [
("https://github.com/a/path?query#/and/stuff", false),
("", false), // Cannot be empty
("foo.bar:123", true),
("cannot have a space", false),
("http://test.test:345", true),
("https://test.test:456", true),
("123.45.67.8:567", true),
] {
let out = StorageEndpoint::new(case).validate();
if expect {
out.unwrap();
} else {
out.unwrap_err();
}
}
}
}