Skip to content

Commit

Permalink
Make startUpdate require waitForStage=ACCEPTED
Browse files Browse the repository at this point in the history
  • Loading branch information
dandavison committed Jun 21, 2024
1 parent a1f94e5 commit 84e2c11
Show file tree
Hide file tree
Showing 4 changed files with 80 additions and 23 deletions.
43 changes: 30 additions & 13 deletions packages/client/src/workflow-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -138,30 +138,49 @@ export interface WorkflowHandle<T extends Workflow = Workflow> extends BaseWorkf
): Promise<Ret>;

/**
* Start an Update and receive a handle to the Update.
* The Update validator (if present) is run before the handle is returned.
* Start an Update and receive a handle to the Update. The Update validator (if present) is run
* before the handle is returned.
*
* @experimental Update is an experimental feature.
*
* @throws {@link WorkflowUpdateFailedError} if Update validation fails.
*
* @param def an Update definition as returned from {@link defineUpdate}
* @param options Update arguments
* @param options update arguments, and update lifecycle stage to wait for
*
* Currently, startUpdate always waits until a worker is accepting tasks for the workflow and the
* update is accepted or rejected, and the options object must be at least
* ```ts
* {
* waitForStage: temporal.api.enums.v1.UpdateWorkflowExecutionLifecycleStage.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_ACCEPTED
* }
* ```
* If the update takes arguments, then the options object must additionally contain an `args`
* property with an array of argument values.
*
* @example
* ```ts
* const updateHandle = await handle.startUpdate(incrementAndGetValueUpdate, { args: [2] });
* const updateHandle = await handle.startUpdate(incrementAndGetValueUpdate, {
* args: [2],
* waitForStage: UpdateWorkflowExecutionLifecycleStage.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_ACCEPTED,
* });
* const updateResult = await updateHandle.result();
* ```
*/
startUpdate<Ret, Args extends [any, ...any[]], Name extends string = string>(
def: UpdateDefinition<Ret, Args, Name> | string,
options: WorkflowUpdateOptions & { args: Args }
options: WorkflowUpdateOptions & {
args: Args;
waitForStage: temporal.api.enums.v1.UpdateWorkflowExecutionLifecycleStage.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_ACCEPTED;
}
): Promise<WorkflowUpdateHandle<Ret>>;

startUpdate<Ret, Args extends [], Name extends string = string>(
def: UpdateDefinition<Ret, Args, Name> | string,
options?: WorkflowUpdateOptions & { args?: Args }
options: WorkflowUpdateOptions & {
args?: Args;
waitForStage: temporal.api.enums.v1.UpdateWorkflowExecutionLifecycleStage.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_ACCEPTED;
}
): Promise<WorkflowUpdateHandle<Ret>>;

/**
Expand Down Expand Up @@ -1131,14 +1150,12 @@ export class WorkflowClient extends BaseClient {
},
async startUpdate<Ret, Args extends any[]>(
def: UpdateDefinition<Ret, Args> | string,
options?: WorkflowUpdateOptions & { args?: Args }
options: WorkflowUpdateOptions & {
args?: Args;
waitForStage: temporal.api.enums.v1.UpdateWorkflowExecutionLifecycleStage.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_ACCEPTED;
}
): Promise<WorkflowUpdateHandle<Ret>> {
return await _startUpdate(
def,
temporal.api.enums.v1.UpdateWorkflowExecutionLifecycleStage
.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_ACCEPTED,
options
);
return await _startUpdate(def, options.waitForStage, options);
},
async executeUpdate<Ret, Args extends any[]>(
def: UpdateDefinition<Ret, Args> | string,
Expand Down
7 changes: 6 additions & 1 deletion packages/test/src/test-integration-update-interceptors.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { WorkflowStartUpdateInput, WorkflowStartUpdateOutput } from '@temporalio/client';
import { temporal } from '@temporalio/proto';
import * as wf from '@temporalio/workflow';
import { Next, UpdateInput, WorkflowInboundCallsInterceptor, WorkflowInterceptors } from '@temporalio/workflow';
import { helpers, makeTestFunction } from './helpers-integration';
Expand Down Expand Up @@ -66,7 +67,11 @@ test('Update client and inbound interceptors work for startUpdate', async (t) =>
await worker.runUntil(async () => {
const wfHandle = await startWorkflow(workflowWithUpdate);

const updateHandle = await wfHandle.startUpdate(update, { args: ['1'] });
const updateHandle = await wfHandle.startUpdate(update, {
args: ['1'],
waitForStage:
temporal.api.enums.v1.UpdateWorkflowExecutionLifecycleStage.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_ACCEPTED,
});
const updateResult = await updateHandle.result();
t.deepEqual(updateResult, '1-clientIntercepted-inboundIntercepted');
});
Expand Down
51 changes: 43 additions & 8 deletions packages/test/src/test-integration-update.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { status as grpcStatus } from '@grpc/grpc-js';
import { isGrpcServiceError } from '@temporalio/client';
import { temporal } from '@temporalio/proto';
import * as wf from '@temporalio/workflow';
import { helpers, makeTestFunction } from './helpers-integration';

Expand Down Expand Up @@ -65,11 +66,18 @@ test('Update can be executed via startUpdate() and handle.result()', async (t) =
await worker.runUntil(async () => {
const wfHandle = await startWorkflow(workflowWithUpdates);

const updateHandle = await wfHandle.startUpdate(update, { args: ['1'] });
const updateHandle = await wfHandle.startUpdate(update, {
args: ['1'],
waitForStage:
temporal.api.enums.v1.UpdateWorkflowExecutionLifecycleStage.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_ACCEPTED,
});
const updateResult = await updateHandle.result();
t.deepEqual(updateResult, ['1']);

const doneUpdateHandle = await wfHandle.startUpdate(doneUpdate);
const doneUpdateHandle = await wfHandle.startUpdate(doneUpdate, {
waitForStage:
temporal.api.enums.v1.UpdateWorkflowExecutionLifecycleStage.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_ACCEPTED,
});
const doneUpdateResult = await doneUpdateHandle.result();
t.is(doneUpdateResult, undefined);

Expand All @@ -84,7 +92,12 @@ test('Update handle can be created from identifiers and used to obtain result',
await worker.runUntil(async () => {
const updateId = 'my-update-id';
const wfHandle = await startWorkflow(workflowWithUpdates);
const updateHandleFromStartUpdate = await wfHandle.startUpdate(update, { args: ['1'], updateId });
const updateHandleFromStartUpdate = await wfHandle.startUpdate(update, {
args: ['1'],
updateId,
waitForStage:
temporal.api.enums.v1.UpdateWorkflowExecutionLifecycleStage.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_ACCEPTED,
});

// Obtain update handle on workflow handle from start update.
const updateHandle = wfHandle.getUpdateHandle(updateId);
Expand Down Expand Up @@ -176,9 +189,17 @@ test('Update validator can reject when using handle.result() but handle can be o
const worker = await createWorker();
await worker.runUntil(async () => {
const wfHandle = await startWorkflow(workflowWithUpdateValidator);
let updateHandle = await wfHandle.startUpdate(stringToStringUpdate, { args: ['arg'] });
let updateHandle = await wfHandle.startUpdate(stringToStringUpdate, {
args: ['arg'],
waitForStage:
temporal.api.enums.v1.UpdateWorkflowExecutionLifecycleStage.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_ACCEPTED,
});
t.is(await updateHandle.result(), 'update-result');
updateHandle = await wfHandle.startUpdate(stringToStringUpdate, { args: ['bad-arg'] });
updateHandle = await wfHandle.startUpdate(stringToStringUpdate, {
args: ['bad-arg'],
waitForStage:
temporal.api.enums.v1.UpdateWorkflowExecutionLifecycleStage.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_ACCEPTED,
});
await assertWorkflowUpdateFailed(updateHandle.result(), wf.ApplicationFailure, 'Validation failed');
});
});
Expand Down Expand Up @@ -247,7 +268,11 @@ test('Update id can be assigned and is present on returned handle', async (t) =>
const worker = await createWorker();
await worker.runUntil(async () => {
const wfHandle = await startWorkflow(workflowWithUpdates);
const updateHandle = await wfHandle.startUpdate(doneUpdate, { updateId: 'my-update-id' });
const updateHandle = await wfHandle.startUpdate(doneUpdate, {
updateId: 'my-update-id',
waitForStage:
temporal.api.enums.v1.UpdateWorkflowExecutionLifecycleStage.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_ACCEPTED,
});
t.is(updateHandle.updateId, 'my-update-id');
});
});
Expand Down Expand Up @@ -426,7 +451,11 @@ test('Update/Signal/Query example in WorkflowHandle docstrings works', async (t)
t.is(queryResult, 4);
const updateResult = await wfHandle.executeUpdate(incrementAndGetValueUpdate, { args: [2] });
t.is(updateResult, 6);
const secondUpdateHandle = await wfHandle.startUpdate(incrementAndGetValueUpdate, { args: [2] });
const secondUpdateHandle = await wfHandle.startUpdate(incrementAndGetValueUpdate, {
args: [2],
waitForStage:
temporal.api.enums.v1.UpdateWorkflowExecutionLifecycleStage.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_ACCEPTED,
});
const secondUpdateResult = await secondUpdateHandle.result();
t.is(secondUpdateResult, 8);
await wfHandle.cancel();
Expand All @@ -437,7 +466,13 @@ test('Update/Signal/Query example in WorkflowHandle docstrings works', async (t)
test('startUpdate does not return handle before update has reached requested stage', async (t) => {
const { startWorkflow } = helpers(t);
const wfHandle = await startWorkflow(workflowWithUpdates);
const updatePromise = wfHandle.startUpdate(update, { args: ['1'] }).then(() => 'update');
const updatePromise = wfHandle
.startUpdate(update, {
args: ['1'],
waitForStage:
temporal.api.enums.v1.UpdateWorkflowExecutionLifecycleStage.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_ACCEPTED,
})
.then(() => 'update');
const timeoutPromise = new Promise<string>((f) =>
setTimeout(() => f('timeout'), 500 + LONG_POLL_EXPIRATION_INTERVAL_SECONDS * 1000)
);
Expand Down
2 changes: 1 addition & 1 deletion packages/workflow/src/workflow.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ import { ChildWorkflowHandle, ExternalWorkflowHandle } from './workflow-handle';
registerSleepImplementation(sleep);

/**
* Adds default values to `workflowId` and `workflowIdReusePolicy` to given workflow options.
* Adds default values of `workflowId` and `cancellationType` to given workflow options.
*/
export function addDefaultWorkflowOptions<T extends Workflow>(
opts: WithWorkflowArgs<T, ChildWorkflowOptions>
Expand Down

0 comments on commit 84e2c11

Please sign in to comment.