Skip to content

Commit

Permalink
feat(stepfunctions): Add support for ResultSelector (#14648)
Browse files Browse the repository at this point in the history
### Description

Adds support for `ResultSelector`. [ResultSelector](https://docs.aws.amazon.com/step-functions/latest/dg/input-output-inputpath-params.html#input-output-resultselector) was added to ASL in August 2020 and is currently missing coverage in CDK. 

This change exposes a new `resultSelector` field to Task, Map, and Parallel state props. This is a JSON object that functions similarly to `Parameters` but where `$` refers to the state's raw result instead of the state input. This allows you to reshape the result without using extra Pass states.

The implementation mimics what exists for Parameters. I'm not convinced we need extra types here.

#### Example

```ts
new tasks.LambdaInvoke(this, 'Invoke Handler', {
  lambdaFunction: fn,
  resultSelector: {
    lambdaOutput: sfn.JsonPath.stringAt('$.Payload'),
    invokeRequestId: sfn.JsonPath.stringAt('$.SdkResponseMetadata.RequestId'),
    staticValue: 'foo',
  },
})
```

Which produces the following ASL:

```json
{
  "Type": "Task",
  "Resource": "arn:aws:states:::lambda:invoke",
  "Parameters": {
    "FunctionName": ${functionName},
    "Payload.$": "$"
  },
  "ResultSelector": {
      "lambdaOutput.$": "$.Payload",
      "invokeRequestId.$": "$.SdkResponseMetadata.RequestId",
      "staticValue": "foo",
  },
  "Next": ${nextState}
}
```

### Testing
* Unit tests for Map, Task, and Parallel states to include `resultSelector`
* Unit test with ResultSelector for `LambdaInvoke` state updated to include `resultSelector`
* Updated LambdaInvoke integ test to use ResultSelector for one of the states. Executed state machine manually through the AWS console to ensure the example actually works too.

Closes #9904

----

*By submitting this pull request, I confirm that my contribution is made under the terms of the Apache-2.0 license*
  • Loading branch information
wong-a authored May 17, 2021
1 parent 4da78f6 commit 50d486a
Show file tree
Hide file tree
Showing 11 changed files with 259 additions and 9 deletions.
38 changes: 32 additions & 6 deletions packages/@aws-cdk/aws-stepfunctions-tasks/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,32 @@ const submitJob = new tasks.LambdaInvoke(this, 'Invoke Handler', {
});
```

### ResultSelector

You can use [`ResultSelector`](https://docs.aws.amazon.com/step-functions/latest/dg/input-output-inputpath-params.html#input-output-resultselector)
to manipulate the raw result of a Task, Map or Parallel state before it is
passed to [`ResultPath`](###ResultPath). For service integrations, the raw
result contains metadata in addition to the response payload. You can use
ResultSelector to construct a JSON payload that becomes the effective result
using static values or references to the raw result or context object.

The following example extracts the output payload of a Lambda function Task and combines
it with some static values and the state name from the context object.

```ts
new tasks.LambdaInvoke(this, 'Invoke Handler', {
lambdaFunction: fn,
resultSelector: {
lambdaOutput: sfn.JsonPath.stringAt('$.Payload'),
invokeRequestId: sfn.JsonPath.stringAt('$.SdkResponseMetadata.RequestId'),
staticValue: {
foo: 'bar',
},
stateName: sfn.JsonPath.stringAt('$$.State.Name'),
},
})
```

### ResultPath

The output of a state can be a copy of its input, the result it produces (for
Expand Down Expand Up @@ -226,8 +252,8 @@ of the Node.js family are supported.

Step Functions supports [API Gateway](https://docs.aws.amazon.com/step-functions/latest/dg/connect-api-gateway.html) through the service integration pattern.

HTTP APIs are designed for low-latency, cost-effective integrations with AWS services, including AWS Lambda, and HTTP endpoints.
HTTP APIs support OIDC and OAuth 2.0 authorization, and come with built-in support for CORS and automatic deployments.
HTTP APIs are designed for low-latency, cost-effective integrations with AWS services, including AWS Lambda, and HTTP endpoints.
HTTP APIs support OIDC and OAuth 2.0 authorization, and come with built-in support for CORS and automatic deployments.
Previous-generation REST APIs currently offer more features. More details can be found [here](https://docs.aws.amazon.com/apigateway/latest/developerguide/http-api-vs-rest.html).

### Call REST API Endpoint
Expand Down Expand Up @@ -507,8 +533,8 @@ isolation by design. Learn more about [Fargate](https://aws.amazon.com/fargate/)

The Fargate launch type allows you to run your containerized applications without the need
to provision and manage the backend infrastructure. Just register your task definition and
Fargate launches the container for you. The latest ACTIVE revision of the passed
task definition is used for running the task. Learn more about
Fargate launches the container for you. The latest ACTIVE revision of the passed
task definition is used for running the task. Learn more about
[Fargate Versioning](https://docs.aws.amazon.com/AmazonECS/latest/APIReference/API_DescribeTaskDefinition.html)

The following example runs a job from a task definition on Fargate
Expand Down Expand Up @@ -718,7 +744,7 @@ You can call the [`StartJobRun`](https://docs.aws.amazon.com/glue/latest/dg/aws-
new tasks.GlueStartJobRun(this, 'Task', {
glueJobName: 'my-glue-job',
arguments: sfn.TaskInput.fromObject({
key: 'value',
key: 'value',
}),
timeout: cdk.Duration.minutes(30),
notifyDelayAfter: cdk.Duration.minutes(5),
Expand Down Expand Up @@ -1020,7 +1046,7 @@ a specific task in a state machine.

When Step Functions reaches an activity task state, the workflow waits for an
activity worker to poll for a task. An activity worker polls Step Functions by
using GetActivityTask, and sending the ARN for the related activity.
using GetActivityTask, and sending the ARN for the related activity.

After the activity worker completes its work, it can provide a report of its
success or failure by using `SendTaskSuccess` or `SendTaskFailure`. These two
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@
"Arn"
]
},
"\",\"Payload.$\":\"$\"}},\"Check the job state\":{\"Next\":\"Job Complete?\",\"Retry\":[{\"ErrorEquals\":[\"Lambda.ServiceException\",\"Lambda.AWSLambdaException\",\"Lambda.SdkClientException\"],\"IntervalSeconds\":2,\"MaxAttempts\":6,\"BackoffRate\":2}],\"Type\":\"Task\",\"OutputPath\":\"$.Payload\",\"Resource\":\"arn:",
"\",\"Payload.$\":\"$\"}},\"Check the job state\":{\"Next\":\"Job Complete?\",\"Retry\":[{\"ErrorEquals\":[\"Lambda.ServiceException\",\"Lambda.AWSLambdaException\",\"Lambda.SdkClientException\"],\"IntervalSeconds\":2,\"MaxAttempts\":6,\"BackoffRate\":2}],\"Type\":\"Task\",\"ResultSelector\":{\"status.$\":\"$.Payload.status\"},\"Resource\":\"arn:",
{
"Ref": "AWS::Partition"
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,9 @@ const checkJobStateLambda = new Function(stack, 'checkJobStateLambda', {

const checkJobState = new LambdaInvoke(stack, 'Check the job state', {
lambdaFunction: checkJobStateLambda,
outputPath: '$.Payload',
resultSelector: {
status: sfn.JsonPath.stringAt('$.Payload.status'),
},
});

const isComplete = new sfn.Choice(stack, 'Job Complete?');
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,58 @@ describe('LambdaInvoke', () => {
}));
});

test('resultSelector', () => {
// WHEN
const task = new LambdaInvoke(stack, 'Task', {
lambdaFunction,
resultSelector: {
Result: sfn.JsonPath.stringAt('$.output.Payload'),
},
});

// THEN
expect(stack.resolve(task.toStateJson())).toEqual(expect.objectContaining({
Type: 'Task',
Resource: {
'Fn::Join': [
'',
[
'arn:',
{
Ref: 'AWS::Partition',
},
':states:::lambda:invoke',
],
],
},
End: true,
Parameters: {
FunctionName: {
'Fn::GetAtt': [
'Fn9270CBC0',
'Arn',
],
},
'Payload.$': '$',
},
ResultSelector: {
'Result.$': '$.output.Payload',
},
Retry: [
{
ErrorEquals: [
'Lambda.ServiceException',
'Lambda.AWSLambdaException',
'Lambda.SdkClientException',
],
IntervalSeconds: 2,
MaxAttempts: 6,
BackoffRate: 2,
},
],
}));
});

test('invoke Lambda function and wait for task token', () => {
// GIVEN
const task = new LambdaInvoke(stack, 'Task', {
Expand Down
15 changes: 15 additions & 0 deletions packages/@aws-cdk/aws-stepfunctions/lib/states/map.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,20 @@ export interface MapProps {
*/
readonly parameters?: { [key: string]: any };

/**
* The JSON that will replace the state's raw result and become the effective
* result before ResultPath is applied.
*
* You can use ResultSelector to create a payload with values that are static
* or selected from the state's raw result.
*
* @see
* https://docs.aws.amazon.com/step-functions/latest/dg/input-output-inputpath-params.html#input-output-resultselector
*
* @default - None
*/
readonly resultSelector?: { [key: string]: any };

/**
* MaxConcurrency
*
Expand Down Expand Up @@ -158,6 +172,7 @@ export class Map extends State implements INextable {
...this.renderNextEnd(),
...this.renderInputOutput(),
...this.renderParameters(),
...this.renderResultSelector(),
...this.renderRetryCatch(),
...this.renderIterator(),
...this.renderItemsPath(),
Expand Down
15 changes: 15 additions & 0 deletions packages/@aws-cdk/aws-stepfunctions/lib/states/parallel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,20 @@ export interface ParallelProps {
* @default $
*/
readonly resultPath?: string;

/**
* The JSON that will replace the state's raw result and become the effective
* result before ResultPath is applied.
*
* You can use ResultSelector to create a payload with values that are static
* or selected from the state's raw result.
*
* @see
* https://docs.aws.amazon.com/step-functions/latest/dg/input-output-inputpath-params.html#input-output-resultselector
*
* @default - None
*/
readonly resultSelector?: { [key: string]: any };
}

/**
Expand Down Expand Up @@ -117,6 +131,7 @@ export class Parallel extends State implements INextable {
...this.renderInputOutput(),
...this.renderRetryCatch(),
...this.renderBranches(),
...this.renderResultSelector(),
};
}

Expand Down
27 changes: 26 additions & 1 deletion packages/@aws-cdk/aws-stepfunctions/lib/states/state.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { IConstruct, Construct, Node } from 'constructs';
import { Condition } from '../condition';
import { JsonPath } from '../fields';
import { FieldUtils, JsonPath } from '../fields';
import { StateGraph } from '../state-graph';
import { CatchProps, Errors, IChainable, INextable, RetryProps } from '../types';

Expand Down Expand Up @@ -58,6 +58,20 @@ export interface StateProps {
* @default $
*/
readonly resultPath?: string;

/**
* The JSON that will replace the state's raw result and become the effective
* result before ResultPath is applied.
*
* You can use ResultSelector to create a payload with values that are static
* or selected from the state's raw result.
*
* @see
* https://docs.aws.amazon.com/step-functions/latest/dg/input-output-inputpath-params.html#input-output-resultselector
*
* @default - None
*/
readonly resultSelector?: { [key: string]: any };
}

/**
Expand Down Expand Up @@ -149,6 +163,7 @@ export abstract class State extends CoreConstruct implements IChainable {
protected readonly parameters?: object;
protected readonly outputPath?: string;
protected readonly resultPath?: string;
protected readonly resultSelector?: object;
protected readonly branches: StateGraph[] = [];
protected iteration?: StateGraph;
protected defaultChoice?: State;
Expand Down Expand Up @@ -187,6 +202,7 @@ export abstract class State extends CoreConstruct implements IChainable {
this.parameters = props.parameters;
this.outputPath = props.outputPath;
this.resultPath = props.resultPath;
this.resultSelector = props.resultSelector;
}

public get id() {
Expand Down Expand Up @@ -398,6 +414,15 @@ export abstract class State extends CoreConstruct implements IChainable {
};
}

/**
* Render ResultSelector in ASL JSON format
*/
protected renderResultSelector(): any {
return FieldUtils.renderObject({
ResultSelector: this.resultSelector,
});
}

/**
* Called whenever this state is bound to a graph
*
Expand Down
15 changes: 15 additions & 0 deletions packages/@aws-cdk/aws-stepfunctions/lib/states/task-base.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,20 @@ export interface TaskStateBaseProps {
*/
readonly resultPath?: string;

/**
* The JSON that will replace the state's raw result and become the effective
* result before ResultPath is applied.
*
* You can use ResultSelector to create a payload with values that are static
* or selected from the state's raw result.
*
* @see
* https://docs.aws.amazon.com/step-functions/latest/dg/input-output-inputpath-params.html#input-output-resultselector
*
* @default - None
*/
readonly resultSelector?: { [key: string]: any };

/**
* Timeout for the state machine
*
Expand Down Expand Up @@ -269,6 +283,7 @@ export abstract class TaskStateBase extends State implements INextable {
InputPath: renderJsonPath(this.inputPath),
OutputPath: renderJsonPath(this.outputPath),
ResultPath: renderJsonPath(this.resultPath),
...this.renderResultSelector(),
};
}
}
Expand Down
41 changes: 41 additions & 0 deletions packages/@aws-cdk/aws-stepfunctions/test/map.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,47 @@ describe('Map State', () => {
},
});
}),
test('State Machine With Map State and ResultSelector', () => {
// GIVEN
const stack = new cdk.Stack();

// WHEN
const map = new stepfunctions.Map(stack, 'Map State', {
maxConcurrency: 1,
itemsPath: stepfunctions.JsonPath.stringAt('$.inputForMap'),
resultSelector: {
buz: 'buz',
baz: stepfunctions.JsonPath.stringAt('$.baz'),
},
});
map.iterator(new stepfunctions.Pass(stack, 'Pass State'));

// THEN
expect(render(map)).toStrictEqual({
StartAt: 'Map State',
States: {
'Map State': {
Type: 'Map',
End: true,
Iterator: {
StartAt: 'Pass State',
States: {
'Pass State': {
Type: 'Pass',
End: true,
},
},
},
ItemsPath: '$.inputForMap',
MaxConcurrency: 1,
ResultSelector: {
'buz': 'buz',
'baz.$': '$.baz',
},
},
},
});
}),
test('synth is successful', () => {
const app = createAppWithMap((stack) => {
const map = new stepfunctions.Map(stack, 'Map State', {
Expand Down
32 changes: 32 additions & 0 deletions packages/@aws-cdk/aws-stepfunctions/test/parallel.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,38 @@ describe('Parallel State', () => {
},
});
});

test('State Machine With Parallel State and ResultSelector', () => {
// GIVEN
const stack = new cdk.Stack();

// WHEN
const parallel = new stepfunctions.Parallel(stack, 'Parallel State', {
resultSelector: {
buz: 'buz',
baz: stepfunctions.JsonPath.stringAt('$.baz'),
},
});
parallel.branch(new stepfunctions.Pass(stack, 'Branch 1'));

// THEN
expect(render(parallel)).toStrictEqual({
StartAt: 'Parallel State',
States: {
'Parallel State': {
Type: 'Parallel',
End: true,
Branches: [
{ StartAt: 'Branch 1', States: { 'Branch 1': { Type: 'Pass', End: true } } },
],
ResultSelector: {
'buz': 'buz',
'baz.$': '$.baz',
},
},
},
});
});
});

function render(sm: stepfunctions.IChainable) {
Expand Down
Loading

0 comments on commit 50d486a

Please sign in to comment.