Introduce MaterializationEndpoint::Dekaf #1642

Closed · wants to merge 4 commits
115 changes: 85 additions & 30 deletions crates/activate/src/lib.rs
@@ -1,5 +1,5 @@
use anyhow::Context;
use proto_flow::flow;
use proto_flow::flow::{self, materialization_spec};
use proto_gazette::{
broker::{self, JournalSpec, Label, LabelSelector, LabelSet},
consumer::{self, ShardSpec},
@@ -34,9 +34,12 @@ pub async fn activate_capture(
.as_ref()
.context("CaptureSpec missing recovery_log_template")?;

Some((shard_template, recovery_template))
TaskTemplate::UpsertReal {
shard: shard_template,
recovery_journal: recovery_template,
}
} else {
None
TaskTemplate::Delete
};

let changes = converge_task_changes(
@@ -81,14 +84,17 @@ pub async fn activate_collection(
.as_ref()
.context("CollectionSpec.Derivation missing recovery_log_template")?;

Some((shard_template, recovery_template))
TaskTemplate::UpsertReal {
shard: shard_template,
recovery_journal: recovery_template,
}
} else {
None
TaskTemplate::Delete
};

(task_template, Some(partition_template))
} else {
(None, None)
(TaskTemplate::Delete, None)
};

let (changes_1, changes_2) = futures::try_join!(
@@ -123,20 +129,29 @@ pub async fn activate_materialization(
ops_stats_template: Option<&broker::JournalSpec>,
initial_splits: usize,
) -> anyhow::Result<()> {
let task_template = if let Some(task_spec) = task_spec {
let shard_template = task_spec
.shard_template
.as_ref()
.context("MaterializationSpec missing shard_template")?;
let task_template = match task_spec {
Some(task_spec)
if task_spec.connector_type == materialization_spec::ConnectorType::Dekaf as i32 =>
{
TaskTemplate::UpsertVirtual
}
Some(task_spec) => {
let shard_template = task_spec
.shard_template
.as_ref()
.context("MaterializationSpec missing shard_template")?;

let recovery_template = task_spec
.recovery_log_template
.as_ref()
.context("MaterializationSpec missing recovery_log_template")?;
let recovery_template = task_spec
.recovery_log_template
.as_ref()
.context("MaterializationSpec missing recovery_log_template")?;

Some((shard_template, recovery_template))
} else {
None
TaskTemplate::UpsertReal {
shard: shard_template,
recovery_journal: recovery_template,
}
}
None => TaskTemplate::Delete,
};

let changes = converge_task_changes(
@@ -253,15 +268,30 @@ async fn apply_changes(
Ok(())
}

/// Describes the desired future state of a task.
/// Virtual tasks get logs and stats journals,
/// but are otherwise purely descriptive and
/// do not get shards and recovery log journals
/// created for them like real tasks do.
#[derive(Clone, Copy, Debug)]
enum TaskTemplate<'a> {
UpsertReal {
shard: &'a ShardSpec,
recovery_journal: &'a JournalSpec,
},
UpsertVirtual,
Delete,
}

/// Converge a task by listing data-plane ShardSpecs and recovery log
/// JournalSpecs, and then applying updates to bring them into alignment
/// with the templated task configuration.
async fn converge_task_changes(
async fn converge_task_changes<'a>(
journal_client: &gazette::journal::Client,
shard_client: &gazette::shard::Client,
task_type: ops::TaskType,
task_name: &str,
template: Option<(&ShardSpec, &JournalSpec)>,
template: TaskTemplate<'a>,
ops_logs_template: Option<&broker::JournalSpec>,
ops_stats_template: Option<&broker::JournalSpec>,
initial_splits: usize,
@@ -295,7 +325,10 @@ async fn converge_task_changes(

// If (and only if) the task is being upserted,
// then ensure the creation of its ops collection partitions.
if template.is_some() {
if matches!(
template,
TaskTemplate::UpsertVirtual | TaskTemplate::UpsertReal { .. }
) {
changes.extend(ops_logs_change.into_iter());
changes.extend(ops_stats_change.into_iter());
}
@@ -401,7 +434,7 @@ fn unpack_journal_listing(
/// Determine the consumer shard and broker recovery log changes required to
/// converge from current `shards` and `recovery` splits into the desired state.
fn task_changes(
template: Option<(&ShardSpec, &JournalSpec)>,
template: TaskTemplate,
shards: &[(String, LabelSet, i64)],
recovery: &[(String, LabelSet, i64)],
initial_splits: usize,
Expand All @@ -412,7 +445,11 @@ fn task_changes(

// If the template is an UpsertReal and no current shards match its prefix,
// then instantiate `initial_splits` new shards to create.
if let Some((shard_template, _)) = template {
if let TaskTemplate::UpsertReal {
shard: shard_template,
..
} = template
{
if !shards
.iter()
.any(|(id, _, _)| id.starts_with(&shard_template.id))
@@ -442,7 +479,10 @@

for (id, split, shard_revision) in shards {
match template {
Some((shard_template, recovery_template)) if id.starts_with(&shard_template.id) => {
TaskTemplate::UpsertReal {
shard: shard_template,
recovery_journal: recovery_template,
} if id.starts_with(&shard_template.id) => {
let mut shard_spec = shard_template.clone();
let mut shard_set = shard_spec.labels.take().unwrap_or_default();

@@ -894,7 +934,10 @@ mod test {
let partition_changes =
partition_changes(Some(&partition_template), &all_partitions).unwrap();
let task_changes = task_changes(
Some((&shard_template, &recovery_template)),
TaskTemplate::UpsertReal {
shard: &shard_template,
recovery_journal: &recovery_template,
},
&all_shards,
&all_recovery,
4,
Expand All @@ -910,7 +953,10 @@ mod test {
{
let partition_changes = partition_changes(Some(&partition_template), &[]).unwrap();
let task_changes = task_changes(
Some((&shard_template, &recovery_template)),
TaskTemplate::UpsertReal {
shard: &shard_template,
recovery_journal: &recovery_template,
},
&[],
&[],
4,
Expand All @@ -926,7 +972,7 @@ mod test {
{
let partition_changes = partition_changes(None, &all_partitions).unwrap();
let task_changes = task_changes(
None,
TaskTemplate::Delete,
&all_shards,
&all_recovery,
4,
@@ -961,7 +1007,10 @@
let partition_changes =
partition_changes(Some(&partition_template), &all_partitions).unwrap();
let task_changes = task_changes(
Some((&shard_template, &recovery_template)),
TaskTemplate::UpsertReal {
shard: &shard_template,
recovery_journal: &recovery_template,
},
&all_shards,
&all_recovery,
4,
Expand All @@ -983,7 +1032,10 @@ mod test {
map_shard_to_split(parent_id, parent_set, *parent_revision, false).unwrap();

let key_changes = task_changes(
Some((&shard_template, &recovery_template)),
TaskTemplate::UpsertReal {
shard: &shard_template,
recovery_journal: &recovery_template,
},
&key_splits,
&all_recovery[..1],
4,
Expand All @@ -993,7 +1045,10 @@ mod test {
.unwrap();

let clock_changes = task_changes(
Some((&shard_template, &recovery_template)),
TaskTemplate::UpsertReal {
shard: &shard_template,
recovery_journal: &recovery_template,
},
&clock_splits,
&all_recovery[..1],
4,
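The new TaskTemplate enum in this file encodes three desired end states for a task. A minimal standalone sketch of the convergence rules it drives, assuming unit variants and hypothetical helper names (the real UpsertReal variant borrows &ShardSpec and &JournalSpec):

```rust
// Simplified stand-in for the TaskTemplate in crates/activate; the real
// UpsertReal variant carries `&ShardSpec` / `&JournalSpec` references.
#[derive(Clone, Copy, Debug)]
enum TaskTemplate {
    UpsertReal,    // real task: shards, recovery journals, and ops partitions
    UpsertVirtual, // virtual task (e.g. Dekaf): ops partitions only
    Delete,        // tear down everything belonging to the task
}

// Hypothetical helpers mirroring the `matches!` checks in the diff above.
fn creates_ops_partitions(t: TaskTemplate) -> bool {
    matches!(t, TaskTemplate::UpsertReal | TaskTemplate::UpsertVirtual)
}

fn creates_shards_and_recovery_logs(t: TaskTemplate) -> bool {
    matches!(t, TaskTemplate::UpsertReal)
}

fn main() {
    assert!(creates_ops_partitions(TaskTemplate::UpsertVirtual));
    assert!(!creates_shards_and_recovery_logs(TaskTemplate::UpsertVirtual));
    assert!(creates_shards_and_recovery_logs(TaskTemplate::UpsertReal));
    assert!(!creates_ops_partitions(TaskTemplate::Delete));
}
```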
12 changes: 12 additions & 0 deletions crates/flowctl/src/generate/mod.rs
Expand Up @@ -3,6 +3,8 @@ use anyhow::Context;
use futures::{FutureExt, StreamExt};
use itertools::Itertools;
use proto_flow::{capture, derive, flow, materialize};
use std::str::FromStr;
use url::Url;

#[derive(Debug, clap::Args)]
#[clap(rename_all = "kebab-case")]
@@ -287,6 +289,16 @@ async fn generate_missing_materialization_configs(
},
serde_json::from_str::<url::Url>(config.config.get()).ok(),
),
models::MaterializationEndpoint::Dekaf(config) => (
materialize::request::Spec {
connector_type: flow::materialization_spec::ConnectorType::Dekaf as i32,
config_json: serde_json::to_string(config).unwrap(),
},
match &config {
models::DekafConfigContainer::Indirect(s) => Url::from_str(s.as_str()).ok(),
_ => None,
},
),
};
let missing_resource_urls: Vec<(url::Url, models::Collection)> = bindings
.iter()
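The match on DekafConfigContainer above determines whether flowctl generate has a config URL to stub out: only an indirect (string) Dekaf config names an external resource. A minimal sketch of just that step, with the container simplified to drop the inline payload (the url crate is assumed as a dependency):

```rust
use std::str::FromStr;
use url::Url;

// Simplified stand-in for models::DekafConfigContainer.
enum DekafConfigContainer {
    Direct,           // inline config: there is no backing file
    Indirect(String), // string reference to an external config resource
}

// Mirrors the match arm above: only Indirect yields a URL to generate into.
fn config_url(config: &DekafConfigContainer) -> Option<Url> {
    match config {
        DekafConfigContainer::Indirect(s) => Url::from_str(s.as_str()).ok(),
        _ => None,
    }
}

fn main() {
    assert!(config_url(&DekafConfigContainer::Direct).is_none());
    let indirect =
        DekafConfigContainer::Indirect("file:///flow/dekaf.config.yaml".to_string());
    assert!(config_url(&indirect).is_some());
}
```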
11 changes: 11 additions & 0 deletions crates/models/src/connector.rs
Expand Up @@ -65,3 +65,14 @@ impl LocalConfig {
}
}
}

/// Dekaf configuration. Currently empty, but present so that future
/// config options can be added easily.
#[derive(Serialize, Deserialize, Clone, Debug, JsonSchema, PartialEq)]
pub struct DekafConfig {}

impl DekafConfig {
pub fn example() -> Self {
Self {}
}
}
6 changes: 3 additions & 3 deletions crates/models/src/lib.rs
Expand Up @@ -24,7 +24,7 @@ pub use crate::labels::{Label, LabelSelector, LabelSet};
pub use captures::{AutoDiscover, CaptureBinding, CaptureDef, CaptureEndpoint};
pub use catalogs::{Capability, Catalog, CatalogType};
pub use collections::{CollectionDef, Projection};
pub use connector::{split_image_tag, ConnectorConfig, LocalConfig};
pub use connector::{split_image_tag, ConnectorConfig, DekafConfig, LocalConfig};
pub use derivation::{Derivation, DeriveUsing, Shuffle, ShuffleType, TransformDef};
pub use derive_sqlite::DeriveUsingSqlite;
pub use derive_typescript::DeriveUsingTypescript;
Expand All @@ -35,8 +35,8 @@ pub use journals::{
AZURE_STORAGE_ACCOUNT_RE, GCS_BUCKET_RE, S3_BUCKET_RE,
};
pub use materializations::{
MaterializationBinding, MaterializationDef, MaterializationEndpoint, MaterializationFields,
SqliteConfig,
DekafConfigContainer, MaterializationBinding, MaterializationDef, MaterializationEndpoint,
MaterializationFields, SqliteConfig,
};
pub use raw_value::RawValue;
pub use references::{
11 changes: 10 additions & 1 deletion crates/models/src/materializations.rs
@@ -1,4 +1,4 @@
use crate::{source::OnIncompatibleSchemaChange, Collection, Id};
use crate::{connector::DekafConfig, source::OnIncompatibleSchemaChange, Collection, Id};

use super::{
Capture, ConnectorConfig, Field, LocalConfig, RawValue, RelativeUrl, ShardTemplate, Source,
@@ -41,6 +41,13 @@ pub struct MaterializationDef {
pub delete: bool,
}

#[derive(Serialize, Deserialize, Clone, Debug, JsonSchema, PartialEq)]
#[serde(untagged)]
pub enum DekafConfigContainer {
Direct(DekafConfig),
jshearer (Contributor, Author) commented on Sep 24, 2024:

@jgraettinger Does this seem right to you? The other endpoint types use a RawValue for their config because they're dynamic, and that lets them inherently accept either a string (treated as an indirect reference) or an object (treated as an inline config), whereas the Dekaf config is defined in-tree.

I contemplated whether I should generalize this Direct / Indirect config container to the other endpoint types to make that distinction more explicit, but it didn't seem warranted; it felt more like a preference than tech debt. Thoughts?

Indirect(String),
}

/// An Endpoint connector used for Flow materializations.
#[derive(Serialize, Deserialize, Clone, Debug, JsonSchema, PartialEq)]
#[serde(deny_unknown_fields, rename_all = "camelCase")]
Expand All @@ -50,6 +57,8 @@ pub enum MaterializationEndpoint {
Connector(ConnectorConfig),
/// # A local command (development only).
Local(LocalConfig),
/// # A Dekaf connection
Dekaf(DekafConfigContainer),
}

#[derive(Serialize, Deserialize, Clone, Debug, JsonSchema, PartialEq)]
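To make the Direct/Indirect behavior discussed in the review thread concrete: with serde's untagged representation, an inline object and a bare string each deserialize into the matching variant. A minimal self-contained sketch (standalone, using serde and serde_json directly rather than the models crate):

```rust
use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
pub struct DekafConfig {}

#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
#[serde(untagged)]
pub enum DekafConfigContainer {
    Direct(DekafConfig),
    Indirect(String),
}

fn main() {
    // An inline object deserializes to the Direct (in-tree config) variant.
    let direct: DekafConfigContainer = serde_json::from_str("{}").unwrap();
    assert_eq!(direct, DekafConfigContainer::Direct(DekafConfig {}));

    // A bare string deserializes to Indirect, treated as a config reference.
    let indirect: DekafConfigContainer =
        serde_json::from_str(r#""dekaf.config.yaml""#).unwrap();
    assert_eq!(
        indirect,
        DekafConfigContainer::Indirect("dekaf.config.yaml".to_string())
    );
}
```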
3 changes: 3 additions & 0 deletions crates/proto-flow/src/flow.rs
Expand Up @@ -651,6 +651,7 @@ pub mod materialization_spec {
Invalid = 0,
Image = 8,
Local = 9,
Dekaf = 10,
}
impl ConnectorType {
/// String value of the enum field names used in the ProtoBuf definition.
Expand All @@ -662,6 +663,7 @@ pub mod materialization_spec {
ConnectorType::Invalid => "INVALID",
ConnectorType::Image => "IMAGE",
ConnectorType::Local => "LOCAL",
ConnectorType::Dekaf => "DEKAF",
}
}
/// Creates an enum from field names used in the ProtoBuf definition.
Expand All @@ -670,6 +672,7 @@ pub mod materialization_spec {
"INVALID" => Some(Self::Invalid),
"IMAGE" => Some(Self::Image),
"LOCAL" => Some(Self::Local),
"DEKAF" => Some(Self::Dekaf),
_ => None,
}
}
3 changes: 3 additions & 0 deletions crates/proto-flow/src/flow.serde.rs
Expand Up @@ -4000,6 +4000,7 @@ impl serde::Serialize for materialization_spec::ConnectorType {
Self::Invalid => "INVALID",
Self::Image => "IMAGE",
Self::Local => "LOCAL",
Self::Dekaf => "DEKAF",
};
serializer.serialize_str(variant)
}
Expand All @@ -4014,6 +4015,7 @@ impl<'de> serde::Deserialize<'de> for materialization_spec::ConnectorType {
"INVALID",
"IMAGE",
"LOCAL",
"DEKAF",
];

struct GeneratedVisitor;
@@ -4057,6 +4059,7 @@ impl<'de> serde::Deserialize<'de> for materialization_spec::ConnectorType {
"INVALID" => Ok(materialization_spec::ConnectorType::Invalid),
"IMAGE" => Ok(materialization_spec::ConnectorType::Image),
"LOCAL" => Ok(materialization_spec::ConnectorType::Local),
"DEKAF" => Ok(materialization_spec::ConnectorType::Dekaf),
_ => Err(serde::de::Error::unknown_variant(value, FIELDS)),
}
}
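Assuming proto-flow as a dependency, a quick sketch of how the generated helpers in these last two files round-trip the new variant (the values mirror the diff above):

```rust
use proto_flow::flow::materialization_spec::ConnectorType;

fn main() {
    // Wire value and name mappings added for Dekaf in the generated code.
    assert_eq!(ConnectorType::Dekaf as i32, 10);
    assert_eq!(ConnectorType::Dekaf.as_str_name(), "DEKAF");
    assert_eq!(
        ConnectorType::from_str_name("DEKAF"),
        Some(ConnectorType::Dekaf)
    );
}
```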