Skip to content

Commit

Permalink
simplify rendering of task json. move common state json into base class
Browse files Browse the repository at this point in the history
  • Loading branch information
shivlaks committed May 16, 2020
1 parent 193b048 commit e87f1bf
Show file tree
Hide file tree
Showing 5 changed files with 43 additions and 70 deletions.
12 changes: 5 additions & 7 deletions packages/@aws-cdk/aws-stepfunctions-tasks/lib/lambda/invoke.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import * as iam from '@aws-cdk/aws-iam';
import * as lambda from '@aws-cdk/aws-lambda';
import * as sfn from '@aws-cdk/aws-stepfunctions';
import * as cdk from '@aws-cdk/core';
import { getResourceArn, TaskStateConfig, taskStateJson, validatePatternSupported } from '../private/task-utils';
import { integrationResourceArn, validatePatternSupported } from '../private/task-utils';

/**
* Properties for invoking a Lambda function with LambdaInvoke
Expand Down Expand Up @@ -90,18 +90,16 @@ export class LambdaInvoke extends sfn.TaskStateBase {
* Provides the service integration task configuration
*/
protected renderTask(): any {
const taskStateConfig: TaskStateConfig = {
resourceArn: getResourceArn('lambda', 'invoke', this.integrationPattern),
parameters: {
return {
Resource: integrationResourceArn('lambda', 'invoke', this.integrationPattern),
Parameters: sfn.FieldUtils.renderObject({
FunctionName: this.props.lambdaFunction.functionArn,
Payload: this.props.payload ? this.props.payload.value : sfn.TaskInput.fromDataAt('$').value,
InvocationType: this.props.invocationType,
ClientContext: this.props.clientContext,
Qualifier: this.props.qualifier,
},
}),
};

return taskStateJson(taskStateConfig);
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,48 +1,8 @@
import {
FieldUtils,
IntegrationPattern,
renderJsonPath,
TaskStateBaseProps,
} from '@aws-cdk/aws-stepfunctions';
import { Aws } from '@aws-cdk/core';

/**
* Represents a service integration call to Step Functions
*/
export interface TaskStateConfig extends TaskStateBaseProps {
/**
* The ARN of resource that represents the work to be executed
*/
readonly resourceArn: string;

/**
* Parameters pass a collection of key-value pairs, either static values or
* JSON path expressions that select from the input.
*
* @see https://docs.aws.amazon.com/step-functions/latest/dg/input-output-inputpath-params.html#input-output-parameters
*
* @default - No parameters
*/
readonly parameters?: { [name: string]: any };
}

/**
* Generates the State JSON to define a task state
*/
export function taskStateJson(config: TaskStateConfig): any {
return {
Type: 'Task',
Comment: config.comment,
Resource: config.resourceArn,
Parameters: config.parameters && FieldUtils.renderObject(config.parameters),
TimeoutSeconds: config.timeout?.toSeconds(),
HeartbeatSeconds: config.heartbeat?.toSeconds(),
InputPath: renderJsonPath(config.inputPath),
OutputPath: renderJsonPath(config.outputPath),
ResultPath: renderJsonPath(config.resultPath),
};
}

/**
* Verifies that a validation pattern is supported for a service integration
*
Expand All @@ -60,15 +20,16 @@ export function validatePatternSupported(integrationPattern: IntegrationPattern,
*
* @see https://docs.aws.amazon.com/step-functions/latest/dg/connect-to-resource.html
*/
const resourceArnSuffix = new Map<IntegrationPattern, string>();
resourceArnSuffix.set(IntegrationPattern.REQUEST_RESPONSE, '');
resourceArnSuffix.set(IntegrationPattern.RUN_JOB, '.sync');
resourceArnSuffix.set(IntegrationPattern.WAIT_FOR_TASK_TOKEN, '.waitForTaskToken');
const resourceArnSuffix: Record<IntegrationPattern, string> = {
[IntegrationPattern.REQUEST_RESPONSE]: '',
[IntegrationPattern.RUN_JOB]: '.sync',
[IntegrationPattern.WAIT_FOR_TASK_TOKEN]: '.waitForTaskToken',
};

export function getResourceArn(service: string, api: string, integrationPattern: IntegrationPattern): string {
export function integrationResourceArn(service: string, api: string, integrationPattern: IntegrationPattern): string {
if (!service || !api) {
throw new Error("Both 'service' and 'api' must be provided to build the resource ARN.");
}
return `arn:${Aws.PARTITION}:states:::${service}:${api}` +
(integrationPattern ? resourceArnSuffix.get(integrationPattern) : '');
(integrationPattern ? resourceArnSuffix[integrationPattern] : '');
}
12 changes: 5 additions & 7 deletions packages/@aws-cdk/aws-stepfunctions-tasks/lib/sns/publish.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import * as iam from '@aws-cdk/aws-iam';
import * as sns from '@aws-cdk/aws-sns';
import * as sfn from '@aws-cdk/aws-stepfunctions';
import * as cdk from '@aws-cdk/core';
import { getResourceArn, TaskStateConfig, taskStateJson, validatePatternSupported } from '../private/task-utils';
import { integrationResourceArn, validatePatternSupported } from '../private/task-utils';

/**
* Properties for PublishTask
Expand Down Expand Up @@ -87,16 +87,14 @@ export class SnsPublish extends sfn.TaskStateBase {
}

protected renderTask(): any {
const taskConfig: TaskStateConfig = {
resourceArn: getResourceArn('sns', 'publish', this.integrationPattern),
parameters: {
return {
Resource: integrationResourceArn('sns', 'publish', this.integrationPattern),
Parameters: sfn.FieldUtils.renderObject({
TopicArn: this.props.topic.topicArn,
Message: this.props.message.value,
MessageStructure: this.props.messagePerSubscriptionType ? 'json' : undefined,
Subject: this.props.subject,
},
}),
};

return taskStateJson(taskConfig);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import * as iam from '@aws-cdk/aws-iam';
import * as sqs from '@aws-cdk/aws-sqs';
import * as sfn from '@aws-cdk/aws-stepfunctions';
import * as cdk from '@aws-cdk/core';
import { getResourceArn, TaskStateConfig, taskStateJson, validatePatternSupported } from '../private/task-utils';
import { integrationResourceArn, validatePatternSupported } from '../private/task-utils';

/**
* Properties for SendMessageTask
Expand Down Expand Up @@ -92,17 +92,15 @@ export class SqsSendMessage extends sfn.TaskStateBase {
}

protected renderTask(): any {
const taskConfig: TaskStateConfig = {
resourceArn: getResourceArn('sqs', 'sendMessage', this.integrationPattern),
parameters: {
return {
Resource: integrationResourceArn('sqs', 'sendMessage', this.integrationPattern),
Parameters: sfn.FieldUtils.renderObject({
QueueUrl: this.props.queue.queueUrl,
MessageBody: this.props.messageBody.value,
DelaySeconds: this.props.delay && this.props.delay.toSeconds(),
MessageDeduplicationId: this.props.messageDeduplicationId,
MessageGroupId: this.props.messageGroupId,
},
}),
};

return taskStateJson(taskConfig);
}
}
24 changes: 21 additions & 3 deletions packages/@aws-cdk/aws-stepfunctions/lib/states/task-base.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import * as iam from '@aws-cdk/aws-iam';
import { Chain } from '../chain';
import { StateGraph } from '../state-graph';
import { CatchProps, IChainable, INextable, RetryProps } from '../types';
import { State } from './state';
import { renderJsonPath, State } from './state';

import * as cdk from '@aws-cdk/core';

Expand Down Expand Up @@ -89,12 +89,17 @@ export abstract class TaskStateBase extends State implements INextable {

public readonly endStates: INextable[];

protected abstract readonly taskMetrics: TaskMetricsConfig | undefined;
protected abstract readonly taskPolicies: iam.PolicyStatement[] | undefined;
protected abstract readonly taskMetrics?: TaskMetricsConfig;
protected abstract readonly taskPolicies?: iam.PolicyStatement[];

private readonly timeout?: cdk.Duration;
private readonly heartbeat?: cdk.Duration;

constructor(scope: cdk.Construct, id: string, props: TaskStateBaseProps) {
super(scope, id, props);
this.endStates = [this];
this.timeout = props.timeout;
this.heartbeat = props.heartbeat;
}

/**
Expand Down Expand Up @@ -134,6 +139,7 @@ export abstract class TaskStateBase extends State implements INextable {
return {
...this.renderNextEnd(),
...this.renderRetryCatch(),
...this.renderTaskBase(),
...this.renderTask(),
};
}
Expand Down Expand Up @@ -249,6 +255,18 @@ export abstract class TaskStateBase extends State implements INextable {
}
return this.metric(prefix + suffix, props);
}

private renderTaskBase() {
return {
Type: 'Task',
Comment: this.comment,
TimeoutSeconds: this.timeout?.toSeconds(),
HeartbeatSeconds: this.heartbeat?.toSeconds(),
InputPath: renderJsonPath(this.inputPath),
OutputPath: renderJsonPath(this.outputPath),
ResultPath: renderJsonPath(this.resultPath),
};
}
}

/**
Expand Down

0 comments on commit e87f1bf

Please sign in to comment.