Skip to content

Commit

Permalink
Merge pull request #18056 from ewbankkit/f-r/aws_kinesisanalyticsv2_a…
Browse files Browse the repository at this point in the history
…pplication-start/stop

Start/stop Kinesis Analytics Flink application and add application snapshot resource
  • Loading branch information
breathingdust authored Apr 8, 2021
2 parents 0039723 + 1059afc commit 48ac61a
Show file tree
Hide file tree
Showing 13 changed files with 2,794 additions and 333 deletions.
19 changes: 19 additions & 0 deletions .changelog/18056.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
```release-note:enhancement
resource/aws_kinesisanalyticsv2_application: Add `start_application` attribute
```

```release-note:enhancement
resource/aws_kinesisanalyticsv2_application: `starting_position_configuration` can be specified when starting a SQL application
```

```release-note:enhancement
resource/aws_kinesisanalyticsv2_application: Add `run_configuration` attribute for starting a Flink application
```

```release-note:enhancement
resource/aws_kinesisanalyticsv2_application: Add `force_stop` attribute
```

```release-note:new-resource
aws_kinesisanalyticsv2_application_snapshot
```
72 changes: 70 additions & 2 deletions aws/internal/service/kinesisanalyticsv2/finder/finder.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,86 @@ package finder
import (
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/kinesisanalyticsv2"
"github.com/hashicorp/aws-sdk-go-base/tfawserr"
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/resource"
)

// ApplicationByName returns the application corresponding to the specified name.
func ApplicationByName(conn *kinesisanalyticsv2.KinesisAnalyticsV2, name string) (*kinesisanalyticsv2.ApplicationDetail, error) {
// ApplicationDetailByName returns the application corresponding to the specified name.
// Returns NotFoundError if no application is found.
func ApplicationDetailByName(conn *kinesisanalyticsv2.KinesisAnalyticsV2, name string) (*kinesisanalyticsv2.ApplicationDetail, error) {
input := &kinesisanalyticsv2.DescribeApplicationInput{
ApplicationName: aws.String(name),
}

return ApplicationDetail(conn, input)
}

// ApplicationDetail returns the application details corresponding to the specified input.
// Returns NotFoundError if no application is found.
func ApplicationDetail(conn *kinesisanalyticsv2.KinesisAnalyticsV2, input *kinesisanalyticsv2.DescribeApplicationInput) (*kinesisanalyticsv2.ApplicationDetail, error) {
output, err := conn.DescribeApplication(input)

if tfawserr.ErrCodeEquals(err, kinesisanalyticsv2.ErrCodeResourceNotFoundException) {
return nil, &resource.NotFoundError{
LastError: err,
LastRequest: input,
}
}

if err != nil {
return nil, err
}

if output == nil || output.ApplicationDetail == nil {
return nil, &resource.NotFoundError{
Message: "Empty result",
LastRequest: input,
}
}

return output.ApplicationDetail, nil
}

// SnapshotDetailsByApplicationAndSnapshotNames returns the application snapshot details corresponding to the specified application and snapshot names.
// Returns NotFoundError if no application snapshot is found.
func SnapshotDetailsByApplicationAndSnapshotNames(conn *kinesisanalyticsv2.KinesisAnalyticsV2, applicationName, snapshotName string) (*kinesisanalyticsv2.SnapshotDetails, error) {
input := &kinesisanalyticsv2.DescribeApplicationSnapshotInput{
ApplicationName: aws.String(applicationName),
SnapshotName: aws.String(snapshotName),
}

return SnapshotDetails(conn, input)
}

// SnapshotDetails returns the application snapshot details corresponding to the specified input.
// Returns NotFoundError if no application snapshot is found.
func SnapshotDetails(conn *kinesisanalyticsv2.KinesisAnalyticsV2, input *kinesisanalyticsv2.DescribeApplicationSnapshotInput) (*kinesisanalyticsv2.SnapshotDetails, error) {
output, err := conn.DescribeApplicationSnapshot(input)

if tfawserr.ErrCodeEquals(err, kinesisanalyticsv2.ErrCodeResourceNotFoundException) {
return nil, &resource.NotFoundError{
LastError: err,
LastRequest: input,
}
}

if tfawserr.ErrMessageContains(err, kinesisanalyticsv2.ErrCodeInvalidArgumentException, "does not exist") {
return nil, &resource.NotFoundError{
LastError: err,
LastRequest: input,
}
}

if err != nil {
return nil, err
}

if output == nil || output.SnapshotDetails == nil {
return nil, &resource.NotFoundError{
Message: "Empty result",
LastRequest: input,
}
}

return output.SnapshotDetails, nil
}
25 changes: 25 additions & 0 deletions aws/internal/service/kinesisanalyticsv2/id.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package kinesisanalyticsv2

import (
"fmt"
"strings"
)

const applicationSnapshotIDSeparator = "/"

func ApplicationSnapshotCreateID(applicationName, snapshotName string) string {
parts := []string{applicationName, snapshotName}
id := strings.Join(parts, applicationSnapshotIDSeparator)

return id
}

func ApplicationSnapshotParseID(id string) (string, string, error) {
parts := strings.Split(id, applicationSnapshotIDSeparator)

if len(parts) == 2 && parts[0] != "" && parts[1] != "" {
return parts[0], parts[1], nil
}

return "", "", fmt.Errorf("unexpected format for ID (%q), expected application-name%ssnapshot-name", id, applicationSnapshotIDSeparator)
}
36 changes: 24 additions & 12 deletions aws/internal/service/kinesisanalyticsv2/waiter/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,29 +3,41 @@ package waiter
import (
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/kinesisanalyticsv2"
"github.com/hashicorp/aws-sdk-go-base/tfawserr"
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/resource"
"github.com/terraform-providers/terraform-provider-aws/aws/internal/service/kinesisanalyticsv2/finder"
"github.com/terraform-providers/terraform-provider-aws/aws/internal/tfresource"
)

const (
applicationStatusNotFound = "NotFound"
applicationStatusUnknown = "Unknown"
)

// ApplicationStatus fetches the Application and its Status
// ApplicationStatus fetches the ApplicationDetail and its Status
func ApplicationStatus(conn *kinesisanalyticsv2.KinesisAnalyticsV2, name string) resource.StateRefreshFunc {
return func() (interface{}, string, error) {
application, err := finder.ApplicationByName(conn, name)
applicationDetail, err := finder.ApplicationDetailByName(conn, name)

if tfresource.NotFound(err) {
return nil, "", nil
}

if err != nil {
return nil, "", err
}

return applicationDetail, aws.StringValue(applicationDetail.ApplicationStatus), nil
}
}

// SnapshotDetailsStatus fetches the SnapshotDetails and its Status
func SnapshotDetailsStatus(conn *kinesisanalyticsv2.KinesisAnalyticsV2, applicationName, snapshotName string) resource.StateRefreshFunc {
return func() (interface{}, string, error) {
snapshotDetails, err := finder.SnapshotDetailsByApplicationAndSnapshotNames(conn, applicationName, snapshotName)

if tfawserr.ErrCodeEquals(err, kinesisanalyticsv2.ErrCodeResourceNotFoundException) {
return nil, applicationStatusNotFound, nil
if tfresource.NotFound(err) {
return nil, "", nil
}

if err != nil {
return nil, applicationStatusUnknown, err
return nil, "", err
}

return application, aws.StringValue(application.ApplicationStatus), nil
return snapshotDetails, aws.StringValue(snapshotDetails.SnapshotStatus), nil
}
}
104 changes: 102 additions & 2 deletions aws/internal/service/kinesisanalyticsv2/waiter/waiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,77 @@ import (
"github.com/terraform-providers/terraform-provider-aws/aws/internal/tfresource"
)

const (
ApplicationDeletedTimeout = 5 * time.Minute
ApplicationStartedTimeout = 5 * time.Minute
ApplicationStoppedTimeout = 5 * time.Minute
ApplicationUpdatedTimeout = 5 * time.Minute

SnapshotCreatedTimeout = 5 * time.Minute
SnapshotDeletedTimeout = 5 * time.Minute
)

// ApplicationDeleted waits for an Application to return Deleted
func ApplicationDeleted(conn *kinesisanalyticsv2.KinesisAnalyticsV2, name string, timeout time.Duration) (*kinesisanalyticsv2.ApplicationDetail, error) {
func ApplicationDeleted(conn *kinesisanalyticsv2.KinesisAnalyticsV2, name string) (*kinesisanalyticsv2.ApplicationDetail, error) {
stateConf := &resource.StateChangeConf{
Pending: []string{kinesisanalyticsv2.ApplicationStatusDeleting},
Target: []string{},
Refresh: ApplicationStatus(conn, name),
Timeout: timeout,
Timeout: ApplicationDeletedTimeout,
}

outputRaw, err := stateConf.WaitForState()

if v, ok := outputRaw.(*kinesisanalyticsv2.ApplicationDetail); ok {
return v, err
}

return nil, err
}

// ApplicationStarted waits for an Application to start
func ApplicationStarted(conn *kinesisanalyticsv2.KinesisAnalyticsV2, name string) (*kinesisanalyticsv2.ApplicationDetail, error) {
stateConf := &resource.StateChangeConf{
Pending: []string{kinesisanalyticsv2.ApplicationStatusStarting},
Target: []string{kinesisanalyticsv2.ApplicationStatusRunning},
Refresh: ApplicationStatus(conn, name),
Timeout: ApplicationStartedTimeout,
}

outputRaw, err := stateConf.WaitForState()

if v, ok := outputRaw.(*kinesisanalyticsv2.ApplicationDetail); ok {
return v, err
}

return nil, err
}

// ApplicationStopped waits for an Application to stop
func ApplicationStopped(conn *kinesisanalyticsv2.KinesisAnalyticsV2, name string) (*kinesisanalyticsv2.ApplicationDetail, error) {
stateConf := &resource.StateChangeConf{
Pending: []string{kinesisanalyticsv2.ApplicationStatusForceStopping, kinesisanalyticsv2.ApplicationStatusStopping},
Target: []string{kinesisanalyticsv2.ApplicationStatusReady},
Refresh: ApplicationStatus(conn, name),
Timeout: ApplicationStoppedTimeout,
}

outputRaw, err := stateConf.WaitForState()

if v, ok := outputRaw.(*kinesisanalyticsv2.ApplicationDetail); ok {
return v, err
}

return nil, err
}

// ApplicationUpdated waits for an Application to return Deleted
func ApplicationUpdated(conn *kinesisanalyticsv2.KinesisAnalyticsV2, name string) (*kinesisanalyticsv2.ApplicationDetail, error) {
stateConf := &resource.StateChangeConf{
Pending: []string{kinesisanalyticsv2.ApplicationStatusUpdating},
Target: []string{kinesisanalyticsv2.ApplicationStatusReady, kinesisanalyticsv2.ApplicationStatusRunning},
Refresh: ApplicationStatus(conn, name),
Timeout: ApplicationUpdatedTimeout,
}

outputRaw, err := stateConf.WaitForState()
Expand Down Expand Up @@ -75,3 +139,39 @@ func IAMPropagation(f func() (interface{}, error)) (interface{}, error) {

return output, nil
}

// SnapshotCreated waits for a Snapshot to return Created
func SnapshotCreated(conn *kinesisanalyticsv2.KinesisAnalyticsV2, applicationName, snapshotName string) (*kinesisanalyticsv2.SnapshotDetails, error) {
stateConf := &resource.StateChangeConf{
Pending: []string{kinesisanalyticsv2.SnapshotStatusCreating},
Target: []string{kinesisanalyticsv2.SnapshotStatusReady},
Refresh: SnapshotDetailsStatus(conn, applicationName, snapshotName),
Timeout: SnapshotCreatedTimeout,
}

outputRaw, err := stateConf.WaitForState()

if v, ok := outputRaw.(*kinesisanalyticsv2.SnapshotDetails); ok {
return v, err
}

return nil, err
}

// SnapshotDeleted waits for a Snapshot to return Deleted
func SnapshotDeleted(conn *kinesisanalyticsv2.KinesisAnalyticsV2, applicationName, snapshotName string) (*kinesisanalyticsv2.SnapshotDetails, error) {
stateConf := &resource.StateChangeConf{
Pending: []string{kinesisanalyticsv2.SnapshotStatusDeleting},
Target: []string{},
Refresh: SnapshotDetailsStatus(conn, applicationName, snapshotName),
Timeout: SnapshotDeletedTimeout,
}

outputRaw, err := stateConf.WaitForState()

if v, ok := outputRaw.(*kinesisanalyticsv2.SnapshotDetails); ok {
return v, err
}

return nil, err
}
1 change: 1 addition & 0 deletions aws/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -792,6 +792,7 @@ func Provider() *schema.Provider {
"aws_key_pair": resourceAwsKeyPair(),
"aws_kinesis_analytics_application": resourceAwsKinesisAnalyticsApplication(),
"aws_kinesisanalyticsv2_application": resourceAwsKinesisAnalyticsV2Application(),
"aws_kinesisanalyticsv2_application_snapshot": resourceAwsKinesisAnalyticsV2ApplicationSnapshot(),
"aws_kinesis_firehose_delivery_stream": resourceAwsKinesisFirehoseDeliveryStream(),
"aws_kinesis_stream": resourceAwsKinesisStream(),
"aws_kinesis_stream_consumer": resourceAwsKinesisStreamConsumer(),
Expand Down
Loading

0 comments on commit 48ac61a

Please sign in to comment.