diff --git a/packages/@aws-cdk/aws-stepfunctions-tasks/lib/lambda/invoke.ts b/packages/@aws-cdk/aws-stepfunctions-tasks/lib/lambda/invoke.ts index d9b623dd2b6b1..00e869ea3d271 100644 --- a/packages/@aws-cdk/aws-stepfunctions-tasks/lib/lambda/invoke.ts +++ b/packages/@aws-cdk/aws-stepfunctions-tasks/lib/lambda/invoke.ts @@ -2,7 +2,7 @@ import * as iam from '@aws-cdk/aws-iam'; import * as lambda from '@aws-cdk/aws-lambda'; import * as sfn from '@aws-cdk/aws-stepfunctions'; import * as cdk from '@aws-cdk/core'; -import { getResourceArn, TaskStateConfig, taskStateJson, validatePatternSupported } from '../private/task-utils'; +import { integrationResourceArn, validatePatternSupported } from '../private/task-utils'; /** * Properties for invoking a Lambda function with LambdaInvoke @@ -90,18 +90,16 @@ export class LambdaInvoke extends sfn.TaskStateBase { * Provides the service integration task configuration */ protected renderTask(): any { - const taskStateConfig: TaskStateConfig = { - resourceArn: getResourceArn('lambda', 'invoke', this.integrationPattern), - parameters: { + return { + Resource: integrationResourceArn('lambda', 'invoke', this.integrationPattern), + Parameters: sfn.FieldUtils.renderObject({ FunctionName: this.props.lambdaFunction.functionArn, Payload: this.props.payload ? this.props.payload.value : sfn.TaskInput.fromDataAt('$').value, InvocationType: this.props.invocationType, ClientContext: this.props.clientContext, Qualifier: this.props.qualifier, - }, + }), }; - - return taskStateJson(taskStateConfig); } } diff --git a/packages/@aws-cdk/aws-stepfunctions-tasks/lib/private/task-utils.ts b/packages/@aws-cdk/aws-stepfunctions-tasks/lib/private/task-utils.ts index ccaf3983e04cd..a612833075eaf 100644 --- a/packages/@aws-cdk/aws-stepfunctions-tasks/lib/private/task-utils.ts +++ b/packages/@aws-cdk/aws-stepfunctions-tasks/lib/private/task-utils.ts @@ -1,48 +1,8 @@ import { - FieldUtils, IntegrationPattern, - renderJsonPath, - TaskStateBaseProps, } from '@aws-cdk/aws-stepfunctions'; import { Aws } from '@aws-cdk/core'; -/** - * Represents a service integration call to Step Functions - */ -export interface TaskStateConfig extends TaskStateBaseProps { - /** - * The ARN of resource that represents the work to be executed - */ - readonly resourceArn: string; - - /** - * Parameters pass a collection of key-value pairs, either static values or - * JSON path expressions that select from the input. - * - * @see https://docs.aws.amazon.com/step-functions/latest/dg/input-output-inputpath-params.html#input-output-parameters - * - * @default - No parameters - */ - readonly parameters?: { [name: string]: any }; -} - -/** - * Generates the State JSON to define a task state - */ -export function taskStateJson(config: TaskStateConfig): any { - return { - Type: 'Task', - Comment: config.comment, - Resource: config.resourceArn, - Parameters: config.parameters && FieldUtils.renderObject(config.parameters), - TimeoutSeconds: config.timeout?.toSeconds(), - HeartbeatSeconds: config.heartbeat?.toSeconds(), - InputPath: renderJsonPath(config.inputPath), - OutputPath: renderJsonPath(config.outputPath), - ResultPath: renderJsonPath(config.resultPath), - }; -} - /** * Verifies that a validation pattern is supported for a service integration * @@ -60,15 +20,16 @@ export function validatePatternSupported(integrationPattern: IntegrationPattern, * * @see https://docs.aws.amazon.com/step-functions/latest/dg/connect-to-resource.html */ -const resourceArnSuffix = new Map(); -resourceArnSuffix.set(IntegrationPattern.REQUEST_RESPONSE, ''); -resourceArnSuffix.set(IntegrationPattern.RUN_JOB, '.sync'); -resourceArnSuffix.set(IntegrationPattern.WAIT_FOR_TASK_TOKEN, '.waitForTaskToken'); +const resourceArnSuffix: Record = { + [IntegrationPattern.REQUEST_RESPONSE]: '', + [IntegrationPattern.RUN_JOB]: '.sync', + [IntegrationPattern.WAIT_FOR_TASK_TOKEN]: '.waitForTaskToken', +}; -export function getResourceArn(service: string, api: string, integrationPattern: IntegrationPattern): string { +export function integrationResourceArn(service: string, api: string, integrationPattern: IntegrationPattern): string { if (!service || !api) { throw new Error("Both 'service' and 'api' must be provided to build the resource ARN."); } return `arn:${Aws.PARTITION}:states:::${service}:${api}` + - (integrationPattern ? resourceArnSuffix.get(integrationPattern) : ''); + (integrationPattern ? resourceArnSuffix[integrationPattern] : ''); } diff --git a/packages/@aws-cdk/aws-stepfunctions-tasks/lib/sns/publish.ts b/packages/@aws-cdk/aws-stepfunctions-tasks/lib/sns/publish.ts index 44d35a16e4c73..67d48ce48b58f 100644 --- a/packages/@aws-cdk/aws-stepfunctions-tasks/lib/sns/publish.ts +++ b/packages/@aws-cdk/aws-stepfunctions-tasks/lib/sns/publish.ts @@ -2,7 +2,7 @@ import * as iam from '@aws-cdk/aws-iam'; import * as sns from '@aws-cdk/aws-sns'; import * as sfn from '@aws-cdk/aws-stepfunctions'; import * as cdk from '@aws-cdk/core'; -import { getResourceArn, TaskStateConfig, taskStateJson, validatePatternSupported } from '../private/task-utils'; +import { integrationResourceArn, validatePatternSupported } from '../private/task-utils'; /** * Properties for PublishTask @@ -87,16 +87,14 @@ export class SnsPublish extends sfn.TaskStateBase { } protected renderTask(): any { - const taskConfig: TaskStateConfig = { - resourceArn: getResourceArn('sns', 'publish', this.integrationPattern), - parameters: { + return { + Resource: integrationResourceArn('sns', 'publish', this.integrationPattern), + Parameters: sfn.FieldUtils.renderObject({ TopicArn: this.props.topic.topicArn, Message: this.props.message.value, MessageStructure: this.props.messagePerSubscriptionType ? 'json' : undefined, Subject: this.props.subject, - }, + }), }; - - return taskStateJson(taskConfig); } } diff --git a/packages/@aws-cdk/aws-stepfunctions-tasks/lib/sqs/send-message.ts b/packages/@aws-cdk/aws-stepfunctions-tasks/lib/sqs/send-message.ts index 4cde5029efa92..bb2a49e6a600e 100644 --- a/packages/@aws-cdk/aws-stepfunctions-tasks/lib/sqs/send-message.ts +++ b/packages/@aws-cdk/aws-stepfunctions-tasks/lib/sqs/send-message.ts @@ -2,7 +2,7 @@ import * as iam from '@aws-cdk/aws-iam'; import * as sqs from '@aws-cdk/aws-sqs'; import * as sfn from '@aws-cdk/aws-stepfunctions'; import * as cdk from '@aws-cdk/core'; -import { getResourceArn, TaskStateConfig, taskStateJson, validatePatternSupported } from '../private/task-utils'; +import { integrationResourceArn, validatePatternSupported } from '../private/task-utils'; /** * Properties for SendMessageTask @@ -92,17 +92,15 @@ export class SqsSendMessage extends sfn.TaskStateBase { } protected renderTask(): any { - const taskConfig: TaskStateConfig = { - resourceArn: getResourceArn('sqs', 'sendMessage', this.integrationPattern), - parameters: { + return { + Resource: integrationResourceArn('sqs', 'sendMessage', this.integrationPattern), + Parameters: sfn.FieldUtils.renderObject({ QueueUrl: this.props.queue.queueUrl, MessageBody: this.props.messageBody.value, DelaySeconds: this.props.delay && this.props.delay.toSeconds(), MessageDeduplicationId: this.props.messageDeduplicationId, MessageGroupId: this.props.messageGroupId, - }, + }), }; - - return taskStateJson(taskConfig); } } diff --git a/packages/@aws-cdk/aws-stepfunctions/lib/states/task-base.ts b/packages/@aws-cdk/aws-stepfunctions/lib/states/task-base.ts index 9cbed133cacaa..c5ccedf5e2fd8 100644 --- a/packages/@aws-cdk/aws-stepfunctions/lib/states/task-base.ts +++ b/packages/@aws-cdk/aws-stepfunctions/lib/states/task-base.ts @@ -3,7 +3,7 @@ import * as iam from '@aws-cdk/aws-iam'; import { Chain } from '../chain'; import { StateGraph } from '../state-graph'; import { CatchProps, IChainable, INextable, RetryProps } from '../types'; -import { State } from './state'; +import { renderJsonPath, State } from './state'; import * as cdk from '@aws-cdk/core'; @@ -89,12 +89,17 @@ export abstract class TaskStateBase extends State implements INextable { public readonly endStates: INextable[]; - protected abstract readonly taskMetrics: TaskMetricsConfig | undefined; - protected abstract readonly taskPolicies: iam.PolicyStatement[] | undefined; + protected abstract readonly taskMetrics?: TaskMetricsConfig; + protected abstract readonly taskPolicies?: iam.PolicyStatement[]; + + private readonly timeout?: cdk.Duration; + private readonly heartbeat?: cdk.Duration; constructor(scope: cdk.Construct, id: string, props: TaskStateBaseProps) { super(scope, id, props); this.endStates = [this]; + this.timeout = props.timeout; + this.heartbeat = props.heartbeat; } /** @@ -134,6 +139,7 @@ export abstract class TaskStateBase extends State implements INextable { return { ...this.renderNextEnd(), ...this.renderRetryCatch(), + ...this.renderTaskBase(), ...this.renderTask(), }; } @@ -249,6 +255,18 @@ export abstract class TaskStateBase extends State implements INextable { } return this.metric(prefix + suffix, props); } + + private renderTaskBase() { + return { + Type: 'Task', + Comment: this.comment, + TimeoutSeconds: this.timeout?.toSeconds(), + HeartbeatSeconds: this.heartbeat?.toSeconds(), + InputPath: renderJsonPath(this.inputPath), + OutputPath: renderJsonPath(this.outputPath), + ResultPath: renderJsonPath(this.resultPath), + }; + } } /**