From 58b3fe1fbfdf282d3927a541259911299dcdbb10 Mon Sep 17 00:00:00 2001 From: Madeline Kusters Date: Wed, 14 Jul 2021 11:04:48 -0700 Subject: [PATCH] feat(aws-kinesisfirehose): specific metrics functions for DeliveryStream --- .../@aws-cdk/aws-kinesisfirehose/README.md | 37 ++++++++ .../lib/delivery-stream.ts | 65 +++++++++++++ .../test/delivery-stream.test.ts | 95 +++++++++++++++++++ 3 files changed, 197 insertions(+) diff --git a/packages/@aws-cdk/aws-kinesisfirehose/README.md b/packages/@aws-cdk/aws-kinesisfirehose/README.md index f04fe56e8f3da..45c89bd702ad6 100644 --- a/packages/@aws-cdk/aws-kinesisfirehose/README.md +++ b/packages/@aws-cdk/aws-kinesisfirehose/README.md @@ -145,6 +145,43 @@ new DeliveryStream(this, 'Delivery Stream', { See: [Monitoring using CloudWatch Logs](https://docs.aws.amazon.com/firehose/latest/dev/monitoring-with-cloudwatch-logs.html) in the *Kinesis Data Firehose Developer Guide*. +### Metrics + +Kinesis Data Firehose sends metrics to CloudWatch so that you can collect and analyze the +performance of the delivery stream, including data delivery, data ingestion, data +transformation, format conversion, API usage, encryption, and resource usage. You can then +use CloudWatch alarms to alert you, for example, when data freshness (the age of the +oldest record in the delivery stream) exceeds the buffering limit (indicating that data is +not being delivered to your destination), or when the rate of incoming records exceeds the +limit of records per second (indicating data is flowing into your delivery stream faster +than it is configured to process). + +CDK provides methods for accessing delivery stream metrics with default configuration, +such as `metricIncomingBytes`, and `metricIncomingRecords` (see [`IDeliveryStream`](https://docs.aws.amazon.com/cdk/api/latest/docs/@aws-cdk_aws-kinesisfirehose.IDeliveryStream.html) +for a full list). CDK also provides a generic `metric` method that can be used to produce +metric configurations for any metric provided by Kinesis Data Firehose; the configurations +are pre-populated with the correct dimensions for the delivery stream. + +```ts fixture=with-delivery-stream +import * as cloudwatch from '@aws-cdk/aws-cloudwatch'; +// Alarm that triggers when the per-second average of incoming bytes exceeds 90% of the current service limit +const incomingBytesPercentOfLimit = new cloudwatch.MathExpression({ + expression: 'incomingBytes / 300 / bytePerSecLimit', + usingMetrics: { + incomingBytes: deliveryStream.metricIncomingBytes({ statistic: cloudwatch.Statistic.SUM }), + bytePerSecLimit: deliveryStream.metric('BytesPerSecondLimit'), + }, +}); +new Alarm(this, 'Alarm', { + metric: incomingBytesPercentOfLimit, + threshold: 0.9, + evaluationPeriods: 3, +}); +``` + +See: [Monitoring Using CloudWatch Metrics](https://docs.aws.amazon.com/firehose/latest/dev/monitoring-with-cloudwatch-metrics.html) +in the *Kinesis Data Firehose Developer Guide*. + ## Specifying an IAM role The DeliveryStream class automatically creates an IAM role with all the minimum necessary diff --git a/packages/@aws-cdk/aws-kinesisfirehose/lib/delivery-stream.ts b/packages/@aws-cdk/aws-kinesisfirehose/lib/delivery-stream.ts index 6a7d40e434507..d319611290e9f 100644 --- a/packages/@aws-cdk/aws-kinesisfirehose/lib/delivery-stream.ts +++ b/packages/@aws-cdk/aws-kinesisfirehose/lib/delivery-stream.ts @@ -5,6 +5,7 @@ import * as cdk from '@aws-cdk/core'; import { RegionInfo } from '@aws-cdk/region-info'; import { Construct } from 'constructs'; import { IDestination } from './destination'; +import { FirehoseMetrics } from './kinesisfirehose-canned-metrics.generated'; import { CfnDeliveryStream } from './kinesisfirehose.generated'; const PUT_RECORD_ACTIONS = [ @@ -44,6 +45,43 @@ export interface IDeliveryStream extends cdk.IResource, iam.IGrantable, ec2.ICon * Return the given named metric for this delivery stream. */ metric(metricName: string, props?: cloudwatch.MetricOptions): cloudwatch.Metric; + + /** + * Metric for the number of bytes ingested successfully into the delivery stream over the specified time period after throttling. + * + * By default, this metric will be calculated as an average over a period of 5 minutes. + */ + metricIncomingBytes(props?: cloudwatch.MetricOptions): cloudwatch.Metric; + + /** + * Metric for the number of records ingested successfully into the delivery stream over the specified time period after throttling. + * + * By default, this metric will be calculated as an average over a period of 5 minutes. + */ + metricIncomingRecords(props?: cloudwatch.MetricOptions): cloudwatch.Metric; + + /** + * Metric for the number of bytes delivered to Amazon S3 for backup over the specified time period. + * + * By default, this metric will be calculated as an average over a period of 5 minutes. + */ + metricBackupToS3Bytes(props?: cloudwatch.MetricOptions): cloudwatch.Metric; + + /** + * Metric for the age (from getting into Kinesis Data Firehose to now) of the oldest record in Kinesis Data Firehose. + * + * Any record older than this age has been delivered to the Amazon S3 bucket for backup. + * + * By default, this metric will be calculated as an average over a period of 5 minutes. + */ + metricBackupToS3DataFreshness(props?: cloudwatch.MetricOptions): cloudwatch.Metric; + + /** + * Metric for the number of records delivered to Amazon S3 for backup over the specified time period. + * + * By default, this metric will be calculated as an average over a period of 5 minutes. + */ + metricBackupToS3Records(props?: cloudwatch.MetricOptions): cloudwatch.Metric; } /** @@ -92,6 +130,33 @@ export abstract class DeliveryStreamBase extends cdk.Resource implements IDelive ...props, }).attachTo(this); } + + public metricIncomingBytes(props?: cloudwatch.MetricOptions): cloudwatch.Metric { + return this.cannedMetric(FirehoseMetrics.incomingBytesAverage, props); + } + + public metricIncomingRecords(props?: cloudwatch.MetricOptions): cloudwatch.Metric { + return this.cannedMetric(FirehoseMetrics.incomingRecordsAverage, props); + } + + public metricBackupToS3Bytes(props?: cloudwatch.MetricOptions): cloudwatch.Metric { + return this.cannedMetric(FirehoseMetrics.backupToS3BytesAverage, props); + } + + public metricBackupToS3DataFreshness(props?: cloudwatch.MetricOptions): cloudwatch.Metric { + return this.cannedMetric(FirehoseMetrics.backupToS3DataFreshnessAverage, props); + } + + public metricBackupToS3Records(props?: cloudwatch.MetricOptions): cloudwatch.Metric { + return this.cannedMetric(FirehoseMetrics.backupToS3RecordsAverage, props); + } + + private cannedMetric(fn: (dims: { DeliveryStreamName: string }) => cloudwatch.MetricProps, props?: cloudwatch.MetricOptions): cloudwatch.Metric { + return new cloudwatch.Metric({ + ...fn({ DeliveryStreamName: this.deliveryStreamName }), + ...props, + }).attachTo(this); + } } /** diff --git a/packages/@aws-cdk/aws-kinesisfirehose/test/delivery-stream.test.ts b/packages/@aws-cdk/aws-kinesisfirehose/test/delivery-stream.test.ts index ae76cc9af7e4c..7b5644d3f4717 100644 --- a/packages/@aws-cdk/aws-kinesisfirehose/test/delivery-stream.test.ts +++ b/packages/@aws-cdk/aws-kinesisfirehose/test/delivery-stream.test.ts @@ -1,5 +1,6 @@ import '@aws-cdk/assert-internal/jest'; import { ABSENT } from '@aws-cdk/assert-internal'; +import * as cloudwatch from '@aws-cdk/aws-cloudwatch'; import * as ec2 from '@aws-cdk/aws-ec2'; import * as iam from '@aws-cdk/aws-iam'; import * as cdk from '@aws-cdk/core'; @@ -156,6 +157,100 @@ describe('delivery stream', () => { }, }); }); + test('metricIncomingBytes', () => { + const deliveryStream = new firehose.DeliveryStream(stack, 'Delivery Stream', { + destinations: [mockS3Destination], + }); + + const metric = deliveryStream.metricIncomingBytes(); + + expect(metric).toMatchObject({ + account: stack.account, + region: stack.region, + namespace: 'AWS/Firehose', + metricName: 'IncomingBytes', + statistic: cloudwatch.Statistic.AVERAGE, + dimensions: { + DeliveryStreamName: deliveryStream.deliveryStreamName, + }, + }); + }); + + test('metricIncomingRecords', () => { + const deliveryStream = new firehose.DeliveryStream(stack, 'Delivery Stream', { + destinations: [mockS3Destination], + }); + + const metric = deliveryStream.metricIncomingRecords(); + + expect(metric).toMatchObject({ + account: stack.account, + region: stack.region, + namespace: 'AWS/Firehose', + metricName: 'IncomingRecords', + statistic: cloudwatch.Statistic.AVERAGE, + dimensions: { + DeliveryStreamName: deliveryStream.deliveryStreamName, + }, + }); + }); + + test('metricBackupToS3Bytes', () => { + const deliveryStream = new firehose.DeliveryStream(stack, 'Delivery Stream', { + destinations: [mockS3Destination], + }); + + const metric = deliveryStream.metricBackupToS3Bytes(); + + expect(metric).toMatchObject({ + account: stack.account, + region: stack.region, + namespace: 'AWS/Firehose', + metricName: 'BackupToS3.Bytes', + statistic: cloudwatch.Statistic.AVERAGE, + dimensions: { + DeliveryStreamName: deliveryStream.deliveryStreamName, + }, + }); + }); + + test('metricBackupToS3DataFreshness', () => { + const deliveryStream = new firehose.DeliveryStream(stack, 'Delivery Stream', { + destinations: [mockS3Destination], + }); + + const metric = deliveryStream.metricBackupToS3DataFreshness(); + + expect(metric).toMatchObject({ + account: stack.account, + region: stack.region, + namespace: 'AWS/Firehose', + metricName: 'BackupToS3.DataFreshness', + statistic: cloudwatch.Statistic.AVERAGE, + dimensions: { + DeliveryStreamName: deliveryStream.deliveryStreamName, + }, + }); + }); + + test('metricBackupToS3Records', () => { + const deliveryStream = new firehose.DeliveryStream(stack, 'Delivery Stream', { + destinations: [mockS3Destination], + }); + + const metric = deliveryStream.metricBackupToS3Records(); + + expect(metric).toMatchObject({ + account: stack.account, + region: stack.region, + namespace: 'AWS/Firehose', + metricName: 'BackupToS3.Records', + statistic: cloudwatch.Statistic.AVERAGE, + dimensions: { + DeliveryStreamName: deliveryStream.deliveryStreamName, + }, + }); + }); }); test('allows connections for Firehose IP addresses using map when region not specified', () => {