Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(aws-kinesisfirehose): specific metrics functions for DeliveryStream #15545

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 37 additions & 0 deletions packages/@aws-cdk/aws-kinesisfirehose/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,43 @@ new DeliveryStream(this, 'Delivery Stream', {
See: [Monitoring using CloudWatch Logs](https://docs.aws.amazon.com/firehose/latest/dev/monitoring-with-cloudwatch-logs.html)
in the *Kinesis Data Firehose Developer Guide*.

### Metrics

Kinesis Data Firehose sends metrics to CloudWatch so that you can collect and analyze the
performance of the delivery stream, including data delivery, data ingestion, data
transformation, format conversion, API usage, encryption, and resource usage. You can then
use CloudWatch alarms to alert you, for example, when data freshness (the age of the
oldest record in the delivery stream) exceeds the buffering limit (indicating that data is
not being delivered to your destination), or when the rate of incoming records exceeds the
limit of records per second (indicating data is flowing into your delivery stream faster
than it is configured to process).

CDK provides methods for accessing delivery stream metrics with default configuration,
such as `metricIncomingBytes`, and `metricIncomingRecords` (see [`IDeliveryStream`](https://docs.aws.amazon.com/cdk/api/latest/docs/@aws-cdk_aws-kinesisfirehose.IDeliveryStream.html)
for a full list). CDK also provides a generic `metric` method that can be used to produce
metric configurations for any metric provided by Kinesis Data Firehose; the configurations
are pre-populated with the correct dimensions for the delivery stream.

```ts fixture=with-delivery-stream
import * as cloudwatch from '@aws-cdk/aws-cloudwatch';
// Alarm that triggers when the per-second average of incoming bytes exceeds 90% of the current service limit
const incomingBytesPercentOfLimit = new cloudwatch.MathExpression({
expression: 'incomingBytes / 300 / bytePerSecLimit',
usingMetrics: {
incomingBytes: deliveryStream.metricIncomingBytes({ statistic: cloudwatch.Statistic.SUM }),
bytePerSecLimit: deliveryStream.metric('BytesPerSecondLimit'),
},
});
new Alarm(this, 'Alarm', {
metric: incomingBytesPercentOfLimit,
threshold: 0.9,
evaluationPeriods: 3,
});
```

See: [Monitoring Using CloudWatch Metrics](https://docs.aws.amazon.com/firehose/latest/dev/monitoring-with-cloudwatch-metrics.html)
in the *Kinesis Data Firehose Developer Guide*.

## Specifying an IAM role

The DeliveryStream class automatically creates an IAM role with all the minimum necessary
Expand Down
65 changes: 65 additions & 0 deletions packages/@aws-cdk/aws-kinesisfirehose/lib/delivery-stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import * as cdk from '@aws-cdk/core';
import { RegionInfo } from '@aws-cdk/region-info';
import { Construct } from 'constructs';
import { IDestination } from './destination';
import { FirehoseMetrics } from './kinesisfirehose-canned-metrics.generated';
import { CfnDeliveryStream } from './kinesisfirehose.generated';

const PUT_RECORD_ACTIONS = [
Expand Down Expand Up @@ -44,6 +45,43 @@ export interface IDeliveryStream extends cdk.IResource, iam.IGrantable, ec2.ICon
* Return the given named metric for this delivery stream.
*/
metric(metricName: string, props?: cloudwatch.MetricOptions): cloudwatch.Metric;

/**
* Metric for the number of bytes ingested successfully into the delivery stream over the specified time period after throttling.
*
* By default, this metric will be calculated as an average over a period of 5 minutes.
*/
metricIncomingBytes(props?: cloudwatch.MetricOptions): cloudwatch.Metric;

/**
* Metric for the number of records ingested successfully into the delivery stream over the specified time period after throttling.
*
* By default, this metric will be calculated as an average over a period of 5 minutes.
*/
metricIncomingRecords(props?: cloudwatch.MetricOptions): cloudwatch.Metric;

/**
* Metric for the number of bytes delivered to Amazon S3 for backup over the specified time period.
*
* By default, this metric will be calculated as an average over a period of 5 minutes.
*/
metricBackupToS3Bytes(props?: cloudwatch.MetricOptions): cloudwatch.Metric;

/**
* Metric for the age (from getting into Kinesis Data Firehose to now) of the oldest record in Kinesis Data Firehose.
*
* Any record older than this age has been delivered to the Amazon S3 bucket for backup.
*
* By default, this metric will be calculated as an average over a period of 5 minutes.
*/
metricBackupToS3DataFreshness(props?: cloudwatch.MetricOptions): cloudwatch.Metric;

/**
* Metric for the number of records delivered to Amazon S3 for backup over the specified time period.
*
* By default, this metric will be calculated as an average over a period of 5 minutes.
*/
metricBackupToS3Records(props?: cloudwatch.MetricOptions): cloudwatch.Metric;
}

/**
Expand Down Expand Up @@ -90,6 +128,33 @@ export abstract class DeliveryStreamBase extends cdk.Resource implements IDelive
...props,
}).attachTo(this);
}

public metricIncomingBytes(props?: cloudwatch.MetricOptions): cloudwatch.Metric {
return this.cannedMetric(FirehoseMetrics.incomingBytesAverage, props);
}

public metricIncomingRecords(props?: cloudwatch.MetricOptions): cloudwatch.Metric {
return this.cannedMetric(FirehoseMetrics.incomingRecordsAverage, props);
}

public metricBackupToS3Bytes(props?: cloudwatch.MetricOptions): cloudwatch.Metric {
return this.cannedMetric(FirehoseMetrics.backupToS3BytesAverage, props);
}

public metricBackupToS3DataFreshness(props?: cloudwatch.MetricOptions): cloudwatch.Metric {
return this.cannedMetric(FirehoseMetrics.backupToS3DataFreshnessAverage, props);
}

public metricBackupToS3Records(props?: cloudwatch.MetricOptions): cloudwatch.Metric {
return this.cannedMetric(FirehoseMetrics.backupToS3RecordsAverage, props);
}

private cannedMetric(fn: (dims: { DeliveryStreamName: string }) => cloudwatch.MetricProps, props?: cloudwatch.MetricOptions): cloudwatch.Metric {
return new cloudwatch.Metric({
...fn({ DeliveryStreamName: this.deliveryStreamName }),
...props,
}).attachTo(this);
}
}

/**
Expand Down
95 changes: 95 additions & 0 deletions packages/@aws-cdk/aws-kinesisfirehose/test/delivery-stream.test.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import '@aws-cdk/assert-internal/jest';
import { ABSENT } from '@aws-cdk/assert-internal';
import * as cloudwatch from '@aws-cdk/aws-cloudwatch';
import * as ec2 from '@aws-cdk/aws-ec2';
import * as iam from '@aws-cdk/aws-iam';
import * as cdk from '@aws-cdk/core';
Expand Down Expand Up @@ -156,6 +157,100 @@ describe('delivery stream', () => {
},
});
});
test('metricIncomingBytes', () => {
const deliveryStream = new firehose.DeliveryStream(stack, 'Delivery Stream', {
destinations: [mockS3Destination],
});

const metric = deliveryStream.metricIncomingBytes();

expect(metric).toMatchObject({
account: stack.account,
region: stack.region,
namespace: 'AWS/Firehose',
metricName: 'IncomingBytes',
statistic: cloudwatch.Statistic.AVERAGE,
dimensions: {
DeliveryStreamName: deliveryStream.deliveryStreamName,
},
});
});

test('metricIncomingRecords', () => {
const deliveryStream = new firehose.DeliveryStream(stack, 'Delivery Stream', {
destinations: [mockS3Destination],
});

const metric = deliveryStream.metricIncomingRecords();

expect(metric).toMatchObject({
account: stack.account,
region: stack.region,
namespace: 'AWS/Firehose',
metricName: 'IncomingRecords',
statistic: cloudwatch.Statistic.AVERAGE,
dimensions: {
DeliveryStreamName: deliveryStream.deliveryStreamName,
},
});
});

test('metricBackupToS3Bytes', () => {
const deliveryStream = new firehose.DeliveryStream(stack, 'Delivery Stream', {
destinations: [mockS3Destination],
});

const metric = deliveryStream.metricBackupToS3Bytes();

expect(metric).toMatchObject({
account: stack.account,
region: stack.region,
namespace: 'AWS/Firehose',
metricName: 'BackupToS3.Bytes',
statistic: cloudwatch.Statistic.AVERAGE,
dimensions: {
DeliveryStreamName: deliveryStream.deliveryStreamName,
},
});
});

test('metricBackupToS3DataFreshness', () => {
const deliveryStream = new firehose.DeliveryStream(stack, 'Delivery Stream', {
destinations: [mockS3Destination],
});

const metric = deliveryStream.metricBackupToS3DataFreshness();

expect(metric).toMatchObject({
account: stack.account,
region: stack.region,
namespace: 'AWS/Firehose',
metricName: 'BackupToS3.DataFreshness',
statistic: cloudwatch.Statistic.AVERAGE,
dimensions: {
DeliveryStreamName: deliveryStream.deliveryStreamName,
},
});
});

test('metricBackupToS3Records', () => {
const deliveryStream = new firehose.DeliveryStream(stack, 'Delivery Stream', {
destinations: [mockS3Destination],
});

const metric = deliveryStream.metricBackupToS3Records();

expect(metric).toMatchObject({
account: stack.account,
region: stack.region,
namespace: 'AWS/Firehose',
metricName: 'BackupToS3.Records',
statistic: cloudwatch.Statistic.AVERAGE,
dimensions: {
DeliveryStreamName: deliveryStream.deliveryStreamName,
},
});
});
});

test('allows connections for Firehose IP addresses using map when region not specified', () => {
Expand Down