diff --git a/crates/flowctl/src/generate/mod.rs b/crates/flowctl/src/generate/mod.rs index 3d95e8ea0f..23250d51f6 100644 --- a/crates/flowctl/src/generate/mod.rs +++ b/crates/flowctl/src/generate/mod.rs @@ -3,6 +3,8 @@ use anyhow::Context; use futures::{FutureExt, StreamExt}; use itertools::Itertools; use proto_flow::{capture, derive, flow, materialize}; +use std::str::FromStr; +use url::Url; #[derive(Debug, clap::Args)] #[clap(rename_all = "kebab-case")] @@ -292,8 +294,10 @@ async fn generate_missing_materialization_configs( connector_type: flow::materialization_spec::ConnectorType::Dekaf as i32, config_json: serde_json::to_string(config).unwrap(), }, - // Dekaf isn't a pluggable connector, and so does not have dynamic config. - None, + match &config { + models::DekafConfigContainer::Indirect(s) => Url::from_str(s.as_str()).ok(), + _ => None, + }, ), }; let missing_resource_urls: Vec<(url::Url, models::Collection)> = bindings diff --git a/crates/models/src/lib.rs b/crates/models/src/lib.rs index d2f604757c..93d3e7af07 100644 --- a/crates/models/src/lib.rs +++ b/crates/models/src/lib.rs @@ -35,8 +35,8 @@ pub use journals::{ AZURE_STORAGE_ACCOUNT_RE, GCS_BUCKET_RE, S3_BUCKET_RE, }; pub use materializations::{ - MaterializationBinding, MaterializationDef, MaterializationEndpoint, MaterializationFields, - SqliteConfig, + DekafConfigContainer, MaterializationBinding, MaterializationDef, MaterializationEndpoint, + MaterializationFields, SqliteConfig, }; pub use raw_value::RawValue; pub use references::{ diff --git a/crates/models/src/materializations.rs b/crates/models/src/materializations.rs index 0ff3e4659d..78b604799a 100644 --- a/crates/models/src/materializations.rs +++ b/crates/models/src/materializations.rs @@ -41,6 +41,13 @@ pub struct MaterializationDef { pub delete: bool, } +#[derive(Serialize, Deserialize, Clone, Debug, JsonSchema, PartialEq)] +#[serde(untagged)] +pub enum DekafConfigContainer { + Direct(DekafConfig), + Indirect(String), +} + /// An Endpoint 
connector used for Flow materializations. #[derive(Serialize, Deserialize, Clone, Debug, JsonSchema, PartialEq)] #[serde(deny_unknown_fields, rename_all = "camelCase")] @@ -51,7 +58,7 @@ pub enum MaterializationEndpoint { /// # A local command (development only). Local(LocalConfig), /// # A Dekaf connection - Dekaf(DekafConfig), + Dekaf(DekafConfigContainer), } #[derive(Serialize, Deserialize, Clone, Debug, JsonSchema, PartialEq)] diff --git a/crates/sources/src/indirect.rs b/crates/sources/src/indirect.rs index 60986e4ae7..c7a36069a1 100644 --- a/crates/sources/src/indirect.rs +++ b/crates/sources/src/indirect.rs @@ -1,5 +1,6 @@ use super::Format; use crate::Scope; +use models::RawValue; use proto_flow::flow::ContentType; use std::collections::BTreeMap; @@ -498,10 +499,28 @@ fn indirect_materialization( resources, threshold, ), - // Dekaf isn't a pluggable connector, and so does not have dynamic config. - // All of its config is defined directly within models::DekafConfig, and so - // should not be indirected. - models::MaterializationEndpoint::Dekaf(_) => {} + // I don't think this case can ever get hit as `indirect_materialization` is only called by + // `do_discover`, `do_pull_specs`, and `do_develop`, all of which are working with fully + // inlined specs. + models::MaterializationEndpoint::Dekaf(models::DekafConfigContainer::Indirect(_)) => { + tracing::warn!("Unexpectedly tried to indirect an already indirected location (dekaf)"); + } + models::MaterializationEndpoint::Dekaf(models::DekafConfigContainer::Direct(config)) => { + indirect_dom( + Scope::new(scope) + .push_prop("endpoint") + .push_prop("dekaf") + .push_prop("config"), + &mut RawValue::from_value( + &serde_json::to_value(config).expect("Serializing DekafConfig should not fail"), + ), + ContentType::Config, + format!("{base}.config"), + imports, + resources, + threshold, + ) + } } for (index, models::MaterializationBinding { resource, .. 
}) in bindings.iter_mut().enumerate() diff --git a/crates/sources/src/loader.rs b/crates/sources/src/loader.rs index be6ad06acf..1c84786e28 100644 --- a/crates/sources/src/loader.rs +++ b/crates/sources/src/loader.rs @@ -480,7 +480,7 @@ impl Loader { models::DeriveUsing::Connector(models::ConnectorConfig { config, .. }) => { tasks.push( async move { - self.load_config( + self.maybe_load_config( scope .push_prop("using") .push_prop("connector") @@ -496,7 +496,7 @@ impl Loader { for (index, migration) in migrations.iter().enumerate() { tasks.push( async move { - self.load_config( + self.maybe_load_config( scope .push_prop("using") .push_prop("sqlite") @@ -513,7 +513,7 @@ impl Loader { models::DeriveUsing::Typescript(models::DeriveUsingTypescript { module, .. }) => { tasks.push( async move { - self.load_config( + self.maybe_load_config( scope .push_prop("using") .push_prop("typescript") @@ -528,7 +528,7 @@ impl Loader { models::DeriveUsing::Local(models::LocalConfig { config, .. }) => { tasks.push( async move { - self.load_config( + self.maybe_load_config( scope .push_prop("using") .push_prop("local") @@ -545,7 +545,7 @@ impl Loader { for (index, transform) in spec.transforms.iter().enumerate() { tasks.push( async move { - self.load_config( + self.maybe_load_config( scope .push_prop("transforms") .push_item(index) @@ -560,7 +560,7 @@ impl Loader { if let models::Shuffle::Lambda(lambda) = &transform.shuffle { tasks.push( async move { - self.load_config( + self.maybe_load_config( scope .push_prop("transforms") .push_item(index) @@ -590,7 +590,7 @@ impl Loader { models::CaptureEndpoint::Connector(models::ConnectorConfig { config, .. }) => { tasks.push( async move { - self.load_config( + self.maybe_load_config( scope .push_prop("endpoint") .push_prop("connector") @@ -605,7 +605,7 @@ impl Loader { models::CaptureEndpoint::Local(models::LocalConfig { config, .. 
}) => { tasks.push( async move { - self.load_config( + self.maybe_load_config( scope .push_prop("endpoint") .push_prop("local") @@ -622,7 +622,7 @@ impl Loader { for (index, binding) in spec.bindings.iter().enumerate() { tasks.push( async move { - self.load_config( + self.maybe_load_config( scope .push_prop("bindings") .push_item(index) @@ -660,7 +660,7 @@ impl Loader { }) => { tasks.push( async move { - self.load_config( + self.maybe_load_config( scope .push_prop("endpoint") .push_prop("connector") @@ -675,7 +675,7 @@ impl Loader { models::MaterializationEndpoint::Local(models::LocalConfig { config, .. }) => { tasks.push( async move { - self.load_config( + self.maybe_load_config( scope .push_prop("endpoint") .push_prop("local") @@ -687,16 +687,31 @@ impl Loader { .boxed(), ); } - // Dekaf isn't a pluggable connector, and so does not have dynamic config to possibly - // load from a reference. All of its config is defined directly within models::DekafConfig. - models::MaterializationEndpoint::Dekaf(_) => {} + models::MaterializationEndpoint::Dekaf(models::DekafConfigContainer::Indirect( + location, + )) => { + tasks.push( + async move { + self.load_config( + scope + .push_prop("endpoint") + .push_prop("dekaf") + .push_prop("config"), + location.as_str(), + ) + .await + } + .boxed(), + ); + } + models::MaterializationEndpoint::Dekaf(models::DekafConfigContainer::Direct(_)) => {} }; for (index, binding) in spec.bindings.iter().enumerate() { if !binding.disable { tasks.push( async move { - self.load_config( + self.maybe_load_config( scope .push_prop("bindings") .push_item(index) @@ -752,22 +767,30 @@ impl Loader { ); } - async fn load_config<'s>(&'s self, scope: Scope<'s>, config: &RawValue) { - // If `config` is a JSON string that has no whitespace then presume and - // require that it's a relative or absolute URL to an imported file. 
+ /// If `config` is a JSON string that has no whitespace then presume and + /// require that it's a relative or absolute URL to an imported file. Any other value is ignored here. + async fn maybe_load_config<'s>(&'s self, scope: Scope<'s>, config: &RawValue) { match serde_json::from_str::<&str>(config.get()) { Ok(import) if !import.chars().any(char::is_whitespace) => { - self.load_import( - scope, - self.fallible(scope, scope.resource().join(&import)), - flow::ContentType::Config, - ) - .await; + self.load_config(scope, import).await } _ => {} } } + /// If `config` has no whitespace then presume and require that + /// it's a relative or absolute URL to an imported file. A `config` containing whitespace is ignored. + async fn load_config<'s>(&'s self, scope: Scope<'s>, config: &str) { + if !config.chars().any(char::is_whitespace) { + self.load_import( + scope, + self.fallible(scope, scope.resource().join(config)), + flow::ContentType::Config, + ) + .await; + } + } + // Consume a result capable of producing a LoadError. // Pass through a Result::Ok as Some. // Or, record a Result::Err and return None. diff --git a/crates/sources/tests/snapshots/schema_generation__catalog_schema_snapshot.snap b/crates/sources/tests/snapshots/schema_generation__catalog_schema_snapshot.snap index d5194e96d0..175250a674 100644 --- a/crates/sources/tests/snapshots/schema_generation__catalog_schema_snapshot.snap +++ b/crates/sources/tests/snapshots/schema_generation__catalog_schema_snapshot.snap @@ -465,6 +465,16 @@ expression: "&schema" "description": "Dekaf configuration. 
Currently empty, but present to enable easy addition of config options when they show up in the future.", "type": "object" }, + "DekafConfigContainer": { + "anyOf": [ + { + "$ref": "#/definitions/DekafConfig" + }, + { + "type": "string" + } + ] + }, "Derivation": { "description": "Derive specifies how a collection is derived from other collections.", "type": "object", @@ -931,7 +941,7 @@ expression: "&schema" ], "properties": { "dekaf": { - "$ref": "#/definitions/DekafConfig" + "$ref": "#/definitions/DekafConfigContainer" } }, "additionalProperties": false diff --git a/crates/validation/tests/scenario_tests.rs b/crates/validation/tests/scenario_tests.rs index 90366507ef..5836fd7d6a 100644 --- a/crates/validation/tests/scenario_tests.rs +++ b/crates/validation/tests/scenario_tests.rs @@ -9,7 +9,7 @@ fn test_golden_all_visits() { } #[test] -fn test_dekaf_materialization() { +fn test_dekaf_materialization_inline_config() { let fixture = r##" test://example/catalog.yaml: collections: @@ -38,6 +38,99 @@ driver: insta::assert_debug_snapshot!(outcome); } +#[test] +fn test_dekaf_materialization_indirect_config() { + let fixture = r##" +test://example/dekaf.yaml: {} +test://example/catalog.yaml: + collections: + testing/schema_with_properties: + schema: + type: object + properties: + id: { type: string } + required: [id] + key: [/id] + materializations: + testing/test_dekaf: + endpoint: + dekaf: example/dekaf.yaml + bindings: + - source: testing/schema_with_properties + resource: {} +driver: + dataPlanes: + "1d:1d:1d:1d:1d:1d:1d:1d": + default: true +"##; + + let outcome = common::run(fixture, "{}"); + // Snapshot the outcome of a Dekaf endpoint given as an indirect (string) config reference. NOTE(review): the snapshot records a fetch failure for test://example/example/dekaf.yaml (the reference resolved against catalog.yaml), which is not the test://example/dekaf.yaml fixture - confirm this is intended. + insta::assert_debug_snapshot!(outcome); +} + +#[test] +fn test_dekaf_materialization_invalid() { + let fixture = r##" +test://example/dekaf.yaml: {} +test://example/catalog.yaml: + collections: + testing/schema_with_properties: + schema: + type: object + properties: + id: { type: string } + required: [id] + key: 
[/id] + materializations: + testing/test_dekaf: + endpoint: + dekaf: false + bindings: + - source: testing/schema_with_properties + resource: {} +driver: + dataPlanes: + "1d:1d:1d:1d:1d:1d:1d:1d": + default: true +"##; + + let outcome = common::run(fixture, "{}"); + // Expect a draft error: `false` is neither a DekafConfig object nor a string reference, so the catalog fails to parse. + insta::assert_debug_snapshot!(outcome); +} + +#[test] +fn test_dekaf_materialization_nonexistent() { + let fixture = r##" +test://example/dekaf.yaml: {} +test://example/catalog.yaml: + collections: + testing/schema_with_properties: + schema: + type: object + properties: + id: { type: string } + required: [id] + key: [/id] + materializations: + testing/test_dekaf: + endpoint: + dekaf: foo/bar + bindings: + - source: testing/schema_with_properties + resource: {} +driver: + dataPlanes: + "1d:1d:1d:1d:1d:1d:1d:1d": + default: true +"##; + + let outcome = common::run(fixture, "{}"); + // The `foo/bar` config reference names no fixture. NOTE(review): presumably this surfaces as a fetch error in the snapshot - confirm. + insta::assert_debug_snapshot!(outcome); +} + #[test] fn test_projection_not_created_for_empty_properties() { let fixture = r##" diff --git a/crates/validation/tests/snapshots/scenario_tests__dekaf_materialization_indirect_config.snap b/crates/validation/tests/snapshots/scenario_tests__dekaf_materialization_indirect_config.snap new file mode 100644 index 0000000000..961ba8d6fa --- /dev/null +++ b/crates/validation/tests/snapshots/scenario_tests__dekaf_materialization_indirect_config.snap @@ -0,0 +1,661 @@ +--- +source: crates/validation/tests/scenario_tests.rs +assertion_line: 69 +expression: outcome +--- +Outcome { + built_captures: [], + built_collections: [ + BuiltCollection { + collection: testing/schema_with_properties, + scope: test://example/catalog.yaml#/collections/testing~1schema_with_properties, + control_id: "0000000000000000", + data_plane_id: "1d1d1d1d1d1d1d1d", + expect_pub_id: "0000000000000000", + expect_build_id: "0000000000000000", + model: { + "schema": 
{"$id":"test://example/catalog.yaml?ptr=/collections/testing~1schema_with_properties/schema","properties":{"id":{"type":"string"}},"required":["id"],"type":"object"}, + "key": [ + "/id" + ] + }, + validated: NULL, + spec: CollectionSpec { + name: "testing/schema_with_properties", + write_schema_json: "{\"$id\":\"test://example/catalog.yaml?ptr=/collections/testing~1schema_with_properties/schema\",\"properties\":{\"id\":{\"type\":\"string\"}},\"required\":[\"id\"],\"type\":\"object\"}", + read_schema_json: "", + key: [ + "/id", + ], + uuid_ptr: "/_meta/uuid", + partition_fields: [], + projections: [ + Projection { + ptr: "/_meta/flow_truncated", + field: "_meta/flow_truncated", + explicit: false, + is_partition_key: false, + is_primary_key: false, + inference: Some( + Inference { + types: [ + "boolean", + ], + string: None, + title: "Flow truncation indicator", + description: "Indicates whether any of the materialized values for this row have been truncated to make them fit inside the limitations of the destination system.", + default_json: "", + secret: false, + exists: Must, + numeric: None, + }, + ), + }, + Projection { + ptr: "", + field: "flow_document", + explicit: false, + is_partition_key: false, + is_primary_key: false, + inference: Some( + Inference { + types: [ + "object", + ], + string: None, + title: "", + description: "", + default_json: "", + secret: false, + exists: Must, + numeric: None, + }, + ), + }, + Projection { + ptr: "/_meta/uuid", + field: "flow_published_at", + explicit: false, + is_partition_key: false, + is_primary_key: false, + inference: Some( + Inference { + types: [ + "string", + ], + string: Some( + String { + content_type: "", + format: "date-time", + content_encoding: "uuid", + max_length: 0, + }, + ), + title: "Flow Publication Time", + description: "Flow publication date-time of this document", + default_json: "", + secret: false, + exists: Must, + numeric: None, + }, + ), + }, + Projection { + ptr: "/id", + field: "id", + 
explicit: false, + is_partition_key: false, + is_primary_key: true, + inference: Some( + Inference { + types: [ + "string", + ], + string: Some( + String { + content_type: "", + format: "", + content_encoding: "", + max_length: 0, + }, + ), + title: "", + description: "", + default_json: "", + secret: false, + exists: Must, + numeric: None, + }, + ), + }, + ], + ack_template_json: "{\"_meta\":{\"ack\":true,\"uuid\":\"DocUUIDPlaceholder-329Bb50aa48EAa9ef\"}}", + partition_template: Some( + JournalSpec { + name: "testing/schema_with_properties/2020202020202020", + replication: 3, + labels: Some( + LabelSet { + labels: [ + Label { + name: "app.gazette.dev/managed-by", + value: "estuary.dev/flow", + prefix: false, + }, + Label { + name: "content-type", + value: "application/x-ndjson", + prefix: false, + }, + Label { + name: "estuary.dev/build", + value: "2121212121212121", + prefix: false, + }, + Label { + name: "estuary.dev/collection", + value: "testing/schema_with_properties", + prefix: false, + }, + ], + }, + ), + fragment: Some( + Fragment { + length: 536870912, + compression_codec: Gzip, + stores: [ + "s3://a-bucket/", + ], + refresh_interval: Some( + Duration { + seconds: 300, + nanos: 0, + }, + ), + retention: None, + flush_interval: None, + path_postfix_template: "utc_date={{.Spool.FirstAppendTime.Format \"2006-01-02\"}}/utc_hour={{.Spool.FirstAppendTime.Format \"15\"}}", + }, + ), + flags: 4, + max_append_rate: 4194304, + }, + ), + derivation: None, + }, + previous_spec: NULL, + is_touch: 0, + dependency_hash: NULL, + }, + ], + built_materializations: [ + BuiltMaterialization { + materialization: testing/test_dekaf, + scope: test://example/catalog.yaml#/materializations/testing~1test_dekaf, + control_id: "0000000000000000", + data_plane_id: "1d1d1d1d1d1d1d1d", + expect_pub_id: "0000000000000000", + expect_build_id: "0000000000000000", + model: { + "endpoint": { + "dekaf": "example/dekaf.yaml" + }, + "bindings": [ + { + "resource": {}, + "source": 
"testing/schema_with_properties", + "fields": { + "recommended": true + } + } + ] + }, + validated: Validated { + bindings: [ + Binding { + constraints: { + "_meta/flow_truncated": Constraint { + r#type: FieldOptional, + reason: "no-op validator allows everything", + }, + "flow_document": Constraint { + r#type: FieldOptional, + reason: "no-op validator allows everything", + }, + "flow_published_at": Constraint { + r#type: FieldOptional, + reason: "no-op validator allows everything", + }, + "id": Constraint { + r#type: FieldOptional, + reason: "no-op validator allows everything", + }, + }, + resource_path: [ + "binding-0", + ], + delta_updates: true, + }, + ], + }, + spec: MaterializationSpec { + name: "testing/test_dekaf", + connector_type: Dekaf, + config_json: "\"example/dekaf.yaml\"", + bindings: [ + Binding { + resource_config_json: "{}", + resource_path: [ + "binding-0", + ], + collection: Some( + CollectionSpec { + name: "testing/schema_with_properties", + write_schema_json: "{\"$id\":\"test://example/catalog.yaml?ptr=/collections/testing~1schema_with_properties/schema\",\"properties\":{\"id\":{\"type\":\"string\"}},\"required\":[\"id\"],\"type\":\"object\"}", + read_schema_json: "", + key: [ + "/id", + ], + uuid_ptr: "/_meta/uuid", + partition_fields: [], + projections: [ + Projection { + ptr: "/_meta/flow_truncated", + field: "_meta/flow_truncated", + explicit: false, + is_partition_key: false, + is_primary_key: false, + inference: Some( + Inference { + types: [ + "boolean", + ], + string: None, + title: "Flow truncation indicator", + description: "Indicates whether any of the materialized values for this row have been truncated to make them fit inside the limitations of the destination system.", + default_json: "", + secret: false, + exists: Must, + numeric: None, + }, + ), + }, + Projection { + ptr: "", + field: "flow_document", + explicit: false, + is_partition_key: false, + is_primary_key: false, + inference: Some( + Inference { + types: [ + "object", + 
], + string: None, + title: "", + description: "", + default_json: "", + secret: false, + exists: Must, + numeric: None, + }, + ), + }, + Projection { + ptr: "/_meta/uuid", + field: "flow_published_at", + explicit: false, + is_partition_key: false, + is_primary_key: false, + inference: Some( + Inference { + types: [ + "string", + ], + string: Some( + String { + content_type: "", + format: "date-time", + content_encoding: "uuid", + max_length: 0, + }, + ), + title: "Flow Publication Time", + description: "Flow publication date-time of this document", + default_json: "", + secret: false, + exists: Must, + numeric: None, + }, + ), + }, + Projection { + ptr: "/id", + field: "id", + explicit: false, + is_partition_key: false, + is_primary_key: true, + inference: Some( + Inference { + types: [ + "string", + ], + string: Some( + String { + content_type: "", + format: "", + content_encoding: "", + max_length: 0, + }, + ), + title: "", + description: "", + default_json: "", + secret: false, + exists: Must, + numeric: None, + }, + ), + }, + ], + ack_template_json: "{\"_meta\":{\"ack\":true,\"uuid\":\"DocUUIDPlaceholder-329Bb50aa48EAa9ef\"}}", + partition_template: Some( + JournalSpec { + name: "testing/schema_with_properties/2020202020202020", + replication: 3, + labels: Some( + LabelSet { + labels: [ + Label { + name: "app.gazette.dev/managed-by", + value: "estuary.dev/flow", + prefix: false, + }, + Label { + name: "content-type", + value: "application/x-ndjson", + prefix: false, + }, + Label { + name: "estuary.dev/build", + value: "2121212121212121", + prefix: false, + }, + Label { + name: "estuary.dev/collection", + value: "testing/schema_with_properties", + prefix: false, + }, + ], + }, + ), + fragment: Some( + Fragment { + length: 536870912, + compression_codec: Gzip, + stores: [ + "s3://a-bucket/", + ], + refresh_interval: Some( + Duration { + seconds: 300, + nanos: 0, + }, + ), + retention: None, + flush_interval: None, + path_postfix_template: 
"utc_date={{.Spool.FirstAppendTime.Format \"2006-01-02\"}}/utc_hour={{.Spool.FirstAppendTime.Format \"15\"}}", + }, + ), + flags: 4, + max_append_rate: 4194304, + }, + ), + derivation: None, + }, + ), + partition_selector: Some( + LabelSelector { + include: Some( + LabelSet { + labels: [ + Label { + name: "estuary.dev/collection", + value: "testing/schema_with_properties", + prefix: false, + }, + ], + }, + ), + exclude: Some( + LabelSet { + labels: [], + }, + ), + }, + ), + priority: 0, + field_selection: Some( + FieldSelection { + keys: [], + values: [], + document: "", + field_config_json_map: {}, + }, + ), + delta_updates: true, + deprecated_shuffle: None, + journal_read_suffix: "materialize/testing/test_dekaf/binding-0", + not_before: None, + not_after: None, + backfill: 0, + state_key: "binding-0", + }, + ], + shard_template: Some( + ShardSpec { + id: "materialize/testing/test_dekaf/2020202020202020", + sources: [], + recovery_log_prefix: "recovery", + hint_prefix: "/estuary/flow/hints", + hint_backups: 2, + max_txn_duration: Some( + Duration { + seconds: 300, + nanos: 0, + }, + ), + min_txn_duration: Some( + Duration { + seconds: 0, + nanos: 0, + }, + ), + disable: false, + hot_standbys: 0, + labels: Some( + LabelSet { + labels: [ + Label { + name: "app.gazette.dev/managed-by", + value: "estuary.dev/flow", + prefix: false, + }, + Label { + name: "estuary.dev/build", + value: "2121212121212121", + prefix: false, + }, + Label { + name: "estuary.dev/log-level", + value: "info", + prefix: false, + }, + Label { + name: "estuary.dev/task-name", + value: "testing/test_dekaf", + prefix: false, + }, + Label { + name: "estuary.dev/task-type", + value: "materialization", + prefix: false, + }, + ], + }, + ), + disable_wait_for_ack: false, + ring_buffer_size: 65536, + read_channel_size: 4096, + }, + ), + recovery_log_template: Some( + JournalSpec { + name: "recovery/materialize/testing/test_dekaf/2020202020202020", + replication: 3, + labels: Some( + LabelSet { + labels: 
[ + Label { + name: "app.gazette.dev/managed-by", + value: "estuary.dev/flow", + prefix: false, + }, + Label { + name: "content-type", + value: "application/x-gazette-recoverylog", + prefix: false, + }, + Label { + name: "estuary.dev/build", + value: "2121212121212121", + prefix: false, + }, + Label { + name: "estuary.dev/task-name", + value: "testing/test_dekaf", + prefix: false, + }, + Label { + name: "estuary.dev/task-type", + value: "materialization", + prefix: false, + }, + ], + }, + ), + fragment: Some( + Fragment { + length: 268435456, + compression_codec: Snappy, + stores: [ + "s3://a-bucket/", + ], + refresh_interval: Some( + Duration { + seconds: 300, + nanos: 0, + }, + ), + retention: None, + flush_interval: None, + path_postfix_template: "", + }, + ), + flags: 4, + max_append_rate: 4194304, + }, + ), + network_ports: [], + }, + previous_spec: NULL, + is_touch: 0, + dependency_hash: 32182596aeb1e4a0, + }, + ], + built_tests: [], + captures: [], + collections: [ + DraftCollection { + collection: testing/schema_with_properties, + scope: test://example/catalog.yaml#/collections/testing~1schema_with_properties, + expect_pub_id: NULL, + model: { + "schema": {"$id":"test://example/catalog.yaml?ptr=/collections/testing~1schema_with_properties/schema","properties":{"id":{"type":"string"}},"required":["id"],"type":"object"}, + "key": [ + "/id" + ] + }, + is_touch: 0, + }, + ], + errors: [], + errors_draft: [ + Error { + scope: test://example/catalog.yaml#/materializations/testing~1test_dekaf/endpoint/dekaf/config, + error: failed to fetch resource test://example/example/dekaf.yaml: fixture not found, + }, + ], + fetches: [ + Fetch { + depth: 1, + resource: test://example/catalog.yaml, + }, + Fetch { + depth: 2, + resource: test://example/example/dekaf.yaml, + }, + ], + imports: [ + Import { + scope: test://example/catalog.yaml#/materializations/testing~1test_dekaf/endpoint/dekaf/config, + to_resource: test://example/example/dekaf.yaml, + }, + ], + 
materializations: [ + DraftMaterialization { + materialization: testing/test_dekaf, + scope: test://example/catalog.yaml#/materializations/testing~1test_dekaf, + expect_pub_id: NULL, + model: { + "endpoint": { + "dekaf": "example/dekaf.yaml" + }, + "bindings": [ + { + "resource": {}, + "source": "testing/schema_with_properties", + "fields": { + "recommended": true + } + } + ] + }, + is_touch: 0, + }, + ], + resources: [ + Resource { + resource: test://example/catalog.yaml, + content_type: "CATALOG", + content: ".. binary ..", + content_dom: {"collections":{"testing/schema_with_properties":{"key":["/id"],"schema":{"properties":{"id":{"type":"string"}},"required":["id"],"type":"object"}}},"materializations":{"testing/test_dekaf":{"bindings":[{"resource":{},"source":"testing/schema_with_properties"}],"endpoint":{"dekaf":"example/dekaf.yaml"}}}}, + }, + ], + storage_mappings: [ + StorageMapping { + catalog_prefix: , + control_id: "0000000000000000", + stores: [ + { + "provider": "S3", + "bucket": "a-bucket", + "prefix": null, + "region": null + } + ], + }, + ], + tests: [], +} diff --git a/crates/validation/tests/snapshots/scenario_tests__dekaf_materialization.snap b/crates/validation/tests/snapshots/scenario_tests__dekaf_materialization_inline_config.snap similarity index 100% rename from crates/validation/tests/snapshots/scenario_tests__dekaf_materialization.snap rename to crates/validation/tests/snapshots/scenario_tests__dekaf_materialization_inline_config.snap diff --git a/crates/validation/tests/snapshots/scenario_tests__dekaf_materialization_invalid.snap b/crates/validation/tests/snapshots/scenario_tests__dekaf_materialization_invalid.snap new file mode 100644 index 0000000000..697478a621 --- /dev/null +++ b/crates/validation/tests/snapshots/scenario_tests__dekaf_materialization_invalid.snap @@ -0,0 +1,51 @@ +--- +source: crates/validation/tests/scenario_tests.rs +assertion_line: 100 +expression: outcome +--- +Outcome { + built_captures: [], + built_collections: 
[], + built_materializations: [], + built_tests: [], + captures: [], + collections: [], + errors: [], + errors_draft: [ + Error { + scope: test://example/catalog.yaml, + error: failed to parse document (data did not match any variant of untagged enum DekafConfigContainer at line 1 column 288): data did not match any variant of untagged enum DekafConfigContainer at line 1 column 288, + }, + ], + fetches: [ + Fetch { + depth: 1, + resource: test://example/catalog.yaml, + }, + ], + imports: [], + materializations: [], + resources: [ + Resource { + resource: test://example/catalog.yaml, + content_type: "CATALOG", + content: ".. binary ..", + content_dom: {"collections":{"testing/schema_with_properties":{"key":["/id"],"schema":{"properties":{"id":{"type":"string"}},"required":["id"],"type":"object"}}},"materializations":{"testing/test_dekaf":{"bindings":[{"resource":{},"source":"testing/schema_with_properties"}],"endpoint":{"dekaf":false}}}}, + }, + ], + storage_mappings: [ + StorageMapping { + catalog_prefix: , + control_id: "0000000000000000", + stores: [ + { + "provider": "S3", + "bucket": "a-bucket", + "prefix": null, + "region": null + } + ], + }, + ], + tests: [], +} diff --git a/crates/validation/tests/snapshots/scenario_tests__dekaf_materialization_nonexistent.snap b/crates/validation/tests/snapshots/scenario_tests__dekaf_materialization_nonexistent.snap new file mode 100644 index 0000000000..7240236b95 --- /dev/null +++ b/crates/validation/tests/snapshots/scenario_tests__dekaf_materialization_nonexistent.snap @@ -0,0 +1,661 @@ +--- +source: crates/validation/tests/scenario_tests.rs +assertion_line: 131 +expression: outcome +--- +Outcome { + built_captures: [], + built_collections: [ + BuiltCollection { + collection: testing/schema_with_properties, + scope: test://example/catalog.yaml#/collections/testing~1schema_with_properties, + control_id: "0000000000000000", + data_plane_id: "1d1d1d1d1d1d1d1d", + expect_pub_id: "0000000000000000", + expect_build_id: 
"0000000000000000", + model: { + "schema": {"$id":"test://example/catalog.yaml?ptr=/collections/testing~1schema_with_properties/schema","properties":{"id":{"type":"string"}},"required":["id"],"type":"object"}, + "key": [ + "/id" + ] + }, + validated: NULL, + spec: CollectionSpec { + name: "testing/schema_with_properties", + write_schema_json: "{\"$id\":\"test://example/catalog.yaml?ptr=/collections/testing~1schema_with_properties/schema\",\"properties\":{\"id\":{\"type\":\"string\"}},\"required\":[\"id\"],\"type\":\"object\"}", + read_schema_json: "", + key: [ + "/id", + ], + uuid_ptr: "/_meta/uuid", + partition_fields: [], + projections: [ + Projection { + ptr: "/_meta/flow_truncated", + field: "_meta/flow_truncated", + explicit: false, + is_partition_key: false, + is_primary_key: false, + inference: Some( + Inference { + types: [ + "boolean", + ], + string: None, + title: "Flow truncation indicator", + description: "Indicates whether any of the materialized values for this row have been truncated to make them fit inside the limitations of the destination system.", + default_json: "", + secret: false, + exists: Must, + numeric: None, + }, + ), + }, + Projection { + ptr: "", + field: "flow_document", + explicit: false, + is_partition_key: false, + is_primary_key: false, + inference: Some( + Inference { + types: [ + "object", + ], + string: None, + title: "", + description: "", + default_json: "", + secret: false, + exists: Must, + numeric: None, + }, + ), + }, + Projection { + ptr: "/_meta/uuid", + field: "flow_published_at", + explicit: false, + is_partition_key: false, + is_primary_key: false, + inference: Some( + Inference { + types: [ + "string", + ], + string: Some( + String { + content_type: "", + format: "date-time", + content_encoding: "uuid", + max_length: 0, + }, + ), + title: "Flow Publication Time", + description: "Flow publication date-time of this document", + default_json: "", + secret: false, + exists: Must, + numeric: None, + }, + ), + }, + 
Projection { + ptr: "/id", + field: "id", + explicit: false, + is_partition_key: false, + is_primary_key: true, + inference: Some( + Inference { + types: [ + "string", + ], + string: Some( + String { + content_type: "", + format: "", + content_encoding: "", + max_length: 0, + }, + ), + title: "", + description: "", + default_json: "", + secret: false, + exists: Must, + numeric: None, + }, + ), + }, + ], + ack_template_json: "{\"_meta\":{\"ack\":true,\"uuid\":\"DocUUIDPlaceholder-329Bb50aa48EAa9ef\"}}", + partition_template: Some( + JournalSpec { + name: "testing/schema_with_properties/2020202020202020", + replication: 3, + labels: Some( + LabelSet { + labels: [ + Label { + name: "app.gazette.dev/managed-by", + value: "estuary.dev/flow", + prefix: false, + }, + Label { + name: "content-type", + value: "application/x-ndjson", + prefix: false, + }, + Label { + name: "estuary.dev/build", + value: "2121212121212121", + prefix: false, + }, + Label { + name: "estuary.dev/collection", + value: "testing/schema_with_properties", + prefix: false, + }, + ], + }, + ), + fragment: Some( + Fragment { + length: 536870912, + compression_codec: Gzip, + stores: [ + "s3://a-bucket/", + ], + refresh_interval: Some( + Duration { + seconds: 300, + nanos: 0, + }, + ), + retention: None, + flush_interval: None, + path_postfix_template: "utc_date={{.Spool.FirstAppendTime.Format \"2006-01-02\"}}/utc_hour={{.Spool.FirstAppendTime.Format \"15\"}}", + }, + ), + flags: 4, + max_append_rate: 4194304, + }, + ), + derivation: None, + }, + previous_spec: NULL, + is_touch: 0, + dependency_hash: NULL, + }, + ], + built_materializations: [ + BuiltMaterialization { + materialization: testing/test_dekaf, + scope: test://example/catalog.yaml#/materializations/testing~1test_dekaf, + control_id: "0000000000000000", + data_plane_id: "1d1d1d1d1d1d1d1d", + expect_pub_id: "0000000000000000", + expect_build_id: "0000000000000000", + model: { + "endpoint": { + "dekaf": "foo/bar" + }, + "bindings": [ + { + 
"resource": {}, + "source": "testing/schema_with_properties", + "fields": { + "recommended": true + } + } + ] + }, + validated: Validated { + bindings: [ + Binding { + constraints: { + "_meta/flow_truncated": Constraint { + r#type: FieldOptional, + reason: "no-op validator allows everything", + }, + "flow_document": Constraint { + r#type: FieldOptional, + reason: "no-op validator allows everything", + }, + "flow_published_at": Constraint { + r#type: FieldOptional, + reason: "no-op validator allows everything", + }, + "id": Constraint { + r#type: FieldOptional, + reason: "no-op validator allows everything", + }, + }, + resource_path: [ + "binding-0", + ], + delta_updates: true, + }, + ], + }, + spec: MaterializationSpec { + name: "testing/test_dekaf", + connector_type: Dekaf, + config_json: "\"foo/bar\"", + bindings: [ + Binding { + resource_config_json: "{}", + resource_path: [ + "binding-0", + ], + collection: Some( + CollectionSpec { + name: "testing/schema_with_properties", + write_schema_json: "{\"$id\":\"test://example/catalog.yaml?ptr=/collections/testing~1schema_with_properties/schema\",\"properties\":{\"id\":{\"type\":\"string\"}},\"required\":[\"id\"],\"type\":\"object\"}", + read_schema_json: "", + key: [ + "/id", + ], + uuid_ptr: "/_meta/uuid", + partition_fields: [], + projections: [ + Projection { + ptr: "/_meta/flow_truncated", + field: "_meta/flow_truncated", + explicit: false, + is_partition_key: false, + is_primary_key: false, + inference: Some( + Inference { + types: [ + "boolean", + ], + string: None, + title: "Flow truncation indicator", + description: "Indicates whether any of the materialized values for this row have been truncated to make them fit inside the limitations of the destination system.", + default_json: "", + secret: false, + exists: Must, + numeric: None, + }, + ), + }, + Projection { + ptr: "", + field: "flow_document", + explicit: false, + is_partition_key: false, + is_primary_key: false, + inference: Some( + Inference { + 
types: [ + "object", + ], + string: None, + title: "", + description: "", + default_json: "", + secret: false, + exists: Must, + numeric: None, + }, + ), + }, + Projection { + ptr: "/_meta/uuid", + field: "flow_published_at", + explicit: false, + is_partition_key: false, + is_primary_key: false, + inference: Some( + Inference { + types: [ + "string", + ], + string: Some( + String { + content_type: "", + format: "date-time", + content_encoding: "uuid", + max_length: 0, + }, + ), + title: "Flow Publication Time", + description: "Flow publication date-time of this document", + default_json: "", + secret: false, + exists: Must, + numeric: None, + }, + ), + }, + Projection { + ptr: "/id", + field: "id", + explicit: false, + is_partition_key: false, + is_primary_key: true, + inference: Some( + Inference { + types: [ + "string", + ], + string: Some( + String { + content_type: "", + format: "", + content_encoding: "", + max_length: 0, + }, + ), + title: "", + description: "", + default_json: "", + secret: false, + exists: Must, + numeric: None, + }, + ), + }, + ], + ack_template_json: "{\"_meta\":{\"ack\":true,\"uuid\":\"DocUUIDPlaceholder-329Bb50aa48EAa9ef\"}}", + partition_template: Some( + JournalSpec { + name: "testing/schema_with_properties/2020202020202020", + replication: 3, + labels: Some( + LabelSet { + labels: [ + Label { + name: "app.gazette.dev/managed-by", + value: "estuary.dev/flow", + prefix: false, + }, + Label { + name: "content-type", + value: "application/x-ndjson", + prefix: false, + }, + Label { + name: "estuary.dev/build", + value: "2121212121212121", + prefix: false, + }, + Label { + name: "estuary.dev/collection", + value: "testing/schema_with_properties", + prefix: false, + }, + ], + }, + ), + fragment: Some( + Fragment { + length: 536870912, + compression_codec: Gzip, + stores: [ + "s3://a-bucket/", + ], + refresh_interval: Some( + Duration { + seconds: 300, + nanos: 0, + }, + ), + retention: None, + flush_interval: None, + path_postfix_template: 
"utc_date={{.Spool.FirstAppendTime.Format \"2006-01-02\"}}/utc_hour={{.Spool.FirstAppendTime.Format \"15\"}}", + }, + ), + flags: 4, + max_append_rate: 4194304, + }, + ), + derivation: None, + }, + ), + partition_selector: Some( + LabelSelector { + include: Some( + LabelSet { + labels: [ + Label { + name: "estuary.dev/collection", + value: "testing/schema_with_properties", + prefix: false, + }, + ], + }, + ), + exclude: Some( + LabelSet { + labels: [], + }, + ), + }, + ), + priority: 0, + field_selection: Some( + FieldSelection { + keys: [], + values: [], + document: "", + field_config_json_map: {}, + }, + ), + delta_updates: true, + deprecated_shuffle: None, + journal_read_suffix: "materialize/testing/test_dekaf/binding-0", + not_before: None, + not_after: None, + backfill: 0, + state_key: "binding-0", + }, + ], + shard_template: Some( + ShardSpec { + id: "materialize/testing/test_dekaf/2020202020202020", + sources: [], + recovery_log_prefix: "recovery", + hint_prefix: "/estuary/flow/hints", + hint_backups: 2, + max_txn_duration: Some( + Duration { + seconds: 300, + nanos: 0, + }, + ), + min_txn_duration: Some( + Duration { + seconds: 0, + nanos: 0, + }, + ), + disable: false, + hot_standbys: 0, + labels: Some( + LabelSet { + labels: [ + Label { + name: "app.gazette.dev/managed-by", + value: "estuary.dev/flow", + prefix: false, + }, + Label { + name: "estuary.dev/build", + value: "2121212121212121", + prefix: false, + }, + Label { + name: "estuary.dev/log-level", + value: "info", + prefix: false, + }, + Label { + name: "estuary.dev/task-name", + value: "testing/test_dekaf", + prefix: false, + }, + Label { + name: "estuary.dev/task-type", + value: "materialization", + prefix: false, + }, + ], + }, + ), + disable_wait_for_ack: false, + ring_buffer_size: 65536, + read_channel_size: 4096, + }, + ), + recovery_log_template: Some( + JournalSpec { + name: "recovery/materialize/testing/test_dekaf/2020202020202020", + replication: 3, + labels: Some( + LabelSet { + labels: 
[ + Label { + name: "app.gazette.dev/managed-by", + value: "estuary.dev/flow", + prefix: false, + }, + Label { + name: "content-type", + value: "application/x-gazette-recoverylog", + prefix: false, + }, + Label { + name: "estuary.dev/build", + value: "2121212121212121", + prefix: false, + }, + Label { + name: "estuary.dev/task-name", + value: "testing/test_dekaf", + prefix: false, + }, + Label { + name: "estuary.dev/task-type", + value: "materialization", + prefix: false, + }, + ], + }, + ), + fragment: Some( + Fragment { + length: 268435456, + compression_codec: Snappy, + stores: [ + "s3://a-bucket/", + ], + refresh_interval: Some( + Duration { + seconds: 300, + nanos: 0, + }, + ), + retention: None, + flush_interval: None, + path_postfix_template: "", + }, + ), + flags: 4, + max_append_rate: 4194304, + }, + ), + network_ports: [], + }, + previous_spec: NULL, + is_touch: 0, + dependency_hash: 32182596aeb1e4a0, + }, + ], + built_tests: [], + captures: [], + collections: [ + DraftCollection { + collection: testing/schema_with_properties, + scope: test://example/catalog.yaml#/collections/testing~1schema_with_properties, + expect_pub_id: NULL, + model: { + "schema": {"$id":"test://example/catalog.yaml?ptr=/collections/testing~1schema_with_properties/schema","properties":{"id":{"type":"string"}},"required":["id"],"type":"object"}, + "key": [ + "/id" + ] + }, + is_touch: 0, + }, + ], + errors: [], + errors_draft: [ + Error { + scope: test://example/catalog.yaml#/materializations/testing~1test_dekaf/endpoint/dekaf/config, + error: failed to fetch resource test://example/foo/bar: fixture not found, + }, + ], + fetches: [ + Fetch { + depth: 1, + resource: test://example/catalog.yaml, + }, + Fetch { + depth: 2, + resource: test://example/foo/bar, + }, + ], + imports: [ + Import { + scope: test://example/catalog.yaml#/materializations/testing~1test_dekaf/endpoint/dekaf/config, + to_resource: test://example/foo/bar, + }, + ], + materializations: [ + DraftMaterialization { + 
materialization: testing/test_dekaf, + scope: test://example/catalog.yaml#/materializations/testing~1test_dekaf, + expect_pub_id: NULL, + model: { + "endpoint": { + "dekaf": "foo/bar" + }, + "bindings": [ + { + "resource": {}, + "source": "testing/schema_with_properties", + "fields": { + "recommended": true + } + } + ] + }, + is_touch: 0, + }, + ], + resources: [ + Resource { + resource: test://example/catalog.yaml, + content_type: "CATALOG", + content: ".. binary ..", + content_dom: {"collections":{"testing/schema_with_properties":{"key":["/id"],"schema":{"properties":{"id":{"type":"string"}},"required":["id"],"type":"object"}}},"materializations":{"testing/test_dekaf":{"bindings":[{"resource":{},"source":"testing/schema_with_properties"}],"endpoint":{"dekaf":"foo/bar"}}}}, + }, + ], + storage_mappings: [ + StorageMapping { + catalog_prefix: , + control_id: "0000000000000000", + stores: [ + { + "provider": "S3", + "bucket": "a-bucket", + "prefix": null, + "region": null + } + ], + }, + ], + tests: [], +} diff --git a/flow.schema.json b/flow.schema.json index fa097bde92..0e9c472fa3 100644 --- a/flow.schema.json +++ b/flow.schema.json @@ -460,6 +460,16 @@ "description": "Dekaf configuration. Currently empty, but present to enable easy addition of config options when they show up in the future.", "type": "object" }, + "DekafConfigContainer": { + "anyOf": [ + { + "$ref": "#/definitions/DekafConfig" + }, + { + "type": "string" + } + ] + }, "Derivation": { "description": "Derive specifies how a collection is derived from other collections.", "type": "object", @@ -926,7 +936,7 @@ ], "properties": { "dekaf": { - "$ref": "#/definitions/DekafConfig" + "$ref": "#/definitions/DekafConfigContainer" } }, "additionalProperties": false