-
Notifications
You must be signed in to change notification settings - Fork 4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(iot): add Action to republish MQTT messages to another MQTT topic #18661
Changes from 5 commits
0c2063e
63dfc71
8eed4d9
4bf88c8
8c5aaa6
4868f69
0a8ad71
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,72 @@ | ||
import * as iam from '@aws-cdk/aws-iam'; | ||
import * as iot from '@aws-cdk/aws-iot'; | ||
import { CommonActionProps } from './common-action-props'; | ||
import { singletonActionRole } from './private/role'; | ||
|
||
/** | ||
* MQTT Quality of Service (QoS) indicates the level of assurance for delivery of an MQTT Message. | ||
* | ||
* @see https://docs.aws.amazon.com/iot/latest/developerguide/mqtt.html#mqtt-qos | ||
*/ | ||
export enum MqttQualityOfService { | ||
/** | ||
* QoS level 0. Sent zero or more times. | ||
* This level should be used for messages that are sent over reliable communication links or that can be missed without a problem. | ||
*/ | ||
ZERO_OR_MORE_TIMES, | ||
|
||
/** | ||
* QoS level 1. Sent at least one time, and then repeatedly until a PUBACK response is received. | ||
* The message is not considered complete until the sender receives a PUBACK response to indicate successful delivery. | ||
*/ | ||
AT_LEAST_ONCE, | ||
} | ||
|
||
/** | ||
* Configuration properties of an action to republish MQTT messages. | ||
*/ | ||
export interface IotRepublishMqttActionProps extends CommonActionProps { | ||
/** | ||
* The Quality of Service (QoS) level to use when republishing messages. | ||
* | ||
* @see https://docs.aws.amazon.com/iot/latest/developerguide/mqtt.html#mqtt-qos | ||
* | ||
* @default MqttQualityOfService.ZERO_OR_MORE_TIMES | ||
*/ | ||
readonly qos?: MqttQualityOfService; | ||
} | ||
|
||
/** | ||
* The action to put the record from an MQTT message to republish another MQTT topic. | ||
*/ | ||
export class IotRepublishMqttAction implements iot.IAction { | ||
private readonly qos?: MqttQualityOfService; | ||
private readonly role?: iam.IRole; | ||
|
||
/** | ||
* @param topic The MQTT topic to which to republish the message. | ||
* @param props Optional properties to not use default. | ||
*/ | ||
constructor(private readonly topic: string, props: IotRepublishMqttActionProps = {}) { | ||
this.qos = props.qos; | ||
this.role = props.role; | ||
} | ||
|
||
bind(rule: iot.ITopicRule): iot.ActionConfig { | ||
const role = this.role ?? singletonActionRole(rule); | ||
role.addToPrincipalPolicy(new iam.PolicyStatement({ | ||
actions: ['iot:Publish'], | ||
resources: ['*'], | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hmm... does this have to be There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think too, it is maybe less restrictive. But I have no good idea.. When the passed topic is literal, The topic's ARN Users can use more restrictive permission with that they provide property There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. OK. Let's leave it as-is for now (if you want to add a quick blurb in the ReadMe about this, feel free, but I won't require it). |
||
})); | ||
|
||
return { | ||
configuration: { | ||
republish: { | ||
topic: this.topic, | ||
qos: this.qos, | ||
roleArn: role.roleArn, | ||
}, | ||
}, | ||
}; | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,65 @@ | ||
{ | ||
"Resources": { | ||
"TopicRule40A4EA44": { | ||
"Type": "AWS::IoT::TopicRule", | ||
"Properties": { | ||
"TopicRulePayload": { | ||
"Actions": [ | ||
{ | ||
"Republish": { | ||
"Qos": 1, | ||
"RoleArn": { | ||
"Fn::GetAtt": [ | ||
"TopicRuleTopicRuleActionRole246C4F77", | ||
"Arn" | ||
] | ||
}, | ||
"Topic": "${topic()}/republish" | ||
} | ||
} | ||
], | ||
"AwsIotSqlVersion": "2016-03-23", | ||
"Sql": "SELECT * FROM 'device/+/data'" | ||
} | ||
} | ||
}, | ||
"TopicRuleTopicRuleActionRole246C4F77": { | ||
"Type": "AWS::IAM::Role", | ||
"Properties": { | ||
"AssumeRolePolicyDocument": { | ||
"Statement": [ | ||
{ | ||
"Action": "sts:AssumeRole", | ||
"Effect": "Allow", | ||
"Principal": { | ||
"Service": "iot.amazonaws.com" | ||
} | ||
} | ||
], | ||
"Version": "2012-10-17" | ||
} | ||
} | ||
}, | ||
"TopicRuleTopicRuleActionRoleDefaultPolicy99ADD687": { | ||
"Type": "AWS::IAM::Policy", | ||
"Properties": { | ||
"PolicyDocument": { | ||
"Statement": [ | ||
{ | ||
"Action": "iot:Publish", | ||
"Effect": "Allow", | ||
"Resource": "*" | ||
} | ||
], | ||
"Version": "2012-10-17" | ||
}, | ||
"PolicyName": "TopicRuleTopicRuleActionRoleDefaultPolicy99ADD687", | ||
"Roles": [ | ||
{ | ||
"Ref": "TopicRuleTopicRuleActionRole246C4F77" | ||
} | ||
] | ||
} | ||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,25 @@ | ||
import * as iot from '@aws-cdk/aws-iot'; | ||
import * as cdk from '@aws-cdk/core'; | ||
import * as actions from '../../lib'; | ||
|
||
class TestStack extends cdk.Stack { | ||
constructor(scope: cdk.App, id: string, props?: cdk.StackProps) { | ||
super(scope, id, props); | ||
|
||
const topicRule = new iot.TopicRule(this, 'TopicRule', { | ||
sql: iot.IotSql.fromStringAsVer20160323( | ||
"SELECT * FROM 'device/+/data'", | ||
), | ||
}); | ||
|
||
topicRule.addAction( | ||
new actions.IotRepublishMqttAction('${topic()}/republish', { | ||
qos: actions.MqttQualityOfService.AT_LEAST_ONCE, | ||
}), | ||
); | ||
} | ||
} | ||
|
||
const app = new cdk.App(); | ||
new TestStack(app, 'iot-republish-action-test-stack'); | ||
app.synth(); |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,107 @@ | ||
import { Template, Match } from '@aws-cdk/assertions'; | ||
import * as iam from '@aws-cdk/aws-iam'; | ||
import * as iot from '@aws-cdk/aws-iot'; | ||
import * as cdk from '@aws-cdk/core'; | ||
import * as actions from '../../lib'; | ||
|
||
let stack: cdk.Stack; | ||
let topicRule:iot.TopicRule; | ||
beforeEach(() => { | ||
stack = new cdk.Stack(); | ||
topicRule = new iot.TopicRule(stack, 'MyTopicRule', { | ||
sql: iot.IotSql.fromStringAsVer20160323("SELECT topic(2) as device_id FROM 'device/+/data'"), | ||
}); | ||
}); | ||
|
||
test('Default IoT republish action', () => { | ||
// WHEN | ||
topicRule.addAction( | ||
new actions.IotRepublishMqttAction('test-topic'), | ||
); | ||
|
||
// THEN | ||
Template.fromStack(stack).hasResourceProperties('AWS::IoT::TopicRule', { | ||
TopicRulePayload: { | ||
Actions: [ | ||
{ | ||
Republish: { | ||
Topic: 'test-topic', | ||
RoleArn: { | ||
'Fn::GetAtt': ['MyTopicRuleTopicRuleActionRoleCE2D05DA', 'Arn'], | ||
}, | ||
}, | ||
}, | ||
], | ||
}, | ||
}); | ||
|
||
Template.fromStack(stack).hasResourceProperties('AWS::IAM::Role', { | ||
AssumeRolePolicyDocument: { | ||
Statement: [ | ||
{ | ||
Action: 'sts:AssumeRole', | ||
Effect: 'Allow', | ||
Principal: { | ||
Service: 'iot.amazonaws.com', | ||
}, | ||
}, | ||
], | ||
Version: '2012-10-17', | ||
}, | ||
}); | ||
|
||
Template.fromStack(stack).hasResourceProperties('AWS::IAM::Policy', { | ||
PolicyDocument: { | ||
Statement: [ | ||
{ | ||
Action: 'iot:Publish', | ||
Effect: 'Allow', | ||
Resource: '*', | ||
}, | ||
], | ||
Version: '2012-10-17', | ||
}, | ||
PolicyName: 'MyTopicRuleTopicRuleActionRoleDefaultPolicy54A701F7', | ||
Roles: [ | ||
{ Ref: 'MyTopicRuleTopicRuleActionRoleCE2D05DA' }, | ||
], | ||
}); | ||
}); | ||
|
||
test('can set qos', () => { | ||
// WHEN | ||
topicRule.addAction( | ||
new actions.IotRepublishMqttAction('test-topic', { qos: actions.MqttQualityOfService.AT_LEAST_ONCE }), | ||
); | ||
|
||
// THEN | ||
Template.fromStack(stack).hasResourceProperties('AWS::IoT::TopicRule', { | ||
TopicRulePayload: { | ||
Actions: [ | ||
Match.objectLike({ Republish: { Qos: 1 } }), | ||
], | ||
}, | ||
}); | ||
}); | ||
|
||
test('can set role', () => { | ||
// WHEN | ||
const role = iam.Role.fromRoleArn(stack, 'MyRole', 'arn:aws:iam::123456789012:role/ForTest'); | ||
topicRule.addAction( | ||
new actions.IotRepublishMqttAction('test-topic', { role }), | ||
); | ||
|
||
// THEN | ||
Template.fromStack(stack).hasResourceProperties('AWS::IoT::TopicRule', { | ||
TopicRulePayload: { | ||
Actions: [ | ||
Match.objectLike({ Republish: { RoleArn: 'arn:aws:iam::123456789012:role/ForTest' } }), | ||
], | ||
}, | ||
}); | ||
|
||
Template.fromStack(stack).hasResourceProperties('AWS::IAM::Policy', { | ||
PolicyName: 'MyRolePolicy64AB00A5', | ||
Roles: ['ForTest'], | ||
}); | ||
}); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry, I missed this in the initial review.
Let's rename this to
qualityOfService
.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oops... It's my missing of fix. Sorry.