Skip to content

Commit

Permalink
Merge pull request #9053 from terraform-providers/rfd-retry-emr
Browse files Browse the repository at this point in the history
Timeout retry when deleting EMR cluster
  • Loading branch information
ryndaniels authored Jun 20, 2019
2 parents d5bfe7e + e640b5e commit 5889819
Showing 1 changed file with 54 additions and 27 deletions.
81 changes: 54 additions & 27 deletions aws/resource_aws_emr_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -879,8 +879,11 @@ func resourceAwsEMRClusterCreate(d *schema.ResourceData, meta interface{}) error
}
return nil
})
if isResourceTimeoutError(err) {
resp, err = conn.RunJobFlow(params)
}
if err != nil {
return err
return fmt.Errorf("error running EMR Job Flow: %s", err)
}

d.SetId(*resp.JobFlowId)
Expand Down Expand Up @@ -1318,48 +1321,72 @@ func resourceAwsEMRClusterDelete(d *schema.ResourceData, meta interface{}) error
return err
}

input := &emr.ListInstancesInput{
ClusterId: aws.String(d.Id()),
}
var resp *emr.ListInstancesOutput
var count int
err = resource.Retry(20*time.Minute, func() *resource.RetryError {
resp, err := conn.ListInstances(&emr.ListInstancesInput{
ClusterId: aws.String(d.Id()),
})
var err error
resp, err = conn.ListInstances(input)

if err != nil {
return resource.NonRetryableError(err)
}

instanceCount := len(resp.Instances)

if resp == nil || instanceCount == 0 {
log.Printf("[DEBUG] No instances found for EMR Cluster (%s)", d.Id())
return nil
count = countEMRRemainingInstances(resp, d.Id())
if count != 0 {
return resource.RetryableError(fmt.Errorf("EMR Cluster (%s) has (%d) Instances remaining", d.Id(), count))
}
return nil
})

// Collect instance status states, wait for all instances to be terminated
// before moving on
var terminated []string
for j, i := range resp.Instances {
if i.Status != nil {
if aws.StringValue(i.Status.State) == emr.InstanceStateTerminated {
terminated = append(terminated, *i.Ec2InstanceId)
}
} else {
log.Printf("[DEBUG] Cluster instance (%d : %s) has no status", j, *i.Ec2InstanceId)
}
}
if len(terminated) == instanceCount {
log.Printf("[DEBUG] All (%d) EMR Cluster (%s) Instances terminated", instanceCount, d.Id())
return nil
if isResourceTimeoutError(err) {
resp, err = conn.ListInstances(input)

if err == nil {
count = countEMRRemainingInstances(resp, d.Id())
}
return resource.RetryableError(fmt.Errorf("EMR Cluster (%s) has (%d) Instances remaining, retrying", d.Id(), len(resp.Instances)))
})
}

if count != 0 {
return fmt.Errorf("EMR Cluster (%s) has (%d) Instances remaining", d.Id(), count)
}

if err != nil {
return fmt.Errorf("error waiting for EMR Cluster (%s) Instances to drain", d.Id())
return fmt.Errorf("error waiting for EMR Cluster (%s) Instances to drain: %s", d.Id(), err)
}

return nil
}

func countEMRRemainingInstances(resp *emr.ListInstancesOutput, emrClusterId string) int {
instanceCount := len(resp.Instances)

if resp == nil || instanceCount == 0 {
log.Printf("[DEBUG] No instances found for EMR Cluster (%s)", emrClusterId)
return 0
}

// Collect instance status states, wait for all instances to be terminated
// before moving on
var terminated []string
for j, i := range resp.Instances {
if i.Status != nil {
if aws.StringValue(i.Status.State) == emr.InstanceStateTerminated {
terminated = append(terminated, *i.Ec2InstanceId)
}
} else {
log.Printf("[DEBUG] Cluster instance (%d : %s) has no status", j, *i.Ec2InstanceId)
}
}
if len(terminated) == instanceCount {
log.Printf("[DEBUG] All (%d) EMR Cluster (%s) Instances terminated", instanceCount, emrClusterId)
return 0
}
return len(resp.Instances)
}

func expandApplications(apps []interface{}) []*emr.Application {
appOut := make([]*emr.Application, 0, len(apps))

Expand Down

0 comments on commit 5889819

Please sign in to comment.