Skip to content

Commit

Permalink
feat(stepfunctions-tasks): add EventBridgePutEvents task integration (#…
Browse files Browse the repository at this point in the history
…15165)

Adds support for EventBridge PutEvents as a Step Functions task integration

closes #15033

----

*By submitting this pull request, I confirm that my contribution is made under the terms of the Apache-2.0 license*
  • Loading branch information
Sumeet-Badyal authored Jun 29, 2021
1 parent 60f6d82 commit 1799f4c
Show file tree
Hide file tree
Showing 7 changed files with 644 additions and 0 deletions.
37 changes: 37 additions & 0 deletions packages/@aws-cdk/aws-stepfunctions-tasks/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ This module is part of the [AWS Cloud Development Kit](https://github.com/aws/aw
- [Modify Instance Group](#modify-instance-group)
- [EKS](#eks)
- [Call](#call)
- [EventBridge](#eventbridge)
- [Put Events](#put-events)
- [Glue](#glue)
- [Glue DataBrew](#glue-databrew)
- [Lambda](#lambda)
Expand Down Expand Up @@ -734,6 +736,41 @@ new tasks.EksCall(stack, 'Call a EKS Endpoint', {
});
```

## EventBridge

Step Functions supports Amazon EventBridge through the service integration pattern.
The service integration APIs correspond to Amazon EventBridge APIs.

[Read more](https://docs.aws.amazon.com/step-functions/latest/dg/connect-eventbridge.html) about the differences when using these service integrations.

### Put Events

Send events to an EventBridge bus.
Corresponds to the [`put-events`](https://docs.aws.amazon.com/step-functions/latest/dg/connect-eventbridge.html) API in Step Functions Connector.

The following code snippet includes a Task state that uses events:putevents to send an event to the default bus.

```ts
import * as events from '@aws-cdk/aws-events';
import * as sfn from '@aws-cdk/aws-stepfunctions';
import * as tasks from '@aws-cdk/aws-stepfunctions-tasks';

const myEventBus = events.EventBus(stack, 'EventBus', {
eventBusName: 'MyEventBus1',
});

new tasks.EventBridgePutEvents(stack, 'Send an event to EventBridge', {
entries: [{
detail: sfn.TaskInput.fromObject({
Message: 'Hello from Step Functions!',
}),
eventBus: myEventBus,
detailType: 'MessageFromStepFunctions',
source: 'step.functions',
}],
});
```

## Glue

Step Functions supports [AWS Glue](https://docs.aws.amazon.com/step-functions/latest/dg/connect-glue.html) through the service integration pattern.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
import * as events from '@aws-cdk/aws-events';
import * as iam from '@aws-cdk/aws-iam';
import * as sfn from '@aws-cdk/aws-stepfunctions';
import * as cdk from '@aws-cdk/core';
import { Construct } from 'constructs';
import { integrationResourceArn, validatePatternSupported } from '../private/task-utils';

/**
* An entry to be sent to EventBridge
*
* @see https://docs.aws.amazon.com/eventbridge/latest/APIReference/API_PutEventsRequestEntry.html
*/
export interface EventBridgePutEventsEntry {
/**
* The event body
*
* Can either be provided as an object or as a JSON-serialized string
* @example
* sfn.TaskInput.fromText('{"instance-id": "i-1234567890abcdef0", "state": "terminated"}')
* sfn.TaskInput.fromObject({ Message: 'Hello from Step Functions' })
* sfn.TaskInput.fromJsonPathAt('$.EventDetail')
*/
readonly detail: sfn.TaskInput;

/**
* Used along with the source field to help identify the fields and values expected in the detail field
*
* For example, events by CloudTrail have detail type "AWS API Call via CloudTrail"
* @see https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-events.html
*/
readonly detailType: string;

/**
* The event bus the entry will be sent to.
*
* @default - event is sent to account's default event bus
*/
readonly eventBus?: events.IEventBus;

/**
* The service or application that caused this event to be generated
*
* @example 'com.example.service'
* @see https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-events.html
*/
readonly source: string;
}

/**
* Properties for sending events with PutEvents
*/
export interface EventBridgePutEventsProps extends sfn.TaskStateBaseProps {
/**
* The entries that will be sent (must be at least 1)
*/
readonly entries: EventBridgePutEventsEntry[];
}

/**
* A StepFunctions Task to send events to an EventBridge event bus
*/
export class EventBridgePutEvents extends sfn.TaskStateBase {
private static readonly SUPPORTED_INTEGRATION_PATTERNS: sfn.IntegrationPattern[] = [
sfn.IntegrationPattern.REQUEST_RESPONSE,
sfn.IntegrationPattern.WAIT_FOR_TASK_TOKEN,
];

protected readonly taskMetrics?: sfn.TaskMetricsConfig;
protected readonly taskPolicies?: iam.PolicyStatement[];

private readonly integrationPattern: sfn.IntegrationPattern;

constructor(scope: Construct, id: string, private readonly props: EventBridgePutEventsProps) {
super(scope, id, props);

this.integrationPattern = props.integrationPattern ?? sfn.IntegrationPattern.REQUEST_RESPONSE;

validatePatternSupported(this.integrationPattern, EventBridgePutEvents.SUPPORTED_INTEGRATION_PATTERNS);

if (this.integrationPattern === sfn.IntegrationPattern.WAIT_FOR_TASK_TOKEN) {
if (!sfn.FieldUtils.containsTaskToken(props.entries.map(entry => entry.detail))) {
throw new Error('Task Token is required in `entries`. Use JsonPath.taskToken to set the token.');
}
}

this.validateEntries();

this.taskPolicies = [
new iam.PolicyStatement({
actions: ['events:PutEvents'],
resources: this.eventBusArns,
}),
];
}

/**
* Returns an array of EventBusArn strings based on this.props.entries
*/
private get eventBusArns(): string[] {
return this.props.entries
.map(entry => {
if (entry.eventBus) {
// If an eventBus is provided, use the corresponding ARN
return entry.eventBus.eventBusArn;
} else {
// If neither an eventBus nor eventBusName is provided,
// format the ARN for the default event bus in the account.
return cdk.Stack.of(this).formatArn({
resource: 'event-bus',
resourceName: 'default',
sep: '/',
service: 'events',
});
}
});
}

/**
* Provides the EventBridge put events service integration task configuration
* @internal
*/
protected _renderTask(): any {
return {
Resource: integrationResourceArn('events', 'putEvents', this.integrationPattern),
Parameters: sfn.FieldUtils.renderObject({
Entries: this.renderEntries(),
}),
};
}

private renderEntries(): Object[] {
return this.props.entries.map(entry => {
if (entry.source?.startsWith('aws')) {
throw new Error('Event source cannot start with "aws."');
} else {
return {
Detail: entry.detail?.value,
DetailType: entry.detailType,
EventBusName: entry.eventBus?.eventBusArn,
Source: entry.source,
};
}
});
}

private validateEntries(): void {
if (this.props.entries.length <= 0) {
throw new Error('Value for property `entries` must be a non-empty array.');
}
}
}
1 change: 1 addition & 0 deletions packages/@aws-cdk/aws-stepfunctions-tasks/lib/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,3 +46,4 @@ export * from './athena/get-query-results';
export * from './databrew/start-job-run';
export * from './eks/call';
export * from './apigateway';
export * from './eventbridge/put-events';
2 changes: 2 additions & 0 deletions packages/@aws-cdk/aws-stepfunctions-tasks/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@
"@aws-cdk/aws-ecr-assets": "0.0.0",
"@aws-cdk/aws-ecs": "0.0.0",
"@aws-cdk/aws-eks": "0.0.0",
"@aws-cdk/aws-events": "0.0.0",
"@aws-cdk/aws-iam": "0.0.0",
"@aws-cdk/aws-kms": "0.0.0",
"@aws-cdk/aws-lambda": "0.0.0",
Expand All @@ -116,6 +117,7 @@
"@aws-cdk/aws-ecr-assets": "0.0.0",
"@aws-cdk/aws-ecs": "0.0.0",
"@aws-cdk/aws-eks": "0.0.0",
"@aws-cdk/aws-events": "0.0.0",
"@aws-cdk/aws-iam": "0.0.0",
"@aws-cdk/aws-kms": "0.0.0",
"@aws-cdk/aws-lambda": "0.0.0",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
{
"Resources": {
"EventBus7B8748AA": {
"Type": "AWS::Events::EventBus",
"Properties": {
"Name": "MyEventBus1"
}
},
"StateMachineRoleB840431D": {
"Type": "AWS::IAM::Role",
"Properties": {
"AssumeRolePolicyDocument": {
"Statement": [
{
"Action": "sts:AssumeRole",
"Effect": "Allow",
"Principal": {
"Service": {
"Fn::Join": [
"",
[
"states.",
{
"Ref": "AWS::Region"
},
".amazonaws.com"
]
]
}
}
}
],
"Version": "2012-10-17"
}
}
},
"StateMachineRoleDefaultPolicyDF1E6607": {
"Type": "AWS::IAM::Policy",
"Properties": {
"PolicyDocument": {
"Statement": [
{
"Action": "events:PutEvents",
"Effect": "Allow",
"Resource": [
{
"Fn::Join": [
"",
[
"arn:",
{
"Ref": "AWS::Partition"
},
":events:",
{
"Ref": "AWS::Region"
},
":",
{
"Ref": "AWS::AccountId"
},
":event-bus/default"
]
]
},
{
"Fn::GetAtt": [
"EventBus7B8748AA",
"Arn"
]
}
]
}
],
"Version": "2012-10-17"
},
"PolicyName": "StateMachineRoleDefaultPolicyDF1E6607",
"Roles": [
{
"Ref": "StateMachineRoleB840431D"
}
]
}
},
"StateMachine2E01A3A5": {
"Type": "AWS::StepFunctions::StateMachine",
"Properties": {
"RoleArn": {
"Fn::GetAtt": [
"StateMachineRoleB840431D",
"Arn"
]
},
"DefinitionString": {
"Fn::Join": [
"",
[
"{\"StartAt\":\"Put Custom Events\",\"States\":{\"Put Custom Events\":{\"End\":true,\"Type\":\"Task\",\"Resource\":\"arn:",
{
"Ref": "AWS::Partition"
},
":states:::events:putEvents\",\"Parameters\":{\"Entries\":[{\"Detail\":{\"Message\":\"Hello from Step Functions!\"},\"DetailType\":\"MessageFromStepFunctions\",\"Source\":\"step.functions\"},{\"Detail\":{\"Message\":\"Hello from Step Functions!\"},\"DetailType\":\"MessageFromStepFunctions\",\"EventBusName\":\"",
{
"Fn::GetAtt": [
"EventBus7B8748AA",
"Arn"
]
},
"\",\"Source\":\"step.functions\"}]}}},\"TimeoutSeconds\":30}"
]
]
}
},
"DependsOn": [
"StateMachineRoleDefaultPolicyDF1E6607",
"StateMachineRoleB840431D"
]
}
},
"Outputs": {
"stateMachineArn": {
"Value": {
"Ref": "StateMachine2E01A3A5"
}
}
}
}
Loading

0 comments on commit 1799f4c

Please sign in to comment.