Skip to content

Commit

Permalink
feat(stepfunctions-tasks): add sns publish with message attributes (a…
Browse files Browse the repository at this point in the history
…ws#14817)

This PR address aws#4702

I am primarily solving for the use case of using SNS publish with
message attributes for sns filtering. If EventBridge integrations have a
fast follow for wait for task token features, that would solve my use
case. Because I'm solving for filtering and based on anecdotal
experience String is the primary attribute type. I've included Binary
because the pattern is the same. I have not implemented Number or
String.Array because they are not Lambda supported and I believe less
common.

I chose to attempt to solve for consumer ergonomics with:

```ts
    const task = new SnsPublish(stack, 'Publish', {
      topic,
      message: sfn.TaskInput.fromText('Publish this message'),
      messageAttributes: {
        cake: {
          type: SnsMessageAttributeType.STRING,
          value: sfn.TaskInput.fromText('chocolate'),
        },
        cakePic: {
          type: SnsMessageAttributeType.BINARY,
          value: sfn.TaskInput.fromDataAt('$.cake.pic'),
        },
      },
    });
```

This results in a `value` member on the message attribute interface.
While I think that maps best for the SNS json definition and does yield
this ugly line in the private render methods:

```
      attrs[a][`${attr.type}Value`] = attr.value.value;
```

Is there a better way?

I did not implement SQS message attributes yet. Based on my limited
knowledge the StepFunction integrations are similar(potentially
identical if we again only support Binary and String). If the answer is
one implementation, should I move the below code snippets into something
generically named MessageAttribute within the stepfunctions package
perhaps as part of base-task?

```ts
/**
 * SNS Message Attribute type enum
 */
export enum SnsMessageAttributeType {

  /**
   * String type attributes.
   *
   * @see https://docs.aws.amazon.com/sns/latest/dg/sns-message-attributes.html#SNSMessageAttributes.DataTypes
   */
  STRING = 'String',

  /**
   * Binary type attributes.
   *
   * @see https://docs.aws.amazon.com/sns/latest/dg/sns-message-attributes.html#SNSMessageAttributes.DataTypes
   */
  BINARY = 'Binary',
}

/**
 * SNS Message Attribute
 */
export interface SnsMessageAttribute {
  /**
   * MessageAttributeType is the type of data being passed.
   *
   *
   * @see https://docs.aws.amazon.com/sns/latest/dg/sns-message-attributes.html#SNSMessageAttributes.DataTypes
   */
  readonly type: SnsMessageAttributeType;

  /**
   * The value of the data being passed
   */
  readonly value: sfn.TaskInput;
}
```
  • Loading branch information
moofish32 authored and hollanddd committed Aug 26, 2021
1 parent b0bfc5e commit 361284d
Show file tree
Hide file tree
Showing 3 changed files with 319 additions and 3 deletions.
18 changes: 17 additions & 1 deletion packages/@aws-cdk/aws-stepfunctions-tasks/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -1050,7 +1050,23 @@ const topic = new sns.Topic(this, 'Topic');
const task1 = new tasks.SnsPublish(this, 'Publish1', {
topic,
integrationPattern: sfn.IntegrationPattern.REQUEST_RESPONSE,
message: sfn.TaskInput.fromJsonPathAt('$.state.message'),
message: sfn.TaskInput.fromDataAt('$.state.message'),
messageAttributes: {
place: {
value: sfn.JsonPath.stringAt('$.place'),
},
pic: {
// BINARY must be explicitly set
type: MessageAttributeDataType.BINARY,
value: sfn.JsonPath.stringAt('$.pic'),
},
people: {
value: 4,
},
handles: {
value: ['@kslater', '@jjf', null, '@mfanning'],
},

});

// Combine a field from the execution data with
Expand Down
152 changes: 152 additions & 0 deletions packages/@aws-cdk/aws-stepfunctions-tasks/lib/sns/publish.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,63 @@
import * as iam from '@aws-cdk/aws-iam';
import * as sns from '@aws-cdk/aws-sns';
import * as sfn from '@aws-cdk/aws-stepfunctions';
import { Token } from '@aws-cdk/core';
import { Construct } from 'constructs';
import { integrationResourceArn, validatePatternSupported } from '../private/task-utils';

/**
* The data type set for the SNS message attributes
*
* @see https://docs.aws.amazon.com/sns/latest/dg/sns-message-attributes.html#SNSMessageAttributes.DataTypes
*/
export enum MessageAttributeDataType {
/**
* Strings are Unicode with UTF-8 binary encoding
*/
STRING = 'String',

/**
* An array, formatted as a string
*
* @see https://docs.aws.amazon.com/sns/latest/dg/sns-message-attributes.html#SNSMessageAttributes.DataTypes
*/
STRING_ARRAY = 'String.Array',

/**
* Numbers are positive or negative integers or floating-point numbers
*
* @see https://docs.aws.amazon.com/sns/latest/dg/sns-message-attributes.html#SNSMessageAttributes.DataTypes
*/
NUMBER = 'Number',

/**
* Binary type attributes can store any binary data
*
* @see https://docs.aws.amazon.com/sns/latest/dg/sns-message-attributes.html#SNSMessageAttributes.DataTypes
*/
BINARY = 'Binary'
}

/**
* A message attribute to add to the SNS message
*
* @see https://docs.aws.amazon.com/sns/latest/dg/sns-message-attributes.html
*/
export interface MessageAttribute {
/**
* The value of the attribute
*/
readonly value: any;

/**
* The data type for the attribute
*
* @see https://docs.aws.amazon.com/sns/latest/dg/sns-message-attributes.html#SNSMessageAttributes.DataTypes
* @default determined by type inspection if possible, fallback is String
*/
readonly dataType?: MessageAttributeDataType
}

/**
* Properties for publishing a message to an SNS topic
*/
Expand All @@ -23,6 +77,17 @@ export interface SnsPublishProps extends sfn.TaskStateBaseProps {
*/
readonly message: sfn.TaskInput;

/**
* Add message attributes when publishing.
*
* These attributes carry additional metadata about the message and may be used
* for subscription filters.
*
* @see https://docs.aws.amazon.com/sns/latest/dg/sns-message-attributes.html
* @default {}
*/
readonly messageAttributes?: { [key: string]: MessageAttribute };

/**
* Send different messages for each transport protocol.
*
Expand Down Expand Up @@ -98,8 +163,95 @@ export class SnsPublish extends sfn.TaskStateBase {
TopicArn: this.props.topic.topicArn,
Message: this.props.message.value,
MessageStructure: this.props.messagePerSubscriptionType ? 'json' : undefined,
MessageAttributes: renderMessageAttributes(this.props.messageAttributes),
Subject: this.props.subject,
}),
};
}
}

interface MessageAttributeValue {
DataType: string;
StringValue?: string;
BinaryValue?: string;
}

function renderMessageAttributes(attributes?: { [key: string]: MessageAttribute }): any {
if (attributes === undefined) { return undefined; }
const renderedAttributes: { [key: string]: MessageAttributeValue } = {};
Object.entries(attributes).map(([key, val]) => {
renderedAttributes[key] = renderMessageAttributeValue(val);
});
return sfn.TaskInput.fromObject(renderedAttributes).value;
}

function renderMessageAttributeValue(attribute: MessageAttribute): MessageAttributeValue {
const dataType = attribute.dataType;
if (attribute.value instanceof sfn.TaskInput) {
return {
DataType: dataType ?? MessageAttributeDataType.STRING,
StringValue: dataType !== MessageAttributeDataType.BINARY ? attribute.value.value : undefined,
BinaryValue: dataType === MessageAttributeDataType.BINARY ? attribute.value.value : undefined,
};
}

if (dataType === MessageAttributeDataType.BINARY) {
return { DataType: dataType, BinaryValue: `${attribute.value}` };
}

if (Token.isUnresolved(attribute.value)) {
return { DataType: dataType ?? MessageAttributeDataType.STRING, StringValue: attribute.value };
}

validateMessageAttribute(attribute);
if (Array.isArray(attribute.value)) {
return { DataType: MessageAttributeDataType.STRING_ARRAY, StringValue: JSON.stringify(attribute.value) };
}
const value = attribute.value;
if (typeof value === 'number') {
return { DataType: MessageAttributeDataType.NUMBER, StringValue: `${value}` };
} else {
return { DataType: MessageAttributeDataType.STRING, StringValue: `${value}` };
}
}

function validateMessageAttribute(attribute: MessageAttribute): void {
const dataType = attribute.dataType;
const value = attribute.value;
if (dataType === undefined) {
return;
}
if (Array.isArray(value)) {
if (dataType !== MessageAttributeDataType.STRING_ARRAY) {
throw new Error(`Requested SNS message attribute type was ${dataType} but ${value} was of type Array`);
}
const validArrayTypes = ['string', 'boolean', 'number'];
value.forEach((v) => {
if (v !== null || !validArrayTypes.includes(typeof v)) {
throw new Error(`Requested SNS message attribute type was ${typeof value} but Array values must be one of ${validArrayTypes}`);
}
});
return;
}
const error = new Error(`Requested SNS message attribute type was ${dataType} but ${value} was of type ${typeof value}`);
switch (typeof value) {
case 'string':
// trust the user or will default to string
if (sfn.JsonPath.isEncodedJsonPath(attribute.value)) {
return;
}
if (dataType === MessageAttributeDataType.STRING ||
dataType === MessageAttributeDataType.BINARY) {
return;
}
throw error;
case 'number':
if (dataType === MessageAttributeDataType.NUMBER) { return; }
throw error;
case 'boolean':
if (dataType === MessageAttributeDataType.STRING) { return; }
throw error;
default:
throw error;
}
}
152 changes: 150 additions & 2 deletions packages/@aws-cdk/aws-stepfunctions-tasks/test/sns/publish.test.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import * as sns from '@aws-cdk/aws-sns';
import * as sfn from '@aws-cdk/aws-stepfunctions';
import * as cdk from '@aws-cdk/core';
import { SnsPublish } from '../../lib/sns/publish';
import { SnsPublish, MessageAttributeDataType, MessageAttribute } from '../../lib/sns/publish';

describe('Publish', () => {

Expand Down Expand Up @@ -38,6 +38,154 @@ describe('Publish', () => {
},
});
});
test('with message attributes', () => {
// GIVEN
const stack = new cdk.Stack();
const topic = new sns.Topic(stack, 'Topic');
const token = cdk.Token.asString('cakes can be resolved');

// WHEN
const task = new SnsPublish(stack, 'Publish', {
topic,
message: sfn.TaskInput.fromText('Publish this message'),
messageAttributes: {
cake: {
value: 'chocolate',
},
cakeCount: {
value: 2,
},
resolvable: {
value: token,
},
binary: {
value: 'a2345',
dataType: MessageAttributeDataType.BINARY,
},
binaryNumberIsString: {
value: 123456987,
dataType: MessageAttributeDataType.BINARY,
},
taskInput: {
value: sfn.TaskInput.fromJsonPathAt('$$.StateMachine.Name'),
},
executionId: {
value: sfn.JsonPath.stringAt('$$.Execution.Id'),
},
vendors: {
value: ['Great Cakes', true, false, null, 3, 'Local Cakes'],
},
},
});

// THEN
expect(stack.resolve(task.toStateJson())).toEqual({
Type: 'Task',
Resource: {
'Fn::Join': [
'',
[
'arn:',
{
Ref: 'AWS::Partition',
},
':states:::sns:publish',
],
],
},
End: true,
Parameters: {
TopicArn: { Ref: 'TopicBFC7AF6E' },
Message: 'Publish this message',
MessageAttributes: {
binary: {
DataType: 'Binary',
BinaryValue: 'a2345',
},
binaryNumberIsString: {
DataType: 'Binary',
BinaryValue: '123456987',
},
cake: {
DataType: 'String',
StringValue: 'chocolate',
},
cakeCount: {
DataType: 'Number',
StringValue: '2',
},
resolvable: {
DataType: 'String',
StringValue: 'cakes can be resolved',
},
executionId: {
'DataType': 'String',
'StringValue.$': '$$.Execution.Id',
},
taskInput: {
'DataType': 'String',
'StringValue.$': '$$.StateMachine.Name',
},
vendors: {
DataType: 'String.Array',
StringValue: '["Great Cakes",true,false,null,3,"Local Cakes"]',
},
},
},
});
});
describe('invalid message attribute configurations', () => {
// GIVEN
const attributes: MessageAttribute[] = [
{
value: 2,
dataType: MessageAttributeDataType.STRING,
},
{
value: 'foo',
dataType: MessageAttributeDataType.NUMBER,
},
{
value: 'foo',
dataType: MessageAttributeDataType.STRING_ARRAY,
},
{
value: ['foo', undefined, 2, true],
dataType: MessageAttributeDataType.STRING_ARRAY,
},
{
value: ['foo', { bar: 2 }, 2, true],
dataType: MessageAttributeDataType.STRING_ARRAY,
},
{
value: false,
dataType: MessageAttributeDataType.STRING_ARRAY,
},
{
value: false,
dataType: MessageAttributeDataType.NUMBER,
},
];
attributes.forEach((a) => {
test(`${JSON.stringify(a)} is not valid`, () => {
// WHEN
const stack = new cdk.Stack();
const topic = new sns.Topic(stack, 'Topic');
const task = new SnsPublish(stack, 'Publish', {
topic,
message: sfn.TaskInput.fromText('Publish this message'),
messageAttributes: {
test: a,
},
});

// THEN
expect(() => {
stack.resolve(task.toStateJson());
}).toThrow(/Requested SNS message attribute type was/);
});
});
});

test('publish SNS message and wait for task token', () => {
// GIVEN
Expand Down Expand Up @@ -188,4 +336,4 @@ describe('Publish', () => {
});
}).toThrow(/Unsupported service integration pattern. Supported Patterns: REQUEST_RESPONSE,WAIT_FOR_TASK_TOKEN. Received: RUN_JOB/);
});
});
});

0 comments on commit 361284d

Please sign in to comment.