Skip to content

Commit

Permalink
Honor all non-terminal commands
Browse files Browse the repository at this point in the history
  • Loading branch information
dandavison committed Jul 15, 2024
1 parent 5e3b274 commit f046bc9
Show file tree
Hide file tree
Showing 4 changed files with 249 additions and 27 deletions.
9 changes: 9 additions & 0 deletions core/src/internal_flags.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,15 @@ pub(crate) enum CoreInternalFlags {
UpsertSearchAttributeOnPatch = 2,
/// We received a value higher than this code can understand.
TooHigh = u32::MAX,
/// Prior to this flag, we truncated commands received from lang at the
/// first terminal (i.e. workflow-terminating) command. With this flag, we
/// reorder commands such that all non-terminal commands come first,
/// followed by the first terminal command, if any (it's possible that
/// multiple workflow coroutines generated a terminal command). This has the
/// consequence that all non-terminal commands are sent to the server, even
/// if in the sequence delivered by lang they came after a terminal command.
/// See https://github.com/temporalio/features/issues/481.
ReorderCommands = 3,
}

#[derive(Debug, Clone, PartialEq, Eq)]
Expand Down
2 changes: 1 addition & 1 deletion core/src/worker/workflow/machines/workflow_machines.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ pub(crate) struct WorkflowMachines {
current_wf_time: Option<SystemTime>,
/// The internal flags which have been seen so far during this run's execution and thus are
/// usable during replay.
observed_internal_flags: InternalFlagsRef,
pub(crate) observed_internal_flags: InternalFlagsRef,
/// Set on each WFT started event, the most recent size of history in bytes
history_size_bytes: u64,
/// Set on each WFT started event
Expand Down
253 changes: 238 additions & 15 deletions core/src/worker/workflow/managed_run.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::{
abstractions::dbg_panic,
internal_flags::CoreInternalFlags,
protosext::{protocol_messages::IncomingProtocolMessage, WorkflowActivationExt},
telemetry::metrics,
worker::{
Expand Down Expand Up @@ -366,7 +367,7 @@ impl ManagedRun {
pub(super) fn successful_completion(
&mut self,
mut commands: Vec<WFCommand>,
used_flags: Vec<u32>,
mut used_flags: Vec<u32>,
resp_chan: Option<oneshot::Sender<ActivationCompleteResult>>,
) -> Result<RunUpdateAct, NextPageReq> {
let activation_was_only_eviction = self.activation_is_eviction();
Expand Down Expand Up @@ -422,20 +423,8 @@ impl ManagedRun {
);
Ok(None)
} else {
// First strip out query responses from other commands that actually affect machines
// Would be prettier with `drain_filter`
let mut query_responses = vec![];
commands = std::mem::take(&mut commands)
.into_iter()
.filter_map(|x| {
if let WFCommand::QueryResponse(qr) = x {
query_responses.push(qr);
None
} else {
Some(x)
}
})
.collect();
let (commands, query_responses) =
self.preprocess_command_sequence(commands, &mut used_flags);

if activation_was_only_eviction && !commands.is_empty() {
dbg_panic!("Reply to an eviction included commands");
Expand Down Expand Up @@ -479,6 +468,47 @@ impl ManagedRun {
}
}

/// Prepare the full command sequence received from lang (the commands
/// generated by all workflow coroutines) for application to the machines.
///
/// Returns a pair of:
/// * the commands to apply: every non-terminal (i.e.
///   non-workflow-terminating) command, followed by at most one terminal
///   command, and
/// * the query results, which are stripped out of the command sequence
///   because they don't affect machines and are handled separately
///   downstream.
///
/// Non-terminal commands are placed ahead of the terminal command so that
/// the server gets a chance to honor all of them, for example so that an
/// update result can be delivered to a client as the workflow completes. See
/// https://github.com/temporalio/features/issues/481. Several coroutines may
/// each have produced a terminal command; only the first one is kept.
///
/// This is a backwards-incompatible change relative to the previous
/// behavior of truncating at the first terminal command. Therefore:
/// * when replaying and `CoreInternalFlags::ReorderCommands` has not been
///   observed, the previous truncating behavior is applied, and
/// * otherwise the commands are reordered, and the flag is added to
///   `used_flags` if the reordering produced a result that differs from what
///   the previous behavior would have produced.
fn preprocess_command_sequence(
    &mut self,
    commands: Vec<WFCommand>,
    used_flags: &mut Vec<u32>,
) -> (Vec<WFCommand>, Vec<QueryResult>) {
    // Always consult the flag first (without recording it), then combine
    // the answer with the replay state.
    let flag_observed = self
        .wfm
        .machines
        .observed_internal_flags
        .borrow_mut()
        .try_use(CoreInternalFlags::ReorderCommands, false);
    let must_use_old_behavior = self.wfm.machines.replaying && !flag_observed;

    if must_use_old_behavior {
        return preprocess_command_sequence_old_behavior(commands);
    }

    let (reordered, query_results, outcome_differs) = preprocess_command_sequence(commands);
    if outcome_differs {
        // See comment on CoreInternalFlags::ReorderCommands.
        used_flags.push(CoreInternalFlags::ReorderCommands as u32);
    }
    (reordered, query_results)
}

/// Called after the higher-up machinery has fetched more pages of event history needed to apply
/// the next workflow task. The history update and paginator used to perform the fetch are
/// passed in, with the update being used to apply the task, and the paginator stored to be
Expand Down Expand Up @@ -1169,6 +1199,65 @@ impl ManagedRun {
}
}

/// Split `commands` into the commands that affect machines and the query
/// results, moving the first terminal command (if any) to the end.
///
/// Returns a tuple of:
/// * all non-terminal, non-query commands in their original relative order,
///   followed by the first terminal command if one was present (any further
///   terminal commands are discarded),
/// * the query results extracted from `QueryResponse` commands, in their
///   original order, and
/// * a boolean recording whether the result differs from the old truncating
///   behavior, i.e. whether any non-terminal, non-query command appeared
///   after the first terminal command.
fn preprocess_command_sequence(
    commands: Vec<WFCommand>,
) -> (Vec<WFCommand>, Vec<QueryResult>, bool) {
    let mut query_results = vec![];
    // Only the first terminal command is ever used, so there is no need to
    // accumulate the later ones.
    let mut first_terminal = None;
    let mut any_moved = false;

    let mut commands: Vec<WFCommand> = commands
        .into_iter()
        .filter_map(|c| match c {
            WFCommand::QueryResponse(qr) => {
                query_results.push(qr);
                None
            }
            c if c.is_terminal() => {
                if first_terminal.is_none() {
                    first_terminal = Some(c);
                }
                None
            }
            c => {
                // A retained command that follows a terminal one means the
                // old behavior would have dropped it.
                any_moved |= first_terminal.is_some();
                Some(c)
            }
        })
        .collect();
    // `Option` is an iterator of zero or one items.
    commands.extend(first_terminal);
    (commands, query_results, any_moved)
}

/// Legacy preprocessing: truncate the command sequence at the first terminal
/// command.
///
/// Returns a pair of:
/// * every non-query command up to and including the first terminal command
///   (all non-query commands after it are discarded), and
/// * the query results extracted from all `QueryResponse` commands, wherever
///   they appear in the sequence.
fn preprocess_command_sequence_old_behavior(
    commands: Vec<WFCommand>,
) -> (Vec<WFCommand>, Vec<QueryResult>) {
    let mut retained = Vec::new();
    let mut query_results = Vec::new();
    let mut truncated = false;

    for cmd in commands {
        match cmd {
            // Query results are always collected, even past the terminal.
            WFCommand::QueryResponse(qr) => query_results.push(qr),
            // Anything else after the first terminal command is dropped.
            _ if truncated => {}
            other => {
                truncated = other.is_terminal();
                retained.push(other);
            }
        }
    }
    (retained, query_results)
}

/// Drains pending queries from the workflow task and appends them to the activation's jobs
fn put_queries_in_act(act: &mut WorkflowActivation, wft: &mut OutstandingTask) {
// Nothing to do if there are no pending queries
Expand Down Expand Up @@ -1397,3 +1486,137 @@ impl From<WFMachinesError> for RunUpdateErr {
}
}
}

// Unit tests for the two free-standing command preprocessing functions
// (`preprocess_command_sequence` and
// `preprocess_command_sequence_old_behavior`).
#[cfg(test)]
mod tests {
use crate::worker::workflow::WFCommand;
use std::mem::{discriminant, Discriminant};

use command_utils::*;

// New behavior: each case gives the input commands, the expected output
// commands, and the expected value of the "any reordered" boolean. Commands
// are compared by enum variant only (see `command_types`), not by payload.
#[rstest::rstest]
#[case::empty(
vec![],
vec![],
false)]
#[case::non_terminal_is_retained(
vec![update_response()],
vec![update_response()],
false)]
#[case::terminal_is_retained(
vec![complete()],
vec![complete()],
false)]
#[case::post_terminal_is_retained(
vec![complete(), update_response()],
vec![update_response(), complete()],
true)]
#[case::second_terminal_is_discarded(
vec![cancel(), complete()],
vec![cancel()],
false)]
#[case::move_terminals_to_end_and_retain_first(
vec![update_response(), complete(), update_response(), cancel(), update_response()],
vec![update_response(), update_response(), update_response(), complete()],
true)]
#[test]
fn preprocess_command_sequence(
#[case] commands_in: Vec<WFCommand>,
#[case] expected_commands: Vec<WFCommand>,
#[case] expected_any_reordered: bool,
) {
let (commands, _, any_reordered) = super::preprocess_command_sequence(commands_in);
assert_eq!(command_types(&commands), command_types(&expected_commands));
assert_eq!(any_reordered, expected_any_reordered);
}

// New behavior: query responses are extracted whether they appear before or
// after the terminal command; only the count is checked.
#[rstest::rstest]
#[case::query_responses_extracted(
vec![query_response(), update_response(), query_response(), complete(), query_response()],
3,
)]
#[test]
fn preprocess_command_sequence_extracts_queries(
#[case] commands_in: Vec<WFCommand>,
#[case] expected_queries_out: usize,
) {
let (_, query_responses_out, _) = super::preprocess_command_sequence(commands_in);
assert_eq!(query_responses_out.len(), expected_queries_out);
}

// Old behavior: each case gives the input commands and the expected output
// after truncating at the first terminal command. Commands are compared by
// enum variant only (see `command_types`), not by payload.
#[rstest::rstest]
#[case::empty(
vec![],
vec![])]
#[case::non_terminal_is_retained(
vec![update_response()],
vec![update_response()])]
#[case::terminal_is_retained(
vec![complete()],
vec![complete()])]
#[case::post_terminal_is_discarded(
vec![complete(), update_response()],
vec![complete()])]
#[case::second_terminal_is_discarded(
vec![cancel(), complete()],
vec![cancel()])]
#[case::truncate_at_first_complete(
vec![update_response(), complete(), update_response(), cancel()],
vec![update_response(), complete()])]
#[test]
fn preprocess_command_sequence_old_behavior(
#[case] commands_in: Vec<WFCommand>,
#[case] expected_out: Vec<WFCommand>,
) {
let (commands_out, _) = super::preprocess_command_sequence_old_behavior(commands_in);
assert_eq!(command_types(&commands_out), command_types(&expected_out));
}

// Old behavior: query responses are extracted whether they appear before or
// after the terminal command; only the count is checked.
#[rstest::rstest]
#[case::query_responses_extracted(
vec![query_response(), update_response(), query_response(), complete(), query_response()],
3,
)]
#[test]
fn preprocess_command_sequence_old_behavior_extracts_queries(
#[case] commands_in: Vec<WFCommand>,
#[case] expected_queries_out: usize,
) {
let (_, query_responses_out) = super::preprocess_command_sequence_old_behavior(commands_in);
assert_eq!(query_responses_out.len(), expected_queries_out);
}

// Constructors for minimal commands of each variant used by the tests above,
// plus a helper for comparing command sequences by variant.
mod command_utils {
use temporal_sdk_core_protos::coresdk::workflow_commands::{
CancelWorkflowExecution, CompleteWorkflowExecution, QueryResult, UpdateResponse,
};

use super::*;

// A `CompleteWorkflow` command with no result; used as a terminal command.
pub(crate) fn complete() -> WFCommand {
WFCommand::CompleteWorkflow(CompleteWorkflowExecution { result: None })
}

// A `CancelWorkflow` command; used as a second kind of terminal command.
pub(crate) fn cancel() -> WFCommand {
WFCommand::CancelWorkflow(CancelWorkflowExecution {})
}

// A `QueryResponse` command with an empty query id and no variant.
pub(crate) fn query_response() -> WFCommand {
WFCommand::QueryResponse(QueryResult {
query_id: "".into(),
variant: None,
})
}

// An `UpdateResponse` command with an empty protocol instance id and no
// response; used as the non-terminal command.
pub(crate) fn update_response() -> WFCommand {
WFCommand::UpdateResponse(UpdateResponse {
protocol_instance_id: "".into(),
response: None,
})
}

// Map each command to its enum discriminant so that sequences can be
// compared by variant while ignoring the variants' contents.
pub(crate) fn command_types(commands: &[WFCommand]) -> Vec<Discriminant<WFCommand>> {
commands.iter().map(discriminant).collect()
}
}
}
12 changes: 1 addition & 11 deletions core/src/worker/workflow/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1043,7 +1043,7 @@ fn validate_completion(
match completion.status {
Some(workflow_activation_completion::Status::Successful(success)) => {
// Convert to wf commands
let mut commands = success
let commands = success
.commands
.into_iter()
.map(|c| c.try_into())
Expand All @@ -1070,16 +1070,6 @@ fn validate_completion(
});
}

// Any non-query-response commands after a terminal command should be ignored
if let Some(term_cmd_pos) = commands.iter().position(|c| c.is_terminal()) {
// Query responses are just fine, so keep them.
let queries = commands
.split_off(term_cmd_pos + 1)
.into_iter()
.filter(|c| matches!(c, WFCommand::QueryResponse(_)));
commands.extend(queries);
}

Ok(ValidatedCompletion::Success {
run_id: completion.run_id,
commands,
Expand Down

0 comments on commit f046bc9

Please sign in to comment.