Skip to content

Commit b20293b

Browse files
authored
Separate activations for queries (#603)
* Always issue queries in their own activation * Fix replay flag when signal exists in final WFT + Query
1 parent 28f8b2b commit b20293b

File tree

8 files changed

+342
-236
lines changed

8 files changed

+342
-236
lines changed

core/src/core_tests/local_activities.rs

Lines changed: 142 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use crate::{
33
replay::{default_wes_attribs, TestHistoryBuilder, DEFAULT_WORKFLOW_TYPE},
44
test_help::{
55
hist_to_poll_resp, mock_sdk, mock_sdk_cfg, mock_worker, single_hist_mock_sg, MockPollCfg,
6-
ResponseType,
6+
ResponseType, WorkerExt,
77
},
88
worker::{client::mocks::mock_workflow_client, LEGACY_QUERY_ID},
99
};
@@ -32,9 +32,7 @@ use temporal_sdk_core_protos::{
3232
coresdk::{
3333
activity_result::ActivityExecutionResult,
3434
workflow_activation::{workflow_activation_job, WorkflowActivationJob},
35-
workflow_commands::{
36-
ActivityCancellationType, QueryResult, QuerySuccess, ScheduleLocalActivity,
37-
},
35+
workflow_commands::{ActivityCancellationType, ScheduleLocalActivity},
3836
workflow_completion::WorkflowActivationCompletion,
3937
ActivityTaskCompletion, AsJsonPayloadExt,
4038
},
@@ -47,7 +45,7 @@ use temporal_sdk_core_protos::{
4745
DEFAULT_ACTIVITY_TYPE,
4846
};
4947
use temporal_sdk_core_test_utils::{
50-
schedule_local_activity_cmd, start_timer_cmd, WorkerTestHelpers,
48+
query_ok, schedule_local_activity_cmd, start_timer_cmd, WorkerTestHelpers,
5149
};
5250
use tokio::{join, select, sync::Barrier};
5351

@@ -527,16 +525,7 @@ async fn query_during_wft_heartbeat_doesnt_accidentally_fail_to_continue_heartbe
527525
barrier.wait().await;
528526
core.complete_workflow_activation(WorkflowActivationCompletion::from_cmd(
529527
task.run_id,
530-
QueryResult {
531-
query_id: query.query_id.clone(),
532-
variant: Some(
533-
QuerySuccess {
534-
response: Some("whatever".into()),
535-
}
536-
.into(),
537-
),
538-
}
539-
.into(),
528+
query_ok(&query.query_id, "whatev"),
540529
))
541530
.await
542531
.unwrap();
@@ -582,10 +571,8 @@ async fn la_resolve_during_legacy_query_does_not_combine(#[case] impossible_quer
582571
t.add_timer_fired(timer_started_event_id, "1".to_string());
583572

584573
// nonlegacy query got here & LA started here
574+
// then next task is incremental w/ legacy query (for impossible query case)
585575
t.add_full_wf_task();
586-
// legacy query got here, at the same time that the LA is resolved
587-
t.add_local_activity_result_marker(1, "1", "whatever".into());
588-
t.add_workflow_execution_completed();
589576

590577
let barr = Arc::new(Barrier::new(2));
591578
let barr_c = barr.clone();
@@ -612,9 +599,6 @@ async fn la_resolve_during_legacy_query_does_not_combine(#[case] impossible_quer
612599
ResponseType::UntilResolved(
613600
async move {
614601
barr_c.wait().await;
615-
// This sleep is the only not-incredibly-invasive way to ensure the LA
616-
// resolves & updates machines before we process this task
617-
tokio::time::sleep(Duration::from_secs(1)).await;
618602
}
619603
.boxed(),
620604
2,
@@ -662,42 +646,26 @@ async fn la_resolve_during_legacy_query_does_not_combine(#[case] impossible_quer
662646
))
663647
.await
664648
.unwrap();
649+
665650
let task = core.poll_workflow_activation().await.unwrap();
666651
assert_matches!(
667652
task.jobs.as_slice(),
668-
&[
669-
WorkflowActivationJob {
670-
variant: Some(workflow_activation_job::Variant::FireTimer(_)),
671-
},
672-
WorkflowActivationJob {
673-
variant: Some(workflow_activation_job::Variant::QueryWorkflow(_)),
674-
}
675-
]
653+
&[WorkflowActivationJob {
654+
variant: Some(workflow_activation_job::Variant::FireTimer(_)),
655+
},]
676656
);
677-
core.complete_workflow_activation(WorkflowActivationCompletion::from_cmds(
657+
core.complete_workflow_activation(WorkflowActivationCompletion::from_cmd(
678658
task.run_id,
679-
vec![
680-
schedule_local_activity_cmd(
681-
1,
682-
"act-id",
683-
ActivityCancellationType::TryCancel,
684-
Duration::from_secs(60),
685-
),
686-
QueryResult {
687-
query_id: "q1".to_string(),
688-
variant: Some(
689-
QuerySuccess {
690-
response: Some("whatev".into()),
691-
}
692-
.into(),
693-
),
694-
}
695-
.into(),
696-
],
659+
schedule_local_activity_cmd(
660+
1,
661+
"act-id",
662+
ActivityCancellationType::TryCancel,
663+
Duration::from_secs(60),
664+
),
697665
))
698666
.await
699667
.unwrap();
700-
barr.wait().await;
668+
701669
let task = core.poll_workflow_activation().await.unwrap();
702670
// The next task needs to be resolve, since the LA is completed immediately
703671
assert_matches!(
@@ -708,21 +676,30 @@ async fn la_resolve_during_legacy_query_does_not_combine(#[case] impossible_quer
708676
);
709677
// Complete workflow
710678
core.complete_execution(&task.run_id).await;
679+
680+
// Now we will get the query
681+
let task = core.poll_workflow_activation().await.unwrap();
682+
assert_matches!(
683+
task.jobs.as_slice(),
684+
&[WorkflowActivationJob {
685+
variant: Some(workflow_activation_job::Variant::QueryWorkflow(ref q)),
686+
}]
687+
if q.query_id == "q1"
688+
);
689+
core.complete_workflow_activation(WorkflowActivationCompletion::from_cmd(
690+
task.run_id,
691+
query_ok("q1", "whatev"),
692+
))
693+
.await
694+
.unwrap();
695+
barr.wait().await;
696+
711697
if impossible_query_in_task {
712698
// finish last query
713699
let task = core.poll_workflow_activation().await.unwrap();
714-
core.complete_workflow_activation(WorkflowActivationCompletion::from_cmds(
700+
core.complete_workflow_activation(WorkflowActivationCompletion::from_cmd(
715701
task.run_id,
716-
vec![QueryResult {
717-
query_id: LEGACY_QUERY_ID.to_string(),
718-
variant: Some(
719-
QuerySuccess {
720-
response: Some("whatev".into()),
721-
}
722-
.into(),
723-
),
724-
}
725-
.into()],
702+
query_ok(LEGACY_QUERY_ID, "whatev"),
726703
))
727704
.await
728705
.unwrap();
@@ -738,8 +715,8 @@ async fn la_resolve_during_legacy_query_does_not_combine(#[case] impossible_quer
738715
.unwrap();
739716
};
740717

741-
tokio::join!(wf_fut, act_fut);
742-
core.shutdown().await;
718+
join!(wf_fut, act_fut);
719+
core.drain_pollers_and_shutdown().await;
743720
}
744721

745722
#[tokio::test]
@@ -1215,3 +1192,106 @@ async fn local_activities_can_be_delivered_during_shutdown() {
12151192
assert_matches!(wf_r.unwrap_err(), PollWfError::ShutDown);
12161193
assert_matches!(act_r.unwrap_err(), PollActivityError::ShutDown);
12171194
}
1195+
1196+
#[tokio::test]
1197+
async fn queries_can_be_received_while_heartbeating() {
1198+
let wfid = "fake_wf_id";
1199+
let mut t = TestHistoryBuilder::default();
1200+
t.add_wfe_started_with_wft_timeout(Duration::from_millis(200));
1201+
t.add_full_wf_task();
1202+
t.add_full_wf_task();
1203+
t.add_full_wf_task();
1204+
1205+
let tasks = [
1206+
hist_to_poll_resp(&t, wfid.to_owned(), ResponseType::ToTaskNum(1)),
1207+
{
1208+
let mut pr = hist_to_poll_resp(&t, wfid.to_owned(), ResponseType::OneTask(2));
1209+
pr.queries = HashMap::new();
1210+
pr.queries.insert(
1211+
"q1".to_string(),
1212+
WorkflowQuery {
1213+
query_type: "query-type".to_string(),
1214+
query_args: Some(b"hi".into()),
1215+
header: None,
1216+
},
1217+
);
1218+
pr
1219+
},
1220+
{
1221+
let mut pr = hist_to_poll_resp(&t, wfid.to_owned(), ResponseType::OneTask(3));
1222+
pr.query = Some(WorkflowQuery {
1223+
query_type: "query-type".to_string(),
1224+
query_args: Some(b"hi".into()),
1225+
header: None,
1226+
});
1227+
pr
1228+
},
1229+
];
1230+
let mut mock = mock_workflow_client();
1231+
mock.expect_respond_legacy_query()
1232+
.times(1)
1233+
.returning(move |_, _| Ok(Default::default()));
1234+
let mut mock = single_hist_mock_sg(wfid, t, tasks, mock, true);
1235+
mock.worker_cfg(|wc| wc.max_cached_workflows = 1);
1236+
let core = mock_worker(mock);
1237+
1238+
let task = core.poll_workflow_activation().await.unwrap();
1239+
assert_matches!(
1240+
task.jobs.as_slice(),
1241+
&[WorkflowActivationJob {
1242+
variant: Some(workflow_activation_job::Variant::StartWorkflow(_)),
1243+
},]
1244+
);
1245+
core.complete_workflow_activation(WorkflowActivationCompletion::from_cmd(
1246+
task.run_id,
1247+
schedule_local_activity_cmd(
1248+
1,
1249+
"act-id",
1250+
ActivityCancellationType::TryCancel,
1251+
Duration::from_secs(60),
1252+
),
1253+
))
1254+
.await
1255+
.unwrap();
1256+
1257+
let task = core.poll_workflow_activation().await.unwrap();
1258+
assert_matches!(
1259+
task.jobs.as_slice(),
1260+
&[WorkflowActivationJob {
1261+
variant: Some(workflow_activation_job::Variant::QueryWorkflow(ref q)),
1262+
}]
1263+
if q.query_id == "q1"
1264+
);
1265+
core.complete_workflow_activation(WorkflowActivationCompletion::from_cmd(
1266+
task.run_id,
1267+
query_ok("q1", "whatev"),
1268+
))
1269+
.await
1270+
.unwrap();
1271+
1272+
let task = core.poll_workflow_activation().await.unwrap();
1273+
assert_matches!(
1274+
task.jobs.as_slice(),
1275+
&[WorkflowActivationJob {
1276+
variant: Some(workflow_activation_job::Variant::QueryWorkflow(ref q)),
1277+
}]
1278+
if q.query_id == LEGACY_QUERY_ID
1279+
);
1280+
core.complete_workflow_activation(WorkflowActivationCompletion::from_cmd(
1281+
task.run_id,
1282+
query_ok(LEGACY_QUERY_ID, "whatev"),
1283+
))
1284+
.await
1285+
.unwrap();
1286+
1287+
// Handle the activity so we can shut down cleanly
1288+
let act_task = core.poll_activity_task().await.unwrap();
1289+
core.complete_activity_task(ActivityTaskCompletion {
1290+
task_token: act_task.task_token,
1291+
result: Some(ActivityExecutionResult::ok(vec![1].into())),
1292+
})
1293+
.await
1294+
.unwrap();
1295+
1296+
core.drain_pollers_and_shutdown().await;
1297+
}

0 commit comments

Comments
 (0)