Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add retry for MWAA CreateEnvironment on ValidationException #27031

Merged
merged 5 commits into from
Oct 4, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .changelog/27031.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:enhancement
resource/aws_mwaa_environment: Add custom timeouts
```
231 changes: 176 additions & 55 deletions internal/service/mwaa/environment.go
Original file line number Diff line number Diff line change
@@ -1,30 +1,46 @@
package mwaa

import (
"context"
"fmt"
"log"
"time"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/mwaa"
"github.com/hashicorp/aws-sdk-go-base/v2/awsv1shim/v2/tfawserr"
"github.com/hashicorp/terraform-plugin-sdk/v2/diag"
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/resource"
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema"
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/validation"
"github.com/hashicorp/terraform-provider-aws/internal/conns"
"github.com/hashicorp/terraform-provider-aws/internal/flex"
tftags "github.com/hashicorp/terraform-provider-aws/internal/tags"
"github.com/hashicorp/terraform-provider-aws/internal/tfresource"
"github.com/hashicorp/terraform-provider-aws/internal/verify"
)

const (
propagationTimeout = 2 * time.Minute
)

func ResourceEnvironment() *schema.Resource {
return &schema.Resource{
Create: resourceEnvironmentCreate,
Read: resourceEnvironmentRead,
Update: resourceEnvironmentUpdate,
Delete: resourceEnvironmentDelete,
CreateWithoutTimeout: resourceEnvironmentCreate,
ReadWithoutTimeout: resourceEnvironmentRead,
UpdateWithoutTimeout: resourceEnvironmentUpdate,
DeleteWithoutTimeout: resourceEnvironmentDelete,

Importer: &schema.ResourceImporter{
State: schema.ImportStatePassthrough,
},

Timeouts: &schema.ResourceTimeout{
Create: schema.DefaultTimeout(120 * time.Minute),
Update: schema.DefaultTimeout(90 * time.Minute),
Delete: schema.DefaultTimeout(90 * time.Minute),
},

Schema: map[string]*schema.Schema{
"airflow_configuration_options": {
Type: schema.TypeMap,
Expand Down Expand Up @@ -240,15 +256,16 @@ func ResourceEnvironment() *schema.Resource {
}
}

func resourceEnvironmentCreate(d *schema.ResourceData, meta interface{}) error {
func resourceEnvironmentCreate(ctx context.Context, d *schema.ResourceData, meta interface{}) diag.Diagnostics {
conn := meta.(*conns.AWSClient).MWAAConn
defaultTagsConfig := meta.(*conns.AWSClient).DefaultTagsConfig
tags := defaultTagsConfig.MergeTags(tftags.New(d.Get("tags").(map[string]interface{})))

input := mwaa.CreateEnvironmentInput{
name := d.Get("name").(string)
input := &mwaa.CreateEnvironmentInput{
DagS3Path: aws.String(d.Get("dag_s3_path").(string)),
ExecutionRoleArn: aws.String(d.Get("execution_role_arn").(string)),
Name: aws.String(d.Get("name").(string)),
Name: aws.String(name),
NetworkConfiguration: expandEnvironmentNetworkConfigurationCreate(d.Get("network_configuration").([]interface{})),
SourceBucketArn: aws.String(d.Get("source_bucket_arn").(string)),
}
Expand All @@ -273,6 +290,7 @@ func resourceEnvironmentCreate(d *schema.ResourceData, meta interface{}) error {
input.LoggingConfiguration = expandEnvironmentLoggingConfiguration(v.([]interface{}))
}

// input.MaxWorkers = aws.Int64(int64(90))
if v, ok := d.GetOk("max_workers"); ok {
input.MaxWorkers = aws.Int64(int64(v.(int)))
}
Expand Down Expand Up @@ -314,41 +332,42 @@ func resourceEnvironmentCreate(d *schema.ResourceData, meta interface{}) error {
}

log.Printf("[INFO] Creating MWAA Environment: %s", input)
_, err := conn.CreateEnvironment(&input)
/*
Execution roles created just before the MWAA Environment may result in ValidationExceptions
due to IAM permission propagation delays.
*/
_, err := tfresource.RetryWhenAWSErrCodeEqualsContext(ctx, propagationTimeout, func() (interface{}, error) {
return conn.CreateEnvironmentWithContext(ctx, input)
}, mwaa.ErrCodeValidationException, mwaa.ErrCodeInternalServerException)

if err != nil {
return fmt.Errorf("error creating MWAA Environment: %w", err)
return diag.Errorf("creating MWAA Environment (%s): %s", name, err)
}

d.SetId(aws.StringValue(input.Name))
d.SetId(name)

if _, err := waitEnvironmentCreated(conn, d.Id()); err != nil {
return fmt.Errorf("error waiting for MWAA Environment (%s) creation: %w", d.Id(), err)
if _, err := waitEnvironmentCreated(ctx, conn, d.Id(), d.Timeout(schema.TimeoutCreate)); err != nil {
return diag.Errorf("waiting for MWAA Environment (%s) create: %s", d.Id(), err)
}

return resourceEnvironmentRead(d, meta)
return resourceEnvironmentRead(ctx, d, meta)
}

func resourceEnvironmentRead(d *schema.ResourceData, meta interface{}) error {
func resourceEnvironmentRead(ctx context.Context, d *schema.ResourceData, meta interface{}) diag.Diagnostics {
conn := meta.(*conns.AWSClient).MWAAConn
defaultTagsConfig := meta.(*conns.AWSClient).DefaultTagsConfig
ignoreTagsConfig := meta.(*conns.AWSClient).IgnoreTagsConfig

log.Printf("[INFO] Reading MWAA Environment: %s", d.Id())

environment, err := findEnvironmentByName(conn, d.Id())

if err != nil {
if tfawserr.ErrCodeEquals(err, mwaa.ErrCodeResourceNotFoundException) && !d.IsNewResource() {
log.Printf("[WARN] MWAA Environment %q not found, removing from state", d.Id())
d.SetId("")
return nil
}
environment, err := FindEnvironmentByName(ctx, conn, d.Id())

return fmt.Errorf("error reading MWAA Environment (%s): %w", d.Id(), err)
if !d.IsNewResource() && tfresource.NotFound(err) {
log.Printf("[WARN] MWAA Environment %s not found, removing from state", d.Id())
d.SetId("")
return nil
}

if environment == nil {
return fmt.Errorf("error reading MWAA Environment (%s): empty response", d.Id())
if err != nil {
return diag.Errorf("reading MWAA Environment (%s): %s", d.Id(), err)
}

d.Set("airflow_configuration_options", aws.StringValueMap(environment.AirflowConfigurationOptions))
Expand All @@ -360,16 +379,16 @@ func resourceEnvironmentRead(d *schema.ResourceData, meta interface{}) error {
d.Set("execution_role_arn", environment.ExecutionRoleArn)
d.Set("kms_key", environment.KmsKey)
if err := d.Set("last_updated", flattenLastUpdate(environment.LastUpdate)); err != nil {
return fmt.Errorf("error reading MWAA Environment (%s): %w", d.Id(), err)
return diag.Errorf("setting last_updated: %s", err)
}
if err := d.Set("logging_configuration", flattenLoggingConfiguration(environment.LoggingConfiguration)); err != nil {
return fmt.Errorf("error reading MWAA Environment (%s): %w", d.Id(), err)
return diag.Errorf("setting logging_configuration: %s", err)
}
d.Set("max_workers", environment.MaxWorkers)
d.Set("min_workers", environment.MinWorkers)
d.Set("name", environment.Name)
if err := d.Set("network_configuration", flattenNetworkConfiguration(environment.NetworkConfiguration)); err != nil {
return fmt.Errorf("error reading MWAA Environment (%s): %w", d.Id(), err)
return diag.Errorf("setting network_configuration: %s", err)
}
d.Set("plugins_s3_object_version", environment.PluginsS3ObjectVersion)
d.Set("plugins_s3_path", environment.PluginsS3Path)
Expand All @@ -387,24 +406,23 @@ func resourceEnvironmentRead(d *schema.ResourceData, meta interface{}) error {

//lintignore:AWSR002
if err := d.Set("tags", tags.RemoveDefaultConfig(defaultTagsConfig).Map()); err != nil {
return fmt.Errorf("error setting tags: %w", err)
return diag.Errorf("setting tags: %s", err)
}

if err := d.Set("tags_all", tags.Map()); err != nil {
return fmt.Errorf("error setting tags_all: %w", err)
return diag.Errorf("setting tags_all: %s", err)
}

return nil
}

func resourceEnvironmentUpdate(d *schema.ResourceData, meta interface{}) error {
func resourceEnvironmentUpdate(ctx context.Context, d *schema.ResourceData, meta interface{}) diag.Diagnostics {
conn := meta.(*conns.AWSClient).MWAAConn

input := mwaa.UpdateEnvironmentInput{
Name: aws.String(d.Get("name").(string)),
}

if d.HasChangesExcept("tags", "tags_all") {
input := &mwaa.UpdateEnvironmentInput{
Name: aws.String(d.Get("name").(string)),
}

if d.HasChange("airflow_configuration_options") {
options, ok := d.GetOk("airflow_configuration_options")
if !ok {
Expand Down Expand Up @@ -479,47 +497,46 @@ func resourceEnvironmentUpdate(d *schema.ResourceData, meta interface{}) error {
}

log.Printf("[INFO] Updating MWAA Environment: %s", input)
_, err := conn.UpdateEnvironment(&input)
_, err := conn.UpdateEnvironmentWithContext(ctx, input)

if err != nil {
return fmt.Errorf("error updating MWAA Environment (%s): %w", d.Id(), err)
return diag.Errorf("updating MWAA Environment (%s): %s", d.Id(), err)
}

if _, err := waitEnvironmentUpdated(conn, d.Id()); err != nil {
return fmt.Errorf("error waiting for MWAA Environment (%s) update: %w", d.Id(), err)
if _, err := waitEnvironmentUpdated(ctx, conn, d.Id(), d.Timeout(schema.TimeoutUpdate)); err != nil {
return diag.Errorf("waiting for MWAA Environment (%s) update: %s", d.Id(), err)
}
}

if d.HasChange("tags_all") {
o, n := d.GetChange("tags_all")

if err := UpdateTags(conn, d.Get("arn").(string), o, n); err != nil {
return fmt.Errorf("error updating MWAA Environment (%s) tags: %s", d.Get("arn").(string), err)
if err := UpdateTagsWithContext(ctx, conn, d.Get("arn").(string), o, n); err != nil {
return diag.Errorf("updating MWAA Environment (%s) tags: %s", d.Get("arn").(string), err)
}
}

return resourceEnvironmentRead(d, meta)
return resourceEnvironmentRead(ctx, d, meta)
}

func resourceEnvironmentDelete(d *schema.ResourceData, meta interface{}) error {
func resourceEnvironmentDelete(ctx context.Context, d *schema.ResourceData, meta interface{}) diag.Diagnostics {
conn := meta.(*conns.AWSClient).MWAAConn

log.Printf("[INFO] Deleting MWAA Environment: %s", d.Id())
_, err := conn.DeleteEnvironment(&mwaa.DeleteEnvironmentInput{
_, err := conn.DeleteEnvironmentWithContext(ctx, &mwaa.DeleteEnvironmentInput{
Name: aws.String(d.Id()),
})
if err != nil {
if tfawserr.ErrCodeEquals(err, mwaa.ErrCodeResourceNotFoundException) {
return nil
}

return fmt.Errorf("error deleting MWAA Environment (%s): %w", d.Id(), err)
if tfawserr.ErrCodeEquals(err, mwaa.ErrCodeResourceNotFoundException) {
return nil
}

_, err = waitEnvironmentDeleted(conn, d.Id())

if err != nil {
return fmt.Errorf("error waiting for MWAA Environment (%s) deletion: %w", d.Id(), err)
return diag.Errorf("deleting MWAA Environment (%s): %s", d.Id(), err)
}

if _, err := waitEnvironmentDeleted(ctx, conn, d.Id(), d.Timeout(schema.TimeoutDelete)); err != nil {
return diag.Errorf("waiting for MWAA Environment (%s) delete: %s", d.Id(), err)
}

return nil
Expand Down Expand Up @@ -547,6 +564,110 @@ func environmentModuleLoggingConfigurationSchema() *schema.Resource {
}
}

func FindEnvironmentByName(ctx context.Context, conn *mwaa.MWAA, name string) (*mwaa.Environment, error) {
input := &mwaa.GetEnvironmentInput{
Name: aws.String(name),
}

output, err := conn.GetEnvironmentWithContext(ctx, input)

if tfawserr.ErrCodeEquals(err, mwaa.ErrCodeResourceNotFoundException) {
return nil, &resource.NotFoundError{
LastError: err,
LastRequest: input,
}
}

if err != nil {
return nil, err
}

if output == nil || output.Environment == nil {
return nil, tfresource.NewEmptyResultError(input)
}

return output.Environment, nil
}

func statusEnvironment(ctx context.Context, conn *mwaa.MWAA, name string) resource.StateRefreshFunc {
return func() (interface{}, string, error) {
environment, err := FindEnvironmentByName(ctx, conn, name)

if tfresource.NotFound(err) {
return nil, "", nil
}

if err != nil {
return nil, "", err
}

return environment, aws.StringValue(environment.Status), nil
}
}

func waitEnvironmentCreated(ctx context.Context, conn *mwaa.MWAA, name string, timeout time.Duration) (*mwaa.Environment, error) {
stateConf := &resource.StateChangeConf{
Pending: []string{mwaa.EnvironmentStatusCreating},
Target: []string{mwaa.EnvironmentStatusAvailable},
Refresh: statusEnvironment(ctx, conn, name),
Timeout: timeout,
}

outputRaw, err := stateConf.WaitForStateContext(ctx)

if v, ok := outputRaw.(*mwaa.Environment); ok {
if v.LastUpdate != nil && v.LastUpdate.Error != nil {
tfresource.SetLastError(err, fmt.Errorf("%s: %s", aws.StringValue(v.LastUpdate.Error.ErrorCode), aws.StringValue(v.LastUpdate.Error.ErrorMessage)))
}

return v, err
}

return nil, err
}

func waitEnvironmentUpdated(ctx context.Context, conn *mwaa.MWAA, name string, timeout time.Duration) (*mwaa.Environment, error) {
stateConf := &resource.StateChangeConf{
Pending: []string{mwaa.EnvironmentStatusUpdating},
Target: []string{mwaa.EnvironmentStatusAvailable},
Refresh: statusEnvironment(ctx, conn, name),
Timeout: timeout,
}

outputRaw, err := stateConf.WaitForStateContext(ctx)

if v, ok := outputRaw.(*mwaa.Environment); ok {
if v.LastUpdate != nil && v.LastUpdate.Error != nil {
tfresource.SetLastError(err, fmt.Errorf("%s: %s", aws.StringValue(v.LastUpdate.Error.ErrorCode), aws.StringValue(v.LastUpdate.Error.ErrorMessage)))
}

return v, err
}

return nil, err
}

func waitEnvironmentDeleted(ctx context.Context, conn *mwaa.MWAA, name string, timeout time.Duration) (*mwaa.Environment, error) {
stateConf := &resource.StateChangeConf{
Pending: []string{mwaa.EnvironmentStatusDeleting},
Target: []string{},
Refresh: statusEnvironment(ctx, conn, name),
Timeout: timeout,
}

outputRaw, err := stateConf.WaitForStateContext(ctx)

if v, ok := outputRaw.(*mwaa.Environment); ok {
if v.LastUpdate != nil && v.LastUpdate.Error != nil {
tfresource.SetLastError(err, fmt.Errorf("%s: %s", aws.StringValue(v.LastUpdate.Error.ErrorCode), aws.StringValue(v.LastUpdate.Error.ErrorMessage)))
}

return v, err
}

return nil, err
}

func expandEnvironmentLoggingConfiguration(l []interface{}) *mwaa.LoggingConfigurationInput {
if len(l) == 0 || l[0] == nil {
return nil
Expand Down
Loading