Skip to content

Commit 979366a

Browse files
Sushisource authored and mjameswh committed
Fix query related is_replay flag error (#597)
(cherry picked from commit e132dfe)
1 parent f60b9d3 commit 979366a

File tree

2 files changed

+94
-13
lines changed

2 files changed

+94
-13
lines changed

core/src/core_tests/replay_flag.rs

Lines changed: 73 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,21 @@
1-
use crate::{test_help::canned_histories, worker::ManagedWFFunc};
1+
use crate::{
2+
test_help::{
3+
build_mock_pollers, canned_histories, hist_to_poll_resp, mock_worker, MockPollCfg,
4+
},
5+
worker::{client::mocks::mock_workflow_client, ManagedWFFunc, LEGACY_QUERY_ID},
6+
};
27
use rstest::{fixture, rstest};
3-
use std::time::Duration;
8+
use std::{collections::VecDeque, time::Duration};
49
use temporal_sdk::{WfContext, WorkflowFunction};
5-
use temporal_sdk_core_protos::temporal::api::enums::v1::CommandType;
10+
use temporal_sdk_core_api::Worker;
11+
use temporal_sdk_core_protos::{
12+
coresdk::{
13+
workflow_commands::{workflow_command::Variant::RespondToQuery, QueryResult, QuerySuccess},
14+
workflow_completion::WorkflowActivationCompletion,
15+
},
16+
temporal::api::{enums::v1::CommandType, query::v1::WorkflowQuery},
17+
};
18+
use temporal_sdk_core_test_utils::start_timer_cmd;
619

720
fn timers_wf(num_timers: u32) -> WorkflowFunction {
821
WorkflowFunction::new(move |command_sink: WfContext| async move {
@@ -63,3 +76,60 @@ async fn replay_flag_is_correct_partial_history() {
6376
assert_eq!(commands[0].command_type, CommandType::StartTimer as i32);
6477
wfm.shutdown().await.unwrap();
6578
}
79+
80+
#[tokio::test]
81+
async fn replay_flag_correct_with_query() {
82+
let wfid = "fake_wf_id";
83+
let t = canned_histories::single_timer("1");
84+
let tasks = VecDeque::from(vec![
85+
{
86+
let mut pr = hist_to_poll_resp(&t, wfid.to_owned(), 2.into());
87+
// Server can issue queries that contain the WFT completion and the subsequent
88+
// commands, but not the consequences yet.
89+
pr.query = Some(WorkflowQuery {
90+
query_type: "query-type".to_string(),
91+
query_args: Some(b"hi".into()),
92+
header: None,
93+
});
94+
let h = pr.history.as_mut().unwrap();
95+
h.events.truncate(5);
96+
pr.started_event_id = 3;
97+
dbg!(&pr.resp);
98+
pr
99+
},
100+
hist_to_poll_resp(&t, wfid.to_owned(), 2.into()),
101+
]);
102+
let mut mock = MockPollCfg::from_resp_batches(wfid, t, tasks, mock_workflow_client());
103+
mock.num_expected_legacy_query_resps = 1;
104+
let mut mock = build_mock_pollers(mock);
105+
mock.worker_cfg(|wc| wc.max_cached_workflows = 10);
106+
let core = mock_worker(mock);
107+
108+
let task = core.poll_workflow_activation().await.unwrap();
109+
core.complete_workflow_activation(WorkflowActivationCompletion::from_cmd(
110+
task.run_id,
111+
start_timer_cmd(1, Duration::from_secs(1)),
112+
))
113+
.await
114+
.unwrap();
115+
116+
let task = core.poll_workflow_activation().await.unwrap();
117+
assert!(task.is_replaying);
118+
core.complete_workflow_activation(WorkflowActivationCompletion::from_cmd(
119+
task.run_id,
120+
RespondToQuery(QueryResult {
121+
query_id: LEGACY_QUERY_ID.to_string(),
122+
variant: Some(
123+
QuerySuccess {
124+
response: Some("hi".into()),
125+
}
126+
.into(),
127+
),
128+
}),
129+
))
130+
.await
131+
.unwrap();
132+
133+
let task = core.poll_workflow_activation().await.unwrap();
134+
assert!(!task.is_replaying);
135+
}

core/src/worker/workflow/machines/workflow_machines.rs

Lines changed: 21 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -367,9 +367,17 @@ impl WorkflowMachines {
367367
/// "no work" situation. Possibly, it may know about some work the machines don't, like queries.
368368
pub(crate) fn get_wf_activation(&mut self) -> WorkflowActivation {
369369
let jobs = self.drive_me.drain_jobs();
370+
// Even though technically we may have satisfied all the criteria to be done with replay,
371+
// query-only activations are always "replaying" to keep things sane.
372+
let all_query = jobs.iter().all(|j| {
373+
matches!(
374+
j.variant,
375+
Some(workflow_activation_job::Variant::QueryWorkflow(_))
376+
)
377+
});
370378
WorkflowActivation {
371379
timestamp: self.current_wf_time.map(Into::into),
372-
is_replaying: self.replaying,
380+
is_replaying: self.replaying || all_query,
373381
run_id: self.run_id.clone(),
374382
history_length: self.last_processed_event as u32,
375383
jobs,
@@ -488,7 +496,6 @@ impl WorkflowMachines {
488496
}
489497
}
490498

491-
let mut saw_completed = false;
492499
let mut do_handle_event = true;
493500
let mut history = events.into_iter().peekable();
494501
while let Some(event) = history.next() {
@@ -504,17 +511,21 @@ impl WorkflowMachines {
504511
// This definition of replaying here is that we are no longer replaying as soon as we
505512
// see new events that have never been seen or produced by the SDK.
506513
//
507-
// Specifically, replay ends once we have seen the last command-event which was produced
508-
// as a result of the last completed WFT. Thus, replay would be false for things like
509-
// signals which were received and after the last completion, and thus generated the
510-
// current WFT being handled.
511-
if self.replaying && has_final_event && saw_completed && !event.is_command_event() {
514+
// Specifically, replay ends once we have seen any non-command event (i.e., events that
515+
// aren't a result of something we produced in the SDK) on a WFT which has the final
516+
// event in history (meaning we are processing the most recent WFT and there are no
517+
// more subsequent WFTs). WFT Completed in this case does not count as a non-command
518+
// event, because that will typically show up as the first event in an incremental
519+
// history, and we want to ignore it and its associated commands since we "produced"
520+
// them.
521+
if self.replaying
522+
&& has_final_event
523+
&& event.event_type() != EventType::WorkflowTaskCompleted
524+
&& !event.is_command_event()
525+
{
512526
// Replay is finished
513527
self.replaying = false;
514528
}
515-
if event.event_type() == EventType::WorkflowTaskCompleted {
516-
saw_completed = true;
517-
}
518529

519530
if do_handle_event {
520531
let eho = self.handle_event(

0 commit comments

Comments
 (0)