diff --git a/packages/@aws-cdk/aws-pipes-alpha/test/integ.pipe.ts b/packages/@aws-cdk/aws-pipes-alpha/test/integ.pipe.ts index 4bd7ad60ed8ba..813ac9ba68ac3 100644 --- a/packages/@aws-cdk/aws-pipes-alpha/test/integ.pipe.ts +++ b/packages/@aws-cdk/aws-pipes-alpha/test/integ.pipe.ts @@ -1,20 +1,56 @@ import { randomUUID } from 'crypto'; import { DeliveryStream, DestinationBindOptions, DestinationConfig, IDestination } from '@aws-cdk/aws-kinesisfirehose-alpha'; -// eslint-disable-next-line import/no-extraneous-dependencies -import { SqsSource } from '@aws-cdk/aws-pipes-sources-alpha'; -// eslint-disable-next-line import/no-extraneous-dependencies -import { SqsTarget } from '@aws-cdk/aws-pipes-targets-alpha'; import { ExpectedResult, IntegTest } from '@aws-cdk/integ-tests-alpha'; import * as cdk from 'aws-cdk-lib'; import { Code } from 'aws-cdk-lib/aws-lambda'; import { Construct } from 'constructs'; -import { CloudwatchLogsLogDestination, DynamicInput, EnrichmentParametersConfig, FirehoseLogDestination, IEnrichment, IPipe, IncludeExecutionData, InputTransformation, LogLevel, Pipe, S3LogDestination, S3OutputFormat } from '../lib'; +import { CloudwatchLogsLogDestination, DynamicInput, EnrichmentParametersConfig, FirehoseLogDestination, IEnrichment, IPipe, ISource, ITarget, IncludeExecutionData, InputTransformation, LogLevel, Pipe, S3LogDestination, S3OutputFormat, SourceConfig, TargetConfig } from '../lib'; const app = new cdk.App(); const stack = new cdk.Stack(app, 'aws-cdk-pipes'); const sourceQueue = new cdk.aws_sqs.Queue(stack, 'SourceQueue'); const targetQueue = new cdk.aws_sqs.Queue(stack, 'TargetQueue'); +// When this module is promoted from alpha, TestSource should +// be replaced with SqsSource from @aws-cdk/aws-pipes-sources-alpha +class TestSource implements ISource { + sourceArn: string; + sourceParameters = undefined; + constructor(private readonly queue: cdk.aws_sqs.Queue) { + this.queue = queue; + this.sourceArn = queue.queueArn; + } + bind(_pipe: IPipe): SourceConfig { + return { + sourceParameters: this.sourceParameters, + }; + } + grantRead(pipeRole: cdk.aws_iam.IRole): void { + this.queue.grantConsumeMessages(pipeRole); + } +} + +// When this module is promoted from alpha, TestTarget should +// be replaced with SqsTarget from @aws-cdk/aws-pipes-targets-alpha +class TestTarget implements ITarget { + targetArn: string; + inputTransformation: InputTransformation = InputTransformation.fromEventPath('$.body'); + constructor(private readonly queue: cdk.aws_sqs.Queue) { + this.queue = queue; + this.targetArn = queue.queueArn; + } + bind(_pipe: Pipe): TargetConfig { + return { + targetParameters: { + inputTemplate: this.inputTransformation.bind(_pipe).inputTemplate, + }, + }; + } + grantPush(pipeRole: cdk.aws_iam.IRole): void { + this.queue.grantSendMessages(pipeRole); + } +} + const enrichmentHandlerCode = 'exports.handler = async (event) => { return event.map( record => ({...record, body: `${record.body}-${record.name}-${record.static}` }) ) };'; const enrichmentLambda = new cdk.aws_lambda.Function(stack, 'EnrichmentLambda', { code: Code.fromInline(enrichmentHandlerCode), @@ -82,10 +118,8 @@ class TestEnrichment implements IEnrichment { const pipe = new Pipe(stack, 'Pipe', { pipeName: 'BaseTestPipe', - source: new SqsSource(sourceQueue), - target: new SqsTarget(targetQueue, { - inputTransformation: InputTransformation.fromEventPath('$.body'), - }), + source: new TestSource(sourceQueue), + target: new TestTarget(targetQueue), enrichment: new TestEnrichment(enrichmentLambda), logLevel: LogLevel.TRACE, logIncludeExecutionData: [IncludeExecutionData.ALL],