From 7bc6c6eb14ee73c490caa649aeff509e34eb2c52 Mon Sep 17 00:00:00 2001 From: Ross Date: Mon, 21 Jun 2021 21:10:17 +0100 Subject: [PATCH] feat(dynamodb): allow using Kinesis stream in Table (#15199) Add support for Kinesis Streams in DynamoDB tables with a new `kinesisStreamArn` table option. Closes #14534 ---- *By submitting this pull request, I confirm that my contribution is made under the terms of the Apache-2.0 license* --- packages/@aws-cdk/aws-dynamodb/README.md | 16 ++++ packages/@aws-cdk/aws-dynamodb/lib/table.ts | 9 +++ packages/@aws-cdk/aws-dynamodb/package.json | 2 + .../aws-dynamodb/test/dynamodb.test.ts | 8 ++ ...nteg.dynamodb.kinesis-stream.expected.json | 76 +++++++++++++++++++ .../test/integ.dynamodb.kinesis-stream.ts | 16 ++++ 6 files changed, 127 insertions(+) create mode 100644 packages/@aws-cdk/aws-dynamodb/test/integ.dynamodb.kinesis-stream.expected.json create mode 100644 packages/@aws-cdk/aws-dynamodb/test/integ.dynamodb.kinesis-stream.ts diff --git a/packages/@aws-cdk/aws-dynamodb/README.md b/packages/@aws-cdk/aws-dynamodb/README.md index a9391d2534673..3355f8b41b508 100644 --- a/packages/@aws-cdk/aws-dynamodb/README.md +++ b/packages/@aws-cdk/aws-dynamodb/README.md @@ -185,3 +185,19 @@ const { partitionKey, sortKey } = table.schema(); const { partitionKey, sortKey } = table.schema(INDEX_NAME); ``` + +## Kinesis Stream + +A Kinesis Data Stream can be configured on the DynamoDB table to capture item-level changes. + +```ts +import * as dynamodb from '@aws-cdk/aws-dynamodb'; +import * as kinesis from '@aws-cdk/aws-kinesis'; + +const stream = new kinesis.Stream(this, 'Stream'); + +const table = new dynamodb.Table(this, 'Table', { + partitionKey: { name: 'id', type: dynamodb.AttributeType.STRING }, + kinesisStream: stream, +}); +``` diff --git a/packages/@aws-cdk/aws-dynamodb/lib/table.ts b/packages/@aws-cdk/aws-dynamodb/lib/table.ts index 4d5b31460d686..a2b1aae5ee2eb 100644 --- a/packages/@aws-cdk/aws-dynamodb/lib/table.ts +++ b/packages/@aws-cdk/aws-dynamodb/lib/table.ts @@ -1,6 +1,7 @@ import * as appscaling from '@aws-cdk/aws-applicationautoscaling'; import * as cloudwatch from '@aws-cdk/aws-cloudwatch'; import * as iam from '@aws-cdk/aws-iam'; +import * as kinesis from '@aws-cdk/aws-kinesis'; import * as kms from '@aws-cdk/aws-kms'; import { Aws, CfnCondition, CfnCustomResource, CfnResource, CustomResource, Duration, @@ -247,6 +248,13 @@ export interface TableProps extends TableOptions { * @default */ readonly tableName?: string; + + /** + * Kinesis Data Stream to capture item-level changes for the table. + * + * @default - no Kinesis Data Stream + */ + readonly kinesisStream?: kinesis.IStream; } /** @@ -1116,6 +1124,7 @@ export class Table extends TableBase { streamSpecification, timeToLiveSpecification: props.timeToLiveAttribute ? { attributeName: props.timeToLiveAttribute, enabled: true } : undefined, contributorInsightsSpecification: props.contributorInsightsEnabled !== undefined ? { enabled: props.contributorInsightsEnabled } : undefined, + kinesisStreamSpecification: props.kinesisStream ? { streamArn: props.kinesisStream.streamArn } : undefined, }); this.table.applyRemovalPolicy(props.removalPolicy); diff --git a/packages/@aws-cdk/aws-dynamodb/package.json b/packages/@aws-cdk/aws-dynamodb/package.json index 82e23dd8e6c86..84015fdc372fc 100644 --- a/packages/@aws-cdk/aws-dynamodb/package.json +++ b/packages/@aws-cdk/aws-dynamodb/package.json @@ -91,6 +91,7 @@ "@aws-cdk/aws-applicationautoscaling": "0.0.0", "@aws-cdk/aws-cloudwatch": "0.0.0", "@aws-cdk/aws-iam": "0.0.0", + "@aws-cdk/aws-kinesis": "0.0.0", "@aws-cdk/aws-kms": "0.0.0", "@aws-cdk/aws-lambda": "0.0.0", "@aws-cdk/core": "0.0.0", @@ -102,6 +103,7 @@ "@aws-cdk/aws-applicationautoscaling": "0.0.0", "@aws-cdk/aws-cloudwatch": "0.0.0", "@aws-cdk/aws-iam": "0.0.0", + "@aws-cdk/aws-kinesis": "0.0.0", "@aws-cdk/aws-kms": "0.0.0", "@aws-cdk/aws-lambda": "0.0.0", "@aws-cdk/core": "0.0.0", diff --git a/packages/@aws-cdk/aws-dynamodb/test/dynamodb.test.ts b/packages/@aws-cdk/aws-dynamodb/test/dynamodb.test.ts index c41771b59b078..de83b596ea1e4 100644 --- a/packages/@aws-cdk/aws-dynamodb/test/dynamodb.test.ts +++ b/packages/@aws-cdk/aws-dynamodb/test/dynamodb.test.ts @@ -2,6 +2,7 @@ import { arrayWith, ABSENT, ResourcePart, SynthUtils } from '@aws-cdk/assert-int import '@aws-cdk/assert-internal/jest'; import * as appscaling from '@aws-cdk/aws-applicationautoscaling'; import * as iam from '@aws-cdk/aws-iam'; +import * as kinesis from '@aws-cdk/aws-kinesis'; import * as kms from '@aws-cdk/aws-kms'; import { App, Aws, CfnDeletionPolicy, ConstructNode, Duration, PhysicalName, RemovalPolicy, Resource, Stack, Tags } from '@aws-cdk/core'; import * as cr from '@aws-cdk/custom-resources'; @@ -320,6 +321,7 @@ describe('default properties', () => { test('when specifying every property', () => { const stack = new Stack(); + const stream = new kinesis.Stream(stack, 'MyStream'); const table = new Table(stack, CONSTRUCT_NAME, { tableName: TABLE_NAME, readCapacity: 42, @@ -332,6 +334,7 @@ test('when specifying every property', () => { partitionKey: TABLE_PARTITION_KEY, sortKey: TABLE_SORT_KEY, contributorInsightsEnabled: true, + kinesisStream: stream, }); Tags.of(table).add('Environment', 'Production'); @@ -356,6 +359,11 @@ test('when specifying every property', () => { Tags: [{ Key: 'Environment', Value: 'Production' }], TimeToLiveSpecification: { AttributeName: 'timeToLive', Enabled: true }, ContributorInsightsSpecification: { Enabled: true }, + KinesisStreamSpecification: { + StreamArn: { + 'Fn::GetAtt': ['MyStream5C050E93', 'Arn'], + }, + }, }, ); }); diff --git a/packages/@aws-cdk/aws-dynamodb/test/integ.dynamodb.kinesis-stream.expected.json b/packages/@aws-cdk/aws-dynamodb/test/integ.dynamodb.kinesis-stream.expected.json new file mode 100644 index 0000000000000..77d522466ccf5 --- /dev/null +++ b/packages/@aws-cdk/aws-dynamodb/test/integ.dynamodb.kinesis-stream.expected.json @@ -0,0 +1,76 @@ +{ + "Resources": { + "Stream790BDEE4": { + "Type": "AWS::Kinesis::Stream", + "Properties": { + "ShardCount": 1, + "RetentionPeriodHours": 24, + "StreamEncryption": { + "Fn::If": [ + "AwsCdkKinesisEncryptedStreamsUnsupportedRegions", + { + "Ref": "AWS::NoValue" + }, + { + "EncryptionType": "KMS", + "KeyId": "alias/aws/kinesis" + } + ] + } + } + }, + "TableCD117FA1": { + "Type": "AWS::DynamoDB::Table", + "Properties": { + "KeySchema": [ + { + "AttributeName": "hashKey", + "KeyType": "HASH" + } + ], + "AttributeDefinitions": [ + { + "AttributeName": "hashKey", + "AttributeType": "S" + } + ], + "KinesisStreamSpecification": { + "StreamArn": { + "Fn::GetAtt": [ + "Stream790BDEE4", + "Arn" + ] + } + }, + "ProvisionedThroughput": { + "ReadCapacityUnits": 5, + "WriteCapacityUnits": 5 + } + }, + "UpdateReplacePolicy": "Delete", + "DeletionPolicy": "Delete" + } + }, + "Conditions": { + "AwsCdkKinesisEncryptedStreamsUnsupportedRegions": { + "Fn::Or": [ + { + "Fn::Equals": [ + { + "Ref": "AWS::Region" + }, + "cn-north-1" + ] + }, + { + "Fn::Equals": [ + { + "Ref": "AWS::Region" + }, + "cn-northwest-1" + ] + } + ] + } + } +} \ No newline at end of file diff --git a/packages/@aws-cdk/aws-dynamodb/test/integ.dynamodb.kinesis-stream.ts b/packages/@aws-cdk/aws-dynamodb/test/integ.dynamodb.kinesis-stream.ts new file mode 100644 index 0000000000000..ea1c2650c7758 --- /dev/null +++ b/packages/@aws-cdk/aws-dynamodb/test/integ.dynamodb.kinesis-stream.ts @@ -0,0 +1,16 @@ +import * as kinesis from '@aws-cdk/aws-kinesis'; +import * as cdk from '@aws-cdk/core'; +import * as dynamodb from '../lib'; + +const app = new cdk.App(); +const stack = new cdk.Stack(app, 'aws-cdk-dynamodb-kinesis-stream'); + +const stream = new kinesis.Stream(stack, 'Stream'); + +new dynamodb.Table(stack, 'Table', { + partitionKey: { name: 'hashKey', type: dynamodb.AttributeType.STRING }, + removalPolicy: cdk.RemovalPolicy.DESTROY, + kinesisStream: stream, +}); + +app.synth();