Skip to content

Commit

Permalink
Update int test
Browse files Browse the repository at this point in the history
  • Loading branch information
msambol committed Nov 1, 2024
1 parent 3224450 commit 3bd6986
Showing 1 changed file with 43 additions and 9 deletions.
52 changes: 43 additions & 9 deletions packages/@aws-cdk/aws-pipes-alpha/test/integ.pipe.ts
Original file line number Diff line number Diff line change
@@ -1,20 +1,56 @@
import { randomUUID } from 'crypto';
import { DeliveryStream, DestinationBindOptions, DestinationConfig, IDestination } from '@aws-cdk/aws-kinesisfirehose-alpha';
// eslint-disable-next-line import/no-extraneous-dependencies
import { SqsSource } from '@aws-cdk/aws-pipes-sources-alpha';
// eslint-disable-next-line import/no-extraneous-dependencies
import { SqsTarget } from '@aws-cdk/aws-pipes-targets-alpha';
import { ExpectedResult, IntegTest } from '@aws-cdk/integ-tests-alpha';
import * as cdk from 'aws-cdk-lib';
import { Code } from 'aws-cdk-lib/aws-lambda';
import { Construct } from 'constructs';
import { CloudwatchLogsLogDestination, DynamicInput, EnrichmentParametersConfig, FirehoseLogDestination, IEnrichment, IPipe, IncludeExecutionData, InputTransformation, LogLevel, Pipe, S3LogDestination, S3OutputFormat } from '../lib';
import { CloudwatchLogsLogDestination, DynamicInput, EnrichmentParametersConfig, FirehoseLogDestination, IEnrichment, IPipe, ISource, ITarget, IncludeExecutionData, InputTransformation, LogLevel, Pipe, S3LogDestination, S3OutputFormat, SourceConfig, TargetConfig } from '../lib';

const app = new cdk.App();
const stack = new cdk.Stack(app, 'aws-cdk-pipes');
const sourceQueue = new cdk.aws_sqs.Queue(stack, 'SourceQueue');
const targetQueue = new cdk.aws_sqs.Queue(stack, 'TargetQueue');

// When this module is promoted from alpha, TestSource should
// be replaced with SqsSource from @aws-cdk/aws-pipes-sources-alpha
class TestSource implements ISource {
sourceArn: string;
sourceParameters = undefined;
constructor(private readonly queue: cdk.aws_sqs.Queue) {
this.queue = queue;
this.sourceArn = queue.queueArn;
}
bind(_pipe: IPipe): SourceConfig {
return {
sourceParameters: this.sourceParameters,
};
}
grantRead(pipeRole: cdk.aws_iam.IRole): void {
this.queue.grantConsumeMessages(pipeRole);
}
}

// When this module is promoted from alpha, TestTarget should
// be replaced with SqsTarget from @aws-cdk/aws-pipes-targets-alpha
class TestTarget implements ITarget {
targetArn: string;
inputTransformation: InputTransformation = InputTransformation.fromEventPath('$.body');
constructor(private readonly queue: cdk.aws_sqs.Queue) {
this.queue = queue;
this.targetArn = queue.queueArn;
}
bind(_pipe: Pipe): TargetConfig {
return {
targetParameters: {
inputTemplate: this.inputTransformation.bind(_pipe).inputTemplate,
},
};
}
grantPush(pipeRole: cdk.aws_iam.IRole): void {
this.queue.grantSendMessages(pipeRole);
}
}

const enrichmentHandlerCode = 'exports.handler = async (event) => { return event.map( record => ({...record, body: `${record.body}-${record.name}-${record.static}` }) ) };';
const enrichmentLambda = new cdk.aws_lambda.Function(stack, 'EnrichmentLambda', {
code: Code.fromInline(enrichmentHandlerCode),
Expand Down Expand Up @@ -82,10 +118,8 @@ class TestEnrichment implements IEnrichment {

const pipe = new Pipe(stack, 'Pipe', {
pipeName: 'BaseTestPipe',
source: new SqsSource(sourceQueue),
target: new SqsTarget(targetQueue, {
inputTransformation: InputTransformation.fromEventPath('$.body'),
}),
source: new TestSource(sourceQueue),
target: new TestTarget(targetQueue),
enrichment: new TestEnrichment(enrichmentLambda),
logLevel: LogLevel.TRACE,
logIncludeExecutionData: [IncludeExecutionData.ALL],
Expand Down

0 comments on commit 3bd6986

Please sign in to comment.