Skip to content

Commit

Permalink
Honor all non-terminal commands
Browse files Browse the repository at this point in the history
  • Loading branch information
dandavison committed Jul 17, 2024
1 parent 5e3b274 commit 873a978
Show file tree
Hide file tree
Showing 8 changed files with 325 additions and 47 deletions.
2 changes: 1 addition & 1 deletion core/src/core_tests/determinism.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ async fn activity_id_or_type_change_is_nondeterministic(
) {
let wf_id = "fakeid";
let wf_type = DEFAULT_WORKFLOW_TYPE;
let mut t = if local_act {
let mut t: TestHistoryBuilder = if local_act {
canned_histories::single_local_activity("1")
} else {
canned_histories::single_activity("1")
Expand Down
74 changes: 68 additions & 6 deletions core/src/core_tests/workflow_tasks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ use temporal_sdk_core_protos::{
StartWorkflow, UpdateRandomSeed, WorkflowActivationJob,
},
workflow_commands::{
update_response::Response, ActivityCancellationType, CancelTimer,
update_response::Response, workflow_command, ActivityCancellationType, CancelTimer,
CompleteWorkflowExecution, ContinueAsNewWorkflowExecution, FailWorkflowExecution,
RequestCancelActivity, ScheduleActivity, SetPatchMarker, StartChildWorkflowExecution,
UpdateResponse,
Expand All @@ -56,7 +56,7 @@ use temporal_sdk_core_protos::{
temporal::api::{
command::v1::command::Attributes,
common::v1::{Payload, RetryPolicy, WorkerVersionStamp},
enums::v1::{EventType, WorkflowTaskFailedCause},
enums::v1::{CommandType, EventType, WorkflowTaskFailedCause},
failure::v1::Failure,
history::v1::{
history_event, TimerFiredEventAttributes,
Expand Down Expand Up @@ -2415,7 +2415,6 @@ async fn lang_internal_flags() {

#[tokio::test]
async fn lang_internal_flag_with_update() {
crate::telemetry::test_telem_console();
let mut t = TestHistoryBuilder::default();
t.add_by_type(EventType::WorkflowExecutionStarted);
t.add_full_wf_task();
Expand Down Expand Up @@ -2504,11 +2503,73 @@ async fn core_internal_flags() {
core.shutdown().await;
}

// A post-terminal command is retained, and placed before the terminal command.
#[tokio::test]
async fn post_terminal_commands_are_discarded() {
async fn post_terminal_commands_are_retained_when_not_replaying() {
let mut t = TestHistoryBuilder::default();
t.add_by_type(EventType::WorkflowExecutionStarted);
t.add_full_wf_task();
t.add_timer_started("1".to_string());
t.add_workflow_execution_completed();

let commands_sent_by_lang = vec![
CompleteWorkflowExecution { result: None }.into(),
start_timer_cmd(1, Duration::from_secs(1)),
];
let expected_command_types_emitted = vec![
CommandType::StartTimer,
CommandType::CompleteWorkflowExecution,
];
_simulate_completion_from_lang_and_assert_commands_emitted_by_core(
commands_sent_by_lang,
expected_command_types_emitted,
t,
)
.await
}

/// Test helper: feeds the history `t` to a mock worker, completes the first activation with
/// `commands_sent_by_lang`, and asserts (inside the WFT completion mock) that the command types
/// of the resulting completion equal `expected_command_types`, in order.
///
/// The worker is configured with `max_cached_workflows = 1`. After the completion, one more poll
/// is issued and is expected to fail with `PollWfError::ShutDown`.
async fn _simulate_completion_from_lang_and_assert_commands_emitted_by_core(
    commands_sent_by_lang: Vec<workflow_command::Variant>,
    expected_command_types: Vec<CommandType>,
    t: TestHistoryBuilder,
) {
    let response_batches = [ResponseType::ToTaskNum(1), ResponseType::AllHistory];
    let mut poll_cfg = MockPollCfg::from_resp_batches(
        "fake_wf_id",
        t,
        response_batches,
        mock_workflow_client(),
    );
    // Check what core actually hands to the WFT completion mock, not what lang sent.
    poll_cfg.completion_mock_fn = Some(Box::new(move |completion| {
        let emitted_types = completion
            .commands
            .iter()
            .map(|cmd| cmd.command_type())
            .collect::<Vec<_>>();
        assert_eq!(emitted_types, expected_command_types);
        Ok(Default::default())
    }));

    let mut mock_pollers = build_mock_pollers(poll_cfg);
    mock_pollers.worker_cfg(|cfg| {
        cfg.max_cached_workflows = 1;
    });
    let worker = mock_worker(mock_pollers);

    let run_id = worker.poll_workflow_activation().await.unwrap().run_id;
    let completion = WorkflowActivationCompletion::from_cmds(run_id, commands_sent_by_lang);
    worker.complete_workflow_activation(completion).await.unwrap();

    // This just ensures applying the complete history w/ the completion command works, though
    // there's no activation.
    let poll_err = worker.poll_workflow_activation().await.unwrap_err();
    assert_matches!(poll_err, PollWfError::ShutDown);
    worker.shutdown().await;
}

#[tokio::test]
async fn move_terminal_commands_flag_test_1() {
let mut t = TestHistoryBuilder::default();
t.add_by_type(EventType::WorkflowExecutionStarted);
t.add_full_wf_task();
t.add_timer_started("1".to_string());
t.add_workflow_execution_completed();

let mut mh = MockPollCfg::from_resp_batches(
Expand All @@ -2518,8 +2579,9 @@ async fn post_terminal_commands_are_discarded() {
mock_workflow_client(),
);
mh.completion_mock_fn = Some(Box::new(|c| {
// Only the complete execution command should actually be sent
assert_eq!(c.commands.len(), 1);
// The start timer and complete execution commands should be sent, in
// that order.
assert_eq!(c.commands.len(), 2);
Ok(Default::default())
}));
let mut mock = build_mock_pollers(mh);
Expand Down
24 changes: 17 additions & 7 deletions core/src/internal_flags.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use temporal_sdk_core_protos::temporal::api::{
/// This enumeration contains internal flags that may result in incompatible history changes with
/// older workflows, or other breaking changes.
///
/// When a flag has existed long enough the version it was introduced in is no longer supported, it
/// When a flag has existed long enough that the version it was introduced in is no longer supported, it
/// may be removed from the enum. *Importantly*, all variants must be given explicit values, such
/// that removing older variants does not create any change in existing values. Removed flag
/// variants must be reserved forever (a-la protobuf), and should be called out in a comment.
Expand All @@ -27,6 +27,15 @@ pub(crate) enum CoreInternalFlags {
/// Introduced automatically upserting search attributes for each patched call, and
/// nondeterminism checks for upserts.
UpsertSearchAttributeOnPatch = 2,
/// Prior to this flag, we truncated commands received from lang at the
/// first terminal (i.e. workflow-terminating) command. With this flag, we
/// reorder commands such that all non-terminal commands come first,
/// followed by the first terminal command, if any (it's possible that
/// multiple workflow coroutines generated a terminal command). This has the
/// consequence that all non-terminal commands are sent to the server, even
/// if in the sequence delivered by lang they came after a terminal command.
/// See https://github.com/temporalio/features/issues/481.
MoveTerminalCommands = 3,
/// We received a value higher than this code can understand.
TooHigh = u32::MAX,
}
Expand Down Expand Up @@ -82,18 +91,18 @@ impl InternalFlags {
/// Returns true if this flag may currently be used. If `should_record` is true, always returns
/// true and records the flag as being used, so that it can later be collected via
/// [Self::gather_for_wft_complete].
pub(crate) fn try_use(&mut self, core_patch: CoreInternalFlags, should_record: bool) -> bool {
pub(crate) fn try_use(&mut self, flag: CoreInternalFlags, should_record: bool) -> bool {
match self {
Self::Enabled {
core,
core_since_last_complete,
..
} => {
if should_record {
core_since_last_complete.insert(core_patch);
core_since_last_complete.insert(flag);
true
} else {
core.contains(&core_patch)
core.contains(&flag)
}
}
// If the server does not support the metadata field, we must assume we can never use
Expand All @@ -114,9 +123,9 @@ impl InternalFlags {
}
}

/// Wipes the recorded flags used during the current WFT and returns a partially filled
/// sdk metadata message that can be combined with any existing data before sending the WFT
/// complete
/// Return a partially filled sdk metadata message containing core and lang flags added since
/// the last WFT complete. The returned value can be combined with other data before sending the
/// WFT complete.
pub(crate) fn gather_for_wft_complete(&mut self) -> WorkflowTaskCompletedMetadata {
match self {
Self::Enabled {
Expand Down Expand Up @@ -161,6 +170,7 @@ impl CoreInternalFlags {
match v {
1 => Self::IdAndTypeDeterminismChecks,
2 => Self::UpsertSearchAttributeOnPatch,
3 => Self::MoveTerminalCommands,
_ => Self::TooHigh,
}
}
Expand Down
6 changes: 3 additions & 3 deletions core/src/test_help/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -382,7 +382,7 @@ pub(crate) fn single_hist_mock_sg(
build_mock_pollers(mh)
}

type WFTCompeltionMockFn = dyn FnMut(&WorkflowTaskCompletion) -> Result<RespondWorkflowTaskCompletedResponse, tonic::Status>
type WFTCompletionMockFn = dyn FnMut(&WorkflowTaskCompletion) -> Result<RespondWorkflowTaskCompletedResponse, tonic::Status>
+ Send;

#[allow(clippy::type_complexity)]
Expand All @@ -395,7 +395,7 @@ pub(crate) struct MockPollCfg {
/// All calls to fail WFTs must match this predicate
pub(crate) expect_fail_wft_matcher:
Box<dyn Fn(&TaskToken, &WorkflowTaskFailedCause, &Option<Failure>) -> bool + Send>,
pub(crate) completion_mock_fn: Option<Box<WFTCompeltionMockFn>>,
pub(crate) completion_mock_fn: Option<Box<WFTCompletionMockFn>>,
pub(crate) num_expected_completions: Option<TimesRange>,
/// If being used with the Rust SDK, this is set true. It ensures pollers will not error out
/// early with no work, since we cannot know the exact number of times polling will happen.
Expand Down Expand Up @@ -476,7 +476,7 @@ impl MockPollCfg {

#[allow(clippy::type_complexity)]
pub(crate) struct CompletionAssertsBuilder<'a> {
dest: &'a mut Option<Box<WFTCompeltionMockFn>>,
dest: &'a mut Option<Box<WFTCompletionMockFn>>,
assertions: VecDeque<Box<dyn FnOnce(&WorkflowTaskCompletion) + Send>>,
}

Expand Down
8 changes: 7 additions & 1 deletion core/src/worker/workflow/machines/workflow_machines.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use super::{
};
use crate::{
abstractions::dbg_panic,
internal_flags::InternalFlags,
internal_flags::{CoreInternalFlags, InternalFlags},
protosext::{
protocol_messages::{IncomingProtocolMessage, IncomingProtocolMessageBody},
CompleteLocalActivityData, HistoryEventExt, ValidScheduleLA,
Expand Down Expand Up @@ -470,6 +470,12 @@ impl WorkflowMachines {
.add_lang_used(flags);
}

/// Mutably borrows `observed_internal_flags` and forwards `flag` and `should_record` to its
/// `try_use`, returning that call's result.
pub(crate) fn try_use(&self, flag: CoreInternalFlags, should_record: bool) -> bool {
    let mut observed_flags = self.observed_internal_flags.borrow_mut();
    observed_flags.try_use(flag, should_record)
}

/// Undo a speculative workflow task by resetting to a certain WFT Started ID. This can happen
/// when an update request is rejected.
pub(crate) fn reset_last_started_id(&mut self, id: i64) {
Expand Down
Loading

0 comments on commit 873a978

Please sign in to comment.