diff --git a/packages/@aws-cdk/aws-pipes-alpha/lib/source.ts b/packages/@aws-cdk/aws-pipes-alpha/lib/source.ts index 71899948ee480..b82b44d795f1d 100644 --- a/packages/@aws-cdk/aws-pipes-alpha/lib/source.ts +++ b/packages/@aws-cdk/aws-pipes-alpha/lib/source.ts @@ -1,7 +1,7 @@ import { IRole } from 'aws-cdk-lib/aws-iam'; import { CfnPipe } from 'aws-cdk-lib/aws-pipes'; -import { ITopic, Topic } from 'aws-cdk-lib/aws-sns'; -import { IQueue, Queue } from 'aws-cdk-lib/aws-sqs'; +import { ITopic } from 'aws-cdk-lib/aws-sns'; +import { IQueue } from 'aws-cdk-lib/aws-sqs'; import { IPipe } from './pipe'; /** @@ -135,9 +135,9 @@ export abstract class SourceWithDeadLetterTarget implements ISource { * Grants the pipe role permission to publish to the dead-letter target. */ public grantPush(grantee: IRole, deadLetterTarget?: IQueue | ITopic) { - if (deadLetterTarget instanceof Queue) { + if (this.isIQueue(deadLetterTarget)) { deadLetterTarget.grantSendMessages(grantee); - } else if (deadLetterTarget instanceof Topic) { + } else if (this.isITopic(deadLetterTarget)) { deadLetterTarget.grantPublish(grantee); } } @@ -146,11 +146,19 @@ export abstract class SourceWithDeadLetterTarget implements ISource { * Retrieves the ARN from the dead-letter SQS queue or SNS topic. */ protected getDeadLetterTargetArn(deadLetterTarget?: IQueue | ITopic): string | undefined { - if (deadLetterTarget instanceof Queue) { + if (this.isIQueue(deadLetterTarget)) { return deadLetterTarget.queueArn; - } else if (deadLetterTarget instanceof Topic) { + } else if (this.isITopic(deadLetterTarget)) { return deadLetterTarget.topicArn; } return undefined; } + + private isIQueue(x: any): x is IQueue { + return x && (x as IQueue).queueArn !== undefined; + } + + private isITopic(x: any): x is ITopic { + return x && (x as ITopic).topicArn !== undefined; + } } diff --git a/packages/@aws-cdk/aws-pipes-sources-alpha/test/streamSource.test.ts b/packages/@aws-cdk/aws-pipes-sources-alpha/test/streamSource.test.ts index 9d2d559aeabb0..883eb4601d54a 100644 --- a/packages/@aws-cdk/aws-pipes-sources-alpha/test/streamSource.test.ts +++ b/packages/@aws-cdk/aws-pipes-sources-alpha/test/streamSource.test.ts @@ -4,7 +4,7 @@ import { Template } from 'aws-cdk-lib/assertions'; import { AttributeType, ITableV2, StreamViewType, TableV2 } from 'aws-cdk-lib/aws-dynamodb'; import { Topic } from 'aws-cdk-lib/aws-sns'; import { Queue } from 'aws-cdk-lib/aws-sqs'; -import { TestTarget } from './test-classes'; +import { TestQueue, TestTarget, TestTopic } from './test-classes'; import { DynamoDBSourceParameters, DynamoDBStartingPosition } from '../lib'; import { StreamSource } from '../lib/streamSource'; @@ -386,6 +386,86 @@ describe('stream source validations', () => { expect(template.findResources('AWS::IAM::Role')).toMatchSnapshot(); expect(template.findResources('AWS::IAM::Policy')).toMatchSnapshot(); }); + + it('should successfully set DeadLetterTarget for custom class that implements IQueue', () => { + // ARRANGE + const app = new App(); + const stack = new Stack(app, 'TestStack'); + const table = new TableV2(stack, 'MyTable', { + partitionKey: { + name: 'PK', + type: AttributeType.STRING, + }, + dynamoStream: StreamViewType.OLD_IMAGE, + }); + const queue = new TestQueue(stack, 'TestQueue'); + const source = new FakeStreamSource(table, { + startingPosition: DynamoDBStartingPosition.LATEST, + deadLetterTarget: queue, + }); + + new Pipe(stack, 'MyPipe', { + source, + target: new TestTarget(), + }); + + // ACT + const template = Template.fromStack(stack); + + // ASSERT + template.hasResource('AWS::Pipes::Pipe', { + Properties: { + SourceParameters: { + DynamoDBStreamParameters: { + DeadLetterConfig: { + Arn: 'queue-arn', + }, + }, + }, + }, + } ); + expect(queue.grantSendMessages).toHaveBeenCalled(); + }); + + it('should successfully set DeadLetterTarget for custom class that implements ITopic', () => { + // ARRANGE + const app = new App(); + const stack = new Stack(app, 'TestStack'); + const table = new TableV2(stack, 'MyTable', { + partitionKey: { + name: 'PK', + type: AttributeType.STRING, + }, + dynamoStream: StreamViewType.OLD_IMAGE, + }); + const topic = new TestTopic(stack, 'TestTopic'); + const source = new FakeStreamSource(table, { + startingPosition: DynamoDBStartingPosition.LATEST, + deadLetterTarget: topic, + }); + + new Pipe(stack, 'MyPipe', { + source, + target: new TestTarget(), + }); + + // ACT + const template = Template.fromStack(stack); + + // ASSERT + template.hasResource('AWS::Pipes::Pipe', { + Properties: { + SourceParameters: { + DynamoDBStreamParameters: { + DeadLetterConfig: { + Arn: 'test-topic-arn', + }, + }, + }, + }, + } ); + expect(topic.grantPublish).toHaveBeenCalled(); + }); }); class FakeStreamSource extends StreamSource { diff --git a/packages/@aws-cdk/aws-pipes-sources-alpha/test/test-classes.ts b/packages/@aws-cdk/aws-pipes-sources-alpha/test/test-classes.ts index cb523e0c85302..8ba51863c443a 100644 --- a/packages/@aws-cdk/aws-pipes-sources-alpha/test/test-classes.ts +++ b/packages/@aws-cdk/aws-pipes-sources-alpha/test/test-classes.ts @@ -1,5 +1,12 @@ import { IPipe, ITarget } from '@aws-cdk/aws-pipes-alpha'; +import { Resource } from 'aws-cdk-lib'; +import { Metric, MetricOptions } from 'aws-cdk-lib/aws-cloudwatch'; +import { NotificationRuleTargetConfig } from 'aws-cdk-lib/aws-codestarnotifications'; +import { AddToResourcePolicyResult, Grant, IGrantable, PolicyStatement } from 'aws-cdk-lib/aws-iam'; import { CfnPipe } from 'aws-cdk-lib/aws-pipes'; +import { ITopic, ITopicSubscription, Subscription } from 'aws-cdk-lib/aws-sns'; +import { IQueue } from 'aws-cdk-lib/aws-sqs'; +import { Construct } from 'constructs'; export class TestTarget implements ITarget { readonly targetArn: string = 'target-arn'; @@ -16,3 +23,61 @@ export class TestTarget implements ITarget { }); } +export class TestQueue extends Resource implements IQueue { + public readonly queueArn: string = 'queue-arn'; + public readonly queueUrl: string = 'https://sqs.fake-region.amazonaws.com/123456789012/fake-queue'; + public readonly queueName: string = 'fake-queue'; + public readonly fifo: boolean = false; + + // Jest mocks for interface methods + grantConsumeMessages = jest.fn(); + grant = jest.fn(); + grantPurge = jest.fn(); + grantSendMessages = jest.fn(); + + addToResourcePolicy = jest.fn(); + + metric = jest.fn(); + metricApproximateAgeOfOldestMessage = jest.fn(); + metricApproximateNumberOfMessagesDelayed = jest.fn(); + metricApproximateNumberOfMessagesNotVisible = jest.fn(); + metricApproximateNumberOfMessagesVisible = jest.fn(); + metricNumberOfMessagesDeleted = jest.fn(); + metricNumberOfEmptyReceives = jest.fn(); + metricNumberOfMessagesReceived = jest.fn(); + metricNumberOfMessagesSent = jest.fn(); + metricSentMessageSize = jest.fn(); + + constructor(scope: Construct, id: string) { + super(scope, id); + } +} + +export class TestTopic extends Resource implements ITopic { + public readonly topicArn = 'test-topic-arn'; + public readonly topicName = 'mock-topic'; + public readonly fifo = false; + public readonly contentBasedDeduplication = false; + + // Jest mock methods to simulate SNS topic behavior + grantPublish = jest.fn(); + grantSubscribe = jest.fn(); + addToResourcePolicy = jest.fn(); + addSubscription = jest.fn(); + bindAsNotificationRuleTarget = jest.fn(); + + metric = jest.fn(); + metricPublishSize = jest.fn(); + metricNumberOfMessagesPublished = jest.fn(); + metricNumberOfNotificationsDelivered = jest.fn(); + metricNumberOfNotificationsFailed = jest.fn(); + metricNumberOfNotificationsFilteredOut = jest.fn(); + metricNumberOfNotificationsFilteredOutInvalidAttributes = jest.fn(); + metricNumberOfNotificationsFilteredOutNoMessageAttributes = jest.fn(); + metricSMSMonthToDateSpentUSD = jest.fn(); + metricSMSSuccessRate = jest.fn(); + + constructor(scope: Construct, id: string) { + super(scope, id); + } +}