Skip to content

Commit

Permalink
dekaf: Allow for indirecting DekafConfig if/when desired
Browse files Browse the repository at this point in the history
  • Loading branch information
jshearer committed Sep 24, 2024
1 parent 6fa871d commit 5a1f419
Show file tree
Hide file tree
Showing 10 changed files with 1,554 additions and 34 deletions.
8 changes: 6 additions & 2 deletions crates/flowctl/src/generate/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ use anyhow::Context;
use futures::{FutureExt, StreamExt};
use itertools::Itertools;
use proto_flow::{capture, derive, flow, materialize};
use std::str::FromStr;
use url::Url;

#[derive(Debug, clap::Args)]
#[clap(rename_all = "kebab-case")]
Expand Down Expand Up @@ -293,8 +295,10 @@ async fn generate_missing_materialization_configs(
connector_type: flow::materialization_spec::ConnectorType::Dekaf as i32,
config_json: serde_json::to_string(config).unwrap(),
},
// Dekaf isn't a pluggable connector, and so does not have dynamic config.
None,
match &config {
models::DekafConfigContainer::Indirect(s) => Url::from_str(s.as_str()).ok(),
_ => None,
},
),
};
let missing_resource_urls: Vec<(url::Url, models::Collection)> = bindings
Expand Down
4 changes: 2 additions & 2 deletions crates/models/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ pub use journals::{
AZURE_STORAGE_ACCOUNT_RE, GCS_BUCKET_RE, S3_BUCKET_RE,
};
pub use materializations::{
MaterializationBinding, MaterializationDef, MaterializationEndpoint, MaterializationFields,
SqliteConfig,
DekafConfigContainer, MaterializationBinding, MaterializationDef, MaterializationEndpoint,
MaterializationFields, SqliteConfig,
};
pub use raw_value::RawValue;
pub use references::{
Expand Down
9 changes: 8 additions & 1 deletion crates/models/src/materializations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,13 @@ pub struct MaterializationDef {
pub delete: bool,
}

#[derive(Serialize, Deserialize, Clone, Debug, JsonSchema, PartialEq)]
#[serde(untagged)]
pub enum DekafConfigContainer {
Direct(DekafConfig),
Indirect(String),
}

/// An Endpoint connector used for Flow materializations.
#[derive(Serialize, Deserialize, Clone, Debug, JsonSchema, PartialEq)]
#[serde(deny_unknown_fields, rename_all = "camelCase")]
Expand All @@ -51,7 +58,7 @@ pub enum MaterializationEndpoint {
/// # A local command (development only).
Local(LocalConfig),
/// # A Dekaf connection
Dekaf(DekafConfig),
Dekaf(DekafConfigContainer),
}

#[derive(Serialize, Deserialize, Clone, Debug, JsonSchema, PartialEq)]
Expand Down
28 changes: 24 additions & 4 deletions crates/sources/src/indirect.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use super::Format;
use crate::Scope;
use anyhow::bail;
use models::RawValue;
use proto_flow::flow::ContentType;
use std::collections::BTreeMap;

Expand Down Expand Up @@ -498,10 +500,28 @@ fn indirect_materialization(
resources,
threshold,
),
// Dekaf isn't a pluggable connector, and so does not have dynamic config.
// All of its config is defined directly within models::DekafConfig, and so
// should not be indirected.
models::MaterializationEndpoint::Dekaf(_) => {}
// I don't think this case can ever get hit as `indirect_materialization` is only called by
// `do_discover`, `do_pull_specs`, and `do_develop`, all of which are working with fully
// inlined specs.
models::MaterializationEndpoint::Dekaf(models::DekafConfigContainer::Indirect(_)) => {
tracing::warn!("Unexpectedly tried to indirect an already indirected location (dekaf)");
}
models::MaterializationEndpoint::Dekaf(models::DekafConfigContainer::Direct(config)) => {
indirect_dom(
Scope::new(scope)
.push_prop("endpoint")
.push_prop("local")
.push_prop("config"),
&mut RawValue::from_value(
&serde_json::to_value(config).expect("Serializing DekafConfig should not fail"),
),
ContentType::Config,
format!("{base}.config"),
imports,
resources,
threshold,
)
}
}

for (index, models::MaterializationBinding { resource, .. }) in bindings.iter_mut().enumerate()
Expand Down
71 changes: 47 additions & 24 deletions crates/sources/src/loader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -480,7 +480,7 @@ impl<F: Fetcher> Loader<F> {
models::DeriveUsing::Connector(models::ConnectorConfig { config, .. }) => {
tasks.push(
async move {
self.load_config(
self.maybe_load_config(
scope
.push_prop("using")
.push_prop("connector")
Expand All @@ -496,7 +496,7 @@ impl<F: Fetcher> Loader<F> {
for (index, migration) in migrations.iter().enumerate() {
tasks.push(
async move {
self.load_config(
self.maybe_load_config(
scope
.push_prop("using")
.push_prop("sqlite")
Expand All @@ -513,7 +513,7 @@ impl<F: Fetcher> Loader<F> {
models::DeriveUsing::Typescript(models::DeriveUsingTypescript { module, .. }) => {
tasks.push(
async move {
self.load_config(
self.maybe_load_config(
scope
.push_prop("using")
.push_prop("typescript")
Expand All @@ -528,7 +528,7 @@ impl<F: Fetcher> Loader<F> {
models::DeriveUsing::Local(models::LocalConfig { config, .. }) => {
tasks.push(
async move {
self.load_config(
self.maybe_load_config(
scope
.push_prop("using")
.push_prop("local")
Expand All @@ -545,7 +545,7 @@ impl<F: Fetcher> Loader<F> {
for (index, transform) in spec.transforms.iter().enumerate() {
tasks.push(
async move {
self.load_config(
self.maybe_load_config(
scope
.push_prop("transforms")
.push_item(index)
Expand All @@ -560,7 +560,7 @@ impl<F: Fetcher> Loader<F> {
if let models::Shuffle::Lambda(lambda) = &transform.shuffle {
tasks.push(
async move {
self.load_config(
self.maybe_load_config(
scope
.push_prop("transforms")
.push_item(index)
Expand Down Expand Up @@ -590,7 +590,7 @@ impl<F: Fetcher> Loader<F> {
models::CaptureEndpoint::Connector(models::ConnectorConfig { config, .. }) => {
tasks.push(
async move {
self.load_config(
self.maybe_load_config(
scope
.push_prop("endpoint")
.push_prop("connector")
Expand All @@ -605,7 +605,7 @@ impl<F: Fetcher> Loader<F> {
models::CaptureEndpoint::Local(models::LocalConfig { config, .. }) => {
tasks.push(
async move {
self.load_config(
self.maybe_load_config(
scope
.push_prop("endpoint")
.push_prop("local")
Expand All @@ -622,7 +622,7 @@ impl<F: Fetcher> Loader<F> {
for (index, binding) in spec.bindings.iter().enumerate() {
tasks.push(
async move {
self.load_config(
self.maybe_load_config(
scope
.push_prop("bindings")
.push_item(index)
Expand Down Expand Up @@ -660,7 +660,7 @@ impl<F: Fetcher> Loader<F> {
}) => {
tasks.push(
async move {
self.load_config(
self.maybe_load_config(
scope
.push_prop("endpoint")
.push_prop("connector")
Expand All @@ -675,7 +675,7 @@ impl<F: Fetcher> Loader<F> {
models::MaterializationEndpoint::Local(models::LocalConfig { config, .. }) => {
tasks.push(
async move {
self.load_config(
self.maybe_load_config(
scope
.push_prop("endpoint")
.push_prop("local")
Expand All @@ -687,16 +687,31 @@ impl<F: Fetcher> Loader<F> {
.boxed(),
);
}
// Dekaf isn't a pluggable connector, and so does not have dynamic config to possibly
// load from a reference. All of its config is defined directly within models::DekafConfig.
models::MaterializationEndpoint::Dekaf(_) => {}
models::MaterializationEndpoint::Dekaf(models::DekafConfigContainer::Indirect(
location,
)) => {
tasks.push(
async move {
self.load_config(
scope
.push_prop("endpoint")
.push_prop("dekaf")
.push_prop("config"),
location.as_str(),
)
.await
}
.boxed(),
);
}
models::MaterializationEndpoint::Dekaf(models::DekafConfigContainer::Direct(_)) => {}
};

for (index, binding) in spec.bindings.iter().enumerate() {
if !binding.disable {
tasks.push(
async move {
self.load_config(
self.maybe_load_config(
scope
.push_prop("bindings")
.push_item(index)
Expand Down Expand Up @@ -752,22 +767,30 @@ impl<F: Fetcher> Loader<F> {
);
}

async fn load_config<'s>(&'s self, scope: Scope<'s>, config: &RawValue) {
// If `config` is a JSON string that has no whitespace then presume and
// require that it's a relative or absolute URL to an imported file.
/// If `config` is a JSON string that has no whitespace then presume and
/// require that it's a relative or absolute URL to an imported file.
async fn maybe_load_config<'s>(&'s self, scope: Scope<'s>, config: &RawValue) {
match serde_json::from_str::<&str>(config.get()) {
Ok(import) if !import.chars().any(char::is_whitespace) => {
self.load_import(
scope,
self.fallible(scope, scope.resource().join(&import)),
flow::ContentType::Config,
)
.await;
self.load_config(scope, import).await
}
_ => {}
}
}

/// If `config` has no whitespace then presume and require that
/// it's a relative or absolute URL to an imported file.
async fn load_config<'s>(&'s self, scope: Scope<'s>, config: &str) {
if !config.chars().any(char::is_whitespace) {
self.load_import(
scope,
self.fallible(scope, scope.resource().join(config)),
flow::ContentType::Config,
)
.await;
}
}

// Consume a result capable of producing a LoadError.
// Pass through a Result::Ok<T> as Some<T>.
// Or, record a Result::Err<T> and return None.
Expand Down
95 changes: 94 additions & 1 deletion crates/validation/tests/scenario_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ fn test_golden_all_visits() {
}

#[test]
fn test_dekaf_materialization() {
fn test_dekaf_materialization_inline_config() {
let fixture = r##"
test://example/catalog.yaml:
collections:
Expand Down Expand Up @@ -38,6 +38,99 @@ driver:
insta::assert_debug_snapshot!(outcome);
}

#[test]
fn test_dekaf_materialization_indirect_config() {
let fixture = r##"
test://example/dekaf.yaml: {}
test://example/catalog.yaml:
collections:
testing/schema_with_properties:
schema:
type: object
properties:
id: { type: string }
required: [id]
key: [/id]
materializations:
testing/test_dekaf:
endpoint:
dekaf: example/dekaf.yaml
bindings:
- source: testing/schema_with_properties
resource: {}
driver:
dataPlanes:
"1d:1d:1d:1d:1d:1d:1d:1d":
default: true
"##;

let outcome = common::run(fixture, "{}");
// Expect not to see any projections for the empty properties
insta::assert_debug_snapshot!(outcome);
}

#[test]
fn test_dekaf_materialization_invalid() {
let fixture = r##"
test://example/dekaf.yaml: {}
test://example/catalog.yaml:
collections:
testing/schema_with_properties:
schema:
type: object
properties:
id: { type: string }
required: [id]
key: [/id]
materializations:
testing/test_dekaf:
endpoint:
dekaf: false
bindings:
- source: testing/schema_with_properties
resource: {}
driver:
dataPlanes:
"1d:1d:1d:1d:1d:1d:1d:1d":
default: true
"##;

let outcome = common::run(fixture, "{}");
// Expect not to see any projections for the empty properties
insta::assert_debug_snapshot!(outcome);
}

#[test]
fn test_dekaf_materialization_nonexistent() {
let fixture = r##"
test://example/dekaf.yaml: {}
test://example/catalog.yaml:
collections:
testing/schema_with_properties:
schema:
type: object
properties:
id: { type: string }
required: [id]
key: [/id]
materializations:
testing/test_dekaf:
endpoint:
dekaf: foo/bar
bindings:
- source: testing/schema_with_properties
resource: {}
driver:
dataPlanes:
"1d:1d:1d:1d:1d:1d:1d:1d":
default: true
"##;

let outcome = common::run(fixture, "{}");
// Expect not to see any projections for the empty properties
insta::assert_debug_snapshot!(outcome);
}

#[test]
fn test_projection_not_created_for_empty_properties() {
let fixture = r##"
Expand Down
Loading

0 comments on commit 5a1f419

Please sign in to comment.