Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions core/src/worker/workflow/machines/patch_state_machine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,13 @@ impl TryFrom<HistEventData> for PatchMachineEvents {
}
}

impl PatchMachine {
/// Returns true if this patch machine has the same id as the one provided
pub(crate) fn matches_patch(&self, id: &str) -> bool {
self.shared_state.patch_id == id
}
}

#[cfg(test)]
mod tests {
use crate::{
Expand Down
91 changes: 54 additions & 37 deletions core/src/worker/workflow/machines/workflow_machines.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ pub(crate) struct WorkflowMachines {
current_wf_task_commands: VecDeque<CommandAndMachine>,

/// Information about patch markers we have already seen while replaying history
encountered_change_markers: HashMap<String, ChangeInfo>,
encountered_patch_markers: HashMap<String, ChangeInfo>,

/// Contains extra local-activity related data
local_activity_data: LocalActivityData,
Expand Down Expand Up @@ -255,7 +255,7 @@ impl WorkflowMachines {
id_to_machine: Default::default(),
commands: Default::default(),
current_wf_task_commands: Default::default(),
encountered_change_markers: Default::default(),
encountered_patch_markers: Default::default(),
local_activity_data: LocalActivityData::default(),
have_seen_terminal_event: false,
}
Expand Down Expand Up @@ -547,7 +547,7 @@ impl WorkflowMachines {
.peek_next_wft_sequence(last_handled_wft_started_id)
{
if let Some((patch_id, _)) = e.get_patch_marker_details() {
self.encountered_change_markers.insert(
self.encountered_patch_markers.insert(
patch_id.clone(),
ChangeInfo {
created_command: false,
Expand Down Expand Up @@ -718,7 +718,7 @@ impl WorkflowMachines {
let consumed_cmd = loop {
if let Some(peek_machine) = self.commands.front() {
let mach = self.machine(peek_machine.machine);
match change_marker_handling(event, mach, next_event)? {
match patch_marker_handling(event, mach, next_event)? {
EventHandlingOutcome::SkipCommand => {
self.commands.pop_front();
continue;
Expand Down Expand Up @@ -1138,7 +1138,7 @@ impl WorkflowMachines {
WFCommand::SetPatchMarker(attrs) => {
// Do not create commands for change IDs that we have already created commands
// for.
let encountered_entry = self.encountered_change_markers.get(&attrs.patch_id);
let encountered_entry = self.encountered_patch_markers.get(&attrs.patch_id);
if !matches!(encountered_entry,
Some(ChangeInfo {created_command}) if *created_command)
{
Expand All @@ -1147,17 +1147,17 @@ impl WorkflowMachines {
self.replaying,
attrs.deprecated,
encountered_entry.is_some(),
self.encountered_change_markers.keys().map(|s| s.as_str()),
self.encountered_patch_markers.keys().map(|s| s.as_str()),
self.observed_internal_flags.clone(),
)?;
let mkey =
self.add_cmd_to_wf_task(patch_machine, CommandIdKind::NeverResolves);
self.process_machine_responses(mkey, other_cmds)?;

if let Some(ci) = self.encountered_change_markers.get_mut(&attrs.patch_id) {
if let Some(ci) = self.encountered_patch_markers.get_mut(&attrs.patch_id) {
ci.created_command = true;
} else {
self.encountered_change_markers.insert(
self.encountered_patch_markers.insert(
attrs.patch_id,
ChangeInfo {
created_command: true,
Expand Down Expand Up @@ -1360,45 +1360,62 @@ enum EventHandlingOutcome {

/// Special handling for patch markers, when handling command events as in
/// [WorkflowMachines::handle_command_event]
fn change_marker_handling(
fn patch_marker_handling(
event: &HistoryEvent,
mach: &Machines,
next_event: Option<&HistoryEvent>,
) -> Result<EventHandlingOutcome> {
if !mach.matches_event(event) {
// Version markers can be skipped in the event they are deprecated
if let Some((patch_name, deprecated)) = event.get_patch_marker_details() {
let patch_machine = match mach {
Machines::PatchMachine(pm) => Some(pm),
_ => None,
};
let patch_details = event.get_patch_marker_details();
fn skip_one_or_two_events(next_event: Option<&HistoryEvent>) -> Result<EventHandlingOutcome> {
// Also ignore the subsequent upsert event if present
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not introduced in this PR, but...
Are there cases where upsert search attributes follows a patch but the patch machine is not the one that issued that command?

I think it's only possible if a patch was emitted before we started emitting SAs on patches but still worth supporting for correctness.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's probably technically possible, but, this check looks to see for the specific attribute key for versioning. So this could only happen if the user explicitly did that themselves somehow, which: A) would be super unlikely and B) means I have no way to differentiate anyways so nothing can be done.

let mut skip_next_event = false;
if let Some(Attributes::UpsertWorkflowSearchAttributesEventAttributes(atts)) =
next_event.and_then(|ne| ne.attributes.as_ref())
{
if let Some(ref sa) = atts.search_attributes {
skip_next_event = sa.indexed_fields.contains_key(VERSION_SEARCH_ATTR_KEY);
}
}

Ok(EventHandlingOutcome::SkipEvent { skip_next_event })
}

if let Some((patch_name, deprecated)) = patch_details {
if let Some(pm) = patch_machine {
// If the next machine *is* a patch machine, but this marker is deprecated, it may
// either apply to this machine (the `deprecate_patch` call is still in workflow code) -
// or it could be another `patched` or `deprecate_patch` call for a *different* patch,
// which we should also permit. In the latter case, we should skip this event.
if !pm.matches_patch(&patch_name) && deprecated {
skip_one_or_two_events(next_event)
} else {
Ok(EventHandlingOutcome::Normal)
}
} else {
// Version markers can be skipped in the event they are deprecated
// Is deprecated. We can simply ignore this event, as deprecated change
// markers are allowed without matching changed calls.
if deprecated {
debug!("Deprecated patch marker tried against wrong machine, skipping.");

// Also ignore the subsequent upsert event if present
let mut skip_next_event = false;
if let Some(Attributes::UpsertWorkflowSearchAttributesEventAttributes(atts)) =
next_event.and_then(|ne| ne.attributes.as_ref())
{
if let Some(ref sa) = atts.search_attributes {
skip_next_event = sa.indexed_fields.contains_key(VERSION_SEARCH_ATTR_KEY);
}
}

return Ok(EventHandlingOutcome::SkipEvent { skip_next_event });
debug!("Deprecated patch marker tried against non-patch machine, skipping.");
skip_one_or_two_events(next_event)
} else {
Err(WFMachinesError::Nondeterminism(format!(
"Non-deprecated patch marker encountered for change {patch_name}, but there is \
no corresponding change command!"
)))
}
return Err(WFMachinesError::Nondeterminism(format!(
"Non-deprecated patch marker encountered for change {patch_name}, \
but there is no corresponding change command!"
)));
}
// Patch machines themselves may also not *have* matching markers, where non-deprecated
// calls take the old path, and deprecated calls assume history is produced by a new-code
// worker.
if matches!(mach, Machines::PatchMachine(_)) {
debug!("Skipping non-matching event against patch machine");
return Ok(EventHandlingOutcome::SkipCommand);
}
} else if patch_machine.is_some() {
debug!("Skipping non-matching event against patch machine");
Ok(EventHandlingOutcome::SkipCommand)
} else {
// Not a patch machine or a patch event
Ok(EventHandlingOutcome::Normal)
}
Ok(EventHandlingOutcome::Normal)
}

#[derive(derive_more::From)]
Expand Down
4 changes: 2 additions & 2 deletions sdk/src/workflow_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -249,8 +249,8 @@ impl WfContext {

/// Record that this workflow history was created with the provided patch, and it is being
/// phased out.
pub fn deprecate_patch(&self, patch_id: &str) {
self.patch_impl(patch_id, true);
pub fn deprecate_patch(&self, patch_id: &str) -> bool {
self.patch_impl(patch_id, true)
}

fn patch_impl(&self, patch_id: &str, deprecated: bool) -> bool {
Expand Down
36 changes: 35 additions & 1 deletion tests/integ_tests/workflow_tests/patches.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
use std::{
sync::atomic::{AtomicBool, Ordering},
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
time::Duration,
};

Expand Down Expand Up @@ -117,3 +120,34 @@ async fn patched_on_second_workflow_task_is_deterministic() {
starter.start_with_worker(wf_name, &mut worker).await;
worker.run_until_done().await.unwrap();
}

#[tokio::test]
async fn can_remove_deprecated_patch_near_other_patch() {
let wf_name = "can_add_change_markers";
let mut starter = CoreWfStarter::new(wf_name);
starter.no_remote_activities();
let mut worker = starter.worker().await;
let did_die = Arc::new(AtomicBool::new(false));
worker.register_wf(wf_name.to_owned(), move |ctx: WfContext| {
let did_die = did_die.clone();
async move {
ctx.timer(Duration::from_millis(200)).await;
if !did_die.load(Ordering::Acquire) {
assert!(ctx.deprecate_patch("getting-deprecated"));
assert!(ctx.patched("staying"));
} else {
assert!(ctx.patched("staying"));
}
ctx.timer(Duration::from_millis(200)).await;

if !did_die.load(Ordering::Acquire) {
did_die.store(true, Ordering::Release);
ctx.force_task_fail(anyhow::anyhow!("i'm ded"));
}
Ok(().into())
}
});

starter.start_with_worker(wf_name, &mut worker).await;
worker.run_until_done().await.unwrap();
}