Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(stepfunctions-tasks): Support for Athena APIs: StartQueryExecution, StopQueryExeuction, GetQueryResults and GetQueryExecution #11045

Merged
merged 7 commits into from
Oct 26, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 71 additions & 0 deletions packages/@aws-cdk/aws-stepfunctions-tasks/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,11 @@ This module is part of the [AWS Cloud Development Kit](https://github.com/aws/aw
- [ResultPath](#resultpath)
- [Parameters](#task-parameters-from-the-state-json)
- [Evaluate Expression](#evaluate-expression)
- [Athena](#athena)
- [StartQueryExecution](#startQueryExecution)
- [GetQueryExecution](#getQueryExecution)
- [GetQueryResults](#getQueryResults)
- [StopQueryExecution](#stopQueryExecution)
- [Batch](#batch)
- [SubmitJob](#submitjob)
- [CodeBuild](#codebuild)
Expand Down Expand Up @@ -205,6 +210,72 @@ The `EvaluateExpression` supports a `runtime` prop to specify the Lambda
runtime to use to evaluate the expression. Currently, the only runtime
supported is `lambda.Runtime.NODEJS_10_X`.


## Athena

Step Functions supports [Athena](https://docs.aws.amazon.com/step-functions/latest/dg/connect-athena.html) through the service integration pattern.

### StartQueryExecution

The [StartQueryExecution](https://docs.aws.amazon.com/athena/latest/APIReference/API_StartQueryExecution.html) API runs the SQL query statement.

```ts
import * as sfn from '@aws-cdk/aws-stepfunctions';
import * as tasks from `@aws-cdk/aws-stepfunctions-tasks`;

const startQueryExecutionJob = new tasks.AthenaStartQueryExecution(stack, 'Start Athena Query', {
queryString: sfn.JsonPath.stringAt('$.queryString'),
queryExecutionContext: {
database: 'mydatabase',
},
resultConfiguration: {
encryptionConfiguration: {
encryptionOption: tasks.EncryptionOption.S3_MANAGED,
},
outputLocation: sfn.JsonPath.stringAt('$.outputLocation'),
},
});
```

### GetQueryExecution

The [GetQueryExecution](https://docs.aws.amazon.com/athena/latest/APIReference/API_GetQueryExecution.html) API gets information about a single execution of a query.

```ts
import * as sfn from '@aws-cdk/aws-stepfunctions';
import * as tasks from `@aws-cdk/aws-stepfunctions-tasks`;

const getQueryExecutionJob = new tasks.AthenaGetQueryExecution(stack, 'Get Query Execution', {
queryExecutionId: sfn.JsonPath.stringAt('$.QueryExecutionId'),
});
```

### GetQueryResults

The [GetQueryResults](https://docs.aws.amazon.com/athena/latest/APIReference/API_GetQueryResults.html) API that streams the results of a single query execution specified by QueryExecutionId from S3.

```ts
import * as sfn from '@aws-cdk/aws-stepfunctions';
import * as tasks from `@aws-cdk/aws-stepfunctions-tasks`;

const getQueryResultsJob = new tasks.AthenaGetQueryResults(stack, 'Get Query Results', {
queryExecutionId: sfn.JsonPath.stringAt('$.QueryExecutionId'),
});
```

### StopQueryExecution

The [StopQueryExecution](https://docs.aws.amazon.com/athena/latest/APIReference/API_StopQueryExecution.html) API that stops a query execution.

```ts
import * as sfn from '@aws-cdk/aws-stepfunctions';
import * as tasks from `@aws-cdk/aws-stepfunctions-tasks`;

const stopQueryExecutionJob = new tasks.AthenaStopQueryExecution(stack, 'Stop Query Execution', {
queryExecutionId: sfn.JsonPath.stringAt('$.QueryExecutionId'),
});
```

## Batch

Step Functions supports [Batch](https://docs.aws.amazon.com/step-functions/latest/dg/connect-batch.html) through the service integration pattern.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
import * as iam from '@aws-cdk/aws-iam';
import * as sfn from '@aws-cdk/aws-stepfunctions';
import { Construct } from 'constructs';
import { integrationResourceArn, validatePatternSupported } from '../private/task-utils';

/**
* Properties for getting a Query Execution
* @experimental
*/
export interface AthenaGetQueryExecutionProps extends sfn.TaskStateBaseProps {
/**
* Query that will be retrieved
shivlaks marked this conversation as resolved.
Show resolved Hide resolved
*
* @example 'adfsaf-23trf23-f23rt23'
*/
readonly queryExecutionId: string;
}

/**
* Get an Athena Query Execution as a Task
*
* @see https://docs.aws.amazon.com/step-functions/latest/dg/connect-athena.html
* @experimental
*/
export class AthenaGetQueryExecution extends sfn.TaskStateBase {

private static readonly SUPPORTED_INTEGRATION_PATTERNS: sfn.IntegrationPattern[] = [
sfn.IntegrationPattern.REQUEST_RESPONSE,
sfn.IntegrationPattern.WAIT_FOR_TASK_TOKEN,
];

protected readonly taskMetrics?: sfn.TaskMetricsConfig;
protected readonly taskPolicies?: iam.PolicyStatement[];

private readonly integrationPattern: sfn.IntegrationPattern;

constructor(scope: Construct, id: string, private readonly props: AthenaGetQueryExecutionProps) {
super(scope, id, props);
this.integrationPattern = props.integrationPattern ?? sfn.IntegrationPattern.REQUEST_RESPONSE;

validatePatternSupported(this.integrationPattern, AthenaGetQueryExecution.SUPPORTED_INTEGRATION_PATTERNS);

this.taskPolicies = [
new iam.PolicyStatement({
resources: ['*'], // Grant access to all workgroups as it can not be specified in the request https://docs.aws.amazon.com/athena/latest/ug/workgroups-iam-policy.html
actions: ['athena:getQueryExecution'],
}),
];
}

/**
* Provides the Athena get query execution service integration task configuration
* @internal
*/
protected _renderTask(): any {
return {
Resource: integrationResourceArn('athena', 'getQueryExecution', this.integrationPattern),
Parameters: sfn.FieldUtils.renderObject({
QueryExecutionId: this.props.queryExecutionId,
}),
};
}
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
import * as iam from '@aws-cdk/aws-iam';
import * as sfn from '@aws-cdk/aws-stepfunctions';
import { Construct } from 'constructs';
import { integrationResourceArn, validatePatternSupported } from '../private/task-utils';

/**
* Properties for getting a Query Results
* @experimental
*/
export interface AthenaGetQueryResultsProps extends sfn.TaskStateBaseProps {
/**
* Query that will be retrieved
*
* @example 'adfsaf-23trf23-f23rt23'
*/
readonly queryExecutionId: string;

/**
* Pagination token
*
* @default - No next token
*/
readonly nextToken?: string;

/**
* Max number of results
*
* @default 1000
*/
readonly maxResults?: number;
}

/**
* Get an Athena Query Results as a Task
*
* @see https://docs.aws.amazon.com/step-functions/latest/dg/connect-athena.html
* @experimental
*/
export class AthenaGetQueryResults extends sfn.TaskStateBase {

private static readonly SUPPORTED_INTEGRATION_PATTERNS: sfn.IntegrationPattern[] = [
sfn.IntegrationPattern.REQUEST_RESPONSE,
sfn.IntegrationPattern.WAIT_FOR_TASK_TOKEN,
];

protected readonly taskMetrics?: sfn.TaskMetricsConfig;
protected readonly taskPolicies?: iam.PolicyStatement[];

private readonly integrationPattern: sfn.IntegrationPattern;

constructor(scope: Construct, id: string, private readonly props: AthenaGetQueryResultsProps) {
super(scope, id, props);
this.integrationPattern = props.integrationPattern ?? sfn.IntegrationPattern.REQUEST_RESPONSE;

validatePatternSupported(this.integrationPattern, AthenaGetQueryResults.SUPPORTED_INTEGRATION_PATTERNS);

const policyStatements = [
new iam.PolicyStatement({
resources: ['*'], // Workgroup can not be specified in the request https://docs.aws.amazon.com/athena/latest/ug/workgroups-iam-policy.html
actions: ['athena:getQueryResults'],
}),
];

policyStatements.push(
new iam.PolicyStatement({
actions: ['s3:GetObject'],
resources: ['*'], // To stream query results successfully the IAM principal must have permissions to the Amazon S3 GetObject action for the Athena query results location https://docs.amazonaws.cn/en_us/athena/latest/APIReference/API_GetQueryResults.html
}),
);

this.taskPolicies = policyStatements;
}

/**
* Provides the Athena get query results service integration task configuration
* @internal
*/
protected _renderTask(): any {
return {
Resource: integrationResourceArn('athena', 'getQueryResults', this.integrationPattern),
Parameters: sfn.FieldUtils.renderObject({
QueryExecutionId: this.props.queryExecutionId,
NextToken: this.props.nextToken,
MaxResults: this.props.maxResults,
}),
};
}
}

Loading