Skip to content

Commit

Permalink
crates/activate: Dekaf materializations should not get shards or reco…
Browse files Browse the repository at this point in the history
…very journals, but they _should_ get ops stats+logs collections
  • Loading branch information
jshearer committed Sep 19, 2024
1 parent 265c080 commit 96653f0
Showing 1 changed file with 85 additions and 30 deletions.
115 changes: 85 additions & 30 deletions crates/activate/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use anyhow::Context;
use proto_flow::flow;
use proto_flow::flow::{self, materialization_spec};
use proto_gazette::{
broker::{self, JournalSpec, Label, LabelSelector, LabelSet},
consumer::{self, ShardSpec},
Expand Down Expand Up @@ -34,9 +34,12 @@ pub async fn activate_capture(
.as_ref()
.context("CaptureSpec missing recovery_log_template")?;

Some((shard_template, recovery_template))
TaskTemplate::UpsertReal {
shard: shard_template,
recovery_journal: recovery_template,
}
} else {
None
TaskTemplate::Delete
};

let changes = converge_task_changes(
Expand Down Expand Up @@ -81,14 +84,17 @@ pub async fn activate_collection(
.as_ref()
.context("CollectionSpec.Derivation missing recovery_log_template")?;

Some((shard_template, recovery_template))
TaskTemplate::UpsertReal {
shard: shard_template,
recovery_journal: recovery_template,
}
} else {
None
TaskTemplate::Delete
};

(task_template, Some(partition_template))
} else {
(None, None)
(TaskTemplate::Delete, None)
};

let (changes_1, changes_2) = futures::try_join!(
Expand Down Expand Up @@ -123,20 +129,29 @@ pub async fn activate_materialization(
ops_stats_template: Option<&broker::JournalSpec>,
initial_splits: usize,
) -> anyhow::Result<()> {
let task_template = if let Some(task_spec) = task_spec {
let shard_template = task_spec
.shard_template
.as_ref()
.context("MaterializationSpec missing shard_template")?;
let task_template = match task_spec {
Some(task_spec)
if task_spec.connector_type == materialization_spec::ConnectorType::Dekaf as i32 =>
{
TaskTemplate::UpsertVirtual
}
Some(task_spec) => {
let shard_template = task_spec
.shard_template
.as_ref()
.context("MaterializationSpec missing shard_template")?;

let recovery_template = task_spec
.recovery_log_template
.as_ref()
.context("MaterializationSpec missing recovery_log_template")?;
let recovery_template = task_spec
.recovery_log_template
.as_ref()
.context("MaterializationSpec missing recovery_log_template")?;

Some((shard_template, recovery_template))
} else {
None
TaskTemplate::UpsertReal {
shard: shard_template,
recovery_journal: recovery_template,
}
}
None => TaskTemplate::Delete,
};

let changes = converge_task_changes(
Expand Down Expand Up @@ -253,15 +268,30 @@ async fn apply_changes(
Ok(())
}

/// Describes the desired future state of a task.
/// Virtual tasks get logs and stats journals,
/// but are otherwise purely descriptive and
/// do not get shards and recovery log journals
/// created for them like real tasks do.
#[derive(Clone, Copy, Debug)]
enum TaskTemplate<'a> {
UpsertReal {
shard: &'a ShardSpec,
recovery_journal: &'a JournalSpec,
},
UpsertVirtual,
Delete,
}

/// Converge a task by listing data-plane ShardSpecs and recovery log
/// JournalSpecs, and then applying updates to bring them into alignment
/// with the templated task configuration.
async fn converge_task_changes(
async fn converge_task_changes<'a>(
journal_client: &gazette::journal::Client,
shard_client: &gazette::shard::Client,
task_type: ops::TaskType,
task_name: &str,
template: Option<(&ShardSpec, &JournalSpec)>,
template: TaskTemplate<'a>,
ops_logs_template: Option<&broker::JournalSpec>,
ops_stats_template: Option<&broker::JournalSpec>,
initial_splits: usize,
Expand Down Expand Up @@ -295,7 +325,10 @@ async fn converge_task_changes(

// If (and only if) the task is being upserted,
// then ensure the creation of its ops collection partitions.
if template.is_some() {
if matches!(
template,
TaskTemplate::UpsertVirtual | TaskTemplate::UpsertReal { .. }
) {
changes.extend(ops_logs_change.into_iter());
changes.extend(ops_stats_change.into_iter());
}
Expand Down Expand Up @@ -401,7 +434,7 @@ fn unpack_journal_listing(
/// Determine the consumer shard and broker recovery log changes required to
/// converge from current `shards` and `recovery` splits into the desired state.
fn task_changes(
template: Option<(&ShardSpec, &JournalSpec)>,
template: TaskTemplate,
shards: &[(String, LabelSet, i64)],
recovery: &[(String, LabelSet, i64)],
initial_splits: usize,
Expand All @@ -412,7 +445,11 @@ fn task_changes(

// If the template is Some and no current shards match its prefix,
// then instantiate `initial_splits` new shards to create.
if let Some((shard_template, _)) = template {
if let TaskTemplate::UpsertReal {
shard: shard_template,
..
} = template
{
if !shards
.iter()
.any(|(id, _, _)| id.starts_with(&shard_template.id))
Expand Down Expand Up @@ -442,7 +479,10 @@ fn task_changes(

for (id, split, shard_revision) in shards {
match template {
Some((shard_template, recovery_template)) if id.starts_with(&shard_template.id) => {
TaskTemplate::UpsertReal {
shard: shard_template,
recovery_journal: recovery_template,
} if id.starts_with(&shard_template.id) => {
let mut shard_spec = shard_template.clone();
let mut shard_set = shard_spec.labels.take().unwrap_or_default();

Expand Down Expand Up @@ -894,7 +934,10 @@ mod test {
let partition_changes =
partition_changes(Some(&partition_template), &all_partitions).unwrap();
let task_changes = task_changes(
Some((&shard_template, &recovery_template)),
TaskTemplate::UpsertReal {
shard: shard_template,
recovery_journal: recovery_template,
},
&all_shards,
&all_recovery,
4,
Expand All @@ -910,7 +953,10 @@ mod test {
{
let partition_changes = partition_changes(Some(&partition_template), &[]).unwrap();
let task_changes = task_changes(
Some((&shard_template, &recovery_template)),
TaskTemplate::UpsertReal {
shard: shard_template,
recovery_journal: recovery_template,
},
&[],
&[],
4,
Expand All @@ -926,7 +972,7 @@ mod test {
{
let partition_changes = partition_changes(None, &all_partitions).unwrap();
let task_changes = task_changes(
None,
TaskTemplate::Delete,
&all_shards,
&all_recovery,
4,
Expand Down Expand Up @@ -961,7 +1007,10 @@ mod test {
let partition_changes =
partition_changes(Some(&partition_template), &all_partitions).unwrap();
let task_changes = task_changes(
Some((&shard_template, &recovery_template)),
TaskTemplate::UpsertReal {
shard: shard_template,
recovery_journal: recovery_template,
},
&all_shards,
&all_recovery,
4,
Expand All @@ -983,7 +1032,10 @@ mod test {
map_shard_to_split(parent_id, parent_set, *parent_revision, false).unwrap();

let key_changes = task_changes(
Some((&shard_template, &recovery_template)),
TaskTemplate::UpsertReal {
shard: shard_template,
recovery_journal: recovery_template,
},
&key_splits,
&all_recovery[..1],
4,
Expand All @@ -993,7 +1045,10 @@ mod test {
.unwrap();

let clock_changes = task_changes(
Some((&shard_template, &recovery_template)),
TaskTemplate::UpsertReal {
shard: shard_template,
recovery_journal: recovery_template,
},
&clock_splits,
&all_recovery[..1],
4,
Expand Down

0 comments on commit 96653f0

Please sign in to comment.