Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 14 additions & 6 deletions packages/@aws-cdk/aws-pipes-alpha/lib/source.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { IRole } from 'aws-cdk-lib/aws-iam';
import { CfnPipe } from 'aws-cdk-lib/aws-pipes';
import { ITopic, Topic } from 'aws-cdk-lib/aws-sns';
import { IQueue, Queue } from 'aws-cdk-lib/aws-sqs';
import { ITopic } from 'aws-cdk-lib/aws-sns';
import { IQueue } from 'aws-cdk-lib/aws-sqs';
import { IPipe } from './pipe';

/**
Expand Down Expand Up @@ -135,9 +135,9 @@ export abstract class SourceWithDeadLetterTarget implements ISource {
* Grants the pipe role permission to publish to the dead-letter target.
*/
public grantPush(grantee: IRole, deadLetterTarget?: IQueue | ITopic) {
if (deadLetterTarget instanceof Queue) {
if (this.isIQueue(deadLetterTarget)) {
deadLetterTarget.grantSendMessages(grantee);
} else if (deadLetterTarget instanceof Topic) {
} else if (this.isITopic(deadLetterTarget)) {
deadLetterTarget.grantPublish(grantee);
}
}
Expand All @@ -146,11 +146,19 @@ export abstract class SourceWithDeadLetterTarget implements ISource {
* Retrieves the ARN from the dead-letter SQS queue or SNS topic.
*/
protected getDeadLetterTargetArn(deadLetterTarget?: IQueue | ITopic): string | undefined {
if (deadLetterTarget instanceof Queue) {
if (this.isIQueue(deadLetterTarget)) {
return deadLetterTarget.queueArn;
} else if (deadLetterTarget instanceof Topic) {
} else if (this.isITopic(deadLetterTarget)) {
return deadLetterTarget.topicArn;
}
return undefined;
}

private isIQueue(x: any): x is IQueue {
return x && (x as IQueue).queueArn !== undefined;
}

private isITopic(x: any): x is ITopic {
return x && (x as ITopic).topicArn !== undefined;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import { Template } from 'aws-cdk-lib/assertions';
import { AttributeType, ITableV2, StreamViewType, TableV2 } from 'aws-cdk-lib/aws-dynamodb';
import { Topic } from 'aws-cdk-lib/aws-sns';
import { Queue } from 'aws-cdk-lib/aws-sqs';
import { TestTarget } from './test-classes';
import { TestQueue, TestTarget, TestTopic } from './test-classes';
import { DynamoDBSourceParameters, DynamoDBStartingPosition } from '../lib';
import { StreamSource } from '../lib/streamSource';

Expand Down Expand Up @@ -386,6 +386,86 @@ describe('stream source validations', () => {
expect(template.findResources('AWS::IAM::Role')).toMatchSnapshot();
expect(template.findResources('AWS::IAM::Policy')).toMatchSnapshot();
});

it('should successfully set DeadLetterTarget for custom class that implements IQueue', () => {
// ARRANGE
const app = new App();
const stack = new Stack(app, 'TestStack');
const table = new TableV2(stack, 'MyTable', {
partitionKey: {
name: 'PK',
type: AttributeType.STRING,
},
dynamoStream: StreamViewType.OLD_IMAGE,
});
const queue = new TestQueue(stack, 'TestQueue');
const source = new FakeStreamSource(table, {
startingPosition: DynamoDBStartingPosition.LATEST,
deadLetterTarget: queue,
});

new Pipe(stack, 'MyPipe', {
source,
target: new TestTarget(),
});

// ACT
const template = Template.fromStack(stack);

// ASSERT
template.hasResource('AWS::Pipes::Pipe', {
Properties: {
SourceParameters: {
DynamoDBStreamParameters: {
DeadLetterConfig: {
Arn: 'queue-arn',
},
},
},
},
} );
expect(queue.grantSendMessages).toHaveBeenCalled();
});

it('should successfully set DeadLetterTarget for custom class that implements ITopic', () => {
// ARRANGE
const app = new App();
const stack = new Stack(app, 'TestStack');
const table = new TableV2(stack, 'MyTable', {
partitionKey: {
name: 'PK',
type: AttributeType.STRING,
},
dynamoStream: StreamViewType.OLD_IMAGE,
});
const topic = new TestTopic(stack, 'TestTopic');
const source = new FakeStreamSource(table, {
startingPosition: DynamoDBStartingPosition.LATEST,
deadLetterTarget: topic,
});

new Pipe(stack, 'MyPipe', {
source,
target: new TestTarget(),
});

// ACT
const template = Template.fromStack(stack);

// ASSERT
template.hasResource('AWS::Pipes::Pipe', {
Properties: {
SourceParameters: {
DynamoDBStreamParameters: {
DeadLetterConfig: {
Arn: 'test-topic-arn',
},
},
},
},
} );
expect(topic.grantPublish).toHaveBeenCalled();
});
});

class FakeStreamSource extends StreamSource {
Expand Down
65 changes: 65 additions & 0 deletions packages/@aws-cdk/aws-pipes-sources-alpha/test/test-classes.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
import { IPipe, ITarget } from '@aws-cdk/aws-pipes-alpha';
import { Resource } from 'aws-cdk-lib';
import { Metric, MetricOptions } from 'aws-cdk-lib/aws-cloudwatch';
import { NotificationRuleTargetConfig } from 'aws-cdk-lib/aws-codestarnotifications';
import { AddToResourcePolicyResult, Grant, IGrantable, PolicyStatement } from 'aws-cdk-lib/aws-iam';
import { CfnPipe } from 'aws-cdk-lib/aws-pipes';
import { ITopic, ITopicSubscription, Subscription } from 'aws-cdk-lib/aws-sns';
import { IQueue } from 'aws-cdk-lib/aws-sqs';
import { Construct } from 'constructs';

export class TestTarget implements ITarget {
readonly targetArn: string = 'target-arn';
Expand All @@ -16,3 +23,61 @@ export class TestTarget implements ITarget {
});
}

export class TestQueue extends Resource implements IQueue {
public readonly queueArn: string = 'queue-arn';
public readonly queueUrl: string = 'https://sqs.fake-region.amazonaws.com/123456789012/fake-queue';
public readonly queueName: string = 'fake-queue';
public readonly fifo: boolean = false;

// Jest mocks for interface methods
grantConsumeMessages = jest.fn<Grant, [IGrantable]>();
grant = jest.fn<Grant, [IGrantable, ...string[]]>();
grantPurge = jest.fn<Grant, [IGrantable]>();
grantSendMessages = jest.fn<Grant, [IGrantable]>();

addToResourcePolicy = jest.fn<AddToResourcePolicyResult, [PolicyStatement]>();

metric = jest.fn<Metric, [string, MetricOptions?]>();
metricApproximateAgeOfOldestMessage = jest.fn<Metric, [MetricOptions?]>();
metricApproximateNumberOfMessagesDelayed = jest.fn<Metric, [MetricOptions?]>();
metricApproximateNumberOfMessagesNotVisible = jest.fn<Metric, [MetricOptions?]>();
metricApproximateNumberOfMessagesVisible = jest.fn<Metric, [MetricOptions?]>();
metricNumberOfMessagesDeleted = jest.fn<Metric, [MetricOptions?]>();
metricNumberOfEmptyReceives = jest.fn<Metric, [MetricOptions?]>();
metricNumberOfMessagesReceived = jest.fn<Metric, [MetricOptions?]>();
metricNumberOfMessagesSent = jest.fn<Metric, [MetricOptions?]>();
metricSentMessageSize = jest.fn<Metric, [MetricOptions?]>();

constructor(scope: Construct, id: string) {
super(scope, id);
}
}

export class TestTopic extends Resource implements ITopic {
public readonly topicArn = 'test-topic-arn';
public readonly topicName = 'mock-topic';
public readonly fifo = false;
public readonly contentBasedDeduplication = false;

// Jest mock methods to simulate SNS topic behavior
grantPublish = jest.fn<Grant, [IGrantable]>();
grantSubscribe = jest.fn<Grant, [IGrantable]>();
addToResourcePolicy = jest.fn<AddToResourcePolicyResult, [PolicyStatement]>();
addSubscription = jest.fn<Subscription, [ITopicSubscription]>();
bindAsNotificationRuleTarget = jest.fn<NotificationRuleTargetConfig, []>();

metric = jest.fn<Metric, [string, MetricOptions?]>();
metricPublishSize = jest.fn<Metric, [MetricOptions?]>();
metricNumberOfMessagesPublished = jest.fn<Metric, [MetricOptions?]>();
metricNumberOfNotificationsDelivered = jest.fn<Metric, [MetricOptions?]>();
metricNumberOfNotificationsFailed = jest.fn<Metric, [MetricOptions?]>();
metricNumberOfNotificationsFilteredOut = jest.fn<Metric, [MetricOptions?]>();
metricNumberOfNotificationsFilteredOutInvalidAttributes = jest.fn<Metric, [MetricOptions?]>();
metricNumberOfNotificationsFilteredOutNoMessageAttributes = jest.fn<Metric, [MetricOptions?]>();
metricSMSMonthToDateSpentUSD = jest.fn<Metric, [MetricOptions?]>();
metricSMSSuccessRate = jest.fn<Metric, [MetricOptions?]>();

constructor(scope: Construct, id: string) {
super(scope, id);
}
}
Loading