feat(pipes): add LogDestination implementation (#31672)
Closes #31671.
msambol authored and mazyu36 committed Nov 27, 2024
1 parent 5ce9d5e commit 1172781
Showing 28 changed files with 54,077 additions and 23,400 deletions.
139 changes: 33 additions & 106 deletions packages/@aws-cdk/aws-pipes-alpha/README.md


EventBridge Pipes lets you create source-to-target connections between several
AWS services. While transporting messages from a source to a target, the messages
can be filtered, transformed, and enriched.

![diagram of pipes](https://d1.awsstatic.com/product-marketing/EventBridge/Product-Page-Diagram_Amazon-EventBridge-Pipes.cd7961854be4432d63f6158ffd18271d6c9fa3ec.png)

For more details, see the [service documentation](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-pipes.html).

## Pipe

EventBridge Pipes is a fully managed service that enables point-to-point integrations between
event producers and consumers. Pipes can be used to connect several AWS services
to each other, or to connect AWS services to external services.

A pipe has a source and a target. The source events can be filtered and enriched
before reaching the target.

## Example - pipe usage

> The following code examples use an example implementation of a [source](#source) and [target](#target).

To define a pipe, you need to create a new `Pipe` construct. The `Pipe` construct needs a source and a target.

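A minimal example, assuming SQS queues on both ends and the `SqsSource`/`SqsTarget` classes shown later in this README:

```ts
declare const sourceQueue: sqs.Queue;
declare const targetQueue: sqs.Queue;

// The pipe polls the source queue and forwards each message to the target queue.
const pipe = new pipes.Pipe(this, 'Pipe', {
  source: new SqsSource(sourceQueue),
  target: new SqsTarget(targetQueue),
});
```
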
Messages from the source are put into the body of the target message.

## Source

A source is an AWS service that is polled. The following sources are possible:

- [Amazon DynamoDB stream](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-pipes-dynamodb.html)
- [Amazon Kinesis stream](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-pipes-kinesis.html)
- [Amazon MQ broker](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-pipes-mq.html)
- [Amazon MSK stream](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-pipes-msk.html)
- [Amazon SQS queue](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-pipes-sqs.html)
- [Apache Kafka stream](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-pipes-kafka.html)

Currently, DynamoDB, Kinesis, and SQS are supported. If you are interested in support for additional sources,
let us know by opening a GitHub issue or raising a PR.

### Example source

```ts
declare const sourceQueue: sqs.Queue;
const pipeSource = new SqsSource(sourceQueue);
```

A source implementation needs to provide the `sourceArn` and `sourceParameters`, and grant the pipe role read access to the source.
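
For illustration, a sketch of what a custom source wrapping an SQS queue could look like (assuming the usual `aws-cdk-lib` import as `cdk` and the same `sqs`/`pipes` imports as the examples above):

```ts
class CustomSqsSource implements pipes.ISource {
  readonly sourceArn: string;
  sourceParameters = undefined;

  constructor(private readonly queue: sqs.Queue) {
    this.sourceArn = queue.queueArn;
  }

  bind(_pipe: pipes.IPipe): pipes.SourceConfig {
    return {
      sourceParameters: this.sourceParameters,
    };
  }

  grantRead(pipeRole: cdk.aws_iam.IRole): void {
    // The pipe role has to be able to consume messages from the queue.
    this.queue.grantConsumeMessages(pipeRole);
  }
}
```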

## Filter

A filter can be used to filter the events from the source before they are
forwarded to the enrichment or, if no enrichment is present, the target step. Multiple filter expressions are possible.
If one of the filter expressions matches, the event is forwarded to the enrichment or target step.

### Example - filter usage

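A sketch of what such a filter could look like, assuming the `SqsSource`/`SqsTarget` classes from the other examples and source messages whose `body` carries a `customerType` field:

```ts
declare const sourceQueue: sqs.Queue;
declare const targetQueue: sqs.Queue;

// Only forward events whose body contains customerType B2B or B2C.
const sourceFilter = new pipes.Filter([
  pipes.FilterPattern.fromObject({
    body: {
      customerType: ['B2B', 'B2C'],
    },
  }),
]);

const pipe = new pipes.Pipe(this, 'Pipe', {
  source: new SqsSource(sourceQueue),
  target: new SqsTarget(targetQueue),
  filter: sourceFilter,
});
```
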
This example shows a filter that only forwards events with the `customerType` B2B or B2C.

You can define multiple filter patterns, which are combined with a logical `OR`.

Additional filter patterns and details can be found in the EventBridge Pipes [docs](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-pipes-event-filtering.html).

## Input transformation

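Input transformations build a new payload for the enrichment or target from the source event. A minimal sketch of an object-based transformation, assuming the `SqsSource`/`SqsTarget` classes from the other examples and that `SqsTarget` accepts an `inputTransformation` option:

```ts
declare const sourceQueue: sqs.Queue;
declare const targetQueue: sqs.Queue;

// Build a new payload: a static field plus a value taken from the source event body.
const targetInputTransformation = pipes.InputTransformation.fromObject({
  staticField: 'static value',
  dynamicField: pipes.DynamicInput.fromEventPath('$.body'),
});

const pipe = new pipes.Pipe(this, 'Pipe', {
  source: new SqsSource(sourceQueue),
  target: new SqsTarget(targetQueue, { inputTransformation: targetInputTransformation }),
});
```
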
When a batch of input events is processed by the pipe, each event is converted into the payload produced by the transformation. If the transformation is applied to a target, the payload might be converted to a string representation; for example, the resulting SQS message body of the target message is the stringified transformed object.

The same pattern applies to the other input transformations: `InputTransformation.fromEventPath()` extracts part of the source event (for example its body), and `InputTransformation.fromText()` forwards static text to the target.
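
A sketch of a static text transformation, under the same assumptions as the example above:

```ts
declare const sourceQueue: sqs.Queue;
declare const targetQueue: sqs.Queue;

// Forward a fixed string to the target instead of the event payload.
const staticTransformation = pipes.InputTransformation.fromText('Hello from the pipe!');

const pipe = new pipes.Pipe(this, 'Pipe', {
  source: new SqsSource(sourceQueue),
  target: new SqsTarget(targetQueue, { inputTransformation: staticTransformation }),
});
```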

## Enrichment

In the enrichment step, the (un)filtered payloads from the source can be used to invoke one of the following services:

- API destination
- Amazon API Gateway
- AWS Lambda function
- AWS Step Functions state machine

## Target

A target is the end of the pipe. After the payload from the source is pulled, filtered, and enriched, it is forwarded to the target. Several AWS services are supported as targets.
The target event can be transformed before it is forwarded to the target using
the same input transformation as in the enrichment step.

### Example target

```ts
declare const targetQueue: sqs.Queue;
const pipeTarget = new SqsTarget(targetQueue);
```

A target implementation needs to provide the `targetArn` and `targetParameters`, and grant the pipe role push access to the target.
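
For illustration, a sketch of what a custom target wrapping an SQS queue could look like (assuming the usual `aws-cdk-lib` import as `cdk` and the same `sqs`/`pipes` imports as the examples above):

```ts
class CustomSqsTarget implements pipes.ITarget {
  readonly targetArn: string;
  private readonly inputTransformation?: pipes.InputTransformation;

  constructor(private readonly queue: sqs.Queue, props: { inputTransformation?: pipes.InputTransformation } = {}) {
    this.targetArn = queue.queueArn;
    this.inputTransformation = props.inputTransformation;
  }

  bind(pipe: pipes.Pipe): pipes.TargetConfig {
    return {
      targetParameters: {
        inputTemplate: this.inputTransformation?.bind(pipe).inputTemplate,
      },
    };
  }

  grantPush(pipeRole: cdk.aws_iam.IRole): void {
    // The pipe role has to be able to send messages to the queue.
    this.queue.grantSendMessages(pipeRole);
  }
}
```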

## Log destination

A pipe can produce log events that are forwarded to different log destinations.
You can configure multiple destinations, but all destinations share the same log level and log data.
For details, check the official [documentation](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-pipes-logs.html).

The log level and the data included in the log events are configured on the pipe itself.

The actual destination is defined independently, and there are three options:

1. `CloudwatchLogsLogDestination`
2. `FirehoseLogDestination`
3. `S3LogDestination`

### Example log destination usage

```ts
declare const sourceQueue: sqs.Queue;
declare const targetQueue: sqs.Queue;
declare const logGroup: logs.LogGroup;

const cwlLogDestination = new pipes.CloudwatchLogsLogDestination(logGroup);

const pipe = new pipes.Pipe(this, 'Pipe', {
  source: new SqsSource(sourceQueue),
  target: new SqsTarget(targetQueue),

  logLevel: pipes.LogLevel.TRACE,
  logIncludeExecutionData: [pipes.IncludeExecutionData.ALL],

  logDestinations: [cwlLogDestination],
});
```

This example uses a CloudWatch Logs log group to store the logs emitted during a pipe execution.
The log level is set to `TRACE`, so all steps of the pipe are logged.
Additionally, all execution data is logged.
