Skip to content

Commit

Permalink
Merge pull request #27031 from dhegberg/b-add_mwaa_create_environment…
Browse files Browse the repository at this point in the history
…_retry

Add retry for MWAA CreateEnvironment on `ValidationException`
  • Loading branch information
ewbankkit authored Oct 4, 2022
2 parents 8d54b62 + 20766f7 commit 308c570
Show file tree
Hide file tree
Showing 7 changed files with 243 additions and 242 deletions.
3 changes: 3 additions & 0 deletions .changelog/27031.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:enhancement
resource/aws_mwaa_environment: Add custom timeouts
```
231 changes: 176 additions & 55 deletions internal/service/mwaa/environment.go
Original file line number Diff line number Diff line change
@@ -1,30 +1,46 @@
package mwaa

import (
"context"
"fmt"
"log"
"time"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/mwaa"
"github.com/hashicorp/aws-sdk-go-base/v2/awsv1shim/v2/tfawserr"
"github.com/hashicorp/terraform-plugin-sdk/v2/diag"
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/resource"
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema"
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/validation"
"github.com/hashicorp/terraform-provider-aws/internal/conns"
"github.com/hashicorp/terraform-provider-aws/internal/flex"
tftags "github.com/hashicorp/terraform-provider-aws/internal/tags"
"github.com/hashicorp/terraform-provider-aws/internal/tfresource"
"github.com/hashicorp/terraform-provider-aws/internal/verify"
)

const (
propagationTimeout = 2 * time.Minute
)

func ResourceEnvironment() *schema.Resource {
return &schema.Resource{
Create: resourceEnvironmentCreate,
Read: resourceEnvironmentRead,
Update: resourceEnvironmentUpdate,
Delete: resourceEnvironmentDelete,
CreateWithoutTimeout: resourceEnvironmentCreate,
ReadWithoutTimeout: resourceEnvironmentRead,
UpdateWithoutTimeout: resourceEnvironmentUpdate,
DeleteWithoutTimeout: resourceEnvironmentDelete,

Importer: &schema.ResourceImporter{
State: schema.ImportStatePassthrough,
},

Timeouts: &schema.ResourceTimeout{
Create: schema.DefaultTimeout(120 * time.Minute),
Update: schema.DefaultTimeout(90 * time.Minute),
Delete: schema.DefaultTimeout(90 * time.Minute),
},

Schema: map[string]*schema.Schema{
"airflow_configuration_options": {
Type: schema.TypeMap,
Expand Down Expand Up @@ -240,15 +256,16 @@ func ResourceEnvironment() *schema.Resource {
}
}

func resourceEnvironmentCreate(d *schema.ResourceData, meta interface{}) error {
func resourceEnvironmentCreate(ctx context.Context, d *schema.ResourceData, meta interface{}) diag.Diagnostics {
conn := meta.(*conns.AWSClient).MWAAConn
defaultTagsConfig := meta.(*conns.AWSClient).DefaultTagsConfig
tags := defaultTagsConfig.MergeTags(tftags.New(d.Get("tags").(map[string]interface{})))

input := mwaa.CreateEnvironmentInput{
name := d.Get("name").(string)
input := &mwaa.CreateEnvironmentInput{
DagS3Path: aws.String(d.Get("dag_s3_path").(string)),
ExecutionRoleArn: aws.String(d.Get("execution_role_arn").(string)),
Name: aws.String(d.Get("name").(string)),
Name: aws.String(name),
NetworkConfiguration: expandEnvironmentNetworkConfigurationCreate(d.Get("network_configuration").([]interface{})),
SourceBucketArn: aws.String(d.Get("source_bucket_arn").(string)),
}
Expand All @@ -273,6 +290,7 @@ func resourceEnvironmentCreate(d *schema.ResourceData, meta interface{}) error {
input.LoggingConfiguration = expandEnvironmentLoggingConfiguration(v.([]interface{}))
}

// input.MaxWorkers = aws.Int64(int64(90))
if v, ok := d.GetOk("max_workers"); ok {
input.MaxWorkers = aws.Int64(int64(v.(int)))
}
Expand Down Expand Up @@ -314,41 +332,42 @@ func resourceEnvironmentCreate(d *schema.ResourceData, meta interface{}) error {
}

log.Printf("[INFO] Creating MWAA Environment: %s", input)
_, err := conn.CreateEnvironment(&input)
/*
Execution roles created just before the MWAA Environment may result in ValidationExceptions
due to IAM permission propagation delays.
*/
_, err := tfresource.RetryWhenAWSErrCodeEqualsContext(ctx, propagationTimeout, func() (interface{}, error) {
return conn.CreateEnvironmentWithContext(ctx, input)
}, mwaa.ErrCodeValidationException, mwaa.ErrCodeInternalServerException)

if err != nil {
return fmt.Errorf("error creating MWAA Environment: %w", err)
return diag.Errorf("creating MWAA Environment (%s): %s", name, err)
}

d.SetId(aws.StringValue(input.Name))
d.SetId(name)

if _, err := waitEnvironmentCreated(conn, d.Id()); err != nil {
return fmt.Errorf("error waiting for MWAA Environment (%s) creation: %w", d.Id(), err)
if _, err := waitEnvironmentCreated(ctx, conn, d.Id(), d.Timeout(schema.TimeoutCreate)); err != nil {
return diag.Errorf("waiting for MWAA Environment (%s) create: %s", d.Id(), err)
}

return resourceEnvironmentRead(d, meta)
return resourceEnvironmentRead(ctx, d, meta)
}

func resourceEnvironmentRead(d *schema.ResourceData, meta interface{}) error {
func resourceEnvironmentRead(ctx context.Context, d *schema.ResourceData, meta interface{}) diag.Diagnostics {
conn := meta.(*conns.AWSClient).MWAAConn
defaultTagsConfig := meta.(*conns.AWSClient).DefaultTagsConfig
ignoreTagsConfig := meta.(*conns.AWSClient).IgnoreTagsConfig

log.Printf("[INFO] Reading MWAA Environment: %s", d.Id())

environment, err := findEnvironmentByName(conn, d.Id())

if err != nil {
if tfawserr.ErrCodeEquals(err, mwaa.ErrCodeResourceNotFoundException) && !d.IsNewResource() {
log.Printf("[WARN] MWAA Environment %q not found, removing from state", d.Id())
d.SetId("")
return nil
}
environment, err := FindEnvironmentByName(ctx, conn, d.Id())

return fmt.Errorf("error reading MWAA Environment (%s): %w", d.Id(), err)
if !d.IsNewResource() && tfresource.NotFound(err) {
log.Printf("[WARN] MWAA Environment %s not found, removing from state", d.Id())
d.SetId("")
return nil
}

if environment == nil {
return fmt.Errorf("error reading MWAA Environment (%s): empty response", d.Id())
if err != nil {
return diag.Errorf("reading MWAA Environment (%s): %s", d.Id(), err)
}

d.Set("airflow_configuration_options", aws.StringValueMap(environment.AirflowConfigurationOptions))
Expand All @@ -360,16 +379,16 @@ func resourceEnvironmentRead(d *schema.ResourceData, meta interface{}) error {
d.Set("execution_role_arn", environment.ExecutionRoleArn)
d.Set("kms_key", environment.KmsKey)
if err := d.Set("last_updated", flattenLastUpdate(environment.LastUpdate)); err != nil {
return fmt.Errorf("error reading MWAA Environment (%s): %w", d.Id(), err)
return diag.Errorf("setting last_updated: %s", err)
}
if err := d.Set("logging_configuration", flattenLoggingConfiguration(environment.LoggingConfiguration)); err != nil {
return fmt.Errorf("error reading MWAA Environment (%s): %w", d.Id(), err)
return diag.Errorf("setting logging_configuration: %s", err)
}
d.Set("max_workers", environment.MaxWorkers)
d.Set("min_workers", environment.MinWorkers)
d.Set("name", environment.Name)
if err := d.Set("network_configuration", flattenNetworkConfiguration(environment.NetworkConfiguration)); err != nil {
return fmt.Errorf("error reading MWAA Environment (%s): %w", d.Id(), err)
return diag.Errorf("setting network_configuration: %s", err)
}
d.Set("plugins_s3_object_version", environment.PluginsS3ObjectVersion)
d.Set("plugins_s3_path", environment.PluginsS3Path)
Expand All @@ -387,24 +406,23 @@ func resourceEnvironmentRead(d *schema.ResourceData, meta interface{}) error {

//lintignore:AWSR002
if err := d.Set("tags", tags.RemoveDefaultConfig(defaultTagsConfig).Map()); err != nil {
return fmt.Errorf("error setting tags: %w", err)
return diag.Errorf("setting tags: %s", err)
}

if err := d.Set("tags_all", tags.Map()); err != nil {
return fmt.Errorf("error setting tags_all: %w", err)
return diag.Errorf("setting tags_all: %s", err)
}

return nil
}

func resourceEnvironmentUpdate(d *schema.ResourceData, meta interface{}) error {
func resourceEnvironmentUpdate(ctx context.Context, d *schema.ResourceData, meta interface{}) diag.Diagnostics {
conn := meta.(*conns.AWSClient).MWAAConn

input := mwaa.UpdateEnvironmentInput{
Name: aws.String(d.Get("name").(string)),
}

if d.HasChangesExcept("tags", "tags_all") {
input := &mwaa.UpdateEnvironmentInput{
Name: aws.String(d.Get("name").(string)),
}

if d.HasChange("airflow_configuration_options") {
options, ok := d.GetOk("airflow_configuration_options")
if !ok {
Expand Down Expand Up @@ -479,47 +497,46 @@ func resourceEnvironmentUpdate(d *schema.ResourceData, meta interface{}) error {
}

log.Printf("[INFO] Updating MWAA Environment: %s", input)
_, err := conn.UpdateEnvironment(&input)
_, err := conn.UpdateEnvironmentWithContext(ctx, input)

if err != nil {
return fmt.Errorf("error updating MWAA Environment (%s): %w", d.Id(), err)
return diag.Errorf("updating MWAA Environment (%s): %s", d.Id(), err)
}

if _, err := waitEnvironmentUpdated(conn, d.Id()); err != nil {
return fmt.Errorf("error waiting for MWAA Environment (%s) update: %w", d.Id(), err)
if _, err := waitEnvironmentUpdated(ctx, conn, d.Id(), d.Timeout(schema.TimeoutUpdate)); err != nil {
return diag.Errorf("waiting for MWAA Environment (%s) update: %s", d.Id(), err)
}
}

if d.HasChange("tags_all") {
o, n := d.GetChange("tags_all")

if err := UpdateTags(conn, d.Get("arn").(string), o, n); err != nil {
return fmt.Errorf("error updating MWAA Environment (%s) tags: %s", d.Get("arn").(string), err)
if err := UpdateTagsWithContext(ctx, conn, d.Get("arn").(string), o, n); err != nil {
return diag.Errorf("updating MWAA Environment (%s) tags: %s", d.Get("arn").(string), err)
}
}

return resourceEnvironmentRead(d, meta)
return resourceEnvironmentRead(ctx, d, meta)
}

func resourceEnvironmentDelete(d *schema.ResourceData, meta interface{}) error {
func resourceEnvironmentDelete(ctx context.Context, d *schema.ResourceData, meta interface{}) diag.Diagnostics {
conn := meta.(*conns.AWSClient).MWAAConn

log.Printf("[INFO] Deleting MWAA Environment: %s", d.Id())
_, err := conn.DeleteEnvironment(&mwaa.DeleteEnvironmentInput{
_, err := conn.DeleteEnvironmentWithContext(ctx, &mwaa.DeleteEnvironmentInput{
Name: aws.String(d.Id()),
})
if err != nil {
if tfawserr.ErrCodeEquals(err, mwaa.ErrCodeResourceNotFoundException) {
return nil
}

return fmt.Errorf("error deleting MWAA Environment (%s): %w", d.Id(), err)
if tfawserr.ErrCodeEquals(err, mwaa.ErrCodeResourceNotFoundException) {
return nil
}

_, err = waitEnvironmentDeleted(conn, d.Id())

if err != nil {
return fmt.Errorf("error waiting for MWAA Environment (%s) deletion: %w", d.Id(), err)
return diag.Errorf("deleting MWAA Environment (%s): %s", d.Id(), err)
}

if _, err := waitEnvironmentDeleted(ctx, conn, d.Id(), d.Timeout(schema.TimeoutDelete)); err != nil {
return diag.Errorf("waiting for MWAA Environment (%s) delete: %s", d.Id(), err)
}

return nil
Expand Down Expand Up @@ -547,6 +564,110 @@ func environmentModuleLoggingConfigurationSchema() *schema.Resource {
}
}

func FindEnvironmentByName(ctx context.Context, conn *mwaa.MWAA, name string) (*mwaa.Environment, error) {
input := &mwaa.GetEnvironmentInput{
Name: aws.String(name),
}

output, err := conn.GetEnvironmentWithContext(ctx, input)

if tfawserr.ErrCodeEquals(err, mwaa.ErrCodeResourceNotFoundException) {
return nil, &resource.NotFoundError{
LastError: err,
LastRequest: input,
}
}

if err != nil {
return nil, err
}

if output == nil || output.Environment == nil {
return nil, tfresource.NewEmptyResultError(input)
}

return output.Environment, nil
}

func statusEnvironment(ctx context.Context, conn *mwaa.MWAA, name string) resource.StateRefreshFunc {
return func() (interface{}, string, error) {
environment, err := FindEnvironmentByName(ctx, conn, name)

if tfresource.NotFound(err) {
return nil, "", nil
}

if err != nil {
return nil, "", err
}

return environment, aws.StringValue(environment.Status), nil
}
}

func waitEnvironmentCreated(ctx context.Context, conn *mwaa.MWAA, name string, timeout time.Duration) (*mwaa.Environment, error) {
stateConf := &resource.StateChangeConf{
Pending: []string{mwaa.EnvironmentStatusCreating},
Target: []string{mwaa.EnvironmentStatusAvailable},
Refresh: statusEnvironment(ctx, conn, name),
Timeout: timeout,
}

outputRaw, err := stateConf.WaitForStateContext(ctx)

if v, ok := outputRaw.(*mwaa.Environment); ok {
if v.LastUpdate != nil && v.LastUpdate.Error != nil {
tfresource.SetLastError(err, fmt.Errorf("%s: %s", aws.StringValue(v.LastUpdate.Error.ErrorCode), aws.StringValue(v.LastUpdate.Error.ErrorMessage)))
}

return v, err
}

return nil, err
}

func waitEnvironmentUpdated(ctx context.Context, conn *mwaa.MWAA, name string, timeout time.Duration) (*mwaa.Environment, error) {
stateConf := &resource.StateChangeConf{
Pending: []string{mwaa.EnvironmentStatusUpdating},
Target: []string{mwaa.EnvironmentStatusAvailable},
Refresh: statusEnvironment(ctx, conn, name),
Timeout: timeout,
}

outputRaw, err := stateConf.WaitForStateContext(ctx)

if v, ok := outputRaw.(*mwaa.Environment); ok {
if v.LastUpdate != nil && v.LastUpdate.Error != nil {
tfresource.SetLastError(err, fmt.Errorf("%s: %s", aws.StringValue(v.LastUpdate.Error.ErrorCode), aws.StringValue(v.LastUpdate.Error.ErrorMessage)))
}

return v, err
}

return nil, err
}

func waitEnvironmentDeleted(ctx context.Context, conn *mwaa.MWAA, name string, timeout time.Duration) (*mwaa.Environment, error) {
stateConf := &resource.StateChangeConf{
Pending: []string{mwaa.EnvironmentStatusDeleting},
Target: []string{},
Refresh: statusEnvironment(ctx, conn, name),
Timeout: timeout,
}

outputRaw, err := stateConf.WaitForStateContext(ctx)

if v, ok := outputRaw.(*mwaa.Environment); ok {
if v.LastUpdate != nil && v.LastUpdate.Error != nil {
tfresource.SetLastError(err, fmt.Errorf("%s: %s", aws.StringValue(v.LastUpdate.Error.ErrorCode), aws.StringValue(v.LastUpdate.Error.ErrorMessage)))
}

return v, err
}

return nil, err
}

func expandEnvironmentLoggingConfiguration(l []interface{}) *mwaa.LoggingConfigurationInput {
if len(l) == 0 || l[0] == nil {
return nil
Expand Down
Loading

0 comments on commit 308c570

Please sign in to comment.