Skip to content

Commit

Permalink
Address review comments
Browse files Browse the repository at this point in the history
Implement ISubscriptionDestination for Kinesis Stream.
  • Loading branch information
Rico Huijbers committed Jul 11, 2018
1 parent 352cc82 commit 8ad77fe
Show file tree
Hide file tree
Showing 22 changed files with 498 additions and 206 deletions.
61 changes: 58 additions & 3 deletions packages/@aws-cdk/kinesis/lib/stream.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { Construct, Output, PolicyStatement, Token } from '@aws-cdk/core';
import { IIdentityResource } from '@aws-cdk/iam';
import { Construct, FnConcat, FnSelect, FnSplit, FnSub, Output, PolicyStatement, ServicePrincipal, Stack, Token } from '@aws-cdk/core';
import { IIdentityResource, Role } from '@aws-cdk/iam';
import * as kms from '@aws-cdk/kms';
import { CrossAccountDestination, ISubscriptionDestination, LogGroup, SubscriptionDestination } from '@aws-cdk/logs';
import { cloudformation, StreamArn } from './kinesis.generated';

/**
Expand Down Expand Up @@ -37,7 +38,7 @@ export interface StreamRefProps {
* StreamRef.import(this, 'MyImportedStream', ref);
*
*/
export abstract class StreamRef extends Construct {
export abstract class StreamRef extends Construct implements ISubscriptionDestination {
/**
* Creates a Stream construct that represents an external stream.
*
Expand All @@ -55,11 +56,21 @@ export abstract class StreamRef extends Construct {
*/
public abstract readonly streamArn: StreamArn;

/**
* The name of the stream
*/
public abstract readonly streamName: StreamName;

/**
* Optional KMS encryption key associated with this stream.
*/
public abstract readonly encryptionKey?: kms.EncryptionKeyRef;

/**
* The role that can be used by CloudWatch logs to write to this stream
*/
private cloudWatchLogsRole?: Role;

/**
* Exports this stream from the stack.
*/
Expand Down Expand Up @@ -159,6 +170,45 @@ export abstract class StreamRef extends Construct {
);
}

public subscriptionDestination(sourceLogGroup: LogGroup): SubscriptionDestination {
// Following example from https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/SubscriptionFilters.html#DestinationKinesisExample
if (!this.cloudWatchLogsRole) {
// Create a role to be assumed by CWL that can write to this stream and pass itself.
this.cloudWatchLogsRole = new Role(this, 'CloudWatchLogsCanPutRecords', {
assumedBy: new ServicePrincipal(new FnSub('logs.${AWS::Region}.amazonaws.com')),
});
this.cloudWatchLogsRole.addToPolicy(new PolicyStatement().addAction('kinesis:PutRecord').addResource(this.streamArn));
this.cloudWatchLogsRole.addToPolicy(new PolicyStatement().addAction('iam:PassRole').addResource(this.cloudWatchLogsRole.roleArn));
}

// We've now made it possible for CloudWatch events to write to us. In case the LogGroup is in a
// different account, we must add a Destination in between as well.
const sourceStack = Stack.find(sourceLogGroup);
const thisStack = Stack.find(this);

// Case considered: if both accounts are undefined, we can't make any assumptions. Better
// to assume we don't need to do anything special.
const sameAccount = sourceStack.env.account === thisStack.env.account;

if (sameAccount) {
return { arn: this.streamArn, role: this.cloudWatchLogsRole };
}

// The destination lives in the target account
const dest = new CrossAccountDestination(this, 'CloudWatchCrossAccountDestination', {
// Unfortunately destinationName is required so we have to invent one that won't conflict.
destinationName: new FnConcat(sourceLogGroup.logGroupName, 'To', this.streamName) as any,
targetArn: this.streamArn,
role: this.cloudWatchLogsRole
});
dest.addToPolicy(new PolicyStatement()
.addAction('logs:PutSubscriptionFilter')
.addAwsAccountPrincipal(sourceStack.env.account)
.addAllResources());

return dest.subscriptionDestination(sourceLogGroup);
}

private grant(identity: IIdentityResource, actions: { streamActions: string[], keyActions: string[] }) {
identity.addToPolicy(new PolicyStatement()
.addResource(this.streamArn)
Expand Down Expand Up @@ -307,12 +357,17 @@ export class StreamName extends Token {}

class ImportedStreamRef extends StreamRef {
public readonly streamArn: StreamArn;
public readonly streamName: StreamName;
public readonly encryptionKey?: kms.EncryptionKeyRef;

constructor(parent: Construct, name: string, props: StreamRefProps) {
super(parent, name);

this.streamArn = props.streamArn;
// ARN always looks like: arn:aws:kinesis:us-east-2:123456789012:stream/mystream
// so we can get the name from the ARN.
this.streamName = new FnSelect(1, new FnSplit('/', this.streamArn));

if (props.encryptionKey) {
this.encryptionKey = kms.EncryptionKeyRef.import(parent, 'Key', props.encryptionKey);
} else {
Expand Down
3 changes: 2 additions & 1 deletion packages/@aws-cdk/kinesis/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
"dependencies": {
"@aws-cdk/core": "^0.7.3-beta",
"@aws-cdk/iam": "^0.7.3-beta",
"@aws-cdk/kms": "^0.7.3-beta"
"@aws-cdk/kms": "^0.7.3-beta",
"@aws-cdk/logs": "^0.7.3-beta"
}
}
80 changes: 80 additions & 0 deletions packages/@aws-cdk/kinesis/test/test.subscriptiondestination.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
import { expect, haveResource } from '@aws-cdk/assert';
import { Stack } from '@aws-cdk/core';
import { FilterPattern, LogGroup, SubscriptionFilter } from '@aws-cdk/logs';
import { Test } from 'nodeunit';
import { Stream } from '../lib';

export = {
'stream can be subscription destination'(test: Test) {
// GIVEN
const stack = new Stack();
const stream = new Stream(stack, 'MyStream');
const logGroup = new LogGroup(stack, 'LogGroup');

// WHEN
new SubscriptionFilter(stack, 'Subscription', {
logGroup,
destination: stream,
filterPattern: FilterPattern.allEvents()
});

// THEN: subscription target is Stream
expect(stack).to(haveResource('AWS::Logs::SubscriptionFilter', {
DestinationArn: { "Fn::GetAtt": [ "MyStream5C050E93", "Arn" ] },
RoleArn: { "Fn::GetAtt": [ "MyStreamCloudWatchLogsCanPutRecords58498490", "Arn" ] },
}));

// THEN: we have a role to write to the Lambda
expect(stack).to(haveResource('AWS::IAM::Role', {
AssumeRolePolicyDocument: {
Statement: [{
Action: "sts:AssumeRole",
Principal: { Service: { "Fn::Sub": "logs.${AWS::Region}.amazonaws.com" }}
}],
}
}));

expect(stack).to(haveResource('AWS::IAM::Policy', {
PolicyDocument: {
Statement: [
{
Action: "kinesis:PutRecord",
Effect: "Allow",
Resource: { "Fn::GetAtt": [ "MyStream5C050E93", "Arn" ] }
},
{
Action: "iam:PassRole",
Effect: "Allow",
Resource: { "Fn::GetAtt": [ "MyStreamCloudWatchLogsCanPutRecords58498490", "Arn" ] }
}
],
}
}));

test.done();
},

'cross-account stream can be subscription destination with Destination'(test: Test) {
// GIVEN
const sourceStack = new Stack(undefined, undefined, { env: { account: '12345' }});
const logGroup = new LogGroup(sourceStack, 'LogGroup');

const destStack = new Stack(undefined, undefined, { env: { account: '67890' }});
const stream = new Stream(destStack, 'MyStream');

// WHEN
new SubscriptionFilter(sourceStack, 'Subscription', {
logGroup,
destination: stream,
filterPattern: FilterPattern.allEvents()
});

// THEN: the source stack has a Destination object that the subscription points to
expect(destStack).to(haveResource('AWS::Logs::Destination', {
TargetArn: { "Fn::GetAtt": [ "MyStream5C050E93", "Arn" ] },
RoleArn: { "Fn::GetAtt": [ "MyStreamCloudWatchLogsCanPutRecords58498490", "Arn" ] },
}));

test.done();
}
};
25 changes: 13 additions & 12 deletions packages/@aws-cdk/lambda/lib/lambda-ref.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { AccountPrincipal, Arn, Construct, FnSelect, FnSplit, FnSub,
PolicyPrincipal, PolicyStatement, resolve, ServicePrincipal, Token } from '@aws-cdk/core';
import { EventRuleTarget, IEventRuleTarget } from '@aws-cdk/events';
import { Role } from '@aws-cdk/iam';
import { ISubscriptionDestination, SubscriptionDestinationProps } from '@aws-cdk/logs';
import { ISubscriptionDestination, LogGroup, LogGroupArn, SubscriptionDestination } from '@aws-cdk/logs';
import { cloudformation, FunctionArn } from './lambda.generated';
import { LambdaPermission } from './permission';

Expand Down Expand Up @@ -117,7 +117,7 @@ export abstract class LambdaRef extends Construct implements IEventRuleTarget, I
/**
* Indicates if the policy that allows CloudWatch logs to publish to this topic has been added.
*/
private logSubscriptionDestinationPolicyAdded = false;
private logSubscriptionDestinationPolicyAddedFor: LogGroupArn[] = [];

/**
* Adds a permission to the Lambda resource policy.
Expand Down Expand Up @@ -218,20 +218,21 @@ export abstract class LambdaRef extends Construct implements IEventRuleTarget, I
return this.metric('Throttles', { statistic: 'sum', ...props });
}

public get subscriptionDestinationProps(): SubscriptionDestinationProps {
if (!this.logSubscriptionDestinationPolicyAdded) {
// FIXME: this limits to the same region, which shouldn't really be an issue.
// Wildcards in principals are unfortunately not supported.
public subscriptionDestination(sourceLogGroup: LogGroup): SubscriptionDestination {
const arn = sourceLogGroup.logGroupArn;

if (this.logSubscriptionDestinationPolicyAddedFor.indexOf(arn) === -1) {
// NOTE: the use of {AWS::Region} limits this to the same region, which shouldn't really be an issue,
// since the Lambda must be in the same region as the SubscriptionFilter anyway.
//
// Whitelisting the whole of CWL is not as secure as the example in
// https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/SubscriptionFilters.html#LambdaFunctionExample
// (which also limits on source ARN) but this is far simpler and we trust CloudWatch Logs.
// (Wildcards in principals are unfortunately not supported.
this.addPermission('InvokedByCloudWatchLogs', {
principal: new ServicePrincipal(new FnSub('logs.${AWS::Region}.amazonaws.com'))
principal: new ServicePrincipal(new FnSub('logs.${AWS::Region}.amazonaws.com')),
sourceArn: arn
});
this.logSubscriptionDestinationPolicyAdded = true;
this.logSubscriptionDestinationPolicyAddedFor.push(arn);
}
return new SubscriptionDestinationProps(this.functionArn);
return { arn: this.functionArn };
}

private parsePermissionPrincipal(principal?: PolicyPrincipal) {
Expand Down
4 changes: 2 additions & 2 deletions packages/@aws-cdk/lambda/test/test.subscriptiondestination.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { expect, haveResource } from '@aws-cdk/assert';
import { Stack } from '@aws-cdk/core';
import { LogGroup, LogPattern, SubscriptionFilter } from '@aws-cdk/logs';
import { FilterPattern, LogGroup, SubscriptionFilter } from '@aws-cdk/logs';
import { Test } from 'nodeunit';
import { Lambda, LambdaInlineCode, LambdaRuntime } from '../lib';

Expand All @@ -19,7 +19,7 @@ export = {
new SubscriptionFilter(stack, 'Subscription', {
logGroup,
destination: lambda,
logPattern: LogPattern.allEvents()
filterPattern: FilterPattern.allEvents()
});

// THEN: subscription target is Lambda
Expand Down
58 changes: 34 additions & 24 deletions packages/@aws-cdk/logs/README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
## AWS CloudWatch Logs Construct Library

This library supplies Constructs for working with CloudWatch Logs.
This library supplies constructs for working with CloudWatch Logs.

### Log Groups/Streams

Expand Down Expand Up @@ -43,7 +43,7 @@ const logGroup = new LogGroup(this, 'LogGroup', { ... });
new SubscriptionFilter(this, 'Subscription', {
logGroup,
destination: lambda,
logPattern: LogPattern.allTerms("ERROR", "MainThread")
filterPattern: FilterPattern.allTerms("ERROR", "MainThread")
});
```

Expand Down Expand Up @@ -73,13 +73,13 @@ are three types of patterns:
* JSON patterns
* Space-delimited table patterns

All patterns are constructed by using static functions on the `LogPattern`
All patterns are constructed by using static functions on the `FilterPattern`
class.

In addition to the patterns above, the following special patterns exist:

* `LogPattern.allEvents()`: matches all log events.
* `LogPattern.literal(string)`: if you already know what pattern expression to
* `FilterPattern.allEvents()`: matches all log events.
* `FilterPattern.literal(string)`: if you already know what pattern expression to
use, this function takes a string and will use that as the log pattern. For
more information, see the [Filter and Pattern
Syntax](https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/FilterAndPatternSyntax.html).
Expand All @@ -89,19 +89,24 @@ In addition to the patterns above, the following special patterns exist:
Text patterns match if the literal strings appear in the text form of the log
line.

* `LogPattern.allTerms(term, term, ...)`: matches if all of the given terms
* `FilterPattern.allTerms(term, term, ...)`: matches if all of the given terms
(substrings) appear in the log event.
* `LogPattern.anyGroup([term, term, ...], [term, term, ...], ...)`: matches if
* `FilterPattern.anyTerm(term, term, ...)`: matches if all of the given terms
(substrings) appear in the log event.
* `FilterPattern.anyGroup([term, term, ...], [term, term, ...], ...)`: matches if
all of the terms in any of the groups (specified as arrays) matches. This is
an OR match.


Examples:

```ts
const pattern1 = LogPattern.allTerms('ERROR', 'MainThread');
// Search for lines that contain both "ERROR" and "MainThread"
const pattern1 = FilterPattern.allTerms('ERROR', 'MainThread');

const pattern2 = LogPattern.anyGroup(
// Search for lines that either contain both "ERROR" and "MainThread", or
// both "WARN" and "Deadlock".
const pattern2 = FilterPattern.anyGroup(
['ERROR', 'MainThread'],
['WARN', 'Deadlock'],
);
Expand All @@ -122,34 +127,37 @@ fields.
Fields in the JSON structure are identified by identifier the complete object as `$`
and then descending into it, such as `$.field` or `$.list[0].field`.

* `LogPattern.stringValue(field, comparison, string)`: matches if the given
* `FilterPattern.stringValue(field, comparison, string)`: matches if the given
field compares as indicated with the given string value.
* `LogPattern.numberValue(field, comparison, number)`: matches if the given
* `FilterPattern.numberValue(field, comparison, number)`: matches if the given
field compares as indicated with the given numerical value.
* `LogPattern.isNull(field)`: matches if the given field exists and has the
* `FilterPattern.isNull(field)`: matches if the given field exists and has the
value `null`.
* `LogPattern.notExists(field)`: matches if the given field is not in the JSON
* `FilterPattern.notExists(field)`: matches if the given field is not in the JSON
structure.
* `LogPattern.exists(field)`: matches if the given field is in the JSON
* `FilterPattern.exists(field)`: matches if the given field is in the JSON
structure.
* `LogPattern.booleanValue(field, boolean)`: matches if the given field
* `FilterPattern.booleanValue(field, boolean)`: matches if the given field
is exactly the given boolean value.
* `LogPattern.all(jsonPattern, jsonPattern, ...)`: matches if all of the
* `FilterPattern.all(jsonPattern, jsonPattern, ...)`: matches if all of the
given JSON patterns match. This makes an AND combination of the given
patterns.
* `LogPattern.any(jsonPattern, jsonPattern, ...)`: matches if any of the
* `FilterPattern.any(jsonPattern, jsonPattern, ...)`: matches if any of the
given JSON patterns match. This makes an OR combination of the given
patterns.


Example:

```ts
const pattern = LogPattern.all(
LogPattern.stringValue('$.component', '=', 'HttpServer'),
LogPattern.any(
LogPattern.booleanValue('$.error', true),
LogPattern.numberValue('$.latency', '>', 1000)
// Search for all events where the component field is equal to
// "HttpServer" and either error is true or the latency is higher
// than 1000.
const pattern = FilterPattern.all(
FilterPattern.stringValue('$.component', '=', 'HttpServer'),
FilterPattern.any(
FilterPattern.booleanValue('$.error', true),
FilterPattern.numberValue('$.latency', '>', 1000)
));
```

Expand All @@ -163,7 +171,7 @@ logs.
Text that is surrounded by `"..."` quotes or `[...]` square brackets will
be treated as one column.

* `LogPattern.spaceDelimited(column, column, ...)`: construct a
* `FilterPattern.spaceDelimited(column, column, ...)`: construct a
`SpaceDelimitedTextPattern` object with the indicated columns. The columns
map one-by-one the columns found in the log event. The string `"..."` may
be used to specify an arbitrary number of unnamed columns anywhere in the
Expand All @@ -182,7 +190,9 @@ Multiple restrictions can be added on the same column; they must all apply.
Example:

```ts
const pattern = LogPattern.spaceDelimited('time', 'component', '...', 'result_code', 'latency')
// Search for all events where the component is "HttpServer" and the
// result code is not equal to 200.
const pattern = FilterPattern.spaceDelimited('time', 'component', '...', 'result_code', 'latency')
.whereString('component', '=', 'HttpServer')
.whereNumber('result_code', '!=', 200);
```
Loading

0 comments on commit 8ad77fe

Please sign in to comment.