Skip to content

Commit 87af320

Browse files
Sushisourcemjameswh
authored andcommitted
Fix removal of deprecated patch call adjacent to other patch (#587)
(cherry picked from commit 8411990)
1 parent 7f9755b commit 87af320

File tree

4 files changed

+98
-40
lines changed

4 files changed

+98
-40
lines changed

core/src/worker/workflow/machines/patch_state_machine.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -270,6 +270,13 @@ impl TryFrom<HistEventData> for PatchMachineEvents {
270270
}
271271
}
272272

273+
impl PatchMachine {
274+
/// Returns true if this patch machine has the same id as the one provided
275+
pub(crate) fn matches_patch(&self, id: &str) -> bool {
276+
self.shared_state.patch_id == id
277+
}
278+
}
279+
273280
#[cfg(test)]
274281
mod tests {
275282
use crate::{

core/src/worker/workflow/machines/workflow_machines.rs

Lines changed: 54 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,7 @@ pub(crate) struct WorkflowMachines {
138138
current_wf_task_commands: VecDeque<CommandAndMachine>,
139139

140140
/// Information about patch markers we have already seen while replaying history
141-
encountered_change_markers: HashMap<String, ChangeInfo>,
141+
encountered_patch_markers: HashMap<String, ChangeInfo>,
142142

143143
/// Contains extra local-activity related data
144144
local_activity_data: LocalActivityData,
@@ -255,7 +255,7 @@ impl WorkflowMachines {
255255
id_to_machine: Default::default(),
256256
commands: Default::default(),
257257
current_wf_task_commands: Default::default(),
258-
encountered_change_markers: Default::default(),
258+
encountered_patch_markers: Default::default(),
259259
local_activity_data: LocalActivityData::default(),
260260
have_seen_terminal_event: false,
261261
}
@@ -547,7 +547,7 @@ impl WorkflowMachines {
547547
.peek_next_wft_sequence(last_handled_wft_started_id)
548548
{
549549
if let Some((patch_id, _)) = e.get_patch_marker_details() {
550-
self.encountered_change_markers.insert(
550+
self.encountered_patch_markers.insert(
551551
patch_id.clone(),
552552
ChangeInfo {
553553
created_command: false,
@@ -718,7 +718,7 @@ impl WorkflowMachines {
718718
let consumed_cmd = loop {
719719
if let Some(peek_machine) = self.commands.front() {
720720
let mach = self.machine(peek_machine.machine);
721-
match change_marker_handling(event, mach, next_event)? {
721+
match patch_marker_handling(event, mach, next_event)? {
722722
EventHandlingOutcome::SkipCommand => {
723723
self.commands.pop_front();
724724
continue;
@@ -1138,7 +1138,7 @@ impl WorkflowMachines {
11381138
WFCommand::SetPatchMarker(attrs) => {
11391139
// Do not create commands for change IDs that we have already created commands
11401140
// for.
1141-
let encountered_entry = self.encountered_change_markers.get(&attrs.patch_id);
1141+
let encountered_entry = self.encountered_patch_markers.get(&attrs.patch_id);
11421142
if !matches!(encountered_entry,
11431143
Some(ChangeInfo {created_command}) if *created_command)
11441144
{
@@ -1147,17 +1147,17 @@ impl WorkflowMachines {
11471147
self.replaying,
11481148
attrs.deprecated,
11491149
encountered_entry.is_some(),
1150-
self.encountered_change_markers.keys().map(|s| s.as_str()),
1150+
self.encountered_patch_markers.keys().map(|s| s.as_str()),
11511151
self.observed_internal_flags.clone(),
11521152
)?;
11531153
let mkey =
11541154
self.add_cmd_to_wf_task(patch_machine, CommandIdKind::NeverResolves);
11551155
self.process_machine_responses(mkey, other_cmds)?;
11561156

1157-
if let Some(ci) = self.encountered_change_markers.get_mut(&attrs.patch_id) {
1157+
if let Some(ci) = self.encountered_patch_markers.get_mut(&attrs.patch_id) {
11581158
ci.created_command = true;
11591159
} else {
1160-
self.encountered_change_markers.insert(
1160+
self.encountered_patch_markers.insert(
11611161
attrs.patch_id,
11621162
ChangeInfo {
11631163
created_command: true,
@@ -1360,45 +1360,62 @@ enum EventHandlingOutcome {
13601360

13611361
/// Special handling for patch markers, when handling command events as in
13621362
/// [WorkflowMachines::handle_command_event]
1363-
fn change_marker_handling(
1363+
fn patch_marker_handling(
13641364
event: &HistoryEvent,
13651365
mach: &Machines,
13661366
next_event: Option<&HistoryEvent>,
13671367
) -> Result<EventHandlingOutcome> {
1368-
if !mach.matches_event(event) {
1369-
// Version markers can be skipped in the event they are deprecated
1370-
if let Some((patch_name, deprecated)) = event.get_patch_marker_details() {
1368+
let patch_machine = match mach {
1369+
Machines::PatchMachine(pm) => Some(pm),
1370+
_ => None,
1371+
};
1372+
let patch_details = event.get_patch_marker_details();
1373+
fn skip_one_or_two_events(next_event: Option<&HistoryEvent>) -> Result<EventHandlingOutcome> {
1374+
// Also ignore the subsequent upsert event if present
1375+
let mut skip_next_event = false;
1376+
if let Some(Attributes::UpsertWorkflowSearchAttributesEventAttributes(atts)) =
1377+
next_event.and_then(|ne| ne.attributes.as_ref())
1378+
{
1379+
if let Some(ref sa) = atts.search_attributes {
1380+
skip_next_event = sa.indexed_fields.contains_key(VERSION_SEARCH_ATTR_KEY);
1381+
}
1382+
}
1383+
1384+
Ok(EventHandlingOutcome::SkipEvent { skip_next_event })
1385+
}
1386+
1387+
if let Some((patch_name, deprecated)) = patch_details {
1388+
if let Some(pm) = patch_machine {
1389+
// If the next machine *is* a patch machine, but this marker is deprecated, it may
1390+
// either apply to this machine (the `deprecate_patch` call is still in workflow code) -
1391+
// or it could be another `patched` or `deprecate_patch` call for a *different* patch,
1392+
// which we should also permit. In the latter case, we should skip this event.
1393+
if !pm.matches_patch(&patch_name) && deprecated {
1394+
skip_one_or_two_events(next_event)
1395+
} else {
1396+
Ok(EventHandlingOutcome::Normal)
1397+
}
1398+
} else {
1399+
// Version markers can be skipped in the event they are deprecated
13711400
// Is deprecated. We can simply ignore this event, as deprecated change
13721401
// markers are allowed without matching changed calls.
13731402
if deprecated {
1374-
debug!("Deprecated patch marker tried against wrong machine, skipping.");
1375-
1376-
// Also ignore the subsequent upsert event if present
1377-
let mut skip_next_event = false;
1378-
if let Some(Attributes::UpsertWorkflowSearchAttributesEventAttributes(atts)) =
1379-
next_event.and_then(|ne| ne.attributes.as_ref())
1380-
{
1381-
if let Some(ref sa) = atts.search_attributes {
1382-
skip_next_event = sa.indexed_fields.contains_key(VERSION_SEARCH_ATTR_KEY);
1383-
}
1384-
}
1385-
1386-
return Ok(EventHandlingOutcome::SkipEvent { skip_next_event });
1403+
debug!("Deprecated patch marker tried against non-patch machine, skipping.");
1404+
skip_one_or_two_events(next_event)
1405+
} else {
1406+
Err(WFMachinesError::Nondeterminism(format!(
1407+
"Non-deprecated patch marker encountered for change {patch_name}, but there is \
1408+
no corresponding change command!"
1409+
)))
13871410
}
1388-
return Err(WFMachinesError::Nondeterminism(format!(
1389-
"Non-deprecated patch marker encountered for change {patch_name}, \
1390-
but there is no corresponding change command!"
1391-
)));
1392-
}
1393-
// Patch machines themselves may also not *have* matching markers, where non-deprecated
1394-
// calls take the old path, and deprecated calls assume history is produced by a new-code
1395-
// worker.
1396-
if matches!(mach, Machines::PatchMachine(_)) {
1397-
debug!("Skipping non-matching event against patch machine");
1398-
return Ok(EventHandlingOutcome::SkipCommand);
13991411
}
1412+
} else if patch_machine.is_some() {
1413+
debug!("Skipping non-matching event against patch machine");
1414+
Ok(EventHandlingOutcome::SkipCommand)
1415+
} else {
1416+
// Not a patch machine or a patch event
1417+
Ok(EventHandlingOutcome::Normal)
14001418
}
1401-
Ok(EventHandlingOutcome::Normal)
14021419
}
14031420

14041421
#[derive(derive_more::From)]

sdk/src/workflow_context.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -249,8 +249,8 @@ impl WfContext {
249249

250250
/// Record that this workflow history was created with the provided patch, and it is being
251251
/// phased out.
252-
pub fn deprecate_patch(&self, patch_id: &str) {
253-
self.patch_impl(patch_id, true);
252+
pub fn deprecate_patch(&self, patch_id: &str) -> bool {
253+
self.patch_impl(patch_id, true)
254254
}
255255

256256
fn patch_impl(&self, patch_id: &str, deprecated: bool) -> bool {

tests/integ_tests/workflow_tests/patches.rs

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
use std::{
2-
sync::atomic::{AtomicBool, Ordering},
2+
sync::{
3+
atomic::{AtomicBool, Ordering},
4+
Arc,
5+
},
36
time::Duration,
47
};
58

@@ -117,3 +120,34 @@ async fn patched_on_second_workflow_task_is_deterministic() {
117120
starter.start_with_worker(wf_name, &mut worker).await;
118121
worker.run_until_done().await.unwrap();
119122
}
123+
124+
#[tokio::test]
125+
async fn can_remove_deprecated_patch_near_other_patch() {
126+
let wf_name = "can_add_change_markers";
127+
let mut starter = CoreWfStarter::new(wf_name);
128+
starter.no_remote_activities();
129+
let mut worker = starter.worker().await;
130+
let did_die = Arc::new(AtomicBool::new(false));
131+
worker.register_wf(wf_name.to_owned(), move |ctx: WfContext| {
132+
let did_die = did_die.clone();
133+
async move {
134+
ctx.timer(Duration::from_millis(200)).await;
135+
if !did_die.load(Ordering::Acquire) {
136+
assert!(ctx.deprecate_patch("getting-deprecated"));
137+
assert!(ctx.patched("staying"));
138+
} else {
139+
assert!(ctx.patched("staying"));
140+
}
141+
ctx.timer(Duration::from_millis(200)).await;
142+
143+
if !did_die.load(Ordering::Acquire) {
144+
did_die.store(true, Ordering::Release);
145+
ctx.force_task_fail(anyhow::anyhow!("i'm ded"));
146+
}
147+
Ok(().into())
148+
}
149+
});
150+
151+
starter.start_with_worker(wf_name, &mut worker).await;
152+
worker.run_until_done().await.unwrap();
153+
}

0 commit comments

Comments
 (0)