feat(pipes): add LogDestination implementation #31672

Merged · 19 commits · Nov 27, 2024
139 changes: 33 additions & 106 deletions packages/@aws-cdk/aws-pipes-alpha/README.md


EventBridge Pipes let you create source to target connections between several
AWS services. While transporting messages from a source to a target, the messages
can be filtered, transformed, and enriched.

![diagram of pipes](https://d1.awsstatic.com/product-marketing/EventBridge/Product-Page-Diagram_Amazon-EventBridge-Pipes.cd7961854be4432d63f6158ffd18271d6c9fa3ec.png)

For more details see the [service documentation](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-pipes.html).

## Pipe

EventBridge Pipes is a fully managed service that enables point-to-point integrations between
event producers and consumers. Pipes can be used to connect several AWS services
to each other, or to connect AWS services to external services.

A pipe has a source and a target. The source events can be filtered and enriched
before reaching the target.

## Example - pipe usage

> The following code examples use an example implementation of a [source](#source) and [target](#target).

To define a pipe you need to create a new `Pipe` construct. The `Pipe` construct needs a source and a target.
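As a minimal sketch, using the example `SqsSource` and `SqsTarget` implementations described in the [source](#source) and [target](#target) sections:

```ts
declare const sourceQueue: sqs.Queue;
declare const targetQueue: sqs.Queue;

// A pipe that forwards messages from one SQS queue to another.
const pipe = new pipes.Pipe(this, 'Pipe', {
  source: new SqsSource(sourceQueue),
  target: new SqsTarget(targetQueue),
});
```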

Messages from the source are put into the body of the target message.

## Source

A source is an AWS service that is polled. The following sources are possible:

- [Amazon DynamoDB stream](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-pipes-dynamodb.html)
- [Amazon Kinesis stream](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-pipes-kinesis.html)
- [Amazon MQ broker](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-pipes-mq.html)
- [Amazon MSK stream](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-pipes-msk.html)
- [Amazon SQS queue](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-pipes-sqs.html)
- [Apache Kafka stream](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-pipes-kafka.html)

Currently, DynamoDB, Kinesis, and SQS are supported. If you are interested in support for additional sources,
kindly let us know by opening a GitHub issue or raising a PR.

### Example source

```ts
declare const sourceQueue: sqs.Queue;
const pipeSource = new SqsSource(sourceQueue);
```

A source implementation needs to provide the `sourceArn` and `sourceParameters`, and grant the pipe role read access to the source.

## Filter

A filter can be used to filter the events from the source before they are
forwarded to the enrichment or, if no enrichment is present, the target step. Multiple filter expressions are possible.
If one of the filter expressions matches, the event is forwarded to the enrichment or target step.

### Example - filter usage

This example shows a filter that only forwards events with the `customerType` B2B.
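A sketch of such a filter, assuming the `Filter` and `FilterPattern.fromObject` helpers from this package together with the example `SqsSource`/`SqsTarget` implementations:

```ts
declare const sourceQueue: sqs.Queue;
declare const targetQueue: sqs.Queue;

// Hypothetical filter: only forward events whose body contains customerType "B2B".
const sourceFilter = new pipes.Filter([
  pipes.FilterPattern.fromObject({
    body: {
      customerType: ['B2B'],
    },
  }),
]);

const pipe = new pipes.Pipe(this, 'Pipe', {
  source: new SqsSource(sourceQueue),
  target: new SqsTarget(targetQueue),
  filter: sourceFilter,
});
```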

You can define multiple filter patterns, which are combined with a logical `OR`.

Additional filter patterns and details can be found in the EventBridge Pipes [docs](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-pipes-event-filtering.html).

## Input transformation
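Input transformations let you reshape the source payload before it reaches the enrichment or target. A sketch, assuming the `InputTransformation.fromObject` and `DynamicInput` helpers from this package and the example `SqsSource`/`SqsTarget` implementations:

```ts
declare const sourceQueue: sqs.Queue;
declare const targetQueue: sqs.Queue;

// Hypothetical transformation mixing a static value with data from the source event.
const targetInputTransformation = pipes.InputTransformation.fromObject({
  staticField: 'static value',
  dynamicField: pipes.DynamicInput.fromEventPath('$.body.payload'),
  pipeVariable: pipes.DynamicInput.pipeName,
});

const pipe = new pipes.Pipe(this, 'Pipe', {
  source: new SqsSource(sourceQueue),
  target: new SqsTarget(targetQueue, {
    inputTransformation: targetInputTransformation,
  }),
});
```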

So when the following batch of input events is processed by the pipe

```json
[
  ...
]
```

it is converted into the following payload:

```json
[
  ...
]
```

If the transformation is applied to a target it might be converted to a string representation. For example, the resulting SQS message body looks like this:

```json
[
  ...
]
```

So when the following batch of input events is processed by the pipe

```json
[
  ...
]
```

it is converted into the following target payload:

```json
[
  ...
]
```

This transformation forwards the static text to the target.

## Enrichment

In the enrichment step the (un)filtered payloads from the source can be used to invoke one of the following services:

- API destination
- Amazon API Gateway
- AWS Lambda function
- AWS Step Functions state machine

## Target

Several AWS services can be used as targets; the full list of supported
targets is in the EventBridge Pipes [documentation](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-pipes-event-target.html).
The target event can be transformed before it is forwarded to the target using
the same input transformation as in the enrichment step.


### Example target

```ts
declare const targetQueue: sqs.Queue;
const pipeTarget = new SqsTarget(targetQueue);
```

A target implementation needs to provide the `targetArn` and `targetParameters`, and grant the pipe role push access to the target.

## Log destination

A pipe can produce log events that are forwarded to different log destinations.
You can configure multiple destinations, but all the destinations share the same log level and log data.
For details check the official [documentation](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-pipes-logs.html).

The log level and the data included in the log events are configured on the pipe class itself.

The actual destination is defined independently, and there are three options:

1. `CloudwatchLogsLogDestination`
2. `FirehoseLogDestination`
3. `S3LogDestination`

### Example log destination usage

```ts
declare const sourceQueue: sqs.Queue;
declare const targetQueue: sqs.Queue;
declare const logGroup: logs.LogGroup;

const cwlLogDestination = new pipes.CloudwatchLogsLogDestination(logGroup);

const pipe = new pipes.Pipe(this, 'Pipe', {
source: new SqsSource(sourceQueue),
target: new SqsTarget(targetQueue),

logLevel: pipes.LogLevel.TRACE,
logIncludeExecutionData: [pipes.IncludeExecutionData.ALL],

logDestinations: [cwlLogDestination],
});
```

This example uses a CloudWatch Logs log group to store the log emitted during a pipe execution.
The log level is set to `TRACE` so all steps of the pipe are logged.
Additionally, all execution data is logged.