Skip to content

Commit

Permalink
feat(events-targets): add support for Kinesis Firehose as a target (#…
Browse files Browse the repository at this point in the history
…10400)

closes #10349


----

*By submitting this pull request, I confirm that my contribution is made under the terms of the Apache-2.0 license*
  • Loading branch information
cheruvian authored Oct 20, 2020
1 parent 6d04fd2 commit b93cda6
Show file tree
Hide file tree
Showing 7 changed files with 336 additions and 7 deletions.
1 change: 1 addition & 0 deletions packages/@aws-cdk/aws-events-targets/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ Currently supported are:
* Queue a Batch job
* Make an AWS API call
* Put a record to a Kinesis stream
* Put a record to a Kinesis Data Firehose stream

See the README of the `@aws-cdk/aws-events` library for more information on
EventBridge.
Expand Down
1 change: 1 addition & 0 deletions packages/@aws-cdk/aws-events-targets/lib/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,4 @@ export * from './ecs-task-properties';
export * from './ecs-task';
export * from './state-machine';
export * from './kinesis-stream';
export * from './kinesis-firehose-stream';
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
import * as events from '@aws-cdk/aws-events';
import * as iam from '@aws-cdk/aws-iam';
import * as firehose from '@aws-cdk/aws-kinesisfirehose';
import { singletonEventRole } from './util';

/**
* Customize the Firehose Stream Event Target
*/
export interface KinesisFirehoseStreamProps {
/**
* The message to send to the stream.
*
* Must be a valid JSON text passed to the target stream.
*
* @default - the entire Event Bridge event
*/
readonly message?: events.RuleTargetInput;
}


/**
* Customize the Firehose Stream Event Target
*/
export class KinesisFirehoseStream implements events.IRuleTarget {

constructor(private readonly stream: firehose.CfnDeliveryStream, private readonly props: KinesisFirehoseStreamProps = {}) {
}

/**
* Returns a RuleTarget that can be used to trigger this Firehose Stream as a
* result from a Event Bridge event.
*/
public bind(_rule: events.IRule, _id?: string): events.RuleTargetConfig {
const policyStatements = [new iam.PolicyStatement({
actions: ['firehose:PutRecord', 'firehose:PutRecordBatch'],
resources: [this.stream.attrArn],
})];

return {
id: '',
arn: this.stream.attrArn,
role: singletonEventRole(this.stream, policyStatements),
input: this.props.message,
targetResource: this.stream,
};
}
}
17 changes: 10 additions & 7 deletions packages/@aws-cdk/aws-events-targets/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@
"devDependencies": {
"@aws-cdk/assert": "0.0.0",
"@aws-cdk/aws-codecommit": "0.0.0",
"@aws-cdk/aws-s3": "0.0.0",
"aws-sdk": "^2.739.0",
"aws-sdk-mock": "^5.1.0",
"cdk-build-tools": "0.0.0",
Expand All @@ -82,39 +83,41 @@
"pkglint": "0.0.0"
},
"dependencies": {
"@aws-cdk/aws-batch": "0.0.0",
"@aws-cdk/aws-codebuild": "0.0.0",
"@aws-cdk/aws-codepipeline": "0.0.0",
"@aws-cdk/aws-ec2": "0.0.0",
"@aws-cdk/aws-ecs": "0.0.0",
"@aws-cdk/aws-events": "0.0.0",
"@aws-cdk/aws-iam": "0.0.0",
"@aws-cdk/aws-kinesis": "0.0.0",
"@aws-cdk/aws-kinesisfirehose": "0.0.0",
"@aws-cdk/aws-lambda": "0.0.0",
"@aws-cdk/aws-sns": "0.0.0",
"@aws-cdk/aws-sns-subscriptions": "0.0.0",
"@aws-cdk/aws-sqs": "0.0.0",
"@aws-cdk/aws-sns-subscriptions": "0.0.0",
"@aws-cdk/aws-stepfunctions": "0.0.0",
"@aws-cdk/aws-batch": "0.0.0",
"@aws-cdk/aws-kinesis": "0.0.0",
"@aws-cdk/core": "0.0.0",
"constructs": "^3.0.4"
},
"homepage": "https://github.com/aws/aws-cdk",
"peerDependencies": {
"@aws-cdk/aws-batch": "0.0.0",
"@aws-cdk/aws-codebuild": "0.0.0",
"@aws-cdk/aws-codepipeline": "0.0.0",
"@aws-cdk/aws-ec2": "0.0.0",
"@aws-cdk/aws-ecs": "0.0.0",
"@aws-cdk/aws-events": "0.0.0",
"@aws-cdk/aws-iam": "0.0.0",
"@aws-cdk/aws-kinesis": "0.0.0",
"@aws-cdk/aws-kinesisfirehose": "0.0.0",
"@aws-cdk/aws-lambda": "0.0.0",
"@aws-cdk/aws-sns": "0.0.0",
"@aws-cdk/aws-sns-subscriptions": "0.0.0",
"@aws-cdk/aws-sqs": "0.0.0",
"@aws-cdk/aws-sns-subscriptions": "0.0.0",
"@aws-cdk/aws-stepfunctions": "0.0.0",
"@aws-cdk/aws-batch": "0.0.0",
"@aws-cdk/core": "0.0.0",
"constructs": "^3.0.4",
"@aws-cdk/aws-kinesis": "0.0.0"
"constructs": "^3.0.4"
},
"engines": {
"node": ">= 10.13.0 <13 || >=13.7.0"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
{
"Resources": {
"firehosebucket84C8AE0B": {
"Type": "AWS::S3::Bucket",
"UpdateReplacePolicy": "Retain",
"DeletionPolicy": "Retain"
},
"firehoseroleDDC4CF0E": {
"Type": "AWS::IAM::Role",
"Properties": {
"AssumeRolePolicyDocument": {
"Statement": [
{
"Action": "sts:AssumeRole",
"Effect": "Allow",
"Principal": {
"Service": "firehose.amazonaws.com"
}
}
],
"Version": "2012-10-17"
}
}
},
"firehoseroleDefaultPolicy3F3F850D": {
"Type": "AWS::IAM::Policy",
"Properties": {
"PolicyDocument": {
"Statement": [
{
"Action": [
"s3:GetObject*",
"s3:GetBucket*",
"s3:List*",
"s3:DeleteObject*",
"s3:PutObject*",
"s3:Abort*"
],
"Effect": "Allow",
"Resource": [
{
"Fn::GetAtt": [
"firehosebucket84C8AE0B",
"Arn"
]
},
{
"Fn::Join": [
"",
[
{
"Fn::GetAtt": [
"firehosebucket84C8AE0B",
"Arn"
]
},
"/*"
]
]
}
]
}
],
"Version": "2012-10-17"
},
"PolicyName": "firehoseroleDefaultPolicy3F3F850D",
"Roles": [
{
"Ref": "firehoseroleDDC4CF0E"
}
]
}
},
"MyStream": {
"Type": "AWS::KinesisFirehose::DeliveryStream",
"Properties": {
"ExtendedS3DestinationConfiguration": {
"BucketARN": {
"Fn::GetAtt": [
"firehosebucket84C8AE0B",
"Arn"
]
},
"RoleARN": {
"Fn::GetAtt": [
"firehoseroleDDC4CF0E",
"Arn"
]
}
}
}
},
"MyStreamEventsRole5B6CC6AF": {
"Type": "AWS::IAM::Role",
"Properties": {
"AssumeRolePolicyDocument": {
"Statement": [
{
"Action": "sts:AssumeRole",
"Effect": "Allow",
"Principal": {
"Service": "events.amazonaws.com"
}
}
],
"Version": "2012-10-17"
}
}
},
"MyStreamEventsRoleDefaultPolicy2089B49E": {
"Type": "AWS::IAM::Policy",
"Properties": {
"PolicyDocument": {
"Statement": [
{
"Action": [
"firehose:PutRecord",
"firehose:PutRecordBatch"
],
"Effect": "Allow",
"Resource": {
"Fn::GetAtt": [
"MyStream",
"Arn"
]
}
}
],
"Version": "2012-10-17"
},
"PolicyName": "MyStreamEventsRoleDefaultPolicy2089B49E",
"Roles": [
{
"Ref": "MyStreamEventsRole5B6CC6AF"
}
]
}
},
"EveryMinute2BBCEA8F": {
"Type": "AWS::Events::Rule",
"Properties": {
"ScheduleExpression": "rate(1 minute)",
"State": "ENABLED",
"Targets": [
{
"Arn": {
"Fn::GetAtt": [
"MyStream",
"Arn"
]
},
"Id": "Target0",
"RoleArn": {
"Fn::GetAtt": [
"MyStreamEventsRole5B6CC6AF",
"Arn"
]
}
}
]
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
import * as events from '@aws-cdk/aws-events';
import * as iam from '@aws-cdk/aws-iam';
import * as firehose from '@aws-cdk/aws-kinesisfirehose';
import * as s3 from '@aws-cdk/aws-s3';
import * as cdk from '@aws-cdk/core';
import * as targets from '../../lib';

// ---------------------------------
// Define a rule that triggers a put to a Kinesis stream every 1min.

const app = new cdk.App();

const stack = new cdk.Stack(app, 'aws-cdk-firehose-event-target');

const bucket = new s3.Bucket(stack, 'firehose-bucket');
const firehoseRole = new iam.Role(stack, 'firehose-role', {
assumedBy: new iam.ServicePrincipal('firehose.amazonaws.com'),
});
const stream = new firehose.CfnDeliveryStream(stack, 'MyStream', {
extendedS3DestinationConfiguration: {
bucketArn: bucket.bucketArn,
roleArn: firehoseRole.roleArn,
},
});
bucket.grantReadWrite(firehoseRole);

const event = new events.Rule(stack, 'EveryMinute', {
schedule: events.Schedule.rate(cdk.Duration.minutes(1)),
});

event.addTarget(new targets.KinesisFirehoseStream(stream, {}));

app.synth();
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
import { expect, haveResource, haveResourceLike } from '@aws-cdk/assert';
import * as events from '@aws-cdk/aws-events';
import * as firehose from '@aws-cdk/aws-kinesisfirehose';
import { Stack } from '@aws-cdk/core';
import * as targets from '../../lib';

describe('KinesisFirehoseStream event target', () => {
let stack: Stack;
let stream: firehose.CfnDeliveryStream;
let streamArn: any;

beforeEach(() => {
stack = new Stack();
stream = new firehose.CfnDeliveryStream(stack, 'MyStream');
streamArn = { 'Fn::GetAtt': ['MyStream', 'Arn'] };
});

describe('when added to an event rule as a target', () => {
let rule: events.Rule;

beforeEach(() => {
rule = new events.Rule(stack, 'rule', {
schedule: events.Schedule.expression('rate(1 minute)'),
});
});

describe('with default settings', () => {
beforeEach(() => {
rule.addTarget(new targets.KinesisFirehoseStream(stream));
});

test("adds the stream's ARN and role to the targets of the rule", () => {
expect(stack).to(haveResource('AWS::Events::Rule', {
Targets: [
{
Arn: streamArn,
Id: 'Target0',
RoleArn: { 'Fn::GetAtt': ['MyStreamEventsRole5B6CC6AF', 'Arn'] },
},
],
}));
});

test("creates a policy that has PutRecord and PutRecords permissions on the stream's ARN", () => {
expect(stack).to(haveResource('AWS::IAM::Policy', {
PolicyDocument: {
Statement: [
{
Action: ['firehose:PutRecord', 'firehose:PutRecordBatch'],
Effect: 'Allow',
Resource: streamArn,
},
],
Version: '2012-10-17',
},
}));
});
});

describe('with an explicit message', () => {
beforeEach(() => {
rule.addTarget(new targets.KinesisFirehoseStream(stream, {
message: events.RuleTargetInput.fromText('fooBar'),
}));
});

test('sets the input', () => {
expect(stack).to(haveResourceLike('AWS::Events::Rule', {
Targets: [
{
Arn: streamArn,
Id: 'Target0',
Input: '"fooBar"',
},
],
}));
});
});
});
});

0 comments on commit b93cda6

Please sign in to comment.