Skip to content

Commit

Permalink
feat(events-targets): kinesis stream as event rule target (#8176)
Browse files Browse the repository at this point in the history
closes #2997


----

*By submitting this pull request, I confirm that my contribution is made under the terms of the Apache-2.0 license*
  • Loading branch information
gcacace authored Jun 2, 2020
1 parent e257dc8 commit 21ebc2d
Show file tree
Hide file tree
Showing 7 changed files with 311 additions and 1 deletion.
1 change: 1 addition & 0 deletions packages/@aws-cdk/aws-events-targets/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ Currently supported are:
* Start a StepFunctions state machine
* Queue a Batch job
* Make an AWS API call
* Put a record to a Kinesis stream

See the README of the `@aws-cdk/aws-events` library for more information on
CloudWatch Events.
Expand Down
1 change: 1 addition & 0 deletions packages/@aws-cdk/aws-events-targets/lib/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,4 @@ export * from './lambda';
export * from './ecs-task-properties';
export * from './ecs-task';
export * from './state-machine';
export * from './kinesis-stream';
63 changes: 63 additions & 0 deletions packages/@aws-cdk/aws-events-targets/lib/kinesis-stream.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
import * as events from '@aws-cdk/aws-events';
import * as iam from '@aws-cdk/aws-iam';
import * as kinesis from '@aws-cdk/aws-kinesis';
import { singletonEventRole } from './util';

/**
* Customize the Kinesis Stream Event Target
*/
export interface KinesisStreamProps {
/**
* Partition Key Path for records sent to this stream
*
* @default - eventId as the partition key
*/
readonly partitionKeyPath?: string;

/**
* The message to send to the stream.
*
* Must be a valid JSON text passed to the target stream.
*
* @default - the entire CloudWatch event
*/
readonly message?: events.RuleTargetInput;

}

/**
* Use a Kinesis Stream as a target for AWS CloudWatch event rules.
*
* @example
*
* // put to a Kinesis stream every time code is committed
* // to a CodeCommit repository
* repository.onCommit(new targets.KinesisStream(stream));
*
*/
export class KinesisStream implements events.IRuleTarget {

constructor(private readonly stream: kinesis.IStream, private readonly props: KinesisStreamProps = {}) {
}

/**
* Returns a RuleTarget that can be used to trigger this Kinesis Stream as a
* result from a CloudWatch event.
*/
public bind(_rule: events.IRule, _id?: string): events.RuleTargetConfig {
const policyStatements = [new iam.PolicyStatement({
actions: ['kinesis:PutRecord', 'kinesis:PutRecords'],
resources: [this.stream.streamArn],
})];

return {
id: '',
arn: this.stream.streamArn,
role: singletonEventRole(this.stream, policyStatements),
input: this.props.message,
targetResource: this.stream,
kinesisParameters: this.props.partitionKeyPath ? { partitionKeyPath: this.props.partitionKeyPath } : undefined,
};
}

}
4 changes: 3 additions & 1 deletion packages/@aws-cdk/aws-events-targets/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@
"@aws-cdk/aws-sqs": "0.0.0",
"@aws-cdk/aws-stepfunctions": "0.0.0",
"@aws-cdk/aws-batch": "0.0.0",
"@aws-cdk/aws-kinesis": "0.0.0",
"@aws-cdk/core": "0.0.0",
"constructs": "^3.0.2"
},
Expand All @@ -106,7 +107,8 @@
"@aws-cdk/aws-stepfunctions": "0.0.0",
"@aws-cdk/aws-batch": "0.0.0",
"@aws-cdk/core": "0.0.0",
"constructs": "^3.0.2"
"constructs": "^3.0.2",
"@aws-cdk/aws-kinesis": "0.0.0"
},
"engines": {
"node": ">= 10.13.0 <13 || >=13.7.0"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
{
"Resources":{
"MyStream5C050E93":{
"Type":"AWS::Kinesis::Stream",
"Properties":{
"ShardCount":1,
"RetentionPeriodHours":24,
"StreamEncryption":{
"Fn::If":[
"AwsCdkKinesisEncryptedStreamsUnsupportedRegions",
{
"Ref":"AWS::NoValue"
},
{
"EncryptionType":"KMS",
"KeyId":"alias/aws/kinesis"
}
]
}
}
},
"MyStreamEventsRole5B6CC6AF":{
"Type":"AWS::IAM::Role",
"Properties":{
"AssumeRolePolicyDocument":{
"Statement":[
{
"Action":"sts:AssumeRole",
"Effect":"Allow",
"Principal":{
"Service":"events.amazonaws.com"
}
}
],
"Version":"2012-10-17"
}
}
},
"MyStreamEventsRoleDefaultPolicy2089B49E":{
"Type":"AWS::IAM::Policy",
"Properties":{
"PolicyDocument":{
"Statement":[
{
"Action":[
"kinesis:PutRecord",
"kinesis:PutRecords"
],
"Effect":"Allow",
"Resource":{
"Fn::GetAtt":[
"MyStream5C050E93",
"Arn"
]
}
}
],
"Version":"2012-10-17"
},
"PolicyName":"MyStreamEventsRoleDefaultPolicy2089B49E",
"Roles":[
{
"Ref":"MyStreamEventsRole5B6CC6AF"
}
]
}
},
"EveryMinute2BBCEA8F":{
"Type":"AWS::Events::Rule",
"Properties":{
"ScheduleExpression":"rate(1 minute)",
"State":"ENABLED",
"Targets":[
{
"Arn":{
"Fn::GetAtt":[
"MyStream5C050E93",
"Arn"
]
},
"Id":"Target0",
"KinesisParameters":{
"PartitionKeyPath":"$.id"
},
"RoleArn":{
"Fn::GetAtt":[
"MyStreamEventsRole5B6CC6AF",
"Arn"
]
}
}
]
}
}
},
"Conditions":{
"AwsCdkKinesisEncryptedStreamsUnsupportedRegions":{
"Fn::Or":[
{
"Fn::Equals":[
{
"Ref":"AWS::Region"
},
"cn-north-1"
]
},
{
"Fn::Equals":[
{
"Ref":"AWS::Region"
},
"cn-northwest-1"
]
}
]
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import * as events from '@aws-cdk/aws-events';
import * as kinesis from '@aws-cdk/aws-kinesis';
import * as cdk from '@aws-cdk/core';
import * as targets from '../../lib';

// ---------------------------------
// Define a rule that triggers a put to a Kinesis stream every 1min.

const app = new cdk.App();

const stack = new cdk.Stack(app, 'aws-cdk-kinesis-event-target');

const stream = new kinesis.Stream(stack, 'MyStream');
const event = new events.Rule(stack, 'EveryMinute', {
schedule: events.Schedule.rate(cdk.Duration.minutes(1)),
});

event.addTarget(new targets.KinesisStream(stream, {
partitionKeyPath: events.EventField.eventId,
}));

app.synth();
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
import { expect, haveResource, haveResourceLike } from '@aws-cdk/assert';
import * as events from '@aws-cdk/aws-events';
import * as kinesis from '@aws-cdk/aws-kinesis';
import { Stack } from '@aws-cdk/core';
import * as targets from '../../lib';

describe('KinesisStream event target', () => {
let stack: Stack;
let stream: kinesis.Stream;
let streamArn: any;

beforeEach(() => {
stack = new Stack();
stream = new kinesis.Stream(stack, 'MyStream');
streamArn = { 'Fn::GetAtt': [ 'MyStream5C050E93', 'Arn' ] };
});

describe('when added to an event rule as a target', () => {
let rule: events.Rule;

beforeEach(() => {
rule = new events.Rule(stack, 'rule', {
schedule: events.Schedule.expression('rate(1 minute)'),
});
});

describe('with default settings', () => {
beforeEach(() => {
rule.addTarget(new targets.KinesisStream(stream));
});

test("adds the stream's ARN and role to the targets of the rule", () => {
expect(stack).to(haveResource('AWS::Events::Rule', {
Targets: [
{
Arn: streamArn,
Id: 'Target0',
RoleArn: { 'Fn::GetAtt': [ 'MyStreamEventsRole5B6CC6AF', 'Arn' ] },
},
],
}));
});

test("creates a policy that has PutRecord and PutRecords permissions on the stream's ARN", () => {
expect(stack).to(haveResource('AWS::IAM::Policy', {
PolicyDocument: {
Statement: [
{
Action: [ 'kinesis:PutRecord', 'kinesis:PutRecords' ],
Effect: 'Allow',
Resource: streamArn,
},
],
Version: '2012-10-17',
},
}));
});
});

describe('with an explicit partition key path', () => {
beforeEach(() => {
rule.addTarget(new targets.KinesisStream(stream, {
partitionKeyPath: events.EventField.eventId,
}));
});

test('sets the partition key path', () => {
expect(stack).to(haveResourceLike('AWS::Events::Rule', {
Targets: [
{
Arn: streamArn,
Id: 'Target0',
RoleArn: { 'Fn::GetAtt': [ 'MyStreamEventsRole5B6CC6AF', 'Arn' ] },
KinesisParameters: {
PartitionKeyPath: '$.id',
},
},
],
}));
});
});

describe('with an explicit message', () => {
beforeEach(() => {
rule.addTarget(new targets.KinesisStream(stream, {
message: events.RuleTargetInput.fromText('fooBar'),
}));
});

test('sets the input', () => {
expect(stack).to(haveResourceLike('AWS::Events::Rule', {
Targets: [
{
Arn: streamArn,
Id: 'Target0',
Input: '"fooBar"',
},
],
}));
});
});
});
});

0 comments on commit 21ebc2d

Please sign in to comment.