Skip to content

Commit

Permalink
feat(dynamodb): allow using Kinesis stream in Table (#15199)
Browse files Browse the repository at this point in the history
Add support for Kinesis Streams in DynamoDB tables with a new `kinesisStreamArn` table option.

Closes #14534

----

*By submitting this pull request, I confirm that my contribution is made under the terms of the Apache-2.0 license*
  • Loading branch information
rrhodes authored Jun 21, 2021
1 parent a760173 commit 7bc6c6e
Show file tree
Hide file tree
Showing 6 changed files with 127 additions and 0 deletions.
16 changes: 16 additions & 0 deletions packages/@aws-cdk/aws-dynamodb/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -185,3 +185,19 @@ const { partitionKey, sortKey } = table.schema();

const { partitionKey, sortKey } = table.schema(INDEX_NAME);
```

## Kinesis Stream

A Kinesis Data Stream can be configured on the DynamoDB table to capture item-level changes.

```ts
import * as dynamodb from '@aws-cdk/aws-dynamodb';
import * as kinesis from '@aws-cdk/aws-kinesis';

const stream = new kinesis.Stream(this, 'Stream');

const table = new dynamodb.Table(this, 'Table', {
partitionKey: { name: 'id', type: dynamodb.AttributeType.STRING },
kinesisStream: stream,
});
```
9 changes: 9 additions & 0 deletions packages/@aws-cdk/aws-dynamodb/lib/table.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import * as appscaling from '@aws-cdk/aws-applicationautoscaling';
import * as cloudwatch from '@aws-cdk/aws-cloudwatch';
import * as iam from '@aws-cdk/aws-iam';
import * as kinesis from '@aws-cdk/aws-kinesis';
import * as kms from '@aws-cdk/aws-kms';
import {
Aws, CfnCondition, CfnCustomResource, CfnResource, CustomResource, Duration,
Expand Down Expand Up @@ -247,6 +248,13 @@ export interface TableProps extends TableOptions {
* @default <generated>
*/
readonly tableName?: string;

/**
* Kinesis Data Stream to capture item-level changes for the table.
*
* @default - no Kinesis Data Stream
*/
readonly kinesisStream?: kinesis.IStream;
}

/**
Expand Down Expand Up @@ -1116,6 +1124,7 @@ export class Table extends TableBase {
streamSpecification,
timeToLiveSpecification: props.timeToLiveAttribute ? { attributeName: props.timeToLiveAttribute, enabled: true } : undefined,
contributorInsightsSpecification: props.contributorInsightsEnabled !== undefined ? { enabled: props.contributorInsightsEnabled } : undefined,
kinesisStreamSpecification: props.kinesisStream ? { streamArn: props.kinesisStream.streamArn } : undefined,
});
this.table.applyRemovalPolicy(props.removalPolicy);

Expand Down
2 changes: 2 additions & 0 deletions packages/@aws-cdk/aws-dynamodb/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@
"@aws-cdk/aws-applicationautoscaling": "0.0.0",
"@aws-cdk/aws-cloudwatch": "0.0.0",
"@aws-cdk/aws-iam": "0.0.0",
"@aws-cdk/aws-kinesis": "0.0.0",
"@aws-cdk/aws-kms": "0.0.0",
"@aws-cdk/aws-lambda": "0.0.0",
"@aws-cdk/core": "0.0.0",
Expand All @@ -102,6 +103,7 @@
"@aws-cdk/aws-applicationautoscaling": "0.0.0",
"@aws-cdk/aws-cloudwatch": "0.0.0",
"@aws-cdk/aws-iam": "0.0.0",
"@aws-cdk/aws-kinesis": "0.0.0",
"@aws-cdk/aws-kms": "0.0.0",
"@aws-cdk/aws-lambda": "0.0.0",
"@aws-cdk/core": "0.0.0",
Expand Down
8 changes: 8 additions & 0 deletions packages/@aws-cdk/aws-dynamodb/test/dynamodb.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { arrayWith, ABSENT, ResourcePart, SynthUtils } from '@aws-cdk/assert-int
import '@aws-cdk/assert-internal/jest';
import * as appscaling from '@aws-cdk/aws-applicationautoscaling';
import * as iam from '@aws-cdk/aws-iam';
import * as kinesis from '@aws-cdk/aws-kinesis';
import * as kms from '@aws-cdk/aws-kms';
import { App, Aws, CfnDeletionPolicy, ConstructNode, Duration, PhysicalName, RemovalPolicy, Resource, Stack, Tags } from '@aws-cdk/core';
import * as cr from '@aws-cdk/custom-resources';
Expand Down Expand Up @@ -320,6 +321,7 @@ describe('default properties', () => {

test('when specifying every property', () => {
const stack = new Stack();
const stream = new kinesis.Stream(stack, 'MyStream');
const table = new Table(stack, CONSTRUCT_NAME, {
tableName: TABLE_NAME,
readCapacity: 42,
Expand All @@ -332,6 +334,7 @@ test('when specifying every property', () => {
partitionKey: TABLE_PARTITION_KEY,
sortKey: TABLE_SORT_KEY,
contributorInsightsEnabled: true,
kinesisStream: stream,
});
Tags.of(table).add('Environment', 'Production');

Expand All @@ -356,6 +359,11 @@ test('when specifying every property', () => {
Tags: [{ Key: 'Environment', Value: 'Production' }],
TimeToLiveSpecification: { AttributeName: 'timeToLive', Enabled: true },
ContributorInsightsSpecification: { Enabled: true },
KinesisStreamSpecification: {
StreamArn: {
'Fn::GetAtt': ['MyStream5C050E93', 'Arn'],
},
},
},
);
});
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
{
"Resources": {
"Stream790BDEE4": {
"Type": "AWS::Kinesis::Stream",
"Properties": {
"ShardCount": 1,
"RetentionPeriodHours": 24,
"StreamEncryption": {
"Fn::If": [
"AwsCdkKinesisEncryptedStreamsUnsupportedRegions",
{
"Ref": "AWS::NoValue"
},
{
"EncryptionType": "KMS",
"KeyId": "alias/aws/kinesis"
}
]
}
}
},
"TableCD117FA1": {
"Type": "AWS::DynamoDB::Table",
"Properties": {
"KeySchema": [
{
"AttributeName": "hashKey",
"KeyType": "HASH"
}
],
"AttributeDefinitions": [
{
"AttributeName": "hashKey",
"AttributeType": "S"
}
],
"KinesisStreamSpecification": {
"StreamArn": {
"Fn::GetAtt": [
"Stream790BDEE4",
"Arn"
]
}
},
"ProvisionedThroughput": {
"ReadCapacityUnits": 5,
"WriteCapacityUnits": 5
}
},
"UpdateReplacePolicy": "Delete",
"DeletionPolicy": "Delete"
}
},
"Conditions": {
"AwsCdkKinesisEncryptedStreamsUnsupportedRegions": {
"Fn::Or": [
{
"Fn::Equals": [
{
"Ref": "AWS::Region"
},
"cn-north-1"
]
},
{
"Fn::Equals": [
{
"Ref": "AWS::Region"
},
"cn-northwest-1"
]
}
]
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
import * as kinesis from '@aws-cdk/aws-kinesis';
import * as cdk from '@aws-cdk/core';
import * as dynamodb from '../lib';

const app = new cdk.App();
const stack = new cdk.Stack(app, 'aws-cdk-dynamodb-kinesis-stream');

const stream = new kinesis.Stream(stack, 'Stream');

new dynamodb.Table(stack, 'Table', {
partitionKey: { name: 'hashKey', type: dynamodb.AttributeType.STRING },
removalPolicy: cdk.RemovalPolicy.DESTROY,
kinesisStream: stream,
});

app.synth();

0 comments on commit 7bc6c6e

Please sign in to comment.