Skip to content

Commit

Permalink
feat(aws-kinesisfirehose): specific metrics functions for DeliveryStr…
Browse files Browse the repository at this point in the history
…eam (#15545)

Closes #15543 

----

*By submitting this pull request, I confirm that my contribution is made under the terms of the Apache-2.0 license*
  • Loading branch information
madeline-k authored Jul 16, 2021
1 parent 16a54e5 commit b011392
Show file tree
Hide file tree
Showing 3 changed files with 197 additions and 0 deletions.
37 changes: 37 additions & 0 deletions packages/@aws-cdk/aws-kinesisfirehose/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,43 @@ new DeliveryStream(this, 'Delivery Stream', {
See: [Monitoring using CloudWatch Logs](https://docs.aws.amazon.com/firehose/latest/dev/monitoring-with-cloudwatch-logs.html)
in the *Kinesis Data Firehose Developer Guide*.

### Metrics

Kinesis Data Firehose sends metrics to CloudWatch so that you can collect and analyze the
performance of the delivery stream, including data delivery, data ingestion, data
transformation, format conversion, API usage, encryption, and resource usage. You can then
use CloudWatch alarms to alert you, for example, when data freshness (the age of the
oldest record in the delivery stream) exceeds the buffering limit (indicating that data is
not being delivered to your destination), or when the rate of incoming records exceeds the
limit of records per second (indicating data is flowing into your delivery stream faster
than it is configured to process).

CDK provides methods for accessing delivery stream metrics with default configuration,
such as `metricIncomingBytes`, and `metricIncomingRecords` (see [`IDeliveryStream`](https://docs.aws.amazon.com/cdk/api/latest/docs/@aws-cdk_aws-kinesisfirehose.IDeliveryStream.html)
for a full list). CDK also provides a generic `metric` method that can be used to produce
metric configurations for any metric provided by Kinesis Data Firehose; the configurations
are pre-populated with the correct dimensions for the delivery stream.

```ts fixture=with-delivery-stream
import * as cloudwatch from '@aws-cdk/aws-cloudwatch';
// Alarm that triggers when the per-second average of incoming bytes exceeds 90% of the current service limit
const incomingBytesPercentOfLimit = new cloudwatch.MathExpression({
expression: 'incomingBytes / 300 / bytePerSecLimit',
usingMetrics: {
incomingBytes: deliveryStream.metricIncomingBytes({ statistic: cloudwatch.Statistic.SUM }),
bytePerSecLimit: deliveryStream.metric('BytesPerSecondLimit'),
},
});
new Alarm(this, 'Alarm', {
metric: incomingBytesPercentOfLimit,
threshold: 0.9,
evaluationPeriods: 3,
});
```

See: [Monitoring Using CloudWatch Metrics](https://docs.aws.amazon.com/firehose/latest/dev/monitoring-with-cloudwatch-metrics.html)
in the *Kinesis Data Firehose Developer Guide*.

## Specifying an IAM role

The DeliveryStream class automatically creates an IAM role with all the minimum necessary
Expand Down
65 changes: 65 additions & 0 deletions packages/@aws-cdk/aws-kinesisfirehose/lib/delivery-stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import * as cdk from '@aws-cdk/core';
import { RegionInfo } from '@aws-cdk/region-info';
import { Construct } from 'constructs';
import { IDestination } from './destination';
import { FirehoseMetrics } from './kinesisfirehose-canned-metrics.generated';
import { CfnDeliveryStream } from './kinesisfirehose.generated';

const PUT_RECORD_ACTIONS = [
Expand Down Expand Up @@ -44,6 +45,43 @@ export interface IDeliveryStream extends cdk.IResource, iam.IGrantable, ec2.ICon
* Return the given named metric for this delivery stream.
*/
metric(metricName: string, props?: cloudwatch.MetricOptions): cloudwatch.Metric;

/**
* Metric for the number of bytes ingested successfully into the delivery stream over the specified time period after throttling.
*
* By default, this metric will be calculated as an average over a period of 5 minutes.
*/
metricIncomingBytes(props?: cloudwatch.MetricOptions): cloudwatch.Metric;

/**
* Metric for the number of records ingested successfully into the delivery stream over the specified time period after throttling.
*
* By default, this metric will be calculated as an average over a period of 5 minutes.
*/
metricIncomingRecords(props?: cloudwatch.MetricOptions): cloudwatch.Metric;

/**
* Metric for the number of bytes delivered to Amazon S3 for backup over the specified time period.
*
* By default, this metric will be calculated as an average over a period of 5 minutes.
*/
metricBackupToS3Bytes(props?: cloudwatch.MetricOptions): cloudwatch.Metric;

/**
* Metric for the age (from getting into Kinesis Data Firehose to now) of the oldest record in Kinesis Data Firehose.
*
* Any record older than this age has been delivered to the Amazon S3 bucket for backup.
*
* By default, this metric will be calculated as an average over a period of 5 minutes.
*/
metricBackupToS3DataFreshness(props?: cloudwatch.MetricOptions): cloudwatch.Metric;

/**
* Metric for the number of records delivered to Amazon S3 for backup over the specified time period.
*
* By default, this metric will be calculated as an average over a period of 5 minutes.
*/
metricBackupToS3Records(props?: cloudwatch.MetricOptions): cloudwatch.Metric;
}

/**
Expand Down Expand Up @@ -90,6 +128,33 @@ export abstract class DeliveryStreamBase extends cdk.Resource implements IDelive
...props,
}).attachTo(this);
}

public metricIncomingBytes(props?: cloudwatch.MetricOptions): cloudwatch.Metric {
return this.cannedMetric(FirehoseMetrics.incomingBytesAverage, props);
}

public metricIncomingRecords(props?: cloudwatch.MetricOptions): cloudwatch.Metric {
return this.cannedMetric(FirehoseMetrics.incomingRecordsAverage, props);
}

public metricBackupToS3Bytes(props?: cloudwatch.MetricOptions): cloudwatch.Metric {
return this.cannedMetric(FirehoseMetrics.backupToS3BytesAverage, props);
}

public metricBackupToS3DataFreshness(props?: cloudwatch.MetricOptions): cloudwatch.Metric {
return this.cannedMetric(FirehoseMetrics.backupToS3DataFreshnessAverage, props);
}

public metricBackupToS3Records(props?: cloudwatch.MetricOptions): cloudwatch.Metric {
return this.cannedMetric(FirehoseMetrics.backupToS3RecordsAverage, props);
}

private cannedMetric(fn: (dims: { DeliveryStreamName: string }) => cloudwatch.MetricProps, props?: cloudwatch.MetricOptions): cloudwatch.Metric {
return new cloudwatch.Metric({
...fn({ DeliveryStreamName: this.deliveryStreamName }),
...props,
}).attachTo(this);
}
}

/**
Expand Down
95 changes: 95 additions & 0 deletions packages/@aws-cdk/aws-kinesisfirehose/test/delivery-stream.test.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import '@aws-cdk/assert-internal/jest';
import { ABSENT } from '@aws-cdk/assert-internal';
import * as cloudwatch from '@aws-cdk/aws-cloudwatch';
import * as ec2 from '@aws-cdk/aws-ec2';
import * as iam from '@aws-cdk/aws-iam';
import * as cdk from '@aws-cdk/core';
Expand Down Expand Up @@ -156,6 +157,100 @@ describe('delivery stream', () => {
},
});
});
test('metricIncomingBytes', () => {
const deliveryStream = new firehose.DeliveryStream(stack, 'Delivery Stream', {
destinations: [mockS3Destination],
});

const metric = deliveryStream.metricIncomingBytes();

expect(metric).toMatchObject({
account: stack.account,
region: stack.region,
namespace: 'AWS/Firehose',
metricName: 'IncomingBytes',
statistic: cloudwatch.Statistic.AVERAGE,
dimensions: {
DeliveryStreamName: deliveryStream.deliveryStreamName,
},
});
});

test('metricIncomingRecords', () => {
const deliveryStream = new firehose.DeliveryStream(stack, 'Delivery Stream', {
destinations: [mockS3Destination],
});

const metric = deliveryStream.metricIncomingRecords();

expect(metric).toMatchObject({
account: stack.account,
region: stack.region,
namespace: 'AWS/Firehose',
metricName: 'IncomingRecords',
statistic: cloudwatch.Statistic.AVERAGE,
dimensions: {
DeliveryStreamName: deliveryStream.deliveryStreamName,
},
});
});

test('metricBackupToS3Bytes', () => {
const deliveryStream = new firehose.DeliveryStream(stack, 'Delivery Stream', {
destinations: [mockS3Destination],
});

const metric = deliveryStream.metricBackupToS3Bytes();

expect(metric).toMatchObject({
account: stack.account,
region: stack.region,
namespace: 'AWS/Firehose',
metricName: 'BackupToS3.Bytes',
statistic: cloudwatch.Statistic.AVERAGE,
dimensions: {
DeliveryStreamName: deliveryStream.deliveryStreamName,
},
});
});

test('metricBackupToS3DataFreshness', () => {
const deliveryStream = new firehose.DeliveryStream(stack, 'Delivery Stream', {
destinations: [mockS3Destination],
});

const metric = deliveryStream.metricBackupToS3DataFreshness();

expect(metric).toMatchObject({
account: stack.account,
region: stack.region,
namespace: 'AWS/Firehose',
metricName: 'BackupToS3.DataFreshness',
statistic: cloudwatch.Statistic.AVERAGE,
dimensions: {
DeliveryStreamName: deliveryStream.deliveryStreamName,
},
});
});

test('metricBackupToS3Records', () => {
const deliveryStream = new firehose.DeliveryStream(stack, 'Delivery Stream', {
destinations: [mockS3Destination],
});

const metric = deliveryStream.metricBackupToS3Records();

expect(metric).toMatchObject({
account: stack.account,
region: stack.region,
namespace: 'AWS/Firehose',
metricName: 'BackupToS3.Records',
statistic: cloudwatch.Statistic.AVERAGE,
dimensions: {
DeliveryStreamName: deliveryStream.deliveryStreamName,
},
});
});
});

test('allows connections for Firehose IP addresses using map when region not specified', () => {
Expand Down

0 comments on commit b011392

Please sign in to comment.