Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(stepfunctions-tasks): add EKS call to SFN-tasks #12779

Merged
merged 12 commits into from
Feb 22, 2021
Merged
33 changes: 33 additions & 0 deletions packages/@aws-cdk/aws-stepfunctions-tasks/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ This module is part of the [AWS Cloud Development Kit](https://github.com/aws/aw
- [Cancel Step](#cancel-step)
- [Modify Instance Fleet](#modify-instance-fleet)
- [Modify Instance Group](#modify-instance-group)
- [EKS](#eks)
- [Call](#call)
- [Glue](#glue)
- [Glue DataBrew](#glue-databrew)
- [Lambda](#lambda)
Expand Down Expand Up @@ -664,6 +666,37 @@ new tasks.EmrModifyInstanceGroupByName(stack, 'Task', {
});
```

## EKS

Step Functions supports Amazon EKS through the service integration pattern.
The service integration APIs correspond to Amazon EKS APIs.

[Read more](https://docs.aws.amazon.com/step-functions/latest/dg/connect-eks.html) about the differences when using these service integrations.

### Call

Read and write Kubernetes resource objects via a Kubernetes API endpoint.
Corresponds to the [`call`](https://docs.aws.amazon.com/step-functions/latest/dg/connect-eks.html) API in Step Functions Connector.

The following code snippet includes a Task state that uses eks:call to list the pods.

```ts
import * as eks from '@aws-cdk/aws-eks';
import * as sfn from '@aws-cdk/aws-stepfunctions';
import * as tasks from '@aws-cdk/aws-stepfunctions-tasks';

const myEksCluster = new eks.Cluster(this, 'my sample cluster', {
version: eks.KubernetesVersion.V1_18,
clusterName: 'myEksCluster',
});

new tasks.EksCall(stack, 'Call a EKS Endpoint', {
cluster: myEksCluster,
httpMethod: MethodType.GET,
httpPath: '/api/v1/namespaces/default/pods',
});
```

## Glue

Step Functions supports [AWS Glue](https://docs.aws.amazon.com/step-functions/latest/dg/connect-glue.html) through the service integration pattern.
Expand Down
118 changes: 118 additions & 0 deletions packages/@aws-cdk/aws-stepfunctions-tasks/lib/eks/call.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
import * as eks from '@aws-cdk/aws-eks';
import * as iam from '@aws-cdk/aws-iam';
import * as sfn from '@aws-cdk/aws-stepfunctions';
import { Construct } from 'constructs';
import { integrationResourceArn, validatePatternSupported } from '../private/task-utils';

/**
* Properties for calling a EKS endpoint with EksCall
* @experimental
*/
export interface EksCallProps extends sfn.TaskStateBaseProps {

/**
* The EKS cluster
*/
readonly cluster: eks.ICluster;

/**
* HTTP method ("GET", "POST", "PUT", ...) part of HTTP request
*/
readonly httpMethod: HttpMethods;

/**
* HTTP path of the Kubernetes REST API operation
shivlaks marked this conversation as resolved.
Show resolved Hide resolved
*/
readonly httpPath: string;

/**
* Query Parameters part of HTTP request
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: would be helpful to include doc links and or purpose of query parameters. the current doc strings don't add much information from the parameter names

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Query parameters would look something like this: https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.19/#-strong-write-operations-cronjob-v1beta1-batch-strong- I'm not sure if it would be helpful to include an example such as this

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

gotcha, I agree. maybe using it in the readme example would be helpful for users though.

* @default - no query parameters
*/
readonly queryParameters?: { [key: string]: string[] };

/**
* Request body part of HTTP request
* @default - No request body
*/
readonly requestBody?: { [key: string]: any };
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this does not support paths: i.e. how could you supply $.blah
please change the type to sfn.TaskInput and also add a test for it.

the same applies to any parameters that support state input json path.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed to sfn.TaskInput and added more coverage tests

}

/**
* Call a EKS endpoint as a Task
*
* @see https://docs.aws.amazon.com/step-functions/latest/dg/connect-eks.html
* @experimental
*/
export class EksCall extends sfn.TaskStateBase {

private static readonly SUPPORTED_INTEGRATION_PATTERNS: sfn.IntegrationPattern[] = [
sfn.IntegrationPattern.REQUEST_RESPONSE,
];

protected readonly taskMetrics?: sfn.TaskMetricsConfig;
protected readonly taskPolicies?: iam.PolicyStatement[];
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

probably worth a comment why task policies are not being set in this integration

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a comment in latest commit.


private readonly integrationPattern: sfn.IntegrationPattern;

constructor(scope: Construct, id: string, private readonly props: EksCallProps) {
super(scope, id, props);
this.integrationPattern = props.integrationPattern ?? sfn.IntegrationPattern.REQUEST_RESPONSE;

validatePatternSupported(this.integrationPattern, EksCall.SUPPORTED_INTEGRATION_PATTERNS);
}

/**
* Provides the EKS Call service integration task configuration
* @internal
*/
protected _renderTask(): any {
return {
Resource: integrationResourceArn('eks', 'call', this.integrationPattern),
Parameters: sfn.FieldUtils.renderObject({
ClusterName: this.props.cluster.clusterName,
CertificateAuthority: this.props.cluster.clusterCertificateAuthorityData,
Endpoint: this.props.cluster.clusterEndpoint,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are these parameters required for successfully making the call? Note that if ICluster is imported, these properties might not be defined, and will throw an exception.

I would extract these and do:

try {
  const ca = this.props.cluster.clusterCertificateAuthorityData;
} catch (err) {
  // if its required, throw an error here with the right context
  // if its not required, just pass
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the feedback, I'll add validation for ClusterName, ClusterEndpoint and clusterCertificateAuthorityData, as well as coverage tests for these cases

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With some testing I think the error thrown from the eks module has the valid context in our case.

for eks:call I'm extracting clusterName, clusterEndpoint, and clusterCertificateAuthorityData from ICluster, for example if clusterEndpoint is not defined, it will throw exception with message "clusterEndpoint" is not defined for this imported cluster, which is sufficient to show the context of the error.

All three are required to make the call, I'll just add coverage test to reflect this case

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@shivlaks Do you think "clusterEndpoint" is not defined for this imported cluster is sufficient or more details needs to be added such as Cluster Endpoint is not defined for this imported cluster, this field is required for eks:call task.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@NovakGu that seems sensible. let's keep it simple. I generally like the format of <validation error><remediation (if any)>

i.e.
The clusterEndpoint property must be specified when using an imported Cluster.

Copy link
Contributor Author

@NovakGu NovakGu Feb 18, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@shivlaks @iliapolo
Added validation for clusterEndpoint and clusterCertificateAuthorityData.
clusterName is required when import a cluster or creating a new cluster so I'm not double checking it here.

After changes

  • if an given cluster does not have cluster Endpoint it will throw with error
    The "clusterEndpoint" property must be specified when using an imported Cluster.
  • If an given cluster does not have certAuth it will throw with error
    The "clusterCertificateAuthorityData" property must be specified when using an imported Cluster.

Method: this.props.httpMethod,
Path: this.props.httpPath,
QueryParameters: this.props.queryParameters,
RequestBody: this.props.requestBody,
}),
};
}
}

/**
* Method type of a EKS call
*/
export enum HttpMethods {
/**
* Retrieve data from a server at the specified resource
*/
GET = 'GET',

/**
* Send data to the API endpoint to create or update a resource
*/
POST = 'POST',

/**
* Send data to the API endpoint to update or create a resource
*/
PUT = 'PUT',

/**
* Delete the resource at the specified endpoint
*/
DELETE = 'DELETE',

/**
* Apply partial modifications to the resource
*/
PATCH = 'PATCH',

/**
* Retrieve data from a server at the specified resource without the response body
*/
HEAD = 'HEAD'
}
1 change: 1 addition & 0 deletions packages/@aws-cdk/aws-stepfunctions-tasks/lib/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,3 +44,4 @@ export * from './athena/stop-query-execution';
export * from './athena/get-query-execution';
export * from './athena/get-query-results';
export * from './databrew/start-job-run';
export * from './eks/call';
shivlaks marked this conversation as resolved.
Show resolved Hide resolved
2 changes: 2 additions & 0 deletions packages/@aws-cdk/aws-stepfunctions-tasks/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@
"@aws-cdk/aws-ecr": "0.0.0",
"@aws-cdk/aws-ecr-assets": "0.0.0",
"@aws-cdk/aws-ecs": "0.0.0",
"@aws-cdk/aws-eks": "0.0.0",
"@aws-cdk/aws-glue": "0.0.0",
"@aws-cdk/aws-iam": "0.0.0",
"@aws-cdk/aws-kms": "0.0.0",
Expand All @@ -103,6 +104,7 @@
"@aws-cdk/aws-ecr": "0.0.0",
"@aws-cdk/aws-ecr-assets": "0.0.0",
"@aws-cdk/aws-ecs": "0.0.0",
"@aws-cdk/aws-eks": "0.0.0",
"@aws-cdk/aws-glue": "0.0.0",
"@aws-cdk/aws-iam": "0.0.0",
"@aws-cdk/aws-kms": "0.0.0",
Expand Down
103 changes: 103 additions & 0 deletions packages/@aws-cdk/aws-stepfunctions-tasks/test/eks/call.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
import * as eks from '@aws-cdk/aws-eks';
import * as sfn from '@aws-cdk/aws-stepfunctions';
import { Stack } from '@aws-cdk/core';
import { EksCall, HttpMethods } from '../../lib/eks/call';

let stack: Stack;
let cluster: eks.Cluster;

beforeEach(() => {
//GIVEN
stack = new Stack();
cluster = new eks.Cluster(stack, 'Cluster', {
version: eks.KubernetesVersion.V1_18,
clusterName: 'eksCluster',
});
});

test('Call an EKS endpoint', () => {
// WHEN
const task = new EksCall(stack, 'Call', {
cluster: cluster,
httpMethod: HttpMethods.GET,
httpPath: 'path',
requestBody: sfn.TaskInput.fromObject({
RequestBody: 'requestBody',
}),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should the type of requestBody be sfn.TaskInput ? it's currently not configured in that way.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, that's a good point. Changed the requestBody to be sfn.TaskInput

});

// THEN
expect(stack.resolve(task.toStateJson())).toEqual({
Type: 'Task',
Resource: {
'Fn::Join': [
'',
[
'arn:',
{
Ref: 'AWS::Partition',
},
':states:::eks:call',
],
],
},
End: true,
Parameters: {
ClusterName: {
Ref: 'Cluster9EE0221C',
},
CertificateAuthority: {
'Fn::GetAtt': [
'Cluster9EE0221C',
'CertificateAuthorityData',
],
},
Endpoint: {
'Fn::GetAtt': [
'Cluster9EE0221C',
'Endpoint',
],
},
Method: 'GET',
Path: 'path',
RequestBody: {
type: 1,
value: {
RequestBody: 'requestBody',
},
},
},
});
});

test('Task throws if RUN_JOB is supplied as service integration pattern', () => {
expect(() => {
new EksCall(stack, 'Call', {
cluster: cluster,
httpMethod: HttpMethods.GET,
httpPath: 'path',
requestBody: sfn.TaskInput.fromObject({
RequestBody: 'requestBody',
}),
integrationPattern: sfn.IntegrationPattern.RUN_JOB,
});
}).toThrow(
/Unsupported service integration pattern. Supported Patterns: REQUEST_RESPONSE. Received: RUN_JOB/,
);
});

test('Task throws if WAIT_FOR_TASK_TOKEN is supplied as service integration pattern', () => {
expect(() => {
new EksCall(stack, 'Call', {
cluster: cluster,
httpMethod: HttpMethods.GET,
httpPath: 'path',
requestBody: sfn.TaskInput.fromObject({
RequestBody: 'requestBody',
}),
integrationPattern: sfn.IntegrationPattern.WAIT_FOR_TASK_TOKEN,
});
}).toThrow(
/Unsupported service integration pattern. Supported Patterns: REQUEST_RESPONSE. Received: WAIT_FOR_TASK_TOKEN/,
);
});
Loading