From fa9f9b0e68476dd6648f7470211bdb7727de4cb7 Mon Sep 17 00:00:00 2001 From: Phil Date: Tue, 10 Sep 2024 17:12:42 -0400 Subject: [PATCH] agent: touch publications and dependency_hash Updates the agent to support `is_touch` in publications and the `dependency_hash` in publications and controllers. For specs published with `is_touch=true`, the agent will _not_ add new `publication_specs`, update the `last_pub_id`, or update the `spec` (the drafted spec must be identical). The dependency hash column is added, and persisted as part of committing a publication. Controllers compute the current hash value whenever they run, and compare it to the persisted value in order to determine whether any dependencies have changed. When controllers detect a change in dependencies, their behavior generally depends on whether any dependencies have been deleted or not. As long as no dependencies were deleted, controllers will publish their spec using `is_touch = true`. If any dependencies have been deleted, then task controllers will use a normal publication and update the model to remove the deleted dependencies, which matches the previous behavior. Controllers will attempt to collapse multiple subsequent touch publications into their publication histories in order to limit the noise of many publications in response to dependency changes. The `ActivationStatus` has been updated to use `last_build_id` instead of `last_pub_id` as the high water mark for activations. The previous behavior was never fully correct, but `last_build_id` must increase with every modification of the `built_spec`, so this new behavior is guaranteed not to miss activating any changes. Going along with the change to `ActivationStatus`, the `CONTROLLER_VERSION` constant was incremented in order to prevent old agent versions from clobbering changes to the activation status during agent rollout. 
--- Cargo.lock | 1 + crates/agent-sql/src/controllers.rs | 4 + crates/agent-sql/src/live_specs.rs | 7 +- crates/agent-sql/src/publications.rs | 89 +++++--- crates/agent/src/controllers/capture.rs | 15 +- crates/agent/src/controllers/catalog_test.rs | 11 +- crates/agent/src/controllers/collection.rs | 18 +- crates/agent/src/controllers/handler.rs | 3 +- .../agent/src/controllers/materialization.rs | 26 ++- crates/agent/src/controllers/mod.rs | 32 +-- .../src/controllers/publication_status.rs | 214 ++++++++++++------ ...st__materialization-status-round-trip.snap | 14 +- ...controllers__test__status_json_schema.snap | 55 ++++- crates/agent/src/controlplane.rs | 10 +- crates/agent/src/draft.rs | 2 +- .../dependencies_and_activations.rs | 115 +++++++++- crates/agent/src/integration_tests/harness.rs | 114 ++++++++++ .../src/integration_tests/locking_retries.rs | 128 ++++++++--- crates/agent/src/integration_tests/mod.rs | 78 ------- .../src/integration_tests/schema_evolution.rs | 2 +- .../integration_tests/user_publications.rs | 1 + crates/agent/src/publications.rs | 18 +- crates/agent/src/publications/handler.rs | 14 +- crates/agent/src/publications/specs.rs | 105 ++++++--- crates/agent/src/publications/status.rs | 10 +- crates/flowctl/src/local_specs.rs | 14 +- .../migrations/62_live_specs_dependencies.sql | 11 + 27 files changed, 767 insertions(+), 344 deletions(-) create mode 100644 supabase/migrations/62_live_specs_dependencies.sql diff --git a/Cargo.lock b/Cargo.lock index b567121abd..64026a082a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5731,6 +5731,7 @@ dependencies = [ "serde_json", "superslice", "url", + "xxhash-rust", ] [[package]] diff --git a/crates/agent-sql/src/controllers.rs b/crates/agent-sql/src/controllers.rs index fd5b270323..c7402238e0 100644 --- a/crates/agent-sql/src/controllers.rs +++ b/crates/agent-sql/src/controllers.rs @@ -10,6 +10,7 @@ pub struct ControllerJob { pub live_spec_id: Id, pub catalog_name: String, pub last_pub_id: Id, + pub 
last_build_id: Id, pub live_spec: Option>>, pub built_spec: Option>>, pub spec_type: Option, @@ -21,6 +22,7 @@ pub struct ControllerJob { pub failures: i32, pub error: Option, pub data_plane_id: Id, + pub live_dependency_hash: Option, } /// Returns the next available controller job, if any are due to be run. Filters @@ -39,9 +41,11 @@ pub async fn dequeue( ls.catalog_name as "catalog_name: String", ls.controller_next_run, ls.last_pub_id as "last_pub_id: Id", + ls.last_build_id as "last_build_id: Id", ls.spec as "live_spec: TextJson>", ls.built_spec as "built_spec: TextJson>", ls.spec_type as "spec_type: CatalogType", + ls.dependency_hash as "live_dependency_hash", cj.controller_version as "controller_version: i32", cj.updated_at, cj.logs_token, diff --git a/crates/agent-sql/src/live_specs.rs b/crates/agent-sql/src/live_specs.rs index cbf7653dc1..9654988d6a 100644 --- a/crates/agent-sql/src/live_specs.rs +++ b/crates/agent-sql/src/live_specs.rs @@ -38,6 +38,7 @@ pub struct LiveSpec { pub user_capability: Option, // Capabilities of the specification with respect to other roles. pub spec_capabilities: Json>, + pub dependency_hash: Option, } /// Returns a `LiveSpec` row for each of the given `names`. 
This will always return a row for each @@ -69,7 +70,8 @@ pub async fn fetch_live_specs( from role_grants where starts_with(names, subject_role)), '[]' - ) as "spec_capabilities!: Json>" + ) as "spec_capabilities!: Json>", + ls.dependency_hash from unnest($2::text[]) names left outer join live_specs ls on ls.catalog_name = names "#, @@ -149,7 +151,8 @@ pub async fn fetch_expanded_live_specs( from role_grants where starts_with(ls.catalog_name, subject_role)), '[]' - ) as "spec_capabilities!: Json>" + ) as "spec_capabilities!: Json>", + ls.dependency_hash from exp join live_specs ls on ls.id = exp.id where ls.spec is not null and not ls.catalog_name = any($3); diff --git a/crates/agent-sql/src/publications.rs b/crates/agent-sql/src/publications.rs index c6b72c5a05..236d372568 100644 --- a/crates/agent-sql/src/publications.rs +++ b/crates/agent-sql/src/publications.rs @@ -43,8 +43,8 @@ pub async fn lock_live_specs( pub struct LiveSpecUpdate { pub catalog_name: String, pub live_spec_id: Id, - pub expect_pub_id: Id, - pub last_pub_id: Id, + pub expect_build_id: Id, + pub last_build_id: Id, } /// Updates all live_specs rows for a publication. Accepts all inputs as slices, which _must_ all @@ -54,51 +54,70 @@ pub struct LiveSpecUpdate { /// roll back the transaction if any are found. 
pub async fn update_live_specs( pub_id: Id, + build_id: Id, catalog_names: &[String], spec_types: &[CatalogType], models: &[Option>>], built_specs: &[Option>>], - expect_revisions: &[Id], + expect_build_ids: &[Id], reads_from: &[Option>>], writes_to: &[Option>>], images: &[Option], image_tags: &[Option], data_plane_ids: &[Id], + is_touches: &[bool], + dependency_hashes: &[Option<&str>], txn: &mut sqlx::Transaction<'_, sqlx::Postgres>, ) -> sqlx::Result> { let fails = sqlx::query_as!( LiveSpecUpdate, r#" - with inputs(catalog_name, spec_type, spec, built_spec, expect_pub_id, reads_from, writes_to, image, image_tag, data_plane_id) as ( + with inputs(catalog_name, spec_type, spec, built_spec, expect_pub_id, reads_from, writes_to, image, image_tag, data_plane_id, is_touch, dependency_hash) as ( select * from unnest( - $2::text[], - $3::catalog_spec_type[], - $4::json[], + $3::text[], + $4::catalog_spec_type[], $5::json[], - $6::flowid[], - $7::json[], + $6::json[], + $7::flowid[], $8::json[], - $9::text[], + $9::json[], $10::text[], - $11::flowid[] + $11::text[], + $12::flowid[], + $13::boolean[], + $14::text[] ) ), - joined(catalog_name, spec_type, spec, built_spec, expect_pub_id, reads_from, writes_to, image, image_tag, data_plane_id, last_pub_id) as ( + joined(catalog_name, spec_type, spec, built_spec, expect_build_id, reads_from, writes_to, image, image_tag, data_plane_id, is_touch, dependency_hash, last_build_id, next_pub_id) as ( select inputs.*, - case when ls.spec is null then '00:00:00:00:00:00:00:00'::flowid else ls.last_pub_id end as last_pub_id + case when ls.spec is null then '00:00:00:00:00:00:00:00'::flowid else ls.last_build_id end as last_build_id, + case when inputs.is_touch then ls.last_pub_id else $1::flowid end as next_pub_id from inputs left outer join live_specs ls on ls.catalog_name = inputs.catalog_name ), insert_live_specs(catalog_name,live_spec_id) as ( - insert into live_specs (catalog_name, spec_type, spec, built_spec, last_build_id, 
last_pub_id, controller_next_run, reads_from, writes_to, connector_image_name, connector_image_tag, data_plane_id) - select + insert into live_specs ( + catalog_name, + spec_type, + spec, + built_spec, + last_build_id, + last_pub_id, + controller_next_run, + reads_from, + writes_to, + connector_image_name, + connector_image_tag, + data_plane_id, + dependency_hash + ) select catalog_name, spec_type, spec, built_spec, - $1::flowid, - $1::flowid, + $2::flowid, + joined.next_pub_id, now(), case when json_typeof(reads_from) is null then null @@ -112,7 +131,8 @@ pub async fn update_live_specs( end, image, image_tag, - data_plane_id + data_plane_id, + dependency_hash from joined on conflict (catalog_name) do update set updated_at = now(), @@ -125,10 +145,12 @@ pub async fn update_live_specs( reads_from = excluded.reads_from, writes_to = excluded.writes_to, connector_image_name = excluded.connector_image_name, - connector_image_tag = excluded.connector_image_tag + connector_image_tag = excluded.connector_image_tag, + dependency_hash = excluded.dependency_hash returning catalog_name, - id as live_spec_id + id as live_spec_id, + last_build_id ), insert_controller_jobs as ( insert into controller_jobs(live_spec_id) @@ -143,22 +165,25 @@ pub async fn update_live_specs( select joined.catalog_name as "catalog_name!: String", insert_live_specs.live_spec_id as "live_spec_id!: Id", - joined.expect_pub_id as "expect_pub_id!: Id", - joined.last_pub_id as "last_pub_id!: Id" + joined.expect_build_id as "expect_build_id!: Id", + joined.last_build_id as "last_build_id!: Id" from insert_live_specs join joined using (catalog_name) "#, pub_id as Id, // 1 - catalog_names, // 2 - spec_types as &[CatalogType], // 3 - models as &[Option>>], // 4 - built_specs as &[Option>>], // 5 - expect_revisions as &[Id], // 6 - reads_from as &[Option>>], // 7 - writes_to as &[Option>>], // 8 - images as &[Option], // 9 - image_tags as &[Option], // 10 - data_plane_ids as &[Id], // 11 + build_id as Id, // 2 
+ catalog_names, // 3 + spec_types as &[CatalogType], // 4 + models as &[Option>>], // 5 + built_specs as &[Option>>], // 6 + expect_build_ids as &[Id], // 7 + reads_from as &[Option>>], // 8 + writes_to as &[Option>>], // 9 + images as &[Option], // 10 + image_tags as &[Option], // 11 + data_plane_ids as &[Id], // 12 + is_touches as &[bool], // 13 + dependency_hashes as &[Option<&str>], // 14 ) .fetch_all(txn) .await?; diff --git a/crates/agent/src/controllers/capture.rs b/crates/agent/src/controllers/capture.rs index b62d8069b1..758adab58c 100644 --- a/crates/agent/src/controllers/capture.rs +++ b/crates/agent/src/controllers/capture.rs @@ -34,13 +34,14 @@ impl CaptureStatus { .publications .resolve_dependencies(state, control_plane) .await?; - if let Some(pub_id) = dependencies.next_pub_id { - let draft = pending_pub.start_spec_update( - pub_id, - state, - format!("in response to publication of one or more depencencies"), - ); - if !dependencies.deleted.is_empty() { + if dependencies.hash != state.live_dependency_hash { + if dependencies.deleted.is_empty() { + pending_pub.start_touch(state); + } else { + let draft = pending_pub.start_spec_update( + state, + format!("in response to publication of one or more depencencies"), + ); tracing::debug!(deleted_collections = ?dependencies.deleted, "disabling bindings for collections that have been deleted"); let draft_capture = draft .captures diff --git a/crates/agent/src/controllers/catalog_test.rs b/crates/agent/src/controllers/catalog_test.rs index b8a5cbce78..4d5a5ae913 100644 --- a/crates/agent/src/controllers/catalog_test.rs +++ b/crates/agent/src/controllers/catalog_test.rs @@ -24,13 +24,8 @@ impl TestStatus { .publications .resolve_dependencies(state, control_plane) .await?; - if let Some(pub_id) = dependencies.next_pub_id { - pending_pub.start_spec_update( - pub_id, - state, - format!("in response to publication of one or more depencencies"), - ); - + if dependencies.hash != state.live_dependency_hash { + 
pending_pub.start_touch(state); let result = pending_pub .finish(state, &mut self.publications, control_plane) .await @@ -40,7 +35,7 @@ impl TestStatus { result.error_for_status().do_not_retry()?; // TODO(phil): This would be a great place to trigger an alert if the publication failed } else { - // We're up-to-date with our dependencies, which means the test has been published successfully + // We've successfully published against the latest versions of the dependencies self.passing = true; } diff --git a/crates/agent/src/controllers/collection.rs b/crates/agent/src/controllers/collection.rs index 5588792ebf..2dad2ce081 100644 --- a/crates/agent/src/controllers/collection.rs +++ b/crates/agent/src/controllers/collection.rs @@ -33,13 +33,15 @@ impl CollectionStatus { .publications .resolve_dependencies(state, control_plane) .await?; - if let Some(pub_id) = dependencies.next_pub_id { - let draft = pending_pub.start_spec_update( - pub_id, - state, - format!("in response to publication of one or more depencencies"), - ); - if !dependencies.deleted.is_empty() { + + if dependencies.hash != state.live_dependency_hash { + if dependencies.deleted.is_empty() { + pending_pub.start_touch(state); + } else { + let draft = pending_pub.start_spec_update( + state, + "in response to publication of one or more depencencies", + ); let add_detail = handle_deleted_dependencies(draft, state, dependencies); pending_pub.update_pending_draft(add_detail); } @@ -171,6 +173,7 @@ impl InferredSchemaStatus { ), expect_pub_id: Some(state.last_pub_id), model: Some(collection_def.clone()), + is_touch: false, // We intend to update the model } }); let (removed, new_schema) = collection_def @@ -210,6 +213,7 @@ impl InferredSchemaStatus { ), expect_pub_id: Some(state.last_pub_id), model: Some(collection_def.clone()), + is_touch: false, // We intend to update the model } }); update_inferred_schema(draft_row, &schema)?; diff --git a/crates/agent/src/controllers/handler.rs 
b/crates/agent/src/controllers/handler.rs index e681875ab3..0752f0956e 100644 --- a/crates/agent/src/controllers/handler.rs +++ b/crates/agent/src/controllers/handler.rs @@ -126,7 +126,8 @@ impl Handler for ControllerHandler { catalog_name = %state.catalog_name, enqueued_at = ?state.next_run, last_update = %state.updated_at, - last_pub_id = %state.last_pub_id))] + last_pub_id = %state.last_pub_id, + last_build_id = %state.last_build_id))] async fn run_controller( state: &ControllerState, next_status: &mut Status, diff --git a/crates/agent/src/controllers/materialization.rs b/crates/agent/src/controllers/materialization.rs index 66fd561496..e8f5270ecb 100644 --- a/crates/agent/src/controllers/materialization.rs +++ b/crates/agent/src/controllers/materialization.rs @@ -37,13 +37,15 @@ impl MaterializationStatus { .publications .resolve_dependencies(state, control_plane) .await?; - if let Some(pub_id) = dependencies.next_pub_id { - let draft = pending_pub.start_spec_update( - pub_id, - state, - format!("in response to publication of one or more depencencies"), - ); - if !dependencies.deleted.is_empty() { + + if dependencies.hash != state.live_dependency_hash { + if dependencies.deleted.is_empty() { + pending_pub.start_touch(state); + } else { + let draft = pending_pub.start_spec_update( + state, + format!("in response to publication of one or more depencencies"), + ); let mat_name = models::Materialization::new(&state.catalog_name); let draft_row = draft.materializations.get_mut_by_key(&mat_name); let model = draft_row.unwrap().model.as_mut().unwrap(); @@ -153,6 +155,15 @@ impl MaterializationStatus { .materializations .get_mut_by_key(&mat_name) .ok_or_else(|| anyhow::anyhow!("missing draft row for materialization"))?; + if draft_materialization.is_touch { + // We started out just touching the spec, so we need to change to a "real" publication + // in order to update the model. 
+ draft_materialization.is_touch = false; + draft_materialization.model = state + .live_spec + .as_ref() + .and_then(|s| s.as_materialization().cloned()); + } let draft_model = draft_materialization .model .as_mut() @@ -358,6 +369,7 @@ impl SourceCaptureStatus { ), expect_pub_id: Some(state.last_pub_id), model: Some(model.clone()), + is_touch: false, // We intend to update the model }); // Failures here are terminal diff --git a/crates/agent/src/controllers/mod.rs b/crates/agent/src/controllers/mod.rs index 705bf02976..20b25fc530 100644 --- a/crates/agent/src/controllers/mod.rs +++ b/crates/agent/src/controllers/mod.rs @@ -27,7 +27,7 @@ pub use handler::ControllerHandler; /// to "upgrade" it. Any controller state having a higher version than this _must_ be ignored. /// /// Increment this version whenever we need to ensure that controllers re-visit all live specs. -pub const CONTROLLER_VERSION: i32 = 1; +pub const CONTROLLER_VERSION: i32 = 2; /// Represents the state of a specific controller and catalog_name. #[derive(Clone, Debug, Serialize)] @@ -52,10 +52,10 @@ pub struct ControllerState { /// most recent run was successful. If `error` is `Some`, then `failures` /// will be > 0. pub error: Option, - /// The `last_pub_id` of the corresponding `live_specs` row. This is used - /// to determine when the controller needs to re-publish a task, by - /// comparing this value to the `last_pub_id`s of all its dependencies. + /// The `last_pub_id` of the corresponding `live_specs` row. pub last_pub_id: Id, + /// The `last_build_id` of the corresponding `live_specs` row. + pub last_build_id: Id, /// The logs token that's used for all operations of this controller. Every /// run of a given controller uses the same `logs_token` so that you can /// see all the logs in one place. @@ -68,6 +68,9 @@ pub struct ControllerState { pub current_status: Status, /// ID of the data plane in which this specification lives. 
pub data_plane_id: Id, + /// The `dependency_hash` of the `live_specs` row, used to determine whether any + /// dependencies have had their models changed. + pub live_dependency_hash: Option, } impl ControllerState { @@ -139,10 +142,12 @@ impl ControllerState { catalog_name: job.catalog_name.clone(), error: job.error.clone(), last_pub_id: job.last_pub_id.into(), + last_build_id: job.last_build_id.into(), logs_token: job.logs_token, controller_version: job.controller_version, current_status: status, data_plane_id: job.data_plane_id.into(), + live_dependency_hash: job.live_dependency_hash.clone(), }; Ok(controller_state) } @@ -442,21 +447,6 @@ impl Status { } } -/// Selects the smallest next run from among the arguments, returning `None` -/// only if all `next_runs` are `None`. -#[allow(dead_code)] -fn reduce_next_run(next_runs: &[Option]) -> Option { - let mut min: Option = None; - for next_run in next_runs { - match (min, *next_run) { - (Some(l), Some(r)) => min = Some(l.min(r)), - (None, Some(r)) => min = Some(r), - (_, None) => { /* nada */ } - } - } - min -} - fn datetime_schema(_: &mut schemars::gen::SchemaGenerator) -> schemars::schema::Schema { serde_json::from_value(serde_json::json!({ "type": "string", @@ -505,6 +495,8 @@ mod test { scope: Some("flow://materializations/snails/shells".to_string()), detail: "a_field simply cannot be tolerated".to_string(), }], + count: 1, + is_touch: false, }; let mut history = VecDeque::new(); history.push_front(pub_status); @@ -518,9 +510,9 @@ mod test { add_bindings, }), publications: PublicationStatus { - target_pub_id: Id::new([1, 2, 3, 4, 5, 6, 7, 8]), max_observed_pub_id: Id::new([1, 2, 3, 4, 5, 6, 7, 8]), history, + dependency_hash: Some("abc12345".to_string()), }, }); diff --git a/crates/agent/src/controllers/publication_status.rs b/crates/agent/src/controllers/publication_status.rs index 9898cb01a2..62594dbedc 100644 --- a/crates/agent/src/controllers/publication_status.rs +++ 
b/crates/agent/src/controllers/publication_status.rs @@ -21,17 +21,14 @@ pub struct Dependencies { /// Dependencies that have been deleted. If this is non-empty, then the dependent needs to be /// published. pub deleted: BTreeSet, - /// If a publication is required, then this will be Some non-zero publication id. - /// If `next_pub_id` is `None`, then no publication in order to stay up to date with - /// respect to dependencies. - pub next_pub_id: Option, + pub hash: Option, } /// Status of the activation of the task in the data-plane #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, JsonSchema)] pub struct ActivationStatus { - /// The publication id that was last activated in the data plane. - /// If this is less than the `last_pub_id` of the controlled spec, + /// The build id that was last activated in the data plane. + /// If this is less than the `last_build_id` of the controlled spec, /// then an activation is still pending. #[serde(default = "Id::zero", skip_serializing_if = "Id::is_zero")] pub last_activated: Id, @@ -51,7 +48,7 @@ impl ActivationStatus { state: &ControllerState, control_plane: &mut C, ) -> anyhow::Result<()> { - if state.last_pub_id > self.last_activated { + if state.last_build_id > self.last_activated { let name = state.catalog_name.clone(); let built_spec = state.built_spec.as_ref().expect("built_spec must be Some"); control_plane @@ -59,8 +56,8 @@ impl ActivationStatus { .await .with_retry(backoff_data_plane_activate(state.failures)) .context("failed to activate")?; - tracing::debug!(last_activated = %self.last_activated, "activated"); - self.last_activated = state.last_pub_id; + tracing::debug!(last_activated = %state.last_build_id, "activated"); + self.last_activated = state.last_build_id; } Ok(()) } @@ -87,6 +84,27 @@ pub struct PublicationInfo { /// Errors will be non-empty for publications that were not successful #[serde(default, skip_serializing_if = "Vec::is_empty")] pub errors: Vec, + #[serde(default)] + pub is_touch: 
bool, + #[serde(default = "default_count", skip_serializing_if = "is_one")] + #[schemars(schema_with = "count_schema")] + pub count: u32, +} + +fn default_count() -> u32 { + 1 +} + +fn is_one(i: &u32) -> bool { + *i == 1 +} + +fn count_schema(_: &mut schemars::gen::SchemaGenerator) -> schemars::schema::Schema { + serde_json::from_value(serde_json::json!({ + "type": "integer", + "minimum": 1, + })) + .unwrap() } impl PublicationInfo { @@ -96,6 +114,14 @@ impl PublicationInfo { } pub fn observed(publication: &PublicationResult) -> Self { + let is_touch = publication.draft.tests.iter().all(|r| r.is_touch) + && publication.draft.collections.iter().all(|r| r.is_touch) + && publication.draft.captures.iter().all(|r| r.is_touch) + && publication + .draft + .materializations + .iter() + .all(|r| r.is_touch); PublicationInfo { id: publication.pub_id, created: Some(publication.started_at), @@ -103,15 +129,27 @@ impl PublicationInfo { result: Some(publication.status.clone()), detail: publication.detail.clone(), errors: publication.draft_errors(), + count: 1, + is_touch, } } + + fn try_reduce(&mut self, other: PublicationInfo) -> Option { + if !self.is_touch || !other.is_touch || self.result != other.result { + return Some(other); + } + self.count += other.count; + self.completed = other.completed; + self.errors = other.errors; + self.detail = other.detail; + None + } } /// Represents a draft that is pending publication #[derive(Debug)] pub struct PendingPublication { - /// The publication id, or 0 if none has yet been determined - pub id: Id, + pub is_touch: bool, /// The draft to be published pub draft: tables::DraftCatalog, /// Reasons for updating the draft, which will be joined together to become @@ -130,7 +168,7 @@ impl PartialEq for PendingPublication { impl PendingPublication { pub fn new() -> Self { PendingPublication { - id: Id::zero(), + is_touch: false, draft: tables::DraftCatalog::default(), details: Vec::new(), } @@ -140,23 +178,56 @@ impl PendingPublication { 
self.draft.spec_count() > 0 } + pub fn start_touch(&mut self, state: &ControllerState) { + tracing::info!("starting touch"); + + let model = state + .live_spec + .as_ref() + .expect("cannot start touch after live spec has been deleted"); + self.draft = tables::DraftCatalog::default(); + let catalog_type = state.live_spec.as_ref().unwrap().catalog_type(); + let scope = tables::synthetic_scope(catalog_type, &state.catalog_name); + self.draft + .add_spec( + catalog_type, + &state.catalog_name, + scope, + Some(state.last_pub_id), + Some(&model.to_raw_value()), + true, + ) + .unwrap(); + } + pub fn start_spec_update( &mut self, - pub_id: models::Id, state: &ControllerState, detail: impl Into, ) -> &mut tables::DraftCatalog { - self.id = pub_id; + tracing::info!("starting spec update"); let model = state .live_spec .as_ref() .expect("cannot start spec update after live spec has been deleted"); - self.draft = draft_publication(state, model); + self.draft = tables::DraftCatalog::default(); + let scope = tables::synthetic_scope(model.catalog_type(), &state.catalog_name); + self.draft + .add_spec( + model.catalog_type(), + &state.catalog_name, + scope, + Some(state.last_pub_id), + Some(&model.to_raw_value()), + false, + ) + .unwrap(); self.update_pending_draft(detail) } pub fn update_pending_draft(&mut self, detail: impl Into) -> &mut tables::DraftCatalog { + self.is_touch = false; self.details.push(detail.into()); &mut self.draft } @@ -167,17 +238,37 @@ impl PendingPublication { status: &mut PublicationStatus, control_plane: &mut C, ) -> anyhow::Result { - // If no publication id has been assigned, do so now - if self.id.is_zero() { - self.id = control_plane.next_pub_id(); - status.target_pub_id = self.id; - } - let PendingPublication { id, draft, details } = - std::mem::replace(self, PendingPublication::new()); + let pub_id = if self.is_touch { + debug_assert!( + self.draft.captures.iter().all(|c| c.is_touch), + "all drafted specs must have is_touch: true for touch pub" + 
); + debug_assert!( + self.draft.collections.iter().all(|c| c.is_touch), + "all drafted specs must have is_touch: true for touch pub" + ); + debug_assert!( + self.draft.materializations.iter().all(|c| c.is_touch), + "all drafted specs must have is_touch: true for touch pub" + ); + debug_assert!( + self.draft.tests.iter().all(|c| c.is_touch), + "all drafted specs must have is_touch: true for touch pub" + ); + + state.last_pub_id + } else { + control_plane.next_pub_id() + }; + let PendingPublication { + is_touch, + draft, + details, + } = std::mem::replace(self, PendingPublication::new()); let detail = details.join(", "); let result = control_plane - .publish(id, Some(detail), state.logs_token, draft) + .publish(pub_id, Some(detail), state.logs_token, draft) .await; match result.as_ref() { Ok(r) => { @@ -187,12 +278,12 @@ impl PendingPublication { .notify_dependents(state.catalog_name.clone()) .await .context("notifying dependents after successful publication")?; - status.max_observed_pub_id = id; + status.max_observed_pub_id = pub_id; } } Err(err) => { let info = PublicationInfo { - id, + id: pub_id, completed: Some(control_plane.current_time()), detail: Some(details.join(", ")), errors: vec![crate::draft::Error { @@ -201,6 +292,8 @@ impl PendingPublication { }], created: None, result: None, + count: 1, + is_touch, }; status.record_result(info); } @@ -213,14 +306,8 @@ impl PendingPublication { /// This does not include any information on user-initiated publications. #[derive(Debug, Serialize, Deserialize, PartialEq, JsonSchema)] pub struct PublicationStatus { - /// The largest `last_pub_id` among all of this spec's dependencies. - /// For example, for materializations the dependencies are all of - /// the collections of all enabled binding `source`s, as well as the - /// `sourceCapture`. If any of these are published, it will increase - /// the `target_pub_id` and the materialization will be published at - /// `target_pub_id` in turn. 
- #[serde(default = "Id::zero", skip_serializing_if = "Id::is_zero")] - pub target_pub_id: Id, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub dependency_hash: Option, /// The publication id at which the controller has last notified dependent /// specs. A publication of the controlled spec will cause the controller to /// notify the controllers of all dependent specs. When it does so, it sets @@ -235,9 +322,9 @@ pub struct PublicationStatus { impl Clone for PublicationStatus { fn clone(&self) -> Self { PublicationStatus { - target_pub_id: self.target_pub_id, max_observed_pub_id: self.max_observed_pub_id, history: self.history.clone(), + dependency_hash: self.dependency_hash.clone(), } } } @@ -245,7 +332,7 @@ impl Clone for PublicationStatus { impl Default for PublicationStatus { fn default() -> Self { PublicationStatus { - target_pub_id: Id::zero(), + dependency_hash: None, max_observed_pub_id: Id::zero(), history: VecDeque::new(), } @@ -261,10 +348,11 @@ impl PublicationStatus { control_plane: &mut C, ) -> anyhow::Result { let Some(model) = state.live_spec.as_ref() else { + // The spec is being deleted, and thus has no dependencies return Ok(Dependencies { live: Default::default(), deleted: Default::default(), - next_pub_id: None, + hash: None, }); }; let all_deps = model.all_dependencies(); @@ -277,23 +365,22 @@ impl PublicationStatus { deleted.remove(name); } - let next_pub_id = if deleted.is_empty() { - live.last_pub_ids() - .max() - .filter(|id| id > &state.last_pub_id) - } else { - // If any dependencies have been deleted, then we'll need to publish - // at a new pub_id, since we cannot know the `last_pub_id` of - // deleted specs. 
- Some(control_plane.next_pub_id()) + let dep_hasher = tables::Dependencies::from_live(&live); + let hash = match model { + AnySpec::Capture(c) => dep_hasher.compute_hash(c), + AnySpec::Collection(c) => dep_hasher.compute_hash(c), + AnySpec::Materialization(m) => dep_hasher.compute_hash(m), + AnySpec::Test(t) => dep_hasher.compute_hash(t), }; - if let Some(next) = next_pub_id { - self.target_pub_id = next; + + if hash != state.live_dependency_hash { + tracing::info!(?state.live_dependency_hash, new_hash = ?hash, deleted_count = %deleted.len(), "spec dependencies have changed"); } + Ok(Dependencies { live, deleted, - next_pub_id, + hash, }) } @@ -311,28 +398,19 @@ impl PublicationStatus { Ok(()) } + // TODO: fold touch publication into history pub fn record_result(&mut self, publication: PublicationInfo) { tracing::info!(pub_id = ?publication.id, status = ?publication.result, "controller finished publication"); - self.history.push_front(publication); - while self.history.len() > PublicationStatus::MAX_HISTORY { - self.history.pop_back(); + let maybe_new_entry = if let Some(last_entry) = self.history.front_mut() { + last_entry.try_reduce(publication) + } else { + Some(publication) + }; + if let Some(new_entry) = maybe_new_entry { + self.history.push_front(new_entry); + while self.history.len() > PublicationStatus::MAX_HISTORY { + self.history.pop_back(); + } } } } - -fn draft_publication(state: &ControllerState, spec: &AnySpec) -> tables::DraftCatalog { - let mut draft = tables::DraftCatalog::default(); - let scope = tables::synthetic_scope(spec.catalog_type(), &state.catalog_name); - - draft - .add_spec( - spec.catalog_type(), - &state.catalog_name, - scope, - Some(state.last_pub_id), - Some(&spec.to_raw_value()), - ) - .unwrap(); - - draft -} diff --git a/crates/agent/src/controllers/snapshots/agent__controllers__test__materialization-status-round-trip.snap b/crates/agent/src/controllers/snapshots/agent__controllers__test__materialization-status-round-trip.snap 
index e420588046..064e737624 100644 --- a/crates/agent/src/controllers/snapshots/agent__controllers__test__materialization-status-round-trip.snap +++ b/crates/agent/src/controllers/snapshots/agent__controllers__test__materialization-status-round-trip.snap @@ -16,7 +16,9 @@ StatusSnapshot { }, ), publications: PublicationStatus { - target_pub_id: 0102030405060708, + dependency_hash: Some( + "abc12345", + ), max_observed_pub_id: 0102030405060708, history: [ PublicationInfo { @@ -61,6 +63,8 @@ StatusSnapshot { detail: "a_field simply cannot be tolerated", }, ], + is_touch: false, + count: 1, }, ], }, @@ -69,7 +73,7 @@ StatusSnapshot { }, }, ), - json: "{\n \"type\": \"Materialization\",\n \"source_capture\": {\n \"add_bindings\": [\n \"snails/shells\"\n ]\n },\n \"publications\": {\n \"target_pub_id\": \"0102030405060708\",\n \"max_observed_pub_id\": \"0102030405060708\",\n \"history\": [\n {\n \"id\": \"0403020101020304\",\n \"created\": \"2024-05-30T09:10:11Z\",\n \"completed\": \"2024-05-30T09:10:11Z\",\n \"detail\": \"some detail\",\n \"result\": {\n \"type\": \"buildFailed\",\n \"incompatible_collections\": [\n {\n \"collection\": \"snails/water\",\n \"affected_materializations\": [\n {\n \"name\": \"snails/materialize\",\n \"fields\": [\n {\n \"field\": \"a_field\",\n \"reason\": \"do not like\"\n }\n ]\n }\n ]\n }\n ]\n },\n \"errors\": [\n {\n \"catalog_name\": \"snails/shells\",\n \"scope\": \"flow://materializations/snails/shells\",\n \"detail\": \"a_field simply cannot be tolerated\"\n }\n ]\n }\n ]\n },\n \"activation\": {\n \"last_activated\": \"0102030404030201\"\n }\n}", + json: "{\n \"type\": \"Materialization\",\n \"source_capture\": {\n \"add_bindings\": [\n \"snails/shells\"\n ]\n },\n \"publications\": {\n \"dependency_hash\": \"abc12345\",\n \"max_observed_pub_id\": \"0102030405060708\",\n \"history\": [\n {\n \"id\": \"0403020101020304\",\n \"created\": \"2024-05-30T09:10:11Z\",\n \"completed\": \"2024-05-30T09:10:11Z\",\n \"detail\": \"some 
detail\",\n \"result\": {\n \"type\": \"buildFailed\",\n \"incompatible_collections\": [\n {\n \"collection\": \"snails/water\",\n \"affected_materializations\": [\n {\n \"name\": \"snails/materialize\",\n \"fields\": [\n {\n \"field\": \"a_field\",\n \"reason\": \"do not like\"\n }\n ]\n }\n ]\n }\n ]\n },\n \"errors\": [\n {\n \"catalog_name\": \"snails/shells\",\n \"scope\": \"flow://materializations/snails/shells\",\n \"detail\": \"a_field simply cannot be tolerated\"\n }\n ],\n \"is_touch\": false\n }\n ]\n },\n \"activation\": {\n \"last_activated\": \"0102030404030201\"\n }\n}", parsed: Materialization( MaterializationStatus { source_capture: Some( @@ -83,7 +87,9 @@ StatusSnapshot { }, ), publications: PublicationStatus { - target_pub_id: 0102030405060708, + dependency_hash: Some( + "abc12345", + ), max_observed_pub_id: 0102030405060708, history: [ PublicationInfo { @@ -128,6 +134,8 @@ StatusSnapshot { detail: "a_field simply cannot be tolerated", }, ], + is_touch: false, + count: 1, }, ], }, diff --git a/crates/agent/src/controllers/snapshots/agent__controllers__test__status_json_schema.snap b/crates/agent/src/controllers/snapshots/agent__controllers__test__status_json_schema.snap index 66e07ec02b..5c37bbafb7 100644 --- a/crates/agent/src/controllers/snapshots/agent__controllers__test__status_json_schema.snap +++ b/crates/agent/src/controllers/snapshots/agent__controllers__test__status_json_schema.snap @@ -230,19 +230,34 @@ expression: schema "type" ], "type": "object" + }, + { + "description": "Optimistic locking failure for one or more specs in the publication. 
This case should typically be retried by the publisher.", + "properties": { + "failures": { + "items": { + "$ref": "#/definitions/LockFailure" + }, + "type": "array" + }, + "type": { + "enum": [ + "buildIdLockFailure" + ], + "type": "string" + } + }, + "required": [ + "type" + ], + "type": "object" } ] }, "LockFailure": { "description": "Represents an optimistic lock failure when trying to update live specs.", "properties": { - "catalog_name": { - "type": "string" - }, - "expect_pub_id": { - "$ref": "#/definitions/Id" - }, - "last_pub_id": { + "actual": { "anyOf": [ { "$ref": "#/definitions/Id" @@ -251,11 +266,17 @@ expression: schema "type": "null" } ] + }, + "catalog_name": { + "type": "string" + }, + "expected": { + "$ref": "#/definitions/Id" } }, "required": [ "catalog_name", - "expect_pub_id" + "expected" ], "type": "object" }, @@ -267,6 +288,10 @@ expression: schema "format": "date-time", "type": "string" }, + "count": { + "minimum": 1.0, + "type": "integer" + }, "created": { "description": "Time at which the publication was initiated", "format": "date-time", @@ -289,6 +314,10 @@ expression: schema "id": { "$ref": "#/definitions/Id" }, + "is_touch": { + "default": false, + "type": "boolean" + }, "result": { "anyOf": [ { @@ -309,6 +338,12 @@ expression: schema "PublicationStatus": { "description": "Information on the publications performed by the controller. This does not include any information on user-initiated publications.", "properties": { + "dependency_hash": { + "type": [ + "string", + "null" + ] + }, "history": { "description": "A limited history of publications performed by this controller", "items": { @@ -319,10 +354,6 @@ expression: schema "max_observed_pub_id": { "$ref": "#/definitions/Id", "description": "The publication id at which the controller has last notified dependent specs. A publication of the controlled spec will cause the controller to notify the controllers of all dependent specs. 
When it does so, it sets `max_observed_pub_id` to the current `last_pub_id`, so that it can avoid notifying dependent controllers unnecessarily." - }, - "target_pub_id": { - "$ref": "#/definitions/Id", - "description": "The largest `last_pub_id` among all of this spec's dependencies. For example, for materializations the dependencies are all of the collections of all enabled binding `source`s, as well as the `sourceCapture`. If any of these are published, it will increase the `target_pub_id` and the materialization will be published at `target_pub_id` in turn." } }, "required": [ diff --git a/crates/agent/src/controlplane.rs b/crates/agent/src/controlplane.rs index 6750de4225..5252eabad1 100644 --- a/crates/agent/src/controlplane.rs +++ b/crates/agent/src/controlplane.rs @@ -308,8 +308,10 @@ impl ControlPlane for PGControlPlane { row.id.into(), row.data_plane_id.into(), row.last_pub_id.into(), + row.last_build_id.into(), model_json, built_spec_json, + row.dependency_hash, ) .with_context(|| format!("deserializing specs for {:?}", row.catalog_name))?; } @@ -373,15 +375,17 @@ impl ControlPlane for PGControlPlane { return Ok(built.build_failed()); } let commit_result = self.publications_handler.commit(built).await?; - let JobStatus::ExpectPubIdMismatch { failures } = &commit_result.status else { + + // Has there been an optimistic locking failure? + let JobStatus::BuildIdLockFailure { failures } = &commit_result.status else { + // All other statuses are terminal. return Ok(commit_result); }; - // There's been an optimistic locking failure. 
if attempt == Publisher::MAX_OPTIMISTIC_LOCKING_RETRIES { tracing::error!(%attempt, ?failures, "giving up after maximum number of optimistic locking retries"); return Ok(commit_result); } else { - tracing::info!(%attempt, ?failures, "publish failed due to optimistic locking failures (will retry)"); + tracing::info!(%attempt, ?failures, "publish failed due to optimistic locking failure (will retry)"); maybe_draft = Some(commit_result.draft); } } diff --git a/crates/agent/src/draft.rs b/crates/agent/src/draft.rs index e815210f87..6c9346e686 100644 --- a/crates/agent/src/draft.rs +++ b/crates/agent/src/draft.rs @@ -35,7 +35,7 @@ impl From for Error { fn from(err: LockFailure) -> Self { let detail = format!( "the expectPubId of spec {:?} {:?} did not match that of the live spec {:?}", - err.catalog_name, err.expect_pub_id, err.last_pub_id + err.catalog_name, err.expected, err.actual ); Error { catalog_name: err.catalog_name, diff --git a/crates/agent/src/integration_tests/dependencies_and_activations.rs b/crates/agent/src/integration_tests/dependencies_and_activations.rs index 6c718ab37d..01653d962d 100644 --- a/crates/agent/src/integration_tests/dependencies_and_activations.rs +++ b/crates/agent/src/integration_tests/dependencies_and_activations.rs @@ -138,6 +138,14 @@ async fn test_dependencies_and_controllers() { .collect::>(); // Controller runs should have been immediately enqueued for all published specs. + let due_controllers = harness + .get_enqueued_controllers(chrono::Duration::seconds(1)) + .await; + assert_eq!( + all_names.iter().cloned().collect::>(), + due_controllers + ); + let runs = harness.run_pending_controllers(None).await; let run_names = runs .into_iter() @@ -157,11 +165,21 @@ async fn test_dependencies_and_controllers() { // Fetch and re-publish just the hoots collection. This should trigger controller updates of // all the other specs. 
- let live = harness + let live_hoots = harness .control_plane() - .get_live_specs(std::iter::once("owls/hoots".to_string()).collect()) + .get_collection(models::Collection::new("owls/hoots")) .await - .expect("failed to fetch hoots"); + .unwrap() + .expect("hoots spec must be Some"); + let hoots_last_activated = live_hoots.last_build_id; + let mut draft = tables::DraftCatalog::default(); + draft.collections.insert(tables::DraftCollection { + collection: models::Collection::new("owls/hoots"), + scope: tables::synthetic_scope(models::CatalogType::Collection, "owls/hoots"), + expect_pub_id: None, + model: Some(live_hoots.model.clone()), + is_touch: false, + }); let next_pub = harness.control_plane().next_pub_id(); let result = harness @@ -170,12 +188,13 @@ async fn test_dependencies_and_controllers() { next_pub, Some("test publication of owls/hoots".to_string()), Uuid::new_v4(), - super::live_to_draft(live), + draft, ) .await .expect("publication failed"); assert_eq!(1, result.draft.spec_count()); assert!(result.status.is_success()); + assert_eq!(next_pub, result.pub_id); // Simulate a failed call to activate the collection in the data plane harness.control_plane().fail_next_activation("owls/hoots"); @@ -193,12 +212,13 @@ async fn test_dependencies_and_controllers() { .unwrap() .contains("data_plane_delete simulated failure")); assert_eq!( - first_pub_id, + hoots_last_activated, hoots_state .current_status .unwrap_collection() .activation - .last_activated + .last_activated, + "expect hoots last_activated to be unchanged" ); harness.control_plane().reset_activations(); @@ -213,15 +233,28 @@ async fn test_dependencies_and_controllers() { assert_eq!(0, hoots_state.failures); assert!(hoots_state.error.is_none()); assert_eq!( - next_pub, + hoots_state.last_build_id, hoots_state .current_status .unwrap_collection() .activation .last_activated ); + assert!( + hoots_state.last_build_id > hoots_last_activated, + "sanity check that the last_build_id increased" + ); // Other 
controllers should run now that the activation of hoots was successful. + // Before running, fetch the current state of their live specs, so we can assert + // that they only get touched instead of being fully published. + let mut all_except_hoots = all_names.clone(); + all_except_hoots.remove("owls/hoots"); + let starting_specs = harness + .control_plane() + .get_live_specs(all_except_hoots.clone()) + .await + .unwrap(); let runs = harness.run_pending_controllers(None).await; assert_controllers_ran( &[ @@ -233,6 +266,7 @@ async fn test_dependencies_and_controllers() { ], runs, ); + harness.assert_specs_touched_since(&starting_specs).await; harness.control_plane().assert_activations( "subsequent activations", vec![ @@ -242,6 +276,71 @@ async fn test_dependencies_and_controllers() { // tests do not get activated ], ); + + // Publish hoots again and expect dependents to be touched again in response. + // This time, we'll assert that the dependent's publication histories have collapsed both + // touch publications into one history entry. 
+ let starting_specs = harness + .control_plane() + .get_live_specs(all_except_hoots) + .await + .unwrap(); + + let mut draft = tables::DraftCatalog::default(); + draft.collections.insert(tables::DraftCollection { + collection: models::Collection::new("owls/hoots"), + scope: tables::synthetic_scope(models::CatalogType::Collection, "owls/hoots"), + expect_pub_id: None, + model: Some(live_hoots.model.clone()), + is_touch: false, + }); + let pub_id = harness.control_plane().next_pub_id(); + harness + .control_plane() + .publish( + pub_id, + Some("3rd pub of hoots".to_string()), + Uuid::new_v4(), + draft, + ) + .await + .expect("publication must succeed"); + let runs = harness.run_pending_controllers(None).await; + assert_controllers_ran( + &[ + "owls/capture", + "owls/materialize", + "owls/hoots", + "owls/nests", + "owls/test-test", + ], + runs, + ); + harness.assert_specs_touched_since(&starting_specs).await; + harness.control_plane().assert_activations( + "3rd activations", + vec![ + ("owls/capture", Some(CatalogType::Capture)), + ("owls/materialize", Some(CatalogType::Materialization)), + ("owls/nests", Some(CatalogType::Collection)), + ("owls/hoots", Some(CatalogType::Collection)), + // tests do not get activated + ], + ); + + let mat_state = harness.get_controller_state("owls/materialize").await; + let mat_history = &mat_state + .current_status + .unwrap_materialization() + .publications + .history; + assert_eq!( + 1, + mat_history.len(), + "unexpected entry count in materialize pub history: {:?}", + mat_history + ); + // Insert an inferred schema so that we can assert it gets deleted along with the collection harness .upsert_inferred_schema(mock_inferred_schema("owls/hoots", 3)) @@ -500,6 +599,6 @@ fn assert_controllers_ran(expected: &[&str], actual: Vec) { let expected_names = expected.into_iter().map(|n| *n).collect::>(); assert_eq!( expected_names, actual_names, - "mismatched controller runs, expected:\n{expected_names:?}\nactual:\n{actual:?}" + "mismatched 
controller runs, expected" ); } diff --git a/crates/agent/src/integration_tests/harness.rs b/crates/agent/src/integration_tests/harness.rs index e90423581f..7f4899a2e4 100644 --- a/crates/agent/src/integration_tests/harness.rs +++ b/crates/agent/src/integration_tests/harness.rs @@ -19,6 +19,7 @@ use tempfile::tempdir; const FIXED_DATABASE_URL: &str = "postgresql://postgres:postgres@localhost:5432/postgres"; #[derive(Debug, Deserialize)] +#[allow(dead_code)] // Suppress lints for fields used only in test snapshots pub struct LiveSpec { pub catalog_name: String, pub connector_image_name: Option, @@ -47,6 +48,7 @@ pub struct TestHarness { pub test_name: String, pub pool: sqlx::PgPool, pub publisher: Publisher, + #[allow(dead_code)] // only here so we don't drop it until the harness is dropped pub builds_root: tempfile::TempDir, pub controllers: ControllerHandler, } @@ -339,6 +341,116 @@ impl TestHarness { txn.commit().await.unwrap(); } + pub async fn assert_specs_touched_since(&mut self, prev_specs: &tables::LiveCatalog) { + let user_id = self.control_plane().inner.system_user_id; + let owned_names: Vec = prev_specs + .all_spec_names() + .map(|n| (*n).to_owned()) + .collect(); + let specs = agent_sql::live_specs::fetch_live_specs(user_id, &owned_names, &self.pool) + .await + .expect("failed to query live specs"); + assert_eq!( + prev_specs.spec_count(), + specs.len(), + "expected to fetch {} specs, but got {}", + prev_specs.spec_count(), + specs.len() + ); + + for spec in specs { + let (expect_last_pub, prev_last_build) = match spec.spec_type.map(Into::into) { + None => panic!( + "expected spec {} to have been touched, but spec_type is null", + spec.catalog_name + ), + Some(CatalogType::Capture) => { + let row = prev_specs + .captures + .get_by_key(&models::Capture::new(&spec.catalog_name)) + .unwrap(); + (row.last_pub_id, row.last_build_id) + } + Some(CatalogType::Collection) => { + let row = prev_specs + .collections + 
.get_by_key(&models::Collection::new(&spec.catalog_name)) + .unwrap(); + (row.last_pub_id, row.last_build_id) + } + Some(CatalogType::Materialization) => { + let row = prev_specs + .materializations + .get_by_key(&models::Materialization::new(&spec.catalog_name)) + .unwrap(); + (row.last_pub_id, row.last_build_id) + } + Some(CatalogType::Test) => { + let row = prev_specs + .tests + .get_by_key(&models::Test::new(&spec.catalog_name)) + .unwrap(); + (row.last_pub_id, row.last_build_id) + } + }; + + assert_eq!( + expect_last_pub, + spec.last_pub_id.into(), + "expected touched spec '{}' to have last_pub_id: {}, but was: {}", + spec.catalog_name, + expect_last_pub, + spec.last_pub_id + ); + assert!( + spec.last_build_id > prev_last_build, + "expected touched spec '{}' to have last_build_id ({}) > the previous last_build_id ({})", + spec.catalog_name, + spec.last_build_id, + prev_last_build, + ); + + assert!( + spec.last_build_id > spec.last_pub_id, + "sanity check to ensure that last_build_id > last_pub_id" + ); + // sanity check that we haven't created publication specs + let rows = sqlx::query!( + r#"select + ls.catalog_name, + ps.pub_id as "pub_id: agent_sql::Id" + from live_specs ls + join publication_specs ps on ls.id = ps.live_spec_id + where ls.catalog_name = $1 + and ps.pub_id > $2 + order by ls.catalog_name;"#, + spec.catalog_name.as_str(), + agent_sql::Id::from(expect_last_pub) as agent_sql::Id, + ) + .fetch_all(&self.pool) + .await + .expect("failed to query publication_specs"); + assert!( + rows.is_empty(), + "expected no publication specs to exist for touched specs, got {rows:?}" + ); + } + } + + pub async fn get_enqueued_controllers(&mut self, within: chrono::Duration) -> Vec { + let threshold = chrono::Utc::now() + within; + sqlx::query_scalar!( + r#"select catalog_name + from live_specs + where controller_next_run is not null and controller_next_run <= $1 + order by catalog_name"#, + threshold, + ) + .fetch_all(&self.pool) + .await + .expect("failed to 
query enqueued controllers") + } + pub async fn assert_live_spec_hard_deleted(&mut self, name: &str) { let rows = sqlx::query!( r#"select @@ -422,9 +534,11 @@ impl TestHarness { ls.catalog_name as "catalog_name!: String", ls.controller_next_run, ls.last_pub_id as "last_pub_id: agent_sql::Id", + ls.last_build_id as "last_build_id: agent_sql::Id", ls.spec as "live_spec: TextJson>", ls.built_spec as "built_spec: TextJson>", ls.spec_type as "spec_type: agent_sql::CatalogType", + ls.dependency_hash as "live_dependency_hash", cj.controller_version as "controller_version: i32", cj.updated_at, cj.logs_token, diff --git a/crates/agent/src/integration_tests/locking_retries.rs b/crates/agent/src/integration_tests/locking_retries.rs index 9bfb46de45..3e561628b6 100644 --- a/crates/agent/src/integration_tests/locking_retries.rs +++ b/crates/agent/src/integration_tests/locking_retries.rs @@ -65,12 +65,12 @@ async fn test_publication_optimistic_locking_failures() { } }); - let pub_a = harness.control_plane().next_pub_id(); - let build_a = harness + let will_fail_pub = harness.control_plane().next_pub_id(); + let will_fail_build = harness .publisher .build( user_id, - pub_a, + will_fail_pub, Some("pub a".to_string()), draft_catalog(initial_catalog.clone()), Uuid::new_v4(), @@ -79,12 +79,12 @@ async fn test_publication_optimistic_locking_failures() { .await .expect("build a failed"); - let pub_b = harness.control_plane().next_pub_id(); - let build_b = harness + let will_commit_pub = harness.control_plane().next_pub_id(); + let will_commit_build = harness .publisher .build( user_id, - pub_b, + will_commit_pub, Some("pub b".to_string()), draft_catalog(initial_catalog.clone()), Uuid::new_v4(), @@ -92,31 +92,33 @@ async fn test_publication_optimistic_locking_failures() { ) .await .expect("build b failed"); + let will_commit_build_id = will_commit_build.build_id; - let result_b = harness + let expect_success_result = harness .publisher - .commit(build_b) + .commit(will_commit_build) 
.await .expect("commit b failed"); - assert!(result_b.status.is_success()); + assert!(expect_success_result.status.is_success()); - let result_a = harness + let expect_fail_result = harness .publisher - .commit(build_a) + .commit(will_fail_build) .await .expect("commit a failed"); assert_lock_failures( &[ - ("mice/cheese", Id::zero(), Some(pub_b)), - ("mice/seeds", Id::zero(), Some(pub_b)), - ("mice/capture", Id::zero(), Some(pub_b)), + ("mice/cheese", Id::zero(), Some(will_commit_build_id)), + ("mice/seeds", Id::zero(), Some(will_commit_build_id)), + ("mice/capture", Id::zero(), Some(will_commit_build_id)), ], - &result_a.status, + &expect_fail_result.status, ); // Now simulate raced publications of cheese and seeds, wich each publication having "expanded" // to include the capture. - let cheese_pub = harness.control_plane().next_pub_id(); + let expect_current_build_id = will_commit_build_id; + let will_fail_pub_id = harness.control_plane().next_pub_id(); let cheese_draft = draft_catalog(serde_json::json!({ "collections": { "mice/cheese": minimal_collection(None), @@ -125,11 +127,11 @@ async fn test_publication_optimistic_locking_failures() { "mice/capture": minimal_capture(None, &["mice/cheese", "mice/seeds"]), } })); - let cheese_build = harness + let will_fail_build = harness .publisher .build( user_id, - cheese_pub, + will_fail_pub_id, Some("cheese pub".to_string()), cheese_draft, Uuid::new_v4(), @@ -137,10 +139,10 @@ async fn test_publication_optimistic_locking_failures() { ) .await .expect("cheese build failed"); - assert!(!cheese_build.has_errors()); + assert!(!will_fail_build.has_errors()); - let seeds_pub = harness.control_plane().next_pub_id(); - let seeds_draft = draft_catalog(serde_json::json!({ + let will_commit_pub = harness.control_plane().next_pub_id(); + let will_commit_draft = draft_catalog(serde_json::json!({ "collections": { "mice/seeds": minimal_collection(None), }, @@ -148,39 +150,95 @@ async fn test_publication_optimistic_locking_failures() 
{ "mice/capture": minimal_capture(None, &["mice/cheese", "mice/seeds"]), } })); - let seeds_build = harness + let will_commit_build = harness .publisher .build( user_id, - seeds_pub, + will_commit_pub, Some("seeds pub".to_string()), - seeds_draft, + will_commit_draft, Uuid::new_v4(), "ops/dp/public/test", ) .await .expect("seeds build failed"); - assert!(!seeds_build.has_errors()); - let seeds_result = harness + assert!(!will_commit_build.has_errors()); + let will_commit_build_id = will_commit_build.build_id; + let expect_success_result = harness .publisher - .commit(seeds_build) + .commit(will_commit_build) .await .expect("failed to commit seeds"); - assert!(seeds_result.status.is_success()); + assert!(expect_success_result.status.is_success()); + assert_last_pub_build( + &mut harness, + "mice/seeds", + will_commit_pub, + will_commit_build_id, + ) + .await; + assert_last_pub_build( + &mut harness, + "mice/capture", + will_commit_pub, + will_commit_build_id, + ) + .await; - let cheese_result = harness + let expect_fail_result = harness .publisher - .commit(cheese_build) + .commit(will_fail_build) .await .expect("failed to commit cheese"); // lol assert_lock_failures( - &[("mice/capture", pub_b, Some(seeds_pub))], - &cheese_result.status, + &[( + "mice/capture", + expect_current_build_id, + Some(will_commit_build_id), + )], + &expect_fail_result.status, + ); +} + +async fn assert_last_pub_build( + harness: &mut TestHarness, + catalog_name: &str, + expect_last_pub: Id, + expect_last_build: Id, +) { + let mut names = std::collections::BTreeSet::new(); + names.insert(catalog_name.to_string()); + let live = harness + .control_plane() + .get_live_specs(names) + .await + .expect("failed to fetch live specs"); + let (last_pub, last_build) = live + .captures + .get(0) + .map(|r| (r.last_pub_id, r.last_build_id)) + .or(live + .collections + .get(0) + .map(|r| (r.last_pub_id, r.last_build_id))) + .or(live + .materializations + .get(0) + .map(|r| (r.last_pub_id, 
r.last_build_id))) + .or(live.tests.get(0).map(|r| (r.last_pub_id, r.last_build_id))) + .expect("no live spec found"); + assert_eq!( + expect_last_pub, last_pub, + "mismatched last_pub_id for {catalog_name}" + ); + assert_eq!( + expect_last_build, last_build, + "mismatched last_build_id for {catalog_name}" ); } fn assert_lock_failures(expected: &[(&'static str, Id, Option)], actual: &JobStatus) { - let JobStatus::ExpectPubIdMismatch { failures } = actual else { + let JobStatus::BuildIdLockFailure { failures } = actual else { panic!("unexpected publication status: {:?}", actual); }; let mut act: Vec = failures.iter().cloned().collect(); @@ -190,8 +248,8 @@ fn assert_lock_failures(expected: &[(&'static str, Id, Option)], actual: &Jo .iter() .map(|(name, expect, last)| LockFailure { catalog_name: name.to_string(), - expect_pub_id: *expect, - last_pub_id: *last, + expected: *expect, + actual: *last, }) .collect(); exp.sort_by(|l, r| l.catalog_name.cmp(&r.catalog_name)); diff --git a/crates/agent/src/integration_tests/mod.rs b/crates/agent/src/integration_tests/mod.rs index e65cbec409..cf84ecd88f 100644 --- a/crates/agent/src/integration_tests/mod.rs +++ b/crates/agent/src/integration_tests/mod.rs @@ -11,81 +11,3 @@ mod schema_evolution; mod source_captures; mod unknown_connectors; mod user_publications; - -// TODO(johnny): Do we need this? It's used in only one integration test. -fn live_to_draft(live: tables::LiveCatalog) -> tables::DraftCatalog { - let tables::LiveCatalog { - captures, - collections, - materializations, - tests, - .. - } = live; - - let captures = captures.into_iter().map( - |tables::LiveCapture { - capture, - last_pub_id, - model, - .. - }| tables::DraftCapture { - scope: tables::synthetic_scope(models::CatalogType::Capture.to_string(), &capture), - capture, - expect_pub_id: Some(last_pub_id), - model: Some(model), - }, - ); - let collections = collections.into_iter().map( - |tables::LiveCollection { - collection, - last_pub_id, - model, - .. 
- }| tables::DraftCollection { - scope: tables::synthetic_scope( - models::CatalogType::Collection.to_string(), - &collection, - ), - collection, - expect_pub_id: Some(last_pub_id), - model: Some(model), - }, - ); - let materializations = materializations.into_iter().map( - |tables::LiveMaterialization { - materialization, - last_pub_id, - model, - .. - }| tables::DraftMaterialization { - scope: tables::synthetic_scope( - models::CatalogType::Materialization.to_string(), - &materialization, - ), - materialization, - expect_pub_id: Some(last_pub_id), - model: Some(model), - }, - ); - let tests = tests.into_iter().map( - |tables::LiveTest { - test, - last_pub_id, - model, - .. - }| tables::DraftTest { - scope: tables::synthetic_scope(models::CatalogType::Test.to_string(), &test), - test, - expect_pub_id: Some(last_pub_id), - model: Some(model), - }, - ); - - tables::DraftCatalog { - captures: captures.collect(), - collections: collections.collect(), - materializations: materializations.collect(), - tests: tests.collect(), - ..Default::default() - } -} diff --git a/crates/agent/src/integration_tests/schema_evolution.rs b/crates/agent/src/integration_tests/schema_evolution.rs index 5957ea8397..008e19a7f5 100644 --- a/crates/agent/src/integration_tests/schema_evolution.rs +++ b/crates/agent/src/integration_tests/schema_evolution.rs @@ -528,7 +528,7 @@ impl FailBuild for UnsatisfiableConstraints { .built .built_materializations .iter_mut() - .filter(|m| !m.is_unchanged()) + .filter(|m| !m.is_passthrough()) .next() else { panic!("no materialization in build"); diff --git a/crates/agent/src/integration_tests/user_publications.rs b/crates/agent/src/integration_tests/user_publications.rs index 9e09af90ae..96824b7c47 100644 --- a/crates/agent/src/integration_tests/user_publications.rs +++ b/crates/agent/src/integration_tests/user_publications.rs @@ -212,6 +212,7 @@ async fn test_user_publications() { collection: noms_collection, expect_pub_id: Some(noms_last_pub_id), model: 
Some(noms_model), + is_touch: false, }); let result = harness diff --git a/crates/agent/src/publications.rs b/crates/agent/src/publications.rs index 68368da11d..06fdb059c4 100644 --- a/crates/agent/src/publications.rs +++ b/crates/agent/src/publications.rs @@ -124,6 +124,7 @@ impl Publisher { pub struct UncommittedBuild { pub(crate) publication_id: models::Id, + pub(crate) build_id: models::Id, pub(crate) user_id: Uuid, pub(crate) detail: Option, pub(crate) started_at: DateTime, @@ -167,6 +168,7 @@ impl UncommittedBuild { output, test_errors, incompatible_collections, + build_id: _, } = self; debug_assert!( incompatible_collections.is_empty(), @@ -227,6 +229,7 @@ impl Publisher { }; return Ok(UncommittedBuild { publication_id, + build_id, user_id, detail, started_at: start_time, @@ -241,6 +244,7 @@ impl Publisher { if !live_catalog.errors.is_empty() { return Ok(UncommittedBuild { publication_id, + build_id, user_id, detail, started_at: start_time, @@ -270,6 +274,7 @@ impl Publisher { }; return Ok(UncommittedBuild { publication_id, + build_id, user_id, detail, started_at: start_time, @@ -347,6 +352,7 @@ impl Publisher { Ok(UncommittedBuild { publication_id, + build_id, user_id, detail, started_at: start_time, @@ -358,6 +364,7 @@ impl Publisher { #[tracing::instrument(err, skip_all, fields( publication_id = %uncommitted.publication_id, + build_id = %uncommitted.build_id, user_id = %uncommitted.user_id, detail = ?uncommitted.detail ))] @@ -398,7 +405,7 @@ impl Publisher { let failures = specs::persist_updates(&mut uncommitted, &mut txn).await?; if !failures.is_empty() { return Ok( - uncommitted.into_result(completed_at, JobStatus::ExpectPubIdMismatch { failures }) + uncommitted.into_result(completed_at, JobStatus::BuildIdLockFailure { failures }) ); } @@ -414,13 +421,13 @@ fn is_empty_draft(build: &UncommittedBuild) -> bool { use tables::BuiltRow; let built = &build.output.built; - built.built_captures.iter().all(BuiltRow::is_unchanged) - && 
built.built_collections.iter().all(BuiltRow::is_unchanged) + built.built_captures.iter().all(BuiltRow::is_passthrough) + && built.built_collections.iter().all(BuiltRow::is_passthrough) && built .built_materializations .iter() - .all(BuiltRow::is_unchanged) - && built.built_tests.iter().all(BuiltRow::is_unchanged) + .all(BuiltRow::is_passthrough) + && built.built_tests.iter().all(BuiltRow::is_passthrough) } pub fn partitions(projections: &BTreeMap) -> Vec { @@ -492,6 +499,7 @@ mod test { fn test_errors_result_in_test_failed_status() { let build = UncommittedBuild { publication_id: models::Id::zero(), + build_id: models::Id::zero(), user_id: Uuid::new_v4(), detail: None, started_at: Utc::now(), diff --git a/crates/agent/src/publications/handler.rs b/crates/agent/src/publications/handler.rs index 109e29e6f8..297fe5573c 100644 --- a/crates/agent/src/publications/handler.rs +++ b/crates/agent/src/publications/handler.rs @@ -79,6 +79,7 @@ impl Handler for Publisher { impl Publisher { pub async fn process(&mut self, row: Row) -> anyhow::Result { info!( + %row.pub_id, %row.created_at, %row.draft_id, %row.dry_run, @@ -94,10 +95,6 @@ impl Publisher { loop { attempt += 1; let draft = specs::load_draft(row.draft_id.into(), &self.db).await?; - // let all_drafted_specs = draft - // .all_spec_names() - // .map(|n| n.to_string()) - // .collect::>(); tracing::debug!( %attempt, n_drafted = draft.all_spec_names().count(), @@ -150,8 +147,9 @@ impl Publisher { } } - let JobStatus::ExpectPubIdMismatch { failures } = &result.status else { - return Ok(result); + // Has there been an optimistic locking failure? + let JobStatus::BuildIdLockFailure { failures } = &result.status else { + return Ok(result); // All other statuses are terminal. 
}; if attempt == Publisher::MAX_OPTIMISTIC_LOCKING_RETRIES { tracing::error!(%attempt, ?failures, "giving up after maximum number of optimistic locking retries"); @@ -203,12 +201,16 @@ impl Publisher { anyhow::bail!("missing spec for expanded row: {:?}", exp.catalog_name); }; let scope = tables::synthetic_scope(spec_type, &exp.catalog_name); + // TODO(phil): currently we set `is_touch: false`, which is consistent with the prior + // behavior when publishing. It seems to make more sense to only "touch" expanded + // specs, but I'm holding off until a subsequent commit. if let Err(e) = draft.add_spec( spec_type, &exp.catalog_name, scope, Some(exp.last_pub_id.into()), Some(&model_json), + false, // !is_touch ) { draft.errors.push(e); } diff --git a/crates/agent/src/publications/specs.rs b/crates/agent/src/publications/specs.rs index 0a8b2c31fd..85545018f0 100644 --- a/crates/agent/src/publications/specs.rs +++ b/crates/agent/src/publications/specs.rs @@ -14,13 +14,14 @@ pub async fn persist_updates( ) -> anyhow::Result> { let UncommittedBuild { ref publication_id, + ref build_id, output, ref user_id, ref detail, .. 
} = uncommitted; - let lock_failures = update_live_specs(*publication_id, output, txn).await?; + let lock_failures = update_live_specs(*publication_id, *build_id, output, txn).await?; if !lock_failures.is_empty() { return Ok(lock_failures); } @@ -84,7 +85,7 @@ async fn update_drafted_live_spec_flows( .built .built_captures .iter() - .filter(|r| !r.is_unchanged()) + .filter(|r| !r.is_passthrough()) { update_live_spec_flows(&r.catalog_name(), agent_sql::CatalogType::Capture, r, txn) .await @@ -94,7 +95,7 @@ async fn update_drafted_live_spec_flows( .built .built_collections .iter() - .filter(|r| !r.is_unchanged()) + .filter(|r| !r.is_passthrough()) { update_live_spec_flows( &r.catalog_name(), @@ -109,7 +110,7 @@ async fn update_drafted_live_spec_flows( .built .built_materializations .iter() - .filter(|r| !r.is_unchanged()) + .filter(|r| !r.is_passthrough()) { update_live_spec_flows( &r.catalog_name(), @@ -120,7 +121,12 @@ async fn update_drafted_live_spec_flows( .await .with_context(|| format!("updating live_spec_flows for '{}'", r.catalog_name()))?; } - for r in build.built.built_tests.iter().filter(|r| !r.is_unchanged()) { + for r in build + .built + .built_tests + .iter() + .filter(|r| !r.is_passthrough()) + { update_live_spec_flows(&r.catalog_name(), agent_sql::CatalogType::Test, r, txn) .await .with_context(|| format!("updating live_spec_flows for '{}'", r.catalog_name()))?; @@ -130,6 +136,7 @@ async fn update_drafted_live_spec_flows( async fn update_live_specs( pub_id: Id, + build_id: Id, output: &mut build::Output, txn: &mut sqlx::Transaction<'_, sqlx::Postgres>, ) -> anyhow::Result> { @@ -139,44 +146,47 @@ async fn update_live_specs( let mut spec_types: Vec = Vec::with_capacity(n_specs); let mut models = Vec::with_capacity(n_specs); let mut built_specs = Vec::with_capacity(n_specs); - let mut expect_pub_ids: Vec = Vec::with_capacity(n_specs); + let mut expect_build_ids: Vec = Vec::with_capacity(n_specs); let mut reads_froms = Vec::with_capacity(n_specs); let 
mut writes_tos = Vec::with_capacity(n_specs); let mut images = Vec::with_capacity(n_specs); let mut image_tags = Vec::with_capacity(n_specs); let mut data_plane_ids = Vec::with_capacity(n_specs); + let mut is_touches = Vec::with_capacity(n_specs); + let mut dependency_hashes = Vec::with_capacity(n_specs); for r in output .built .built_captures .iter_mut() - .filter(|r| !r.is_unchanged()) + .filter(|r| !r.is_passthrough()) { catalog_names.push(r.catalog_name().to_string()); spec_types.push(agent_sql::CatalogType::Capture); models.push(to_raw_value(r.model(), agent_sql::TextJson)?); built_specs.push(to_raw_value(r.spec(), agent_sql::TextJson)?); - expect_pub_ids.push(r.expect_pub_id().into()); + expect_build_ids.push(r.expect_build_id().into()); reads_froms.push(None); writes_tos.push(get_dependencies(r.model(), ModelDef::writes_to)); let (image_name, image_tag) = image_and_tag(r.model()); images.push(image_name); image_tags.push(image_tag); data_plane_ids.push(r.data_plane_id.into()); - + is_touches.push(r.is_touch()); control_ids.insert(&r.capture, &mut r.control_id); + dependency_hashes.push(r.dependency_hash.as_deref()); } for r in output .built .built_collections .iter_mut() - .filter(|r| !r.is_unchanged()) + .filter(|r| !r.is_passthrough()) { catalog_names.push(r.catalog_name().to_string()); spec_types.push(agent_sql::CatalogType::Collection); models.push(to_raw_value(r.model(), agent_sql::TextJson)?); built_specs.push(to_raw_value(r.spec(), agent_sql::TextJson)?); - expect_pub_ids.push(r.expect_pub_id().into()); + expect_build_ids.push(r.expect_build_id().into()); reads_froms.push(get_dependencies( // reads_from should be null for regular collections r.model().filter(|m| m.derive.is_some()), @@ -187,62 +197,68 @@ async fn update_live_specs( images.push(image_name); image_tags.push(image_tag); data_plane_ids.push(r.data_plane_id.into()); - + is_touches.push(r.is_touch()); control_ids.insert(&r.collection, &mut r.control_id); + 
dependency_hashes.push(r.dependency_hash.as_deref()); } for r in output .built .built_materializations .iter_mut() - .filter(|r| !r.is_unchanged()) + .filter(|r| !r.is_passthrough()) { catalog_names.push(r.catalog_name().to_string()); spec_types.push(agent_sql::CatalogType::Materialization); models.push(to_raw_value(r.model(), agent_sql::TextJson)?); built_specs.push(to_raw_value(r.spec(), agent_sql::TextJson)?); - expect_pub_ids.push(r.expect_pub_id().into()); + expect_build_ids.push(r.expect_build_id().into()); reads_froms.push(get_dependencies(r.model(), ModelDef::reads_from)); writes_tos.push(None); let (image_name, image_tag) = image_and_tag(r.model()); images.push(image_name); image_tags.push(image_tag); data_plane_ids.push(r.data_plane_id.into()); - + is_touches.push(r.is_touch()); control_ids.insert(&r.materialization, &mut r.control_id); + dependency_hashes.push(r.dependency_hash.as_deref()); } for r in output .built .built_tests .iter_mut() - .filter(|r| !r.is_unchanged()) + .filter(|r| !r.is_passthrough()) { catalog_names.push(r.catalog_name().to_string()); spec_types.push(agent_sql::CatalogType::Test); models.push(to_raw_value(r.model(), agent_sql::TextJson)?); built_specs.push(to_raw_value(r.spec(), agent_sql::TextJson)?); - expect_pub_ids.push(r.expect_pub_id().into()); + expect_build_ids.push(r.expect_build_id().into()); reads_froms.push(get_dependencies(r.model(), ModelDef::reads_from)); writes_tos.push(get_dependencies(r.model(), ModelDef::writes_to)); let (image_name, image_tag) = image_and_tag(r.model()); images.push(image_name); image_tags.push(image_tag); data_plane_ids.push(models::Id::zero().into()); - + is_touches.push(r.is_touch()); control_ids.insert(&r.test, &mut r.control_id); + dependency_hashes.push(r.dependency_hash.as_deref()); } let updates = agent_sql::publications::update_live_specs( pub_id.into(), + build_id.into(), &catalog_names, &spec_types, &models, &built_specs, - &expect_pub_ids, + &expect_build_ids, &reads_froms, 
&writes_tos, &images, &image_tags, &data_plane_ids, + &is_touches, + &dependency_hashes, txn, ) .await?; @@ -252,8 +268,8 @@ async fn update_live_specs( for update in updates { let LiveSpecUpdate { catalog_name, - expect_pub_id, - last_pub_id, + expect_build_id, + last_build_id, live_spec_id, } = update; @@ -272,11 +288,11 @@ async fn update_live_specs( ); } - if last_pub_id != expect_pub_id { + if last_build_id != expect_build_id { lock_failures.push(LockFailure { - catalog_name: catalog_name, - last_pub_id: Some(last_pub_id.into()).filter(|id: &models::Id| !id.is_zero()), - expect_pub_id: expect_pub_id.into(), + catalog_name, + actual: Some(last_build_id.into()).filter(|id: &models::Id| !id.is_zero()), + expected: expect_build_id.into(), }) } } @@ -376,7 +392,11 @@ async fn insert_publication_specs( built: &tables::Validations, txn: &mut sqlx::Transaction<'_, sqlx::Postgres>, ) -> anyhow::Result<()> { - for r in built.built_captures.iter().filter(|r| !r.is_unchanged()) { + for r in built + .built_captures + .iter() + .filter(|r| !r.is_passthrough() && !r.is_touch()) + { let spec = to_raw_value(r.model(), agent_sql::TextJson)?; agent_sql::publications::insert_publication_spec( r.control_id().into(), @@ -390,7 +410,11 @@ async fn insert_publication_specs( .await .with_context(|| format!("inserting spec for '{}'", r.catalog_name()))?; } - for r in built.built_collections.iter().filter(|r| !r.is_unchanged()) { + for r in built + .built_collections + .iter() + .filter(|r| !r.is_passthrough() && !r.is_touch()) + { let spec = to_raw_value(r.model(), agent_sql::TextJson)?; agent_sql::publications::insert_publication_spec( r.control_id().into(), @@ -407,7 +431,7 @@ async fn insert_publication_specs( for r in built .built_materializations .iter() - .filter(|r| !r.is_unchanged()) + .filter(|r| !r.is_passthrough() && !r.is_touch()) { let spec = to_raw_value(r.model(), agent_sql::TextJson)?; agent_sql::publications::insert_publication_spec( @@ -422,7 +446,11 @@ async fn 
insert_publication_specs( .await .with_context(|| format!("inserting spec for '{}'", r.catalog_name()))?; } - for r in built.built_tests.iter().filter(|r| !r.is_unchanged()) { + for r in built + .built_tests + .iter() + .filter(|r| !r.is_passthrough() && !r.is_touch()) + { let spec = to_raw_value(r.model(), agent_sql::TextJson)?; agent_sql::publications::insert_publication_spec( r.control_id().into(), @@ -455,14 +483,14 @@ async fn verify_unchanged_revisions( .built .built_captures .iter() - .filter(|r| r.is_unchanged()) + .filter(|r| r.is_passthrough()) .map(|r| (r.catalog_name().as_str(), r.expect_pub_id())) .chain( output .built .built_collections .iter() - .filter(|r| r.is_unchanged()) + .filter(|r| r.is_passthrough()) .map(|r| (r.catalog_name().as_str(), r.expect_pub_id())), ) .chain( @@ -470,7 +498,7 @@ async fn verify_unchanged_revisions( .built .built_materializations .iter() - .filter(|r| r.is_unchanged()) + .filter(|r| r.is_passthrough()) .map(|r| (r.catalog_name().as_str(), r.expect_pub_id())), ) .chain( @@ -478,7 +506,7 @@ async fn verify_unchanged_revisions( .built .built_tests .iter() - .filter(|r| r.is_unchanged()) + .filter(|r| r.is_passthrough()) .map(|r| (r.catalog_name().as_str(), r.expect_pub_id())), ) .collect(); @@ -495,8 +523,8 @@ async fn verify_unchanged_revisions( if expect_pub_id != last_pub_id.into() { errors.push(LockFailure { catalog_name, - last_pub_id: Some(last_pub_id.into()), - expect_pub_id, + actual: Some(last_pub_id.into()), + expected: expect_pub_id, }); } } @@ -506,8 +534,8 @@ async fn verify_unchanged_revisions( if !expect_pub_id.is_zero() {} errors.push(LockFailure { catalog_name: catalog_name.to_string(), - last_pub_id: None, - expect_pub_id, + actual: None, + expected: expect_pub_id, }); } Ok(errors) @@ -703,6 +731,7 @@ pub async fn resolve_live_specs( spec_row.id.into(), spec_row.data_plane_id.into(), spec_row.last_pub_id.into(), + spec_row.last_build_id.into(), &model, &spec_row .built_spec @@ -710,6 +739,7 @@ pub async 
fn resolve_live_specs( .ok_or_else(|| { anyhow::anyhow!("row has non-null spec, but null built_spec: catalog_name: {:?}, live_spec_id: {}", &spec_row.catalog_name, spec_row.id) })?, + spec_row.dependency_hash, ) .with_context(|| format!("adding live spec for {:?}", spec_row.catalog_name))?; } @@ -867,6 +897,7 @@ pub async fn load_draft( scope, row.expect_pub_id.map(Into::into), row.spec.as_deref().map(|j| &**j), + false, // !is_touch ) { draft.errors.push(err); } diff --git a/crates/agent/src/publications/status.rs b/crates/agent/src/publications/status.rs index e2f9e71561..60d717d55b 100644 --- a/crates/agent/src/publications/status.rs +++ b/crates/agent/src/publications/status.rs @@ -34,6 +34,12 @@ pub enum JobStatus { #[serde(default, skip_serializing_if = "Vec::is_empty")] failures: Vec, }, + /// Optimistic locking failure for one or more specs in the publication. This case should + /// typically be retried by the publisher. + BuildIdLockFailure { + #[serde(default, skip_serializing_if = "Vec::is_empty")] + failures: Vec, + }, } impl JobStatus { @@ -67,8 +73,8 @@ impl JobStatus { #[derive(Debug, Serialize, Deserialize, PartialEq, Clone, JsonSchema)] pub struct LockFailure { pub catalog_name: String, - pub expect_pub_id: models::Id, - pub last_pub_id: Option, + pub expected: models::Id, + pub actual: Option, } /// Reasons why a draft collection spec would need to be published under a new name. 
diff --git a/crates/flowctl/src/local_specs.rs b/crates/flowctl/src/local_specs.rs index 4d1ddc4b93..df0e4cadc8 100644 --- a/crates/flowctl/src/local_specs.rs +++ b/crates/flowctl/src/local_specs.rs @@ -245,6 +245,8 @@ impl Resolver { model: models::RawValue, built_spec: models::RawValue, last_pub_id: models::Id, + last_build_id: models::Id, + dependency_hash: Option, } let rows = catalog_names @@ -255,7 +257,7 @@ impl Resolver { let builder = self .client .from("live_specs_ext") - .select("id,catalog_name,spec_type,spec,built_spec,last_pub_id") + .select("id,catalog_name,spec_type,spec,built_spec,last_pub_id,last_build_id") .not("is", "spec_type", "null") .in_("catalog_name", names); @@ -272,6 +274,8 @@ impl Resolver { model, built_spec, last_pub_id, + last_build_id, + dependency_hash, } in rows.into_iter().flat_map(|i| i.into_iter()) { match spec_type { @@ -280,31 +284,39 @@ impl Resolver { id, models::Id::zero(), last_pub_id, + last_build_id, serde_json::from_str::(model.get())?, serde_json::from_str::(built_spec.get())?, + dependency_hash, ), CatalogType::Collection => live.collections.insert_row( models::Collection::new(catalog_name), id, models::Id::zero(), last_pub_id, + last_build_id, serde_json::from_str::(model.get())?, serde_json::from_str::(built_spec.get())?, + dependency_hash, ), CatalogType::Materialization => live.materializations.insert_row( models::Materialization::new(catalog_name), id, models::Id::zero(), last_pub_id, + last_build_id, serde_json::from_str::(model.get())?, serde_json::from_str::(built_spec.get())?, + dependency_hash, ), CatalogType::Test => live.tests.insert_row( models::Test::new(catalog_name), id, last_pub_id, + last_build_id, serde_json::from_str::(model.get())?, serde_json::from_str::(built_spec.get())?, + dependency_hash, ), } } diff --git a/supabase/migrations/62_live_specs_dependencies.sql b/supabase/migrations/62_live_specs_dependencies.sql new file mode 100644 index 0000000000..649d1a5e0b --- /dev/null +++ 
b/supabase/migrations/62_live_specs_dependencies.sql @@ -0,0 +1,11 @@ +begin; + +alter table live_specs add column dependency_hash text; + +comment on column live_specs.dependency_hash is +'A hash of all the dependencies which were used to build this spec. +Any change to the _model_ of a dependency will change this hash. +Changes to the built spec of a dependency without an accompanying +model change will not change the hash.'; + +commit;