diff --git a/crates/assemble/src/lib.rs b/crates/assemble/src/lib.rs
index 6dfabe1112..527b2f5a6c 100644
--- a/crates/assemble/src/lib.rs
+++ b/crates/assemble/src/lib.rs
@@ -141,7 +141,10 @@ pub fn partition_template(
         path_postfix_template,
         refresh_interval,
         retention,
-        stores: stores.iter().map(|s| s.to_url().into()).collect(),
+        stores: stores
+            .iter()
+            .map(|s| s.to_url(&collection).into())
+            .collect(),
     }),
     flags,
     labels: Some(broker::LabelSet { labels }),
@@ -225,7 +228,7 @@ pub fn recovery_log_template(
         path_postfix_template,
         refresh_interval,
         retention,
-        stores: stores.iter().map(|s| s.to_url().into()).collect(),
+        stores: stores.iter().map(|s| s.to_url(task_name).into()).collect(),
     }),
     flags,
     labels: Some(broker::LabelSet { labels }),
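Note: both call sites now thread a catalog name into `Store::to_url`, which (per the `journals.rs` change below) treats everything before the first `/` as the tenant. A minimal sketch of that derivation, using a hypothetical helper name:

```rust
// Hypothetical helper showing how to_url derives the tenant from a catalog
// name such as "acmeCo/events": it is everything before the first '/'.
fn tenant_of(catalog_name: &str) -> &str {
    catalog_name
        .split_once('/')
        .expect("catalog names always contain a '/'")
        .0
}

fn main() {
    assert_eq!(tenant_of("acmeCo/events"), "acmeCo");
    assert_eq!(tenant_of("acmeCo/anvils/orders"), "acmeCo");
}
```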
diff --git a/crates/models/src/journals.rs b/crates/models/src/journals.rs
index 9f5c2b6c60..060d99b48f 100644
--- a/crates/models/src/journals.rs
+++ b/crates/models/src/journals.rs
@@ -7,23 +7,59 @@ use serde::{Deserialize, Serialize};
 use std::time::Duration;
 use validator::Validate;
 
-/// BucketType is a provider of object storage buckets,
-/// which are used to durably storage journal fragments.
-#[derive(Deserialize, Debug, Serialize, JsonSchema, Clone)]
-#[serde(deny_unknown_fields, rename_all = "SCREAMING_SNAKE_CASE")]
-#[schemars(example = "BucketType::example")]
-pub enum BucketType {
-    ///# Google Cloud Storage.
-    Gcs,
-    ///# Amazon Simple Storage Service.
-    S3,
-    ///# Azure object storage service.
-    Azure,
+#[derive(Serialize, Deserialize, Clone, Debug, JsonSchema, Validate)]
+#[schemars(example = "BucketAndPrefix::example")]
+pub struct BucketAndPrefix {
+    /// Bucket into which Flow will store data.
+    #[validate(regex = "BUCKET_RE")]
+    bucket: String,
+
+    /// Optional prefix of keys written to the bucket.
+    #[validate]
+    #[serde(default)]
+    prefix: Option<Prefix>,
 }
 
-impl BucketType {
+impl BucketAndPrefix {
+    fn bucket_and_prefix(&self) -> (&str, &str) {
+        (self.bucket.as_str(), self.prefix.as_deref().unwrap_or(""))
+    }
+
     pub fn example() -> Self {
-        BucketType::S3
+        Self {
+            bucket: "my-bucket".to_string(),
+            prefix: None,
+        }
+    }
+}
+
+/// Details of an s3-compatible storage endpoint, such as Minio or R2.
+#[derive(Serialize, Deserialize, Clone, Debug, JsonSchema, Validate)]
+#[schemars(example = "CustomStore::example")]
+pub struct CustomStore {
+    /// Bucket into which Flow will store data.
+    #[validate(regex = "BUCKET_RE")]
+    pub bucket: String,
+    /// endpoint is required when provider is "custom", and specifies the
+    /// address of an s3-compatible storage provider.
+    pub endpoint: StorageEndpoint,
+    /// Optional prefix of keys written to the bucket.
+    #[validate]
+    #[serde(default, skip_serializing_if = "Option::is_none")]
+    pub prefix: Option<Prefix>,
+}
+
+impl CustomStore {
+    pub fn example() -> Self {
+        Self {
+            bucket: "my-bucket".to_string(),
+            endpoint: StorageEndpoint::example(),
+            prefix: None,
+        }
+    }
+
+    fn bucket_and_prefix(&self) -> (&str, &str) {
+        (self.bucket.as_str(), self.prefix.as_deref().unwrap_or(""))
     }
 }
 
@@ -39,37 +75,53 @@
 /// A Store into which Flow journal fragments may be written.
 ///
 /// The persisted path of a journal fragment is determined by composing the
 /// Store's bucket and prefix with the journal name and a content-addressed
 /// fragment file name.
 ///
 /// Eg, given a Store to S3 with bucket "my-bucket" and prefix "a/prefix",
 /// along with a collection "example/events" having a logical partition
 /// "region", then a complete persisted path might be:
 ///
 /// s3://my-bucket/a/prefix/example/events/region=EU/utc_date=2021-10-25/utc_hour=13/000123-000456-789abcdef.gzip
 ///
-#[derive(Serialize, Deserialize, Clone, Debug, JsonSchema, Validate)]
+#[derive(Serialize, Deserialize, Clone, Debug, JsonSchema)]
 #[schemars(example = "Store::example")]
-pub struct Store {
-    /// Cloud storage provider.
-    pub provider: BucketType,
-    /// Bucket into which Flow will store data.
-    #[validate(regex = "BUCKET_RE")]
-    pub bucket: String,
-    /// Optional prefix of keys written to the bucket.
-    #[validate]
-    #[serde(default)]
-    pub prefix: Option<Prefix>,
+#[serde(tag = "provider", rename_all = "SCREAMING_SNAKE_CASE")]
+pub enum Store {
+    ///# Amazon Simple Storage Service.
+    S3(BucketAndPrefix),
+    ///# Google Cloud Storage.
+    Gcs(BucketAndPrefix),
+    ///# Azure object storage service.
+    Azure(BucketAndPrefix),
+    ///# An S3-compatible endpoint
+    Custom(CustomStore),
 }
 
-impl Store {
-    pub fn to_url(&self) -> url::Url {
-        let scheme = match self.provider {
-            BucketType::Azure => "azure",
-            BucketType::Gcs => "gs",
-            BucketType::S3 => "s3",
-        };
-        let prefix = self.prefix.as_ref().map(Prefix::as_str).unwrap_or("");
-        url::Url::parse(&format!("{}://{}/{}", scheme, self.bucket, prefix))
-            .expect("parsing as URL should never fail")
+impl Validate for Store {
+    fn validate(&self) -> Result<(), validator::ValidationErrors> {
+        match self {
+            Self::S3(s) | Self::Gcs(s) | Self::Azure(s) => s.validate(),
+            Self::Custom(s) => s.validate(),
+        }
     }
+}
+
+impl Store {
     pub fn example() -> Self {
-        Self {
-            provider: BucketType::S3,
-            bucket: "my-bucket".to_string(),
-            prefix: None,
+        Self::S3(BucketAndPrefix::example())
+    }
+    pub fn to_url(&self, catalog_name: &str) -> url::Url {
+        let (scheme, (bucket, prefix)) = match self {
+            Self::S3(cfg) => ("s3", cfg.bucket_and_prefix()),
+            Self::Gcs(cfg) => ("gs", cfg.bucket_and_prefix()),
+            Self::Azure(cfg) => ("azure", cfg.bucket_and_prefix()),
+            // Custom storage endpoints are expected to be s3-compatible, and thus use the s3 scheme.
+            Self::Custom(cfg) => ("s3", cfg.bucket_and_prefix()),
+        };
+        let mut url = url::Url::parse(&format!("{}://{}/{}", scheme, bucket, prefix))
+            .expect("parsing as URL should never fail");
+        if let Store::Custom(cfg) = self {
+            let tenant = catalog_name
+                .split_once('/')
+                .expect("invalid catalog_name passed to Store::to_url")
+                .0;
+            url.query_pairs_mut()
+                .append_pair("profile", tenant)
+                .append_pair("endpoint", &cfg.endpoint);
         }
+        url
     }
 }
 
@@ -214,7 +266,7 @@ lazy_static! {
 
 #[cfg(test)]
 mod test {
-    use super::BUCKET_RE;
+    use super::*;
 
     #[test]
     fn test_regexes() {
@@ -227,4 +279,39 @@ mod test {
             assert!(BUCKET_RE.is_match(case) == expect);
         }
     }
+
+    // The representation of Store was changed from a struct to an enum, so this test ensures
+    // that existing Stores will deserialize properly with the new representation.
+    #[test]
+    fn old_store_json_still_deserializes_into_new_enum() {
+        let actual: Store =
+            serde_json::from_str(r#"{"provider":"GCS","prefix":"flow/","bucket":"test-bucket"}"#)
+                .expect("failed to deserialize");
+        let Store::Gcs(b_and_p) = actual else {
+            panic!("expected a gcs store, got: {:?}", actual);
+        };
+        assert_eq!("test-bucket", &b_and_p.bucket);
+        assert_eq!(Some("flow/"), b_and_p.prefix.as_deref());
+    }
+
+    #[test]
+    fn custom_storage_endpoint() {
+        let actual: Store = serde_json::from_str(
+            r#"{"provider":"CUSTOM","prefix":"test/","bucket":"test-bucket", "endpoint": "http://canary.test:1234"}"#,
+        ).expect("failed to deserialize");
+        let Store::Custom(cfg) = &actual else {
+            panic!("expected a custom store, got: {:?}", actual);
+        };
+        assert_eq!("http://canary.test:1234", cfg.endpoint.as_str());
+        assert_eq!("test-bucket", &cfg.bucket);
+        assert_eq!(Some("test/"), cfg.prefix.as_deref());
+
+        actual.validate().expect("failed validation");
+
+        let actual_url = actual.to_url("testTenant/foo").to_string();
+        assert_eq!(
+            "s3://test-bucket/test/?profile=testTenant&endpoint=http%3A%2F%2Fcanary.test%3A1234",
+            &actual_url
+        );
+    }
 }
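Note: `#[serde(tag = "provider")]` makes `Store` internally tagged, which is why the pre-existing flat JSON layout still deserializes. A self-contained sketch of the same pattern with toy stand-in types (assumes the `serde` and `serde_json` crates, not the real `Store`):

```rust
use serde::Deserialize;

// Toy stand-ins for the real Store types, only to illustrate internal tagging.
#[derive(Deserialize, Debug)]
#[serde(tag = "provider", rename_all = "SCREAMING_SNAKE_CASE")]
enum Store {
    S3 { bucket: String },
    Custom { bucket: String, endpoint: String },
}

fn main() {
    // The provider tag sits alongside the other fields, exactly as in the
    // old struct layout, so legacy documents parse unchanged.
    let s: Store = serde_json::from_str(r#"{"provider":"S3","bucket":"b"}"#).unwrap();
    println!("{:?}", s); // S3 { bucket: "b" }
}
```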
diff --git a/crates/models/src/lib.rs b/crates/models/src/lib.rs
index 1ef3298710..b9650936bb 100644
--- a/crates/models/src/lib.rs
+++ b/crates/models/src/lib.rs
@@ -26,7 +26,8 @@ pub use derivations::{
     Derivation, Publish, Register, TransformDef, TransformSource, TypescriptModule, Update,
 };
 pub use journals::{
-    BucketType, CompressionCodec, FragmentTemplate, JournalTemplate, StorageDef, Store,
+    BucketAndPrefix, CompressionCodec, CustomStore, FragmentTemplate, JournalTemplate, StorageDef,
+    Store,
 };
 pub use materializations::{
     MaterializationBinding, MaterializationDef, MaterializationEndpoint, MaterializationFields,
@@ -34,7 +35,7 @@ pub use materializations::{
 };
 pub use references::{
     Capture, Collection, CompositeKey, Field, JsonPointer, Materialization, PartitionField, Prefix,
-    RelativeUrl, Test, Transform,
+    RelativeUrl, StorageEndpoint, Test, Transform,
 };
 pub use resources::{ContentType, Import, ResourceDef};
 pub use schemas::Schema;
diff --git a/crates/models/src/references.rs b/crates/models/src/references.rs
index 361b66ce88..299dbaacde 100644
--- a/crates/models/src/references.rs
+++ b/crates/models/src/references.rs
@@ -38,6 +38,7 @@ lazy_static! {
     static ref FIELD_RE: Regex =
         Regex::new(&[&JSON_POINTER_CHAR, "+(/", &JSON_POINTER_CHAR, "+)*"].concat()).unwrap();
     // RELATIVE_URL_RE matches a relative or absolute URL. It's quite permissive, prohibiting only a space.
     static ref RELATIVE_URL_RE: Regex = Regex::new(&["[^", &SPACE_CHAR, "]+"].concat()).unwrap();
+    static ref ENDPOINT_RE: Regex = Regex::new(r#"^(http://|https://)?[a-z0-9]+[a-z0-9\.:-]*[a-z0-9]+"#).unwrap();
 }
 
 macro_rules! string_reference_types {
@@ -201,6 +202,9 @@ string_reference_types! {
     /// with respect to the current resource (i.e, ../path/to/flow.yaml),
     /// or may be an external absolute URL (i.e., http://example/flow.yaml).
     pub struct RelativeUrl("RelativeUrl::schema", pattern = RELATIVE_URL_RE, example = "https://example/resource");
+
+    /// An address for a custom storage endpoint
+    pub struct StorageEndpoint("StorageEndpoint::schema", pattern = ENDPOINT_RE, example = "storage.example.com");
 }
 
 impl RelativeUrl {
@@ -247,7 +251,9 @@ impl Validate for CompositeKey {
 
 #[cfg(test)]
 mod test {
-    use super::{Collection, Field, JsonPointer, Prefix, RelativeUrl, Transform, Validate};
+    use super::{
+        Collection, Field, JsonPointer, Prefix, RelativeUrl, StorageEndpoint, Transform, Validate,
+    };
 
     #[test]
     fn test_token_re() {
@@ -373,4 +379,24 @@ mod test {
             }
         }
     }
+
+    #[test]
+    fn test_custom_storage_endpoint() {
+        for (case, expect) in [
+            ("https://github.com/a/path?query#/and/stuff", false),
+            ("", false), // Cannot be empty
+            ("foo.bar:123", true),
+            ("cannot have a space", false),
+            ("http://test.test:345", true),
+            ("https://test.test:456", true),
+            ("123.45.67.8:567", true),
+        ] {
+            let out = StorageEndpoint::new(case).validate();
+            if expect {
+                out.unwrap();
+            } else {
+                out.unwrap_err();
+            }
+        }
+    }
 }
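Note: `ENDPOINT_RE` accepts bare hosts, `host:port` pairs, and optional `http(s)://` schemes, but rejects spaces and anything after the host; the generated schema below shows the macro anchoring the pattern as `^...$`. A quick standalone check of the anchored pattern (assumes the `regex` crate):

```rust
use regex::Regex;

fn main() {
    // ENDPOINT_RE with the ^...$ anchors that the generated JSON schema
    // shows the string_reference_types! macro applying.
    let re = Regex::new(r"^(http://|https://)?[a-z0-9]+[a-z0-9\.:-]*[a-z0-9]+$").unwrap();
    assert!(re.is_match("storage.example.com"));
    assert!(re.is_match("123.45.67.8:567"));
    // A path after the host introduces '/', which the character class rejects.
    assert!(!re.is_match("https://github.com/a/path?query#/and/stuff"));
}
```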
diff --git a/crates/sources/tests/snapshots/schema_generation__catalog_schema_snapshot.snap b/crates/sources/tests/snapshots/schema_generation__catalog_schema_snapshot.snap
index fcf353dce7..7e0081586a 100644
--- a/crates/sources/tests/snapshots/schema_generation__catalog_schema_snapshot.snap
+++ b/crates/sources/tests/snapshots/schema_generation__catalog_schema_snapshot.snap
@@ -178,18 +178,6 @@ expression: "&schema"
     },
     "additionalProperties": false,
     "definitions": {
-      "BucketType": {
-        "description": "BucketType is a provider of object storage buckets, which are used to durably storage journal fragments.",
-        "examples": [
-          "S3"
-        ],
-        "type": "string",
-        "enum": [
-          "GCS",
-          "S3",
-          "AZURE"
-        ]
-      },
       "Capture": {
         "description": "Capture names are paths of Unicode letters, numbers, '-', '_', or '.'. Each path component is separated by a slash '/', and a name may not begin or end in a '/'.",
         "examples": [
@@ -1176,6 +1164,14 @@ expression: "&schema"
           }
         }
       },
+      "StorageEndpoint": {
+        "description": "An address for a custom storage endpoint",
+        "examples": [
+          "storage.example.com"
+        ],
+        "type": "string",
+        "pattern": "^^(http://|https://)?[a-z0-9]+[a-z0-9\\.:-]*[a-z0-9]+$"
+      },
       "Store": {
         "description": "A Store into which Flow journal fragments may be written.\n\nThe persisted path of a journal fragment is determined by composing the Store's bucket and prefix with the journal name and a content-addressed fragment file name.\n\nEg, given a Store to S3 with bucket \"my-bucket\" and prefix \"a/prefix\", along with a collection \"example/events\" having a logical partition \"region\", then a complete persisted path might be:\n\ns3://my-bucket/a/prefix/example/events/region=EU/utc_date=2021-10-25/utc_hour=13/000123-000456-789abcdef.gzip",
         "examples": [
           {
             "bucket": "my-bucket",
             "prefix": null,
             "provider": "S3"
           }
         ],
-        "type": "object",
-        "required": [
-          "bucket",
-          "provider"
-        ],
-        "properties": {
-          "bucket": {
-            "description": "Bucket into which Flow will store data.",
-            "type": "string",
-            "pattern": "(^(([a-z0-9]|[a-z0-9][a-z0-9\\-]*[a-z0-9])\\.)*([a-z0-9]|[a-z0-9][a-z0-9\\-]*[a-z0-9])$)"
+        "oneOf": [
+          {
+            "title": "Amazon Simple Storage Service.",
+            "examples": [
+              {
+                "bucket": "my-bucket",
+                "prefix": null
+              }
+            ],
+            "type": "object",
+            "required": [
+              "bucket",
+              "provider"
+            ],
+            "properties": {
+              "bucket": {
+                "description": "Bucket into which Flow will store data.",
+                "type": "string",
+                "pattern": "(^(([a-z0-9]|[a-z0-9][a-z0-9\\-]*[a-z0-9])\\.)*([a-z0-9]|[a-z0-9][a-z0-9\\-]*[a-z0-9])$)"
+              },
+              "prefix": {
+                "description": "Optional prefix of keys written to the bucket.",
+                "default": null,
+                "anyOf": [
+                  {
+                    "$ref": "#/definitions/Prefix"
+                  },
+                  {
+                    "type": "null"
+                  }
+                ]
+              },
+              "provider": {
+                "type": "string",
+                "enum": [
+                  "S3"
+                ]
+              }
+            }
           },
-          "prefix": {
-            "description": "Optional prefix of keys written to the bucket.",
-            "default": null,
-            "anyOf": [
+          {
+            "title": "Google Cloud Storage.",
+            "examples": [
               {
-                "$ref": "#/definitions/Prefix"
+                "bucket": "my-bucket",
+                "prefix": null
+              }
+            ],
+            "type": "object",
+            "required": [
+              "bucket",
+              "provider"
+            ],
+            "properties": {
+              "bucket": {
+                "description": "Bucket into which Flow will store data.",
+                "type": "string",
+                "pattern": "(^(([a-z0-9]|[a-z0-9][a-z0-9\\-]*[a-z0-9])\\.)*([a-z0-9]|[a-z0-9][a-z0-9\\-]*[a-z0-9])$)"
+              },
+              "prefix": {
+                "description": "Optional prefix of keys written to the bucket.",
+                "default": null,
+                "anyOf": [
+                  {
+                    "$ref": "#/definitions/Prefix"
+                  },
+                  {
+                    "type": "null"
+                  }
+                ]
               },
+              "provider": {
+                "type": "string",
+                "enum": [
+                  "GCS"
+                ]
+              }
+            }
+          },
+          {
+            "title": "Azure object storage service.",
+            "examples": [
               {
-                "type": "null"
+                "bucket": "my-bucket",
+                "prefix": null
               }
-            ]
+            ],
+            "type": "object",
+            "required": [
+              "bucket",
+              "provider"
+            ],
+            "properties": {
+              "bucket": {
+                "description": "Bucket into which Flow will store data.",
+                "type": "string",
+                "pattern": "(^(([a-z0-9]|[a-z0-9][a-z0-9\\-]*[a-z0-9])\\.)*([a-z0-9]|[a-z0-9][a-z0-9\\-]*[a-z0-9])$)"
+              },
+              "prefix": {
+                "description": "Optional prefix of keys written to the bucket.",
+                "default": null,
+                "anyOf": [
+                  {
+                    "$ref": "#/definitions/Prefix"
+                  },
+                  {
+                    "type": "null"
+                  }
+                ]
+              },
+              "provider": {
+                "type": "string",
+                "enum": [
+                  "AZURE"
+                ]
+              }
+            }
           },
-          "provider": {
-            "description": "Cloud storage provider.",
-            "$ref": "#/definitions/BucketType"
+          {
+            "title": "An S3-compatible endpoint",
+            "description": "Details of an s3-compatible storage endpoint, such as Minio or R2.",
+            "examples": [
+              {
+                "bucket": "my-bucket",
+                "endpoint": "storage.example.com"
+              }
+            ],
+            "type": "object",
+            "required": [
+              "bucket",
+              "endpoint",
+              "provider"
+            ],
+            "properties": {
+              "bucket": {
+                "description": "Bucket into which Flow will store data.",
+                "type": "string",
+                "pattern": "(^(([a-z0-9]|[a-z0-9][a-z0-9\\-]*[a-z0-9])\\.)*([a-z0-9]|[a-z0-9][a-z0-9\\-]*[a-z0-9])$)"
+              },
+              "endpoint": {
+                "description": "endpoint is required when provider is \"custom\", and specifies the address of an s3-compatible storage provider.",
+                "$ref": "#/definitions/StorageEndpoint"
+              },
+              "prefix": {
+                "description": "Optional prefix of keys written to the bucket.",
+                "anyOf": [
+                  {
+                    "$ref": "#/definitions/Prefix"
+                  },
+                  {
+                    "type": "null"
+                  }
+                ]
+              },
+              "provider": {
+                "type": "string",
+                "enum": [
+                  "CUSTOM"
+                ]
+              }
+            }
           }
-        }
+        ]
       },
       "Test": {
         "description": "Test names are paths of Unicode letters, numbers, '-', '_', or '.'. Each path component is separated by a slash '/', and a name may not begin or end in a '/'.",
diff --git a/crates/validation/src/errors.rs b/crates/validation/src/errors.rs
index de741c46cc..7e5f112ddc 100644
--- a/crates/validation/src/errors.rs
+++ b/crates/validation/src/errors.rs
@@ -39,6 +39,16 @@ pub enum Error {
     },
     #[error("at least one storage mapping must be defined")]
     NoStorageMappings {},
+    /// Comes from a validation which ensures that users cannot specify the `endpoint` property of a
+    /// Store under a protected prefix. The prefix is used to look up the AWS credentials for each
+    /// store that uses a custom endpoint, but the 'default' profile always holds Flow's own
+    /// credentials, so allowing a custom endpoint there could result in Flow's own credentials being
+    /// sent to a malicious endpoint. The same restriction applies to the empty storage prefix.
+    #[error("'custom' storage mapping '{prefix}' is not allowed under the {disallowed} prefix")]
+    InvalidCustomStoragePrefix {
+        prefix: String,
+        disallowed: &'static str, // "empty", "'default/'", or "'recovery/default/'"
+    },
     #[error("could not map {this_entity} {this_thing} into a storage mapping; did you mean {suggest_name} defined at {suggest_scope}?")]
     NoStorageMappingSuggest {
         this_thing: String,
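Note: thiserror interpolates the variant's fields into the message, so the rendered error names both the offending prefix and the disallowed pattern. A standalone sketch of that rendering with a mirrored variant (assumes the `thiserror` crate; not the real `Error` enum):

```rust
use thiserror::Error;

// Mirror of the new variant, only to show how the message renders.
#[derive(Error, Debug)]
enum Error {
    #[error("'custom' storage mapping '{prefix}' is not allowed under the {disallowed} prefix")]
    InvalidCustomStoragePrefix {
        prefix: String,
        disallowed: &'static str,
    },
}

fn main() {
    let e = Error::InvalidCustomStoragePrefix {
        prefix: "default/ops/".to_string(),
        disallowed: "'default/'",
    };
    assert_eq!(
        e.to_string(),
        "'custom' storage mapping 'default/ops/' is not allowed under the 'default/' prefix"
    );
}
```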
"#/definitions/BucketType" + { + "title": "An S3-compatible endpoint", + "description": "Details of an s3-compatible storage endpoint, such as Minio or R2.", + "examples": [ + { + "bucket": "my-bucket", + "endpoint": "storage.example.com" + } + ], + "type": "object", + "required": [ + "bucket", + "endpoint", + "provider" + ], + "properties": { + "bucket": { + "description": "Bucket into which Flow will store data.", + "type": "string", + "pattern": "(^(([a-z0-9]|[a-z0-9][a-z0-9\\-]*[a-z0-9])\\.)*([a-z0-9]|[a-z0-9][a-z0-9\\-]*[a-z0-9])$)" + }, + "endpoint": { + "description": "endpoint is required when provider is \"custom\", and specifies the address of an s3-compatible storage provider.", + "$ref": "#/definitions/StorageEndpoint" + }, + "prefix": { + "description": "Optional prefix of keys written to the bucket.", + "anyOf": [ + { + "$ref": "#/definitions/Prefix" + }, + { + "type": "null" + } + ] + }, + "provider": { + "type": "string", + "enum": [ + "CUSTOM" + ] + } + } } - } + ] }, "Test": { "description": "Test names are paths of Unicode letters, numbers, '-', '_', or '.'. Each path component is separated by a slash '/', and a name may not begin or end in a '/'.", diff --git a/crates/validation/src/errors.rs b/crates/validation/src/errors.rs index de741c46cc..7e5f112ddc 100644 --- a/crates/validation/src/errors.rs +++ b/crates/validation/src/errors.rs @@ -39,6 +39,16 @@ pub enum Error { }, #[error("at least one storage mapping must be defined")] NoStorageMappings {}, + /// This comes from a validation that ensures users cannot specify the `endpoint` property of a Store that pertains + /// to the 'default/' prefix. This is because the prefix is used to look up the AWS credentials for each store that + /// uses a custom endpoint, but the 'default' profile is always used for Flow's own credentials. Therefore, allowing + /// a user to specify a custom endpoint for that profile could result in Flow's own credentials being sent to a + /// malicious endpoint. This also pertains to the empty storage prefix, which is also disallowed for custom storage endpoints. + #[error("'custom' storage mapping '{prefix}' is not allowed under the {disallowed} prefix")] + InvalidCustomStoragePrefix { + prefix: String, + disallowed: &'static str, // will either be "empty" or "'default/'" + }, #[error("could not map {this_entity} {this_thing} into a storage mapping; did you mean {suggest_name} defined at {suggest_scope}?")] NoStorageMappingSuggest { this_thing: String, diff --git a/crates/validation/src/storage_mapping.rs b/crates/validation/src/storage_mapping.rs index fc0a017b17..e13a4fe854 100644 --- a/crates/validation/src/storage_mapping.rs +++ b/crates/validation/src/storage_mapping.rs @@ -1,6 +1,7 @@ use crate::errors::Error; use super::{indexed, reference}; +use models::Store; use superslice::Ext; pub fn walk_all_storage_mappings( @@ -8,6 +9,41 @@ pub fn walk_all_storage_mappings( errors: &mut tables::Errors, ) { for m in storage_mappings { + for store in m.stores.iter() { + // TODO: it seems like we should also be calling `walk_name` for the bucket and prefix, right? + + // Disallow specifying custom storage endpoints for the 'default/' prefix and empty prefix. 
diff --git a/supabase/pending/custom_storage_endpoints.sql b/supabase/pending/custom_storage_endpoints.sql
new file mode 100644
index 0000000000..13e6ae3a28
--- /dev/null
+++ b/supabase/pending/custom_storage_endpoints.sql
@@ -0,0 +1,8 @@
+-- Migration to set up custom endpoints for storage mappings.
+
+-- If a user supplies a custom storage endpoint, then we'll always use their tenant name as the AWS profile, which is used for looking up
+-- the credentials. But the `default` AWS profile is special, and is configured with Flow's own credentials, so if a malicious
+-- user created a `default` tenant with a custom storage endpoint, then we could end up sending our credentials to that endpoint.
+-- This prevents a user from being able to create such a tenant.
+insert into internal.illegal_tenant_names (name) values ('default') on conflict do nothing;