Skip to content

Commit

Permalink
Rework sweeper concurrency
Browse files Browse the repository at this point in the history
  • Loading branch information
YakDriver committed Nov 17, 2020
1 parent 03db569 commit 75e5bc7
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 61 deletions.
71 changes: 39 additions & 32 deletions aws/provider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/aws/aws-sdk-go/service/ec2"
"github.com/aws/aws-sdk-go/service/iam"
"github.com/aws/aws-sdk-go/service/organizations"
multierror "github.com/hashicorp/go-multierror"
"github.com/hashicorp/terraform-plugin-sdk/v2/diag"
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/resource"
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema"
Expand Down Expand Up @@ -981,47 +982,53 @@ func testAccPreCheckSkipError(err error) bool {
return false
}

// testSweepMaxJobs is the maximum number of goroutines to use in sweeping.
// The actual number may be lower depending on the needed jobs.
const testSweepMaxJobs = 20

// testSweepUnitOfWorkFunc is a function type for a unit of sweeping work.
type testSweepUnitOfWorkFunc func(id, region string) error

// testSweepOrchestrator orchestrates goroutines to perform sweeping work.
func testSweepOrchestrator(jobs []string, region string, f testSweepUnitOfWorkFunc) {
jobCount := testSweepMaxJobs
if len(jobs) < jobCount {
jobCount = len(jobs)
}
// testSweepOrchestrator is a convenience function that calls testSweepOrchestratorWithData
//nolint
func testSweepOrchestrator(jobs []string, r *schema.Resource, region string) error {
return testSweepOrchestratorWithData(jobs, r, r.Data(nil), region)
}

// testSweepOrchestratorWithData launches goroutines to perform sweeping work
func testSweepOrchestratorWithData(jobs []string, r *schema.Resource, d *schema.ResourceData, region string) error {
var wg sync.WaitGroup
jobChan := make(chan string, jobCount)
wgDone := make(chan bool)
errChan := make(chan error, len(jobs))

for workerID := 0; workerID < jobCount; workerID++ {
for _, jobID := range jobs {
wg.Add(1)
go testSweepJob(f, region, jobChan, &wg)
}

for _, id := range jobs {
jobChan <- id
}
go func(id string) {
defer wg.Done()

close(jobChan)
wg.Wait()
}
client, err := sharedClientForRegion(region)
if err != nil {
errChan <- fmt.Errorf("error getting client: %s", err)
return
}

// testSweepJob should only be called by testSweepOrachestrator and calls the
// work function for the sweep
func testSweepJob(f testSweepUnitOfWorkFunc, region string, jobChan <-chan string, wg *sync.WaitGroup) {
defer wg.Done()
d.SetId(id)
err = r.Delete(d, client)
if err != nil {
errChan <- err
}
}(jobID)
}

for id := range jobChan {
err := f(id, region)
if err != nil {
log.Printf("[ERROR] sweeping error: %s", err)
}
go func() {
wg.Wait()
close(wgDone)
close(errChan)
}()

var errors error
var mutex = &sync.Mutex{}
for err := range errChan {
mutex.Lock()
errors = multierror.Append(errors, err)
mutex.Unlock()
}

return errors
}

// Check sweeper API call error for reasons to skip sweeping
Expand Down
33 changes: 4 additions & 29 deletions aws/resource_aws_db_instance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"log"
"regexp"
"testing"
"time"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/rds"
Expand All @@ -23,31 +22,6 @@ func init() {
})
}

func testSweepDbInstancesUnitOfWork(id, region string) error {
client, err := sharedClientForRegion(region)
if err != nil {
return fmt.Errorf("error getting client: %s", err)
}
conn := client.(*AWSClient).rdsconn

log.Printf("[INFO] Deleting DB instance: %s", id)

_, err = conn.DeleteDBInstance(&rds.DeleteDBInstanceInput{
DBInstanceIdentifier: aws.String(id),
SkipFinalSnapshot: aws.Bool(true),
})
if err != nil {
return fmt.Errorf("failed to delete DB instance %s: %s", id, err)
}

err = waitUntilAwsDbInstanceIsDeleted(id, conn, 40*time.Minute)
if err != nil {
return fmt.Errorf("failure while waiting for DB instance %s to be deleted: %s", id, err)
}

return nil
}

func testSweepDbInstances(region string) error {
client, err := sharedClientForRegion(region)
if err != nil {
Expand All @@ -70,9 +44,10 @@ func testSweepDbInstances(region string) error {
return fmt.Errorf("Error retrieving DB instances: %s", err)
}

testSweepOrchestrator(ids, region, testSweepDbInstancesUnitOfWork)

return nil
r := resourceAwsDbInstance()
d := r.Data(nil)
d.Set("skip_final_snapshot", true)
return testSweepOrchestratorWithData(ids, r, d, region)
}

func TestAccAWSDBInstance_basic(t *testing.T) {
Expand Down

0 comments on commit 75e5bc7

Please sign in to comment.