Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(stepfunctions-tasks): add sns publish with message attributes #14817

Merged
merged 39 commits into from
Jul 28, 2021
Merged
Show file tree
Hide file tree
Changes from 29 commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
60e2086
feat(aws-stepfunctions-task): add sns publish with message attributes
moofish32 May 21, 2021
531458c
chore: line cleanup
moofish32 May 21, 2021
81a87db
chore: update readme
moofish32 May 21, 2021
76005c8
chore: propose using sfn.TaskInput.fromObject(...)
moofish32 May 28, 2021
d400fa9
Update packages/@aws-cdk/aws-stepfunctions-tasks/lib/sns/publish.ts
moofish32 Jun 9, 2021
fdff2ce
Update packages/@aws-cdk/aws-stepfunctions-tasks/lib/sns/publish.ts
moofish32 Jul 8, 2021
dcb4b2e
Update packages/@aws-cdk/aws-stepfunctions-tasks/lib/sns/publish.ts
moofish32 Jul 8, 2021
728a060
Merge branch 'master' into feat/sfn-sns-message-attributes
moofish32 Jul 8, 2021
e4c3c5b
feat: support data types
moofish32 Jul 8, 2021
4b9a1a3
Update packages/@aws-cdk/aws-stepfunctions-tasks/lib/sns/publish.ts
moofish32 Jul 17, 2021
14d8579
Update packages/@aws-cdk/aws-stepfunctions-tasks/lib/sns/publish.ts
moofish32 Jul 17, 2021
e962bc9
Update packages/@aws-cdk/aws-stepfunctions-tasks/lib/sns/publish.ts
moofish32 Jul 17, 2021
3747cb7
Update packages/@aws-cdk/aws-stepfunctions-tasks/lib/sns/publish.ts
moofish32 Jul 17, 2021
62902c2
Update packages/@aws-cdk/aws-stepfunctions-tasks/lib/sns/publish.ts
moofish32 Jul 17, 2021
3dce02a
Update packages/@aws-cdk/aws-stepfunctions-tasks/lib/sns/publish.ts
moofish32 Jul 17, 2021
fda5232
Update packages/@aws-cdk/aws-stepfunctions-tasks/lib/sns/publish.ts
moofish32 Jul 17, 2021
1ba916b
Update packages/@aws-cdk/aws-stepfunctions-tasks/lib/sns/publish.ts
moofish32 Jul 17, 2021
0df8714
Update packages/@aws-cdk/aws-stepfunctions-tasks/lib/sns/publish.ts
moofish32 Jul 17, 2021
037d6d1
chore: review feedback
moofish32 Jul 17, 2021
2a69b72
Merge branch 'master' into feat/sfn-sns-message-attributes
moofish32 Jul 17, 2021
5a079a3
chore: refactor enums & test cases
moofish32 Jul 17, 2021
9ab5e64
chore: refactor tests to include token, jsonpath, task input
moofish32 Jul 18, 2021
73f24e4
Merge branch 'master' into feat/sfn-sns-message-attributes
moofish32 Jul 19, 2021
34d2725
chore: reverting back to interface for message attributes
moofish32 Jul 19, 2021
99c3606
chore: add separate validation function
moofish32 Jul 21, 2021
dbfa4ce
Merge branch 'master' into feat/sfn-sns-message-attributes
moofish32 Jul 21, 2021
2b619a2
chore: update readme for latest changes
moofish32 Jul 21, 2021
9dacadf
chore: readme updates again
moofish32 Jul 21, 2021
a73bebc
Merge branch 'master' into feat/sfn-sns-message-attributes
moofish32 Jul 22, 2021
6160747
Update packages/@aws-cdk/aws-stepfunctions-tasks/lib/sns/publish.ts
moofish32 Jul 27, 2021
eaf591f
Update packages/@aws-cdk/aws-stepfunctions-tasks/lib/sns/publish.ts
moofish32 Jul 27, 2021
306018e
Update packages/@aws-cdk/aws-stepfunctions-tasks/lib/sns/publish.ts
moofish32 Jul 27, 2021
32119bd
Update packages/@aws-cdk/aws-stepfunctions-tasks/lib/sns/publish.ts
moofish32 Jul 27, 2021
5e9ce21
Update packages/@aws-cdk/aws-stepfunctions-tasks/lib/sns/publish.ts
moofish32 Jul 27, 2021
aeaf386
Merge branch 'master' into feat/sfn-sns-message-attributes
moofish32 Jul 27, 2021
c07e698
chore: refactoring for pr comments
moofish32 Jul 27, 2021
11fbdc4
Add actual value to array type mismatch error
BenChaimberg Jul 28, 2021
2d127f0
Merge branch 'master' into feat/sfn-sns-message-attributes
BenChaimberg Jul 28, 2021
0b6dbc7
Merge branch 'master' into feat/sfn-sns-message-attributes
mergify[bot] Jul 28, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 17 additions & 1 deletion packages/@aws-cdk/aws-stepfunctions-tasks/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -1028,7 +1028,23 @@ const topic = new sns.Topic(this, 'Topic');
const task1 = new tasks.SnsPublish(this, 'Publish1', {
topic,
integrationPattern: sfn.IntegrationPattern.REQUEST_RESPONSE,
message: sfn.TaskInput.fromJsonPathAt('$.state.message'),
message: sfn.TaskInput.fromDataAt('$.state.message'),
messageAttributes: {
place: {
value: sfn.JsonPath.stringAt('$.place'),
},
pic: {
// BINARY must be explicitly set
type: MessageAttributeDataType.BINARY,
value: sfn.JsonPath.stringAt('$.pic'),
},
people: {
value: 4,
},
handles: {
value: ['@kslater', '@jjf', null, '@mfanning'],
},

});

// Combine a field from the execution data with
Expand Down
147 changes: 147 additions & 0 deletions packages/@aws-cdk/aws-stepfunctions-tasks/lib/sns/publish.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,63 @@
import * as iam from '@aws-cdk/aws-iam';
import * as sns from '@aws-cdk/aws-sns';
import * as sfn from '@aws-cdk/aws-stepfunctions';
import { Token } from '@aws-cdk/core';
import { Construct } from 'constructs';
import { integrationResourceArn, validatePatternSupported } from '../private/task-utils';

/**
* The data type set for the SNS message attributes
*
* @see https://docs.aws.amazon.com/sns/latest/dg/sns-message-attributes.html#SNSMessageAttributes.DataTypes
*/
export enum MessageAttributeDataType {
/**
* Strings are Unicode with UTF-8 binary encoding
*/
STRING = 'String',

/**
* An array, formatted as a string
*
* @see https://docs.aws.amazon.com/sns/latest/dg/sns-message-attributes.html#SNSMessageAttributes.DataTypes
*/
STRING_ARRAY = 'String.Array',

/**
* Numbers are positive or negative integers or floating-point numbers
*
* @see https://docs.aws.amazon.com/sns/latest/dg/sns-message-attributes.html#SNSMessageAttributes.DataTypes
*/
NUMBER = 'Number',

/**
* Binary type attributes can store any binary data
*
* @see https://docs.aws.amazon.com/sns/latest/dg/sns-message-attributes.html#SNSMessageAttributes.DataTypes
*/
BINARY = 'Binary'
}

/**
* A message attribute to add to the SNS message
*
* @see https://docs.aws.amazon.com/sns/latest/dg/sns-message-attributes.html
*/
export interface MessageAttribute {
/**
* The value of the attribute
*/
readonly value: any;

/**
* The data type for the attribute
*
* @see https://docs.aws.amazon.com/sns/latest/dg/sns-message-attributes.html#SNSMessageAttributes.DataTypes
* @default determined by type inspection if possible, fallback is String
*/
readonly dataType?: MessageAttributeDataType
}

/**
* Properties for publishing a message to an SNS topic
*/
Expand All @@ -23,6 +77,17 @@ export interface SnsPublishProps extends sfn.TaskStateBaseProps {
*/
readonly message: sfn.TaskInput;

/**
* Add message attributes when publishing.
*
* These attributes carry additional metadata about the message and may be used
* for subscription filters.
*
* @see https://docs.aws.amazon.com/sns/latest/dg/sns-message-attributes.html
* @default []
moofish32 marked this conversation as resolved.
Show resolved Hide resolved
*/
readonly messageAttributes?: { [key: string]: MessageAttribute };

/**
* Send different messages for each transport protocol.
*
Expand Down Expand Up @@ -98,8 +163,90 @@ export class SnsPublish extends sfn.TaskStateBase {
TopicArn: this.props.topic.topicArn,
Message: this.props.message.value,
MessageStructure: this.props.messagePerSubscriptionType ? 'json' : undefined,
MessageAttributes: renderMessageAttributes(this.props.messageAttributes),
Subject: this.props.subject,
}),
};
}
}

interface MessageAttributeValue {
DataType: string;
StringValue?: string | null;
moofish32 marked this conversation as resolved.
Show resolved Hide resolved
BinaryValue?: string;
}

function renderMessageAttributes(attributes?: { [key: string]: MessageAttribute }): any {
if (attributes === undefined) { return undefined; }
const renderedAttributes: { [key: string]: MessageAttributeValue } = {};
Object.entries(attributes).map(([key, val]) => {
renderedAttributes[key] = renderMessageAttributeValue(val);
});
return sfn.TaskInput.fromObject(renderedAttributes).value;
}

function renderMessageAttributeValue(attribute: MessageAttribute): MessageAttributeValue {
const dataType = attribute.dataType;
if (attribute.value instanceof sfn.TaskInput) {
return { DataType: dataType ?? MessageAttributeDataType.STRING, StringValue: attribute.value.value };
moofish32 marked this conversation as resolved.
Show resolved Hide resolved
}

if (dataType === MessageAttributeDataType.BINARY) {
return { DataType: dataType, BinaryValue: `${attribute.value}` };
}

if (Token.isUnresolved(attribute.value)) {
return { DataType: dataType ?? MessageAttributeDataType.STRING, StringValue: attribute.value };
}

validateMessageAttribute(attribute);
if (Array.isArray(attribute.value)) {
return { DataType: dataType ?? MessageAttributeDataType.STRING_ARRAY, StringValue: JSON.stringify(attribute.value) };
moofish32 marked this conversation as resolved.
Show resolved Hide resolved
}
const value = attribute.value;
if (typeof value === 'number') {
return { DataType: MessageAttributeDataType.NUMBER, StringValue: `${value}` };
} else {
return { DataType: MessageAttributeDataType.STRING, StringValue: `${value}` };
}
}

function validateMessageAttribute(attribute: MessageAttribute): void {
const dataType = attribute.dataType;
const value = attribute.value;
if (dataType === undefined) {
return;
}
if (Array.isArray(value)) {
if (dataType !== MessageAttributeDataType.STRING_ARRAY) {
throw new Error(`Unsupported SNS message attribute type: ${JSON.stringify(value)}`);
BenChaimberg marked this conversation as resolved.
Show resolved Hide resolved
}
const validArrayTypes = ['string', 'boolean', 'number'];
value.forEach((v) => {
if (v !== null || !validArrayTypes.includes(typeof v)) {
throw new Error(`Unsupported SNS message attribute in array: ${v}`);
moofish32 marked this conversation as resolved.
Show resolved Hide resolved
}
});
return;
}
switch (typeof value) {
case 'string':
// trust the user or will default to string
if (sfn.JsonPath.isEncodedJsonPath(attribute.value)) {
return;
}
if (dataType === MessageAttributeDataType.STRING ||
dataType === MessageAttributeDataType.BINARY) {
return;
}
throw new Error(`Unsupported SNS message attribute: ${JSON.stringify(value)}`);
case 'number':
if (dataType === MessageAttributeDataType.NUMBER) { return; }
throw new Error(`Unsupported SNS message attribute: ${JSON.stringify(value)}`);
case 'boolean':
if (dataType === MessageAttributeDataType.STRING) { return; }
throw new Error(`Unsupported SNS message attribute: ${JSON.stringify(value)}`);
default:
throw new Error(`Unsupported SNS message attribute: ${JSON.stringify(value)}`);
}
}
152 changes: 150 additions & 2 deletions packages/@aws-cdk/aws-stepfunctions-tasks/test/sns/publish.test.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import * as sns from '@aws-cdk/aws-sns';
import * as sfn from '@aws-cdk/aws-stepfunctions';
import * as cdk from '@aws-cdk/core';
import { SnsPublish } from '../../lib/sns/publish';
import { SnsPublish, MessageAttributeDataType, MessageAttribute } from '../../lib/sns/publish';

describe('Publish', () => {

Expand Down Expand Up @@ -38,6 +38,154 @@ describe('Publish', () => {
},
});
});
test('with message attributes', () => {
// GIVEN
const stack = new cdk.Stack();
const topic = new sns.Topic(stack, 'Topic');
const token = cdk.Token.asString('cakes can be resolved');

// WHEN
const task = new SnsPublish(stack, 'Publish', {
topic,
message: sfn.TaskInput.fromText('Publish this message'),
messageAttributes: {
moofish32 marked this conversation as resolved.
Show resolved Hide resolved
cake: {
value: 'chocolate',
},
cakeCount: {
value: 2,
},
resolvable: {
value: token,
},
binary: {
value: 'a2345',
dataType: MessageAttributeDataType.BINARY,
},
binaryNumberIsString: {
value: 123456987,
dataType: MessageAttributeDataType.BINARY,
},
taskInput: {
value: sfn.TaskInput.fromJsonPathAt('$$.StateMachine.Name'),
},
executionId: {
value: sfn.JsonPath.stringAt('$$.Execution.Id'),
},
vendors: {
value: ['Great Cakes', true, false, null, 3, 'Local Cakes'],
},
},
});

// THEN
expect(stack.resolve(task.toStateJson())).toEqual({
Type: 'Task',
Resource: {
'Fn::Join': [
'',
[
'arn:',
{
Ref: 'AWS::Partition',
},
':states:::sns:publish',
],
],
},
End: true,
Parameters: {
TopicArn: { Ref: 'TopicBFC7AF6E' },
Message: 'Publish this message',
MessageAttributes: {
binary: {
DataType: 'Binary',
BinaryValue: 'a2345',
},
binaryNumberIsString: {
DataType: 'Binary',
BinaryValue: '123456987',
},
cake: {
DataType: 'String',
StringValue: 'chocolate',
},
cakeCount: {
DataType: 'Number',
StringValue: '2',
},
resolvable: {
DataType: 'String',
StringValue: 'cakes can be resolved',
},
executionId: {
'DataType': 'String',
'StringValue.$': '$$.Execution.Id',
},
taskInput: {
'DataType': 'String',
'StringValue.$': '$$.StateMachine.Name',
},
vendors: {
DataType: 'String.Array',
StringValue: '["Great Cakes",true,false,null,3,"Local Cakes"]',
},
},
},
});
});
describe('invalid message attribute configurations', () => {
// GIVEN
const attributes: MessageAttribute[] = [
{
value: 2,
dataType: MessageAttributeDataType.STRING,
},
{
value: 'foo',
dataType: MessageAttributeDataType.NUMBER,
},
{
value: 'foo',
dataType: MessageAttributeDataType.STRING_ARRAY,
},
{
value: ['foo', undefined, 2, true],
dataType: MessageAttributeDataType.STRING_ARRAY,
},
{
value: ['foo', { bar: 2 }, 2, true],
dataType: MessageAttributeDataType.STRING_ARRAY,
},
{
value: false,
dataType: MessageAttributeDataType.STRING_ARRAY,
},
{
value: false,
dataType: MessageAttributeDataType.NUMBER,
},
];
attributes.forEach((a) => {
test(`${JSON.stringify(a)} is not valid`, () => {
// WHEN
const stack = new cdk.Stack();
const topic = new sns.Topic(stack, 'Topic');
const task = new SnsPublish(stack, 'Publish', {
topic,
message: sfn.TaskInput.fromText('Publish this message'),
messageAttributes: {
test: a,
},
});

// THEN
expect(() => {
stack.resolve(task.toStateJson());
}).toThrow(/Unsupported SNS message attribute/);
});
});
});

test('publish SNS message and wait for task token', () => {
// GIVEN
Expand Down Expand Up @@ -188,4 +336,4 @@ describe('Publish', () => {
});
}).toThrow(/Unsupported service integration pattern. Supported Patterns: REQUEST_RESPONSE,WAIT_FOR_TASK_TOKEN. Received: RUN_JOB/);
});
});
});