diff --git a/core/src/worker/workflow/machines/patch_state_machine.rs b/core/src/worker/workflow/machines/patch_state_machine.rs index aa72ec187..ba6650fbd 100644 --- a/core/src/worker/workflow/machines/patch_state_machine.rs +++ b/core/src/worker/workflow/machines/patch_state_machine.rs @@ -270,6 +270,13 @@ impl TryFrom for PatchMachineEvents { } } +impl PatchMachine { + /// Returns true if this patch machine has the same id as the one provided + pub(crate) fn matches_patch(&self, id: &str) -> bool { + self.shared_state.patch_id == id + } +} + #[cfg(test)] mod tests { use crate::{ diff --git a/core/src/worker/workflow/machines/workflow_machines.rs b/core/src/worker/workflow/machines/workflow_machines.rs index b49f49500..39a6da2c7 100644 --- a/core/src/worker/workflow/machines/workflow_machines.rs +++ b/core/src/worker/workflow/machines/workflow_machines.rs @@ -138,7 +138,7 @@ pub(crate) struct WorkflowMachines { current_wf_task_commands: VecDeque, /// Information about patch markers we have already seen while replaying history - encountered_change_markers: HashMap, + encountered_patch_markers: HashMap, /// Contains extra local-activity related data local_activity_data: LocalActivityData, @@ -255,7 +255,7 @@ impl WorkflowMachines { id_to_machine: Default::default(), commands: Default::default(), current_wf_task_commands: Default::default(), - encountered_change_markers: Default::default(), + encountered_patch_markers: Default::default(), local_activity_data: LocalActivityData::default(), have_seen_terminal_event: false, } @@ -547,7 +547,7 @@ impl WorkflowMachines { .peek_next_wft_sequence(last_handled_wft_started_id) { if let Some((patch_id, _)) = e.get_patch_marker_details() { - self.encountered_change_markers.insert( + self.encountered_patch_markers.insert( patch_id.clone(), ChangeInfo { created_command: false, @@ -718,7 +718,7 @@ impl WorkflowMachines { let consumed_cmd = loop { if let Some(peek_machine) = self.commands.front() { let mach = self.machine(peek_machine.machine); - match change_marker_handling(event, mach, next_event)? { + match patch_marker_handling(event, mach, next_event)? { EventHandlingOutcome::SkipCommand => { self.commands.pop_front(); continue; @@ -1138,7 +1138,7 @@ impl WorkflowMachines { WFCommand::SetPatchMarker(attrs) => { // Do not create commands for change IDs that we have already created commands // for. - let encountered_entry = self.encountered_change_markers.get(&attrs.patch_id); + let encountered_entry = self.encountered_patch_markers.get(&attrs.patch_id); if !matches!(encountered_entry, Some(ChangeInfo {created_command}) if *created_command) { @@ -1147,17 +1147,17 @@ impl WorkflowMachines { self.replaying, attrs.deprecated, encountered_entry.is_some(), - self.encountered_change_markers.keys().map(|s| s.as_str()), + self.encountered_patch_markers.keys().map(|s| s.as_str()), self.observed_internal_flags.clone(), )?; let mkey = self.add_cmd_to_wf_task(patch_machine, CommandIdKind::NeverResolves); self.process_machine_responses(mkey, other_cmds)?; - if let Some(ci) = self.encountered_change_markers.get_mut(&attrs.patch_id) { + if let Some(ci) = self.encountered_patch_markers.get_mut(&attrs.patch_id) { ci.created_command = true; } else { - self.encountered_change_markers.insert( + self.encountered_patch_markers.insert( attrs.patch_id, ChangeInfo { created_command: true, @@ -1360,45 +1360,62 @@ enum EventHandlingOutcome { /// Special handling for patch markers, when handling command events as in /// [WorkflowMachines::handle_command_event] -fn change_marker_handling( +fn patch_marker_handling( event: &HistoryEvent, mach: &Machines, next_event: Option<&HistoryEvent>, ) -> Result { - if !mach.matches_event(event) { - // Version markers can be skipped in the event they are deprecated - if let Some((patch_name, deprecated)) = event.get_patch_marker_details() { + let patch_machine = match mach { + Machines::PatchMachine(pm) => Some(pm), + _ => None, + }; + let patch_details = event.get_patch_marker_details(); + fn skip_one_or_two_events(next_event: Option<&HistoryEvent>) -> Result { + // Also ignore the subsequent upsert event if present + let mut skip_next_event = false; + if let Some(Attributes::UpsertWorkflowSearchAttributesEventAttributes(atts)) = + next_event.and_then(|ne| ne.attributes.as_ref()) + { + if let Some(ref sa) = atts.search_attributes { + skip_next_event = sa.indexed_fields.contains_key(VERSION_SEARCH_ATTR_KEY); + } + } + + Ok(EventHandlingOutcome::SkipEvent { skip_next_event }) + } + + if let Some((patch_name, deprecated)) = patch_details { + if let Some(pm) = patch_machine { + // If the next machine *is* a patch machine, but this marker is deprecated, it may + // either apply to this machine (the `deprecate_patch` call is still in workflow code) - + // or it could be another `patched` or `deprecate_patch` call for a *different* patch, + // which we should also permit. In the latter case, we should skip this event. + if !pm.matches_patch(&patch_name) && deprecated { + skip_one_or_two_events(next_event) + } else { + Ok(EventHandlingOutcome::Normal) + } + } else { + // Version markers can be skipped in the event they are deprecated // Is deprecated. We can simply ignore this event, as deprecated change // markers are allowed without matching changed calls. if deprecated { - debug!("Deprecated patch marker tried against wrong machine, skipping."); - - // Also ignore the subsequent upsert event if present - let mut skip_next_event = false; - if let Some(Attributes::UpsertWorkflowSearchAttributesEventAttributes(atts)) = - next_event.and_then(|ne| ne.attributes.as_ref()) - { - if let Some(ref sa) = atts.search_attributes { - skip_next_event = sa.indexed_fields.contains_key(VERSION_SEARCH_ATTR_KEY); - } - } - - return Ok(EventHandlingOutcome::SkipEvent { skip_next_event }); + debug!("Deprecated patch marker tried against non-patch machine, skipping."); + skip_one_or_two_events(next_event) + } else { + Err(WFMachinesError::Nondeterminism(format!( + "Non-deprecated patch marker encountered for change {patch_name}, but there is \ + no corresponding change command!" + ))) } - return Err(WFMachinesError::Nondeterminism(format!( - "Non-deprecated patch marker encountered for change {patch_name}, \ - but there is no corresponding change command!" - ))); - } - // Patch machines themselves may also not *have* matching markers, where non-deprecated - // calls take the old path, and deprecated calls assume history is produced by a new-code - // worker. - if matches!(mach, Machines::PatchMachine(_)) { - debug!("Skipping non-matching event against patch machine"); - return Ok(EventHandlingOutcome::SkipCommand); } + } else if patch_machine.is_some() { + debug!("Skipping non-matching event against patch machine"); + Ok(EventHandlingOutcome::SkipCommand) + } else { + // Not a patch machine or a patch event + Ok(EventHandlingOutcome::Normal) } - Ok(EventHandlingOutcome::Normal) } #[derive(derive_more::From)] diff --git a/sdk/src/workflow_context.rs b/sdk/src/workflow_context.rs index 640b06b89..896da218c 100644 --- a/sdk/src/workflow_context.rs +++ b/sdk/src/workflow_context.rs @@ -249,8 +249,8 @@ impl WfContext { /// Record that this workflow history was created with the provided patch, and it is being /// phased out. - pub fn deprecate_patch(&self, patch_id: &str) { - self.patch_impl(patch_id, true); + pub fn deprecate_patch(&self, patch_id: &str) -> bool { + self.patch_impl(patch_id, true) } fn patch_impl(&self, patch_id: &str, deprecated: bool) -> bool { diff --git a/tests/integ_tests/workflow_tests/patches.rs b/tests/integ_tests/workflow_tests/patches.rs index bf90eac15..12482e1a2 100644 --- a/tests/integ_tests/workflow_tests/patches.rs +++ b/tests/integ_tests/workflow_tests/patches.rs @@ -1,5 +1,8 @@ use std::{ - sync::atomic::{AtomicBool, Ordering}, + sync::{ + atomic::{AtomicBool, Ordering}, + Arc, + }, time::Duration, }; @@ -117,3 +120,34 @@ async fn patched_on_second_workflow_task_is_deterministic() { starter.start_with_worker(wf_name, &mut worker).await; worker.run_until_done().await.unwrap(); } + +#[tokio::test] +async fn can_remove_deprecated_patch_near_other_patch() { + let wf_name = "can_add_change_markers"; + let mut starter = CoreWfStarter::new(wf_name); + starter.no_remote_activities(); + let mut worker = starter.worker().await; + let did_die = Arc::new(AtomicBool::new(false)); + worker.register_wf(wf_name.to_owned(), move |ctx: WfContext| { + let did_die = did_die.clone(); + async move { + ctx.timer(Duration::from_millis(200)).await; + if !did_die.load(Ordering::Acquire) { + assert!(ctx.deprecate_patch("getting-deprecated")); + assert!(ctx.patched("staying")); + } else { + assert!(ctx.patched("staying")); + } + ctx.timer(Duration::from_millis(200)).await; + + if !did_die.load(Ordering::Acquire) { + did_die.store(true, Ordering::Release); + ctx.force_task_fail(anyhow::anyhow!("i'm ded")); + } + Ok(().into()) + } + }); + + starter.start_with_worker(wf_name, &mut worker).await; + worker.run_until_done().await.unwrap(); +}