Skip to content

Commit

Permalink
agent: touch publications and dependency_hash
Browse files Browse the repository at this point in the history
Updates the agent to support `is_touch` in publications and the
`dependency_hash` in publications and controllers.

For specs published with `is_touch=true`, the agent will _not_ add new
`publication_specs`, will _not_ update the `last_pub_id`, and will _not_
update the `spec` (the drafted spec must be identical to the live spec).

The dependency hash column is added, and persisted as part of committing
a publication. Controllers compute the current hash value whenever they
run, and compare it to the persisted value in order to determine whether
any dependencies have changed.

When controllers detect a change in dependencies, their behavior
generally depends on whether any dependencies have been deleted.
If no dependencies were deleted, controllers will publish
their spec using `is_touch = true`. If any dependencies have been
deleted, task controllers will instead use a normal publication and update
the model to remove the deleted dependencies, which matches the previous
behavior.

Controllers will attempt to collapse multiple consecutive touch
publications into a single entry in their publication histories, in order
to limit the noise of many publications in response to dependency changes.
  • Loading branch information
psFried committed Sep 10, 2024
1 parent f8d6870 commit 8df5d9d
Show file tree
Hide file tree
Showing 27 changed files with 764 additions and 341 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions crates/agent-sql/src/controllers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ pub struct ControllerJob {
pub live_spec_id: Id,
pub catalog_name: String,
pub last_pub_id: Id,
pub last_build_id: Id,
pub live_spec: Option<TextJson<Box<RawValue>>>,
pub built_spec: Option<TextJson<Box<RawValue>>>,
pub spec_type: Option<CatalogType>,
Expand All @@ -21,6 +22,7 @@ pub struct ControllerJob {
pub failures: i32,
pub error: Option<String>,
pub data_plane_id: Id,
pub live_dependency_hash: Option<String>,
}

/// Returns the next available controller job, if any are due to be run. Filters
Expand All @@ -39,9 +41,11 @@ pub async fn dequeue(
ls.catalog_name as "catalog_name: String",
ls.controller_next_run,
ls.last_pub_id as "last_pub_id: Id",
ls.last_build_id as "last_build_id: Id",
ls.spec as "live_spec: TextJson<Box<RawValue>>",
ls.built_spec as "built_spec: TextJson<Box<RawValue>>",
ls.spec_type as "spec_type: CatalogType",
ls.dependency_hash as "live_dependency_hash",
cj.controller_version as "controller_version: i32",
cj.updated_at,
cj.logs_token,
Expand Down
7 changes: 5 additions & 2 deletions crates/agent-sql/src/live_specs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ pub struct LiveSpec {
pub user_capability: Option<Capability>,
// Capabilities of the specification with respect to other roles.
pub spec_capabilities: Json<Vec<RoleGrant>>,
pub dependency_hash: Option<String>,
}

/// Returns a `LiveSpec` row for each of the given `names`. This will always return a row for each
Expand Down Expand Up @@ -69,7 +70,8 @@ pub async fn fetch_live_specs(
from role_grants
where starts_with(names, subject_role)),
'[]'
) as "spec_capabilities!: Json<Vec<RoleGrant>>"
) as "spec_capabilities!: Json<Vec<RoleGrant>>",
ls.dependency_hash
from unnest($2::text[]) names
left outer join live_specs ls on ls.catalog_name = names
"#,
Expand Down Expand Up @@ -149,7 +151,8 @@ pub async fn fetch_expanded_live_specs(
from role_grants
where starts_with(ls.catalog_name, subject_role)),
'[]'
) as "spec_capabilities!: Json<Vec<RoleGrant>>"
) as "spec_capabilities!: Json<Vec<RoleGrant>>",
ls.dependency_hash
from exp
join live_specs ls on ls.id = exp.id
where ls.spec is not null and not ls.catalog_name = any($3);
Expand Down
89 changes: 57 additions & 32 deletions crates/agent-sql/src/publications.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ pub async fn lock_live_specs(
pub struct LiveSpecUpdate {
pub catalog_name: String,
pub live_spec_id: Id,
pub expect_pub_id: Id,
pub last_pub_id: Id,
pub expect_build_id: Id,
pub last_build_id: Id,
}

/// Updates all live_specs rows for a publication. Accepts all inputs as slices, which _must_ all
Expand All @@ -54,51 +54,70 @@ pub struct LiveSpecUpdate {
/// roll back the transaction if any are found.
pub async fn update_live_specs(
pub_id: Id,
build_id: Id,
catalog_names: &[String],
spec_types: &[CatalogType],
models: &[Option<Json<Box<RawValue>>>],
built_specs: &[Option<Json<Box<RawValue>>>],
expect_revisions: &[Id],
expect_build_ids: &[Id],
reads_from: &[Option<Json<Vec<String>>>],
writes_to: &[Option<Json<Vec<String>>>],
images: &[Option<String>],
image_tags: &[Option<String>],
data_plane_ids: &[Id],
is_touches: &[bool],
dependency_hashes: &[Option<&str>],
txn: &mut sqlx::Transaction<'_, sqlx::Postgres>,
) -> sqlx::Result<Vec<LiveSpecUpdate>> {
let fails = sqlx::query_as!(
LiveSpecUpdate,
r#"
with inputs(catalog_name, spec_type, spec, built_spec, expect_pub_id, reads_from, writes_to, image, image_tag, data_plane_id) as (
with inputs(catalog_name, spec_type, spec, built_spec, expect_pub_id, reads_from, writes_to, image, image_tag, data_plane_id, is_touch, dependency_hash) as (
select * from unnest(
$2::text[],
$3::catalog_spec_type[],
$4::json[],
$3::text[],
$4::catalog_spec_type[],
$5::json[],
$6::flowid[],
$7::json[],
$6::json[],
$7::flowid[],
$8::json[],
$9::text[],
$9::json[],
$10::text[],
$11::flowid[]
$11::text[],
$12::flowid[],
$13::boolean[],
$14::text[]
)
),
joined(catalog_name, spec_type, spec, built_spec, expect_pub_id, reads_from, writes_to, image, image_tag, data_plane_id, last_pub_id) as (
joined(catalog_name, spec_type, spec, built_spec, expect_build_id, reads_from, writes_to, image, image_tag, data_plane_id, is_touch, dependency_hash, last_build_id, next_pub_id) as (
select
inputs.*,
case when ls.spec is null then '00:00:00:00:00:00:00:00'::flowid else ls.last_pub_id end as last_pub_id
case when ls.spec is null then '00:00:00:00:00:00:00:00'::flowid else ls.last_build_id end as last_build_id,
case when inputs.is_touch then ls.last_pub_id else $1::flowid end as next_pub_id
from inputs
left outer join live_specs ls on ls.catalog_name = inputs.catalog_name
),
insert_live_specs(catalog_name,live_spec_id) as (
insert into live_specs (catalog_name, spec_type, spec, built_spec, last_build_id, last_pub_id, controller_next_run, reads_from, writes_to, connector_image_name, connector_image_tag, data_plane_id)
select
insert into live_specs (
catalog_name,
spec_type,
spec,
built_spec,
last_build_id,
last_pub_id,
controller_next_run,
reads_from,
writes_to,
connector_image_name,
connector_image_tag,
data_plane_id,
dependency_hash
) select
catalog_name,
spec_type,
spec,
built_spec,
$1::flowid,
$1::flowid,
$2::flowid,
joined.next_pub_id,
now(),
case when json_typeof(reads_from) is null then
null
Expand All @@ -112,7 +131,8 @@ pub async fn update_live_specs(
end,
image,
image_tag,
data_plane_id
data_plane_id,
dependency_hash
from joined
on conflict (catalog_name) do update set
updated_at = now(),
Expand All @@ -125,10 +145,12 @@ pub async fn update_live_specs(
reads_from = excluded.reads_from,
writes_to = excluded.writes_to,
connector_image_name = excluded.connector_image_name,
connector_image_tag = excluded.connector_image_tag
connector_image_tag = excluded.connector_image_tag,
dependency_hash = excluded.dependency_hash
returning
catalog_name,
id as live_spec_id
id as live_spec_id,
last_build_id
),
insert_controller_jobs as (
insert into controller_jobs(live_spec_id)
Expand All @@ -143,22 +165,25 @@ pub async fn update_live_specs(
select
joined.catalog_name as "catalog_name!: String",
insert_live_specs.live_spec_id as "live_spec_id!: Id",
joined.expect_pub_id as "expect_pub_id!: Id",
joined.last_pub_id as "last_pub_id!: Id"
joined.expect_build_id as "expect_build_id!: Id",
joined.last_build_id as "last_build_id!: Id"
from insert_live_specs
join joined using (catalog_name)
"#,
pub_id as Id, // 1
catalog_names, // 2
spec_types as &[CatalogType], // 3
models as &[Option<Json<Box<RawValue>>>], // 4
built_specs as &[Option<Json<Box<RawValue>>>], // 5
expect_revisions as &[Id], // 6
reads_from as &[Option<Json<Vec<String>>>], // 7
writes_to as &[Option<Json<Vec<String>>>], // 8
images as &[Option<String>], // 9
image_tags as &[Option<String>], // 10
data_plane_ids as &[Id], // 11
build_id as Id, // 2
catalog_names, // 3
spec_types as &[CatalogType], // 4
models as &[Option<Json<Box<RawValue>>>], // 5
built_specs as &[Option<Json<Box<RawValue>>>], // 6
expect_build_ids as &[Id], // 7
reads_from as &[Option<Json<Vec<String>>>], // 8
writes_to as &[Option<Json<Vec<String>>>], // 9
images as &[Option<String>], // 10
image_tags as &[Option<String>], // 11
data_plane_ids as &[Id], // 12
is_touches as &[bool], // 13
dependency_hashes as &[Option<&str>], // 14
)
.fetch_all(txn)
.await?;
Expand Down
15 changes: 8 additions & 7 deletions crates/agent/src/controllers/capture.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,14 @@ impl CaptureStatus {
.publications
.resolve_dependencies(state, control_plane)
.await?;
if let Some(pub_id) = dependencies.next_pub_id {
let draft = pending_pub.start_spec_update(
pub_id,
state,
format!("in response to publication of one or more depencencies"),
);
if !dependencies.deleted.is_empty() {
if dependencies.hash != state.live_dependency_hash {
if dependencies.deleted.is_empty() {
pending_pub.start_touch(state);
} else {
let draft = pending_pub.start_spec_update(
state,
format!("in response to publication of one or more depencencies"),
);
tracing::debug!(deleted_collections = ?dependencies.deleted, "disabling bindings for collections that have been deleted");
let draft_capture = draft
.captures
Expand Down
11 changes: 3 additions & 8 deletions crates/agent/src/controllers/catalog_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,8 @@ impl TestStatus {
.publications
.resolve_dependencies(state, control_plane)
.await?;
if let Some(pub_id) = dependencies.next_pub_id {
pending_pub.start_spec_update(
pub_id,
state,
format!("in response to publication of one or more depencencies"),
);

if dependencies.hash != state.live_dependency_hash {
pending_pub.start_touch(state);
let result = pending_pub
.finish(state, &mut self.publications, control_plane)
.await
Expand All @@ -40,7 +35,7 @@ impl TestStatus {
result.error_for_status().do_not_retry()?;
// TODO(phil): This would be a great place to trigger an alert if the publication failed
} else {
// We're up-to-date with our dependencies, which means the test has been published successfully
// We've successfully published against the latest versions of the dependencies
self.passing = true;
}

Expand Down
18 changes: 11 additions & 7 deletions crates/agent/src/controllers/collection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,15 @@ impl CollectionStatus {
.publications
.resolve_dependencies(state, control_plane)
.await?;
if let Some(pub_id) = dependencies.next_pub_id {
let draft = pending_pub.start_spec_update(
pub_id,
state,
format!("in response to publication of one or more depencencies"),
);
if !dependencies.deleted.is_empty() {

if dependencies.hash != state.live_dependency_hash {
if dependencies.deleted.is_empty() {
pending_pub.start_touch(state);
} else {
let draft = pending_pub.start_spec_update(
state,
"in response to publication of one or more depencencies",
);
let add_detail = handle_deleted_dependencies(draft, state, dependencies);
pending_pub.update_pending_draft(add_detail);
}
Expand Down Expand Up @@ -171,6 +173,7 @@ impl InferredSchemaStatus {
),
expect_pub_id: Some(state.last_pub_id),
model: Some(collection_def.clone()),
is_touch: false, // We intend to update the model
}
});
let (removed, new_schema) = collection_def
Expand Down Expand Up @@ -210,6 +213,7 @@ impl InferredSchemaStatus {
),
expect_pub_id: Some(state.last_pub_id),
model: Some(collection_def.clone()),
is_touch: false, // We intend to update the model
}
});
update_inferred_schema(draft_row, &schema)?;
Expand Down
3 changes: 2 additions & 1 deletion crates/agent/src/controllers/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,8 @@ impl Handler for ControllerHandler {
catalog_name = %state.catalog_name,
enqueued_at = ?state.next_run,
last_update = %state.updated_at,
last_pub_id = %state.last_pub_id))]
last_pub_id = %state.last_pub_id,
last_build_id = %state.last_build_id))]
async fn run_controller<C: ControlPlane>(
state: &ControllerState,
next_status: &mut Status,
Expand Down
26 changes: 19 additions & 7 deletions crates/agent/src/controllers/materialization.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,15 @@ impl MaterializationStatus {
.publications
.resolve_dependencies(state, control_plane)
.await?;
if let Some(pub_id) = dependencies.next_pub_id {
let draft = pending_pub.start_spec_update(
pub_id,
state,
format!("in response to publication of one or more depencencies"),
);
if !dependencies.deleted.is_empty() {

if dependencies.hash != state.live_dependency_hash {
if dependencies.deleted.is_empty() {
pending_pub.start_touch(state);
} else {
let draft = pending_pub.start_spec_update(
state,
format!("in response to publication of one or more depencencies"),
);
let mat_name = models::Materialization::new(&state.catalog_name);
let draft_row = draft.materializations.get_mut_by_key(&mat_name);
let model = draft_row.unwrap().model.as_mut().unwrap();
Expand Down Expand Up @@ -153,6 +155,15 @@ impl MaterializationStatus {
.materializations
.get_mut_by_key(&mat_name)
.ok_or_else(|| anyhow::anyhow!("missing draft row for materialization"))?;
if draft_materialization.is_touch {
// We started out just touching the spec, so we need to change to a "real" publication
// in order to update the model.
draft_materialization.is_touch = false;
draft_materialization.model = state
.live_spec
.as_ref()
.and_then(|s| s.as_materialization().cloned());
}
let draft_model = draft_materialization
.model
.as_mut()
Expand Down Expand Up @@ -358,6 +369,7 @@ impl SourceCaptureStatus {
),
expect_pub_id: Some(state.last_pub_id),
model: Some(model.clone()),
is_touch: false, // We intend to update the model
});

// Failures here are terminal
Expand Down
Loading

0 comments on commit 8df5d9d

Please sign in to comment.