diff --git a/packages/@aws-cdk/aws-stepfunctions-tasks/README.md b/packages/@aws-cdk/aws-stepfunctions-tasks/README.md index 0ee1cbaba53f9..5bb2d7a9c4fbb 100644 --- a/packages/@aws-cdk/aws-stepfunctions-tasks/README.md +++ b/packages/@aws-cdk/aws-stepfunctions-tasks/README.md @@ -1050,7 +1050,23 @@ const topic = new sns.Topic(this, 'Topic'); const task1 = new tasks.SnsPublish(this, 'Publish1', { topic, integrationPattern: sfn.IntegrationPattern.REQUEST_RESPONSE, - message: sfn.TaskInput.fromJsonPathAt('$.state.message'), + message: sfn.TaskInput.fromDataAt('$.state.message'), + messageAttributes: { + place: { + value: sfn.JsonPath.stringAt('$.place'), + }, + pic: { + // BINARY must be explicitly set + type: MessageAttributeDataType.BINARY, + value: sfn.JsonPath.stringAt('$.pic'), + }, + people: { + value: 4, + }, + handles: { + value: ['@kslater', '@jjf', null, '@mfanning'], + }, + }); // Combine a field from the execution data with diff --git a/packages/@aws-cdk/aws-stepfunctions-tasks/lib/sns/publish.ts b/packages/@aws-cdk/aws-stepfunctions-tasks/lib/sns/publish.ts index 16f10051f0531..8c37c35c9b9f9 100644 --- a/packages/@aws-cdk/aws-stepfunctions-tasks/lib/sns/publish.ts +++ b/packages/@aws-cdk/aws-stepfunctions-tasks/lib/sns/publish.ts @@ -1,9 +1,63 @@ import * as iam from '@aws-cdk/aws-iam'; import * as sns from '@aws-cdk/aws-sns'; import * as sfn from '@aws-cdk/aws-stepfunctions'; +import { Token } from '@aws-cdk/core'; import { Construct } from 'constructs'; import { integrationResourceArn, validatePatternSupported } from '../private/task-utils'; +/** + * The data type set for the SNS message attributes + * + * @see https://docs.aws.amazon.com/sns/latest/dg/sns-message-attributes.html#SNSMessageAttributes.DataTypes + */ +export enum MessageAttributeDataType { + /** + * Strings are Unicode with UTF-8 binary encoding + */ + STRING = 'String', + + /** + * An array, formatted as a string + * + * @see https://docs.aws.amazon.com/sns/latest/dg/sns-message-attributes.html#SNSMessageAttributes.DataTypes + */ + STRING_ARRAY = 'String.Array', + + /** + * Numbers are positive or negative integers or floating-point numbers + * + * @see https://docs.aws.amazon.com/sns/latest/dg/sns-message-attributes.html#SNSMessageAttributes.DataTypes + */ + NUMBER = 'Number', + + /** + * Binary type attributes can store any binary data + * + * @see https://docs.aws.amazon.com/sns/latest/dg/sns-message-attributes.html#SNSMessageAttributes.DataTypes + */ + BINARY = 'Binary' +} + +/** + * A message attribute to add to the SNS message + * + * @see https://docs.aws.amazon.com/sns/latest/dg/sns-message-attributes.html + */ +export interface MessageAttribute { + /** + * The value of the attribute + */ + readonly value: any; + + /** + * The data type for the attribute + * + * @see https://docs.aws.amazon.com/sns/latest/dg/sns-message-attributes.html#SNSMessageAttributes.DataTypes + * @default determined by type inspection if possible, fallback is String + */ + readonly dataType?: MessageAttributeDataType +} + /** * Properties for publishing a message to an SNS topic */ @@ -23,6 +77,17 @@ export interface SnsPublishProps extends sfn.TaskStateBaseProps { */ readonly message: sfn.TaskInput; + /** + * Add message attributes when publishing. + * + * These attributes carry additional metadata about the message and may be used + * for subscription filters. + * + * @see https://docs.aws.amazon.com/sns/latest/dg/sns-message-attributes.html + * @default {} + */ + readonly messageAttributes?: { [key: string]: MessageAttribute }; + /** * Send different messages for each transport protocol. * @@ -98,8 +163,95 @@ export class SnsPublish extends sfn.TaskStateBase { TopicArn: this.props.topic.topicArn, Message: this.props.message.value, MessageStructure: this.props.messagePerSubscriptionType ? 'json' : undefined, + MessageAttributes: renderMessageAttributes(this.props.messageAttributes), Subject: this.props.subject, }), }; } } + +interface MessageAttributeValue { + DataType: string; + StringValue?: string; + BinaryValue?: string; +} + +function renderMessageAttributes(attributes?: { [key: string]: MessageAttribute }): any { + if (attributes === undefined) { return undefined; } + const renderedAttributes: { [key: string]: MessageAttributeValue } = {}; + Object.entries(attributes).map(([key, val]) => { + renderedAttributes[key] = renderMessageAttributeValue(val); + }); + return sfn.TaskInput.fromObject(renderedAttributes).value; +} + +function renderMessageAttributeValue(attribute: MessageAttribute): MessageAttributeValue { + const dataType = attribute.dataType; + if (attribute.value instanceof sfn.TaskInput) { + return { + DataType: dataType ?? MessageAttributeDataType.STRING, + StringValue: dataType !== MessageAttributeDataType.BINARY ? attribute.value.value : undefined, + BinaryValue: dataType === MessageAttributeDataType.BINARY ? attribute.value.value : undefined, + }; + } + + if (dataType === MessageAttributeDataType.BINARY) { + return { DataType: dataType, BinaryValue: `${attribute.value}` }; + } + + if (Token.isUnresolved(attribute.value)) { + return { DataType: dataType ?? MessageAttributeDataType.STRING, StringValue: attribute.value }; + } + + validateMessageAttribute(attribute); + if (Array.isArray(attribute.value)) { + return { DataType: MessageAttributeDataType.STRING_ARRAY, StringValue: JSON.stringify(attribute.value) }; + } + const value = attribute.value; + if (typeof value === 'number') { + return { DataType: MessageAttributeDataType.NUMBER, StringValue: `${value}` }; + } else { + return { DataType: MessageAttributeDataType.STRING, StringValue: `${value}` }; + } +} + +function validateMessageAttribute(attribute: MessageAttribute): void { + const dataType = attribute.dataType; + const value = attribute.value; + if (dataType === undefined) { + return; + } + if (Array.isArray(value)) { + if (dataType !== MessageAttributeDataType.STRING_ARRAY) { + throw new Error(`Requested SNS message attribute type was ${dataType} but ${value} was of type Array`); + } + const validArrayTypes = ['string', 'boolean', 'number']; + value.forEach((v) => { + if (v !== null || !validArrayTypes.includes(typeof v)) { + throw new Error(`Requested SNS message attribute type was ${typeof value} but Array values must be one of ${validArrayTypes}`); + } + }); + return; + } + const error = new Error(`Requested SNS message attribute type was ${dataType} but ${value} was of type ${typeof value}`); + switch (typeof value) { + case 'string': + // trust the user or will default to string + if (sfn.JsonPath.isEncodedJsonPath(attribute.value)) { + return; + } + if (dataType === MessageAttributeDataType.STRING || + dataType === MessageAttributeDataType.BINARY) { + return; + } + throw error; + case 'number': + if (dataType === MessageAttributeDataType.NUMBER) { return; } + throw error; + case 'boolean': + if (dataType === MessageAttributeDataType.STRING) { return; } + throw error; + default: + throw error; + } +} diff --git a/packages/@aws-cdk/aws-stepfunctions-tasks/test/sns/publish.test.ts b/packages/@aws-cdk/aws-stepfunctions-tasks/test/sns/publish.test.ts index e336d4c8d8a5d..5b81034e48cc2 100644 --- a/packages/@aws-cdk/aws-stepfunctions-tasks/test/sns/publish.test.ts +++ b/packages/@aws-cdk/aws-stepfunctions-tasks/test/sns/publish.test.ts @@ -1,7 +1,7 @@ import * as sns from '@aws-cdk/aws-sns'; import * as sfn from '@aws-cdk/aws-stepfunctions'; import * as cdk from '@aws-cdk/core'; -import { SnsPublish } from '../../lib/sns/publish'; +import { SnsPublish, MessageAttributeDataType, MessageAttribute } from '../../lib/sns/publish'; describe('Publish', () => { @@ -38,6 +38,154 @@ describe('Publish', () => { }, }); }); + test('with message attributes', () => { + // GIVEN + const stack = new cdk.Stack(); + const topic = new sns.Topic(stack, 'Topic'); + const token = cdk.Token.asString('cakes can be resolved'); + + // WHEN + const task = new SnsPublish(stack, 'Publish', { + topic, + message: sfn.TaskInput.fromText('Publish this message'), + messageAttributes: { + cake: { + value: 'chocolate', + }, + cakeCount: { + value: 2, + }, + resolvable: { + value: token, + }, + binary: { + value: 'a2345', + dataType: MessageAttributeDataType.BINARY, + }, + binaryNumberIsString: { + value: 123456987, + dataType: MessageAttributeDataType.BINARY, + }, + taskInput: { + value: sfn.TaskInput.fromJsonPathAt('$$.StateMachine.Name'), + }, + executionId: { + value: sfn.JsonPath.stringAt('$$.Execution.Id'), + }, + vendors: { + value: ['Great Cakes', true, false, null, 3, 'Local Cakes'], + }, + }, + }); + + // THEN + expect(stack.resolve(task.toStateJson())).toEqual({ + Type: 'Task', + Resource: { + 'Fn::Join': [ + '', + [ + 'arn:', + { + Ref: 'AWS::Partition', + }, + ':states:::sns:publish', + ], + ], + }, + End: true, + Parameters: { + TopicArn: { Ref: 'TopicBFC7AF6E' }, + Message: 'Publish this message', + MessageAttributes: { + binary: { + DataType: 'Binary', + BinaryValue: 'a2345', + }, + binaryNumberIsString: { + DataType: 'Binary', + BinaryValue: '123456987', + }, + cake: { + DataType: 'String', + StringValue: 'chocolate', + }, + cakeCount: { + DataType: 'Number', + StringValue: '2', + }, + resolvable: { + DataType: 'String', + StringValue: 'cakes can be resolved', + }, + executionId: { + 'DataType': 'String', + 'StringValue.$': '$$.Execution.Id', + }, + taskInput: { + 'DataType': 'String', + 'StringValue.$': '$$.StateMachine.Name', + }, + vendors: { + DataType: 'String.Array', + StringValue: '["Great Cakes",true,false,null,3,"Local Cakes"]', + }, + }, + }, + }); + }); + describe('invalid message attribute configurations', () => { + // GIVEN + const attributes: MessageAttribute[] = [ + { + value: 2, + dataType: MessageAttributeDataType.STRING, + }, + { + value: 'foo', + dataType: MessageAttributeDataType.NUMBER, + }, + { + value: 'foo', + dataType: MessageAttributeDataType.STRING_ARRAY, + }, + { + value: ['foo', undefined, 2, true], + dataType: MessageAttributeDataType.STRING_ARRAY, + }, + { + value: ['foo', { bar: 2 }, 2, true], + dataType: MessageAttributeDataType.STRING_ARRAY, + }, + { + value: false, + dataType: MessageAttributeDataType.STRING_ARRAY, + }, + { + value: false, + dataType: MessageAttributeDataType.NUMBER, + }, + ]; + attributes.forEach((a) => { + test(`${JSON.stringify(a)} is not valid`, () => { + // WHEN + const stack = new cdk.Stack(); + const topic = new sns.Topic(stack, 'Topic'); + const task = new SnsPublish(stack, 'Publish', { + topic, + message: sfn.TaskInput.fromText('Publish this message'), + messageAttributes: { + test: a, + }, + }); + + // THEN + expect(() => { + stack.resolve(task.toStateJson()); + }).toThrow(/Requested SNS message attribute type was/); + }); + }); + }); test('publish SNS message and wait for task token', () => { // GIVEN @@ -188,4 +336,4 @@ describe('Publish', () => { }); }).toThrow(/Unsupported service integration pattern. Supported Patterns: REQUEST_RESPONSE,WAIT_FOR_TASK_TOKEN. Received: RUN_JOB/); }); -}); \ No newline at end of file +});